Coverage for faststream / confluent / helpers / config.py: 96%

107 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import Callable, Iterable 

2from enum import Enum 

3from typing import TYPE_CHECKING, Any, Literal, Optional 

4 

5from typing_extensions import TypedDict 

6 

7from faststream.__about__ import SERVICE_NAME 

8from faststream._internal.constants import EMPTY 

9from faststream.confluent.security import parse_security 

10 

11if TYPE_CHECKING: 

12 from faststream.security import BaseSecurity 

13 

14 

class BuiltinFeatures(str, Enum):
    """Named values for the ``builtin.features`` key of ``ConfluentConfig``.

    Members are ``str`` instances, so they compare equal to their raw
    string value.
    """

    gzip = "gzip"
    snappy = "snappy"
    ssl = "ssl"
    sasl = "sasl"
    regex = "regex"
    lz4 = "lz4"
    sasl_gssapi = "sasl_gssapi"
    sasl_plain = "sasl_plain"
    sasl_scram = "sasl_scram"
    plugins = "plugins"
    zstd = "zstd"
    sasl_oauthbearer = "sasl_oauthbearer"
    http = "http"
    oidc = "oidc"

30 

31 

class Debug(str, Enum):
    """Named values for the ``debug`` key of ``ConfluentConfig``.

    Members are ``str`` instances, so they compare equal to their raw
    string value.
    """

    generic = "generic"
    broker = "broker"
    topic = "topic"
    metadata = "metadata"
    feature = "feature"
    queue = "queue"
    msg = "msg"
    protocol = "protocol"
    cgrp = "cgrp"
    security = "security"
    fetch = "fetch"
    interceptor = "interceptor"
    plugin = "plugin"
    consumer = "consumer"
    admin = "admin"
    eos = "eos"
    mock = "mock"
    assignor = "assignor"
    conf = "conf"
    all = "all"

53 

54 

class BrokerAddressFamily(str, Enum):
    """Named values for the ``broker.address.family`` key of ``ConfluentConfig``."""

    any = "any"
    v4 = "v4"
    v6 = "v6"

59 

60 

class SecurityProtocol(str, Enum):
    """Named values for the ``security.protocol`` key of ``ConfluentConfig``."""

    plaintext = "plaintext"
    ssl = "ssl"
    sasl_plaintext = "sasl_plaintext"
    sasl_ssl = "sasl_ssl"

66 

67 

class SASLOAUTHBearerMethod(str, Enum):
    """Named values for the ``sasl.oauthbearer.method`` key of ``ConfluentConfig``."""

    default = "default"
    oidc = "oidc"

71 

72 

class GroupProtocol(str, Enum):
    """Named values for the ``group.protocol`` key of ``ConfluentConfig``."""

    classic = "classic"
    consumer = "consumer"

76 

77 

class OffsetStoreMethod(str, Enum):
    """Named values for the ``offset.store.method`` key of ``ConfluentConfig``."""

    none = "none"
    file = "file"
    broker = "broker"

82 

83 

class IsolationLevel(str, Enum):
    """Named values for the ``isolation.level`` key of ``ConfluentConfig``."""

    read_uncommitted = "read_uncommitted"
    read_committed = "read_committed"

87 

88 

class CompressionCodec(str, Enum):
    """Named values for the ``compression.codec`` key of ``ConfluentConfig``."""

    none = "none"
    gzip = "gzip"
    snappy = "snappy"
    lz4 = "lz4"
    zstd = "zstd"

95 

96 

class CompressionType(str, Enum):
    """Named values for the ``compression.type`` key of ``ConfluentConfig``."""

    none = "none"
    gzip = "gzip"
    snappy = "snappy"
    lz4 = "lz4"
    zstd = "zstd"

103 

104 

class ClientDNSLookup(str, Enum):
    """Named values for the ``client.dns.lookup`` key of ``ConfluentConfig``."""

    use_all_dns_ips = "use_all_dns_ips"
    resolve_canonical_bootstrap_servers_only = (
        "resolve_canonical_bootstrap_servers_only"
    )

108 

109 

# Translation tables: snake_case option name (as used by the `raw_*_config`
# dicts of `ConfluentFastConfig`) -> dotted key name emitted in the final
# client configuration.

# Options common to every client kind.
_SharedConfig = {
    "bootstrap_servers": "bootstrap.servers",
    "client_id": "client.id",
    "allow_auto_create_topics": "allow.auto.create.topics",
    "connections_max_idle_ms": "connections.max.idle.ms",
    "metadata_max_age_ms": "metadata.max.age.ms",
}

