Coverage for tests/asyncapi/base/v3_0_0/arguments.py: 98%
270 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import sys
2from dataclasses import dataclass
3from enum import Enum
4from typing import Annotated, Literal
6import pydantic
7import pytest
8from dirty_equals import IsDict, IsPartialDict, IsStr
9from fast_depends import Depends
10from fastapi import Depends as APIDepends
12from faststream import Context
13from faststream._internal._compat import PYDANTIC_V2
14from faststream._internal.broker import BrokerUsecase
15from faststream._internal.fastapi import StreamRouter
16from tests.marks import pydantic_v2
18from .basic import AsyncAPI300Factory
class FastAPICompatible(AsyncAPI300Factory):
    """Schema test cases written against ``broker_class`` and ``get_spec``.

    Each test registers handlers on a fresh ``self.broker_class()`` and
    inspects the JSON-able output of ``self.get_spec(broker)``.
    """

    # When true, test_discriminator also accepts the ``anyOf`` payload form.
    is_fastapi: bool = False

    # Declared only; a value must be supplied elsewhere (presumably by
    # subclasses). It is called with no arguments in every test.
    broker_class: BrokerUsecase | StreamRouter
    # Turns a plain callable into a dependency (see test_ignores_depends).
    dependency_builder = staticmethod(APIDepends)
