"""Integration tests covering the Streamable HTTP transport against a live CKAN portal."""
from __future__ import annotations
import os
from typing import Any
import httpx
import pytest
from asgi_lifespan import LifespanManager
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from ckan_mcp.main import create_default_server
from ckan_mcp.types import CkanToolsConfig
RUN_LIVE_INTEGRATION = os.getenv("CKAN_RUN_INTEGRATION_TESTS") == "1"
pytestmark = [
pytest.mark.integration,
pytest.mark.asyncio,
pytest.mark.skipif(
not RUN_LIVE_INTEGRATION,
reason="Set CKAN_RUN_INTEGRATION_TESTS=1 to run the live CKAN integration suite.",
),
]
def _asgi_client_factory(app):
"""Create a factory that returns httpx clients bound to the in-process ASGI app."""
def factory(
headers: dict[str, str] | None = None,
timeout: httpx.Timeout | None = None,
auth: httpx.Auth | None = None,
) -> httpx.AsyncClient:
transport = httpx.ASGITransport(app=app)
client_kwargs: dict[str, Any] = {
"transport": transport,
"base_url": "http://ckan-mcp.test",
"follow_redirects": True,
}
if headers:
client_kwargs["headers"] = headers
client_kwargs["timeout"] = timeout or httpx.Timeout(30.0)
if auth:
client_kwargs["auth"] = auth
return httpx.AsyncClient(**client_kwargs)
return factory
async def test_streamable_http_can_search_live_portal(live_ckan_config: CkanToolsConfig) -> None:
"""Exercise the HTTP transport end-to-end using the real CKAN portal."""
server = create_default_server()
app = server.create_streamable_http_app(mount_path="/mcp", json_response=True)
client_factory = _asgi_client_factory(app)
base_url = "http://ckan-mcp.test/mcp"
async with LifespanManager(app):
async with streamablehttp_client(
base_url,
httpx_client_factory=client_factory,
) as (read_stream, write_stream, _):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
init_payload: dict[str, Any] = {
"country": os.getenv("CKAN_TEST_COUNTRY", "Canada"),
"location": os.getenv("CKAN_TEST_LOCATION", "Toronto"),
}
overrides: dict[str, Any] = {}
if live_ckan_config.action_transport != "post":
overrides["actionTransport"] = live_ckan_config.action_transport
if live_ckan_config.datastore_id_alias:
overrides["datastoreIdAlias"] = True
if live_ckan_config.dataset_page_url_template:
overrides["datasetPageUrlTemplate"] = live_ckan_config.dataset_page_url_template
if overrides:
init_payload["overrides"] = overrides
init_result = await session.call_tool("ckan_api_initialise", init_payload)
assert not init_result.isError
assert init_result.structuredContent is not None
selection = init_result.structuredContent["selection"]
assert selection["country"]
assert selection["location"]
query = os.getenv("CKAN_TEST_SEARCH_QUERY", "traffic")
search_result = await session.call_tool(
"search_datasets", {"query": query, "limit": 3, "rows": 3}
)
assert not search_result.isError
structured = search_result.structuredContent
assert structured is not None
assert structured["returned_count"] > 0
assert structured["datasets"][0]["url"].startswith("http")