Coverage for faststream / _internal / cli / supervisors / multiprocess.py: 53%
28 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import signal
2from typing import TYPE_CHECKING
4from faststream._internal.cli.supervisors.basereload import BaseReload
5from faststream._internal.logger import logger
7if TYPE_CHECKING:
8 from multiprocessing.context import SpawnProcess
10 from faststream._internal.cli.dto import RunArgs, RunFunction
13class Multiprocess(BaseReload):
14 """A class to represent a multiprocess."""
16 def __init__(
17 self,
18 target: "RunFunction",
19 args: "RunArgs",
20 workers: int,
21 reload_delay: float = 0.5,
22 ) -> None:
23 super().__init__(target, args, reload_delay)
25 self.workers = workers
26 self.processes: list[SpawnProcess] = []
28 def startup(self) -> None:
29 logger.info("Started parent process [%s]", self.pid)
31 for worker_id in range(self.workers):
32 process = self.start_process(worker_id=worker_id)
33 logger.info("Started child process %s [%s]", worker_id, process.pid)
34 self.processes.append(process)
36 def shutdown(self) -> None:
37 for worker_id, process in enumerate(self.processes):
38 process.terminate()
39 logger.info("Stopping child process %s [%s]", worker_id, process.pid)
40 process.join()
42 logger.info("Stopping parent process [%s]", self.pid)
44 def restart(self) -> None:
45 active_processes = []
47 for worker_id, process in enumerate(self.processes):
48 if process.is_alive():
49 active_processes.append(process)
50 continue
52 log_msg = "Worker %s (pid:%s) exited with code %s."
53 if process.exitcode and abs(process.exitcode) == signal.SIGKILL:
54 log_msg += " Perhaps out of memory?"
55 logger.error(log_msg, worker_id, process.pid, process.exitcode)
57 process.kill()
59 new_process = self.start_process(worker_id=worker_id)
60 logger.info("Started child process [%s]", new_process.pid)
61 active_processes.append(new_process)
63 self.processes = active_processes
65 def should_restart(self) -> bool:
66 return not all(p.is_alive() for p in self.processes)