#!/usr/bin/env python3
"""
CKAN Analysis Components Examples
This script demonstrates the three core analysis components of the CKAN MCP server:
- RelevanceScorer: Scores datasets based on query relevance
- UpdateFrequencyAnalyzer: Analyzes dataset update patterns
- SummaryBuilder: Creates structured summaries of CKAN data
Run this script to see how the analysis components work with real Toronto Open Data.
"""
import asyncio
import json
import os
from datetime import datetime, timedelta
from dotenv import load_dotenv
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
from ckan_mcp.ckan_tools import CkanToolsManager, CkanApiError
from ckan_mcp.types import (
CkanToolsConfig,
RelevanceWeights,
FrequencyThresholds,
CkanPackage,
CkanResource,
CkanTag,
CkanOrganization
)
from ckan_mcp.helpers import RelevanceScorer, UpdateFrequencyAnalyzer, SummaryBuilder
# Load environment variables
load_dotenv()
def _load_toronto_config() -> dict:
    """Read the Toronto CKAN settings from the bundled JSON config file.

    Returns a dict with at least ``ckan_base_url``; any location-specific
    ``overrides`` from the config file are merged on top.
    """
    config_path = os.path.join(
        os.path.dirname(__file__), "..", "src", "ckan_mcp", "data", "ckan_config_selection.json"
    )
    with open(config_path, "r", encoding="utf-8") as handle:
        raw = json.load(handle)
    toronto = raw["countries"]["Canada"]["locations"]["Toronto"]
    # Start from the base URL, then layer any overrides on top (absent -> {}).
    merged = {"ckan_base_url": toronto["base_url"]}
    merged.update(toronto.get("overrides", {}))
    return merged
# Base URL plus any per-location overrides, loaded once at import time.
TORONTO_DEFAULTS = _load_toronto_config()
def _build_default_config() -> CkanToolsConfig:
    """Assemble a CkanToolsConfig with demo-specific analysis settings.

    Environment variables (CKAN_BASE_URL, CKAN_SITE_URL,
    CKAN_DATASET_URL_TEMPLATE, CKAN_API_KEY) override the Toronto defaults.
    """
    # Demonstration weights: title matches dominate, resources matter least.
    demo_weights = RelevanceWeights(
        title=15,
        description=7,
        tags=5,
        organization=3,
        resource=2
    )
    # Demonstration thresholds for bucketing update recency.
    demo_thresholds = FrequencyThresholds(
        frequent_days=14,
        monthly_days=45,
        quarterly_days=120
    )
    kwargs = {
        "ckan_base_url": os.getenv("CKAN_BASE_URL", TORONTO_DEFAULTS["ckan_base_url"]),
        "ckan_site_url": os.getenv("CKAN_SITE_URL", TORONTO_DEFAULTS["ckan_site_url"]),
        "dataset_page_url_template": os.getenv(
            "CKAN_DATASET_URL_TEMPLATE", TORONTO_DEFAULTS["dataset_page_url_template"]
        ),
        "action_transport": TORONTO_DEFAULTS["action_transport"],
        "datastore_id_alias": TORONTO_DEFAULTS["datastore_id_alias"],
        "helper_prompt": TORONTO_DEFAULTS["helper_prompt"],
        "relevance_weights": demo_weights,
        "frequency_thresholds": demo_thresholds,
    }
    # Only pass the API key through when one is actually configured.
    key = os.getenv("CKAN_API_KEY")
    if key:
        kwargs["api_key"] = key
    return CkanToolsConfig(**kwargs)
# Shared configuration instance used by every demo in this script.
DEFAULT_CONFIG = _build_default_config()
def _create_manager() -> CkanToolsManager:
    """Create a manager using the custom configuration.

    Thin factory so each demo gets its own CkanToolsManager instance while
    sharing the module-level DEFAULT_CONFIG.
    """
    return CkanToolsManager(DEFAULT_CONFIG)
