Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / kafka / channel.py: 79%
15 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1"""AsyncAPI Kafka bindings.
3References: https://github.com/asyncapi/bindings/tree/master/kafka
4"""
6from pydantic import BaseModel, PositiveInt
7from typing_extensions import Self
9from faststream.specification.schema.bindings import kafka
12class ChannelBinding(BaseModel):
13 """A class to represent a channel binding.
15 Attributes:
16 topic : optional string representing the topic
17 partitions : optional positive integer representing the number of partitions
18 replicas : optional positive integer representing the number of replicas
19 bindingVersion : string representing the binding version
20 """
22 topic: str | None = None
23 partitions: PositiveInt | None = None
24 replicas: PositiveInt | None = None
25 bindingVersion: str = "0.4.0"
27 # TODO:
28 # topicConfiguration
30 @classmethod
31 def from_sub(cls, binding: kafka.ChannelBinding | None) -> Self | None:
32 if binding is None: 32 ↛ 33line 32 didn't jump to line 33 because the condition on line 32 was never true
33 return None
35 return cls(
36 topic=binding.topic,
37 partitions=binding.partitions,
38 replicas=binding.replicas,
39 )
41 @classmethod
42 def from_pub(cls, binding: kafka.ChannelBinding | None) -> Self | None:
43 if binding is None: 43 ↛ 44line 43 didn't jump to line 44 because the condition on line 43 was never true
44 return None
46 return cls(
47 topic=binding.topic,
48 partitions=binding.partitions,
49 replicas=binding.replicas,
50 )