Coverage for faststream / _internal / cli / supervisors / basereload.py: 98%

36 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import os 

2import threading 

3from multiprocessing.context import SpawnProcess 

4from typing import TYPE_CHECKING 

5 

6from faststream._internal.cli.supervisors.utils import get_subprocess, set_exit 

7from faststream._internal.logger import logger 

8 

9if TYPE_CHECKING: 

10 from faststream._internal.cli.dto import RunArgs, RunFunction 

11 

12 

13class BaseReload: 

14 """A base class for implementing a reloader process.""" 

15 

16 _process: SpawnProcess 

17 

18 reload_delay: float | None 

19 should_exit: threading.Event 

20 pid: int 

21 reloader_name: str = "" 

22 

23 def __init__( 

24 self, 

25 target: "RunFunction", 

26 args: "RunArgs", 

27 reload_delay: float | None = 0.5, 

28 ) -> None: 

29 self._target = target 

30 self._args = args 

31 

32 self.should_exit = threading.Event() 

33 self.pid = os.getpid() 

34 self.reload_delay = reload_delay 

35 

36 set_exit(lambda *_: self.should_exit.set(), sync=True) 

37 

38 def run(self) -> None: 

39 self.startup() 

40 while not self.should_exit.wait(self.reload_delay): 

41 if self.should_restart(): # pragma: no branch 

42 self.restart() 

43 self.shutdown() 

44 

45 def startup(self) -> None: 

46 logger.info( 

47 "Started reloader process [%s] using %s", 

48 self.pid, 

49 self.reloader_name, 

50 ) 

51 self._process = self.start_process() 

52 

53 def restart(self) -> None: 

54 self._stop_process() 

55 logger.info("Process successfully reloaded") 

56 self._process = self.start_process() 

57 

58 def shutdown(self) -> None: 

59 self._stop_process() 

60 logger.info("Stopping reloader process [%s]", self.pid) 

61 

62 def _stop_process(self) -> None: 

63 self._process.terminate() 

64 self._process.join() 

65 

66 def start_process(self, worker_id: int | None = None) -> SpawnProcess: 

67 self._args.extra_options["worker_id"] = worker_id 

68 process = get_subprocess(target=self._target, args=(self._args,)) 

69 process.start() 

70 return process 

71 

72 def should_restart(self) -> bool: 

73 msg = "Reload strategies should override should_restart()" 

74 raise NotImplementedError(msg)