27 def test_default_naming(self) -> None:
28 broker = self.broker_class()
30 @broker.subscriber("test")
31 async def handle(msg) -> None: ...
33 schema = self.get_spec(broker).to_jsonable()
35 channel_key = tuple(schema["channels"].keys())[0] # noqa: RUF015
36 operation_key = tuple(schema["operations"].keys())[0] # noqa: RUF015
38 assert channel_key == IsStr(regex=r"test[\w:]*:Handle"), channel_key
39 assert operation_key == IsStr(regex=r"test[\w:]*:HandleSubscribe"), operation_key
41 def test_custom_naming(self) -> None:
42 broker = self.broker_class()
44 @broker.subscriber("test", title="custom_name", description="test description")
45 async def handle(msg) -> None: ...
47 schema = self.get_spec(broker).to_jsonable()
49 channel_key = tuple(schema["channels"].keys())[0] # noqa: RUF015
50 operation_key = tuple(schema["operations"].keys())[0] # noqa: RUF015
52 assert channel_key == "custom_name"
53 assert operation_key == "custom_name"
55 assert schema["channels"][channel_key]["description"] == "test description"
57 def test_slash_in_title(self) -> None:
58 broker = self.broker_class()
60 @broker.subscriber("test", title="/")
61 async def handle(msg) -> None: ...
63 schema = self.get_spec(broker).to_jsonable()
65 channel_key = tuple(schema["channels"].keys())[0] # noqa: RUF015
66 operation_key = tuple(schema["operations"].keys())[0] # noqa: RUF015
68 assert channel_key == "."
69 assert schema["channels"][channel_key]["address"] == "/"
71 assert operation_key == ".Subscribe"
73 assert next(iter(schema["components"]["messages"].keys())) == ".:SubscribeMessage"
74 assert (
75 schema["components"]["messages"][".:SubscribeMessage"]["title"]
76 == "/:SubscribeMessage"
77 )
79 assert next(iter(schema["components"]["schemas"].keys())) == ".:Message:Payload"
80 assert (
81 schema["components"]["schemas"][".:Message:Payload"]["title"]
82 == "/:Message:Payload"
83 )
85 def test_docstring_description(self) -> None:
86 broker = self.broker_class()
88 @broker.subscriber("test", title="custom_name")
89 async def handle(msg) -> None:
90 """Test description."""
92 schema = self.get_spec(broker).to_jsonable()
93 key = tuple(schema["channels"].keys())[0] # noqa: RUF015
95 assert key == "custom_name"
96 assert schema["channels"][key]["description"] == "Test description.", schema[
97 "channels"
98 ][key]["description"]
100 def test_empty(self) -> None:
101 broker = self.broker_class()
103 @broker.subscriber("test")
104 async def handle() -> None: ...
106 schema = self.get_spec(broker).to_jsonable()
108 payload = schema["components"]["schemas"]
110 for key, v in payload.items():
111 assert key == "EmptyPayload"
112 assert v == {
113 "title": key,
114 "type": "null",
115 }
117 def test_no_type(self) -> None:
118 broker = self.broker_class()
120 @broker.subscriber("test")
121 async def handle(msg) -> None: ...
123 schema = self.get_spec(broker).to_jsonable()
125 payload = schema["components"]["schemas"]
127 for key, v in payload.items():
128 assert key == "Handle:Message:Payload"
129 assert v == {"title": key}
131 def test_simple_type(self) -> None:
132 broker = self.broker_class()
134 @broker.subscriber("test")
135 async def handle(msg: int) -> None: ...
137 schema = self.get_spec(broker).to_jsonable()
139 payload = schema["components"]["schemas"]
140 assert next(iter(schema["channels"].values())).get("description") is None
142 for key, v in payload.items():
143 assert key == "Handle:Message:Payload"
144 assert v == {"title": key, "type": "integer"}
146 def test_simple_optional_type(self) -> None:
147 broker = self.broker_class()
149 @broker.subscriber("test")
150 async def handle(msg: int | None) -> None: ...
152 schema = self.get_spec(broker).to_jsonable()
154 payload = schema["components"]["schemas"]
156 for key, v in payload.items():
157 assert key == "Handle:Message:Payload"
158 assert v == IsDict(
159 {
160 "anyOf": [{"type": "integer"}, {"type": "null"}],
161 "title": key,
162 },
163 ) | IsDict(
164 { # TODO: remove when deprecating PydanticV1
165 "title": key,
166 "type": "integer",
167 },
168 ), v
170 def test_simple_type_with_default(self) -> None:
171 broker = self.broker_class()
173 @broker.subscriber("test")
174 async def handle(msg: int = 1) -> None: ...
176 schema = self.get_spec(broker).to_jsonable()
178 payload = schema["components"]["schemas"]
180 for key, v in payload.items():
181 assert key == "Handle:Message:Payload"
182 assert v == {
183 "default": 1,
184 "title": key,
185 "type": "integer",
186 }
188 def test_multi_args_no_type(self) -> None:
189 broker = self.broker_class()
191 @broker.subscriber("test")
192 async def handle(msg, another) -> None: ...
194 schema = self.get_spec(broker).to_jsonable()
196 payload = schema["components"]["schemas"]
198 for key, v in payload.items():
199 assert key == "Handle:Message:Payload"
200 assert v == {
201 "properties": {
202 "another": {"title": "Another"},
203 "msg": {"title": "Msg"},
204 },
205 "required": ["msg", "another"],
206 "title": key,
207 "type": "object",
208 }
210 def test_multi_args_with_type(self) -> None:
211 broker = self.broker_class()
213 @broker.subscriber("test")
214 async def handle(msg: str, another: int) -> None: ...
216 schema = self.get_spec(broker).to_jsonable()
218 payload = schema["components"]["schemas"]
220 for key, v in payload.items():
221 assert key == "Handle:Message:Payload"
222 assert v == {
223 "properties": {
224 "another": {"title": "Another", "type": "integer"},
225 "msg": {"title": "Msg", "type": "string"},
226 },
227 "required": ["msg", "another"],
228 "title": key,
229 "type": "object",
230 }
232 def test_multi_args_with_default(self) -> None:
233 broker = self.broker_class()
235 @broker.subscriber("test")
236 async def handle(msg: str, another: int | None = None) -> None: ...
238 schema = self.get_spec(broker).to_jsonable()
240 payload = schema["components"]["schemas"]
242 for key, v in payload.items():
243 assert key == "Handle:Message:Payload"
245 assert v == {
246 "properties": {
247 "another": IsDict(
248 {
249 "anyOf": [{"type": "integer"}, {"type": "null"}],
250 "default": None,
251 "title": "Another",
252 },
253 )
254 | IsDict(
255 { # TODO: remove when deprecating PydanticV1
256 "title": "Another",
257 "type": "integer",
258 },
259 ),
260 "msg": {"title": "Msg", "type": "string"},
261 },
262 "required": ["msg"],
263 "title": key,
264 "type": "object",
265 }
267 def test_dataclass(self) -> None:
268 @dataclass
269 class User:
270 id: int
271 name: str = ""
273 broker = self.broker_class()
275 @broker.subscriber("test")
276 async def handle(user: User) -> None: ...
278 schema = self.get_spec(broker).to_jsonable()
280 payload = schema["components"]["schemas"]
282 for key, v in payload.items():
283 assert key == "User"
284 assert v == {
285 "properties": {
286 "id": {"title": "Id", "type": "integer"},
287 "name": {"default": "", "title": "Name", "type": "string"},
288 },
289 "required": ["id"],
290 "title": key,
291 "type": "object",
292 }
294 def test_pydantic_model(self) -> None:
295 class User(pydantic.BaseModel):
296 name: str = ""
297 id: int
299 broker = self.broker_class()
301 @broker.subscriber("test")
302 async def handle(user: User) -> None: ...
304 schema = self.get_spec(broker).to_jsonable()
306 payload = schema["components"]["schemas"]
308 for key, v in payload.items():
309 assert key == "User"
310 assert v == {
311 "properties": {
312 "id": {"title": "Id", "type": "integer"},
313 "name": {"default": "", "title": "Name", "type": "string"},
314 },
315 "required": ["id"],
316 "title": key,
317 "type": "object",
318 }
320 def test_pydantic_model_with_enum(self) -> None:
321 class Status(str, Enum):
322 registered = "registered"
323 banned = "banned"
325 class User(pydantic.BaseModel):
326 name: str = ""
327 id: int
328 status: Status
330 broker = self.broker_class()
332 @broker.subscriber("test")
333 async def handle(user: User) -> None: ...
335 schema = self.get_spec(broker).to_jsonable()
337 payload = schema["components"]["schemas"]
339 assert payload == {
340 "Status": IsPartialDict(
341 {
342 "enum": ["registered", "banned"],
343 "title": "Status",
344 "type": "string",
345 },
346 ),
347 "User": {
348 "properties": {
349 "id": {"title": "Id", "type": "integer"},
350 "name": {"default": "", "title": "Name", "type": "string"},
351 "status": {"$ref": "#/components/schemas/Status"},
352 },
353 "required": ["id", "status"],
354 "title": "User",
355 "type": "object",
356 },
357 }, payload
359 def test_pydantic_model_mixed_regular(self) -> None:
360 class Email(pydantic.BaseModel):
361 addr: str
363 class User(pydantic.BaseModel):
364 name: str = ""
365 id: int
366 email: Email
368 broker = self.broker_class()
370 @broker.subscriber("test")
371 async def handle(user: User, description: str = "") -> None: ...
373 schema = self.get_spec(broker).to_jsonable()
375 payload = schema["components"]["schemas"]
377 assert payload == {
378 "Email": {
379 "title": "Email",
380 "type": "object",
381 "properties": {"addr": {"title": "Addr", "type": "string"}},
382 "required": ["addr"],
383 },
384 "User": {
385 "title": "User",
386 "type": "object",
387 "properties": {
388 "name": {"title": "Name", "default": "", "type": "string"},
389 "id": {"title": "Id", "type": "integer"},
390 "email": {"$ref": "#/components/schemas/Email"},
391 },
392 "required": ["id", "email"],
393 },
394 "Handle:Message:Payload": {
395 "title": "Handle:Message:Payload",
396 "type": "object",
397 "properties": {
398 "user": {"$ref": "#/components/schemas/User"},
399 "description": {
400 "title": "Description",
401 "default": "",
402 "type": "string",
403 },
404 },
405 "required": ["user"],
406 },
407 }
409 def test_nested_models_in_union_should_be_in_schemas(self) -> None:
410 """Test that nested Pydantic models in union types are promoted to components/schemas.
412 Fixes issue #2443: Nested Pydantic models are not included in AsyncAPI
413 components/schemas (inplaced instead).
414 """
416 class Email(pydantic.BaseModel):
417 addr: str
419 class User(pydantic.BaseModel):
420 name: str = ""
421 id: int
422 email: Email
424 class Other(pydantic.BaseModel):
425 id: int
427 broker = self.broker_class()
429 @broker.subscriber("test")
430 async def handle(body: User | Other) -> None: ...
432 schema = self.get_spec(broker).to_jsonable()
434 payload = schema["components"]["schemas"]
436 # Check that nested Email model is promoted to components/schemas
437 assert "Email" in payload
438 assert payload["Email"] == {
439 "title": "Email",
440 "type": "object",
441 "properties": {"addr": {"title": "Addr", "type": "string"}},
442 "required": ["addr"],
443 }
445 def test_nested_models_in_publisher_union_should_be_in_schemas(self) -> None:
446 """Test that nested Pydantic models in publisher union types are promoted to components/schemas.
448 Fixes issue #2443: Nested Pydantic models are not included in AsyncAPI
449 components/schemas (inplaced instead).
450 """
452 class Email(pydantic.BaseModel):
453 addr: str
455 class User(pydantic.BaseModel):
456 name: str = ""
457 id: int
458 email: Email
460 class Other(pydantic.BaseModel):
461 id: int
463 broker = self.broker_class()
465 publisher = broker.publisher("test")
467 @publisher
468 def handle0(msg) -> User: ...
470 @publisher
471 def handle1(msg) -> Other: ...
473 schema = self.get_spec(broker).to_jsonable()
475 payload = schema["components"]["schemas"]
477 # Check that nested Email model is promoted to components/schemas
478 assert "Email" in payload
479 assert payload["Email"] == {
480 "title": "Email",
481 "type": "object",
482 "properties": {"addr": {"title": "Addr", "type": "string"}},
483 "required": ["addr"],
484 }
486 def test_pydantic_model_with_example(self) -> None:
487 class User(pydantic.BaseModel):
488 name: str = ""
489 id: int
491 if PYDANTIC_V2: 491 ↛ 498line 491 didn't jump to line 498 because the condition on line 491 was always true
492 model_config = {
493 "json_schema_extra": {"examples": [{"name": "john", "id": 1}]},
494 }
496 else:
498 class Config:
499 schema_extra = {"examples": [{"name": "john", "id": 1}]} # noqa: RUF012
501 broker = self.broker_class()
503 @broker.subscriber("test")
504 async def handle(user: User) -> None: ...
506 schema = self.get_spec(broker).to_jsonable()
508 payload = schema["components"]["schemas"]
510 for key, v in payload.items():
511 assert key == "User"
512 assert v == {
513 "examples": [{"id": 1, "name": "john"}],
514 "properties": {
515 "id": {"title": "Id", "type": "integer"},
516 "name": {"default": "", "title": "Name", "type": "string"},
517 },
518 "required": ["id"],
519 "title": "User",
520 "type": "object",
521 }
523 def test_with_filter(self) -> None:
524 class User(pydantic.BaseModel):
525 name: str = ""
526 id: int
528 broker = self.broker_class()
530 sub = broker.subscriber("test")
532 @sub( # pragma: no branch
533 filter=lambda m: m.content_type == "application/json",
534 )
535 async def handle(id: int) -> None: ...
537 @sub
538 async def handle_default(msg) -> None: ...
540 schema = self.get_spec(broker).to_jsonable()
542 assert (
543 len(
544 next(iter(schema["components"]["messages"].values()))["payload"]["oneOf"],
545 )
546 == 2
547 )
549 payload = schema["components"]["schemas"]
551 assert "Handle:Message:Payload" in list(payload.keys())
552 assert "HandleDefault:Message:Payload" in list(payload.keys())
554 def test_ignores_depends(self) -> None:
555 broker = self.broker_class()
557 def dep(name: str = ""):
558 return name
560 def dep2(name2: str):
561 return name2
563 dependencies = (self.dependency_builder(dep2),)
564 message = self.dependency_builder(dep)
566 @broker.subscriber("test", dependencies=dependencies)
567 async def handle(id: int, message=message) -> None: ...
569 schema = self.get_spec(broker).to_jsonable()
571 payload = schema["components"]["schemas"]
573 for key, v in payload.items():
574 assert key == "Handle:Message:Payload"
575 assert v == {
576 "properties": {
577 "id": {"title": "Id", "type": "integer"},
578 "name": {"default": "", "title": "Name", "type": "string"},
579 "name2": {"title": "Name2", "type": "string"},
580 },
581 "required": ["id", "name2"],
582 "title": key,
583 "type": "object",
584 }, v
586 @pydantic_v2
587 def test_discriminator(self) -> None:
588 class Sub2(pydantic.BaseModel):
589 type: Literal["sub2"]
591 class Sub(pydantic.BaseModel):
592 type: Literal["sub"]
594 broker = self.broker_class()
596 @broker.subscriber("test")
597 async def handle(
598 user: Annotated[Sub2 | Sub, pydantic.Field(discriminator="type")],
599 ): ...
601 schema = self.get_spec(broker).to_jsonable()
603 key = next(iter(schema["components"]["messages"].keys()))
605 assert key == IsStr(regex=r"test[\w:]*:Handle:SubscribeMessage"), key
607 p = schema["components"]["messages"][key]["payload"]
608 assert p == IsPartialDict({
609 "$ref": "#/components/schemas/Handle:Message:Payload",
610 }), p
612 assert schema["components"]["schemas"] == IsPartialDict({
613 "Sub": {
614 "properties": {
615 "type": IsPartialDict({"const": "sub", "title": "Type"}),
616 },
617 "required": ["type"],
618 "title": "Sub",
619 "type": "object",
620 },
621 "Sub2": {
622 "properties": {
623 "type": IsPartialDict({"const": "sub2", "title": "Type"}),
624 },
625 "required": ["type"],
626 "title": "Sub2",
627 "type": "object",
628 },
629 }), schema["components"]["schemas"]
631 payload = schema["components"]["schemas"].get("Handle:Message:Payload")
633 discriminator_payload = IsPartialDict({
634 "discriminator": "type",
635 "oneOf": [
636 {"$ref": "#/components/schemas/Sub2"},
637 {"$ref": "#/components/schemas/Sub"},
638 ],
639 "title": "Handle:Message:Payload",
640 })
642 if self.is_fastapi:
643 assert (
644 payload
645 == IsPartialDict({
646 "anyOf": [
647 {"$ref": "#/components/schemas/Sub2"},
648 {"$ref": "#/components/schemas/Sub"},
649 ],
650 })
651 | discriminator_payload
652 ), payload
654 else:
655 assert payload == discriminator_payload
657 @pydantic_v2
658 def test_nested_discriminator(self) -> None:
659 class Sub2(pydantic.BaseModel):
660 type: Literal["sub2"]
662 class Sub(pydantic.BaseModel):
663 type: Literal["sub"]
665 class Model(pydantic.BaseModel):
666 msg: Sub2 | Sub = pydantic.Field(..., discriminator="type")
668 broker = self.broker_class()
670 @broker.subscriber("test")
671 async def handle(user: Model) -> None: ...
673 schema = self.get_spec(broker).to_jsonable()
675 key = next(iter(schema["components"]["messages"].keys()))
676 assert key == IsStr(regex=r"test[\w:]*:Handle:SubscribeMessage")
677 assert schema["components"] == {
678 "messages": {
679 key: {
680 "title": key,
681 "correlationId": {"location": "$message.header#/correlation_id"},
682 "payload": {"$ref": "#/components/schemas/Model"},
683 },
684 },
685 "schemas": {
686 "Sub": {
687 "properties": {
688 "type": IsPartialDict({"const": "sub", "title": "Type"}),
689 },
690 "required": ["type"],
691 "title": "Sub",
692 "type": "object",
693 },
694 "Sub2": {
695 "properties": {
696 "type": IsPartialDict({"const": "sub2", "title": "Type"}),
697 },
698 "required": ["type"],
699 "title": "Sub2",
700 "type": "object",
701 },
702 "Model": {
703 "properties": {
704 "msg": {
705 "discriminator": "type",
706 "oneOf": [
707 {"$ref": "#/components/schemas/Sub2"},
708 {"$ref": "#/components/schemas/Sub"},
709 ],
710 "title": "Msg",
711 },
712 },
713 "required": ["msg"],
714 "title": "Model",
715 "type": "object",
716 },
717 },
718 }, schema["components"]
class ArgumentsTestcase(FastAPICompatible):
    """Extends FastAPICompatible with further cases, building dependencies via ``Depends``."""

    # Overrides the inherited builder with the ``Depends`` imported from fast_depends.
    dependency_builder = staticmethod(Depends)
