Coverage for faststream / nats / broker / broker.py: 83%

93 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import logging 

2from collections.abc import Iterable, Sequence 

3from typing import ( 

4 TYPE_CHECKING, 

5 Any, 

6 Optional, 

7 Union, 

8 cast, 

9) 

10 

11import anyio 

12import nats 

13from fast_depends import Provider, dependency_provider 

14from nats.aio.client import ( 

15 DEFAULT_CONNECT_TIMEOUT, 

16 DEFAULT_DRAIN_TIMEOUT, 

17 DEFAULT_INBOX_PREFIX, 

18 DEFAULT_MAX_FLUSHER_QUEUE_SIZE, 

19 DEFAULT_MAX_OUTSTANDING_PINGS, 

20 DEFAULT_MAX_RECONNECT_ATTEMPTS, 

21 DEFAULT_PENDING_SIZE, 

22 DEFAULT_PING_INTERVAL, 

23 DEFAULT_RECONNECT_TIME_WAIT, 

24 Client, 

25) 

26from nats.aio.msg import Msg 

27from nats.errors import Error 

28from nats.js.errors import BadRequestError 

29from typing_extensions import overload, override 

30 

31from faststream.__about__ import SERVICE_NAME 

32from faststream._internal.broker import BrokerUsecase 

33from faststream._internal.constants import EMPTY 

34from faststream._internal.context.repository import ContextRepo 

35from faststream._internal.di import FastDependsConfig 

36from faststream.message import gen_cor_id 

37from faststream.middlewares import AckPolicy 

38from faststream.nats.configs import NatsBrokerConfig 

39from faststream.nats.publisher.producer import ( 

40 NatsFastProducerImpl, 

41 NatsJSFastProducer, 

42) 

43from faststream.nats.response import NatsPublishCommand 

44from faststream.nats.security import parse_security 

45from faststream.nats.subscriber.usecases.basic import LogicSubscriber 

46from faststream.response.publish_type import PublishType 

47from faststream.specification.schema import BrokerSpec 

48 

49from .logging import make_nats_logger_state 

50from .registrator import NatsRegistrator 

51 

52if TYPE_CHECKING: 

53 from types import TracebackType 

54 

55 from fast_depends.dependencies import Dependant 

56 from fast_depends.library.serializer import SerializerProto 

57 from nats.aio.client import ( 

58 Callback, 

59 Credentials, 

60 ErrorCallback, 

61 JWTCallback, 

62 SignatureCallback, 

63 ) 

64 from nats.js.api import Placement, RePublish, StorageType 

65 from nats.js.kv import KeyValue 

66 from nats.js.object_store import ObjectStore 

67 from typing_extensions import TypedDict 

68 

69 from faststream._internal.basic_types import LoggerProto, SendableMessage 

70 from faststream._internal.parser import CodecProto 

71 from faststream._internal.types import BrokerMiddleware, CustomCallable 

72 from faststream.nats.configs.broker import JsInitOptions 

73 from faststream.nats.helpers import KVBucketDeclarer, OSBucketDeclarer 

74 from faststream.nats.message import NatsMessage 

75 from faststream.nats.schemas import PubAck, Schedule 

76 from faststream.security import BaseSecurity 

77 from faststream.specification.schema.extra import Tag, TagDict 

78 

79 class NatsInitKwargs(TypedDict, total=False): 

