Coverage for fastagency/ui/console/console.py: 88%
87 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
1import getpass 1efghaibcd
2import json 1efghaibcd
3import textwrap 1efghaibcd
4from collections.abc import Iterator 1efghaibcd
5from contextlib import contextmanager 1efghaibcd
6from dataclasses import dataclass 1efghaibcd
7from typing import TYPE_CHECKING, Any, Optional, Union 1efghaibcd
8from uuid import uuid4 1efghaibcd
10from ...base import ( 1efghaibcd
11 CreateWorkflowUIMixin,
12 Runnable,
13)
14from ...logging import get_logger 1efghaibcd
15from ...messages import ( 1efghaibcd
16 IOMessage,
17 MessageProcessorMixin,
18 MultipleChoice,
19 TextInput,
20 TextMessage,
21)
23if TYPE_CHECKING: 1efghaibcd
24 from autogen.events.agent_events import (
25 ExecuteFunctionEvent,
26 InputRequestEvent,
27 RunCompletionEvent,
28 TerminationEvent,
29 TextEvent,
30 UsingAutoReplyEvent,
31 )
33logger = get_logger(__name__) 1efghaibcd
36class ConsoleUI(MessageProcessorMixin, CreateWorkflowUIMixin): # implements UI 1efghaibcd
37 @dataclass 1efghaibcd
38 class ConsoleMessage: 1efghaibcd
39 """A console message."""
41 sender: Optional[str] 1efghaibcd
42 recipient: Optional[str] 1efghaibcd
43 heading: Optional[str] 1efghaibcd
44 body: Optional[Union[str, list[dict[str, Any]]]] 1efghaibcd
46 def __init__( 1efghaibcd
47 self,
48 super_conversation: Optional["ConsoleUI"] = None,
49 ) -> None:
50 """Initialize the console UI object.
52 Args:
53 super_conversation (Optional[UI], optional): The super conversation. Defaults to None.
54 """
55 self.super_conversation: Optional[ConsoleUI] = super_conversation 1eabcd
56 self.sub_conversations: list[ConsoleUI] = [] 1eabcd
58 @contextmanager 1efghaibcd
59 def create(self, app: Runnable, import_string: str) -> Iterator[None]: 1efghaibcd
60 yield 1eabcd
62 def start( 1efghaibcd
63 self,
64 *,
65 app: Runnable,
66 import_string: str,
67 name: Optional[str] = None,
68 params: dict[str, Any],
69 single_run: bool = False,
70 ) -> None:
71 workflow_uuid = uuid4().hex 1eabcd
72 ui = self.create_workflow_ui(workflow_uuid=workflow_uuid) 1eabcd
73 name = name or app.provider.names[0] 1eabcd
74 app.provider.run(name=name, ui=ui, **params) 1eabcd
76 @property 1efghaibcd
77 def level(self) -> int: 1efghaibcd
78 return ( 1bcd
79 0 if self.super_conversation is None else self.super_conversation.level + 1
80 )
82 def _format_message(self, console_msg: ConsoleMessage) -> str: 1efghaibcd
83 body = self._body_to_str(console_msg.body) 1eabcd
84 heading = f"[{console_msg.heading}]" if console_msg.heading else "" 1eabcd
85 title = f"{console_msg.sender} (to {console_msg.recipient}) {heading}"[:74] 1eabcd
87 s = f"""╭─ {title} {"─" * (74 - len(title))}─╮ 1eabcd
88│
89{textwrap.indent(textwrap.fill(body, replace_whitespace=False, drop_whitespace=False), "│ ", predicate=lambda line: True)}
90╰{"─" * 78}╯
91"""
92 # remove empty lines
93 s = "\n".join([line for line in s.split("\n") if line.strip()]) 1eabcd
95 # add trailing withespace and │ to each line except the first and the last
96 lines = s.split("\n") 1eabcd
97 s = ( 1eabcd
98 lines[0]
99 + "\n"
100 + "\n".join([line + " " * (79 - len(line)) + "│" for line in lines[1:-1]])
101 + "\n"
102 + lines[-1]
103 + "\n"
104 )
106 return s 1eabcd
108 def _indent(self, text: str) -> str: 1efghaibcd
109 return textwrap.indent(text, " " * 4 * self.level) 1eabcd
111 def _format_and_print(self, console_msg: ConsoleMessage) -> None: 1efghaibcd
112 msg = self._format_message(console_msg) 1eabcd
113 msg = self._indent(msg) 1eabcd
115 print(msg) # noqa: T201 `print` found 1eabcd
117 def visit_default(self, message: IOMessage) -> None: 1efghaibcd
118 if hasattr(message, "content"): 1eabcd
119 content = message.content 1a
120 console_msg = self.ConsoleMessage( 1a
121 sender=content.sender,
122 recipient=content.recipient,
123 heading=message.type,
124 body=getattr(content, "content", None),
125 )
126 self._format_and_print(console_msg) 1a
127 else:
128 content = message.model_dump()["content"] 1eabcd
129 console_msg = self.ConsoleMessage( 1eabcd
130 sender=message.sender,
131 recipient=message.recipient,
132 heading=message.type,
133 body=json.dumps(content, indent=2),
134 )
135 self._format_and_print(console_msg) 1eabcd
137 def visit_text(self, message: "TextEvent") -> None: 1efghaibcd
138 content = message.content 1a
139 console_msg = self.ConsoleMessage( 1a
140 sender=content.sender,
141 recipient=content.recipient,
142 heading=message.type,
143 body=content.content,
144 )
145 self._format_and_print(console_msg) 1a
147 def visit_using_auto_reply(self, message: "UsingAutoReplyEvent") -> None: 1efghaibcd
148 # Do nothing if it is of type UsingAutoReplyEvent
149 pass 1a
151 def visit_run_completion(self, message: "RunCompletionEvent") -> None: 1efghaibcd
152 # We can ignore the RunCompletionEvent as we handle RunResponse already
153 pass 1a
155 def visit_execute_function(self, message: "ExecuteFunctionEvent") -> None: 1efghaibcd
156 content = message.content 1a
158 body = f"\n>>>>>>>> EXECUTING FUNCTION {content.func_name}...\nCall ID: {content.call_id}\nInput arguments: {content.arguments}" 1a
159 console_msg = self.ConsoleMessage( 1a
160 sender="Workflow",
161 recipient=content.recipient,
162 heading=message.type,
163 body=body,
164 )
165 self._format_and_print(console_msg) 1a
167 def visit_termination(self, message: "TerminationEvent") -> None: 1efghaibcd
168 pass 1a
170 def visit_text_message(self, message: TextMessage) -> None: 1efghaibcd
171 console_msg = self.ConsoleMessage(
172 sender=message.sender,
173 recipient=message.recipient,
174 heading=message.type,
175 body=message.body,
176 )
177 self._format_and_print(console_msg)
179 def visit_text_input(self, message: TextInput) -> str: 1efghaibcd
180 suggestions = ( 1a
181 f" (suggestions: {', '.join(message.suggestions)})"
182 if message.suggestions
183 else ""
184 )
185 console_msg = self.ConsoleMessage( 1a
186 sender=message.sender,
187 recipient=message.recipient,
188 heading=message.type,
189 body=f"{message.prompt}{suggestions}:",
190 )
192 prompt = self._format_message(console_msg) 1a
193 prompt = self._indent(prompt) 1a
194 if message.password: 1a
195 return getpass.getpass(prompt)
196 else:
197 return input(prompt) 1a
199 def visit_input_request(self, message: "InputRequestEvent") -> str: 1efghaibcd
200 prompt = message.content.prompt 1a
201 if message.content.password: 1a
202 result = getpass.getpass(prompt if prompt != "" else "Password: ")
203 else:
204 result = input(prompt) 1a
205 message.content.respond(result) 1a
206 return result 1a
208 def visit_multiple_choice(self, message: MultipleChoice) -> str: 1efghaibcd
209 console_msg = self.ConsoleMessage( 1a
210 sender=message.sender,
211 recipient=message.recipient,
212 heading=message.type,
213 body=f"{message.prompt} (choices: {', '.join(message.choices)}, default: {message.default})",
214 )
216 prompt = self._format_message(console_msg) 1a
217 prompt = self._indent(prompt) 1a
218 while True:
219 # logger.info(f"visit_multiple_choice(): {prompt=}")
220 retval = input(prompt) 1a
221 if retval in message.choices: 221 ↛ 223line 221 didn't jump to line 223 because the condition on line 221 was always true1a
222 return retval 1a
223 elif retval == "" and message.default:
224 return message.default
225 else:
226 print(f"Invalid choice ('{retval}'). Please try again.") # noqa: T201 `print` found
228 def process_message(self, message: IOMessage) -> Optional[str]: 1efghaibcd
229 # logger.info(f"process_message(): {message=}")
230 return self.visit(message) 1eabcd
232 # def process_streaming_message(self, message: IOStreamingMessage) -> str | None:
233 # raise NotImplementedError
235 def create_subconversation(self) -> "ConsoleUI": 1efghaibcd
236 sub_conversation = ConsoleUI(self)
237 self.sub_conversations.append(sub_conversation)
239 return sub_conversation