Coverage for faststream / asgi / factories / asyncapi / try_it_out.py: 90%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import Sequence 

2from contextlib import suppress 

3from functools import lru_cache 

4from typing import TYPE_CHECKING, Any, TypedDict, Union 

5 

6from faststream.asgi.annotations import Request 

7from faststream.asgi.handlers import PostHandler, post 

8from faststream.asgi.response import AsgiResponse, JSONResponse 

9from faststream.exceptions import SubscriberNotFound 

10 

11if TYPE_CHECKING: 

12 from faststream._internal.broker import BrokerUsecase 

13 from faststream._internal.testing.broker import TestBroker 

14 from faststream.specification.schema import Tag, TagDict 

15 

16 

class TryItOutOptions(TypedDict, total=False):
    """Settings carried in the ``options`` field of a try-it-out form.

    Declared with ``total=False``, so every key may be omitted.
    """

    # Truthy -> the processor publishes through the real broker instead of
    # running the message against the in-memory test broker.
    sendToRealBroker: bool
    # Part of the declared form shape; not read by the code in this module.
    timestamp: str

20 

21 

class TryItOutMessage(TypedDict, total=False):
    """Envelope that asyncapi-try-it-plugin puts around the user's payload.

    The plugin does not send the payload bare: it nests it under a
    ``message`` key, next to the operation metadata::

        {
            "operation_id": "...",
            "operation_type": "...",
            "message": <actual_user_payload>
        }

    All keys are optional (``total=False``).
    """

    operation_id: str
    operation_type: str
    # The user's actual payload; deliberately left untyped.
    message: Any

38 

39 

class TryItOutForm(TypedDict):
    """Top-level JSON body posted to the try-it-out endpoint."""

    # Target channel; only the text before the first ":" is used as the
    # publish destination.
    channelName: str
    # Plugin envelope holding the user's payload under its own "message" key.
    message: TryItOutMessage
    # Per-request switches such as "sendToRealBroker".
    options: TryItOutOptions

44 

45 

class TryItOutProcessor:
    """Process try-it-out requests: parse, validate, publish to real or test broker."""

    def __init__(self, broker: "BrokerUsecase[Any, Any]") -> None:
        """Bind the processor to *broker* and select its test-broker class.

        Args:
            broker: The broker that try-it-out messages are published to.

        Raises:
            ValueError: If no broker class in the registry matches *broker*.
        """
        self._broker = broker

        # First registry entry whose broker class matches wins; the ``else``
        # branch only runs when the loop finishes without a ``break``.
        for br_cls, test_broker_cls in _get_broker_registry().items():
            if isinstance(broker, br_cls):
                self._test_broker_cls = test_broker_cls
                break
        else:
            msg = f"TestBroker not available for {broker}. Please, inspect your dependencies."
            raise ValueError(msg)

    async def process(self, body: TryItOutForm) -> AsgiResponse:
        """Validate a decoded request body and publish its payload.

        The body is whatever the JSON decoder produced, so its shape is
        checked before any field is used; a malformed body yields a 400
        response instead of an ``AttributeError`` escaping this method.

        Args:
            body: Decoded JSON body, expected to look like ``TryItOutForm``.

        Returns:
            * 400 - body/field has the wrong JSON type or ``channelName`` is
              missing or empty.
            * 404 - publishing raised ``SubscriberNotFound``.
            * 500 - any other exception while publishing (``repr`` of it is
              returned in ``details``).
            * 200 - ``"ok"`` after a real publish; for a test-broker run, the
              decoded reply, or ``"ok"`` when there is nothing to show.
        """
        if not isinstance(body, dict):
            return JSONResponse({"details": "Request body must be a JSON object"}, 400)

        channel_name = body.get("channelName")
        if channel_name is not None and not isinstance(channel_name, str):
            return JSONResponse({"details": "channelName must be a string"}, 400)

        # Only the part before the first ":" names the destination.
        destination = (channel_name or "").partition(":")[0]
        if not destination:
            return JSONResponse({"details": "Missing channelName"}, 400)

        # A missing or null envelope/options object is treated as empty; any
        # other non-object value is rejected rather than crashing on ``.get``.
        message_wrapper = body.get("message")
        if message_wrapper is None:
            message_wrapper = {}
        elif not isinstance(message_wrapper, dict):
            return JSONResponse({"details": "message must be a JSON object"}, 400)

        options = body.get("options")
        if options is None:
            options = {}
        elif not isinstance(options, dict):
            return JSONResponse({"details": "options must be a JSON object"}, 400)

        payload: Any = message_wrapper.get("message")
        use_real_broker = options.get("sendToRealBroker", False)

        try:
            if use_real_broker:
                await self._broker.publish(payload, destination)
                return JSONResponse("ok", 200)

            # Dry run: route the message through the test broker and report
            # what comes back.
            async with self._test_broker_cls(self._broker) as br:
                data = await br.request(payload, destination, timeout=30)
                decoded = None
                # Best effort: a reply that cannot be decoded is reported as
                # a plain "ok" rather than as a failure.
                with suppress(Exception):
                    decoded = await data.decode()
                return JSONResponse(
                    decoded if decoded is not None and decoded != b"" else "ok", 200
                )

        except SubscriberNotFound:
            return JSONResponse({"details": f"{destination} destination not found."}, 404)

        except Exception as e:
            return JSONResponse({"details": repr(e)}, 500)

