Coverage for faststream / confluent / helpers / config.py: 96%
107 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import Callable, Iterable
2from enum import Enum
3from typing import TYPE_CHECKING, Any, Literal, Optional
5from typing_extensions import TypedDict
7from faststream.__about__ import SERVICE_NAME
8from faststream._internal.constants import EMPTY
9from faststream.confluent.security import parse_security
11if TYPE_CHECKING:
12 from faststream.security import BaseSecurity
class BuiltinFeatures(str, Enum):
    """Named string values for the ``builtin.features`` configuration key."""

    gzip = "gzip"
    snappy = "snappy"
    ssl = "ssl"
    sasl = "sasl"
    regex = "regex"
    lz4 = "lz4"
    sasl_gssapi = "sasl_gssapi"
    sasl_plain = "sasl_plain"
    sasl_scram = "sasl_scram"
    plugins = "plugins"
    zstd = "zstd"
    sasl_oauthbearer = "sasl_oauthbearer"
    http = "http"
    oidc = "oidc"
class Debug(str, Enum):
    """Named string values for the ``debug`` configuration key."""

    generic = "generic"
    broker = "broker"
    topic = "topic"
    metadata = "metadata"
    feature = "feature"
    queue = "queue"
    msg = "msg"
    protocol = "protocol"
    cgrp = "cgrp"
    security = "security"
    fetch = "fetch"
    interceptor = "interceptor"
    plugin = "plugin"
    consumer = "consumer"
    admin = "admin"
    eos = "eos"
    mock = "mock"
    assignor = "assignor"
    conf = "conf"
    all = "all"
class BrokerAddressFamily(str, Enum):
    """Named string values for the ``broker.address.family`` configuration key."""

    any = "any"
    v4 = "v4"
    v6 = "v6"
class SecurityProtocol(str, Enum):
    """Named string values for the ``security.protocol`` configuration key."""

    plaintext = "plaintext"
    ssl = "ssl"
    sasl_plaintext = "sasl_plaintext"
    sasl_ssl = "sasl_ssl"
class SASLOAUTHBearerMethod(str, Enum):
    """Named string values for the ``sasl.oauthbearer.method`` configuration key."""

    default = "default"
    oidc = "oidc"
class GroupProtocol(str, Enum):
    """Named string values for the ``group.protocol`` configuration key."""

    classic = "classic"
    consumer = "consumer"
class OffsetStoreMethod(str, Enum):
    """Named string values for the ``offset.store.method`` configuration key."""

    none = "none"
    file = "file"
    broker = "broker"
class IsolationLevel(str, Enum):
    """Named string values for the ``isolation.level`` configuration key."""

    read_uncommitted = "read_uncommitted"
    read_committed = "read_committed"
class CompressionCodec(str, Enum):
    """Named string values for the ``compression.codec`` configuration key."""

    none = "none"
    gzip = "gzip"
    snappy = "snappy"
    lz4 = "lz4"
    zstd = "zstd"
class CompressionType(str, Enum):
    """Named string values for the ``compression.type`` configuration key."""

    none = "none"
    gzip = "gzip"
    snappy = "snappy"
    lz4 = "lz4"
    zstd = "zstd"
class ClientDNSLookup(str, Enum):
    """Named string values for the ``client.dns.lookup`` configuration key."""

    use_all_dns_ips = "use_all_dns_ips"
    resolve_canonical_bootstrap_servers_only = "resolve_canonical_bootstrap_servers_only"
