server.py•60.6 kB
#!/usr/bin/env python3
"""
Kulturerbe MCP Server - Traditional MCP Implementation
Austrian Cultural Heritage Search via Kulturpool API
6-Tool Progressive Disclosure Architecture:
1. kulturpool_explore - Facet exploration (< 2KB response)
2. kulturpool_search_filtered - Filtered search (max 20 results)
3. kulturpool_get_details - Related object discovery (max 3 IDs)
4. kulturpool_get_institutions - Institution directory with locations
5. kulturpool_get_institution_details - Detailed institution metadata
6. kulturpool_get_assets - Optimized image assets with transformations
"""
import json
import logging
import time
import asyncio
import hashlib
import copy
from collections import defaultdict, deque, OrderedDict
from datetime import datetime
from urllib.parse import quote, urlencode
from typing import Any, Dict, List, Optional, ClassVar, Tuple
from threading import Lock, Thread
import requests
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from pydantic import BaseModel, Field, field_validator, ValidationError
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Configure logging
# Module-level logger; named after this module via __name__.
logger = logging.getLogger(__name__)
# Root logger: INFO and above, timestamped records.
# StreamHandler() without a stream argument writes to sys.stderr.
# NOTE(review): presumably chosen so stdout stays free for the stdio
# transport imported above — confirm.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
# Create MCP server
# The tool-call dispatcher further down registers itself on this instance
# through the @server.call_tool() decorator.
server = Server("kulturerbe-mcp-server")
# ==============================================================================
# RESPONSE MICROCACHING
# ==============================================================================
class ResponseCache:
    """Thread-safe, size-bounded TTL cache for API responses.

    Entries are keyed by an MD5 digest of the URL plus the sorted request
    parameters and stored in an ``OrderedDict`` in least-recently-used order.
    Values are deep-copied on the way in and on the way out, so callers can
    mutate what they receive without affecting the cached copy.
    """

    def __init__(self, max_size: int = 1000, cleanup_interval: float = 300.0):
        """Create the cache.

        Args:
            max_size: Maximum number of entries kept; the least recently used
                entries are dropped once this is exceeded.
            cleanup_interval: Seconds between background sweeps of expired
                entries. A value <= 0 disables the background thread.
        """
        self._cache: OrderedDict[str, Tuple[float, Any]] = OrderedDict()
        self._lock = Lock()
        self._max_size = max_size
        self._cleanup_interval = cleanup_interval
        self._stats: Dict[str, int] = {
            "hits": 0,
            "misses": 0,
            "evictions": 0,
        }
        self._cleanup_thread: Optional[Thread] = None
        if cleanup_interval > 0:
            self._start_cleanup_thread()

    def _start_cleanup_thread(self) -> None:
        """Start a daemon thread that periodically purges expired entries."""

        def _periodic_cleanup() -> None:
            while True:
                time.sleep(self._cleanup_interval)
                try:
                    self.clear_expired()
                except Exception:
                    # Keep the sweeper alive; a failed sweep is only logged.
                    logger.exception("ResponseCache cleanup failed")

        self._cleanup_thread = Thread(
            target=_periodic_cleanup,
            name="response-cache-cleanup",
            daemon=True,
        )
        self._cleanup_thread.start()

    def _generate_cache_key(self, url: str, params: Dict[str, Any]) -> str:
        """Generate consistent cache key from URL and parameters"""
        # Sorting makes the key independent of dict insertion order.
        sorted_params = sorted(params.items()) if params else []
        cache_string = f"{url}|{sorted_params}"
        return hashlib.md5(cache_string.encode()).hexdigest()

    def _evict_expired_locked(self, now: Optional[float] = None) -> None:
        """Drop every expired entry. The caller must hold ``self._lock``.

        Args:
            now: Reference timestamp; the wall clock is used only when this
                is ``None``.
        """
        # `now or time.time()` would silently replace an explicit 0.0 with
        # the wall clock, so test for None instead.
        current_time = time.time() if now is None else now
        expired_keys = [
            key for key, (expiry_time, _) in self._cache.items()
            if current_time > expiry_time
        ]
        for key in expired_keys:
            self._cache.pop(key, None)

    def _enforce_size_locked(self) -> None:
        """Evict least-recently-used entries until the size bound holds.

        The caller must hold ``self._lock``.
        """
        while len(self._cache) > self._max_size:
            self._cache.popitem(last=False)
            self._stats["evictions"] += 1

    def get(self, url: str, params: Dict[str, Any]) -> Optional[Any]:
        """Get cached response if not expired

        Returns:
            A deep copy of the cached value, or ``None`` on a miss or when
            the entry has expired (the expired entry is removed).
        """
        cache_key = self._generate_cache_key(url, params)
        now = time.time()
        with self._lock:
            entry = self._cache.get(cache_key)
            if not entry:
                self._stats["misses"] += 1
                return None
            expiry_time, data = entry
            if now > expiry_time:
                self._cache.pop(cache_key, None)
                self._stats["misses"] += 1
                return None
            # Mark as most recently used.
            self._cache.move_to_end(cache_key)
            self._stats["hits"] += 1
            return copy.deepcopy(data)

    def set(self, url: str, params: Dict[str, Any], data: Any, ttl_seconds: float) -> None:
        """Cache response data with TTL"""
        cache_key = self._generate_cache_key(url, params)
        expiry_time = time.time() + ttl_seconds
        # Copy outside the lock so large payloads do not block other threads.
        entry: Tuple[float, Any] = (expiry_time, copy.deepcopy(data))
        with self._lock:
            # Purge stale entries first so they are not counted against the
            # size bound.
            self._evict_expired_locked(now=time.time())
            self._cache[cache_key] = entry
            self._cache.move_to_end(cache_key)
            self._enforce_size_locked()

    def clear_expired(self) -> None:
        """Remove all expired entries (for maintenance)"""
        with self._lock:
            self._evict_expired_locked()

    def get_stats(self) -> Dict[str, int]:
        """Get cache statistics

        ``memory_usage_estimate`` is the summed length of ``str(value)`` over
        all entries, a rough size indicator rather than a byte count.
        """
        with self._lock:
            total_entries = len(self._cache)
            memory_estimate = sum(len(str(data)) for _, data in self._cache.values())
            return {
                "total_entries": total_entries,
                "memory_usage_estimate": memory_estimate,
                "hits": self._stats["hits"],
                "misses": self._stats["misses"],
                "evictions": self._stats["evictions"],
                "max_size": self._max_size,
            }
