"""
CKAN Tools for CKAN Open Data MCP Server
This module provides comprehensive access to CKAN's CKAN-powered open data portal
through 11 specialized tools for data discovery, analysis, retrieval, and local downloads.
"""
import asyncio
import bz2
import gzip
import json
import logging
import lzma
import os
import shutil
import tarfile
import time
import zipfile
from collections.abc import AsyncIterator, Awaitable, Callable, Mapping
from collections.abc import Mapping as MappingABC
from contextlib import asynccontextmanager, closing
from pathlib import Path
from textwrap import dedent
from typing import IO, Any, cast
from urllib.parse import urlencode, urlparse
import aiohttp
from mcp.server import Server
from mcp.types import Tool
from .config_selection import CkanConfigCatalog
from .helpers import (
DatasetUrlBuilder,
RelevanceScorer,
SummaryBuilder,
UpdateFrequencyAnalyzer,
create_package_summary,
)
from .session_state import MissingSessionConfigError, SessionConfigStore
from .types import (
CkanDatastoreResult,
CkanGroup,
CkanOrganization,
CkanPackage,
CkanResource,
CkanSearchResult,
CkanToolsConfig,
ToolContent,
ToolResponse,
)
logger = logging.getLogger(__name__)
ToolHandler = Callable[[dict[str, Any]], Awaitable[ToolResponse]]
RATE_LIMIT_HEADER_PREFIXES = ("x-rate-limit", "x-ckan-rate-limit")
RATE_LIMIT_HEADER_NAMES = {
"retry-after",
"retry_after",
"x-ratelimit-limit",
"x-ratelimit-remaining",
"x-ratelimit-reset",
}
ZIP_MIME_TYPES = {"application/zip", "application/x-zip-compressed"}
TAR_MIME_TYPES = {"application/x-tar", "application/x-gtar"}
COMPRESSED_STREAM_MIME_TYPES = {
"application/gzip": "gzip",
"application/x-gzip": "gzip",
"application/x-bzip2": "bzip2",
"application/x-xz": "xz",
}
SUPPORTED_ARCHIVE_MIME_TYPES = (
ZIP_MIME_TYPES | TAR_MIME_TYPES | set(COMPRESSED_STREAM_MIME_TYPES.keys())
)
EXTRACTION_PREVIEW_LIMIT = 10
PathLikeStr = str | os.PathLike[str]
ArchiveOpener = Callable[[PathLikeStr, str], IO[bytes]]
def _extract_rate_limit_headers(headers: Mapping[str, str] | None) -> dict[str, str]:
"""Pull rate-limit and retry hints from CKAN responses."""
extracted: dict[str, str] = {}
if headers is None or not isinstance(headers, MappingABC):
return extracted
for key, value in headers.items():
lower_key = key.lower()
if lower_key in RATE_LIMIT_HEADER_NAMES or any(
lower_key.startswith(prefix) for prefix in RATE_LIMIT_HEADER_PREFIXES
):
extracted[key] = value
return extracted
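# Illustrative behavior (hypothetical header values): standard names match
# exactly, while the prefixed families match by startswith.
#
#     _extract_rate_limit_headers({"Retry-After": "30", "Content-Type": "text/html"})
#     # -> {"Retry-After": "30"}
#     _extract_rate_limit_headers({"X-Rate-Limit-Remaining": "99"})
#     # -> {"X-Rate-Limit-Remaining": "99"}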
def _normalize_action_base_url(base_url: str) -> str:
"""Ensure CKAN base URLs point to the /action endpoint."""
trimmed = (base_url or "").strip().rstrip("/")
if not trimmed:
raise ValueError("A CKAN base URL must be provided.")
if trimmed.endswith("/action"):
return trimmed
if trimmed.endswith("action"):
return trimmed
return f"{trimmed}/action"
def _build_request_url(base_url: str, query_items: list[tuple[str, str]] | None) -> str:
"""Attach query parameters to a base URL for debugging."""
if not query_items:
return base_url
query_string = urlencode(query_items, doseq=True)
return f"{base_url}?{query_string}" if query_string else base_url
class CkanApiError(Exception):
"""Custom exception for CKAN API errors."""
def __init__(
self,
message: str,
*,
status_code: int | None = None,
error_payload: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
url: str | None = None,
):
super().__init__(message)
self.message = message
self.status_code = status_code
self.error_payload = error_payload or {}
self.headers = headers or {}
self.url = url
def to_dict(self) -> dict[str, Any]:
"""Return a structured representation of the error."""
payload: dict[str, Any] = {
"message": self.message,
"status_code": self.status_code,
}
if self.url:
payload["url"] = self.url
if self.error_payload:
payload["error"] = self.error_payload
if self.headers:
payload["headers"] = self.headers
return payload
class CkanToolsManager:
"""Manages CKAN API configuration and connections."""
def __init__(self, config: CkanToolsConfig):
"""Initialize with CKAN configuration."""
self.config = config
self.scorer = RelevanceScorer(config)
self.analyzer = UpdateFrequencyAnalyzer(config)
self.summary_builder = SummaryBuilder(config)
self.url_builder = DatasetUrlBuilder(config)
@asynccontextmanager
async def _create_session(self) -> AsyncIterator[aiohttp.ClientSession]:
"""Create a configured aiohttp session for CKAN API calls."""
timeout_seconds = max(self.config.request_timeout / 1000.0, 0.001)
timeout = aiohttp.ClientTimeout(total=timeout_seconds)
connector = aiohttp.TCPConnector(limit=20)
async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
yield session
def _build_headers(self) -> dict[str, str]:
"""Build headers for CKAN API requests."""
headers = {"Accept": "application/json", "User-Agent": "CKAN-MCP-Server/1.0.0"}
if self.config.api_key:
headers["X-CKAN-API-Key"] = self.config.api_key
return headers
def _sanitize_payload(self, payload: dict[str, Any] | None) -> dict[str, Any]:
"""Remove None values from payloads before sending to CKAN."""
sanitized: dict[str, Any] = {}
for key, value in (payload or {}).items():
if value is None:
continue
if isinstance(value, list):
filtered = [item for item in value if item is not None]
if filtered:
sanitized[key] = filtered
else:
sanitized[key] = value
return sanitized
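    # Illustrative sanitization (hypothetical payload): None values and None
    # list items are dropped before the request is sent.
    #
    #     self._sanitize_payload({"q": "water", "fq": None, "tags": ["parks", None]})
    #     # -> {"q": "water", "tags": ["parks"]}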
def _prepare_query_params(self, params: dict[str, Any]) -> list[tuple[str, str]]:
"""Convert a payload dictionary into query parameter tuples."""
query_items: list[tuple[str, str]] = []
def _stringify(value: Any) -> str:
if isinstance(value, bool):
return "true" if value else "false"
return str(value)
for key, value in params.items():
if isinstance(value, (list, tuple)):
for item in value:
if item is not None:
query_items.append((key, _stringify(item)))
elif value is not None:
query_items.append((key, _stringify(value)))
return query_items
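    # Illustrative conversion (hypothetical payload): booleans become
    # "true"/"false" and lists expand into repeated query parameters.
    #
    #     self._prepare_query_params({"rows": 5, "facet": True, "facet.field": ["tags", "groups"]})
    #     # -> [("rows", "5"), ("facet", "true"), ("facet.field", "tags"), ("facet.field", "groups")]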
def normalize_base_url(self, base_url: str) -> str:
"""Normalize CKAN base URL."""
trimmed = base_url.strip()
if not trimmed:
raise ValueError("CKAN_BASE_URL must be provided")
return trimmed.rstrip("/")
def normalize_site_url(self, site_url: str | None) -> str | None:
"""Normalize CKAN site URL."""
if not site_url:
return None
trimmed = site_url.strip()
return trimmed.rstrip("/") if trimmed else None
def build_endpoint_url(self, endpoint: str) -> str:
"""Build full CKAN API endpoint URL."""
normalized_endpoint = endpoint.lstrip("/")
return f"{self.config.ckan_base_url}/{normalized_endpoint}"
async def _api_call(
self,
action: str,
session: aiohttp.ClientSession,
*,
params: dict[str, Any] | None = None,
method: str | None = None,
) -> Any:
"""Make API call to CKAN endpoint with error handling."""
url = self.build_endpoint_url(action)
payload = self._sanitize_payload(params)
timeout = aiohttp.ClientTimeout(total=self.config.request_timeout / 1000)
headers = self._build_headers()
transport = (method or self.config.action_transport).lower()
request_url = url
if transport not in {"get", "post"}:
raise ValueError(f"Unsupported transport '{transport}' for CKAN API calls.")
request_context: Any
if transport == "get":
query_params = self._prepare_query_params(payload)
request_url = _build_request_url(url, query_params)
request_context = session.get(
url,
params=query_params or None,
headers=headers,
timeout=timeout,
)
else:
request_context = session.post(
url,
json=payload or {},
headers=headers,
timeout=timeout,
)
try:
async with request_context as response:
header_hints = _extract_rate_limit_headers(response.headers)
if not response.ok:
error_payload: dict[str, Any] = {}
error_message = f"CKAN API HTTP error: {response.status} {response.reason}"
raw_text = await response.text()
if raw_text:
try:
error_data = json.loads(raw_text)
if isinstance(error_data, dict):
error_payload = error_data.get("error") or error_data
if isinstance(error_payload, dict):
error_message = error_payload.get("message", error_message)
else:
error_payload = {"message": raw_text.strip()}
error_message = raw_text.strip()
except json.JSONDecodeError:
error_payload = {"message": raw_text.strip()}
error_message = raw_text.strip()
raise CkanApiError(
error_message,
status_code=response.status,
error_payload=error_payload if isinstance(error_payload, dict) else {},
headers=header_hints,
url=request_url,
)
data = await response.json()
if not data.get("success", False):
error_payload = data.get("error") or {}
error_message = error_payload.get("message", "Unknown API error")
raise CkanApiError(
error_message,
status_code=response.status,
error_payload=error_payload if isinstance(error_payload, dict) else {},
headers=header_hints,
url=request_url,
)
return data.get("result", {})
        except (TimeoutError, aiohttp.ClientError) as e:
            # aiohttp surfaces an elapsed ClientTimeout as asyncio.TimeoutError
            # (the builtin TimeoutError on Python 3.11+), not as a ClientError.
logger.error(f"CKAN API call failed for {action}: {e}")
raise CkanApiError(f"Failed to fetch from CKAN API: {e}", url=request_url) from e
# API wrapper functions
async def fetch_package(self, package_id: str, session: aiohttp.ClientSession) -> CkanPackage:
"""Fetch a specific package by ID."""
result = await self._api_call("package_show", session, params={"id": package_id})
return CkanPackage(**result)
async def fetch_resource(
self,
resource_id: str,
session: aiohttp.ClientSession,
limit: int = 10,
offset: int = 0,
*,
params: dict[str, Any] | None = None,
) -> CkanDatastoreResult:
"""Fetch datastore records for a resource."""
payload = dict(params or {})
resource_key = "id" if self.config.datastore_id_alias else "resource_id"
payload.pop("resource_id", None)
payload.pop("id", None)
payload[resource_key] = resource_id
payload.setdefault("limit", limit)
payload.setdefault("offset", offset)
result = await self._api_call("datastore_search", session, params=payload)
return CkanDatastoreResult(**result)
async def fetch_package_list(
self, session: aiohttp.ClientSession, *, limit: int = 50, offset: int = 0
) -> list[str]:
"""Fetch list of all package IDs."""
payload = {"limit": limit, "offset": offset}
result = await self._api_call("package_list", session, params=payload)
if not isinstance(result, list):
return []
return [str(package_id) for package_id in result]
async def fetch_package_search(
self,
query: str,
session: aiohttp.ClientSession,
*,
rows: int | None = None,
start: int | None = None,
fq: str | None = None,
sort: str | None = None,
include_private: bool | None = None,
facet_fields: list[str] | None = None,
extra_params: dict[str, Any] | None = None,
) -> CkanSearchResult:
"""Search packages with configurable options."""
capped_rows = rows if rows is not None else self.config.default_search_rows
if self.config.max_search_rows:
capped_rows = min(capped_rows, self.config.max_search_rows)
capped_rows = max(capped_rows, 0)
payload: dict[str, Any] = {"q": query, "rows": capped_rows}
if start is not None:
payload["start"] = start
if fq:
payload["fq"] = fq
if sort:
payload["sort"] = sort
if include_private is not None:
payload["include_private"] = include_private
if facet_fields:
payload["facet"] = True
payload["facet.field"] = facet_fields
if extra_params:
for key, value in extra_params.items():
if key not in payload and value is not None:
payload[key] = value
result = await self._api_call("package_search", session, params=payload)
return CkanSearchResult(**result)
async def fetch_organizations(self, session: aiohttp.ClientSession) -> list[CkanOrganization]:
"""Fetch all organizations."""
result = await self._api_call("organization_list", session, params={"all_fields": True})
if not isinstance(result, list):
raise CkanApiError("organization_list returned unexpected payload format")
organizations: list[CkanOrganization] = []
for org in result:
if isinstance(org, MappingABC):
organizations.append(CkanOrganization(**dict(org)))
return organizations
async def fetch_groups(self, session: aiohttp.ClientSession) -> list[CkanGroup]:
"""Fetch all groups."""
result = await self._api_call("group_list", session, params={"all_fields": True})
if not isinstance(result, list):
raise CkanApiError("group_list returned unexpected payload format")
groups: list[CkanGroup] = []
for group in result:
if isinstance(group, MappingABC):
groups.append(CkanGroup(**dict(group)))
return groups
async def fetch_resource_info(
self, resource_id: str, session: aiohttp.ClientSession
) -> CkanResource:
"""Fetch resource information."""
result = await self._api_call("resource_show", session, params={"id": resource_id})
return CkanResource(**result)
async def fetch_datastore_info(
self, resource_id: str, session: aiohttp.ClientSession
) -> CkanDatastoreResult:
"""Fetch datastore information (metadata only)."""
resource_key = "id" if self.config.datastore_id_alias else "resource_id"
result = await self._api_call(
"datastore_search",
session,
params={resource_key: resource_id, "limit": 0},
)
return CkanDatastoreResult(**result)
async def download_dataset_locally(
self,
package_id: str,
*,
resource_id: str | None = None,
preferred_format: str | None = None,
download_timeout: int = 120,
) -> dict[str, Any]:
"""Download a dataset resource, metadata, and usage guide to local storage."""
async with self._create_session() as session:
package = await self.fetch_package(package_id, session)
resource = _select_resource_to_download(
package.resources,
resource_id=resource_id,
preferred_format=preferred_format,
)
if resource is None:
raise ValueError("The requested dataset has no downloadable resources.")
dataset_dir = _resolve_dataset_directory(package.name or package.id)
dataset_path = dataset_dir / _derive_resource_filename(resource)
metadata_path = dataset_dir / "metadata.json"
await _download_file_with_curl(resource.url, dataset_path, download_timeout)
verified_mime_type = await _detect_file_mime_type(dataset_path)
extraction_directory: Path | None = None
extracted_files: list[Path] = []
if _is_archive_mime_type(verified_mime_type):
extraction_directory, extracted_files = _extract_archive_contents(
dataset_path, dataset_dir, verified_mime_type
)
sample_data_path: Path = extracted_files[0] if extracted_files else dataset_path
metadata_payload = (
package.model_dump() if hasattr(package, "model_dump") else package.dict()
)
with metadata_path.open("w", encoding="utf-8") as metadata_file:
json.dump(metadata_payload, metadata_file, indent=2, default=str)
fields: list[str] | None = None
record_count: int | None = None
if resource.datastore_active:
try:
datastore_info = await self.fetch_datastore_info(resource.id, session)
fields = [field.id for field in datastore_info.fields]
record_count = datastore_info.total
except Exception as datastore_error:
logger.warning(
"Unable to fetch datastore info for resource %s: %s",
resource.id,
datastore_error,
)
dataset_url = self.url_builder.build_dataset_url(package)
how_to_content = _build_how_to_markdown(
package=package,
resource=resource,
dataset_path=dataset_path,
metadata_path=metadata_path,
dataset_url=dataset_url,
analyzer=self.analyzer,
fields=fields,
record_count=record_count,
verified_mime_type=verified_mime_type,
extraction_directory=extraction_directory,
extracted_files=extracted_files,
sample_data_path=sample_data_path,
)
how_to_path = dataset_dir / "how_to_use.md"
how_to_path.write_text(how_to_content, encoding="utf-8")
return {
"dataset": {
"id": package.id,
"name": package.name,
"title": package.title,
"resource_id": resource.id,
"resource_name": resource.name,
"resource_format": resource.format,
"download_path": str(dataset_path),
"metadata_path": str(metadata_path),
"how_to_path": str(how_to_path),
"dataset_url": dataset_url,
"record_count": record_count,
"verified_mime_type": verified_mime_type,
"extraction_directory": (
str(extraction_directory) if extraction_directory else None
),
"extracted_files": [str(path) for path in extracted_files],
"last_updated": resource.last_modified or package.metadata_modified,
},
"storage_directory": str(dataset_dir),
"sample_data_path": str(sample_data_path),
}
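# Typical manager flow (illustrative sketch; the dataset ID and resource
# index are placeholders, not real portal values):
#
#     manager = CkanToolsManager(config)
#     async with manager._create_session() as session:
#         package = await manager.fetch_package("toronto-parks", session)
#         sample = await manager.fetch_resource(package.resources[0].id, session, limit=5)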
def create_tool_response(data: Any, *, is_error: bool = False) -> ToolResponse:
"""Create a standard MCP tool response."""
structured_content: dict[str, Any] | None = None
if hasattr(data, "model_dump"):
structured_content = data.model_dump()
elif isinstance(data, dict):
structured_content = data
content = ToolContent(
type="text", text=json.dumps(structured_content or data, indent=2, default=str)
)
return ToolResponse(content=[content], structuredContent=structured_content, isError=is_error)
def _build_error_response(message: str, exc: Exception | None = None) -> ToolResponse:
"""Create a standardized error response with optional CKAN metadata."""
payload: dict[str, Any] = {"error": message}
if isinstance(exc, CkanApiError):
payload["ckanError"] = exc.to_dict()
if exc.url:
payload["url"] = exc.url
elif exc is not None:
payload["details"] = str(exc)
return create_tool_response(payload, is_error=True)
def _session_not_initialized_response() -> ToolResponse:
"""Standard response when CKAN is not initialised for the session."""
return create_tool_response(
{
"error": "CKAN API is not initialised for this session.",
"action": "Use the 'ckan_api_initialise' tool to select a portal.",
},
is_error=True,
)
def _config_public_view(config: CkanToolsConfig) -> dict[str, Any]:
"""Return a sanitized view of the CKAN configuration without exposing secrets."""
config_data = config.model_dump()
if config_data.get("api_key"):
config_data["api_key"] = "***"
return config_data
DATASTORE_ARGUMENT_MAP = {
"filters": "filters",
"q": "q",
"plain": "plain",
"distinct": "distinct",
"fields": "fields",
"sort": "sort",
"includeTotal": "include_total",
"recordsFormat": "records_format",
}
SEARCH_ARGUMENT_MAP = {
"start": "start",
"fq": "fq",
"sort": "sort",
"includePrivate": "include_private",
}
def _collect_arguments(
arguments: dict[str, Any], mapping: dict[str, str], *, prefix: str | None = None
) -> dict[str, Any]:
"""Collect whitelisted arguments and map them to CKAN parameter names."""
collected: dict[str, Any] = {}
for arg_key, api_key in mapping.items():
lookup = arg_key
if prefix:
lookup = f"{prefix}{arg_key[0].upper()}{arg_key[1:]}"
if lookup in arguments and arguments[lookup] is not None:
collected[api_key] = arguments[lookup]
return collected
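# Illustrative lookup (hypothetical arguments): with prefix="preview", the
# mapping key "filters" is read from the camelCase argument "previewFilters".
#
#     _collect_arguments({"previewFilters": {"ward": 5}}, {"filters": "filters"}, prefix="preview")
#     # -> {"filters": {"ward": 5}}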
def _get_extra_search_params(
arguments: dict[str, Any],
) -> tuple[dict[str, Any] | None, ToolResponse | None]:
"""Validate the extraSearchParams structure used by search-based tools."""
if "extraSearchParams" not in arguments:
return None, None
raw_params = arguments.get("extraSearchParams")
if raw_params is None:
return None, None
if not isinstance(raw_params, dict):
return None, _build_error_response(
"extraSearchParams must be provided as an object/dictionary."
)
filtered = {key: value for key, value in raw_params.items() if value is not None}
return filtered or None, None
def _build_query_suggestions(insights: list[dict[str, Any]]) -> dict[str, list[str]]:
"""Construct organization and tag suggestions from dataset insights."""
organizations: set[str] = set()
tags: set[str] = set()
for insight in insights:
org_value = insight.get("organization")
if isinstance(org_value, str) and org_value:
organizations.add(org_value)
tag_values = insight.get("tags")
if isinstance(tag_values, list):
tags.update(tag for tag in tag_values if isinstance(tag, str))
    return {
        "organizations": sorted(organizations),
        "common_tags": sorted(tags)[:10],
    }
def _wrap_with_manager(
session_store: SessionConfigStore,
callback: Callable[[CkanToolsManager, dict[str, Any]], Awaitable[ToolResponse]],
) -> ToolHandler:
"""Create a handler that instantiates a manager for the active session."""
async def handler(arguments: dict[str, Any]) -> ToolResponse:
try:
config = session_store.require_config()
except MissingSessionConfigError:
return _session_not_initialized_response()
manager = CkanToolsManager(config)
return await callback(manager, arguments)
return handler
def _sanitize_path_segment(value: str, fallback: str = "dataset") -> str:
"""Generate a filesystem-safe name for directories/files."""
normalized = value.strip().lower()
if not normalized:
normalized = fallback
safe_chars = []
for char in normalized:
if char.isalnum() or char in ("-", "_"):
safe_chars.append(char)
else:
safe_chars.append("_")
sanitized = "".join(safe_chars).strip("_")
return sanitized or fallback
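# Illustrative sanitization (hypothetical input):
#
#     _sanitize_path_segment("Toronto Parks (2024)!")
#     # -> "toronto_parks__2024"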
def _resolve_dataset_directory(dataset_name: str) -> Path:
"""Determine where to store datasets locally."""
base_path = os.getenv("CKAN_MCP_LOCAL_DATASTORE")
base_dir = Path(base_path).expanduser() if base_path else Path.cwd()
dataset_dir = base_dir / _sanitize_path_segment(dataset_name or "dataset")
dataset_dir.mkdir(parents=True, exist_ok=True)
return dataset_dir
def _derive_resource_filename(resource: CkanResource) -> str:
"""Derive a sensible filename for a downloaded resource."""
candidate_name = resource.name or resource.id
sanitized_name = _sanitize_path_segment(candidate_name, fallback="resource")
extension = ""
if resource.format:
extension = resource.format.lower().lstrip(".")
else:
parsed = urlparse(resource.url)
extension = Path(parsed.path).suffix.lstrip(".")
if extension:
return f"{sanitized_name}.{extension}"
return sanitized_name
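# Illustrative derivation (hypothetical resource): a resource named
# "Parks Data" with format "CSV" yields "parks_data.csv"; when the format is
# missing, the extension falls back to the suffix of the resource URL path.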
def _build_python_reader_snippet(resource: CkanResource, dataset_path: Path) -> str:
"""Create a Python snippet for reading the downloaded dataset."""
fmt = (resource.format or "").lower()
path_str = str(dataset_path)
if fmt in {"csv", "txt"}:
return dedent(
f"""
import pandas as pd
df = pd.read_csv(r"{path_str}")
print(df.head())
"""
).strip()
if fmt == "tsv":
return dedent(
f"""
import pandas as pd
df = pd.read_csv(r"{path_str}", sep="\\t")
print(df.head())
"""
).strip()
if fmt in {"json"}:
return dedent(
f"""
import pandas as pd
df = pd.read_json(r"{path_str}")
print(df.head())
"""
).strip()
if fmt in {"xlsx", "xls"}:
return dedent(
f"""
import pandas as pd
df = pd.read_excel(r"{path_str}")
print(df.head())
"""
).strip()
return dedent(
f"""
# Fallback reader for {resource.format or "unknown"} files
with open(r"{path_str}", "rb") as raw_file:
data = raw_file.read()
print(f"Loaded {{len(data)}} bytes from {path_str}")
"""
).strip()
def _build_how_to_markdown(
package: CkanPackage,
resource: CkanResource,
dataset_path: Path,
metadata_path: Path,
dataset_url: str,
analyzer: UpdateFrequencyAnalyzer,
fields: list[str] | None = None,
record_count: int | None = None,
*,
verified_mime_type: str | None = None,
extraction_directory: Path | None = None,
extracted_files: list[Path] | None = None,
sample_data_path: Path | None = None,
) -> str:
"""Create guidance explaining how to use the downloaded dataset."""
description = (package.notes or "No description provided by the publisher.").strip()
organization = package.organization.title if package.organization else "Unknown organization"
last_updated = resource.last_modified or package.metadata_modified
data_range = (
f"{package.metadata_created} – {last_updated}" if last_updated else package.metadata_created
)
update_frequency = analyzer.categorize(package)
field_section = ""
if fields:
preview_fields = "\n".join(f"- {field}" for field in fields[:25])
field_section = f"The datastore lists {len(fields)} fields. Key columns:\n{preview_fields}"
else:
field_section = (
"Field-level metadata is not available from CKAN's datastore API for this resource. "
"Inspect the downloaded file to discover the schema."
)
limitations = resource.description or (
"The publishing agency did not supply explicit limitations in the metadata. "
"Use the dataset page to confirm coverage, aggregation rules, and any caveats."
)
data_source_path = sample_data_path or dataset_path
python_snippet = _build_python_reader_snippet(resource, data_source_path)
verified_mime = verified_mime_type or "Unavailable"
extraction_lines = (
"- **Extracted Files Directory**: Not applicable; the download already contains the data."
)
if extraction_directory:
extraction_lines = f"- **Extracted Files Directory**: {extraction_directory}"
preview_files: list[str] = []
if extracted_files:
preview_files = [str(path) for path in extracted_files[:EXTRACTION_PREVIEW_LIMIT]]
if preview_files:
preview_section = "\n".join(f" - {path}" for path in preview_files)
extraction_lines += f"\n- **Extracted File Preview (first {len(preview_files)} entries)**:\n{preview_section}"
else:
extraction_lines += (
"\n- **Extracted File Preview**: No files were created during extraction."
)
primary_data_line = ""
if data_source_path != dataset_path:
primary_data_line = f"- **Primary Extracted File**: {data_source_path}"
content = f"""# How to Use “{package.title}”
## Dataset Overview
{description}
- **Organization**: {organization}
- **Source URL**: {dataset_url or "Not provided"}
- **Update Frequency**: {update_frequency}
- **Last Updated**: {last_updated or "Unavailable"}
- **Data Range**: {data_range}
## File Details
- **Resource Name**: {resource.name}
- **Format**: {resource.format or "Unknown"}
- **Verified MIME Type**: {verified_mime}
- **Local File**: {dataset_path}
- **Metadata File**: {metadata_path}
- **Record Count**: {record_count if record_count is not None else "Unknown"}
{primary_data_line}
- **Extraction Notes**:
{extraction_lines}
## Reading the File in Python
```python
{python_snippet}
```
## File Structure and Contents
{field_section}
## Dataset Context & Limitations
{limitations}
"""
return dedent(content).strip() + "\n"
async def _download_file_with_curl(url: str, destination: Path, timeout_seconds: int = 120) -> None:
"""Download a file using curl."""
destination.parent.mkdir(parents=True, exist_ok=True)
timeout_seconds = max(timeout_seconds, 1)
cmd = [
"curl",
"--fail",
"--location",
"--silent",
"--show-error",
"--max-time",
str(timeout_seconds),
"-o",
str(destination),
url,
]
try:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as exc:
raise RuntimeError(
"curl is required to download datasets but was not found in PATH"
) from exc
    try:
        # Allow curl a small grace period beyond its own --max-time so curl's
        # clearer error output can surface before the watchdog fires.
        stdout, stderr = await asyncio.wait_for(
            process.communicate(), timeout=timeout_seconds + 5
        )
except TimeoutError as exc:
process.kill()
await process.communicate()
raise RuntimeError(f"curl download timed out after {timeout_seconds} seconds") from exc
if process.returncode != 0:
error_output = (stderr or stdout or b"").decode().strip()
raise RuntimeError(f"curl failed with exit code {process.returncode}: {error_output}")
async def _detect_file_mime_type(file_path: Path) -> str:
"""Use the `file` command to determine a file's MIME type."""
cmd = ["file", "--mime-type", "-b", str(file_path)]
try:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as exc:
raise RuntimeError("The `file` command is required to inspect dataset downloads.") from exc
stdout, stderr = await process.communicate()
if process.returncode != 0:
error_output = (stderr or stdout or b"").decode().strip()
raise RuntimeError(f"file failed to determine MIME type: {error_output}")
return stdout.decode().strip()
def _normalize_mime_type(mime_type: str | None) -> str:
return (mime_type or "").strip().lower()
def _is_archive_mime_type(mime_type: str | None) -> bool:
return _normalize_mime_type(mime_type) in SUPPORTED_ARCHIVE_MIME_TYPES
def _extract_archive_contents(
archive_path: Path, dataset_dir: Path, mime_type: str
) -> tuple[Path | None, list[Path]]:
"""Extract archive contents into a dedicated directory inside dataset_dir."""
normalized_mime = _normalize_mime_type(mime_type)
if normalized_mime not in SUPPORTED_ARCHIVE_MIME_TYPES:
return None, []
extraction_dir = dataset_dir / "extracted" / archive_path.stem
if extraction_dir.exists():
if extraction_dir.is_dir():
shutil.rmtree(extraction_dir)
else:
extraction_dir.unlink()
extraction_dir.mkdir(parents=True, exist_ok=True)
if normalized_mime in ZIP_MIME_TYPES:
if not zipfile.is_zipfile(archive_path):
raise RuntimeError(
"CKAN reported an archive download but the file is not a valid zip archive."
)
with zipfile.ZipFile(archive_path, "r") as zip_file:
extracted_files = _safe_extract_zip(zip_file, extraction_dir)
return extraction_dir, extracted_files
if normalized_mime in TAR_MIME_TYPES:
with tarfile.open(archive_path, "r:*") as tar_file:
extracted_files = _safe_extract_tar(tar_file, extraction_dir)
return extraction_dir, extracted_files
if normalized_mime in COMPRESSED_STREAM_MIME_TYPES:
compression_kind = COMPRESSED_STREAM_MIME_TYPES[normalized_mime]
if tarfile.is_tarfile(archive_path):
with tarfile.open(archive_path, "r:*") as tar_file:
extracted_files = _safe_extract_tar(tar_file, extraction_dir)
return extraction_dir, extracted_files
extracted_files = _decompress_single_file(
archive_path, extraction_dir, compression=compression_kind
)
return extraction_dir, extracted_files
return None, []
def _ensure_within_directory(directory: Path, target: Path) -> None:
directory_resolved = directory.resolve()
target_resolved = target.resolve()
if not target_resolved.is_relative_to(directory_resolved):
raise RuntimeError(
"Archive extraction aborted because a file attempted to escape the target directory."
)
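# Guards against "zip-slip" style path traversal (illustrative paths): an
# archive member named "../../etc/passwd" resolves outside the destination
# directory and aborts the extraction.
#
#     _ensure_within_directory(Path("/tmp/out"), Path("/tmp/out/../../etc/passwd"))
#     # -> RuntimeError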
def _safe_extract_zip(zip_file: zipfile.ZipFile, destination: Path) -> list[Path]:
extracted_files: list[Path] = []
for member in zip_file.infolist():
member_path = destination / member.filename
_ensure_within_directory(destination, member_path)
if member.is_dir():
member_path.mkdir(parents=True, exist_ok=True)
continue
member_path.parent.mkdir(parents=True, exist_ok=True)
with zip_file.open(member, "r") as source, member_path.open("wb") as target:
shutil.copyfileobj(source, target)
extracted_files.append(member_path.resolve())
return extracted_files
def _safe_extract_tar(tar_file: tarfile.TarFile, destination: Path) -> list[Path]:
extracted_files: list[Path] = []
for member in tar_file.getmembers():
member_path = destination / member.name
_ensure_within_directory(destination, member_path)
if member.isdir():
member_path.mkdir(parents=True, exist_ok=True)
continue
if member.issym() or member.islnk():
# Skip symbolic links for safety.
continue
file_obj = tar_file.extractfile(member)
if file_obj is None:
continue
member_path.parent.mkdir(parents=True, exist_ok=True)
with closing(file_obj) as source, member_path.open("wb") as target:
shutil.copyfileobj(source, target)
extracted_files.append(member_path.resolve())
return extracted_files
def _decompress_single_file(
archive_path: Path, destination: Path, *, compression: str
) -> list[Path]:
destination.mkdir(parents=True, exist_ok=True)
output_path = destination / archive_path.stem
if output_path.exists():
if output_path.is_dir():
shutil.rmtree(output_path)
else:
output_path.unlink()
opener_map: dict[str, ArchiveOpener] = {
"gzip": cast(ArchiveOpener, gzip.open),
"bzip2": cast(ArchiveOpener, bz2.open),
"xz": cast(ArchiveOpener, lzma.open),
}
opener = opener_map.get(compression)
if opener is None:
raise RuntimeError(f"Unsupported compression type: {compression}")
with opener(archive_path, "rb") as source, output_path.open("wb") as target:
shutil.copyfileobj(source, target)
return [output_path.resolve()]
def _select_resource_to_download(
resources: list[CkanResource],
resource_id: str | None = None,
preferred_format: str | None = None,
) -> CkanResource | None:
"""Select the most appropriate resource to download."""
if not resources:
return None
if resource_id:
for resource in resources:
if resource.id == resource_id:
return resource
raise ValueError(f"Resource '{resource_id}' was not found in the dataset.")
preferred_format_lower = (preferred_format or "").lower()
if preferred_format_lower:
format_matches = [
resource
for resource in resources
if resource.format and resource.format.lower() == preferred_format_lower
]
if format_matches:
datastore_matches = [r for r in format_matches if r.datastore_active]
return datastore_matches[0] if datastore_matches else format_matches[0]
datastore_resources = [resource for resource in resources if resource.datastore_active]
if datastore_resources:
return datastore_resources[0]
return resources[0]
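# Illustrative priority (hypothetical resources): an explicit resource_id wins,
# then datastore-backed matches of preferred_format, then any datastore-backed
# resource, then the first resource in the dataset.
#
#     _select_resource_to_download(package.resources, preferred_format="CSV")
#     # -> first datastore-backed CSV resource, if one exists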
EXTENSION_INSIGHTS: dict[str, str] = {
"harvest": (
"Harvest extension is enabled; consider adding tools that inspect harvest sources or job status "
"because the MCP server currently focuses on core dataset APIs."
),
"spatial": (
"Spatial extension detected; bounding-box or coordinate-based search tools are not yet exposed by this MCP server."
),
"datapusher": (
"DataPusher is installed; there are no MCP tools to trigger uploads or monitor DataPusher queues."
),
}
MAX_AUDIT_DISCOVERY_ROWS = 60
def _summarize_exception(exc: Exception) -> str:
"""Return a concise, human-readable summary for audit diagnostics."""
if isinstance(exc, CkanApiError):
if exc.status_code:
return f"{exc.message} (HTTP {exc.status_code})"
return exc.message
return str(exc)
async def _probe_action_transport(
manager: CkanToolsManager, session: aiohttp.ClientSession
) -> dict[str, Any]:
"""Check whether the portal accepts GET or POST calls for CKAN actions."""
method_results: list[dict[str, Any]] = []
supports: dict[str, bool] = {"get": False, "post": False}
for method in ("get", "post"):
start = time.perf_counter()
try:
await manager._api_call("package_list", session, params={"limit": 1}, method=method)
duration_ms = round((time.perf_counter() - start) * 1000, 2)
method_results.append(
{
"method": method,
"success": True,
"status": 200,
"durationMs": duration_ms,
"error": None,
}
)
supports[method] = True
except CkanApiError as api_exc:
duration_ms = round((time.perf_counter() - start) * 1000, 2)
method_results.append(
{
"method": method,
"success": False,
"status": api_exc.status_code,
"durationMs": duration_ms,
"error": _summarize_exception(api_exc),
}
)
except Exception as exc:
duration_ms = round((time.perf_counter() - start) * 1000, 2)
method_results.append(
{
"method": method,
"success": False,
"status": None,
"durationMs": duration_ms,
"error": _summarize_exception(exc),
}
)
recommended: str | None
if supports["get"] and not supports["post"]:
recommended = "get"
elif supports["post"] and not supports["get"]:
recommended = "post"
elif supports["get"] and supports["post"]:
recommended = manager.config.action_transport
else:
recommended = None
notes: list[str] = []
if not supports["get"]:
notes.append("GET requests to the Action API failed during audit probes.")
if not supports["post"]:
notes.append("POST requests to the Action API failed during audit probes.")
if (
recommended
and recommended != manager.config.action_transport
and (supports["get"] != supports["post"])
):
notes.append(
f"Configured transport '{manager.config.action_transport}' did not match the only successful option '{recommended}'."
)
if recommended is None:
notes.append(
"Neither GET nor POST requests succeeded. Verify API credentials or network connectivity."
)
return {
"supportsGet": supports["get"],
"supportsPost": supports["post"],
"recommended": recommended,
"methodResults": method_results,
"notes": notes,
}
async def _probe_site_metadata(
manager: CkanToolsManager,
session: aiohttp.ClientSession,
preferred_method: str | None,
) -> dict[str, Any]:
"""Fetch site metadata and CKAN status to support configuration hints."""
notes: list[str] = []
site_url: str | None = None
site_title: str | None = None
site_description: str | None = None
ckan_version: str | None = None
extensions: list[str] | None = None
    try:
        site_read = await manager._api_call("site_read", session, method=preferred_method)
        if not isinstance(site_read, dict):
            # Stock CKAN returns a bare boolean from site_read; only some
            # portals expose site metadata through this action.
            raise CkanApiError("site_read did not return a metadata payload")
        site_url = site_read.get("site_url") or site_read.get("site_url_root")
        site_title = site_read.get("title") or site_read.get("site_title")
        site_description = site_read.get("site_description") or site_read.get("description")
except Exception as exc:
notes.append(f"site_read failed: {_summarize_exception(exc)}")
try:
status_payload = await manager._api_call("status_show", session, method=preferred_method)
ckan_version = status_payload.get("ckan_version") or status_payload.get("version")
extensions = status_payload.get("extensions")
if not site_url:
site_url = status_payload.get("site_url")
if not site_title:
site_title = status_payload.get("site_title")
except Exception as exc:
notes.append(f"status_show failed: {_summarize_exception(exc)}")
return {
"siteUrl": site_url,
"siteTitle": site_title,
"siteDescription": site_description,
"ckanVersion": ckan_version,
"extensions": extensions,
"notes": notes,
}
async def _discover_datastore_resource(
manager: CkanToolsManager,
session: aiohttp.ClientSession,
*,
method: str | None,
package_id: str | None = None,
resource_id: str | None = None,
) -> dict[str, Any]:
"""Pick a datastore-backed resource to test alias behaviour."""
notes: list[str] = []
dataset_summary: dict[str, Any] | None = None
resource_summary: dict[str, Any] | None = None
discovery_method: str | None = None
async def _load_package(pkg_id: str) -> CkanPackage | None:
try:
payload = await manager._api_call(
"package_show", session, params={"id": pkg_id}, method=method
)
return CkanPackage(**payload)
except Exception as exc:
notes.append(f"package_show for '{pkg_id}' failed: {_summarize_exception(exc)}")
return None
async def _load_resource(res_id: str) -> dict[str, Any] | None:
try:
payload = await manager._api_call(
"resource_show", session, params={"id": res_id}, method=method
)
if not isinstance(payload, dict):
return None
return payload
except Exception as exc:
notes.append(f"resource_show for '{res_id}' failed: {_summarize_exception(exc)}")
return None
raw_resource: dict[str, Any] | None = None
if resource_id:
raw_resource = await _load_resource(resource_id)
if raw_resource:
resource = CkanResource(**raw_resource)
resource_summary = {
"id": resource.id,
"name": resource.name,
"format": resource.format,
"datastore_active": resource.datastore_active,
}
discovery_method = "resource_show"
dataset_id = raw_resource.get("package_id") or package_id
if dataset_id:
package = await _load_package(dataset_id)
if package:
dataset_summary = {
"id": package.id,
"name": package.name,
"title": package.title,
}
if resource_summary is None and package_id:
package = await _load_package(package_id)
if package:
dataset_summary = {
"id": package.id,
"name": package.name,
"title": package.title,
}
datastore_resource = next(
(res for res in package.resources if res.datastore_active), None
)
if datastore_resource:
resource_summary = {
"id": datastore_resource.id,
"name": datastore_resource.name,
"format": datastore_resource.format,
"datastore_active": datastore_resource.datastore_active,
}
discovery_method = "package_show"
else:
notes.append(f"Package '{package_id}' does not expose any datastore resources.")
if resource_summary is None:
# Fall back to a broad search for any datastore-backed dataset.
rows = min(max(manager.config.default_search_rows, 10), MAX_AUDIT_DISCOVERY_ROWS)
try:
search_payload = await manager._api_call(
"package_search",
session,
params={"q": "*:*", "rows": rows},
method=method,
)
search_result = CkanSearchResult(**search_payload)
for dataset in search_result.results:
datastore_resource = next(
(res for res in dataset.resources if res.datastore_active), None
)
if datastore_resource:
dataset_summary = {
"id": dataset.id,
"name": dataset.name,
"title": dataset.title,
}
resource_summary = {
"id": datastore_resource.id,
"name": datastore_resource.name,
"format": datastore_resource.format,
"datastore_active": datastore_resource.datastore_active,
}
discovery_method = "package_search"
break
if resource_summary is None:
notes.append("No datastore resources were found in the first search page.")
except Exception as exc:
notes.append(f"package_search discovery failed: {_summarize_exception(exc)}")
return {
"dataset": dataset_summary,
"resource": resource_summary,
"notes": notes,
"discoveryMethod": discovery_method,
}
async def _probe_datastore_alias(
manager: CkanToolsManager,
session: aiohttp.ClientSession,
*,
method: str | None,
package_id: str | None = None,
resource_id: str | None = None,
) -> dict[str, Any]:
"""Validate datastore access pattern differences such as id aliases."""
discovery = await _discover_datastore_resource(
manager,
session,
method=method,
package_id=package_id,
resource_id=resource_id,
)
notes = list(discovery["notes"])
resource_summary = discovery["resource"]
dataset_summary = discovery["dataset"]
if not resource_summary:
notes.append("Unable to find a datastore-backed resource to probe.")
return {
"tested": False,
"supportsResourceId": None,
"supportsIdAlias": None,
"dataset": dataset_summary,
"resource": resource_summary,
"discoveryMethod": discovery["discoveryMethod"],
"testResults": [],
"notes": notes,
}
resource_id_value = resource_summary["id"]
test_results: list[dict[str, Any]] = []
supports_resource_id: bool | None = None
alias_supported: bool | None = None
for key in ("resource_id", "id"):
start = time.perf_counter()
try:
await manager._api_call(
"datastore_search",
session,
params={key: resource_id_value, "limit": 0},
method=method,
)
duration_ms = round((time.perf_counter() - start) * 1000, 2)
test_results.append(
{
"parameter": key,
"success": True,
"status": 200,
"durationMs": duration_ms,
"error": None,
}
)
if key == "resource_id":
supports_resource_id = True
else:
alias_supported = True
except CkanApiError as api_exc:
duration_ms = round((time.perf_counter() - start) * 1000, 2)
test_results.append(
{
"parameter": key,
"success": False,
"status": api_exc.status_code,
"durationMs": duration_ms,
"error": _summarize_exception(api_exc),
}
)
if key == "resource_id":
supports_resource_id = False
else:
alias_supported = False
except Exception as exc:
duration_ms = round((time.perf_counter() - start) * 1000, 2)
test_results.append(
{
"parameter": key,
"success": False,
"status": None,
"durationMs": duration_ms,
"error": _summarize_exception(exc),
}
)
if key == "resource_id":
supports_resource_id = False
else:
alias_supported = False
if supports_resource_id is False:
notes.append("datastore_search rejected the standard 'resource_id' parameter.")
if alias_supported:
notes.append("datastore_search accepted the 'id' alias, enabling compatibility mode.")
elif alias_supported is False:
notes.append(
"datastore_search rejected the 'id' alias; disable datastoreIdAlias overrides."
)
return {
"tested": True,
"supportsResourceId": supports_resource_id,
"supportsIdAlias": alias_supported,
"dataset": dataset_summary,
"resource": resource_summary,
"discoveryMethod": discovery["discoveryMethod"],
"testResults": test_results,
"notes": notes,
}
def _build_audit_recommendations(
config: CkanToolsConfig,
transport: dict[str, Any],
site_metadata: dict[str, Any],
datastore: dict[str, Any],
) -> dict[str, Any]:
"""Translate audit findings into practical config overrides and helper text."""
overrides: dict[str, Any] = {}
helper_notes: list[str] = []
notes: list[str] = []
recommended_transport = transport.get("recommended")
if recommended_transport and recommended_transport != config.action_transport:
overrides["action_transport"] = recommended_transport
helper_notes.append(
f"Use {recommended_transport.upper()} requests when calling the Action API."
)
notes.append(
f"Transport mismatch detected (configured {config.action_transport}, recommended {recommended_transport})."
)
alias_support = datastore.get("supportsIdAlias")
if alias_support is True and not config.datastore_id_alias:
overrides["datastore_id_alias"] = True
helper_notes.append("Portal accepts the datastore 'id' alias for compatibility queries.")
notes.append("Enable datastoreIdAlias for this portal.")
elif alias_support is False and config.datastore_id_alias:
overrides["datastore_id_alias"] = False
helper_notes.append("Use 'resource_id' when addressing datastore_search; 'id' is rejected.")
notes.append("Disable datastoreIdAlias for this portal.")
site_url = site_metadata.get("siteUrl")
normalized_site_url = site_url.rstrip("/") if isinstance(site_url, str) else None
normalized_config_site = (
config.ckan_site_url.rstrip("/") if isinstance(config.ckan_site_url, str) else None
)
if normalized_site_url and normalized_site_url != normalized_config_site:
overrides["ckan_site_url"] = normalized_site_url
notes.append("Set ckanSiteUrl override to the discovered site URL.")
if normalized_site_url and not config.dataset_page_url_template:
dataset_template = f"{normalized_site_url}/dataset/{{name}}"
overrides["dataset_page_url_template"] = dataset_template
notes.append("Populate datasetPageUrlTemplate using the portal's canonical site URL.")
helper_prompt: str | None = None
combined_prompts = helper_notes + transport.get("notes", []) + datastore.get("notes", [])
filtered_prompts = [prompt for prompt in combined_prompts if prompt]
if filtered_prompts:
helper_prompt = " ".join(dict.fromkeys(filtered_prompts))
overrides["helper_prompt"] = helper_prompt
overrides_payload = overrides or None
config_selection_snippet: str | None = None
if overrides_payload is not None:
config_selection_snippet = json.dumps(
{"overrides": overrides_payload}, indent=2, sort_keys=True
)
return {
"overrides": overrides_payload,
"helperPromptSuggestion": helper_prompt,
"configSelectionSnippet": config_selection_snippet,
"notes": notes,
}
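# Illustrative configSelectionSnippet emitted when both probes disagree with
# the configured values (hypothetical findings; keys mirror CkanToolsConfig
# field names, and json.dumps sorts them):
#
#     {
#       "overrides": {
#         "action_transport": "post",
#         "datastore_id_alias": true
#       }
#     }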
def _derive_missing_functionality_notes(extensions: list[str] | None) -> list[str]:
"""Suggest MCP enhancements based on CKAN extensions that are installed remotely."""
if not extensions:
return []
notes: list[str] = []
seen: set[str] = set()
for extension in extensions:
lookup = extension.lower()
if lookup in seen:
continue
seen.add(lookup)
insight = EXTENSION_INSIGHTS.get(lookup)
if insight:
notes.append(insight)
return notes
async def register_ckan_tools(
server: Server, session_store: SessionConfigStore, config_catalog: CkanConfigCatalog
) -> None:
"""Register all CKAN tools with the MCP server."""
tools: list[Tool] = [
Tool(
name="ckan_api_initialise",
description="Select which CKAN portal this MCP session should use",
inputSchema={
"type": "object",
"properties": {
"country": {
"type": "string",
"description": "Country name (e.g., Canada, United Kingdom)",
},
"location": {
"type": "string",
"description": "Location within the country (e.g., Toronto)",
},
"resetContext": {
"type": "boolean",
"description": "When true, clears the current CKAN selection before applying a new one.",
"default": False,
},
"apiKey": {
"type": "string",
"description": "Optional CKAN API token for accessing restricted datasets.",
},
"overrides": {
"description": "Session-specific overrides for CKAN transport and metadata hints.",
"anyOf": [
{
"type": "object",
"properties": {
"ckanSiteUrl": {
"type": "string",
"description": "Override the CKAN site URL when the catalog entry does not include it.",
},
"datasetPageUrlTemplate": {
"type": "string",
"description": "Template for dataset pages (supports {id} and {name}).",
},
"defaultSearchRows": {
"type": "integer",
"description": "Override the default row count for package_search.",
},
"defaultPreviewLimit": {
"type": "integer",
"description": "Override preview record count for datastore samples.",
},
"actionTransport": {
"type": "string",
"enum": ["get", "post"],
"description": "Force GET or POST requests for the Action API.",
},
"datastoreIdAlias": {
"type": "boolean",
"description": "Set true when datastore endpoints accept the 'id' alias.",
},
"requiresApiKey": {
"type": "boolean",
"description": "Indicate that the CKAN portal requires API keys for access.",
},
"maxSearchRows": {
"type": "integer",
"description": "Set an upper bound for package_search row counts.",
},
"helperPrompt": {
"type": "string",
"description": "Helper text describing quirks of the CKAN instance.",
},
},
},
{
"type": "string",
"maxLength": 0,
"description": "Accept an empty string when the client cannot omit this optional object.",
},
],
},
},
},
),
Tool(
name="ckan_api_availability",
description="List the configured CKAN portals and show the current selection when available",
inputSchema={"type": "object", "properties": {}},
),
Tool(
name="audit_ckan_api",
description="Review the active CKAN endpoint for specification deviations and configuration overrides",
inputSchema={
"type": "object",
"properties": {
"packageId": {
"type": "string",
"description": "Optional dataset ID to target when testing datastore compatibility.",
},
"resourceId": {
"type": "string",
"description": "Optional datastore resource ID to use when verifying id/alias support.",
},
},
},
),
Tool(
name="get_package",
description="Fetch complete dataset metadata by ID",
inputSchema={
"type": "object",
"properties": {
"packageId": {
"type": "string",
"description": "The ID of the dataset to fetch",
},
"summary": {
"type": "boolean",
"description": "Whether to return a summary instead of full metadata",
"default": True,
},
},
"required": ["packageId"],
},
),
Tool(
name="get_first_datastore_resource_records",
description="Get records from the first active datastore resource in a dataset",
inputSchema={
"type": "object",
"properties": {
"packageId": {
"type": "string",
"description": "The ID of the dataset containing the resource",
},
"limit": {
"type": "integer",
"description": "Maximum number of records to return",
"default": 10,
},
"offset": {
"type": "integer",
"description": "Number of records to skip before returning results",
"default": 0,
},
"filters": {
"type": "object",
"description": "Datastore API filters to apply on the server (field:value mapping).",
},
"q": {
"type": "string",
"description": "Full-text query applied by CKAN's datastore_search endpoint.",
},
"plain": {
"type": "boolean",
"description": "When true, disables text highlighting in datastore results.",
},
"distinct": {
"type": "boolean",
"description": "Return only distinct rows from the datastore resource.",
},
"fields": {
"type": "array",
"items": {"type": "string"},
"description": "Subset of fields to return for each record.",
},
"sort": {
"type": "string",
"description": 'Sort expression understood by datastore_search (e.g., "column desc").',
},
"includeTotal": {
"type": "boolean",
"description": "Include the total record count from CKAN even when limit=0.",
},
"recordsFormat": {
"type": "string",
"description": "Optional datastore output format (e.g., objects or lists).",
},
},
"required": ["packageId"],
},
),
Tool(
name="get_resource_records",
description="Get records from a specific datastore resource",
inputSchema={
"type": "object",
"properties": {
"resourceId": {
"type": "string",
"description": "The ID of the resource to fetch records from",
},
"limit": {
"type": "integer",
"description": "Maximum number of records to return",
"default": 10,
},
"offset": {
"type": "integer",
"description": "Number of records to skip (for pagination)",
"default": 0,
},
"filters": {
"type": "object",
"description": "Datastore API filters to apply on the server (field:value mapping).",
},
"q": {
"type": "string",
"description": "Full-text query applied by CKAN's datastore_search endpoint.",
},
"plain": {
"type": "boolean",
"description": "When true, disables text highlighting in datastore results.",
},
"distinct": {
"type": "boolean",
"description": "Return only distinct rows from the datastore resource.",
},
"fields": {
"type": "array",
"items": {"type": "string"},
"description": "Subset of fields to return for each record.",
},
"sort": {
"type": "string",
"description": 'Sort expression understood by datastore_search (e.g., "column desc").',
},
"includeTotal": {
"type": "boolean",
"description": "Include the total record count from CKAN even when limit=0.",
},
"recordsFormat": {
"type": "string",
"description": "Optional datastore output format (e.g., objects or lists).",
},
},
"required": ["resourceId"],
},
),
Tool(
name="list_datasets",
description="List all available datasets with pagination",
inputSchema={
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Maximum number of datasets to return",
"default": 50,
},
"offset": {
"type": "integer",
"description": "Number of datasets to skip (for pagination)",
"default": 0,
},
"includeTotal": {
"type": "boolean",
"description": "When true, queries CKAN for the total dataset count (extra request).",
"default": True,
},
},
},
),
Tool(
name="search_datasets",
description="Search datasets by keyword",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query to find relevant datasets",
},
"limit": {
"type": "integer",
"description": "Maximum number of results to return",
"default": 20,
},
"start": {
"type": "integer",
"description": "Offset into the CKAN result set (maps to Action API 'start').",
},
"fq": {
"type": "string",
"description": "Filter query to narrow search results using CKAN's Solr syntax.",
},
"sort": {
"type": "string",
"description": "Sort expression supported by package_search (e.g., 'metadata_modified desc').",
},
"rows": {
"type": "integer",
"description": "Number of CKAN rows to request before applying the limit in this tool.",
},
"extraSearchParams": {
"type": "object",
"description": "Additional CKAN package_search parameters to forward verbatim (e.g., include_drafts, fl, bf).",
},
"facetFields": {
"type": "array",
"items": {"type": "string"},
"description": "List of facet fields to request from CKAN.",
},
"includePrivate": {
"type": "boolean",
"description": "Set true when using an API key and you want private datasets included.",
},
},
"required": ["query"],
},
),
Tool(
name="find_relevant_datasets",
description="Intelligent dataset discovery with relevance scoring",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query for finding relevant datasets",
},
"maxResults": {
"type": "integer",
"description": "Maximum number of results to return",
"default": 20,
},
"includeRelevanceScore": {
"type": "boolean",
"description": "Whether to include relevance scores in results",
"default": True,
},
"start": {
"type": "integer",
"description": "Offset into the CKAN result set (maps to Action API 'start').",
},
"fq": {
"type": "string",
"description": "Filter query to narrow search results using CKAN's Solr syntax.",
},
"sort": {
"type": "string",
"description": "Sort expression supported by package_search.",
},
"extraSearchParams": {
"type": "object",
"description": "Additional CKAN package_search parameters to forward verbatim.",
},
"facetFields": {
"type": "array",
"items": {"type": "string"},
"description": "List of facet fields to request from CKAN (defaults to organization, groups, tags).",
},
"includePrivate": {
"type": "boolean",
"description": "Set true when using an API key and you want private datasets included.",
},
"rows": {
"type": "integer",
"description": "Override the number of CKAN rows requested before relevance re-ranking.",
},
},
"required": ["query"],
},
),
Tool(
name="analyze_dataset_updates",
description="Update frequency analysis with categorization",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query to find datasets for analysis (optional if packageIds provided)",
},
"packageIds": {
"type": "array",
"items": {"type": "string"},
"description": "Specific package IDs to analyze (optional if query provided)",
},
"groupByFrequency": {
"type": "boolean",
"description": "Whether to group results by update frequency",
"default": True,
},
"start": {
"type": "integer",
"description": "Offset into the CKAN search result set when using the query parameter.",
},
"fq": {
"type": "string",
"description": "Filter query for CKAN search when using the query parameter.",
},
"sort": {
"type": "string",
"description": "Sort expression supported by package_search when using the query parameter.",
},
"facetFields": {
"type": "array",
"items": {"type": "string"},
"description": "Facet fields to request alongside dataset search results.",
},
"includePrivate": {
"type": "boolean",
"description": "Set true when using an API key and private datasets should be included.",
},
"extraSearchParams": {
"type": "object",
"description": "Additional CKAN package_search parameters forwarded verbatim when using the query parameter.",
},
"searchRows": {
"type": "integer",
"description": "Maximum number of CKAN search rows to inspect when using the query parameter.",
"default": 100,
},
},
},
),
Tool(
name="analyze_dataset_structure",
description="Deep data structure analysis with field definitions",
inputSchema={
"type": "object",
"properties": {
"packageId": {
"type": "string",
"description": "The ID of the dataset to analyze",
},
"includeDataPreview": {
"type": "boolean",
"description": "Whether to include sample data records",
"default": False,
},
"previewLimit": {
"type": "integer",
"description": "Number of sample records to include",
"default": 5,
},
"previewOffset": {
"type": "integer",
"description": "Offset for the sample data preview",
"default": 0,
},
"previewFilters": {
"type": "object",
"description": "Datastore filters applied when fetching the preview sample.",
},
"previewQ": {
"type": "string",
"description": "Datastore full-text query for the preview sample.",
},
"previewPlain": {
"type": "boolean",
"description": "Disable text highlighting in preview results when true.",
},
"previewDistinct": {
"type": "boolean",
"description": "Return only distinct rows in the preview sample.",
},
"previewFields": {
"type": "array",
"items": {"type": "string"},
"description": "Subset of fields to include in the preview sample.",
},
"previewSort": {
"type": "string",
"description": "Sort expression for preview samples.",
},
"previewIncludeTotal": {
"type": "boolean",
"description": "Include the total record count in preview responses.",
},
"previewRecordsFormat": {
"type": "string",
"description": "Datastore preview records format (e.g., objects or lists).",
},
},
"required": ["packageId"],
},
),
Tool(
name="get_data_categories",
description="Explore organizations and topic groups",
inputSchema={"type": "object", "properties": {}},
),
Tool(
name="get_dataset_insights",
description="Comprehensive analysis combining multiple dimensions",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query for finding datasets to analyze",
},
"includeUpdateFrequency": {
"type": "boolean",
"description": "Whether to include update frequency analysis",
"default": True,
},
"includeDataStructure": {
"type": "boolean",
"description": "Whether to include data structure analysis",
"default": True,
},
"maxDatasets": {
"type": "integer",
"description": "Maximum number of datasets to analyze",
"default": 20,
},
"start": {
"type": "integer",
"description": "Offset into the CKAN result set (maps to Action API 'start').",
},
"fq": {
"type": "string",
"description": "Filter query to narrow search results using CKAN's Solr syntax.",
},
"sort": {
"type": "string",
"description": "Sort expression supported by package_search.",
},
"facetFields": {
"type": "array",
"items": {"type": "string"},
"description": "List of facet fields to request from CKAN (defaults to organization, groups, tags).",
},
"includePrivate": {
"type": "boolean",
"description": "Set true when using an API key and you want private datasets included.",
},
"rows": {
"type": "integer",
"description": "Override the number of CKAN rows requested before filtering for analysis.",
},
"extraSearchParams": {
"type": "object",
"description": "Additional CKAN package_search parameters to forward verbatim.",
},
},
"required": ["query"],
},
),
Tool(
name="download_dataset_locally",
description="Download a dataset resource, metadata, and usage guide to the local filesystem using curl",
inputSchema={
"type": "object",
"properties": {
"packageId": {
"type": "string",
"description": "The dataset ID or name to download",
},
"resourceId": {
"type": "string",
"description": "Optional ID of the specific resource to download",
},
"preferredFormat": {
"type": "string",
"description": "Preferred resource format to download (CSV, JSON, etc.)",
},
"downloadTimeoutSeconds": {
"type": "integer",
"description": "Maximum time allowed for the curl download",
"default": 120,
},
},
"required": ["packageId"],
},
),
]
tool_handlers: dict[str, ToolHandler] = {
"ckan_api_initialise": _handle_ckan_api_initialise(session_store, config_catalog),
"ckan_api_availability": _handle_ckan_api_availability(session_store, config_catalog),
"audit_ckan_api": _handle_audit_ckan_api(session_store),
"get_package": _handle_get_package(session_store),
"get_first_datastore_resource_records": _handle_get_first_datastore_resource_records(
session_store
),
"get_resource_records": _handle_get_resource_records(session_store),
"list_datasets": _handle_list_datasets(session_store),
"search_datasets": _handle_search_datasets(session_store),
"find_relevant_datasets": _handle_find_relevant_datasets(session_store),
"analyze_dataset_updates": _handle_analyze_dataset_updates(session_store),
"analyze_dataset_structure": _handle_analyze_dataset_structure(session_store),
"get_data_categories": _handle_get_data_categories(session_store),
"get_dataset_insights": _handle_get_dataset_insights(session_store),
"download_dataset_locally": _handle_download_dataset_locally(session_store),
}
async def list_ckan_tools() -> list[Tool]:
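"""Return the registered CKAN tool definitions for MCP discovery."""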
return tools
server.list_tools()(list_ckan_tools)
async def call_ckan_tool(tool_name: str, arguments: dict[str, Any]) -> ToolResponse:
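"""Dispatch a tool call to its registered handler, erroring on unknown names."""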
handler = tool_handlers.get(tool_name)
if handler is None:
return create_tool_response({"error": f"Unknown CKAN tool: {tool_name}"}, is_error=True)
return await handler(arguments)
server.call_tool()(call_ckan_tool)
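# Example (illustrative only): after a session has been initialised with the
# ckan_api_initialise tool, any registered tool can be dispatched through the
# shared registry. The query values below are hypothetical:
#
#     response = await call_ckan_tool(
#         "search_datasets", {"query": "air quality", "limit": 5}
#     )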
# Tool handler functions
def _handle_ckan_api_initialise(
session_store: SessionConfigStore, config_catalog: CkanConfigCatalog
) -> ToolHandler:
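"""Build the handler that selects and initialises a CKAN portal for the session."""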
async def handler(arguments: dict[str, Any]) -> ToolResponse:
reset_requested = bool(arguments.get("resetContext", False))
country = cast(str | None, arguments.get("country"))
location = cast(str | None, arguments.get("location"))
override_key_map = {
"ckanSiteUrl": "ckan_site_url",
"datasetPageUrlTemplate": "dataset_page_url_template",
"defaultSearchRows": "default_search_rows",
"defaultPreviewLimit": "default_preview_limit",
"actionTransport": "action_transport",
"datastoreIdAlias": "datastore_id_alias",
"requiresApiKey": "requires_api_key",
"maxSearchRows": "max_search_rows",
"helperPrompt": "helper_prompt",
"apiKey": "api_key",
}
overrides_argument = arguments.get("overrides")
if isinstance(overrides_argument, str):
if overrides_argument.strip():
return create_tool_response(
{
"error": "Overrides must be provided as an object.",
"action": "Pass overrides as an object with nested keys like actionTransport.",
},
is_error=True,
)
overrides_argument = {}
raw_overrides = overrides_argument or {}
if raw_overrides and not isinstance(raw_overrides, dict):
return create_tool_response(
{
"error": "Overrides must be provided as an object.",
"action": "Pass overrides as an object with nested keys like actionTransport.",
},
is_error=True,
)
normalized_overrides: dict[str, Any] = {}
for key, value in raw_overrides.items():
normalized_overrides[override_key_map.get(key, key)] = value
if reset_requested:
session_store.clear()
has_config = session_store.has_config()
if has_config and country and location and not reset_requested:
current = session_store.get_metadata()
return create_tool_response(
{
"error": "CKAN API already initialised for this session. "
"Pass resetContext=true to drop the current selection before reinitialising.",
"currentSelection": current,
},
is_error=True,
)
if not country:
description = config_catalog.describe()
if reset_requested:
description["message"] = (
"Session CKAN selection cleared. Choose a country to continue."
)
return create_tool_response(description)
if country and not location:
description = config_catalog.describe(country)
is_error = "error" in description
return create_tool_response(description, is_error=is_error)
assert country is not None
assert location is not None
entry = config_catalog.get_location_entry(country, location)
if entry is None:
return create_tool_response(
{
"error": f"'{location}' in '{country}' is not an available CKAN portal.",
"countries": config_catalog.list_countries(),
},
is_error=True,
)
try:
entry_overrides = entry.get("overrides") or {}
config_kwargs: dict[str, Any] = {
"ckan_base_url": _normalize_action_base_url(entry["base_url"]),
}
optional_fields = [
"ckan_site_url",
"dataset_page_url_template",
"default_search_rows",
"default_preview_limit",
"action_transport",
"datastore_id_alias",
"requires_api_key",
"max_search_rows",
"helper_prompt",
]
def resolve_override(field_name: str) -> Any:
if field_name in normalized_overrides:
return normalized_overrides[field_name]
if field_name in entry_overrides:
return entry_overrides[field_name]
return entry.get(field_name)
for field_name in optional_fields:
value = resolve_override(field_name)
if value is not None:
config_kwargs[field_name] = value
candidate_api_key = (
arguments.get("apiKey")
or normalized_overrides.get("api_key")
or entry_overrides.get("api_key")
or entry.get("api_key")
or os.getenv("CKAN_API_KEY")
)
if candidate_api_key:
config_kwargs["api_key"] = candidate_api_key
if config_kwargs.get("requires_api_key") and not config_kwargs.get("api_key"):
helper_prompt = resolve_override("helper_prompt")
return create_tool_response(
{
"error": f"The CKAN portal '{entry['location']}' requires an API key.",
"action": (
"Provide apiKey when calling ckan_api_initialise or set the CKAN_API_KEY environment variable."
),
"helperPrompt": helper_prompt,
},
is_error=True,
)
config = CkanToolsConfig(**config_kwargs)
metadata = {
"country": entry["country"],
"location": entry["location"],
"ckan_base_url": config.ckan_base_url,
"action_transport": config.action_transport,
"requires_api_key": config.requires_api_key,
}
if config.helper_prompt:
metadata["helper_prompt"] = config.helper_prompt
session_store.set_config(config, metadata=metadata)
return create_tool_response(
{
"message": f"CKAN API initialised for {entry['location']} ({entry['country']}).",
"selection": metadata,
"config": _config_public_view(config),
}
)
except Exception as exc:
logger.error("Failed to initialise CKAN config: %s", exc)
return create_tool_response(
{"error": f"Failed to initialise CKAN for {entry['location']}: {exc}"},
is_error=True,
)
return handler
def _handle_ckan_api_availability(
session_store: SessionConfigStore, config_catalog: CkanConfigCatalog
) -> ToolHandler:
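"""Build the handler that describes available CKAN portals and the current selection."""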
async def handler(arguments: dict[str, Any]) -> ToolResponse:
catalog_description = config_catalog.describe()
response_payload = dict(catalog_description)
if session_store.has_config():
response_payload["currentSelection"] = session_store.get_metadata()
return create_tool_response(response_payload)
return handler
def _handle_audit_ckan_api(session_store: SessionConfigStore) -> ToolHandler:
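"""Build the handler that audits transport, site metadata, and datastore support."""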
async def runner(manager: CkanToolsManager, arguments: dict[str, Any]) -> ToolResponse:
package_id = arguments.get("packageId")
resource_id = arguments.get("resourceId")
async with aiohttp.ClientSession() as session:
try:
transport_assessment = await _probe_action_transport(manager, session)
preferred_method = (
transport_assessment.get("recommended") or manager.config.action_transport
)
site_metadata = await _probe_site_metadata(manager, session, preferred_method)
datastore_assessment = await _probe_datastore_alias(
manager,
session,
method=preferred_method,
package_id=package_id,
resource_id=resource_id,
)
missing_functionality = _derive_missing_functionality_notes(
site_metadata.get("extensions")
)
recommendations = _build_audit_recommendations(
manager.config, transport_assessment, site_metadata, datastore_assessment
)
audit_payload = {
"endpoint": {
"baseUrl": manager.config.ckan_base_url,
"configuredTransport": manager.config.action_transport,
"configuredDatastoreIdAlias": manager.config.datastore_id_alias,
"configuredSiteUrl": manager.config.ckan_site_url,
"configuredDatasetPageTemplate": manager.config.dataset_page_url_template,
"requiresApiKey": manager.config.requires_api_key,
"apiKeyConfigured": bool(manager.config.api_key),
},
"transportAssessment": transport_assessment,
"siteMetadata": site_metadata,
"datastoreAssessment": datastore_assessment,
"recommendations": recommendations,
"missingFunctionality": missing_functionality,
}
return create_tool_response(audit_payload)
except CkanApiError as api_exc:
logger.error("CKAN error in audit_ckan_api: %s", api_exc.message, exc_info=True)
return _build_error_response("Failed to audit CKAN API.", api_exc)
except Exception as exc:
logger.exception("Error in audit_ckan_api")
return _build_error_response("Failed to audit CKAN API.", exc)
return _wrap_with_manager(session_store, runner)
def _handle_get_package(session_store: SessionConfigStore) -> ToolHandler:
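"""Build the handler that fetches a single package, summarised by default."""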
async def runner(manager: CkanToolsManager, arguments: dict[str, Any]) -> ToolResponse:
package_id = arguments["packageId"]
summary = arguments.get("summary", True)
async with aiohttp.ClientSession() as session:
try:
pkg = await manager.fetch_package(package_id, session)
result = create_package_summary(pkg, manager.config) if summary else pkg.dict()
return create_tool_response(result)
except CkanApiError as api_exc:
logger.error("CKAN error in get_package: %s", api_exc.message, exc_info=True)
return _build_error_response(f"Failed to fetch package '{package_id}'.", api_exc)
except Exception as exc:
logger.exception("Error in get_package")
return _build_error_response(f"Failed to fetch package '{package_id}'.", exc)
return _wrap_with_manager(session_store, runner)
def _handle_get_first_datastore_resource_records(
session_store: SessionConfigStore,
) -> ToolHandler:
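"""Build the handler that returns records from a package's first active datastore resource."""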
async def runner(manager: CkanToolsManager, arguments: dict[str, Any]) -> ToolResponse:
package_id = arguments["packageId"]
limit = arguments.get("limit", 10)
offset = arguments.get("offset", 0)
datastore_params = _collect_arguments(arguments, DATASTORE_ARGUMENT_MAP)
async with aiohttp.ClientSession() as session:
try:
pkg = await manager.fetch_package(package_id, session)
ds_resources = [r for r in pkg.resources if r.datastore_active]
if not ds_resources:
return create_tool_response({"message": "No active datastore resources found."})
result = await manager.fetch_resource(
ds_resources[0].id,
session,
limit,
offset,
params=datastore_params,
)
summary = {
"resource_id": ds_resources[0].id,
"resource_name": ds_resources[0].name,
"total_records": result.total,
"returned_records": len(result.records),
"fields": [field.dict() for field in result.fields],
"records": result.records,
}
return create_tool_response(summary)
except CkanApiError as api_exc:
logger.error(
"CKAN error in get_first_datastore_resource_records: %s",
api_exc.message,
exc_info=True,
)
return _build_error_response("Failed to fetch resource records.", api_exc)
except Exception as exc:
logger.exception("Error in get_first_datastore_resource_records")
return _build_error_response("Failed to fetch resource records.", exc)
return _wrap_with_manager(session_store, runner)
def _handle_get_resource_records(session_store: SessionConfigStore) -> ToolHandler:
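"""Build the handler that pages records from a specific datastore resource."""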
async def runner(manager: CkanToolsManager, arguments: dict[str, Any]) -> ToolResponse:
resource_id = arguments["resourceId"]
limit = arguments.get("limit", 10)
offset = arguments.get("offset", 0)
datastore_params = _collect_arguments(arguments, DATASTORE_ARGUMENT_MAP)
async with aiohttp.ClientSession() as session:
try:
result = await manager.fetch_resource(
resource_id, session, limit, offset, params=datastore_params
)
summary = {
"resource_id": resource_id,
"total_records": result.total,
"returned_records": len(result.records),
"limit": limit,
"offset": offset,
"fields": [field.dict() for field in result.fields],
"records": result.records,
}
return create_tool_response(summary)
except CkanApiError as api_exc:
logger.error(
"CKAN error in get_resource_records: %s", api_exc.message, exc_info=True
)
return _build_error_response("Failed to fetch resource records.", api_exc)
except Exception as exc:
logger.exception("Error in get_resource_records")
return _build_error_response("Failed to fetch resource records.", exc)
return _wrap_with_manager(session_store, runner)
def _handle_list_datasets(session_store: SessionConfigStore) -> ToolHandler:
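"""Build the handler that lists dataset IDs with an optional total count."""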
async def runner(manager: CkanToolsManager, arguments: dict[str, Any]) -> ToolResponse:
limit = arguments.get("limit", 50)
offset = arguments.get("offset", 0)
include_total = arguments.get("includeTotal", True)
async with aiohttp.ClientSession() as session:
try:
dataset_ids = await manager.fetch_package_list(session, limit=limit, offset=offset)
total_count: int | None = None
if include_total:
try:
search_result = await manager.fetch_package_search(
"*:*", session, rows=0, start=0
)
total_count = search_result.count
except Exception as count_error:
logger.warning("Unable to fetch CKAN dataset count: %s", count_error)
summary = {
"total_datasets": total_count,
"returned_count": len(dataset_ids),
"offset": offset,
"limit": limit,
"dataset_ids": dataset_ids,
"has_more": (
(offset + len(dataset_ids) < total_count)
if (total_count is not None)
else len(dataset_ids) == limit
),
}
return create_tool_response(summary)
except CkanApiError as api_exc:
logger.error("CKAN error in list_datasets: %s", api_exc.message, exc_info=True)
return _build_error_response("Failed to list datasets.", api_exc)
except Exception as exc:
logger.exception("Error in list_datasets")
return _build_error_response("Failed to list datasets.", exc)
return _wrap_with_manager(session_store, runner)
def _handle_search_datasets(session_store: SessionConfigStore) -> ToolHandler:
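"""Build the handler that runs package_search and summarises the matches."""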
async def runner(manager: CkanToolsManager, arguments: dict[str, Any]) -> ToolResponse:
query = arguments["query"]
output_limit = arguments.get("limit", 20)
search_rows = arguments.get("rows", output_limit)
search_options = _collect_arguments(arguments, SEARCH_ARGUMENT_MAP)
facet_fields = arguments.get("facetFields")
extra_params, extra_error = _get_extra_search_params(arguments)
if extra_error:
return extra_error
async with aiohttp.ClientSession() as session:
try:
result = await manager.fetch_package_search(
query,
session,
rows=search_rows,
start=search_options.get("start"),
fq=search_options.get("fq"),
sort=search_options.get("sort"),
include_private=search_options.get("include_private"),
facet_fields=facet_fields,
extra_params=extra_params,
)
limited_results = [
create_package_summary(pkg, manager.config).dict()
for pkg in result.results[:output_limit]
]
summary = {
"query": query,
"total_found": result.count,
"returned_count": len(limited_results),
"datasets": limited_results,
"facets": result.facets or result.search_facets or {},
}
return create_tool_response(summary)
except CkanApiError as api_exc:
logger.error("CKAN error in search_datasets: %s", api_exc.message, exc_info=True)
return _build_error_response("Failed to search datasets.", api_exc)
except Exception as exc:
logger.exception("Error in search_datasets")
return _build_error_response("Failed to search datasets.", exc)
return _wrap_with_manager(session_store, runner)
def _handle_find_relevant_datasets(session_store: SessionConfigStore) -> ToolHandler:
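"""Build the handler that searches datasets and re-ranks them by relevance score."""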
async def runner(manager: CkanToolsManager, arguments: dict[str, Any]) -> ToolResponse:
query = arguments["query"]
max_results = arguments.get("maxResults", 20)
include_relevance_score = arguments.get("includeRelevanceScore", True)
requested_rows = arguments.get("rows")
search_rows = (
requested_rows
if requested_rows is not None
else max(max_results, manager.config.default_search_rows)
)
search_options = _collect_arguments(arguments, SEARCH_ARGUMENT_MAP)
facet_fields = arguments.get("facetFields") or ["organization", "groups", "tags"]
extra_params, extra_error = _get_extra_search_params(arguments)
if extra_error:
return extra_error
async with aiohttp.ClientSession() as session:
try:
search_result = await manager.fetch_package_search(
query,
session,
rows=search_rows,
start=search_options.get("start"),
fq=search_options.get("fq"),
sort=search_options.get("sort"),
include_private=search_options.get("include_private"),
facet_fields=facet_fields,
extra_params=extra_params,
)
# Analyze and score each dataset for relevance
scored_datasets = []
for dataset in search_result.results:
relevance_score = manager.scorer.score(dataset, query)
update_frequency = manager.analyzer.categorize(dataset)
dataset_dict = dataset.dict()
dataset_dict.update(
{
"relevance_score": relevance_score,
"update_frequency": update_frequency,
"resource_count": len(dataset.resources),
"has_datastore": any(r.datastore_active for r in dataset.resources),
}
)
scored_datasets.append(dataset_dict)
# Sort by relevance score and limit results
sorted_datasets = sorted(
scored_datasets, key=lambda x: x["relevance_score"], reverse=True
)[:max_results]
analysis = {
"query": query,
"total_found": search_result.count,
"returned_count": len(sorted_datasets),
"datasets": (
sorted_datasets
if include_relevance_score
else [
{k: v for k, v in d.items() if k != "relevance_score"}
for d in sorted_datasets
]
),
"facets": search_result.facets or {},
}
return create_tool_response(analysis)
except CkanApiError as api_exc:
logger.error(
"CKAN error in find_relevant_datasets: %s", api_exc.message, exc_info=True
)
return _build_error_response("Failed to find relevant datasets.", api_exc)
except Exception as exc:
logger.exception("Error in find_relevant_datasets")
return _build_error_response("Failed to find relevant datasets.", exc)
return _wrap_with_manager(session_store, runner)
def _handle_analyze_dataset_updates(session_store: SessionConfigStore) -> ToolHandler:
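"""Build the handler that categorises update frequency for searched or named packages."""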
async def runner(manager: CkanToolsManager, arguments: dict[str, Any]) -> ToolResponse:
query = arguments.get("query")
package_ids = arguments.get("packageIds")
group_by_frequency = arguments.get("groupByFrequency", True)
search_rows = arguments.get("searchRows", 100)
search_options = _collect_arguments(arguments, SEARCH_ARGUMENT_MAP)
facet_fields = arguments.get("facetFields")
extra_params, extra_error = _get_extra_search_params(arguments)
if extra_error:
return extra_error
async with aiohttp.ClientSession() as session:
try:
datasets = []
facets: dict[str, Any] | None = None
if package_ids:
# Get specific packages
datasets = await asyncio.gather(
*[manager.fetch_package(pkg_id, session) for pkg_id in package_ids]
)
elif query:
# Search for datasets
search_result = await manager.fetch_package_search(
query,
session,
rows=search_rows,
start=search_options.get("start"),
fq=search_options.get("fq"),
sort=search_options.get("sort"),
include_private=search_options.get("include_private"),
facet_fields=facet_fields,
extra_params=extra_params,
)
datasets = search_result.results
facets = search_result.facets or search_result.search_facets
else:
return create_tool_response(
{"error": "Either the query or packageIds parameter is required."},
is_error=True,
)
# Analyze update patterns
update_analysis: list[dict[str, Any]] = []
for dataset in datasets:
update_freq = manager.analyzer.categorize(dataset)
analysis = {
"id": dataset.id,
"name": dataset.name,
"title": dataset.title,
"update_frequency": update_freq,
"last_modified": dataset.metadata_modified,
"refresh_rate": dataset.refresh_rate,
"organization": (
dataset.organization.title if dataset.organization else None
),
"resource_count": len(dataset.resources),
"datastore_resources": sum(
1 for r in dataset.resources if r.datastore_active
),
}
update_analysis.append(analysis)
result = {
"total_datasets": len(datasets),
"datasets": update_analysis,
}
if facets:
result["facets"] = facets
if group_by_frequency:
frequency_groups: dict[str | None, list[dict[str, Any]]] = {}
for dataset_info in update_analysis:
freq = cast(str | None, dataset_info.get("update_frequency"))
frequency_groups.setdefault(freq, []).append(dataset_info)
result["frequency_summary"] = [
{
"frequency": frequency,
"count": len(datasets_list),
"datasets": [
{"id": d["id"], "title": d["title"]} for d in datasets_list
],
}
for frequency, datasets_list in frequency_groups.items()
]
return create_tool_response(result)
except CkanApiError as api_exc:
logger.error(
"CKAN error in analyze_dataset_updates: %s", api_exc.message, exc_info=True
)
return _build_error_response("Failed to analyze dataset updates.", api_exc)
except Exception as exc:
logger.exception("Error in analyze_dataset_updates")
return _build_error_response("Failed to analyze dataset updates.", exc)
return _wrap_with_manager(session_store, runner)
def _handle_analyze_dataset_structure(session_store: SessionConfigStore) -> ToolHandler:
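"""Build the handler that inspects a package's resources, fields, and sample data."""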
async def runner(manager: CkanToolsManager, arguments: dict[str, Any]) -> ToolResponse:
package_id = arguments["packageId"]
include_data_preview = arguments.get("includeDataPreview", False)
preview_limit = arguments.get("previewLimit", manager.config.default_preview_limit)
preview_offset = arguments.get("previewOffset", 0)
preview_params = _collect_arguments(arguments, DATASTORE_ARGUMENT_MAP, prefix="preview")
async with aiohttp.ClientSession() as session:
try:
pkg = await manager.fetch_package(package_id, session)
resource_analysis: list[dict[str, Any]] = []
for resource in pkg.resources:
resource_info: dict[str, Any] = {
"id": resource.id,
"name": resource.name,
"format": resource.format,
"size": resource.size,
"mimetype": resource.mimetype,
"url": resource.url,
"created": resource.created,
"last_modified": resource.last_modified or resource.created,
"datastore_active": resource.datastore_active,
"fields": None,
"record_count": None,
"sample_data": None,
}
# If it's a datastore resource, get field information
if resource.datastore_active:
try:
datastore_info = await manager.fetch_datastore_info(
resource.id, session
)
resource_info["fields"] = [
field.dict() for field in datastore_info.fields
]
resource_info["record_count"] = datastore_info.total
# Get sample data if requested
if include_data_preview and preview_limit > 0:
sample_result = await manager.fetch_resource(
resource.id,
session,
preview_limit,
preview_offset,
params=preview_params,
)
resource_info["sample_data"] = sample_result.records
except Exception as datastore_exc:
# If the datastore query fails, log it and treat the resource as inactive.
logger.debug(
"Datastore lookup failed for resource %s: %s", resource.id, datastore_exc
)
resource_info["datastore_active"] = False
resource_analysis.append(resource_info)
analysis = {
"package_id": pkg.id,
"name": pkg.name,
"title": pkg.title,
"description": pkg.notes,
"tags": [tag.name for tag in pkg.tags],
"organization": pkg.organization.title if pkg.organization else None,
"created": pkg.metadata_created,
"last_modified": pkg.metadata_modified,
"update_frequency": manager.analyzer.categorize(pkg),
"resource_summary": {
"total_resources": len(pkg.resources),
"datastore_resources": sum(
1 for r in resource_analysis if r["datastore_active"]
),
"formats": list(
{r["format"] for r in resource_analysis if r.get("format")}
),
"total_records": sum(
(r.get("record_count") or 0) for r in resource_analysis
),
},
"resources": resource_analysis,
}
return create_tool_response(analysis)
except CkanApiError as api_exc:
logger.error(
"CKAN error in analyze_dataset_structure: %s", api_exc.message, exc_info=True
)
return _build_error_response("Failed to analyze dataset structure.", api_exc)
except Exception as exc:
logger.exception("Error in analyze_dataset_structure")
return _build_error_response("Failed to analyze dataset structure.", exc)
return _wrap_with_manager(session_store, runner)
def _handle_get_data_categories(session_store: SessionConfigStore) -> ToolHandler:
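"""Build the handler that lists CKAN organizations and topic groups."""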
async def runner(manager: CkanToolsManager, arguments: dict[str, Any]) -> ToolResponse:
async with aiohttp.ClientSession() as session:
try:
organizations, groups = await asyncio.gather(
manager.fetch_organizations(session), manager.fetch_groups(session)
)
result = {
"organizations": [
{
"id": org.id,
"name": org.name,
"title": org.title,
"description": org.description,
"package_count": org.package_count,
}
for org in organizations
],
"groups": [
{
"id": group.id,
"name": group.name,
"title": group.title,
"description": group.description,
"package_count": group.package_count,
}
for group in groups
],
}
return create_tool_response(result)
except CkanApiError as api_exc:
logger.error(
"CKAN error in get_data_categories: %s", api_exc.message, exc_info=True
)
return _build_error_response("Failed to get data categories.", api_exc)
except Exception as exc:
logger.exception("Error in get_data_categories")
return _build_error_response("Failed to get data categories.", exc)
return _wrap_with_manager(session_store, runner)
def _handle_get_dataset_insights(session_store: SessionConfigStore) -> ToolHandler:
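"""Build the handler that combines relevance, update-frequency, and structure analysis."""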
async def runner(manager: CkanToolsManager, arguments: dict[str, Any]) -> ToolResponse:
query = arguments["query"]
include_update_frequency = arguments.get("includeUpdateFrequency", True)
include_data_structure = arguments.get("includeDataStructure", True)
max_datasets = arguments.get("maxDatasets", 20)
requested_rows = arguments.get("rows")
search_rows = (
requested_rows
if requested_rows is not None
else max(max_datasets, manager.config.default_search_rows)
)
search_options = _collect_arguments(arguments, SEARCH_ARGUMENT_MAP)
facet_fields = arguments.get("facetFields") or ["organization", "groups", "tags"]
extra_params, extra_error = _get_extra_search_params(arguments)
if extra_error:
return extra_error
async with aiohttp.ClientSession() as session:
try:
# Find relevant datasets
search_result = await manager.fetch_package_search(
query,
session,
rows=search_rows,
start=search_options.get("start"),
fq=search_options.get("fq"),
sort=search_options.get("sort"),
include_private=search_options.get("include_private"),
facet_fields=facet_fields,
extra_params=extra_params,
)
# Score and sort datasets
scored_datasets = sorted(
search_result.results,
key=lambda x: manager.scorer.score(x, query),
reverse=True,
)[:max_datasets]
# Gather insights for each dataset
insights: list[dict[str, Any]] = []
for dataset in scored_datasets:
base_info: dict[str, Any] = {
"id": dataset.id,
"name": dataset.name,
"title": dataset.title,
"description": dataset.notes,
"relevance_score": manager.scorer.score(dataset, query),
"organization": (
dataset.organization.title if dataset.organization else None
),
"tags": [tag.name for tag in dataset.tags],
"url": manager.url_builder.build_dataset_url(dataset),
}
additional_info: dict[str, Any] = {}
if include_update_frequency:
additional_info["update_info"] = {
"frequency": manager.analyzer.categorize(dataset),
"last_modified": dataset.metadata_modified,
"refresh_rate": dataset.refresh_rate,
}
if include_data_structure:
datastore_resources = [r for r in dataset.resources if r.datastore_active]
if datastore_resources:
try:
first_resource = datastore_resources[0]
datastore_info = await manager.fetch_datastore_info(
first_resource.id, session
)
additional_info["data_structure"] = {
"record_count": datastore_info.total,
"fields": [field.dict() for field in datastore_info.fields],
"resource_count": len(dataset.resources),
"datastore_resources": len(datastore_resources),
}
except Exception:
additional_info["data_structure"] = {
"error": "Could not fetch datastore info",
"resource_count": len(dataset.resources),
"datastore_resources": len(datastore_resources),
}
else:
additional_info["data_structure"] = {
"message": "No active datastore resources",
"resource_count": len(dataset.resources),
"datastore_resources": 0,
}
insights.append({**base_info, **additional_info})
result = {
"query": query,
"total_found": search_result.count,
"analyzed_datasets": len(insights),
"insights": insights,
"query_suggestions": _build_query_suggestions(insights),
"facets": search_result.facets or {},
}
return create_tool_response(result)
except CkanApiError as api_exc:
logger.error(
"CKAN error in get_dataset_insights: %s", api_exc.message, exc_info=True
)
return _build_error_response("Failed to get dataset insights.", api_exc)
except Exception as exc:
logger.exception("Error in get_dataset_insights")
return _build_error_response("Failed to get dataset insights.", exc)
return _wrap_with_manager(session_store, runner)
def _handle_download_dataset_locally(session_store: SessionConfigStore) -> ToolHandler:
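"""Build the handler that downloads a resource and metadata to the local filesystem."""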
async def runner(manager: CkanToolsManager, arguments: dict[str, Any]) -> ToolResponse:
package_id = arguments["packageId"]
resource_id = arguments.get("resourceId")
preferred_format = arguments.get("preferredFormat")
download_timeout = int(arguments.get("downloadTimeoutSeconds", 120))
try:
result = await manager.download_dataset_locally(
package_id,
resource_id=resource_id,
preferred_format=preferred_format,
download_timeout=download_timeout,
)
return create_tool_response(result)
except ValueError as value_error:
logger.error("Download tool received invalid input: %s", value_error)
return create_tool_response({"error": str(value_error)}, is_error=True)
except CkanApiError as api_exc:
logger.error(
"CKAN error in download_dataset_locally: %s", api_exc.message, exc_info=True
)
return _build_error_response("Failed to download dataset.", api_exc)
except Exception as exc:
logger.error("Error in download_dataset_locally: %s", exc)
return _build_error_response("Failed to download dataset.", exc)
return _wrap_with_manager(session_store, runner)