# validator.py
"""
Package Validation System for Katamari MCP
Provides security and compatibility validation for capabilities and packages.
"""
import ast
import hashlib
import json
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

from pydantic import BaseModel
class ValidationSeverity(Enum):
    """Severity levels for validation issues, least to most severe."""
    INFO = "info"          # advisory only
    WARNING = "warning"    # worth reviewing, does not invalidate the package
    ERROR = "error"        # analysis failure or likely defect
    CRITICAL = "critical"  # any such issue marks the package invalid
class ValidationCategory(Enum):
    """Categories of validation checks."""
    SECURITY = "security"            # dangerous imports/calls, suspicious source patterns
    COMPATIBILITY = "compatibility"  # Python-version feature checks
    PERFORMANCE = "performance"      # reserved; not emitted by the validators in this module
    DEPENDENCIES = "dependencies"    # requirements.txt / pyproject.toml checks
    CODE_QUALITY = "code_quality"    # parse or analysis failures
@dataclass
class ValidationIssue:
    """A single validation finding produced during package analysis."""
    severity: ValidationSeverity      # how serious the finding is
    category: ValidationCategory      # which validator family raised it
    message: str                      # human-readable description
    file_path: Optional[str] = None   # offending file, when known
    line_number: Optional[int] = None # 1-based line within the file, when known
    suggestion: Optional[str] = None  # optional remediation hint
@dataclass
class ValidationResult:
    """Aggregate outcome of validating a package."""
    is_valid: bool
    issues: List[ValidationIssue]
    package_hash: str
    metadata: Dict[str, Any]

    def get_issues_by_severity(self, severity: ValidationSeverity) -> List[ValidationIssue]:
        """Return every recorded issue whose severity equals *severity*."""
        matching = []
        for found in self.issues:
            if found.severity == severity:
                matching.append(found)
        return matching

    def get_issues_by_category(self, category: ValidationCategory) -> List[ValidationIssue]:
        """Return every recorded issue filed under *category*."""
        return [found for found in self.issues if found.category == category]

    def has_critical_issues(self) -> bool:
        """Return True when at least one CRITICAL issue is present."""
        for found in self.issues:
            if found.severity == ValidationSeverity.CRITICAL:
                return True
        return False
