Coverage for docs/docs_src/tutorials/whatsapp/main.py: 45%
22 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
1import os 1abcd
2from typing import Annotated, Any, Optional 1abcd
4from autogen import register_function, ConversableAgent, LLMConfig 1abcd
6from fastagency import UI 1abcd
7from fastagency.api.openapi import OpenAPI 1abcd
8from fastagency.api.openapi.security import APIKeyHeader 1abcd
9from fastagency.runtimes.ag2 import Workflow 1abcd
10from fastagency.runtimes.ag2.agents.websurfer import WebSurferAgent 1abcd
12llm_config = LLMConfig( 1abcd
13 model="gpt-4o-mini",
14 api_key=os.getenv("OPENAI_API_KEY"),
15 temperature=0.8,
16)
18openapi_url = "https://dev.infobip.com/openapi/products/whatsapp.json" 1abcd
20whatsapp_api = OpenAPI.create( 1abcd
21 openapi_url=openapi_url,
22 # this is an optional parameter, but specified here because servers are not specified in the OpenAPI specification
23 servers=[{"url": "https://api.infobip.com"}],
24)
26header_authorization = "App " # pragma: allowlist secret 1abcd
27header_authorization += os.getenv("WHATSAPP_API_KEY", "") 1abcd
28whatsapp_api.set_security_params(APIKeyHeader.Parameters(value=header_authorization)) 1abcd
30# This is the default sender number for Infobip.
31# If you want to use your own sender, please update the value below:
32sender = "447860099299" 1abcd
33WHATSAPP_SYSTEM_MESSAGE = f"""You are an agent in charge to communicate with the user and WhatsAPP API. 1abcd
34Always use 'present_completed_task_or_ask_question' to interact with the user.
35- make sure that the 'message' parameter contains all the necessary information for the user!
36Initially, the Web_Surfer_Agent will provide you with some content from the web.
37You should ask the user if he would like to receive the summary of the scraped page
38by using 'present_completed_task_or_ask_question'.
39- "If you want to receive the summary of the page as a WhatsApp message, please provide your number."
41 When sending the message, the Body must use the following format:
42{{
43 "from": "{sender}",
44 "to": "receiverNumber",
45 "messageId": "test-message-randomInt",
46 "content": {{
47 "text": "message"
48 }},
49 "callbackData": "Callback data"
50}}
52"from" number is always the same.
53"""
55wf = Workflow() 1abcd
58@wf.register(name="whatsapp_and_websurfer", description="WhatsApp and WebSurfer chat") 1abcd
59def whatsapp_and_websurfer_workflow(ui: UI, params: dict[str, Any]) -> str: 1abcd
60 def is_termination_msg(msg: dict[str, Any]) -> bool:
61 return msg["content"] is not None and "TERMINATE" in msg["content"]
63 def present_completed_task_or_ask_question(
64 message: Annotated[str, "Message for examiner"],
65 ) -> Optional[str]:
66 try:
67 return ui.text_input(
68 sender="Whatsapp_Agent",
69 recipient="User",
70 prompt=message,
71 )
72 except Exception as e: # pragma: no cover
73 return f"present_completed_task_or_ask_question() FAILED! {e}"
75 whatsapp_agent = ConversableAgent(
76 name="WhatsApp_Agent",
77 system_message=WHATSAPP_SYSTEM_MESSAGE,
78 llm_config=llm_config,
79 human_input_mode="NEVER",
80 is_termination_msg=is_termination_msg,
81 )
83 web_surfer = WebSurferAgent(
84 name="Web_Surfer_Agent",
85 llm_config=llm_config,
86 summarizer_llm_config=llm_config,
87 human_input_mode="NEVER",
88 executor=whatsapp_agent,
89 is_termination_msg=is_termination_msg,
90 bing_api_key=os.getenv("BING_API_KEY"),
91 )
93 register_function(
94 present_completed_task_or_ask_question,
95 caller=whatsapp_agent,
96 executor=web_surfer,
97 name="present_completed_task_or_ask_question",
98 description="""Present completed task or ask question.
99If you are presenting a completed task, last message should be a question: 'Do yo need anything else?'""",
100 )
102 wf.register_api(
103 api=whatsapp_api,
104 callers=whatsapp_agent,
105 executors=web_surfer,
106 functions=["send_whatsapp_text_message"],
107 )
109 initial_message = ui.text_input(
110 sender="Workflow",
111 recipient="User",
112 prompt="For which website would you like to receive a summary?",
113 )
115 response = whatsapp_agent.run(
116 web_surfer,
117 message=f"Users initial message: {initial_message}",
118 summary_method="reflection_with_llm",
119 max_turns=10,
120 )
122 return ui.process(response) # type: ignore[no-any-return]