Coverage for fastagency/ui/mesop/message.py: 80%
196 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
1import json 1bcd
2import random 1bcd
3from collections.abc import Iterable, Iterator 1bcd
4from typing import TYPE_CHECKING, Callable, Optional 1bcd
5from uuid import UUID, uuid4 1bcd
7import mesop as me 1bcd
8import mesop.labs as mel 1bcd
10from fastagency.helpers import jsonify_string 1bcd
12from ...logging import get_logger 1bcd
13from ...messages import ( 1bcd
14 AskingMessage,
15 Error,
16 FunctionCallExecution,
17 IOMessage,
18 KeepAlive,
19 MessageProcessorMixin,
20 MultipleChoice,
21 SuggestedFunctionCall,
22 SystemMessage,
23 TextInput,
24 TextMessage,
25 WorkflowCompleted,
26)
27from ...runtimes.ag2.ag2 import create_ag2_event 1bcd
28from .components.inputs import input_text 1bcd
29from .data_model import Conversation, ConversationMessage, State 1bcd
30from .mesop import MesopMessage 1bcd
31from .send_prompt import get_more_messages, send_user_feedback_to_autogen 1bcd
32from .styles import MesopHomePageStyles, MesopMessageStyles 1bcd
33from .timer import wakeup_component 1bcd
35if TYPE_CHECKING: 1bcd
36 from autogen.events.agent_events import (
37 RunCompletionEvent,
38 TextEvent,
39 UsingAutoReplyEvent,
40 )
42logger = get_logger(__name__) 1bcd
class UUIDEncoder(json.JSONEncoder):
    """JSON encoder that also knows how to serialize ``UUID`` values."""

    def default(self, obj: object) -> object:
        """Return a JSON-serializable stand-in for ``obj``.

        ``UUID`` instances are emitted as their hex string. Every other type
        is delegated to the base encoder, which raises ``TypeError`` for
        objects it cannot serialize.
        """
        if not isinstance(obj, UUID):
            return super().default(obj)
        return obj.hex
def consume_responses(responses: Iterable[MesopMessage]) -> Iterator[None]:
    """Apply each incoming message to the Mesop state, yielding between steps.

    For every message the current ``State`` is fetched and the message is
    handed to ``handle_message``; the generator then yields. Unless the
    message is a ``KeepAlive``, the view is scrolled to the
    ``"end_of_messages"`` key, followed by a second yield. One final yield is
    emitted after the iterable is exhausted.

    Args:
        responses: Messages to process, in order.

    Yields:
        ``None`` at every point described above.
    """
    for incoming in responses:
        handle_message(me.state(State), incoming)
        yield
        is_keep_alive = isinstance(incoming.io_message, KeepAlive)
        if not is_keep_alive:
            me.scroll_into_view(key="end_of_messages")
        yield
    yield
def handle_message(state: State, message: MesopMessage) -> None:
    """Append an incoming message to the current conversation and update its flags.

    The wrapped IO message is dumped to JSON (UUIDs as hex strings) and stored
    as a new ``ConversationMessage`` with an empty feedback list.

    * An ``AskingMessage`` marks the conversation as waiting for feedback and
      not completed.
    * A ``WorkflowCompleted`` marks it completed and no longer waiting; unless
      the conversation is already flagged as being from the past, a completed
      copy is inserted at the front of ``state.past_conversations``.

    Args:
        state: Mesop state holding the current and past conversations.
        message: Incoming message together with its conversation metadata.
    """
    current = state.conversation
    history = current.messages
    level = message.conversation.level
    conversation_id = message.conversation.id
    io_message = message.io_message

    serialized = json.dumps(io_message.model_dump(), cls=UUIDEncoder)
    history.append(
        ConversationMessage(
            level=level,
            conversation_id=conversation_id,
            io_message_json=serialized,
            feedback=[],
        )
    )
    # Rebind to a fresh list object rather than relying on the in-place append
    # alone (presumably so the state layer notices the change - TODO confirm).
    current.messages = list(history)

    if isinstance(io_message, AskingMessage):
        current.waiting_for_feedback = True
        current.completed = False

    if not isinstance(io_message, WorkflowCompleted):
        return

    current.completed = True
    current.waiting_for_feedback = False
    if current.is_from_the_past:
        return

    # Archive the finished conversation under a fresh id, newest first.
    archived_id: str = uuid4().hex
    archived = Conversation(
        id=archived_id,
        title=find_suitable_title(current),
        messages=current.messages,
        completed=True,
        is_from_the_past=True,
        waiting_for_feedback=False,
    )
    state.past_conversations.insert(0, archived)
