Coverage for src/agent/agent.py: 78% (175 statements)
coverage.py v7.13.0, created at 2025-12-11 14:30 +0000

# Copyright 2025-2026 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Core Agent class with multi-provider LLM support."""

import logging
import os
import re
from collections.abc import AsyncIterator
from importlib import resources
from pathlib import Path
from typing import Any, cast

from agent.config import load_config
from agent.config.schema import AgentSettings
from agent.tools.filesystem import FileSystemTools
from agent.tools.git import GitTools
from agent.tools.gitlab import GitLabTools
from agent.tools.maven import MavenTools
from agent.tools.toolset import AgentToolset

logger = logging.getLogger(__name__)


class Agent:
37 """Agent with multi-provider LLM support and extensible tools. 

38 

39 Supports seven LLM providers through Microsoft Agent Framework: 

40 - OpenAI: Direct OpenAI API 

41 - Anthropic: Direct Anthropic API 

42 - Azure OpenAI: Azure-hosted OpenAI models 

43 - Azure AI Foundry: Microsoft's managed AI platform 

44 - Google Gemini: Google's Gemini models (custom integration) 

45 - GitHub Models: GitHub's AI models via OpenAI-compatible API 

46 - Local (Docker Models): Local models via Docker Desktop 

47 

48 Example: 

49 >>> from agent.config import load_config 

50 >>> settings = load_config() 

51 >>> agent = Agent(settings) 

52 >>> response = await agent.run("Say hello") 

53 >>> print(response) 

54 Hello! How can I help you today? 

55 """ 

56 

57 def __init__( 

58 self, 

59 settings: AgentSettings | None = None, 

60 chat_client: Any | None = None, 

61 toolsets: list[AgentToolset] | None = None, 

62 middleware: list | None = None, 

63 memory_manager: Any | None = None, 

64 ): 

65 """Initialize Agent. 

66 

67 Args: 

68 settings: Agent settings (loads from file + env if not provided) 

69 chat_client: Chat client for testing (optional, for dependency injection) 

70 toolsets: List of toolsets (default: FileSystemTools) 

71 middleware: List of middleware (framework auto-categorizes by type) 

72 memory_manager: Memory manager for conversation storage (optional) 

73 

74 Example: 

75 # Production use 

76 >>> from agent.config import load_config 

77 >>> settings = load_config() 

78 >>> agent = Agent(settings) 

79 

80 # Testing with mocks 

81 >>> from tests.mocks import MockChatClient 

82 >>> from tests.fixtures.config import mock_openai_settings 

83 >>> settings = mock_openai_settings() 

84 >>> mock_client = MockChatClient(response="Test response") 

85 >>> agent = Agent(settings=settings, chat_client=mock_client) 

86 

87 # With custom middleware 

88 >>> from agent.middleware import create_middleware 

89 >>> mw = create_middleware() 

90 >>> agent = Agent(settings=settings, middleware=mw) 

91 """ 

        self.settings = settings or load_config()
        # Legacy alias for compatibility during migration
        self.config = self.settings

        # Dependency injection for testing
        if chat_client is not None:
            self.chat_client = chat_client
        else:
            self.chat_client = self._create_chat_client()

        # Initialize memory manager if enabled
        if memory_manager is not None:
            self.memory_manager = memory_manager
        elif self.settings.memory_enabled:
            from agent.memory import create_memory_manager

            self.memory_manager = create_memory_manager(self.settings)
            logger.info(f"Memory enabled: {self.settings.memory_type}")
        else:
            self.memory_manager = None

        # Initialize toolsets (avoid global state)
        if toolsets is None:
            toolsets = [
                FileSystemTools(self.settings),
                GitTools(self.settings),
                GitLabTools(self.settings),
                MavenTools(self.settings),
            ]
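        # Callers can pass a custom list instead; anything that satisfies the
        # AgentToolset interface (i.e. exposes get_tools(), as consumed in the
        # collection loop below) plugs in the same way.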

        # Load skills from settings
        try:
            # Auto-detect bundled_dir if not set
            if self.settings.skills.bundled_dir is None:
                # Use importlib.resources to find bundled skills in package
                try:
                    bundled_skills_path = resources.files("agent").joinpath("_bundled_skills")
                    self.settings.skills.bundled_dir = str(bundled_skills_path)
                    logger.debug(
                        f"Auto-detected bundled_dir: {self.settings.skills.bundled_dir}"
                    )
                except (ModuleNotFoundError, AttributeError, TypeError) as e:
                    logger.warning(f"Could not auto-detect bundled_dir: {e}")

            from agent.skills.loader import SkillLoader

            skill_loader = SkillLoader(self.settings)
            skill_toolsets, script_tools, skill_docs = skill_loader.load_enabled_skills()

            # Store skill documentation index for context provider
            self.skill_docs = skill_docs

            if skill_toolsets:
                toolsets.extend(skill_toolsets)
                logger.info(f"Loaded {len(skill_toolsets)} skill toolsets")

            if script_tools:
                toolsets.append(script_tools)
                logger.info(f"Loaded script wrapper with {script_tools.script_count} scripts")

            if skill_docs.has_skills():
                logger.info(f"Loaded {skill_docs.count()} skill(s) for progressive disclosure")

        except Exception as e:
            logger.error(f"Failed to load skills: {e}", exc_info=True)
            # Continue without skills - graceful degradation

        self.toolsets = toolsets

        # Collect all tools from toolsets
        self.tools = []
        for toolset in self.toolsets:
            self.tools.extend(toolset.get_tools())

        # Initialize middleware (create default if not provided)
        if middleware is None:
            from agent.middleware import create_middleware

            middleware = create_middleware()
        self.middleware = middleware

        # Create agent
        self.agent = self._create_agent()

    def _create_chat_client(self) -> Any:
177 """Create chat client based on configuration. 

178 

179 Supports: 

180 - openai: OpenAI API (gpt-5-mini, gpt-4o, etc.) 

181 - anthropic: Anthropic API (claude-sonnet-4-5, claude-opus-4, etc.) 

182 - azure: Azure OpenAI (gpt-5-codex, gpt-4o, etc.) 

183 - foundry: Azure AI Foundry with managed models 

184 - gemini: Google Gemini (gemini-2.0-flash-exp, gemini-2.5-pro, etc.) 

185 - github: GitHub Models (phi-4, llama-3.3-70b-instruct, etc.) 

186 - local: Local models via Docker Desktop (phi4, etc.) 

187 

188 Returns: 

189 Configured chat client for the selected provider 

190 

191 Raises: 

192 ValueError: If provider is unknown or not supported 

193 """ 

        if self.settings.llm_provider == "openai":
            from agent_framework.openai import OpenAIChatClient

            return OpenAIChatClient(
                model_id=self.settings.openai_model,
                api_key=self.settings.openai_api_key,
            )
        elif self.settings.llm_provider == "anthropic":
            from agent.providers.anthropic import AnthropicChatClient

            return AnthropicChatClient(
                model_id=self.settings.anthropic_model,
                api_key=self.settings.anthropic_api_key,
            )
        elif self.settings.llm_provider == "azure":
            from agent_framework.azure import AzureOpenAIChatClient, AzureOpenAIResponsesClient
            from azure.identity import AzureCliCredential, DefaultAzureCredential

            # gpt-5-codex requires the responses endpoint, so use AzureOpenAIResponsesClient;
            # gpt-5-mini and others use the chat completions endpoint via AzureOpenAIChatClient
            deployment_name = self.settings.azure_openai_deployment or ""
            use_responses_client = "codex" in deployment_name.lower()
            client_class = (
                AzureOpenAIResponsesClient if use_responses_client else AzureOpenAIChatClient
            )

            # Use API key if provided, otherwise use Azure CLI credential
            if self.settings.azure_openai_api_key:
                return client_class(
                    endpoint=self.settings.azure_openai_endpoint,
                    deployment_name=self.settings.azure_openai_deployment,
                    api_version=self.settings.azure_openai_api_version,
                    api_key=self.settings.azure_openai_api_key,
                )
            else:
                # Try AzureCliCredential first, fall back to DefaultAzureCredential
                credential: AzureCliCredential | DefaultAzureCredential
                try:
                    credential = AzureCliCredential()
                    return client_class(
                        endpoint=self.settings.azure_openai_endpoint,
                        deployment_name=self.settings.azure_openai_deployment,
                        api_version=self.settings.azure_openai_api_version,
                        credential=credential,
                    )
                except Exception:
                    credential = DefaultAzureCredential()
                    return client_class(
                        endpoint=self.settings.azure_openai_endpoint,
                        deployment_name=self.settings.azure_openai_deployment,
                        api_version=self.settings.azure_openai_api_version,
                        credential=credential,
                    )
        elif self.settings.llm_provider == "foundry":
            from agent_framework.azure import AzureAIAgentClient
            from azure.identity.aio import AzureCliCredential as AsyncAzureCliCredential

            return AzureAIAgentClient(
                project_endpoint=self.settings.azure_project_endpoint,
                model_deployment_name=self.settings.azure_model_deployment,
                async_credential=AsyncAzureCliCredential(),
            )
        elif self.settings.llm_provider == "gemini":
            from agent.providers.gemini import GeminiChatClient

            return GeminiChatClient(
                model_id=self.settings.gemini_model,
                api_key=self.settings.gemini_api_key,
                project_id=self.settings.gemini_project_id,
                location=self.settings.gemini_location,
                use_vertexai=self.settings.gemini_use_vertexai,
            )
        elif self.settings.llm_provider == "github":
            from agent.providers.github import GitHubChatClient

            return GitHubChatClient(
                model_id=self.settings.github_model,
                token=self.settings.github_token,
                endpoint=self.settings.github_endpoint,
                org=self.settings.github_org,
            )
        elif self.settings.llm_provider == "local":
            from agent_framework.openai import OpenAIChatClient

            return OpenAIChatClient(
                model_id=self.settings.local_model,
                base_url=self.settings.local_base_url,
                api_key="not-needed",  # Docker doesn't require authentication
            )
        else:
            raise ValueError(
                f"Unknown provider: {self.settings.llm_provider}. "
                f"Supported: openai, anthropic, azure, foundry, gemini, github, local"
            )

    def _load_system_prompt(self) -> str:
290 """Load system prompt with three-tier fallback and placeholder replacement. 

291 

292 Loading priority: 

293 1. AGENT_SYSTEM_PROMPT env variable (explicit override) 

294 2. ~/.osdu-agent/system.md (user's default custom prompt) 

295 3. Package default from prompts/system.md 

296 4. Hardcoded fallback (if all file loading fails) 

297 

298 Returns: 

299 System prompt string with placeholders replaced and YAML front matter stripped 

300 """ 

        prompt_content = ""

        # Tier 1: Try explicit env variable override (AGENT_SYSTEM_PROMPT)
        if self.settings.system_prompt_file:
            try:
                # Expand environment variables and user home directory
                expanded_path = os.path.expandvars(self.settings.system_prompt_file)
                custom_path = Path(expanded_path).expanduser()
                prompt_content = custom_path.read_text(encoding="utf-8")
                logger.info(
                    f"Loaded system prompt from AGENT_SYSTEM_PROMPT: {self.settings.system_prompt_file}"
                )
            except Exception as e:
                logger.warning(
                    f"Failed to load system prompt from AGENT_SYSTEM_PROMPT={self.settings.system_prompt_file}: {e}. "
                    "Trying next fallback."
                )

        # Tier 2: Try user's default custom prompt (~/.osdu-agent/system.md)
        if not prompt_content and self.settings.agent_data_dir:
            try:
                user_default_path = self.settings.agent_data_dir / "system.md"
                if user_default_path.exists():
                    prompt_content = user_default_path.read_text(encoding="utf-8")
                    logger.info(f"Loaded system prompt from user default: {user_default_path}")
            except Exception as e:
                logger.warning(
                    f"Failed to load user default system prompt: {e}. Trying next fallback."
                )

        # Tier 3: Try package default
        if not prompt_content:
            try:
                # Use importlib.resources for package resource loading
                prompt_files = resources.files("agent.prompts")
                prompt_file = prompt_files.joinpath("system.md")
                prompt_content = prompt_file.read_text(encoding="utf-8")
                logger.info("Loaded system prompt from package default: prompts/system.md")
            except Exception as e:
                logger.warning(
                    f"Failed to load package default system prompt: {e}. Using hardcoded fallback."
                )

        # Tier 4: Hardcoded fallback
        if not prompt_content:
            prompt_content = """You are a helpful AI assistant that can use tools to assist with various tasks.

You help users with:
- Natural language interactions
- Information synthesis and summarization
- Tool-based task execution
- Context-aware conversations

Be helpful, concise, and clear in your responses."""
            logger.warning("Using hardcoded fallback system prompt")

        # Strip YAML front matter if present (regex-based)
        yaml_pattern = r"^---\s*\n.*?\n---\s*\n"
        if re.match(yaml_pattern, prompt_content, re.DOTALL):
            prompt_content = re.sub(yaml_pattern, "", prompt_content, flags=re.DOTALL)
            logger.info("Stripped YAML front matter from system prompt")
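        # e.g. a prompt file beginning with (keys illustrative):
        #     ---
        #     title: custom system prompt
        #     ---
        # has that header removed; re.DOTALL lets ".*?" span the newlines
        # inside the front-matter block.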

        # Replace placeholders with settings values
        replacements = {
            "{{DATA_DIR}}": str(self.settings.agent_data_dir),
            "{{SESSION_DIR}}": str(self.settings.agent_session_dir),
            "{{MODEL}}": self.settings.get_model_display_name(),
            "{{PROVIDER}}": self.settings.llm_provider,
            "{{MEMORY_ENABLED}}": str(self.settings.memory_enabled),
        }

        for placeholder, value in replacements.items():
            prompt_content = prompt_content.replace(placeholder, value)
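        # e.g. "Provider: {{PROVIDER}} ({{MODEL}})" becomes
        # "Provider: openai (gpt-4o)" (values illustrative); placeholders not
        # in the mapping are left intact and flagged below.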

        # Warn if any unresolved placeholders remain
        unresolved = re.findall(r"\{\{[^}]+\}\}", prompt_content)
        if unresolved:
            logger.warning(
                f"Unresolved placeholders in system prompt: {', '.join(set(unresolved))}"
            )

        return prompt_content

    def _create_agent(self) -> Any:
        """Create agent with tools, instructions, and middleware.

        Returns:
            Configured agent instance ready to handle requests
        """
        instructions = self._load_system_prompt()

        # Create context providers for dynamic injection
        context_providers: list[Any] = []

        # Add memory context provider if enabled
        if self.memory_manager:
            from agent.memory import MemoryContextProvider

            memory_provider = MemoryContextProvider(
                self.memory_manager, history_limit=self.settings.memory_history_limit
            )
            context_providers.append(memory_provider)
            logger.info("Memory context provider enabled")

        # Add skill context provider if skills are loaded
        if hasattr(self, "skill_docs") and self.skill_docs.has_skills():
            from agent.skills.context_provider import SkillContextProvider

            skill_provider = SkillContextProvider(
                skill_docs=self.skill_docs,
                memory_manager=None,  # Not used in current implementation
                max_skills=3,
            )
            context_providers.append(skill_provider)
            logger.info(f"Skill context provider enabled with {self.skill_docs.count()} skills")

        # IMPORTANT: Pass middleware as a list, not dict
        # Agent Framework automatically categorizes middleware by signature
        # Converting to dict breaks middleware invocation
        logger.info(
            f"Creating agent with {len(context_providers)} context providers: {[type(p).__name__ for p in context_providers]}"
        )
        return self.chat_client.create_agent(
            name="Agent",
            instructions=instructions,
            tools=self.tools,
            middleware=self.middleware,  # Must be list, not dict
            context_providers=context_providers if context_providers else None,
        )

    def get_new_thread(self) -> Any:
        """Create a new conversation thread.

        Returns:
            New thread instance for maintaining conversation context

        Example:
            >>> agent = Agent(config)
            >>> thread = agent.get_new_thread()
            >>> response = await agent.run("Hello", thread=thread)
        """
        if hasattr(self.chat_client, "create_thread"):
            return self.chat_client.create_thread()
        # Fallback: some providers may not support explicit threads (e.g., OpenAI)
        # They manage conversation context automatically through message history
        return None

    async def run(self, prompt: str, thread: Any | None = None) -> str:
449 """Run agent with prompt. 

450 

451 Args: 

452 prompt: User prompt 

453 thread: Optional thread for conversation context 

454 

455 Returns: 

456 Agent response as a string 

457 

458 Example: 

459 >>> agent = Agent(config) 

460 >>> response = await agent.run("Say hello to Alice") 

461 >>> print(response) 

462 Hello, Alice! 

463 

464 >>> # With thread for context 

465 >>> thread = agent.get_new_thread() 

466 >>> response = await agent.run("Hello", thread=thread) 

467 """ 

468 if thread: 

469 result = await self.agent.run(prompt, thread=thread) 

470 else: 

471 result = await self.agent.run(prompt) 

472 

473 # Handle different provider return types 

474 # OpenAI returns str, Anthropic returns AgentRunResponse with .text 

475 if isinstance(result, str): 

476 return result 

477 elif hasattr(result, "text"): 

478 return str(result.text) 

479 else: 

480 return cast(str, result) 

481 

    async def run_stream(self, prompt: str, thread: Any | None = None) -> AsyncIterator[str]:
        """Run agent with streaming response.

        Args:
            prompt: User prompt
            thread: Optional thread for conversation context

        Yields:
            Response chunks as they become available

        Example:
            >>> agent = Agent(config)
            >>> async for chunk in agent.run_stream("Say hello"):
            ...     print(chunk, end="")
            Hello! How can I help you?

            >>> # With thread for context
            >>> thread = agent.get_new_thread()
            >>> async for chunk in agent.run_stream("Hello", thread=thread):
            ...     print(chunk, end="")
        """
        if thread:
            stream = self.agent.run_stream(prompt, thread=thread)
        else:
            stream = self.agent.run_stream(prompt)

        async for chunk in stream:
            # Handle different provider chunk types
            # OpenAI returns str, Anthropic returns AgentRunResponseUpdate with .text
            if isinstance(chunk, str):
                yield chunk
            elif hasattr(chunk, "text"):
                yield str(chunk.text)  # coerce to str for a consistent yield type
            else:
                yield str(chunk)
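

# Minimal usage sketch (illustrative, not part of the public API): assumes a
# valid provider configuration is reachable by load_config(), e.g. an OpenAI
# API key in the environment for the "openai" provider.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        agent = Agent()  # settings loaded from file + environment
        print(await agent.run("Say hello"))

    asyncio.run(_demo())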