Coverage for src / agent / skills / loader.py: 98%
168 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 14:30 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 14:30 +0000
1# Copyright 2025-2026 Microsoft Corporation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Skill loader for discovering and loading skills.
17This module handles skill discovery, manifest parsing, script metadata collection,
18and dynamic toolset importing.
19"""
21import importlib.util
22import logging
23from pathlib import Path
24from typing import TYPE_CHECKING, Any
26from agent.skills.errors import SkillManifestError
27from agent.skills.manifest import SkillManifest, parse_skill_manifest
28from agent.skills.registry import SkillRegistry
29from agent.skills.security import normalize_script_name, normalize_skill_name
30from agent.tools.toolset import AgentToolset
32if TYPE_CHECKING:
33 from agent.skills.documentation_index import SkillDocumentationIndex
logger = logging.getLogger(__name__)


class SkillLoader:
    """Load and manage skills for the agent.

    Handles skill discovery, manifest parsing, toolset instantiation,
    and script metadata collection.

    Example:
        >>> from agent.config import load_config
        >>> config = load_config()
        >>> loader = SkillLoader(config)
        >>> toolsets, script_tools, skill_docs = loader.load_enabled_skills()
    """

    def __init__(self, config: Any):
        """Initialize skill loader.

        Args:
            config: AgentSettings with skill paths and enabled skills list
        """
        self.config = config
        self.registry = SkillRegistry()
        # Canonical skill name -> script metadata recorded by load_enabled_skills().
        self._loaded_scripts: dict[str, list[dict[str, Any]]] = {}

    def scan_skill_directory(self, directory: Path) -> list[Path]:
        """Scan directory for skills (SKILL.md files).

        Only the immediate children of ``directory`` are inspected; a child
        counts as a skill when it is a directory holding a regular file named
        ``SKILL.md``.

        Args:
            directory: Directory to scan for skills

        Returns:
            Sorted list of skill directory paths containing SKILL.md. Empty
            when ``directory`` is missing or is not a directory.
        """
        # Same guard as discover_scripts(): a regular file passed here would
        # otherwise make iterdir() raise NotADirectoryError.
        if not directory.exists() or not directory.is_dir():
            return []

        skill_dirs = []
        for item in directory.iterdir():
            if not item.is_dir():
                continue

            skill_md = item / "SKILL.md"
            if skill_md.exists() and skill_md.is_file():
                skill_dirs.append(item)

        # iterdir() yields entries in arbitrary order; sort so that skill load
        # order is reproducible across runs and platforms.
        return sorted(skill_dirs)

    def discover_scripts(self, skill_path: Path, manifest: SkillManifest) -> list[dict[str, Any]]:
        """Discover scripts in skill's scripts/ directory.

        When the manifest lists scripts explicitly, only those names are
        considered (in manifest order). Otherwise every ``*.py`` file directly
        inside ``scripts/`` is picked up, minus ignore patterns and symlinks.
        No script code is imported or executed here.

        Args:
            skill_path: Path to skill directory
            manifest: Parsed SKILL.md manifest

        Returns:
            List of script metadata dicts with 'name' and 'path' keys
        """
        scripts_dir = skill_path / "scripts"
        if not scripts_dir.exists() or not scripts_dir.is_dir():
            return []

        scripts: list[dict[str, Any]] = []

        # If manifest specifies scripts explicitly, use those
        if manifest.scripts is not None:
            # Loop-invariant: resolve the containment root once.
            scripts_root = scripts_dir.resolve()

            for script_name in manifest.scripts:
                # SECURITY: Reject script names with path separators (prevent traversal)
                if "/" in script_name or "\\" in script_name or ".." in script_name:
                    logger.error(
                        f"Security: Rejected script '{script_name}' with path separators "
                        f"in skill '{manifest.name}'"
                    )
                    continue

                # Normalize script name (add .py if missing)
                normalized = normalize_script_name(script_name)
                script_path = scripts_dir / normalized

                # SECURITY: Verify script is actually within scripts_dir
                # (resolve() follows symlinks, so a link pointing outside is rejected too)
                try:
                    script_path.resolve().relative_to(scripts_root)
                except ValueError:
                    logger.error(
                        f"Security: Rejected script '{script_name}' - "
                        f"path escapes scripts directory in skill '{manifest.name}'"
                    )
                    continue

                if script_path.exists() and script_path.is_file():
                    scripts.append({"name": normalized.removesuffix(".py"), "path": script_path})
                else:
                    logger.warning(
                        f"Script '{script_name}' listed in manifest but not found: {script_path}"
                    )
        else:
            # Auto-discover: scan for *.py files, excluding patterns.
            # Sorted because glob() order is filesystem-dependent.
            for script_file in sorted(scripts_dir.glob("*.py")):
                # Skip if matches ignore patterns
                if self._should_ignore_script(script_file, manifest.scripts_ignore):
                    continue

                # Skip non-files (e.g. a directory named "x.py" or a broken symlink)
                if not script_file.is_file():
                    continue

                # Skip symbolic links for security
                if script_file.is_symlink():
                    logger.warning(f"Skipping symbolic link script: {script_file}")
                    continue

                # stem drops the .py extension
                scripts.append({"name": script_file.stem, "path": script_file})

        return scripts

    def _should_ignore_script(self, script_path: Path, ignore_patterns: list[str] | None) -> bool:
        """Check if script matches any ignore pattern.

        Args:
            script_path: Path to script file
            ignore_patterns: List of glob patterns to exclude; ``None`` or an
                empty list means nothing is ignored

        Returns:
            True if script should be ignored
        """
        # Empty patterns are skipped: Path.match("") raises ValueError, and an
        # empty pattern cannot meaningfully select a script anyway.
        return any(
            script_path.match(pattern) for pattern in (ignore_patterns or ()) if pattern
        )

    def _import_toolset(
        self, skill_path: Path, skill_name: str, toolset_def: str
    ) -> AgentToolset | None:
        """Dynamically import and instantiate a toolset class.

        Every failure mode (bad format, missing file, path escaping the skill
        directory, import error, wrong class type, constructor error) is logged
        and reported as ``None``; this method does not raise.

        Args:
            skill_path: Path to skill directory
            skill_name: Skill name (for logging and for the synthetic module name)
            toolset_def: Toolset definition in "module:Class" format

        Returns:
            Instantiated toolset or None if import failed
        """
        # Function-scope import, in line with the other local imports in this class.
        import sys

        try:
            # Parse "module:Class" format
            if ":" not in toolset_def:
                logger.error(f"Invalid toolset format '{toolset_def}' in skill '{skill_name}'")
                return None

            module_path, class_name = toolset_def.split(":", 1)

            # Convert module path to file path (e.g., "toolsets.hello" -> "toolsets/hello.py")
            file_path = skill_path / f"{module_path.replace('.', '/')}.py"

            # SECURITY: an absolute module path (e.g. "/tmp/x") makes the join
            # above discard skill_path entirely, so verify containment before
            # executing anything - same check discover_scripts() applies.
            try:
                file_path.resolve().relative_to(skill_path.resolve())
            except ValueError:
                logger.error(
                    f"Security: Rejected toolset '{toolset_def}' - "
                    f"path escapes skill directory in skill '{skill_name}'"
                )
                return None

            if not file_path.exists():
                logger.error(f"Toolset file not found: {file_path}")
                return None

            # Dynamic import
            module_name = f"skill.{skill_name}.{module_path}"
            spec = importlib.util.spec_from_file_location(module_name, file_path)
            if spec is None or spec.loader is None:
                logger.error(f"Failed to create module spec for {file_path}")
                return None

            module = importlib.util.module_from_spec(spec)
            # Register before executing, as the importlib recipe does, so code
            # that looks itself up via sys.modules[__name__] works while loading.
            sys.modules[module_name] = module
            try:
                spec.loader.exec_module(module)
            except BaseException:
                # Do not leave a half-initialised module behind.
                sys.modules.pop(module_name, None)
                raise

            # Get class and validate it's an AgentToolset
            if not hasattr(module, class_name):
                logger.error(f"Class '{class_name}' not found in {file_path}")
                return None

            toolset_class = getattr(module, class_name)

            # isinstance guard: issubclass() raises TypeError for non-class
            # attributes (functions, constants), which would hide the real cause.
            if not (isinstance(toolset_class, type) and issubclass(toolset_class, AgentToolset)):
                logger.error(f"Class '{class_name}' must inherit from AgentToolset")
                return None

            # Instantiate with config
            instance: AgentToolset = toolset_class(self.config)
            return instance

        except Exception as e:
            # Deliberately broad: a faulty skill must not take down the agent.
            logger.error(f"Failed to import toolset '{toolset_def}' from skill '{skill_name}': {e}")
            return None

    def load_skill(
        self, skill_path: Path
    ) -> tuple[SkillManifest, list[AgentToolset], list[dict[str, Any]]]:
        """Load a single skill from a directory.

        Args:
            skill_path: Path to skill directory containing SKILL.md

        Returns:
            Tuple of (manifest, toolset_instances, script_metadata). Toolsets
            that fail to import are omitted rather than raising.

        Raises:
            SkillManifestError: If manifest is invalid
        """
        # Parse manifest
        manifest = parse_skill_manifest(skill_path)

        # Load toolsets (if any); _import_toolset() returns None on failure
        toolsets = []
        for toolset_def in manifest.toolsets or []:
            toolset = self._import_toolset(skill_path, manifest.name, toolset_def)
            if toolset is not None:
                toolsets.append(toolset)

        # Discover scripts (metadata only, don't load code)
        scripts = self.discover_scripts(skill_path, manifest)

        return manifest, toolsets, scripts

    @staticmethod
    def _canonical_names(names: Any) -> set[str]:
        """Return the normalized form of each skill name; non-list input yields an empty set."""
        # Non-list values (e.g. Mock attributes in tests) are treated as "no overrides".
        if not isinstance(names, list):
            return set()
        return {normalize_skill_name(name) for name in names}

    def _resolve_plugin_dirs(self, skills_config: Any) -> list[Path]:
        """Return the on-disk directories of enabled plugins that contain a SKILL.md."""
        user_dir = getattr(skills_config, "user_dir", None)
        plugin_dirs: list[Path] = []

        # "or []" tolerates a config whose plugins field is None.
        for plugin in getattr(skills_config, "plugins", None) or []:
            if not plugin.enabled:
                continue

            # Try installed_path first, fall back to user_dir/name
            if plugin.installed_path:
                plugin_path = Path(plugin.installed_path)
            elif user_dir:
                plugin_path = Path(user_dir) / normalize_skill_name(plugin.name)
            else:
                logger.warning(f"Plugin '{plugin.name}' has no installed_path and no user_dir set")
                continue

            if plugin_path.exists() and (plugin_path / "SKILL.md").exists():
                plugin_dirs.append(plugin_path)
            else:
                logger.warning(
                    f"Plugin skill '{plugin.name}' not found at {plugin_path}. "
                    f"Run: agent skill install {plugin.git_url}"
                )

        return plugin_dirs

    @staticmethod
    def _is_bundled_skill_enabled(
        manifest: SkillManifest,
        canonical_name: str,
        enabled_canonical: set[str],
        disabled_canonical: set[str],
    ) -> bool:
        """Apply the three-state rule: explicit enable, then explicit disable, then manifest default."""
        # User explicitly enabled (overrides default_enabled: false)
        if canonical_name in enabled_canonical:
            return True
        # User explicitly disabled (overrides default_enabled: true)
        if canonical_name in disabled_canonical:
            return False
        # No user override - use manifest default
        return manifest.default_enabled

    def load_enabled_skills(self) -> tuple[list[AgentToolset], Any, "SkillDocumentationIndex"]:
        """Load all enabled skills based on configuration.

        This is the main entry point called by Agent.__init__().

        Behavior:
        - Auto-discovers all bundled skills (opt-out via disabled_bundled)
        - Loads plugin skills from config.skills.plugins (only if enabled=true)
        - A skill that fails to load is logged and skipped; it does not abort
          the loading of the remaining skills

        Returns:
            Tuple of (skill_toolsets, script_wrapper_toolset, skill_documentation_index).
            script_wrapper_toolset is None when no loaded skill contributed scripts.
        """
        # Get skills config
        skills_config = self.config.skills

        # User overrides for bundled skills (three-state logic), normalized for matching
        disabled_canonical = self._canonical_names(getattr(skills_config, "disabled_bundled", []))
        enabled_canonical = self._canonical_names(getattr(skills_config, "enabled_bundled", []))

        # 1. Auto-discover bundled skills (always scan unless explicitly disabled)
        bundled_skill_dirs: list[Path] = []
        bundled_dir = getattr(skills_config, "bundled_dir", None)
        if bundled_dir:
            bundled_path = Path(bundled_dir)
            if bundled_path.exists():
                bundled_skill_dirs = self.scan_skill_directory(bundled_path)
                logger.info(f"Auto-discovered {len(bundled_skill_dirs)} bundled skills")

        # 2. Load enabled plugin skills from config.skills.plugins
        plugin_skill_dirs = self._resolve_plugin_dirs(skills_config)

        # Load all skills (bundled + plugins)
        all_toolsets: list[AgentToolset] = []
        all_scripts: dict[str, list[dict[str, Any]]] = {}

        # Create documentation index for runtime context injection
        # (imported here rather than at module top; only needed at this point)
        from agent.skills.documentation_index import SkillDocumentationIndex

        skill_docs = SkillDocumentationIndex()

        # Tag each directory with its origin up front instead of testing list
        # membership per iteration: cheaper, and a plugin that happens to share
        # a path with a bundled skill is still treated as a plugin.
        candidates = [(path, True) for path in bundled_skill_dirs]
        candidates += [(path, False) for path in plugin_skill_dirs]

        for skill_dir, is_bundled in candidates:
            try:
                # NOTE(review): load_skill() imports and instantiates toolsets
                # before the enabled check below, so a skipped bundled skill
                # still has its toolset code executed - confirm this is intended.
                manifest, toolsets, scripts = self.load_skill(skill_dir)
                canonical_name = normalize_skill_name(manifest.name)

                # Three-state logic for bundled skills (plugins always enabled if in config)
                if is_bundled and not self._is_bundled_skill_enabled(
                    manifest, canonical_name, enabled_canonical, disabled_canonical
                ):
                    logger.info(
                        f"Skipping bundled skill '{manifest.name}' (default_enabled={manifest.default_enabled})"
                    )
                    continue

                # Load the skill
                all_toolsets.extend(toolsets)
                if scripts:
                    all_scripts[canonical_name] = scripts
                    self._loaded_scripts[canonical_name] = scripts

                # Add skill to documentation index for progressive disclosure
                # Always add, even if instructions are empty - skill may have triggers/toolsets/scripts
                skill_docs.add_skill(canonical_name, manifest)

                logger.info(
                    f"Loaded {'bundled' if is_bundled else 'plugin'} skill '{manifest.name}': "
                    f"{len(toolsets)} toolsets, {len(scripts)} scripts"
                )

            except SkillManifestError as e:
                logger.error(f"Failed to load skill from {skill_dir}: {e}")
                continue
            except Exception as e:
                # Boundary handler: log with traceback and move on to the next skill.
                logger.error(f"Unexpected error loading skill from {skill_dir}: {e}", exc_info=True)
                continue

        # Create script wrapper toolset if we have scripts
        script_wrapper = None
        if all_scripts:
            from agent.skills.script_tools import ScriptToolset

            script_wrapper = ScriptToolset(self.config, all_scripts)

        return all_toolsets, script_wrapper, skill_docs

    def validate_dependencies(self, manifest: SkillManifest) -> None:
        """Validate skill dependencies (for future use).

        Currently a no-op. Phase 2 will add dependency checking for:
        - min/max osdu-agent version compatibility
        - Python package dependencies for toolsets

        Args:
            manifest: Skill manifest to validate

        Raises:
            SkillDependencyError: Planned for Phase 2 when dependencies are not
                met; nothing is raised by the current implementation.
        """
        # Phase 2: Check min/max_agent_base_version
        # Phase 2: Check Python package availability for toolsets
        return None