110_SharedConfig = {
111 "bootstrap_servers": "bootstrap.servers",
112 "client_id": "client.id",
113 "allow_auto_create_topics": "allow.auto.create.topics",
114 "connections_max_idle_ms": "connections.max.idle.ms",
115 "metadata_max_age_ms": "metadata.max.age.ms",
116}
118_ProducerConfig = _SharedConfig | {
119 "request_timeout_ms": "request.timeout.ms",
120 "compression_type": "compression.type",
121 "acks": "acks",
122 "retry_backoff_ms": "retry.backoff.ms",
123 "partitioner": "partitioner",
124 "max_request_size": "message.max.bytes",
125 "linger_ms": "linger.ms",
126 "enable_idempotence": "enable.idempotence",
127 "transactional_id": "transactional.id",
128 "transaction_timeout_ms": "transaction.timeout.ms",
129}
131_ConsumerConfig = _SharedConfig
133_AdminConfig = _SharedConfig | {
134 "request_timeout_ms": "request.timeout.ms",
135 "retry_backoff_ms": "retry.backoff.ms",
136}
# Typed view of the raw configuration mapping accepted via the ``config``
# argument of ``ConfluentFastConfig``. Keys are the dotted property names, so
# the functional ``TypedDict`` form is required (they are not valid Python
# identifiers). ``total=False`` makes every key optional.
ConfluentConfig = TypedDict(
    "ConfluentConfig",
    {
        # Enum-backed properties: either the enum member or its plain string.
        "compression.codec": CompressionCodec | str,
        "compression.type": CompressionType | str,
        "client.dns.lookup": ClientDNSLookup | str,
        "offset.store.method": OffsetStoreMethod | str,
        "isolation.level": IsolationLevel | str,
        "sasl.oauthbearer.method": SASLOAUTHBearerMethod | str,
        "security.protocol": SecurityProtocol | str,
        "broker.address.family": BrokerAddressFamily | str,
        "builtin.features": BuiltinFeatures | str,
        "debug": Debug | str,
        "group.protocol": GroupProtocol | str,
        "client.id": str,
        "metadata.broker.list": str,
        "bootstrap.servers": str,
        "message.max.bytes": int,
        "message.copy.max.bytes": int,
        "receive.message.max.bytes": int,
        "max.in.flight.requests.per.connection": int,
        "max.in.flight": int,
        "topic.metadata.refresh.interval.ms": int,
        "metadata.max.age.ms": int,
        "topic.metadata.refresh.fast.interval.ms": int,
        "topic.metadata.refresh.fast.cnt": int,
        "topic.metadata.refresh.sparse": bool,
        "topic.metadata.propagation.max.ms": int,
        "topic.blacklist": str,
        "socket.timeout.ms": int,
        "socket.blocking.max.ms": int,
        "socket.send.buffer.bytes": int,
        "socket.receive.buffer.bytes": int,
        "socket.keepalive.enable": bool,
        "socket.nagle.disable": bool,
        "socket.max.fails": int,
        "broker.address.ttl": int,
        "socket.connection.setup.timeout.ms": int,
        "connections.max.idle.ms": int,
        "reconnect.backoff.jitter.ms": int,
        "reconnect.backoff.ms": int,
        "reconnect.backoff.max.ms": int,
        "statistics.interval.ms": int,
        "enabled_events": int,
        "error_cb": Callable[..., Any],
        "throttle_cb": Callable[..., Any],
        "stats_cb": Callable[..., Any],
        "log_cb": Callable[..., Any],
        "log_level": int,
        "log.queue": bool,
        "log.thread.name": bool,
        "enable.random.seed": bool,
        "log.connection.close": bool,
        "background_event_cb": Callable[..., Any],
        "socket_cb": Callable[..., Any],
        "connect_cb": Callable[..., Any],
        "closesocket_cb": Callable[..., Any],
        "open_cb": Callable[..., Any],
        "resolve_cb": Callable[..., Any],
        "opaque": str,
        "default_topic_conf": str,
        "internal.termination.signal": int,
        "api.version.request": bool,
        "api.version.request.timeout.ms": int,
        "api.version.fallback.ms": int,
        "broker.version.fallback": str,
        "allow.auto.create.topics": bool,
        "ssl.cipher.suites": str,
        "ssl.curves.list": str,
        "ssl.sigalgs.list": str,
        "ssl.key.location": str,
        "ssl.key.password": str,
        "ssl.key.pem": str,
        "ssl_key": str,
        "ssl.certificate.location": str,
        "ssl.certificate.pem": str,
        "ssl_certificate": str,
        "ssl.ca.location": str,
        "ssl.ca.pem": str,
        "ssl_ca": str,
        "ssl.ca.certificate.stores": str,
        "ssl.crl.location": str,
        "ssl.keystore.location": str,
        "ssl.keystore.password": str,
        "ssl.providers": str,
        "ssl.engine.location": str,
        "ssl.engine.id": str,
        "ssl_engine_callback_data": str,
        "enable.ssl.certificate.verification": bool,
        "ssl.endpoint.identification.algorithm": str,
        "ssl.certificate.verify_cb": Callable[..., Any],
        "sasl.mechanisms": str,
        "sasl.mechanism": str,
        "sasl.kerberos.service.name": str,
        "sasl.kerberos.principal": str,
        "sasl.kerberos.kinit.cmd": str,
        "sasl.kerberos.keytab": str,
        "sasl.kerberos.min.time.before.relogin": int,
        "sasl.username": str,
        "sasl.password": str,
        "sasl.oauthbearer.config": str,
        "enable.sasl.oauthbearer.unsecure.jwt": bool,  # codespell:ignore unsecure
        "oauth_cb": Callable[..., Any],
        "sasl.oauthbearer.client.id": str,
        "sasl.oauthbearer.client.secret": str,
        "sasl.oauthbearer.scope": str,
        "sasl.oauthbearer.extensions": str,
        "sasl.oauthbearer.token.endpoint.url": str,
        "plugin.library.paths": str,
        "interceptors": str,
        "group.id": str,
        "group.instance.id": str,
        "partition.assignment.strategy": str,
        # Both are integer millisecond properties; they were annotated as
        # ``str`` only. ``str`` stays in the union so callers that already
        # pass string values keep type-checking.
        "session.timeout.ms": int | str,
        "heartbeat.interval.ms": int | str,
        "group.protocol.type": str,
        "group.remote.assignor": str,
        "coordinator.query.interval.ms": int,
        "max.poll.interval.ms": int,
        "enable.auto.commit": bool,
        "auto.commit.interval.ms": int,
        "enable.auto.offset.store": bool,
        "queued.min.messages": int,
        "queued.max.messages.kbytes": int,
        "fetch.wait.max.ms": int,
        "fetch.queue.backoff.ms": int,
        "fetch.message.max.bytes": int,
        "max.partition.fetch.bytes": int,
        "fetch.max.bytes": int,
        "fetch.min.bytes": int,
        "fetch.error.backoff.ms": int,
        "consume_cb": Callable[..., Any],
        "rebalance_cb": Callable[..., Any],
        "offset_commit_cb": Callable[..., Any],
        "enable.partition.eof": bool,
        "check.crcs": bool,
        "client.rack": str,
        "transactional.id": str,
        "transaction.timeout.ms": int,
        "enable.idempotence": bool,
        "enable.gapless.guarantee": bool,
        "queue.buffering.max.messages": int,
        "queue.buffering.max.kbytes": int,
        "queue.buffering.max.ms": float,
        "delivery.timeout.ms": int,
        "linger.ms": float,
        "message.send.max.retries": int,
        "retries": int,
        "retry.backoff.ms": int,
        "retry.backoff.max.ms": int,
        "queue.buffering.backpressure.threshold": int,
        "batch.num.messages": int,
        "batch.size": int,
        "delivery.report.only.error": bool,
        "dr_cb": Callable[..., Any],
        "dr_msg_cb": Callable[..., Any],
        "sticky.partitioning.linger.ms": int,
        "on_delivery": Callable[..., Any],
    },
    total=False,
)
class ConfluentFastConfig:
    """Hold client options and render them as dotted-key config dicts.

    The constructor stores three snake_case option dicts
    (``raw_consumer_config``, ``raw_producer_config``, ``raw_admin_config``)
    plus ``config``, which is the output of ``parse_security(security)``
    merged with the explicit ``config`` mapping. The ``*_config`` properties
    rename the snake_case options to dotted keys, overlay ``config`` on top,
    and pass the result through ``_to_confluent``.
    """

    def __init__(
        self,
        *,
        security: Optional["BaseSecurity"] = None,
        config: ConfluentConfig | None = None,
        # shared
        bootstrap_servers: str | Iterable[str] = "localhost",
        retry_backoff_ms: int = 100,
        client_id: str | None = SERVICE_NAME,
        allow_auto_create_topics: bool = True,
        connections_max_idle_ms: int = 9 * 60 * 1000,
        metadata_max_age_ms: int = 5 * 60 * 1000,
        # producer
        request_timeout_ms: int = 40 * 1000,
        acks: Literal[0, 1, -1, "all"] = EMPTY,
        compression_type: Literal["gzip", "snappy", "lz4", "zstd"] | None = None,
        partitioner: str
        | Callable[[bytes, list[Any], list[Any]], Any] = "consistent_random",
        max_request_size: int = 1024 * 1024,
        linger_ms: int = 0,
        enable_idempotence: bool = False,
        transactional_id: str | None = None,
        transaction_timeout_ms: int = 60 * 1000,
    ) -> None:
        # The right-hand operand of ``|`` wins, so explicit ``config`` entries
        # override whatever ``parse_security`` produced.
        security_options = parse_security(security)
        self.config = security_options | (config or {})

        common: dict[str, Any] = dict(
            bootstrap_servers=bootstrap_servers,
            client_id=client_id,
            allow_auto_create_topics=allow_auto_create_topics,
            connections_max_idle_ms=connections_max_idle_ms,
            metadata_max_age_ms=metadata_max_age_ms,
        )

        # extended consumer options were passed to `broker.subscriber` method
        self.raw_consumer_config = common

        producer_only: dict[str, Any] = dict(
            request_timeout_ms=request_timeout_ms,
            partitioner=partitioner,
            retry_backoff_ms=retry_backoff_ms,
            max_request_size=max_request_size,
            linger_ms=linger_ms,
            enable_idempotence=enable_idempotence,
            transactional_id=transactional_id,
            transaction_timeout_ms=transaction_timeout_ms,
        )
        # Compression is only recorded when a truthy codec name was given.
        if compression_type:
            producer_only["compression_type"] = compression_type
        # Both "not provided" (EMPTY) and the "all" alias are stored as -1.
        producer_only["acks"] = -1 if acks is EMPTY or acks == "all" else acks
        self.raw_producer_config = common | producer_only

        admin_only: dict[str, Any] = dict(
            request_timeout_ms=request_timeout_ms,
            retry_backoff_ms=retry_backoff_ms,
        )
        self.raw_admin_config = common | admin_only

    def _render(
        self,
        key_map: dict[str, str],
        raw_options: dict[str, Any],
    ) -> dict[str, Any]:
        """Rename ``raw_options`` via ``key_map``, overlay ``self.config``, normalize."""
        renamed = {key_map[option]: value for option, value in raw_options.items()}
        return _to_confluent(renamed | self.config)

    @property
    def consumer_config(self) -> dict[str, Any]:
        """Dotted-key configuration built from ``raw_consumer_config``."""
        return self._render(_ConsumerConfig, self.raw_consumer_config)

    @property
    def producer_config(self) -> dict[str, Any]:
        """Dotted-key configuration built from ``raw_producer_config``."""
        return self._render(_ProducerConfig, self.raw_producer_config)

    @property
    def admin_config(self) -> dict[str, Any]:
        """Dotted-key configuration built from ``raw_admin_config``."""
        return self._render(_AdminConfig, self.raw_admin_config)
