Coverage for tests/asyncapi/base/v3_0_0/arguments.py: 98%

270 statements  

Navigation: « prev | ^ index | » next. Generated by coverage.py v7.13.5, created at 2026-05-08 01:48 +0000.

1import sys 

2from dataclasses import dataclass 

3from enum import Enum 

4from typing import Annotated, Literal 

5 

6import pydantic 

7import pytest 

8from dirty_equals import IsDict, IsPartialDict, IsStr 

9from fast_depends import Depends 

10from fastapi import Depends as APIDepends 

11 

12from faststream import Context 

13from faststream._internal._compat import PYDANTIC_V2 

14from faststream._internal.broker import BrokerUsecase 

15from faststream._internal.fastapi import StreamRouter 

16from tests.marks import pydantic_v2 

17 

18from .basic import AsyncAPI300Factory 

19 

20 

class FastAPICompatible(AsyncAPI300Factory):
    """AsyncAPI 3.0.0 schema tests for handler arguments.

    Every test builds a broker from ``broker_class``, registers handlers on
    it, renders the specification with ``self.get_spec(broker).to_jsonable()``
    and asserts on the ``channels``, ``operations`` or ``components`` sections
    of the result.
    """

    # Selects the more permissive expectation in ``test_discriminator``.
    is_fastapi: bool = False

    # Only annotated here; the value must be supplied elsewhere
    # (presumably by a concrete broker-specific subclass).
    broker_class: BrokerUsecase | StreamRouter
    # Wraps plain callables into dependencies in ``test_ignores_depends``.
    dependency_builder = staticmethod(APIDepends)

    def test_default_naming(self) -> None:
        """Untitled subscriber: channel/operation keys end with the handler name."""
        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(msg) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        channel_key = next(iter(schema["channels"].keys()))
        operation_key = next(iter(schema["operations"].keys()))

        assert channel_key == IsStr(regex=r"test[\w:]*:Handle"), channel_key
        assert operation_key == IsStr(regex=r"test[\w:]*:HandleSubscribe"), operation_key

    def test_custom_naming(self) -> None:
        """An explicit ``title`` becomes both the channel key and the operation key."""
        broker = self.broker_class()

        @broker.subscriber("test", title="custom_name", description="test description")
        async def handle(msg) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        channel_key = next(iter(schema["channels"].keys()))
        operation_key = next(iter(schema["operations"].keys()))

        assert channel_key == "custom_name"
        assert operation_key == "custom_name"

        assert schema["channels"][channel_key]["description"] == "test description"

    def test_slash_in_title(self) -> None:
        """A ``/`` title is rendered as ``.`` in keys while titles keep the ``/``."""
        broker = self.broker_class()

        @broker.subscriber("test", title="/")
        async def handle(msg) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        channel_key = next(iter(schema["channels"].keys()))
        operation_key = next(iter(schema["operations"].keys()))

        assert channel_key == "."
        assert schema["channels"][channel_key]["address"] == "/"

        assert operation_key == ".Subscribe"

        assert next(iter(schema["components"]["messages"].keys())) == ".:SubscribeMessage"
        assert (
            schema["components"]["messages"][".:SubscribeMessage"]["title"]
            == "/:SubscribeMessage"
        )

        assert next(iter(schema["components"]["schemas"].keys())) == ".:Message:Payload"
        assert (
            schema["components"]["schemas"][".:Message:Payload"]["title"]
            == "/:Message:Payload"
        )

    def test_docstring_description(self) -> None:
        """The handler docstring is used as the channel description."""
        broker = self.broker_class()

        @broker.subscriber("test", title="custom_name")
        async def handle(msg) -> None:
            """Test description."""

        schema = self.get_spec(broker).to_jsonable()
        key = next(iter(schema["channels"].keys()))

        assert key == "custom_name"
        assert schema["channels"][key]["description"] == "Test description.", schema[
            "channels"
        ][key]["description"]

    def test_empty(self) -> None:
        """A handler without arguments yields a ``null``-typed ``EmptyPayload``."""
        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle() -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]

        for key, v in payload.items():
            assert key == "EmptyPayload"
            assert v == {
                "title": key,
                "type": "null",
            }

    def test_no_type(self) -> None:
        """An unannotated argument yields a payload schema with only a title."""
        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(msg) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]

        for key, v in payload.items():
            assert key == "Handle:Message:Payload"
            assert v == {"title": key}

    def test_simple_type(self) -> None:
        """An ``int`` argument yields an ``integer`` payload; channel has no description."""
        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(msg: int) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]
        assert next(iter(schema["channels"].values())).get("description") is None

        for key, v in payload.items():
            assert key == "Handle:Message:Payload"
            assert v == {"title": key, "type": "integer"}

    def test_simple_optional_type(self) -> None:
        """An ``int | None`` argument yields ``anyOf`` (or a bare integer schema)."""
        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(msg: int | None) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]

        for key, v in payload.items():
            assert key == "Handle:Message:Payload"
            assert v == IsDict(
                {
                    "anyOf": [{"type": "integer"}, {"type": "null"}],
                    "title": key,
                },
            ) | IsDict(
                {  # TODO: remove when deprecating PydanticV1
                    "title": key,
                    "type": "integer",
                },
            ), v

    def test_simple_type_with_default(self) -> None:
        """A default argument value is reported as ``default`` in the payload."""
        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(msg: int = 1) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]

        for key, v in payload.items():
            assert key == "Handle:Message:Payload"
            assert v == {
                "default": 1,
                "title": key,
                "type": "integer",
            }

    def test_multi_args_no_type(self) -> None:
        """Several unannotated arguments become required object properties."""
        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(msg, another) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]

        for key, v in payload.items():
            assert key == "Handle:Message:Payload"
            assert v == {
                "properties": {
                    "another": {"title": "Another"},
                    "msg": {"title": "Msg"},
                },
                "required": ["msg", "another"],
                "title": key,
                "type": "object",
            }

    def test_multi_args_with_type(self) -> None:
        """Several annotated arguments become typed, required object properties."""
        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(msg: str, another: int) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]

        for key, v in payload.items():
            assert key == "Handle:Message:Payload"
            assert v == {
                "properties": {
                    "another": {"title": "Another", "type": "integer"},
                    "msg": {"title": "Msg", "type": "string"},
                },
                "required": ["msg", "another"],
                "title": key,
                "type": "object",
            }

    def test_multi_args_with_default(self) -> None:
        """An argument with a default is left out of ``required``."""
        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(msg: str, another: int | None = None) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]

        for key, v in payload.items():
            assert key == "Handle:Message:Payload"

            assert v == {
                "properties": {
                    "another": IsDict(
                        {
                            "anyOf": [{"type": "integer"}, {"type": "null"}],
                            "default": None,
                            "title": "Another",
                        },
                    )
                    | IsDict(
                        {  # TODO: remove when deprecating PydanticV1
                            "title": "Another",
                            "type": "integer",
                        },
                    ),
                    "msg": {"title": "Msg", "type": "string"},
                },
                "required": ["msg"],
                "title": key,
                "type": "object",
            }

    def test_dataclass(self) -> None:
        """A dataclass argument is published under the dataclass name."""

        @dataclass
        class User:
            id: int
            name: str = ""

        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(user: User) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]

        for key, v in payload.items():
            assert key == "User"
            assert v == {
                "properties": {
                    "id": {"title": "Id", "type": "integer"},
                    "name": {"default": "", "title": "Name", "type": "string"},
                },
                "required": ["id"],
                "title": key,
                "type": "object",
            }

    def test_pydantic_model(self) -> None:
        """A pydantic model argument is published under the model name."""

        class User(pydantic.BaseModel):
            name: str = ""
            id: int

        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(user: User) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]

        for key, v in payload.items():
            assert key == "User"
            assert v == {
                "properties": {
                    "id": {"title": "Id", "type": "integer"},
                    "name": {"default": "", "title": "Name", "type": "string"},
                },
                "required": ["id"],
                "title": key,
                "type": "object",
            }

    def test_pydantic_model_with_enum(self) -> None:
        """An enum field gets its own schema and is referenced from the model."""

        class Status(str, Enum):
            registered = "registered"
            banned = "banned"

        class User(pydantic.BaseModel):
            name: str = ""
            id: int
            status: Status

        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(user: User) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]

        assert payload == {
            "Status": IsPartialDict(
                {
                    "enum": ["registered", "banned"],
                    "title": "Status",
                    "type": "string",
                },
            ),
            "User": {
                "properties": {
                    "id": {"title": "Id", "type": "integer"},
                    "name": {"default": "", "title": "Name", "type": "string"},
                    "status": {"$ref": "#/components/schemas/Status"},
                },
                "required": ["id", "status"],
                "title": "User",
                "type": "object",
            },
        }, payload

    def test_pydantic_model_mixed_regular(self) -> None:
        """A model argument mixed with a plain argument yields a wrapping payload."""

        class Email(pydantic.BaseModel):
            addr: str

        class User(pydantic.BaseModel):
            name: str = ""
            id: int
            email: Email

        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(user: User, description: str = "") -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]

        assert payload == {
            "Email": {
                "title": "Email",
                "type": "object",
                "properties": {"addr": {"title": "Addr", "type": "string"}},
                "required": ["addr"],
            },
            "User": {
                "title": "User",
                "type": "object",
                "properties": {
                    "name": {"title": "Name", "default": "", "type": "string"},
                    "id": {"title": "Id", "type": "integer"},
                    "email": {"$ref": "#/components/schemas/Email"},
                },
                "required": ["id", "email"],
            },
            "Handle:Message:Payload": {
                "title": "Handle:Message:Payload",
                "type": "object",
                "properties": {
                    "user": {"$ref": "#/components/schemas/User"},
                    "description": {
                        "title": "Description",
                        "default": "",
                        "type": "string",
                    },
                },
                "required": ["user"],
            },
        }

    def test_nested_models_in_union_should_be_in_schemas(self) -> None:
        """Test that nested Pydantic models in union types are promoted to components/schemas.

        Fixes issue #2443: nested Pydantic models were not included in AsyncAPI
        components/schemas (they were inlined instead).
        """

        class Email(pydantic.BaseModel):
            addr: str

        class User(pydantic.BaseModel):
            name: str = ""
            id: int
            email: Email

        class Other(pydantic.BaseModel):
            id: int

        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(body: User | Other) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]

        # Check that nested Email model is promoted to components/schemas
        assert "Email" in payload
        assert payload["Email"] == {
            "title": "Email",
            "type": "object",
            "properties": {"addr": {"title": "Addr", "type": "string"}},
            "required": ["addr"],
        }

    def test_nested_models_in_publisher_union_should_be_in_schemas(self) -> None:
        """Test that nested Pydantic models in publisher union types are promoted to components/schemas.

        Fixes issue #2443: nested Pydantic models were not included in AsyncAPI
        components/schemas (they were inlined instead).
        """

        class Email(pydantic.BaseModel):
            addr: str

        class User(pydantic.BaseModel):
            name: str = ""
            id: int
            email: Email

        class Other(pydantic.BaseModel):
            id: int

        broker = self.broker_class()

        publisher = broker.publisher("test")

        @publisher
        def handle0(msg) -> User: ...

        @publisher
        def handle1(msg) -> Other: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]

        # Check that nested Email model is promoted to components/schemas
        assert "Email" in payload
        assert payload["Email"] == {
            "title": "Email",
            "type": "object",
            "properties": {"addr": {"title": "Addr", "type": "string"}},
            "required": ["addr"],
        }

    def test_pydantic_model_with_example(self) -> None:
        """Model-level ``examples`` are carried into the generated schema."""

        class User(pydantic.BaseModel):
            name: str = ""
            id: int

            # The examples are declared through whichever configuration
            # mechanism the installed pydantic major version understands.
            if PYDANTIC_V2:
                model_config = {
                    "json_schema_extra": {"examples": [{"name": "john", "id": 1}]},
                }

            else:

                class Config:
                    schema_extra = {"examples": [{"name": "john", "id": 1}]}  # noqa: RUF012

        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(user: User) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]

        for key, v in payload.items():
            assert key == "User"
            assert v == {
                "examples": [{"id": 1, "name": "john"}],
                "properties": {
                    "id": {"title": "Id", "type": "integer"},
                    "name": {"default": "", "title": "Name", "type": "string"},
                },
                "required": ["id"],
                "title": "User",
                "type": "object",
            }

    def test_with_filter(self) -> None:
        """Two handlers on one subscriber produce a two-entry ``oneOf`` payload."""
        broker = self.broker_class()

        sub = broker.subscriber("test")

        @sub(  # pragma: no branch
            filter=lambda m: m.content_type == "application/json",
        )
        async def handle(id: int) -> None: ...

        @sub
        async def handle_default(msg) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        assert (
            len(
                next(iter(schema["components"]["messages"].values()))["payload"]["oneOf"],
            )
            == 2
        )

        payload = schema["components"]["schemas"]

        assert "Handle:Message:Payload" in payload
        assert "HandleDefault:Message:Payload" in payload

    def test_ignores_depends(self) -> None:
        """Dependency arguments are flattened into the payload; the wrapper is not."""
        broker = self.broker_class()

        def dep(name: str = ""):
            return name

        def dep2(name2: str):
            return name2

        dependencies = (self.dependency_builder(dep2),)
        message = self.dependency_builder(dep)

        @broker.subscriber("test", dependencies=dependencies)
        async def handle(id: int, message=message) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        payload = schema["components"]["schemas"]

        for key, v in payload.items():
            assert key == "Handle:Message:Payload"
            assert v == {
                "properties": {
                    "id": {"title": "Id", "type": "integer"},
                    "name": {"default": "", "title": "Name", "type": "string"},
                    "name2": {"title": "Name2", "type": "string"},
                },
                "required": ["id", "name2"],
                "title": key,
                "type": "object",
            }, v

    @pydantic_v2
    def test_discriminator(self) -> None:
        """A discriminated union argument references both member schemas."""

        class Sub2(pydantic.BaseModel):
            type: Literal["sub2"]

        class Sub(pydantic.BaseModel):
            type: Literal["sub"]

        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(
            user: Annotated[Sub2 | Sub, pydantic.Field(discriminator="type")],
        ): ...

        schema = self.get_spec(broker).to_jsonable()

        key = next(iter(schema["components"]["messages"].keys()))

        assert key == IsStr(regex=r"test[\w:]*:Handle:SubscribeMessage"), key

        p = schema["components"]["messages"][key]["payload"]
        assert p == IsPartialDict({
            "$ref": "#/components/schemas/Handle:Message:Payload",
        }), p

        assert schema["components"]["schemas"] == IsPartialDict({
            "Sub": {
                "properties": {
                    "type": IsPartialDict({"const": "sub", "title": "Type"}),
                },
                "required": ["type"],
                "title": "Sub",
                "type": "object",
            },
            "Sub2": {
                "properties": {
                    "type": IsPartialDict({"const": "sub2", "title": "Type"}),
                },
                "required": ["type"],
                "title": "Sub2",
                "type": "object",
            },
        }), schema["components"]["schemas"]

        payload = schema["components"]["schemas"].get("Handle:Message:Payload")

        discriminator_payload = IsPartialDict({
            "discriminator": "type",
            "oneOf": [
                {"$ref": "#/components/schemas/Sub2"},
                {"$ref": "#/components/schemas/Sub"},
            ],
            "title": "Handle:Message:Payload",
        })

        # With ``is_fastapi`` set, a plain ``anyOf`` union is accepted as well.
        if self.is_fastapi:
            assert (
                payload
                == IsPartialDict({
                    "anyOf": [
                        {"$ref": "#/components/schemas/Sub2"},
                        {"$ref": "#/components/schemas/Sub"},
                    ],
                })
                | discriminator_payload
            ), payload

        else:
            assert payload == discriminator_payload

    @pydantic_v2
    def test_nested_discriminator(self) -> None:
        """A discriminated union nested in a model is kept as ``oneOf`` on the field."""

        class Sub2(pydantic.BaseModel):
            type: Literal["sub2"]

        class Sub(pydantic.BaseModel):
            type: Literal["sub"]

        class Model(pydantic.BaseModel):
            msg: Sub2 | Sub = pydantic.Field(..., discriminator="type")

        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(user: Model) -> None: ...

        schema = self.get_spec(broker).to_jsonable()

        key = next(iter(schema["components"]["messages"].keys()))
        assert key == IsStr(regex=r"test[\w:]*:Handle:SubscribeMessage")
        assert schema["components"] == {
            "messages": {
                key: {
                    "title": key,
                    "correlationId": {"location": "$message.header#/correlation_id"},
                    "payload": {"$ref": "#/components/schemas/Model"},
                },
            },
            "schemas": {
                "Sub": {
                    "properties": {
                        "type": IsPartialDict({"const": "sub", "title": "Type"}),
                    },
                    "required": ["type"],
                    "title": "Sub",
                    "type": "object",
                },
                "Sub2": {
                    "properties": {
                        "type": IsPartialDict({"const": "sub2", "title": "Type"}),
                    },
                    "required": ["type"],
                    "title": "Sub2",
                    "type": "object",
                },
                "Model": {
                    "properties": {
                        "msg": {
                            "discriminator": "type",
                            "oneOf": [
                                {"$ref": "#/components/schemas/Sub2"},
                                {"$ref": "#/components/schemas/Sub"},
                            ],
                            "title": "Msg",
                        },
                    },
                    "required": ["msg"],
                    "title": "Model",
                    "type": "object",
                },
            },
        }, schema["components"]

