Coverage for fastagency/messages.py: 95%
166 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
1import re 1dfgheiabc
2from abc import ABC, abstractmethod 1dfgheiabc
3from dataclasses import asdict, dataclass, field, fields 1dfgheiabc
4from typing import Any, Literal, Optional, Protocol, Type, Union 1dfgheiabc
5from uuid import UUID, uuid4 1dfgheiabc
7from pydantic import BaseModel 1dfgheiabc
9from .logging import get_logger 1dfgheiabc
11__all__ = [ 1dfgheiabc
12 "Error",
13 "FunctionCallExecution",
14 "IOMessage",
15 "KeepAlive",
16 "MessageProcessorMixin",
17 "MessageProcessorProtocol",
18 "MessageType",
19 "MultipleChoice",
20 "SuggestedFunctionCall",
21 "SystemMessage",
22 "TextInput",
23 "TextMessage",
24 "WorkflowCompleted",
25 "WorkflowStarted",
26]
29logger = get_logger(__name__) 1dfgheiabc
32def _camel_to_snake(name: str) -> str: 1dfgheiabc
33 name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name) 1deabcj
34 return re.sub("([a-z0-9])([A-Z])", r"\1_\2", name).lower() 1deabcj
37# we keep this hardcoded for mypy and type checks
38MessageType = Literal[ 1dfgheiabc
39 "text_message",
40 "suggested_function_call",
41 "function_call_execution",
42 "text_input",
43 "multiple_choice",
44 "system_message",
45 "keep_alive",
46 "workflow_started",
47 "workflow_completed",
48 "error",
49]
52@dataclass 1dfgheiabc
53class IOMessage( 1dfgheiabc
54 ABC
55): # `IOMessage` is an abstract base class, but it has no abstract methods
56 workflow_uuid: str 1dfgheiabc
57 sender: Optional[str] = None 1dfgheiabc
58 recipient: Optional[str] = None 1dfgheiabc
59 auto_reply: bool = False 1dfgheiabc
60 uuid: str = field(default_factory=lambda: str(uuid4().hex)) 1dfgheiabcj
62 @property 1dfgheiabc
63 def type(self) -> MessageType: 1dfgheiabc
64 retval: MessageType = _camel_to_snake(self.__class__.__name__) # type: ignore[assignment] 1deabcj
65 return retval 1deabcj
67 @staticmethod 1dfgheiabc
68 def _get_message_class(type: Optional[MessageType]) -> "Type[IOMessage]": 1dfgheiabc
69 type = type or "text_message" 1abcj
70 lookup: dict[MessageType, "Type[IOMessage]"] = { 1abcj
71 "text_message": TextMessage,
72 "suggested_function_call": SuggestedFunctionCall,
73 "function_call_execution": FunctionCallExecution,
74 "text_input": TextInput,
75 "multiple_choice": MultipleChoice,
76 "keep_alive": KeepAlive,
77 "system_message": SystemMessage,
78 "workflow_started": WorkflowStarted,
79 "workflow_completed": WorkflowCompleted,
80 "error": Error,
81 }
82 return lookup[type] 1abcj
84 @staticmethod 1dfgheiabc
85 def create(type: Optional[MessageType] = None, **kwargs: Any) -> "IOMessage": 1dfgheiabc
86 cls = IOMessage._get_message_class(type) 1abcj
88 content = kwargs.pop("content", {}) 1abcj
89 kwargs.update(content) 1abcj
91 return cls(**kwargs) 1abcj
93 @staticmethod 1dfgheiabc
94 def _get_parameters_names() -> list[str]: 1dfgheiabc
95 return [field.name for field in fields(IOMessage)] 1deabcj
97 def model_dump(self) -> dict[str, Any]: 1dfgheiabc
98 params_names = IOMessage._get_parameters_names() 1deabcj
99 d = asdict(self) 1deabcj
100 content = {k: v for k, v in d.items() if k not in params_names} 1deabcj
101 retval = {k: v for k, v in d.items() if k in params_names} 1deabcj
102 retval["content"] = content 1deabcj
103 retval["type"] = self.type 1deabcj
104 return retval 1deabcj
107# message type that asks user something
110@dataclass 1dfgheiabc
111class AskingMessage(IOMessage): ... 1dfgheiabc
114# type of output messages
115@dataclass 1dfgheiabc
116class TextMessage(IOMessage): 1dfgheiabc
117 body: Optional[str] = None 1dfgheiabc
119 def __post_init__(self) -> None: 1dfgheiabc
120 """Set the default value for the `type` attribute."""
121 if self.type is None: 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true1abcj
122 self.type = "text_message"
125@dataclass 1dfgheiabc
126class SuggestedFunctionCall(IOMessage): 1dfgheiabc
127 function_name: Optional[str] = None 1dfgheiabc
128 call_id: Optional[str] = None 1dfgheiabc
129 arguments: dict[str, Any] = field(default_factory=dict) 1dfgheiabc
132@dataclass 1dfgheiabc
133class FunctionCallExecution(IOMessage): 1dfgheiabc
134 function_name: Optional[str] = None 1dfgheiabc
135 call_id: Optional[str] = None 1dfgheiabc
136 retval: Any = None 1dfgheiabc
139# types of input messages
140@dataclass 1dfgheiabc
141class TextInput(AskingMessage): 1dfgheiabc
142 prompt: Optional[str] = None 1dfgheiabc
143 suggestions: list[str] = field(default_factory=list) 1dfgheiabc
144 password: bool = False 1dfgheiabc
147@dataclass 1dfgheiabc
148class MultipleChoice(AskingMessage): 1dfgheiabc
149 prompt: Optional[str] = None 1dfgheiabc
150 choices: list[str] = field(default_factory=list) 1dfgheiabc
151 default: Optional[str] = None 1dfgheiabc
152 single: bool = True 1dfgheiabc
153 # todo: add validation
156@dataclass 1dfgheiabc
157class SystemMessage(IOMessage): 1dfgheiabc
158 message: dict[str, Any] = field(default_factory=dict) 1dfgheiabc
161@dataclass 1dfgheiabc
162class WorkflowStarted(IOMessage): 1dfgheiabc
163 name: Optional[str] = None 1dfgheiabc
164 description: Optional[str] = None 1dfgheiabc
165 params: dict[str, Any] = field(default_factory=dict) 1dfgheiabc
168@dataclass 1dfgheiabc
169class WorkflowCompleted(IOMessage): 1dfgheiabc
170 result: Optional[str] = None 1dfgheiabc
173@dataclass 1dfgheiabc
174class Error(IOMessage): 1dfgheiabc
175 short: Optional[str] = None 1dfgheiabc
176 long: Optional[str] = None 1dfgheiabc
179@dataclass 1dfgheiabc
180class KeepAlive(IOMessage): ... 1dfgheiabc
183class MessageProcessorProtocol(Protocol): 1dfgheiabc
184 def process_message(self, message: IOMessage) -> Optional[str]: ... 1dfgheiabc
186 def text_message( 1dfgheiabc
187 self,
188 # common parameters for all messages
189 workflow_uuid: str, 1dfgheiabc
190 sender: Optional[str] = None, 1dfgheiabc
191 recipient: Optional[str] = None, 1dfgheiabc
192 auto_reply: bool = False, 1dfgheiabc
193 uuid: Optional[str] = None, 1dfgheiabc
194 # text_message specific parameters
195 body: Optional[str] = None, 1dfgheiabc
196 ) -> Optional[str]: ... 1dfgheiabc
198 def suggested_function_call( 1dfgheiabc
199 self,
200 # common parameters for all messages
201 workflow_uuid: str, 1dfgheiabc
202 sender: Optional[str] = None, 1dfgheiabc
203 recipient: Optional[str] = None, 1dfgheiabc
204 auto_reply: bool = False, 1dfgheiabc
205 uuid: Optional[str] = None, 1dfgheiabc
206 # suggested_function_call specific parameters
207 function_name: Optional[str] = None, 1dfgheiabc
208 call_id: Optional[str] = None, 1dfgheiabc
209 arguments: Optional[dict[str, Any]] = None, 1dfgheiabc
210 ) -> Optional[str]: ... 1dfgheiabc
212 def function_call_execution( 1dfgheiabc
213 self,
214 # common parameters for all messages
215 workflow_uuid: str, 1dfgheiabc
216 sender: Optional[str] = None, 1dfgheiabc
217 recipient: Optional[str] = None, 1dfgheiabc
218 auto_reply: bool = False, 1dfgheiabc
219 uuid: Optional[str] = None, 1dfgheiabc
220 # function_call_execution specific parameters
221 function_name: Optional[str] = None, 1dfgheiabc
222 call_id: Optional[str] = None, 1dfgheiabc
223 retval: Any = None, 1dfgheiabc
224 ) -> Optional[str]: ... 1dfgheiabc
226 def text_input( 1dfgheiabc
227 self,
228 # common parameters for all messages
229 workflow_uuid: str, 1dfgheiabc
230 sender: Optional[str] = None, 1dfgheiabc
231 recipient: Optional[str] = None, 1dfgheiabc
232 auto_reply: bool = False, 1dfgheiabc
233 uuid: Optional[str] = None, 1dfgheiabc
234 # text_input specific parameters
235 prompt: Optional[str] = None, 1dfgheiabc
236 suggestions: Optional[list[str]] = None, 1dfgheiabc
237 password: bool = False, 1dfgheiabc
238 ) -> Optional[str]: ... 1dfgheiabc
240 def multiple_choice( 1dfgheiabc
241 self,
242 # common parameters for all messages
243 workflow_uuid: str, 1dfgheiabc
244 sender: Optional[str] = None, 1dfgheiabc
245 recipient: Optional[str] = None, 1dfgheiabc
246 auto_reply: bool = False, 1dfgheiabc
247 uuid: Optional[str] = None, 1dfgheiabc
248 # multiple_choice specific parameters
249 prompt: Optional[str] = None, 1dfgheiabc
250 choices: Optional[list[str]] = None, 1dfgheiabc
251 default: Optional[str] = None, 1dfgheiabc
252 single: bool = True, 1dfgheiabc
253 ) -> Optional[str]: ... 1dfgheiabc
255 def system_message( 1dfgheiabc
256 self,
257 # common parameters for all messages
258 workflow_uuid: str, 1dfgheiabc
259 sender: Optional[str] = None, 1dfgheiabc
260 recipient: Optional[str] = None, 1dfgheiabc
261 auto_reply: bool = False, 1dfgheiabc
262 uuid: Optional[str] = None, 1dfgheiabc
263 # system_message specific parameters
264 message: Optional[dict[str, Any]] = None, 1dfgheiabc
265 ) -> Optional[str]: ... 1dfgheiabc
267 def workflow_started( 1dfgheiabc
268 self,
269 # common parameters for all messages
270 workflow_uuid: str, 1dfgheiabc
271 sender: Optional[str] = None, 1dfgheiabc
272 recipient: Optional[str] = None, 1dfgheiabc
273 auto_reply: bool = False, 1dfgheiabc
274 uuid: Optional[str] = None, 1dfgheiabc
275 # workflow_started specific parameters
276 name: Optional[str] = None, 1dfgheiabc
277 description: Optional[str] = None, 1dfgheiabc
278 params: Optional[dict[str, Any]] = None, 1dfgheiabc
279 ) -> Optional[str]: ... 1dfgheiabc
281 def workflow_completed( 1dfgheiabc
282 self,
283 # common parameters for all messages
284 workflow_uuid: str, 1dfgheiabc
285 sender: Optional[str] = None, 1dfgheiabc
286 recipient: Optional[str] = None, 1dfgheiabc
287 auto_reply: bool = False, 1dfgheiabc
288 uuid: Optional[str] = None, 1dfgheiabc
289 # workflow_completed specific parameters
290 result: Optional[str] = None, 1dfgheiabc
291 ) -> Optional[str]: ... 1dfgheiabc
293 def error( 1dfgheiabc
294 self,
295 # common parameters for all messages
296 workflow_uuid: str, 1dfgheiabc
297 sender: Optional[str] = None, 1dfgheiabc
298 recipient: Optional[str] = None, 1dfgheiabc
299 auto_reply: bool = False, 1dfgheiabc
300 uuid: Optional[str] = None, 1dfgheiabc
301 # error specific parameters
302 short: Optional[str] = None, 1dfgheiabc
303 long: Optional[str] = None, 1dfgheiabc
304 ) -> Optional[str]: ... 1dfgheiabc
306 def keep_alive( 1dfgheiabc
307 self,
308 # common parameters for all messages
309 workflow_uuid: str, 1dfgheiabc
310 sender: Optional[str] = None, 1dfgheiabc
311 recipient: Optional[str] = None, 1dfgheiabc
312 auto_reply: bool = False, 1dfgheiabc
313 uuid: Optional[str] = None, 1dfgheiabc
314 ) -> Optional[str]: ... 1dfgheiabc
317class MessageProcessorMixin(ABC): 1dfgheiabc
318 def visit(self, message: IOMessage) -> Optional[str]: 1dfgheiabc
319 method_name = f"visit_{message.type}" 1deabcj
320 method = getattr(self, method_name, self.visit_default) 1deabcj
321 return method(message) 1deabcj
323 @abstractmethod 1dfgheiabc
324 def visit_default(self, message: IOMessage) -> Optional[str]: ... 1dfgheiabc
326 def visit_text_message(self, message: TextMessage) -> Optional[str]: 1dfgheiabc
327 return self.visit_default(message)
329 def visit_suggested_function_call( 1dfgheiabc
330 self, message: SuggestedFunctionCall
331 ) -> Optional[str]:
332 return self.visit_default(message) 1j
334 def visit_function_call_execution( 1dfgheiabc
335 self, message: FunctionCallExecution
336 ) -> Optional[str]:
337 return self.visit_default(message) 1j
339 def visit_text_input(self, message: TextInput) -> Optional[str]: 1dfgheiabc
340 return self.visit_default(message) 1dabc
342 def visit_multiple_choice(self, message: MultipleChoice) -> Optional[str]: 1dfgheiabc
343 return self.visit_default(message)
345 def visit_system_message(self, message: SystemMessage) -> Optional[str]: 1dfgheiabc
346 return self.visit_default(message) 1e
348 def visit_keep_alive(self, message: KeepAlive) -> Optional[str]: 1dfgheiabc
349 return self.visit_default(message)
351 def visit_workflow_started(self, message: WorkflowStarted) -> Optional[str]: 1dfgheiabc
352 return self.visit_default(message) 1deabcj
354 def visit_workflow_completed(self, message: WorkflowCompleted) -> Optional[str]: 1dfgheiabc
355 return self.visit_default(message) 1deabcj
357 def visit_error(self, message: Error) -> Optional[str]: 1dfgheiabc
358 return self.visit_default(message) 1deabcj
360 def process_message(self, message: IOMessage) -> Optional[str]: 1dfgheiabc
361 try: 1dabc
362 return self.visit(message) 1dabc
363 except Exception as e: 1dabc
364 # log the error and return None
365 logger.error(f"Error processing message ({message}): {e}", exc_info=True) 1dabc
366 return None 1dabc
368 @staticmethod 1dfgheiabc
369 def _body_to_str(body: Optional[Union[str, list[dict[str, Any]]]]) -> str: 1dfgheiabc
370 if body is None: 1deabcj
371 return "" 1deabc
372 if isinstance(body, str): 1deabcj
373 return body 1deabcj
374 elif not isinstance(body, list): 374 ↛ 375line 374 didn't jump to line 375 because the condition on line 374 was never true1deabc
375 raise TypeError(
376 f"Unsupported body type: {type(body)}. Expected str or list[dict]."
377 )
378 final_msg = "" 1deabc
379 for msg in body: 1deabc
380 content_type = msg.get("type") 1deabc
381 if content_type == "text": 1deabc
382 final_msg += msg.get("text", "") 1deabc
383 else:
384 final_msg += f"Inserting '{content_type}' content" 1dabc
385 return final_msg 1deabc
387 def text_message( 1dfgheiabc
388 self,
389 workflow_uuid: str,
390 sender: Optional[str] = None,
391 recipient: Optional[str] = None,
392 auto_reply: bool = False,
393 uuid: Optional[str] = None,
394 body: Optional[str] = None,
395 ) -> Optional[str]:
396 uuid = uuid or str(uuid4().hex) 1j
397 return self.process_message( 1j
398 TextMessage(
399 sender=sender,
400 recipient=recipient,
401 auto_reply=auto_reply,
402 uuid=uuid,
403 workflow_uuid=workflow_uuid,
404 body=body,
405 )
406 )
408 def suggested_function_call( 1dfgheiabc
409 self,
410 workflow_uuid: str,
411 sender: Optional[str] = None,
412 recipient: Optional[str] = None,
413 auto_reply: bool = False,
414 uuid: Optional[str] = None,
415 function_name: Optional[str] = None,
416 call_id: Optional[str] = None,
417 arguments: Optional[dict[str, Any]] = None,
418 ) -> Optional[str]:
419 uuid = uuid or str(uuid4().hex) 1j
420 arguments = arguments or {} 1j
421 return self.process_message( 1j
422 SuggestedFunctionCall(
423 sender=sender,
424 recipient=recipient,
425 auto_reply=auto_reply,
426 uuid=uuid,
427 workflow_uuid=workflow_uuid,
428 function_name=function_name,
429 call_id=call_id,
430 arguments=arguments,
431 )
432 )
434 def function_call_execution( 1dfgheiabc
435 self,
436 workflow_uuid: str,
437 sender: Optional[str] = None,
438 recipient: Optional[str] = None,
439 auto_reply: bool = False,
440 uuid: Optional[str] = None,
441 function_name: Optional[str] = None,
442 call_id: Optional[str] = None,
443 retval: Any = None,
444 ) -> Optional[str]:
445 uuid = uuid or str(uuid4().hex) 1j
446 return self.process_message( 1j
447 FunctionCallExecution(
448 sender=sender,
449 recipient=recipient,
450 auto_reply=auto_reply,
451 uuid=uuid,
452 workflow_uuid=workflow_uuid,
453 function_name=function_name,
454 call_id=call_id,
455 retval=retval,
456 )
457 )
459 def text_input( 1dfgheiabc
460 self,
461 workflow_uuid: str, 1dfgheiabc
462 sender: Optional[str] = None, 1dfgheiabc
463 recipient: Optional[str] = None, 1dfgheiabc
464 auto_reply: bool = False, 1dfgheiabc
465 uuid: Optional[str] = None, 1dfgheiabc
466 prompt: Optional[str] = None, 1dfgheiabc
467 suggestions: Optional[list[str]] = None, 1dfgheiabc
468 password: bool = False, 1dfgheiabc
469 ) -> Optional[str]: 1dfgheiabc
470 uuid = uuid or str(uuid4().hex) 1deabcj
471 suggestions = suggestions or [] 1deabcj
472 return self.process_message( 1deabcj
473 TextInput( 1deabcj
474 sender=sender, 1deabcj
475 recipient=recipient, 1deabcj
476 auto_reply=auto_reply, 1deabcj
477 uuid=uuid, 1deabcj
478 workflow_uuid=workflow_uuid, 1deabcj
479 prompt=prompt, 1deabcj
480 suggestions=suggestions, 1deabcj
481 password=password, 1deabcj
482 )
483 )
485 def multiple_choice( 1dfgheiabc
486 self,
487 workflow_uuid: str,
488 sender: Optional[str] = None,
489 recipient: Optional[str] = None,
490 auto_reply: bool = False,
491 uuid: Optional[str] = None,
492 prompt: Optional[str] = None,
493 choices: Optional[list[str]] = None,
494 default: Optional[str] = None,
495 single: bool = True,
496 ) -> Optional[str]:
497 uuid = uuid or str(uuid4().hex) 1ej
498 choices = choices or [] 1ej
499 return self.process_message( 1ej
500 MultipleChoice(
501 sender=sender,
502 recipient=recipient,
503 auto_reply=auto_reply,
504 uuid=uuid,
505 workflow_uuid=workflow_uuid,
506 prompt=prompt,
507 choices=choices,
508 default=default,
509 single=single,
510 )
511 )
513 def system_message( 1dfgheiabc
514 self,
515 workflow_uuid: str,
516 sender: Optional[str] = None,
517 recipient: Optional[str] = None,
518 auto_reply: bool = False,
519 uuid: Optional[str] = None,
520 message: Optional[dict[str, Any]] = None,
521 ) -> Optional[str]:
522 uuid = uuid or str(uuid4().hex) 1e
523 message = message or {} 1e
524 return self.process_message( 1e
525 SystemMessage(
526 sender=sender,
527 recipient=recipient,
528 auto_reply=auto_reply,
529 uuid=uuid,
530 workflow_uuid=workflow_uuid,
531 message=message,
532 )
533 )
535 def workflow_started( 1dfgheiabc
536 self,
537 workflow_uuid: str,
538 sender: Optional[str] = None,
539 recipient: Optional[str] = None,
540 auto_reply: bool = False,
541 uuid: Optional[str] = None,
542 name: Optional[str] = None,
543 description: Optional[str] = None,
544 params: Optional[dict[str, Any]] = None,
545 ) -> Optional[str]:
546 uuid = uuid or str(uuid4().hex) 1deabcj
547 params = params or {} 1deabcj
548 return self.process_message( 1deabcj
549 WorkflowStarted(
550 sender=sender,
551 recipient=recipient,
552 auto_reply=auto_reply,
553 uuid=uuid,
554 workflow_uuid=workflow_uuid,
555 name=name,
556 description=description,
557 params=params,
558 )
559 )
561 def workflow_completed( 1dfgheiabc
562 self,
563 workflow_uuid: str,
564 sender: Optional[str] = None,
565 recipient: Optional[str] = None,
566 auto_reply: bool = False,
567 uuid: Optional[str] = None,
568 result: Optional[str] = None,
569 ) -> Optional[str]:
570 uuid = uuid or str(uuid4().hex) 1deabcj
571 return self.process_message( 1deabcj
572 WorkflowCompleted(
573 sender=sender,
574 recipient=recipient,
575 auto_reply=auto_reply,
576 uuid=uuid,
577 workflow_uuid=workflow_uuid,
578 result=result,
579 )
580 )
582 def error( 1dfgheiabc
583 self,
584 workflow_uuid: str,
585 sender: Optional[str] = None,
586 recipient: Optional[str] = None,
587 auto_reply: bool = False,
588 uuid: Optional[str] = None,
589 short: Optional[str] = None,
590 long: Optional[str] = None,
591 ) -> Optional[str]:
592 uuid = uuid or str(uuid4().hex) 1deabcj
593 return self.process_message( 1deabcj
594 Error(
595 sender=sender,
596 recipient=recipient,
597 auto_reply=auto_reply,
598 uuid=uuid,
599 workflow_uuid=workflow_uuid,
600 short=short,
601 long=long,
602 )
603 )
605 def keep_alive( 1dfgheiabc
606 self,
607 workflow_uuid: str,
608 sender: Optional[str] = None,
609 recipient: Optional[str] = None,
610 auto_reply: bool = False,
611 uuid: Optional[str] = None,
612 ) -> Optional[str]:
613 uuid = uuid or str(uuid4().hex)
614 return self.process_message(
615 KeepAlive(
616 sender=sender,
617 recipient=recipient,
618 auto_reply=auto_reply,
619 uuid=uuid,
620 workflow_uuid=workflow_uuid,
621 )
622 )
625class InputResponseModel(BaseModel): 1dfgheiabc
626 msg: str 1dfgheiabc
627 question_uuid: Optional[UUID] = None 1dfgheiabc
628 error: bool = False 1dfgheiabc
631class InitiateWorkflowModel(BaseModel): 1dfgheiabc
632 user_id: Optional[str] = None 1dfgheiabc
633 workflow_uuid: UUID 1dfgheiabc
634 name: str 1dfgheiabc
635 params: dict[str, Any] 1dfgheiabc