Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / kafka / channel.py: 79%

15 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1"""AsyncAPI Kafka bindings. 

2 

3References: https://github.com/asyncapi/bindings/tree/master/kafka 

4""" 

5 

6from pydantic import BaseModel, PositiveInt 

7from typing_extensions import Self 

8 

9from faststream.specification.schema.bindings import kafka 

10 

11 

class ChannelBinding(BaseModel):
    """A class to represent an AsyncAPI Kafka channel binding.

    Attributes:
        topic : optional string representing the topic
        partitions : optional positive integer representing the number of partitions
        replicas : optional positive integer representing the number of replicas
        bindingVersion : string representing the binding version (defaults to "0.4.0")
    """

    topic: str | None = None
    partitions: PositiveInt | None = None
    replicas: PositiveInt | None = None
    bindingVersion: str = "0.4.0"

    # TODO:
    # topicConfiguration

    @classmethod
    def from_sub(cls, binding: kafka.ChannelBinding | None) -> Self | None:
        """Build the schema binding from a subscriber-side specification binding.

        Args:
            binding: The specification-level Kafka channel binding, or ``None``.

        Returns:
            A new instance copying ``topic``, ``partitions`` and ``replicas``
            from ``binding``, or ``None`` when ``binding`` is ``None``.
        """
        return cls._from_spec(binding)

    @classmethod
    def from_pub(cls, binding: kafka.ChannelBinding | None) -> Self | None:
        """Build the schema binding from a publisher-side specification binding.

        Args:
            binding: The specification-level Kafka channel binding, or ``None``.

        Returns:
            A new instance copying ``topic``, ``partitions`` and ``replicas``
            from ``binding``, or ``None`` when ``binding`` is ``None``.
        """
        return cls._from_spec(binding)

    @classmethod
    def _from_spec(cls, binding: kafka.ChannelBinding | None) -> Self | None:
        """Shared conversion used by both ``from_sub`` and ``from_pub``.

        The subscriber and publisher conversions are identical for this
        binding, so the field mapping lives here to keep them in sync.
        """
        if binding is None:
            return None

        # bindingVersion is intentionally not passed: the class default applies.
        return cls(
            topic=binding.topic,
            partitions=binding.partitions,
            replicas=binding.replicas,
        )