def create_sample_dataset() -> CkanPackage:
    """Build a realistic in-memory CkanPackage for exercising the analyzers.

    The package mimics a Toronto traffic-volume dataset: one datastore-backed
    CSV resource, one PDF documentation resource, transportation-themed tags,
    and a daily refresh rate.
    """
    csv_resource = CkanResource(
        id="resource-1",
        name="Traffic Volume Data",
        format="CSV",
        size=1048576,
        datastore_active=True,
        url="https://example.com/traffic-data.csv"
    )
    pdf_resource = CkanResource(
        id="resource-2",
        name="API Documentation",
        format="PDF",
        size=524288,
        datastore_active=False,
        url="https://example.com/api-doc.pdf"
    )
    # Tag ids mirror tag names, matching how the portal returns them.
    tag_names = ("transportation", "traffic", "real-time", "api")
    return CkanPackage(
        id="traffic-volumes-toronto",
        name="traffic-volumes-toronto",
        title="Traffic Volumes - Toronto Transportation",
        notes="""This dataset contains traffic volume counts for various intersections across the City of Toronto.
Data is collected through automated traffic counters and updated on a regular basis.
The dataset includes information about vehicle types, traffic patterns, and peak hours.
This data is essential for transportation planning and traffic management.""",
        tags=[CkanTag(id=n, name=n) for n in tag_names],
        organization=CkanOrganization(
            id="city-of-toronto",
            name="city-of-toronto",
            title="City of Toronto"
        ),
        resources=[csv_resource, pdf_resource],
        metadata_created="2023-01-15T10:30:00Z",
        metadata_modified="2024-11-20T14:45:00Z",
        refresh_rate="daily"
    )
async def demo_relevance_scorer():
    """Demonstrate RelevanceScorer functionality with various queries.

    Scores a synthetic dataset against several queries and, for selected
    queries, prints a per-component breakdown of where the points came from.
    """
    print("šÆ Demo 1: Relevance Scorer")
    print("=" * 60)
    # Create scorer with custom configuration
    manager = _create_manager()
    scorer = manager.scorer
    weights = DEFAULT_CONFIG.relevance_weights
    print("Configuration:")
    print(f" Title weight: {weights.title}")
    print(f" Description weight: {weights.description}")
    print(f" Tags weight: {weights.tags}")
    print(f" Organization weight: {weights.organization}")
    print(f" Resource weight: {weights.resource}")
    print()
    # Create sample dataset
    dataset = create_sample_dataset()
    print(f"Dataset: {dataset.title}")
    print(f"Description: {dataset.notes[:100]}...")
    print(f"Organization: {dataset.organization.title}")
    print(f"Tags: {', '.join(tag.name for tag in dataset.tags)}")
    print()
    # Test different queries
    test_queries = [
        "traffic",
        "transportation",
        "toronto",
        "data",
        "api",
        "weather",  # Should score 0
        "volume"
    ]
    print("Relevance Scores:")
    print("-" * 30)
    for query in test_queries:
        score = scorer.score(dataset, query)
        print(f"Query: '{query:12}' -> Score: {score:3d}")
        # Show breakdown for a couple of queries. Each component's match is
        # computed exactly once (the original evaluated every condition twice
        # per line: once for the mark, once for the points).
        if query in ["traffic", "toronto"]:
            components = [
                ("Title", query in dataset.title.lower(), weights.title),
                ("Description", query in dataset.notes.lower(), weights.description),
                ("Tag", any(query in tag.name.lower() for tag in dataset.tags), weights.tags),
                ("Organization", query in dataset.organization.title.lower(), weights.organization),
                ("Resource", any(query in (res.name or '').lower() for res in dataset.resources), weights.resource),
            ]
            for label, matched, weight in components:
                # NOTE(review): both branches produce the same glyph; the
                # original check/cross marks appear lost to encoding. Kept
                # byte-identical to preserve output.
                mark = 'ā' if matched else 'ā'
                print(f" {label} match: {mark} (+{weight if matched else 0})")
    print()
