Coverage for src / agent / tools / filesystem.py: 79%

402 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 14:30 +0000

1# Copyright 2025-2026 Microsoft Corporation 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Filesystem tools for safe, sandboxed file operations. 

16 

17This module provides structured filesystem tools that enable agents to inspect 

18and modify files in a controlled workspace without exposing arbitrary OS shell 

19execution to the LLM. 

20 

21Key Features: 

22- Workspace sandboxing with path traversal protection 

23- Structured directory listing and file reading 

24- Text search with literal and regex support 

25- Guarded write operations (disabled by default) 

26- Surgical text editing with safety checks 

27- Cross-platform path handling 

28 

29All operations are sandboxed to workspace_root (defaults to current directory). 

30""" 

31 

32import logging 

33import os 

34import re 

35import tempfile 

36from pathlib import Path 

37from typing import Annotated, Any 

38 

39from pydantic import Field 

40 

41from agent.config.schema import AgentSettings 

42from agent.tools.toolset import AgentToolset 

43 

44logger = logging.getLogger(__name__) 

45 

46 

47class FileSystemTools(AgentToolset): 

48 """Filesystem tools for safe, sandboxed file operations. 

49 

50 This toolset provides structured file operations with security guarantees: 

51 - All paths must be under configured workspace_root 

52 - Path traversal attempts are blocked 

53 - Symlinks that escape workspace are rejected 

54 - Write operations are disabled by default 

55 - Size limits prevent resource exhaustion 

56 

57 Example: 

58 >>> from agent.config import load_config 

59 >>> settings = load_config() 

60 >>> settings.agent.workspace_root = Path("/home/user/project") 

61 >>> tools = FileSystemTools(settings) 

62 >>> result = await tools.list_directory(".") 

63 >>> print(result) 

64 {'success': True, 'result': {'entries': [...], 'truncated': False}} 

65 """ 

66 

67 def __init__(self, settings: AgentSettings): 

68 """Initialize FileSystemTools with settings. 

69 

70 Args: 

71 settings: Agent settings instance with workspace_root 

72 """ 

73 super().__init__(settings) 

74 self._workspace_root_cache: Path | None = None 

75 

76 def get_tools(self) -> list: 

77 """Get list of filesystem tools. 

78 

79 Returns: 

80 List of filesystem tool functions 

81 """ 

82 return [ 

83 self.get_path_info, 

84 self.list_directory, 

85 self.read_file, 

86 self.search_text, 

87 self.write_file, 

88 self.apply_text_edit, 

89 self.create_directory, 

90 ] 

91 

92 def _get_workspace_root(self) -> Path: 

93 """Get and cache workspace root from config, environment, or current directory. 

94 

95 Priority order: 

96 1. Cached value (for performance) 

97 2. config.workspace_root (from settings.json) 

98 3. AGENT_WORKSPACE_ROOT env var 

99 4. Path.cwd() (default fallback) 

100 

101 Returns: 

102 Resolved workspace root Path (always returns a valid Path) 

103 

104 Note: 

105 Logs warning if workspace is home directory or filesystem root, 

106 as these are risky locations that may expose more than intended. 