# Global cache instance
# Built with the ResponseCache defaults (max_size=1000, cleanup_interval=300.0),
# so creating it at import time also starts the daemon cleanup thread.
# KulturpoolClient reads from and writes to this shared instance.
response_cache = ResponseCache()
# ==============================================================================
# SECURITY & RATE LIMITING CLASSES
# ==============================================================================
class SecurityValidator:
    """Sanitises and validates user-supplied strings (prompt-injection guard)."""

    # Single characters stripped from filter-style inputs.
    DANGEROUS_CHARS = ['<', '>', '"', "'", '&', ';', '|', '`', '$']
    # Lower-case substrings that cause the whole input to be rejected.
    DANGEROUS_PATTERNS = ['javascript:', 'data:', 'vbscript:', 'file:',
                          'ignore previous instructions', 'system prompt',
                          'assistant:', 'human:', '```', 'eval(', 'exec(',
                          'import os', 'subprocess', '/etc/passwd', '../']

    @staticmethod
    def sanitize_input(value: str) -> str:
        """Strip dangerous characters, then validate and trim the input.

        Characters are removed *before* the pattern and length checks, so both
        checks see the already-cleaned text.

        Raises:
            ValueError: if ``value`` is not a string, still contains a
                dangerous pattern, or is longer than 500 characters.
        """
        if not isinstance(value, str):
            raise ValueError("Input must be string")

        blocked = SecurityValidator.DANGEROUS_CHARS
        cleaned = "".join(ch for ch in value if ch not in blocked)

        lowered = cleaned.lower()
        if any(pattern in lowered for pattern in SecurityValidator.DANGEROUS_PATTERNS):
            raise ValueError("Security violation: dangerous pattern detected")

        if len(cleaned) > 500:
            raise ValueError("Input too long (max 500 characters)")
        return cleaned.strip()

    @staticmethod
    def sanitize_query(value: str) -> str:
        """Validate a free-text query without removing any characters.

        Quotes and ``&`` are kept because they are search syntax; only the
        pattern and length checks apply, and surrounding whitespace is trimmed.

        Raises:
            ValueError: if ``value`` is not a string, contains a dangerous
                pattern, or is longer than 500 characters.
        """
        if not isinstance(value, str):
            raise ValueError("Input must be string")

        lowered = value.lower()
        if any(pattern in lowered for pattern in SecurityValidator.DANGEROUS_PATTERNS):
            raise ValueError("Security violation: dangerous pattern detected")

        if len(value) > 500:
            raise ValueError("Input too long (max 500 characters)")
        return value.strip()
class RateLimiter:
    """Sliding-window request limiter with one global bucket (single client)."""

    def __init__(self, max_requests: int = 100, time_window: int = 3600):
        """Store the limit (requests per ``time_window`` seconds) and start empty."""
        self.max_requests = max_requests
        self.time_window = time_window
        # Timestamps of accepted requests, oldest first.
        self.requests = deque()

    def is_allowed(self) -> bool:
        """Record the current request and return True if it fits the limit.

        Rejected requests are not recorded.
        """
        now = time.time()
        cutoff = now - self.time_window
        history = self.requests

        # Forget requests that have slid out of the window.
        while history and history[0] < cutoff:
            history.popleft()

        if len(history) < self.max_requests:
            history.append(now)
            return True
        return False
