"""SchemaManager -- main facade for schema discovery, caching, and association."""
from __future__ import annotations
import datetime
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import TYPE_CHECKING
import httpx
import orjson
from strong_typing.exception import JsonKeyError, JsonTypeError, JsonValueError
from strong_typing.serialization import json_to_object, object_to_json
from mcp_json_yaml_toml.schemas.ide_cache import IDESchemaProvider
from mcp_json_yaml_toml.schemas.loading import (
_extract_schema_url_from_content,
_match_glob_pattern,
)
from mcp_json_yaml_toml.schemas.models import (
FileAssociation,
SchemaCatalog,
SchemaConfig,
SchemaInfo,
)
from mcp_json_yaml_toml.schemas.scanning import (
CACHE_EXPIRY_SECONDS,
SCHEMA_STORE_CATALOG_URL,
_get_ide_schema_locations,
)
if TYPE_CHECKING:
from strong_typing.core import Schema
logger = logging.getLogger(__name__)
class SchemaManager:
    """Manages JSON schemas with local caching and Schema Store integration."""

    # Active configuration, persisted as schema_config.json in the cache dir.
    config: SchemaConfig
    # Provider used to look up schemas already cached by local IDEs.
    _ide_provider: IDESchemaProvider

    def __init__(self, cache_dir: Path | None = None) -> None:
        """Initialize schema manager.

        Args:
            cache_dir: Optional custom cache directory. Defaults to
                ~/.cache/mcp-json-yaml-toml/schemas

        Side effects:
            Creates the cache directory if missing and loads the persisted
            schema configuration (or defaults when none exists).
        """
        # Explicit None check: any provided Path is honored as-is.
        if cache_dir is not None:
            self.cache_dir = cache_dir
        else:
            self.cache_dir = Path.home() / ".cache" / "mcp-json-yaml-toml" / "schemas"
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.catalog_path = self.cache_dir / "catalog.json"
        self.config_path = self.cache_dir / "schema_config.json"
        self.config = self._load_config()
        self._ide_provider = IDESchemaProvider()
