"""
Helper classes for CKAN data analysis.
This module contains the core analysis classes used by CKAN tools:
- RelevanceScorer: Scores datasets based on query relevance
- UpdateFrequencyAnalyzer: Analyzes dataset update patterns
- SummaryBuilder: Creates summarized representations of CKAN data
"""
import logging
import re
from datetime import UTC, datetime
from typing import Any
from urllib.parse import quote
from .types import (
CkanDatastoreField,
CkanPackage,
CkanResource,
CkanToolsConfig,
PackageSummary,
ResourceAnalysis,
ResourceSummary,
UpdateFrequencyCategory,
)
logger = logging.getLogger(__name__)
class RelevanceScorer:
    """Scores datasets based on query relevance using weighted matching.

    Each field that matches at least one query term contributes its
    configured weight; resource matches get diminishing returns.
    """

    # Common English words that carry no search signal.  Defined once at
    # class level as a frozenset (the original rebuilt a set on every
    # instantiation); still reachable as ``self.stop_words``.
    stop_words = frozenset({
        "a", "an", "the", "and", "or", "but", "in", "on", "at", "to",
        "for", "of", "with", "by", "from", "as", "is", "was", "are",
        "were", "be", "been", "being", "have", "has", "had", "do",
        "does", "did", "will", "would", "could", "should", "may",
        "might", "must", "can", "this", "that", "these", "those",
    })

    def __init__(self, config: CkanToolsConfig):
        """Initialize the scorer with configuration weights."""
        self.config = config

    def score(self, dataset: CkanPackage, query: str) -> int:
        """
        Calculate relevance score for a dataset against a query.

        Args:
            dataset: The CKAN package to score
            query: The search query

        Returns:
            Relevance score (higher = more relevant); 0 when the query
            yields no usable terms.
        """
        query_terms = self._tokenize_query(query)
        if not query_terms:
            return 0
        return (
            self._score_title(dataset.title, query_terms)
            + self._score_description(dataset.notes, query_terms)
            + self._score_tags(dataset.tags or [], query_terms)
            + self._score_organization(dataset.organization, query_terms)
            + self._score_resources(dataset.resources or [], query_terms)
        )

    def _score_title(self, title: str | None, query_terms: list[str]) -> int:
        """Score title match using tokenized query terms."""
        return self._score_text_field(title, query_terms, self.config.relevance_weights.title)

    def _score_description(self, description: str | None, query_terms: list[str]) -> int:
        """Score description match using tokenized query terms."""
        return self._score_text_field(
            description, query_terms, self.config.relevance_weights.description
        )

    def _score_tags(self, tags: list[Any], query_terms: list[str]) -> int:
        """Score tag match.

        Tags may be model objects (with ``.name``), dicts, or plain
        strings; the first matching tag awards the full tag weight.
        """
        if not tags:
            return 0
        for tag in tags:
            tag_value = ""
            if hasattr(tag, "name"):
                tag_value = tag.name or ""
            elif isinstance(tag, dict):
                tag_value = tag.get("name", "") or ""
            elif isinstance(tag, str):
                tag_value = tag
            if self._text_matches_terms(tag_value, query_terms):
                return self.config.relevance_weights.tags
        return 0

    def _score_organization(self, organization: Any, query_terms: list[str]) -> int:
        """Score organization match (object with ``.title`` or dict)."""
        org_title = ""
        if hasattr(organization, "title"):
            org_title = organization.title or ""
        elif isinstance(organization, dict):
            org_title = organization.get("title", "")
        return self._score_text_field(
            org_title, query_terms, self.config.relevance_weights.organization
        )

    def _score_resources(self, resources: list[CkanResource], query_terms: list[str]) -> int:
        """Score resource match with diminishing returns for multiple hits.

        The first matching resource earns the full weight, the second
        70%, any further match 50% — each rounded and floored at 1 point.
        """
        if not resources or not query_terms:
            return 0
        base_score = self.config.relevance_weights.resource
        multipliers = [1.0, 0.7]
        total_score = 0
        match_count = 0
        for resource in resources:
            resource_text = f"{resource.name or ''} {resource.format or ''}".strip()
            if not resource_text:
                continue
            if self._text_matches_terms(resource_text, query_terms):
                if match_count < len(multipliers):
                    multiplier = multipliers[match_count]
                else:
                    multiplier = 0.5
                total_score += max(1, int(round(base_score * multiplier)))
                match_count += 1
        return total_score

    def _tokenize_query(self, query: str) -> list[str]:
        """Split the query into meaningful, de-duplicated terms.

        Underscores are treated as word separators so a term like
        ``user_id`` can match text, which ``_text_matches_terms``
        normalizes the same way (previously an underscore-containing
        term could never match).  Stop words and single-character tokens
        are dropped; first-seen order is preserved.
        """
        if not query:
            return []
        terms = re.findall(r"\b\w+\b", query.lower().replace("_", " "))
        filtered = [term for term in terms if term not in self.stop_words and len(term) > 1]
        # dict.fromkeys de-duplicates while preserving insertion order
        return list(dict.fromkeys(filtered))

    def _score_text_field(self, text: str | None, query_terms: list[str], weight: int) -> int:
        """Return weight if any term matches text using word boundaries."""
        if not text or not query_terms:
            return 0
        return weight if self._text_matches_terms(text, query_terms) else 0

    def _text_matches_terms(self, text: str, query_terms: list[str]) -> bool:
        """Check whether any query term matches the provided text."""
        # Underscores become spaces so snake_case text matches word-wise.
        text_lower = text.lower().replace("_", " ")
        return any(self._word_boundary_match(term, text_lower) for term in query_terms)

    def _word_boundary_match(self, term: str, text_lower: str) -> bool:
        """Return True if term appears as a full word within the text."""
        if not term:
            return False
        return re.search(r"\b" + re.escape(term) + r"\b", text_lower) is not None