80 """NatsBroker.connect() method type hints. 

81 

82 Args: 

83 error_cb: 

84 Callback to report errors. 

85 disconnected_cb: \ 

86 Callback to report disconnection from NATS. 

87 closed_cb: 

88 Callback to report when client stops reconnection to NATS. 

89 discovered_server_cb: 

90 A callback to report when a new server joins the cluster. 

91 reconnected_cb: 

92 Callback to report success reconnection. 

93 name: 

94 Label the connection with name (shown in NATS monitoring 

95 pedantic: 

96 Turn on NATS server pedantic mode that performs extra checks on the protocol. 

97 https://docs.nats.io/using-nats/developer/connecting/misc#turn-on-pedantic-mode 

98 verbose: 

99 Verbose mode produce more feedback about code execution. 

100 allow_reconnect: 

101 Whether recover connection automatically or not. 

102 connect_timeout: 

103 Timeout in seconds to establish connection with NATS server. 

104 reconnect_time_wait: 

105 Time in seconds to wait for reestablish connection to NATS server 

106 max_reconnect_attempts: 

107 Maximum attempts number to reconnect to NATS server. 

108 ping_interval: 

109 Interval in seconds to ping. 

110 max_outstanding_pings: 

111 Maximum number of failed pings 

112 dont_randomize: 

113 Boolean indicating should client randomly shuffle servers list for reconnection randomness. 

114 flusher_queue_size: 

115 Max count of commands awaiting to be flushed to the socket 

116 no_echo: 

117 Boolean indicating should commands be echoed. 

118 tls_hostname: 

119 Hostname for TLS. 

120 token: 

121 Auth token for NATS auth. 

122 drain_timeout: 

123 Timeout in seconds to drain subscriptions. 

124 signature_cb: 

125 A callback used to sign a nonce from the server while authenticating with nkeys. 

126 The user should sign the nonce and return the base64 encoded signature. 

127 user_jwt_cb: 

128 A callback used to fetch and return the account signed JWT for this user. 

129 user_credentials: 

130 A user credentials file or tuple of files. 

131 nkeys_seed: 

132 Path-like object containing nkeys seed that will be used. 

133 nkeys_seed_str: 

134 Nkeys seed to be used. 

135 inbox_prefix: 

136 Prefix for generating unique inboxes, subjects with that prefix and NUID.ß 

137 pending_size: 

138 Max size of the pending buffer for publishing commands. 

139 flush_timeout: 

140 Max duration to wait for a forced flush to occur 

141 """ 

142 

143 error_cb: "ErrorCallback" | None 

144 disconnected_cb: "Callback" | None 

145 closed_cb: Callback | None 

146 discovered_server_cb: "Callback" | None 

147 reconnected_cb: "Callback" | None 

148 name: str | None 

149 pedantic: bool 

150 verbose: bool 

151 allow_reconnect: bool 

152 connect_timeout: int 

153 reconnect_time_wait: int 

154 max_reconnect_attempts: int 

155 ping_interval: int 

156 max_outstanding_pings: int 

157 dont_randomize: bool 

158 flusher_queue_size: int 

159 no_echo: bool 

160 tls_hostname: str | None 

161 token: str | None 

162 drain_timeout: int 

163 signature_cb: "SignatureCallback" | None 

164 user_jwt_cb: "JWTCallback" | None 

165 user_credentials: "Credentials" | None 

166 nkeys_seed: str | None 

167 nkeys_seed_str: str | None 

168 inbox_prefix: str | bytes 

169 pending_size: int 

170 flush_timeout: float | None 

171 

172 

173class NatsBroker( 

174 NatsRegistrator, 

175 BrokerUsecase[Msg, Client], 

176): 

177 """A class to represent a NATS broker.""" 

178 

179 url: list[str] 

180 