107 """ 

108 if self._workspace_root_cache is not None: 

109 return self._workspace_root_cache 

110 

111 workspace_root: Path | None = None 

112 

113 # Check config first (handle legacy configs without workspace_root attribute) 

114 if hasattr(self.config, "workspace_root") and self.config.workspace_root is not None: 

115 workspace_root = self.config.workspace_root 

116 # Check environment variable 

117 elif env_root := os.getenv("AGENT_WORKSPACE_ROOT"): 

118 workspace_root = Path(env_root).expanduser().resolve() 

119 # Default to current working directory 

120 else: 

121 workspace_root = Path.cwd().resolve() 

122 # Log warning for risky workspace locations 

123 if workspace_root == Path.home() or workspace_root == Path("/"): 

124 logger.warning( 

125 f"Workspace is set to {workspace_root}. Consider using a project directory " 

126 "or configuring workspace_root in ~/.osdu-agent/settings.json for better security." 

127 ) 

128 

129 self._workspace_root_cache = workspace_root 

130 return self._workspace_root_cache 

131 

    def _resolve_path(self, relative_path: str) -> dict | Path:
        """Resolve and validate path within workspace boundaries.

        This is the core security function that enforces workspace sandboxing.
        All filesystem tools MUST call this before any filesystem access.

        Security checks:
        1. Workspace root is configured
        2. No path traversal attempts (../)
        3. No absolute paths outside workspace
        4. Symlinks don't escape workspace

        Args:
            relative_path: Path relative to workspace root

        Returns:
            Resolved Path object if valid, or error dict if validation fails
            (callers distinguish the two cases with isinstance(result, dict)).

        Example:
            >>> path = self._resolve_path("src/main.py")
            >>> if isinstance(path, dict):
            ...     return path  # Error response
            >>> # Use path for filesystem operation
        """
        workspace_root = self._get_workspace_root()

        # Validate workspace root exists and is a directory
        if not workspace_root.exists():
            return self._create_error_response(
                error="workspace_not_found",
                message=f"Workspace root does not exist: {workspace_root}",
            )

        if not workspace_root.is_dir():
            return self._create_error_response(
                error="workspace_not_directory",
                message=f"Workspace root is not a directory: {workspace_root}",
            )

        # Detect path traversal attempts before resolution.
        # Rejecting any '..' component up front (even ones that would resolve
        # back inside the workspace) keeps the policy simple and auditable.
        if ".." in Path(relative_path).parts:
            logger.warning(f"Path traversal attempt detected: {relative_path}")
            return self._create_error_response(
                error="path_traversal_attempt",
                message=f"Path contains '..' component: {relative_path}. Path traversal is not allowed.",
            )

        # Convert to Path and combine with workspace root
        try:
            # Handle both relative and absolute paths
            requested_path = Path(relative_path)

            if requested_path.is_absolute():
                # For absolute paths, verify they start with workspace_root
                # (the containment check below does that verification).
                unresolved = requested_path
                resolved = requested_path.resolve()
            else:
                # For relative paths, combine with workspace_root.
                # 'unresolved' is kept separately so is_symlink() below sees
                # the link itself rather than its resolved target.
                unresolved = workspace_root / requested_path
                resolved = unresolved.resolve()

            # Check if resolved path is within workspace
            try:
                # Use is_relative_to (Python 3.9+) to check boundaries
                if not resolved.is_relative_to(workspace_root):
                    logger.warning(
                        f"Path outside workspace: {relative_path} -> {resolved} "
                        f"(workspace: {workspace_root})"
                    )
                    return self._create_error_response(
                        error="path_outside_workspace",
                        message=f"Path resolves outside workspace: {relative_path}",
                    )
            except (ValueError, TypeError):
                # Fallback for edge cases (e.g. incompatible path flavours);
                # fail closed with the same error as a containment miss.
                return self._create_error_response(
                    error="path_outside_workspace",
                    message=f"Path resolves outside workspace: {relative_path}",
                )

            # Additional check: if symlink, verify target is in workspace.
            # NOTE(review): resolve() already followed the link, so this
            # re-checks the same condition as above and only changes the
            # error code when a symlink is involved — the earlier check
            # returns first, making this branch effectively unreachable;
            # kept as defence in depth. TODO confirm intent.
            if unresolved.is_symlink():
                try:
                    if not resolved.is_relative_to(workspace_root):
                        logger.warning(
                            f"Symlink target outside workspace: {relative_path} -> {resolved}"
                        )
                        return self._create_error_response(
                            error="symlink_outside_workspace",
                            message=f"Symlink target is outside workspace: {relative_path}",
                        )
                except (ValueError, TypeError):
                    return self._create_error_response(
                        error="symlink_outside_workspace",
                        message=f"Symlink target is outside workspace: {relative_path}",
                    )

            logger.debug(f"Path resolved: {relative_path} -> {resolved}")
            return resolved

        except (OSError, RuntimeError) as e:
            # resolve() can raise OSError on bad paths and RuntimeError on
            # symlink loops (pre-3.13 behaviour).
            logger.error(f"Error resolving path {relative_path}: {e}")
            return self._create_error_response(
                error="path_resolution_failed",
                message=f"Failed to resolve path: {relative_path}. Error: {str(e)}",
            )

238 

239 """ 

240 { 

241 "name": "get_path_info", 

242 "description": "Get file/directory metadata within workspace. Returns exists, type, size, permissions, timestamps.", 

243 "parameters": { 

244 "type": "object", 

245 "properties": { 

246 "path": { 

247 "type": "string", 

248 "description": "Path relative to workspace root", 

249 "default": "." 

250 } 

251 }, 

252 "required": [] 

253 } 

254 } 

255 """ 

256 

257 async def get_path_info( 

258 self, path: Annotated[str, Field(description="Path relative to workspace root")] = "." 

259 ) -> dict: 

260 """Get file/directory metadata within workspace. Returns exists, type, size, permissions, timestamps.""" 

261 # Resolve and validate path 

262 resolved = self._resolve_path(path) 

263 if isinstance(resolved, dict): 

264 return resolved # Error response 

265 

266 # Get unresolved path for symlink detection 

267 workspace_root = self._get_workspace_root() 

268 requested_path = Path(path) 

269 if requested_path.is_absolute(): 

270 unresolved = requested_path 

271 else: 

272 unresolved = workspace_root / requested_path 

273 

274 # Gather metadata 

275 try: 

276 exists = resolved.exists() 

277 

278 if not exists: 

279 info: dict[str, Any] = { 

280 "exists": False, 

281 "type": None, 

282 "size": None, 

283 "modified": None, 

284 "is_readable": False, 

285 "is_writable": False, 

286 "absolute_path": str(resolved), 

287 } 

288 return self._create_success_response( 

289 result=info, message=f"Path does not exist: {path}" 

290 ) 

291 

292 # Determine type - check symlink before resolved types 

293 if unresolved.is_symlink(): 

294 path_type = "symlink" 

295 size = None 

296 elif resolved.is_file(): 

297 path_type = "file" 

298 size = resolved.stat().st_size 

299 elif resolved.is_dir(): 

300 path_type = "directory" 

301 size = None 

302 else: 

303 path_type = "other" 

304 size = None 

305 

306 # Get modification time 

307 modified = resolved.stat().st_mtime 

308 

309 # Check permissions 

310 is_readable = os.access(resolved, os.R_OK) 

311 is_writable = os.access(resolved, os.W_OK) 

312 

313 exists_info: dict[str, Any] = { 

314 "exists": True, 

315 "type": path_type, 

316 "size": size, 

317 "modified": modified, 

318 "is_readable": is_readable, 

319 "is_writable": is_writable, 

320 "absolute_path": str(resolved), 

321 } 

322 

323 return self._create_success_response( 

324 result=exists_info, message=f"Retrieved metadata for: {path}" 

325 ) 

326 

327 except PermissionError: 

328 return self._create_error_response( 

329 error="permission_denied", 

330 message=f"Permission denied accessing: {path}", 

331 ) 

332 except OSError as e: 

333 return self._create_error_response( 

334 error="os_error", message=f"OS error accessing {path}: {str(e)}" 

335 ) 

336 

337 """ 

