Coverage for src / agent / skills / script_tools.py: 100%

111 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 14:30 +0000

1# Copyright 2025-2026 Microsoft Corporation 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Script wrapper tools for executing PEP 723 scripts. 

16 

17Provides generic tools for progressive disclosure: script_list, script_help, script_run. 

18These tools enable LLMs to discover and execute standalone scripts without loading 

19their code into context. 

20""" 

21 

22import asyncio 

23import json as json_module 

24from pathlib import Path 

25from typing import Annotated, Any 

26 

27from pydantic import Field 

28 

29from agent.skills.security import normalize_script_name, normalize_skill_name 

30from agent.tools.toolset import AgentToolset 

31 

32 

33class ScriptToolset(AgentToolset): 

34 """Generic wrapper tools for executing skill scripts. 

35 

36 Provides progressive disclosure: LLM can list scripts, get help, then execute. 

37 Scripts are NOT loaded into context - only metadata is registered. 

38 

39 Example: 

40 >>> scripts = {"osdu-quality": [{"name": "job_status", "path": Path("...")}]} 

41 >>> toolset = ScriptToolset(config, scripts) 

42 >>> tools = toolset.get_tools() 

43 """ 

44 

45 def __init__(self, config: Any, scripts: dict[str, list[dict[str, Any]]]): 

46 """Initialize script toolset. 

47 

48 Args: 

49 config: AgentSettings with script execution settings 

50 scripts: Dict mapping skill names to script metadata lists 

51 Format: {skill_name: [{"name": str, "path": Path}, ...]} 

52 """ 

53 super().__init__(config) 

54 self.scripts = scripts 

55 

56 # Execution safety limits (use config values with safe defaults) 

57 self.timeout = getattr(config, "script_timeout", 60) 

58 self.max_output = getattr(config, "max_script_output", 1048576) 

59 self.max_args = 100 # Not configurable - hard security limit 

60 self.max_args_length = 4096 # Not configurable - hard security limit 

61 

62 def get_tools(self) -> list: 

63 """Get list of script wrapper tools.""" 

64 return [ 

65 self.script_list, 

66 self.script_help, 

67 self.script_run, 

68 ] 

69 

70 @property 

71 def script_count(self) -> int: 

72 """Get total number of scripts across all skills.""" 

73 return sum(len(scripts) for scripts in self.scripts.values()) 

74 

75 """ 

76 { 

77 "name": "script_list", 

78 "description": "List available scripts for skill or all skills. Returns script metadata with names and paths.", 

79 "parameters": { 

80 "type": "object", 

81 "properties": { 

82 "skill_name": { 

83 "type": ["string", "null"], 

84 "description": "Skill name, or None for all skills", 

85 "default": null 

86 } 

87 }, 

88 "required": [] 

89 } 

90 } 

91 """ 

92 

93 async def script_list( 

94 self, 

95 skill_name: Annotated[ 

96 str | None, Field(description="Skill name, or None for all skills") 

97 ] = None, 

98 ) -> dict: 

99 """List available scripts for skill or all skills. Returns script metadata with names and paths.""" 

100 try: 

101 if skill_name is not None: 

102 # Normalize skill name 

103 canonical = normalize_skill_name(skill_name) 

104 

105 if canonical not in self.scripts: 

106 return self._create_error_response( 

107 error="not_found", message=f"Skill '{skill_name}' not found" 

108 ) 

109 

110 # Return scripts for this skill only 

111 scripts_list = [ 

112 {"name": s["name"], "path": str(s["path"])} for s in self.scripts[canonical] 

113 ] 

114 

115 return self._create_success_response( 

116 result={canonical: scripts_list}, 

117 message=f"Found {len(scripts_list)} scripts in '{skill_name}'", 

118 ) 

119 else: 

120 # Return all scripts 

121 all_scripts = {} 

122 total_count = 0 

123 

124 for skill, scripts in self.scripts.items(): 

125 scripts_list = [{"name": s["name"], "path": str(s["path"])} for s in scripts] 

126 all_scripts[skill] = scripts_list 

127 total_count += len(scripts_list) 

128 

129 return self._create_success_response( 

130 result=all_scripts, 

131 message=f"Found {total_count} scripts across {len(self.scripts)} skills", 

132 ) 

133 

134 except Exception as e: 

135 return self._create_error_response( 

136 error="execution_failed", message=f"Failed to list scripts: {e}" 

137 ) 

138 

139 """ 