def __init__(
    self,
    servers: str | Iterable[str] = ("nats://localhost:4222",),
    *,
    error_cb: Optional["ErrorCallback"] = None,
    disconnected_cb: Optional["Callback"] = None,
    closed_cb: Optional["Callback"] = None,
    discovered_server_cb: Optional["Callback"] = None,
    reconnected_cb: Optional["Callback"] = None,
    name: str | None = SERVICE_NAME,
    pedantic: bool = False,
    verbose: bool = False,
    allow_reconnect: bool = True,
    connect_timeout: int = DEFAULT_CONNECT_TIMEOUT,
    reconnect_time_wait: int = DEFAULT_RECONNECT_TIME_WAIT,
    max_reconnect_attempts: int = DEFAULT_MAX_RECONNECT_ATTEMPTS,
    ping_interval: int = DEFAULT_PING_INTERVAL,
    max_outstanding_pings: int = DEFAULT_MAX_OUTSTANDING_PINGS,
    dont_randomize: bool = False,
    flusher_queue_size: int = DEFAULT_MAX_FLUSHER_QUEUE_SIZE,
    no_echo: bool = False,
    tls_hostname: str | None = None,
    token: str | None = None,
    drain_timeout: int = DEFAULT_DRAIN_TIMEOUT,
    signature_cb: Optional["SignatureCallback"] = None,
    user_jwt_cb: Optional["JWTCallback"] = None,
    user_credentials: Optional["Credentials"] = None,
    nkeys_seed: str | None = None,
    nkeys_seed_str: str | None = None,
    inbox_prefix: str | bytes = DEFAULT_INBOX_PREFIX,
    pending_size: int = DEFAULT_PENDING_SIZE,
    flush_timeout: float | None = None,
    js_options: Union["JsInitOptions", dict[str, Any], None] = None,
    graceful_timeout: float | None = None,
    ack_policy: AckPolicy = EMPTY,
    decoder: Optional["CustomCallable"] = None,
    codec: Optional["CodecProto"] = None,
    parser: Optional["CustomCallable"] = None,
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    routers: Iterable[NatsRegistrator] = (),
    security: Optional["BaseSecurity"] = None,
    specification_url: str | Iterable[str] | None = None,
    protocol: str | None = "nats",
    protocol_version: str | None = "custom",
    description: str | None = None,
    tags: Iterable[Union["Tag", "TagDict"]] = (),
    logger: Optional["LoggerProto"] = EMPTY,
    log_level: int = logging.INFO,
    apply_types: bool = True,
    serializer: Optional["SerializerProto"] = EMPTY,
    provider: Optional["Provider"] = None,
    context: Optional["ContextRepo"] = None,
) -> None:
    """Initialize the NatsBroker object.

    Normalizes the server and specification URLs to lists, builds the core and
    JetStream producers, and hands everything to the parent initializer.

    Args:
        servers:
            NATS cluster addresses to connect. A single string is wrapped into a list.
        error_cb:
            Callback to report errors. It is wrapped so broken connections are logged too.
        disconnected_cb:
            Callback to report disconnection from NATS.
        closed_cb:
            Callback to report when client stops reconnection to NATS.
        discovered_server_cb:
            A callback to report when a new server joins the cluster.
        reconnected_cb:
            Callback to report success reconnection. It is wrapped so reconnects are logged too.
        name:
            Label the connection with name (shown in NATS monitoring).
        pedantic:
            Turn on NATS server pedantic mode that performs extra checks on the protocol.
            https://docs.nats.io/using-nats/developer/connecting/misc#turn-on-pedantic-mode
        verbose:
            Verbose mode produce more feedback about code execution.
        allow_reconnect:
            Whether recover connection automatically or not.
        connect_timeout:
            Timeout in seconds to establish connection with NATS server.
        reconnect_time_wait:
            Time in seconds to wait for reestablish connection to NATS server.
        max_reconnect_attempts:
            Maximum attempts number to reconnect to NATS server.
        ping_interval:
            Interval in seconds to ping.
        max_outstanding_pings:
            Maximum number of failed pings.
        dont_randomize:
            Boolean indicating should client randomly shuffle servers list for reconnection randomness.
        flusher_queue_size:
            Max count of commands awaiting to be flushed to the socket.
        no_echo:
            Boolean indicating should commands be echoed.
        tls_hostname:
            Hostname for TLS.
        token:
            Auth token for NATS auth.
        drain_timeout:
            Timeout in seconds to drain subscriptions.
        signature_cb:
            A callback used to sign a nonce from the server while authenticating with nkeys.
            The user should sign the nonce and return the base64 encoded signature.
        user_jwt_cb:
            A callback used to fetch and return the account signed JWT for this user.
        user_credentials:
            A user credentials file or tuple of files.
        nkeys_seed:
            Path-like object containing nkeys seed that will be used.
        nkeys_seed_str:
            Nkeys seed to be used.
        inbox_prefix:
            Prefix for generating unique inboxes, subjects with that prefix and NUID.
        pending_size:
            Max size of the pending buffer for publishing commands.
        flush_timeout:
            Max duration to wait for a forced flush to occur.
        js_options:
            JetStream initialization options. A falsy value is replaced by an empty dict.
        graceful_timeout:
            Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down.
        ack_policy:
            Default acknowledgement policy for all subscribers. Individual subscribers can override.
        decoder:
            Custom decoder object.
        codec:
            Custom codec object.
        parser:
            Custom parser object.
        dependencies:
            Dependencies to apply to all broker subscribers.
        middlewares:
            Middlewares to apply to all broker publishers/subscribers.
        routers:
            Routers to apply to broker.
        security:
            Security options to connect broker and generate AsyncAPI server security information.
        specification_url:
            AsyncAPI hardcoded server addresses. Use `servers` if not specified.
        protocol:
            AsyncAPI server protocol.
        protocol_version:
            AsyncAPI server protocol version.
        description:
            AsyncAPI server description.
        tags:
            AsyncAPI server tags.
        logger:
            User specified logger to pass into Context and log service messages.
        log_level:
            Service messages log level.
        apply_types:
            Whether to use FastDepends or not.
        serializer:
            FastDepends-compatible serializer to validate incoming messages.
        provider:
            Provider for FastDepends. Falls back to `dependency_provider` when falsy.
        context:
            Context for FastDepends. A new `ContextRepo` is created when falsy.
    """
    secure_kwargs = parse_security(security)

    # Always work with a concrete list of server URLs.
    if isinstance(servers, str):
        server_list = [servers]
    else:
        server_list = list(servers)

    # The specification falls back to the very same list object as the servers.
    if specification_url is None:
        spec_urls = server_list
    elif isinstance(specification_url, str):
        spec_urls = [specification_url]
    else:
        spec_urls = list(specification_url)

    js_producer = NatsJSFastProducer(parser=parser, decoder=decoder)
    producer = NatsFastProducerImpl(parser=parser, decoder=decoder)

    # User callbacks are wrapped so connection state changes get logged.
    wrapped_error_cb = self._log_connection_broken(error_cb)
    wrapped_reconnected_cb = self._log_reconnected(reconnected_cb)

    broker_config = NatsBrokerConfig(
        producer=producer,
        js_producer=js_producer,
        js_options=js_options or {},
        # both args
        broker_middlewares=middlewares,
        broker_parser=parser,
        broker_decoder=decoder,
        broker_codec=codec,
        logger=make_nats_logger_state(logger=logger, log_level=log_level),
        fd_config=FastDependsConfig(
            use_fastdepends=apply_types,
            serializer=serializer,
            provider=provider or dependency_provider,
            context=context or ContextRepo(),
        ),
        # subscriber args
        broker_dependencies=dependencies,
        graceful_timeout=graceful_timeout,
        ack_policy=ack_policy,
        extra_context={"broker": self},
    )

    broker_spec = BrokerSpec(
        description=description,
        url=spec_urls,
        protocol=protocol,
        protocol_version=protocol_version,
        security=security,
        tags=tags,
    )

    super().__init__(
        # NATS options
        servers=server_list,
        name=name,
        verbose=verbose,
        allow_reconnect=allow_reconnect,
        reconnect_time_wait=reconnect_time_wait,
        max_reconnect_attempts=max_reconnect_attempts,
        no_echo=no_echo,
        pedantic=pedantic,
        inbox_prefix=inbox_prefix,
        pending_size=pending_size,
        connect_timeout=connect_timeout,
        drain_timeout=drain_timeout,
        flush_timeout=flush_timeout,
        ping_interval=ping_interval,
        max_outstanding_pings=max_outstanding_pings,
        dont_randomize=dont_randomize,
        flusher_queue_size=flusher_queue_size,
        # security
        tls_hostname=tls_hostname,
        token=token,
        user_credentials=user_credentials,
        nkeys_seed=nkeys_seed,
        nkeys_seed_str=nkeys_seed_str,
        **secure_kwargs,
        # callbacks
        error_cb=wrapped_error_cb,
        reconnected_cb=wrapped_reconnected_cb,
        disconnected_cb=disconnected_cb,
        closed_cb=closed_cb,
        discovered_server_cb=discovered_server_cb,
        signature_cb=signature_cb,
        user_jwt_cb=user_jwt_cb,
        # Basic args
        routers=routers,
        config=broker_config,
        specification=broker_spec,
    )

