Coverage for src/agent/tools/filesystem.py: 79%
402 statements
coverage.py v7.13.0, created at 2025-12-11 14:30 +0000
1# Copyright 2025-2026 Microsoft Corporation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Filesystem tools for safe, sandboxed file operations.
17This module provides structured filesystem tools that enable agents to inspect
18and modify files in a controlled workspace without exposing arbitrary OS shell
19execution to the LLM.
21Key Features:
22- Workspace sandboxing with path traversal protection
23- Structured directory listing and file reading
24- Text search with literal and regex support
25- Guarded write operations (disabled by default)
26- Surgical text editing with safety checks
27- Cross-platform path handling
29All operations are sandboxed to workspace_root (defaults to current directory).
30"""
32import logging
33import os
34import re
35import tempfile
36from pathlib import Path
37from typing import Annotated, Any
39from pydantic import Field
41from agent.config.schema import AgentSettings
42from agent.tools.toolset import AgentToolset
44logger = logging.getLogger(__name__)
47class FileSystemTools(AgentToolset):
48 """Filesystem tools for safe, sandboxed file operations.
50 This toolset provides structured file operations with security guarantees:
51 - All paths must be under configured workspace_root
52 - Path traversal attempts are blocked
53 - Symlinks that escape workspace are rejected
54 - Write operations are disabled by default
55 - Size limits prevent resource exhaustion
57 Example:
58 >>> from agent.config import load_config
59 >>> settings = load_config()
60 >>> settings.agent.workspace_root = Path("/home/user/project")
61 >>> tools = FileSystemTools(settings)
62 >>> result = await tools.list_directory(".")
63 >>> print(result)
64 {'success': True, 'result': {'entries': [...], 'truncated': False}}
65 """
67 def __init__(self, settings: AgentSettings):
68 """Initialize FileSystemTools with settings.
70 Args:
71 settings: Agent settings instance with workspace_root
72 """
73 super().__init__(settings)
74 self._workspace_root_cache: Path | None = None
76 def get_tools(self) -> list:
77 """Get list of filesystem tools.
79 Returns:
80 List of filesystem tool functions
81 """
82 return [
83 self.get_path_info,
84 self.list_directory,
85 self.read_file,
86 self.search_text,
87 self.write_file,
88 self.apply_text_edit,
89 self.create_directory,
90 ]
92 def _get_workspace_root(self) -> Path:
93 """Get and cache workspace root from config, environment, or current directory.
95 Priority order:
96 1. Cached value (for performance)
97 2. config.workspace_root (from settings.json)
98 3. AGENT_WORKSPACE_ROOT env var
99 4. Path.cwd() (default fallback)
101 Returns:
102 Resolved workspace root Path (always returns a valid Path)
104 Note:
105 Logs warning if workspace is home directory or filesystem root,
106 as these are risky locations that may expose more than intended.
107 """
108 if self._workspace_root_cache is not None:
109 return self._workspace_root_cache
111 workspace_root: Path | None = None
113 # Check config first (handle legacy configs without workspace_root attribute)
114 if hasattr(self.config, "workspace_root") and self.config.workspace_root is not None:
115 workspace_root = self.config.workspace_root
116 # Check environment variable
117 elif env_root := os.getenv("AGENT_WORKSPACE_ROOT"):
118 workspace_root = Path(env_root).expanduser().resolve()
119 # Default to current working directory
120 else:
121 workspace_root = Path.cwd().resolve()
122 # Log warning for risky workspace locations
123 if workspace_root == Path.home() or workspace_root == Path("/"):
124 logger.warning(
125 f"Workspace is set to {workspace_root}. Consider using a project directory "
126 "or configuring workspace_root in ~/.osdu-agent/settings.json for better security."
127 )
129 self._workspace_root_cache = workspace_root
130 return self._workspace_root_cache
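    # Resolution sketch (doctest-style, illustrative; assumes settings carry no
    # workspace_root and AGENT_WORKSPACE_ROOT is unset, so the cwd fallback wins):
    #
    #     >>> os.environ.pop("AGENT_WORKSPACE_ROOT", None)         # doctest: +SKIP
    #     >>> tools = FileSystemTools(settings)                    # doctest: +SKIP
    #     >>> tools._get_workspace_root() == Path.cwd().resolve()
    #     True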
132 def _resolve_path(self, relative_path: str) -> dict | Path:
133 """Resolve and validate path within workspace boundaries.
135 This is the core security function that enforces workspace sandboxing.
136 All filesystem tools MUST call this before any filesystem access.
138 Security checks:
139 1. Workspace root is configured
140 2. No path traversal attempts (../)
141 3. No absolute paths outside workspace
142 4. Symlinks don't escape workspace
144 Args:
145 relative_path: Path relative to workspace root
147 Returns:
148 Resolved Path object if valid, or error dict if validation fails
150 Example:
151 >>> path = self._resolve_path("src/main.py")
152 >>> if isinstance(path, dict):
153 ... return path # Error response
154 >>> # Use path for filesystem operation
155 """
156 workspace_root = self._get_workspace_root()
158 # Validate workspace root exists and is a directory
159 if not workspace_root.exists():
160 return self._create_error_response(
161 error="workspace_not_found",
162 message=f"Workspace root does not exist: {workspace_root}",
163 )
165 if not workspace_root.is_dir():
166 return self._create_error_response(
167 error="workspace_not_directory",
168 message=f"Workspace root is not a directory: {workspace_root}",
169 )
171 # Detect path traversal attempts before resolution
172 if ".." in Path(relative_path).parts:
173 logger.warning(f"Path traversal attempt detected: {relative_path}")
174 return self._create_error_response(
175 error="path_traversal_attempt",
176 message=f"Path contains '..' component: {relative_path}. Path traversal is not allowed.",
177 )
179 # Convert to Path and combine with workspace root
180 try:
181 # Handle both relative and absolute paths
182 requested_path = Path(relative_path)
184 if requested_path.is_absolute():
185 # For absolute paths, verify they start with workspace_root
186 unresolved = requested_path
187 resolved = requested_path.resolve()
188 else:
189 # For relative paths, combine with workspace_root
190 unresolved = workspace_root / requested_path
191 resolved = unresolved.resolve()
193 # Check if resolved path is within workspace
194 try:
195 # Use is_relative_to (Python 3.9+) to check boundaries
196 if not resolved.is_relative_to(workspace_root):
197 logger.warning(
198 f"Path outside workspace: {relative_path} -> {resolved} "
199 f"(workspace: {workspace_root})"
200 )
201 return self._create_error_response(
202 error="path_outside_workspace",
203 message=f"Path resolves outside workspace: {relative_path}",
204 )
205 except (ValueError, TypeError):
206 # Fallback for edge cases
207 return self._create_error_response(
208 error="path_outside_workspace",
209 message=f"Path resolves outside workspace: {relative_path}",
210 )
212 # Additional check: if symlink, verify target is in workspace
213 if unresolved.is_symlink():
214 try:
215 if not resolved.is_relative_to(workspace_root):
216 logger.warning(
217 f"Symlink target outside workspace: {relative_path} -> {resolved}"
218 )
219 return self._create_error_response(
220 error="symlink_outside_workspace",
221 message=f"Symlink target is outside workspace: {relative_path}",
222 )
223 except (ValueError, TypeError):
224 return self._create_error_response(
225 error="symlink_outside_workspace",
226 message=f"Symlink target is outside workspace: {relative_path}",
227 )
229 logger.debug(f"Path resolved: {relative_path} -> {resolved}")
230 return resolved
232 except (OSError, RuntimeError) as e:
233 logger.error(f"Error resolving path {relative_path}: {e}")
234 return self._create_error_response(
235 error="path_resolution_failed",
236 message=f"Failed to resolve path: {relative_path}. Error: {str(e)}",
237 )
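    # Validation sketch (doctest-style, illustrative): traversal attempts come back
    # as error dicts rather than exceptions, so tool methods can return them to the
    # caller unchanged:
    #
    #     >>> isinstance(tools._resolve_path("../../etc/passwd"), dict)   # doctest: +SKIP
    #     True
    #     >>> isinstance(tools._resolve_path("src/main.py"), Path)        # doctest: +SKIP
    #     True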
239 """
240 {
241 "name": "get_path_info",
242 "description": "Get file/directory metadata within workspace. Returns exists, type, size, permissions, timestamps.",
243 "parameters": {
244 "type": "object",
245 "properties": {
246 "path": {
247 "type": "string",
248 "description": "Path relative to workspace root",
249 "default": "."
250 }
251 },
252 "required": []
253 }
254 }
255 """
257 async def get_path_info(
258 self, path: Annotated[str, Field(description="Path relative to workspace root")] = "."
259 ) -> dict:
260 """Get file/directory metadata within workspace. Returns exists, type, size, permissions, timestamps."""
261 # Resolve and validate path
262 resolved = self._resolve_path(path)
263 if isinstance(resolved, dict):
264 return resolved # Error response
266 # Get unresolved path for symlink detection
267 workspace_root = self._get_workspace_root()
268 requested_path = Path(path)
269 if requested_path.is_absolute():
270 unresolved = requested_path
271 else:
272 unresolved = workspace_root / requested_path
274 # Gather metadata
275 try:
276 exists = resolved.exists()
278 if not exists:
279 info: dict[str, Any] = {
280 "exists": False,
281 "type": None,
282 "size": None,
283 "modified": None,
284 "is_readable": False,
285 "is_writable": False,
286 "absolute_path": str(resolved),
287 }
288 return self._create_success_response(
289 result=info, message=f"Path does not exist: {path}"
290 )
292 # Determine type - check symlink before resolved types
293 if unresolved.is_symlink():
294 path_type = "symlink"
295 size = None
296 elif resolved.is_file():
297 path_type = "file"
298 size = resolved.stat().st_size
299 elif resolved.is_dir():
300 path_type = "directory"
301 size = None
302 else:
303 path_type = "other"
304 size = None
306 # Get modification time
307 modified = resolved.stat().st_mtime
309 # Check permissions
310 is_readable = os.access(resolved, os.R_OK)
311 is_writable = os.access(resolved, os.W_OK)
313 exists_info: dict[str, Any] = {
314 "exists": True,
315 "type": path_type,
316 "size": size,
317 "modified": modified,
318 "is_readable": is_readable,
319 "is_writable": is_writable,
320 "absolute_path": str(resolved),
321 }
323 return self._create_success_response(
324 result=exists_info, message=f"Retrieved metadata for: {path}"
325 )
327 except PermissionError:
328 return self._create_error_response(
329 error="permission_denied",
330 message=f"Permission denied accessing: {path}",
331 )
332 except OSError as e:
333 return self._create_error_response(
334 error="os_error", message=f"OS error accessing {path}: {str(e)}"
335 )
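    # Illustrative call (doctest-style; "README.md" is a placeholder file and the
    # response envelope follows the class docstring example):
    #
    #     >>> info = await tools.get_path_info("README.md")               # doctest: +SKIP
    #     >>> info["result"]["type"], info["result"]["is_readable"]
    #     ('file', True)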
337 """
338 {
339 "name": "list_directory",
340 "description": "List directory contents within workspace with metadata. Supports recursive traversal. Default: 200 entries max, excludes hidden files. Returns entries with type and size.",
341 "parameters": {
342 "type": "object",
343 "properties": {
344 "path": {
345 "type": "string",
346 "description": "Directory path relative to workspace",
347 "default": "."
348 },
349 "recursive": {
350 "type": "boolean",
351 "description": "Recursively list subdirectories",
352 "default": false
353 },
354 "max_entries": {
355 "type": "integer",
356 "description": "Maximum entries to return",
357 "default": 200
358 },
359 "include_hidden": {
360 "type": "boolean",
361 "description": "Include hidden files (dotfiles)",
362 "default": false
363 }
364 },
365 "required": []
366 }
367 }
368 """
370 async def list_directory(
371 self,
372 path: Annotated[str, Field(description="Directory path relative to workspace")] = ".",
373 recursive: Annotated[bool, Field(description="Recursively list subdirectories")] = False,
374 max_entries: Annotated[int, Field(description="Maximum entries to return")] = 200,
375 include_hidden: Annotated[
376 bool, Field(description="Include hidden files (dotfiles)")
377 ] = False,
378 ) -> dict:
379 """List directory contents within workspace with metadata. Supports recursive traversal. Default: 200 entries max, excludes hidden files. Returns entries with type and size."""
380 # Cap max_entries at 500
381 max_entries = min(max_entries, 500)
383 # Resolve and validate path
384 resolved = self._resolve_path(path)
385 if isinstance(resolved, dict):
386 return resolved # Error response
388 # Check if path exists and is a directory
389 if not resolved.exists():
390 return self._create_error_response(error="not_found", message=f"Path not found: {path}")
392 if not resolved.is_dir():
393 return self._create_error_response(
394 error="not_a_directory", message=f"Path is not a directory: {path}"
395 )
397 # Get workspace root for relative path calculation
398 workspace_root = self._get_workspace_root()
399 entries: list[dict[str, Any]] = []
400 truncated = False
402 try:
403 if recursive:
404 # Recursive walk
405 for root, dirs, files in os.walk(resolved):
406 root_path = Path(root)
408 # Filter hidden directories if needed
409 if not include_hidden:
410 dirs[:] = [d for d in dirs if not d.startswith(".")]
412 # Add directories
413 for dir_name in dirs:
414 if len(entries) >= max_entries:
415 truncated = True
416 break
418 dir_path = root_path / dir_name
419 relative = dir_path.relative_to(workspace_root)
421 entries.append(
422 {
423 "name": dir_name,
424 "relative_path": str(relative),
425 "type": "directory",
426 "size": None,
427 }
428 )
430 if truncated:
431 break
433 # Filter and add files
434 if not include_hidden:
435 files = [f for f in files if not f.startswith(".")]
437 for file_name in files:
438 if len(entries) >= max_entries:
439 truncated = True
440 break
442 file_path = root_path / file_name
443 relative = file_path.relative_to(workspace_root)
445 try:
446 size = file_path.stat().st_size
447 except OSError:
448 size = None
450 entries.append(
451 {
452 "name": file_name,
453 "relative_path": str(relative),
454 "type": "file",
455 "size": size,
456 }
457 )
459 if truncated:
460 break
462 else:
463 # Non-recursive listing
464 for entry in resolved.iterdir():
465 if len(entries) >= max_entries:
466 truncated = True
467 break
469 # Skip hidden entries unless include_hidden is set
470 if not include_hidden and entry.name.startswith("."):
471 continue
473 relative = entry.relative_to(workspace_root)
475 if entry.is_dir():
476 entries.append(
477 {
478 "name": entry.name,
479 "relative_path": str(relative),
480 "type": "directory",
481 "size": None,
482 }
483 )
484 elif entry.is_file():
485 try:
486 size = entry.stat().st_size
487 except OSError:
488 size = None
490 entries.append(
491 {
492 "name": entry.name,
493 "relative_path": str(relative),
494 "type": "file",
495 "size": size,
496 }
497 )
499 result = {"entries": entries, "truncated": truncated}
501 return self._create_success_response(
502 result=result, message=f"Listed {len(entries)} entries from: {path}"
503 )
505 except PermissionError:
506 return self._create_error_response(
507 error="permission_denied", message=f"Permission denied reading directory: {path}"
508 )
509 except OSError as e:
510 return self._create_error_response(
511 error="os_error", message=f"OS error listing directory {path}: {str(e)}"
512 )
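    # Illustrative call (doctest-style; "src" is a placeholder directory). Note that
    # max_entries is capped at 500 and hidden entries are skipped by default:
    #
    #     >>> listing = await tools.list_directory("src", recursive=True)                     # doctest: +SKIP
    #     >>> [e["relative_path"] for e in listing["result"]["entries"] if e["type"] == "file"]  # doctest: +SKIP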
514 """
515 {
516 "name": "read_file",
517 "description": "Read text file within workspace by line range. Paths relative to workspace root. Default: first 200 lines. Returns content with truncation flag for large files.",
518 "parameters": {
519 "type": "object",
520 "properties": {
521 "path": {
522 "type": "string",
523 "description": "File path relative to workspace"
524 },
525 "start_line": {
526 "type": "integer",
527 "description": "Starting line number (1-based)",
528 "default": 1
529 },
530 "max_lines": {
531 "type": "integer",
532 "description": "Maximum lines to read",
533 "default": 200
534 }
535 },
536 "required": ["path"]
537 }
538 }
539 """
541 async def read_file(
542 self,
543 path: Annotated[str, Field(description="File path relative to workspace")],
544 start_line: Annotated[int, Field(description="Starting line number (1-based)")] = 1,
545 max_lines: Annotated[int, Field(description="Maximum lines to read")] = 200,
546 ) -> dict:
547 """Read text file within workspace by line range. Paths relative to workspace root. Default: first 200 lines. Returns content with truncation flag for large files."""
548 # Cap max_lines at 1000
549 max_lines = min(max_lines, 1000)
551 # Resolve and validate path
552 resolved = self._resolve_path(path)
553 if isinstance(resolved, dict):
554 return resolved # Error response
556 # Check if path exists and is a file
557 if not resolved.exists():
558 return self._create_error_response(error="not_found", message=f"File not found: {path}")
560 if not resolved.is_file():
561 return self._create_error_response(
562 error="not_a_file", message=f"Path is not a file: {path}"
563 )
565 # Check file size limit
566 try:
567 file_size = resolved.stat().st_size
568 if file_size > self.config.filesystem_max_read_bytes:
569 return self._create_error_response(
570 error="file_too_large",
571 message=f"File size ({file_size} bytes) exceeds max read limit "
572 f"({self.config.filesystem_max_read_bytes} bytes): {path}",
573 )
574 except OSError as e:
575 return self._create_error_response(
576 error="os_error", message=f"Error getting file size for {path}: {str(e)}"
577 )
579 # Detect binary files (check first 8KB for null bytes)
580 try:
581 with open(resolved, "rb") as f:
582 sample = f.read(8192)
583 if b"\x00" in sample:
584 return self._create_error_response(
585 error="is_binary",
586 message=f"File appears to be binary (contains null bytes): {path}",
587 )
588 except OSError as e:
589 return self._create_error_response(
590 error="permission_denied", message=f"Cannot read file {path}: {str(e)}"
591 )
593 # Read file contents
594 try:
595 with open(resolved, encoding="utf-8", errors="replace") as f:
596 lines = f.readlines()
598 total_lines = len(lines)
600 # Check if start_line is valid
601 if start_line < 1:
602 start_line = 1
604 # Allow start_line beyond file length for empty files
605 if total_lines > 0 and start_line > total_lines:
606 return self._create_error_response(
607 error="line_out_of_range",
608 message=f"start_line ({start_line}) exceeds file length ({total_lines} lines): {path}",
609 )
611 # Calculate slice (convert to 0-based indexing)
612 start_idx = start_line - 1
613 end_idx = min(start_idx + max_lines, total_lines)
615 # Extract lines
616 selected_lines = lines[start_idx:end_idx]
617 content = "".join(selected_lines)
619 # Check if truncated
620 truncated = end_idx < total_lines
621 next_start_line = end_idx + 1 if truncated else None
622 actual_end_line = end_idx # 1-based
624 # Check if encoding errors occurred (look for replacement character)
625 encoding_errors = "\ufffd" in content
627 result = {
628 "path": path,
629 "start_line": start_line,
630 "end_line": actual_end_line,
631 "total_lines": total_lines,
632 "truncated": truncated,
633 "next_start_line": next_start_line,
634 "content": content,
635 "encoding_errors": encoding_errors,
636 }
638 return self._create_success_response(
639 result=result,
640 message=f"Read {len(selected_lines)} lines from {path} (lines {start_line}-{actual_end_line})",
641 )
643 except PermissionError:
644 return self._create_error_response(
645 error="permission_denied", message=f"Permission denied reading file: {path}"
646 )
647 except OSError as e:
648 return self._create_error_response(
649 error="os_error", message=f"Error reading file {path}: {str(e)}"
650 )
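    # Illustrative pagination loop (sketch only; "CHANGELOG.md" is a placeholder).
    # next_start_line is None once the last chunk has been returned:
    #
    #     >>> page = await tools.read_file("CHANGELOG.md", max_lines=200)   # doctest: +SKIP
    #     >>> while page["result"]["truncated"]:                            # doctest: +SKIP
    #     ...     page = await tools.read_file(
    #     ...         "CHANGELOG.md", start_line=page["result"]["next_start_line"]
    #     ...     )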
652 """
653 {
654 "name": "search_text",
655 "description": "Search text patterns across files in workspace. Supports literal (default) and regex modes. Case-sensitive by default. Max 50 matches. Returns matches with file, line, snippet.",
656 "parameters": {
657 "type": "object",
658 "properties": {
659 "query": {
660 "type": "string",
661 "description": "Search pattern (literal or regex)"
662 },
663 "path": {
664 "type": "string",
665 "description": "Directory or file to search",
666 "default": "."
667 },
668 "glob": {
669 "type": "string",
670 "description": "File pattern (e.g., '*.py', 'src/**/*.ts')",
671 "default": "**/*"
672 },
673 "max_matches": {
674 "type": "integer",
675 "description": "Maximum matches to return",
676 "default": 50
677 },
678 "use_regex": {
679 "type": "boolean",
680 "description": "Enable regex mode",
681 "default": false
682 },
683 "case_sensitive": {
684 "type": "boolean",
685 "description": "Case-sensitive search",
686 "default": true
687 }
688 },
689 "required": ["query"]
690 }
691 }
692 """
694 async def search_text(
695 self,
696 query: Annotated[str, Field(description="Search pattern (literal or regex)")],
697 path: Annotated[str, Field(description="Directory or file to search")] = ".",
698 glob: Annotated[
699 str, Field(description="File pattern (e.g., '*.py', 'src/**/*.ts')")
700 ] = "**/*",
701 max_matches: Annotated[int, Field(description="Maximum matches to return")] = 50,
702 use_regex: Annotated[bool, Field(description="Enable regex mode")] = False,
703 case_sensitive: Annotated[bool, Field(description="Case-sensitive search")] = True,
704 ) -> dict:
705 """Search text patterns across files in workspace. Supports literal (default) and regex modes. Case-sensitive by default. Max 50 matches. Returns matches with file, line, snippet."""
707 # Resolve and validate path
708 resolved = self._resolve_path(path)
709 if isinstance(resolved, dict):
710 return resolved # Error response
712 # Check if path exists
713 if not resolved.exists():
714 return self._create_error_response(error="not_found", message=f"Path not found: {path}")
716 # Compile regex if needed
717 regex_pattern = None
718 if use_regex:
719 try:
720 flags = 0 if case_sensitive else re.IGNORECASE
721 regex_pattern = re.compile(query, flags)
722 except re.error as e:
723 return self._create_error_response(
724 error="invalid_regex", message=f"Invalid regex pattern '{query}': {str(e)}"
725 )
727 # Collect files to search
728 files_to_search = []
729 workspace_root = self._get_workspace_root()
731 try:
732 if resolved.is_file():
733 # Single file search
734 files_to_search = [resolved]
735 elif resolved.is_dir():
736 # Directory search with glob filtering
737 # Use pathlib's glob matching instead of fnmatch for ** support
738 if glob == "**/*" or glob == "*":
739 # Match all files (optimization)
740 for file_path in resolved.rglob("*"):
741 if file_path.is_file():
742 files_to_search.append(file_path)
743 else:
744 # Use glob pattern matching (pathlib's glob() handles '**' natively)
745 for file_path in resolved.glob(glob):
746 if file_path.is_file():
747 files_to_search.append(file_path)
753 else:
754 return self._create_error_response(
755 error="invalid_path_type", message=f"Path is neither file nor directory: {path}"
756 )
757 except (OSError, PermissionError) as e:
758 return self._create_error_response(
759 error="os_error", message=f"Error accessing path {path}: {str(e)}"
760 )
762 # Search files
763 matches: list[dict[str, Any]] = []
764 files_searched = 0
765 truncated = False
767 for file_path in files_to_search:
768 if len(matches) >= max_matches:
769 truncated = True
770 break
772 files_searched += 1
774 try:
775 # Skip binary files (check for null bytes in first 8KB)
776 with open(file_path, "rb") as f:
777 sample = f.read(8192)
778 if b"\x00" in sample:
779 continue # Skip binary file
781 # Search file contents
782 with open(file_path, encoding="utf-8", errors="replace") as f:
783 for line_num, line in enumerate(f, start=1):
784 if len(matches) >= max_matches:
785 truncated = True
786 break
788 # Perform search
789 if use_regex:
790 # Regex search
791 if regex_pattern is not None:
792 match_obj = regex_pattern.search(line)
793 else:
794 match_obj = None
795 if match_obj:
796 match_start = match_obj.start()
797 match_end = match_obj.end()
798 else:
799 continue
800 else:
801 # Literal search
802 search_line = line if case_sensitive else line.lower()
803 search_query = query if case_sensitive else query.lower()
805 match_start = search_line.find(search_query)
806 if match_start == -1:
807 continue
808 match_end = match_start + len(search_query)
810 # Truncate snippet to 200 chars
811 snippet = line.strip()
812 if len(snippet) > 200:
813 snippet = snippet[:200] + "..."
815 # Get relative path for result
816 relative_path = file_path.relative_to(workspace_root)
818 matches.append(
819 {
820 "file": str(relative_path),
821 "line": line_num,
822 "snippet": snippet,
823 "match_start": match_start,
824 "match_end": match_end,
825 }
826 )
828 except (OSError, PermissionError):
829 # Skip files we can't read
830 continue
831 except Exception as e:
832 # Log unexpected errors but continue searching
833 logger.warning(f"Unexpected error searching {file_path}: {e}")
834 continue
836 result = {
837 "query": query,
838 "use_regex": use_regex,
839 "files_searched": files_searched,
840 "matches": matches,
841 "truncated": truncated,
842 }
844 return self._create_success_response(
845 result=result,
846 message=f"Found {len(matches)} matches in {files_searched} files",
847 )
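    # Illustrative calls (sketch only; pattern, path, and glob are placeholders):
    #
    #     >>> hits = await tools.search_text("TODO", glob="**/*.py")                           # doctest: +SKIP
    #     >>> hits = await tools.search_text(r"def \w+_file", use_regex=True, glob="**/*.py")  # doctest: +SKIP
    #     >>> [(m["file"], m["line"]) for m in hits["result"]["matches"]]                      # doctest: +SKIP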
849 """
850 {
851 "name": "write_file",
852 "description": "Write file within workspace with safety checks. Requires filesystem_writes_enabled. Supports create/overwrite/append modes. Returns bytes written and mode used.",
853 "parameters": {
854 "type": "object",
855 "properties": {
856 "path": {
857 "type": "string",
858 "description": "File path relative to workspace"
859 },
860 "content": {
861 "type": "string",
862 "description": "Content to write"
863 },
864 "mode": {
865 "type": "string",
866 "description": "Write mode: create, overwrite, append",
867 "default": "create"
868 }
869 },
870 "required": ["path", "content"]
871 }
872 }
873 """
875 async def write_file(
876 self,
877 path: Annotated[str, Field(description="File path relative to workspace")],
878 content: Annotated[str, Field(description="Content to write")],
879 mode: Annotated[str, Field(description="Write mode: create, overwrite, append")] = "create",
880 ) -> dict:
881 """Write file within workspace with safety checks. Requires filesystem_writes_enabled. Supports create/overwrite/append modes. Returns bytes written and mode used."""
882 # Check if writes are enabled
883 if not self.config.filesystem_writes_enabled:
884 return self._create_error_response(
885 error="writes_disabled",
886 message="Filesystem writes are disabled. Set filesystem_writes_enabled=true in configuration.",
887 )
889 # Validate mode
890 valid_modes = ["create", "overwrite", "append"]
891 if mode not in valid_modes:
892 return self._create_error_response(
893 error="invalid_mode",
894 message=f"Invalid mode '{mode}'. Valid modes: {', '.join(valid_modes)}",
895 )
897 # Check content size limit
898 content_bytes = len(content.encode("utf-8"))
899 if content_bytes > self.config.filesystem_max_write_bytes:
900 return self._create_error_response(
901 error="write_too_large",
902 message=f"Content size ({content_bytes} bytes) exceeds max write limit "
903 f"({self.config.filesystem_max_write_bytes} bytes)",
904 )
906 # Resolve and validate path
907 resolved = self._resolve_path(path)
908 if isinstance(resolved, dict):
909 return resolved # Error response
911 # Check mode-specific preconditions
912 existed_before = resolved.exists()
914 if mode == "create" and existed_before:
915 return self._create_error_response(
916 error="file_exists",
917 message=f"File already exists (mode=create): {path}. Use mode='overwrite' to replace or mode='append' to add content.",
918 )
920 if mode == "overwrite" and not existed_before:
921 # Allow overwrite to create new file (like mode='create')
922 pass
924 # Perform write operation
925 try:
926 if mode == "append":
927 # Append mode
928 with open(resolved, "a", encoding="utf-8") as f:
929 f.write(content)
930 else:
931 # Create or overwrite mode
932 with open(resolved, "w", encoding="utf-8") as f:
933 f.write(content)
935 result = {
936 "path": path,
937 "bytes_written": content_bytes,
938 "mode": mode,
939 "existed_before": existed_before,
940 }
942 return self._create_success_response(
943 result=result, message=f"Wrote {content_bytes} bytes to {path} (mode={mode})"
944 )
946 except PermissionError:
947 return self._create_error_response(
948 error="permission_denied", message=f"Permission denied writing to: {path}"
949 )
950 except OSError as e:
951 return self._create_error_response(
952 error="os_error", message=f"Error writing to {path}: {str(e)}"
953 )
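    # Illustrative call (sketch only; requires filesystem_writes_enabled=true,
    # otherwise a 'writes_disabled' error response is returned; the path and
    # content are placeholders):
    #
    #     >>> out = await tools.write_file("todo.md", "- review tests\n")   # doctest: +SKIP
    #     >>> out["result"]["mode"], out["result"]["existed_before"]
    #     ('create', False)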
955 """
956 {
957 "name": "apply_text_edit",
958 "description": "Apply exact text replacement in file within workspace. Requires filesystem_writes_enabled and exact match. Use replace_all for multiple occurrences. Returns replacement count and size delta.",
959 "parameters": {
960 "type": "object",
961 "properties": {
962 "path": {
963 "type": "string",
964 "description": "File path relative to workspace"
965 },
966 "expected_text": {
967 "type": "string",
968 "description": "Exact text to find and replace"
969 },
970 "replacement_text": {
971 "type": "string",
972 "description": "Replacement text"
973 },
974 "replace_all": {
975 "type": "boolean",
976 "description": "Replace all occurrences",
977 "default": false
978 }
979 },
980 "required": ["path", "expected_text", "replacement_text"]
981 }
982 }
983 """
985 async def apply_text_edit(
986 self,
987 path: Annotated[str, Field(description="File path relative to workspace")],
988 expected_text: Annotated[str, Field(description="Exact text to find and replace")],
989 replacement_text: Annotated[str, Field(description="Replacement text")],
990 replace_all: Annotated[bool, Field(description="Replace all occurrences")] = False,
991 ) -> dict:
992 """Apply exact text replacement in file within workspace. Requires filesystem_writes_enabled and exact match. Use replace_all for multiple occurrences. Returns replacement count and size delta."""
993 # Check if writes are enabled
994 if not self.config.filesystem_writes_enabled:
995 return self._create_error_response(
996 error="writes_disabled",
997 message="Filesystem writes are disabled. Set filesystem_writes_enabled=true in configuration.",
998 )
1000 # Validate expected_text
1001 if not expected_text:
1002 return self._create_error_response(
1003 error="empty_expected_text",
1004 message="expected_text cannot be empty. Provide exact text to match.",
1005 )
1007 # Resolve and validate path
1008 resolved = self._resolve_path(path)
1009 if isinstance(resolved, dict):
1010 return resolved # Error response
1012 # Check if file exists and is a regular file
1013 if not resolved.exists():
1014 return self._create_error_response(error="not_found", message=f"File not found: {path}")
1016 if not resolved.is_file():
1017 return self._create_error_response(
1018 error="not_a_file", message=f"Path is not a file: {path}"
1019 )
1021 # Read file contents
1022 try:
1023 with open(resolved, encoding="utf-8", errors="replace") as f:
1024 original_content = f.read()
1026 original_size = len(original_content.encode("utf-8"))
1028 except PermissionError:
1029 return self._create_error_response(
1030 error="permission_denied", message=f"Permission denied reading file: {path}"
1031 )
1032 except OSError as e:
1033 return self._create_error_response(
1034 error="os_error", message=f"Error reading file {path}: {str(e)}"
1035 )
1037 # Count occurrences
1038 occurrences = original_content.count(expected_text)
1040 if occurrences == 0:
1041 return self._create_error_response(
1042 error="match_not_found",
1043 message=f"expected_text not found in file: {path}. No changes made.",
1044 )
1046 if occurrences > 1 and not replace_all:
1047 return self._create_error_response(
1048 error="multiple_matches",
1049 message=f"expected_text found {occurrences} times in {path}. "
1050 f"Use replace_all=true to replace all occurrences.",
1051 )
1053 # Perform replacement
1054 if replace_all:
1055 new_content = original_content.replace(expected_text, replacement_text)
1056 replacements = occurrences
1057 else:
1058 # Replace only first occurrence (occurrences == 1 at this point)
1059 new_content = original_content.replace(expected_text, replacement_text, 1)
1060 replacements = 1
1062 # Check new size limit
1063 new_size = len(new_content.encode("utf-8"))
1064 if new_size > self.config.filesystem_max_write_bytes:
1065 return self._create_error_response(
1066 error="write_too_large",
1067 message=f"Resulting file size ({new_size} bytes) exceeds max write limit "
1068 f"({self.config.filesystem_max_write_bytes} bytes)",
1069 )
1071 # Calculate lines changed (approximate)
1072 original_lines = original_content.splitlines()
1073 new_lines = new_content.splitlines()
1074 lines_changed = abs(len(new_lines) - len(original_lines)) + replacements
1076 # Write atomically (temp file + rename)
1077 try:
1078 # Create temp file in same directory for atomic rename
1079 temp_fd, temp_path = tempfile.mkstemp(
1080 dir=resolved.parent, prefix=f".{resolved.name}.", suffix=".tmp"
1081 )
1083 try:
1084 # Write to temp file
1085 with os.fdopen(temp_fd, "w", encoding="utf-8") as f:
1086 f.write(new_content)
1088 # Atomic rename
1089 os.replace(temp_path, resolved)
1091 except Exception:
1092 # Clean up temp file on error
1093 try:
1094 os.unlink(temp_path)
1095 except OSError:
1096 # Ignore errors during temp file cleanup; not critical if deletion fails
1097 pass
1098 raise
1100 result = {
1101 "path": path,
1102 "bytes_written": new_size,
1103 "replacements": replacements,
1104 "original_size": original_size,
1105 "new_size": new_size,
1106 "lines_changed": lines_changed,
1107 }
1109 return self._create_success_response(
1110 result=result,
1111 message=f"Applied {replacements} replacement(s) to {path}",
1112 )
1114 except PermissionError:
1115 return self._create_error_response(
1116 error="permission_denied", message=f"Permission denied writing to: {path}"
1117 )
1118 except OSError as e:
1119 return self._create_error_response(
1120 error="os_error", message=f"Error writing to {path}: {str(e)}"
1121 )
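    # Illustrative call (sketch only; requires filesystem_writes_enabled=true; the
    # file and the swapped text are placeholders). With replace_all=False, more than
    # one occurrence yields a 'multiple_matches' error instead of an edit:
    #
    #     >>> edit = await tools.apply_text_edit(                  # doctest: +SKIP
    #     ...     "src/app/config.py",
    #     ...     expected_text="DEBUG = True",
    #     ...     replacement_text="DEBUG = False",
    #     ... )
    #     >>> edit["result"]["replacements"]
    #     1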
1123 """
1124 {
1125 "name": "create_directory",
1126 "description": "Create directory within workspace with optional parent creation. Requires filesystem_writes_enabled. Idempotent (success if exists). Returns created flag.",
1127 "parameters": {
1128 "type": "object",
1129 "properties": {
1130 "path": {
1131 "type": "string",
1132 "description": "Directory path relative to workspace"
1133 },
1134 "parents": {
1135 "type": "boolean",
1136 "description": "Create parent directories if needed",
1137 "default": true
1138 }
1139 },
1140 "required": ["path"]
1141 }
1142 }
1143 """
1145 async def create_directory(
1146 self,
1147 path: Annotated[str, Field(description="Directory path relative to workspace")],
1148 parents: Annotated[bool, Field(description="Create parent directories if needed")] = True,
1149 ) -> dict:
1150 """Create directory within workspace with optional parent creation. Requires filesystem_writes_enabled. Idempotent (success if exists). Returns created flag."""
1151 # Check if writes are enabled
1152 if not self.config.filesystem_writes_enabled:
1153 return self._create_error_response(
1154 error="writes_disabled",
1155 message="Filesystem writes are disabled. Set filesystem_writes_enabled=true in configuration.",
1156 )
1158 # Resolve and validate path
1159 resolved = self._resolve_path(path)
1160 if isinstance(resolved, dict):
1161 return resolved # Error response
1163 # Check if path exists
1164 if resolved.exists():
1165 if resolved.is_dir():
1166 # Already exists - idempotent success
1167 result = {"path": path, "created": False, "parents_created": 0}
1168 return self._create_success_response(
1169 result=result, message=f"Directory already exists: {path}"
1170 )
1171 else:
1172 # Exists but is not a directory
1173 return self._create_error_response(
1174 error="not_a_directory", message=f"Path exists but is not a directory: {path}"
1175 )
1177 # Create directory
1178 try:
1179 if parents:
1180 # Count parents that will be created
1181 parents_to_create = []
1182 check_path = resolved.parent
1183 workspace_root = self._get_workspace_root()
1185 while check_path != workspace_root and not check_path.exists():
1186 parents_to_create.append(check_path)
1187 check_path = check_path.parent
1189 # Create with parents
1190 resolved.mkdir(parents=True, exist_ok=True)
1191 parents_created = len(parents_to_create)
1192 else:
1193 # Create without parents (will fail if parent doesn't exist)
1194 resolved.mkdir(parents=False, exist_ok=True)
1195 parents_created = 0
1197 result = {"path": path, "created": True, "parents_created": parents_created}
1199 return self._create_success_response(
1200 result=result, message=f"Created directory: {path}"
1201 )
1203 except FileNotFoundError:
1204 return self._create_error_response(
1205 error="parent_not_found",
1206 message=f"Parent directory does not exist: {path}. Use parents=true to create.",
1207 )
1208 except PermissionError:
1209 return self._create_error_response(
1210 error="permission_denied", message=f"Permission denied creating directory: {path}"
1211 )
1212 except OSError as e:
1213 return self._create_error_response(
1214 error="os_error", message=f"Error creating directory {path}: {str(e)}"
1215 )
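# ---------------------------------------------------------------------------
# End-to-end usage sketch (illustrative, not part of the toolset API). It assumes
# `load_config` from agent.config returns a populated AgentSettings, as in the
# class docstring example, and that the current directory is a reasonable
# workspace. Response envelopes are accessed defensively with .get().
if __name__ == "__main__":
    import asyncio

    from agent.config import load_config

    async def _demo() -> None:
        settings = load_config()
        tools = FileSystemTools(settings)

        # Inspect the workspace; no write permissions are required for this.
        listing = await tools.list_directory(".", max_entries=20)
        if listing.get("success"):
            for entry in listing["result"]["entries"]:
                print(f'{entry["type"]:9} {entry["relative_path"]}')

        # Literal text search across Python files in the workspace.
        hits = await tools.search_text("TODO", glob="**/*.py", max_matches=10)
        if hits.get("success"):
            for m in hits["result"]["matches"]:
                print(f'{m["file"]}:{m["line"]}: {m["snippet"]}')

    asyncio.run(_demo())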