Coverage for faststream / mqtt / broker / config.py: 86%

31 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from dataclasses import dataclass, field 

2from typing import TYPE_CHECKING, Any, Literal, Optional, cast 

3 

4from faststream._internal.configs import BrokerConfig 

5from faststream.exceptions import FeatureNotSupportedException, IncorrectState 

6from faststream.mqtt.publisher.producer import ZmqttFakeProducer 

7from faststream.opentelemetry.middleware import TelemetryMiddleware 

8 

9if TYPE_CHECKING: 

10 import zmqtt 

11 

12 from faststream._internal.types import BrokerMiddleware 

13 from faststream.mqtt.publisher.producer import ZmqttBaseProducer 

14 

15 

# Unique placeholder object that is presented to the type checker as a ``str``.
# At runtime it is a bare ``object()``, so it can only be recognised by
# identity (``is``), never by string comparison.
# NOTE(review): presumably marks "no MQTT version given" — confirm at call sites.
MQTTVersionUnset: str = cast("str", object())

17 

18 

@dataclass(kw_only=True)
class MQTTBrokerConfig(BrokerConfig):
    """Broker-level configuration for the MQTT integration.

    Stores the selected protocol version, the producer used for publishing,
    and the currently attached ``zmqtt`` client. Every middleware that is
    registered through this config is checked against the protocol version:
    ``TelemetryMiddleware`` is rejected when the version is ``"3.1.1"``.
    """

    # Protocol version; "unset" is the default when nothing was chosen.
    version: Literal["3.1.1", "5.0", "unset"] = "unset"

    # Publishing backend; a fake producer is created by default.
    producer: "ZmqttBaseProducer" = field(default_factory=ZmqttFakeProducer)
    # Attached client; not an ``__init__`` argument and hidden from ``repr``.
    _client: Optional["zmqtt.MQTTClient"] = field(default=None, init=False, repr=False)

    def __post_init__(self) -> None:
        """Validate each middleware already present after dataclass init."""
        # NOTE(review): ``broker_middlewares`` is presumably inherited from
        # ``BrokerConfig`` — it is not declared in this class.
        for middleware in self.broker_middlewares:
            self._validate_middleware(middleware)

    @property
    def client(self) -> "zmqtt.MQTTClient":
        """Return the attached client.

        Raises:
            IncorrectState: If no client has been attached via ``connect``.
        """
        attached = self._client
        if attached is not None:
            return attached

        msg = "MQTT broker is not connected. Call connect() first."
        raise IncorrectState(msg)

    def connect(self, client: "zmqtt.MQTTClient") -> None:
        """Attach ``client`` and hand it, with the serializer, to the producer."""
        self._client = client
        producer = self.producer
        producer.connect(client, self.fd_config._serializer)

    def disconnect(self) -> None:
        """Drop the attached client, then disconnect the producer."""
        # The client reference is cleared first, so ``client`` raises
        # ``IncorrectState`` even if the producer's disconnect fails.
        self._client = None
        self.producer.disconnect()

    def add_middleware(self, middleware: "BrokerMiddleware[Any]") -> None:
        """Validate ``middleware`` and delegate to the parent ``add_middleware``.

        Raises:
            FeatureNotSupportedException: If the middleware is not allowed
                for the configured protocol version.
        """
        self._validate_middleware(middleware)
        return super().add_middleware(middleware)

    def insert_middleware(self, middleware: "BrokerMiddleware[Any]") -> None:
        """Validate ``middleware`` and delegate to the parent ``insert_middleware``.

        Raises:
            FeatureNotSupportedException: If the middleware is not allowed
                for the configured protocol version.
        """
        self._validate_middleware(middleware)
        return super().insert_middleware(middleware)

    def _validate_middleware(self, middleware: "BrokerMiddleware[Any]") -> None:
        """Reject ``TelemetryMiddleware`` instances when the version is 3.1.1.

        Raises:
            FeatureNotSupportedException: If ``version`` is ``"3.1.1"`` and
                ``middleware`` is a ``TelemetryMiddleware`` instance.
        """
        if self.version != "3.1.1":
            return
        if not isinstance(middleware, TelemetryMiddleware):
            return

        # NOTE(review): the message contains typos ("Opentelementry", a
        # backtick instead of an apostrophe). It is kept verbatim here
        # because callers or tests may match on the exact text.
        msg = "Opentelementry don`t work in 3.1.1 mqtt"
        raise FeatureNotSupportedException(msg)