Coverage for fastagency/base.py: 95%
67 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
1import inspect 1efghaibcd
2from collections.abc import Awaitable, Generator, Iterable, Iterator, Mapping 1efghaibcd
3from contextlib import contextmanager 1efghaibcd
4from typing import ( 1efghaibcd
5 TYPE_CHECKING,
6 Any,
7 Callable,
8 Optional,
9 Protocol,
10 TypeVar,
11 Union,
12 runtime_checkable,
13)
15from .logging import get_logger 1efghaibcd
16from .messages import IOMessage, MessageProcessorProtocol 1efghaibcd
18if TYPE_CHECKING: 1efghaibcd
19 from autogen.io.run_response import AsyncRunResponse, RunResponse
21 from fastagency.api.openapi import OpenAPI
# Public names exported by a star-import of this module.
__all__ = [
    "ASGIProtocol",
    "AdapterProtocol",
    "Agent",
    "ProviderProtocol",
    "Runnable",
    "UIBase",
    "WSGIProtocol",
    "WorkflowTypeVar",
    "WorkflowsProtocol",
]

# Module-level logger, named after this module.
logger = get_logger(__name__)
@runtime_checkable
class UIBase(MessageProcessorProtocol, Protocol):
    """Structural interface a UI backend has to satisfy.

    Extends ``MessageProcessorProtocol`` with the declarations below. The
    bodies are placeholders (``...``); concrete classes supply the behavior.
    """

    @contextmanager
    def create(self, app: "Runnable", import_string: str) -> Iterator[None]:
        """Context manager declared to take ``app`` and ``import_string``."""
        ...

    def start(
        self,
        *,
        app: "Runnable",
        import_string: str,
        name: Optional[str] = None,
        params: dict[str, Any],
        single_run: bool = False,
    ) -> None:
        """Declared start entry point; every argument is keyword-only."""
        ...

    def create_workflow_ui(self, workflow_uuid: str) -> "UI":
        """Declared factory returning a ``UI`` for ``workflow_uuid``."""
        ...

    # def process_streaming_message(
    #     self, message: IOStreamingMessage
    # ) -> Optional[str]: ...
class CreateWorkflowUIMixin:
    """Mixin that provides a ``create_workflow_ui`` implementation."""

    def create_workflow_ui(self: UIBase, workflow_uuid: str) -> "UI":
        """Build a ``UI`` wrapping this object and ``workflow_uuid``."""
        workflow_ui = UI(uibase=self, workflow_uuid=workflow_uuid)
        return workflow_ui