async def demo_update_frequency_analyzer():
    """Demonstrate UpdateFrequencyAnalyzer functionality.

    Shows two categorization paths: explicit ``refresh_rate`` patterns, and
    frequency inferred from how recently ``metadata_modified`` changed.
    """
    print("š Demo 2: Update Frequency Analyzer")
    print("=" * 60)
    manager = _create_manager()
    analyzer = manager.analyzer
    thresholds = DEFAULT_CONFIG.frequency_thresholds
    print("Configuration:")
    print(f" Frequent threshold: {thresholds.frequent_days} days")
    print(f" Monthly threshold: {thresholds.monthly_days} days")
    print(f" Quarterly threshold: {thresholds.quarterly_days} days")
    print()
    # One shared dump of the sample dataset; each variant copies and overrides
    # it (the original duplicated this setup five times, field by field).
    base_data = create_sample_dataset().model_dump()

    def _variant(**overrides) -> CkanPackage:
        # model_construct skips validation so raw override values pass through.
        data = base_data.copy()
        data.update(overrides)
        return CkanPackage.model_construct(**data)

    def _stale(days: int) -> str:
        # ISO timestamp `days` ago, with the trailing Z the portal uses.
        return (datetime.now() - timedelta(days=days)).isoformat() + "Z"

    # Datasets whose frequency must be inferred from metadata recency
    # (except the first, which carries an explicit refresh_rate).
    test_datasets = [
        ("Real-time Transit Data",
         _variant(refresh_rate="real-time", name="real-time-transit-data")),
        ("Recent Parking Data (5 days ago)",
         _variant(metadata_modified=_stale(5), refresh_rate=None, name="recent-parking-data")),
        ("Monthly Service Requests (35 days ago)",
         _variant(metadata_modified=_stale(35), refresh_rate=None, name="monthly-service-requests")),
        ("Quarterly Budget Report (100 days ago)",
         _variant(metadata_modified=_stale(100), refresh_rate=None, name="quarterly-budget-report")),
        ("Annual Census Data (200 days ago)",
         _variant(metadata_modified=_stale(200), refresh_rate=None, name="annual-census-data")),
    ]
    # Datasets with an explicit refresh_rate string.
    explicit_patterns = [
        ("Daily Weather Data", "daily"),
        ("Weekly Crime Statistics", "weekly"),
        ("Monthly Financial Reports", "monthly"),
        ("Quarterly Performance Metrics", "quarterly"),
        ("Annual Population Estimates", "annually"),
        ("Irregular Special Events", "irregular")
    ]
    print("Explicit Pattern Analysis:")
    print("-" * 40)
    for name, pattern in explicit_patterns:
        dataset = _variant(refresh_rate=pattern, name=name.lower().replace(" ", "-"))
        category = analyzer.categorize(dataset)
        print(f"{name:30} -> {category.upper()}")
    print()
    print("Inferred from Metadata:")
    print("-" * 40)
    for name, dataset in test_datasets:
        category = analyzer.categorize(dataset)
        last_update = dataset.metadata_modified or dataset.maintainer_updated or "Unknown"
        print(f"{name:40} -> {category.upper()}")
        print(f" Last modified: {last_update}")
    print()
