test_real_filesystem.py•11.1 kB
#!/usr/bin/env python3
"""Test the File System MCP Server with your real file system."""
import asyncio
import json
import sys
import os
from pathlib import Path
# Add src to Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from file_system_mcp_server.config import Config
from file_system_mcp_server.safety import SafetyManager
from file_system_mcp_server.file_operations import FileOperationsManager
async def test_with_real_filesystem():
"""Test the server with your actual file system."""
print("🚀 Testing File System MCP Server with Real File System")
print("=" * 60)
# Get current directory
current_dir = Path.cwd()
print(f"📁 Current directory: {current_dir}")
# Create configuration for testing
config = Config(
backup_directory=str(current_dir / ".mcp_backups"),
max_file_size=5 * 1024 * 1024, # 5MB limit
max_recursion_depth=3, # Limit recursion for safety
protected_paths=["/etc", "/usr", "/bin", "/System", "/Windows"],
enable_backups=True,
log_level="INFO"
)
# Initialize components
safety_manager = SafetyManager(config)
file_ops = FileOperationsManager(config, safety_manager)
print("✅ Server components initialized")
print(f"🔒 Backup directory: {config.backup_directory}")
# Test 1: List current directory
print("\n📋 Test 1: List current directory")
list_result = await file_ops.list_directory(str(current_dir))
if list_result.success:
print(f"✅ Found {list_result.total_entries} items in current directory")
print("📄 Files and directories:")
for entry in list_result.entries[:10]: # Show first 10 items
size_info = f"({entry.metadata.size} bytes)" if not entry.metadata.is_directory else "(directory)"
print(f" • {entry.name} {size_info}")
if len(list_result.entries) > 10:
print(f" ... and {len(list_result.entries) - 10} more items")
else:
print(f"❌ Failed to list directory: {list_result.error.message}")
# Test 2: Read an existing file (README.md if it exists)
readme_path = current_dir / "README.md"
if readme_path.exists():
print(f"\n📖 Test 2: Read {readme_path.name}")
read_result = await file_ops.read_file(str(readme_path))
if read_result.success:
content_preview = read_result.content[:200] + "..." if len(read_result.content) > 200 else read_result.content
print(f"✅ Successfully read {readme_path.name}")
print(f"📊 File size: {read_result.metadata.size} bytes")
print(f"🕒 Modified: {read_result.metadata.modified_time}")
print(f"📝 Content preview:\n{content_preview}")
else:
print(f"❌ Failed to read file: {read_result.error.message}")
else:
print("\n📖 Test 2: README.md not found, skipping read test")
# Test 3: Create a test file
test_file = current_dir / "mcp_test_file.txt"
print(f"\n📝 Test 3: Create test file {test_file.name}")
test_content = f"""# MCP Server Test File
Created by File System MCP Server
Current directory: {current_dir}
Timestamp: {asyncio.get_event_loop().time()}
This file demonstrates the MCP server's file creation capabilities.
"""
write_result = await file_ops.write_file(str(test_file), test_content)
if write_result.success:
print(f"✅ Successfully created {test_file.name}")
print(f"📊 File size: {write_result.metadata.size} bytes")
print(f"💾 Backup created: {write_result.backup_created or 'None (new file)'}")
else:
print(f"❌ Failed to create file: {write_result.error.message}")
# Test 4: Update the test file
if write_result.success:
print(f"\n🔄 Test 4: Update test file")
updated_content = test_content + "\n\n--- UPDATED ---\nThis line was added by the update operation.\n"
update_result = await file_ops.update_file(str(test_file), updated_content)
if update_result.success:
print(f"✅ Successfully updated {test_file.name}")
print(f"💾 Backup created: {update_result.backup_created}")
print(f"📊 New size: {update_result.metadata.size} bytes")
else:
print(f"❌ Failed to update file: {update_result.error.message}")
# Test 5: Get file info
if test_file.exists():
print(f"\nℹ️ Test 5: Get file info for {test_file.name}")
info_result = await file_ops.get_file_info(str(test_file))
if info_result.success:
print(f"✅ File exists: {info_result.exists}")
if info_result.metadata:
print(f"📊 Size: {info_result.metadata.size} bytes")
print(f"🕒 Modified: {info_result.metadata.modified_time}")
print(f"🔐 Permissions: {info_result.metadata.permissions}")
print(f"📁 Is directory: {info_result.metadata.is_directory}")
print(f"🏷️ MIME type: {info_result.metadata.mime_type}")
else:
print(f"❌ Failed to get file info: {info_result.error.message}")
# Test 6: List with pattern matching
print(f"\n🔍 Test 6: List Python files (*.py pattern)")
pattern_result = await file_ops.list_directory(str(current_dir), pattern="*.py")
if pattern_result.success:
py_files = [entry for entry in pattern_result.entries if entry.name.endswith('.py')]
print(f"✅ Found {len(py_files)} Python files")
for entry in py_files[:5]: # Show first 5
print(f" • {entry.name} ({entry.metadata.size} bytes)")
if len(py_files) > 5:
print(f" ... and {len(py_files) - 5} more Python files")
else:
print(f"❌ Failed to list with pattern: {pattern_result.error.message}")
# Test 7: Test security (try to access protected path)
print(f"\n🔒 Test 7: Test security protection")
protected_result = await file_ops.read_file("/etc/passwd")
if not protected_result.success:
print(f"✅ Security working: {protected_result.error.message}")
else:
print(f"⚠️ Security bypass detected - this shouldn't happen!")
# Test 8: List backups created
print(f"\n💾 Test 8: Check backup system")
if test_file.exists():
backup_info = safety_manager.get_backup_info(str(test_file))
print(f"✅ Found {len(backup_info)} backup(s) for {test_file.name}")
for i, backup in enumerate(backup_info[:3]): # Show first 3 backups
print(f" {i+1}. {Path(backup['path']).name} (created: {backup['created'][:19]})")
# Test 9: Clean up (delete test file)
print(f"\n🗑️ Test 9: Clean up test file")
if test_file.exists():
delete_result = await file_ops.delete_file(str(test_file))
if delete_result.success:
print(f"✅ Successfully deleted {test_file.name}")
print(f"💾 Moved to backup: {delete_result.backup_location}")
print(f"📊 Files deleted: {delete_result.files_deleted}")
else:
print(f"❌ Failed to delete file: {delete_result.error.message}")
# Summary
print(f"\n" + "=" * 60)
print("🎉 Real File System Test Complete!")
print(f"📁 Tested in: {current_dir}")
print(f"💾 Backups stored in: {config.backup_directory}")
print(f"🔒 Protected paths: {len(config.protected_paths)} paths")
# Show backup directory contents
backup_dir = Path(config.backup_directory)
if backup_dir.exists():
backup_files = list(backup_dir.glob("*.backup"))
print(f"💾 Total backups created: {len(backup_files)}")
if backup_files:
print("📋 Recent backups:")
for backup in sorted(backup_files, key=lambda x: x.stat().st_mtime, reverse=True)[:3]:
print(f" • {backup.name}")
async def test_mcp_tools_simulation():
"""Simulate MCP tool calls with real file system."""
print(f"\n🛠️ MCP Tools Simulation")
print("=" * 40)
current_dir = Path.cwd()
config = Config(
backup_directory=str(current_dir / ".mcp_backups"),
max_file_size=5 * 1024 * 1024,
enable_backups=True
)
safety_manager = SafetyManager(config)
file_ops = FileOperationsManager(config, safety_manager)
# Simulate tool calls like an MCP client would make
tools_to_test = [
("list_directory", {"path": str(current_dir), "pattern": "*.md"}),
("get_file_info", {"path": str(current_dir / "README.md")}),
("list_directory", {"path": str(current_dir), "recursive": False}),
]
for tool_name, args in tools_to_test:
print(f"\n🔧 Tool: {tool_name}")
print(f"📥 Args: {args}")
try:
if tool_name == "list_directory":
result = await file_ops.list_directory(
args["path"],
args.get("pattern"),
args.get("recursive", False)
)
elif tool_name == "get_file_info":
result = await file_ops.get_file_info(args["path"])
elif tool_name == "read_file":
result = await file_ops.read_file(args["path"])
# Convert result to dict (like MCP server would)
result_dict = result.to_dict()
print(f"📤 Success: {result_dict['success']}")
if result_dict['success']:
if 'entries' in result_dict:
print(f"📊 Found {len(result_dict['entries'])} entries")
elif 'metadata' in result_dict and result_dict['metadata']:
print(f"📊 File size: {result_dict['metadata']['size']} bytes")
elif 'content' in result_dict:
content_len = len(result_dict['content']) if result_dict['content'] else 0
print(f"📊 Content length: {content_len} characters")
else:
print(f"❌ Error: {result_dict.get('error', {}).get('message', 'Unknown error')}")
except Exception as e:
print(f"❌ Exception: {e}")
if __name__ == "__main__":
print("🧪 File System MCP Server - Real File System Test")
try:
# Run the main test
asyncio.run(test_with_real_filesystem())
# Run MCP tools simulation
asyncio.run(test_mcp_tools_simulation())
print(f"\n✨ All tests completed!")
print(f"\n💡 Next steps:")
print(f" 1. Install MCP: pip install mcp python-magic")
print(f" 2. Run full server: python run_server.py --help")
print(f" 3. Add to Kiro MCP config for AI assistant integration")
except KeyboardInterrupt:
print(f"\n⏹️ Test interrupted by user")
except Exception as e:
print(f"\n❌ Test failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)