"""
Tests for CKAN Tools functionality.
This module provides comprehensive testing for all 10 CKAN tools with
mock responses and real API integration capabilities.
"""
import asyncio
import json
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any
from unittest.mock import AsyncMock, patch
import aiohttp
import pytest
from ckan_mcp.ckan_tools import (
CkanApiError,
CkanToolsManager,
register_ckan_tools,
_build_error_response,
_build_how_to_markdown,
_config_public_view,
_derive_resource_filename,
_handle_ckan_api_initialise,
_handle_search_datasets,
_resolve_dataset_directory,
create_tool_response,
)
from ckan_mcp.config_selection import CkanConfigCatalog
from ckan_mcp.helpers import UpdateFrequencyAnalyzer
from ckan_mcp.session_state import SessionConfigStore
from ckan_mcp.types import (
CkanOrganization,
CkanPackage,
CkanResource,
CkanSearchResult,
CkanToolsConfig,
)
@dataclass
class ToolTestCase:
    """Test case definition.

    Bundles one mock tool invocation with the validation rules that
    ``TestCkanTools.test_tool_functionality`` applies to its result.
    """
    name: str  # Human-readable test case label used in console output
    tool: str  # Name of the CKAN tool to invoke on the mock client
    parameters: dict[str, Any]  # Arguments passed to the tool call
    expected_checks: dict[str, Any]  # Validation rules (hasResults, hasField, customCheck, ...)
class MockCkanClient:
"""Mock CKAN client for testing."""
def __init__(self, config: CkanToolsConfig):
"""Initialize mock client with configuration."""
self.config = config
self.manager = CkanToolsManager(config)
async def call_tool(self, tool_name: str, parameters: dict[str, Any]) -> Any:
"""Mock tool call - returns test data."""
# Return mock responses based on tool name
if tool_name == "find_relevant_datasets":
return {
"query": parameters.get("query", ""),
"total_found": 5,
"returned_count": 5,
"datasets": [
{
"id": f"dataset_{i}",
"title": f"Test Dataset {i}",
"relevance_score": 10 - i,
"update_frequency": "monthly",
}
for i in range(min(5, parameters.get("maxResults", 20)))
],
"facets": {},
}
elif tool_name == "analyze_dataset_updates":
return {
"total_datasets": 3,
"frequency_summary": [
{
"frequency": "monthly",
"count": 2,
"datasets": [{"id": "d1", "title": "Dataset 1"}],
},
{
"frequency": "weekly",
"count": 1,
"datasets": [{"id": "d2", "title": "Dataset 2"}],
},
],
"datasets": [
{
"id": "d1",
"title": "Dataset 1",
"update_frequency": "monthly",
"last_modified": "2024-01-01T00:00:00Z",
}
],
}
elif tool_name == "analyze_dataset_structure":
return {
"package_id": parameters.get("packageId", "test-id"),
"name": "test-dataset",
"title": "Test Dataset",
"resource_summary": {
"total_resources": 3,
"datastore_resources": 2,
"formats": ["CSV", "JSON"],
},
"resources": [
{
"id": "resource_1",
"name": "Test Resource",
"format": "CSV",
"datastore_active": True,
"record_count": 1000,
}
],
}
elif tool_name == "get_data_categories":
return {
"organizations": [
{
"id": "org1",
"name": "city-toronto",
"title": "City of Toronto",
"package_count": 150,
}
],
"groups": [
{
"id": "group1",
"name": "transportation",
"title": "Transportation",
"package_count": 25,
}
],
}
elif tool_name == "get_dataset_insights":
return {
"query": parameters.get("query", ""),
"total_found": 10,
"analyzed_datasets": 3,
"insights": [
{
"id": "insight1",
"title": "Insight 1",
"relevance_score": 9,
"update_info": {"frequency": "monthly"},
"data_structure": {"record_count": 1000},
}
],
"query_suggestions": {
"organizations": ["City of Toronto"],
"common_tags": ["transportation", "data"],
},
}
elif tool_name == "get_package":
return {
"id": parameters.get("packageId", "test-id"),
"title": "Test Package",
"description": "A test dataset",
"organization": "Test Org",
"tags": ["test", "data"],
}
elif tool_name == "get_first_datastore_resource_records":
return {
"resource_id": "resource_1",
"resource_name": "Test Resource",
"total_records": 1000,
"returned_records": 10,
"records": [{"id": 1, "value": "test"}],
}
elif tool_name == "get_resource_records":
return {
"resource_id": parameters.get("resourceId", "test-resource"),
"total_records": 1000,
"returned_records": parameters.get("limit", 10),
"records": [{"id": i, "value": f"test_{i}"} for i in range(10)],
}
elif tool_name == "list_datasets":
return {
"total_datasets": 500,
"returned_count": parameters.get("limit", 50),
"dataset_ids": [f"dataset_{i}" for i in range(parameters.get("limit", 50))],
}
elif tool_name == "search_datasets":
return {
"query": parameters.get("query", ""),
"total_found": 25,
"returned_count": min(20, parameters.get("limit", 20)),
"datasets": [
{
"id": f"search_result_{i}",
"title": f"Search Result {i}",
"description": f"Description for result {i}",
}
for i in range(min(20, parameters.get("limit", 20)))
],
}
else:
return {"error": f"Unknown tool: {tool_name}"}
class DummyServer:
    """Capture registered tools for schema assertions.

    Mimics the decorator-based registration API of an MCP server: each of
    ``list_tools()`` and ``call_tool()`` returns a decorator that records
    the handler so tests can inspect or invoke it later.
    """

    def __init__(self) -> None:
        self.list_handler: Any | None = None  # handler registered via list_tools()
        self.call_handler: Any | None = None  # handler registered via call_tool()

    def list_tools(self):
        """Return a decorator that records the tool-listing handler."""

        def decorator(func):
            self.list_handler = func
            # Return the function (as a real decorator would) so the
            # decorated name is not rebound to None at the call site.
            return func

        return decorator

    def call_tool(self):
        """Return a decorator that records the tool-call handler."""

        def decorator(func):
            self.call_handler = func
            return func

        return decorator