140 { 

141 "name": "script_help", 

142 "description": "Get help for skill script by running --help. Use to discover arguments and options before running. Returns help text.", 

143 "parameters": { 

144 "type": "object", 

145 "properties": { 

146 "skill_name": { 

147 "type": "string", 

148 "description": "Skill name (e.g., 'sample-skill')" 

149 }, 

150 "script_name": { 

151 "type": "string", 

152 "description": "Script name (e.g., 'sample' or 'sample.py')" 

153 } 

154 }, 

155 "required": ["skill_name", "script_name"] 

156 } 

157 } 

158 """ 

159 

160 async def script_help( 

161 self, 

162 skill_name: Annotated[str, Field(description="Skill name (e.g., 'sample-skill')")], 

163 script_name: Annotated[ 

164 str, Field(description="Script name (e.g., 'sample' or 'sample.py')") 

165 ], 

166 ) -> dict: 

167 """Get help for skill script by running --help. Use to discover arguments and options before running. Returns help text.""" 

168 try: 

169 # Normalize names 

170 canonical_skill = normalize_skill_name(skill_name) 

171 canonical_script = normalize_script_name(script_name) 

172 

173 # Find script path 

174 script_path = self._find_script(canonical_skill, canonical_script) 

175 if script_path is None: 

176 return self._create_error_response( 

177 error="not_found", 

178 message=f"Script '{script_name}' not found in skill '{skill_name}'", 

179 ) 

180 

181 # Execute with --help 

182 cmd = [self._get_uv_executable(), "run", str(script_path), "--help"] 

183 

184 result = await asyncio.create_subprocess_exec( 

185 *cmd, 

186 stdout=asyncio.subprocess.PIPE, 

187 stderr=asyncio.subprocess.PIPE, 

188 cwd=script_path.parent, 

189 ) 

190 

191 try: 

192 stdout, stderr = await asyncio.wait_for(result.communicate(), timeout=self.timeout) 

193 except TimeoutError: 

194 result.kill() 

195 return self._create_error_response( 

196 error="timeout", 

197 message=f"Script help timed out after {self.timeout}s", 

198 ) 

199 

200 stdout_text = stdout.decode("utf-8") 

201 stderr_text = stderr.decode("utf-8") 

202 

203 if result.returncode != 0: 

204 return self._create_error_response( 

205 error="execution_failed", 

206 message=f"Script help failed with exit code {result.returncode}\nstderr: {stderr_text[-500:]}", 

207 ) 

208 

209 return self._create_success_response( 

210 result={"help_text": stdout_text, "usage": stdout_text}, 

211 message=f"Retrieved help for {script_name}", 

212 ) 

213 

214 except Exception as e: 

215 return self._create_error_response( 

216 error="execution_failed", message=f"Failed to get script help: {e}" 

217 ) 

218 

219 """ 

220 { 

221 "name": "script_run", 

222 "description": "Execute skill script with arguments. Most scripts support --json for structured output. Check --help first. Max 100 args. Returns script output.", 

223 "parameters": { 

224 "type": "object", 

225 "properties": { 

226 "skill_name": { 

227 "type": "string", 

228 "description": "Skill name" 

229 }, 

230 "script_name": { 

231 "type": "string", 

232 "description": "Script name" 

233 }, 

234 "args": { 

235 "type": ["array", "null"], 

236 "items": {"type": "string"}, 

237 "description": "Script arguments", 

238 "default": null 

239 }, 

240 "json_output": { 

241 "type": "boolean", 

242 "description": "Request JSON output", 

243 "default": true 

244 } 

245 }, 

246 "required": ["skill_name", "script_name"] 

247 } 

248 } 

