#!/usr/bin/env python3
"""
Basic Usage Examples for CKAN MCP Server
This script demonstrates how to use the CKAN MCP server programmatically
for various data analysis and discovery tasks.
"""
import asyncio
import json
import os
from dotenv import load_dotenv
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
from ckan_mcp.ckan_tools import CkanToolsManager, CkanApiError
from ckan_mcp.types import CkanToolsConfig
# Load environment variables
load_dotenv()
def _load_toronto_config() -> dict:
    """Load the Toronto catalog entry from the bundled JSON config file.

    Returns a dict containing ``ckan_base_url`` plus any keys found under
    the entry's optional ``overrides`` mapping.
    """
    config_path = os.path.join(
        os.path.dirname(__file__), "..", "src", "ckan_mcp", "data", "ckan_config_selection.json"
    )
    with open(config_path, "r", encoding="utf-8") as handle:
        catalog = json.load(handle)
    entry = catalog["countries"]["Canada"]["locations"]["Toronto"]
    # Start from the mandatory base URL, then layer optional overrides on top.
    settings = {"ckan_base_url": entry["base_url"]}
    settings.update(entry.get("overrides", {}))
    return settings


# Module-level defaults, resolved once at import time.
TORONTO_DEFAULTS = _load_toronto_config()
def _build_default_config() -> CkanToolsConfig:
    """Create a CKAN config that mirrors the Toronto MCP catalog entry.

    Environment variables (CKAN_BASE_URL, CKAN_SITE_URL,
    CKAN_DATASET_URL_TEMPLATE, CKAN_API_KEY) take precedence over the
    bundled Toronto defaults.
    """
    kwargs = {
        "ckan_base_url": os.getenv("CKAN_BASE_URL", TORONTO_DEFAULTS["ckan_base_url"]),
        "ckan_site_url": os.getenv("CKAN_SITE_URL", TORONTO_DEFAULTS["ckan_site_url"]),
        "dataset_page_url_template": os.getenv(
            "CKAN_DATASET_URL_TEMPLATE", TORONTO_DEFAULTS["dataset_page_url_template"]
        ),
        "action_transport": TORONTO_DEFAULTS["action_transport"],
        "datastore_id_alias": TORONTO_DEFAULTS["datastore_id_alias"],
        "helper_prompt": TORONTO_DEFAULTS["helper_prompt"],
    }
    # Only pass an API key through when one is actually configured.
    key = os.getenv("CKAN_API_KEY")
    if key:
        kwargs["api_key"] = key
    return CkanToolsConfig(**kwargs)


# Shared configuration used by every example below.
DEFAULT_CONFIG = _build_default_config()
def _create_manager() -> CkanToolsManager:
    """Return a CkanToolsManager built from the module-level DEFAULT_CONFIG."""
    return CkanToolsManager(DEFAULT_CONFIG)
async def example_1_basic_search():
    """Example 1: Basic dataset search.

    Searches the catalog for "parking" and prints the five top hits.
    """
    print("π Example 1: Basic Dataset Search")
    print("=" * 50)
    manager = _create_manager()
    try:
        async with manager._create_session() as session:
            search_result = await manager.fetch_package_search("parking", session)
            print(f"Found {search_result.count} parking-related datasets")
            print("\nTop 5 results:")
            # Number the first five hits from 1.
            for rank, hit in enumerate(search_result.results[:5], start=1):
                print(f"{rank}. {hit.title}")
                print(f" Organization: {hit.organization.title}")
                print(f" Resources: {len(hit.resources)}")
                print(f" Description: {hit.notes[:100]}...")
                print()
    except Exception as e:
        print(f"Error: {e}")
    print("\n" + "=" * 50 + "\n")
async def example_2_relevance_scoring():
    """Example 2: Advanced search with relevance scoring.

    Runs a faceted search, scores each hit with the manager's relevance
    scorer, and prints the ten best-scoring datasets.
    """
    print("π― Example 2: Advanced Search with Relevance Scoring")
    print("=" * 50)
    manager = _create_manager()
    # Bug fix: the original searched for the misspelled "police expediture"
    # but scored results against "police expenditure". Use one query string
    # for both so scores are computed against the same terms that were searched.
    query = "police expenditure"
    try:
        async with manager._create_session() as session:
            # Advanced search with facets
            search_result = await manager.fetch_package_search(
                query,
                session,
                rows=20,
                facet_fields=["organization", "tags"],
            )
            print(f"Total results: {search_result.count}")
            print("\nRelevance-scored results:")
            # Score each dataset against the query, then rank descending.
            scored_datasets = [
                (dataset, manager.scorer.score(dataset, query))
                for dataset in search_result.results
            ]
            scored_datasets.sort(key=lambda pair: pair[1], reverse=True)
            for i, (dataset, score) in enumerate(scored_datasets[:10], 1):
                print(f"{i}. Score: {score:2d} | {dataset.title}")
                print(f" Org: {dataset.organization.title}")
                print(f" Tags: {', '.join(tag.name for tag in dataset.tags[:3])}")
                print()
    except Exception as e:
        print(f"Error: {e}")
    print("\n" + "=" * 50 + "\n")
