Coverage for fastagency/ui/mesop/message.py: 80%

196 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-19 12:16 +0000

1import json 1bcd

2import random 1bcd

3from collections.abc import Iterable, Iterator 1bcd

4from typing import TYPE_CHECKING, Callable, Optional 1bcd

5from uuid import UUID, uuid4 1bcd

6 

7import mesop as me 1bcd

8import mesop.labs as mel 1bcd

9 

10from fastagency.helpers import jsonify_string 1bcd

11 

12from ...logging import get_logger 1bcd

13from ...messages import ( 1bcd

14 AskingMessage, 

15 Error, 

16 FunctionCallExecution, 

17 IOMessage, 

18 KeepAlive, 

19 MessageProcessorMixin, 

20 MultipleChoice, 

21 SuggestedFunctionCall, 

22 SystemMessage, 

23 TextInput, 

24 TextMessage, 

25 WorkflowCompleted, 

26) 

27from ...runtimes.ag2.ag2 import create_ag2_event 1bcd

28from .components.inputs import input_text 1bcd

29from .data_model import Conversation, ConversationMessage, State 1bcd

30from .mesop import MesopMessage 1bcd

31from .send_prompt import get_more_messages, send_user_feedback_to_autogen 1bcd

32from .styles import MesopHomePageStyles, MesopMessageStyles 1bcd

33from .timer import wakeup_component 1bcd

34 

35if TYPE_CHECKING: 1bcd

36 from autogen.events.agent_events import ( 

37 RunCompletionEvent, 

38 TextEvent, 

39 UsingAutoReplyEvent, 

40 ) 

41 

42logger = get_logger(__name__) 1bcd

43 

44 

class UUIDEncoder(json.JSONEncoder):
    """JSON encoder that knows how to serialize ``UUID`` values."""

    def default(self, obj: object) -> object:
        """Return the hex form of a UUID; defer every other type to the base encoder.

        Args:
            obj: The value the standard encoder could not serialize on its own.

        Returns:
            ``obj.hex`` when ``obj`` is a ``UUID``; otherwise whatever
            ``json.JSONEncoder.default`` returns (it raises ``TypeError`` for
            types it does not support).
        """
        if not isinstance(obj, UUID):
            return super().default(obj)
        return obj.hex

51 

52 

def consume_responses(responses: Iterable[MesopMessage]) -> Iterator[None]:
    """Feed incoming messages into the UI state, yielding after each step.

    For every response the message is recorded via ``handle_message`` and the
    generator yields. Unless the message is a ``KeepAlive``, the view is then
    scrolled to the ``end_of_messages`` key, and the generator yields again.
    One final yield is emitted once the iterable is exhausted.

    Args:
        responses: Messages to record, consumed lazily.

    Yields:
        None, at each point described above.
    """
    for response in responses:
        handle_message(me.state(State), response)
        yield
        is_keep_alive = isinstance(response.io_message, KeepAlive)
        if not is_keep_alive:
            me.scroll_into_view(key="end_of_messages")
        yield
    yield

62 

63 

def handle_message(state: State, message: MesopMessage) -> None:
    """Record ``message`` in the current conversation and update its status flags.

    The wrapped io_message is dumped to JSON (UUIDs encoded via ``UUIDEncoder``)
    and appended to the conversation as a ``ConversationMessage`` with empty
    feedback. An ``AskingMessage`` marks the conversation as waiting for
    feedback; a ``WorkflowCompleted`` marks it completed and, unless the
    conversation is itself from the past, inserts a read-only copy at the front
    of ``state.past_conversations``.

    Args:
        state: UI state holding the current and past conversations.
        message: Incoming message carrying the io_message and its conversation
            level and id.
    """
    conversation = state.conversation
    history = conversation.messages
    io_message = message.io_message

    serialized = json.dumps(io_message.model_dump(), cls=UUIDEncoder)
    history.append(
        ConversationMessage(
            level=message.conversation.level,
            conversation_id=message.conversation.id,
            io_message_json=serialized,
            feedback=[],
        )
    )
    # Assign a fresh list object back rather than relying on the in-place append.
    conversation.messages = list(history)

    if isinstance(io_message, AskingMessage):
        conversation.waiting_for_feedback = True
        conversation.completed = False

    if not isinstance(io_message, WorkflowCompleted):
        return

    conversation.completed = True
    conversation.waiting_for_feedback = False
    if conversation.is_from_the_past:
        return

    archived = Conversation(
        id=uuid4().hex,
        title=find_suitable_title(conversation),
        messages=conversation.messages,
        completed=True,
        is_from_the_past=True,
        waiting_for_feedback=False,
    )
    state.past_conversations.insert(0, archived)

