Coverage for docs/docs_src/user_guide/runtimes/ag2/whatsapp_tool.py: 33%

15 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-19 12:16 +0000

1import os 1abcd

2from typing import Any 1abcd

3 

4from autogen import UserProxyAgent 1abcd

5from autogen import ConversableAgent, LLMConfig 1abcd

6 

7from fastagency import UI, FastAgency 1abcd

8from fastagency.runtimes.ag2 import Workflow 1abcd

9from fastagency.runtimes.ag2.tools import WhatsAppTool 1abcd

10from fastagency.ui.console import ConsoleUI 1abcd

11 

12llm_config = LLMConfig( 1abcd

13 model="gpt-4o-mini", 

14 api_key=os.getenv("OPENAI_API_KEY"), 

15 temperature=0.8, 

16) 

17 

18wf = Workflow() 1abcd

19 

20 

21@wf.register(name="simple_whatsapp", description="WhatsApp chat") # type: ignore[type-var] 1abcd

22def whatsapp_workflow(ui: UI, params: dict[str, Any]) -> str: 1abcd

23 def is_termination_msg(msg: dict[str, Any]) -> bool: 

24 return msg["content"] is not None and "TERMINATE" in msg["content"] 

25 

26 initial_message = ui.text_input( 

27 sender="Workflow", 

28 recipient="User", 

29 prompt="I can help you with sending a message over whatsapp, what would you like to send?", 

30 ) 

31 

32 with llm_config: 

33 user_agent = UserProxyAgent( 

34 name="User_Agent", 

35 system_message="You are a user agent, when the message is successfully sent, you can end the conversation by sending 'TERMINATE'", 

36 human_input_mode="NEVER", 

37 is_termination_msg=is_termination_msg, 

38 ) 

39 assistant_agent = ConversableAgent( 

40 name="Assistant_Agent", 

41 system_message="You are a useful assistant for sending messages to whatsapp, use 447860099299 as your (sender) number.", 

42 human_input_mode="NEVER", 

43 is_termination_msg=is_termination_msg, 

44 ) 

45 

46 whatsapp = WhatsAppTool( 

47 whatsapp_api_key=os.getenv("WHATSAPP_API_KEY", ""), 

48 ) 

49 

50 whatsapp.register( 

51 caller=assistant_agent, 

52 executor=user_agent, 

53 ) 

54 

55 response = user_agent.run( 

56 assistant_agent, 

57 message=initial_message, 

58 summary_method="reflection_with_llm", 

59 max_turns=5, 

60 ) 

61 

62 return ui.process(response) # type: ignore[no-any-return] 

63 

64 

65app = FastAgency(provider=wf, ui=ConsoleUI()) 1abcd