def _load_config(self) -> SchemaConfig:
"""Load schema configuration from file.
Returns:
Typed SchemaConfig dataclass.
"""
if self.config_path.exists():
try:
raw_data = orjson.loads(self.config_path.read_bytes())
return json_to_object(SchemaConfig, raw_data)
except (
orjson.JSONDecodeError,
JsonKeyError,
JsonTypeError,
JsonValueError,
) as e:
logger.debug("Failed to load schema config: %s", e)
# Return default empty config
return SchemaConfig()
def _save_config(self) -> None:
"""Save schema configuration to file."""
self.config_path.write_bytes(
orjson.dumps(object_to_json(self.config), option=orjson.OPT_INDENT_2)
)
def get_catalog(self) -> SchemaCatalog | None:
"""Get the Schema Store catalog as a typed SchemaCatalog dataclass.
Returns:
SchemaCatalog dataclass if available, None if fetch fails and no cache exists.
"""
raw_catalog = self._get_raw_catalog()
if raw_catalog is None:
return None
try:
return json_to_object(SchemaCatalog, raw_catalog)
except (JsonKeyError, JsonTypeError, JsonValueError) as e:
logger.debug("Failed to parse catalog as SchemaCatalog: %s", e)
return None
def get_ide_provider(self) -> IDESchemaProvider:
"""Get the IDE schema provider instance."""
return self._ide_provider
def _try_load_local_schema(self, schema_path: Path) -> Schema | None:
"""Attempt to load a local JSON schema file."""
if not schema_path.exists():
return None
try:
bytes_data = schema_path.read_bytes()
if not bytes_data:
return None
data = orjson.loads(bytes_data)
except (OSError, orjson.JSONDecodeError):
return None
# Properly narrow the type of the loaded JSON to match Schema
# We check if it's a dict; for full validation against Schema (dict[str, JsonType])
# we'd need a recursive check, but proving string keys is a good start.
if isinstance(data, dict) and all(isinstance(k, str) for k in data):
return data
return None
def get_schema_path_for_file(self, file_path: Path) -> Path | None:
"""Find and return the cached schema file path for a given file.
This fetches and caches the schema if needed, then returns the path
to the cached schema file rather than loading it into memory.
Args:
file_path: Path to the file to find a schema for.
Returns:
Path to the cached schema file if found, None otherwise.
"""
info = self.get_schema_info_for_file(file_path)
if not info:
return None
url = info.url
if url.startswith("file://"):
return Path(url[7:])
return self._fetch_schema_to_cache(url)
def get_schema_for_file(self, file_path: Path) -> Schema | None:
"""Find and return the schema for a given file.
Args:
file_path: Path to the file to find a schema for.
Returns:
Parsed schema dict if found, None otherwise.
"""
info = self.get_schema_info_for_file(file_path)
if not info:
return None
url = info.url
if url.startswith("file://"):
schema_path = Path(url[7:])
loaded = self._try_load_local_schema(schema_path)
if loaded is not None:
return loaded
return self._fetch_schema(url)
def _lookup_catalog_schema(self, file_path: Path) -> SchemaInfo | None:
"""Look up schema in the Schema Store catalog."""
catalog = self.get_catalog()
if not catalog:
return None
filename = file_path.name
for schema_entry in catalog.schemas:
for pattern in schema_entry.fileMatch:
# Check exact filename match first (fast path)
if filename == pattern:
return SchemaInfo(
name=schema_entry.name, url=schema_entry.url, source="catalog"
)
# Check glob pattern match
if _match_glob_pattern(file_path, pattern):
return SchemaInfo(
name=schema_entry.name, url=schema_entry.url, source="catalog"
)
return None
def get_schema_info_for_file(self, file_path: Path) -> SchemaInfo | None:
"""Find and return schema information for a given file.
Args:
file_path: Path to the file to find a schema for.
Returns:
Dict with schema name, url, and source if found, None otherwise.
"""
# 1. Check file associations first
file_str = str(file_path.resolve())
if file_str in self.config.file_associations:
assoc = self.config.file_associations[file_str]
return SchemaInfo(
name="User Association", url=assoc.schema_url, source="user"
)
# 2. Check for $schema in content
content_schema_url = _extract_schema_url_from_content(file_path)
if content_schema_url:
return SchemaInfo(
name="In-file $schema", url=content_schema_url, source="content"
)
# 3. Check local IDE schema index
filename = file_path.name
ide_result = self._ide_provider.lookup_schema(filename, file_path)
if ide_result:
return SchemaInfo(
name=ide_result.name, url=ide_result.url, source=ide_result.source
)
# 4. Check catalog
return self._lookup_catalog_schema(file_path)
def add_file_association(
self, file_path: Path, schema_url: str, schema_name: str | None = None
) -> None:
"""Associate a file with a schema URL.
Args:
file_path: Path to the file.
schema_url: URL of the schema.
schema_name: Optional name of the schema (stored in source field).
"""
file_str = str(file_path.resolve())
self.config.file_associations[file_str] = FileAssociation(
schema_url=schema_url, source=schema_name or "user"
)
self._save_config()
def remove_file_association(self, file_path: Path) -> bool:
"""Remove file-to-schema association.
Args:
file_path: Path to the file.
Returns:
True if association was removed, False if it didn't exist.
"""
file_str = str(file_path.resolve())
if file_str in self.config.file_associations:
del self.config.file_associations[file_str]
self._save_config()
return True
return False
def _get_raw_catalog(self) -> Schema | None:
"""Get the Schema Store catalog, using cache if available.
Returns:
Parsed catalog dict if available, None if fetch fails and no cache exists.
"""
if self._is_cache_valid(self.catalog_path):
try:
cached: Schema = orjson.loads(self.catalog_path.read_bytes())
except orjson.JSONDecodeError:
pass # Invalid cache, re-fetch
else:
return cached
catalog: Schema | None = None
try:
response = httpx.get(SCHEMA_STORE_CATALOG_URL, timeout=10.0)
response.raise_for_status()
catalog = response.json()
self.catalog_path.write_bytes(orjson.dumps(catalog))
except (
httpx.HTTPError,
httpx.TimeoutException,
OSError,
orjson.JSONDecodeError,
):
# If fetch fails and we have a stale cache, use it
if self.catalog_path.exists():
try:
stale: Schema = orjson.loads(self.catalog_path.read_bytes())
except orjson.JSONDecodeError:
pass
else:
return stale
return None
return catalog
def _get_cache_path_for_url(self, url: str) -> Path:
"""Get the cache path for a schema URL.
Args:
url: URL of the schema.
Returns:
Path where the schema would be cached.
"""
schema_filename = url.rsplit("/", maxsplit=1)[-1]
if not schema_filename.endswith(".json"):
schema_filename += ".json"
return self.cache_dir / schema_filename
def _fetch_schema_to_cache(self, url: str) -> Path | None:
"""Fetch a schema and return the cache path.
Ensures the schema is cached locally and returns the path to the cache file.
Args:
url: URL of the schema to fetch.
Returns:
Path to the cached schema file, or None if fetch fails.
"""
cache_path = self._get_cache_path_for_url(url)
# If cache is valid, return it directly
if self._is_cache_valid(cache_path):
return cache_path
# Try to fetch and cache the schema
schema = self._fetch_schema(url)
if schema is not None:
return cache_path
# Check if we have a stale cache we can use
if cache_path.exists():
return cache_path
return None
def _fetch_schema(self, url: str) -> Schema | None:
"""Fetch a schema from a URL, using cache if available.
Args:
url: URL of the schema to fetch.
Returns:
Parsed schema dict if available, None if fetch fails and no cache exists.
"""
cache_path = self._get_cache_path_for_url(url)
schema_filename = cache_path.name
if self._is_cache_valid(cache_path):
try:
cached: Schema = orjson.loads(cache_path.read_bytes())
except orjson.JSONDecodeError:
pass
else:
return cached
# Check IDE caches before making network request
ide_schema = self._fetch_from_ide_cache(schema_filename, schema_url=url)
if ide_schema:
# Cache it locally for future use
cache_path.write_bytes(orjson.dumps(ide_schema))
return ide_schema
schema: Schema | None = None
try:
response = httpx.get(url, timeout=10.0)
response.raise_for_status()
schema = response.json()
cache_path.write_bytes(orjson.dumps(schema))
except (
httpx.HTTPError,
httpx.TimeoutException,
OSError,
orjson.JSONDecodeError,
):
# If fetch fails and we have a stale cache, use it
if cache_path.exists():
try:
stale: Schema = orjson.loads(cache_path.read_bytes())
except orjson.JSONDecodeError:
pass
else:
return stale
return None
return schema
def _normalize_schema_url(self, url: str) -> str:
"""Normalize schema URL for comparison.
Handles domain variants between www.schemastore.org and json.schemastore.org.
Args:
url: URL to normalize.
Returns:
Normalized URL with consistent domain.
"""
# Normalize www.schemastore.org to json.schemastore.org for comparison
return url.replace(
"https://www.schemastore.org/", "https://json.schemastore.org/"
)
def _urls_match(self, url1: str, url2: str) -> bool:
"""Check if two schema URLs match, handling domain variants.
Args:
url1: First URL to compare.
url2: Second URL to compare.
Returns:
True if URLs match after normalization, False otherwise.
"""
if url1 == url2:
return True
# Normalize both URLs and compare
normalized1 = self._normalize_schema_url(url1)
normalized2 = self._normalize_schema_url(url2)
if normalized1 == normalized2:
return True
# Fallback: compare just the filename portion
filename1 = url1.rsplit("/", maxsplit=1)[-1]
filename2 = url2.rsplit("/", maxsplit=1)[-1]
return filename1 == filename2 and filename1.endswith(".json")
def _search_hash_based_cache(
self, cache_dir: Path, schema_url: str
) -> Schema | None:
"""Search hash-based cache (vscode-yaml style) by checking $id in content.
Args:
cache_dir: Directory containing hash-named schema files.
schema_url: URL to match against schema $id field.
Returns:
Parsed schema dict if found, None otherwise.
"""
if not cache_dir.exists():
return None
for cached_file in cache_dir.iterdir():
if not cached_file.is_file():
continue
try:
content = orjson.loads(cached_file.read_bytes())
if isinstance(content, dict):
schema_id = content.get("$id")
if isinstance(schema_id, str) and self._urls_match(
schema_id, schema_url
):
return content
except (orjson.JSONDecodeError, OSError):
continue
return None
def _fetch_from_ide_cache(
self, schema_filename: str, schema_url: str | None = None
) -> Schema | None:
"""Try to find schema in IDE cache locations using concurrent checking.
Searches for schemas by:
1. Exact filename match (e.g., github-workflow.json)
2. Schema $id field match for hash-based caches (e.g., vscode-yaml)
Args:
schema_filename: Name of the schema file to look for.
schema_url: Optional URL to look for in hash-based caches.
Returns:
Parsed schema dict if found, None otherwise.
"""
cache_dirs = _get_ide_schema_locations()
def try_load_schema(cache_dir: Path) -> Schema | None:
# Try exact filename match first
schema_path = cache_dir / schema_filename
if schema_path.exists():
try:
loaded: Schema = orjson.loads(schema_path.read_bytes())
except orjson.JSONDecodeError:
pass
else:
return loaded
# Try hash-based cache lookup
if schema_url:
return self._search_hash_based_cache(cache_dir, schema_url)
return None
# Check all directories concurrently
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {
executor.submit(try_load_schema, cache_dir): cache_dir
for cache_dir in cache_dirs
}
for future in as_completed(futures):
result = future.result()
if result is not None:
# Cancel other futures if possible
for f in futures:
f.cancel()
return result
return None
def _is_cache_valid(self, path: Path) -> bool:
"""Check if a cached file is valid and not expired.
Args:
path: Path to the cached file.
Returns:
True if cache exists and is not expired, False otherwise.
"""
if not path.exists():
return False
mtime = path.stat().st_mtime
age = time.time() - mtime
return age < CACHE_EXPIRY_SECONDS
def scan_for_schema_dirs(
self, search_paths: list[Path], max_depth: int = 5
) -> list[Path]:
"""Recursively scan directories for schema caches.
Args:
search_paths: List of directories to search.
max_depth: Maximum directory depth to search.
Returns:
List of discovered schema directories.
"""
discovered = []
for search_path in search_paths:
if not search_path.exists() or not search_path.is_dir():
continue
# Recursively find schema directories with improved heuristics
for root, dirs, files in os.walk(search_path):
# Calculate current depth
depth = str(root).count(os.sep) - str(search_path).count(os.sep)
if depth > max_depth:
dirs[:] = [] # Don't recurse further
continue
dir_path = Path(root)
is_schema_dir = False
# Heuristic 1: Directory is named "schemas" or "jsonSchemas"
if dir_path.name in {"schemas", "jsonSchemas"}:
is_schema_dir = True
# Heuristic 2: Directory contains catalog.json
if "catalog.json" in files:
is_schema_dir = True
# Heuristic 3: Directory contains .schema.json files
if any(f.endswith(".schema.json") for f in files):
is_schema_dir = True
if is_schema_dir and dir_path not in discovered:
discovered.append(dir_path)
# Update config
self.config.discovered_dirs = [str(p) for p in discovered]
self.config.last_scan = datetime.datetime.now(datetime.UTC).isoformat()
self._save_config()
return discovered
def add_custom_dir(self, directory: Path) -> None:
"""Add a custom schema cache directory.
Args:
directory: Path to schema directory.
"""
dir_str = str(directory.expanduser().resolve())
if dir_str not in self.config.custom_cache_dirs:
self.config.custom_cache_dirs.append(dir_str)
self._save_config()
def add_custom_catalog(self, name: str, uri: str) -> None:
"""Add a custom schema catalog.
Args:
name: Friendly name for the catalog.
uri: URL or file path to catalog.json.
"""
self.config.custom_catalogs[name] = uri
self._save_config()
def get_config(self) -> SchemaConfig:
"""Get current schema configuration.
Returns:
Current config dataclass.
"""
return self.config