"""
YAML parser and validator for MCP-Ables tool definitions
"""
import yaml
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from pathlib import Path
@dataclass
class ArgumentSpec:
    """Specification for a single tool argument.

    Fields mirror the per-argument keys of a tool's YAML 'args' section.
    """
    name: str           # argument identifier as declared in the YAML file
    type: str           # one of: 'string', 'int', 'float', 'bool'
    description: str    # human-readable description of the argument
    required: bool      # whether the caller must supply a value
    default: Optional[Any] = None  # fallback value for optional arguments

    def validate_type(self, value: Any) -> Any:
        """Validate and convert *value* to the declared type.

        Returns the converted value.

        Raises:
            ValueError: If ``self.type`` is not a supported type name, or the
                value cannot be converted to it.
        """
        type_map = {
            'string': str,
            'int': int,
            'float': float,
            'bool': bool
        }
        if self.type not in type_map:
            raise ValueError(f"Unknown type '{self.type}' for argument '{self.name}'")
        # Bug fix: bool("false") is True in Python (non-empty string), so
        # textual booleans must be mapped explicitly instead of via bool().
        if self.type == 'bool' and isinstance(value, str):
            lowered = value.strip().lower()
            if lowered in ('true', 'yes', '1', 'on'):
                return True
            if lowered in ('false', 'no', '0', 'off'):
                return False
            raise ValueError(
                f"Cannot convert '{value}' to bool for argument '{self.name}': "
                f"expected a true/false value"
            )
        try:
            return type_map[self.type](value)
        except (ValueError, TypeError) as e:
            raise ValueError(f"Cannot convert '{value}' to {self.type} for argument '{self.name}': {e}") from e
@dataclass
class ToolSpec:
    """Complete specification for an MCP-Ables tool.

    Produced by YAMLParser from a validated YAML definition; holds the
    tool's identity, how to run it, and its argument specifications.
    """
    name: str         # unique tool name (non-empty; uniqueness enforced by YAMLParser.parse_directory)
    description: str  # human-readable description of what the tool does
    run_kind: str     # execution backend; currently only 'shell' is supported
    run_cmd: str      # command string executed for this tool
    args: Dict[str, ArgumentSpec]  # argument name -> validated ArgumentSpec
