Coverage for tests / asyncapi / base / v2_6_0 / arguments.py: 98%

280 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import sys 

2from dataclasses import dataclass, field 

3from enum import Enum 

4from typing import Annotated, Literal 

5 

6import pydantic 

7import pytest 

8from dirty_equals import IsDict, IsPartialDict, IsStr 

9from fast_depends import Depends 

10 

11from faststream import Context 

12from faststream._internal.broker import BrokerUsecase 

13from tests.marks import PYDANTIC_V2, pydantic_v2 

14 

15from .basic import AsyncAPI260Factory 

16 

17 

18class FastAPICompatible(AsyncAPI260Factory): 

19 is_fastapi: bool = False 

20 

21 broker_class: type[BrokerUsecase] 

22 dependency_builder = staticmethod(Depends) 

23 

24 def test_custom_naming(self) -> None: 

25 broker = self.broker_class() 

26 

27 @broker.subscriber("test", title="custom_name", description="test description") 

28 async def handle(msg) -> None: ... 

29 

30 schema = self.get_spec(broker).to_jsonable() 

31 key = tuple(schema["channels"].keys())[0] # noqa: RUF015 

32 

33 assert key == "custom_name" 

34 assert schema["channels"][key]["description"] == "test description" 

35 

36 def test_slash_in_title(self) -> None: 

37 broker = self.broker_class() 

38 

39 @broker.subscriber("test", title="/") 

40 async def handle(msg) -> None: ... 

41 

42 schema = self.get_spec(broker).to_jsonable() 

43 

44 assert next(iter(schema["channels"].keys())) == "/" 

45 

46 assert next(iter(schema["components"]["messages"].keys())) == ".:Message" 

47 assert schema["components"]["messages"][".:Message"]["title"] == "/:Message" 

48 

49 assert next(iter(schema["components"]["schemas"].keys())) == ".:Message:Payload" 

50 assert ( 

51 schema["components"]["schemas"][".:Message:Payload"]["title"] 

52 == "/:Message:Payload" 

53 ) 

54 

55 def test_docstring_description(self) -> None: 

56 broker = self.broker_class() 

57 

58 @broker.subscriber("test", title="custom_name") 

59 async def handle(msg) -> None: 

60 """Test description.""" 

61 

62 schema = self.get_spec(broker).to_jsonable() 

63 key = tuple(schema["channels"].keys())[0] # noqa: RUF015 

64 

65 assert key == "custom_name" 

66 assert schema["channels"][key]["description"] == "Test description.", schema[ 

67 "channels" 

68 ][key] 

69 

70 def test_empty(self) -> None: 

71 broker = self.broker_class() 

72 

73 @broker.subscriber("test") 

74 async def handle() -> None: ... 

75 

76 schema = self.get_spec(broker).to_jsonable() 

77 

78 payload = schema["components"]["schemas"] 

79 

80 for key, v in payload.items(): 

81 assert key == "EmptyPayload" 

82 assert v == { 

83 "title": key, 

84 "type": "null", 

85 } 

86 

87 def test_no_type(self) -> None: 

88 broker = self.broker_class() 

89 

90 @broker.subscriber("test") 

91 async def handle(msg) -> None: ... 

92 

93 schema = self.get_spec(broker).to_jsonable() 

94 

95 payload = schema["components"]["schemas"] 

96 

97 for key, v in payload.items(): 

98 assert key == "Handle:Message:Payload" 

99 assert v == {"title": key} 

100 

101 def test_simple_type(self) -> None: 

102 broker = self.broker_class() 

103 

104 @broker.subscriber("test") 

105 async def handle(msg: int) -> None: ... 

106 

107 schema = self.get_spec(broker).to_jsonable() 

108 

109 payload = schema["components"]["schemas"] 

110 assert next(iter(schema["channels"].values())).get("description") is None 

111 

112 for key, v in payload.items(): 

113 assert key == "Handle:Message:Payload" 