338 { 

339 "name": "list_directory", 

340 "description": "List directory contents within workspace with metadata. Supports recursive traversal. Default: 200 entries max, excludes hidden files. Returns entries with type and size.", 

341 "parameters": { 

342 "type": "object", 

343 "properties": { 

344 "path": { 

345 "type": "string", 

346 "description": "Directory path relative to workspace", 

347 "default": "." 

348 }, 

349 "recursive": { 

350 "type": "boolean", 

351 "description": "Recursively list subdirectories", 

352 "default": false 

353 }, 

354 "max_entries": { 

355 "type": "integer", 

356 "description": "Maximum entries to return", 

357 "default": 200 

358 }, 

359 "include_hidden": { 

360 "type": "boolean", 

361 "description": "Include hidden files (dotfiles)", 

362 "default": false 

363 } 

364 }, 

365 "required": [] 

366 } 

367 } 

368 """ 

369 

    async def list_directory(
        self,
        path: Annotated[str, Field(description="Directory path relative to workspace")] = ".",
        recursive: Annotated[bool, Field(description="Recursively list subdirectories")] = False,
        max_entries: Annotated[int, Field(description="Maximum entries to return")] = 200,
        include_hidden: Annotated[
            bool, Field(description="Include hidden files (dotfiles)")
        ] = False,
    ) -> dict:
        """List directory contents within workspace with metadata. Supports recursive traversal. Default: 200 entries max, excludes hidden files. Returns entries with type and size."""
        # Cap max_entries at 500 to bound response size
        max_entries = min(max_entries, 500)

        # Resolve and validate path (sandbox check)
        resolved = self._resolve_path(path)
        if isinstance(resolved, dict):
            return resolved  # Error response

        # Check if path exists and is a directory
        if not resolved.exists():
            return self._create_error_response(error="not_found", message=f"Path not found: {path}")

        if not resolved.is_dir():
            return self._create_error_response(
                error="not_a_directory", message=f"Path is not a directory: {path}"
            )

        # Get workspace root for relative path calculation
        workspace_root = self._get_workspace_root()
        entries: list[dict[str, Any]] = []
        truncated = False  # set once max_entries is hit; reported to caller

        try:
            if recursive:
                # Recursive walk; directories of each level are listed before
                # that level's files, mirroring os.walk's top-down order.
                for root, dirs, files in os.walk(resolved):
                    root_path = Path(root)

                    # Filter hidden directories if needed.
                    # In-place slice assignment also stops os.walk from
                    # descending into the pruned directories.
                    if not include_hidden:
                        dirs[:] = [d for d in dirs if not d.startswith(".")]

                    # Add directories
                    for dir_name in dirs:
                        if len(entries) >= max_entries:
                            truncated = True
                            break

                        dir_path = root_path / dir_name
                        # Paths are reported relative to the workspace root,
                        # not to the listed directory.
                        relative = dir_path.relative_to(workspace_root)

                        entries.append(
                            {
                                "name": dir_name,
                                "relative_path": str(relative),
                                "type": "directory",
                                "size": None,
                            }
                        )

                    if truncated:
                        break  # abandon the walk entirely once full

                    # Filter and add files
                    if not include_hidden:
                        files = [f for f in files if not f.startswith(".")]

                    for file_name in files:
                        if len(entries) >= max_entries:
                            truncated = True
                            break

                        file_path = root_path / file_name
                        relative = file_path.relative_to(workspace_root)

                        try:
                            size = file_path.stat().st_size
                        except OSError:
                            # File vanished or is unreadable; size unknown
                            size = None

                        entries.append(
                            {
                                "name": file_name,
                                "relative_path": str(relative),
                                "type": "file",
                                "size": size,
                            }
                        )

                    if truncated:
                        break

            else:
                # Non-recursive listing of the immediate children only
                for entry in resolved.iterdir():
                    if len(entries) >= max_entries:
                        truncated = True
                        break

                    # Skip hidden files if requested
                    if not include_hidden and entry.name.startswith("."):
                        continue

                    relative = entry.relative_to(workspace_root)

                    if entry.is_dir():
                        entries.append(
                            {
                                "name": entry.name,
                                "relative_path": str(relative),
                                "type": "directory",
                                "size": None,
                            }
                        )
                    elif entry.is_file():
                        # Entries that are neither file nor directory
                        # (sockets, fifos, broken symlinks) are omitted.
                        try:
                            size = entry.stat().st_size
                        except OSError:
                            size = None

                        entries.append(
                            {
                                "name": entry.name,
                                "relative_path": str(relative),
                                "type": "file",
                                "size": size,
                            }
                        )

            result = {"entries": entries, "truncated": truncated}

            return self._create_success_response(
                result=result, message=f"Listed {len(entries)} entries from: {path}"
            )

        except PermissionError:
            return self._create_error_response(
                error="permission_denied", message=f"Permission denied reading directory: {path}"
            )
        except OSError as e:
            return self._create_error_response(
                error="os_error", message=f"OS error listing directory {path}: {str(e)}"
            )

513 

514 """ 