class SecurityValidator:
    """Static security analysis for Python source files.

    All checks are heuristics intended to flag code for human review,
    not to prove it unsafe.
    """

    # Dotted names considered dangerous when imported. A candidate import
    # matches when it equals an entry or has one as a dotted prefix.
    DANGEROUS_IMPORTS = {
        'os.system', 'subprocess.call', 'subprocess.run', 'subprocess.Popen',
        'eval', 'exec', 'compile', '__import__', 'open', 'file',
        'socket.socket', 'urllib.request.urlopen', 'requests.post',
        'shutil.rmtree', 'tempfile.mktemp', 'pickle.loads', 'marshal.loads'
    }

    # Bare function names treated as dynamic code execution when called.
    DANGEROUS_FUNCTIONS = {
        'eval', 'exec', 'compile', '__import__'
    }

    # Regexes applied to each raw source line (case-insensitive).
    SUSPICIOUS_PATTERNS = [
        r'password\s*=\s*["\'][^"\']+["\']',
        r'secret\s*=\s*["\'][^"\']+["\']',
        r'api_key\s*=\s*["\'][^"\']+["\']',
        r'token\s*=\s*["\'][^"\']+["\']',
        r'exec\s*\(',
        r'eval\s*\(',
        r'subprocess\.',
        r'os\.system',
        r'__import__\s*\('
    ]

    @classmethod
    def _is_dangerous_import(cls, full_name: str) -> bool:
        """Return True when *full_name* matches a DANGEROUS_IMPORTS entry.

        Matches whole dotted segments (exact match or dotted prefix).
        The previous substring test produced false positives such as
        'openpyxl' matching the 'open' entry.
        """
        return any(
            full_name == dangerous or full_name.startswith(dangerous + '.')
            for dangerous in cls.DANGEROUS_IMPORTS
        )

    @classmethod
    def validate_file(cls, file_path: Path) -> "List[ValidationIssue]":
        """Validate a single Python file for security issues.

        Parses the file's AST to flag dangerous imports and dynamic-code
        calls, then scans the raw lines for suspicious patterns. Syntax
        and I/O problems are reported as issues rather than raised.
        """
        issues = []
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            tree = ast.parse(content, filename=str(file_path))

            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        if cls._is_dangerous_import(alias.name):
                            issues.append(ValidationIssue(
                                severity=ValidationSeverity.WARNING,
                                category=ValidationCategory.SECURITY,
                                message=f"Potentially dangerous import: {alias.name}",
                                file_path=str(file_path),
                                line_number=node.lineno,
                                suggestion="Review if this import is necessary and consider safer alternatives"
                            ))
                elif isinstance(node, ast.ImportFrom):
                    if node.module:
                        # Check both the source module and each combined
                        # module.name so `from os import system` is caught.
                        if cls._is_dangerous_import(node.module) or any(
                            cls._is_dangerous_import(f"{node.module}.{alias.name}")
                            for alias in node.names
                        ):
                            issues.append(ValidationIssue(
                                severity=ValidationSeverity.WARNING,
                                category=ValidationCategory.SECURITY,
                                message=f"Potentially dangerous import from: {node.module}",
                                file_path=str(file_path),
                                line_number=node.lineno,
                                suggestion="Review if this import is necessary and consider safer alternatives"
                            ))
                elif isinstance(node, ast.Call):
                    # Direct calls to eval/exec/compile/__import__ by name.
                    if isinstance(node.func, ast.Name) and node.func.id in cls.DANGEROUS_FUNCTIONS:
                        issues.append(ValidationIssue(
                            severity=ValidationSeverity.CRITICAL,
                            category=ValidationCategory.SECURITY,
                            message=f"Dangerous function call: {node.func.id}()",
                            file_path=str(file_path),
                            line_number=node.lineno,
                            suggestion="Avoid using dynamic code execution functions"
                        ))

            # Line-oriented pattern scan of the raw source text.
            for line_num, line in enumerate(content.split('\n'), 1):
                for pattern in cls.SUSPICIOUS_PATTERNS:
                    if re.search(pattern, line, re.IGNORECASE):
                        issues.append(ValidationIssue(
                            severity=ValidationSeverity.WARNING,
                            category=ValidationCategory.SECURITY,
                            message=f"Suspicious pattern detected: {pattern}",
                            file_path=str(file_path),
                            line_number=line_num,
                            suggestion="Review this line for potential security issues"
                        ))
        except SyntaxError as e:
            issues.append(ValidationIssue(
                severity=ValidationSeverity.ERROR,
                category=ValidationCategory.CODE_QUALITY,
                message=f"Syntax error in file: {e}",
                file_path=str(file_path),
                line_number=e.lineno
            ))
        except Exception as e:
            # Best-effort: report unexpected analysis failures as issues
            # instead of propagating them to the caller.
            issues.append(ValidationIssue(
                severity=ValidationSeverity.ERROR,
                category=ValidationCategory.CODE_QUALITY,
                message=f"Error analyzing file: {e}",
                file_path=str(file_path)
            ))
        return issues
class CompatibilityValidator:
    """Validates Python-version compatibility of package source files."""

    def __init__(self, target_python_version: str = "3.9"):
        """Store the target version string and its parsed numeric form.

        *target_python_version* is a dotted version string such as "3.9".
        """
        self.target_python_version = target_python_version
        # Compare versions numerically: the previous string comparison
        # wrongly reported "3.10" < "3.6".
        self._target = self._parse_version(target_python_version)

    @staticmethod
    def _parse_version(version: str) -> Tuple[int, ...]:
        """Parse "3.10" into (3, 10); stops at the first non-numeric part."""
        parts = []
        for piece in version.split('.'):
            if not piece.isdigit():
                break
            parts.append(int(piece))
        return tuple(parts)

    def validate_file(self, file_path: Path) -> "List[ValidationIssue]":
        """Validate a single Python file for compatibility issues.

        Walks the AST looking for syntax features newer than the target
        Python version. Analysis failures are reported as issues.
        """
        issues = []
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            tree = ast.parse(content, filename=str(file_path))
            for node in ast.walk(tree):
                # PEP 604 "X | Y" annotations need 3.10+.
                if isinstance(node, ast.AnnAssign) and node.annotation:
                    self._check_type_annotation(node.annotation, file_path, issues)
                # f-strings need 3.6+.
                if isinstance(node, ast.JoinedStr) and self._target < (3, 6):
                    issues.append(ValidationIssue(
                        severity=ValidationSeverity.WARNING,
                        category=ValidationCategory.COMPATIBILITY,
                        message="f-strings require Python 3.6+",
                        file_path=str(file_path),
                        line_number=node.lineno,
                        suggestion="Use .format() or % formatting for older Python versions"
                    ))
                # Walrus operator needs 3.8+.
                if isinstance(node, ast.NamedExpr) and self._target < (3, 8):
                    issues.append(ValidationIssue(
                        severity=ValidationSeverity.WARNING,
                        category=ValidationCategory.COMPATIBILITY,
                        message="Walrus operator (:=) requires Python 3.8+",
                        file_path=str(file_path),
                        line_number=node.lineno,
                        suggestion="Use traditional assignment for older Python versions"
                    ))
        except Exception as e:
            issues.append(ValidationIssue(
                severity=ValidationSeverity.ERROR,
                category=ValidationCategory.CODE_QUALITY,
                message=f"Error analyzing file: {e}",
                file_path=str(file_path)
            ))
        return issues

    def _check_type_annotation(self, annotation: ast.AST, file_path: Path, issues: "List[ValidationIssue]"):
        """Flag PEP 604 `X | Y` union syntax when targeting Python < 3.10."""
        if isinstance(annotation, ast.BinOp) and isinstance(annotation.op, ast.BitOr):
            if self._target < (3, 10):
                issues.append(ValidationIssue(
                    severity=ValidationSeverity.INFO,
                    category=ValidationCategory.COMPATIBILITY,
                    message="Union type syntax (X | Y) requires Python 3.10+, use typing.Union instead",
                    file_path=str(file_path),
                    line_number=getattr(annotation, 'lineno', None),
                    suggestion="Use typing.Union[X, Y] for older Python versions"
                ))