114 assert v == {"title": key, "type": "integer"} 

115 

116 def test_simple_optional_type(self) -> None: 

117 broker = self.broker_class() 

118 

119 @broker.subscriber("test") 

120 async def handle(msg: int | None) -> None: ... 

121 

122 schema = self.get_spec(broker).to_jsonable() 

123 

124 payload = schema["components"]["schemas"] 

125 

126 for key, v in payload.items(): 

127 assert key == "Handle:Message:Payload" 

128 assert v == IsDict( 

129 { 

130 "anyOf": [{"type": "integer"}, {"type": "null"}], 

131 "title": key, 

132 }, 

133 ) | IsDict( 

134 { # TODO: remove when deprecating PydanticV1 

135 "title": key, 

136 "type": "integer", 

137 }, 

138 ), v 

139 

140 def test_simple_type_with_default(self) -> None: 

141 broker = self.broker_class() 

142 

143 @broker.subscriber("test") 

144 async def handle(msg: int = 1) -> None: ... 

145 

146 schema = self.get_spec(broker).to_jsonable() 

147 

148 payload = schema["components"]["schemas"] 

149 

150 for key, v in payload.items(): 

151 assert key == "Handle:Message:Payload" 

152 assert v == { 

153 "default": 1, 

154 "title": key, 

155 "type": "integer", 

156 } 

157 

158 def test_multi_args_no_type(self) -> None: 

159 broker = self.broker_class() 

160 

161 @broker.subscriber("test") 

162 async def handle(msg, another) -> None: ... 

163 

164 schema = self.get_spec(broker).to_jsonable() 

165 

166 payload = schema["components"]["schemas"] 

167 

168 for key, v in payload.items(): 

169 assert key == "Handle:Message:Payload" 

170 assert v == { 

171 "properties": { 

172 "another": {"title": "Another"}, 

173 "msg": {"title": "Msg"}, 

174 }, 

175 "required": ["msg", "another"], 

176 "title": key, 

177 "type": "object", 

178 } 

179 

180 def test_multi_args_with_type(self) -> None: 

181 broker = self.broker_class() 

182 

183 @broker.subscriber("test") 

184 async def handle(msg: str, another: int) -> None: ... 

185 

186 schema = self.get_spec(broker).to_jsonable() 

187 

188 payload = schema["components"]["schemas"] 

189 

190 for key, v in payload.items(): 

191 assert key == "Handle:Message:Payload" 

192 assert v == { 

193 "properties": { 

194 "another": {"title": "Another", "type": "integer"}, 

195 "msg": {"title": "Msg", "type": "string"}, 

196 }, 

197 "required": ["msg", "another"], 

198 "title": key, 

199 "type": "object", 

200 } 

201 

202 def test_multi_args_with_default(self) -> None: 

203 broker = self.broker_class() 

204 

205 @broker.subscriber("test") 

206 async def handle(msg: str, another: int | None = None) -> None: ... 

207 

208 schema = self.get_spec(broker).to_jsonable() 

209 

210 payload = schema["components"]["schemas"] 

211 

212 for key, v in payload.items(): 

213 assert key == "Handle:Message:Payload" 

214 

215 assert v == { 

216 "properties": { 

217 "another": IsDict( 

218 { 

219 "anyOf": [{"type": "integer"}, {"type": "null"}], 

220 "default": None, 

221 "title": "Another", 

222 }, 

223 ) 

224 | IsDict( 

225 { # TODO: remove when deprecating PydanticV1 

226 "title": "Another", 

227 "type": "integer", 

228 }, 

229 ), 

230 "msg": {"title": "Msg", "type": "string"}, 

231 }, 

232 "required": ["msg"], 

233 "title": key, 

234 "type": "object", 

235 } 

236 

237 def test_dataclass(self) -> None: 

238 @dataclass 

239 class User: 

240 id: int 

241 name: str = "" 

242 

243 broker = self.broker_class() 

244 

245 @broker.subscriber("test") 