# Shared options plus the producer-only ones.
_ProducerConfig = {
    **_SharedConfig,
    "request_timeout_ms": "request.timeout.ms",
    "compression_type": "compression.type",
    "acks": "acks",
    "retry_backoff_ms": "retry.backoff.ms",
    "partitioner": "partitioner",
    # NOTE: the snake_case name differs from the dotted key here.
    "max_request_size": "message.max.bytes",
    "linger_ms": "linger.ms",
    "enable_idempotence": "enable.idempotence",
    "transactional_id": "transactional.id",
    "transaction_timeout_ms": "transaction.timeout.ms",
}

# The consumer table is the shared table itself (same object, not a copy).
_ConsumerConfig = _SharedConfig

# Shared options plus the two timing options the admin config carries.
_AdminConfig = {
    **_SharedConfig,
    "request_timeout_ms": "request.timeout.ms",
    "retry_backoff_ms": "retry.backoff.ms",
}

137 

# Typed description of the raw, dotted-key configuration mapping that
# `ConfluentFastConfig` accepts through its `config` argument.
#
# The functional `TypedDict(...)` form is required because most keys contain
# dots and therefore cannot be written as class attributes. `total=False`
# makes every key optional.
ConfluentConfig = TypedDict(
    "ConfluentConfig",
    {
        # Enum-backed options: either the enum member or its raw string.
        # `_to_confluent` converts these to plain string values.
        "compression.codec": CompressionCodec | str,
        "compression.type": CompressionType | str,
        "client.dns.lookup": ClientDNSLookup | str,
        "offset.store.method": OffsetStoreMethod | str,
        "isolation.level": IsolationLevel | str,
        "sasl.oauthbearer.method": SASLOAUTHBearerMethod | str,
        "security.protocol": SecurityProtocol | str,
        "broker.address.family": BrokerAddressFamily | str,
        "builtin.features": BuiltinFeatures | str,
        "debug": Debug | str,
        "group.protocol": GroupProtocol | str,
        "client.id": str,
        "metadata.broker.list": str,
        "bootstrap.servers": str,
        "message.max.bytes": int,
        "message.copy.max.bytes": int,
        "receive.message.max.bytes": int,
        "max.in.flight.requests.per.connection": int,
        "max.in.flight": int,
        "topic.metadata.refresh.interval.ms": int,
        "metadata.max.age.ms": int,
        "topic.metadata.refresh.fast.interval.ms": int,
        "topic.metadata.refresh.fast.cnt": int,
        "topic.metadata.refresh.sparse": bool,
        "topic.metadata.propagation.max.ms": int,
        "topic.blacklist": str,
        "socket.timeout.ms": int,
        "socket.blocking.max.ms": int,
        "socket.send.buffer.bytes": int,
        "socket.receive.buffer.bytes": int,
        "socket.keepalive.enable": bool,
        "socket.nagle.disable": bool,
        "socket.max.fails": int,
        "broker.address.ttl": int,
        "socket.connection.setup.timeout.ms": int,
        "connections.max.idle.ms": int,
        "reconnect.backoff.jitter.ms": int,
        "reconnect.backoff.ms": int,
        "reconnect.backoff.max.ms": int,
        "statistics.interval.ms": int,
        "enabled_events": int,
        "error_cb": Callable[..., Any],
        "throttle_cb": Callable[..., Any],
        "stats_cb": Callable[..., Any],
        "log_cb": Callable[..., Any],
        "log_level": int,
        "log.queue": bool,
        "log.thread.name": bool,
        "enable.random.seed": bool,
        "log.connection.close": bool,
        "background_event_cb": Callable[..., Any],
        "socket_cb": Callable[..., Any],
        "connect_cb": Callable[..., Any],
        "closesocket_cb": Callable[..., Any],
        "open_cb": Callable[..., Any],
        "resolve_cb": Callable[..., Any],
        "opaque": str,
        "default_topic_conf": str,
        "internal.termination.signal": int,
        "api.version.request": bool,
        "api.version.request.timeout.ms": int,
        "api.version.fallback.ms": int,
        "broker.version.fallback": str,
        "allow.auto.create.topics": bool,
        "ssl.cipher.suites": str,
        "ssl.curves.list": str,
        "ssl.sigalgs.list": str,
        "ssl.key.location": str,
        "ssl.key.password": str,
        "ssl.key.pem": str,
        "ssl_key": str,
        "ssl.certificate.location": str,
        "ssl.certificate.pem": str,
        "ssl_certificate": str,
        "ssl.ca.location": str,
        "ssl.ca.pem": str,
        "ssl_ca": str,
        "ssl.ca.certificate.stores": str,
        "ssl.crl.location": str,
        "ssl.keystore.location": str,
        "ssl.keystore.password": str,
        "ssl.providers": str,
        "ssl.engine.location": str,
        "ssl.engine.id": str,
        "ssl_engine_callback_data": str,
        "enable.ssl.certificate.verification": bool,
        "ssl.endpoint.identification.algorithm": str,
        "ssl.certificate.verify_cb": Callable[..., Any],
        "sasl.mechanisms": str,
        "sasl.mechanism": str,
        "sasl.kerberos.service.name": str,
        "sasl.kerberos.principal": str,
        "sasl.kerberos.kinit.cmd": str,
        "sasl.kerberos.keytab": str,
        "sasl.kerberos.min.time.before.relogin": int,
        "sasl.username": str,
        "sasl.password": str,
        "sasl.oauthbearer.config": str,
        "enable.sasl.oauthbearer.unsecure.jwt": bool,  # codespell:ignore unsecure
        "oauth_cb": Callable[..., Any],
        "sasl.oauthbearer.client.id": str,
        "sasl.oauthbearer.client.secret": str,
        "sasl.oauthbearer.scope": str,
        "sasl.oauthbearer.extensions": str,
        "sasl.oauthbearer.token.endpoint.url": str,
        "plugin.library.paths": str,
        "interceptors": str,
        "group.id": str,
        "group.instance.id": str,
        "partition.assignment.strategy": str,
        # Millisecond durations are integers, like every other `*.ms` key in
        # this mapping. `str` is kept in the union only so code written
        # against the previous (`str`) annotation still type-checks.
        "session.timeout.ms": int | str,
        "heartbeat.interval.ms": int | str,
        "group.protocol.type": str,
        "group.remote.assignor": str,
        "coordinator.query.interval.ms": int,
        "max.poll.interval.ms": int,
        "enable.auto.commit": bool,
        "auto.commit.interval.ms": int,
        "enable.auto.offset.store": bool,
        "queued.min.messages": int,
        "queued.max.messages.kbytes": int,
        "fetch.wait.max.ms": int,
        "fetch.queue.backoff.ms": int,
        "fetch.message.max.bytes": int,
        "max.partition.fetch.bytes": int,
        "fetch.max.bytes": int,
        "fetch.min.bytes": int,
        "fetch.error.backoff.ms": int,
        "consume_cb": Callable[..., Any],
        "rebalance_cb": Callable[..., Any],
        "offset_commit_cb": Callable[..., Any],
        "enable.partition.eof": bool,
        "check.crcs": bool,
        "client.rack": str,
        "transactional.id": str,
        "transaction.timeout.ms": int,
        "enable.idempotence": bool,
        "enable.gapless.guarantee": bool,
        "queue.buffering.max.messages": int,
        "queue.buffering.max.kbytes": int,
        "queue.buffering.max.ms": float,
        "delivery.timeout.ms": int,
        "linger.ms": float,
        "message.send.max.retries": int,
        "retries": int,
        "retry.backoff.ms": int,
        "retry.backoff.max.ms": int,
        "queue.buffering.backpressure.threshold": int,
        "batch.num.messages": int,
        "batch.size": int,
        "delivery.report.only.error": bool,
        "dr_cb": Callable[..., Any],
        "dr_msg_cb": Callable[..., Any],
        "sticky.partitioning.linger.ms": int,
        "on_delivery": Callable[..., Any],
    },
    total=False,
)