async def example_3_update_frequency_analysis():
    """Example 3: Update frequency analysis.

    Buckets a sample of Toronto datasets by the analyzer's update-frequency
    category and prints up to three datasets per bucket.
    """
    print("π Example 3: Update Frequency Analysis")
    print("=" * 50)
    manager = _create_manager()
    try:
        async with manager._create_session() as session:
            # Get recent datasets
            search_result = await manager.fetch_package_search("toronto", session, rows=15)
            print("Update Frequency Analysis:")
            print("-" * 30)
            # Group packages by frequency category.
            frequency_groups = {}
            for pkg in search_result.results:
                category = manager.analyzer.categorize(pkg)
                frequency_groups.setdefault(category, []).append(pkg)
            for category, members in frequency_groups.items():
                print(f"\n{category.upper()} ({len(members)} datasets):")
                for pkg in members[:3]:  # Show top 3 per category
                    print(f" β’ {pkg.title}")
                    print(f" Last modified: {pkg.metadata_modified}")
                    print(f" Refresh rate: {pkg.refresh_rate or 'Not specified'}")
    except Exception as e:
        print(f"Error: {e}")
    print("\n" + "=" * 50 + "\n")
async def example_4_data_structure_analysis():
    """Example 4: Data structure analysis.

    For each query in ``test_datasets``: resolve a package (direct fetch
    first, keyword search as fallback), pick its first datastore-backed
    resource, and print the field schema plus a few sample records.
    """
    print("ποΈ Example 4: Data Structure Analysis")
    print("=" * 50)
    manager = _create_manager()
    # Free-text queries; these are not guaranteed to be exact package names.
    test_datasets = ["bicycle parking bike stations", "ttc subway delay data", "traffic volumes midblock"]

    def select_dataset_candidate(query: str, candidates):
        """Pick the best dataset match from search results.

        Preference order: exact ``name`` match, then a title containing the
        normalized query, then the first candidate (or None when empty).
        """
        normalized = query.replace("-", " ").lower()
        for candidate in candidates:
            if candidate.name == query:
                return candidate
        for candidate in candidates:
            if normalized in candidate.title.lower():
                return candidate
        return candidates[0] if candidates else None

    # Tracks whether at least one dataset was fully analyzed.
    analysis_completed = False
    try:
        async with manager._create_session() as session:
            for dataset_name in test_datasets:
                print(f"\nSearching for dataset: '{dataset_name}'")
                print("-" * 40)
                package = None
                # First try direct fetch by name/ID
                try:
                    package = await manager.fetch_package(dataset_name, session)
                except Exception:
                    print(f"Direct fetch for '{dataset_name}' failed, searching by keyword...")
                if package is None:
                    # Fallback: keyword search, preferring datastore-backed packages.
                    try:
                        search_result = await manager.fetch_package_search(dataset_name, session)
                    except Exception as search_error:
                        print(f"Search failed for '{dataset_name}': {search_error}")
                        continue
                    datastores = [
                        pkg
                        for pkg in search_result.results
                        if any(resource.datastore_active for resource in pkg.resources)
                    ]
                    candidate = select_dataset_candidate(
                        dataset_name, datastores or search_result.results
                    )
                    if not candidate:
                        print(f"No datasets found for '{dataset_name}'.")
                        continue
                    try:
                        package = await manager.fetch_package(candidate.id, session)
                    except Exception as fetch_error:
                        print(f"Failed to fetch dataset '{candidate.name}': {fetch_error}")
                        continue
                # Only datastore-active resources can be introspected below.
                datastore_resources = [r for r in package.resources if r.datastore_active]
                if not datastore_resources:
                    print(f"Dataset '{package.name}' has no datastore resources, trying next...")
                    continue
                print(f"Analyzing dataset: {package.name}")
                print(f"Title: {package.title}")
                print(f"Total resources: {len(package.resources)}")
                print(f"Datastore resources: {len(datastore_resources)}")
                # Only the first datastore resource of each package is inspected.
                resource = datastore_resources[0]
                print(f"\nAnalyzing resource: {resource.name}")
                try:
                    datastore_info = await manager.fetch_datastore_info(resource.id, session)
                    print(f"Record count: {datastore_info.total:,}")
                    print(f"Fields: {len(datastore_info.fields)}")
                    print("\nFirst 10 fields:")
                    for field in datastore_info.fields[:10]:
                        field_type = field.type
                        # Prefer the human-readable label; fall back to the field id.
                        field_label = (
                            field.info.label if field.info and field.info.label else field.id
                        )
                        print(f" β’ {field_label} ({field_type})")
                    if datastore_info.total > 0:
                        # Fetch a tiny sample (3 records) to illustrate the row shape.
                        sample_data = await manager.fetch_resource(
                            resource.id, session, limit=3, offset=0
                        )
                        print(f"\nSample records ({len(sample_data.records)}):")
                        for i, record in enumerate(sample_data.records[:3], 1):
                            print(f" Record {i}:")
                            # Show at most 5 fields per record to keep output short.
                            for key, value in list(record.items())[:5]:
                                print(f" {key}: {value}")
                            if len(record) > 5:
                                print(f" ... and {len(record) - 5} more fields")
                except Exception as e:
                    print(f"Error analyzing datastore: {e}")
                    continue
                print("-" * 40)
                analysis_completed = True
        if not analysis_completed:
            print("No datasets with datastore resources were successfully analyzed.")
    except Exception as e:
        print(f"Unexpected error during data structure analysis: {e}")
    print("\n" + "=" * 50 + "\n")