def _build_session_store() -> SessionConfigStore:
    """Create a session store with a dummy server for initialisation tests."""
    # Only the attribute chain ``request_context.session`` is accessed by the
    # store, so nested namespaces stand in for a real MCP server instance.
    stub_server = SimpleNamespace(
        request_context=SimpleNamespace(session=object())
    )
    return SessionConfigStore(stub_server)
def test_config_public_view_redacts_api_key():
    """The public config view masks the API key while keeping other fields."""
    secret_config = CkanToolsConfig(
        ckan_base_url="https://example.com/api/3/action", api_key="super-secret"
    )
    public_view = _config_public_view(secret_config)
    # The secret itself must never leak into the public view.
    assert public_view["api_key"] == "***"
    # Non-secret settings pass through untouched.
    assert public_view["ckan_base_url"] == "https://example.com/api/3/action"
@pytest.mark.asyncio
async def test_ckan_api_initialise_requires_api_key_when_flagged():
    """Initialisation fails when a location requires an API key and none is given."""
    store = _build_session_store()
    secure_location = {
        "base_url": "https://example.com/api/3/action",
        "overrides": {"requires_api_key": True},
    }
    catalog = CkanConfigCatalog(
        {"countries": {"Testland": {"locations": {"Secure City": secure_location}}}}
    )
    initialise = _handle_ckan_api_initialise(store, catalog)
    result = await initialise({"country": "Testland", "location": "Secure City"})
    assert result.isError is True
    assert result.structuredContent is not None
    assert "requires an API key" in result.structuredContent["error"]
@pytest.mark.asyncio
async def test_ckan_api_initialise_overrides_schema_accepts_empty_string():
    """The overrides input schema must accept an empty string variant."""
    dummy_server = DummyServer()
    store = _build_session_store()
    empty_catalog = CkanConfigCatalog({"countries": {}})
    await register_ckan_tools(dummy_server, store, empty_catalog)
    assert dummy_server.list_handler is not None
    tools = await dummy_server.list_handler()
    init_tool = next(tool for tool in tools if tool.name == "ckan_api_initialise")
    overrides_schema = init_tool.inputSchema["properties"]["overrides"]
    assert "anyOf" in overrides_schema
    # At least one anyOf branch must be the zero-length string option.
    empty_string_options = [
        option
        for option in overrides_schema["anyOf"]
        if option.get("type") == "string" and option.get("maxLength") == 0
    ]
    assert empty_string_options
