Coverage for faststream / nats / broker / broker.py: 83%
93 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import logging
2from collections.abc import Iterable, Sequence
3from typing import (
4 TYPE_CHECKING,
5 Any,
6 Optional,
7 Union,
8 cast,
9)
11import anyio
12import nats
13from fast_depends import Provider, dependency_provider
14from nats.aio.client import (
15 DEFAULT_CONNECT_TIMEOUT,
16 DEFAULT_DRAIN_TIMEOUT,
17 DEFAULT_INBOX_PREFIX,
18 DEFAULT_MAX_FLUSHER_QUEUE_SIZE,
19 DEFAULT_MAX_OUTSTANDING_PINGS,
20 DEFAULT_MAX_RECONNECT_ATTEMPTS,
21 DEFAULT_PENDING_SIZE,
22 DEFAULT_PING_INTERVAL,
23 DEFAULT_RECONNECT_TIME_WAIT,
24 Client,
25)
26from nats.aio.msg import Msg
27from nats.errors import Error
28from nats.js.errors import BadRequestError
29from typing_extensions import overload, override
31from faststream.__about__ import SERVICE_NAME
32from faststream._internal.broker import BrokerUsecase
33from faststream._internal.constants import EMPTY
34from faststream._internal.context.repository import ContextRepo
35from faststream._internal.di import FastDependsConfig
36from faststream.message import gen_cor_id
37from faststream.middlewares import AckPolicy
38from faststream.nats.configs import NatsBrokerConfig
39from faststream.nats.publisher.producer import (
40 NatsFastProducerImpl,
41 NatsJSFastProducer,
42)
43from faststream.nats.response import NatsPublishCommand
44from faststream.nats.security import parse_security
45from faststream.nats.subscriber.usecases.basic import LogicSubscriber
46from faststream.response.publish_type import PublishType
47from faststream.specification.schema import BrokerSpec
49from .logging import make_nats_logger_state
50from .registrator import NatsRegistrator
52if TYPE_CHECKING:
53 from types import TracebackType
55 from fast_depends.dependencies import Dependant
56 from fast_depends.library.serializer import SerializerProto
57 from nats.aio.client import (
58 Callback,
59 Credentials,
60 ErrorCallback,
61 JWTCallback,
62 SignatureCallback,
63 )
64 from nats.js.api import Placement, RePublish, StorageType
65 from nats.js.kv import KeyValue
66 from nats.js.object_store import ObjectStore
67 from typing_extensions import TypedDict
69 from faststream._internal.basic_types import LoggerProto, SendableMessage
70 from faststream._internal.parser import CodecProto
71 from faststream._internal.types import BrokerMiddleware, CustomCallable
72 from faststream.nats.configs.broker import JsInitOptions
73 from faststream.nats.helpers import KVBucketDeclarer, OSBucketDeclarer
74 from faststream.nats.message import NatsMessage
75 from faststream.nats.schemas import PubAck, Schedule
76 from faststream.security import BaseSecurity
77 from faststream.specification.schema.extra import Tag, TagDict
class NatsInitKwargs(TypedDict, total=False):
    """NatsBroker.connect() method type hints.

    Declared with ``total=False``, so every key listed below is optional.

    Args:
        error_cb:
            Callback to report errors.
        disconnected_cb:
            Callback to report disconnection from NATS.
        closed_cb:
            Callback to report when client stops reconnection to NATS.
        discovered_server_cb:
            A callback to report when a new server joins the cluster.
        reconnected_cb:
            Callback to report success reconnection.
        name:
            Label the connection with name (shown in NATS monitoring).
        pedantic:
            Turn on NATS server pedantic mode that performs extra checks on the protocol.
            https://docs.nats.io/using-nats/developer/connecting/misc#turn-on-pedantic-mode
        verbose:
            Verbose mode produce more feedback about code execution.
        allow_reconnect:
            Whether recover connection automatically or not.
        connect_timeout:
            Timeout in seconds to establish connection with NATS server.
        reconnect_time_wait:
            Time in seconds to wait for reestablish connection to NATS server
        max_reconnect_attempts:
            Maximum attempts number to reconnect to NATS server.
        ping_interval:
            Interval in seconds to ping.
        max_outstanding_pings:
            Maximum number of failed pings
        dont_randomize:
            Boolean indicating should client randomly shuffle servers list for reconnection randomness.
        flusher_queue_size:
            Max count of commands awaiting to be flushed to the socket
        no_echo:
            Boolean indicating should commands be echoed.
        tls_hostname:
            Hostname for TLS.
        token:
            Auth token for NATS auth.
        drain_timeout:
            Timeout in seconds to drain subscriptions.
        signature_cb:
            A callback used to sign a nonce from the server while authenticating with nkeys.
            The user should sign the nonce and return the base64 encoded signature.
        user_jwt_cb:
            A callback used to fetch and return the account signed JWT for this user.
        user_credentials:
            A user credentials file or tuple of files.
        nkeys_seed:
            Path-like object containing nkeys seed that will be used.
        nkeys_seed_str:
            Nkeys seed to be used.
        inbox_prefix:
            Prefix for generating unique inboxes, subjects with that prefix and NUID.
        pending_size:
            Max size of the pending buffer for publishing commands.
        flush_timeout:
            Max duration to wait for a forced flush to occur
    """

    # The callback/credential types are imported in the enclosing
    # TYPE_CHECKING block, so they can be referenced without quotes.
    error_cb: ErrorCallback | None
    disconnected_cb: Callback | None
    closed_cb: Callback | None
    discovered_server_cb: Callback | None
    reconnected_cb: Callback | None
    name: str | None
    pedantic: bool
    verbose: bool
    allow_reconnect: bool
    connect_timeout: int
    reconnect_time_wait: int
    max_reconnect_attempts: int
    ping_interval: int
    max_outstanding_pings: int
    dont_randomize: bool
    flusher_queue_size: int
    no_echo: bool
    tls_hostname: str | None
    token: str | None
    drain_timeout: int
    signature_cb: SignatureCallback | None
    user_jwt_cb: JWTCallback | None
    user_credentials: Credentials | None
    nkeys_seed: str | None
    nkeys_seed_str: str | None
    inbox_prefix: str | bytes
    pending_size: int
    flush_timeout: float | None