async def example_5_comprehensive_insights():
    """Example 5: Comprehensive dataset insights.

    Searches for "environmental data", combines relevance scoring,
    update-frequency categorization, and resource/format summaries into
    one insight dict per dataset, then prints query suggestions.
    """
    print("π‘ Example 5: Comprehensive Dataset Insights")
    print("=" * 50)
    try:
        # Simulate getting comprehensive insights
        query = "environmental data"
        print(f"Getting comprehensive insights for: '{query}'")
        # This would normally be called through MCP protocol
        # For demonstration, we'll use the manager directly
        manager = _create_manager()
        async with manager._create_session() as session:
            # Get search results
            search_result = await manager.fetch_package_search(
                query,
                session,
                rows=10,
                facet_fields=["organization", "tags"],
            )
            # Score and analyze datasets (top 5 hits only)
            insights = []
            for dataset in search_result.results[:5]:
                relevance_score = manager.scorer.score(dataset, query)
                update_frequency = manager.analyzer.categorize(dataset)
                # Get resource info
                datastore_resources = [r for r in dataset.resources if r.datastore_active]
                insight = {
                    "id": dataset.id,
                    "title": dataset.title,
                    "organization": dataset.organization.title,
                    "relevance_score": relevance_score,
                    "update_frequency": update_frequency,
                    "resource_count": len(dataset.resources),
                    "datastore_resources": len(datastore_resources),
                    "url": manager.url_builder.build_dataset_url(dataset),
                    "tags": [tag.name for tag in dataset.tags[:5]],
                }
                # Add update info
                insight["update_info"] = {
                    "frequency": update_frequency,
                    "last_modified": dataset.metadata_modified,
                    "refresh_rate": dataset.refresh_rate or "Not specified",
                }
                # Add data structure summary
                insight["data_structure"] = {
                    "resource_count": len(dataset.resources),
                    "datastore_resources": len(datastore_resources),
                    # NOTE(review): assumes every resource.format is a string;
                    # a None format would break the join below — confirm upstream.
                    "formats": list(set(r.format for r in dataset.resources)),
                }
                insights.append(insight)
            # Display insights
            print(f"\nFound {len(insights)} key insights:")
            for i, insight in enumerate(insights, 1):
                print(f"\n{i}. {insight['title']}")
                print(f" Relevance Score: {insight['relevance_score']}")
                print(f" Organization: {insight['organization']}")
                print(f" Update Frequency: {insight['update_info']['frequency']}")
                print(
                    f" Resources: {insight['data_structure']['resource_count']} total, {insight['data_structure']['datastore_resources']} with data"
                )
                print(f" Formats: {', '.join(insight['data_structure']['formats'])}")
                print(f" Tags: {', '.join(insight['tags'])}")
            # Generate query suggestions
            organizations = list(set(insight["organization"] for insight in insights))
            common_tags = []
            for insight in insights:
                common_tags.extend(insight["tags"])
            # De-duplicate; set iteration order is arbitrary, so this top-10
            # selection is unordered.
            common_tags = list(set(common_tags))[:10]
            print(f"\nπ Query Suggestions:")
            print(f"Organizations: {', '.join(organizations)}")
            print(f"Common Tags: {', '.join(common_tags)}")
    except Exception as e:
        print(f"Error: {e}")
    print("\n" + "=" * 50 + "\n")
