Skip to main content
Glama

Katamari MCP Server

by ciphernaut
validator.py18.9 kB
""" Package Validation System for Katamari MCP Provides security and compatibility validation for capabilities and packages. """ import ast import hashlib import json import re from pathlib import Path from typing import Any, Dict, List, Optional, Set, Tuple from dataclasses import dataclass from enum import Enum from pydantic import BaseModel class ValidationSeverity(Enum): """Severity levels for validation issues.""" INFO = "info" WARNING = "warning" ERROR = "error" CRITICAL = "critical" class ValidationCategory(Enum): """Categories of validation checks.""" SECURITY = "security" COMPATIBILITY = "compatibility" PERFORMANCE = "performance" DEPENDENCIES = "dependencies" CODE_QUALITY = "code_quality" @dataclass class ValidationIssue: """A validation issue found during package analysis.""" severity: ValidationSeverity category: ValidationCategory message: str file_path: Optional[str] = None line_number: Optional[int] = None suggestion: Optional[str] = None @dataclass class ValidationResult: """Result of package validation.""" is_valid: bool issues: List[ValidationIssue] package_hash: str metadata: Dict[str, Any] def get_issues_by_severity(self, severity: ValidationSeverity) -> List[ValidationIssue]: """Get all issues of a specific severity.""" return [issue for issue in self.issues if issue.severity == severity] def get_issues_by_category(self, category: ValidationCategory) -> List[ValidationIssue]: """Get all issues of a specific category.""" return [issue for issue in self.issues if issue.category == category] def has_critical_issues(self) -> bool: """Check if there are any critical issues.""" return any(issue.severity == ValidationSeverity.CRITICAL for issue in self.issues) class SecurityValidator: """Validates security aspects of Python packages.""" # Dangerous imports and functions DANGEROUS_IMPORTS = { 'os.system', 'subprocess.call', 'subprocess.run', 'subprocess.Popen', 'eval', 'exec', 'compile', '__import__', 'open', 'file', 'socket.socket', 'urllib.request.urlopen', 'requests.post', 'shutil.rmtree', 'tempfile.mktemp', 'pickle.loads', 'marshal.loads' } DANGEROUS_FUNCTIONS = { 'eval', 'exec', 'compile', '__import__' } # Suspicious patterns SUSPICIOUS_PATTERNS = [ r'password\s*=\s*["\'][^"\']+["\']', r'secret\s*=\s*["\'][^"\']+["\']', r'api_key\s*=\s*["\'][^"\']+["\']', r'token\s*=\s*["\'][^"\']+["\']', r'exec\s*\(', r'eval\s*\(', r'subprocess\.', r'os\.system', r'__import__\s*\(' ] @classmethod def validate_file(cls, file_path: Path) -> List[ValidationIssue]: """Validate a single Python file for security issues.""" issues = [] try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() # Parse AST tree = ast.parse(content, filename=str(file_path)) # Check for dangerous imports for node in ast.walk(tree): if isinstance(node, ast.Import): for alias in node.names: full_name = alias.name if any(dangerous in full_name for dangerous in cls.DANGEROUS_IMPORTS): issues.append(ValidationIssue( severity=ValidationSeverity.WARNING, category=ValidationCategory.SECURITY, message=f"Potentially dangerous import: {full_name}", file_path=str(file_path), line_number=node.lineno, suggestion="Review if this import is necessary and consider safer alternatives" )) elif isinstance(node, ast.ImportFrom): if node.module: full_name = node.module if any(dangerous in full_name for dangerous in cls.DANGEROUS_IMPORTS): issues.append(ValidationIssue( severity=ValidationSeverity.WARNING, category=ValidationCategory.SECURITY, message=f"Potentially dangerous import from: {full_name}", file_path=str(file_path), line_number=node.lineno, suggestion="Review if this import is necessary and consider safer alternatives" )) # Check for dangerous function calls elif isinstance(node, ast.Call): if isinstance(node.func, ast.Name): func_name = node.func.id if func_name in cls.DANGEROUS_FUNCTIONS: issues.append(ValidationIssue( severity=ValidationSeverity.CRITICAL, category=ValidationCategory.SECURITY, message=f"Dangerous function call: {func_name}()", file_path=str(file_path), line_number=node.lineno, suggestion="Avoid using dynamic code execution functions" )) # Check for suspicious patterns in source code lines = content.split('\n') for line_num, line in enumerate(lines, 1): for pattern in cls.SUSPICIOUS_PATTERNS: if re.search(pattern, line, re.IGNORECASE): issues.append(ValidationIssue( severity=ValidationSeverity.WARNING, category=ValidationCategory.SECURITY, message=f"Suspicious pattern detected: {pattern}", file_path=str(file_path), line_number=line_num, suggestion="Review this line for potential security issues" )) except SyntaxError as e: issues.append(ValidationIssue( severity=ValidationSeverity.ERROR, category=ValidationCategory.CODE_QUALITY, message=f"Syntax error in file: {e}", file_path=str(file_path), line_number=e.lineno )) except Exception as e: issues.append(ValidationIssue( severity=ValidationSeverity.ERROR, category=ValidationCategory.CODE_QUALITY, message=f"Error analyzing file: {e}", file_path=str(file_path) )) return issues class CompatibilityValidator: """Validates compatibility aspects of packages.""" def __init__(self, target_python_version: str = "3.9"): self.target_python_version = target_python_version def validate_file(self, file_path: Path) -> List[ValidationIssue]: """Validate a single Python file for compatibility issues.""" issues = [] try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() tree = ast.parse(content, filename=str(file_path)) # Check for Python version compatibility for node in ast.walk(tree): # Check for type hints compatibility if isinstance(node, ast.AnnAssign) and node.annotation: self._check_type_annotation(node.annotation, file_path, issues) # Check for f-strings (Python 3.6+) if isinstance(node, ast.JoinedStr): if self.target_python_version < "3.6": issues.append(ValidationIssue( severity=ValidationSeverity.WARNING, category=ValidationCategory.COMPATIBILITY, message="f-strings require Python 3.6+", file_path=str(file_path), line_number=node.lineno, suggestion="Use .format() or % formatting for older Python versions" )) # Check for walrus operator (Python 3.8+) if isinstance(node, ast.NamedExpr): if self.target_python_version < "3.8": issues.append(ValidationIssue( severity=ValidationSeverity.WARNING, category=ValidationCategory.COMPATIBILITY, message="Walrus operator (:=) requires Python 3.8+", file_path=str(file_path), line_number=node.lineno, suggestion="Use traditional assignment for older Python versions" )) except Exception as e: issues.append(ValidationIssue( severity=ValidationSeverity.ERROR, category=ValidationCategory.CODE_QUALITY, message=f"Error analyzing file: {e}", file_path=str(file_path) )) return issues def _check_type_annotation(self, annotation: ast.AST, file_path: Path, issues: List[ValidationIssue]): """Check type annotation for compatibility issues.""" # Check for Union types (Python 3.10+ syntax) if isinstance(annotation, ast.BinOp) and isinstance(annotation.op, ast.BitOr): if self.target_python_version < "3.10": issues.append(ValidationIssue( severity=ValidationSeverity.INFO, category=ValidationCategory.COMPATIBILITY, message="Union type syntax (X | Y) requires Python 3.10+, use typing.Union instead", file_path=str(file_path), line_number=getattr(annotation, 'lineno', None), suggestion="Use typing.Union[X, Y] for older Python versions" )) class DependencyValidator: """Validates package dependencies.""" def __init__(self): self.known_safe_packages = { 'requests', 'urllib3', 'certifi', 'charset-normalizer', 'idna', 'pydantic', 'typing-extensions', 'click', 'colorama', 'pytest', 'pytest-asyncio', 'pytest-cov', 'coverage', 'black', 'ruff', 'mypy', 'flake8', 'isort' } def validate_dependencies(self, requirements_file: Path) -> List[ValidationIssue]: """Validate dependencies from requirements.txt or pyproject.toml.""" issues = [] if not requirements_file.exists(): return issues try: if requirements_file.name == 'requirements.txt': with open(requirements_file, 'r') as f: lines = f.readlines() for line_num, line in enumerate(lines, 1): line = line.strip() if line and not line.startswith('#'): package_name = line.split('==')[0].split('>=')[0].split('<=')[0].strip() self._check_package(package_name, requirements_file, line_num, issues) elif requirements_file.name == 'pyproject.toml': # Parse pyproject.toml for dependencies try: import tomllib except ImportError: try: import tomli as tomllib except ImportError: issues.append(ValidationIssue( severity=ValidationSeverity.WARNING, category=ValidationCategory.DEPENDENCIES, message="tomllib/tomli not available for parsing pyproject.toml", file_path=str(requirements_file), suggestion="Install tomli for Python < 3.11" )) return issues with open(requirements_file, 'rb') as f: data = tomllib.load(f) deps = [] if 'project' in data and 'dependencies' in data['project']: deps = data['project']['dependencies'] elif 'tool' in data and 'poetry' in data['tool']: deps = list(data['tool']['poetry'].get('dependencies', {}).keys()) if 'python' in deps: deps.remove('python') for dep in deps: package_name = dep.split('==')[0].split('>=')[0].split('<=')[0].strip() self._check_package(package_name, requirements_file, None, issues) except Exception as e: issues.append(ValidationIssue( severity=ValidationSeverity.ERROR, category=ValidationCategory.DEPENDENCIES, message=f"Error parsing dependencies: {e}", file_path=str(requirements_file) )) return issues def _check_package(self, package_name: str, file_path: Path, line_num: Optional[int], issues: List[ValidationIssue]): """Check a single package for security concerns.""" # Check for known problematic packages problematic_packages = { 'setuptools': "Older versions may have security vulnerabilities", 'pip': "Should not be a runtime dependency", 'wheel': "Should not be a runtime dependency" } if package_name.lower() in problematic_packages: issues.append(ValidationIssue( severity=ValidationSeverity.WARNING, category=ValidationCategory.DEPENDENCIES, message=f"Potentially problematic dependency: {package_name} - {problematic_packages[package_name.lower()]}", file_path=str(file_path), line_number=line_num, suggestion="Review if this dependency is necessary" )) class PackageValidator: """Main package validator that coordinates all validation checks.""" def __init__(self, target_python_version: str = "3.9"): self.security_validator = SecurityValidator() self.compatibility_validator = CompatibilityValidator(target_python_version) self.dependency_validator = DependencyValidator() def validate_package(self, package_path: Path) -> ValidationResult: """Validate a complete package directory.""" issues = [] metadata = {} # Find all Python files python_files = list(package_path.rglob("*.py")) # Validate each Python file for py_file in python_files: # Security validation security_issues = self.security_validator.validate_file(py_file) issues.extend(security_issues) # Compatibility validation compat_issues = self.compatibility_validator.validate_file(py_file) issues.extend(compat_issues) # Validate dependencies requirements_files = [ package_path / "requirements.txt", package_path / "pyproject.toml" ] for req_file in requirements_files: if req_file.exists(): dep_issues = self.dependency_validator.validate_dependencies(req_file) issues.extend(dep_issues) # Calculate package hash package_hash = self._calculate_package_hash(package_path) # Collect metadata metadata.update({ "python_files_count": len(python_files), "package_path": str(package_path), "validation_timestamp": str(Path().cwd()), "target_python_version": self.compatibility_validator.target_python_version }) # Determine if package is valid is_valid = not any(issue.severity == ValidationSeverity.CRITICAL for issue in issues) return ValidationResult( is_valid=is_valid, issues=issues, package_hash=package_hash, metadata=metadata ) def _calculate_package_hash(self, package_path: Path) -> str: """Calculate a hash of all Python files in the package.""" hash_md5 = hashlib.md5() for py_file in sorted(package_path.rglob("*.py")): try: with open(py_file, 'rb') as f: hash_md5.update(f.read()) except Exception: continue return hash_md5.hexdigest() def generate_report(self, result: ValidationResult) -> str: """Generate a human-readable validation report.""" report = [] report.append("=" * 60) report.append("PACKAGE VALIDATION REPORT") report.append("=" * 60) report.append(f"Package Valid: {'✅ YES' if result.is_valid else '❌ NO'}") report.append(f"Package Hash: {result.package_hash}") report.append(f"Total Issues: {len(result.issues)}") # Group issues by severity for severity in [ValidationSeverity.CRITICAL, ValidationSeverity.ERROR, ValidationSeverity.WARNING, ValidationSeverity.INFO]: issues = result.get_issues_by_severity(severity) if issues: report.append(f"\n{severity.value.upper()} ISSUES ({len(issues)}):") report.append("-" * 40) for issue in issues: location = f"{issue.file_path}:{issue.line_number}" if issue.file_path else "Unknown" report.append(f" • {issue.message}") report.append(f" Location: {location}") if issue.suggestion: report.append(f" Suggestion: {issue.suggestion}") # Summary by category report.append(f"\nISSUES BY CATEGORY:") report.append("-" * 40) for category in ValidationCategory: count = len(result.get_issues_by_category(category)) if count > 0: report.append(f" {category.value}: {count}") report.append("\n" + "=" * 60) return "\n".join(report)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server