299 

300 

class ConfluentFastConfig:
    """Collect broker options and render them as per-client config dicts.

    The constructor stores the snake_case options in three "raw" dicts
    (``raw_consumer_config``, ``raw_producer_config``, ``raw_admin_config``)
    and the security-derived / user-supplied dotted options in ``config``.
    The ``consumer_config``, ``producer_config`` and ``admin_config``
    properties translate the raw dicts to dotted keys, overlay ``config`` on
    top and pass the result through ``_to_confluent``.
    """

    def __init__(
        self,
        *,
        security: Optional["BaseSecurity"] = None,
        config: ConfluentConfig | None = None,
        # shared
        bootstrap_servers: str | Iterable[str] = "localhost",
        retry_backoff_ms: int = 100,
        client_id: str | None = SERVICE_NAME,
        allow_auto_create_topics: bool = True,
        connections_max_idle_ms: int = 9 * 60 * 1000,
        metadata_max_age_ms: int = 5 * 60 * 1000,
        # producer
        request_timeout_ms: int = 40 * 1000,
        acks: Literal[0, 1, -1, "all"] = EMPTY,
        compression_type: Literal["gzip", "snappy", "lz4", "zstd"] | None = None,
        partitioner: str
        | Callable[[bytes, list[Any], list[Any]], Any] = "consistent_random",
        max_request_size: int = 1024 * 1024,
        linger_ms: int = 0,
        enable_idempotence: bool = False,
        transactional_id: str | None = None,
        transaction_timeout_ms: int = 60 * 1000,
    ) -> None:
        """Store the given options.

        Args:
            security: Passed to ``parse_security``; its result seeds ``config``.
            config: Extra dotted-key options; these override the values
                derived from ``security``.
            bootstrap_servers: A single server string or an iterable of them.
            acks: ``EMPTY`` (the default) and ``"all"`` are both stored as ``-1``.
            compression_type: Only stored when truthy.

        The remaining keyword arguments are stored unchanged under their own
        names in the raw dicts.
        """
        # Later operand wins: explicit `config` overrides security settings.
        self.config = parse_security(security) | (config or {})

        # A one-shot iterator (e.g. a generator) would be consumed by the
        # first `*_config` property access, leaving every later access with
        # an empty server list. `iter(x) is x` holds only for iterators, so
        # re-iterable containers (list, tuple, set, ...) are stored untouched.
        if (
            isinstance(bootstrap_servers, Iterable)
            and not isinstance(bootstrap_servers, str)
            and iter(bootstrap_servers) is bootstrap_servers
        ):
            bootstrap_servers = list(bootstrap_servers)

        shared_config: dict[str, Any] = {
            "bootstrap_servers": bootstrap_servers,
            "client_id": client_id,
            "allow_auto_create_topics": allow_auto_create_topics,
            "connections_max_idle_ms": connections_max_idle_ms,
            "metadata_max_age_ms": metadata_max_age_ms,
        }

        # extended consumer options were passed to `broker.subscriber` method
        self.raw_consumer_config = shared_config

        self.raw_producer_config = shared_config | {
            "request_timeout_ms": request_timeout_ms,
            "partitioner": partitioner,
            "retry_backoff_ms": retry_backoff_ms,
            "max_request_size": max_request_size,
            "linger_ms": linger_ms,
            "enable_idempotence": enable_idempotence,
            "transactional_id": transactional_id,
            "transaction_timeout_ms": transaction_timeout_ms,
        }

        if compression_type:
            self.raw_producer_config["compression_type"] = compression_type

        # "all" and the unset sentinel are both stored as -1.
        if acks is EMPTY or acks == "all":
            self.raw_producer_config["acks"] = -1
        else:
            self.raw_producer_config["acks"] = acks

        self.raw_admin_config = shared_config | {
            "request_timeout_ms": request_timeout_ms,
            "retry_backoff_ms": retry_backoff_ms,
        }

    @property
    def consumer_config(self) -> dict[str, Any]:
        """Return the consumer options with dotted keys, overlaid by ``config``."""
        return _to_confluent(
            {_ConsumerConfig[k]: v for k, v in self.raw_consumer_config.items()}
            | self.config,
        )

    @property
    def producer_config(self) -> dict[str, Any]:
        """Return the producer options with dotted keys, overlaid by ``config``."""
        return _to_confluent(
            {_ProducerConfig[k]: v for k, v in self.raw_producer_config.items()}
            | self.config,
        )

    @property
    def admin_config(self) -> dict[str, Any]:
        """Return the admin options with dotted keys, overlaid by ``config``."""
        return _to_confluent(
            {_AdminConfig[k]: v for k, v in self.raw_admin_config.items()} | self.config,
        )