async def _resolve_dataset_identifier(
    manager: CkanToolsManager, dataset_id: str, dataset_query: str
) -> str:
    """Confirm dataset identifier by falling back to a search query.

    Raises RuntimeError when the search yields no results.
    """
    search_term = dataset_query or dataset_id
    async with manager._create_session() as session:
        search_result = await manager.fetch_package_search(search_term, session)
        hits = search_result.results
        if not hits:
            raise RuntimeError(f"No datasets found for query '{search_term}'.")
        # Prefer packages that expose at least one active datastore resource.
        with_datastore = [
            pkg
            for pkg in hits
            if any(res.datastore_active for res in pkg.resources)
        ]
        chosen = (with_datastore or hits)[0]
        print(f"Selected dataset '{chosen.title}' (ID: {chosen.id}) from search results.")
        return chosen.id
async def example_6_download_dataset():
    """Example 6: Download a dataset and helper files locally.

    Resolves the ferry-sales dataset by search, downloads it as CSV via the
    manager, then previews the first lines of the generated how-to guide.
    """
    print("π₯ Example 6: Download Toronto Island Ferry Sales Data")
    print("=" * 50)
    manager = _create_manager()
    dataset_id = None
    dataset_query = "Toronto Island Ferry Ticket Sales"
    # Bug fix: the original interpolated dataset_id here, which is always
    # None at this point; report the term actually used for resolution.
    print(f"Attempting to download dataset '{dataset_query or dataset_id}' via curl...")
    target_root = os.getenv("CKAN_MCP_LOCAL_DATASTORE", os.getcwd())
    print(f"Target directory: {target_root}")
    try:
        resolved_dataset_id = await _resolve_dataset_identifier(manager, dataset_id, dataset_query)
    except RuntimeError as resolution_error:
        print(f"β Could not resolve dataset identifier: {resolution_error}")
        print("\n" + "=" * 50 + "\n")
        return
    print(f"Downloading dataset using resolved identifier: {resolved_dataset_id}")
    try:
        download_result = await manager.download_dataset_locally(
            package_id=resolved_dataset_id,
            preferred_format="CSV",
            download_timeout=180,
        )
        dataset_info = download_result["dataset"]
        # Bug fix: the original f-string literal was split across two physical
        # lines (a syntax error); joined into a single print call.
        print(f"\nβ Download complete for: {dataset_info['title']}")
        print(f"- Resource: {dataset_info['resource_name']} ({dataset_info['resource_format']})")
        print(f"- Data file: {dataset_info['download_path']}")
        print(f"- Metadata: {dataset_info['metadata_path']}")
        print(f"- How-to guide: {dataset_info['how_to_path']}")
        print(f"- Storage dir: {download_result['storage_directory']}")
        print("\nHow-to guide preview:")
        # Show only the first ten lines of the generated guide.
        with open(dataset_info["how_to_path"], "r", encoding="utf-8") as how_to_file:
            for _ in range(10):
                line = how_to_file.readline()
                if not line:
                    break
                print(line.rstrip())
    except CkanApiError as api_error:
        print(f"β CKAN API error while downloading dataset: {api_error}")
    except Exception as e:
        print(f"β Failed to download dataset: {e}")
    print("\n" + "=" * 50 + "\n")
async def main():
    """Run all examples, printing the active configuration first."""
    print("π CKAN MCP Server - Usage Examples")
    print("=" * 60)
    print(f"Active CKAN endpoint: {DEFAULT_CONFIG.ckan_base_url}")
    print(f"Action transport: {DEFAULT_CONFIG.action_transport.upper()}")
    if DEFAULT_CONFIG.helper_prompt:
        print(f"Helper prompt: {DEFAULT_CONFIG.helper_prompt}")
    print()
    # Warn when running against the bundled default instead of a real endpoint.
    if not os.getenv("CKAN_BASE_URL"):
        print("β οΈ Warning: CKAN_BASE_URL not set, using default")
        print("Set environment variables for real API access:")
        print("export CKAN_BASE_URL='https://ckan0.cf.opendata.inter.prod-toronto.ca/api/3/action'")
        print("export CKAN_SITE_URL='https://ckan0.cf.opendata.inter.prod-toronto.ca'")
        print()
    try:
        await example_1_basic_search()
        await example_2_relevance_scoring()
        await example_3_update_frequency_analysis()
        await example_4_data_structure_analysis()
        await example_5_comprehensive_insights()
        await example_6_download_dataset()
        # Bug fix: the original string literal was split across two physical
        # lines (a syntax error); joined into a single print call.
        print("β All examples completed successfully!")
    except KeyboardInterrupt:
        print("\nβΉοΈ Examples interrupted by user")
    except Exception as e:
        print(f"\nβ Error running examples: {e}")
        import traceback

        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(main())