515 { 

516 "name": "read_file", 

517 "description": "Read text file within workspace by line range. Paths relative to workspace root. Default: first 200 lines. Returns content with truncation flag for large files.", 

518 "parameters": { 

519 "type": "object", 

520 "properties": { 

521 "path": { 

522 "type": "string", 

523 "description": "File path relative to workspace" 

524 }, 

525 "start_line": { 

526 "type": "integer", 

527 "description": "Starting line number (1-based)", 

528 "default": 1 

529 }, 

530 "max_lines": { 

531 "type": "integer", 

532 "description": "Maximum lines to read", 

533 "default": 200 

534 } 

535 }, 

536 "required": ["path"] 

537 } 

538 } 

539 """ 

540 

541 async def read_file( 

542 self, 

543 path: Annotated[str, Field(description="File path relative to workspace")], 

544 start_line: Annotated[int, Field(description="Starting line number (1-based)")] = 1, 

545 max_lines: Annotated[int, Field(description="Maximum lines to read")] = 200, 

546 ) -> dict: 

547 """Read text file within workspace by line range. Paths relative to workspace root. Default: first 200 lines. Returns content with truncation flag for large files.""" 

548 # Cap max_lines at 1000 

549 max_lines = min(max_lines, 1000) 

550 

551 # Resolve and validate path 

552 resolved = self._resolve_path(path) 

553 if isinstance(resolved, dict): 

554 return resolved # Error response 

555 

556 # Check if path exists and is a file 

557 if not resolved.exists(): 

558 return self._create_error_response(error="not_found", message=f"File not found: {path}") 

559 

560 if not resolved.is_file(): 

561 return self._create_error_response( 

562 error="not_a_file", message=f"Path is not a file: {path}" 

563 ) 

564 

565 # Check file size limit 

566 try: 

567 file_size = resolved.stat().st_size 

568 if file_size > self.config.filesystem_max_read_bytes: 

569 return self._create_error_response( 

570 error="file_too_large", 

571 message=f"File size ({file_size} bytes) exceeds max read limit " 

572 f"({self.config.filesystem_max_read_bytes} bytes): {path}", 

573 ) 

574 except OSError as e: 

575 return self._create_error_response( 

576 error="os_error", message=f"Error getting file size for {path}: {str(e)}" 

577 ) 

578 

579 # Detect binary files (check first 8KB for null bytes) 

580 try: 

581 with open(resolved, "rb") as f: 

582 sample = f.read(8192) 

583 if b"\x00" in sample: 

584 return self._create_error_response( 

585 error="is_binary", 

586 message=f"File appears to be binary (contains null bytes): {path}", 

587 ) 

588 except OSError as e: 

589 return self._create_error_response( 

590 error="permission_denied", message=f"Cannot read file {path}: {str(e)}" 

591 ) 

592 

593 # Read file contents 

594 try: 

595 with open(resolved, encoding="utf-8", errors="replace") as f: 

596 lines = f.readlines() 

597 

598 total_lines = len(lines) 

599 

600 # Check if start_line is valid 

601 if start_line < 1: 

602 start_line = 1 

603 

604 # Allow start_line beyond file length for empty files 

605 if total_lines > 0 and start_line > total_lines: 

606 return self._create_error_response( 

607 error="line_out_of_range", 

608 message=f"start_line ({start_line}) exceeds file length ({total_lines} lines): {path}", 

609 ) 

610 

611 # Calculate slice (convert to 0-based indexing) 

612 start_idx = start_line - 1 

613 end_idx = min(start_idx + max_lines, total_lines) 

614 

615 # Extract lines 

616 selected_lines = lines[start_idx:end_idx] 

617 content = "".join(selected_lines) 

618 

619 # Check if truncated 

620 truncated = end_idx < total_lines 

621 next_start_line = end_idx + 1 if truncated else None 

622 actual_end_line = end_idx # 1-based 

623 

624 # Check if encoding errors occurred (look for replacement character) 

625 encoding_errors = "\ufffd" in content 

626 

627 result = { 

628 "path": path, 

629 "start_line": start_line, 

630 "end_line": actual_end_line, 

631 "total_lines": total_lines, 

632 "truncated": truncated, 

633 "next_start_line": next_start_line, 

634 "content": content, 

635 "encoding_errors": encoding_errors, 

636 } 

637 

638 return self._create_success_response( 

639 result=result, 

640 message=f"Read {len(selected_lines)} lines from {path} (lines {start_line}-{actual_end_line})", 

641 ) 

642 

643 except PermissionError: 

644 return self._create_error_response( 

645 error="permission_denied", message=f"Permission denied reading file: {path}" 

646 ) 

647 except OSError as e: 

648 return self._create_error_response( 

649 error="os_error", message=f"Error reading file {path}: {str(e)}" 

650 ) 

651 

652 """ 