def find_suitable_title(conversation: Conversation) -> str:
    """Pick a title for a conversation based on the user's first answer.

    The title is the first feedback entry of the earliest message that is both
    marked ``feedback_completed`` and actually carries feedback. Messages that
    are completed with an empty feedback list (for example keep-alive
    wake-ups, which only set the flag) are skipped so they cannot hide a real
    answer that comes later.

    Args:
        conversation: Conversation whose messages are searched.

    Returns:
        The first piece of user feedback, or a generated placeholder title
        when no message has any feedback.
    """
    answered = next(
        (m for m in conversation.messages if m.feedback_completed and m.feedback),
        None,
    )
    if answered is not None:
        return answered.feedback[0]

    # No user input to name the conversation after: fall back to a placeholder
    # made semi-unique by a random number and the count of past conversations.
    state = me.state(State)
    return f"The conversation that shall remain unnamed ({random.randint(0, 100)}-{len(state.past_conversations)})"  # nosec: B311
def message_box(
    message: ConversationMessage, read_only: bool, *, styles: MesopHomePageStyles
) -> None:
    """Render a stored conversation message.

    The message's JSON payload is decoded and turned back into a message
    object, first via ``IOMessage.create`` and, if that raises, via
    ``create_ag2_event``. The result is handed to a
    ``MesopGUIMessageVisitor`` for rendering.

    Args:
        message: Stored message wrapping the serialized IO message.
        read_only: Forwarded to the visitor; disables inputs when true.
        styles: Styles forwarded to the visitor.
    """
    payload = json.loads(message.io_message_json)
    level, conversation_id = message.level, message.conversation_id

    try:
        io_message = IOMessage.create(**payload)
    except Exception:
        # Not constructible as an IOMessage; try the AG2 event factory instead.
        io_message = create_ag2_event(**payload)

    MesopGUIMessageVisitor(
        level, conversation_id, message, styles, read_only
    ).process_message(io_message)