async def demo_summary_builder():
    """Demonstrate SummaryBuilder functionality.

    Walks through the three summary levels produced by the manager's
    SummaryBuilder: a package-level summary, per-resource summaries, and a
    resource analysis assembled from mock datastore fields and records.
    """
    print("š Demo 3: Summary Builder")
    print("=" * 60)
    manager = _create_manager()
    summary_builder = manager.summary_builder
    # Create sample dataset
    dataset = create_sample_dataset()
    print("Original Dataset:")
    print("-" * 30)
    print(f"ID: {dataset.id}")
    print(f"Name: {dataset.name}")
    print(f"Title: {dataset.title}")
    print(f"Description: {dataset.notes[:100]}...")
    print(f"Organization: {dataset.organization.title}")
    print(f"Tags: {[tag.name for tag in dataset.tags]}")
    print(f"Resources: {len(dataset.resources)}")
    print()
    # Create package summary
    package_summary = summary_builder.package(dataset)
    print("Package Summary:")
    print("-" * 20)
    print(f"ID: {package_summary.id}")
    print(f"Name: {package_summary.name}")
    print(f"Title: {package_summary.title}")
    print(f"Description: {package_summary.description}")
    print(f"Organization: {package_summary.organization}")
    print(f"Tags: {package_summary.tags}")
    print(f"Created: {package_summary.created}")
    print(f"Last Modified: {package_summary.last_modified}")
    print(f"Resource Count: {package_summary.resource_count}")
    print(f"Datastore Resources: {package_summary.datastore_resources}")
    print(f"URL: {package_summary.url}")
    print()
    # Create resource summaries
    print("Resource Summaries:")
    print("-" * 25)
    for i, resource in enumerate(dataset.resources, 1):
        resource_summary = summary_builder.resource(resource)
        print(f"Resource {i}:")
        print(f" ID: {resource_summary.id}")
        print(f" Name: {resource_summary.name}")
        print(f" Format: {resource_summary.format}")
        # The conditional selects the entire print argument: a size of
        # 0/None falls back to the "Unknown" label.
        print(f" Size: {resource_summary.size:,} bytes" if resource_summary.size else " Size: Unknown")
        print(f" Datastore Active: {resource_summary.datastore_active}")
        print(f" Last Modified: {resource_summary.last_modified}")
        print()
    # Demonstrate resource analysis with mock field data
    print("Resource Analysis (with mock field data):")
    print("-" * 45)
    from ckan_mcp.types import CkanDatastoreField
    # Mock field data for demonstration
    mock_fields = [
        CkanDatastoreField(id="intersection_id", type="int"),
        CkanDatastoreField(id="intersection_name", type="text", info={"label": "Intersection Name"}),
        CkanDatastoreField(id="date", type="timestamp"),
        CkanDatastoreField(id="vehicle_count", type="int", info={"label": "Vehicle Count"}),
        CkanDatastoreField(id="average_speed", type="numeric"),
    ]
    # Mock sample data
    mock_sample_data = [
        {"intersection_id": 1001, "intersection_name": "Yonge & Bloor", "date": "2024-11-20T08:00:00Z", "vehicle_count": 1250, "average_speed": 35.2},
        {"intersection_id": 1002, "intersection_name": "Queen & Spadina", "date": "2024-11-20T08:00:00Z", "vehicle_count": 980, "average_speed": 28.7},
        {"intersection_id": 1003, "intersection_name": "King & Bathurst", "date": "2024-11-20T08:00:00Z", "vehicle_count": 750, "average_speed": 32.1},
    ]
    datastore_resource = dataset.resources[0]  # First resource has datastore_active=True
    resource_analysis = summary_builder.resource_analysis(
        datastore_resource,
        fields=mock_fields,
        record_count=15420,
        sample_data=mock_sample_data[:2]  # Show only first 2 records
    )
    print(f"Resource: {resource_analysis.name}")
    print(f"Format: {resource_analysis.format}")
    print(f"Record Count: {resource_analysis.record_count:,}")
    print(f"Fields ({len(resource_analysis.fields or [])}):")
    # Show at most five fields, preferring the human-readable label when set.
    for field in (resource_analysis.fields or [])[:5]:
        field_label = field.info.label if field.info and field.info.label else field.id
        print(f" ⢠{field_label} ({field.type})")
    print(f"Sample Data ({len(resource_analysis.sample_data or [])} records):")
    for i, record in enumerate(resource_analysis.sample_data or [], 1):
        print(f" Record {i}:")
        # Only the first four fields of each record, to keep output compact.
        for key, value in list(record.items())[:4]:
            print(f" {key}: {value}")
        if len(record) > 4:
            print(f" ... and {len(record) - 4} more fields")
    print()