246 async def handle(user: User) -> None: ... 

247 

248 schema = self.get_spec(broker).to_jsonable() 

249 

250 payload = schema["components"]["schemas"] 

251 

252 for key, v in payload.items(): 

253 assert key == "User" 

254 assert v == { 

255 "properties": { 

256 "id": {"title": "Id", "type": "integer"}, 

257 "name": {"default": "", "title": "Name", "type": "string"}, 

258 }, 

259 "required": ["id"], 

260 "title": key, 

261 "type": "object", 

262 } 

263 

264 def test_dataclasses_nested(self): 

265 @dataclass 

266 class Product: 

267 id: int 

268 name: str = "" 

269 

270 @dataclass 

271 class Order: 

272 id: int 

273 products: list[Product] = field(default_factory=list) 

274 

275 broker = self.broker_class() 

276 

277 @broker.subscriber("test") 

278 async def handle(order: Order): ... 

279 

280 schema = self.get_spec(broker).to_jsonable() 

281 

282 payload = schema["components"]["schemas"] 

283 

284 assert payload == { 

285 "Product": { 

286 "properties": { 

287 "id": {"title": "Id", "type": "integer"}, 

288 "name": {"default": "", "title": "Name", "type": "string"}, 

289 }, 

290 "required": ["id"], 

291 "title": "Product", 

292 "type": "object", 

293 }, 

294 "Order": { 

295 "properties": { 

296 "id": {"title": "Id", "type": "integer"}, 

297 "products": { 

298 "items": {"$ref": "#/components/schemas/Product"}, 

299 "title": "Products", 

300 "type": "array", 

301 }, 

302 }, 

303 "required": ["id"], 

304 "title": "Order", 

305 "type": "object", 

306 }, 

307 } 

308 

309 def test_pydantic_model(self): 

310 class User(pydantic.BaseModel): 

311 name: str = "" 

312 id: int 

313 

314 broker = self.broker_class() 

315 

316 @broker.subscriber("test") 

317 async def handle(user: User) -> None: ... 

318 

319 schema = self.get_spec(broker).to_jsonable() 

320 

321 payload = schema["components"]["schemas"] 

322 

323 for key, v in payload.items(): 

324 assert key == "User" 

325 assert v == { 

326 "properties": { 

327 "id": {"title": "Id", "type": "integer"}, 

328 "name": {"default": "", "title": "Name", "type": "string"}, 

329 }, 

330 "required": ["id"], 

331 "title": key, 

332 "type": "object", 

333 } 

334 

335 def test_pydantic_model_with_enum(self) -> None: 

336 class Status(str, Enum): 

337 registered = "registered" 

338 banned = "banned" 

339 

340 class User(pydantic.BaseModel): 

341 name: str = "" 

342 id: int 

343 status: Status 

344 

345 broker = self.broker_class() 

346 

347 @broker.subscriber("test") 

348 async def handle(user: User) -> None: ... 

349 

350 schema = self.get_spec(broker).to_jsonable() 

351 

352 payload = schema["components"]["schemas"] 

353 

354 assert payload == { 

355 "Status": IsPartialDict( 

356 { 

357 "enum": ["registered", "banned"], 

358 "title": "Status", 

359 "type": "string", 

360 }, 

361 ), 

362 "User": { 

363 "properties": { 

364 "id": {"title": "Id", "type": "integer"}, 

365 "name": {"default": "", "title": "Name", "type": "string"}, 

366 "status": {"$ref": "#/components/schemas/Status"}, 

367 }, 

368 "required": ["id", "status"], 

369 "title": "User", 

370 "type": "object", 

371 }, 

372 }, payload 

373 

374 def test_pydantic_model_mixed_regular(self) -> None: 

375 class Email(pydantic.BaseModel): 

376 addr: str 

377 

378 class User(pydantic.BaseModel): 

379 name: str = "" 

380 id: int 

381 email: Email 

382 

383 broker = self.broker_class() 

384 

