Coverage for requests_tracker/stack_trace.py: 92%

65 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-18 22:19 +0000

1import inspect 

2import linecache 

3from types import FrameType 

4from typing import Any, Dict, Generator, List, Optional, Tuple 

5 

6from asgiref.local import Local 

7 

8from requests_tracker import settings 

9 

10_local_data = Local() 

11 

12# each tuple is: filename, line_no, func_name, source_line, frame_locals 

13StackTrace = List[Tuple[str, int, str, str, Optional[Dict[str, Any]]]] 

14 

15 

16def _stack_frames(*, skip: int = 0) -> Generator[FrameType, None, None]: 

17 skip += 1 # Skip the frame for this generator. 

18 frame = inspect.currentframe() 

19 while frame is not None: 

20 if skip > 0: 

21 skip -= 1 

22 else: 

23 yield frame 

24 frame = frame.f_back 

25 

26 

27def _is_excluded_frame(frame: FrameType, excluded_modules: Optional[List[str]]) -> bool: 

28 if not excluded_modules: 

29 return False 

30 frame_module = frame.f_globals.get("__name__") 

31 return ( 

32 any( 

33 frame_module == excluded_module 

34 or frame_module.startswith(f"{excluded_module}.") 

35 for excluded_module in excluded_modules 

36 ) 

37 if isinstance(frame_module, str) 

38 else False 

39 ) 

40 

41 

42def get_stack_trace(*, skip: int = 0) -> StackTrace: 

43 """ 

44 Return a processed stack trace for the current call stack. 

45 If the ``ENABLE_STACKTRACES`` setting is False, return an empty :class:`list`. 

46 Otherwise return a :class:`list` of processed stack frame tuples (file name, line 

47 number, function name, source line, frame locals) for the current call stack. The 

48 first entry in the list will be for the bottom of the stack and the last entry will 

49 be for the top of the stack. 

50 ``skip`` is an :class:`int` indicating the number of stack frames above the frame 

51 for this function to omit from the stack trace. The default value of ``0`` means 

52 that the entry for the caller of this function will be the last entry in the 

53 returned stack trace. 

54 """ 

55 config = settings.get_config() 

56 if not config["ENABLE_STACKTRACES"]: 

57 return [] 

58 skip += 1 # Skip the frame for this function. 

59 stack_trace_recorder = getattr(_local_data, "stack_trace_recorder", None) 

60 if stack_trace_recorder is None: 

61 stack_trace_recorder = _StackTraceRecorder() 

62 _local_data.stack_trace_recorder = stack_trace_recorder 

63 return stack_trace_recorder.get_stack_trace( 

64 excluded_modules=config["HIDE_IN_STACKTRACES"], 

65 include_locals=config["ENABLE_STACKTRACES_LOCALS"], 

66 skip=skip, 

67 ) 

68 

69 

70class _StackTraceRecorder: 

71 def __init__(self) -> None: 

72 self.filename_cache: Dict[str, Tuple[str, bool]] = {} 

73 

74 def get_source_file(self, frame: FrameType) -> Tuple[str, bool]: 

75 frame_filename = frame.f_code.co_filename 

76 

77 value = self.filename_cache.get(frame_filename) 

78 if value is None: 

79 filename = inspect.getsourcefile(frame) 

80 if filename is None: 

81 is_source = False 

82 filename = frame_filename 

83 else: 

84 is_source = True 

85 # Ensure linecache validity the first time this recorder 

86 # encounters the filename in this frame. 

87 linecache.checkcache(filename) 

88 value = (filename, is_source) 

89 self.filename_cache[frame_filename] = value 

90 

91 return value 

92 

93 def get_stack_trace( 

94 self, 

95 *, 

96 excluded_modules: Optional[List[str]] = None, 

97 include_locals: bool = False, 

98 skip: int = 0, 

99 ) -> StackTrace: 

100 trace = [] 

101 skip += 1 # Skip the frame for this method. 

102 for frame in _stack_frames(skip=skip): 

103 if _is_excluded_frame(frame, excluded_modules): 

104 continue 

105 

106 filename, is_source = self.get_source_file(frame) 

107 

108 line_no = frame.f_lineno 

109 func_name = frame.f_code.co_name 

110 

111 if is_source: 

112 module = inspect.getmodule(frame, filename) 

113 module_globals = module.__dict__ if module is not None else None 

114 source_line = linecache.getline( 

115 filename, line_no, module_globals 

116 ).strip() 

117 else: 

118 source_line = "" 

119 

120 frame_locals = frame.f_locals if include_locals else None 

121 

122 trace.append((filename, line_no, func_name, source_line, frame_locals)) 

123 trace.reverse() 

124 return trace