class UI:
    """Pair a ``UIBase`` with one workflow UUID.

    Each message helper forwards to the method of the same name on the wrapped
    ``UIBase``, passing this object's ``workflow_uuid`` together with the
    caller's keyword arguments.
    """

    def __init__(self, uibase: UIBase, workflow_uuid: str) -> None:
        """Store the wrapped UI base and the workflow UUID.

        Args:
            uibase: The object every call is forwarded to.
            workflow_uuid: Identifier passed along with every forwarded call.

        Raises:
            ValueError: If ``workflow_uuid`` is ``None`` (also logged as an
                error).
        """
        if workflow_uuid is None:
            logger.error("workflow_uuid must be provided")
            raise ValueError("workflow_uuid must be provided")
        self._ui_base, self._workflow_uuid = uibase, workflow_uuid

    @property
    def workflow_uuid(self) -> str:
        """The workflow UUID given at construction."""
        return self._workflow_uuid

    @property
    def ui_base(self) -> UIBase:
        """The wrapped ``UIBase`` given at construction."""
        return self._ui_base

    def _common_kwargs(
        self,
        sender: Optional[str],
        recipient: Optional[str],
        auto_reply: bool,
        uuid: Optional[str],
    ) -> dict[str, Any]:
        """Collect the keyword arguments shared by every forwarded message."""
        return {
            "workflow_uuid": self.workflow_uuid,
            "sender": sender,
            "recipient": recipient,
            "auto_reply": auto_reply,
            "uuid": uuid,
        }

    async def async_process(self, response: "AsyncRunResponse") -> str:
        """Process the async response from the workflow.

        Passes each item of ``response.events`` to ``process_message``, then
        awaits ``response.summary`` and returns it converted to ``str``.
        """
        async for event in response.events:
            self.process_message(event)

        summary = await response.summary
        return str(summary)

    def process(self, response: "RunResponse") -> str:
        """Process the response from the workflow.

        Passes each item of ``response.events`` to ``process_message``, then
        returns ``response.summary`` converted to ``str``.
        """
        for event in response.events:
            self.process_message(event)

        summary = response.summary
        return str(summary)

    def process_message(self, message: IOMessage) -> Optional[str]:
        """Hand ``message`` to the wrapped UI base and return its result."""
        return self._ui_base.process_message(message)

    def text_message(
        self,
        # common parameters for all messages
        sender: Optional[str] = None,
        recipient: Optional[str] = None,
        auto_reply: bool = False,
        uuid: Optional[str] = None,
        # text_message specific parameters
        body: Optional[str] = None,
    ) -> Optional[str]:
        """Forward to ``text_message`` on the wrapped UI base."""
        return self._ui_base.text_message(
            **self._common_kwargs(sender, recipient, auto_reply, uuid),
            body=body,
        )

    def suggested_function_call(
        self,
        # common parameters for all messages
        sender: Optional[str] = None,
        recipient: Optional[str] = None,
        auto_reply: bool = False,
        uuid: Optional[str] = None,
        # suggested_function_call specific parameters
        function_name: Optional[str] = None,
        call_id: Optional[str] = None,
        arguments: Optional[dict[str, Any]] = None,
    ) -> Optional[str]:
        """Forward to ``suggested_function_call`` on the wrapped UI base."""
        return self._ui_base.suggested_function_call(
            **self._common_kwargs(sender, recipient, auto_reply, uuid),
            function_name=function_name,
            call_id=call_id,
            arguments=arguments,
        )

    def function_call_execution(
        self,
        # common parameters for all messages
        sender: Optional[str] = None,
        recipient: Optional[str] = None,
        auto_reply: bool = False,
        uuid: Optional[str] = None,
        # function_call_execution specific parameters
        function_name: Optional[str] = None,
        call_id: Optional[str] = None,
        retval: Any = None,
    ) -> Optional[str]:
        """Forward to ``function_call_execution`` on the wrapped UI base."""
        return self._ui_base.function_call_execution(
            **self._common_kwargs(sender, recipient, auto_reply, uuid),
            function_name=function_name,
            call_id=call_id,
            retval=retval,
        )

    def text_input(
        self,
        # common parameters for all messages
        sender: Optional[str] = None,
        recipient: Optional[str] = None,
        auto_reply: bool = False,
        uuid: Optional[str] = None,
        # text_input specific parameters
        prompt: Optional[str] = None,
        suggestions: Optional[list[str]] = None,
        password: bool = False,
    ) -> Optional[str]:
        """Forward to ``text_input`` on the wrapped UI base."""
        return self._ui_base.text_input(
            **self._common_kwargs(sender, recipient, auto_reply, uuid),
            prompt=prompt,
            suggestions=suggestions,
            password=password,
        )

    def multiple_choice(
        self,
        # common parameters for all messages
        sender: Optional[str] = None,
        recipient: Optional[str] = None,
        auto_reply: bool = False,
        uuid: Optional[str] = None,
        # multiple_choice specific parameters
        prompt: Optional[str] = None,
        choices: Optional[list[str]] = None,
        default: Optional[str] = None,
        single: bool = True,
    ) -> Optional[str]:
        """Forward to ``multiple_choice`` on the wrapped UI base."""
        return self._ui_base.multiple_choice(
            **self._common_kwargs(sender, recipient, auto_reply, uuid),
            prompt=prompt,
            choices=choices,
            default=default,
            single=single,
        )

    def system_message(
        self,
        # common parameters for all messages
        sender: Optional[str] = None,
        recipient: Optional[str] = None,
        auto_reply: bool = False,
        uuid: Optional[str] = None,
        # system_message specific parameters
        message: Optional[dict[str, Any]] = None,
    ) -> Optional[str]:
        """Forward to ``system_message`` on the wrapped UI base."""
        return self._ui_base.system_message(
            **self._common_kwargs(sender, recipient, auto_reply, uuid),
            message=message,
        )

    def workflow_started(
        self,
        # common parameters for all messages
        sender: Optional[str] = None,
        recipient: Optional[str] = None,
        auto_reply: bool = False,
        uuid: Optional[str] = None,
        # workflow_started specific parameters
        name: Optional[str] = None,
        description: Optional[str] = None,
        params: Optional[dict[str, Any]] = None,
    ) -> Optional[str]:
        """Forward to ``workflow_started`` on the wrapped UI base."""
        return self._ui_base.workflow_started(
            **self._common_kwargs(sender, recipient, auto_reply, uuid),
            name=name,
            description=description,
            params=params,
        )

    def workflow_completed(
        self,
        # common parameters for all messages
        sender: Optional[str] = None,
        recipient: Optional[str] = None,
        auto_reply: bool = False,
        uuid: Optional[str] = None,
        # workflow_completed specific parameters
        result: Optional[str] = None,
    ) -> Optional[str]:
        """Forward to ``workflow_completed`` on the wrapped UI base."""
        return self._ui_base.workflow_completed(
            **self._common_kwargs(sender, recipient, auto_reply, uuid),
            result=result,
        )

    def error(
        self,
        # common parameters for all messages
        sender: Optional[str] = None,
        recipient: Optional[str] = None,
        auto_reply: bool = False,
        uuid: Optional[str] = None,
        # error specific parameters
        short: Optional[str] = None,
        long: Optional[str] = None,
    ) -> Optional[str]:
        """Forward to ``error`` on the wrapped UI base."""
        return self._ui_base.error(
            **self._common_kwargs(sender, recipient, auto_reply, uuid),
            short=short,
            long=long,
        )

    def keep_alive(
        self,
        # common parameters for all messages
        sender: Optional[str] = None,
        recipient: Optional[str] = None,
        auto_reply: bool = False,
        uuid: Optional[str] = None,
    ) -> Optional[str]:
        """Forward to ``keep_alive`` on the wrapped UI base."""
        return self._ui_base.keep_alive(
            **self._common_kwargs(sender, recipient, auto_reply, uuid),
        )