385 @broker.subscriber("test") 

386 async def handle(user: User, description: str = "") -> None: ... 

387 

388 schema = self.get_spec(broker).to_jsonable() 

389 

390 payload = schema["components"]["schemas"] 

391 

392 assert payload == { 

393 "Email": { 

394 "title": "Email", 

395 "type": "object", 

396 "properties": {"addr": {"title": "Addr", "type": "string"}}, 

397 "required": ["addr"], 

398 }, 

399 "User": { 

400 "title": "User", 

401 "type": "object", 

402 "properties": { 

403 "name": {"title": "Name", "default": "", "type": "string"}, 

404 "id": {"title": "Id", "type": "integer"}, 

405 "email": {"$ref": "#/components/schemas/Email"}, 

406 }, 

407 "required": ["id", "email"], 

408 }, 

409 "Handle:Message:Payload": { 

410 "title": "Handle:Message:Payload", 

411 "type": "object", 

412 "properties": { 

413 "user": {"$ref": "#/components/schemas/User"}, 

414 "description": { 

415 "title": "Description", 

416 "default": "", 

417 "type": "string", 

418 }, 

419 }, 

420 "required": ["user"], 

421 }, 

422 } 

423 

424 def test_nested_models_in_union_should_be_in_schemas(self) -> None: 

425 """Test that nested Pydantic models in union types are promoted to components/schemas. 

426 

427 Fixes issue #2443: Nested Pydantic models are not included in AsyncAPI 

428 components/schemas (inplaced instead). 

429 """ 

430 

431 class Email(pydantic.BaseModel): 

432 addr: str 

433 

434 class User(pydantic.BaseModel): 

435 name: str = "" 

436 id: int 

437 email: Email 

438 

439 class Other(pydantic.BaseModel): 

440 id: int 

441 

442 broker = self.broker_class() 

443 

444 @broker.subscriber("test") 

445 async def handle(body: User | Other) -> None: ... 

446 

447 schema = self.get_spec(broker).to_jsonable() 

448 

449 payload = schema["components"]["schemas"] 

450 

451 # Check that nested Email model is promoted to components/schemas 

452 assert "Email" in payload 

453 assert payload["Email"] == { 

454 "title": "Email", 

455 "type": "object", 

456 "properties": {"addr": {"title": "Addr", "type": "string"}}, 

457 "required": ["addr"], 

458 } 

459 

460 def test_nested_models_in_publisher_union_should_be_in_schemas(self) -> None: 

461 """Test that nested Pydantic models in publisher union types are promoted to components/schemas. 

462 

463 Fixes issue #2443: Nested Pydantic models are not included in AsyncAPI 

464 components/schemas (inplaced instead). 

465 """ 

466 

467 class Email(pydantic.BaseModel): 

468 addr: str 

469 

470 class User(pydantic.BaseModel): 

471 name: str = "" 

472 id: int 

473 email: Email 

474 

475 class Other(pydantic.BaseModel): 

476 id: int 

477 

478 broker = self.broker_class() 

479 

480 publisher = broker.publisher("test") 

481 

482 @publisher 

483 def handle0(msg) -> User: ... 

484 

485 @publisher 

486 def handle1(msg) -> Other: ... 

487 

488 schema = self.get_spec(broker).to_jsonable() 

489 

490 payload = schema["components"]["schemas"] 

491 

492 # Check that nested Email model is promoted to components/schemas 

493 assert "Email" in payload 

494 assert payload["Email"] == { 

495 "title": "Email", 

496 "type": "object", 

497 "properties": {"addr": {"title": "Addr", "type": "string"}}, 

498 "required": ["addr"], 

499 } 

500 

501 def test_pydantic_model_with_example(self) -> None: 

502 class User(pydantic.BaseModel): 

503 name: str = "" 

504 id: int 

505 

506 if PYDANTIC_V2: 506 ↛ 513line 506 didn't jump to line 513 because the condition on line 506 was always true