435 

async def _connect(self) -> "Client":
    """Open a NATS connection and register it with the broker config.

    Returns:
        The client returned by `nats.connect`, called with the stored
        `self._connection_kwargs`.
    """
    client = await nats.connect(**self._connection_kwargs)
    # Let the config know about the live client before handing it back.
    self.config.connect(client)
    return client

440 

async def stop(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    """Stop the broker, drain the NATS connection and reset the config state.

    Args:
        exc_type:
            Exception type forwarded to the parent `stop()`.
        exc_val:
            Exception instance forwarded to the parent `stop()`.
        exc_tb:
            Traceback forwarded to the parent `stop()`.
    """
    await super().stop(exc_type, exc_val, exc_tb)

    # Drain only when a connection exists, then forget it.
    active = self._connection
    if active is not None:
        await active.drain()
        self._connection = None

    self.config.disconnect()

454 

async def start(self) -> None:
    """Connect broker to NATS cluster, declare streams and run the parent `start()`.

    Every registered stream whose `declare` flag is truthy is created via
    `add_stream`. On `BadRequestError` the error is logged; when the stream
    already exists with a different configuration, the subjects of the
    existing stream are appended and the stream is updated instead.
    """
    await self.connect()

    js = self.config.connection_state.stream

    # Lazily pick only the streams that still have to be declared.
    to_declare = (
        entry for entry in self._stream_builder.objects.values() if entry[0].declare
    )

    for stream, subjects in to_declare:
        try:
            await js.add_stream(config=stream.config, subjects=list(subjects))

        except BadRequestError as e:  # noqa: PERF203
            self._setup_logger()

            log_context = LogicSubscriber.build_log_context(
                message=None,
                subject="",
                queue="",
                stream=stream.name,
            )

            logger_state = self.config.logger

            already_exists = (
                e.description
                == "stream name already in use with a different configuration"
            )

            if already_exists:
                info = await js.stream_info(stream.name)
                old_config = info.config

                logger_state.log(str(e), logging.WARNING, log_context)

                # Keep the subjects the existing stream already listens to.
                for subject in old_config.subjects or ():
                    subjects.append(subject)

                stream.config.subjects = list(subjects)
                await js.update_stream(config=stream.config)

            else:  # pragma: no cover
                logger_state.log(str(e), logging.ERROR, log_context, exc_info=e)

        finally:
            # prevent from double declaration
            stream.declare = False

    await super().start()

510 

@overload
async def publish(
    self,
    message: "SendableMessage",
    subject: str,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    stream: None = None,
    timeout: float | None = None,
    schedule: Optional["Schedule"] = None,
) -> None: ...


@overload
async def publish(
    self,
    message: "SendableMessage",
    subject: str,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    stream: str | None = None,
    timeout: float | None = None,
    schedule: Optional["Schedule"] = None,
) -> "PubAck": ...


@override
async def publish(
    self,
    message: "SendableMessage",
    subject: str,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    stream: str | None = None,
    timeout: float | None = None,
    schedule: Optional["Schedule"] = None,
) -> Optional["PubAck"]:
    """Publish message directly.

    This method lets you publish a message outside of the AsyncAPI-documented flow.
    It is handy inside other frameworks' applications or for occasional publishing.

    Please, use `@broker.publisher(...)` or `broker.publisher(...).publish(...)` instead in a regular way.

    Args:
        message:
            Message body to send.
            Can be any encodable object (native python types or `pydantic.BaseModel`).
        subject:
            NATS subject to send message.
        headers:
            Message headers to store metainformation.
            **content-type** and **correlation_id** will be set automatically by framework anyway.
        reply_to:
            NATS subject name to send response.
        correlation_id:
            Manual message **correlation_id** setter. A new one is generated when it is falsy.
            **correlation_id** is a useful option to trace messages.
        stream:
            This option validates that the target subject is in presented stream.
            Can be omitted without any effect if you don't want a PubAck frame.
            A truthy value routes the message through the JetStream producer.
        timeout:
            Timeout to send message to NATS. A falsy value is replaced by `0.5`.
        schedule:
            Schedule to publish message at a specific time.

    Returns:
        `None` if you publish a regular message.
        `faststream.nats.PubAck` if you publish a message to stream.
    """
    cmd = NatsPublishCommand(
        message=message,
        correlation_id=correlation_id or gen_cor_id(),
        subject=subject,
        headers=headers,
        reply_to=reply_to,
        stream=stream,
        timeout=timeout or 0.5,
        _publish_type=PublishType.PUBLISH,
        schedule=schedule,
    )

    # A truthy stream name selects the JetStream producer, otherwise core NATS.
    producer = self.config.js_producer if stream else self.config.producer

    ack: Optional["PubAck"] = await super()._basic_publish(cmd, producer=producer)
    return ack

600 

@override
async def request(  # type: ignore[override]
    self,
    message: "SendableMessage",
    subject: str,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    stream: str | None = None,
    timeout: float = 0.5,
) -> "NatsMessage":
    """Make a synchronous request to outer subscriber.

    If the outer subscriber listens to the subject via a stream, you should set up the same **stream** explicitly.
    Otherwise you will receive a confirmation frame as a response.

    Args:
        message:
            Message body to send.
            Can be any encodable object (native python types or `pydantic.BaseModel`).
        subject:
            NATS subject to send message.
        headers:
            Message headers to store metainformation.
            **content-type** and **correlation_id** will be set automatically by framework anyway.
        correlation_id:
            Manual message **correlation_id** setter. A new one is generated when it is falsy.
            **correlation_id** is a useful option to trace messages.
        stream:
            JetStream name. This option is required if your target subscriber listens for events using JetStream.
            Any value other than `None` routes the request through the JetStream producer.
        timeout:
            Timeout to send message to NATS.

    Returns:
        `faststream.nats.message.NatsMessage` object as an outer subscriber response.
    """
    cmd = NatsPublishCommand(
        message=message,
        correlation_id=correlation_id or gen_cor_id(),
        subject=subject,
        headers=headers,
        timeout=timeout,
        stream=stream,
        _publish_type=PublishType.REQUEST,
    )

    # Only an explicit stream (anything but None) switches to JetStream.
    if stream is not None:
        producer = self.config.js_producer
    else:
        producer = self.config.producer

    response: "NatsMessage" = await super()._basic_request(cmd, producer=producer)
    return response

650 

async def key_value(
    self,
    bucket: str,
    *,
    description: str | None = None,
    max_value_size: int | None = None,
    history: int = 1,
    ttl: float | None = None,  # in seconds
    max_bytes: int | None = None,
    storage: Optional["StorageType"] = None,
    replicas: int = 1,
    placement: Optional["Placement"] = None,
    republish: Optional["RePublish"] = None,
    direct: bool | None = None,
    # custom
    declare: bool = True,
) -> "KeyValue":
    """Return a key-value bucket obtained through the config's KV declarer.

    All arguments are forwarded unchanged to
    `self.config.kv_declarer.create_key_value(...)`.

    Returns:
        The `KeyValue` object produced by the declarer.
    """
    declarer = cast("KVBucketDeclarer", self.config.kv_declarer)
    options = {
        "bucket": bucket,
        "description": description,
        "max_value_size": max_value_size,
        "history": history,
        "ttl": ttl,
        "max_bytes": max_bytes,
        "storage": storage,
        "replicas": replicas,
        "placement": placement,
        "republish": republish,
        "direct": direct,
        "declare": declare,
    }
    return await declarer.create_key_value(**options)

683 

async def object_storage(
    self,
    bucket: str,
    *,
    description: str | None = None,
    ttl: float | None = None,
    max_bytes: int | None = None,
    storage: Optional["StorageType"] = None,
    replicas: int = 1,
    placement: Optional["Placement"] = None,
    declare: bool = True,
) -> "ObjectStore":
    """Return an object store bucket obtained through the config's OS declarer.

    All arguments are forwarded unchanged to
    `self.config.os_declarer.create_object_store(...)`.

    Returns:
        The `ObjectStore` object produced by the declarer.
    """
    declarer = cast("OSBucketDeclarer", self.config.os_declarer)
    options = {
        "bucket": bucket,
        "description": description,
        "ttl": ttl,
        "max_bytes": max_bytes,
        "storage": storage,
        "replicas": replicas,
        "placement": placement,
        "declare": declare,
    }
    return await declarer.create_object_store(**options)

707 

def _log_connection_broken(
    self,
    error_cb: Optional["ErrorCallback"] = None,
) -> "ErrorCallback":
    """Wrap the user's error callback with "connection broken" logging.

    Args:
        error_cb:
            Optional user callback, awaited first with the received error.

    Returns:
        An async callback that logs a warning when the error is a NATS
        `Error` and `self.config.connection_state` is truthy.
    """
    log_context = LogicSubscriber.build_log_context(None, "")

    async def wrapper(err: Exception) -> None:
        # The user callback always runs before any logging.
        if error_cb is not None:
            await error_cb(err)

        if not isinstance(err, Error):
            return
        if not self.config.connection_state:
            return

        self.config.logger.log(
            f"Connection broken with {err!r}",
            logging.WARNING,
            log_context,
            exc_info=err,
        )

    return wrapper

727 

def _log_reconnected(
    self,
    cb: Optional["Callback"] = None,
) -> "Callback":
    """Wrap the user's reconnect callback with "connection established" logging.

    Args:
        cb:
            Optional user callback, awaited first.

    Returns:
        An async callback that logs an info message when
        `self.config.connection_state` is falsy.
    """
    log_context = LogicSubscriber.build_log_context(None, "")

    async def wrapper() -> None:
        # The user callback always runs before any logging.
        if cb is not None:
            await cb()

        if self.config.connection_state:
            return

        self.config.logger.log("Connection established", logging.INFO, log_context)

    return wrapper

742 

async def new_inbox(self) -> str:
    """Return a unique inbox that can be used for NATS requests or subscriptions.

    The inbox prefix can be customised by passing `inbox_prefix` when creating your `NatsBroker`.

    The value comes straight from `self.connection.new_inbox()`, i.e.
    `nats.aio.client.Client.new_inbox` [1].

    [1] https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.new_inbox
    """
    client = self.connection
    inbox = client.new_inbox()
    return inbox

753 

@override
async def ping(self, timeout: float | None) -> bool:
    """Check whether the broker connection is alive within `timeout`.

    Args:
        timeout:
            Deadline handed to `anyio.move_on_after`. The polling interval is
            a tenth of it, or `1` second when it is falsy.

    Returns:
        `True` as soon as the connection reports `is_connected`, `False` when
        there is no connection or the deadline passes first.
    """
    poll_interval = (timeout or 10) / 10

    with anyio.move_on_after(timeout) as scope:
        if self._connection is None:
            return False

        # Poll until connected or until the scope gets cancelled.
        while not scope.cancel_called:
            if self._connection.is_connected:
                return True

            await anyio.sleep(poll_interval)

        return False

    return False

772 

@property
def connection(self) -> "Client":
    """Return the connection held by the broker config's connection state."""
    state = self.config.broker_config.connection_state
    return state.connection