Coverage for docs/docs_src/tutorials/whatsapp/main.py: 45%

22 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-19 12:16 +0000

1import os 1abcd

2from typing import Annotated, Any, Optional 1abcd

3 

4from autogen import register_function, ConversableAgent, LLMConfig 1abcd

5 

6from fastagency import UI 1abcd

7from fastagency.api.openapi import OpenAPI 1abcd

8from fastagency.api.openapi.security import APIKeyHeader 1abcd

9from fastagency.runtimes.ag2 import Workflow 1abcd

10from fastagency.runtimes.ag2.agents.websurfer import WebSurferAgent 1abcd

11 

12llm_config = LLMConfig( 1abcd

13 model="gpt-4o-mini", 

14 api_key=os.getenv("OPENAI_API_KEY"), 

15 temperature=0.8, 

16) 

17 

18openapi_url = "https://dev.infobip.com/openapi/products/whatsapp.json" 1abcd

19 

20whatsapp_api = OpenAPI.create( 1abcd

21 openapi_url=openapi_url, 

22 # this is an optional parameter, but specified here because servers are not specified in the OpenAPI specification 

23 servers=[{"url": "https://api.infobip.com"}], 

24) 

25 

26header_authorization = "App " # pragma: allowlist secret 1abcd

27header_authorization += os.getenv("WHATSAPP_API_KEY", "") 1abcd

28whatsapp_api.set_security_params(APIKeyHeader.Parameters(value=header_authorization)) 1abcd

29 

30# This is the default sender number for Infobip. 

31# If you want to use your own sender, please update the value below: 

32sender = "447860099299" 1abcd

33WHATSAPP_SYSTEM_MESSAGE = f"""You are an agent in charge to communicate with the user and WhatsAPP API. 1abcd

34Always use 'present_completed_task_or_ask_question' to interact with the user. 

35- make sure that the 'message' parameter contains all the necessary information for the user! 

36Initially, the Web_Surfer_Agent will provide you with some content from the web. 

37You should ask the user if he would like to receive the summary of the scraped page 

38by using 'present_completed_task_or_ask_question'. 

39- "If you want to receive the summary of the page as a WhatsApp message, please provide your number." 

40 

41 When sending the message, the Body must use the following format: 

42{{ 

43 "from": "{sender}", 

44 "to": "receiverNumber", 

45 "messageId": "test-message-randomInt", 

46 "content": {{ 

47 "text": "message" 

48 }}, 

49 "callbackData": "Callback data" 

50}} 

51 

52"from" number is always the same. 

53""" 

54 

55wf = Workflow() 1abcd

56 

57 

58@wf.register(name="whatsapp_and_websurfer", description="WhatsApp and WebSurfer chat") 1abcd

59def whatsapp_and_websurfer_workflow(ui: UI, params: dict[str, Any]) -> str: 1abcd

60 def is_termination_msg(msg: dict[str, Any]) -> bool: 

61 return msg["content"] is not None and "TERMINATE" in msg["content"] 

62 

63 def present_completed_task_or_ask_question( 

64 message: Annotated[str, "Message for examiner"], 

65 ) -> Optional[str]: 

66 try: 

67 return ui.text_input( 

68 sender="Whatsapp_Agent", 

69 recipient="User", 

70 prompt=message, 

71 ) 

72 except Exception as e: # pragma: no cover 

73 return f"present_completed_task_or_ask_question() FAILED! {e}" 

74 

75 whatsapp_agent = ConversableAgent( 

76 name="WhatsApp_Agent", 

77 system_message=WHATSAPP_SYSTEM_MESSAGE, 

78 llm_config=llm_config, 

79 human_input_mode="NEVER", 

80 is_termination_msg=is_termination_msg, 

81 ) 

82 

83 web_surfer = WebSurferAgent( 

84 name="Web_Surfer_Agent", 

85 llm_config=llm_config, 

86 summarizer_llm_config=llm_config, 

87 human_input_mode="NEVER", 

88 executor=whatsapp_agent, 

89 is_termination_msg=is_termination_msg, 

90 bing_api_key=os.getenv("BING_API_KEY"), 

91 ) 

92 

93 register_function( 

94 present_completed_task_or_ask_question, 

95 caller=whatsapp_agent, 

96 executor=web_surfer, 

97 name="present_completed_task_or_ask_question", 

98 description="""Present completed task or ask question. 

99If you are presenting a completed task, last message should be a question: 'Do yo need anything else?'""", 

100 ) 

101 

102 wf.register_api( 

103 api=whatsapp_api, 

104 callers=whatsapp_agent, 

105 executors=web_surfer, 

106 functions=["send_whatsapp_text_message"], 

107 ) 

108 

109 initial_message = ui.text_input( 

110 sender="Workflow", 

111 recipient="User", 

112 prompt="For which website would you like to receive a summary?", 

113 ) 

114 

115 response = whatsapp_agent.run( 

116 web_surfer, 

117 message=f"Users initial message: {initial_message}", 

118 summary_method="reflection_with_llm", 

119 max_turns=10, 

120 ) 

121 

122 return ui.process(response) # type: ignore[no-any-return]