Coverage for fastagency/ui/mesop/mesop.py: 84%
166 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
1import threading 1abc
2import time 1abc
3import traceback 1abc
4from collections.abc import Generator, Iterator 1abc
5from contextlib import contextmanager 1abc
6from dataclasses import dataclass 1abc
7from pathlib import Path 1abc
8from queue import Queue 1abc
9from tempfile import TemporaryDirectory 1abc
10from typing import TYPE_CHECKING, Any, Callable, ClassVar, Optional 1abc
11from uuid import uuid4 1abc
13import mesop as me 1abc
14from mesop.bin.bin import FLAGS as MESOP_FLAGS 1abc
15from mesop.bin.bin import main as mesop_main 1abc
17from ...base import ( 1abc
18 UI,
19 CreateWorkflowUIMixin,
20 ProviderProtocol,
21 Runnable,
22)
23from ...logging import get_logger 1abc
24from ...messages import ( 1abc
25 AskingMessage,
26 IOMessage,
27 KeepAlive,
28 MessageProcessorMixin,
29 MultipleChoice,
30 TextInput,
31 TextMessage,
32 WorkflowCompleted,
33)
34from .auth import AuthProtocol 1abc
35from .styles import MesopHomePageStyles 1abc
36from .timer import configure_static_file_serving 1abc
38if TYPE_CHECKING: 1abc
39 from autogen.events.agent_events import (
40 ExecuteFunctionEvent,
41 InputRequestEvent,
42 RunCompletionEvent,
43 TerminationEvent,
44 TextEvent,
45 UsingAutoReplyEvent,
46 )
48logger = get_logger(__name__) 1abc
51@dataclass 1abc
52class MesopMessage: 1abc
53 """A Mesop message."""
55 io_message: IOMessage 1abc
56 conversation: "MesopUI" 1abc
59class MesopUI(MessageProcessorMixin, CreateWorkflowUIMixin): # UIBase 1abc
60 _import_string: Optional[str] = None 1abc
61 _main_path: Optional[str] = None 1abc
62 _created_instance: Optional["MesopUI"] = None 1abc
63 _app: Optional[Runnable] = None 1abc
64 _me: Optional[Callable[..., Any]] = None 1abc
66 def __init__( 1abc
67 self,
68 super_conversation: "Optional[MesopUI]" = None,
69 *,
70 security_policy: Optional[me.SecurityPolicy] = None,
71 styles: Optional[MesopHomePageStyles] = None,
72 keep_alive: Optional[bool] = False,
73 auth: Optional[AuthProtocol] = None,
74 ) -> None:
75 """Initialize the console UI object.
77 Args:
78 super_conversation (Optional[MesopUI], optional): The super conversation. Defaults to None.
79 security_policy (Optional[me.SecurityPolicy], optional): The security policy. Defaults to None.
80 styles (Optional[MesopHomePageStyles], optional): The styles. Defaults to None.
81 keep_alive (Optional[bool]): If keep alive messages should be inserted, defaults to False`
82 auth (Optional[AuthProtocol]): The auth settings to use. Defaults to None.
83 """
84 logger.info(f"Initializing MesopUI: {self}") 1abcd
85 try: 1abcd
86 self.id: str = uuid4().hex 1abcd
87 self.super_conversation: Optional[MesopUI] = super_conversation 1abcd
88 self.sub_conversations: list[MesopUI] = [] 1abcd
89 self._in_queue: Optional[Queue[str]] = None 1abcd
90 self._out_queue: Optional[Queue[MesopMessage]] = None 1abcd
92 self._keep_me_alive = keep_alive 1abcd
93 self._keep_alive_thread: Optional[threading.Thread] = None 1abcd
94 if super_conversation is None: 94 ↛ 99line 94 didn't jump to line 99 because the condition on line 94 was always true1abcd
95 self._in_queue = Queue() 1abcd
96 self._out_queue = Queue() 1abcd
97 self.keep_me_alive() 1abcd
99 MesopUI.register(self) 1abcd
101 if MesopUI._me is None: 1abcd
102 from .main import create_home_page, me 1abc
104 create_home_page( 1abc
105 self, security_policy=security_policy, styles=styles, auth=auth
106 )
107 MesopUI._me = me 1abc
109 except Exception as e:
110 logger.error(e, exc_info=True)
111 raise
112 logger.info(f"Initialized MesopUI: {self}") 1abcd
114 _registry: ClassVar[dict[str, "MesopUI"]] = {} 1abc
116 def keep_me_alive(self) -> None: 1abc
117 def keep_alive_worker() -> None: 1abcd
118 while self._keep_me_alive: 1d
119 time.sleep(3) 1d
120 if self._out_queue: 120 ↛ 118line 120 didn't jump to line 118 because the condition on line 120 was always true1d
121 # todo: do something more elegant
122 msg = KeepAlive(workflow_uuid="") 1d
123 mesop_msg = self._mesop_message(msg) 1d
124 logger.debug(f"putting keepalive {msg.uuid}") 1d
125 self._out_queue.put(mesop_msg) 1d
127 if self._keep_me_alive and self._keep_alive_thread is None: 1abcd
128 self._keep_alive_thread = threading.Thread(target=keep_alive_worker) 1d
129 self._keep_alive_thread.start() 1d
131 def do_not_keep_me_alive(self) -> None: 1abc
132 self._keep_me_alive = False 1d
134 @classmethod 1abc
135 def get_created_instance(cls) -> "MesopUI": 1abc
136 created_instance = cls._created_instance 1abc
137 if created_instance is None: 1abc
138 raise RuntimeError("MesopUI has not been created yet.") 1abc
140 return created_instance 1abc
142 @property 1abc
143 def app(self) -> Runnable: 1abc
144 app = MesopUI._app 1abcd
145 if app is None: 1abcd
146 logger.error("MesopUI has not been created yet.")
147 raise RuntimeError("MesopUI has not been created yet.")
149 return app 1abcd
151 @contextmanager 1abc
152 def create(self, app: Runnable, import_string: str) -> Iterator[None]: 1abc
153 logger.info(f"Creating MesopUI with import string: {import_string}") 1abc
154 MesopUI._app = app 1abc
155 MesopUI._import_string = import_string 1abc
157 start_script = """import fastagency.ui.mesop.main""" 1abc
159 with TemporaryDirectory() as temp_dir: 1abc
160 main_path = Path(temp_dir) / "main.py" 1abc
161 with main_path.open("w") as f: 1abc
162 f.write(start_script) 1abc
164 MESOP_FLAGS.mark_as_parsed() 1abc
165 MesopUI._main_path = str(main_path) 1abc
166 MesopUI._created_instance = self 1abc
168 yield 1abc
170 def start( 1abc
171 self,
172 *,
173 app: Runnable,
174 import_string: str,
175 name: Optional[str] = None,
176 params: dict[str, Any],
177 single_run: bool = False,
178 ) -> None:
179 logger.info(
180 f"Starting MesopUI: import_string={self._import_string}, main_path={self._main_path}"
181 )
182 if single_run:
183 logger.warning("single_run parameter is currently not supported in MesopUI")
185 MesopUI._app = app
187 mesop_main(["mesop", self._main_path])
189 @classmethod 1abc
190 def register(cls, conversation: "MesopUI") -> None: 1abc
191 cls._registry[conversation.id] = conversation 1abcd
193 @classmethod 1abc
194 def get_conversation(cls, id: str) -> "MesopUI": 1abc
195 return cls._registry[id] 1d
197 @classmethod 1abc
198 def unregister(cls, conversation: "MesopUI") -> None: 1abc
199 del cls._registry[conversation.id]
201 @property 1abc
202 def is_root_conversation(self) -> bool: 1abc
203 return self.super_conversation is not None
205 @property 1abc
206 def root_conversation(self) -> "MesopUI": 1abc
207 if self.super_conversation is None: 207 ↛ 210line 207 didn't jump to line 210 because the condition on line 207 was always true1d
208 return self 1d
209 else:
210 return self.super_conversation.root_conversation
212 @property 1abc
213 def in_queue(self) -> Queue[str]: 1abc
214 queue = self.root_conversation._in_queue 1d
215 return queue # type: ignore[return-value] 1d
217 @property 1abc
218 def out_queue(self) -> Queue[MesopMessage]: 1abc
219 queue = self.root_conversation._out_queue 1d
220 return queue # type: ignore[return-value] 1d
222 @property 1abc
223 def level(self) -> int: 1abc
224 return ( 1d
225 0 if self.super_conversation is None else self.super_conversation.level + 1
226 )
228 def _publish(self, mesop_msg: MesopMessage) -> None: 1abc
229 self.out_queue.put(mesop_msg) 1d
231 def _mesop_message(self, io_message: IOMessage) -> MesopMessage: 1abc
232 return MesopMessage( 1abcd
233 conversation=self,
234 io_message=io_message,
235 )
237 def visit_default(self, message: IOMessage) -> None: 1abc
238 mesop_msg = self._mesop_message(message) 1d
239 self._publish(mesop_msg) 1d
241 def visit_text(self, message: "TextEvent") -> None: 1abc
242 mesop_msg = self._mesop_message(message)
243 self._publish(mesop_msg)
245 def visit_using_auto_reply(self, message: "UsingAutoReplyEvent") -> None: 1abc
246 pass
248 def visit_run_completion(self, message: "RunCompletionEvent") -> None: 1abc
249 # We can ignore the RunCompletionEvent as we handle RunResponse already
250 pass
252 def visit_execute_function(self, message: "ExecuteFunctionEvent") -> None: 1abc
253 mesop_msg = self._mesop_message(message)
254 self._publish(mesop_msg)
256 def visit_termination(self, message: "TerminationEvent") -> None: 1abc
257 pass
259 def visit_text_message(self, message: TextMessage) -> None: 1abc
260 mesop_msg = self._mesop_message(message) 1d
261 self._publish(mesop_msg) 1d
263 def visit_text_input(self, message: TextInput) -> str: 1abc
264 mesop_msg = self._mesop_message(message) 1d
265 self._publish(mesop_msg) 1d
266 return self.in_queue.get() 1d
268 def visit_input_request(self, message: "InputRequestEvent") -> str: 1abc
269 mesop_msg = self._mesop_message(message)
270 self._publish(mesop_msg)
271 return self.in_queue.get()
273 def visit_multiple_choice(self, message: MultipleChoice) -> str: 1abc
274 mesop_msg = self._mesop_message(message) 1d
275 self._publish(mesop_msg) 1d
276 return self.in_queue.get() 1d
278 def process_message(self, message: IOMessage) -> Optional[str]: 1abc
279 return self.visit(message) 1d
281 def create_subconversation(self) -> "MesopUI": 1abc
282 sub_conversation = MesopUI(self)
283 self.sub_conversations.append(sub_conversation)
285 return sub_conversation
287 def _is_stream_braker(self, message: IOMessage) -> bool: 1abc
288 return isinstance(message, (AskingMessage, WorkflowCompleted, KeepAlive)) 1d
290 def respond(self, message: str) -> None: 1abc
291 self.in_queue.put(message) 1d
293 @classmethod 1abc
294 def respond_to( 1abc
295 cls, conversation_id: str, message: str
296 ) -> Generator[MesopMessage, None, None]:
297 conversation = cls.get_conversation(conversation_id)
298 conversation.respond(message)
299 return conversation.get_message_stream()
301 def get_message_stream(self) -> Generator[MesopMessage, None, None]: 1abc
302 while True:
303 message = self.out_queue.get() 1d
304 if self._is_stream_braker(message.io_message): 1d
305 yield message 1d
306 break 1d
307 yield message 1d
309 def handle_wsgi( 1abc
310 self,
311 app: "Runnable", 1abc
312 environ: dict[str, Any], 1abc
313 start_response: Callable[..., Any], 1abc
314 ) -> list[bytes]: 1abc
315 logger.debug(f"Starting MesopUI using WSGI interface with app: {app}") 1d
316 MesopUI._created_instance = self 1d
317 MesopUI._app = app 1d
319 if configure_static_file_serving is None: # pragme: no cover 1d
320 logger.error("configure_static_file_serving is None")
322 if MesopUI._me is None: 1d
323 logger.error("MesopUI._me is None")
324 raise RuntimeError("MesopUI._me is None")
326 return MesopUI._me(environ, start_response) # type: ignore[no-any-return] 1d
329def run_workflow_mesop(provider: ProviderProtocol, name: str) -> UI: 1abc
330 def workflow_worker( 1d
331 provider: ProviderProtocol, name: str, mesop_ui: MesopUI, workflow_uuid: str
332 ) -> None:
333 ui = mesop_ui.create_workflow_ui(workflow_uuid) 1d
334 try: 1d
335 provider.run( 1d
336 name=name,
337 ui=ui,
338 )
339 except Exception as e:
340 logger.error(
341 f"Unexpected exception raised in Mesop workflow worker: {e}",
342 exc_info=True,
343 )
344 ui.error(
345 sender="Mesop workflow_worker",
346 short=f"Unexpected exception raised: {e}",
347 long=traceback.format_exc(),
348 )
349 return
350 finally:
351 mesop_ui.do_not_keep_me_alive() 1d
353 ui_base = MesopUI(keep_alive=True) 1d
354 workflow_uuid = ui_base.id 1d
356 # subconversation = ui_base.create_subconversation()
357 thread = threading.Thread( 1d
358 target=workflow_worker, args=(provider, name, ui_base, workflow_uuid)
359 )
360 thread.start() 1d
362 return ui_base.create_workflow_ui(workflow_uuid) 1d
364 # # needed for uvicorn to recognize the class as a valid ASGI application
365 # async def __call__(
366 # self,
367 # scope: dict[str, Any],
368 # receive: Callable[[], Awaitable[dict]],
369 # send: Callable[[dict], Awaitable[None]],
370 # ) -> None:
371 # MesopUI._created_instance = self
372 # from .main import me
374 # return await me(scope, receive, send)