Coverage for fastagency/runtimes/ag2/tools/whatsapp.py: 100%
13 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
1from typing import Union 1bfgahicde
3from autogen import ConversableAgent 1bfgahicde
5from fastagency.api.openapi import OpenAPI 1bfgahicde
6from fastagency.api.openapi.security import APIKeyHeader 1bfgahicde
7from fastagency.runtimes.ag2.ag2 import Toolable 1bfgahicde
# Default location of the OpenAPI schema for the Infobip WhatsApp API.
WHATSAPP_OPENAPI_URL = "https://dev.infobip.com/openapi/products/whatsapp.json"

# Default Infobip WhatsApp API server that requests are sent to.
WHATSAPP_API_SERVER = "https://api.infobip.com"

# Names of the API functions handed to the agent registration calls
# via their ``functions=`` argument.
WHATSAPP_FUNCTIONS = ["send_whatsapp_text_message"]
class WhatsAppTool(Toolable):
    """Toolable that wraps an OpenAPI client for the Infobip WhatsApp API."""

    def __init__(
        self,
        whatsapp_api_key: str,
        whatsapp_openapi_url: str = WHATSAPP_OPENAPI_URL,
        whatsapp_api_server: str = WHATSAPP_API_SERVER,
    ):
        """Create a new WhatsAppTool instance.

        Builds the OpenAPI client from the given schema url and server, then
        sets its security parameters from the API key.

        Args:
            whatsapp_api_key (str): The WhatsApp API key.
            whatsapp_openapi_url (str): Url of the openapi schema for the
                Infobip WhatsApp API, defaults to ``WHATSAPP_OPENAPI_URL``.
            whatsapp_api_server (str): Url of the Infobip WhatsApp API
                server, defaults to ``WHATSAPP_API_SERVER``.
        """
        server_entry = {"url": whatsapp_api_server}
        self.whatsapp_api = OpenAPI.create(
            openapi_url=whatsapp_openapi_url,
            servers=[server_entry],
        )

        # The key is passed on with an "App " prefix in front of it.
        security_params = APIKeyHeader.Parameters(value=f"App {whatsapp_api_key}")
        self.whatsapp_api.set_security_params(security_params)

    def register(
        self,
        *,
        caller: ConversableAgent,
        executor: Union[ConversableAgent, list[ConversableAgent]],
    ) -> None:
        """Register the WhatsApp functions with the given agents.

        ``WHATSAPP_FUNCTIONS`` are registered for the LLM on ``caller`` and
        for execution on every agent supplied through ``executor``.

        Args:
            caller (ConversableAgent): Agent passed to the client's
                ``_register_for_llm`` call.
            executor (Union[ConversableAgent, list[ConversableAgent]]): A
                single agent, or a list of agents, each passed to the
                client's ``_register_for_execution`` call.
        """
        # Normalise to a list so one agent and many agents share a code path.
        if isinstance(executor, list):
            executing_agents = executor
        else:
            executing_agents = [executor]

        self.whatsapp_api._register_for_llm(caller, functions=WHATSAPP_FUNCTIONS)

        for agent in executing_agents:
            self.whatsapp_api._register_for_execution(
                agent, functions=WHATSAPP_FUNCTIONS
            )