Coverage for src / agent / skills / script_tools.py: 100%
111 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 14:30 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 14:30 +0000
1# Copyright 2025-2026 Microsoft Corporation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Script wrapper tools for executing PEP 723 scripts.
17Provides generic tools for progressive disclosure: script_list, script_help, script_run.
18These tools enable LLMs to discover and execute standalone scripts without loading
19their code into context.
20"""
22import asyncio
23import json as json_module
24from pathlib import Path
25from typing import Annotated, Any
27from pydantic import Field
29from agent.skills.security import normalize_script_name, normalize_skill_name
30from agent.tools.toolset import AgentToolset
33class ScriptToolset(AgentToolset):
34 """Generic wrapper tools for executing skill scripts.
36 Provides progressive disclosure: LLM can list scripts, get help, then execute.
37 Scripts are NOT loaded into context - only metadata is registered.
39 Example:
40 >>> scripts = {"osdu-quality": [{"name": "job_status", "path": Path("...")}]}
41 >>> toolset = ScriptToolset(config, scripts)
42 >>> tools = toolset.get_tools()
43 """
45 def __init__(self, config: Any, scripts: dict[str, list[dict[str, Any]]]):
46 """Initialize script toolset.
48 Args:
49 config: AgentSettings with script execution settings
50 scripts: Dict mapping skill names to script metadata lists
51 Format: {skill_name: [{"name": str, "path": Path}, ...]}
52 """
53 super().__init__(config)
54 self.scripts = scripts
56 # Execution safety limits (use config values with safe defaults)
57 self.timeout = getattr(config, "script_timeout", 60)
58 self.max_output = getattr(config, "max_script_output", 1048576)
59 self.max_args = 100 # Not configurable - hard security limit
60 self.max_args_length = 4096 # Not configurable - hard security limit
62 def get_tools(self) -> list:
63 """Get list of script wrapper tools."""
64 return [
65 self.script_list,
66 self.script_help,
67 self.script_run,
68 ]
70 @property
71 def script_count(self) -> int:
72 """Get total number of scripts across all skills."""
73 return sum(len(scripts) for scripts in self.scripts.values())
75 """
76 {
77 "name": "script_list",
78 "description": "List available scripts for skill or all skills. Returns script metadata with names and paths.",
79 "parameters": {
80 "type": "object",
81 "properties": {
82 "skill_name": {
83 "type": ["string", "null"],
84 "description": "Skill name, or None for all skills",
85 "default": null
86 }
87 },
88 "required": []
89 }
90 }
91 """
93 async def script_list(
94 self,
95 skill_name: Annotated[
96 str | None, Field(description="Skill name, or None for all skills")
97 ] = None,
98 ) -> dict:
99 """List available scripts for skill or all skills. Returns script metadata with names and paths."""
100 try:
101 if skill_name is not None:
102 # Normalize skill name
103 canonical = normalize_skill_name(skill_name)
105 if canonical not in self.scripts:
106 return self._create_error_response(
107 error="not_found", message=f"Skill '{skill_name}' not found"
108 )
110 # Return scripts for this skill only
111 scripts_list = [
112 {"name": s["name"], "path": str(s["path"])} for s in self.scripts[canonical]
113 ]
115 return self._create_success_response(
116 result={canonical: scripts_list},
117 message=f"Found {len(scripts_list)} scripts in '{skill_name}'",
118 )
119 else:
120 # Return all scripts
121 all_scripts = {}
122 total_count = 0
124 for skill, scripts in self.scripts.items():
125 scripts_list = [{"name": s["name"], "path": str(s["path"])} for s in scripts]
126 all_scripts[skill] = scripts_list
127 total_count += len(scripts_list)
129 return self._create_success_response(
130 result=all_scripts,
131 message=f"Found {total_count} scripts across {len(self.scripts)} skills",
132 )
134 except Exception as e:
135 return self._create_error_response(
136 error="execution_failed", message=f"Failed to list scripts: {e}"
137 )
139 """
140 {
141 "name": "script_help",
142 "description": "Get help for skill script by running --help. Use to discover arguments and options before running. Returns help text.",
143 "parameters": {
144 "type": "object",
145 "properties": {
146 "skill_name": {
147 "type": "string",
148 "description": "Skill name (e.g., 'sample-skill')"
149 },
150 "script_name": {
151 "type": "string",
152 "description": "Script name (e.g., 'sample' or 'sample.py')"
153 }
154 },
155 "required": ["skill_name", "script_name"]
156 }
157 }
158 """
160 async def script_help(
161 self,
162 skill_name: Annotated[str, Field(description="Skill name (e.g., 'sample-skill')")],
163 script_name: Annotated[
164 str, Field(description="Script name (e.g., 'sample' or 'sample.py')")
165 ],
166 ) -> dict:
167 """Get help for skill script by running --help. Use to discover arguments and options before running. Returns help text."""
168 try:
169 # Normalize names
170 canonical_skill = normalize_skill_name(skill_name)
171 canonical_script = normalize_script_name(script_name)
173 # Find script path
174 script_path = self._find_script(canonical_skill, canonical_script)
175 if script_path is None:
176 return self._create_error_response(
177 error="not_found",
178 message=f"Script '{script_name}' not found in skill '{skill_name}'",
179 )
181 # Execute with --help
182 cmd = [self._get_uv_executable(), "run", str(script_path), "--help"]
184 result = await asyncio.create_subprocess_exec(
185 *cmd,
186 stdout=asyncio.subprocess.PIPE,
187 stderr=asyncio.subprocess.PIPE,
188 cwd=script_path.parent,
189 )
191 try:
192 stdout, stderr = await asyncio.wait_for(result.communicate(), timeout=self.timeout)
193 except TimeoutError:
194 result.kill()
195 return self._create_error_response(
196 error="timeout",
197 message=f"Script help timed out after {self.timeout}s",
198 )
200 stdout_text = stdout.decode("utf-8")
201 stderr_text = stderr.decode("utf-8")
203 if result.returncode != 0:
204 return self._create_error_response(
205 error="execution_failed",
206 message=f"Script help failed with exit code {result.returncode}\nstderr: {stderr_text[-500:]}",
207 )
209 return self._create_success_response(
210 result={"help_text": stdout_text, "usage": stdout_text},
211 message=f"Retrieved help for {script_name}",
212 )
214 except Exception as e:
215 return self._create_error_response(
216 error="execution_failed", message=f"Failed to get script help: {e}"
217 )
219 """
220 {
221 "name": "script_run",
222 "description": "Execute skill script with arguments. Most scripts support --json for structured output. Check --help first. Max 100 args. Returns script output.",
223 "parameters": {
224 "type": "object",
225 "properties": {
226 "skill_name": {
227 "type": "string",
228 "description": "Skill name"
229 },
230 "script_name": {
231 "type": "string",
232 "description": "Script name"
233 },
234 "args": {
235 "type": ["array", "null"],
236 "items": {"type": "string"},
237 "description": "Script arguments",
238 "default": null
239 },
240 "json_output": {
241 "type": "boolean",
242 "description": "Request JSON output",
243 "default": true
244 }
245 },
246 "required": ["skill_name", "script_name"]
247 }
248 }
249 """
251 async def script_run(
252 self,
253 skill_name: Annotated[str, Field(description="Skill name")],
254 script_name: Annotated[str, Field(description="Script name")],
255 args: Annotated[list[str] | None, Field(description="Script arguments")] = None,
256 json_output: Annotated[bool, Field(description="Request JSON output")] = True,
257 ) -> dict:
258 """Execute skill script with arguments. Most scripts support --json for structured output. Check --help first. Max 100 args. Returns script output."""
259 try:
260 # Normalize args
261 if args is None:
262 args = []
264 # Validate args limits
265 if len(args) > self.max_args:
266 return self._create_error_response(
267 error="args_too_large",
268 message=f"Too many arguments: {len(args)} (max {self.max_args})",
269 )
271 total_length = sum(len(arg) for arg in args)
272 if total_length > self.max_args_length:
273 return self._create_error_response(
274 error="args_too_large",
275 message=f"Arguments too large: {total_length} bytes (max {self.max_args_length})",
276 )
278 # Normalize names
279 canonical_skill = normalize_skill_name(skill_name)
280 canonical_script = normalize_script_name(script_name)
282 # Find script path
283 script_path = self._find_script(canonical_skill, canonical_script)
284 if script_path is None:
285 return self._create_error_response(
286 error="not_found",
287 message=f"Script '{script_name}' not found in skill '{skill_name}'",
288 )
290 # Build command
291 cmd = [self._get_uv_executable(), "run", str(script_path)] + args
292 if json_output:
293 cmd.append("--json")
295 # Execute script
296 result = await asyncio.create_subprocess_exec(
297 *cmd,
298 stdout=asyncio.subprocess.PIPE,
299 stderr=asyncio.subprocess.PIPE,
300 cwd=script_path.parent,
301 )
303 try:
304 stdout, stderr = await asyncio.wait_for(result.communicate(), timeout=self.timeout)
305 except TimeoutError:
306 result.kill()
307 return self._create_error_response(
308 error="timeout",
309 message=f"Script timed out after {self.timeout}s",
310 )
312 stdout_text = stdout.decode("utf-8")
313 stderr_text = stderr.decode("utf-8")
315 # Check output size
316 if len(stdout_text) > self.max_output:
317 stdout_text = stdout_text[: self.max_output]
318 stderr_text += f"\nWarning: Output truncated at {self.max_output} bytes"
320 # Handle non-zero exit code
321 if result.returncode != 0:
322 return self._create_error_response(
323 error="execution_failed",
324 message=f"Script failed with exit code {result.returncode}\nstderr: {stderr_text[-500:]}",
325 )
327 # Parse JSON if requested
328 if json_output:
329 try:
330 parsed = json_module.loads(stdout_text)
331 return self._create_success_response(
332 result=parsed, message=f"Executed {script_name} script"
333 )
334 except json_module.JSONDecodeError:
335 return self._create_error_response(
336 error="parse_error",
337 message=f"Expected JSON output, got: {stdout_text[:200]}\nstderr: {stderr_text[-500:]}",
338 )
339 else:
340 # Return plain text
341 return self._create_success_response(
342 result=stdout_text, message=f"Executed {script_name} script"
343 )
345 except Exception as e:
346 return self._create_error_response(
347 error="execution_failed", message=f"Failed to execute script: {e}"
348 )
350 def _find_script(self, canonical_skill: str, canonical_script: str) -> Path | None:
351 """Find script path by canonical skill and script names.
353 Args:
354 canonical_skill: Normalized skill name
355 canonical_script: Normalized script name (with .py extension)
357 Returns:
358 Path to script or None if not found
359 """
360 if canonical_skill not in self.scripts:
361 return None
363 script_stem = canonical_script.removesuffix(".py")
365 for script in self.scripts[canonical_skill]:
366 if script["name"] == script_stem:
367 script_path: Path = script["path"]
368 return script_path
370 return None
372 def _get_uv_executable(self) -> str:
373 """Get uv executable path.
375 Tries 'uv' first, falls back to 'python -m uv' if not on PATH.
377 Returns:
378 uv executable path or fallback command
379 """
380 # Try 'uv' directly (most common case)
381 # For now, assume 'uv' is on PATH
382 # Phase 2: Add Windows fallback to sys.executable -m uv
383 return "uv"