Skip to main content
Glama
test_mcp_server.py (11.3 kB)
"""Integration tests for the MCP server.""" import asyncio import json import tempfile from pathlib import Path from unittest.mock import AsyncMock, MagicMock import pytest from file_system_mcp_server.config import Config from file_system_mcp_server.server import FileSystemMCPServer class TestMCPServerIntegration: """Integration tests for the MCP server.""" @pytest.fixture def temp_config(self): """Create a temporary configuration for testing.""" with tempfile.TemporaryDirectory() as temp_dir: config = Config( backup_directory=str(Path(temp_dir) / "backups"), max_file_size=1024 * 1024, # 1MB max_recursion_depth=5, protected_paths=["/etc", "/usr"], enable_backups=True, log_level="DEBUG" ) yield config @pytest.fixture def server(self, temp_config): """Create a server instance for testing.""" return FileSystemMCPServer(temp_config) @pytest.fixture def test_workspace(self): """Create a temporary workspace for testing.""" with tempfile.TemporaryDirectory() as temp_dir: workspace = Path(temp_dir) # Create test files and directories (workspace / "test.txt").write_text("Hello, World!") (workspace / "subdir").mkdir() (workspace / "subdir" / "nested.py").write_text("print('nested')") (workspace / "binary.bin").write_bytes(b"\x00\x01\x02\x03") yield workspace async def test_list_tools(self, server): """Test that all expected tools are listed.""" # Mock the list_tools handler tools = await server.server._handlers["list_tools"]() expected_tools = { "read_file", "write_file", "update_file", "list_directory", "delete_file", "get_file_info" } tool_names = {tool.name for tool in tools} assert tool_names == expected_tools # Check that each tool has proper schema for tool in tools: assert tool.name assert tool.description assert tool.inputSchema assert "type" in tool.inputSchema assert "properties" in tool.inputSchema assert "required" in tool.inputSchema async def test_read_file_tool(self, server, test_workspace): """Test the read_file tool.""" test_file = test_workspace / 
"test.txt" # Call the tool result = await server.server._handlers["call_tool"]( "read_file", {"path": str(test_file)} ) assert len(result) == 1 response_data = json.loads(result[0].text) assert response_data["success"] is True assert response_data["content"] == "Hello, World!" assert response_data["is_binary"] is False assert "metadata" in response_data assert response_data["metadata"]["size"] == 13 async def test_read_binary_file_tool(self, server, test_workspace): """Test reading a binary file.""" binary_file = test_workspace / "binary.bin" result = await server.server._handlers["call_tool"]( "read_file", {"path": str(binary_file)} ) response_data = json.loads(result[0].text) assert response_data["success"] is True assert response_data["content"] is None # Binary files don't return content assert response_data["is_binary"] is True assert "metadata" in response_data async def test_write_file_tool(self, server, test_workspace): """Test the write_file tool.""" new_file = test_workspace / "new_file.txt" content = "This is new content" result = await server.server._handlers["call_tool"]( "write_file", {"path": str(new_file), "content": content} ) response_data = json.loads(result[0].text) assert response_data["success"] is True assert response_data["path"] == str(new_file) assert "metadata" in response_data # Verify file was actually created assert new_file.exists() assert new_file.read_text() == content async def test_write_file_overwrite_protection(self, server, test_workspace): """Test write_file overwrite protection.""" existing_file = test_workspace / "test.txt" result = await server.server._handlers["call_tool"]( "write_file", {"path": str(existing_file), "content": "new content", "overwrite": False} ) response_data = json.loads(result[0].text) assert response_data["success"] is False assert "error" in response_data assert response_data["error"]["error_type"] == "FileExists" async def test_update_file_tool(self, server, test_workspace): """Test the update_file 
tool.""" test_file = test_workspace / "test.txt" original_content = test_file.read_text() new_content = "Updated content" result = await server.server._handlers["call_tool"]( "update_file", {"path": str(test_file), "content": new_content} ) response_data = json.loads(result[0].text) assert response_data["success"] is True assert "backup_created" in response_data # Verify file was updated assert test_file.read_text() == new_content # Verify backup was created if response_data["backup_created"]: backup_file = Path(response_data["backup_created"]) assert backup_file.exists() async def test_list_directory_tool(self, server, test_workspace): """Test the list_directory tool.""" result = await server.server._handlers["call_tool"]( "list_directory", {"path": str(test_workspace)} ) response_data = json.loads(result[0].text) assert response_data["success"] is True assert response_data["total_entries"] >= 3 # At least test.txt, subdir, binary.bin assert len(response_data["entries"]) >= 3 # Check that entries have proper structure for entry in response_data["entries"]: assert "name" in entry assert "path" in entry assert "metadata" in entry async def test_list_directory_with_pattern(self, server, test_workspace): """Test directory listing with pattern filtering.""" result = await server.server._handlers["call_tool"]( "list_directory", {"path": str(test_workspace), "pattern": "*.txt"} ) response_data = json.loads(result[0].text) assert response_data["success"] is True assert response_data["pattern_used"] == "*.txt" # Should only include .txt files txt_files = [entry for entry in response_data["entries"] if entry["name"].endswith(".txt")] assert len(txt_files) >= 1 async def test_list_directory_recursive(self, server, test_workspace): """Test recursive directory listing.""" result = await server.server._handlers["call_tool"]( "list_directory", {"path": str(test_workspace), "recursive": True} ) response_data = json.loads(result[0].text) assert response_data["success"] is True 
assert response_data["recursive"] is True # Should include files from subdirectories nested_files = [entry for entry in response_data["entries"] if "nested.py" in entry["name"]] assert len(nested_files) >= 1 async def test_delete_file_tool(self, server, test_workspace): """Test the delete_file tool.""" # Create a file to delete delete_file = test_workspace / "to_delete.txt" delete_file.write_text("Delete me") result = await server.server._handlers["call_tool"]( "delete_file", {"path": str(delete_file)} ) response_data = json.loads(result[0].text) assert response_data["success"] is True assert "backup_location" in response_data # File should be gone assert not delete_file.exists() # Backup should exist if response_data["backup_location"]: backup_file = Path(response_data["backup_location"]) assert backup_file.exists() async def test_get_file_info_tool(self, server, test_workspace): """Test the get_file_info tool.""" test_file = test_workspace / "test.txt" result = await server.server._handlers["call_tool"]( "get_file_info", {"path": str(test_file)} ) response_data = json.loads(result[0].text) assert response_data["success"] is True assert response_data["exists"] is True assert "metadata" in response_data metadata = response_data["metadata"] assert metadata["size"] == 13 assert metadata["is_directory"] is False assert "modified_time" in metadata assert "permissions" in metadata async def test_get_file_info_nonexistent(self, server, test_workspace): """Test get_file_info for nonexistent file.""" nonexistent = test_workspace / "nonexistent.txt" result = await server.server._handlers["call_tool"]( "get_file_info", {"path": str(nonexistent)} ) response_data = json.loads(result[0].text) assert response_data["success"] is True assert response_data["exists"] is False assert response_data["metadata"] is None async def test_security_protected_path(self, server): """Test that protected paths are blocked.""" result = await server.server._handlers["call_tool"]( "read_file", 
{"path": "/etc/passwd"} ) response_data = json.loads(result[0].text) assert response_data["success"] is False assert "error" in response_data assert response_data["error"]["error_type"] == "SecurityError" async def test_error_handling_invalid_tool(self, server): """Test error handling for invalid tool names.""" result = await server.server._handlers["call_tool"]( "invalid_tool", {"path": "/some/path"} ) assert len(result) == 1 assert "Unknown tool" in result[0].text async def test_error_handling_missing_arguments(self, server): """Test error handling for missing required arguments.""" result = await server.server._handlers["call_tool"]( "read_file", {} # Missing required 'path' argument ) assert len(result) == 1 assert "Error:" in result[0].text

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/chidvilas1234/Project-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.