@runtime_checkable
class WSGIProtocol(Protocol):
    """Structural type for objects that provide a ``handle_wsgi`` method."""

    def handle_wsgi(
        self,
        app: "Runnable",
        environ: dict[str, Any],
        start_response: Callable[..., Any],
    ) -> list[bytes]:
        """Declared WSGI entry point.

        Args:
            app: The application the call is made for.
            environ: Mapping of string keys to arbitrary values
                (presumably the WSGI environ -- confirm with implementers).
            start_response: Callable accepting any arguments
                (presumably the WSGI ``start_response`` -- confirm).

        Returns:
            A list of ``bytes`` objects.
        """
        ...
@runtime_checkable
class ASGIProtocol(Protocol):
    """Structural type for objects that provide an async ``handle_asgi``."""

    async def handle_asgi(
        self,
        app: "Runnable",
        scope: dict[str, Any],
        receive: Callable[[dict[str, Any]], Awaitable[None]],
        send: Callable[[dict[str, Any]], Awaitable[None]],
    ) -> None:
        """Declared ASGI entry point.

        Args:
            app: The application the call is made for.
            scope: Mapping of string keys to arbitrary values
                (presumably the ASGI connection scope -- confirm).
            receive: Callable taking a dict and returning an awaitable.
            send: Callable taking a dict and returning an awaitable.
        """
        ...
# Signature of a function decorated with @wf.register: it is called with a UI
# and a dict of parameters and returns a str.
WorkflowTypeVar = TypeVar(
    "WorkflowTypeVar",
    bound=Callable[[UI, dict[str, Any]], str],
)

