Skip to main content
Glama
file_system_agent.py27.4 kB
#!/usr/bin/env python3 """ File System Test Agent for MCP Integration Validates that MCP file operations correctly affect the host system and that host system changes are properly reflected through MCP. """ import os import sys import json import asyncio import tempfile import hashlib from pathlib import Path from typing import Dict, Any, List, Optional from datetime import datetime import logging # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent)) from test_agent_base import TestAgentBase logger = logging.getLogger(__name__) class FileSystemAgent(TestAgentBase): """Test agent for validating file system operations through MCP.""" def __init__(self): super().__init__( name="FileSystemAgent", description="Validates MCP file system operations with bidirectional verification" ) self.test_dir = self.test_base_dir / "test-filesystem" self.test_files = [] async def validate_prerequisites(self) -> bool: """Validate file system test prerequisites.""" try: # Create test directory self.test_dir.mkdir(parents=True, exist_ok=True) # Verify write permissions test_file = self.test_dir / ".prereq-test" test_file.write_text("test") content = test_file.read_text() test_file.unlink() return content == "test" except Exception as e: logger.error(f"Prerequisites validation failed: {e}") return False async def execute_test_scenarios(self) -> List[Dict[str, Any]]: """ Execute comprehensive file system test scenarios. Tests: 1. File creation through MCP → verify on host 2. File modification through MCP → verify changes 3. Directory operations through MCP → verify structure 4. Host file creation → verify through MCP 5. Concurrent operations → verify consistency 6. Permission handling → verify security """ test_results = [] # Test 1: File Creation Through MCP logger.info("Test 1: File creation through MCP") result = await self._test_file_creation_mcp() test_results.append(result) # Test 2: File Modification Through MCP logger.info("Test 2: File modification through MCP") result = await self._test_file_modification_mcp() test_results.append(result) # Test 3: Directory Operations Through MCP logger.info("Test 3: Directory operations through MCP") result = await self._test_directory_operations_mcp() test_results.append(result) # Test 4: Host File Creation → MCP Detection logger.info("Test 4: Host file creation and MCP detection") result = await self._test_host_file_creation() test_results.append(result) # Test 5: Concurrent Operations logger.info("Test 5: Concurrent file operations") result = await self._test_concurrent_operations() test_results.append(result) # Test 6: Permission Handling logger.info("Test 6: File permission handling") result = await self._test_permission_handling() test_results.append(result) # Test 7: Large File Handling logger.info("Test 7: Large file operations") result = await self._test_large_file_handling() test_results.append(result) # Test 8: Symbolic Links and Special Files logger.info("Test 8: Symbolic links and special files") result = await self._test_special_files() test_results.append(result) self.test_results = test_results return test_results async def _test_file_creation_mcp(self) -> Dict[str, Any]: """Test file creation through MCP and verify on host.""" test_name = "file_creation_mcp" test_file = self.test_dir / "mcp-created-file.txt" test_content = f"Created by MCP at {datetime.now().isoformat()}" try: # Execute MCP operation to create file mcp_result = await self.execute_mcp_operation( "create_file", { "path": str(test_file), "content": test_content, "mode": "w" } ) # Validate host system change host_validation = await self.validate_host_system_change({ "type": "file_created", "path": str(test_file), "content": test_content }) # Additional validation: file metadata metadata_valid = False if test_file.exists(): stat = test_file.stat() metadata_valid = ( stat.st_size == len(test_content) and stat.st_mode & 0o777 > 0 ) self.test_files.append(test_file) return { "test": test_name, "mcp_success": mcp_result["success"], "host_validation": host_validation, "metadata_valid": metadata_valid, "passed": mcp_result["success"] and host_validation and metadata_valid, "details": { "file_path": str(test_file), "content_length": len(test_content), "mcp_result": mcp_result } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def _test_file_modification_mcp(self) -> Dict[str, Any]: """Test file modification through MCP.""" test_name = "file_modification_mcp" test_file = self.test_dir / "mcp-modified-file.txt" try: # Create initial file initial_content = "Initial content" test_file.write_text(initial_content) initial_hash = hashlib.sha256(initial_content.encode()).hexdigest() # Modify through MCP modified_content = f"Modified by MCP at {datetime.now().isoformat()}" mcp_result = await self.execute_mcp_operation( "modify_file", { "path": str(test_file), "content": modified_content, "mode": "w" } ) # Validate modification host_validation = await self.validate_host_system_change({ "type": "file_modified", "path": str(test_file), "contains": "Modified by MCP" }) # Verify content hash changed hash_changed = False if test_file.exists(): new_content = test_file.read_text() new_hash = hashlib.sha256(new_content.encode()).hexdigest() hash_changed = new_hash != initial_hash self.test_files.append(test_file) return { "test": test_name, "mcp_success": mcp_result["success"], "host_validation": host_validation, "hash_changed": hash_changed, "passed": mcp_result["success"] and host_validation and hash_changed, "details": { "initial_hash": initial_hash, "modification_detected": hash_changed } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def _test_directory_operations_mcp(self) -> Dict[str, Any]: """Test directory operations through MCP.""" test_name = "directory_operations_mcp" test_dir = self.test_dir / "mcp-test-directory" nested_dir = test_dir / "nested" / "deep" / "structure" try: # Create directory structure through MCP mcp_result = await self.execute_mcp_operation( "create_directory", { "path": str(nested_dir), "parents": True } ) # Validate directory creation dir_validation = await self.validate_host_system_change({ "type": "directory_created", "path": str(nested_dir) }) # Create files in directory files_created = 0 for i in range(3): file_path = nested_dir / f"test-file-{i}.txt" file_result = await self.execute_mcp_operation( "create_file", { "path": str(file_path), "content": f"Test file {i}" } ) if file_result["success"] and file_path.exists(): files_created += 1 self.test_files.append(file_path) # List directory through MCP list_result = await self.execute_mcp_operation( "list_directory", {"path": str(nested_dir)} ) # Validate listing listing_valid = ( list_result["success"] and len(list_result.get("result", {}).get("files", [])) == files_created ) return { "test": test_name, "mcp_success": mcp_result["success"], "dir_validation": dir_validation, "files_created": files_created == 3, "listing_valid": listing_valid, "passed": all([ mcp_result["success"], dir_validation, files_created == 3, listing_valid ]), "details": { "directory_path": str(nested_dir), "files_created_count": files_created } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def _test_host_file_creation(self) -> Dict[str, Any]: """Test host file creation and MCP detection.""" test_name = "host_file_creation" host_file = self.test_dir / "host-created-file.txt" try: # Create file directly on host host_content = f"Created by host at {datetime.now().isoformat()}" host_file.write_text(host_content) self.test_files.append(host_file) # Small delay to ensure file system sync await asyncio.sleep(0.1) # Read file through MCP mcp_result = await self.execute_mcp_operation( "read_file", {"path": str(host_file)} ) # Validate MCP can see the file mcp_sees_file = ( mcp_result["success"] and mcp_result.get("result", {}).get("content") == host_content ) # Get file info through MCP info_result = await self.execute_mcp_operation( "get_file_info", {"path": str(host_file)} ) # Validate file metadata metadata_valid = ( info_result["success"] and info_result.get("result", {}).get("size") == len(host_content) ) return { "test": test_name, "host_file_created": host_file.exists(), "mcp_sees_file": mcp_sees_file, "metadata_valid": metadata_valid, "passed": host_file.exists() and mcp_sees_file and metadata_valid, "details": { "file_path": str(host_file), "content_length": len(host_content), "mcp_read_success": mcp_result["success"] } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def _test_concurrent_operations(self) -> Dict[str, Any]: """Test concurrent file operations for consistency.""" test_name = "concurrent_operations" concurrent_dir = self.test_dir / "concurrent-test" concurrent_dir.mkdir(exist_ok=True) try: # Define concurrent operations async def create_file(index: int) -> Dict[str, Any]: file_path = concurrent_dir / f"concurrent-{index}.txt" result = await self.execute_mcp_operation( "create_file", { "path": str(file_path), "content": f"Concurrent file {index} - {datetime.now().isoformat()}" } ) if result["success"]: self.test_files.append(file_path) return result # Execute concurrent operations tasks = [create_file(i) for i in range(10)] results = await asyncio.gather(*tasks, return_exceptions=True) # Count successes successes = sum(1 for r in results if isinstance(r, dict) and r.get("success")) # Verify all files exist on host host_files = list(concurrent_dir.glob("concurrent-*.txt")) host_count = len(host_files) # Read all files through MCP to verify consistency read_tasks = [] for f in host_files: read_tasks.append(self.execute_mcp_operation( "read_file", {"path": str(f)} )) read_results = await asyncio.gather(*read_tasks, return_exceptions=True) read_successes = sum(1 for r in read_results if isinstance(r, dict) and r.get("success")) return { "test": test_name, "operations_attempted": 10, "mcp_successes": successes, "host_files_found": host_count, "mcp_reads_successful": read_successes, "passed": successes == 10 and host_count == 10 and read_successes == 10, "details": { "consistency": successes == host_count == read_successes, "concurrent_dir": str(concurrent_dir) } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def _test_permission_handling(self) -> Dict[str, Any]: """Test file permission handling and security.""" test_name = "permission_handling" perm_file = self.test_dir / "permission-test-file.txt" try: # Create file with specific permissions through MCP mcp_result = await self.execute_mcp_operation( "create_file", { "path": str(perm_file), "content": "Permission test", "mode": "w", "permissions": 0o600 # Read/write for owner only } ) if perm_file.exists(): self.test_files.append(perm_file) # Check permissions on host permissions_correct = False if perm_file.exists(): stat = perm_file.stat() # Check if permissions match (considering umask) permissions_correct = (stat.st_mode & 0o777) <= 0o600 # Try to modify permissions through MCP chmod_result = await self.execute_mcp_operation( "change_permissions", { "path": str(perm_file), "mode": 0o644 } ) # Verify permission change permission_changed = False if chmod_result["success"] and perm_file.exists(): new_stat = perm_file.stat() permission_changed = (new_stat.st_mode & 0o777) != (stat.st_mode & 0o777) # Test restricted path access (should fail) restricted_result = await self.execute_mcp_operation( "create_file", { "path": "/etc/test-file-should-fail.txt", "content": "This should fail" } ) restricted_blocked = not restricted_result["success"] return { "test": test_name, "file_created": mcp_result["success"], "permissions_set": permissions_correct, "permission_change": chmod_result["success"] and permission_changed, "restricted_access_blocked": restricted_blocked, "passed": all([ mcp_result["success"], permissions_correct, restricted_blocked ]), "details": { "test_file": str(perm_file), "security_validated": restricted_blocked } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def _test_large_file_handling(self) -> Dict[str, Any]: """Test large file operations and streaming.""" test_name = "large_file_handling" large_file = self.test_dir / "large-test-file.bin" try: # Create large file (10MB) through MCP size_mb = 10 chunk_size = 1024 * 1024 # 1MB chunks # Start file creation create_result = await self.execute_mcp_operation( "create_file", { "path": str(large_file), "content": "", "mode": "w" } ) if not create_result["success"]: return { "test": test_name, "passed": False, "error": "Failed to create large file" } # Append data in chunks bytes_written = 0 for i in range(size_mb): chunk_data = f"Chunk {i} " * (chunk_size // 10) # Repeated text append_result = await self.execute_mcp_operation( "append_file", { "path": str(large_file), "content": chunk_data } ) if append_result["success"]: bytes_written += len(chunk_data) if large_file.exists(): self.test_files.append(large_file) # Verify file size on host actual_size = large_file.stat().st_size if large_file.exists() else 0 size_correct = abs(actual_size - bytes_written) < 1024 # Allow small difference # Test streaming read through MCP stream_result = await self.execute_mcp_operation( "stream_file", { "path": str(large_file), "chunk_size": chunk_size } ) return { "test": test_name, "file_created": create_result["success"], "bytes_written": bytes_written, "actual_size": actual_size, "size_correct": size_correct, "streaming_supported": stream_result["success"], "passed": create_result["success"] and size_correct, "details": { "target_size_mb": size_mb, "actual_size_mb": actual_size / (1024 * 1024) } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def _test_special_files(self) -> Dict[str, Any]: """Test symbolic links and special file handling.""" test_name = "special_files" try: # Create regular file target_file = self.test_dir / "symlink-target.txt" target_file.write_text("Symlink target content") self.test_files.append(target_file) # Create symlink through MCP symlink_path = self.test_dir / "test-symlink.txt" symlink_result = await self.execute_mcp_operation( "create_symlink", { "target": str(target_file), "link": str(symlink_path) } ) # Verify symlink on host symlink_valid = symlink_path.is_symlink() if symlink_path.exists() else False if symlink_valid: self.test_files.append(symlink_path) # Read through symlink via MCP read_result = await self.execute_mcp_operation( "read_file", { "path": str(symlink_path), "follow_symlinks": True } ) symlink_readable = ( read_result["success"] and read_result.get("result", {}).get("content") == "Symlink target content" ) # Test hidden file handling hidden_file = self.test_dir / ".hidden-test-file" hidden_result = await self.execute_mcp_operation( "create_file", { "path": str(hidden_file), "content": "Hidden file content" } ) if hidden_file.exists(): self.test_files.append(hidden_file) # List directory with hidden files list_result = await self.execute_mcp_operation( "list_directory", { "path": str(self.test_dir), "show_hidden": True } ) hidden_visible = False if list_result["success"]: files = list_result.get("result", {}).get("files", []) hidden_visible = any(".hidden-test-file" in f for f in files) return { "test": test_name, "symlink_created": symlink_result["success"] or symlink_valid, "symlink_valid": symlink_valid, "symlink_readable": symlink_readable, "hidden_file_created": hidden_result["success"], "hidden_file_visible": hidden_visible, "passed": all([ symlink_valid or not symlink_result["success"], # May not support symlinks hidden_result["success"], hidden_visible ]), "details": { "symlink_support": symlink_valid, "hidden_file_support": hidden_visible } } except Exception as e: logger.error(f"Test {test_name} failed: {e}") return { "test": test_name, "passed": False, "error": str(e) } async def validate_system_state(self) -> bool: """Validate system state after file system tests.""" try: # Check for orphaned test files outside test directory orphaned_files = [] for f in self.test_files: if f.exists() and not str(f).startswith(str(self.test_base_dir)): orphaned_files.append(str(f)) if orphaned_files: logger.warning(f"Found orphaned files: {orphaned_files}") return False # Verify test directory is not excessively large total_size = sum( f.stat().st_size for f in self.test_dir.rglob("*") if f.is_file() ) # Should be under 100MB for all tests if total_size > 100 * 1024 * 1024: logger.warning(f"Test directory too large: {total_size / (1024*1024):.2f}MB") return False return True except Exception as e: logger.error(f"System state validation failed: {e}") return False async def cleanup(self): """Clean up test files and directories.""" logger.info("Cleaning up file system test artifacts") # Remove test files for f in self.test_files: try: if f.exists(): if f.is_symlink() or f.is_file(): f.unlink() elif f.is_dir(): import shutil shutil.rmtree(f) except Exception as e: logger.warning(f"Failed to clean up {f}: {e}") # Clean test directory if self.test_dir.exists(): try: import shutil shutil.rmtree(self.test_dir) except Exception as e: logger.warning(f"Failed to clean test directory: {e}") async def main(): """Run the file system test agent.""" agent = FileSystemAgent() result = await agent.run() # Print summary print("\n" + "="*60) print("File System Test Agent Results") print("="*60) print(json.dumps(result, indent=2)) # Exit with appropriate code sys.exit(0 if result.get("summary", {}).get("status") == "PASSED" else 1) if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/krzemienski/shannon-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server