Coverage for faststream / specification / asyncapi / v2_6_0 / schema / operations.py: 85%

18 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import Any 

2 

3from pydantic import BaseModel 

4from typing_extensions import Self 

5 

6from faststream._internal._compat import PYDANTIC_V2 

7from faststream.specification.schema.operation import Operation as OperationSpec 

8 

9from .bindings import OperationBinding 

10from .message import Message 

11from .tag import Tag 

12from .utils import Reference 

13 

14 

class Operation(BaseModel):
    """AsyncAPI 2.6.0 operation object.

    Attributes:
        operationId : identifier of the operation
        summary : short summary of the operation
        description : longer description of the operation
        bindings : bindings attached to the operation
        message : message carried by the operation, or a reference to one
        security : security details of the operation
        tags : tags attached to the operation
    """

    operationId: str | None = None
    summary: str | None = None
    description: str | None = None

    bindings: OperationBinding | None = None

    # No default: callers must pass ``message`` explicitly, even if it is None.
    message: Message | Reference | None

    security: dict[str, list[str]] | None = None

    # TODO: traits

    tags: list[Tag | dict[str, Any]] | None = None

    # Extra (undeclared) keys are allowed on the model; the way to configure
    # that differs between the two pydantic major versions.
    if PYDANTIC_V2:
        model_config = {"extra": "allow"}

    else:

        class Config:
            extra = "allow"

    @classmethod
    def from_sub(cls, operation: OperationSpec) -> Self:
        """Create an ``Operation`` from a specification-level operation.

        The message is converted with ``Message.from_spec`` and the bindings
        with ``OperationBinding.from_sub``; every other field is left as None.
        """
        converted_message = Message.from_spec(operation.message)
        converted_bindings = OperationBinding.from_sub(operation.bindings)
        return cls(
            operationId=None,
            summary=None,
            description=None,
            bindings=converted_bindings,
            message=converted_message,
            security=None,
            tags=None,
        )

    @classmethod
    def from_pub(cls, operation: OperationSpec) -> Self:
        """Create an ``Operation`` from a specification-level operation.

        The message is converted with ``Message.from_spec`` and the bindings
        with ``OperationBinding.from_pub``; every other field is left as None.
        """
        converted_message = Message.from_spec(operation.message)
        converted_bindings = OperationBinding.from_pub(operation.bindings)
        return cls(
            operationId=None,
            summary=None,
            description=None,
            bindings=converted_bindings,
            message=converted_message,
            security=None,
            tags=None,
        )

62 

63 @classmethod 

64 def from_pub(cls, operation: OperationSpec) -> Self: 

65 return cls( 

66 message=Message.from_spec(operation.message), 

67 bindings=OperationBinding.from_pub(operation.bindings), 

68 operationId=None, 

69 summary=None, 

70 description=None, 

71 tags=None, 

72 security=None, 

73 )