"""Live CKAN integration tests covering the Action API surface."""
from __future__ import annotations
import os
from collections.abc import Iterable
import pytest
from ckan_mcp.ckan_tools import CkanToolsManager
from ckan_mcp.types import CkanPackage, CkanToolsConfig
# The live suite is opt-in: it only runs when this environment flag is exactly "1".
RUN_LIVE_INTEGRATION = os.environ.get("CKAN_RUN_INTEGRATION_TESTS") == "1"

_SKIP_REASON = "Set CKAN_RUN_INTEGRATION_TESTS=1 to run the live CKAN integration suite."
_skip_unless_live = pytest.mark.skipif(not RUN_LIVE_INTEGRATION, reason=_SKIP_REASON)

# Markers applied to every test in this module.
pytestmark = [pytest.mark.integration, pytest.mark.asyncio, _skip_unless_live]
def _choose_search_terms() -> Iterable[str]:
    """Return the search terms used to look for live datasets.

    Reads the comma-separated ``CKAN_TEST_SEARCH_TERMS`` environment variable,
    trimming whitespace around each entry and dropping empty entries.

    Falls back to the built-in defaults when the variable is unset, empty, or
    contains no usable term (for example ``" , "``), so a malformed override
    cannot silently produce an empty search.

    Returns:
        A list of the configured terms, or the default tuple of terms.
    """
    raw = os.getenv("CKAN_TEST_SEARCH_TERMS") or ""
    trimmed = (piece.strip() for piece in raw.split(","))
    terms = [piece for piece in trimmed if piece]
    if terms:
        return terms
    return ("traffic", "transportation", "ferry", "transit")
def _first_datastore_resource(package: CkanPackage) -> str | None:
    """Pick the resource ID to use for *package*.

    Prefers the first resource flagged ``datastore_active``. When no resource
    carries that flag, falls back to the package's first resource. Returns
    ``None`` when the package has no resources at all.
    """
    resources = package.resources
    active_ids = (entry.id for entry in resources if entry.datastore_active)
    fallback_id = resources[0].id if resources else None
    return next(active_ids, fallback_id)
async def _resolve_datastore_candidate(
    manager: CkanToolsManager, config: CkanToolsConfig
) -> tuple[CkanPackage, str]:
    """Locate a dataset and one of its resource IDs to use for datastore checks.

    Runs a package search for each configured term, restricted with
    ``fq="res_datastore_active:true"`` and limited to 5 rows, and returns the
    first package for which a resource ID can be picked. If no term produces
    a match, the calling test is skipped via ``pytest.skip``.

    ``config`` is accepted but not used by this helper.
    """
    async with manager._create_session() as http:
        for query in _choose_search_terms():
            found = await manager.fetch_package_search(
                query,
                http,
                rows=5,
                fq="res_datastore_active:true",
            )
            for dataset in found.results:
                candidate_id = _first_datastore_resource(dataset)
                if not candidate_id:
                    continue
                return dataset, candidate_id
    pytest.skip(
        "Could not find a dataset with datastore resources from the configured search terms."
    )
async def test_live_package_round_trip(live_ckan_config: CkanToolsConfig) -> None:
    """Check that an identifier from package_list can be fetched back via package_show."""
    tools = CkanToolsManager(live_ckan_config)
    async with tools._create_session() as http:
        listed = await tools.fetch_package_list(http, limit=25)
        assert listed, "Expected at least one dataset ID from package_list"

        wanted = listed[0]
        fetched = await tools.fetch_package(wanted, http)

    # The listed identifier is accepted if it matches either the package ID or its name.
    assert wanted in (fetched.id, fetched.name)
    assert fetched.title or fetched.name
async def test_live_datastore_preview_returns_records(live_ckan_config: CkanToolsConfig) -> None:
    """Check that previewing a live datastore resource yields actual records."""
    tools = CkanToolsManager(live_ckan_config)
    package, target_id = await _resolve_datastore_candidate(tools, live_ckan_config)

    async with tools._create_session() as http:
        preview = await tools.fetch_resource(target_id, http, limit=5)

    assert preview.records, f"Expected records for datastore resource on {package.title}"

    # The preview may omit the resource ID; when present it must match the request.
    echoed_id = preview.resource_id
    assert echoed_id is None or echoed_id == target_id

    # A reported total can never be smaller than the number of rows returned.
    assert preview.total is None or len(preview.records) <= preview.total