Coverage for fastagency/ui/mesop/mesop.py: 84%

166 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-19 12:16 +0000

1import threading 1abc

2import time 1abc

3import traceback 1abc

4from collections.abc import Generator, Iterator 1abc

5from contextlib import contextmanager 1abc

6from dataclasses import dataclass 1abc

7from pathlib import Path 1abc

8from queue import Queue 1abc

9from tempfile import TemporaryDirectory 1abc

10from typing import TYPE_CHECKING, Any, Callable, ClassVar, Optional 1abc

11from uuid import uuid4 1abc

12 

13import mesop as me 1abc

14from mesop.bin.bin import FLAGS as MESOP_FLAGS 1abc

15from mesop.bin.bin import main as mesop_main 1abc

16 

17from ...base import ( 1abc

18 UI, 

19 CreateWorkflowUIMixin, 

20 ProviderProtocol, 

21 Runnable, 

22) 

23from ...logging import get_logger 1abc

24from ...messages import ( 1abc

25 AskingMessage, 

26 IOMessage, 

27 KeepAlive, 

28 MessageProcessorMixin, 

29 MultipleChoice, 

30 TextInput, 

31 TextMessage, 

32 WorkflowCompleted, 

33) 

34from .auth import AuthProtocol 1abc

35from .styles import MesopHomePageStyles 1abc

36from .timer import configure_static_file_serving 1abc

37 

38if TYPE_CHECKING: 1abc

39 from autogen.events.agent_events import ( 

40 ExecuteFunctionEvent, 

41 InputRequestEvent, 

42 RunCompletionEvent, 

43 TerminationEvent, 

44 TextEvent, 

45 UsingAutoReplyEvent, 

46 ) 

47 

48logger = get_logger(__name__) 1abc

49 

50 

@dataclass
class MesopMessage:
    """An IO message bundled with the MesopUI conversation that produced it."""

    # The wrapped message that is passed through the UI queues.
    io_message: IOMessage
    # The conversation (MesopUI instance) this message was created by.
    conversation: "MesopUI"

57 

58 

