Coverage for fastagency/runtimes/ag2/ag2.py: 80%
55 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
1import asyncio 1afgheibcd
2import re 1afgheibcd
3from collections.abc import Iterable, Mapping 1afgheibcd
4from typing import ( 1afgheibcd
5 TYPE_CHECKING,
6 Any,
7 Callable,
8 Optional,
9 Protocol,
10 Union,
11 runtime_checkable,
12)
14from autogen.agentchat import ConversableAgent 1afgheibcd
15from autogen.events.base_event import get_event_classes 1afgheibcd
17from ...base import ( 1afgheibcd
18 UI,
19 WorkflowTypeVar,
20 WorkflowsProtocol,
21 check_register_decorator,
22)
23from ...logging import get_logger 1afgheibcd
25if TYPE_CHECKING: 1afgheibcd
26 from autogen.events.base_event import BaseEvent
28 from fastagency.api.openapi import OpenAPI
__all__ = [
    "Workflow",
    "create_ag2_event",
]

# Populate ag2 event classes for each event type.
# create_ag2_event() looks event classes up in this object by event-type name.
EVENT_CLASSES = get_event_classes()

# Get the logger
logger = get_logger(__name__)

# Log the actual module name instead of a hard-coded file name, so the
# message cannot go stale when the module is moved or renamed.
logger.info("Importing %s", __name__)
# Regular expressions used by _match() / _findall() to recognise console
# output chunks. Each key maps to a tuple of alternative patterns; where two
# are given, one variant carries ANSI colour escape sequences (\x1b[..m) and
# the other is the plain-text form of the same line.
#
# All patterns are raw strings so every backslash reaches the regex engine
# unchanged, and literal dots are escaped so they do not match arbitrary
# characters.
_patterns = {
    "end_of_message": (
        r"^\n--------------------------------------------------------------------------------\n$",
    ),
    "auto_reply": (
        r"^\x1b\[31m\n>>>>>>>> USING AUTO REPLY\.\.\.\x1b\[0m\n$",
        r"^\n>>>>>>>> USING AUTO REPLY\.\.\.\n$",
    ),
    "sender_recipient": (
        r"^\x1b\[33m([a-zA-Z0-9_-]+)\x1b\[0m \(to ([a-zA-Z0-9_-]+)\):\n\n$",
        r"^([a-zA-Z0-9_-]+) \(to ([a-zA-Z0-9_-]+)\):\n\n$",
    ),
    "suggested_function_call": (
        r"^\x1b\[32m\*\*\*\*\* Suggested tool call \((call_[a-zA-Z0-9]+)\): ([a-zA-Z0-9_]+) \*\*\*\*\*\x1b\[0m\n$",
        r"^\*\*\*\*\* Suggested tool call \((call_[a-zA-Z0-9]+)\): ([a-zA-Z0-9_]+) \*\*\*\*\*\n$",
    ),
    "stars": (
        r"^\x1b\[32m(\*{69}\*+)\x1b\[0m\n$",
        r"^(\*{69}\*+)\n$",
    ),
    # The function name accepts digits, matching what "suggested_function_call"
    # accepts, so a suggested call can also be recognised when it executes.
    "function_call_execution": (
        r"^\x1b\[35m\n>>>>>>>> EXECUTING FUNCTION ([a-zA-Z0-9_]+)\.\.\.\x1b\[0m\n$",
        r"^\n>>>>>>>> EXECUTING FUNCTION ([a-zA-Z0-9_]+)\.\.\.\n$",
    ),
    "response_from_calling_tool": (
        r"^\x1b\[32m\*\*\*\*\* Response from calling tool \((call_[a-zA-Z0-9_]+)\) \*\*\*\*\*\x1b\[0m\n$",
        r"^\*\*\*\*\* Response from calling tool \((call_[a-zA-Z0-9_]+)\) \*\*\*\*\*\n$",
    ),
    "no_human_input_received": (
        r"^\x1b\[31m\n>>>>>>>> NO HUMAN INPUT RECEIVED\.\x1b\[0m$",
        r"^\n>>>>>>>> NO HUMAN INPUT RECEIVED\.$",
    ),
    "user_interrupted": (r"^USER INTERRUPTED\n$",),
    "arguments": (r"^Arguments: \n(.*)\n$",),
    "auto_reply_input": (
        r"^Replying as ([a-zA-Z0-9_]+)\. Provide feedback to ([a-zA-Z0-9_]+)\. Press enter to skip and use auto-reply, or type 'exit' to end the conversation: $",
    ),
    "next_speaker": (
        r"^Next speaker: [a-zA-Z0-9_]+$",
        r"^\x1b\[32m\nNext speaker: [a-zA-Z0-9_]+\n\x1b\[0m$",
    ),
}
def _match(key: str, string: str, /) -> bool:
    """Tell whether ``string`` matches any pattern registered under ``key``.

    Args:
        key: Name of the pattern group in ``_patterns``.
        string: Text to test; each pattern is applied with ``re.match``.

    Returns:
        ``True`` as soon as one pattern matches, otherwise ``False``.

    Raises:
        KeyError: If ``key`` is not present in ``_patterns``.
    """
    for candidate in _patterns[key]:
        if re.match(candidate, string):
            return True
    return False