class MesopGUIMessageVisitor(MessageProcessorMixin):
    """Visitor that renders a single conversation message as Mesop components.

    Each ``visit_*`` method handles one message kind and delegates the common
    box/header/body layout to ``visit_default``. Input-type messages add their
    own widgets through ``inner_callback`` and report the user's answer back
    via ``_provide_feedback``.
    """

    def __init__(
        self,
        level: int,
        conversation_id: str,
        conversation_message: ConversationMessage,
        styles: MesopHomePageStyles,
        read_only: bool = False,
    ) -> None:
        """Initialize the MesopGUIMessageVisitor object.

        Args:
            level (int): The level of the message.
            conversation_id (str): The ID of the conversation.
            conversation_message (ConversationMessage): Conversation message that wraps the visited io_message
            styles (MesopHomePageStyles): Styles for the message
            read_only (bool): Input messages are disabled in read only mode
        """
        self._level = level
        self._conversation_id = conversation_id
        self._readonly = read_only
        self._conversation_message = conversation_message
        self._styles = styles

    def _has_feedback(self) -> bool:
        """Return True when the wrapped message holds at least one feedback entry."""
        return len(self._conversation_message.feedback) > 0

    def _is_completed(self) -> bool:
        """Return the wrapped message's ``feedback_completed`` flag."""
        return self._conversation_message.feedback_completed

    def _provide_feedback(self, feedback: str) -> Iterator[None]:
        """Send the user's feedback onwards and consume the resulting messages.

        Clears the conversation's pending feedback state, scrolls to the end of
        the message list, then forwards ``feedback`` through
        ``send_user_feedback_to_autogen`` and processes the responses.

        Args:
            feedback: The user's answer as a single string.

        Yields:
            ``None`` after each state change, then whatever
            ``consume_responses`` yields.
        """
        logger.debug(f"MesopGUIMessageVisitor._provide_feedback({feedback=})")
        state = me.state(State)
        conversation = state.conversation
        conversation.feedback = ""
        conversation.waiting_for_feedback = False
        yield
        me.scroll_into_view(key="end_of_messages")
        yield
        responses = send_user_feedback_to_autogen(feedback)
        yield from consume_responses(responses)

    def _render_content(self, content: str, msg_md_style: me.Style) -> None:
        """Render ``content`` as markdown after passing it through ``jsonify_string``."""
        content = jsonify_string(content)
        me.markdown(content, style=msg_md_style)

    def visit_default(
        self,
        message: IOMessage,
        *,
        content: Optional[str] = None,
        style: Optional[MesopMessageStyles] = None,
        error: Optional[bool] = False,
        inner_callback: Optional[Callable[..., None]] = None,
        scrollable: Optional[bool] = False,
    ) -> None:
        """Render a message as a styled box with a header and a markdown body.

        Args:
            message: The message to render.
            content: Body text. When empty or omitted, it is derived from the
                message: the JSON dump of its ``content`` field for an
                ``IOMessage``, or ``message.content.content`` (if present)
                otherwise.
            style: Style set for the box; defaults to the default message style.
            error: When true, the title is prefixed with ``"[Error] "``.
            inner_callback: Called after the body is rendered, inside the box,
                to add extra components.
            scrollable: Selects the scrollable markdown style for the body.
        """
        default_style = self._styles.message.default
        style = style or default_style
        title = message.type.replace("_", " ").capitalize()
        if error:
            title = "[Error] " + title

        with me.box(style=style.box or default_style.box):
            self._header(
                message,
                title=title,
                box_style=style.header_box,
                md_style=style.header_md,
            )

            if not content:
                if isinstance(message, IOMessage):
                    content = json.dumps(
                        message.model_dump()["content"], cls=UUIDEncoder
                    )
                else:
                    # Not an IOMessage: read the nested content when the
                    # message has one. An explicit ``content`` argument is now
                    # honored here too instead of being overwritten.
                    content = (
                        message.content.content
                        if hasattr(message, "content")
                        and hasattr(message.content, "content")
                        else ""
                    )
            content = self._body_to_str(content)

            if scrollable:
                body_style = style.scrollable_md or default_style.scrollable_md
            else:
                body_style = style.md or default_style.md
            self._render_content(content, msg_md_style=body_style)

            if inner_callback:
                inner_callback()

    def visit_text(self, message: "TextEvent") -> None:
        """Render a text event using the text message style."""
        content = message.content.content
        self.visit_default(
            message,
            content=content,
            style=self._styles.message.text,
        )

    def visit_using_auto_reply(self, message: "UsingAutoReplyEvent") -> None:
        """Render an auto-reply notification using the system message style.

        NOTE(review): named after the ``visit_<type>`` pattern the sibling
        methods follow; the previous spelling ``visit_using_auto_repy`` is kept
        as an alias below. Confirm the dispatch key used by the mixin.
        """
        self.visit_default(
            message,
            content=None,
            style=self._styles.message.system,
        )

    # Backward-compatible alias for the original (misspelled) method name.
    visit_using_auto_repy = visit_using_auto_reply

    def visit_run_completion(self, message: "RunCompletionEvent") -> None:
        """Render nothing for a run-completion event."""
        # We can ignore the RunCompletionEvent as we handle RunResponse already
        pass

    def visit_text_message(self, message: TextMessage) -> None:
        """Render a text message, substituting a placeholder for blank bodies."""
        content = message.body if message.body else ""
        if content.strip() == "":
            content = "*(empty message)*"
        self.visit_default(
            message,
            content=content,
            style=self._styles.message.text,
        )

    def visit_error(self, message: Error) -> None:
        """Render an error message: short text as a heading, long text below."""
        self.visit_default(
            message,
            content=f"### {message.short}\n{message.long}",
            style=self._styles.message.error,
            scrollable=True,
        )

    def visit_system_message(self, message: SystemMessage) -> None:
        """Render a system message.

        When the payload has both ``"heading"`` and ``"body"`` keys they are
        laid out as a heading followed by the body; otherwise the payload is
        shown as JSON.
        """
        payload = message.message
        if "heading" in payload and "body" in payload:
            content = f"#### **{payload['heading']}**\n\n{payload['body']}\n"
        else:
            content = json.dumps(payload)
        self.visit_default(
            message,
            content=content,
            style=self._styles.message.system,
            scrollable=True,
        )

    def visit_keep_alive(self, message: KeepAlive) -> None:
        """Render a wake-up component that fetches more messages when triggered.

        The component is omitted in read-only mode and once this keep-alive
        has been marked completed.
        """

        def on_wakeup(e: mel.WebEvent) -> Iterator[None]:
            logger.debug("waking up, after the keep alive")
            # Mark as completed so the wake-up component is not rendered again.
            self._conversation_message.feedback_completed = True
            yield from consume_responses(get_more_messages())

        with me.box():
            if not (self._readonly or self._conversation_message.feedback_completed):
                wakeup_component(on_wakeup=on_wakeup)

    def visit_suggested_function_call(self, message: SuggestedFunctionCall) -> None:
        """Render a suggested function call: name, call id and JSON arguments."""
        content = (
            f"**function_name**: `{message.function_name}`<br>\n"
            f"**call_id**: `{message.call_id}`<br>\n"
            f"**arguments**: {json.dumps(message.arguments)}"
        )
        self.visit_default(
            message,
            content=content,
            style=self._styles.message.suggested_function_call,
            scrollable=True,
        )

    def visit_function_call_execution(self, message: FunctionCallExecution) -> None:
        """Render a function call result: name, call id and return value."""
        content = (
            f"**function_name**: `{message.function_name}`<br>\n"
            f"**call_id**: `{message.call_id}`<br>\n"
            f"**retval**: {message.retval}"
        )
        return self.visit_default(
            message,
            content=content,
            style=self._styles.message.function_call_execution,
            scrollable=True,
        )

    def visit_text_input(self, message: TextInput) -> str:
        """Render a text prompt with an input field.

        The field is disabled in read-only mode or once feedback exists, and
        shows the submitted value after completion.

        Returns:
            Always the empty string; the answer is delivered through the
            input's event handler.
        """

        def on_input(feedback: str) -> Iterator[None]:
            self._conversation_message.feedback = [feedback]
            self._conversation_message.feedback_completed = True
            yield from self._provide_feedback(feedback)

        def value_if_completed() -> Optional[str]:
            stored = self._conversation_message
            # Guard against a completed message with no feedback entries.
            if stored.feedback_completed and stored.feedback:
                return stored.feedback[0]
            return None

        prompt = message.prompt if message.prompt else "Please enter a value"
        if message.suggestions:
            prompt += "\n Suggestions: " + ",".join(message.suggestions)

        self.visit_default(
            message,
            content=prompt,
            style=self._styles.message.text_input,
            inner_callback=lambda: input_text(
                on_input,
                key="prompt",
                disabled=self._readonly or self._has_feedback(),
                value=value_if_completed(),
                style=self._styles.message.text_input_inner,
            ),
        )
        return ""

    def visit_multiple_choice(self, message: MultipleChoice) -> str:
        """Render a choice prompt as buttons (single) or checkboxes (multiple)."""
        if message.single:
            return self._visit_single_choice(message)
        return self._visit_many_choices(message)

    def _visit_single_choice(self, message: MultipleChoice) -> str:
        """Render one button per choice; clicking a button submits that choice.

        Returns:
            Always the empty string.
        """

        def on_click(ev: me.ClickEvent) -> Iterator[None]:
            self._conversation_message.feedback_completed = True
            self._conversation_message.feedback = [ev.key]
            yield from self._provide_feedback(ev.key)

        prompt = message.prompt if message.prompt else "Please enter a value"

        def inner_callback() -> None:
            button_styles = self._styles.message.single_choice_inner
            with me.box(style=button_styles.box):
                # Does not change while the buttons are being rendered.
                disabled = self._readonly or self._is_completed()
                for choice in message.choices:
                    if choice in self._conversation_message.feedback:
                        style = button_styles.selected_button
                    elif disabled:
                        style = button_styles.disabled_button
                    else:
                        style = button_styles.button
                    me.button(
                        label=choice,
                        on_click=on_click,
                        color="primary",
                        type="flat",
                        key=choice,
                        disabled=disabled,
                        style=style,
                    )

        self.visit_default(
            message,
            content=prompt,
            style=self._styles.message.text_input,
            inner_callback=inner_callback,
        )
        return ""

    def _visit_many_choices(self, message: MultipleChoice) -> str:
        """Render one checkbox per choice plus an "OK" button that submits.

        The selection is submitted as a comma-joined string. Checkboxes and
        the "OK" button are disabled in read-only mode and after completion.

        Returns:
            Always the empty string.
        """

        def on_change(ev: me.CheckboxChangeEvent) -> None:
            selected = self._conversation_message.feedback
            choice = ev.key
            if ev.checked:
                if choice not in selected:
                    selected.append(choice)
            elif choice in selected:
                selected.remove(choice)

        def on_click(ev: me.ClickEvent) -> Iterator[None]:
            feedback = ",".join(self._conversation_message.feedback)
            self._conversation_message.feedback_completed = True
            yield from self._provide_feedback(feedback)

        def should_be_checked(option: str) -> bool:
            return option in self._conversation_message.feedback

        prompt = message.prompt if message.prompt else "Please select a value:"

        def inner_callback() -> None:
            if not message.choices:
                return
            choice_styles = self._styles.message.multiple_choice_inner
            disabled = self._readonly or self._is_completed()
            with me.box(style=choice_styles.box):
                for option in message.choices:
                    me.checkbox(
                        label=option,
                        key=option,
                        checked=should_be_checked(option),
                        on_change=on_change,
                        disabled=disabled,
                        style=choice_styles.checkbox,
                    )
                # Disable the submit button together with the checkboxes so an
                # answered or read-only message cannot be submitted again.
                me.button(
                    label="OK",
                    on_click=on_click,
                    color="primary",
                    type="flat",
                    disabled=disabled,
                    style=choice_styles.button,
                )

        self.visit_default(
            message,
            content=prompt,
            style=self._styles.message.text_input,
            inner_callback=inner_callback,
        )
        return ""

    def render_error_message(
        self,
        e: Exception,
        message: IOMessage,
        *,
        content: Optional[str] = None,
        style: Optional[MesopMessageStyles] = None,
    ) -> None:
        """Render a fallback box describing a message that failed to render.

        Args:
            e: The exception raised while rendering ``message``.
            message: The message that could not be rendered.
            content: Body text to show. When omitted, a dump of the message
                followed by the error text is used.
            style: Style set for the box. When omitted, the error style is
                used, falling back to the default message style.
        """
        default_style = self._styles.message.default
        style = style or self._styles.message.error or default_style
        title = "[Error] " + message.type.replace("_", " ").capitalize()

        with me.box(style=style.box or default_style.box):
            self._header(
                message,
                title=title,
                box_style=style.header_box or default_style.header_box,
                md_style=style.header_md or default_style.header_md,
            )

            if content is None:
                content = (
                    "Failed to render message:"
                    + json.dumps(message.model_dump(), indent=2, cls=UUIDEncoder)
                    + f"<br>Error: {e}"
                )

            logger.warning(f"render_error_message: {content=}")
            logger.warning(e, exc_info=True)
            self._render_content(content, style.md or default_style.md)

    def process_message(self, message: IOMessage) -> Optional[str]:
        """Render ``message`` through the matching ``visit_*`` method.

        Any exception raised while rendering is logged and replaced by an
        error box.

        Returns:
            The visitor's return value, or ``None`` when rendering failed.
        """
        try:
            return self.visit(message)
        except Exception as e:
            logger.warning(f"Failed to render message: {e}")
            self.render_error_message(e, message)
            return None

    def _header(
        self,
        message: IOMessage,
        *,
        title: Optional[str] = None,
        box_style: Optional[me.Style] = None,
        md_style: Optional[me.Style] = None,
    ) -> None:
        """Render the bold header line of a message box.

        The header is the title (or the message type when no title is given)
        followed by the participants: ``sender (to recipient)`` when both are
        known, ``from sender`` or ``to recipient`` when only one is, and an
        ``(auto-reply)`` suffix when the message has a truthy ``auto_reply``.

        Args:
            message: The message being rendered.
            title: Header text; defaults to ``message.type``.
            box_style: Style of the header box; defaults to the default style.
            md_style: Style of the header markdown; defaults to the default style.
        """
        if isinstance(message, IOMessage):
            sender = message.sender
            recipient = message.recipient
        else:
            # Other message objects keep the participants on their nested
            # ``content``; tolerate their absence instead of raising.
            inner = getattr(message, "content", None)
            sender = getattr(inner, "sender", None)
            recipient = getattr(inner, "recipient", None)

        with me.box(style=box_style or self._styles.message.default.header_box):
            h = title if title else message.type
            if sender and recipient:
                h += f": {sender} (to {recipient})"
            elif sender:
                h += f": from {sender}"
            elif recipient:
                h += f": to {recipient}"
            if hasattr(message, "auto_reply") and message.auto_reply:
                h += " (auto-reply)"
            h = f"**{h}**"
            me.markdown(h, style=md_style or self._styles.message.default.header_md)