Coverage for fastagency/runtimes/ag2/ag2.py: 80%

55 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-19 12:16 +0000

1import asyncio 1afgheibcd

2import re 1afgheibcd

3from collections.abc import Iterable, Mapping 1afgheibcd

4from typing import ( 1afgheibcd

5 TYPE_CHECKING, 

6 Any, 

7 Callable, 

8 Optional, 

9 Protocol, 

10 Union, 

11 runtime_checkable, 

12) 

13 

14from autogen.agentchat import ConversableAgent 1afgheibcd

15from autogen.events.base_event import get_event_classes 1afgheibcd

16 

17from ...base import ( 1afgheibcd

18 UI, 

19 WorkflowTypeVar, 

20 WorkflowsProtocol, 

21 check_register_decorator, 

22) 

23from ...logging import get_logger 1afgheibcd

24 

25if TYPE_CHECKING: 1afgheibcd

26 from autogen.events.base_event import BaseEvent 

27 

28 from fastagency.api.openapi import OpenAPI 

29 

30__all__ = [ 1afgheibcd

31 "Workflow", 

32 "create_ag2_event", 

33] 

34 

35# Populate ag2 event classes for each event type 

36EVENT_CLASSES = get_event_classes() 1afgheibcd

37 

38# Get the logger 

39logger = get_logger(__name__) 1afgheibcd

40 

41logger.info("Importing autogen.base.py") 1afgheibcd

42 

43_patterns = { 1afgheibcd

44 "end_of_message": ( 

45 "^\\n--------------------------------------------------------------------------------\\n$", 

46 ), 

47 "auto_reply": ( 

48 "^\\x1b\\[31m\\n>>>>>>>> USING AUTO REPLY...\\x1b\\[0m\\n$", 

49 "^\\n>>>>>>>> USING AUTO REPLY...\\n$", 

50 ), 

51 "sender_recipient": ( 

52 "^\\x1b\\[33m([a-zA-Z0-9_-]+)\\x1b\\[0m \\(to ([a-zA-Z0-9_-]+)\\):\\n\\n$", 

53 "^([a-zA-Z0-9_-]+) \\(to ([a-zA-Z0-9_-]+)\\):\\n\\n$", 

54 ), 

55 "suggested_function_call": ( 

56 "^\\x1b\\[32m\\*\\*\\*\\*\\* Suggested tool call \\((call_[a-zA-Z0-9]+)\\): ([a-zA-Z0-9_]+) \\*\\*\\*\\*\\*\\x1b\\[0m\\n$", 

57 "^\\*\\*\\*\\*\\* Suggested tool call \\((call_[a-zA-Z0-9]+)\\): ([a-zA-Z0-9_]+) \\*\\*\\*\\*\\*\\n$", 

58 ), 

59 "stars": ("^\\x1b\\[32m(\\*{69}\\*+)\\x1b\\[0m\n$", "^(\\*{69}\\*+)\\n$"), 

60 "function_call_execution": ( 

61 "^\\x1b\\[35m\\n>>>>>>>> EXECUTING FUNCTION ([a-zA-Z_]+)...\\x1b\\[0m\\n$", 

62 "^\\n>>>>>>>> EXECUTING FUNCTION ([a-zA-Z_]+)...\\n$", 

63 ), 

64 "response_from_calling_tool": ( 

65 "^\\x1b\\[32m\\*\\*\\*\\*\\* Response from calling tool \\((call_[a-zA-Z0-9_]+)\\) \\*\\*\\*\\*\\*\\x1b\\[0m\\n$", 

66 "^\\*\\*\\*\\*\\* Response from calling tool \\((call_[a-zA-Z0-9_]+)\\) \\*\\*\\*\\*\\*\\n$", 

67 ), 

68 "no_human_input_received": ( 

69 "^\\x1b\\[31m\\n>>>>>>>> NO HUMAN INPUT RECEIVED\\.\\x1b\\[0m$", 

70 "^\\n>>>>>>>> NO HUMAN INPUT RECEIVED\\.$", 

71 ), 

72 "user_interrupted": ("^USER INTERRUPTED\\n$",), 

73 "arguments": ("^Arguments: \\n(.*)\\n$",), 

74 "auto_reply_input": ( 

75 "^Replying as ([a-zA-Z0-9_]+). Provide feedback to ([a-zA-Z0-9_]+). Press enter to skip and use auto-reply, or type 'exit' to end the conversation: $", 

76 ), 

77 "next_speaker": ( 

78 "^Next speaker: [a-zA-Z0-9_]+$", 

79 "^\\u001b\\[32m\nNext speaker: [a-zA-Z0-9_]+\n\\u001b\\[0m$", 

80 ), 

81} 

82 

83 

84def _match(key: str, string: str, /) -> bool: 1afgheibcd

85 return any(re.match(pattern, string) for pattern in _patterns[key]) 1abcd

86 

87 

88def _findall(key: str, string: str, /) -> tuple[str, ...]: 1afgheibcd

89 for pattern in _patterns[key]: 1abcd

90 if re.match(pattern, string): 1abcd

91 return re.findall(pattern, string)[0] # type: ignore[no-any-return] 1abcd