249 """ 

250 

251 async def script_run( 

252 self, 

253 skill_name: Annotated[str, Field(description="Skill name")], 

254 script_name: Annotated[str, Field(description="Script name")], 

255 args: Annotated[list[str] | None, Field(description="Script arguments")] = None, 

256 json_output: Annotated[bool, Field(description="Request JSON output")] = True, 

257 ) -> dict: 

258 """Execute skill script with arguments. Most scripts support --json for structured output. Check --help first. Max 100 args. Returns script output.""" 

259 try: 

260 # Normalize args 

261 if args is None: 

262 args = [] 

263 

264 # Validate args limits 

265 if len(args) > self.max_args: 

266 return self._create_error_response( 

267 error="args_too_large", 

268 message=f"Too many arguments: {len(args)} (max {self.max_args})", 

269 ) 

270 

271 total_length = sum(len(arg) for arg in args) 

272 if total_length > self.max_args_length: 

273 return self._create_error_response( 

274 error="args_too_large", 

275 message=f"Arguments too large: {total_length} bytes (max {self.max_args_length})", 

276 ) 

277 

278 # Normalize names 

279 canonical_skill = normalize_skill_name(skill_name) 

280 canonical_script = normalize_script_name(script_name) 

281 

282 # Find script path 

283 script_path = self._find_script(canonical_skill, canonical_script) 

284 if script_path is None: 

285 return self._create_error_response( 

286 error="not_found", 

287 message=f"Script '{script_name}' not found in skill '{skill_name}'", 

288 ) 

289 

290 # Build command 

291 cmd = [self._get_uv_executable(), "run", str(script_path)] + args 

292 if json_output: 

293 cmd.append("--json") 

294 

295 # Execute script 

296 result = await asyncio.create_subprocess_exec( 

297 *cmd, 

298 stdout=asyncio.subprocess.PIPE, 

299 stderr=asyncio.subprocess.PIPE, 

300 cwd=script_path.parent, 

301 ) 

302 

303 try: 

304 stdout, stderr = await asyncio.wait_for(result.communicate(), timeout=self.timeout) 

305 except TimeoutError: 

306 result.kill() 

307 return self._create_error_response( 

308 error="timeout", 

309 message=f"Script timed out after {self.timeout}s", 

310 ) 

311 

312 stdout_text = stdout.decode("utf-8") 

313 stderr_text = stderr.decode("utf-8") 

314 

315 # Check output size 

316 if len(stdout_text) > self.max_output: 

317 stdout_text = stdout_text[: self.max_output] 

318 stderr_text += f"\nWarning: Output truncated at {self.max_output} bytes" 

319 

320 # Handle non-zero exit code 

321 if result.returncode != 0: 

322 return self._create_error_response( 

323 error="execution_failed", 

324 message=f"Script failed with exit code {result.returncode}\nstderr: {stderr_text[-500:]}", 

325 ) 

326 

327 # Parse JSON if requested 

328 if json_output: 

329 try: 

330 parsed = json_module.loads(stdout_text) 

331 return self._create_success_response( 

332 result=parsed, message=f"Executed {script_name} script" 

333 ) 

334 except json_module.JSONDecodeError: 

335 return self._create_error_response( 

336 error="parse_error", 

337 message=f"Expected JSON output, got: {stdout_text[:200]}\nstderr: {stderr_text[-500:]}", 

338 ) 

339 else: 

340 # Return plain text 

341 return self._create_success_response( 

342 result=stdout_text, message=f"Executed {script_name} script" 

343 ) 

344 

345 except Exception as e: 

346 return self._create_error_response( 

347 error="execution_failed", message=f"Failed to execute script: {e}" 

348 ) 

349 

350 def _find_script(self, canonical_skill: str, canonical_script: str) -> Path | None: 

351 """Find script path by canonical skill and script names. 

352 

353 Args: 

354 canonical_skill: Normalized skill name 

355 canonical_script: Normalized script name (with .py extension) 

356 

357 Returns: 

358 Path to script or None if not found 

359 """ 

360 if canonical_skill not in self.scripts: 

361 return None 

362 

363 script_stem = canonical_script.removesuffix(".py") 

364 

365 for script in self.scripts[canonical_skill]: 

366 if script["name"] == script_stem: 

367 script_path: Path = script["path"] 

368 return script_path 

369 

370 return None 

371 

372 def _get_uv_executable(self) -> str: 

373 """Get uv executable path. 

374 

375 Tries 'uv' first, falls back to 'python -m uv' if not on PATH. 

376 

377 Returns: 

378 uv executable path or fallback command 

379 """ 

380 # Try 'uv' directly (most common case) 

381 # For now, assume 'uv' is on PATH 

382 # Phase 2: Add Windows fallback to sys.executable -m uv 

383 return "uv"