"""Standards validation tools.
These tools validate code and configurations against Darwin platform standards.
"""
import re
from dataclasses import dataclass, field
from enum import Enum
from fastmcp import FastMCP, Context
from ..observability import get_tracer
router = FastMCP("validation-tools")
tracer = get_tracer(__name__)
class Severity(str, Enum):
"""Validation issue severity levels."""
ERROR = "error"
WARNING = "warning"
INFO = "info"
@dataclass
class ValidationIssue:
"""A single validation issue."""
severity: Severity
rule: str
message: str
line: int | None = None
column: int | None = None
suggestion: str | None = None
def to_dict(self) -> dict:
return {
"severity": self.severity.value,
"rule": self.rule,
"message": self.message,
"line": self.line,
"column": self.column,
"suggestion": self.suggestion,
}
@dataclass
class ValidationResult:
"""Result of a validation operation."""
valid: bool
errors: int = 0
warnings: int = 0
issues: list[ValidationIssue] = field(default_factory=list)
def to_dict(self) -> dict:
return {
"valid": self.valid,
"errors": self.errors,
"warnings": self.warnings,
"issues": [i.to_dict() for i in self.issues],
}
# =============================================================================
# Naming Convention Validation
# =============================================================================
# Azure resource naming patterns
AZURE_NAMING_PATTERNS = {
"resource_group": r"^rg-[a-z0-9-]+-[a-z]{2,4}-(dev|staging|prod)$",
"storage_account": r"^st[a-z0-9]{3,21}$",
"key_vault": r"^kv-[a-z0-9-]+-[a-z]{2,4}$",
"aks_cluster": r"^aks-[a-z0-9-]+-[a-z]{2,4}-(dev|staging|prod)$",
"container_registry": r"^cr[a-z0-9]{3,45}$",
"app_service": r"^app-[a-z0-9-]+-[a-z]{2,4}-(dev|staging|prod)$",
"function_app": r"^func-[a-z0-9-]+-[a-z]{2,4}-(dev|staging|prod)$",
"virtual_network": r"^vnet-[a-z0-9-]+-[a-z]{2,4}-(dev|staging|prod)$",
"subnet": r"^snet-[a-z0-9-]+$",
"network_security_group": r"^nsg-[a-z0-9-]+$",
"managed_identity": r"^id-[a-z0-9-]+-[a-z]{2,4}$",
}
@router.tool()
async def validate_azure_resource_name(
resource_name: str,
resource_type: str,
ctx: Context
) -> dict:
"""Validate an Azure resource name against naming conventions.
Checks if the given resource name follows the Darwin platform
naming conventions for Azure resources.
Args:
resource_name: The resource name to validate
resource_type: Type of Azure resource (e.g., "resource_group", "storage_account")
ctx: MCP context for logging
Returns:
ValidationResult with any issues found.
Example:
>>> result = await validate_azure_resource_name(
... "rg-myapp-eus-dev",
... "resource_group"
... )
>>> print(result["valid"])
True
"""
with tracer.start_as_current_span("tool.validate_azure_resource_name") as span:
span.set_attribute("resource.name", resource_name)
span.set_attribute("resource.type", resource_type)
await ctx.info(f"Validating {resource_type} name: {resource_name}")
issues = []
# Check if resource type is known
if resource_type not in AZURE_NAMING_PATTERNS:
issues.append(ValidationIssue(
severity=Severity.WARNING,
rule="known-resource-type",
message=f"Unknown resource type: {resource_type}",
suggestion=f"Known types: {', '.join(AZURE_NAMING_PATTERNS.keys())}"
))
else:
pattern = AZURE_NAMING_PATTERNS[resource_type]
if not re.match(pattern, resource_name):
issues.append(ValidationIssue(
severity=Severity.ERROR,
rule="naming-convention",
message=f"Name '{resource_name}' does not match pattern for {resource_type}",
suggestion=f"Expected pattern: {pattern}"
))
# Check length constraints
if len(resource_name) > 63:
issues.append(ValidationIssue(
severity=Severity.ERROR,
rule="max-length",
message=f"Name exceeds maximum length of 63 characters ({len(resource_name)})"
))
# Check for invalid characters
if not re.match(r"^[a-z0-9-]+$", resource_name):
issues.append(ValidationIssue(
severity=Severity.ERROR,
rule="valid-characters",
message="Name contains invalid characters",
suggestion="Use only lowercase letters, numbers, and hyphens"
))
result = ValidationResult(
valid=not any(i.severity == Severity.ERROR for i in issues),
errors=sum(1 for i in issues if i.severity == Severity.ERROR),
warnings=sum(1 for i in issues if i.severity == Severity.WARNING),
issues=issues
)
span.set_attribute("validation.valid", result.valid)
span.set_attribute("validation.errors", result.errors)
return result.to_dict()
# =============================================================================
# MCP Tool Validation
# =============================================================================
@router.tool()
async def validate_mcp_tool_definition(
tool_name: str,
tool_description: str,
parameters: dict,
ctx: Context
) -> dict:
"""Validate an MCP tool definition against standards.
Checks if the tool follows Darwin platform MCP tool design standards.
Args:
tool_name: Name of the tool
tool_description: Tool docstring/description
parameters: Tool parameters schema (JSON Schema format)
ctx: MCP context
Returns:
ValidationResult with any issues found.
"""
with tracer.start_as_current_span("tool.validate_mcp_tool_definition") as span:
span.set_attribute("tool.name", tool_name)
await ctx.info(f"Validating MCP tool: {tool_name}")
issues = []
# Validate tool name
if not re.match(r"^[a-z][a-z0-9_]*$", tool_name):
issues.append(ValidationIssue(
severity=Severity.ERROR,
rule="tool-name-format",
message="Tool name must be lowercase with underscores",
suggestion="Use snake_case: e.g., 'terraform_plan'"
))
if len(tool_name) > 64:
issues.append(ValidationIssue(
severity=Severity.ERROR,
rule="tool-name-length",
message=f"Tool name exceeds 64 characters ({len(tool_name)})"
))
# Validate description
if not tool_description:
issues.append(ValidationIssue(
severity=Severity.ERROR,
rule="description-required",
message="Tool must have a description"
))
elif len(tool_description) < 20:
issues.append(ValidationIssue(
severity=Severity.WARNING,
rule="description-length",
message="Description should be more detailed (at least 20 chars)"
))
# Check for docstring sections
if tool_description:
if "Args:" not in tool_description:
issues.append(ValidationIssue(
severity=Severity.WARNING,
rule="docstring-args",
message="Description should include Args section"
))
if "Returns:" not in tool_description:
issues.append(ValidationIssue(
severity=Severity.WARNING,
rule="docstring-returns",
message="Description should include Returns section"
))
# Validate parameters
if parameters:
if "properties" in parameters:
for param_name, param_schema in parameters["properties"].items():
# Check parameter description
if "description" not in param_schema:
issues.append(ValidationIssue(
severity=Severity.WARNING,
rule="param-description",
message=f"Parameter '{param_name}' lacks description"
))
# Check parameter type
if "type" not in param_schema:
issues.append(ValidationIssue(
severity=Severity.ERROR,
rule="param-type",
message=f"Parameter '{param_name}' lacks type definition"
))
result = ValidationResult(
valid=not any(i.severity == Severity.ERROR for i in issues),
errors=sum(1 for i in issues if i.severity == Severity.ERROR),
warnings=sum(1 for i in issues if i.severity == Severity.WARNING),
issues=issues
)
return result.to_dict()
# =============================================================================
# Agent Card Validation
# =============================================================================
@router.tool()
async def validate_agent_card(
agent_card: dict,
ctx: Context
) -> dict:
"""Validate an Agent Card against A2A protocol standards.
Checks if the agent card follows the Darwin platform A2A standards
and contains all required fields.
Args:
agent_card: The agent card JSON to validate
ctx: MCP context
Returns:
ValidationResult with any issues found.
"""
with tracer.start_as_current_span("tool.validate_agent_card") as span:
agent_name = agent_card.get("name", "unknown")
span.set_attribute("agent.name", agent_name)
await ctx.info(f"Validating agent card: {agent_name}")
issues = []
# Required fields
required_fields = ["name", "description", "url", "version", "capabilities"]
for field_name in required_fields:
if field_name not in agent_card:
issues.append(ValidationIssue(
severity=Severity.ERROR,
rule="required-field",
message=f"Missing required field: {field_name}"
))
# Validate name format
if "name" in agent_card:
name = agent_card["name"]
if not re.match(r"^[a-z][a-z0-9-]*$", name):
issues.append(ValidationIssue(
severity=Severity.ERROR,
rule="agent-name-format",
message="Agent name must be lowercase with hyphens",
suggestion="Use kebab-case: e.g., 'infrastructure-agent'"
))
# Validate version format (semver)
if "version" in agent_card:
version = agent_card["version"]
if not re.match(r"^\d+\.\d+\.\d+$", version):
issues.append(ValidationIssue(
severity=Severity.ERROR,
rule="version-format",
message="Version must follow semver format (X.Y.Z)",
suggestion="Example: 1.0.0"
))
# Validate URL format
if "url" in agent_card:
url = agent_card["url"]
if not url.startswith(("http://", "https://")):
issues.append(ValidationIssue(
severity=Severity.ERROR,
rule="url-format",
message="URL must be a valid HTTP(S) URL"
))
# Validate capabilities
if "capabilities" in agent_card:
caps = agent_card["capabilities"]
if not isinstance(caps, dict):
issues.append(ValidationIssue(
severity=Severity.ERROR,
rule="capabilities-format",
message="Capabilities must be an object"
))
else:
# Check for streaming capability documentation
if caps.get("streaming") and "pushNotifications" not in caps:
issues.append(ValidationIssue(
severity=Severity.WARNING,
rule="streaming-push",
message="Streaming agents should document pushNotifications capability"
))
# Validate skills if present
if "skills" in agent_card:
for skill in agent_card["skills"]:
if "id" not in skill:
issues.append(ValidationIssue(
severity=Severity.ERROR,
rule="skill-id-required",
message="Each skill must have an 'id' field"
))
if "name" not in skill:
issues.append(ValidationIssue(
severity=Severity.ERROR,
rule="skill-name-required",
message="Each skill must have a 'name' field"
))
result = ValidationResult(
valid=not any(i.severity == Severity.ERROR for i in issues),
errors=sum(1 for i in issues if i.severity == Severity.ERROR),
warnings=sum(1 for i in issues if i.severity == Severity.WARNING),
issues=issues
)
return result.to_dict()