Skip to main content
Glama
jolfr

Commit Helper MCP

by jolfr
validation_service.py (14.4 kB)
"""
Centralized validation logic for all operations.

This service provides consistent validation, sanitization, and security checks
across the entire application.
"""

import logging
import re
from pathlib import Path
from typing import Dict, Any, List, Optional

from .commitizen_core import CommitzenCore
from .gitpython_core import GitPythonCore
from ..errors import (
    ValidationError,
    RepositoryError,
    ConfigurationError,
    handle_validation_errors,
)

logger = logging.getLogger(__name__)


class ValidationService:
    """Centralized validation logic for all operations.

    Every method is a ``@staticmethod``; the class only groups the validators.

    NOTE(review): most validators are wrapped in ``handle_validation_errors``
    (imported from ``..errors``). How that decorator treats the exceptions
    raised below is defined there and is not visible from this module.
    """

    # Upper bound on commit message length, in characters.
    _MAX_MESSAGE_LENGTH = 1000

    # Matches "type:", "type(scope):", and the breaking-change forms
    # "type!:" / "type(scope)!:"; group 1 is the commit type.
    _COMMIT_TYPE_RE = re.compile(r"^(\w+)(?:\([^)]+\))?!?:")

    # Translation table deleting shell metacharacters and NUL from strings.
    _DANGEROUS_CHARS_TABLE = str.maketrans("", "", "`$;|&><\x00")

    # Schema type name -> Python type used by validate_parameters.
    _SCHEMA_TYPES = {
        "string": str,
        "boolean": bool,
        "integer": int,
        "array": list,
        "object": dict,
    }

    @staticmethod
    @handle_validation_errors
    def validate_commit_message(
        message: str, commitizen_core: CommitzenCore
    ) -> Dict[str, Any]:
        """
        Validate commit message with detailed feedback.

        Args:
            message: Commit message to validate
            commitizen_core: CommitzenCore instance for validation

        Returns:
            Dict containing validation results and feedback

        Raises:
            ValidationError: If the message is empty/whitespace-only, longer
                than 1000 characters, or rejected by
                ``commitizen_core.validate_message``.
        """
        max_length = ValidationService._MAX_MESSAGE_LENGTH

        # Basic checks
        if not message or not message.strip():
            raise ValidationError(
                "Commit message cannot be empty",
                validation_type="commit_message",
                invalid_value=message,
            )

        # Length check
        length_valid = len(message) <= max_length
        if not length_valid:
            raise ValidationError(
                f"Commit message too long ({len(message)} characters, max {max_length})",
                validation_type="commit_message_length",
                invalid_value=message,
                expected_format=f"Maximum {max_length} characters",
            )

        # Format validation using Commitizen. Fail before doing any further
        # work (type extraction, fetching available types) on a bad message.
        format_valid = commitizen_core.validate_message(message)
        if not format_valid:
            raise ValidationError(
                "Invalid commit message format",
                validation_type="commit_message_format",
                invalid_value=message,
                expected_format="type(scope): subject",
            )

        # Try to extract the type from the conventional format.
        commit_type = None
        match = ValidationService._COMMIT_TYPE_RE.match(message)
        if match:
            commit_type = match.group(1)

        # Get available types for feedback
        available_types = [t["value"] for t in commitizen_core.get_commit_types()]

        return {
            "is_valid": True,
            "message": message,
            "checks": {
                "not_empty": True,
                "format_valid": format_valid,
                "length_valid": length_valid,
                "commit_type": commit_type,
                # None (not False) when no type could be extracted at all.
                "type_valid": commit_type in available_types if commit_type else None,
            },
            "feedback": {
                "format": "Valid format",
                "length": f"Length: {len(message)} characters",
                "type": f"Type: {commit_type}" if commit_type else "No type detected",
                "available_types": available_types,
            },
        }

    @staticmethod
    @handle_validation_errors
    def validate_repository_state(git_core: GitPythonCore) -> Dict[str, Any]:
        """
        Validate repository is ready for operations.

        Args:
            git_core: GitPythonCore instance for repository checks

        Returns:
            Dict containing repository validation results

        Raises:
            RepositoryError: If the status does not report a git repository,
                or if no files are staged for commit.
        """
        # Get repository status
        status = git_core.get_repository_status()

        # Perform checks
        checks = {
            "is_git_repository": status.get("is_git_repository", False),
            "has_commits": status.get("head_commit") is not None,
            "has_staged_files": len(status.get("staged_files", [])) > 0,
            "has_unstaged_files": len(status.get("unstaged_files", [])) > 0,
            "has_untracked_files": len(status.get("untracked_files", [])) > 0,
            "staging_clean": status.get("staging_clean", True),
        }

        # Hard failures: these abort validation instead of producing feedback.
        if not checks["is_git_repository"]:
            raise RepositoryError(
                "Not a valid git repository", repo_path=str(git_core.repo_path)
            )

        if not checks["has_staged_files"]:
            raise RepositoryError(
                "No files staged for commit", repo_path=str(git_core.repo_path)
            )

        # Soft findings: reported to the caller but do not block the commit.
        feedback = []
        if not checks["has_commits"]:
            feedback.append("Repository has no commits yet")

        if checks["has_unstaged_files"]:
            feedback.append(f"{len(status['unstaged_files'])} unstaged files present")

        if checks["has_untracked_files"]:
            feedback.append(f"{len(status['untracked_files'])} untracked files present")

        # Determine overall readiness
        ready_for_commit = checks["is_git_repository"] and checks["has_staged_files"]

        return {
            "is_valid": ready_for_commit,
            "checks": checks,
            "feedback": feedback,
            "repository_status": status,
        }

    @staticmethod
    def _is_within_directory(path: Path, root: Path) -> bool:
        """Return True if *path* is *root* itself or located beneath it.

        The comparison is done on path components, so a sibling directory
        that merely shares a string prefix with *root* (``/repo-evil`` vs.
        ``/repo``) is correctly reported as outside.
        """
        try:
            path.relative_to(root)
        except ValueError:
            return False
        return True

    @staticmethod
    @handle_validation_errors
    def validate_file_paths(file_paths: List[str], repo_path: str) -> Dict[str, Any]:
        """
        Validate file paths are safe and within repository.

        Args:
            file_paths: List of file paths to validate
            repo_path: Repository root path

        Returns:
            Dict containing validation results for each file

        Raises:
            RepositoryError: If a path resolves outside the repository root.
        """
        repo_root = Path(repo_path).resolve()

        results = {"all_valid": True, "files": [], "invalid_count": 0, "valid_count": 0}

        for file_path in file_paths:
            file_result = {
                "path": file_path,
                "is_valid": False,
                "exists": False,
                "within_repo": False,
                "error": None,
            }

            try:
                # Check for empty path
                if not file_path or not file_path.strip():
                    file_result["error"] = "Empty file path"
                    results["files"].append(file_result)
                    results["all_valid"] = False
                    results["invalid_count"] += 1
                    continue

                # Resolve path (collapses ".." segments and follows symlinks)
                full_path = (repo_root / file_path).resolve()

                # Check if within repository, comparing path components
                # rather than raw string prefixes.
                if not ValidationService._is_within_directory(full_path, repo_root):
                    file_result["error"] = "Path outside repository bounds"
                    results["files"].append(file_result)
                    results["all_valid"] = False
                    results["invalid_count"] += 1

                    # Raise exception for path traversal attempts
                    raise RepositoryError(
                        f"Path traversal attempt detected: {file_path}",
                        repo_path=str(repo_path),
                    )

                file_result["within_repo"] = True

                # Check if exists
                file_result["exists"] = full_path.exists()

                # Mark as valid if within repo (existence is optional)
                file_result["is_valid"] = True
                results["valid_count"] += 1

            except RepositoryError:
                # Re-raise repository errors
                raise
            except Exception as e:
                # Any other failure (e.g. a non-string entry or a path the OS
                # rejects) is recorded against this file; validation continues.
                file_result["error"] = str(e)
                results["all_valid"] = False
                results["invalid_count"] += 1

            results["files"].append(file_result)

        return results

    @staticmethod
    def _sanitize_value(value: Any) -> Any:
        """Return a sanitized copy of a single value, recursing into lists and dicts."""
        if isinstance(value, str):
            # Remove dangerous shell metacharacters
            clean_value = value.translate(ValidationService._DANGEROUS_CHARS_TABLE)
            # Remove excessive whitespace
            return re.sub(r"\s+", " ", clean_value.strip())
        if isinstance(value, list):
            return [ValidationService._sanitize_value(item) for item in value]
        if isinstance(value, dict):
            # Only values are sanitized; keys are kept unchanged.
            return {
                key: ValidationService._sanitize_value(item)
                for key, item in value.items()
            }
        # None and other types are kept as-is
        return value

    @staticmethod
    def sanitize_inputs(**kwargs) -> Dict[str, Any]:
        """
        Sanitize all inputs for security.

        Strings have shell metacharacters (`` ` $ ; | & > < `` and NUL)
        removed, are stripped, and have runs of whitespace collapsed to a
        single space. Lists and dicts are sanitized recursively at any
        nesting depth; ``None`` and all other types are returned unchanged.

        Args:
            **kwargs: Input parameters to sanitize

        Returns:
            Dict containing sanitized inputs
        """
        return {
            key: ValidationService._sanitize_value(value)
            for key, value in kwargs.items()
        }

    @staticmethod
    @handle_validation_errors
    def validate_parameters(
        params: Dict[str, Any], schema: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Validate parameters against schema.

        Args:
            params: Parameters to validate
            schema: Schema defining required and optional parameters. The
                keys read are ``"required"`` (list of names) and
                ``"properties"`` (name -> dict with an optional ``"type"``).

        Returns:
            Dict containing validation results

        Raises:
            ValidationError: If a required parameter is missing or ``None``,
                or if a parameter's value does not match its declared type.
        """
        errors = []
        warnings = []

        # Check required parameters
        required = schema.get("required", [])
        for param in required:
            if param not in params or params[param] is None:
                errors.append(f"Missing required parameter: {param}")

        # If required parameters are missing, raise error
        if errors:
            raise ValidationError(
                f"Missing required parameters: {', '.join(errors)}",
                validation_type="parameter_validation",
                invalid_value=str(params),
            )

        # Check parameter types
        properties = schema.get("properties", {})
        type_errors = []

        for param, value in params.items():
            if param not in properties:
                warnings.append(f"Unknown parameter: {param}")
                continue

            expected_type = properties[param].get("type")
            # Type names outside the table (or non-string type specs) are
            # not checked.
            python_type = (
                ValidationService._SCHEMA_TYPES.get(expected_type)
                if isinstance(expected_type, str)
                else None
            )
            if python_type is None:
                continue

            matches = isinstance(value, python_type)
            # bool is a subclass of int, so exclude it explicitly: True/False
            # must not satisfy an "integer" parameter.
            if expected_type == "integer" and isinstance(value, bool):
                matches = False

            if not matches:
                type_errors.append(
                    f"Parameter '{param}' should be {expected_type}, "
                    f"got {type(value).__name__}"
                )

        # If type errors exist, raise validation error
        if type_errors:
            raise ValidationError(
                f"Parameter type validation failed: {', '.join(type_errors)}",
                validation_type="parameter_type_validation",
                invalid_value=str(params),
            )

        return {
            "is_valid": True,
            "warnings": warnings,
            "parameters": params,
            "schema": schema,
        }

    @staticmethod
    @handle_validation_errors
    def validate_commit_type(
        commit_type: str, available_types: List[str]
    ) -> Dict[str, Any]:
        """
        Validate commit type against available types.

        Args:
            commit_type: Commit type to validate
            available_types: List of available commit types

        Returns:
            Dict containing validation result. On a case-insensitive match
            the canonical spelling is returned as ``"commit_type"`` and the
            caller's spelling as ``"original"``.

        Raises:
            ValidationError: If ``commit_type`` is empty or matches no
                available type; similar types, when any are found, are
                listed at the end of ``expected_format``.
        """
        if not commit_type:
            raise ValidationError(
                "Commit type cannot be empty",
                validation_type="commit_type",
                invalid_value="",
                expected_format=f"One of: {', '.join(available_types[:5])}",
            )

        # Exact match
        if commit_type in available_types:
            return {"is_valid": True, "commit_type": commit_type, "match_type": "exact"}

        # Case-insensitive match
        lower_type = commit_type.lower()
        for available in available_types:
            if available.lower() == lower_type:
                return {
                    "is_valid": True,
                    "commit_type": available,
                    "match_type": "case_insensitive",
                    "original": commit_type,
                }

        # Find similar types: substring in either direction, or a shared
        # prefix of up to three characters.
        suggestions = [
            available
            for available in available_types
            if (
                lower_type in available.lower()
                or available.lower() in lower_type
                or available.lower().startswith(lower_type[:3])
            )
        ]

        expected_format = f"One of: {', '.join(available_types)}"
        if suggestions:
            expected_format += f" (did you mean: {', '.join(suggestions)}?)"

        # Raise validation error with suggestions
        raise ValidationError(
            f"Invalid commit type: {commit_type}",
            validation_type="commit_type",
            invalid_value=commit_type,
            expected_format=expected_format,
        )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jolfr/commit-helper-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.