# Properties whose value is a comma-separated list of flags rather than a
# single choice; each flag has to be validated on its own.
_CSV_FLAG_KEYS = frozenset({"builtin.features", "debug"})


def _to_confluent(config: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``config`` with values normalized to plain strings.

    Two normalizations are applied to the copy; ``config`` itself is not
    modified:

    * Truthy values under the enum-backed keys are validated against their
      enum and replaced by the member's ``.value``. For the CSV-flag keys
      (``debug``, ``builtin.features``) a string such as ``"broker,topic"`` is
      split on commas and each flag is validated separately, then re-joined.
    * A truthy, non-string iterable under ``bootstrap.servers`` is joined into
      a single comma-separated string.

    Raises:
        ValueError: if an enum-backed value (or one of its comma-separated
            flags) is not a valid member of the corresponding enum.
    """
    data = config.copy()

    for key, enum in (
        ("compression.codec", CompressionCodec),
        ("compression.type", CompressionType),
        ("client.dns.lookup", ClientDNSLookup),
        ("offset.store.method", OffsetStoreMethod),
        ("isolation.level", IsolationLevel),
        ("sasl.oauthbearer.method", SASLOAUTHBearerMethod),
        ("security.protocol", SecurityProtocol),
        ("broker.address.family", BrokerAddressFamily),
        ("builtin.features", BuiltinFeatures),
        ("debug", Debug),
        ("group.protocol", GroupProtocol),
    ):
        if v := data.get(key):
            if key in _CSV_FLAG_KEYS and isinstance(v, str):
                # Enum members are ``str`` subclasses too; a single member
                # simply yields a one-element list here.
                data[key] = ",".join(
                    enum(flag.strip()).value for flag in v.split(",")
                )
            else:
                data[key] = enum(v).value

    bootstrap_servers = data.get("bootstrap.servers")
    if (
        bootstrap_servers
        and isinstance(bootstrap_servers, Iterable)
        and not isinstance(bootstrap_servers, str)
    ):
        # A list/tuple of hosts becomes a single "host1,host2" string.
        data["bootstrap.servers"] = ",".join(bootstrap_servers)

    return data