class MesopUI(MessageProcessorMixin, CreateWorkflowUIMixin):  # UIBase
    """UI implementation that exchanges messages with a Mesop front end.

    Conversations form a tree: a root conversation (no ``super_conversation``)
    owns the input and output queues, and sub-conversations reach those queues
    through ``root_conversation``. Every instance is recorded in a class-level
    registry keyed by its ``id``.

    The class attributes below are shared by all instances and are assigned
    through ``MesopUI.<name>`` in ``__init__``, ``create``, ``start`` and
    ``handle_wsgi``.
    """

    _import_string: Optional[str] = None
    _main_path: Optional[str] = None
    _created_instance: Optional["MesopUI"] = None
    _app: Optional[Runnable] = None
    _me: Optional[Callable[..., Any]] = None

    # Maps conversation id -> conversation for every constructed instance.
    _registry: ClassVar[dict[str, "MesopUI"]] = {}

    def __init__(
        self,
        super_conversation: "Optional[MesopUI]" = None,
        *,
        security_policy: Optional[me.SecurityPolicy] = None,
        styles: Optional[MesopHomePageStyles] = None,
        keep_alive: Optional[bool] = False,
        auth: Optional[AuthProtocol] = None,
    ) -> None:
        """Initialize the Mesop UI object.

        Args:
            super_conversation (Optional[MesopUI], optional): The super conversation. Defaults to None.
            security_policy (Optional[me.SecurityPolicy], optional): The security policy. Defaults to None.
            styles (Optional[MesopHomePageStyles], optional): The styles. Defaults to None.
            keep_alive (Optional[bool]): If keep alive messages should be inserted. Defaults to False.
            auth (Optional[AuthProtocol]): The auth settings to use. Defaults to None.
        """
        logger.info(f"Initializing MesopUI: {self}")
        try:
            self.id: str = uuid4().hex
            self.super_conversation: Optional[MesopUI] = super_conversation
            self.sub_conversations: list[MesopUI] = []
            self._in_queue: Optional[Queue[str]] = None
            self._out_queue: Optional[Queue[MesopMessage]] = None

            self._keep_me_alive = keep_alive
            self._keep_alive_thread: Optional[threading.Thread] = None
            if super_conversation is None:
                # Only the root conversation owns queues (and the optional
                # keep-alive thread); sub-conversations use the root's queues.
                self._in_queue = Queue()
                self._out_queue = Queue()
                self.keep_me_alive()

            MesopUI.register(self)

            if MesopUI._me is None:
                # Imported lazily and only once per process: the first
                # instance creates the home page and stores the ``me`` object.
                from .main import create_home_page, me

                create_home_page(
                    self, security_policy=security_policy, styles=styles, auth=auth
                )
                MesopUI._me = me

        except Exception as e:
            logger.error(e, exc_info=True)
            raise
        logger.info(f"Initialized MesopUI: {self}")

    def keep_me_alive(self) -> None:
        """Start the keep-alive thread if it is enabled and not yet running.

        While ``_keep_me_alive`` is truthy the thread sleeps three seconds and
        then puts a ``KeepAlive`` message on this instance's output queue.
        """

        def keep_alive_worker() -> None:
            while self._keep_me_alive:
                time.sleep(3)
                if self._out_queue is not None:
                    # todo: do something more elegant
                    msg = KeepAlive(workflow_uuid="")
                    mesop_msg = self._mesop_message(msg)
                    logger.debug(f"putting keepalive {msg.uuid}")
                    self._out_queue.put(mesop_msg)

        if self._keep_me_alive and self._keep_alive_thread is None:
            self._keep_alive_thread = threading.Thread(target=keep_alive_worker)
            self._keep_alive_thread.start()

    def do_not_keep_me_alive(self) -> None:
        """Clear the keep-alive flag so the worker loop exits on its next check."""
        self._keep_me_alive = False

    @classmethod
    def get_created_instance(cls) -> "MesopUI":
        """Return the instance recorded by ``create`` or ``handle_wsgi``.

        Raises:
            RuntimeError: If no instance has been recorded yet.
        """
        created_instance = cls._created_instance
        if created_instance is None:
            raise RuntimeError("MesopUI has not been created yet.")

        return created_instance

    @property
    def app(self) -> Runnable:
        """The application stored on the class.

        Raises:
            RuntimeError: If no application has been stored yet.
        """
        app = MesopUI._app
        if app is None:
            logger.error("MesopUI has not been created yet.")
            raise RuntimeError("MesopUI has not been created yet.")

        return app

    @contextmanager
    def create(self, app: Runnable, import_string: str) -> Iterator[None]:
        """Prepare class-level state and a temporary start script for Mesop.

        A ``main.py`` that imports ``fastagency.ui.mesop.main`` is written to a
        temporary directory and its path is stored in ``MesopUI._main_path``.
        The directory is managed by ``TemporaryDirectory``, so the script only
        exists while this context is active.

        Args:
            app (Runnable): The application to store on the class.
            import_string (str): The import string to store on the class.
        """
        logger.info(f"Creating MesopUI with import string: {import_string}")
        MesopUI._app = app
        MesopUI._import_string = import_string

        start_script = """import fastagency.ui.mesop.main"""

        with TemporaryDirectory() as temp_dir:
            main_path = Path(temp_dir) / "main.py"
            with main_path.open("w") as f:
                f.write(start_script)

            MESOP_FLAGS.mark_as_parsed()
            MesopUI._main_path = str(main_path)
            MesopUI._created_instance = self

            yield

    def start(
        self,
        *,
        app: Runnable,
        import_string: str,
        name: Optional[str] = None,
        params: dict[str, Any],
        single_run: bool = False,
    ) -> None:
        """Run Mesop's command-line entry point on the prepared start script.

        Args:
            app (Runnable): The application to store on the class.
            import_string (str): Accepted for interface compatibility; not used here.
            name (Optional[str]): Accepted for interface compatibility; not used here.
            params (dict[str, Any]): Accepted for interface compatibility; not used here.
            single_run (bool): Not supported; a warning is logged when True.

        Raises:
            RuntimeError: If no start script path has been set (``create`` sets it).
        """
        logger.info(
            f"Starting MesopUI: import_string={self._import_string}, main_path={self._main_path}"
        )
        if single_run:
            logger.warning("single_run parameter is currently not supported in MesopUI")

        MesopUI._app = app

        main_path = self._main_path
        if main_path is None:
            # Fail with a clear message instead of handing ``None`` to Mesop.
            logger.error("MesopUI has not been created yet.")
            raise RuntimeError("MesopUI has not been created yet.")

        mesop_main(["mesop", main_path])

    @classmethod
    def register(cls, conversation: "MesopUI") -> None:
        """Record ``conversation`` in the registry under its ``id``."""
        cls._registry[conversation.id] = conversation

    @classmethod
    def get_conversation(cls, id: str) -> "MesopUI":
        """Return the registered conversation with the given id.

        Raises:
            KeyError: If no conversation is registered under ``id``.
        """
        return cls._registry[id]

    @classmethod
    def unregister(cls, conversation: "MesopUI") -> None:
        """Remove ``conversation`` from the registry.

        Raises:
            KeyError: If the conversation is not registered.
        """
        del cls._registry[conversation.id]

    @property
    def is_root_conversation(self) -> bool:
        """True when this conversation has no super conversation."""
        # The root is the conversation without a parent, matching the checks
        # in ``root_conversation`` and ``level``.
        return self.super_conversation is None

    @property
    def root_conversation(self) -> "MesopUI":
        """The top-most conversation reached by following ``super_conversation``."""
        if self.super_conversation is None:
            return self
        else:
            return self.super_conversation.root_conversation

    @property
    def in_queue(self) -> Queue[str]:
        """The root conversation's input queue."""
        queue = self.root_conversation._in_queue
        return queue  # type: ignore[return-value]

    @property
    def out_queue(self) -> Queue[MesopMessage]:
        """The root conversation's output queue."""
        queue = self.root_conversation._out_queue
        return queue  # type: ignore[return-value]

    @property
    def level(self) -> int:
        """Nesting depth of this conversation; 0 for the root."""
        return (
            0 if self.super_conversation is None else self.super_conversation.level + 1
        )

    def _publish(self, mesop_msg: MesopMessage) -> None:
        """Put ``mesop_msg`` on the output queue."""
        self.out_queue.put(mesop_msg)

    def _mesop_message(self, io_message: IOMessage) -> MesopMessage:
        """Wrap ``io_message`` together with this conversation."""
        return MesopMessage(
            conversation=self,
            io_message=io_message,
        )

    def visit_default(self, message: IOMessage) -> None:
        """Publish the message to the output queue."""
        mesop_msg = self._mesop_message(message)
        self._publish(mesop_msg)

    def visit_text(self, message: "TextEvent") -> None:
        """Publish the text event to the output queue."""
        mesop_msg = self._mesop_message(message)
        self._publish(mesop_msg)

    def visit_using_auto_reply(self, message: "UsingAutoReplyEvent") -> None:
        """Ignore the event; nothing is published."""
        pass

    def visit_run_completion(self, message: "RunCompletionEvent") -> None:
        """Ignore the event; nothing is published."""
        # We can ignore the RunCompletionEvent as we handle RunResponse already
        pass

    def visit_execute_function(self, message: "ExecuteFunctionEvent") -> None:
        """Publish the function-execution event to the output queue."""
        mesop_msg = self._mesop_message(message)
        self._publish(mesop_msg)

    def visit_termination(self, message: "TerminationEvent") -> None:
        """Ignore the event; nothing is published."""
        pass

    def visit_text_message(self, message: TextMessage) -> None:
        """Publish the text message to the output queue."""
        mesop_msg = self._mesop_message(message)
        self._publish(mesop_msg)

    def visit_text_input(self, message: TextInput) -> str:
        """Publish the prompt, then block until a reply is put on the input queue."""
        mesop_msg = self._mesop_message(message)
        self._publish(mesop_msg)
        return self.in_queue.get()

    def visit_input_request(self, message: "InputRequestEvent") -> str:
        """Publish the request, then block until a reply is put on the input queue."""
        mesop_msg = self._mesop_message(message)
        self._publish(mesop_msg)
        return self.in_queue.get()

    def visit_multiple_choice(self, message: MultipleChoice) -> str:
        """Publish the choice prompt, then block until a reply is put on the input queue."""
        mesop_msg = self._mesop_message(message)
        self._publish(mesop_msg)
        return self.in_queue.get()

    def process_message(self, message: IOMessage) -> Optional[str]:
        """Dispatch ``message`` through ``self.visit`` and return its result."""
        return self.visit(message)

    def create_subconversation(self) -> "MesopUI":
        """Create a child conversation of this one and record it in ``sub_conversations``."""
        sub_conversation = MesopUI(self)
        self.sub_conversations.append(sub_conversation)

        return sub_conversation

    def _is_stream_braker(self, message: IOMessage) -> bool:
        """Return True for message types that end a message stream."""
        return isinstance(message, (AskingMessage, WorkflowCompleted, KeepAlive))

    def respond(self, message: str) -> None:
        """Put ``message`` on the input queue."""
        self.in_queue.put(message)

    @classmethod
    def respond_to(
        cls, conversation_id: str, message: str
    ) -> Generator[MesopMessage, None, None]:
        """Send ``message`` to a registered conversation and return its message stream.

        Raises:
            KeyError: If no conversation is registered under ``conversation_id``.
        """
        conversation = cls.get_conversation(conversation_id)
        conversation.respond(message)
        return conversation.get_message_stream()

    def get_message_stream(self) -> Generator[MesopMessage, None, None]:
        """Yield messages from the output queue up to and including a stream breaker.

        Each ``get`` blocks until a message is available. The stream-breaking
        message itself is yielded before the generator finishes.
        """
        while True:
            message = self.out_queue.get()
            yield message
            if self._is_stream_braker(message.io_message):
                break

    def handle_wsgi(
        self,
        app: "Runnable",
        environ: dict[str, Any],
        start_response: Callable[..., Any],
    ) -> list[bytes]:
        """Serve one WSGI request by delegating to the stored ``me`` callable.

        Args:
            app (Runnable): The application to store on the class.
            environ (dict[str, Any]): The WSGI environment.
            start_response (Callable[..., Any]): The WSGI ``start_response`` callable.

        Returns:
            list[bytes]: Whatever ``MesopUI._me(environ, start_response)`` returns.

        Raises:
            RuntimeError: If ``MesopUI._me`` has not been set.
        """
        logger.debug(f"Starting MesopUI using WSGI interface with app: {app}")
        MesopUI._created_instance = self
        MesopUI._app = app

        if configure_static_file_serving is None:  # pragma: no cover
            logger.error("configure_static_file_serving is None")

        if MesopUI._me is None:
            logger.error("MesopUI._me is None")
            raise RuntimeError("MesopUI._me is None")

        return MesopUI._me(environ, start_response)  # type: ignore[no-any-return]

