interactive_test.py•16 kB
#!/usr/bin/env python3
"""Interactive test script to explore your file system with the MCP server."""
import asyncio
import sys
import os
from pathlib import Path
# Add src to Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from file_system_mcp_server.config import Config
from file_system_mcp_server.safety import SafetyManager
from file_system_mcp_server.file_operations import FileOperationsManager
class InteractiveFileSystemTester:
"""Interactive tester for the File System MCP Server."""
def __init__(self):
"""Initialize the tester."""
current_dir = Path.cwd()
self.config = Config(
backup_directory=str(current_dir / ".mcp_backups"),
max_file_size=10 * 1024 * 1024, # 10MB
max_recursion_depth=5,
protected_paths=["/etc", "/usr", "/bin", "/System", "/Windows"],
enable_backups=True,
log_level="INFO"
)
self.safety_manager = SafetyManager(self.config)
self.file_ops = FileOperationsManager(self.config, self.safety_manager)
self.current_path = current_dir
def print_menu(self):
"""Print the interactive menu."""
print(f"\n🗂️ File System MCP Server - Interactive Test")
print(f"📁 Current path: {self.current_path}")
print(f"💾 Backup directory: {self.config.backup_directory}")
print("=" * 60)
print("1. 📋 List current directory")
print("2. 📖 Read a file")
print("3. 📝 Create a file")
print("4. 🔄 Update a file")
print("5. 🗑️ Delete a file")
print("6. ℹ️ Get file info")
print("7. 🔍 Search with pattern")
print("8. 📂 Change directory")
print("9. 🔒 Test security (try protected path)")
print("10. 💾 View backups")
print("11. 🧹 Clean up backups")
print("0. ❌ Exit")
print("=" * 60)
async def list_directory(self):
"""List current directory."""
print(f"\n📋 Listing directory: {self.current_path}")
recursive = input("🔄 Recursive listing? (y/N): ").lower().startswith('y')
pattern = input("🔍 Pattern filter (e.g., *.py) or Enter for all: ").strip()
pattern = pattern if pattern else None
result = await self.file_ops.list_directory(
str(self.current_path),
pattern=pattern,
recursive=recursive
)
if result.success:
print(f"✅ Found {result.total_entries} total items, showing {len(result.entries)}")
if pattern:
print(f"🔍 Pattern: {pattern}")
if recursive:
print(f"🔄 Recursive: Yes (max depth reached: {result.max_depth_reached})")
print("\n📄 Items:")
for i, entry in enumerate(result.entries[:20], 1): # Show first 20
icon = "📁" if entry.metadata.is_directory else "📄"
size = f"({entry.metadata.size} bytes)" if not entry.metadata.is_directory else ""
print(f" {i:2d}. {icon} {entry.name} {size}")
if len(result.entries) > 20:
print(f" ... and {len(result.entries) - 20} more items")
else:
print(f"❌ Error: {result.error.message}")
async def read_file(self):
"""Read a file."""
filename = input("\n📖 Enter filename to read: ").strip()
if not filename:
return
file_path = self.current_path / filename
result = await self.file_ops.read_file(str(file_path))
if result.success:
print(f"✅ Successfully read {filename}")
print(f"📊 Size: {result.metadata.size} bytes")
print(f"🏷️ MIME type: {result.metadata.mime_type}")
print(f"🕒 Modified: {result.metadata.modified_time}")
if result.is_binary:
print("🔒 Binary file - content not displayed")
else:
print(f"📝 Encoding: {result.encoding}")
content = result.content
if len(content) > 500:
print(f"📄 Content (first 500 chars):\n{content[:500]}...")
show_more = input("\n📖 Show full content? (y/N): ").lower().startswith('y')
if show_more:
print(f"\n📄 Full content:\n{content}")
else:
print(f"📄 Content:\n{content}")
else:
print(f"❌ Error: {result.error.message}")
async def create_file(self):
"""Create a new file."""
filename = input("\n📝 Enter filename to create: ").strip()
if not filename:
return
print("📝 Enter file content (type 'END' on a new line to finish):")
content_lines = []
while True:
line = input()
if line.strip() == 'END':
break
content_lines.append(line)
content = '\n'.join(content_lines)
file_path = self.current_path / filename
overwrite = False
if file_path.exists():
overwrite = input(f"⚠️ File exists. Overwrite? (y/N): ").lower().startswith('y')
result = await self.file_ops.write_file(str(file_path), content, overwrite=overwrite)
if result.success:
print(f"✅ Successfully created {filename}")
print(f"📊 Size: {result.metadata.size} bytes")
if result.backup_created:
print(f"💾 Backup created: {Path(result.backup_created).name}")
else:
print(f"❌ Error: {result.error.message}")
async def update_file(self):
"""Update an existing file."""
filename = input("\n🔄 Enter filename to update: ").strip()
if not filename:
return
file_path = self.current_path / filename
# First read the current content
read_result = await self.file_ops.read_file(str(file_path))
if not read_result.success:
print(f"❌ Cannot read file: {read_result.error.message}")
return
if read_result.is_binary:
print("❌ Cannot update binary file")
return
print(f"📄 Current content:\n{read_result.content}")
print("\n📝 Enter new content (type 'END' on a new line to finish):")
content_lines = []
while True:
line = input()
if line.strip() == 'END':
break
content_lines.append(line)
new_content = '\n'.join(content_lines)
result = await self.file_ops.update_file(str(file_path), new_content)
if result.success:
print(f"✅ Successfully updated {filename}")
print(f"📊 New size: {result.metadata.size} bytes")
print(f"💾 Backup created: {Path(result.backup_created).name}")
else:
print(f"❌ Error: {result.error.message}")
if result.restored_from_backup:
print("🔄 File restored from backup")
async def delete_file(self):
"""Delete a file."""
filename = input("\n🗑️ Enter filename to delete: ").strip()
if not filename:
return
file_path = self.current_path / filename
confirm = input(f"⚠️ Delete {filename}? (y/N): ").lower().startswith('y')
if not confirm:
print("❌ Deletion cancelled")
return
result = await self.file_ops.delete_file(str(file_path))
if result.success:
print(f"✅ Successfully deleted {filename}")
print(f"💾 Moved to backup: {Path(result.backup_location).name}")
if result.was_directory:
print(f"📁 Directory with {result.files_deleted} files deleted")
else:
print(f"❌ Error: {result.error.message}")
async def get_file_info(self):
"""Get file information."""
filename = input("\nℹ️ Enter filename for info: ").strip()
if not filename:
return
file_path = self.current_path / filename
result = await self.file_ops.get_file_info(str(file_path))
if result.success:
if result.exists:
print(f"✅ File information for {filename}:")
m = result.metadata
print(f"📊 Size: {m.size:,} bytes")
print(f"📁 Is directory: {m.is_directory}")
print(f"🏷️ MIME type: {m.mime_type}")
print(f"🔐 Permissions: {m.permissions}")
print(f"🕒 Modified: {m.modified_time}")
print(f"🕒 Created: {m.created_time}")
print(f"🕒 Accessed: {m.accessed_time}")
if m.owner:
print(f"👤 Owner: {m.owner}")
if m.group:
print(f"👥 Group: {m.group}")
else:
print(f"❌ File does not exist: {filename}")
else:
print(f"❌ Error: {result.error.message}")
async def search_with_pattern(self):
"""Search with pattern."""
pattern = input("\n🔍 Enter search pattern (e.g., *.py, test_*): ").strip()
if not pattern:
return
recursive = input("🔄 Search recursively? (y/N): ").lower().startswith('y')
result = await self.file_ops.list_directory(
str(self.current_path),
pattern=pattern,
recursive=recursive
)
if result.success:
print(f"✅ Found {len(result.entries)} matches for '{pattern}'")
if recursive:
print("🔄 Searched recursively")
for i, entry in enumerate(result.entries[:15], 1): # Show first 15
icon = "📁" if entry.metadata.is_directory else "📄"
rel_path = Path(entry.path).relative_to(self.current_path)
size = f"({entry.metadata.size} bytes)" if not entry.metadata.is_directory else ""
print(f" {i:2d}. {icon} {rel_path} {size}")
if len(result.entries) > 15:
print(f" ... and {len(result.entries) - 15} more matches")
else:
print(f"❌ Error: {result.error.message}")
def change_directory(self):
"""Change current directory."""
new_path = input(f"\n📂 Enter new directory (current: {self.current_path}): ").strip()
if not new_path:
return
try:
if new_path == "..":
new_path = self.current_path.parent
elif new_path.startswith("/") or new_path.startswith("~"):
new_path = Path(new_path).expanduser()
else:
new_path = self.current_path / new_path
new_path = new_path.resolve()
if new_path.exists() and new_path.is_dir():
self.current_path = new_path
print(f"✅ Changed to: {self.current_path}")
else:
print(f"❌ Directory does not exist: {new_path}")
except Exception as e:
print(f"❌ Error: {e}")
async def test_security(self):
"""Test security features."""
print("\n🔒 Testing security features...")
protected_paths = ["/etc/passwd", "/usr/bin/ls", "C:\\Windows\\System32"]
for path in protected_paths:
if Path(path).exists():
print(f"\n🧪 Testing access to: {path}")
result = await self.file_ops.read_file(path)
if result.success:
print(f"⚠️ Security bypass! This shouldn't happen.")
else:
print(f"✅ Blocked: {result.error.message}")
break
else:
print("ℹ️ No protected paths found to test")
def view_backups(self):
"""View backup files."""
print(f"\n💾 Backup directory: {self.config.backup_directory}")
backup_dir = Path(self.config.backup_directory)
if not backup_dir.exists():
print("❌ No backup directory found")
return
backup_files = list(backup_dir.glob("*.backup"))
if not backup_files:
print("📭 No backup files found")
return
print(f"✅ Found {len(backup_files)} backup files:")
# Sort by modification time, newest first
backup_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
for i, backup in enumerate(backup_files[:10], 1): # Show first 10
stat_info = backup.stat()
size = stat_info.st_size
from datetime import datetime
mod_time = datetime.fromtimestamp(stat_info.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
print(f" {i:2d}. {backup.name} ({size} bytes, {mod_time})")
if len(backup_files) > 10:
print(f" ... and {len(backup_files) - 10} more backups")
def cleanup_backups(self):
"""Clean up old backup files."""
days = input("\n🧹 Delete backups older than how many days? (default: 7): ").strip()
try:
days = int(days) if days else 7
except ValueError:
print("❌ Invalid number")
return
confirm = input(f"⚠️ Delete backups older than {days} days? (y/N): ").lower().startswith('y')
if not confirm:
print("❌ Cleanup cancelled")
return
cleaned = self.safety_manager.cleanup_old_backups(days)
print(f"✅ Cleaned up {cleaned} old backup files")
async def run(self):
"""Run the interactive tester."""
print("🚀 File System MCP Server - Interactive Tester")
print("=" * 60)
print("✅ Server initialized successfully")
while True:
try:
self.print_menu()
choice = input("\n🎯 Enter your choice (0-11): ").strip()
if choice == '0':
print("👋 Goodbye!")
break
elif choice == '1':
await self.list_directory()
elif choice == '2':
await self.read_file()
elif choice == '3':
await self.create_file()
elif choice == '4':
await self.update_file()
elif choice == '5':
await self.delete_file()
elif choice == '6':
await self.get_file_info()
elif choice == '7':
await self.search_with_pattern()
elif choice == '8':
self.change_directory()
elif choice == '9':
await self.test_security()
elif choice == '10':
self.view_backups()
elif choice == '11':
self.cleanup_backups()
else:
print("❌ Invalid choice. Please try again.")
input("\n⏸️ Press Enter to continue...")
except KeyboardInterrupt:
print("\n👋 Goodbye!")
break
except Exception as e:
print(f"\n❌ Error: {e}")
input("⏸️ Press Enter to continue...")
if __name__ == "__main__":
tester = InteractiveFileSystemTester()
asyncio.run(tester.run())