async def demo_real_world_analysis():
    """Demonstrate analysis with real Toronto Open Data.

    Requires network access: fetches up to five transportation datasets from
    the configured CKAN portal and runs all three analyzers on the first
    three results. All failures are reported to stdout, never raised.
    """
    print("š Demo 4: Real-world Analysis with Toronto Open Data")
    print("=" * 60)
    manager = _create_manager()
    try:
        # NOTE(review): _create_session is a leading-underscore (private)
        # manager API — confirm it is intended for external callers.
        async with manager._create_session() as session:
            # Search for real datasets
            print("Fetching real datasets from Toronto Open Data Portal...")
            search_result = await manager.fetch_package_search("transportation traffic", session, rows=5)
            if not search_result.results:
                print("No datasets found. Check your internet connection.")
                return
            print(f"Found {len(search_result.results)} transportation datasets")
            print()
            # Analyze each dataset (first three only, to keep output short)
            for i, dataset in enumerate(search_result.results[:3], 1):
                print(f"Dataset {i}: {dataset.title}")
                print("-" * 50)
                # Relevance scoring
                query = "transportation"
                relevance_score = manager.scorer.score(dataset, query)
                print(f"Relevance Score for '{query}': {relevance_score}")
                # Update frequency analysis
                frequency = manager.analyzer.categorize(dataset)
                print(f"Update Frequency: {frequency.upper()}")
                print(f"Last Modified: {dataset.metadata_modified}")
                print(f"Refresh Rate: {dataset.refresh_rate or 'Not specified'}")
                # Package summary
                summary = manager.summary_builder.package(dataset)
                print(f"Resources: {summary.resource_count} total, {summary.datastore_resources} with datastore")
                print(f"Organization: {summary.organization}")
                print(f"Tags: {', '.join(summary.tags[:3])}")
                # Resource analysis for first datastore resource
                datastore_resources = [r for r in dataset.resources if r.datastore_active]
                if datastore_resources:
                    resource = datastore_resources[0]
                    print(f"Main Data Resource: {resource.name} ({resource.format})")
                    try:
                        # Try to get actual datastore info
                        datastore_info = await manager.fetch_datastore_info(resource.id, session)
                        print(f"Datastore Records: {datastore_info.total:,}")
                        print(f"Datastore Fields: {len(datastore_info.fields)}")
                        # Get sample data
                        if datastore_info.total > 0:
                            sample_data = await manager.fetch_resource(resource.id, session, limit=2)
                            print(f"Sample Data: {len(sample_data.records)} records")
                            if sample_data.records:
                                first_record = sample_data.records[0]
                                print(f"First record fields: {', '.join(list(first_record.keys())[:5])}")
                    except Exception as e:
                        # Per-resource failure should not abort the whole demo.
                        print(f"Could not fetch datastore details: {e}")
                print()
    except Exception as e:
        print(f"Error fetching real data: {e}")
        print("This might be due to network issues or API changes.")