# Unconstrained type variable standing in for an agent type.
Agent = TypeVar("Agent")
@runtime_checkable
class ProviderProtocol(Protocol):
    """Structural interface for objects that expose and run named workflows."""

    def run(
        self,
        name: str,
        ui: "UI",
        user_id: Optional[str] = None,
        **kwargs: Any,
    ) -> str:
        """Run a workflow.

        Creates a new workflow and assigns it workflow_uuid. Then it calls the
        workflow function (function decorated with @wf.register) with the given
        ui and the supplied parameters.

        Args:
            name (str): The name of the workflow to run.
            ui (UI): The UI object to use.
            user_id (Optional[str]): Optional user identifier. Defaults to None.
            **kwargs: Additional parameters to pass to the workflow function.

        Returns:
            str: A string result (presumably the value returned by the
                workflow function -- confirm with implementers).
        """
        # The docstring lives inside the method so that ``run.__doc__`` is
        # populated; placed after the ``def`` it is only a discarded
        # class-level string expression.
        ...

    @property
    def names(self) -> list[str]:
        """Declared property yielding a list of strings (workflow names)."""
        ...

    def get_description(self, name: str) -> str:
        """Declared lookup returning a description string for ``name``."""
        ...
@runtime_checkable
class WorkflowsProtocol(ProviderProtocol, Protocol):
    """``ProviderProtocol`` extended with registration methods."""

    def register(
        self, name: str, description: str
    ) -> Callable[[WorkflowTypeVar], WorkflowTypeVar]:
        """Declared to return a decorator that maps a workflow function to itself.

        Args:
            name: Name to associate with the decorated function.
            description: Description to associate with the decorated function.
        """
        ...

    def register_api(
        self,
        api: "OpenAPI",
        callers: Union[Agent, Iterable[Agent]],
        executors: Union[Agent, Iterable[Agent]],
        functions: Optional[
            Union[str, Iterable[Union[str, Mapping[str, Mapping[str, str]]]]]
        ] = None,
    ) -> None:
        """Declared registration of an ``OpenAPI`` object.

        Args:
            api: The ``OpenAPI`` object to register.
            callers: A single agent or an iterable of agents.
            executors: A single agent or an iterable of agents.
            functions: Optional selection: a string, or an iterable whose
                items are strings or nested string mappings. Defaults to
                ``None``.
        """
        ...
def check_register_decorator(func: "WorkflowTypeVar") -> None:
    """Validate that ``func`` takes exactly the parameters ``ui`` and ``params``.

    Args:
        func: The function whose signature is inspected.

    Raises:
        ValueError: If the parameter names of ``func`` are not exactly
            ``["ui", "params"]``, in that order.
    """
    sig = inspect.signature(func)
    # Only parameter names and their order are compared; annotations and
    # defaults are not inspected.
    if list(sig.parameters) != ["ui", "params"]:
        # The message states the signature this check actually enforces.
        raise ValueError(
            f"Expected function signature to be 'def func(ui: UI, params: dict[str, Any]) -> str', got {sig}"
        )
@runtime_checkable
class AdapterProtocol(Protocol):
    """Structural type for classes offering a ``create_provider`` factory."""

    @classmethod
    def create_provider(*args: Any, **kwargs: Any) -> ProviderProtocol:
        """Declared factory returning a ``ProviderProtocol``.

        There is no explicit ``cls`` parameter; the class passed by
        ``@classmethod`` arrives as the first element of ``*args``.
        """
        ...
@runtime_checkable
class Runnable(Protocol):
    """Structural interface for an application object.

    Declares ``create`` and ``start`` plus the read-only properties
    ``provider``, ``ui``, ``title`` and ``description``.
    """

    @contextmanager
    def create(self, import_string: str) -> Generator[None, None, None]:
        """Context manager declared to take ``import_string``."""
        ...

    def start(
        self,
        *,
        import_string: str,
        name: Optional[str] = None,
        params: dict[str, Any],
        single_run: bool = False,
    ) -> None:
        """Declared start entry point; every argument is keyword-only."""
        ...

    @property
    def provider(self) -> ProviderProtocol:
        """The ``ProviderProtocol`` object of this application."""
        ...

    @property
    def ui(self) -> UIBase:
        """The ``UIBase`` object of this application."""
        ...

    @property
    def title(self) -> str:
        """Title string."""
        ...

    @property
    def description(self) -> str:
        """Description string."""
        ...