92 return () # type: ignore[no-any-return] 

93 

94 

95def create_ag2_event(type: Optional[str] = None, **kwargs: Any) -> "BaseEvent": 1afgheibcd

96 type = type or "text" 

97 if type not in EVENT_CLASSES: 

98 raise ValueError(f"Unknown event type: {type}") 

99 

100 # Get the ag2 event class 

101 cls = EVENT_CLASSES[type] 

102 

103 content = kwargs.pop("content", {}) 

104 kwargs.update(content) 

105 

106 return cls(**kwargs) 

107 

108 

109class Workflow(WorkflowsProtocol): 1afgheibcd

110 def __init__(self) -> None: 1afgheibcd

111 """Initialize the workflows.""" 

112 self._workflows: dict[str, tuple[Callable[[UI, dict[str, Any]], str], str]] = {} 1afgheibcd

113 

114 def register( 1afgheibcd

115 self, name: str, description: str, *, fail_on_redefintion: bool = False 

116 ) -> Callable[[WorkflowTypeVar], WorkflowTypeVar]: 

117 def decorator(func: WorkflowTypeVar) -> WorkflowTypeVar: 1afgheibcd

118 check_register_decorator(func) 1afgheibcd

119 if name in self._workflows: 119 ↛ 120line 119 didn't jump to line 120 because the condition on line 119 was never true1afgheibcd

120 if fail_on_redefintion: 

121 raise ValueError(f"A workflow with name '{name}' already exists.") 

122 else: 

123 logger.warning(f"Overwriting workflow with name '{name}'") 

124 

125 self._workflows[name] = func, description 1afgheibcd

126 return func 1afgheibcd

127 

128 return decorator 1afgheibcd

129 

130 def run( 1afgheibcd

131 self, 

132 name: str, 

133 ui: UI, 

134 user_id: Optional[str] = None, 

135 **kwargs: Any, 

136 ) -> str: 

137 workflow, _ = self._workflows[name] 1aebcdj

138 

139 # todo: inject user_id into call (and other stuff) 

140 try: 1aebcdj

141 ui.workflow_started( 1aebcdj

142 sender="Workflow", 

143 recipient="User", 

144 name=name, 

145 description=self.get_description(name), 

146 params=kwargs, 

147 ) 

148 retval = ( 1aebcdj

149 asyncio.run(workflow(ui, kwargs)) 

150 if asyncio.iscoroutinefunction(workflow) 

151 else workflow(ui, kwargs) 

152 ) 

153 

154 except Exception as e: 1aebcd

155 logger.error( 1aebcd

156 f"Unhandled exception occurred when executing the workflow: {e}", 

157 exc_info=True, 

158 ) 

159 ui.error( 1aebcd

160 sender="Workflow", 

161 recipient="User", 

162 short="Unhandled exception occurred when executing the workflow.", 

163 long=str(e), 

164 ) 

165 retval = f"Unhandled exception occurred when executing the workflow: {e}" 1aebcd

166 

167 ui.workflow_completed( 1aebcdj

168 sender="Workflow", 

169 recipient="User", 

170 result=retval, 

171 ) 

172 logger.info(f"Workflow '{name}' completed with result: {retval}") 1aebcdj

173 

174 return retval 1aebcdj

175 

176 @property 1afgheibcd

177 def names(self) -> list[str]: 1afgheibcd

178 return list(self._workflows.keys()) 1aebcdj

179 

180 def get_description(self, name: str) -> str: 1afgheibcd

181 _, description = self._workflows.get(name, (None, "Description not available!")) 1aebcdj

182 return description 1aebcdj

183 

184 def register_api( 1afgheibcd

185 self, 

186 api: "OpenAPI", 

187 callers: Union[ConversableAgent, Iterable[ConversableAgent]], 

188 executors: Union[ConversableAgent, Iterable[ConversableAgent]], 

189 functions: Optional[ 

190 Union[str, Iterable[Union[str, Mapping[str, Mapping[str, str]]]]] 

191 ] = None, 

192 ) -> None: 

193 if not isinstance(callers, Iterable): 193 ↛ 195line 193 didn't jump to line 195 because the condition on line 193 was always true1aebcd

194 callers = [callers] 1aebcd

195 if not isinstance(executors, Iterable): 195 ↛ 197line 195 didn't jump to line 197 because the condition on line 195 was always true1aebcd

196 executors = [executors] 1aebcd

197 if isinstance(functions, str): 1aebcd

198 functions = [functions] 1abcd

199 

200 for caller in callers: 1aebcd

201 api._register_for_llm(caller, functions=functions) 1aebcd

202 

203 for executor in executors: 1aebcd

204 api._register_for_execution(executor, functions=functions) 1aebcd

205 

206 

207@runtime_checkable 1afgheibcd

208class Toolable(Protocol): 1afgheibcd

209 def register( 1afgheibcd

210 self, 

211 *, 

212 caller: ConversableAgent, 1afgheibcd

213 executor: Union[ConversableAgent, list[ConversableAgent]], 1afgheibcd

214 ) -> None: ... 1afgheibcd