# Global rate limiter instance
# Uses the RateLimiter defaults: 100 requests per 3600-second window.
# The tool handlers call rate_limiter.is_allowed() before doing any work.
rate_limiter = RateLimiter()
# ==============================================================================
# IMAGE URL HELPERS - USING OFFICIAL API FIELDS
# ==============================================================================
def _extract_image_urls(doc: Dict) -> Dict[str, str]:
"""Extract image URLs from official API fields with fallback logic"""
preview_url = doc.get('previewImage', '')
# Try official API fields first
large_image = doc.get('isShownBy', '')
medium_image = doc.get('object', '')
# Fallback to URL manipulation if API fields are empty
if not large_image and preview_url:
if "_medium.webp" in preview_url:
large_image = preview_url.replace("_medium.webp", "_large.webp")
elif ".webp" in preview_url and "_medium" not in preview_url:
large_image = preview_url.replace(".webp", "_large.webp")
else:
large_image = preview_url # Use preview as fallback
if not medium_image and preview_url:
if "_small.webp" in preview_url:
medium_image = preview_url.replace("_small.webp", "_medium.webp")
else:
medium_image = preview_url # Use preview as fallback
return {
"preview_url": preview_url,
"large_image": large_image,
"medium_image": medium_image,
"iiif_manifest": doc.get('iiifManifest', '')
}
# ==============================================================================
# KULTURPOOL API CLIENT
# ==============================================================================
class KulturpoolClient:
    """HTTP client for the Kulturpool API with retries and response caching.

    All requests go through one ``requests.Session`` that retries up to three
    times (with backoff) on HTTP 429/500/502/503/504. Successful results are
    stored in the module-level ``response_cache``.
    """

    BASE_URL = "https://api.kulturpool.at/search/"
    TIMEOUT = 10  # seconds, used by search(); the other endpoints use 30

    def __init__(self):
        """Set up the session, retry policy and User-Agent (no request is sent)."""
        self.base_url = "https://api.kulturpool.at"  # root for non-search endpoints
        self.session = requests.Session()
        # Configure retry strategy
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)
        self.session.headers.update({
            'User-Agent': 'Kulturerbe-MCP-Server/1.0'
        })

    def search(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Execute search with security validation and caching

        Args:
            params: Query parameters for the search endpoint. ``filter_by``
                and ``sort_by`` are appended to the URL by hand so that
                ``=``, ``:`` and ``&`` stay unencoded.

        Returns:
            The decoded JSON response (cached for 60 seconds).

        Raises:
            ValueError: if ``BASE_URL`` does not point at the Kulturpool API.
            ConnectionError: if the HTTP request fails.
        """
        try:
            # Validate URL
            if not self.BASE_URL.startswith("https://api.kulturpool.at"):
                raise ValueError("Only Kulturpool API allowed")
            # Work on copies so the caller's dict is never mutated.
            original_params = copy.deepcopy(params)
            # Check cache first (60 second TTL for search endpoints).
            # A falsy cached value is treated as a miss.
            cached_response = response_cache.get(self.BASE_URL, original_params)
            if cached_response:
                logger.debug("Cache hit for search request")
                return cached_response

            # Special handling for filter_by and sort_by parameters that need unencoded characters
            special_params = {}
            params_for_request = copy.deepcopy(original_params)
            if 'filter_by' in params_for_request:
                special_params['filter_by'] = params_for_request.pop('filter_by')
            if 'sort_by' in params_for_request:
                special_params['sort_by'] = params_for_request.pop('sort_by')

            if special_params:
                # Build URL manually with proper encoding for special parameters
                base_url = f"{self.BASE_URL}?{urlencode(params_for_request)}" if params_for_request else self.BASE_URL
                separator = "&" if "?" in base_url else "?"
                # Add special parameters while preserving :, =, & characters
                special_parts = []
                for key, value in special_params.items():
                    # Encode filter_by preserving only essential separators
                    # Encode parentheses and pipes to avoid proxy parsing issues
                    # NOTE(review): '&' is left unencoded, so a filter joined
                    # with ' && ' contains raw ampersands in the query string —
                    # confirm the API accepts this.
                    encoded_value = quote(value, safe='=:&')
                    special_parts.append(f"{key}={encoded_value}")
                full_url = f"{base_url}{separator}{'&'.join(special_parts)}"
                response = self.session.get(full_url, timeout=self.TIMEOUT)
            else:
                response = self.session.get(
                    self.BASE_URL,
                    params=params_for_request,
                    timeout=self.TIMEOUT
                )
            response.raise_for_status()
            # Cache successful responses (60 seconds for search endpoints)
            response_data = response.json()
            response_cache.set(self.BASE_URL, original_params, response_data, 60.0)
            logger.debug("Response cached for search request")
            return response_data
        except requests.RequestException as e:
            # Chain the cause so the original traceback is preserved.
            raise ConnectionError(f"Kulturpool API request failed: {str(e)}") from e

    def get_institutions(self, include_locations: bool = True, language: str = "de") -> Dict[str, Any]:
        """Get list of all institutions with caching

        Args:
            include_locations: Add ``{"lat", "lng"}`` for institutions whose
                payload carries coordinates.
            language: ``"de"`` or ``"en"``; any other value is not sent.

        Returns:
            ``{"institutions": [...], "total_count": int}`` (cached 6 hours).

        Raises:
            ValueError: if the request times out or fails.
        """
        url = f"{self.base_url}/institutions/"
        params = {}
        if language in ["de", "en"]:
            params["language"] = language

        # Check cache first (6 hour TTL for institutions - they change rarely).
        # include_locations changes the processed result, so it is part of
        # the cache key even though it is not sent to the API.
        cache_params = params.copy()
        cache_params["include_locations"] = include_locations
        cached_response = response_cache.get(url, cache_params)
        if cached_response:
            logger.debug("Cache hit for institutions request")
            return cached_response

        try:
            response = self.session.get(url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()
            institutions = data.get('data', [])
            processed_institutions = []
            for inst in institutions:
                processed_inst = {
                    "id": inst.get('id'),
                    "name": inst.get('name'),
                    "web_collection_url": inst.get('web_collection_url'),
                    "website_url": inst.get('website_url')
                }
                if include_locations and inst.get('location'):
                    location = inst.get('location', {})
                    if location.get('coordinates'):
                        coords = location['coordinates']
                        # The first coordinate pair is read as [lng, lat].
                        # NOTE(review): assumes coordinates is a list of
                        # pairs — confirm against the API schema.
                        if coords and len(coords) > 0 and len(coords[0]) >= 2:
                            processed_inst["location"] = {
                                "lat": coords[0][1],
                                "lng": coords[0][0]
                            }
                processed_institutions.append(processed_inst)

            processed_result = {
                "institutions": processed_institutions,
                "total_count": len(processed_institutions)
            }
            # Cache successful response (6 hours for institutions)
            response_cache.set(url, cache_params, processed_result, 21600.0)  # 6 hours
            logger.debug("Response cached for institutions request")
            return processed_result
        except requests.Timeout as e:
            raise ValueError("API request timed out") from e
        except requests.RequestException as e:
            raise ValueError(f"API request failed: {str(e)}") from e

    def get_institution_details(self, institution_id: int, language: str = "de") -> Dict[str, Any]:
        """Get detailed information about a specific institution with caching

        Returns:
            The ``data`` object of the response, or ``{}`` when it is absent
            (cached 12 hours).

        Raises:
            ValueError: if the request times out or fails.
        """
        url = f"{self.base_url}/institutions/{institution_id}"
        params = {}
        if language in ["de", "en"]:
            params["language"] = language
        cache_params = params.copy()

        # Check cache first (12 hour TTL for institution details - they change rarely)
        cached_response = response_cache.get(url, cache_params)
        if cached_response:
            logger.debug("Cache hit for institution details request")
            return cached_response

        try:
            response = self.session.get(url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()
            institution_data = data.get('data', {})
            # Cache successful response (12 hours for institution details)
            response_cache.set(url, cache_params, institution_data, 43200.0)  # 12 hours
            logger.debug("Response cached for institution details request")
            return institution_data
        except requests.Timeout as e:
            raise ValueError("API request timed out") from e
        except requests.RequestException as e:
            raise ValueError(f"API request failed: {str(e)}") from e

    def get_asset(self, asset_id: str, width: Optional[int] = None, height: Optional[int] = None, format: str = "webp", quality: int = 85, fit: str = "inside") -> Dict[str, Any]:
        """Get optimized asset with transformations and caching

        Only the final URL and the response headers are used, so the body is
        never downloaded.

        Args:
            asset_id: Asset identifier, placed in the URL path as given.
            width, height: Optional target size; omitted when falsy.
            format: ``webp``, ``jpeg`` or ``png``; other values are not sent.
            quality: 1-100; out-of-range values are not sent.
            fit: ``inside``, ``outside``, ``cover`` or ``fill``; other values
                are not sent.

        Returns:
            Dict with ``asset_id``, resolved ``url``, ``content_type``,
            ``content_length`` and the applied ``transformations``
            (cached 24 hours).

        Raises:
            ValueError: if the request times out or fails.
        """
        url = f"{self.base_url}/assets/{asset_id}"
        params = {}
        if width:
            params["width"] = width
        if height:
            params["height"] = height
        if format in ["webp", "jpeg", "png"]:
            params["format"] = format
        if 1 <= quality <= 100:
            params["quality"] = quality
        if fit in ["inside", "outside", "cover", "fill"]:
            params["fit"] = fit

        # Check cache first (24 hour TTL for assets - they rarely change)
        cached_response = response_cache.get(url, params)
        if cached_response:
            logger.debug("Cache hit for asset request")
            return cached_response

        try:
            # stream=True defers the body download; only URL and headers are
            # read, and the context manager releases the connection.
            with self.session.get(url, params=params, timeout=30, stream=True) as response:
                response.raise_for_status()
                logger.debug("Asset metadata retrieved via GET request")
                asset_data = {
                    "asset_id": asset_id,
                    "url": response.url,
                    "content_type": response.headers.get('content-type', ''),
                    "content_length": response.headers.get('content-length', ''),
                    "transformations": params
                }
            # Cache successful response (24 hours - assets rarely change)
            response_cache.set(url, params, asset_data, 86400.0)  # 24 hours
            logger.debug("Response cached for asset request (24h TTL)")
            return asset_data
        except requests.Timeout as e:
            raise ValueError("API request timed out") from e
        except requests.RequestException as e:
            raise ValueError(f"API request failed: {str(e)}") from e
# Global API client
# Shared by all tool handlers. Constructing it only configures the
# requests.Session (retry adapter, User-Agent); no request is sent here.
api_client = KulturpoolClient()
# ==============================================================================
# PARAMETER VALIDATION MODELS
# ==============================================================================
class KulturpoolExploreParams(BaseModel):
    """Parameters for kulturpool_explore tool"""

    # Free-text search query (1-500 characters before trimming).
    query: str = Field(..., min_length=1, max_length=500)
    # Number of sample results to return (1-10).
    max_examples: int = Field(default=10, ge=1, le=10)

    @field_validator('query')
    @classmethod
    def validate_query(cls, v):
        """Sanitise the query and reject one that is empty after trimming.

        ``min_length=1`` is checked on the raw value, so a whitespace-only
        query would otherwise pass and reach the API as an empty string.
        """
        cleaned = SecurityValidator.sanitize_query(v)
        if not cleaned:
            raise ValueError("Query must not be empty")
        return cleaned
class KulturpoolSearchParams(BaseModel):
    """Parameters for kulturpool_search_filtered tool

    List filters are cleaned rather than rejected: institutions, object types
    and Dublin Core types outside the known lists are dropped, and free-text
    filters (creators, subjects, media) are sanitised with empty results
    removed.
    """

    query: str = Field(..., min_length=1, max_length=500)
    institutions: Optional[List[str]] = Field(None, max_length=10)
    object_types: Optional[List[str]] = Field(None, max_length=5)
    # Year bounds; the handler converts them to Unix timestamps.
    date_from: Optional[int] = Field(None)
    date_to: Optional[int] = Field(None)
    limit: int = Field(default=15, ge=1, le=20)
    sort_by: Optional[str] = Field(None)
    # Creator filter
    creators: Optional[List[str]] = Field(None, max_length=5)
    # Subject filter
    subjects: Optional[List[str]] = Field(None, max_length=10)
    # Media filter
    media: Optional[List[str]] = Field(None, max_length=5)
    # Dublin Core Type filter - LIMITED due to performance
    dc_types: Optional[List[str]] = Field(None, max_length=3)

    # Known institutions whitelist
    KNOWN_INSTITUTIONS: ClassVar[List[str]] = [
        "Albertina", "Belvedere", "Österreichische Nationalbibliothek",
        "Wiener Stadt- und Landesarchiv", "MAK", "Weltmuseum Wien",
        "Technisches Museum Wien", "Naturhistorisches Museum Wien"
    ]
    # Known object types
    KNOWN_TYPES: ClassVar[List[str]] = ["IMAGE", "TEXT", "SOUND", "VIDEO", "3D"]
    # Known Dublin Core types (performance-limited list)
    KNOWN_DC_TYPES: ClassVar[List[str]] = [
        "Fotografie", "Gemälde", "Zeichnung", "Grafik",
        "Druckwerk", "Karte", "Münze", "Medaille"
    ]
    # Known sort fields
    KNOWN_SORT_FIELDS: ClassVar[List[str]] = [
        "titleSort:asc", "titleSort:desc",
        "dataProvider:asc", "dataProvider:desc",
        "dateMin:asc", "dateMin:desc",
        "dateMax:asc", "dateMax:desc"
    ]

    @field_validator('query')
    @classmethod
    def validate_query(cls, v):
        """Sanitise the query and reject one that is empty after trimming.

        ``min_length=1`` is checked on the raw value, so a whitespace-only
        query would otherwise pass and reach the API as an empty string.
        """
        cleaned = SecurityValidator.sanitize_query(v)
        if not cleaned:
            raise ValueError("Query must not be empty")
        return cleaned

    @field_validator('institutions')
    @classmethod
    def validate_institutions(cls, v):
        """Keep only whitelisted institutions; unknown names are dropped."""
        if v:
            return [inst for inst in v if inst in cls.KNOWN_INSTITUTIONS]
        return v

    @field_validator('object_types')
    @classmethod
    def validate_types(cls, v):
        """Keep only known object types; unknown values are dropped."""
        if v:
            return [t for t in v if t in cls.KNOWN_TYPES]
        return v

    @field_validator('sort_by')
    @classmethod
    def validate_sort_by(cls, v):
        """Reject any sort expression that is not in KNOWN_SORT_FIELDS."""
        if v and v not in cls.KNOWN_SORT_FIELDS:
            raise ValueError(f"sort_by must be one of: {', '.join(cls.KNOWN_SORT_FIELDS)}")
        return v

    @field_validator('creators')
    @classmethod
    def validate_creators(cls, v):
        """Sanitise each creator name and drop those that end up empty."""
        if v:
            cleaned = (SecurityValidator.sanitize_input(creator) for creator in v)
            return [creator for creator in cleaned if creator]
        return v

    @field_validator('subjects')
    @classmethod
    def validate_subjects(cls, v):
        """Sanitise each subject and drop those that end up empty."""
        if v:
            cleaned = (SecurityValidator.sanitize_input(subject) for subject in v)
            return [subject for subject in cleaned if subject]
        return v

    @field_validator('media')
    @classmethod
    def validate_media(cls, v):
        """Sanitise each medium and drop those that end up empty."""
        if v:
            cleaned = (SecurityValidator.sanitize_input(medium) for medium in v)
            return [medium for medium in cleaned if medium]
        return v

    @field_validator('dc_types')
    @classmethod
    def validate_dc_types(cls, v):
        """Sanitise each Dublin Core type and keep only the known ones."""
        if v:
            # Performance-aware validation - limit to known types
            cleaned = (SecurityValidator.sanitize_input(dc_type) for dc_type in v)
            return [
                dc_type for dc_type in cleaned
                if dc_type and dc_type in cls.KNOWN_DC_TYPES
            ]
        return v
class KulturpoolDetailsParams(BaseModel):
    """Parameters for kulturpool_get_details tool"""

    # One to three object IDs to look up.
    object_ids: List[str] = Field(..., min_length=1, max_length=3)

    @field_validator('object_ids')
    @classmethod
    def validate_object_ids(cls, v):
        """Keep at most three IDs made of letters, digits, ``_`` and ``-``.

        IDs that do not match are dropped; matching IDs are additionally
        passed through ``SecurityValidator.sanitize_input``.

        Raises:
            ValueError: if no ID survives validation.
        """
        import re
        id_pattern = re.compile(r'^[a-zA-Z0-9_-]+$')
        accepted = [
            SecurityValidator.sanitize_input(candidate)
            for candidate in v[:3]  # Hard limit to 3
            if id_pattern.match(candidate)
        ]
        if not accepted:
            raise ValueError("No valid object IDs provided")
        return accepted
# ==============================================================================
# RESPONSE PROCESSING UTILITIES
# ==============================================================================
class ResponseProcessor:
    """Utilities for processing and optimizing API responses"""

    @staticmethod
    def _to_year(value) -> Optional[int]:
        """Convert mixed date values (year or epoch s/ms) to a year.

        Values in ``[-3000, 3000]`` are read as literal years and accepted
        only within 1000-2100. Anything larger in magnitude is read as a Unix
        timestamp (UTC), in milliseconds when it is too large to be seconds.
        Negative timestamps (dates before 1970) are supported.

        Returns None if not convertible or implausible.
        """
        from datetime import timedelta

        try:
            n = int(value)
        except (ValueError, TypeError):
            return None

        # Treat small integers as years
        if -3000 <= n <= 3000:
            return n if 1000 <= n <= 2100 else None

        # Heuristic: ms -> s. On the negative side, anything below the epoch
        # offset of 0001-01-01 cannot be seconds for a representable date.
        if n > 10_000_000_000 or n < -62_135_596_800:
            n //= 1000

        try:
            # Epoch arithmetic instead of utcfromtimestamp(): works for
            # negative values on every platform and is not deprecated.
            return (datetime(1970, 1, 1) + timedelta(seconds=n)).year
        except (OverflowError, ValueError, OSError):
            return None

    @staticmethod
    def _century_label(year: int) -> str:
        """Return the German century label for a year, e.g. 1850 -> '19. Jahrhundert'."""
        return f"{((year - 1)//100) + 1}. Jahrhundert"

    @staticmethod
    def _top_counts(counts: Dict[str, int], limit: int = 10) -> Dict[str, int]:
        """Return the ``limit`` highest-count entries, ordered by descending count."""
        ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
        return dict(ranked[:limit])

    @staticmethod
    def analyze_facets(hits: List[Dict]) -> Dict[str, Dict[str, int]]:
        """Extract facet information from hits for overview

        Counts institutions (``dataProvider``), types (``edmType``) and
        centuries (from ``dateMin``, falling back to ``dateMax``) over the
        given hits and returns the ten most frequent values of each.
        """
        facets: Dict[str, Dict[str, int]] = {
            "institutions": {},
            "types": {},
            "periods": {}
        }
        for hit in hits:
            doc = hit.get('document', {})
            # Institution facets
            if provider := doc.get('dataProvider'):
                facets["institutions"][provider] = facets["institutions"].get(provider, 0) + 1
            # Type facets
            if obj_type := doc.get('edmType'):
                facets["types"][obj_type] = facets["types"].get(obj_type, 0) + 1
            # Period facets (by century) using dateMin, fallback to dateMax
            date_value = doc.get('dateMin') if doc.get('dateMin') is not None else doc.get('dateMax')
            if date_value is not None:
                year = ResponseProcessor._to_year(date_value)
                if year:
                    century = ResponseProcessor._century_label(year)
                    facets["periods"][century] = facets["periods"].get(century, 0) + 1
        # Sort by count and keep the top ten per facet
        return {
            key: ResponseProcessor._top_counts(facet_dict)
            for key, facet_dict in facets.items()
        }

    @staticmethod
    def analyze_facets_from_response(response_data: Dict[str, Any]) -> Dict[str, Dict[str, int]]:
        """Prefer server-provided facet_counts when available; fallback to sampling hits.

        Only the ``dataProvider``, ``edmType`` and ``dateMin`` facets are
        read; ``dateMin`` values are grouped into centuries.
        """
        facet_counts = response_data.get('facet_counts') or []
        if not facet_counts:
            return ResponseProcessor.analyze_facets(response_data.get('hits', []))

        facets: Dict[str, Dict[str, int]] = {
            "institutions": {},
            "types": {},
            "periods": {},
        }
        # Facet field name -> bucket for the two facets that map one-to-one.
        direct_buckets = {'dataProvider': 'institutions', 'edmType': 'types'}

        for fc in facet_counts:
            field = fc.get('field_name') or fc.get('field')
            counts = fc.get('counts', [])
            if field in direct_buckets:
                bucket = facets[direct_buckets[field]]
                for item in counts:
                    v = item.get('value')
                    c = item.get('count', 0)
                    if v:
                        bucket[v] = bucket.get(v, 0) + c
            elif field == 'dateMin':
                for item in counts:
                    v = item.get('value')
                    c = item.get('count', 0)
                    y = ResponseProcessor._to_year(v)
                    if y:
                        century = ResponseProcessor._century_label(y)
                        facets['periods'][century] = facets['periods'].get(century, 0) + c

        return {
            key: ResponseProcessor._top_counts(val)
            for key, val in facets.items()
        }

    @staticmethod
    def format_sample_results(hits: List[Dict], max_samples: int = 5) -> List[Dict]:
        """Extract sample results with essential metadata only

        Returns at most ``max_samples`` dicts with id, title, creator,
        institution, type, image URLs and a ``date`` (year as string, or
        "Unbekannt" when no year can be derived).
        """
        samples = []
        for hit in hits[:max_samples]:
            doc = hit.get('document', {})
            # Extract essential fields only
            sample = {
                "id": doc.get('id', 'unknown'),
                "title": doc.get('title', ['Unbekannter Titel'])[0] if doc.get('title') else 'Unbekannter Titel',
                "creator": doc.get('creator', ['Unbekannt'])[0] if doc.get('creator') else 'Unbekannt',
                "institution": doc.get('dataProvider', 'Unbekannt'),
                "type": doc.get('edmType', 'Unbekannt'),
            }
            # Add image URLs using official API fields
            sample.update(_extract_image_urls(doc))
            # Add date if available (dateMin, fallback dateMax)
            year = None
            if (dv := doc.get('dateMin')) is not None:
                year = ResponseProcessor._to_year(dv)
            if year is None and (dv := doc.get('dateMax')) is not None:
                year = ResponseProcessor._to_year(dv)
            sample["date"] = str(year) if year else "Unbekannt"
            samples.append(sample)
        return samples
# ==============================================================================
# TOOL HANDLERS
# ==============================================================================
@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
    """Dispatch a tool call to its handler.

    Unknown tool names yield a plain-text notice. Any exception raised by a
    handler is logged and returned to the caller as a JSON error object
    instead of propagating.
    """
    try:
        # Resolve the handler lazily so only the requested one is looked up.
        if name == "kulturpool_explore":
            tool_handler = kulturpool_explore_handler
        elif name == "kulturpool_search_filtered":
            tool_handler = kulturpool_search_filtered_handler
        elif name == "kulturpool_get_details":
            tool_handler = kulturpool_get_details_handler
        elif name == "kulturpool_get_institutions":
            tool_handler = kulturpool_get_institutions_handler
        elif name == "kulturpool_get_institution_details":
            tool_handler = kulturpool_get_institution_details_handler
        elif name == "kulturpool_get_assets":
            tool_handler = kulturpool_get_assets_handler
        else:
            tool_handler = None

        if tool_handler is None:
            logger.warning(f"Unknown tool requested: {name}")
            return [TextContent(type="text", text=f"Unknown tool: {name}")]

        return await tool_handler(arguments)
    except Exception as exc:
        logger.error(f"Tool execution failed: {name} - {str(exc)}", exc_info=True)
        failure = {
            "error": f"Tool execution failed: {name}",
            "message": str(exc)
        }
        return [TextContent(type="text", text=json.dumps(failure, indent=2))]
async def kulturpool_explore_handler(arguments: Dict[str, Any]) -> List[TextContent]:
    """Run an exploratory search and return facets plus a few sample hits.

    Raises:
        ValueError: if the rate limit is exhausted. Parameter validation
            errors from KulturpoolExploreParams propagate as well.
    """
    # Rate limiting comes first; a rejected call does no further work.
    if not rate_limiter.is_allowed():
        logger.warning("Rate limit exceeded for kulturpool_explore")
        raise ValueError("Rate limit exceeded. Please try again later.")

    params = KulturpoolExploreParams(**arguments)

    search_request = {
        'q': params.query,
        'per_page': 30,  # QA-compromise: maintains facet fidelity without prior 50-item overhead
        'facet_by': 'dataProvider,edmType,dateMin,dateMax',
        'include_fields': 'id,title,creator,dataProvider,edmType,previewImage,isShownAt,isShownBy,object,iiifManifest,dateMin,dateMax,subject,description'
    }

    # The client is blocking, so run it off the event loop.
    response_data = await asyncio.to_thread(api_client.search, search_request)

    total_found = response_data.get('found', 0)
    hits = response_data.get('hits', [])

    # Large result sets get a hint to narrow down with the filtered search.
    if total_found > 1000:
        message = f"Found {total_found} results. Use kulturpool_search_filtered with specific facets to narrow down."
    else:
        message = f"Found {total_found} manageable results."

    payload = {
        "total_found": total_found,
        "query": params.query,
        # Server-side facet counts are preferred when the response has them.
        "facets": ResponseProcessor.analyze_facets_from_response(response_data),
        "sample_results": ResponseProcessor.format_sample_results(hits, params.max_examples),
        "message": message
    }
    return [TextContent(type="text", text=json.dumps(payload, indent=2, ensure_ascii=False))]
def _or_filter_group(clauses: List[str]) -> Optional[str]:
    """Join filter clauses with OR; parenthesise only when there are several."""
    if not clauses:
        return None
    if len(clauses) == 1:
        return clauses[0]
    return f"({' || '.join(clauses)})"


def _utc_year_start_ts(year: int) -> int:
    """Unix timestamp of 1 January 00:00:00 UTC of the given year."""
    from datetime import timezone
    return int(datetime(year, 1, 1, tzinfo=timezone.utc).timestamp())


def _utc_year_end_ts(year: int) -> int:
    """Unix timestamp of 31 December 23:59:59 UTC of the given year."""
    from datetime import timezone
    return int(datetime(year, 12, 31, 23, 59, 59, tzinfo=timezone.utc).timestamp())


async def kulturpool_search_filtered_handler(arguments: Dict[str, Any]) -> List[TextContent]:
    """Handle kulturpool_search_filtered tool calls

    Builds a ``filter_by`` expression from the validated parameters (OR
    within one facet, AND between facets and date constraints), runs the
    search and returns the formatted hits together with the applied filters
    and debug information.

    Year bounds are converted to timestamps in UTC, matching how
    ``ResponseProcessor._to_year`` decodes them, so the result does not
    depend on the server's local timezone.

    Raises:
        ValueError: if the rate limit is exhausted, or if a year is outside
            the range ``datetime`` supports. Parameter validation errors
            from KulturpoolSearchParams propagate as well.
    """
    # Rate limiting check
    if not rate_limiter.is_allowed():
        logger.warning("Rate limit exceeded for kulturpool_search_filtered")
        raise ValueError("Rate limit exceeded. Please try again later.")

    # Parameter validation
    params = KulturpoolSearchParams(**arguments)

    # Build API filters - OR within facet groups, AND between facets
    groups = []
    # (clause prefix, clause suffix, values); creators use a wildcard match,
    # every other facet an exact match.
    facet_specs = [
        ("dataProvider:=", "", params.institutions),
        ("edmType:=", "", params.object_types),
        ("creator:*", "*", params.creators),
        ("subject:=", "", params.subjects),
        ("medium:=", "", params.media),
        ("dcType:=", "", params.dc_types),
    ]
    for prefix, suffix, values in facet_specs:
        group = _or_filter_group([f"{prefix}{v}{suffix}" for v in (values or [])])
        if group:
            groups.append(group)

    # Date range filters with interval-overlap semantics
    # Overlap([dateMin,dateMax], [from,to]) => (dateMin <= to) && (dateMax >= from OR dateMin >= from)
    if params.date_to:
        to_ts = _utc_year_end_ts(params.date_to)
        groups.append(f"dateMin:<={to_ts}")
    if params.date_from:
        from_ts = _utc_year_start_ts(params.date_from)
        groups.append(f"(dateMax:>={from_ts} || dateMin:>={from_ts})")
    if params.date_from or params.date_to:
        # Only include objects with valid dates when filtering by date
        # NOTE(review): if dates are epoch seconds, pre-1970 objects have
        # negative values and this guard would exclude them — confirm what
        # the API stores for undated objects.
        groups.append("(dateMin:>0 || dateMax:>0)")

    # Build API parameters
    api_params = {
        'q': params.query,
        'per_page': params.limit,
        'include_fields': 'id,title,creator,dataProvider,edmType,previewImage,isShownAt,isShownBy,object,iiifManifest,dateMin,dateMax,subject,description'
    }

    # Debug information echoed back in the response
    debug_info = {
        'date_from': params.date_from,
        'date_to': params.date_to,
        'groups_count': len(groups),
        'groups': groups[:3],  # First 3 for debugging
        'groups_bool': bool(groups),
        'filter_by_set': False
    }

    if groups:
        filter_string = ' && '.join(groups)
        api_params['filter_by'] = filter_string
        debug_info['final_filter_by'] = filter_string
        debug_info['filter_by_set'] = True
    if params.sort_by:
        api_params['sort_by'] = params.sort_by

    # Execute API call (blocking client, so run it off the event loop)
    response_data = await asyncio.to_thread(api_client.search, api_params)

    # Process results
    total_found = response_data.get('found', 0)
    hits = response_data.get('hits', [])

    # Format detailed results
    results = []
    for hit in hits:
        doc = hit.get('document', {})
        result_obj = {
            "id": doc.get('id'),
            "title": doc.get('title', [''])[0] if doc.get('title') else '',
            "creator": doc.get('creator', []),
            "institution": doc.get('dataProvider'),
            "type": doc.get('edmType'),
            # First description entry, truncated to 200 characters
            "description": doc.get('description', [''])[0][:200] if doc.get('description') else '',
            "subjects": doc.get('subject', [])[:5],
            "detail_url": doc.get('isShownAt')
        }
        # Add image URLs using official API fields
        result_obj.update(_extract_image_urls(doc))
        # Add formatted date from dateMin, fallback to dateMax
        y = None
        if (dv := doc.get('dateMin')) is not None:
            y = ResponseProcessor._to_year(dv)
        if y is None and (dv := doc.get('dateMax')) is not None:
            y = ResponseProcessor._to_year(dv)
        result_obj["date"] = y
        results.append(result_obj)

    # Build final response
    result = {
        "total_found": total_found,
        "returned": len(results),
        "query": params.query,
        "applied_filters": {
            "institutions": params.institutions,
            "object_types": params.object_types,
            "date_from": params.date_from,
            "date_to": params.date_to,
            "creators": params.creators,
            "subjects": params.subjects,
            "media": params.media,
            "dc_types": params.dc_types
        },
        "debug_filter_query": api_params.get('filter_by', 'No filters applied'),
        "debug_info": debug_info,
        "results": results,
        "message": "Results contain detailed metadata. Note: kulturpool_get_details has API limitations - use these search results directly." if results else "No results found. Try different filters."
    }
    return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))]
async def kulturpool_get_details_handler(arguments: Dict[str, Any]) -> List[TextContent]:
    """Serve kulturpool_get_details: run one search per object ID and report the hits.

    Every entry of ``object_ids`` is sent to the search API as the query text
    (five hits per ID, sorted by descending score). A search that raises is
    recorded as an error entry for that ID; the remaining IDs are still
    processed.

    Args:
        arguments: Raw tool arguments, validated through KulturpoolDetailsParams.

    Returns:
        A single TextContent holding the JSON-encoded search report.

    Raises:
        ValueError: If the rate limiter rejects the call.
    """
    if not rate_limiter.is_allowed():
        logger.warning("Rate limit exceeded for kulturpool_get_details")
        raise ValueError("Rate limit exceeded. Please try again later.")

    # Validation happens outside the per-ID try block, so whatever the
    # parameter model raises propagates to the caller.
    params = KulturpoolDetailsParams(**arguments)

    def _summarize(document, term):
        """Condense one search document into the compact related-object record."""
        titles = document.get('title')
        summary = {
            "id": document.get('id'),
            "title": titles[0] if titles else '',
            "creator": document.get('creator', []),
            "institution": document.get('dataProvider'),
            "type": document.get('edmType'),
            "preview_url": document.get('previewImage'),
            "detail_url": document.get('isShownAt'),
            "relevance_note": f"Found by searching for '{term}'"
        }
        # Prefer dateMin; consult dateMax only when dateMin yields no year.
        year = None
        for date_field in ('dateMin', 'dateMax'):
            raw_date = document.get(date_field)
            if raw_date is None:
                continue
            year = ResponseProcessor._to_year(raw_date)
            if year is not None:
                break
        if year is not None:
            summary["date"] = year
        return summary

    outcomes = []
    for term in params.object_ids:
        try:
            payload = await asyncio.to_thread(
                api_client.search,
                {
                    'q': term,
                    'per_page': 5,
                    'sort_by': '_score:desc'
                },
            )
            matches = [
                _summarize(hit.get('document', {}), term)
                for hit in payload.get('hits', [])
            ]
            entry = {
                "search_term": term,
                "total_found": payload.get('found', 0),
                "returned": len(matches),
                "related_objects": matches,
                "note": f"Objects found by using '{term}' as search term"
            }
        except Exception as failure:
            # Best effort: report the failure for this ID and carry on.
            entry = {
                "search_term": term,
                "error": f"Search failed: {failure}",
                "related_objects": []
            }
        outcomes.append(entry)

    report = {
        "search_strategy": "Using object IDs as search terms to find related cultural objects",
        "searches_performed": len(params.object_ids),
        "results": outcomes,
        "note": "This tool finds related objects by content-based search, not direct ID lookup"
    }
    return [TextContent(type="text", text=json.dumps(report, indent=2, ensure_ascii=False))]
# ==============================================================================
# INSTITUTION MANAGEMENT HANDLERS (Phase 5)
# ==============================================================================
async def kulturpool_get_institutions_handler(arguments: Dict[str, Any]) -> List[TextContent]:
    """Handle kulturpool_get_institutions tool calls.

    Fetches the institution list from the API client and returns it as JSON,
    annotated with the effective request parameters and response metadata.

    Args:
        arguments: Tool arguments. ``include_locations`` defaults to True;
            ``language`` defaults to 'de' and any value other than 'de'/'en'
            silently falls back to 'de'.

    Returns:
        A single TextContent holding the JSON-encoded response.

    Raises:
        ValueError: If the rate limit is exceeded, or if fetching or
            serialising the institutions fails (the original exception is
            chained as the cause).
    """
    if not rate_limiter.is_allowed():
        logger.warning("Rate limit exceeded for kulturpool_get_institutions")
        raise ValueError("Rate limit exceeded. Please try again later.")

    include_locations = arguments.get('include_locations', True)
    language = arguments.get('language', 'de')
    if language not in ('de', 'en'):
        language = 'de'

    try:
        response_data = await asyncio.to_thread(
            api_client.get_institutions,
            include_locations=include_locations,
            language=language
        )
        # Build a new top-level dict instead of writing into the object handed
        # back by the API client, so per-request annotations never leak into
        # data the client may be sharing (e.g. via its response cache).
        annotated = {
            **response_data,
            "request_params": {
                "include_locations": include_locations,
                "language": language
            },
            "response_info": {
                "timestamp": datetime.now().isoformat(),
                "data_source": "Kulturpool API",
                "note": "Use kulturpool_get_institution_details for complete information"
            }
        }
        return [TextContent(type="text", text=json.dumps(annotated, indent=2, ensure_ascii=False))]
    except Exception as e:
        raise ValueError(f"Failed to retrieve institutions: {str(e)}") from e
async def kulturpool_get_institution_details_handler(arguments: Dict[str, Any]) -> List[TextContent]:
    """Handle kulturpool_get_institution_details tool calls.

    Args:
        arguments: Tool arguments. ``institution_id`` must be a positive
            integer (an integral float such as 5.0 is accepted and converted).
            ``language`` defaults to 'de' and any value other than 'de'/'en'
            silently falls back to 'de'.

    Returns:
        A single TextContent holding the JSON-encoded institution summary.
        ``location`` is None unless the API supplied at least one coordinate
        pair.

    Raises:
        ValueError: If the rate limit is exceeded, ``institution_id`` is
            missing or invalid, or the lookup fails (the original exception
            is chained as the cause).
    """
    if not rate_limiter.is_allowed():
        logger.warning("Rate limit exceeded for kulturpool_get_institution_details")
        raise ValueError("Rate limit exceeded. Please try again later.")

    institution_id = arguments.get('institution_id')
    language = arguments.get('language', 'de')

    # JSON numbers may arrive as floats; accept them only when integral.
    if isinstance(institution_id, float) and institution_id.is_integer():
        institution_id = int(institution_id)
    # bool is a subclass of int, so exclude it explicitly; a non-numeric value
    # must yield the documented ValueError rather than a TypeError from "<".
    if (
        isinstance(institution_id, bool)
        or not isinstance(institution_id, int)
        or institution_id < 1
    ):
        raise ValueError("Valid institution_id is required")

    if language not in ('de', 'en'):
        language = 'de'

    try:
        institution_data = await asyncio.to_thread(
            api_client.get_institution_details,
            institution_id=institution_id,
            language=language
        )
        processed_data = {
            "institution_id": institution_id,
            "basic_info": {
                "name": institution_data.get('name', ''),
                "web_collection_url": institution_data.get('web_collection_url', ''),
                "website_url": institution_data.get('website_url', '')
            },
            "location": None,
            "images": {
                "favicon": institution_data.get('favicon', {}),
                "hero_image": institution_data.get('hero_image', {})
            },
            "metadata": {
                "intermediate_provider": institution_data.get('intermediate_provider'),
                "language": language,
                "timestamp": datetime.now().isoformat()
            }
        }

        location = institution_data.get('location') or {}
        coords = location.get('coordinates')
        if isinstance(coords, (list, tuple)) and coords:
            # Nested coordinates ([[lon, lat], ...], as in a GeoJSON MultiPoint)
            # use their first pair; a flat [lon, lat] (GeoJSON Point) is used
            # as-is instead of failing the whole request.
            first = coords[0]
            pair = first if isinstance(first, (list, tuple)) else coords
            if len(pair) >= 2:
                processed_data["location"] = {
                    "type": location.get('type', 'MultiPoint'),
                    "coordinates": {
                        "longitude": pair[0],
                        "latitude": pair[1]
                    }
                }

        processed_data["note"] = "For image assets, use kulturpool_get_assets with asset IDs"
        return [TextContent(type="text", text=json.dumps(processed_data, indent=2, ensure_ascii=False))]
    except Exception as e:
        raise ValueError(
            f"Failed to retrieve institution details for ID {institution_id}: {str(e)}"
        ) from e
async def kulturpool_get_assets_handler(arguments: Dict[str, Any]) -> List[TextContent]:
    """Handle kulturpool_get_assets tool calls.

    Optional settings never fail the request; invalid values fall back:
    ``format`` -> 'webp', ``quality`` -> 85, ``fit`` -> 'inside', and
    ``width``/``height`` -> None. "Invalid" includes non-numeric values,
    booleans, and numbers outside 1-100 (quality) or 1-4000 (width/height).

    Args:
        arguments: Tool arguments; ``asset_id`` is required.

    Returns:
        A single TextContent holding the JSON-encoded asset data plus a
        ``usage_info`` section.

    Raises:
        ValueError: If the rate limit is exceeded, ``asset_id`` is missing,
            or the asset lookup fails (the original exception is chained as
            the cause).
    """
    if not rate_limiter.is_allowed():
        logger.warning("Rate limit exceeded for kulturpool_get_assets")
        raise ValueError("Rate limit exceeded. Please try again later.")

    asset_id = arguments.get('asset_id')
    if not asset_id:
        raise ValueError("asset_id is required")
    asset_id = SecurityValidator.sanitize_input(asset_id)

    def _within(value: Any, low: int, high: int) -> bool:
        """Return True when value is a real number (not bool) in [low, high]."""
        return (
            isinstance(value, (int, float))
            and not isinstance(value, bool)
            and low <= value <= high
        )

    # Named image_format locally to avoid shadowing the builtin format().
    image_format = arguments.get('format', 'webp')
    if image_format not in ('webp', 'jpeg', 'png'):
        image_format = 'webp'

    quality = arguments.get('quality', 85)
    if not _within(quality, 1, 100):
        quality = 85

    fit = arguments.get('fit', 'inside')
    if fit not in ('inside', 'outside', 'cover', 'fill'):
        fit = 'inside'

    # Type-checked range test: None/strings no longer raise TypeError and 0 is
    # treated like any other out-of-range dimension.
    width = arguments.get('width')
    if not _within(width, 1, 4000):
        width = None
    height = arguments.get('height')
    if not _within(height, 1, 4000):
        height = None

    try:
        asset_data = await asyncio.to_thread(
            api_client.get_asset,
            asset_id=asset_id,
            width=width,
            height=height,
            format=image_format,
            quality=quality,
            fit=fit
        )
        # Build a new dict rather than writing into the object returned by the
        # API client, which may be shared (e.g. via its response cache).
        annotated = {
            **asset_data,
            "usage_info": {
                "original_asset_id": asset_id,
                "transformation_applied": bool(width or height),
                "timestamp": datetime.now().isoformat(),
                "note": "This URL provides optimized image with specified transformations"
            }
        }
        return [TextContent(type="text", text=json.dumps(annotated, indent=2, ensure_ascii=False))]
    except Exception as e:
        raise ValueError(f"Failed to retrieve asset {asset_id}: {str(e)}") from e
@server.list_tools()
async def list_tools() -> List[Tool]:
    """Return the six Kulturpool tool definitions with their JSON input schemas."""

    # Small factories for schema fragments that repeat across tools. Each call
    # returns a fresh dict so no two schemas share a mutable object.
    def _query(description):
        return {
            "type": "string",
            "description": description,
            "minLength": 1,
            "maxLength": 500
        }

    def _string_list(description, max_items):
        return {
            "type": "array",
            "items": {"type": "string"},
            "description": description,
            "maxItems": max_items
        }

    def _language():
        return {
            "type": "string",
            "description": "Response language (de for German, en for English)",
            "enum": ["de", "en"],
            "default": "de"
        }

    def _pixels(axis):
        return {
            "type": "integer",
            "description": f"Target {axis} in pixels (1-4000)",
            "minimum": 1,
            "maximum": 4000
        }

    # Every sortable field in both directions, ascending listed first.
    sort_options = [
        f"{field}:{direction}"
        for field in ("titleSort", "dataProvider", "dateMin", "dateMax")
        for direction in ("asc", "desc")
    ]

    explore_schema = {
        "type": "object",
        "properties": {
            "query": _query("Search query for cultural objects"),
            "max_examples": {
                "type": "integer",
                "description": "Maximum number of sample results to return",
                "minimum": 1,
                "maximum": 10,
                "default": 5
            }
        },
        "required": ["query"]
    }

    search_schema = {
        "type": "object",
        "properties": {
            "query": _query("Search query"),
            "institutions": _string_list(
                "Filter by institutions (e.g., 'Albertina', 'Belvedere', 'Österreichische Nationalbibliothek')",
                10,
            ),
            "object_types": _string_list(
                "Filter by object types (IMAGE, TEXT, SOUND, VIDEO, 3D)",
                5,
            ),
            "date_from": {
                "type": "integer",
                "description": "Filter by start year (interval overlaps)"
            },
            "date_to": {
                "type": "integer",
                "description": "Filter by end year (interval overlaps)"
            },
            "limit": {
                "type": "integer",
                "description": "Maximum results to return",
                "minimum": 1,
                "maximum": 20,
                "default": 15
            },
            "sort_by": {
                "type": "string",
                "description": f"Sort results by field ({', '.join(sort_options)})",
                "enum": sort_options
            },
            "creators": _string_list(
                "Filter by creators/artists (supports partial matching, e.g., 'Mozart', 'Dürer')",
                5,
            ),
            "subjects": _string_list(
                "Filter by subjects/topics (exact matching, e.g., 'Portrait', 'Historische Person')",
                10,
            ),
            "media": _string_list(
                "Filter by medium/material (exact matching, e.g., 'Handschrift', 'Glasdia', 'Fotografie')",
                5,
            ),
            "dc_types": _string_list(
                "Filter by Dublin Core types (e.g., 'Fotografie', 'Gemälde'). WARNING: Can generate large result sets. Use with other filters.",
                3,
            )
        },
        "required": ["query"]
    }

    details_schema = {
        "type": "object",
        "properties": {
            "object_ids": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Object IDs to use as search terms for finding related cultural objects and collections",
                "minItems": 1,
                "maxItems": 3
            }
        },
        "required": ["object_ids"]
    }

    # No "required" list here: both arguments are optional.
    institutions_schema = {
        "type": "object",
        "properties": {
            "include_locations": {
                "type": "boolean",
                "description": "Include geographical coordinates for institutions",
                "default": True
            },
            "language": _language()
        }
    }

    institution_details_schema = {
        "type": "object",
        "properties": {
            "institution_id": {
                "type": "integer",
                "description": "Institution ID (get from kulturpool_get_institutions)",
                "minimum": 1
            },
            "language": _language()
        },
        "required": ["institution_id"]
    }

    assets_schema = {
        "type": "object",
        "properties": {
            "asset_id": {
                "type": "string",
                "description": "Asset ID (get from institution details, favicon, hero_image fields)"
            },
            "width": _pixels("width"),
            "height": _pixels("height"),
            "format": {
                "type": "string",
                "description": "Output image format",
                "enum": ["webp", "jpeg", "png"],
                "default": "webp"
            },
            "quality": {
                "type": "integer",
                "description": "Image quality percentage (1-100)",
                "minimum": 1,
                "maximum": 100,
                "default": 85
            },
            "fit": {
                "type": "string",
                "description": "Resize behavior: inside (maintain aspect ratio within bounds), outside (fill bounds, may crop), cover (fill exactly), fill (stretch to fit)",
                "enum": ["inside", "outside", "cover", "fill"],
                "default": "inside"
            }
        },
        "required": ["asset_id"]
    }

    return [
        Tool(
            name="kulturpool_explore",
            description="Explore Austrian cultural heritage with facet overview. Always returns < 2KB response with facets and sample results.",
            inputSchema=explore_schema,
        ),
        Tool(
            name="kulturpool_search_filtered",
            description="Filtered search with specific facets. Returns max 20 results with complete metadata. Date filters accept years and use interval overlap semantics: an object matches if its [dateMin,dateMax] overlaps [date_from,date_to]. Supports creators, subjects, media, and Dublin Core type filters.",
            inputSchema=search_schema,
        ),
        Tool(
            name="kulturpool_get_details",
            description="Find related cultural objects using object IDs as search terms. Uses content-based search to discover similar objects, related works, or objects from same collections. Does NOT retrieve additional metadata (use kulturpool_search_filtered results directly for complete object details).",
            inputSchema=details_schema,
        ),
        Tool(
            name="kulturpool_get_institutions",
            description="Get comprehensive list of all participating cultural institutions in the Kulturpool network. Returns institution names, IDs, websites, and optional geographical coordinates.",
            inputSchema=institutions_schema,
        ),
        Tool(
            name="kulturpool_get_institution_details",
            description="Get detailed information about a specific cultural institution, including location data, images, and multilingual content. Use institution IDs from kulturpool_get_institutions.",
            inputSchema=institution_details_schema,
        ),
        Tool(
            name="kulturpool_get_assets",
            description="Get optimized image assets with dynamic transformations. Supports resizing, format conversion, and quality adjustment for institution logos, hero images, and other visual assets.",
            inputSchema=assets_schema,
        ),
    ]
# ==============================================================================
# MAIN SERVER ENTRY POINT
# ==============================================================================
async def main():
    """Run the MCP server on the stdio transport until the streams close.

    Logs a startup banner listing the six tools, then hands the stdio
    read/write streams to ``server.run`` together with the initialization
    options (server name, version and capabilities).
    """
    # Only the names actually used below are imported here.
    from mcp.server.models import InitializationOptions
    from mcp.server.lowlevel.server import NotificationOptions

    # NOTE(review): the banner says v2.2 while server_version below advertises
    # "1.0.0" to clients - confirm which one is current and align them.
    logger.info("Starting Kulturerbe MCP Server v2.2...")
    logger.info("6-Tool Progressive Disclosure Architecture:")
    logger.info("1. kulturpool_explore - Facet overview (< 2KB)")
    logger.info("2. kulturpool_search_filtered - Filtered results (≤ 20)")
    logger.info("3. kulturpool_get_details - Related objects (≤ 3 IDs)")
    logger.info("4. kulturpool_get_institutions - Institution list")
    logger.info("5. kulturpool_get_institution_details - Institution details")
    logger.info("6. kulturpool_get_assets - Optimized image assets")
    logger.info("Rate limit: 100 requests/hour per client")

    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="kulturerbe-mcp-server",
                server_version="1.0.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )
if __name__ == "__main__":
    # Script entry point; asyncio is already imported at module level, so no
    # local import is needed here.
    asyncio.run(main())