Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / kafka / operation.py: 47%
15 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1"""AsyncAPI Kafka bindings.
3References: https://github.com/asyncapi/bindings/tree/master/kafka
4"""
6from typing import Any
8from pydantic import BaseModel
9from typing_extensions import Self
11from faststream.specification.schema.bindings import kafka
14class OperationBinding(BaseModel):
15 """A class to represent an operation binding.
17 Attributes:
18 groupId : optional dictionary representing the group ID
19 clientId : optional dictionary representing the client ID
20 replyTo : optional dictionary representing the reply-to
21 bindingVersion : version of the binding (default: "0.4.0")
22 """
24 groupId: dict[str, Any] | None = None
25 clientId: dict[str, Any] | None = None
26 replyTo: dict[str, Any] | None = None
27 bindingVersion: str = "0.4.0"
29 @classmethod
30 def from_sub(cls, binding: kafka.OperationBinding | None) -> Self | None:
31 if not binding:
32 return None
34 return cls(
35 groupId=binding.group_id,
36 clientId=binding.client_id,
37 replyTo=binding.reply_to,
38 )
40 @classmethod
41 def from_pub(cls, binding: kafka.OperationBinding | None) -> Self | None:
42 if not binding:
43 return None
45 return cls(
46 groupId=binding.group_id,
47 clientId=binding.client_id,
48 replyTo=binding.reply_to,
49 )