Coverage for src / agent / commands / loader.py: 77%
90 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 14:30 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 14:30 +0000
1# Copyright 2025-2026 Microsoft Corporation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Slash command loader for discovering and loading commands.
17This module handles command discovery from bundled and user directories,
18using importlib.resources for wheel-safe access to bundled commands.
19"""
21import logging
22from importlib.resources import as_file, files
23from pathlib import Path
24from typing import TYPE_CHECKING
26from agent.commands.manifest import SlashCommandError, SlashCommandManifest, parse_command_manifest
28if TYPE_CHECKING:
29 from agent.config.schema import AgentSettings
31logger = logging.getLogger(__name__)
class SlashCommandLoader:
    """Load slash commands from bundled and user directories.

    Bundled commands are read from the ``agent.commands`` package through
    importlib.resources so they remain reachable from wheels/zips. User
    commands are read from the ``commands`` directory under
    ``settings.agent_data_dir`` (documented as ``~/.osdu-agent/commands/``)
    and override bundled commands that share the same name.

    The same filtering rules apply to discovery and to lookup by name:
    files whose name starts with ``_`` are treated as special and ignored,
    and symbolic links in the user directory are never read.

    Example:
        >>> from agent.config import load_config
        >>> config = load_config()
        >>> loader = SlashCommandLoader(config)
        >>> commands = loader.discover_commands()
        >>> for cmd in commands:
        ...     print(f"/{cmd.name}: {cmd.description}")
    """

    def __init__(self, settings: "AgentSettings"):
        """Initialize command loader.

        Args:
            settings: Agent settings; ``agent_data_dir`` is the parent of the
                user ``commands`` directory.
        """
        self.settings = settings
        self.user_dir = Path(settings.agent_data_dir) / "commands"

    def discover_commands(self) -> list[SlashCommandManifest]:
        """Discover all available slash commands.

        Bundled commands are collected first, then user commands. A user
        command replaces a bundled command with the same name (an info
        message is logged when that happens).

        Returns:
            List of discovered command manifests, bundled entries first
            (in the position of the command they define or override).
        """
        commands: dict[str, SlashCommandManifest] = {}

        # Bundled commands form the baseline.
        for cmd in self._load_bundled_commands():
            commands[cmd.name] = cmd

        # User commands win on a name clash.
        for cmd in self._scan_directory(self.user_dir):
            if cmd.name in commands:
                logger.info("User command '%s' overrides bundled command", cmd.name)
            commands[cmd.name] = cmd

        return list(commands.values())

    def _load_bundled_commands(self) -> list[SlashCommandManifest]:
        """Load bundled commands using importlib.resources.

        Each resource is opened through ``as_file()`` so that a real
        filesystem path is available even when the package is zipped.
        Loading is best-effort: a resource that cannot be read or parsed is
        logged and skipped.

        Returns:
            List of bundled command manifests (empty if the package is
            missing or cannot be iterated).
        """
        commands: list[SlashCommandManifest] = []

        try:
            package_files = files("agent.commands")
        except ModuleNotFoundError:
            logger.debug("No bundled commands package found")
            return commands

        try:
            for item in package_files.iterdir():
                # Only markdown files are commands; "_"-prefixed ones are special.
                if not item.name.endswith(".md") or item.name.startswith("_"):
                    continue

                try:
                    # Use as_file() for wheel/zip compatibility
                    with as_file(item) as path:
                        content = path.read_text(encoding="utf-8")
                    manifest = parse_command_manifest(content, item.name)
                    commands.append(manifest)
                    logger.debug("Loaded bundled command: %s", manifest.name)
                except SlashCommandError as e:
                    logger.warning("Failed to parse bundled command %s: %s", item.name, e)
                except Exception as e:
                    # Deliberately broad: one bad resource must not hide the rest.
                    logger.warning(
                        "Unexpected error loading bundled command %s: %s", item.name, e
                    )
        except Exception as e:
            # Iteration itself failed; keep whatever was loaded so far.
            logger.warning("Failed to iterate bundled commands: %s", e)

        return commands

    def _scan_directory(self, directory: Path) -> list[SlashCommandManifest]:
        """Scan filesystem directory for .md command files.

        Files starting with ``_``, non-regular files and symbolic links are
        skipped. Files that cannot be read or parsed are logged and skipped.

        Args:
            directory: Directory to scan for command files

        Returns:
            List of command manifests from the directory (empty if the
            directory is missing or is not a directory).
        """
        if not directory.exists():
            return []

        if not directory.is_dir():
            logger.warning("Commands path is not a directory: %s", directory)
            return []

        commands: list[SlashCommandManifest] = []

        for item in directory.glob("*.md"):
            # Skip special files
            if item.name.startswith("_"):
                continue

            # Skip directories (shouldn't match glob but be safe)
            if not item.is_file():
                continue

            # Skip symbolic links for security
            if item.is_symlink():
                logger.warning("Skipping symbolic link command file: %s", item)
                continue

            try:
                content = item.read_text(encoding="utf-8")
                manifest = parse_command_manifest(content, item.name)
                commands.append(manifest)
                logger.debug("Loaded user command: %s", manifest.name)
            except SlashCommandError as e:
                logger.warning("Failed to parse command %s: %s", item, e)
            except Exception as e:
                # Deliberately broad: one bad file must not hide the rest.
                logger.warning("Unexpected error loading command %s: %s", item, e)

        return commands

    @staticmethod
    def _is_plain_command_name(name: str) -> bool:
        """Return True if *name* is a bare file stem that discovery could yield.

        Rejects empty names, names starting with ``_`` (special files that
        discovery ignores) and anything containing a path component, so the
        name cannot escape the commands directory once ``.md`` is appended.
        """
        if not name or name.startswith("_"):
            return False
        # Path(...).name strips any directory part; a difference means the
        # caller passed a separator, a drive, or ".".
        return Path(name).name == name

    def _load_user_command(self, name: str, user_file: Path) -> SlashCommandManifest | None:
        """Read and parse one user command file, or return None if unusable."""
        if not user_file.is_file():
            return None

        # Same policy as _scan_directory: never follow symlinks.
        if user_file.is_symlink():
            logger.warning("Skipping symbolic link command file: %s", user_file)
            return None

        try:
            content = user_file.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            logger.warning("Unexpected error loading command %s: %s", user_file, e)
            return None

        try:
            return parse_command_manifest(content, user_file.name)
        except SlashCommandError as e:
            logger.warning("Failed to parse user command %s: %s", name, e)
            return None

    def _load_bundled_command(self, name: str, filename: str) -> SlashCommandManifest | None:
        """Read and parse one bundled command resource, or return None."""
        try:
            bundled_file = files("agent.commands").joinpath(filename)
            with as_file(bundled_file) as path:
                if not path.exists():
                    return None
                content = path.read_text(encoding="utf-8")
            return parse_command_manifest(content, filename)
        except (ModuleNotFoundError, FileNotFoundError, TypeError):
            # Package or resource is simply not there: not an error.
            return None
        except SlashCommandError as e:
            logger.warning("Failed to parse bundled command %s: %s", name, e)
        except (OSError, UnicodeDecodeError) as e:
            logger.warning("Unexpected error loading bundled command %s: %s", name, e)
        return None

    def load_command_by_name(self, name: str) -> SlashCommandManifest | None:
        """Load a specific command by name.

        Checks the user directory first (for override), then the bundled
        package. A user file that is a symlink, unreadable or unparsable is
        logged and the bundled command is tried instead.

        Args:
            name: Command name (without leading /). Must be a bare name:
                names that are empty, start with ``_`` or contain a path
                component are rejected.

        Returns:
            Command manifest if found, None otherwise
        """
        if not self._is_plain_command_name(name):
            logger.debug("Ignoring invalid command name: %r", name)
            return None

        filename = f"{name}.md"

        manifest = self._load_user_command(name, self.user_dir / filename)
        if manifest is not None:
            return manifest

        return self._load_bundled_command(name, filename)