Coverage for faststream / confluent / subscriber / specification.py: 78%
19 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from faststream._internal.endpoint.subscriber import SubscriberSpecification
2from faststream.confluent.configs import KafkaBrokerConfig
3from faststream.specification.asyncapi.utils import resolve_payloads
4from faststream.specification.schema import Message, Operation, SubscriberSpec
5from faststream.specification.schema.bindings import ChannelBinding, kafka
7from .config import KafkaSubscriberSpecificationConfig
10class KafkaSubscriberSpecification(
11 SubscriberSpecification[KafkaBrokerConfig, KafkaSubscriberSpecificationConfig],
12):
13 @property
14 def topics(self) -> list[str]:
15 topics: set[str] = set()
17 topics.update(f"{self._outer_config.prefix}{t}" for t in self.config.topics)
19 topics.update(
20 f"{self._outer_config.prefix}{p.topic}" for p in self.config.partitions
21 )
23 return list(topics)
25 @property
26 def name(self) -> str:
27 if self.config.title_:
28 return self.config.title_
30 return f"{','.join(self.topics)}:{self.call_name}"
32 def get_schema(self) -> dict[str, SubscriberSpec]:
33 payloads = self.get_payloads()
35 channels = {}
36 for t in self.topics:
37 handler_name = self.config.title_ or f"{t}:{self.call_name}"
39 channels[handler_name] = SubscriberSpec(
40 description=self.description,
41 operation=Operation(
42 message=Message(
43 title=f"{handler_name}:Message",
44 payload=resolve_payloads(payloads),
45 ),
46 bindings=None,
47 ),
48 bindings=ChannelBinding(
49 kafka=kafka.ChannelBinding(
50 topic=t,
51 partitions=None,
52 replicas=None,
53 ),
54 ),
55 )
57 return channels