653 { 

654 "name": "search_text", 

655 "description": "Search text patterns across files in workspace. Supports literal (default) and regex modes. Case-sensitive by default. Max 50 matches. Returns matches with file, line, snippet.", 

656 "parameters": { 

657 "type": "object", 

658 "properties": { 

659 "query": { 

660 "type": "string", 

661 "description": "Search pattern (literal or regex)" 

662 }, 

663 "path": { 

664 "type": "string", 

665 "description": "Directory or file to search", 

666 "default": "." 

667 }, 

668 "glob": { 

669 "type": "string", 

670 "description": "File pattern (e.g., '*.py', 'src/**/*.ts')", 

671 "default": "**/*" 

672 }, 

673 "max_matches": { 

674 "type": "integer", 

675 "description": "Maximum matches to return", 

676 "default": 50 

677 }, 

678 "use_regex": { 

679 "type": "boolean", 

680 "description": "Enable regex mode", 

681 "default": false 

682 }, 

683 "case_sensitive": { 

684 "type": "boolean", 

685 "description": "Case-sensitive search", 

686 "default": true 

687 } 

688 }, 

689 "required": ["query"] 

690 } 

691 } 

692 """ 

693 

694 async def search_text( 

695 self, 

696 query: Annotated[str, Field(description="Search pattern (literal or regex)")], 

697 path: Annotated[str, Field(description="Directory or file to search")] = ".", 

698 glob: Annotated[ 

699 str, Field(description="File pattern (e.g., '*.py', 'src/**/*.ts')") 

700 ] = "**/*", 

701 max_matches: Annotated[int, Field(description="Maximum matches to return")] = 50, 

702 use_regex: Annotated[bool, Field(description="Enable regex mode")] = False, 

703 case_sensitive: Annotated[bool, Field(description="Case-sensitive search")] = True, 

704 ) -> dict: 

705 """Search text patterns across files in workspace. Supports literal (default) and regex modes. Case-sensitive by default. Max 50 matches. Returns matches with file, line, snippet.""" 

706 

707 # Resolve and validate path 

708 resolved = self._resolve_path(path) 

709 if isinstance(resolved, dict): 

710 return resolved # Error response 

711 

712 # Check if path exists 

713 if not resolved.exists(): 

714 return self._create_error_response(error="not_found", message=f"Path not found: {path}") 

715 

716 # Compile regex if needed 

717 regex_pattern = None 

718 if use_regex: 

719 try: 

720 flags = 0 if case_sensitive else re.IGNORECASE 

721 regex_pattern = re.compile(query, flags) 

722 except re.error as e: 

723 return self._create_error_response( 

724 error="invalid_regex", message=f"Invalid regex pattern '{query}': {str(e)}" 

725 ) 

726 

727 # Collect files to search 

728 files_to_search = [] 

729 workspace_root = self._get_workspace_root() 

730 

731 try: 

732 if resolved.is_file(): 

733 # Single file search 

734 files_to_search = [resolved] 

735 elif resolved.is_dir(): 

736 # Directory search with glob filtering 

737 # Use pathlib's glob matching instead of fnmatch for ** support 

738 if glob == "**/*" or glob == "*": 

739 # Match all files (optimization) 

740 for file_path in resolved.rglob("*"): 

741 if file_path.is_file(): 

742 files_to_search.append(file_path) 

743 else: 

744 # Use glob pattern matching 

745 for file_path in resolved.glob(glob): 

746 if file_path.is_file(): 

747 files_to_search.append(file_path) 

748 # Also try rglob if glob contains ** 

749 if "**" in glob: 

750 for file_path in resolved.glob(glob): 

751 if file_path.is_file() and file_path not in files_to_search: 

752 files_to_search.append(file_path) 

753 else: 

754 return self._create_error_response( 

755 error="invalid_path_type", message=f"Path is neither file nor directory: {path}" 

756 ) 

757 except (OSError, PermissionError) as e: 

758 return self._create_error_response( 

759 error="os_error", message=f"Error accessing path {path}: {str(e)}" 

760 ) 

761 

762 # Search files 

763 matches: list[dict[str, Any]] = [] 

764 files_searched = 0 

765 truncated = False 

766 

767 for file_path in files_to_search: 

768 if len(matches) >= max_matches: 

769 truncated = True 

770 break 

771 

772 files_searched += 1 

773 

774 try: 

775 # Skip binary files (check for null bytes in first 8KB) 

776 with open(file_path, "rb") as f: 

777 sample = f.read(8192) 

778 if b"\x00" in sample: 

779 continue # Skip binary file 

780 

781 # Search file contents 

782 with open(file_path, encoding="utf-8", errors="replace") as f: 

783 for line_num, line in enumerate(f, start=1): 

784 if len(matches) >= max_matches: 

785 truncated = True 

786 break 

787 

788 # Perform search 

789 if use_regex: 

790 # Regex search 

791 if regex_pattern is not None: 

792 match_obj = regex_pattern.search(line) 

793 else: 

794 match_obj = None 

795 if match_obj: 

796 match_start = match_obj.start() 

797 match_end = match_obj.end() 

798 else: 

799 continue 

800 else: 

801 # Literal search 

802 search_line = line if case_sensitive else line.lower() 

803 search_query = query if case_sensitive else query.lower() 

804 

805 match_start = search_line.find(search_query) 

806 if match_start == -1: 

807 continue 

808 match_end = match_start + len(search_query) 

809 

810 # Truncate snippet to 200 chars 

811 snippet = line.strip() 

812 if len(snippet) > 200: 

813 snippet = snippet[:200] + "..." 

814 

815 # Get relative path for result 

816 relative_path = file_path.relative_to(workspace_root) 

817 

818 matches.append( 

819 { 

820 "file": str(relative_path), 

821 "line": line_num, 

822 "snippet": snippet, 

823 "match_start": match_start, 

824 "match_end": match_end, 

825 } 

826 ) 

827 

828 except (OSError, PermissionError): 

829 # Skip files we can't read 

830 continue 

831 except Exception as e: 

832 # Log unexpected errors but continue searching 

833 logger.warning(f"Unexpected error searching {file_path}: {e}") 

834 continue 

835 

836 result = { 

837 "query": query, 

838 "use_regex": use_regex, 

839 "files_searched": files_searched, 

840 "matches": matches, 

841 "truncated": truncated, 

842 } 

843 

844 return self._create_success_response( 

845 result=result, 

846 message=f"Found {len(matches)} matches in {files_searched} files", 

847 ) 

848 

849 """ 

