Coverage for faststream / asgi / factories / asyncapi / try_it_out.py: 90%
69 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import Sequence
2from contextlib import suppress
3from functools import lru_cache
4from typing import TYPE_CHECKING, Any, TypedDict, Union
6from faststream.asgi.annotations import Request
7from faststream.asgi.handlers import PostHandler, post
8from faststream.asgi.response import AsgiResponse, JSONResponse
9from faststream.exceptions import SubscriberNotFound
11if TYPE_CHECKING:
12 from faststream._internal.broker import BrokerUsecase
13 from faststream._internal.testing.broker import TestBroker
14 from faststream.specification.schema import Tag, TagDict
17class TryItOutOptions(TypedDict, total=False):
18 sendToRealBroker: bool
19 timestamp: str
22class TryItOutMessage(TypedDict, total=False):
23 """Wrapper sent by asyncapi-try-it-plugin.
25 The plugin always wraps the user's payload inside a nested ``message``
26 field together with operation metadata::
28 {
29 "operation_id": "...",
30 "operation_type": "...",
31 "message": <actual_user_payload>
32 }
33 """
35 operation_id: str
36 operation_type: str
37 message: Any
40class TryItOutForm(TypedDict):
41 channelName: str
42 message: TryItOutMessage
43 options: TryItOutOptions
46class TryItOutProcessor:
47 """Process try-it-out requests: parse, validate, publish to real or test broker."""
49 def __init__(self, broker: "BrokerUsecase[Any, Any]") -> None:
50 self._broker = broker
52 registry = _get_broker_registry()
53 for br_cls, test_broker_cls in registry.items(): 53 ↛ 59line 53 didn't jump to line 59 because the loop on line 53 didn't complete
54 if isinstance(self._broker, br_cls):
55 self._test_broker_cls = test_broker_cls
56 break
58 else:
59 msg = f"TestBroker not available for {broker}. Please, inspect your dependencies."
60 raise ValueError(msg)
62 async def process(self, body: TryItOutForm) -> AsgiResponse:
63 """Process parsed body: validate, dry-run or publish. Returns response."""
64 destination, *_ = body.get("channelName", "").split(":")
66 if not destination:
67 return JSONResponse({"details": "Missing channelName"}, 400)
69 message_wrapper = body.get("message", {})
70 payload: Any = message_wrapper.get("message")
71 options = body.get("options", {})
72 use_real_broker = options.get("sendToRealBroker", False)
74 try:
75 if use_real_broker: 75 ↛ 76line 75 didn't jump to line 76 because the condition on line 75 was never true
76 await self._broker.publish(payload, destination)
77 return JSONResponse("ok", 200)
79 async with self._test_broker_cls(self._broker) as br:
80 data = await br.request(payload, destination, timeout=30)
81 decoded = None
82 with suppress(Exception):
83 decoded = await data.decode()
84 return JSONResponse(
85 decoded if decoded is not None and decoded != b"" else "ok", 200
86 )
88 except SubscriberNotFound:
89 return JSONResponse({"details": f"{destination} destination not found."}, 404)
91 except Exception as e:
92 return JSONResponse({"details": repr(e)}, 500)
95def make_try_it_out_handler(
96 broker: "BrokerUsecase[Any, Any]",
97 description: str | None = None,
98 tags: Sequence[Union["Tag", "TagDict", dict[str, Any]]] | None = None,
99 unique_id: str | None = None,
100 include_in_schema: bool = False,
101) -> "PostHandler":
102 """Create POST handler for asyncapi-try-it-plugin to publish messages to broker."""
103 processor = TryItOutProcessor(broker)
105 @post(
106 description=description,
107 tags=tags,
108 unique_id=unique_id,
109 include_in_schema=include_in_schema,
110 )
111 async def try_it_out(request: Request) -> AsgiResponse:
112 try:
113 body: TryItOutForm = await request.json()
115 except Exception as e:
116 return JSONResponse({"details": f"Invalid JSON: {e}"}, 400)
118 return await processor.process(body)
120 return try_it_out
123@lru_cache(maxsize=1)
124def _get_broker_registry() -> dict[
125 type["BrokerUsecase[Any, Any]"],
126 type["TestBroker[Any]"],
127]:
128 registry: dict[type[BrokerUsecase[Any, Any]], type[TestBroker[Any]]] = {}
130 with suppress(ImportError):
131 from faststream.confluent import (
132 KafkaBroker as ConfluentKafkaBroker,
133 TestKafkaBroker as TestConfluentKafkaBroker,
134 )
136 registry[ConfluentKafkaBroker] = TestConfluentKafkaBroker
138 with suppress(ImportError):
139 from faststream.kafka import (
140 KafkaBroker as AioKafkaBroker,
141 TestKafkaBroker as TestAioKafkaBroker,
142 )
144 registry[AioKafkaBroker] = TestAioKafkaBroker
146 with suppress(ImportError):
147 from faststream.nats import NatsBroker, TestNatsBroker
149 registry[NatsBroker] = TestNatsBroker
151 with suppress(ImportError):
152 from faststream.rabbit import RabbitBroker, TestRabbitBroker
154 registry[RabbitBroker] = TestRabbitBroker
156 with suppress(ImportError):
157 from faststream.redis import RedisBroker, TestRedisBroker
159 registry[RedisBroker] = TestRedisBroker
161 with suppress(ImportError):
162 from faststream.mqtt import MQTTBroker, TestMQTTBroker
164 registry[MQTTBroker] = TestMQTTBroker
166 return registry