Coverage for requests_tracker/sql/sql_collector.py: 100%

79 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-18 22:19 +0000

1import re 

2import uuid 

3from collections import defaultdict 

4from typing import Dict, List, Optional, Tuple 

5 

6from requests_tracker.base_collector import Collector 

7from requests_tracker.settings import get_config 

8from requests_tracker.sql.dataclasses import PerDatabaseInfo, SQLQueryInfo 

9 

10SimilarQueryGroupsType = Dict[Tuple[str, str], List[SQLQueryInfo]] 

11DuplicateQueryGroupsType = Dict[Tuple[str, Tuple[str, str]], List[SQLQueryInfo]] 

12 

13 

14class SQLCollector(Collector): 

15 unfiltered_queries: List[SQLQueryInfo] 

16 databases: Dict[str, PerDatabaseInfo] 

17 sql_time: float 

18 transaction_ids: Dict[str, Optional[str]] 

19 

20 def __init__(self) -> None: 

21 self.databases = {} 

22 self.sql_time = 0 

23 self.unfiltered_queries = [] 

24 # synthetic transaction IDs, keyed by DB alias 

25 self.transaction_ids = {} 

26 

27 @property 

28 def queries(self) -> List[SQLQueryInfo]: 

29 config = get_config() 

30 if ignore_patterns := config.get("IGNORE_SQL_PATTERNS"): 

31 return [ 

32 query 

33 for query in self.unfiltered_queries 

34 if not any( 

35 bool(re.match(pattern, query.raw_sql)) 

36 for pattern in ignore_patterns 

37 ) 

38 ] 

39 

40 return self.unfiltered_queries 

41 

42 @property 

43 def num_queries(self) -> int: 

44 return len(self.queries) 

45 

46 def record(self, sql_query_info: SQLQueryInfo) -> None: 

47 self.unfiltered_queries.append(sql_query_info) 

48 

49 def new_transaction_id(self, alias: str) -> str: 

50 """ 

51 Generate and return a new synthetic transaction ID for the specified DB alias. 

52 """ 

53 trans_id = uuid.uuid4().hex 

54 self.transaction_ids[alias] = trans_id 

55 return trans_id 

56 

57 def current_transaction_id(self, alias: str) -> str: 

58 """ 

59 Return the current synthetic transaction ID for the specified DB alias. 

60 """ 

61 trans_id = self.transaction_ids.get(alias) 

62 # Sometimes it is not possible to detect the beginning of the first transaction, 

63 # so current_transaction_id() will be called before new_transaction_id(). In 

64 # that case there won't yet be a transaction ID. so it is necessary to generate 

65 # one using new_transaction_id(). 

66 if trans_id is None: 

67 trans_id = self.new_transaction_id(alias) 

68 return trans_id 

69 

70 def generate_statistics(self) -> None: 

71 similar_query_groups: SimilarQueryGroupsType = defaultdict(list) 

72 duplicate_query_groups: DuplicateQueryGroupsType = defaultdict(list) 

73 

74 self.databases = {} 

75 self.sql_time = 0 

76 

77 for query in self.queries: 

78 alias = query.alias 

79 if alias not in self.databases: 

80 self.databases[alias] = PerDatabaseInfo( 

81 time_spent=query.duration, 

82 num_queries=1, 

83 ) 

84 else: 

85 self.databases[alias].time_spent += query.duration 

86 self.databases[alias].num_queries += 1 

87 self.sql_time += query.duration 

88 

89 similar_query_groups[(query.alias, query.sql)].append(query) 

90 duplicate_query_groups[ 

91 ( 

92 query.alias, 

93 ( 

94 query.raw_sql, 

95 repr(tuple(query.raw_params) if query.raw_params else ()), 

96 ), 

97 ) 

98 ].append(query) 

99 

100 similar_counts: Dict[str, int] = defaultdict(int) 

101 for (alias, _), query_group in similar_query_groups.items(): 

102 count = len(query_group) 

103 

104 if count > 1: 

105 for query in query_group: 

106 query.similar_count = count 

107 similar_counts[alias] += count 

108 

109 duplicate_counts: Dict[str, int] = defaultdict(int) 

110 for (alias, _), query_group in duplicate_query_groups.items(): 

111 count = len(query_group) 

112 

113 if count > 1: 

114 for query in query_group: 

115 query.duplicate_count = count 

116 duplicate_counts[alias] += count 

117 

118 for alias in self.databases: 

119 self.databases[alias].similar_count = similar_counts[alias] 

120 self.databases[alias].duplicate_count = duplicate_counts[alias] 

121 

122 @property 

123 def total_similar_queries(self) -> int: 

124 return sum(database.similar_count for database in self.databases.values()) 

125 

126 @property 

127 def total_duplicate_queries(self) -> int: 

128 return sum(database.duplicate_count for database in self.databases.values()) 

129 

130 def matches_search_filter(self, search: str) -> bool: 

131 search = search.lower() 

132 return next( 

133 (True for query in self.queries if search in query.raw_sql.lower()), 

134 False, 

135 )