507 model_config = { 

508 "json_schema_extra": {"examples": [{"name": "john", "id": 1}]}, 

509 } 

510 

511 else: 

512 

513 class Config: 

514 schema_extra = {"examples": [{"name": "john", "id": 1}]} # noqa: RUF012 

515 

516 broker = self.broker_class() 

517 

518 @broker.subscriber("test") 

519 async def handle(user: User) -> None: ... 

520 

521 schema = self.get_spec(broker).to_jsonable() 

522 

523 payload = schema["components"]["schemas"] 

524 

525 for key, v in payload.items(): 

526 assert key == "User" 

527 assert v == { 

528 "examples": [{"id": 1, "name": "john"}], 

529 "properties": { 

530 "id": {"title": "Id", "type": "integer"}, 

531 "name": {"default": "", "title": "Name", "type": "string"}, 

532 }, 

533 "required": ["id"], 

534 "title": "User", 

535 "type": "object", 

536 } 

537 

538 def test_pydantic_model_with_keyword_property(self) -> None: 

539 class TestModel(pydantic.BaseModel): 

540 discriminator: int = 0 

541 

542 broker = self.broker_class() 

543 

544 @broker.subscriber("test") 

545 async def handle(model: TestModel) -> None: ... 

546 

547 schema = self.get_spec(broker).to_jsonable() 

548 

549 payload = schema["components"]["schemas"] 

550 

551 for key, v in payload.items(): 

552 assert key == "TestModel" 

553 assert v == { 

554 "properties": { 

555 "discriminator": { 

556 "default": 0, 

557 "title": "Discriminator", 

558 "type": "integer", 

559 }, 

560 }, 

561 "title": key, 

562 "type": "object", 

563 } 

564 

565 def test_ignores_depends(self) -> None: 

566 broker = self.broker_class() 

567 

568 def dep(name: str = "") -> str: 

569 return name 

570 

571 def dep2(name2: str) -> str: 

572 return name2 

573 

574 dependencies = (self.dependency_builder(dep2),) 

575 message = self.dependency_builder(dep) 

576 

577 @broker.subscriber("test", dependencies=dependencies) 

578 async def handle(id: int, message=message) -> None: ... 

579 

580 schema = self.get_spec(broker).to_jsonable() 

581 

582 payload = schema["components"]["schemas"] 

583 

584 for key, v in payload.items(): 

585 assert key == "Handle:Message:Payload" 

586 assert v == { 

587 "properties": { 

588 "id": {"title": "Id", "type": "integer"}, 

589 "name": {"default": "", "title": "Name", "type": "string"}, 

590 "name2": {"title": "Name2", "type": "string"}, 

591 }, 

592 "required": ["id", "name2"], 

593 "title": key, 

594 "type": "object", 

595 }, v 

596 

597 @pydantic_v2 

598 def test_discriminator(self) -> None: 

599 class Sub2(pydantic.BaseModel): 

600 type: Literal["sub2"] 

601 

602 class Sub(pydantic.BaseModel): 

603 type: Literal["sub"] 

604 

605 broker = self.broker_class() 

606 

607 @broker.subscriber("test") 

608 async def handle( 

609 user: Annotated[Sub2 | Sub, pydantic.Field(discriminator="type")], 

610 ): ... 

611 

612 schema = self.get_spec(broker).to_jsonable() 

613 

614 key = next(iter(schema["components"]["messages"].keys())) 

615 

616 assert key == IsStr(regex=r"test[\w:]*:Handle:Message"), key 

617 

618 expected_schema = IsPartialDict({ 

619 "discriminator": "type", 

620 "oneOf": [ 

621 {"$ref": "#/components/schemas/Sub2"}, 

622 {"$ref": "#/components/schemas/Sub"}, 

623 ], 

624 "title": "Handle:Message:Payload", 

625 }) 

626 

627 fastapi_payload = schema["components"]["schemas"].get("Handle:Message:Payload") 

628 if self.is_fastapi: 

