Coverage for faststream / _internal / cli / supervisors / multiprocess.py: 53%

28 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import signal 

2from typing import TYPE_CHECKING 

3 

4from faststream._internal.cli.supervisors.basereload import BaseReload 

5from faststream._internal.logger import logger 

6 

7if TYPE_CHECKING: 

8 from multiprocessing.context import SpawnProcess 

9 

10 from faststream._internal.cli.dto import RunArgs, RunFunction 

11 

12 

13class Multiprocess(BaseReload): 

14 """A class to represent a multiprocess.""" 

15 

16 def __init__( 

17 self, 

18 target: "RunFunction", 

19 args: "RunArgs", 

20 workers: int, 

21 reload_delay: float = 0.5, 

22 ) -> None: 

23 super().__init__(target, args, reload_delay) 

24 

25 self.workers = workers 

26 self.processes: list[SpawnProcess] = [] 

27 

28 def startup(self) -> None: 

29 logger.info("Started parent process [%s]", self.pid) 

30 

31 for worker_id in range(self.workers): 

32 process = self.start_process(worker_id=worker_id) 

33 logger.info("Started child process %s [%s]", worker_id, process.pid) 

34 self.processes.append(process) 

35 

36 def shutdown(self) -> None: 

37 for worker_id, process in enumerate(self.processes): 

38 process.terminate() 

39 logger.info("Stopping child process %s [%s]", worker_id, process.pid) 

40 process.join() 

41 

42 logger.info("Stopping parent process [%s]", self.pid) 

43 

44 def restart(self) -> None: 

45 active_processes = [] 

46 

47 for worker_id, process in enumerate(self.processes): 

48 if process.is_alive(): 

49 active_processes.append(process) 

50 continue 

51 

52 log_msg = "Worker %s (pid:%s) exited with code %s." 

53 if process.exitcode and abs(process.exitcode) == signal.SIGKILL: 

54 log_msg += " Perhaps out of memory?" 

55 logger.error(log_msg, worker_id, process.pid, process.exitcode) 

56 

57 process.kill() 

58 

59 new_process = self.start_process(worker_id=worker_id) 

60 logger.info("Started child process [%s]", new_process.pid) 

61 active_processes.append(new_process) 

62 

63 self.processes = active_processes 

64 

65 def should_restart(self) -> bool: 

66 return not all(p.is_alive() for p in self.processes)