Coverage for faststream / redis / publisher / factory.py: 92%

17 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Any, TypeAlias, Union 

2 

3from faststream.exceptions import SetupError 

4from faststream.redis.schemas import INCORRECT_SETUP_MSG, ListSub, PubSub, StreamSub 

5from faststream.redis.schemas.proto import validate_options 

6 

7from .config import RedisPublisherConfig, RedisPublisherSpecificationConfig 

8from .specification import ( 

9 ChannelPublisherSpecification, 

10 ListPublisherSpecification, 

11 RedisPublisherSpecification, 

12 StreamPublisherSpecification, 

13) 

14from .usecase import ( 

15 ChannelPublisher, 

16 ListBatchPublisher, 

17 ListPublisher, 

18 LogicPublisher, 

19 StreamPublisher, 

20) 

21 

22if TYPE_CHECKING: 

23 from faststream.redis.configs import RedisBrokerConfig 

24 from faststream.redis.parser import MessageFormat 

25 

26 

# Return type advertised by ``create_publisher``; aliased to the
# ``LogicPublisher`` use case imported from ``.usecase``.
PublisherType: TypeAlias = LogicPublisher

28 

29 

def create_publisher(
    *,
    channel: Union["PubSub", str, None],
    list: Union["ListSub", str, None],
    stream: Union["StreamSub", str, None],
    headers: dict[str, Any] | None,
    reply_to: str,
    config: "RedisBrokerConfig",
    message_format: type["MessageFormat"] | None,
    # AsyncAPI args
    title_: str | None,
    description_: str | None,
    schema_: Any | None,
    include_in_schema: bool,
) -> PublisherType:
    """Build the Redis publisher that matches the supplied destination.

    The three destinations are probed in a fixed order -- channel, then
    stream, then list -- and the first one whose ``validate`` result is
    truthy decides which publisher class is instantiated. Later destinations
    are not validated once an earlier one has matched.

    Args:
        channel: Pub/Sub destination, passed to ``PubSub.validate``.
        list: List destination, passed to ``ListSub.validate``.
        stream: Stream destination, passed to ``StreamSub.validate``.
        headers: Forwarded to ``RedisPublisherConfig`` as ``headers``.
        reply_to: Forwarded to ``RedisPublisherConfig`` as ``reply_to``.
        config: Broker configuration; stored as ``_outer_config`` on the
            publisher config and handed to the specification object.
        message_format: Forwarded to ``RedisPublisherConfig`` as
            ``_message_format``.
        title_: Forwarded to ``RedisPublisherSpecificationConfig``.
        description_: Forwarded to ``RedisPublisherSpecificationConfig``.
        schema_: Forwarded to ``RedisPublisherSpecificationConfig``.
        include_in_schema: Forwarded to ``RedisPublisherSpecificationConfig``.

    Returns:
        A ``ChannelPublisher``, ``StreamPublisher``, ``ListBatchPublisher``
        (when the validated list destination has a truthy ``batch``) or
        ``ListPublisher``.

    Raises:
        SetupError: With ``INCORRECT_SETUP_MSG`` when none of the three
            destinations validates to a truthy value. Anything raised by
            ``validate_options`` propagates unchanged.
    """
    validate_options(channel=channel, list=list, stream=stream)

    # Runtime (use case) settings shared by every publisher flavour.
    usecase_config = RedisPublisherConfig(
        reply_to=reply_to,
        headers=headers,
        _message_format=message_format,
        _outer_config=config,
    )

    # Schema/documentation settings shared by every specification flavour.
    spec_config = RedisPublisherSpecificationConfig(
        schema_=schema_,
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )

    channel_sub = PubSub.validate(channel)
    if channel_sub:
        channel_spec = ChannelPublisherSpecification(
            config,
            spec_config,
            channel_sub,
        )
        return ChannelPublisher(usecase_config, channel_spec, channel=channel_sub)

    stream_sub = StreamSub.validate(stream)
    if stream_sub:
        stream_spec = StreamPublisherSpecification(
            config,
            spec_config,
            stream_sub,
        )
        return StreamPublisher(usecase_config, stream_spec, stream=stream_sub)

    list_sub = ListSub.validate(list)
    if not list_sub:
        # Defensive fallback: no destination validated to a truthy value.
        raise SetupError(INCORRECT_SETUP_MSG)

    list_spec = ListPublisherSpecification(config, spec_config, list_sub)

    # Batch-enabled lists get the batch publisher; everything else the plain one.
    publisher_cls = ListBatchPublisher if list_sub.batch else ListPublisher
    return publisher_cls(usecase_config, list_spec, list=list_sub)