class DependencyValidator:
    """Validates package dependencies declared in requirements files."""

    # Distribution name at the start of a PEP 508 requirement string
    # (stops before extras '[...]', version specifiers, or markers).
    _NAME_RE = re.compile(r'\s*([A-Za-z0-9][A-Za-z0-9._-]*)')

    def __init__(self):
        # Commonly used packages considered low-risk. Not consulted by
        # _check_package in this module; kept for callers and future checks.
        self.known_safe_packages = {
            'requests', 'urllib3', 'certifi', 'charset-normalizer', 'idna',
            'pydantic', 'typing-extensions', 'click', 'colorama',
            'pytest', 'pytest-asyncio', 'pytest-cov', 'coverage',
            'black', 'ruff', 'mypy', 'flake8', 'isort'
        }

    @classmethod
    def _extract_name(cls, requirement: str) -> Optional[str]:
        """Return the distribution name from a requirement string, or None.

        Handles every specifier operator (==, >=, <=, ~=, !=, <, >) plus
        extras and environment markers; the previous split on only
        ==/>=/<= left the specifier attached for other operators.
        """
        match = cls._NAME_RE.match(requirement)
        return match.group(1) if match else None

    def validate_dependencies(self, requirements_file: Path) -> "List[ValidationIssue]":
        """Validate dependencies from requirements.txt or pyproject.toml.

        Missing files yield no issues; parse failures are reported as
        ERROR issues rather than raised.
        """
        issues = []
        if not requirements_file.exists():
            return issues
        try:
            if requirements_file.name == 'requirements.txt':
                with open(requirements_file, 'r') as f:
                    lines = f.readlines()
                for line_num, line in enumerate(lines, 1):
                    line = line.strip()
                    if line and not line.startswith('#'):
                        package_name = self._extract_name(line)
                        if package_name:
                            self._check_package(package_name, requirements_file, line_num, issues)
            elif requirements_file.name == 'pyproject.toml':
                # tomllib is stdlib on 3.11+; tomli is the backport.
                try:
                    import tomllib
                except ImportError:
                    try:
                        import tomli as tomllib
                    except ImportError:
                        issues.append(ValidationIssue(
                            severity=ValidationSeverity.WARNING,
                            category=ValidationCategory.DEPENDENCIES,
                            message="tomllib/tomli not available for parsing pyproject.toml",
                            file_path=str(requirements_file),
                            suggestion="Install tomli for Python < 3.11"
                        ))
                        return issues
                with open(requirements_file, 'rb') as f:
                    data = tomllib.load(f)
                deps = []
                if 'project' in data and 'dependencies' in data['project']:
                    deps = data['project']['dependencies']
                elif 'tool' in data and 'poetry' in data['tool']:
                    deps = list(data['tool']['poetry'].get('dependencies', {}).keys())
                    # The 'python' entry pins the interpreter, not a package.
                    if 'python' in deps:
                        deps.remove('python')
                for dep in deps:
                    package_name = self._extract_name(dep)
                    if package_name:
                        self._check_package(package_name, requirements_file, None, issues)
        except Exception as e:
            issues.append(ValidationIssue(
                severity=ValidationSeverity.ERROR,
                category=ValidationCategory.DEPENDENCIES,
                message=f"Error parsing dependencies: {e}",
                file_path=str(requirements_file)
            ))
        return issues

    def _check_package(self, package_name: str, file_path: Path, line_num: Optional[int], issues: "List[ValidationIssue]"):
        """Append a WARNING when *package_name* is a known-problematic dependency."""
        problematic_packages = {
            'setuptools': "Older versions may have security vulnerabilities",
            'pip': "Should not be a runtime dependency",
            'wheel': "Should not be a runtime dependency"
        }
        key = package_name.lower()
        if key in problematic_packages:
            issues.append(ValidationIssue(
                severity=ValidationSeverity.WARNING,
                category=ValidationCategory.DEPENDENCIES,
                message=f"Potentially problematic dependency: {package_name} - {problematic_packages[key]}",
                file_path=str(file_path),
                line_number=line_num,
                suggestion="Review if this dependency is necessary"
            ))