def _findall(key: str, string: str, /) -> tuple[str, ...]:
    """Extract the captured groups of the first pattern under ``key`` that matches.

    Args:
        key: Name of the pattern group in ``_patterns``.
        string: Text to extract from.

    Returns:
        The first element of ``re.findall`` for the first matching pattern,
        or an empty tuple when no pattern matches. Note that ``re.findall``
        yields a tuple only for patterns with several groups; with a single
        group the element is a plain string.

    Raises:
        KeyError: If ``key`` is not present in ``_patterns``.
    """
    # Lazily pick the first pattern that matches at the start of the string.
    matching = (candidate for candidate in _patterns[key] if re.match(candidate, string))
    chosen = next(matching, None)
    if chosen is None:
        return ()  # type: ignore[no-any-return]
    return re.findall(chosen, string)[0]  # type: ignore[no-any-return]
def create_ag2_event(type: Optional[str] = None, **kwargs: Any) -> "BaseEvent":
    """Instantiate the ag2 event class registered for an event type.

    Args:
        type: Event type name looked up in ``EVENT_CLASSES``. ``None`` (or an
            empty string) falls back to ``"text"``.
        **kwargs: Keyword arguments forwarded to the event class. A
            ``content`` entry, if present, is removed and its items are merged
            into the keyword arguments (overriding same-named keys);
            ``content=None`` is treated as "no content".

    Returns:
        The object produced by calling the event class with the merged
        keyword arguments.

    Raises:
        ValueError: If the event type is not present in ``EVENT_CLASSES``.
    """
    # Use a local name instead of rebinding the parameter that shadows the
    # ``type`` builtin.
    event_type = type or "text"
    if event_type not in EVENT_CLASSES:
        raise ValueError(f"Unknown event type: {event_type}")

    # Get the ag2 event class
    event_cls = EVENT_CLASSES[event_type]

    # An explicit ``content=None`` used to crash in ``dict.update``; treat it
    # the same as a missing ``content``.
    content = kwargs.pop("content", None) or {}
    kwargs.update(content)

    return event_cls(**kwargs)
