"""
Agent Orchestration Platform - Automatic Schema Generation
This module implements automatic JSON schema generation from Python type hints for FastMCP tools,
providing comprehensive type validation and documentation generation.
Architecture Integration:
- Design Patterns: Visitor pattern for type traversal, Factory pattern for schema creation
- Security Model: Input validation through schema enforcement and type safety
- Performance Profile: Amortized O(1) schema generation for repeated types via caching
Technical Decisions:
- Pydantic Integration: Leverage Pydantic's schema generation for comprehensive validation
- Type Hint Analysis: Use typing module inspection for complete type information
- Security Validation: Ensure all schemas enforce security constraints and limits
- Caching Strategy: Cache generated schemas for optimal performance
Dependencies & Integration:
- External: Pydantic for schema generation, typing for type inspection
- Internal: Type system for domain types, security contracts for validation
Quality Assurance:
- Test Coverage: Property-based testing for schema accuracy and completeness
- Error Handling: Graceful fallback for unsupported types with comprehensive logging
Author: Adder_5 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import copy
import inspect
import json
import re
from dataclasses import MISSING, fields, is_dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Type,
    Union,
    get_args,
    get_origin,
    get_type_hints,
)

# Import types for schema generation
from src.models.ids import AgentId, SessionId
from src.models.security import SecurityLevel

from .contracts_shim import ensure, require
class SchemaGenerationError(Exception):
    """Raised when a JSON schema cannot be generated for a tool or type."""
class TypeSchemaGenerator:
    """
    Automatic JSON schema generator from Python type hints.

    Supports basic scalars, datetime/Path, NewType "branded" types, Enums,
    Optional/Union, List/Dict generics, and dataclasses.  Every generated
    schema is stamped with security limits (string length, array size,
    object property count, numeric range) and per-parameter documentation
    extracted from the function docstring.
    """

    def __init__(self):
        """Initialize the generator with an empty cache and security limits."""
        # Cache of generated schemas keyed by type hint.  Entries are
        # deep-copied on read and write so that later mutation of a returned
        # schema (descriptions, constraint stamping) cannot corrupt the cache.
        self._type_cache: Dict[Type, Dict[str, Any]] = {}
        # Hard limits applied to every schema to bound accepted input size.
        self._security_constraints = {
            "string_max_length": 10000,
            "array_max_items": 1000,
            "object_max_properties": 100,
            "number_maximum": 2**31,
            "number_minimum": -(2**31),
        }

    @require(lambda func: callable(func))
    @ensure(lambda result: isinstance(result, dict))
    def generate_tool_schema(self, func: Callable) -> Dict[str, Any]:
        """
        Generate a complete JSON schema for a FastMCP tool function.

        Contracts:
            Preconditions:
                - Function is callable
                - Function has type hints for parameters
            Postconditions:
                - Schema includes all parameters with types
                - Security constraints are applied
                - Documentation is extracted from the docstring
            Invariants:
                - Schema follows JSON Schema Draft 7 specification
                - Generated schema is deterministic

        Raises:
            SchemaGenerationError: if signature inspection, type-hint
                resolution, or schema generation fails (original exception
                is chained as the cause).
        """
        try:
            signature = inspect.signature(func)
            type_hints = get_type_hints(func)

            # Tool description and per-parameter docs from the docstring.
            docstring = inspect.getdoc(func) or ""
            description, param_docs = self._parse_docstring(docstring)

            properties: Dict[str, Any] = {}
            required: List[str] = []
            for param_name, param in signature.parameters.items():
                # Skip the FastMCP Context parameter: it is injected by the
                # framework, not supplied by the caller.
                if param_name == "ctx":
                    continue
                param_type = type_hints.get(param_name, Any)
                param_schema = self._generate_type_schema(param_type)
                if param_name in param_docs:
                    param_schema["description"] = param_docs[param_name]
                properties[param_name] = param_schema
                # Identity comparison against the sentinel: `==` could invoke
                # an arbitrary __eq__ defined on a default value.
                if param.default is param.empty:
                    required.append(param_name)

            tool_schema = {
                "type": "object",
                "properties": properties,
                "required": required,
                "additionalProperties": False,
                "title": func.__name__,
                "description": description,
            }
            self._apply_security_constraints(tool_schema)
            return tool_schema
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise SchemaGenerationError(
                f"Failed to generate schema for {func.__name__}: {e}"
            ) from e

    def _generate_type_schema(self, type_hint: Type) -> Dict[str, Any]:
        """Generate (and cache) the JSON schema for one type hint."""
        try:
            cached = self._type_cache.get(type_hint)
        except TypeError:
            # Unhashable type hints cannot be cached; generate directly.
            return self._generate_type_schema_impl(type_hint)
        if cached is not None:
            # Deep copy: a shallow .copy() would alias nested dicts (items,
            # additionalProperties, ...) with the cache, so caller mutation
            # would silently corrupt cached entries.
            return copy.deepcopy(cached)
        schema = self._generate_type_schema_impl(type_hint)
        self._type_cache[type_hint] = copy.deepcopy(schema)
        return schema

    def _generate_type_schema_impl(self, type_hint: Type) -> Dict[str, Any]:
        """Translate a single type hint into a JSON schema fragment."""
        # NoneType -> JSON null.
        if type_hint is type(None):
            return {"type": "null"}

        # Basic scalar types.
        if type_hint is str:
            return {
                "type": "string",
                "maxLength": self._security_constraints["string_max_length"],
            }
        if type_hint in (int, float):
            # Both map to JSON "number".  bool never reaches this branch:
            # tuple membership uses class equality, and bool != int here.
            return {
                "type": "number",
                "minimum": self._security_constraints["number_minimum"],
                "maximum": self._security_constraints["number_maximum"],
            }
        if type_hint is bool:
            return {"type": "boolean"}
        if type_hint is datetime:
            return {
                "type": "string",
                "format": "date-time",
                "description": "ISO 8601 formatted datetime string",
            }
        if type_hint is Path:
            return {
                "type": "string",
                "format": "path",
                "maxLength": 1000,
                "description": "Filesystem path string",
            }

        # Branded types created with typing.NewType expose __supertype__.
        if hasattr(type_hint, "__supertype__"):
            base_schema = self._generate_type_schema(type_hint.__supertype__)
            base_schema["description"] = f"Branded type: {type_hint.__name__}"
            return base_schema

        # Enumerations -> string schema with explicit allowed values.
        if inspect.isclass(type_hint) and issubclass(type_hint, Enum):
            return {
                "type": "string",
                "enum": [item.value for item in type_hint],
                "description": f"Enumeration: {type_hint.__name__}",
            }

        origin = get_origin(type_hint)
        args = get_args(type_hint)

        # Union types, including Optional[T] == Union[T, None].
        if origin is Union:
            if len(args) == 2 and type(None) in args:
                # Optional[T]: emit T's schema flagged nullable rather than
                # an anyOf pair (OpenAPI-style nullability).
                non_none_type = args[0] if args[1] is type(None) else args[1]
                schema = self._generate_type_schema(non_none_type)
                schema["nullable"] = True
                return schema
            return {"anyOf": [self._generate_type_schema(arg) for arg in args]}

        # List[T] / list[T].
        if origin in (list, List):
            item_type = args[0] if args else Any
            return {
                "type": "array",
                "items": self._generate_type_schema(item_type),
                "maxItems": self._security_constraints["array_max_items"],
            }

        # Dict[K, V] / dict[K, V] -- only the value type is representable
        # in JSON Schema (keys are always strings).
        if origin in (dict, Dict):
            value_type = args[1] if len(args) >= 2 else Any
            return {
                "type": "object",
                "additionalProperties": self._generate_type_schema(value_type),
                "maxProperties": self._security_constraints["object_max_properties"],
            }

        # Dataclasses -> nested object schema.
        if is_dataclass(type_hint):
            return self._generate_dataclass_schema(type_hint)

        # Any -> unconstrained schema.
        if type_hint is Any:
            return {"description": "Any type"}

        # Unknown types degrade to a bounded string rather than failing.
        return {
            "type": "string",
            "description": f"Unknown type: {type_hint}",
            "maxLength": self._security_constraints["string_max_length"],
        }

    def _generate_dataclass_schema(self, dataclass_type: Type) -> Dict[str, Any]:
        """Generate an object schema for a dataclass type."""
        try:
            # Resolve PEP 563 string annotations to real types; field.type
            # holds the raw annotation, which may be a string.
            resolved_hints = get_type_hints(dataclass_type)
            properties: Dict[str, Any] = {}
            required: List[str] = []
            for field in fields(dataclass_type):
                field_type = resolved_hints.get(field.name, field.type)
                field_schema = self._generate_type_schema(field_type)
                if "description" in field.metadata:
                    field_schema["description"] = field.metadata["description"]
                properties[field.name] = field_schema
                # A field is required iff it has neither a default value nor
                # a default_factory.  Identity test against the MISSING
                # sentinel -- chained `==` could trip arbitrary __eq__
                # methods on default values.
                if field.default is MISSING and field.default_factory is MISSING:
                    required.append(field.name)
            return {
                "type": "object",
                "properties": properties,
                "required": required,
                "additionalProperties": False,
                "title": dataclass_type.__name__,
            }
        except Exception:
            # Fallback for dataclasses whose fields cannot be introspected.
            return {
                "type": "object",
                "description": f"Dataclass: {dataclass_type.__name__}",
                "additionalProperties": True,
            }

    def _parse_docstring(self, docstring: str) -> tuple[str, Dict[str, str]]:
        """
        Split a docstring into (description, {param_name: param_doc}).

        Recognises Google-style "Args:"/"Returns:" section headers and
        "name: text" parameter lines; indented continuation lines are
        appended to the preceding parameter's description.
        """
        if not docstring:
            return "", {}

        description_lines: List[str] = []
        param_docs: Dict[str, str] = {}
        current_section = "description"
        current_param: Optional[str] = None

        for raw_line in docstring.strip().split("\n"):
            line = raw_line.strip()

            # Section headers must be recognised BEFORE the generic
            # "name: text" pattern; otherwise "Args:"/"Returns:" themselves
            # are recorded as parameters named "Args"/"Returns".
            if line.startswith(("Args:", "Arguments:")):
                current_section = "params"
                current_param = None
                continue
            if line.startswith(("Returns:", "Return:")):
                current_section = "returns"
                current_param = None
                continue

            # Text after a "Returns:" header is not parameter documentation.
            if current_section == "returns":
                continue

            param_match = re.match(r"(\w+):\s*(.*)", line)
            if param_match:
                param_name, param_desc = param_match.groups()
                param_docs[param_name] = param_desc
                current_section = "params"
                current_param = param_name
                continue

            # Non-empty line inside a parameter block continues the current
            # parameter's description.
            if current_section == "params" and current_param and line:
                param_docs[current_param] += " " + line
                continue

            if current_section == "description" and line:
                description_lines.append(line)

        return " ".join(description_lines), param_docs

    def _apply_security_constraints(self, schema: Dict[str, Any]) -> None:
        """
        Recursively stamp default size/range limits onto every sub-schema
        (including the root and schemas nested in lists such as "anyOf")
        that does not already declare its own limit.
        """

        def _apply(node: Any) -> None:
            if isinstance(node, dict):
                node_type = node.get("type")
                if node_type == "string":
                    node.setdefault(
                        "maxLength", self._security_constraints["string_max_length"]
                    )
                elif node_type == "array":
                    node.setdefault(
                        "maxItems", self._security_constraints["array_max_items"]
                    )
                elif node_type == "object":
                    node.setdefault(
                        "maxProperties",
                        self._security_constraints["object_max_properties"],
                    )
                elif node_type == "number":
                    node.setdefault(
                        "minimum", self._security_constraints["number_minimum"]
                    )
                    node.setdefault(
                        "maximum", self._security_constraints["number_maximum"]
                    )
                for value in node.values():
                    _apply(value)
            elif isinstance(node, list):
                # Cover schemas nested in "anyOf"/"allOf"-style lists, which
                # a dict-values-only walk would miss.
                for item in node:
                    _apply(item)

        _apply(schema)

    def generate_response_schema(self, return_type: Type) -> Dict[str, Any]:
        """Generate a schema describing a function's return type."""
        return self._generate_type_schema(return_type)

    def validate_schema(self, schema: Dict[str, Any]) -> bool:
        """Return True if *schema* is a plausibly well-formed JSON Schema."""
        try:
            if not isinstance(schema, dict):
                return False
            if "type" in schema:
                # JSON Schema Draft 7 primitive type names.  "integer" is a
                # valid name even though this generator emits "number".
                valid_types = [
                    "null",
                    "boolean",
                    "object",
                    "array",
                    "number",
                    "integer",
                    "string",
                ]
                if (
                    isinstance(schema["type"], str)
                    and schema["type"] not in valid_types
                ):
                    return False
            # Serialisation raises on circular references / non-JSON values.
            json.dumps(schema)
            return True
        except Exception:
            return False
# Module-wide singleton shared by the convenience wrapper functions.
_schema_generator: TypeSchemaGenerator = TypeSchemaGenerator()
def generate_tool_schema(func: Callable) -> Dict[str, Any]:
    """Build a JSON schema for a FastMCP tool via the shared generator."""
    schema = _schema_generator.generate_tool_schema(func)
    return schema
def generate_response_schema(return_type: Type) -> Dict[str, Any]:
    """Build a JSON schema for a return type via the shared generator."""
    schema = _schema_generator.generate_response_schema(return_type)
    return schema
def validate_tool_schema(schema: Dict[str, Any]) -> bool:
    """Check schema well-formedness via the shared generator."""
    is_valid = _schema_generator.validate_schema(schema)
    return is_valid