97 

98 

def find_suitable_title(conversation: Conversation) -> str:
    """Pick a title for ``conversation`` from the first feedback the user gave.

    The title is the first feedback entry of the earliest message that is both
    marked ``feedback_completed`` and actually holds feedback. Messages that
    are completed but carry no feedback are skipped, so they cannot hide a
    later message that does have user input.

    Args:
        conversation: Conversation whose ``messages`` are scanned in order.

    Returns:
        The first feedback string found, or a generated placeholder title
        built from a random number and the count of past conversations.
    """
    for message in conversation.messages:
        if message.feedback_completed and message.feedback:
            return message.feedback[0]

    state = me.state(State)
    return f"The conversation that shall remain unnamed ({random.randint(0, 100)}-{len(state.past_conversations)})"  # nosec: B311

107 

108 

def message_box(
    message: ConversationMessage, read_only: bool, *, styles: MesopHomePageStyles
) -> None:
    """Render one stored conversation message.

    The JSON stored on ``message`` is decoded and turned back into a message
    object: first via ``IOMessage.create`` and, if that raises, via
    ``create_ag2_event``. The result is handed to a ``MesopGUIMessageVisitor``.

    Args:
        message: Stored message wrapping the serialized io_message.
        read_only: Forwarded to the visitor as its ``read_only`` flag.
        styles: Styles forwarded to the visitor.
    """
    payload = json.loads(message.io_message_json)
    depth = message.level
    conv_id = message.conversation_id

    try:
        io_message = IOMessage.create(**payload)
    except Exception:
        # Not constructible as an IOMessage; fall back to the AG2 event factory.
        io_message = create_ag2_event(**payload)

    MesopGUIMessageVisitor(
        depth, conv_id, message, styles, read_only
    ).process_message(io_message)

124 

125 