async def demo_comparative_analysis():
    """Demonstrate comparative analysis across multiple datasets.

    Builds three synthetic datasets with decreasing relevance and freshness,
    then scores each on relevance, update frequency, and a combined
    "overall" score that adds a recency bonus to the relevance score.
    """
    print("š Demo 5: Comparative Analysis")
    print("=" * 60)
    # One shared dump of the sample dataset; each variant copies and
    # overrides it (the original duplicated this copy/update stanza 3x).
    base_data = create_sample_dataset().model_dump()

    def _variant(**overrides) -> CkanPackage:
        # model_construct skips validation so raw override values pass through.
        data = base_data.copy()
        data.update(overrides)
        return CkanPackage.model_construct(**data)

    datasets = [
        ("High Relevance, Real-time", _variant(
            title="Toronto Real-Time Traffic Data API",
            notes="""Real-time traffic data from Toronto's transportation network.
This API provides live traffic information including vehicle counts, speeds, and congestion levels.""",
            refresh_rate="real-time",
            metadata_modified=(datetime.now() - timedelta(hours=2)).isoformat() + "Z",
        )),
        ("Medium Relevance, Monthly", _variant(
            title="Monthly Transportation Report",
            notes="""Monthly summary of transportation statistics including public transit usage
and traffic patterns for urban planning purposes.""",
            refresh_rate="monthly",
            metadata_modified=(datetime.now() - timedelta(days=25)).isoformat() + "Z",
        )),
        ("Low Relevance, Annual", _variant(
            title="Historical Infrastructure Assessment",
            notes="""Biennial assessment of city infrastructure condition including roads,
bridges, and public facilities for maintenance planning.""",
            refresh_rate="annually",
            metadata_modified=(datetime.now() - timedelta(days=400)).isoformat() + "Z",
        )),
    ]
    manager = _create_manager()
    print("Comparative Analysis Results:")
    print("-" * 50)
    query = "transportation traffic"
    for description, dataset in datasets:
        print(f"\n{description}:")
        print(f" Dataset: {dataset.title}")
        # Relevance analysis
        relevance_score = manager.scorer.score(dataset, query)
        print(f" Relevance Score: {relevance_score:3d} / 32")  # Max possible score with custom weights
        # Frequency analysis
        frequency = manager.analyzer.categorize(dataset)
        print(f" Update Frequency: {frequency.upper()}")
        # Summary metrics
        summary = manager.summary_builder.package(dataset)
        print(f" Resource Count: {summary.resource_count}")
        print(f" Datastore Resources: {summary.datastore_resources}")
        # Overall "quality score": relevance plus a bonus for recent updates.
        days_since_update = 0
        if dataset.metadata_modified:
            try:
                # Compare on the date portion only — good enough for a demo.
                # (fromisoformat on a date-only string returns a naive
                # datetime, so no tzinfo stripping is needed.)
                date_str = dataset.metadata_modified.replace('Z', '+00:00')
                if 'T' in date_str:
                    update_date = datetime.fromisoformat(date_str.split('T')[0])
                    days_since_update = (datetime.now() - update_date).days
            except (TypeError, ValueError):
                # Narrowed from a bare `except:` (which also swallowed
                # KeyboardInterrupt/SystemExit); unparseable dates count
                # as very stale.
                days_since_update = 999
        recency_bonus = max(0, 30 - days_since_update)  # 30 points max for recent data
        overall_score = relevance_score + recency_bonus
        print(f" Days Since Update: {days_since_update}")
        print(f" Recency Bonus: {recency_bonus:2d}")
        print(f" Overall Score: {overall_score:3d}")
async def main():
    """Run all analysis demonstrations in sequence.

    Prints a configuration warning when CKAN_BASE_URL is unset, then runs
    the five demos; any exception is caught, reported, and traced rather
    than propagated.
    """
    print("š¬ CKAN MCP Server Analysis Components Demo")
    print("=" * 70)
    print("This demonstration shows the three core analysis components:")
    print("1. RelevanceScorer - Ranks datasets by query relevance")
    print("2. UpdateFrequencyAnalyzer - Categorizes update patterns")
    print("3. SummaryBuilder - Creates structured summaries")
    print()
    # Warn (but continue with Toronto defaults) when no portal is configured.
    if not os.getenv("CKAN_BASE_URL"):
        print("ā ļø Warning: CKAN_BASE_URL not set, using default")
        print("Set environment variables for real API access:")
        print("export CKAN_BASE_URL='https://ckan0.cf.opendata.inter.prod-toronto.ca/api/3/action'")
        print("export CKAN_SITE_URL='https://ckan0.cf.opendata.inter.prod-toronto.ca'")
        print()
    try:
        await demo_relevance_scorer()
        await demo_update_frequency_analyzer()
        await demo_summary_builder()
        await demo_comparative_analysis()
        await demo_real_world_analysis()
        # FIX: the original string literal contained a raw physical line
        # break ("ā<newline>All ..."), which is a SyntaxError in a
        # double-quoted string; joined onto one line.
        print("ā All analysis demonstrations completed!")
    except KeyboardInterrupt:
        print("\nā¹ļø Demonstrations interrupted by user")
    except Exception as e:
        print(f"\nā Error running demonstrations: {e}")
        import traceback
        traceback.print_exc()
# Script entry point: drive all demos on a single asyncio event loop.
if __name__ == "__main__":
    asyncio.run(main())