850 { 

851 "name": "write_file", 

852 "description": "Write file within workspace with safety checks. Requires filesystem_writes_enabled. Supports create/overwrite/append modes. Returns bytes written and mode used.", 

853 "parameters": { 

854 "type": "object", 

855 "properties": { 

856 "path": { 

857 "type": "string", 

858 "description": "File path relative to workspace" 

859 }, 

860 "content": { 

861 "type": "string", 

862 "description": "Content to write" 

863 }, 

864 "mode": { 

865 "type": "string", 

866 "description": "Write mode: create, overwrite, append", 

867 "default": "create" 

868 } 

869 }, 

870 "required": ["path", "content"] 

871 } 

872 } 

873 """ 

874 

875 async def write_file( 

876 self, 

877 path: Annotated[str, Field(description="File path relative to workspace")], 

878 content: Annotated[str, Field(description="Content to write")], 

879 mode: Annotated[str, Field(description="Write mode: create, overwrite, append")] = "create", 

880 ) -> dict: 

881 """Write file within workspace with safety checks. Requires filesystem_writes_enabled. Supports create/overwrite/append modes. Returns bytes written and mode used.""" 

882 # Check if writes are enabled 

883 if not self.config.filesystem_writes_enabled: 

884 return self._create_error_response( 

885 error="writes_disabled", 

886 message="Filesystem writes are disabled. Set filesystem_writes_enabled=true in configuration.", 

887 ) 

888 

889 # Validate mode 

890 valid_modes = ["create", "overwrite", "append"] 

891 if mode not in valid_modes: 

892 return self._create_error_response( 

893 error="invalid_mode", 

894 message=f"Invalid mode '{mode}'. Valid modes: {', '.join(valid_modes)}", 

895 ) 

896 

897 # Check content size limit 

898 content_bytes = len(content.encode("utf-8")) 

899 if content_bytes > self.config.filesystem_max_write_bytes: 

900 return self._create_error_response( 

901 error="write_too_large", 

902 message=f"Content size ({content_bytes} bytes) exceeds max write limit " 

903 f"({self.config.filesystem_max_write_bytes} bytes)", 

904 ) 

905 

906 # Resolve and validate path 

907 resolved = self._resolve_path(path) 

908 if isinstance(resolved, dict): 

909 return resolved # Error response 

910 

911 # Check mode-specific preconditions 

912 existed_before = resolved.exists() 

913 

914 if mode == "create" and existed_before: 

915 return self._create_error_response( 

916 error="file_exists", 

917 message=f"File already exists (mode=create): {path}. Use mode='overwrite' to replace or mode='append' to add content.", 

918 ) 

919 

920 if mode == "overwrite" and not existed_before: 

921 # Allow overwrite to create new file (like mode='create') 

922 pass 

923 

924 # Perform write operation 

925 try: 

926 if mode == "append": 

927 # Append mode 

928 with open(resolved, "a", encoding="utf-8") as f: 

929 f.write(content) 

930 else: 

931 # Create or overwrite mode 

932 with open(resolved, "w", encoding="utf-8") as f: 

933 f.write(content) 

934 

935 result = { 

936 "path": path, 

937 "bytes_written": content_bytes, 

938 "mode": mode, 

939 "existed_before": existed_before, 

940 } 

941 

942 return self._create_success_response( 

943 result=result, message=f"Wrote {content_bytes} bytes to {path} (mode={mode})" 

944 ) 

945 

946 except PermissionError: 

947 return self._create_error_response( 

948 error="permission_denied", message=f"Permission denied writing to: {path}" 

949 ) 

950 except OSError as e: 

951 return self._create_error_response( 

952 error="os_error", message=f"Error writing to {path}: {str(e)}" 

953 ) 

954 

955 """ 