724 def test_pydantic_field(self) -> None:
725 broker = self.broker_class()
727 @broker.subscriber("msg")
728 async def msg(
729 msg: pydantic.PositiveInt = pydantic.Field(
730 1,
731 description="some field",
732 title="Perfect",
733 examples=[1],
734 ),
735 ) -> None: ...
737 schema = self.get_spec(broker).to_jsonable()
739 payload = schema["components"]["schemas"]
741 for key, v in payload.items():
742 assert key == "Perfect"
744 assert v == {
745 "default": 1,
746 "description": "some field",
747 "examples": [1],
748 "exclusiveMinimum": 0,
749 "title": "Perfect",
750 "type": "integer",
751 }
753 def test_ignores_custom_field(self) -> None:
754 broker = self.broker_class()
756 @broker.subscriber("test")
757 async def handle(
758 id: int,
759 user: str | None = None,
760 message=Context(),
761 ) -> None: ...
763 schema = self.get_spec(broker).to_jsonable()
765 payload = schema["components"]["schemas"]
767 for key, v in payload.items():
768 assert v == IsDict(
769 {
770 "properties": {
771 "id": {"title": "Id", "type": "integer"},
772 "user": {
773 "anyOf": [{"type": "string"}, {"type": "null"}],
774 "default": None,
775 "title": "User",
776 },
777 },
778 "required": ["id"],
779 "title": key,
780 "type": "object",
781 },
782 ) | IsDict( # TODO: remove when deprecating PydanticV1
783 {
784 "properties": {
785 "id": {"title": "Id", "type": "integer"},
786 "user": {"title": "User", "type": "string"},
787 },
788 "required": ["id"],
789 "title": "Handle:Message:Payload",
790 "type": "object",
791 },
792 )
794 @pytest.mark.skipif(
795 sys.version_info >= (3, 14),
796 reason="Python 3.14 disallows redefining a class with the same name",
797 )
798 def test_overwrite_schema(self) -> None:
799 @dataclass
800 class User:
801 id: int
802 name: str = ""
804 broker = self.broker_class()
806 @broker.subscriber("test")
807 async def handle(user: User) -> None: ...
809 @dataclass
810 class User:
811 id: int
812 email: str = ""
814 @broker.subscriber("test2")
815 async def second_handle(user: User) -> None: ...
817 with pytest.warns(
818 RuntimeWarning,
819 match="Overwriting the message schema, data types have the same name",
820 ):
821 schema = self.get_spec(broker).to_jsonable()
823 payload = schema["components"]["schemas"]
825 assert len(payload) == 1
827 key, value = next(iter(payload.items()))
829 assert key == "User"
830 assert value == {
831 "properties": IsDict({
832 "id": {"title": "Id", "type": "integer"},
833 "email": {"default": "", "title": "Email", "type": "string"},
834 })
835 | IsDict({
836 "id": {"title": "Id", "type": "integer"},
837 "name": {"default": "", "title": "Name", "type": "string"},
838 }),
839 "required": ["id"],
840 "title": key,
841 "type": "object",
842 }