Coverage for faststream / confluent / subscriber / specification.py: 78%

19 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from faststream._internal.endpoint.subscriber import SubscriberSpecification 

2from faststream.confluent.configs import KafkaBrokerConfig 

3from faststream.specification.asyncapi.utils import resolve_payloads 

4from faststream.specification.schema import Message, Operation, SubscriberSpec 

5from faststream.specification.schema.bindings import ChannelBinding, kafka 

6 

7from .config import KafkaSubscriberSpecificationConfig 

8 

9 

class KafkaSubscriberSpecification(
    SubscriberSpecification[KafkaBrokerConfig, KafkaSubscriberSpecificationConfig],
):
    """Specification-schema description of a Confluent Kafka subscriber."""

    @property
    def topics(self) -> list[str]:
        """Return the prefixed, de-duplicated topic names of this subscriber.

        Names are collected from ``config.topics`` first and then from the
        ``topic`` attribute of every entry in ``config.partitions``; each one
        is prepended with ``_outer_config.prefix``.

        The result keeps first-seen order, so it (and everything derived from
        it, such as ``name`` and the channel order of ``get_schema``) is
        identical from one run to the next.
        """
        prefix = self._outer_config.prefix

        # Dict keys de-duplicate while preserving insertion order. A set would
        # make the order depend on per-process string hash randomization.
        unique: dict[str, None] = dict.fromkeys(
            f"{prefix}{t}" for t in self.config.topics
        )
        unique.update(
            dict.fromkeys(f"{prefix}{p.topic}" for p in self.config.partitions)
        )

        return list(unique)

    @property
    def name(self) -> str:
        """Return ``config.title_`` if set, else ``"<topic>,<topic>:<call_name>"``."""
        if self.config.title_:
            return self.config.title_

        return f"{','.join(self.topics)}:{self.call_name}"

    def get_schema(self) -> dict[str, SubscriberSpec]:
        """Build one ``SubscriberSpec`` per topic, keyed by handler name.

        The key is ``config.title_`` when it is set, otherwise
        ``"<topic>:<call_name>"``. Each entry carries the subscriber
        description, a message whose payload is resolved from
        ``get_payloads()``, and a Kafka channel binding for the topic.
        """
        payloads = self.get_payloads()

        channels: dict[str, SubscriberSpec] = {}
        for t in self.topics:
            # NOTE(review): when title_ is set, every topic maps to the same
            # key, so later topics overwrite earlier ones - confirm intended.
            handler_name = self.config.title_ or f"{t}:{self.call_name}"

            channels[handler_name] = SubscriberSpec(
                description=self.description,
                operation=Operation(
                    message=Message(
                        title=f"{handler_name}:Message",
                        payload=resolve_payloads(payloads),
                    ),
                    bindings=None,
                ),
                bindings=ChannelBinding(
                    kafka=kafka.ChannelBinding(
                        topic=t,
                        partitions=None,
                        replicas=None,
                    ),
                ),
            )

        return channels