Coverage for tests / asyncapi / base / v2_6_0 / arguments.py: 98%
280 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import sys
2from dataclasses import dataclass, field
3from enum import Enum
4from typing import Annotated, Literal
6import pydantic
7import pytest
8from dirty_equals import IsDict, IsPartialDict, IsStr
9from fast_depends import Depends
11from faststream import Context
12from faststream._internal.broker import BrokerUsecase
13from tests.marks import PYDANTIC_V2, pydantic_v2
15from .basic import AsyncAPI260Factory
18class FastAPICompatible(AsyncAPI260Factory):
19 is_fastapi: bool = False
21 broker_class: type[BrokerUsecase]
22 dependency_builder = staticmethod(Depends)
24 def test_custom_naming(self) -> None:
25 broker = self.broker_class()
27 @broker.subscriber("test", title="custom_name", description="test description")
28 async def handle(msg) -> None: ...
30 schema = self.get_spec(broker).to_jsonable()
31 key = tuple(schema["channels"].keys())[0] # noqa: RUF015
33 assert key == "custom_name"
34 assert schema["channels"][key]["description"] == "test description"
36 def test_slash_in_title(self) -> None:
37 broker = self.broker_class()
39 @broker.subscriber("test", title="/")
40 async def handle(msg) -> None: ...
42 schema = self.get_spec(broker).to_jsonable()
44 assert next(iter(schema["channels"].keys())) == "/"
46 assert next(iter(schema["components"]["messages"].keys())) == ".:Message"
47 assert schema["components"]["messages"][".:Message"]["title"] == "/:Message"
49 assert next(iter(schema["components"]["schemas"].keys())) == ".:Message:Payload"
50 assert (
51 schema["components"]["schemas"][".:Message:Payload"]["title"]
52 == "/:Message:Payload"
53 )
55 def test_docstring_description(self) -> None:
56 broker = self.broker_class()
58 @broker.subscriber("test", title="custom_name")
59 async def handle(msg) -> None:
60 """Test description."""
62 schema = self.get_spec(broker).to_jsonable()
63 key = tuple(schema["channels"].keys())[0] # noqa: RUF015
65 assert key == "custom_name"
66 assert schema["channels"][key]["description"] == "Test description.", schema[
67 "channels"
68 ][key]
70 def test_empty(self) -> None:
71 broker = self.broker_class()
73 @broker.subscriber("test")
74 async def handle() -> None: ...
76 schema = self.get_spec(broker).to_jsonable()
78 payload = schema["components"]["schemas"]
80 for key, v in payload.items():
81 assert key == "EmptyPayload"
82 assert v == {
83 "title": key,
84 "type": "null",
85 }
87 def test_no_type(self) -> None:
88 broker = self.broker_class()
90 @broker.subscriber("test")
91 async def handle(msg) -> None: ...
93 schema = self.get_spec(broker).to_jsonable()
95 payload = schema["components"]["schemas"]
97 for key, v in payload.items():
98 assert key == "Handle:Message:Payload"
99 assert v == {"title": key}
101 def test_simple_type(self) -> None:
102 broker = self.broker_class()
104 @broker.subscriber("test")
105 async def handle(msg: int) -> None: ...
107 schema = self.get_spec(broker).to_jsonable()
109 payload = schema["components"]["schemas"]
110 assert next(iter(schema["channels"].values())).get("description") is None
112 for key, v in payload.items():
113 assert key == "Handle:Message:Payload"
114 assert v == {"title": key, "type": "integer"}
116 def test_simple_optional_type(self) -> None:
117 broker = self.broker_class()
119 @broker.subscriber("test")
120 async def handle(msg: int | None) -> None: ...
122 schema = self.get_spec(broker).to_jsonable()
124 payload = schema["components"]["schemas"]
126 for key, v in payload.items():
127 assert key == "Handle:Message:Payload"
128 assert v == IsDict(
129 {
130 "anyOf": [{"type": "integer"}, {"type": "null"}],
131 "title": key,
132 },
133 ) | IsDict(
134 { # TODO: remove when deprecating PydanticV1
135 "title": key,
136 "type": "integer",
137 },
138 ), v
140 def test_simple_type_with_default(self) -> None:
141 broker = self.broker_class()
143 @broker.subscriber("test")
144 async def handle(msg: int = 1) -> None: ...
146 schema = self.get_spec(broker).to_jsonable()
148 payload = schema["components"]["schemas"]
150 for key, v in payload.items():
151 assert key == "Handle:Message:Payload"
152 assert v == {
153 "default": 1,
154 "title": key,
155 "type": "integer",
156 }
158 def test_multi_args_no_type(self) -> None:
159 broker = self.broker_class()
161 @broker.subscriber("test")
162 async def handle(msg, another) -> None: ...
164 schema = self.get_spec(broker).to_jsonable()
166 payload = schema["components"]["schemas"]
168 for key, v in payload.items():
169 assert key == "Handle:Message:Payload"
170 assert v == {
171 "properties": {
172 "another": {"title": "Another"},
173 "msg": {"title": "Msg"},
174 },
175 "required": ["msg", "another"],
176 "title": key,
177 "type": "object",
178 }
180 def test_multi_args_with_type(self) -> None:
181 broker = self.broker_class()
183 @broker.subscriber("test")
184 async def handle(msg: str, another: int) -> None: ...
186 schema = self.get_spec(broker).to_jsonable()
188 payload = schema["components"]["schemas"]
190 for key, v in payload.items():
191 assert key == "Handle:Message:Payload"
192 assert v == {
193 "properties": {
194 "another": {"title": "Another", "type": "integer"},
195 "msg": {"title": "Msg", "type": "string"},
196 },
197 "required": ["msg", "another"],
198 "title": key,
199 "type": "object",
200 }
202 def test_multi_args_with_default(self) -> None:
203 broker = self.broker_class()
205 @broker.subscriber("test")
206 async def handle(msg: str, another: int | None = None) -> None: ...
208 schema = self.get_spec(broker).to_jsonable()
210 payload = schema["components"]["schemas"]
212 for key, v in payload.items():
213 assert key == "Handle:Message:Payload"
215 assert v == {
216 "properties": {
217 "another": IsDict(
218 {
219 "anyOf": [{"type": "integer"}, {"type": "null"}],
220 "default": None,
221 "title": "Another",
222 },
223 )
224 | IsDict(
225 { # TODO: remove when deprecating PydanticV1
226 "title": "Another",
227 "type": "integer",
228 },
229 ),
230 "msg": {"title": "Msg", "type": "string"},
231 },
232 "required": ["msg"],
233 "title": key,
234 "type": "object",
235 }
237 def test_dataclass(self) -> None:
238 @dataclass
239 class User:
240 id: int
241 name: str = ""
243 broker = self.broker_class()
245 @broker.subscriber("test")
246 async def handle(user: User) -> None: ...
248 schema = self.get_spec(broker).to_jsonable()
250 payload = schema["components"]["schemas"]
252 for key, v in payload.items():
253 assert key == "User"
254 assert v == {
255 "properties": {
256 "id": {"title": "Id", "type": "integer"},
257 "name": {"default": "", "title": "Name", "type": "string"},
258 },
259 "required": ["id"],
260 "title": key,
261 "type": "object",
262 }
264 def test_dataclasses_nested(self):
265 @dataclass
266 class Product:
267 id: int
268 name: str = ""
270 @dataclass
271 class Order:
272 id: int
273 products: list[Product] = field(default_factory=list)
275 broker = self.broker_class()
277 @broker.subscriber("test")
278 async def handle(order: Order): ...
280 schema = self.get_spec(broker).to_jsonable()
282 payload = schema["components"]["schemas"]
284 assert payload == {
285 "Product": {
286 "properties": {
287 "id": {"title": "Id", "type": "integer"},
288 "name": {"default": "", "title": "Name", "type": "string"},
289 },
290 "required": ["id"],
291 "title": "Product",
292 "type": "object",
293 },
294 "Order": {
295 "properties": {
296 "id": {"title": "Id", "type": "integer"},
297 "products": {
298 "items": {"$ref": "#/components/schemas/Product"},
299 "title": "Products",
300 "type": "array",
301 },
302 },
303 "required": ["id"],
304 "title": "Order",
305 "type": "object",
306 },
307 }
309 def test_pydantic_model(self):
310 class User(pydantic.BaseModel):
311 name: str = ""
312 id: int
314 broker = self.broker_class()
316 @broker.subscriber("test")
317 async def handle(user: User) -> None: ...
319 schema = self.get_spec(broker).to_jsonable()
321 payload = schema["components"]["schemas"]
323 for key, v in payload.items():
324 assert key == "User"
325 assert v == {
326 "properties": {
327 "id": {"title": "Id", "type": "integer"},
328 "name": {"default": "", "title": "Name", "type": "string"},
329 },
330 "required": ["id"],
331 "title": key,
332 "type": "object",
333 }
335 def test_pydantic_model_with_enum(self) -> None:
336 class Status(str, Enum):
337 registered = "registered"
338 banned = "banned"
340 class User(pydantic.BaseModel):
341 name: str = ""
342 id: int
343 status: Status
345 broker = self.broker_class()
347 @broker.subscriber("test")
348 async def handle(user: User) -> None: ...
350 schema = self.get_spec(broker).to_jsonable()
352 payload = schema["components"]["schemas"]
354 assert payload == {
355 "Status": IsPartialDict(
356 {
357 "enum": ["registered", "banned"],
358 "title": "Status",
359 "type": "string",
360 },
361 ),
362 "User": {
363 "properties": {
364 "id": {"title": "Id", "type": "integer"},
365 "name": {"default": "", "title": "Name", "type": "string"},
366 "status": {"$ref": "#/components/schemas/Status"},
367 },
368 "required": ["id", "status"],
369 "title": "User",
370 "type": "object",
371 },
372 }, payload
374 def test_pydantic_model_mixed_regular(self) -> None:
375 class Email(pydantic.BaseModel):
376 addr: str
378 class User(pydantic.BaseModel):
379 name: str = ""
380 id: int
381 email: Email
383 broker = self.broker_class()
385 @broker.subscriber("test")
386 async def handle(user: User, description: str = "") -> None: ...
388 schema = self.get_spec(broker).to_jsonable()
390 payload = schema["components"]["schemas"]
392 assert payload == {
393 "Email": {
394 "title": "Email",
395 "type": "object",
396 "properties": {"addr": {"title": "Addr", "type": "string"}},
397 "required": ["addr"],
398 },
399 "User": {
400 "title": "User",
401 "type": "object",
402 "properties": {
403 "name": {"title": "Name", "default": "", "type": "string"},
404 "id": {"title": "Id", "type": "integer"},
405 "email": {"$ref": "#/components/schemas/Email"},
406 },
407 "required": ["id", "email"],
408 },
409 "Handle:Message:Payload": {
410 "title": "Handle:Message:Payload",
411 "type": "object",
412 "properties": {
413 "user": {"$ref": "#/components/schemas/User"},
414 "description": {
415 "title": "Description",
416 "default": "",
417 "type": "string",
418 },
419 },
420 "required": ["user"],
421 },
422 }
424 def test_nested_models_in_union_should_be_in_schemas(self) -> None:
425 """Test that nested Pydantic models in union types are promoted to components/schemas.
427 Fixes issue #2443: Nested Pydantic models are not included in AsyncAPI
428 components/schemas (inplaced instead).
429 """
431 class Email(pydantic.BaseModel):
432 addr: str
434 class User(pydantic.BaseModel):
435 name: str = ""
436 id: int
437 email: Email
439 class Other(pydantic.BaseModel):
440 id: int
442 broker = self.broker_class()
444 @broker.subscriber("test")
445 async def handle(body: User | Other) -> None: ...
447 schema = self.get_spec(broker).to_jsonable()
449 payload = schema["components"]["schemas"]
451 # Check that nested Email model is promoted to components/schemas
452 assert "Email" in payload
453 assert payload["Email"] == {
454 "title": "Email",
455 "type": "object",
456 "properties": {"addr": {"title": "Addr", "type": "string"}},
457 "required": ["addr"],
458 }
460 def test_nested_models_in_publisher_union_should_be_in_schemas(self) -> None:
461 """Test that nested Pydantic models in publisher union types are promoted to components/schemas.
463 Fixes issue #2443: Nested Pydantic models are not included in AsyncAPI
464 components/schemas (inplaced instead).
465 """
467 class Email(pydantic.BaseModel):
468 addr: str
470 class User(pydantic.BaseModel):
471 name: str = ""
472 id: int
473 email: Email
475 class Other(pydantic.BaseModel):
476 id: int
478 broker = self.broker_class()
480 publisher = broker.publisher("test")
482 @publisher
483 def handle0(msg) -> User: ...
485 @publisher
486 def handle1(msg) -> Other: ...
488 schema = self.get_spec(broker).to_jsonable()
490 payload = schema["components"]["schemas"]
492 # Check that nested Email model is promoted to components/schemas
493 assert "Email" in payload
494 assert payload["Email"] == {
495 "title": "Email",
496 "type": "object",
497 "properties": {"addr": {"title": "Addr", "type": "string"}},
498 "required": ["addr"],
499 }
501 def test_pydantic_model_with_example(self) -> None:
502 class User(pydantic.BaseModel):
503 name: str = ""
504 id: int
506 if PYDANTIC_V2: 506 ↛ 513line 506 didn't jump to line 513 because the condition on line 506 was always true
507 model_config = {
508 "json_schema_extra": {"examples": [{"name": "john", "id": 1}]},
509 }
511 else:
513 class Config:
514 schema_extra = {"examples": [{"name": "john", "id": 1}]} # noqa: RUF012
516 broker = self.broker_class()
518 @broker.subscriber("test")
519 async def handle(user: User) -> None: ...
521 schema = self.get_spec(broker).to_jsonable()
523 payload = schema["components"]["schemas"]
525 for key, v in payload.items():
526 assert key == "User"
527 assert v == {
528 "examples": [{"id": 1, "name": "john"}],
529 "properties": {
530 "id": {"title": "Id", "type": "integer"},
531 "name": {"default": "", "title": "Name", "type": "string"},
532 },
533 "required": ["id"],
534 "title": "User",
535 "type": "object",
536 }
538 def test_pydantic_model_with_keyword_property(self) -> None:
539 class TestModel(pydantic.BaseModel):
540 discriminator: int = 0
542 broker = self.broker_class()
544 @broker.subscriber("test")
545 async def handle(model: TestModel) -> None: ...
547 schema = self.get_spec(broker).to_jsonable()
549 payload = schema["components"]["schemas"]
551 for key, v in payload.items():
552 assert key == "TestModel"
553 assert v == {
554 "properties": {
555 "discriminator": {
556 "default": 0,
557 "title": "Discriminator",
558 "type": "integer",
559 },
560 },
561 "title": key,
562 "type": "object",
563 }
565 def test_ignores_depends(self) -> None:
566 broker = self.broker_class()
568 def dep(name: str = "") -> str:
569 return name
571 def dep2(name2: str) -> str:
572 return name2
574 dependencies = (self.dependency_builder(dep2),)
575 message = self.dependency_builder(dep)
577 @broker.subscriber("test", dependencies=dependencies)
578 async def handle(id: int, message=message) -> None: ...
580 schema = self.get_spec(broker).to_jsonable()
582 payload = schema["components"]["schemas"]
584 for key, v in payload.items():
585 assert key == "Handle:Message:Payload"
586 assert v == {
587 "properties": {
588 "id": {"title": "Id", "type": "integer"},
589 "name": {"default": "", "title": "Name", "type": "string"},
590 "name2": {"title": "Name2", "type": "string"},
591 },
592 "required": ["id", "name2"],
593 "title": key,
594 "type": "object",
595 }, v
597 @pydantic_v2
598 def test_discriminator(self) -> None:
599 class Sub2(pydantic.BaseModel):
600 type: Literal["sub2"]
602 class Sub(pydantic.BaseModel):
603 type: Literal["sub"]
605 broker = self.broker_class()
607 @broker.subscriber("test")
608 async def handle(
609 user: Annotated[Sub2 | Sub, pydantic.Field(discriminator="type")],
610 ): ...
612 schema = self.get_spec(broker).to_jsonable()
614 key = next(iter(schema["components"]["messages"].keys()))
616 assert key == IsStr(regex=r"test[\w:]*:Handle:Message"), key
618 expected_schema = IsPartialDict({
619 "discriminator": "type",
620 "oneOf": [
621 {"$ref": "#/components/schemas/Sub2"},
622 {"$ref": "#/components/schemas/Sub"},
623 ],
624 "title": "Handle:Message:Payload",
625 })
627 fastapi_payload = schema["components"]["schemas"].get("Handle:Message:Payload")
628 if self.is_fastapi:
629 if fastapi_payload: 629 ↛ 637line 629 didn't jump to line 637 because the condition on line 629 was always true
630 assert fastapi_payload == IsPartialDict({
631 "anyOf": [
632 {"$ref": "#/components/schemas/Sub2"},
633 {"$ref": "#/components/schemas/Sub"},
634 ],
635 })
637 expected_schema = (
638 IsPartialDict({"$ref": "#/components/schemas/Handle:Message:Payload"})
639 | expected_schema
640 )
642 assert schema["components"]["messages"][key]["payload"] == expected_schema, (
643 schema["components"]
644 )
646 assert schema["components"]["schemas"] == IsPartialDict({
647 "Sub": {
648 "properties": {
649 "type": IsPartialDict({"const": "sub", "title": "Type"}),
650 },
651 "required": ["type"],
652 "title": "Sub",
653 "type": "object",
654 },
655 "Sub2": {
656 "properties": {
657 "type": IsPartialDict({"const": "sub2", "title": "Type"}),
658 },
659 "required": ["type"],
660 "title": "Sub2",
661 "type": "object",
662 },
663 }), schema["components"]["schemas"]
665 @pydantic_v2
666 def test_nested_discriminator(self) -> None:
667 class Sub2(pydantic.BaseModel):
668 type: Literal["sub2"]
670 class Sub(pydantic.BaseModel):
671 type: Literal["sub"]
673 class Model(pydantic.BaseModel):
674 msg: Sub2 | Sub = pydantic.Field(..., discriminator="type")
676 broker = self.broker_class()
678 @broker.subscriber("test")
679 async def handle(user: Model) -> None: ...
681 schema = self.get_spec(broker).to_jsonable()
683 key = next(iter(schema["components"]["messages"].keys()))
684 assert key == IsStr(regex=r"test[\w:]*:Handle:Message")
685 assert schema["components"] == {
686 "messages": {
687 key: IsPartialDict({
688 "payload": {"$ref": "#/components/schemas/Model"},
689 }),
690 },
691 "schemas": {
692 "Sub": {
693 "properties": {
694 "type": IsPartialDict({"const": "sub", "title": "Type"}),
695 },
696 "required": ["type"],
697 "title": "Sub",
698 "type": "object",
699 },
700 "Sub2": {
701 "properties": {
702 "type": IsPartialDict({"const": "sub2", "title": "Type"}),
703 },
704 "required": ["type"],
705 "title": "Sub2",
706 "type": "object",
707 },
708 "Model": {
709 "properties": {
710 "msg": {
711 "discriminator": "type",
712 "oneOf": [
713 {"$ref": "#/components/schemas/Sub2"},
714 {"$ref": "#/components/schemas/Sub"},
715 ],
716 "title": "Msg",
717 },
718 },
719 "required": ["msg"],
720 "title": "Model",
721 "type": "object",
722 },
723 },
724 }, schema["components"]
726 def test_with_filter(self) -> None:
727 class User(pydantic.BaseModel):
728 name: str = ""
729 id: int
731 broker = self.broker_class()
733 sub = broker.subscriber("test")
735 @sub(
736 filter=lambda m: m.content_type == "application/json",
737 )
738 async def handle(id: int) -> None: ...
740 @sub
741 async def handle_default(msg) -> None: ...
743 schema = self.get_spec(broker).to_jsonable()
745 name, message = next(iter(schema["components"]["messages"].items()))
747 assert name == IsStr(regex=r"test[\w:]*:\[Handle,HandleDefault\]:Message"), name
749 assert len(message["payload"]["oneOf"]) == 2
751 payload = schema["components"]["schemas"]
753 assert "Handle:Message:Payload" in list(payload.keys())
754 assert "HandleDefault:Message:Payload" in list(payload.keys())
757class ArgumentsTestcase(FastAPICompatible):
758 dependency_builder = staticmethod(Depends)
760 def test_pydantic_field(self) -> None:
761 broker = self.broker_class()
763 @broker.subscriber("msg")
764 async def msg(
765 msg: pydantic.PositiveInt = pydantic.Field(
766 1,
767 description="some field",
768 title="Perfect",
769 examples=[1],
770 ),
771 ) -> None: ...
773 schema = self.get_spec(broker).to_jsonable()
775 payload = schema["components"]["schemas"]
777 for key, v in payload.items():
778 assert key == "Perfect"
780 assert v == {
781 "default": 1,
782 "description": "some field",
783 "examples": [1],
784 "exclusiveMinimum": 0,
785 "title": "Perfect",
786 "type": "integer",
787 }
789 def test_ignores_custom_field(self) -> None:
790 broker = self.broker_class()
792 @broker.subscriber("test")
793 async def handle(
794 id: int,
795 user: str | None = None,
796 message=Context(),
797 ) -> None: ...
799 schema = self.get_spec(broker).to_jsonable()
801 payload = schema["components"]["schemas"]
803 for key, v in payload.items():
804 assert v == IsDict(
805 {
806 "properties": {
807 "id": {"title": "Id", "type": "integer"},
808 "user": {
809 "anyOf": [{"type": "string"}, {"type": "null"}],
810 "default": None,
811 "title": "User",
812 },
813 },
814 "required": ["id"],
815 "title": key,
816 "type": "object",
817 },
818 ) | IsDict( # TODO: remove when deprecating PydanticV1
819 {
820 "properties": {
821 "id": {"title": "Id", "type": "integer"},
822 "user": {"title": "User", "type": "string"},
823 },
824 "required": ["id"],
825 "title": "Handle:Message:Payload",
826 "type": "object",
827 },
828 )
830 @pytest.mark.skipif(
831 sys.version_info >= (3, 14),
832 reason="Python 3.14 disallows redefining a class with the same name",
833 )
834 def test_overwrite_schema(self) -> None:
835 @dataclass
836 class User:
837 id: int
838 name: str = ""
840 broker = self.broker_class()
842 @broker.subscriber("test")
843 async def handle(user: User) -> None: ...
845 @dataclass
846 class User:
847 id: int
848 email: str = ""
850 @broker.subscriber("test2")
851 async def second_handle(user: User) -> None: ...
853 with pytest.warns(
854 RuntimeWarning,
855 match="Overwriting the message schema, data types have the same name",
856 ):
857 schema = self.get_spec(broker).to_jsonable()
859 payload = schema["components"]["schemas"]
861 key, value = next(iter(payload.items()))
863 assert key == "User"
864 assert value == {
865 "properties": IsDict({
866 "id": {"title": "Id", "type": "integer"},
867 "email": {"default": "", "title": "Email", "type": "string"},
868 })
869 | IsDict({
870 "id": {"title": "Id", "type": "integer"},
871 "name": {"default": "", "title": "Name", "type": "string"},
872 }),
873 "required": ["id"],
874 "title": key,
875 "type": "object",
876 }