Coverage for src / agent / events.py: 96%

47 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 14:30 +0000

1# Copyright 2025-2026 Microsoft Corporation 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Event systems for agent execution transparency. 

16 

17This module provides two event implementations: 

18 

191. **EventEmitter** (PRIMARY - from display.events): 

20 - Task-safe using asyncio.Queue 

21 - Used for runtime execution visualization 

22 - Preferred for new code 

23 

242. **EventBus** (LEGACY - maintained for backward compatibility): 

25 - Simple observer pattern 

26 - Used in tests and documentation 

27 - Consider using EventEmitter for new implementations 

28 

29Recommended Usage: 

30 >>> from agent.events import get_display_event_emitter # Primary API 

31 >>> emitter = get_display_event_emitter() 

32 >>> emitter.emit(LLMRequestEvent()) 

33 

34Legacy Usage (maintained for tests): 

35 >>> from agent.events import Event, EventType, get_event_bus 

36 >>> bus = get_event_bus() 

37 >>> bus.subscribe(my_listener) 

38""" 

39 

40from dataclasses import dataclass, field 

41from enum import Enum 

42from typing import Any, ClassVar, Protocol 

43from uuid import uuid4 

44 

45 

class EventType(Enum):
    """Kinds of events that can travel over the event bus.

    Each member marks one observable point in the agent execution
    lifecycle. The member value is the event's lowercase string name.
    """

    # LLM round trip
    LLM_REQUEST = "llm_request"
    LLM_RESPONSE = "llm_response"

    # Tool invocation
    TOOL_START = "tool_start"
    TOOL_COMPLETE = "tool_complete"
    TOOL_ERROR = "tool_error"

    # Agent run
    AGENT_START = "agent_start"
    AGENT_COMPLETE = "agent_complete"

60 

61 

def _new_event_id() -> str:
    """Return a freshly generated UUID4 rendered as a string."""
    return str(uuid4())


@dataclass
class Event:
    """Record describing something that happened in the system.

    An event is handed to every listener, so the code that produces it
    does not need to know who consumes it.

    Attributes:
        type: The kind of event (an EventType member)
        data: Dictionary with the event-specific payload
        event_id: Unique identifier used to correlate events; generated
            automatically when not supplied
        parent_id: ID of the parent event for hierarchical display, or
            None when there is no parent
    """

    type: EventType
    data: dict[str, Any]
    # Every instance gets its own identifier unless the caller passes one.
    event_id: str = field(default_factory=_new_event_id)
    parent_id: str | None = None

81 

82 

class EventListener(Protocol):
    """Structural interface for objects that consume events.

    Any object exposing a compatible ``handle_event`` method can be
    registered on the bus; no inheritance is required. A registered
    listener is handed every event emitted on the bus.
    """

    def handle_event(self, event: Event) -> None:
        """React to a single event.

        Args:
            event: The event being delivered
        """
        ...

97 

98 

class EventBus:
    """Singleton event bus for loose coupling.

    The event bus allows components to communicate without direct
    dependencies. Middleware can emit events, and display components can
    listen to them.

    This implements the Singleton pattern: every ``EventBus()`` call returns
    the same instance, so all callers share one listener list.

    Example:
        >>> bus = EventBus()
        >>> # Register a listener
        >>> class MyListener:
        ...     def handle_event(self, event):
        ...         print(f"Received: {event.type}")
        >>> listener = MyListener()
        >>> bus.subscribe(listener)
        >>> # Emit an event
        >>> event = Event(EventType.TOOL_START, {"tool": "read_file"})
        >>> bus.emit(event)
        Received: EventType.TOOL_START
    """

    _instance: ClassVar["EventBus | None"] = None
    _listeners: list[EventListener]

    def __new__(cls) -> "EventBus":
        """Create the singleton on first use, then keep returning it."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # Initialised here rather than in __init__, which would run on
            # every EventBus() call and wipe the shared listener list.
            cls._instance._listeners = []
        return cls._instance

    def subscribe(self, listener: EventListener) -> None:
        """Subscribe a listener to events.

        Subscribing a listener that is already registered is a no-op, so a
        listener never receives the same event twice.

        Args:
            listener: Object implementing EventListener protocol

        Example:
            >>> bus = EventBus()
            >>> bus.subscribe(my_display_component)
        """
        if listener not in self._listeners:
            self._listeners.append(listener)

    def unsubscribe(self, listener: EventListener) -> None:
        """Unsubscribe a listener from events.

        Unsubscribing a listener that is not registered is a no-op.

        Args:
            listener: Previously subscribed listener

        Example:
            >>> bus = EventBus()
            >>> bus.unsubscribe(my_display_component)
        """
        if listener in self._listeners:
            self._listeners.remove(listener)

    def emit(self, event: Event) -> None:
        """Emit an event to all listeners.

        Every listener registered when emit() is called receives the event
        via its handle_event method, in subscription order. An exception
        raised by a listener propagates to the caller and stops delivery to
        the remaining listeners.

        Args:
            event: Event to emit

        Example:
            >>> bus = EventBus()
            >>> event = Event(EventType.LLM_REQUEST, {"prompt": "Hello"})
            >>> bus.emit(event)
        """
        # Dispatch over a snapshot: a listener may subscribe or unsubscribe
        # (itself or others) from inside handle_event, and mutating the live
        # list while iterating it would silently skip listeners. Listeners
        # added during dispatch only see later events.
        for listener in tuple(self._listeners):
            listener.handle_event(event)

    def clear(self) -> None:
        """Clear all listeners.

        This is primarily useful for testing to ensure a clean state
        between test runs.

        Example:
            >>> bus = EventBus()
            >>> bus.clear()  # Remove all listeners
        """
        self._listeners.clear()