class UpdateFrequencyAnalyzer:
"""Analyzes dataset update frequency patterns."""
PATTERNS = {
"daily": ["daily", "real-time"],
"weekly": ["weekly"],
"monthly": ["monthly"],
"quarterly": ["quarterly"],
"annually": ["annual", "yearly"],
"irregular": ["irregular", "as needed"],
}
def __init__(self, config: CkanToolsConfig):
"""Initialize the analyzer with configuration thresholds."""
self.config = config
def categorize(self, dataset: CkanPackage) -> UpdateFrequencyCategory:
"""
Categorize dataset update frequency.
Args:
dataset: The CKAN package to analyze
Returns:
Update frequency category
"""
refresh_rate = (dataset.refresh_rate or "").lower()
# Check explicit patterns first
for category, patterns in self.PATTERNS.items():
if any(pattern in refresh_rate for pattern in patterns):
return category # type: ignore
# Infer from metadata if available
return self._infer_from_metadata(dataset)
def _infer_from_metadata(self, dataset: CkanPackage) -> UpdateFrequencyCategory:
"""Infer frequency from metadata timestamps."""
last_update = dataset.maintainer_updated or dataset.metadata_modified
if not last_update:
return "unknown"
try:
days_since = self._days_since_date(last_update)
thresholds = self.config.frequency_thresholds
if days_since < thresholds.frequent_days:
return "frequent"
elif days_since < thresholds.monthly_days:
return "monthly"
elif days_since < thresholds.quarterly_days:
return "quarterly"
else:
return "infrequent"
except (ValueError, TypeError) as e:
logger.warning(f"Could not parse date '{last_update}': {e}")
return "unknown"
def _days_since_date(self, date_string: str) -> int:
"""Calculate days since a given date."""
try:
normalized_value = date_string.strip()
if not normalized_value:
raise ValueError("Date string is empty")
if normalized_value.endswith("Z"):
normalized_value = normalized_value[:-1] + "+00:00"
parsed_date: datetime | None = None
try:
parsed_date = datetime.fromisoformat(normalized_value)
except ValueError:
date_formats = [
"%Y-%m-%dT%H:%M:%S.%fZ",
"%Y-%m-%dT%H:%M:%SZ",
"%Y-%m-%dT%H:%M:%S.%f",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d",
]
for fmt in date_formats:
try:
parsed_date = datetime.strptime(date_string, fmt)
break
except ValueError:
continue
if parsed_date is None:
raise ValueError(f"Could not parse date: {date_string}")
if parsed_date.tzinfo is not None:
parsed_date = parsed_date.astimezone(UTC).replace(tzinfo=None)
now = datetime.now()
return (now - parsed_date).days
except Exception as e:
logger.debug(f"Error calculating days since date '{date_string}': {e}")
raise
class SummaryBuilder:
    """Builds summary representations of CKAN data."""

    def __init__(self, config: CkanToolsConfig):
        """Initialize the summary builder with configuration."""
        self.config = config

    def package(self, pkg: CkanPackage) -> PackageSummary:
        """Create a package summary.

        ``tags`` and ``resources`` are guarded against ``None`` (matching
        the ``or []`` defensive style used by the relevance scorer);
        previously a package with ``tags=None`` raised TypeError here.
        """
        package_name = pkg.name or pkg.id
        package_title = pkg.title or package_name
        tags = pkg.tags or []
        resources = pkg.resources or []
        return PackageSummary(
            id=pkg.id,
            name=package_name,
            title=package_title,
            description=self._truncate_description(pkg.notes),
            organization=(
                pkg.organization.title
                if (pkg.organization and pkg.organization.title)
                else "Unknown"
            ),
            # Only the first five tags are surfaced in the summary.
            tags=[tag.name for tag in tags[:5]],
            created=pkg.metadata_created or pkg.metadata_modified or "",
            last_modified=pkg.metadata_modified or pkg.metadata_created or "",
            resource_count=len(resources),
            datastore_resources=self._count_datastore_resources(resources),
            url=self._build_dataset_url(pkg),
        )

    def resource(self, resource: CkanResource) -> ResourceSummary:
        """Create a resource summary."""
        return ResourceSummary(
            id=resource.id,
            name=resource.name or resource.id,
            format=resource.format or "Unknown",
            size=resource.size,
            datastore_active=bool(resource.datastore_active),
            last_modified=self._get_resource_last_modified(resource),
        )

    def resource_analysis(
        self,
        resource: CkanResource,
        fields: list[CkanDatastoreField] | None = None,
        record_count: int | None = None,
        sample_data: list[dict[str, Any]] | None = None,
    ) -> ResourceAnalysis:
        """Create a detailed resource analysis.

        Args:
            resource: The CKAN resource to analyze
            fields: Optional datastore field definitions
            record_count: Optional total record count
            sample_data: Optional sample rows from the datastore
        """
        return ResourceAnalysis(
            id=resource.id,
            name=resource.name or resource.id,
            format=resource.format or "Unknown",
            size=resource.size,
            datastore_active=bool(resource.datastore_active),
            last_modified=self._get_resource_last_modified(resource),
            mimetype=resource.mimetype,
            url=resource.url,
            created=resource.created or "",
            fields=fields,
            record_count=record_count,
            sample_data=sample_data,
        )

    def _get_resource_last_modified(self, resource: CkanResource) -> str:
        """Return a usable last modified string, falling back to created date."""
        return resource.last_modified or resource.created or ""

    def _truncate_description(self, notes: str | None, max_length: int = 200) -> str:
        """Truncate description to ``max_length`` characters, adding an ellipsis.

        The limit is now a parameter (default 200, the previous hard-coded
        value) so callers can tune summary length.
        """
        if not notes:
            return ""
        return notes[:max_length] + "..." if len(notes) > max_length else notes

    def _count_datastore_resources(self, resources: list[CkanResource]) -> int:
        """Count resources with active datastore."""
        return sum(1 for resource in resources if resource.datastore_active)

    def _build_dataset_url(self, pkg: CkanPackage) -> str:
        """Build dataset URL from package and configuration.

        Precedence: configured URL template, then CKAN site URL, then the
        package's own ``ckan_url``/``url``, then empty string.
        """
        pkg_id = pkg.id or ""
        pkg_name = pkg.name or pkg_id
        template = self.config.dataset_page_url_template
        if template:
            return template.replace("{id}", quote(pkg_id)).replace("{name}", quote(pkg_name))
        if self.config.ckan_site_url:
            return f"{self.config.ckan_site_url}/dataset?id={quote(pkg_id)}"
        return pkg.ckan_url or pkg.url or ""