class MesopGUIMessageVisitor(MessageProcessorMixin):
    """Render a single conversation message as Mesop components.

    ``process_message`` is the entry point. It calls ``visit`` and the
    ``visit_*`` handlers below draw the message; ``visit`` and ``_body_to_str``
    are not defined in this class (presumably inherited from
    ``MessageProcessorMixin`` -- confirm there).
    """

    def __init__(
        self,
        level: int,
        conversation_id: str,
        conversation_message: ConversationMessage,
        styles: MesopHomePageStyles,
        read_only: bool = False,
    ) -> None:
        """Initialize the MesopGUIMessageVisitor object.

        Args:
            level (int): The level of the message.
            conversation_id (str): The ID of the conversation.
            conversation_message (ConversationMessage): Conversation message that wraps the visited io_message
            styles (MesopHomePageStyles): Styles for the message
            read_only (bool): Input messages are disabled in read only mode
        """
        self._level = level
        self._conversation_id = conversation_id
        self._readonly = read_only
        self._conversation_message = conversation_message
        self._styles = styles

    def _has_feedback(self) -> bool:
        """Return True when the wrapped conversation message holds any feedback."""
        return len(self._conversation_message.feedback) > 0

    def _is_completed(self) -> bool:
        """Return the ``feedback_completed`` flag of the wrapped conversation message."""
        return self._conversation_message.feedback_completed

    def _provide_feedback(self, feedback: str) -> Iterator[None]:
        """Send ``feedback`` to the workflow and stream the resulting messages.

        Clears the pending-feedback state of the current conversation, scrolls
        to the end of the message list, forwards the feedback through
        ``send_user_feedback_to_autogen`` and consumes the responses.

        Args:
            feedback: The text to send.

        Yields:
            None after each UI state change.
        """
        logger.debug(f"MesopGUIMessageVisitor._provide_feedback({feedback=})")
        state = me.state(State)
        conversation = state.conversation
        conversation.feedback = ""
        conversation.waiting_for_feedback = False
        yield
        me.scroll_into_view(key="end_of_messages")
        yield
        responses = send_user_feedback_to_autogen(feedback)
        yield from consume_responses(responses)

    def _render_content(self, content: str, msg_md_style: me.Style) -> None:
        """Pass ``content`` through ``jsonify_string`` and render it as markdown."""
        content = jsonify_string(content)
        me.markdown(content, style=msg_md_style)

    def visit_default(
        self,
        message: IOMessage,
        *,
        content: Optional[str] = None,
        style: Optional[MesopMessageStyles] = None,
        error: Optional[bool] = False,
        inner_callback: Optional[Callable[..., None]] = None,
        scrollable: Optional[bool] = False,
    ) -> None:
        """Render a message box with header, body and optional extra widgets.

        Args:
            message: The message (or event) to render.
            content: Body text. When empty or None, the body is derived from
                the message: the JSON dump of its ``content`` field for an
                ``IOMessage``, otherwise ``message.content.content`` if present.
            style: Styles for the box; the default message styles are used when
                omitted, and for any individual style that is unset.
            error: Prefix the title with ``[Error]`` when truthy.
            inner_callback: Called inside the box after the body, to draw
                additional components.
            scrollable: Use the scrollable markdown style for the body.
        """
        defaults = self._styles.message.default
        style = style or defaults
        title = message.type.replace("_", " ").capitalize()
        if error:
            title = "[Error] " + title

        with me.box(style=style.box or defaults.box):
            self._header(
                message,
                title=title,
                box_style=style.header_box,
                md_style=style.header_md,
            )

            # Only derive a body when the caller did not supply one; this now
            # holds for non-IOMessage events too, whose supplied content was
            # previously discarded.
            if not content:
                if isinstance(message, IOMessage):
                    content = json.dumps(
                        message.model_dump()["content"], cls=UUIDEncoder
                    )
                else:
                    inner = getattr(message, "content", None)
                    content = getattr(inner, "content", "")
            content = self._body_to_str(content)

            if scrollable:
                md_style = style.scrollable_md or defaults.scrollable_md
            else:
                md_style = style.md or defaults.md
            self._render_content(content, msg_md_style=md_style)

            if inner_callback:
                inner_callback()

    def visit_text(self, message: "TextEvent") -> None:
        """Render a text event using the text message style."""
        content = message.content.content
        self.visit_default(
            message,
            content=content,
            style=self._styles.message.text,
        )

    def visit_using_auto_reply(self, message: "UsingAutoReplyEvent") -> None:
        """Render an auto-reply event using the system message style.

        NOTE(review): assumes handlers are looked up as ``visit_<message.type>``
        and that this event's type is ``using_auto_reply`` -- confirm against
        ``MessageProcessorMixin.visit``.
        """
        self.visit_default(
            message,
            content=None,
            style=self._styles.message.system,
        )

    def visit_using_auto_repy(self, message: "UsingAutoReplyEvent") -> None:
        """Misspelled former name of ``visit_using_auto_reply``; kept for compatibility."""
        self.visit_using_auto_reply(message)

    def visit_run_completion(self, message: "RunCompletionEvent") -> None:
        """Render nothing for a run-completion event."""
        # We can ignore the RunCompletionEvent as we handle RunResponse already
        pass

    def visit_text_message(self, message: TextMessage) -> None:
        """Render a text message, substituting a placeholder for blank bodies."""
        content = message.body if message.body else ""
        content = content if content.strip() != "" else "*(empty message)*"
        self.visit_default(
            message,
            content=content,
            style=self._styles.message.text,
        )

    def visit_error(self, message: Error) -> None:
        """Render an error message: short text as a heading, long text below."""
        self.visit_default(
            message,
            content=f"### {message.short}\n{message.long}",
            style=self._styles.message.error,
            scrollable=True,
        )

    def visit_system_message(self, message: SystemMessage) -> None:
        """Render a system message.

        When the payload has both ``heading`` and ``body`` keys they are laid
        out as a heading followed by the body; otherwise the payload is shown
        as JSON.
        """
        payload = message.message
        if "heading" in payload and "body" in payload:
            content = f"#### **{payload['heading']}**\n\n{payload['body']}\n"
        else:
            content = json.dumps(payload)
        self.visit_default(
            message,
            content=content,
            style=self._styles.message.system,
            scrollable=True,
        )

    def visit_keep_alive(self, message: KeepAlive) -> None:
        """Place a wake-up component for a keep-alive message.

        The component is only added when the visitor is not read-only and the
        wrapped conversation message is not yet marked ``feedback_completed``.
        """

        def on_wakeup(e: mel.WebEvent) -> Iterator[None]:
            logger.debug("waking up, after the keep alive")
            # Mark as handled so a re-render does not add the component again.
            self._conversation_message.feedback_completed = True
            yield from consume_responses(get_more_messages())

        with me.box():
            if not (self._readonly or self._conversation_message.feedback_completed):
                wakeup_component(on_wakeup=on_wakeup)

    def visit_suggested_function_call(self, message: SuggestedFunctionCall) -> None:
        """Render a suggested function call: name, call id and JSON arguments."""
        content = (
            f"**function_name**: `{message.function_name}`<br>\n"
            f"**call_id**: `{message.call_id}`<br>\n"
            f"**arguments**: {json.dumps(message.arguments)}"
        )
        self.visit_default(
            message,
            content=content,
            style=self._styles.message.suggested_function_call,
            scrollable=True,
        )

    def visit_function_call_execution(self, message: FunctionCallExecution) -> None:
        """Render the result of a function call: name, call id and return value."""
        content = (
            f"**function_name**: `{message.function_name}`<br>\n"
            f"**call_id**: `{message.call_id}`<br>\n"
            f"**retval**: {message.retval}"
        )
        return self.visit_default(
            message,
            content=content,
            style=self._styles.message.function_call_execution,
            scrollable=True,
        )

    def visit_text_input(self, message: TextInput) -> str:
        """Render a text prompt with an input field.

        The field is disabled when the visitor is read-only or feedback has
        already been given; in the latter case it shows the stored feedback.

        Returns:
            Always the empty string.
        """

        def on_input(feedback: str) -> Iterator[None]:
            self._conversation_message.feedback = [feedback]
            self._conversation_message.feedback_completed = True
            yield from self._provide_feedback(feedback)

        def value_if_completed() -> Optional[str]:
            stored = self._conversation_message
            # Guard against a completed message that holds no feedback.
            if stored.feedback_completed and stored.feedback:
                return stored.feedback[0]
            return None

        prompt = message.prompt if message.prompt else "Please enter a value"
        if message.suggestions:
            prompt += "\n Suggestions: " + ",".join(message.suggestions)

        self.visit_default(
            message,
            content=prompt,
            style=self._styles.message.text_input,
            inner_callback=lambda: input_text(
                on_input,
                key="prompt",
                disabled=self._readonly or self._has_feedback(),
                value=value_if_completed(),
                style=self._styles.message.text_input_inner,
            ),
        )
        return ""

    def visit_multiple_choice(self, message: MultipleChoice) -> str:
        """Render a choice prompt as buttons (single) or checkboxes (many)."""
        if message.single:
            return self._visit_single_choice(message)
        return self._visit_many_choices(message)

    def _visit_single_choice(self, message: MultipleChoice) -> str:
        """Render one button per choice; clicking a button submits that choice.

        Returns:
            Always the empty string.
        """

        def on_click(ev: me.ClickEvent) -> Iterator[None]:
            # The button key is the choice text (see ``key=choice`` below).
            self._conversation_message.feedback_completed = True
            self._conversation_message.feedback = [ev.key]
            yield from self._provide_feedback(ev.key)

        prompt = message.prompt if message.prompt else "Please enter a value"

        def inner_callback() -> None:
            button_styles = self._styles.message.single_choice_inner
            with me.box(style=button_styles.box):
                for choice in message.choices:
                    disabled = self._readonly or self._is_completed()
                    selected = choice in self._conversation_message.feedback
                    if selected:
                        style = button_styles.selected_button
                    elif disabled:
                        style = button_styles.disabled_button
                    else:
                        style = button_styles.button
                    me.button(
                        label=choice,
                        on_click=on_click,
                        color="primary",
                        type="flat",
                        key=choice,
                        disabled=disabled,
                        style=style,
                    )

        self.visit_default(
            message,
            content=prompt,
            style=self._styles.message.text_input,
            inner_callback=inner_callback,
        )
        return ""

    def _visit_many_choices(self, message: MultipleChoice) -> str:
        """Render one checkbox per choice plus an OK button that submits them.

        Checked options are collected in the wrapped conversation message's
        feedback list and submitted as a comma-joined string.

        Returns:
            Always the empty string.
        """

        def on_change(ev: me.CheckboxChangeEvent) -> None:
            # The checkbox key is the option text (see ``key=option`` below).
            selected = self._conversation_message.feedback
            option = ev.key
            if ev.checked:
                if option not in selected:
                    selected.append(option)
            elif option in selected:
                selected.remove(option)

        def on_click(ev: me.ClickEvent) -> Iterator[None]:
            feedback = ",".join(self._conversation_message.feedback)
            self._conversation_message.feedback_completed = True
            yield from self._provide_feedback(feedback)

        def should_be_checked(option: str) -> bool:
            return option in self._conversation_message.feedback

        prompt = message.prompt if message.prompt else "Please select a value:"

        def inner_callback() -> None:
            if not message.choices:
                return
            choice_styles = self._styles.message.multiple_choice_inner
            disabled = self._readonly or self._is_completed()
            with me.box(style=choice_styles.box):
                for option in message.choices:
                    me.checkbox(
                        label=option,
                        key=option,
                        checked=should_be_checked(option),
                        on_change=on_change,
                        disabled=disabled,
                        style=choice_styles.checkbox,
                    )
                # Disable the submit button together with the checkboxes so a
                # read-only or already answered prompt cannot be re-submitted.
                me.button(
                    label="OK",
                    on_click=on_click,
                    color="primary",
                    type="flat",
                    disabled=disabled,
                    style=choice_styles.button,
                )

        self.visit_default(
            message,
            content=prompt,
            style=self._styles.message.text_input,
            inner_callback=inner_callback,
        )
        return ""

    def render_error_message(
        self,
        e: Exception,
        message: IOMessage,
        *,
        content: Optional[str] = None,
        style: Optional[MesopMessageStyles] = None,
    ) -> None:
        """Render a fallback box for a message that could not be rendered.

        The body shows the JSON dump of the message followed by the error, and
        both are logged as warnings.

        Args:
            e: The exception raised while rendering.
            message: The message that failed to render.
            content: Accepted for interface compatibility; the body is always
                the diagnostic text built here.
            style: Styles for the box. Falls back to the error style, then to
                the default message style.
        """
        defaults = self._styles.message.default
        style = style or self._styles.message.error or defaults
        title = "[Error] " + message.type.replace("_", " ").capitalize()

        with me.box(style=style.box or defaults.box):
            self._header(
                message,
                title=title,
                box_style=style.header_box or defaults.header_box,
                md_style=style.header_md or defaults.header_md,
            )

            content = (
                "Failed to render message:"
                + json.dumps(message.model_dump(), indent=2, cls=UUIDEncoder)
                + f"<br>Error: {e}"
            )

            logger.warning(f"render_error_message: {content=}")
            logger.warning(e, exc_info=True)
            self._render_content(content, style.md or defaults.md)

    def process_message(self, message: IOMessage) -> Optional[str]:
        """Render ``message``, falling back to an error box on any failure.

        Returns:
            Whatever ``visit`` returns, or None when rendering raised and the
            error box was drawn instead.
        """
        try:
            return self.visit(message)
        except Exception as e:
            # Boundary handler: one broken message must not break the page.
            logger.warning(f"Failed to render message: {e}")
            self.render_error_message(e, message)
            return None

    def _header(
        self,
        message: IOMessage,
        *,
        title: Optional[str] = None,
        box_style: Optional[me.Style] = None,
        md_style: Optional[me.Style] = None,
    ) -> None:
        """Render the bold header line of a message box.

        The header is the title (or the message type when no title is given),
        followed by the sender/recipient when known and an ``(auto-reply)``
        marker when the message has a truthy ``auto_reply`` attribute.

        Args:
            message: The message (or event) being rendered.
            title: Header text; defaults to ``message.type``.
            box_style: Style of the header box; default header style if unset.
            md_style: Style of the header markdown; default header style if unset.
        """
        if isinstance(message, IOMessage):
            sender = message.sender
            recipient = message.recipient
        else:
            # Other events keep these on ``message.content``; read them
            # defensively, as ``visit_default`` does for the body.
            envelope = getattr(message, "content", None)
            sender = getattr(envelope, "sender", None)
            recipient = getattr(envelope, "recipient", None)

        with me.box(style=box_style or self._styles.message.default.header_box):
            h = title if title else message.type
            if sender and recipient:
                h += f": {sender} (to {recipient})"
            elif sender:
                h += f": from {sender}"
            elif recipient:
                h += f": to {recipient}"
            if getattr(message, "auto_reply", False):
                h += " (auto-reply)"
            h = f"**{h}**"
            me.markdown(h, style=md_style or self._styles.message.default.header_md)