382 

383 

def _to_confluent(config: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *config* with values normalised to plain strings.

    Two normalisations are applied to the copy; *config* itself is not
    modified:

    * Enum-backed options are validated against their enum and replaced by
      the member's string value. ``debug`` and ``builtin.features`` take a
      comma-separated list, so each item is validated on its own.
    * A non-string iterable under ``bootstrap.servers`` is joined with commas.

    Raises:
        ValueError: If an enum-backed option holds a value (or, for the
            comma-separated options, an item) that is not a member value.
    """
    data = config.copy()

    # Options whose value is a comma-separated list of enum values rather
    # than a single one, e.g. ``"debug": "broker,topic"``.
    csv_keys = ("builtin.features", "debug")

    for key, enum_cls in (
        ("compression.codec", CompressionCodec),
        ("compression.type", CompressionType),
        ("client.dns.lookup", ClientDNSLookup),
        ("offset.store.method", OffsetStoreMethod),
        ("isolation.level", IsolationLevel),
        ("sasl.oauthbearer.method", SASLOAUTHBearerMethod),
        ("security.protocol", SecurityProtocol),
        ("broker.address.family", BrokerAddressFamily),
        ("builtin.features", BuiltinFeatures),
        ("debug", Debug),
        ("group.protocol", GroupProtocol),
    ):
        # Missing and falsy values are left as they are.
        if v := data.get(key):
            if key in csv_keys and isinstance(v, str):
                # Validate every item; passing the whole string to the enum
                # would reject any list with more than one entry.
                data[key] = ",".join(
                    enum_cls(item.strip()).value for item in v.split(",")
                )
            else:
                data[key] = enum_cls(v).value

    bootstrap_servers = data.get("bootstrap.servers")
    if (
        bootstrap_servers
        and isinstance(bootstrap_servers, Iterable)
        and not isinstance(bootstrap_servers, str)
    ):
        data["bootstrap.servers"] = ",".join(bootstrap_servers)

    return data