"""Integration tests for the MCP server."""
import asyncio
import json
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from file_system_mcp_server.config import Config
from file_system_mcp_server.server import FileSystemMCPServer
class TestMCPServerIntegration:
"""Integration tests for the MCP server."""
@pytest.fixture
def temp_config(self):
"""Create a temporary configuration for testing."""
with tempfile.TemporaryDirectory() as temp_dir:
config = Config(
backup_directory=str(Path(temp_dir) / "backups"),
max_file_size=1024 * 1024, # 1MB
max_recursion_depth=5,
protected_paths=["/etc", "/usr"],
enable_backups=True,
log_level="DEBUG"
)
yield config
@pytest.fixture
def server(self, temp_config):
"""Create a server instance for testing."""
return FileSystemMCPServer(temp_config)
@pytest.fixture
def test_workspace(self):
"""Create a temporary workspace for testing."""
with tempfile.TemporaryDirectory() as temp_dir:
workspace = Path(temp_dir)
# Create test files and directories
(workspace / "test.txt").write_text("Hello, World!")
(workspace / "subdir").mkdir()
(workspace / "subdir" / "nested.py").write_text("print('nested')")
(workspace / "binary.bin").write_bytes(b"\x00\x01\x02\x03")
yield workspace
async def test_list_tools(self, server):
"""Test that all expected tools are listed."""
# Mock the list_tools handler
tools = await server.server._handlers["list_tools"]()
expected_tools = {
"read_file", "write_file", "update_file",
"list_directory", "delete_file", "get_file_info"
}
tool_names = {tool.name for tool in tools}
assert tool_names == expected_tools
# Check that each tool has proper schema
for tool in tools:
assert tool.name
assert tool.description
assert tool.inputSchema
assert "type" in tool.inputSchema
assert "properties" in tool.inputSchema
assert "required" in tool.inputSchema
async def test_read_file_tool(self, server, test_workspace):
"""Test the read_file tool."""
test_file = test_workspace / "test.txt"
# Call the tool
result = await server.server._handlers["call_tool"](
"read_file",
{"path": str(test_file)}
)
assert len(result) == 1
response_data = json.loads(result[0].text)
assert response_data["success"] is True
assert response_data["content"] == "Hello, World!"
assert response_data["is_binary"] is False
assert "metadata" in response_data
assert response_data["metadata"]["size"] == 13
async def test_read_binary_file_tool(self, server, test_workspace):
"""Test reading a binary file."""
binary_file = test_workspace / "binary.bin"
result = await server.server._handlers["call_tool"](
"read_file",
{"path": str(binary_file)}
)
response_data = json.loads(result[0].text)
assert response_data["success"] is True
assert response_data["content"] is None # Binary files don't return content
assert response_data["is_binary"] is True
assert "metadata" in response_data
async def test_write_file_tool(self, server, test_workspace):
"""Test the write_file tool."""
new_file = test_workspace / "new_file.txt"
content = "This is new content"
result = await server.server._handlers["call_tool"](
"write_file",
{"path": str(new_file), "content": content}
)
response_data = json.loads(result[0].text)
assert response_data["success"] is True
assert response_data["path"] == str(new_file)
assert "metadata" in response_data
# Verify file was actually created
assert new_file.exists()
assert new_file.read_text() == content
async def test_write_file_overwrite_protection(self, server, test_workspace):
"""Test write_file overwrite protection."""
existing_file = test_workspace / "test.txt"
result = await server.server._handlers["call_tool"](
"write_file",
{"path": str(existing_file), "content": "new content", "overwrite": False}
)
response_data = json.loads(result[0].text)
assert response_data["success"] is False
assert "error" in response_data
assert response_data["error"]["error_type"] == "FileExists"
async def test_update_file_tool(self, server, test_workspace):
"""Test the update_file tool."""
test_file = test_workspace / "test.txt"
original_content = test_file.read_text()
new_content = "Updated content"
result = await server.server._handlers["call_tool"](
"update_file",
{"path": str(test_file), "content": new_content}
)
response_data = json.loads(result[0].text)
assert response_data["success"] is True
assert "backup_created" in response_data
# Verify file was updated
assert test_file.read_text() == new_content
# Verify backup was created
if response_data["backup_created"]:
backup_file = Path(response_data["backup_created"])
assert backup_file.exists()
async def test_list_directory_tool(self, server, test_workspace):
"""Test the list_directory tool."""
result = await server.server._handlers["call_tool"](
"list_directory",
{"path": str(test_workspace)}
)
response_data = json.loads(result[0].text)
assert response_data["success"] is True
assert response_data["total_entries"] >= 3 # At least test.txt, subdir, binary.bin
assert len(response_data["entries"]) >= 3
# Check that entries have proper structure
for entry in response_data["entries"]:
assert "name" in entry
assert "path" in entry
assert "metadata" in entry
async def test_list_directory_with_pattern(self, server, test_workspace):
"""Test directory listing with pattern filtering."""
result = await server.server._handlers["call_tool"](
"list_directory",
{"path": str(test_workspace), "pattern": "*.txt"}
)
response_data = json.loads(result[0].text)
assert response_data["success"] is True
assert response_data["pattern_used"] == "*.txt"
# Should only include .txt files
txt_files = [entry for entry in response_data["entries"] if entry["name"].endswith(".txt")]
assert len(txt_files) >= 1
async def test_list_directory_recursive(self, server, test_workspace):
"""Test recursive directory listing."""
result = await server.server._handlers["call_tool"](
"list_directory",
{"path": str(test_workspace), "recursive": True}
)
response_data = json.loads(result[0].text)
assert response_data["success"] is True
assert response_data["recursive"] is True
# Should include files from subdirectories
nested_files = [entry for entry in response_data["entries"] if "nested.py" in entry["name"]]
assert len(nested_files) >= 1
async def test_delete_file_tool(self, server, test_workspace):
"""Test the delete_file tool."""
# Create a file to delete
delete_file = test_workspace / "to_delete.txt"
delete_file.write_text("Delete me")
result = await server.server._handlers["call_tool"](
"delete_file",
{"path": str(delete_file)}
)
response_data = json.loads(result[0].text)
assert response_data["success"] is True
assert "backup_location" in response_data
# File should be gone
assert not delete_file.exists()
# Backup should exist
if response_data["backup_location"]:
backup_file = Path(response_data["backup_location"])
assert backup_file.exists()
async def test_get_file_info_tool(self, server, test_workspace):
"""Test the get_file_info tool."""
test_file = test_workspace / "test.txt"
result = await server.server._handlers["call_tool"](
"get_file_info",
{"path": str(test_file)}
)
response_data = json.loads(result[0].text)
assert response_data["success"] is True
assert response_data["exists"] is True
assert "metadata" in response_data
metadata = response_data["metadata"]
assert metadata["size"] == 13
assert metadata["is_directory"] is False
assert "modified_time" in metadata
assert "permissions" in metadata
async def test_get_file_info_nonexistent(self, server, test_workspace):
"""Test get_file_info for nonexistent file."""
nonexistent = test_workspace / "nonexistent.txt"
result = await server.server._handlers["call_tool"](
"get_file_info",
{"path": str(nonexistent)}
)
response_data = json.loads(result[0].text)
assert response_data["success"] is True
assert response_data["exists"] is False
assert response_data["metadata"] is None
async def test_security_protected_path(self, server):
"""Test that protected paths are blocked."""
result = await server.server._handlers["call_tool"](
"read_file",
{"path": "/etc/passwd"}
)
response_data = json.loads(result[0].text)
assert response_data["success"] is False
assert "error" in response_data
assert response_data["error"]["error_type"] == "SecurityError"
async def test_error_handling_invalid_tool(self, server):
"""Test error handling for invalid tool names."""
result = await server.server._handlers["call_tool"](
"invalid_tool",
{"path": "/some/path"}
)
assert len(result) == 1
assert "Unknown tool" in result[0].text
async def test_error_handling_missing_arguments(self, server):
"""Test error handling for missing required arguments."""
result = await server.server._handlers["call_tool"](
"read_file",
{} # Missing required 'path' argument
)
assert len(result) == 1
assert "Error:" in result[0].text