956 { 

957 "name": "apply_text_edit", 

958 "description": "Apply exact text replacement in file within workspace. Requires filesystem_writes_enabled and exact match. Use replace_all for multiple occurrences. Returns replacement count and size delta.", 

959 "parameters": { 

960 "type": "object", 

961 "properties": { 

962 "path": { 

963 "type": "string", 

964 "description": "File path relative to workspace" 

965 }, 

966 "expected_text": { 

967 "type": "string", 

968 "description": "Exact text to find and replace" 

969 }, 

970 "replacement_text": { 

971 "type": "string", 

972 "description": "Replacement text" 

973 }, 

974 "replace_all": { 

975 "type": "boolean", 

976 "description": "Replace all occurrences", 

977 "default": false 

978 } 

979 }, 

980 "required": ["path", "expected_text", "replacement_text"] 

981 } 

982 } 

983 """ 

984 

985 async def apply_text_edit( 

986 self, 

987 path: Annotated[str, Field(description="File path relative to workspace")], 

988 expected_text: Annotated[str, Field(description="Exact text to find and replace")], 

989 replacement_text: Annotated[str, Field(description="Replacement text")], 

990 replace_all: Annotated[bool, Field(description="Replace all occurrences")] = False, 

991 ) -> dict: 

992 """Apply exact text replacement in file within workspace. Requires filesystem_writes_enabled and exact match. Use replace_all for multiple occurrences. Returns replacement count and size delta.""" 

993 # Check if writes are enabled 

994 if not self.config.filesystem_writes_enabled: 

995 return self._create_error_response( 

996 error="writes_disabled", 

997 message="Filesystem writes are disabled. Set filesystem_writes_enabled=true in configuration.", 

998 ) 

999 

1000 # Validate expected_text 

1001 if not expected_text: 

1002 return self._create_error_response( 

1003 error="empty_expected_text", 

1004 message="expected_text cannot be empty. Provide exact text to match.", 

1005 ) 

1006 

1007 # Resolve and validate path 

1008 resolved = self._resolve_path(path) 

1009 if isinstance(resolved, dict): 

1010 return resolved # Error response 

1011 

1012 # Check if file exists and is a regular file 

1013 if not resolved.exists(): 

1014 return self._create_error_response(error="not_found", message=f"File not found: {path}") 

1015 

1016 if not resolved.is_file(): 

1017 return self._create_error_response( 

1018 error="not_a_file", message=f"Path is not a file: {path}" 

1019 ) 

1020 

1021 # Read file contents 

1022 try: 

1023 with open(resolved, encoding="utf-8", errors="replace") as f: 

1024 original_content = f.read() 

1025 

1026 original_size = len(original_content.encode("utf-8")) 

1027 

1028 except PermissionError: 

1029 return self._create_error_response( 

1030 error="permission_denied", message=f"Permission denied reading file: {path}" 

1031 ) 

1032 except OSError as e: 

1033 return self._create_error_response( 

1034 error="os_error", message=f"Error reading file {path}: {str(e)}" 

1035 ) 

1036 

1037 # Count occurrences 

1038 occurrences = original_content.count(expected_text) 

1039 

1040 if occurrences == 0: 

1041 return self._create_error_response( 

1042 error="match_not_found", 

1043 message=f"expected_text not found in file: {path}. No changes made.", 

1044 ) 

1045 

1046 if occurrences > 1 and not replace_all: 

1047 return self._create_error_response( 

1048 error="multiple_matches", 

1049 message=f"expected_text found {occurrences} times in {path}. " 

1050 f"Use replace_all=true to replace all occurrences.", 

1051 ) 

1052 

1053 # Perform replacement 

1054 if replace_all: 

1055 new_content = original_content.replace(expected_text, replacement_text) 

1056 replacements = occurrences 

1057 else: 

1058 # Replace only first occurrence (occurrences == 1 at this point) 

1059 new_content = original_content.replace(expected_text, replacement_text, 1) 

1060 replacements = 1 

1061 

1062 # Check new size limit 

1063 new_size = len(new_content.encode("utf-8")) 

1064 if new_size > self.config.filesystem_max_write_bytes: 

1065 return self._create_error_response( 

1066 error="write_too_large", 

1067 message=f"Resulting file size ({new_size} bytes) exceeds max write limit " 

1068 f"({self.config.filesystem_max_write_bytes} bytes)", 

1069 ) 

1070 

1071 # Calculate lines changed (approximate) 

1072 original_lines = original_content.splitlines() 

1073 new_lines = new_content.splitlines() 

1074 lines_changed = abs(len(new_lines) - len(original_lines)) + replacements 

1075 

1076 # Write atomically (temp file + rename) 

1077 try: 

1078 # Create temp file in same directory for atomic rename 

1079 temp_fd, temp_path = tempfile.mkstemp( 

1080 dir=resolved.parent, prefix=f".{resolved.name}.", suffix=".tmp" 

1081 ) 

1082 

1083 try: 

1084 # Write to temp file 

1085 with os.fdopen(temp_fd, "w", encoding="utf-8") as f: 

1086 f.write(new_content) 

1087 

1088 # Atomic rename 

1089 os.replace(temp_path, resolved) 

1090 

1091 except Exception: 

1092 # Clean up temp file on error 

1093 try: 

1094 os.unlink(temp_path) 

1095 except OSError: 

1096 # Ignore errors during temp file cleanup; not critical if deletion fails 

1097 pass 

1098 raise 

1099 

1100 result = { 

1101 "path": path, 

1102 "bytes_written": new_size, 

1103 "replacements": replacements, 

1104 "original_size": original_size, 

1105 "new_size": new_size, 

1106 "lines_changed": lines_changed, 

1107 } 

1108 

1109 return self._create_success_response( 

1110 result=result, 

1111 message=f"Applied {replacements} replacement(s) to {path}", 

1112 ) 

1113 

1114 except PermissionError: 

1115 return self._create_error_response( 

1116 error="permission_denied", message=f"Permission denied writing to: {path}" 

1117 ) 

1118 except OSError as e: 

1119 return self._create_error_response( 

1120 error="os_error", message=f"Error writing to {path}: {str(e)}" 

1121 ) 

1122 

1123 """ 