173class NatsBroker(
174 NatsRegistrator,
175 BrokerUsecase[Msg, Client],
176):
177 """A class to represent a NATS broker."""
179 url: list[str]
def __init__(
    self,
    servers: str | Iterable[str] = ("nats://localhost:4222",),
    *,
    error_cb: Optional["ErrorCallback"] = None,
    disconnected_cb: Optional["Callback"] = None,
    closed_cb: Optional["Callback"] = None,
    discovered_server_cb: Optional["Callback"] = None,
    reconnected_cb: Optional["Callback"] = None,
    name: str | None = SERVICE_NAME,
    pedantic: bool = False,
    verbose: bool = False,
    allow_reconnect: bool = True,
    connect_timeout: int = DEFAULT_CONNECT_TIMEOUT,
    reconnect_time_wait: int = DEFAULT_RECONNECT_TIME_WAIT,
    max_reconnect_attempts: int = DEFAULT_MAX_RECONNECT_ATTEMPTS,
    ping_interval: int = DEFAULT_PING_INTERVAL,
    max_outstanding_pings: int = DEFAULT_MAX_OUTSTANDING_PINGS,
    dont_randomize: bool = False,
    flusher_queue_size: int = DEFAULT_MAX_FLUSHER_QUEUE_SIZE,
    no_echo: bool = False,
    tls_hostname: str | None = None,
    token: str | None = None,
    drain_timeout: int = DEFAULT_DRAIN_TIMEOUT,
    signature_cb: Optional["SignatureCallback"] = None,
    user_jwt_cb: Optional["JWTCallback"] = None,
    user_credentials: Optional["Credentials"] = None,
    nkeys_seed: str | None = None,
    nkeys_seed_str: str | None = None,
    inbox_prefix: str | bytes = DEFAULT_INBOX_PREFIX,
    pending_size: int = DEFAULT_PENDING_SIZE,
    flush_timeout: float | None = None,
    js_options: Union["JsInitOptions", dict[str, Any], None] = None,
    graceful_timeout: float | None = None,
    ack_policy: AckPolicy = EMPTY,
    decoder: Optional["CustomCallable"] = None,
    codec: Optional["CodecProto"] = None,
    parser: Optional["CustomCallable"] = None,
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    routers: Iterable[NatsRegistrator] = (),
    security: Optional["BaseSecurity"] = None,
    specification_url: str | Iterable[str] | None = None,
    protocol: str | None = "nats",
    protocol_version: str | None = "custom",
    description: str | None = None,
    tags: Iterable[Union["Tag", "TagDict"]] = (),
    logger: Optional["LoggerProto"] = EMPTY,
    log_level: int = logging.INFO,
    apply_types: bool = True,
    serializer: Optional["SerializerProto"] = EMPTY,
    provider: Optional["Provider"] = None,
    context: Optional["ContextRepo"] = None,
) -> None:
    """Initialize the NatsBroker object.

    Args:
        servers:
            NATS cluster addresses to connect.
        error_cb:
            Callback to report errors.
        disconnected_cb:
            Callback to report disconnection from NATS.
        closed_cb:
            Callback to report when client stops reconnection to NATS.
        discovered_server_cb:
            A callback to report when a new server joins the cluster.
        reconnected_cb:
            Callback to report success reconnection.
        name:
            Label the connection with name (shown in NATS monitoring).
        pedantic:
            Turn on NATS server pedantic mode that performs extra checks on the protocol.
            https://docs.nats.io/using-nats/developer/connecting/misc#turn-on-pedantic-mode
        verbose:
            Verbose mode produce more feedback about code execution.
        allow_reconnect:
            Whether recover connection automatically or not.
        connect_timeout:
            Timeout in seconds to establish connection with NATS server.
        reconnect_time_wait:
            Time in seconds to wait for reestablish connection to NATS server
        max_reconnect_attempts:
            Maximum attempts number to reconnect to NATS server.
        ping_interval:
            Interval in seconds to ping.
        max_outstanding_pings:
            Maximum number of failed pings
        dont_randomize:
            Boolean indicating should client randomly shuffle servers list for reconnection randomness.
        flusher_queue_size:
            Max count of commands awaiting to be flushed to the socket
        no_echo:
            Boolean indicating should commands be echoed.
        tls_hostname:
            Hostname for TLS.
        token:
            Auth token for NATS auth.
        drain_timeout:
            Timeout in seconds to drain subscriptions.
        signature_cb:
            A callback used to sign a nonce from the server while authenticating with nkeys.
            The user should sign the nonce and return the base64 encoded signature.
        user_jwt_cb:
            A callback used to fetch and return the account signed JWT for this user.
        user_credentials:
            A user credentials file or tuple of files.
        nkeys_seed:
            Path-like object containing nkeys seed that will be used.
        nkeys_seed_str:
            Nkeys seed to be used.
        inbox_prefix:
            Prefix for generating unique inboxes, subjects with that prefix and NUID.
        pending_size:
            Max size of the pending buffer for publishing commands.
        flush_timeout:
            Max duration to wait for a forced flush to occur
        js_options:
            JetStream initialization options.
        graceful_timeout:
            Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down.
        ack_policy:
            Default acknowledgement policy for all subscribers. Individual subscribers can override.
        decoder:
            Custom decoder object.
        codec:
            Custom codec object.
        parser:
            Custom parser object.
        dependencies:
            Dependencies to apply to all broker subscribers.
        middlewares:
            Middlewares to apply to all broker publishers/subscribers.
        routers:
            Routers to apply to broker.
        security:
            Security options to connect broker and generate AsyncAPI server security information.
        specification_url:
            AsyncAPI hardcoded server addresses. Use `servers` if not specified.
        protocol:
            AsyncAPI server protocol.
        protocol_version:
            AsyncAPI server protocol version.
        description:
            AsyncAPI server description.
        tags:
            AsyncAPI server tags.
        logger:
            User specified logger to pass into Context and log service messages.
        log_level:
            Service messages log level.
        apply_types:
            Whether to use FastDepends or not.
        serializer:
            FastDepends-compatible serializer to validate incoming messages.
        provider:
            Provider for FastDepends.
        context:
            Context for FastDepends.
    """
    # Extra connection keyword arguments derived from the security object;
    # they are splatted into the parent constructor call below.
    secure_kwargs = parse_security(security)

    # Accept either a single URL or any iterable of URLs, but always keep
    # a concrete list from here on.
    servers = [servers] if isinstance(servers, str) else list(servers)

    # The AsyncAPI server list falls back to the real connection servers
    # when no explicit specification URL is given.
    if specification_url is not None:
        if isinstance(specification_url, str):
            specification_url = [specification_url]
        else:
            specification_url = list(specification_url)
    else:
        specification_url = servers

    # Two producers share the same parser/decoder: one is handed to the
    # config as `js_producer`, the other as the plain `producer`.
    js_producer = NatsJSFastProducer(
        parser=parser,
        decoder=decoder,
    )

    producer = NatsFastProducerImpl(
        parser=parser,
        decoder=decoder,
    )

    super().__init__(
        # NATS options
        servers=servers,
        name=name,
        verbose=verbose,
        allow_reconnect=allow_reconnect,
        reconnect_time_wait=reconnect_time_wait,
        max_reconnect_attempts=max_reconnect_attempts,
        no_echo=no_echo,
        pedantic=pedantic,
        inbox_prefix=inbox_prefix,
        pending_size=pending_size,
        connect_timeout=connect_timeout,
        drain_timeout=drain_timeout,
        flush_timeout=flush_timeout,
        ping_interval=ping_interval,
        max_outstanding_pings=max_outstanding_pings,
        dont_randomize=dont_randomize,
        flusher_queue_size=flusher_queue_size,
        # security
        tls_hostname=tls_hostname,
        token=token,
        user_credentials=user_credentials,
        nkeys_seed=nkeys_seed,
        nkeys_seed_str=nkeys_seed_str,
        **secure_kwargs,
        # callbacks
        # error_cb / reconnected_cb are wrapped so the broker logs the event
        # in addition to invoking the user callback (if any); the remaining
        # callbacks are forwarded untouched.
        error_cb=self._log_connection_broken(error_cb),
        reconnected_cb=self._log_reconnected(reconnected_cb),
        disconnected_cb=disconnected_cb,
        closed_cb=closed_cb,
        discovered_server_cb=discovered_server_cb,
        signature_cb=signature_cb,
        user_jwt_cb=user_jwt_cb,
        # Basic args
        routers=routers,
        config=NatsBrokerConfig(
            producer=producer,
            js_producer=js_producer,
            # `None` (or any falsy value) becomes an empty options mapping.
            js_options=js_options or {},
            # both args
            broker_middlewares=middlewares,
            broker_parser=parser,
            broker_decoder=decoder,
            broker_codec=codec,
            logger=make_nats_logger_state(
                logger=logger,
                log_level=log_level,
            ),
            fd_config=FastDependsConfig(
                use_fastdepends=apply_types,
                serializer=serializer,
                # Fall back to the shared fast_depends provider and a fresh
                # context repository when the caller supplies none.
                provider=provider or dependency_provider,
                context=context or ContextRepo(),
            ),
            # subscriber args
            broker_dependencies=dependencies,
            graceful_timeout=graceful_timeout,
            ack_policy=ack_policy,
            extra_context={
                "broker": self,
            },
        ),
        specification=BrokerSpec(
            description=description,
            url=specification_url,
            protocol=protocol,
            protocol_version=protocol_version,
            security=security,
            tags=tags,
        ),
    )
