Coverage for requests_tracker/sql/sql_collector.py: 100%
79 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-18 22:19 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-18 22:19 +0000
1import re
2import uuid
3from collections import defaultdict
4from typing import Dict, List, Optional, Tuple
6from requests_tracker.base_collector import Collector
7from requests_tracker.settings import get_config
8from requests_tracker.sql.dataclasses import PerDatabaseInfo, SQLQueryInfo
# Groups of queries keyed by (database alias, query.sql).
SimilarQueryGroupsType = Dict[
    Tuple[str, str],
    List[SQLQueryInfo],
]
# Groups of queries keyed by (database alias, (query.raw_sql, repr of the
# raw parameters)).
DuplicateQueryGroupsType = Dict[
    Tuple[str, Tuple[str, str]],
    List[SQLQueryInfo],
]
class SQLCollector(Collector):
    """Collect recorded SQL queries and derive per-database statistics.

    Queries are stored through :meth:`record`; :meth:`generate_statistics`
    rebuilds the per-alias totals and the similar/duplicate counters from
    the (optionally filtered) query list.
    """

    # Every recorded query, before IGNORE_SQL_PATTERNS filtering.
    unfiltered_queries: List[SQLQueryInfo]
    # Per-alias totals, rebuilt by generate_statistics().
    databases: Dict[str, PerDatabaseInfo]
    # Sum of the durations of the filtered queries.
    sql_time: float
    # Synthetic transaction IDs, keyed by DB alias.
    transaction_ids: Dict[str, Optional[str]]

    def __init__(self) -> None:
        """Start with no recorded queries, statistics or transaction IDs."""
        self.unfiltered_queries = []
        self.databases = {}
        self.sql_time = 0
        self.transaction_ids = {}

    @property
    def queries(self) -> List[SQLQueryInfo]:
        """Return the recorded queries minus those matching an ignore pattern.

        When the ``IGNORE_SQL_PATTERNS`` setting is missing or empty, the
        unfiltered list itself is returned (not a copy).
        """
        patterns = get_config().get("IGNORE_SQL_PATTERNS")
        if not patterns:
            return self.unfiltered_queries

        # re.match anchors at the start of the raw SQL; a query is kept only
        # when no pattern matches it.
        return [
            recorded
            for recorded in self.unfiltered_queries
            if all(re.match(pattern, recorded.raw_sql) is None for pattern in patterns)
        ]

    @property
    def num_queries(self) -> int:
        """Return the number of queries left after filtering."""
        filtered = self.queries
        return len(filtered)

    def record(self, sql_query_info: SQLQueryInfo) -> None:
        """Store ``sql_query_info`` in the unfiltered query list."""
        self.unfiltered_queries.append(sql_query_info)

    def new_transaction_id(self, alias: str) -> str:
        """
        Generate and return a new synthetic transaction ID for the specified DB alias.
        """
        fresh_id = uuid.uuid4().hex
        self.transaction_ids[alias] = fresh_id
        return fresh_id

    def current_transaction_id(self, alias: str) -> str:
        """
        Return the current synthetic transaction ID for the specified DB alias.
        """
        known_id = self.transaction_ids.get(alias)
        # Sometimes it is not possible to detect the beginning of the first
        # transaction, so current_transaction_id() will be called before
        # new_transaction_id(). In that case there won't yet be a transaction
        # ID, so one is generated on demand via new_transaction_id().
        return known_id if known_id is not None else self.new_transaction_id(alias)

    def generate_statistics(self) -> None:
        """Rebuild per-database totals and similar/duplicate query counts.

        Resets ``databases`` and ``sql_time``, then walks the filtered
        queries once to accumulate time and query counts per alias. Queries
        sharing ``(alias, sql)`` are "similar"; queries sharing
        ``(alias, raw_sql, raw_params)`` are "duplicates". Groups with more
        than one member have the group size written onto each member and
        added to the alias total.
        """
        self.databases = {}
        self.sql_time = 0

        similar_groups: SimilarQueryGroupsType = defaultdict(list)
        duplicate_groups: DuplicateQueryGroupsType = defaultdict(list)

        for recorded in self.queries:
            db_alias = recorded.alias
            if db_alias in self.databases:
                self.databases[db_alias].time_spent += recorded.duration
                self.databases[db_alias].num_queries += 1
            else:
                self.databases[db_alias] = PerDatabaseInfo(
                    time_spent=recorded.duration,
                    num_queries=1,
                )
            self.sql_time += recorded.duration

            similar_groups[(recorded.alias, recorded.sql)].append(recorded)

            # repr() turns the parameters into a string so the key is hashable.
            # NOTE(review): if raw_params can ever be a mapping, tuple() keeps
            # only its keys - confirm against SQLQueryInfo.
            params_key = repr(tuple(recorded.raw_params) if recorded.raw_params else ())
            duplicate_groups[(recorded.alias, (recorded.raw_sql, params_key))].append(
                recorded
            )

        similar_totals: Dict[str, int] = defaultdict(int)
        for (db_alias, _), members in similar_groups.items():
            group_size = len(members)
            if group_size <= 1:
                continue
            for member in members:
                member.similar_count = group_size
            similar_totals[db_alias] += group_size

        duplicate_totals: Dict[str, int] = defaultdict(int)
        for (db_alias, _), members in duplicate_groups.items():
            group_size = len(members)
            if group_size <= 1:
                continue
            for member in members:
                member.duplicate_count = group_size
            duplicate_totals[db_alias] += group_size

        # Aliases with no repeated queries read as 0 from the defaultdicts.
        for db_alias, db_info in self.databases.items():
            db_info.similar_count = similar_totals[db_alias]
            db_info.duplicate_count = duplicate_totals[db_alias]

    @property
    def total_similar_queries(self) -> int:
        """Return the sum of ``similar_count`` over all databases."""
        total = 0
        for db_info in self.databases.values():
            total += db_info.similar_count
        return total

    @property
    def total_duplicate_queries(self) -> int:
        """Return the sum of ``duplicate_count`` over all databases."""
        total = 0
        for db_info in self.databases.values():
            total += db_info.duplicate_count
        return total

    def matches_search_filter(self, search: str) -> bool:
        """Return True if any filtered query's raw SQL contains ``search``.

        The comparison is case-insensitive (both sides are lower-cased).
        """
        needle = search.lower()
        return any(needle in recorded.raw_sql.lower() for recorded in self.queries)