class Workflow(WorkflowsProtocol):
    """Registry of named workflows that can be run against a ``UI``.

    Each workflow is stored in ``self._workflows`` under its name as a
    ``(callable, description)`` pair.
    """

    def __init__(self) -> None:
        """Initialize the workflows."""
        self._workflows: dict[str, tuple[Callable[[UI, dict[str, Any]], str], str]] = {}

    def register(
        self, name: str, description: str, *, fail_on_redefintion: bool = False
    ) -> Callable[[WorkflowTypeVar], WorkflowTypeVar]:
        """Return a decorator that registers a function as the workflow ``name``.

        Args:
            name: Key under which the workflow is stored.
            description: Text stored alongside the workflow function.
            fail_on_redefintion: When true, registering a name that already
                exists raises instead of overwriting.

        Returns:
            A decorator that validates the function with
            ``check_register_decorator``, stores it and returns it unchanged.

        Raises:
            ValueError: From the decorator, if ``name`` is already registered
                and ``fail_on_redefintion`` is true.
        """

        def decorator(func: WorkflowTypeVar) -> WorkflowTypeVar:
            check_register_decorator(func)

            already_registered = name in self._workflows
            if already_registered and fail_on_redefintion:
                raise ValueError(f"A workflow with name '{name}' already exists.")
            if already_registered:
                # Redefinition is allowed here; just make it visible in the logs.
                logger.warning(f"Overwriting workflow with name '{name}'")

            self._workflows[name] = (func, description)
            return func

        return decorator

    def run(
        self,
        name: str,
        ui: UI,
        user_id: Optional[str] = None,
        **kwargs: Any,
    ) -> str:
        """Execute the workflow registered under ``name`` and report via ``ui``.

        Args:
            name: Name of a registered workflow.
            ui: UI object notified about start, errors and completion.
            user_id: Currently unused (see the todo below).
            **kwargs: Parameters passed to the workflow as a single dict.

        Returns:
            The workflow's return value, or an error message string if the
            workflow (or the start notification) raised an ``Exception``.

        Raises:
            KeyError: If no workflow is registered under ``name``; the lookup
                happens before the ``try`` block.
        """
        workflow = self._workflows[name][0]

        # todo: inject user_id into call (and other stuff)
        try:
            ui.workflow_started(
                sender="Workflow",
                recipient="User",
                name=name,
                description=self.get_description(name),
                params=kwargs,
            )
            # Coroutine functions are driven to completion with asyncio.run();
            # plain callables are invoked directly.
            if asyncio.iscoroutinefunction(workflow):
                retval = asyncio.run(workflow(ui, kwargs))
            else:
                retval = workflow(ui, kwargs)

        except Exception as e:
            logger.error(
                f"Unhandled exception occurred when executing the workflow: {e}",
                exc_info=True,
            )
            ui.error(
                sender="Workflow",
                recipient="User",
                short="Unhandled exception occurred when executing the workflow.",
                long=str(e),
            )
            retval = f"Unhandled exception occurred when executing the workflow: {e}"

        # Completion is reported on both the success and the failure path.
        ui.workflow_completed(
            sender="Workflow",
            recipient="User",
            result=retval,
        )
        logger.info(f"Workflow '{name}' completed with result: {retval}")

        return retval

    @property
    def names(self) -> list[str]:
        """Names of all registered workflows, as a new list."""
        return list(self._workflows)

    def get_description(self, name: str) -> str:
        """Return the description stored for ``name``, or a placeholder text.

        Args:
            name: Workflow name to look up.

        Returns:
            The registered description, or ``"Description not available!"``
            when ``name`` is not registered.
        """
        entry = self._workflows.get(name)
        if entry is None:
            return "Description not available!"
        return entry[1]

    def register_api(
        self,
        api: "OpenAPI",
        callers: Union[ConversableAgent, Iterable[ConversableAgent]],
        executors: Union[ConversableAgent, Iterable[ConversableAgent]],
        functions: Optional[
            Union[str, Iterable[Union[str, Mapping[str, Mapping[str, str]]]]]
        ] = None,
    ) -> None:
        """Register ``api`` with the given caller and executor agents.

        Args:
            api: Object whose ``_register_for_llm`` is called once per caller
                and ``_register_for_execution`` once per executor.
            callers: A single agent or an iterable of agents.
            executors: A single agent or an iterable of agents.
            functions: Optional function selection forwarded to ``api``; a
                single string is wrapped into a one-element list.
        """
        # Normalise single values to lists so both loops can iterate uniformly.
        caller_agents = callers if isinstance(callers, Iterable) else [callers]
        executor_agents = executors if isinstance(executors, Iterable) else [executors]
        selected = [functions] if isinstance(functions, str) else functions

        for caller in caller_agents:
            api._register_for_llm(caller, functions=selected)

        for executor in executor_agents:
            api._register_for_execution(executor, functions=selected)
@runtime_checkable
class Toolable(Protocol):
    """Structural type for objects exposing a keyword-only ``register`` method.

    Because the protocol is ``runtime_checkable``, ``isinstance(obj, Toolable)``
    can be used at runtime; per the ``typing`` documentation such checks only
    verify that the method exists, not its signature.
    """

    def register(
        self,
        *,
        caller: ConversableAgent,
        executor: Union[ConversableAgent, list[ConversableAgent]],
    ) -> None:
        """Register this object for the given caller and executor agent(s).

        Args:
            caller: A single agent.
            executor: A single agent or a list of agents.
        """
        ...