Coverage for faststream / kafka / subscriber / specification.py: 81%

21 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from faststream._internal.endpoint.subscriber import SubscriberSpecification 

2from faststream.kafka.configs import KafkaBrokerConfig 

3from faststream.specification.asyncapi.utils import resolve_payloads 

4from faststream.specification.schema import Message, Operation, SubscriberSpec 

5from faststream.specification.schema.bindings import ChannelBinding, kafka 

6 

7from .config import KafkaSubscriberSpecificationConfig 

8 

9 

10class KafkaSubscriberSpecification( 

11 SubscriberSpecification[KafkaBrokerConfig, KafkaSubscriberSpecificationConfig], 

12): 

13 @property 

14 def topics(self) -> list[str]: 

15 topics: set[str] = set() 

16 

17 topics.update(f"{self._outer_config.prefix}{t}" for t in self.config.topics) 

18 

19 topics.update( 

20 f"{self._outer_config.prefix}{p.topic}" for p in self.config.partitions 

21 ) 

22 

23 if self.config.pattern: 

24 topics.add(f"{self._outer_config.prefix}{self.config.pattern}") 

25 

26 return list(topics) 

27 

28 @property 

29 def name(self) -> str: 

30 if self.config.title_: 

31 return self.config.title_ 

32 

33 return f"{','.join(self.topics)}:{self.call_name}" 

34 

35 def get_schema(self) -> dict[str, SubscriberSpec]: 

36 payloads = self.get_payloads() 

37 

38 channels = {} 

39 for t in self.topics: 

40 handler_name = self.config.title_ or f"{t}:{self.call_name}" 

41 

42 channels[handler_name] = SubscriberSpec( 

43 description=self.description, 

44 operation=Operation( 

45 message=Message( 

46 title=f"{handler_name}:Message", 

47 payload=resolve_payloads(payloads), 

48 ), 

49 bindings=None, 

50 ), 

51 bindings=ChannelBinding( 

52 kafka=kafka.ChannelBinding( 

53 topic=t, 

54 partitions=None, 

55 replicas=None, 

56 ), 

57 ), 

58 ) 

59 

60 return channels