Coverage for faststream / kafka / subscriber / specification.py: 81%
21 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from faststream._internal.endpoint.subscriber import SubscriberSpecification
2from faststream.kafka.configs import KafkaBrokerConfig
3from faststream.specification.asyncapi.utils import resolve_payloads
4from faststream.specification.schema import Message, Operation, SubscriberSpec
5from faststream.specification.schema.bindings import ChannelBinding, kafka
7from .config import KafkaSubscriberSpecificationConfig
10class KafkaSubscriberSpecification(
11 SubscriberSpecification[KafkaBrokerConfig, KafkaSubscriberSpecificationConfig],
12):
13 @property
14 def topics(self) -> list[str]:
15 topics: set[str] = set()
17 topics.update(f"{self._outer_config.prefix}{t}" for t in self.config.topics)
19 topics.update(
20 f"{self._outer_config.prefix}{p.topic}" for p in self.config.partitions
21 )
23 if self.config.pattern:
24 topics.add(f"{self._outer_config.prefix}{self.config.pattern}")
26 return list(topics)
28 @property
29 def name(self) -> str:
30 if self.config.title_:
31 return self.config.title_
33 return f"{','.join(self.topics)}:{self.call_name}"
35 def get_schema(self) -> dict[str, SubscriberSpec]:
36 payloads = self.get_payloads()
38 channels = {}
39 for t in self.topics:
40 handler_name = self.config.title_ or f"{t}:{self.call_name}"
42 channels[handler_name] = SubscriberSpec(
43 description=self.description,
44 operation=Operation(
45 message=Message(
46 title=f"{handler_name}:Message",
47 payload=resolve_payloads(payloads),
48 ),
49 bindings=None,
50 ),
51 bindings=ChannelBinding(
52 kafka=kafka.ChannelBinding(
53 topic=t,
54 partitions=None,
55 replicas=None,
56 ),
57 ),
58 )
60 return channels