93 

94 

def make_try_it_out_handler(
    broker: "BrokerUsecase[Any, Any]",
    description: str | None = None,
    tags: Sequence[Union["Tag", "TagDict", dict[str, Any]]] | None = None,
    unique_id: str | None = None,
    include_in_schema: bool = False,
) -> "PostHandler":
    """Build the POST endpoint that asyncapi-try-it-plugin sends messages to.

    A single ``TryItOutProcessor`` is created for *broker* up front and shared
    by every request the returned handler serves. The remaining arguments are
    forwarded unchanged to the ``post`` decorator.

    The handler answers 400 with ``{"details": "Invalid JSON: ..."}`` when the
    request body cannot be decoded; otherwise it returns whatever the
    processor returns.
    """
    processor = TryItOutProcessor(broker)

    as_post_handler = post(
        description=description,
        tags=tags,
        unique_id=unique_id,
        include_in_schema=include_in_schema,
    )

    async def try_it_out(request: Request) -> AsgiResponse:
        try:
            form: TryItOutForm = await request.json()
        except Exception as exc:
            return JSONResponse({"details": f"Invalid JSON: {exc}"}, 400)
        else:
            return await processor.process(form)

    return as_post_handler(try_it_out)

121 

122 

@lru_cache(maxsize=1)
def _get_broker_registry() -> dict[
    type["BrokerUsecase[Any, Any]"],
    type["TestBroker[Any]"],
]:
    """Map each importable broker class to its test-broker class.

    Every integration is imported independently; one whose import raises
    ``ImportError`` (presumably because its optional dependency is not
    installed) is simply left out. The function takes no arguments and is
    wrapped in ``lru_cache``, so the imports are attempted once and the same
    dict object is returned on later calls.
    """
    mapping: "dict[type[BrokerUsecase[Any, Any]], type[TestBroker[Any]]]" = {}

    # Kafka via confluent
    with suppress(ImportError):
        from faststream.confluent import KafkaBroker as ConfluentBroker
        from faststream.confluent import TestKafkaBroker as ConfluentTestBroker

        mapping[ConfluentBroker] = ConfluentTestBroker

    # Kafka via the faststream.kafka integration
    with suppress(ImportError):
        from faststream.kafka import KafkaBroker as PlainKafkaBroker
        from faststream.kafka import TestKafkaBroker as PlainKafkaTestBroker

        mapping[PlainKafkaBroker] = PlainKafkaTestBroker

    # NATS
    with suppress(ImportError):
        from faststream.nats import NatsBroker, TestNatsBroker

        mapping[NatsBroker] = TestNatsBroker

    # RabbitMQ
    with suppress(ImportError):
        from faststream.rabbit import RabbitBroker, TestRabbitBroker

        mapping[RabbitBroker] = TestRabbitBroker

    # Redis
    with suppress(ImportError):
        from faststream.redis import RedisBroker, TestRedisBroker

        mapping[RedisBroker] = TestRedisBroker

    # MQTT
    with suppress(ImportError):
        from faststream.mqtt import MQTTBroker, TestMQTTBroker

        mapping[MQTTBroker] = TestMQTTBroker

    return mapping