719 

720 

class ArgumentsTestcase(FastAPICompatible):
    """Argument-schema tests that build dependencies with ``fast_depends.Depends``."""

    dependency_builder = staticmethod(Depends)

    def test_pydantic_field(self) -> None:
        """A ``pydantic.Field`` default contributes its metadata to the payload schema."""
        broker = self.broker_class()

        @broker.subscriber("msg")
        async def msg(
            msg: pydantic.PositiveInt = pydantic.Field(
                1,
                description="some field",
                title="Perfect",
                examples=[1],
            ),
        ) -> None: ...

        schemas = self.get_spec(broker).to_jsonable()["components"]["schemas"]

        expected = {
            "default": 1,
            "description": "some field",
            "examples": [1],
            "exclusiveMinimum": 0,
            "title": "Perfect",
            "type": "integer",
        }

        for name, definition in schemas.items():
            assert name == "Perfect"
            assert definition == expected

    def test_ignores_custom_field(self) -> None:
        """A ``Context()`` default argument does not appear in the payload schema."""
        broker = self.broker_class()

        @broker.subscriber("test")
        async def handle(
            id: int,
            user: str | None = None,
            message=Context(),
        ) -> None: ...

        schemas = self.get_spec(broker).to_jsonable()["components"]["schemas"]

        for name, definition in schemas.items():
            # Optional ``user`` rendered as an ``anyOf`` with ``null``.
            with_nullable_user = IsDict(
                {
                    "properties": {
                        "id": {"title": "Id", "type": "integer"},
                        "user": {
                            "anyOf": [{"type": "string"}, {"type": "null"}],
                            "default": None,
                            "title": "User",
                        },
                    },
                    "required": ["id"],
                    "title": name,
                    "type": "object",
                },
            )
            # TODO: remove when deprecating PydanticV1
            with_plain_user = IsDict(
                {
                    "properties": {
                        "id": {"title": "Id", "type": "integer"},
                        "user": {"title": "User", "type": "string"},
                    },
                    "required": ["id"],
                    "title": "Handle:Message:Payload",
                    "type": "object",
                },
            )

            assert definition == with_nullable_user | with_plain_user

    @pytest.mark.skipif(
        sys.version_info >= (3, 14),
        reason="Python 3.14 disallows redefining a class with the same name",
    )
    def test_overwrite_schema(self) -> None:
        """Two same-named dataclasses collapse into one schema and emit a warning."""
        broker = self.broker_class()

        @dataclass
        class User:
            id: int
            name: str = ""

        @broker.subscriber("test")
        async def handle(user: User) -> None: ...

        # Deliberately rebinds ``User`` so both handlers use identically named types.
        @dataclass
        class User:
            id: int
            email: str = ""

        @broker.subscriber("test2")
        async def second_handle(user: User) -> None: ...

        with pytest.warns(
            RuntimeWarning,
            match="Overwriting the message schema, data types have the same name",
        ):
            spec = self.get_spec(broker).to_jsonable()

        schemas = spec["components"]["schemas"]
        assert len(schemas) == 1

        # Exactly one entry is guaranteed by the length assertion above.
        ((name, definition),) = schemas.items()
        assert name == "User"

        # Either definition of ``User`` may be the one that is kept.
        email_variant = IsDict({
            "id": {"title": "Id", "type": "integer"},
            "email": {"default": "", "title": "Email", "type": "string"},
        })
        name_variant = IsDict({
            "id": {"title": "Id", "type": "integer"},
            "name": {"default": "", "title": "Name", "type": "string"},
        })

        assert definition == {
            "properties": email_variant | name_variant,
            "required": ["id"],
            "title": name,
            "type": "object",
        }