Skip to main content
Glama

File System MCP Server

interactive_test.py (16 kB)
#!/usr/bin/env python3
"""Interactive test script to explore your file system with the MCP server."""

import asyncio
import os
import sys
from datetime import datetime
from pathlib import Path

# Add src to Python path so the package can be imported from a source checkout.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))

from file_system_mcp_server.config import Config
from file_system_mcp_server.safety import SafetyManager
from file_system_mcp_server.file_operations import FileOperationsManager


class InteractiveFileSystemTester:
    """Interactive, menu-driven tester for the File System MCP Server.

    Wraps a ``FileOperationsManager`` and a ``SafetyManager`` and exposes
    each operation as a numbered menu entry. All paths entered by the user
    are interpreted relative to ``self.current_path``.
    """

    def __init__(self) -> None:
        """Build the configuration and managers rooted at the working directory."""
        current_dir = Path.cwd()

        self.config = Config(
            backup_directory=str(current_dir / ".mcp_backups"),
            max_file_size=10 * 1024 * 1024,  # 10MB
            max_recursion_depth=5,
            protected_paths=["/etc", "/usr", "/bin", "/System", "/Windows"],
            enable_backups=True,
            log_level="INFO",
        )
        self.safety_manager = SafetyManager(self.config)
        self.file_ops = FileOperationsManager(self.config, self.safety_manager)
        self.current_path = current_dir

    # ------------------------------------------------------------------
    # Small input helpers shared by several menu actions
    # ------------------------------------------------------------------

    @staticmethod
    def _confirm(prompt: str) -> bool:
        """Return True when the user's answer to *prompt* starts with 'y'/'Y'."""
        return input(prompt).lower().startswith('y')

    @staticmethod
    def _read_multiline_content() -> str:
        """Read lines from stdin until a line equal to 'END' and join them."""
        collected = []
        while True:
            line = input()
            if line.strip() == 'END':
                break
            collected.append(line)
        return '\n'.join(collected)

    # ------------------------------------------------------------------
    # Menu
    # ------------------------------------------------------------------

    def print_menu(self) -> None:
        """Print the interactive menu."""
        print("\n🗂️ File System MCP Server - Interactive Test")
        print(f"📁 Current path: {self.current_path}")
        print(f"💾 Backup directory: {self.config.backup_directory}")
        print("=" * 60)
        print("1. 📋 List current directory")
        print("2. 📖 Read a file")
        print("3. 📝 Create a file")
        print("4. 🔄 Update a file")
        print("5. 🗑️ Delete a file")
        print("6. ℹ️ Get file info")
        print("7. 🔍 Search with pattern")
        print("8. 📂 Change directory")
        print("9. 🔒 Test security (try protected path)")
        print("10. 💾 View backups")
        print("11. 🧹 Clean up backups")
        print("0. ❌ Exit")
        print("=" * 60)

    # ------------------------------------------------------------------
    # Menu actions
    # ------------------------------------------------------------------

    async def list_directory(self) -> None:
        """List the current directory, optionally recursive and filtered."""
        print(f"\n📋 Listing directory: {self.current_path}")

        recursive = self._confirm("🔄 Recursive listing? (y/N): ")
        # An empty answer means "no filter".
        pattern = input("🔍 Pattern filter (e.g., *.py) or Enter for all: ").strip() or None

        result = await self.file_ops.list_directory(
            str(self.current_path),
            pattern=pattern,
            recursive=recursive,
        )

        if not result.success:
            print(f"❌ Error: {result.error.message}")
            return

        print(f"✅ Found {result.total_entries} total items, showing {len(result.entries)}")
        if pattern:
            print(f"🔍 Pattern: {pattern}")
        if recursive:
            print(f"🔄 Recursive: Yes (max depth reached: {result.max_depth_reached})")

        print("\n📄 Items:")
        for i, entry in enumerate(result.entries[:20], 1):  # Show first 20
            is_dir = entry.metadata.is_directory
            icon = "📁" if is_dir else "📄"
            size = "" if is_dir else f"({entry.metadata.size} bytes)"
            print(f" {i:2d}. {icon} {entry.name} {size}")

        if len(result.entries) > 20:
            print(f" ... and {len(result.entries) - 20} more items")

    async def read_file(self) -> None:
        """Read a file and print its metadata and (text) content."""
        filename = input("\n📖 Enter filename to read: ").strip()
        if not filename:
            return

        file_path = self.current_path / filename
        result = await self.file_ops.read_file(str(file_path))

        if not result.success:
            print(f"❌ Error: {result.error.message}")
            return

        print(f"✅ Successfully read {filename}")
        print(f"📊 Size: {result.metadata.size} bytes")
        print(f"🏷️ MIME type: {result.metadata.mime_type}")
        print(f"🕒 Modified: {result.metadata.modified_time}")

        if result.is_binary:
            print("🔒 Binary file - content not displayed")
            return

        print(f"📝 Encoding: {result.encoding}")
        content = result.content
        if len(content) <= 500:
            print(f"📄 Content:\n{content}")
            return

        # Long files: show a preview first and let the user opt in to the rest.
        print(f"📄 Content (first 500 chars):\n{content[:500]}...")
        if self._confirm("\n📖 Show full content? (y/N): "):
            print(f"\n📄 Full content:\n{content}")

    async def create_file(self) -> None:
        """Create a new file from content typed by the user."""
        filename = input("\n📝 Enter filename to create: ").strip()
        if not filename:
            return

        print("📝 Enter file content (type 'END' on a new line to finish):")
        content = self._read_multiline_content()

        file_path = self.current_path / filename
        overwrite = False
        if file_path.exists():
            overwrite = self._confirm("⚠️ File exists. Overwrite? (y/N): ")

        result = await self.file_ops.write_file(str(file_path), content, overwrite=overwrite)

        if not result.success:
            print(f"❌ Error: {result.error.message}")
            return

        print(f"✅ Successfully created {filename}")
        print(f"📊 Size: {result.metadata.size} bytes")
        if result.backup_created:
            print(f"💾 Backup created: {Path(result.backup_created).name}")

    async def update_file(self) -> None:
        """Replace the content of an existing text file."""
        filename = input("\n🔄 Enter filename to update: ").strip()
        if not filename:
            return

        file_path = self.current_path / filename

        # First read the current content so the user can see what is replaced.
        read_result = await self.file_ops.read_file(str(file_path))
        if not read_result.success:
            print(f"❌ Cannot read file: {read_result.error.message}")
            return
        if read_result.is_binary:
            print("❌ Cannot update binary file")
            return

        print(f"📄 Current content:\n{read_result.content}")
        print("\n📝 Enter new content (type 'END' on a new line to finish):")
        new_content = self._read_multiline_content()

        result = await self.file_ops.update_file(str(file_path), new_content)

        if result.success:
            print(f"✅ Successfully updated {filename}")
            print(f"📊 New size: {result.metadata.size} bytes")
            # Guarded like create_file: Path(None) would raise TypeError if
            # no backup was produced.
            if result.backup_created:
                print(f"💾 Backup created: {Path(result.backup_created).name}")
        else:
            print(f"❌ Error: {result.error.message}")
            if result.restored_from_backup:
                print("🔄 File restored from backup")

    async def delete_file(self) -> None:
        """Delete a file (or directory) after confirmation."""
        filename = input("\n🗑️ Enter filename to delete: ").strip()
        if not filename:
            return

        file_path = self.current_path / filename

        if not self._confirm(f"⚠️ Delete {filename}? (y/N): "):
            print("❌ Deletion cancelled")
            return

        result = await self.file_ops.delete_file(str(file_path))

        if not result.success:
            print(f"❌ Error: {result.error.message}")
            return

        print(f"✅ Successfully deleted {filename}")
        # Guarded for the same reason as backup_created in update_file.
        if result.backup_location:
            print(f"💾 Moved to backup: {Path(result.backup_location).name}")
        if result.was_directory:
            print(f"📁 Directory with {result.files_deleted} files deleted")

    async def get_file_info(self) -> None:
        """Print the metadata reported for a file."""
        filename = input("\nℹ️ Enter filename for info: ").strip()
        if not filename:
            return

        file_path = self.current_path / filename
        result = await self.file_ops.get_file_info(str(file_path))

        if not result.success:
            print(f"❌ Error: {result.error.message}")
            return
        if not result.exists:
            print(f"❌ File does not exist: {filename}")
            return

        print(f"✅ File information for {filename}:")
        m = result.metadata
        print(f"📊 Size: {m.size:,} bytes")
        print(f"📁 Is directory: {m.is_directory}")
        print(f"🏷️ MIME type: {m.mime_type}")
        print(f"🔐 Permissions: {m.permissions}")
        print(f"🕒 Modified: {m.modified_time}")
        print(f"🕒 Created: {m.created_time}")
        print(f"🕒 Accessed: {m.accessed_time}")
        if m.owner:
            print(f"👤 Owner: {m.owner}")
        if m.group:
            print(f"👥 Group: {m.group}")

    async def search_with_pattern(self) -> None:
        """List entries under the current directory matching a pattern."""
        pattern = input("\n🔍 Enter search pattern (e.g., *.py, test_*): ").strip()
        if not pattern:
            return

        recursive = self._confirm("🔄 Search recursively? (y/N): ")

        result = await self.file_ops.list_directory(
            str(self.current_path),
            pattern=pattern,
            recursive=recursive,
        )

        if not result.success:
            print(f"❌ Error: {result.error.message}")
            return

        print(f"✅ Found {len(result.entries)} matches for '{pattern}'")
        if recursive:
            print("🔄 Searched recursively")

        for i, entry in enumerate(result.entries[:15], 1):  # Show first 15
            is_dir = entry.metadata.is_directory
            icon = "📁" if is_dir else "📄"
            entry_path = Path(entry.path)
            try:
                shown_path = entry_path.relative_to(self.current_path)
            except ValueError:
                # relative_to raises when entry.path is not below
                # current_path (e.g. the server reported a differently
                # normalised path); fall back to the path as reported.
                shown_path = entry_path
            size = "" if is_dir else f"({entry.metadata.size} bytes)"
            print(f" {i:2d}. {icon} {shown_path} {size}")

        if len(result.entries) > 15:
            print(f" ... and {len(result.entries) - 15} more matches")

    def change_directory(self) -> None:
        """Change the directory that subsequent menu actions operate on."""
        requested = input(f"\n📂 Enter new directory (current: {self.current_path}): ").strip()
        if not requested:
            return

        try:
            if requested == "..":
                target = self.current_path.parent
            elif requested.startswith(("/", "~")):
                target = Path(requested).expanduser()
            else:
                target = self.current_path / requested
            target = target.resolve()
        except (OSError, RuntimeError, ValueError) as e:
            # expanduser() may raise RuntimeError, resolve() OSError, and a
            # path with an embedded NUL ValueError.
            print(f"❌ Error: {e}")
            return

        # is_dir() is already False for a path that does not exist.
        if target.is_dir():
            self.current_path = target
            print(f"✅ Changed to: {self.current_path}")
        else:
            print(f"❌ Directory does not exist: {target}")

    async def test_security(self) -> None:
        """Try to read the first existing well-known protected path."""
        print("\n🔒 Testing security features...")

        protected_paths = ["/etc/passwd", "/usr/bin/ls", "C:\\Windows\\System32"]

        for path in protected_paths:
            if not Path(path).exists():
                continue
            print(f"\n🧪 Testing access to: {path}")
            result = await self.file_ops.read_file(path)
            if result.success:
                print("⚠️ Security bypass! This shouldn't happen.")
            else:
                print(f"✅ Blocked: {result.error.message}")
            break  # One existing path is enough for the demonstration.
        else:
            print("ℹ️ No protected paths found to test")

    def view_backups(self) -> None:
        """Show the ten most recently modified ``*.backup`` files."""
        print(f"\n💾 Backup directory: {self.config.backup_directory}")

        backup_dir = Path(self.config.backup_directory)
        if not backup_dir.exists():
            print("❌ No backup directory found")
            return

        backup_files = list(backup_dir.glob("*.backup"))
        if not backup_files:
            print("📭 No backup files found")
            return

        print(f"✅ Found {len(backup_files)} backup files:")

        # Sort by modification time, newest first
        backup_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)

        for i, backup in enumerate(backup_files[:10], 1):  # Show first 10
            stat_info = backup.stat()
            mod_time = datetime.fromtimestamp(stat_info.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
            print(f" {i:2d}. {backup.name} ({stat_info.st_size} bytes, {mod_time})")

        if len(backup_files) > 10:
            print(f" ... and {len(backup_files) - 10} more backups")

    def cleanup_backups(self) -> None:
        """Ask for an age threshold and delegate cleanup to the safety manager."""
        answer = input("\n🧹 Delete backups older than how many days? (default: 7): ").strip()
        try:
            days = int(answer) if answer else 7
        except ValueError:
            print("❌ Invalid number")
            return

        if not self._confirm(f"⚠️ Delete backups older than {days} days? (y/N): "):
            print("❌ Cleanup cancelled")
            return

        # The return value is reported as a count of removed files.
        cleaned = self.safety_manager.cleanup_old_backups(days)
        print(f"✅ Cleaned up {cleaned} old backup files")

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------

    async def run(self) -> None:
        """Run the menu loop until the user exits.

        Ctrl-C and end-of-input (closed stdin) both end the loop cleanly;
        any other exception is reported and the menu is shown again.
        """
        print("🚀 File System MCP Server - Interactive Tester")
        print("=" * 60)
        print("✅ Server initialized successfully")

        # Menu choice -> bound method; a mix of coroutine and plain functions.
        actions = {
            '1': self.list_directory,
            '2': self.read_file,
            '3': self.create_file,
            '4': self.update_file,
            '5': self.delete_file,
            '6': self.get_file_info,
            '7': self.search_with_pattern,
            '8': self.change_directory,
            '9': self.test_security,
            '10': self.view_backups,
            '11': self.cleanup_backups,
        }

        while True:
            try:
                self.print_menu()
                choice = input("\n🎯 Enter your choice (0-11): ").strip()

                if choice == '0':
                    print("👋 Goodbye!")
                    break

                action = actions.get(choice)
                if action is None:
                    print("❌ Invalid choice. Please try again.")
                else:
                    outcome = action()
                    if asyncio.iscoroutine(outcome):
                        await outcome

                input("\n⏸️ Press Enter to continue...")

            except (KeyboardInterrupt, EOFError):
                # EOFError: stdin was closed; prompting again could never succeed.
                print("\n👋 Goodbye!")
                break
            except Exception as e:
                print(f"\n❌ Error: {e}")
                try:
                    input("⏸️ Press Enter to continue...")
                except (KeyboardInterrupt, EOFError):
                    print("\n👋 Goodbye!")
                    break


if __name__ == "__main__":
    tester = InteractiveFileSystemTester()
    asyncio.run(tester.run())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/chidvilas1234/Project-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.