629 if fastapi_payload: 629 ↛ 637line 629 didn't jump to line 637 because the condition on line 629 was always true

630 assert fastapi_payload == IsPartialDict({ 

631 "anyOf": [ 

632 {"$ref": "#/components/schemas/Sub2"}, 

633 {"$ref": "#/components/schemas/Sub"}, 

634 ], 

635 }) 

636 

637 expected_schema = ( 

638 IsPartialDict({"$ref": "#/components/schemas/Handle:Message:Payload"}) 

639 | expected_schema 

640 ) 

641 

642 assert schema["components"]["messages"][key]["payload"] == expected_schema, ( 

643 schema["components"] 

644 ) 

645 

646 assert schema["components"]["schemas"] == IsPartialDict({ 

647 "Sub": { 

648 "properties": { 

649 "type": IsPartialDict({"const": "sub", "title": "Type"}), 

650 }, 

651 "required": ["type"], 

652 "title": "Sub", 

653 "type": "object", 

654 }, 

655 "Sub2": { 

656 "properties": { 

657 "type": IsPartialDict({"const": "sub2", "title": "Type"}), 

658 }, 

659 "required": ["type"], 

660 "title": "Sub2", 

661 "type": "object", 

662 }, 

663 }), schema["components"]["schemas"] 

664 

665 @pydantic_v2 

666 def test_nested_discriminator(self) -> None: 

667 class Sub2(pydantic.BaseModel): 

668 type: Literal["sub2"] 

669 

670 class Sub(pydantic.BaseModel): 

671 type: Literal["sub"] 

672 

673 class Model(pydantic.BaseModel): 

674 msg: Sub2 | Sub = pydantic.Field(..., discriminator="type") 

675 

676 broker = self.broker_class() 

677 

678 @broker.subscriber("test") 

679 async def handle(user: Model) -> None: ... 

680 

681 schema = self.get_spec(broker).to_jsonable() 

682 

683 key = next(iter(schema["components"]["messages"].keys())) 

684 assert key == IsStr(regex=r"test[\w:]*:Handle:Message") 

685 assert schema["components"] == { 

686 "messages": { 

687 key: IsPartialDict({ 

688 "payload": {"$ref": "#/components/schemas/Model"}, 

689 }), 

690 }, 

691 "schemas": { 

692 "Sub": { 

693 "properties": { 

694 "type": IsPartialDict({"const": "sub", "title": "Type"}), 

695 }, 

696 "required": ["type"], 

697 "title": "Sub", 

698 "type": "object", 

699 }, 

700 "Sub2": { 

701 "properties": { 

702 "type": IsPartialDict({"const": "sub2", "title": "Type"}), 

703 }, 

704 "required": ["type"], 

705 "title": "Sub2", 

706 "type": "object", 

707 }, 

708 "Model": { 

709 "properties": { 

710 "msg": { 

711 "discriminator": "type", 

712 "oneOf": [ 

713 {"$ref": "#/components/schemas/Sub2"}, 

714 {"$ref": "#/components/schemas/Sub"}, 

715 ], 

716 "title": "Msg", 

717 }, 

718 }, 

719 "required": ["msg"], 

720 "title": "Model", 

721 "type": "object", 

722 }, 

723 }, 

724 }, schema["components"] 

725 

726 def test_with_filter(self) -> None: 

727 class User(pydantic.BaseModel): 

728 name: str = "" 

729 id: int 

730 

731 broker = self.broker_class() 

732 

733 sub = broker.subscriber("test") 

734 

735 @sub( 

736 filter=lambda m: m.content_type == "application/json", 

737 ) 

738 async def handle(id: int) -> None: ... 

739 

740 @sub 

741 async def handle_default(msg) -> None: ... 

742 

743 schema = self.get_spec(broker).to_jsonable() 

744 

745 name, message = next(iter(schema["components"]["messages"].items())) 

746 

