#!/usr/bin/env python3
"""Interactive test script to explore your file system with the MCP server."""
import asyncio
import sys
import os
from pathlib import Path
# Add src to Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from file_system_mcp_server.config import Config
from file_system_mcp_server.safety import SafetyManager
from file_system_mcp_server.file_operations import FileOperationsManager
class InteractiveFileSystemTester:
    """Interactive, menu-driven tester for the File System MCP Server.

    The tester keeps a "current path" that the user can browse, and forwards
    every menu action to a ``FileOperationsManager`` built in ``__init__``.
    All results are reported on stdout; nothing is returned to the caller.
    """

    def __init__(self):
        """Initialize the tester.

        Builds the configuration, the safety manager and the file-operations
        manager. Backups are stored in ``.mcp_backups`` under the directory
        the script was started from, which is also the initial current path.
        """
        current_dir = Path.cwd()
        self.config = Config(
            backup_directory=str(current_dir / ".mcp_backups"),
            max_file_size=10 * 1024 * 1024,  # 10MB
            max_recursion_depth=5,
            protected_paths=["/etc", "/usr", "/bin", "/System", "/Windows"],
            enable_backups=True,
            log_level="INFO",
        )
        self.safety_manager = SafetyManager(self.config)
        self.file_ops = FileOperationsManager(self.config, self.safety_manager)
        self.current_path = current_dir

    @staticmethod
    def _confirm(prompt):
        """Ask a yes/no question; only an answer starting with 'y'/'Y' is a yes."""
        return input(prompt).lower().startswith('y')

    @staticmethod
    def _read_multiline():
        """Read lines from stdin until a line that is just 'END'; return them joined by newlines."""
        collected = []
        while True:
            line = input()
            if line.strip() == 'END':
                break
            collected.append(line)
        return '\n'.join(collected)

    def print_menu(self):
        """Print the interactive menu together with the current and backup paths."""
        print("\nποΈ File System MCP Server - Interactive Test")
        print(f"π Current path: {self.current_path}")
        print(f"πΎ Backup directory: {self.config.backup_directory}")
        print("=" * 60)
        menu_lines = (
            "1. π List current directory",
            "2. π Read a file",
            "3. π Create a file",
            "4. π Update a file",
            "5. ποΈ Delete a file",
            "6. βΉοΈ Get file info",
            "7. π Search with pattern",
            "8. π Change directory",
            "9. π Test security (try protected path)",
            "10. πΎ View backups",
            "11. π§Ή Clean up backups",
            "0. β Exit",
        )
        for menu_line in menu_lines:
            print(menu_line)
        print("=" * 60)

    async def list_directory(self):
        """List the current directory, optionally recursively and filtered by a pattern.

        At most the first 20 entries are printed; the remainder is summarized.
        """
        print(f"\nπ Listing directory: {self.current_path}")
        recursive = self._confirm("π Recursive listing? (y/N): ")
        # An empty answer means "no filter", which the manager expects as None.
        pattern = input("π Pattern filter (e.g., *.py) or Enter for all: ").strip() or None
        result = await self.file_ops.list_directory(
            str(self.current_path),
            pattern=pattern,
            recursive=recursive,
        )
        if not result.success:
            print(f"β Error: {result.error.message}")
            return

        print(f"✅ Found {result.total_entries} total items, showing {len(result.entries)}")
        if pattern:
            print(f"π Pattern: {pattern}")
        if recursive:
            print(f"π Recursive: Yes (max depth reached: {result.max_depth_reached})")
        print("\nπ Items:")
        for i, entry in enumerate(result.entries[:20], 1):  # Show first 20
            icon = "π" if entry.metadata.is_directory else "π"
            size = f"({entry.metadata.size} bytes)" if not entry.metadata.is_directory else ""
            print(f" {i:2d}. {icon} {entry.name} {size}")
        if len(result.entries) > 20:
            print(f" ... and {len(result.entries) - 20} more items")

    async def read_file(self):
        """Read a file relative to the current path and print its metadata and content.

        Binary content is never printed. Text longer than 500 characters is
        previewed first and shown in full only on request.
        """
        filename = input("\nπ Enter filename to read: ").strip()
        if not filename:
            return
        file_path = self.current_path / filename
        result = await self.file_ops.read_file(str(file_path))
        if not result.success:
            print(f"β Error: {result.error.message}")
            return

        print(f"✅ Successfully read {filename}")
        print(f"π Size: {result.metadata.size} bytes")
        print(f"π·οΈ MIME type: {result.metadata.mime_type}")
        print(f"π Modified: {result.metadata.modified_time}")
        if result.is_binary:
            print("π Binary file - content not displayed")
            return

        print(f"π Encoding: {result.encoding}")
        content = result.content
        if len(content) > 500:
            print(f"π Content (first 500 chars):\n{content[:500]}...")
            if self._confirm("\nπ Show full content? (y/N): "):
                print(f"\nπ Full content:\n{content}")
        else:
            print(f"π Content:\n{content}")

    async def create_file(self):
        """Create a file from content typed on stdin (terminated by an 'END' line).

        If the target already exists the user is asked whether to overwrite;
        the answer is passed to the manager as ``overwrite``.
        """
        filename = input("\nπ Enter filename to create: ").strip()
        if not filename:
            return
        print("π Enter file content (type 'END' on a new line to finish):")
        content = self._read_multiline()
        file_path = self.current_path / filename
        overwrite = False
        if file_path.exists():
            overwrite = self._confirm("β οΈ File exists. Overwrite? (y/N): ")
        result = await self.file_ops.write_file(str(file_path), content, overwrite=overwrite)
        if result.success:
            print(f"✅ Successfully created {filename}")
            print(f"π Size: {result.metadata.size} bytes")
            if result.backup_created:
                print(f"πΎ Backup created: {Path(result.backup_created).name}")
        else:
            print(f"β Error: {result.error.message}")

    async def update_file(self):
        """Replace the content of an existing text file with content typed on stdin.

        The current content is shown first. Unreadable and binary files are
        refused before any new content is requested.
        """
        filename = input("\nπ Enter filename to update: ").strip()
        if not filename:
            return
        file_path = self.current_path / filename
        # First read the current content
        read_result = await self.file_ops.read_file(str(file_path))
        if not read_result.success:
            print(f"β Cannot read file: {read_result.error.message}")
            return
        if read_result.is_binary:
            print("β Cannot update binary file")
            return
        print(f"π Current content:\n{read_result.content}")
        print("\nπ Enter new content (type 'END' on a new line to finish):")
        new_content = self._read_multiline()
        result = await self.file_ops.update_file(str(file_path), new_content)
        if result.success:
            print(f"✅ Successfully updated {filename}")
            print(f"π New size: {result.metadata.size} bytes")
            # Guarded like create_file: Path(None) would raise TypeError
            # if the update reports no backup.
            if result.backup_created:
                print(f"πΎ Backup created: {Path(result.backup_created).name}")
        else:
            print(f"β Error: {result.error.message}")
            if result.restored_from_backup:
                print("π File restored from backup")

    async def delete_file(self):
        """Delete a file (or directory) relative to the current path after confirmation."""
        filename = input("\nποΈ Enter filename to delete: ").strip()
        if not filename:
            return
        file_path = self.current_path / filename
        if not self._confirm(f"β οΈ Delete {filename}? (y/N): "):
            print("β Deletion cancelled")
            return
        result = await self.file_ops.delete_file(str(file_path))
        if result.success:
            print(f"✅ Successfully deleted {filename}")
            # Guarded for the same reason as in update_file.
            if result.backup_location:
                print(f"πΎ Moved to backup: {Path(result.backup_location).name}")
            if result.was_directory:
                print(f"π Directory with {result.files_deleted} files deleted")
        else:
            print(f"β Error: {result.error.message}")

    async def get_file_info(self):
        """Print the metadata the manager reports for a path relative to the current path."""
        filename = input("\nβΉοΈ Enter filename for info: ").strip()
        if not filename:
            return
        file_path = self.current_path / filename
        result = await self.file_ops.get_file_info(str(file_path))
        if not result.success:
            print(f"β Error: {result.error.message}")
            return
        if not result.exists:
            print(f"β File does not exist: {filename}")
            return

        print(f"✅ File information for {filename}:")
        m = result.metadata
        print(f"π Size: {m.size:,} bytes")
        print(f"π Is directory: {m.is_directory}")
        print(f"π·οΈ MIME type: {m.mime_type}")
        print(f"π Permissions: {m.permissions}")
        print(f"π Modified: {m.modified_time}")
        print(f"π Created: {m.created_time}")
        print(f"π Accessed: {m.accessed_time}")
        if m.owner:
            print(f"π€ Owner: {m.owner}")
        if m.group:
            print(f"π₯ Group: {m.group}")

    async def search_with_pattern(self):
        """List entries under the current path that match a pattern.

        At most the first 15 matches are printed, as paths relative to the
        current path where possible.
        """
        pattern = input("\nπ Enter search pattern (e.g., *.py, test_*): ").strip()
        if not pattern:
            return
        recursive = self._confirm("π Search recursively? (y/N): ")
        result = await self.file_ops.list_directory(
            str(self.current_path),
            pattern=pattern,
            recursive=recursive,
        )
        if not result.success:
            print(f"β Error: {result.error.message}")
            return

        print(f"✅ Found {len(result.entries)} matches for '{pattern}'")
        if recursive:
            print("π Searched recursively")
        for i, entry in enumerate(result.entries[:15], 1):  # Show first 15
            icon = "π" if entry.metadata.is_directory else "π"
            try:
                rel_path = Path(entry.path).relative_to(self.current_path)
            except ValueError:
                # relative_to() raises when the entry is not under current_path
                # (e.g. the manager returned a differently-rooted path);
                # show the path as reported instead of aborting the listing.
                rel_path = entry.path
            size = f"({entry.metadata.size} bytes)" if not entry.metadata.is_directory else ""
            print(f" {i:2d}. {icon} {rel_path} {size}")
        if len(result.entries) > 15:
            print(f" ... and {len(result.entries) - 15} more matches")

    def change_directory(self):
        """Change the current path to a directory entered by the user.

        Accepts relative paths, absolute paths, ``..`` and ``~``-prefixed
        paths. The current path is only updated if the target is an existing
        directory.
        """
        new_path = input(f"\nπ Enter new directory (current: {self.current_path}): ").strip()
        if not new_path:
            return
        try:
            # Joining onto current_path is a no-op for absolute input and
            # resolve() collapses "..", so one expression covers every case.
            target = (self.current_path / Path(new_path).expanduser()).resolve()
            if target.is_dir():
                self.current_path = target
                print(f"✅ Changed to: {self.current_path}")
            else:
                print(f"β Directory does not exist: {target}")
        except Exception as e:
            print(f"β Error: {e}")

    async def test_security(self):
        """Try to read the first well-known protected path that exists on this machine.

        A successful read is reported as a security bypass; a failure is
        reported as correctly blocked. Only one path is tested.
        """
        print("\nπ Testing security features...")
        protected_paths = ["/etc/passwd", "/usr/bin/ls", "C:\\Windows\\System32"]
        for path in protected_paths:
            if not Path(path).exists():
                continue
            print(f"\nπ§ͺ Testing access to: {path}")
            result = await self.file_ops.read_file(path)
            if result.success:
                print("β οΈ Security bypass! This shouldn't happen.")
            else:
                print(f"✅ Blocked: {result.error.message}")
            break
        else:
            # Loop finished without break: none of the candidates exist here.
            print("βΉοΈ No protected paths found to test")

    def view_backups(self):
        """Print up to the 10 most recently modified ``*.backup`` files in the backup directory."""
        from datetime import datetime

        print(f"\nπΎ Backup directory: {self.config.backup_directory}")
        backup_dir = Path(self.config.backup_directory)
        if not backup_dir.exists():
            print("β No backup directory found")
            return
        # stat() each file once and reuse it for both sorting and display.
        backups = [(path, path.stat()) for path in backup_dir.glob("*.backup")]
        if not backups:
            print("π No backup files found")
            return
        print(f"✅ Found {len(backups)} backup files:")
        # Sort by modification time, newest first
        backups.sort(key=lambda item: item[1].st_mtime, reverse=True)
        for i, (backup, stat_info) in enumerate(backups[:10], 1):  # Show first 10
            size = stat_info.st_size
            mod_time = datetime.fromtimestamp(stat_info.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
            print(f" {i:2d}. {backup.name} ({size} bytes, {mod_time})")
        if len(backups) > 10:
            print(f" ... and {len(backups) - 10} more backups")

    def cleanup_backups(self):
        """Delete backups older than a user-supplied number of days (default 7) after confirmation."""
        days = input("\nπ§Ή Delete backups older than how many days? (default: 7): ").strip()
        try:
            days = int(days) if days else 7
        except ValueError:
            print("β Invalid number")
            return
        if days < 0:
            # A negative age is meaningless and must not reach the safety manager.
            print("β Invalid number")
            return
        if not self._confirm(f"β οΈ Delete backups older than {days} days? (y/N): "):
            print("β Cleanup cancelled")
            return
        cleaned = self.safety_manager.cleanup_old_backups(days)
        print(f"✅ Cleaned up {cleaned} old backup files")

    async def run(self):
        """Run the interactive menu loop until the user exits.

        Choice ``0``, Ctrl-C and end-of-input (Ctrl-D / exhausted piped stdin)
        all end the loop. Any other exception raised by an action is printed
        and the loop continues.
        """
        print("π File System MCP Server - Interactive Tester")
        print("=" * 60)
        print("✅ Server initialized successfully")

        # Menu choice -> bound handler. Some handlers are coroutines, some are
        # plain methods; the loop awaits only when it gets a coroutine back.
        actions = {
            '1': self.list_directory,
            '2': self.read_file,
            '3': self.create_file,
            '4': self.update_file,
            '5': self.delete_file,
            '6': self.get_file_info,
            '7': self.search_with_pattern,
            '8': self.change_directory,
            '9': self.test_security,
            '10': self.view_backups,
            '11': self.cleanup_backups,
        }

        while True:
            try:
                self.print_menu()
                choice = input("\nπ― Enter your choice (0-11): ").strip()
                if choice == '0':
                    print("π Goodbye!")
                    break
                action = actions.get(choice)
                if action is None:
                    print("β Invalid choice. Please try again.")
                else:
                    outcome = action()
                    if asyncio.iscoroutine(outcome):
                        await outcome
                input("\nβΈοΈ Press Enter to continue...")
            except (KeyboardInterrupt, EOFError):
                # EOF is treated like Ctrl-C: with stdin closed every further
                # input() would fail, so the only sensible move is to leave.
                print("\nπ Goodbye!")
                break
            except Exception as e:
                print(f"\nβ Error: {e}")
                try:
                    input("βΈοΈ Press Enter to continue...")
                except (KeyboardInterrupt, EOFError):
                    print("\nπ Goodbye!")
                    break
if __name__ == "__main__":
    # Script entry point: build the tester (which wires up the config, safety
    # manager and file-operations manager) and drive the async menu loop until
    # the user exits.
    tester = InteractiveFileSystemTester()
    asyncio.run(tester.run())