Coverage for docs/docs_src/tutorials/giphy/main.py: 38%
21 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
1import os 1abcd
2from typing import Annotated, Any, Optional 1abcd
4from autogen import register_function, ConversableAgent, LLMConfig 1abcd
6from fastagency import UI 1abcd
7from fastagency.api.openapi import OpenAPI 1abcd
8from fastagency.api.openapi.security import APIKeyQuery 1abcd
9from fastagency.runtimes.ag2.agents.websurfer import WebSurferAgent 1abcd
10from fastagency.runtimes.ag2 import Workflow 1abcd
12llm_config = LLMConfig( 1abcd
13 model="gpt-4o-mini",
14 api_key=os.getenv("OPENAI_API_KEY"),
15 temperature=0.8,
16)
18openapi_url = "https://raw.githubusercontent.com/ag2ai/fastagency/refs/heads/main/examples/openapi/giphy_openapi.json" 1abcd
19giphy_api = OpenAPI.create(openapi_url=openapi_url) 1abcd
21giphy_api_key = os.getenv("GIPHY_API_KEY", "") 1abcd
22giphy_api.set_security_params(APIKeyQuery.Parameters(value=giphy_api_key)) 1abcd
24GIPHY_SYSTEM_MESSAGE = """You are an agent in charge to communicate with the user and Giphy API. 1abcd
25Always use 'present_completed_task_or_ask_question' to interact with the user.
26- make sure that the 'message' parameter contains all the necessary information for the user!
27Initially, the Web_Surfer_Agent will provide you with some content from the web.
28You must present this content provided Web_Surfer_Agent to the user by using 'present_completed_task_or_ask_question'.
29Along with the content, ask the user if he wants you to generate some gifs based on the content.
30- Do NOT generate gifs BEFORE you present the web content to the user, otherwise, you will be penalized!
32Once get the wanted gifs, present them to the user by using 'present_completed_task_or_ask_question' again.
33Note: Use '.gif' files when presenting a gif to the user and format it as a markdown gif -> 
34- Also, make sure to add new lines '\n\n' between headlines and gifs for better readability.
35e.g.:
36'''
37# Here are some gifs for you:
39## Title 1
40
42## Title 2
43
44'''
46Write 'TERMINATE' to end the conversation."""
48wf = Workflow() 1abcd
51@wf.register(name="giphy_and_websurfer", description="Giphy and Websurfer chat") 1abcd
52def giphy_workflow_with_security( 1abcd
53 ui: UI, params: dict[str, Any]
54) -> str:
55 def is_termination_msg(msg: dict[str, Any]) -> bool:
56 return msg["content"] is not None and "TERMINATE" in msg["content"]
58 def present_completed_task_or_ask_question(
59 message: Annotated[str, "Message for examiner"],
60 ) -> Optional[str]:
61 try:
62 return ui.text_input(
63 sender="giphy_agent",
64 recipient="giphy_agent",
65 prompt=message,
66 )
67 except Exception as e: # pragma: no cover
68 return f"present_completed_task_or_ask_question() FAILED! {e}"
70 giphy_agent = ConversableAgent(
71 name="Giphy_Agent",
72 system_message=GIPHY_SYSTEM_MESSAGE,
73 llm_config=llm_config,
74 human_input_mode="NEVER",
75 is_termination_msg=is_termination_msg,
76 )
77 web_surfer = WebSurferAgent(
78 name="Web_Surfer_Agent",
79 llm_config=llm_config,
80 summarizer_llm_config=llm_config,
81 human_input_mode="NEVER",
82 executor=giphy_agent,
83 is_termination_msg=is_termination_msg,
84 bing_api_key=os.getenv("BING_API_KEY")
85 )
87 register_function(
88 present_completed_task_or_ask_question,
89 caller=giphy_agent,
90 executor=web_surfer,
91 name="present_completed_task_or_ask_question",
92 description="""Present completed task or ask question.
93If you are presenting a completed task, last message should be a question: 'Do yo need anything else?'""",
94 )
96 functions = ["random_gif", "search_gifs", "trending_gifs"]
97 wf.register_api(
98 api=giphy_api,
99 callers=giphy_agent,
100 executors=web_surfer,
101 functions=functions,
102 )
104 initial_message = ui.text_input(
105 sender="Workflow",
106 recipient="User",
107 prompt="I can help you find images related to a certain subject. What kind of images would you like to find?",
108 )
110 response = giphy_agent.run(
111 web_surfer,
112 message=f"Users initial message: {initial_message}",
113 summary_method="reflection_with_llm",
114 max_turns=10,
115 )
117 return ui.process(response) # type: ignore[no-any-return]