186 

187 

def get_event_bus() -> EventBus:
    """Return the process-wide event bus.

    Constructing ``EventBus`` always yields the shared singleton, so this
    is simply a readable accessor for it.

    Returns:
        The singleton EventBus instance

    Example:
        >>> bus = get_event_bus()
        >>> bus.subscribe(my_listener)
    """
    shared_bus = EventBus()
    return shared_bus

199 

200 

def get_event_emitter() -> EventBus:
    """Return the legacy event bus under the display-style accessor name.

    This is an alias for get_event_bus(), kept so that middleware and
    display components written against the ``get_event_emitter()`` naming
    keep working. It returns the legacy EventBus, not the EventEmitter
    from ``agent.display.events``.

    Returns:
        The singleton EventBus instance

    Example:
        >>> emitter = get_event_emitter()
        >>> emitter.emit(event)
    """
    shared_bus = get_event_bus()
    return shared_bus

215 

216 

217# ============================================================================ 

218# Re-exports from display.events (Primary API) 

219# ============================================================================ 

220# These re-exports provide convenient access to the task-safe EventEmitter 

221# implementation which is the recommended approach for new code. 

222 

# Legacy EventBus API (backward compatibility) - always exported.
__all__ = [
    "Event",
    "EventType",
    "EventListener",
    "EventBus",
    "get_event_bus",
    "get_event_emitter",
]

try:
    from agent.display.events import (
        EventEmitter,
        ExecutionEvent,
        LLMRequestEvent,
        LLMResponseEvent,
        ToolCompleteEvent,
        ToolErrorEvent,
        ToolStartEvent,
        get_current_tool_event_id,
        set_current_tool_event_id,
    )
    from agent.display.events import (
        get_event_emitter as get_display_event_emitter,
    )
except ImportError:
    # Handle circular import during initialization: only the legacy names
    # listed above are exported in that case.
    pass
else:
    # Primary EventEmitter API (recommended) - exported only when the
    # display module could be imported.
    __all__ += [
        "EventEmitter",
        "ExecutionEvent",
        "LLMRequestEvent",
        "LLMResponseEvent",
        "ToolStartEvent",
        "ToolCompleteEvent",
        "ToolErrorEvent",
        "get_display_event_emitter",
        "get_current_tool_event_id",
        "set_current_tool_event_id",
    ]