Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / kafka / operation.py: 47%

15 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1"""AsyncAPI Kafka bindings. 

2 

3References: https://github.com/asyncapi/bindings/tree/master/kafka 

4""" 

5 

6from typing import Any 

7 

8from pydantic import BaseModel 

9from typing_extensions import Self 

10 

11from faststream.specification.schema.bindings import kafka 

12 

13 

class OperationBinding(BaseModel):
    """Kafka operation binding as serialized into the AsyncAPI schema.

    Attributes:
        groupId : optional dictionary representing the group ID
        clientId : optional dictionary representing the client ID
        replyTo : optional dictionary representing the reply-to
        bindingVersion : version of the binding (default: "0.4.0")
    """

    groupId: dict[str, Any] | None = None
    clientId: dict[str, Any] | None = None
    replyTo: dict[str, Any] | None = None
    bindingVersion: str = "0.4.0"

    @classmethod
    def from_sub(cls, binding: kafka.OperationBinding | None) -> Self | None:
        """Convert a specification-level Kafka operation binding.

        Args:
            binding: the specification binding to convert, or a falsy
                value (e.g. ``None``) when there is nothing to convert.

        Returns:
            A new schema binding whose ``groupId``, ``clientId`` and
            ``replyTo`` are copied from ``binding``; ``bindingVersion``
            keeps its default. ``None`` when ``binding`` is falsy.
        """
        if binding:
            # Map the snake_case specification attributes onto the
            # camelCase schema field names.
            schema_fields = {
                "groupId": binding.group_id,
                "clientId": binding.client_id,
                "replyTo": binding.reply_to,
            }
            return cls(**schema_fields)

        return None

    @classmethod
    def from_pub(cls, binding: kafka.OperationBinding | None) -> Self | None:
        """Convert a specification-level Kafka operation binding.

        Performs the same field mapping as ``from_sub``.

        Args:
            binding: the specification binding to convert, or a falsy
                value (e.g. ``None``) when there is nothing to convert.

        Returns:
            A new schema binding built from ``binding``, or ``None`` when
            ``binding`` is falsy.
        """
        if binding:
            schema_fields = {
                "groupId": binding.group_id,
                "clientId": binding.client_id,
                "replyTo": binding.reply_to,
            }
            return cls(**schema_fields)

        return None