747 assert name == IsStr(regex=r"test[\w:]*:\[Handle,HandleDefault\]:Message"), name 

748 

749 assert len(message["payload"]["oneOf"]) == 2 

750 

751 payload = schema["components"]["schemas"] 

752 

753 assert "Handle:Message:Payload" in list(payload.keys()) 

754 assert "HandleDefault:Message:Payload" in list(payload.keys()) 

755 

756 

757class ArgumentsTestcase(FastAPICompatible): 

758 dependency_builder = staticmethod(Depends) 

759 

760 def test_pydantic_field(self) -> None: 

761 broker = self.broker_class() 

762 

763 @broker.subscriber("msg") 

764 async def msg( 

765 msg: pydantic.PositiveInt = pydantic.Field( 

766 1, 

767 description="some field", 

768 title="Perfect", 

769 examples=[1], 

770 ), 

771 ) -> None: ... 

772 

773 schema = self.get_spec(broker).to_jsonable() 

774 

775 payload = schema["components"]["schemas"] 

776 

777 for key, v in payload.items(): 

778 assert key == "Perfect" 

779 

780 assert v == { 

781 "default": 1, 

782 "description": "some field", 

783 "examples": [1], 

784 "exclusiveMinimum": 0, 

785 "title": "Perfect", 

786 "type": "integer", 

787 } 

788 

789 def test_ignores_custom_field(self) -> None: 

790 broker = self.broker_class() 

791 

792 @broker.subscriber("test") 

793 async def handle( 

794 id: int, 

795 user: str | None = None, 

796 message=Context(), 

797 ) -> None: ... 

798 

799 schema = self.get_spec(broker).to_jsonable() 

800 

801 payload = schema["components"]["schemas"] 

802 

803 for key, v in payload.items(): 

804 assert v == IsDict( 

805 { 

806 "properties": { 

807 "id": {"title": "Id", "type": "integer"}, 

808 "user": { 

809 "anyOf": [{"type": "string"}, {"type": "null"}], 

810 "default": None, 

811 "title": "User", 

812 }, 

813 }, 

814 "required": ["id"], 

815 "title": key, 

816 "type": "object", 

817 }, 

818 ) | IsDict( # TODO: remove when deprecating PydanticV1 

819 { 

820 "properties": { 

821 "id": {"title": "Id", "type": "integer"}, 

822 "user": {"title": "User", "type": "string"}, 

823 }, 

824 "required": ["id"], 

825 "title": "Handle:Message:Payload", 

826 "type": "object", 

827 }, 

828 ) 

829 

830 @pytest.mark.skipif( 

831 sys.version_info >= (3, 14), 

832 reason="Python 3.14 disallows redefining a class with the same name", 

833 ) 

834 def test_overwrite_schema(self) -> None: 

835 @dataclass 

836 class User: 

837 id: int 

838 name: str = "" 

839 

840 broker = self.broker_class() 

841 

842 @broker.subscriber("test") 

843 async def handle(user: User) -> None: ... 

844 

845 @dataclass 

846 class User: 

847 id: int 

848 email: str = "" 

849 

850 @broker.subscriber("test2") 

851 async def second_handle(user: User) -> None: ... 

852 

853 with pytest.warns( 

854 RuntimeWarning, 

855 match="Overwriting the message schema, data types have the same name", 

856 ): 

857 schema = self.get_spec(broker).to_jsonable() 

858 

859 payload = schema["components"]["schemas"] 

860 

861 key, value = next(iter(payload.items())) 

862 

863 assert key == "User" 

864 assert value == { 

865 "properties": IsDict({ 

866 "id": {"title": "Id", "type": "integer"}, 

867 "email": {"default": "", "title": "Email", "type": "string"}, 

868 }) 

869 | IsDict({ 

870 "id": {"title": "Id", "type": "integer"}, 

871 "name": {"default": "", "title": "Name", "type": "string"}, 

872 }), 

873 "required": ["id"], 

874 "title": key, 

875 "type": "object", 

876 }