Skip to main content
Glama

File System MCP Server

test_real_filesystem.py11.1 kB
#!/usr/bin/env python3 """Test the File System MCP Server with your real file system.""" import asyncio import json import sys import os from pathlib import Path # Add src to Python path sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src')) from file_system_mcp_server.config import Config from file_system_mcp_server.safety import SafetyManager from file_system_mcp_server.file_operations import FileOperationsManager async def test_with_real_filesystem(): """Test the server with your actual file system.""" print("🚀 Testing File System MCP Server with Real File System") print("=" * 60) # Get current directory current_dir = Path.cwd() print(f"📁 Current directory: {current_dir}") # Create configuration for testing config = Config( backup_directory=str(current_dir / ".mcp_backups"), max_file_size=5 * 1024 * 1024, # 5MB limit max_recursion_depth=3, # Limit recursion for safety protected_paths=["/etc", "/usr", "/bin", "/System", "/Windows"], enable_backups=True, log_level="INFO" ) # Initialize components safety_manager = SafetyManager(config) file_ops = FileOperationsManager(config, safety_manager) print("✅ Server components initialized") print(f"🔒 Backup directory: {config.backup_directory}") # Test 1: List current directory print("\n📋 Test 1: List current directory") list_result = await file_ops.list_directory(str(current_dir)) if list_result.success: print(f"✅ Found {list_result.total_entries} items in current directory") print("📄 Files and directories:") for entry in list_result.entries[:10]: # Show first 10 items size_info = f"({entry.metadata.size} bytes)" if not entry.metadata.is_directory else "(directory)" print(f" • {entry.name} {size_info}") if len(list_result.entries) > 10: print(f" ... and {len(list_result.entries) - 10} more items") else: print(f"❌ Failed to list directory: {list_result.error.message}") # Test 2: Read an existing file (README.md if it exists) readme_path = current_dir / "README.md" if readme_path.exists(): print(f"\n📖 Test 2: Read {readme_path.name}") read_result = await file_ops.read_file(str(readme_path)) if read_result.success: content_preview = read_result.content[:200] + "..." if len(read_result.content) > 200 else read_result.content print(f"✅ Successfully read {readme_path.name}") print(f"📊 File size: {read_result.metadata.size} bytes") print(f"🕒 Modified: {read_result.metadata.modified_time}") print(f"📝 Content preview:\n{content_preview}") else: print(f"❌ Failed to read file: {read_result.error.message}") else: print("\n📖 Test 2: README.md not found, skipping read test") # Test 3: Create a test file test_file = current_dir / "mcp_test_file.txt" print(f"\n📝 Test 3: Create test file {test_file.name}") test_content = f"""# MCP Server Test File Created by File System MCP Server Current directory: {current_dir} Timestamp: {asyncio.get_event_loop().time()} This file demonstrates the MCP server's file creation capabilities. """ write_result = await file_ops.write_file(str(test_file), test_content) if write_result.success: print(f"✅ Successfully created {test_file.name}") print(f"📊 File size: {write_result.metadata.size} bytes") print(f"💾 Backup created: {write_result.backup_created or 'None (new file)'}") else: print(f"❌ Failed to create file: {write_result.error.message}") # Test 4: Update the test file if write_result.success: print(f"\n🔄 Test 4: Update test file") updated_content = test_content + "\n\n--- UPDATED ---\nThis line was added by the update operation.\n" update_result = await file_ops.update_file(str(test_file), updated_content) if update_result.success: print(f"✅ Successfully updated {test_file.name}") print(f"💾 Backup created: {update_result.backup_created}") print(f"📊 New size: {update_result.metadata.size} bytes") else: print(f"❌ Failed to update file: {update_result.error.message}") # Test 5: Get file info if test_file.exists(): print(f"\nℹ️ Test 5: Get file info for {test_file.name}") info_result = await file_ops.get_file_info(str(test_file)) if info_result.success: print(f"✅ File exists: {info_result.exists}") if info_result.metadata: print(f"📊 Size: {info_result.metadata.size} bytes") print(f"🕒 Modified: {info_result.metadata.modified_time}") print(f"🔐 Permissions: {info_result.metadata.permissions}") print(f"📁 Is directory: {info_result.metadata.is_directory}") print(f"🏷️ MIME type: {info_result.metadata.mime_type}") else: print(f"❌ Failed to get file info: {info_result.error.message}") # Test 6: List with pattern matching print(f"\n🔍 Test 6: List Python files (*.py pattern)") pattern_result = await file_ops.list_directory(str(current_dir), pattern="*.py") if pattern_result.success: py_files = [entry for entry in pattern_result.entries if entry.name.endswith('.py')] print(f"✅ Found {len(py_files)} Python files") for entry in py_files[:5]: # Show first 5 print(f" • {entry.name} ({entry.metadata.size} bytes)") if len(py_files) > 5: print(f" ... and {len(py_files) - 5} more Python files") else: print(f"❌ Failed to list with pattern: {pattern_result.error.message}") # Test 7: Test security (try to access protected path) print(f"\n🔒 Test 7: Test security protection") protected_result = await file_ops.read_file("/etc/passwd") if not protected_result.success: print(f"✅ Security working: {protected_result.error.message}") else: print(f"⚠️ Security bypass detected - this shouldn't happen!") # Test 8: List backups created print(f"\n💾 Test 8: Check backup system") if test_file.exists(): backup_info = safety_manager.get_backup_info(str(test_file)) print(f"✅ Found {len(backup_info)} backup(s) for {test_file.name}") for i, backup in enumerate(backup_info[:3]): # Show first 3 backups print(f" {i+1}. {Path(backup['path']).name} (created: {backup['created'][:19]})") # Test 9: Clean up (delete test file) print(f"\n🗑️ Test 9: Clean up test file") if test_file.exists(): delete_result = await file_ops.delete_file(str(test_file)) if delete_result.success: print(f"✅ Successfully deleted {test_file.name}") print(f"💾 Moved to backup: {delete_result.backup_location}") print(f"📊 Files deleted: {delete_result.files_deleted}") else: print(f"❌ Failed to delete file: {delete_result.error.message}") # Summary print(f"\n" + "=" * 60) print("🎉 Real File System Test Complete!") print(f"📁 Tested in: {current_dir}") print(f"💾 Backups stored in: {config.backup_directory}") print(f"🔒 Protected paths: {len(config.protected_paths)} paths") # Show backup directory contents backup_dir = Path(config.backup_directory) if backup_dir.exists(): backup_files = list(backup_dir.glob("*.backup")) print(f"💾 Total backups created: {len(backup_files)}") if backup_files: print("📋 Recent backups:") for backup in sorted(backup_files, key=lambda x: x.stat().st_mtime, reverse=True)[:3]: print(f" • {backup.name}") async def test_mcp_tools_simulation(): """Simulate MCP tool calls with real file system.""" print(f"\n🛠️ MCP Tools Simulation") print("=" * 40) current_dir = Path.cwd() config = Config( backup_directory=str(current_dir / ".mcp_backups"), max_file_size=5 * 1024 * 1024, enable_backups=True ) safety_manager = SafetyManager(config) file_ops = FileOperationsManager(config, safety_manager) # Simulate tool calls like an MCP client would make tools_to_test = [ ("list_directory", {"path": str(current_dir), "pattern": "*.md"}), ("get_file_info", {"path": str(current_dir / "README.md")}), ("list_directory", {"path": str(current_dir), "recursive": False}), ] for tool_name, args in tools_to_test: print(f"\n🔧 Tool: {tool_name}") print(f"📥 Args: {args}") try: if tool_name == "list_directory": result = await file_ops.list_directory( args["path"], args.get("pattern"), args.get("recursive", False) ) elif tool_name == "get_file_info": result = await file_ops.get_file_info(args["path"]) elif tool_name == "read_file": result = await file_ops.read_file(args["path"]) # Convert result to dict (like MCP server would) result_dict = result.to_dict() print(f"📤 Success: {result_dict['success']}") if result_dict['success']: if 'entries' in result_dict: print(f"📊 Found {len(result_dict['entries'])} entries") elif 'metadata' in result_dict and result_dict['metadata']: print(f"📊 File size: {result_dict['metadata']['size']} bytes") elif 'content' in result_dict: content_len = len(result_dict['content']) if result_dict['content'] else 0 print(f"📊 Content length: {content_len} characters") else: print(f"❌ Error: {result_dict.get('error', {}).get('message', 'Unknown error')}") except Exception as e: print(f"❌ Exception: {e}") if __name__ == "__main__": print("🧪 File System MCP Server - Real File System Test") try: # Run the main test asyncio.run(test_with_real_filesystem()) # Run MCP tools simulation asyncio.run(test_mcp_tools_simulation()) print(f"\n✨ All tests completed!") print(f"\n💡 Next steps:") print(f" 1. Install MCP: pip install mcp python-magic") print(f" 2. Run full server: python run_server.py --help") print(f" 3. Add to Kiro MCP config for AI assistant integration") except KeyboardInterrupt: print(f"\n⏹️ Test interrupted by user") except Exception as e: print(f"\n❌ Test failed: {e}") import traceback traceback.print_exc() sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/chidvilas1234/Project-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server