327 

328 

def run_workflow_mesop(provider: ProviderProtocol, name: str) -> UI:
    """Start the named workflow on a background thread with a new MesopUI.

    A root ``MesopUI`` is created with keep-alive enabled and its ``id`` is
    used as the workflow UUID. The worker thread runs ``provider.run`` and
    switches keep-alive off when the run ends, whether or not it failed.

    Args:
        provider (ProviderProtocol): The provider whose ``run`` method is called.
        name (str): The workflow name passed to ``provider.run``.

    Returns:
        UI: A workflow UI created from the new ``MesopUI`` for the same UUID.
    """

    def workflow_worker(
        provider: ProviderProtocol, name: str, mesop_ui: MesopUI, workflow_uuid: str
    ) -> None:
        workflow_ui = mesop_ui.create_workflow_ui(workflow_uuid)
        try:
            provider.run(name=name, ui=workflow_ui)
        except Exception as exc:
            # Log the failure and report it through the workflow UI as well.
            logger.error(
                f"Unexpected exception raised in Mesop workflow worker: {exc}",
                exc_info=True,
            )
            workflow_ui.error(
                sender="Mesop workflow_worker",
                short=f"Unexpected exception raised: {exc}",
                long=traceback.format_exc(),
            )
        finally:
            mesop_ui.do_not_keep_me_alive()

    root_ui = MesopUI(keep_alive=True)
    run_uuid = root_ui.id

    worker_thread = threading.Thread(
        target=workflow_worker,
        args=(provider, name, root_ui, run_uuid),
    )
    worker_thread.start()

    return root_ui.create_workflow_ui(run_uuid)

363 

364 # # needed for uvicorn to recognize the class as a valid ASGI application 

365 # async def __call__( 

366 # self, 

367 # scope: dict[str, Any], 

368 # receive: Callable[[], Awaitable[dict]], 

369 # send: Callable[[dict], Awaitable[None]], 

370 # ) -> None: 

371 # MesopUI._created_instance = self 

372 # from .main import me 

373 

374 # return await me(scope, receive, send)