def _extract_result_items(result: Any) -> list[Any]:
"""Return whichever list field represents items for the given result."""
if isinstance(result, dict):
for field in ("datasets", "insights", "records"):
value = result.get(field)
if isinstance(value, list):
return value
return []
# Test cases matching TypeScript version.
# Each ToolTestCase pairs one mock tool invocation with the validation rules
# applied by TestCkanTools.test_tool_functionality (hasResults, hasField,
# minResults, noErrors, and optional customCheck callables).
TEST_CASES = [
    # Discovery / analysis tools
    ToolTestCase(
        name="Basic Dataset Search",
        tool="find_relevant_datasets",
        parameters={"query": "parking", "maxResults": 5, "includeRelevanceScore": True},
        expected_checks={
            "hasResults": True,
            "minResults": 1,
            "hasField": ["datasets", "total_found"],
            "noErrors": True,
        },
    ),
    ToolTestCase(
        name="Update Frequency Analysis",
        tool="analyze_dataset_updates",
        parameters={"query": "traffic", "groupByFrequency": True},
        expected_checks={
            "hasResults": True,
            "hasField": ["frequency_summary", "total_datasets"],
            "customCheck": lambda result: len(result.get("frequency_summary", [])) > 0,
        },
    ),
    ToolTestCase(
        name="Data Structure Analysis",
        tool="analyze_dataset_structure",
        parameters={"packageId": "building-permits", "includeDataPreview": False},
        expected_checks={
            "hasResults": True,
            "hasField": ["resources", "resource_summary"],
            "customCheck": lambda result: len(result.get("resources", [])) > 0,
        },
    ),
    ToolTestCase(
        name="Category Discovery",
        tool="get_data_categories",
        parameters={},
        expected_checks={
            "hasResults": True,
            "hasField": ["organizations", "groups"],
            "customCheck": lambda result: len(result.get("organizations", [])) > 0,
        },
    ),
    ToolTestCase(
        name="Comprehensive Insights",
        tool="get_dataset_insights",
        parameters={
            "query": "transportation",
            "maxDatasets": 3,
            "includeUpdateFrequency": True,
            "includeDataStructure": True,
        },
        expected_checks={
            "hasResults": True,
            "minResults": 1,
            "hasField": ["insights", "query_suggestions"],
            "customCheck": lambda result: len(result.get("insights", [])) > 0,
        },
    ),
    # Basic tools tests
    ToolTestCase(
        name="Get Package",
        tool="get_package",
        parameters={"packageId": "test-package", "summary": True},
        expected_checks={"hasResults": True, "hasField": ["id", "title"], "noErrors": True},
    ),
    ToolTestCase(
        name="List Datasets",
        tool="list_datasets",
        parameters={"limit": 10, "offset": 0},
        expected_checks={
            "hasResults": True,
            "hasField": ["total_datasets", "dataset_ids"],
            "customCheck": lambda result: len(result.get("dataset_ids", [])) > 0,
        },
    ),
    ToolTestCase(
        name="Search Datasets",
        tool="search_datasets",
        parameters={"query": "test", "limit": 5},
        expected_checks={
            "hasResults": True,
            "hasField": ["datasets", "total_found"],
            "customCheck": lambda result: len(result.get("datasets", [])) > 0,
        },
    ),
]
class TestCkanTools:
    """Test suite for CKAN tools."""

    @pytest.fixture
    def config(self):
        """Create test configuration."""
        return CkanToolsConfig(
            ckan_base_url="https://test.ckan.api",
            ckan_site_url="https://test.ckan.site",
            request_timeout=5000,
        )

    @pytest.fixture
    def mock_client(self, config):
        """Create mock CKAN client."""
        return MockCkanClient(config)

    @pytest.mark.asyncio
    @pytest.mark.parametrize("test_case", TEST_CASES)
    async def test_tool_functionality(self, mock_client, test_case):
        """Test each tool with its test case.

        Runs the mock tool call, then applies every check declared in the
        case's ``expected_checks`` mapping, printing a status marker for each.
        """
        # NOTE: the original file's emoji markers were mojibake (UTF-8 bytes
        # misdecoded as TIS-620); restored to the intended 🧪 / ✅ / ❌.
        print(f"\n🧪 Running test: {test_case.name}")
        try:
            result = await mock_client.call_tool(test_case.tool, test_case.parameters)
            print("✅ Tool executed successfully")
            # Run validation checks
            checks = test_case.expected_checks
            passed = True
            if checks.get("hasResults") and not result:
                print("❌ Expected results but got none")
                passed = False
            if checks.get("noErrors") and isinstance(result, dict) and result.get("error"):
                print(f"❌ Unexpected error: {result['error']}")
                passed = False
            if "hasField" in checks:
                for field in checks["hasField"]:
                    if field not in result:
                        print(f"❌ Missing expected field: {field}")
                        passed = False
            if "minResults" in checks:
                datasets_count = len(_extract_result_items(result))
                if datasets_count < checks["minResults"]:
                    print(
                        f"❌ Expected at least {checks['minResults']} results, got {datasets_count}"
                    )
                    passed = False
            if "maxResults" in checks:
                datasets_count = len(_extract_result_items(result))
                if datasets_count > checks["maxResults"]:
                    print(
                        f"❌ Expected at most {checks['maxResults']} results, got {datasets_count}"
                    )
                    passed = False
            custom_check = checks.get("customCheck")
            if custom_check and not custom_check(result):
                print("❌ Custom validation check failed")
                passed = False
            if passed:
                print(f"✅ Test passed: {test_case.name}")
            else:
                print(f"❌ Test failed: {test_case.name}")
            assert passed, f"Test failed: {test_case.name}"
        except Exception as e:
            print(f"❌ Test failed with error: {e}")
            raise

    def test_create_tool_response(self):
        """Test tool response creation."""
        data = {"test": "data", "number": 42}
        response = create_tool_response(data)
        assert response.content is not None
        assert len(response.content) == 1
        assert response.content[0].type == "text"
        # Check that the data is properly JSON serialized
        response_text = response.content[0].text
        parsed_data = json.loads(response_text)
        assert parsed_data == data

    def test_ckan_tools_manager_initialization(self, config):
        """Test CkanToolsManager initialization."""
        manager = CkanToolsManager(config)
        assert manager.config == config
        # All collaborators should be wired up by the constructor.
        assert manager.scorer is not None
        assert manager.analyzer is not None
        assert manager.summary_builder is not None
        assert manager.url_builder is not None

    def test_url_normalization(self, config):
        """Test URL normalization functions."""
        manager = CkanToolsManager(config)
        # Test base URL normalization: trailing slashes and whitespace dropped.
        assert manager.normalize_base_url("https://test.com/") == "https://test.com"
        assert manager.normalize_base_url("https://test.com//") == "https://test.com"
        assert manager.normalize_base_url(" https://test.com/ ") == "https://test.com"
        # Test site URL normalization: empty / missing values collapse to None.
        assert manager.normalize_site_url("https://site.com/") == "https://site.com"
        assert manager.normalize_site_url(None) is None
        assert manager.normalize_site_url("") is None

    def test_endpoint_url_building(self, config):
        """Test endpoint URL building."""
        manager = CkanToolsManager(config)
        url = manager.build_endpoint_url("package_show")
        assert url == "https://test.ckan.api/package_show"
        # Leading slash on the endpoint must not produce a double slash.
        url = manager.build_endpoint_url("/package_show")
        assert url == "https://test.ckan.api/package_show"

    @pytest.mark.asyncio
    async def test_create_session_context(self, config):
        """Ensure the manager provides a configured aiohttp session."""
        manager = CkanToolsManager(config)
        async with manager._create_session() as session:
            assert isinstance(session, aiohttp.ClientSession)
            assert not session.closed
        # Leaving the context manager must close the session.
        assert session.closed

    @pytest.mark.asyncio
    async def test_api_error_handling(self, config):
        """Test API error handling."""
        manager = CkanToolsManager(config)
        # Mock a failed response
        with patch("aiohttp.ClientSession.post") as mock_post:
            mock_response = AsyncMock()
            mock_response.ok = False
            mock_response.status = 404
            mock_response.reason = "Not Found"
            mock_response.text = AsyncMock(return_value='{"error": {"message": "Not Found"}}')
            mock_response.headers = {}
            mock_post.return_value.__aenter__.return_value = mock_response
            async with aiohttp.ClientSession() as session:
                with pytest.raises(CkanApiError) as exc_info:
                    await manager._api_call("nonexistent_endpoint", session)
                error = exc_info.value
                assert error.status_code == 404
                assert error.error_payload.get("message") == "Not Found"
                assert error.url == "https://test.ckan.api/nonexistent_endpoint"

    def test_build_error_response_adds_request_url(self):
        """Ensure tool errors include the request URL for debugging."""
        api_error = CkanApiError("Bad Request", status_code=400, url="https://example.com/api")
        response = _build_error_response("Failed to call CKAN.", api_error)
        assert response.isError is True
        assert response.structuredContent is not None
        # URL is surfaced both at the top level and inside the nested payload.
        assert response.structuredContent.get("url") == "https://example.com/api"
        assert (
            response.structuredContent.get("ckanError", {}).get("url") == "https://example.com/api"
        )

    @pytest.mark.asyncio
    async def test_search_datasets_forwards_extra_params(self, config, monkeypatch):
        """Ensure extraSearchParams reach fetch_package_search unchanged."""
        session_store = _build_session_store()
        session_store.set_config(config)
        handler = _handle_search_datasets(session_store)
        mock_search = AsyncMock(
            return_value=CkanSearchResult(count=0, results=[], facets=None, search_facets=None)
        )
        monkeypatch.setattr(CkanToolsManager, "fetch_package_search", mock_search)
        response = await handler(
            {"query": "traffic", "extraSearchParams": {"include_drafts": True}}
        )
        assert response.isError is False
        assert mock_search.await_count == 1
        called_kwargs = mock_search.await_args.kwargs  # type: ignore[attr-defined]
        assert called_kwargs.get("extra_params") == {"include_drafts": True}

    @pytest.mark.asyncio
    async def test_search_datasets_rejects_invalid_extra_params(self, config):
        """Ensure invalid extraSearchParams produce an error response."""
        session_store = _build_session_store()
        session_store.set_config(config)
        handler = _handle_search_datasets(session_store)
        response = await handler({"query": "traffic", "extraSearchParams": "not-a-dict"})
        assert response.isError is True
        assert response.structuredContent is not None
        assert "extraSearchParams" in response.structuredContent["error"]

    def test_days_since_date_supports_naive_iso(self, config):
        """Ensure update analyzer can parse ISO strings without timezone."""
        analyzer = UpdateFrequencyAnalyzer(config)
        days = analyzer._days_since_date("2000-01-01T12:34:56.123456")
        assert isinstance(days, int)
        assert days >= 0

    def test_days_since_date_supports_timezone_iso(self, config):
        """Ensure analyzer can parse ISO strings with timezone offsets."""
        analyzer = UpdateFrequencyAnalyzer(config)
        target = "2000-01-01T12:34:56.123456+00:00"
        days = analyzer._days_since_date(target)
        assert isinstance(days, int)
        assert days >= 0

    def test_resolve_dataset_directory_uses_env(self, tmp_path, monkeypatch):
        """Ensure dataset directory resolution honors CKAN_MCP_LOCAL_DATASTORE."""
        monkeypatch.setenv("CKAN_MCP_LOCAL_DATASTORE", str(tmp_path))
        dataset_dir = _resolve_dataset_directory("Sample Dataset")
        # The directory should live under the configured root and be created.
        assert dataset_dir.parent == tmp_path
        assert dataset_dir.exists()

    def test_derive_resource_filename_uses_format(self):
        """Ensure filenames honor resource format extensions."""
        resource = CkanResource(
            id="res-123",
            name="Toronto Ferry Sales",
            format="CSV",
            size=None,
            mimetype=None,
            url="https://example.com/path/data.csv",
            created="2024-01-01T00:00:00Z",
            last_modified="2024-01-15T00:00:00Z",
            datastore_active=True,
            description=None,
        )
        filename = _derive_resource_filename(resource)
        # Name is slugified and the extension follows the declared format.
        assert filename.startswith("toronto_ferry_sales")
        assert filename.endswith(".csv")

    def test_build_how_to_markdown_includes_required_sections(self, config, tmp_path):
        """Ensure how-to guidance includes context, limitations, and Python instructions."""
        analyzer = UpdateFrequencyAnalyzer(config)
        organization = CkanOrganization(
            id="org-1",
            name="city-toronto",
            title="City of Toronto",
            description="",
            package_count=1,
            image_url=None,
        )
        resource = CkanResource(
            id="res-abc",
            name="Ferry Tickets",
            format="CSV",
            size=None,
            mimetype=None,
            url="https://example.com/ferry.csv",
            created="2024-01-01T00:00:00Z",
            last_modified="2024-02-01T00:00:00Z",
            datastore_active=True,
            description="Aggregated ticket sales exclude cancelled transactions.",
        )
        package = CkanPackage(
            id="pkg-1",
            name="toronto-island-ferry-sales",
            title="Toronto Island Ferry Sales",
            notes="Ticket sales per route and fare class.",
            tags=[],
            organization=organization,
            resources=[resource],
            metadata_created="2023-01-01T00:00:00Z",
            metadata_modified="2024-02-02T00:00:00Z",
            refresh_rate="monthly",
            maintainer=None,
            maintainer_updated=None,
            groups=None,
            state="active",
            type="dataset",
            url="https://example.com/dataset",
            ckan_url="https://example.com/dataset",
        )
        dataset_path = tmp_path / "dataset.csv"
        dataset_path.write_text("id,value\n1,2\n", encoding="utf-8")
        metadata_path = tmp_path / "metadata.json"
        metadata_path.write_text("{}", encoding="utf-8")
        markdown = _build_how_to_markdown(
            package=package,
            resource=resource,
            dataset_path=dataset_path,
            metadata_path=metadata_path,
            dataset_url="https://example.com/dataset",
            analyzer=analyzer,
            fields=["ticket_type", "sale_date"],
            record_count=100,
        )
        # The generated guide must carry all required sections.
        assert "Dataset Overview" in markdown
        assert "Limitations" in markdown
        assert "pd.read_csv" in markdown
        assert "Data Range" in markdown
