Coverage for faststream / mqtt / broker / config.py: 86%
31 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from dataclasses import dataclass, field
2from typing import TYPE_CHECKING, Any, Literal, Optional, cast
4from faststream._internal.configs import BrokerConfig
5from faststream.exceptions import FeatureNotSupportedException, IncorrectState
6from faststream.mqtt.publisher.producer import ZmqttFakeProducer
7from faststream.opentelemetry.middleware import TelemetryMiddleware
9if TYPE_CHECKING:
10 import zmqtt
12 from faststream._internal.types import BrokerMiddleware
13 from faststream.mqtt.publisher.producer import ZmqttBaseProducer
# Unique sentinel: a bare ``object()`` at runtime, cast to ``str`` only so
# type checkers accept it where a string is annotated. Compare with ``is``.
# NOTE(review): presumably a "version not set" marker; it is not referenced
# by the code visible in this module — confirm it is still needed.
MQTTVersionUnset = cast("str", object())
@dataclass(kw_only=True)
class MQTTBrokerConfig(BrokerConfig):
    """Broker configuration for the MQTT broker backed by a ``zmqtt`` client.

    Holds the selected protocol version, the producer, and the currently
    connected client. Every middleware passed at construction or added later
    is validated: ``TelemetryMiddleware`` is rejected when ``version`` is
    ``"3.1.1"``.
    """

    # Protocol version; "unset" is the default when none was given.
    version: Literal["3.1.1", "5.0", "unset"] = "unset"

    # Defaults to a ZmqttFakeProducer; connect() passes it the real client.
    producer: "ZmqttBaseProducer" = field(default_factory=ZmqttFakeProducer)
    # Assigned by connect() and cleared by disconnect(); not an __init__
    # argument and excluded from repr.
    _client: Optional["zmqtt.MQTTClient"] = field(default=None, init=False, repr=False)

    def __post_init__(self) -> None:
        """Validate the middlewares supplied at construction time.

        Raises:
            FeatureNotSupportedException: if any middleware is rejected by
                ``_validate_middleware``.
        """
        for m in self.broker_middlewares:
            self._validate_middleware(m)

    @property
    def client(self) -> "zmqtt.MQTTClient":
        """Return the connected client.

        Raises:
            IncorrectState: if ``connect()`` has not been called, or
                ``disconnect()`` has been called since.
        """
        if self._client is None:
            msg = "MQTT broker is not connected. Call connect() first."
            raise IncorrectState(msg)
        return self._client

    def connect(self, client: "zmqtt.MQTTClient") -> None:
        """Store ``client`` and connect the producer to it.

        The producer receives the client together with the serializer taken
        from ``self.fd_config``.
        """
        self._client = client
        self.producer.connect(client, self.fd_config._serializer)

    def disconnect(self) -> None:
        """Forget the stored client and disconnect the producer."""
        self._client = None
        self.producer.disconnect()

    def add_middleware(self, middleware: "BrokerMiddleware[Any]") -> None:
        """Validate ``middleware``, then delegate to the base ``add_middleware``.

        Raises:
            FeatureNotSupportedException: if the middleware is not supported
                for the configured version.
        """
        self._validate_middleware(middleware)
        return super().add_middleware(middleware)

    def insert_middleware(self, middleware: "BrokerMiddleware[Any]") -> None:
        """Validate ``middleware``, then delegate to the base ``insert_middleware``.

        Raises:
            FeatureNotSupportedException: if the middleware is not supported
                for the configured version.
        """
        self._validate_middleware(middleware)
        return super().insert_middleware(middleware)

    def _validate_middleware(self, middleware: "BrokerMiddleware[Any]") -> None:
        """Reject ``TelemetryMiddleware`` instances when the version is 3.1.1.

        Raises:
            FeatureNotSupportedException: for a ``TelemetryMiddleware``
                instance when ``self.version == "3.1.1"``.
        """
        # NOTE(review): presumably because MQTT 3.1.1 has no user properties
        # to carry trace context — confirm against the producer/parser code.
        if self.version == "3.1.1" and isinstance(middleware, TelemetryMiddleware):
            msg = "OpenTelemetry doesn't work with MQTT 3.1.1"
            raise FeatureNotSupportedException(msg)