Coverage for src / agent / events.py: 96%
47 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 14:30 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 14:30 +0000
1# Copyright 2025-2026 Microsoft Corporation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Event systems for agent execution transparency.
17This module provides two event implementations:
191. **EventEmitter** (PRIMARY - from display.events):
20 - Task-safe using asyncio.Queue
21 - Used for runtime execution visualization
22 - Preferred for new code
242. **EventBus** (LEGACY - maintained for backward compatibility):
25 - Simple observer pattern
26 - Used in tests and documentation
27 - Consider using EventEmitter for new implementations
29Recommended Usage:
30 >>> from agent.events import get_event_emitter # Primary API
31 >>> emitter = get_event_emitter()
32 >>> emitter.emit(LLMRequestEvent())
34Legacy Usage (maintained for tests):
35 >>> from agent.events import Event, EventType, get_event_bus
36 >>> bus = get_event_bus()
37 >>> bus.subscribe(my_listener)
38"""
40from dataclasses import dataclass, field
41from enum import Enum
42from typing import Any, ClassVar, Protocol
43from uuid import uuid4
46class EventType(Enum):
47 """Event types for the event bus.
49 These events represent key points in the agent execution lifecycle
50 that components may want to observe.
51 """
53 LLM_REQUEST = "llm_request"
54 LLM_RESPONSE = "llm_response"
55 TOOL_START = "tool_start"
56 TOOL_COMPLETE = "tool_complete"
57 TOOL_ERROR = "tool_error"
58 AGENT_START = "agent_start"
59 AGENT_COMPLETE = "agent_complete"
62@dataclass
63class Event:
64 """Base event class.
66 Events carry data about what happened in the system. Listeners receive
67 events and can react to them without the event emitter needing to know
68 about the listeners.
70 Attributes:
71 type: The type of event (from EventType enum)
72 data: Dictionary containing event-specific data
73 event_id: Unique identifier for event correlation
74 parent_id: ID of parent event for hierarchical display (optional)
75 """
77 type: EventType
78 data: dict[str, Any]
79 event_id: str = field(default_factory=lambda: str(uuid4()))
80 parent_id: str | None = None
83class EventListener(Protocol):
84 """Protocol for event listeners.
86 Any object implementing this protocol can be registered as a listener.
87 The listener will receive all events emitted on the bus.
88 """
90 def handle_event(self, event: Event) -> None:
91 """Handle an event.
93 Args:
94 event: The event to handle
95 """
96 pass
99class EventBus:
100 """Singleton event bus for loose coupling.
102 The event bus allows components to communicate without direct dependencies.
103 Middleware can emit events, and display components can listen to them.
105 This implements the Singleton pattern - there's only one event bus instance
106 per application.
108 Example:
109 >>> bus = EventBus()
110 >>> # Register a listener
111 >>> class MyListener:
112 ... def handle_event(self, event):
113 ... print(f"Received: {event.type}")
114 >>> listener = MyListener()
115 >>> bus.subscribe(listener)
116 >>> # Emit an event
117 >>> event = Event(EventType.TOOL_START, {"tool": "read_file"})
118 >>> bus.emit(event)
119 Received: EventType.TOOL_START
120 """
122 _instance: ClassVar["EventBus | None"] = None
123 _listeners: list[EventListener]
125 def __new__(cls) -> "EventBus":
126 """Create or return the singleton instance."""
127 if cls._instance is None:
128 cls._instance = super().__new__(cls)
129 cls._instance._listeners = []
130 return cls._instance
132 def subscribe(self, listener: EventListener) -> None:
133 """Subscribe a listener to events.
135 Args:
136 listener: Object implementing EventListener protocol
138 Example:
139 >>> bus = EventBus()
140 >>> bus.subscribe(my_display_component)
141 """
142 if listener not in self._listeners:
143 self._listeners.append(listener)
145 def unsubscribe(self, listener: EventListener) -> None:
146 """Unsubscribe a listener from events.
148 Args:
149 listener: Previously subscribed listener
151 Example:
152 >>> bus = EventBus()
153 >>> bus.unsubscribe(my_display_component)
154 """
155 if listener in self._listeners:
156 self._listeners.remove(listener)
158 def emit(self, event: Event) -> None:
159 """Emit an event to all listeners.
161 All registered listeners will receive the event via their
162 handle_event method.
164 Args:
165 event: Event to emit
167 Example:
168 >>> bus = EventBus()
169 >>> event = Event(EventType.LLM_REQUEST, {"prompt": "Hello"})
170 >>> bus.emit(event)
171 """
172 for listener in self._listeners:
173 listener.handle_event(event)
175 def clear(self) -> None:
176 """Clear all listeners.
178 This is primarily useful for testing to ensure a clean state
179 between test runs.
181 Example:
182 >>> bus = EventBus()
183 >>> bus.clear() # Remove all listeners
184 """
185 self._listeners.clear()
188def get_event_bus() -> EventBus:
189 """Get the global event bus instance.
191 Returns:
192 The singleton EventBus instance
194 Example:
195 >>> bus = get_event_bus()
196 >>> bus.subscribe(my_listener)
197 """
198 return EventBus()
201def get_event_emitter() -> EventBus:
202 """Alias for get_event_bus() for consistency with display module naming.
204 This provides compatibility with the butler-agent pattern where
205 get_event_emitter() is used by middleware and display components.
207 Returns:
208 The singleton EventBus instance
210 Example:
211 >>> emitter = get_event_emitter()
212 >>> emitter.emit(event)
213 """
214 return get_event_bus()
217# ============================================================================
218# Re-exports from display.events (Primary API)
219# ============================================================================
220# These re-exports provide convenient access to the task-safe EventEmitter
221# implementation which is the recommended approach for new code.
223try:
224 from agent.display.events import (
225 EventEmitter,
226 ExecutionEvent,
227 LLMRequestEvent,
228 LLMResponseEvent,
229 ToolCompleteEvent,
230 ToolErrorEvent,
231 ToolStartEvent,
232 get_current_tool_event_id,
233 set_current_tool_event_id,
234 )
235 from agent.display.events import (
236 get_event_emitter as get_display_event_emitter,
237 )
239 __all__ = [
240 # Legacy EventBus API (backward compatibility)
241 "Event",
242 "EventType",
243 "EventListener",
244 "EventBus",
245 "get_event_bus",
246 "get_event_emitter",
247 # Primary EventEmitter API (recommended)
248 "EventEmitter",
249 "ExecutionEvent",
250 "LLMRequestEvent",
251 "LLMResponseEvent",
252 "ToolStartEvent",
253 "ToolCompleteEvent",
254 "ToolErrorEvent",
255 "get_display_event_emitter",
256 "get_current_tool_event_id",
257 "set_current_tool_event_id",
258 ]
259except ImportError:
260 # Handle circular import during initialization
261 __all__ = [
262 "Event",
263 "EventType",
264 "EventListener",
265 "EventBus",
266 "get_event_bus",
267 "get_event_emitter",
268 ]