Coverage for faststream / _internal / cli / supervisors / basereload.py: 98%
36 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import os
2import threading
3from multiprocessing.context import SpawnProcess
4from typing import TYPE_CHECKING
6from faststream._internal.cli.supervisors.utils import get_subprocess, set_exit
7from faststream._internal.logger import logger
9if TYPE_CHECKING:
10 from faststream._internal.cli.dto import RunArgs, RunFunction
13class BaseReload:
14 """A base class for implementing a reloader process."""
16 _process: SpawnProcess
18 reload_delay: float | None
19 should_exit: threading.Event
20 pid: int
21 reloader_name: str = ""
23 def __init__(
24 self,
25 target: "RunFunction",
26 args: "RunArgs",
27 reload_delay: float | None = 0.5,
28 ) -> None:
29 self._target = target
30 self._args = args
32 self.should_exit = threading.Event()
33 self.pid = os.getpid()
34 self.reload_delay = reload_delay
36 set_exit(lambda *_: self.should_exit.set(), sync=True)
38 def run(self) -> None:
39 self.startup()
40 while not self.should_exit.wait(self.reload_delay):
41 if self.should_restart(): # pragma: no branch
42 self.restart()
43 self.shutdown()
45 def startup(self) -> None:
46 logger.info(
47 "Started reloader process [%s] using %s",
48 self.pid,
49 self.reloader_name,
50 )
51 self._process = self.start_process()
53 def restart(self) -> None:
54 self._stop_process()
55 logger.info("Process successfully reloaded")
56 self._process = self.start_process()
58 def shutdown(self) -> None:
59 self._stop_process()
60 logger.info("Stopping reloader process [%s]", self.pid)
62 def _stop_process(self) -> None:
63 self._process.terminate()
64 self._process.join()
66 def start_process(self, worker_id: int | None = None) -> SpawnProcess:
67 self._args.extra_options["worker_id"] = worker_id
68 process = get_subprocess(target=self._target, args=(self._args,))
69 process.start()
70 return process
72 def should_restart(self) -> bool:
73 msg = "Reload strategies should override should_restart()"
74 raise NotImplementedError(msg)