class DatasetUrlBuilder:
    """Utility class for building dataset URLs."""

    def __init__(self, config: CkanToolsConfig):
        """Initialize the URL builder with configuration."""
        self.config = config

    def build_dataset_url(self, pkg: CkanPackage | dict[str, Any]) -> str:
        """Build dataset URL from package and configuration.

        Accepts either a CkanPackage object or a plain dict.  Precedence:
        configured URL template, then CKAN site URL, then the package's
        own ``ckan_url``/``url``, then empty string.
        """

        def field(key: str) -> str:
            # Uniform accessor over dict and attribute-style packages;
            # missing/None values normalize to "".
            if isinstance(pkg, dict):
                return pkg.get(key) or ""
            return getattr(pkg, key) or ""

        dataset_id = field("id")
        dataset_name = field("name") or dataset_id
        template = self.config.dataset_page_url_template
        if template:
            filled = template.replace("{id}", quote(dataset_id))
            return filled.replace("{name}", quote(dataset_name))
        site_url = self.config.ckan_site_url
        if site_url:
            return f"{site_url}/dataset?id={quote(dataset_id)}"
        return field("ckan_url") or field("url")
# Utility functions for backward compatibility
def analyze_dataset_relevance(dataset: CkanPackage, query: str, config: CkanToolsConfig) -> int:
    """Backward-compatible wrapper: score *dataset* against *query* via RelevanceScorer."""
    return RelevanceScorer(config).score(dataset, query)
def get_update_frequency_category(dataset: CkanPackage, config: CkanToolsConfig) -> str:
    """Backward-compatible wrapper around UpdateFrequencyAnalyzer.categorize."""
    return UpdateFrequencyAnalyzer(config).categorize(dataset)
def create_package_summary(pkg: CkanPackage, config: CkanToolsConfig) -> PackageSummary:
    """Backward-compatible wrapper around SummaryBuilder.package."""
    return SummaryBuilder(config).package(pkg)
def create_resource_summary(resource: CkanResource, config: CkanToolsConfig) -> ResourceSummary:
    """Backward-compatible wrapper around SummaryBuilder.resource."""
    return SummaryBuilder(config).resource(resource)