Coverage for faststream / redis / publisher / factory.py: 92%
17 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Any, TypeAlias, Union
3from faststream.exceptions import SetupError
4from faststream.redis.schemas import INCORRECT_SETUP_MSG, ListSub, PubSub, StreamSub
5from faststream.redis.schemas.proto import validate_options
7from .config import RedisPublisherConfig, RedisPublisherSpecificationConfig
8from .specification import (
9 ChannelPublisherSpecification,
10 ListPublisherSpecification,
11 RedisPublisherSpecification,
12 StreamPublisherSpecification,
13)
14from .usecase import (
15 ChannelPublisher,
16 ListBatchPublisher,
17 ListPublisher,
18 LogicPublisher,
19 StreamPublisher,
20)
22if TYPE_CHECKING:
23 from faststream.redis.configs import RedisBrokerConfig
24 from faststream.redis.parser import MessageFormat
27PublisherType: TypeAlias = LogicPublisher
def create_publisher(
    *,
    channel: Union["PubSub", str, None],
    list: Union["ListSub", str, None],
    stream: Union["StreamSub", str, None],
    headers: dict[str, Any] | None,
    reply_to: str,
    config: "RedisBrokerConfig",
    message_format: type["MessageFormat"] | None,
    # AsyncAPI args
    title_: str | None,
    description_: str | None,
    schema_: Any | None,
    include_in_schema: bool,
) -> PublisherType:
    """Build the Redis publisher that matches the supplied destination.

    The three destination options are first handed to ``validate_options``
    (presumably rejecting invalid combinations -- confirm in
    ``faststream.redis.schemas.proto``). They are then probed in a fixed
    order -- channel, stream, list -- and the first one whose ``validate``
    result is truthy decides which publisher class is returned.

    Args:
        channel: Pub/Sub channel, as a ``PubSub`` object or a plain string.
        list: Redis list, as a ``ListSub`` object or a plain string.
        stream: Redis stream, as a ``StreamSub`` object or a plain string.
        headers: Headers forwarded to the publisher config.
        reply_to: Reply destination forwarded to the publisher config.
        config: Broker config, shared by the publisher config and the
            specification object.
        message_format: Message format class forwarded to the publisher
            config.
        title_: Title forwarded to the specification config.
        description_: Description forwarded to the specification config.
        schema_: Schema forwarded to the specification config.
        include_in_schema: Flag forwarded to the specification config.

    Returns:
        A ``ChannelPublisher``, ``StreamPublisher``, ``ListBatchPublisher``
        or ``ListPublisher`` instance.

    Raises:
        SetupError: If none of ``channel``, ``stream`` or ``list`` validates
            to a truthy value.
    """
    validate_options(channel=channel, list=list, stream=stream)

    # Both config objects are built up front, before any destination is probed.
    usecase_config = RedisPublisherConfig(
        reply_to=reply_to,
        headers=headers,
        _message_format=message_format,
        _outer_config=config,
    )
    spec_config = RedisPublisherSpecificationConfig(
        schema_=schema_,
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )

    # Pub/Sub channel takes precedence over stream and list.
    pubsub = PubSub.validate(channel)
    if pubsub:
        channel_spec = ChannelPublisherSpecification(config, spec_config, pubsub)
        return ChannelPublisher(usecase_config, channel_spec, channel=pubsub)

    stream_target = StreamSub.validate(stream)
    if stream_target:
        stream_spec = StreamPublisherSpecification(
            config,
            spec_config,
            stream_target,
        )
        return StreamPublisher(usecase_config, stream_spec, stream=stream_target)

    list_target = ListSub.validate(list)
    if not list_target:
        # No destination validated to something usable.
        raise SetupError(INCORRECT_SETUP_MSG)

    list_spec = ListPublisherSpecification(config, spec_config, list_target)
    # Batch-enabled lists get the batch publisher; otherwise the plain one.
    list_publisher_cls = ListBatchPublisher if list_target.batch else ListPublisher
    return list_publisher_cls(usecase_config, list_spec, list=list_target)