class TestPerformance:
    """Performance tests for CKAN tools."""

    @pytest.fixture
    def config(self):
        """Create test configuration."""
        return CkanToolsConfig(ckan_base_url="https://test.ckan.api", request_timeout=10000)

    @pytest.fixture
    def mock_client(self, config):
        """Create mock CKAN client."""
        return MockCkanClient(config)

    @pytest.mark.asyncio
    async def test_tool_response_times(self, mock_client):
        """Test that tool responses are within acceptable time limits."""
        performance_tests = [
            {
                "name": "Large Query Response Time",
                "tool": "find_relevant_datasets",
                "parameters": {"query": "toronto", "maxResults": 50},
                "max_time_ms": 5000,  # 5 seconds for mock
            },
            {
                "name": "Complex Analysis Response Time",
                "tool": "get_dataset_insights",
                "parameters": {"query": "budget financial", "maxDatasets": 10},
                "max_time_ms": 5000,  # 5 seconds for mock
            },
        ]
        # asyncio.get_event_loop() is deprecated inside a coroutine; use the
        # running loop's monotonic clock, fetched once outside the loop.
        loop = asyncio.get_running_loop()
        for test in performance_tests:
            start_time = loop.time()
            await mock_client.call_tool(test["tool"], test["parameters"])
            end_time = loop.time()
            duration_ms = (end_time - start_time) * 1000
            print(f"{test['name']}: {duration_ms:.2f}ms")
            assert (
                duration_ms < test["max_time_ms"]
            ), f"{test['name']} exceeded time limit: {duration_ms:.2f}ms > {test['max_time_ms']}ms"
if __name__ == "__main__":
    # Run tests when executed directly.
    # pytest.main() is synchronous and returns an exit code; wrapping it in
    # asyncio.run() raised "ValueError: a coroutine was expected". Propagate
    # the exit code to the shell instead.
    raise SystemExit(pytest.main([__file__, "-v"]))