Source code for balancer.core.common.worker

import multiprocessing
import select
import signal
import socket
import sys
import time

from balancer.conf.constants import (
    BALANCE_DEFAULT_GRACEFUL_SHUTDOWN_TIME,
    BALANCER_DEFAULT_BUFFER_SIZE,
)
from balancer.core.logger import logger


[docs]class Worker(multiprocessing.Process): """ A class which handles the worker-side of processing a request (communicating between the back-end worker and the requesting client) """ def __init__( self, host: str, port: int, client_socket: socket.socket, client_host: str, buffer_size: int = BALANCER_DEFAULT_BUFFER_SIZE, ) -> None: super(Worker, self).__init__() self.client_socket = client_socket self.client_host = client_host self.host = host self.port = port self.worker_socket = None self.buffer_size = buffer_size self.failure_connection = multiprocessing.Value("i", 0)
[docs] def close_connections(self): """ It closes the client and worker sockets, and then sets the default signal handler for SIGTERM """ logger.info( f"Closing connections from '{self.client_socket.getsockname()}' to '{self.worker_socket.getsockname()}' ..." ) try: self.worker_socket.shutdown(socket.SHUT_RDWR) except Exception as exc: logger.exception(exc) try: self.worker_socket.close() except Exception as exc: logger.exception(exc) try: self.client_socket.shutdown(socket.SHUT_RDWR) except Exception as exc: logger.exception(exc) try: self.client_socket.close() except Exception as exc: logger.exception(exc) logger.info("Connections closed successfully") signal.signal(signal.SIGTERM, signal.SIG_DFL)
[docs] def close_connections_and_shutdown(self, *args): """ It closes all connections and shuts down the program """ self.close_connections() sys.exit(0)
[docs] def run(self): """ It reads data from the client socket and sends it to the worker socket, and vice versa :return: The data that is being returned is the data that is being sent to the client. """ self.worker_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: self.worker_socket.connect((self.host, self.port)) logger.info(f"Connected to {self.host}:{self.port}") except Exception: logger.exception(f"Couldn't connect to the worker {self.host}:{self.port}") self.failure_connection.value = 1 time.sleep( BALANCE_DEFAULT_GRACEFUL_SHUTDOWN_TIME ) # Give a few seconds for up before it will be removed by the joining thread return signal.signal(signal.SIGTERM, self.close_connections_and_shutdown) try: data_snd = b"" # Send to client data data_rcv = b"" # Data received from client while True: waiting_for_write = [] if data_snd: waiting_for_write.append(self.client_socket) if data_rcv: waiting_for_write.append(self.worker_socket) try: (read, write, err) = select.select( [self.client_socket, self.worker_socket], waiting_for_write, [self.client_socket, self.worker_socket], 0.3, ) except KeyboardInterrupt: break if err: break if self.client_socket in read: next_data = self.client_socket.recv(self.buffer_size) if not next_data: break data_rcv += next_data if self.worker_socket in read: next_data = self.worker_socket.recv(self.buffer_size) if not next_data: break data_snd += next_data if self.worker_socket in write: while data_rcv: self.worker_socket.send(data_rcv[: self.buffer_size]) data_rcv = data_rcv[self.buffer_size:] # fmt: skip if self.client_socket in write: while data_snd: self.client_socket.send(data_snd[: self.buffer_size]) data_snd = data_snd[self.buffer_size:] # fmt: skip logger.debug(f"Data to send: {data_snd}") logger.debug(f"Data to receive: {data_rcv}") except Exception as exc: logger.critical( "Got unexpected behaviour on: %s:%d. Closing connections and shutting down." % (self.host, self.port) ) logger.exception(exc) self.close_connections_and_shutdown()
def __str__(self): return f"<{__class__.__name__} on={self.host}:{self.port}, from={f'%s:%d' % self.client_socket.getsockname()}>"