class YAMLParser:
    """Parser and validator for MCP-Ables YAML files.

    Supports single files (single- or multi-tool format) and recursive
    directory scans. All failures — missing files, bad YAML syntax, and
    structural validation errors — are reported as ValueError with a
    human-readable message.
    """

    # Argument types that may be declared in a YAML 'args' section.
    SUPPORTED_TYPES = {'string', 'int', 'float', 'bool'}
    # Supported execution backends for the 'run.kind' field.
    SUPPORTED_RUN_KINDS = {'shell'}

    @staticmethod
    def parse_file(file_path: str) -> List[ToolSpec]:
        """
        Load and parse a YAML file into a list of ToolSpecs

        Supports both single-tool and multi-tool formats:
        - Single-tool: {name, description, run, args}
        - Multi-tool: {tools: [{name, description, run, args}, ...]}

        Returns a list of ToolSpecs (even for single-tool files)

        Raises:
            ValueError: If the file is missing, contains invalid YAML, or
                fails structural validation.
        """
        try:
            with open(file_path, 'r') as f:
                data = yaml.safe_load(f)
        except FileNotFoundError as e:
            raise ValueError(f"YAML file not found: {file_path}") from e
        except yaml.YAMLError as e:
            raise ValueError(f"Invalid YAML syntax: {e}") from e
        if not isinstance(data, dict):
            raise ValueError(f"YAML file must contain a dictionary, got {type(data).__name__}")
        # Auto-detect format: a top-level 'tools' key means multi-tool.
        if 'tools' in data:
            tools_list = data['tools']
            if not isinstance(tools_list, list):
                raise ValueError("Field 'tools' must be a list of tool definitions")
            if not tools_list:
                raise ValueError("Field 'tools' cannot be empty")
            tool_specs = []
            for idx, tool_data in enumerate(tools_list):
                if not isinstance(tool_data, dict):
                    raise ValueError(f"Tool at index {idx} must be a dictionary")
                try:
                    tool_specs.append(YAMLParser.parse_dict(tool_data))
                except ValueError as e:
                    # Re-wrap so the caller knows which list entry was bad.
                    raise ValueError(f"Error in tool at index {idx}: {e}") from e
            return tool_specs
        else:
            # Single-tool format (backward compatible)
            return [YAMLParser.parse_dict(data)]

    @staticmethod
    def parse_dict(data: Dict[str, Any]) -> ToolSpec:
        """Parse a single tool dictionary into a validated ToolSpec.

        Raises:
            ValueError: If any required field is missing or malformed.
        """
        # Validate required top-level fields
        YAMLParser._validate_required_fields(data, ['name', 'description', 'run', 'args'])
        # Validate name and description
        name = data['name']
        description = data['description']
        if not isinstance(name, str) or not name.strip():
            raise ValueError("Field 'name' must be a non-empty string")
        if not isinstance(description, str) or not description.strip():
            raise ValueError("Field 'description' must be a non-empty string")
        # Validate run section
        run_section = data['run']
        if not isinstance(run_section, dict):
            raise ValueError("Field 'run' must be a dictionary")
        YAMLParser._validate_required_fields(run_section, ['kind', 'cmd'], context='run')
        run_kind = run_section['kind']
        run_cmd = run_section['cmd']
        if run_kind not in YAMLParser.SUPPORTED_RUN_KINDS:
            raise ValueError(f"Unsupported run kind '{run_kind}'. Supported: {YAMLParser.SUPPORTED_RUN_KINDS}")
        if not isinstance(run_cmd, str) or not run_cmd.strip():
            raise ValueError("Field 'run.cmd' must be a non-empty string")
        # Parse and validate args
        args_section = data['args']
        if not isinstance(args_section, dict):
            raise ValueError("Field 'args' must be a dictionary")
        args = {
            arg_name: YAMLParser._parse_argument(arg_name, arg_spec)
            for arg_name, arg_spec in args_section.items()
        }
        return ToolSpec(
            name=name,
            description=description,
            run_kind=run_kind,
            run_cmd=run_cmd,
            args=args
        )

    @staticmethod
    def _parse_argument(name: str, spec: Dict[str, Any]) -> ArgumentSpec:
        """Parse and validate one argument specification.

        Raises:
            ValueError: If the spec is malformed, uses an unsupported type,
                or declares an invalid/inconsistent default.
        """
        if not isinstance(spec, dict):
            raise ValueError(f"Argument '{name}' must be a dictionary with type, description, etc.")
        # Validate required fields for argument
        YAMLParser._validate_required_fields(spec, ['type', 'description'], context=f"arg '{name}'")
        arg_type = spec['type']
        description = spec['description']
        required = spec.get('required', True)  # Arguments are required unless stated otherwise
        default = spec.get('default')
        # Validate type
        if arg_type not in YAMLParser.SUPPORTED_TYPES:
            raise ValueError(
                f"Argument '{name}' has unsupported type '{arg_type}'. "
                f"Supported: {YAMLParser.SUPPORTED_TYPES}"
            )
        # Validate description
        if not isinstance(description, str) or not description.strip():
            raise ValueError(f"Argument '{name}' description must be a non-empty string")
        # Validate required field
        if not isinstance(required, bool):
            raise ValueError(f"Argument '{name}' field 'required' must be true or false")
        # Validate default value
        if default is not None:
            if required:
                raise ValueError(f"Argument '{name}' cannot have a default value if it is required")
            # Validate the default against the declared type and KEEP the
            # converted value, so e.g. default "5" for an int arg is stored
            # as the integer 5 rather than the raw string.
            try:
                default = ArgumentSpec(name, arg_type, description, required, default).validate_type(default)
            except ValueError as e:
                raise ValueError(f"Default value for argument '{name}' is invalid: {e}") from e
        return ArgumentSpec(
            name=name,
            type=arg_type,
            description=description,
            required=required,
            default=default
        )

    @staticmethod
    def _validate_required_fields(data: Dict[str, Any], fields: List[str], context: str = 'root'):
        """Raise ValueError naming any of *fields* missing from *data*."""
        missing = [f for f in fields if f not in data]
        if missing:
            raise ValueError(f"Missing required fields in {context}: {', '.join(missing)}")

    @staticmethod
    def parse_directory(dir_path: str) -> List[ToolSpec]:
        """
        Recursively scan a directory for YAML files and parse all tools

        Args:
            dir_path: Path to directory to scan

        Returns:
            List of all ToolSpecs from all YAML files found

        Raises:
            ValueError: If directory doesn't exist, no YAML files are found,
                any file fails to parse, or two tools share a name.
        """
        path = Path(dir_path)
        if not path.exists():
            raise ValueError(f"Directory not found: {dir_path}")
        if not path.is_dir():
            raise ValueError(f"Not a directory: {dir_path}")
        # Find all YAML files recursively
        yaml_files = list(path.rglob("*.yaml")) + list(path.rglob("*.yml"))
        if not yaml_files:
            raise ValueError(f"No YAML files (*.yaml or *.yml) found in directory: {dir_path}")
        # Parse all files, collecting every error so the user sees them all at once
        all_tools = []
        errors = []
        for yaml_file in sorted(yaml_files):
            try:
                all_tools.extend(YAMLParser.parse_file(str(yaml_file)))
            except ValueError as e:
                errors.append(f"{yaml_file.name}: {e}")
        if errors:
            error_msg = "Errors parsing YAML files:\n" + "\n".join(f" - {e}" for e in errors)
            raise ValueError(error_msg)
        # Check for duplicate tool names in a single linear pass
        # (previously O(n^2) via list.count inside a comprehension).
        seen = set()
        duplicates = set()
        for tool in all_tools:
            if tool.name in seen:
                duplicates.add(tool.name)
            seen.add(tool.name)
        if duplicates:
            # Sorted for a deterministic error message.
            raise ValueError(f"Duplicate tool names found: {', '.join(sorted(duplicates))}")
        return all_tools