1124 { 

1125 "name": "create_directory", 

1126 "description": "Create directory within workspace with optional parent creation. Requires filesystem_writes_enabled. Idempotent (success if exists). Returns created flag.", 

1127 "parameters": { 

1128 "type": "object", 

1129 "properties": { 

1130 "path": { 

1131 "type": "string", 

1132 "description": "Directory path relative to workspace" 

1133 }, 

1134 "parents": { 

1135 "type": "boolean", 

1136 "description": "Create parent directories if needed", 

1137 "default": true 

1138 } 

1139 }, 

1140 "required": ["path"] 

1141 } 

1142 } 

1143 """ 

1144 

1145 async def create_directory( 

1146 self, 

1147 path: Annotated[str, Field(description="Directory path relative to workspace")], 

1148 parents: Annotated[bool, Field(description="Create parent directories if needed")] = True, 

1149 ) -> dict: 

1150 """Create directory within workspace with optional parent creation. Requires filesystem_writes_enabled. Idempotent (success if exists). Returns created flag.""" 

1151 # Check if writes are enabled 

1152 if not self.config.filesystem_writes_enabled: 

1153 return self._create_error_response( 

1154 error="writes_disabled", 

1155 message="Filesystem writes are disabled. Set filesystem_writes_enabled=true in configuration.", 

1156 ) 

1157 

1158 # Resolve and validate path 

1159 resolved = self._resolve_path(path) 

1160 if isinstance(resolved, dict): 

1161 return resolved # Error response 

1162 

1163 # Check if path exists 

1164 if resolved.exists(): 

1165 if resolved.is_dir(): 

1166 # Already exists - idempotent success 

1167 result = {"path": path, "created": False, "parents_created": 0} 

1168 return self._create_success_response( 

1169 result=result, message=f"Directory already exists: {path}" 

1170 ) 

1171 else: 

1172 # Exists but is not a directory 

1173 return self._create_error_response( 

1174 error="not_a_directory", message=f"Path exists but is not a directory: {path}" 

1175 ) 

1176 

1177 # Create directory 

1178 try: 

1179 if parents: 

1180 # Count parents that will be created 

1181 parents_to_create = [] 

1182 check_path = resolved.parent 

1183 workspace_root = self._get_workspace_root() 

1184 

1185 while check_path != workspace_root and not check_path.exists(): 

1186 parents_to_create.append(check_path) 

1187 check_path = check_path.parent 

1188 

1189 # Create with parents 

1190 resolved.mkdir(parents=True, exist_ok=True) 

1191 parents_created = len(parents_to_create) 

1192 else: 

1193 # Create without parents (will fail if parent doesn't exist) 

1194 resolved.mkdir(parents=False, exist_ok=True) 

1195 parents_created = 0 

1196 

1197 result = {"path": path, "created": True, "parents_created": parents_created} 

1198 

1199 return self._create_success_response( 

1200 result=result, message=f"Created directory: {path}" 

1201 ) 

1202 

1203 except FileNotFoundError: 

1204 return self._create_error_response( 

1205 error="parent_not_found", 

1206 message=f"Parent directory does not exist: {path}. Use parents=true to create.", 

1207 ) 

1208 except PermissionError: 

1209 return self._create_error_response( 

1210 error="permission_denied", message=f"Permission denied creating directory: {path}" 

1211 ) 

1212 except OSError as e: 

1213 return self._create_error_response( 

1214 error="os_error", message=f"Error creating directory {path}: {str(e)}" 

1215 )