class PackageValidator:
    """Main package validator that coordinates all validation checks."""

    def __init__(self, target_python_version: str = "3.9"):
        """Create the sub-validators; *target_python_version* is forwarded
        to the compatibility checks."""
        self.security_validator = SecurityValidator()
        self.compatibility_validator = CompatibilityValidator(target_python_version)
        self.dependency_validator = DependencyValidator()

    def validate_package(self, package_path: Path) -> "ValidationResult":
        """Run security, compatibility and dependency validation over every
        Python file under *package_path* and return an aggregate result.

        The package is considered valid when no CRITICAL issues were found.
        """
        issues = []

        python_files = list(package_path.rglob("*.py"))

        for py_file in python_files:
            issues.extend(self.security_validator.validate_file(py_file))
            issues.extend(self.compatibility_validator.validate_file(py_file))

        # Dependency manifests, when present at the package root.
        for req_file in (package_path / "requirements.txt",
                         package_path / "pyproject.toml"):
            if req_file.exists():
                issues.extend(self.dependency_validator.validate_dependencies(req_file))

        package_hash = self._calculate_package_hash(package_path)

        metadata = {
            "python_files_count": len(python_files),
            "package_path": str(package_path),
            # Bug fix: this previously recorded str(Path().cwd()) — the
            # current working directory — instead of an actual timestamp.
            "validation_timestamp": datetime.now(timezone.utc).isoformat(),
            "target_python_version": self.compatibility_validator.target_python_version,
        }

        is_valid = not any(issue.severity == ValidationSeverity.CRITICAL for issue in issues)
        return ValidationResult(
            is_valid=is_valid,
            issues=issues,
            package_hash=package_hash,
            metadata=metadata,
        )

    def _calculate_package_hash(self, package_path: Path) -> str:
        """Return an MD5 hex digest over all Python files in the package.

        MD5 is used only as a content fingerprint, not for security.
        Unreadable files are skipped so a single bad file cannot abort
        the whole hash.
        """
        hash_md5 = hashlib.md5()
        # Sorted traversal makes the digest independent of filesystem order.
        for py_file in sorted(package_path.rglob("*.py")):
            try:
                with open(py_file, 'rb') as f:
                    hash_md5.update(f.read())
            except Exception:
                continue
        return hash_md5.hexdigest()

    def generate_report(self, result: "ValidationResult") -> str:
        """Generate a human-readable validation report.

        Lists issues grouped by severity (most severe first), then a
        per-category count summary.
        """
        report = []
        report.append("=" * 60)
        report.append("PACKAGE VALIDATION REPORT")
        report.append("=" * 60)
        report.append(f"Package Valid: {'✅ YES' if result.is_valid else '❌ NO'}")
        report.append(f"Package Hash: {result.package_hash}")
        report.append(f"Total Issues: {len(result.issues)}")

        # Group issues by severity, most severe first.
        for severity in [ValidationSeverity.CRITICAL, ValidationSeverity.ERROR,
                         ValidationSeverity.WARNING, ValidationSeverity.INFO]:
            issues = result.get_issues_by_severity(severity)
            if issues:
                report.append(f"\n{severity.value.upper()} ISSUES ({len(issues)}):")
                report.append("-" * 40)
                for issue in issues:
                    location = f"{issue.file_path}:{issue.line_number}" if issue.file_path else "Unknown"
                    report.append(f"  • {issue.message}")
                    report.append(f"    Location: {location}")
                    if issue.suggestion:
                        report.append(f"    Suggestion: {issue.suggestion}")

        # Summary by category.
        report.append("\nISSUES BY CATEGORY:")
        report.append("-" * 40)
        for category in ValidationCategory:
            count = len(result.get_issues_by_category(category))
            if count > 0:
                report.append(f"  {category.value}: {count}")

        report.append("\n" + "=" * 60)
        return "\n".join(report)