async def _connect(self) -> "Client":
    """Open a NATS client connection and hand it to the broker config.

    Returns:
        The client returned by ``nats.connect`` for the stored
        connection keyword arguments.
    """
    client = await nats.connect(**self._connection_kwargs)
    # Let the config know about the live client before returning it.
    self.config.connect(client)
    return client
async def stop(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    """Stop the broker and drain the NATS connection, if one is held.

    Args:
        exc_type:
            Exception type forwarded to the parent ``stop``.
        exc_val:
            Exception instance forwarded to the parent ``stop``.
        exc_tb:
            Traceback forwarded to the parent ``stop``.
    """
    # Parent shutdown runs first, then the connection itself is drained.
    await super().stop(exc_type, exc_val, exc_tb)

    active_connection = self._connection
    if active_connection is not None:
        await active_connection.drain()
        self._connection = None

    # The config is told about the disconnect whether or not a connection
    # was held.
    self.config.disconnect()
async def start(self) -> None:
    """Connect broker to NATS cluster and startup all subscribers.

    After connecting, every registered stream whose ``declare`` flag is set
    is created via ``add_stream``. If the server rejects the request with
    ``BadRequestError``:

    * when the description says the stream name is already in use with a
      different configuration, a warning is logged, the subjects of the
      existing stream are merged into the locally declared ones (skipping
      those already present) and the stream is updated;
    * otherwise the error is logged at ERROR level and start-up continues.

    Each processed stream has its ``declare`` flag cleared, and finally
    the parent ``start`` is awaited.
    """
    await self.connect()

    stream_context = self.config.connection_state.stream

    for stream, subjects in filter(
        lambda x: x[0].declare,
        self._stream_builder.objects.values(),
    ):
        try:
            await stream_context.add_stream(
                config=stream.config,
                subjects=list(subjects),
            )

        except BadRequestError as e:  # noqa: PERF203
            self._setup_logger()

            log_context = LogicSubscriber.build_log_context(
                message=None,
                subject="",
                queue="",
                stream=stream.name,
            )

            logger_state = self.config.logger

            if (
                e.description
                == "stream name already in use with a different configuration"
            ):
                old_config = (await stream_context.stream_info(stream.name)).config

                logger_state.log(str(e), logging.WARNING, log_context)

                # Merge the subjects of the already existing stream into the
                # local declaration. A subject that is already declared
                # locally must not be appended again, otherwise the update
                # request would carry the same subject twice.
                for subject in old_config.subjects or ():
                    if subject not in subjects:
                        subjects.append(subject)

                stream.config.subjects = list(subjects)
                await stream_context.update_stream(config=stream.config)

            else:  # pragma: no cover
                logger_state.log(
                    str(e),
                    logging.ERROR,
                    log_context,
                    exc_info=e,
                )

        finally:
            # prevent from double declaration
            stream.declare = False

    await super().start()
@overload
async def publish(
    self,
    message: "SendableMessage",
    subject: str,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    stream: None = None,
    timeout: float | None = None,
    schedule: Optional["Schedule"] = None,
) -> None: ...

@overload
async def publish(
    self,
    message: "SendableMessage",
    subject: str,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    stream: str | None = None,
    timeout: float | None = None,
    schedule: Optional["Schedule"] = None,
) -> "PubAck": ...

@override
async def publish(
    self,
    message: "SendableMessage",
    subject: str,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    stream: str | None = None,
    timeout: float | None = None,
    schedule: Optional["Schedule"] = None,
) -> Optional["PubAck"]:
    """Publish a message directly.

    This method lets you publish a message in a way that is not documented
    in AsyncAPI. It is handy inside other frameworks' applications or for
    occasional publishing.

    For regular use prefer `@broker.publisher(...)` or
    `broker.publisher(...).publish(...)`.

    Args:
        message:
            Message body to send.
            Can be any encodable object (native python types or `pydantic.BaseModel`).
        subject:
            NATS subject to send the message to.
        headers:
            Message headers to store metainformation.
            **content-type** and **correlation_id** will be set automatically by the framework anyway.
        reply_to:
            NATS subject name to send the response to.
        correlation_id:
            Manual message **correlation_id** setter.
            **correlation_id** is a useful option to trace messages.
            A new one is generated when omitted.
        stream:
            This option validates that the target subject belongs to the given stream.
            Can be omitted if you do not need a PubAck frame.
        timeout:
            Timeout to send the message to NATS. A falsy value is replaced by 0.5.
        schedule:
            Schedule to publish the message at a specific time.

    Returns:
        `None` if you publish a regular message.
        `faststream.nats.PubAck` if you publish a message to a stream.
    """
    cmd = NatsPublishCommand(
        message=message,
        correlation_id=correlation_id or gen_cor_id(),
        subject=subject,
        headers=headers,
        reply_to=reply_to,
        stream=stream,
        timeout=timeout or 0.5,
        _publish_type=PublishType.PUBLISH,
        schedule=schedule,
    )

    # A truthy stream name routes the command through the JetStream
    # producer; everything else goes through the plain one.
    selected_producer = self.config.js_producer if stream else self.config.producer

    ack: Optional["PubAck"] = await super()._basic_publish(
        cmd,
        producer=selected_producer,
    )
    return ack
@override
async def request(  # type: ignore[override]
    self,
    message: "SendableMessage",
    subject: str,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    stream: str | None = None,
    timeout: float = 0.5,
) -> "NatsMessage":
    """Make a synchronous request to an outer subscriber.

    If the outer subscriber listens to the subject through a stream, you should
    pass the same **stream** explicitly. Otherwise you will receive a
    confirmation frame as the response.

    Args:
        message:
            Message body to send.
            Can be any encodable object (native python types or `pydantic.BaseModel`).
        subject:
            NATS subject to send the message to.
        headers:
            Message headers to store metainformation.
            **content-type** and **correlation_id** will be set automatically by the framework anyway.
        correlation_id:
            Manual message **correlation_id** setter.
            **correlation_id** is a useful option to trace messages.
            A new one is generated when omitted.
        stream:
            JetStream name. This option is required if your target subscriber listens for events using JetStream.
        timeout:
            Timeout to send the message to NATS.

    Returns:
        `faststream.nats.message.NatsMessage` object as the outer subscriber response.
    """
    cmd = NatsPublishCommand(
        message=message,
        correlation_id=correlation_id or gen_cor_id(),
        subject=subject,
        headers=headers,
        timeout=timeout,
        stream=stream,
        _publish_type=PublishType.REQUEST,
    )

    # Only an omitted stream (None) selects the plain producer; any given
    # stream value goes through the JetStream producer.
    if stream is None:
        selected_producer = self.config.producer
    else:
        selected_producer = self.config.js_producer

    response: "NatsMessage" = await super()._basic_request(
        cmd,
        producer=selected_producer,
    )
    return response
async def key_value(
    self,
    bucket: str,
    *,
    description: str | None = None,
    max_value_size: int | None = None,
    history: int = 1,
    ttl: float | None = None,  # in seconds
    max_bytes: int | None = None,
    storage: Optional["StorageType"] = None,
    replicas: int = 1,
    placement: Optional["Placement"] = None,
    republish: Optional["RePublish"] = None,
    direct: bool | None = None,
    # custom
    declare: bool = True,
) -> "KeyValue":
    """Obtain a key-value bucket through the broker's KV declarer.

    Every argument is forwarded unchanged to
    ``self.config.kv_declarer.create_key_value``.

    Args:
        bucket:
            Bucket name.
        description:
            Forwarded to the declarer as-is.
        max_value_size:
            Forwarded to the declarer as-is.
        history:
            Forwarded to the declarer as-is.
        ttl:
            Time to live, in seconds.
        max_bytes:
            Forwarded to the declarer as-is.
        storage:
            Forwarded to the declarer as-is.
        replicas:
            Forwarded to the declarer as-is.
        placement:
            Forwarded to the declarer as-is.
        republish:
            Forwarded to the declarer as-is.
        direct:
            Forwarded to the declarer as-is.
        declare:
            Forwarded to the declarer as-is (FastStream-specific option).

    Returns:
        The ``KeyValue`` object produced by the declarer.
    """
    declarer = cast("KVBucketDeclarer", self.config.kv_declarer)
    key_value_store = await declarer.create_key_value(
        bucket=bucket,
        description=description,
        max_value_size=max_value_size,
        history=history,
        ttl=ttl,
        max_bytes=max_bytes,
        storage=storage,
        replicas=replicas,
        placement=placement,
        republish=republish,
        direct=direct,
        declare=declare,
    )
    return key_value_store
async def object_storage(
    self,
    bucket: str,
    *,
    description: str | None = None,
    ttl: float | None = None,
    max_bytes: int | None = None,
    storage: Optional["StorageType"] = None,
    replicas: int = 1,
    placement: Optional["Placement"] = None,
    declare: bool = True,
) -> "ObjectStore":
    """Obtain an object-store bucket through the broker's OS declarer.

    Every argument is forwarded unchanged to
    ``self.config.os_declarer.create_object_store``.

    Args:
        bucket:
            Bucket name.
        description:
            Forwarded to the declarer as-is.
        ttl:
            Forwarded to the declarer as-is.
        max_bytes:
            Forwarded to the declarer as-is.
        storage:
            Forwarded to the declarer as-is.
        replicas:
            Forwarded to the declarer as-is.
        placement:
            Forwarded to the declarer as-is.
        declare:
            Forwarded to the declarer as-is.

    Returns:
        The ``ObjectStore`` object produced by the declarer.
    """
    declarer = cast("OSBucketDeclarer", self.config.os_declarer)
    object_store = await declarer.create_object_store(
        bucket=bucket,
        description=description,
        ttl=ttl,
        max_bytes=max_bytes,
        storage=storage,
        replicas=replicas,
        placement=placement,
        declare=declare,
    )
    return object_store
def _log_connection_broken(
    self,
    error_cb: Optional["ErrorCallback"] = None,
) -> "ErrorCallback":
    """Wrap a user error callback so that NATS errors are also logged.

    Args:
        error_cb:
            Optional user callback; awaited first with the received error.

    Returns:
        An async callback that invokes ``error_cb`` (when given) and then
        logs a warning if the error is a NATS ``Error`` and the config's
        connection state is truthy.
    """
    log_context = LogicSubscriber.build_log_context(None, "")

    async def wrapper(err: Exception) -> None:
        if error_cb is not None:
            await error_cb(err)

        # Only NATS errors are reported here.
        if not isinstance(err, Error):
            return

        # Stay silent while the connection state is falsy.
        if not self.config.connection_state:
            return

        self.config.logger.log(
            f"Connection broken with {err!r}",
            logging.WARNING,
            log_context,
            exc_info=err,
        )

    return wrapper
def _log_reconnected(
    self,
    cb: Optional["Callback"] = None,
) -> "Callback":
    """Wrap a user reconnect callback so that the event is also logged.

    Args:
        cb:
            Optional user callback; awaited first.

    Returns:
        An async callback that invokes ``cb`` (when given) and then logs
        "Connection established" at INFO level if the config's connection
        state is falsy.
    """
    log_context = LogicSubscriber.build_log_context(None, "")

    async def wrapper() -> None:
        if cb is not None:
            await cb()

        # Nothing to report while the connection state is truthy.
        if self.config.connection_state:
            return

        self.config.logger.log("Connection established", logging.INFO, log_context)

    return wrapper
async def new_inbox(self) -> str:
    """Return a unique inbox usable for NATS requests or subscriptions.

    The inbox prefix can be customised via the `inbox_prefix` argument of `NatsBroker`.

    Delegates to `nats.aio.client.Client.new_inbox` [1] on the broker's connection.

    [1] https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.new_inbox
    """
    client = self.connection
    return client.new_inbox()
@override
async def ping(self, timeout: float | None) -> bool:
    """Report whether the broker's connection becomes connected within ``timeout``.

    Args:
        timeout:
            Deadline passed to ``anyio.move_on_after``. The polling interval
            is one tenth of it, or 1 when it is falsy.

    Returns:
        ``True`` as soon as the connection reports ``is_connected``;
        ``False`` if there is no connection or the deadline passes first.
    """
    poll_interval = (timeout or 10) / 10

    with anyio.move_on_after(timeout) as scope:
        if self._connection is None:
            return False

        # Poll until the scope is cancelled or the connection is up.
        while not scope.cancel_called:
            if self._connection.is_connected:
                return True

            await anyio.sleep(poll_interval)

    return False
@property
def connection(self) -> "Client":
    """The NATS client held by the broker config's connection state."""
    state = self.config.broker_config.connection_state
    return state.connection