"""
Centralized validation logic for all operations.
This service provides consistent validation, sanitization, and security
checks across the entire application.
"""
import logging
import re
from pathlib import Path
from typing import Dict, Any, List, Optional
from .commitizen_core import CommitzenCore
from .gitpython_core import GitPythonCore
from ..errors import (
ValidationError,
RepositoryError,
ConfigurationError,
handle_validation_errors,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class ValidationService:
    """Centralized validation logic for all operations.

    Every public method is a static method, so the class is used as a
    namespace and never needs to be instantiated. Most methods are wrapped
    with ``handle_validation_errors`` (imported from ``..errors``); what that
    decorator does with raised exceptions is not visible in this module.
    """

    # Maximum accepted commit message length, in characters.
    _MAX_COMMIT_MESSAGE_LENGTH = 1000

    # Leading "type", optional "(scope)", optional "!" breaking-change
    # marker, then the ":" separator.
    _COMMIT_TYPE_PATTERN = re.compile(r"^(\w+)(?:\([^)]+\))?!?:")

    # Deletes the shell metacharacters ` $ ; | & > < and NUL in one pass.
    _SHELL_METACHAR_TABLE = str.maketrans("", "", "`$;|&><\x00")

    # Schema type name -> Python type used for the isinstance() check.
    _SCHEMA_TYPES = {
        "string": str,
        "boolean": bool,
        "integer": int,
        "array": list,
        "object": dict,
    }

    @staticmethod
    @handle_validation_errors
    def validate_commit_message(
        message: str, commitizen_core: CommitzenCore
    ) -> Dict[str, Any]:
        """
        Validate commit message with detailed feedback.

        Args:
            message: Commit message to validate
            commitizen_core: CommitzenCore instance for validation

        Returns:
            Dict containing validation results and feedback

        Raises:
            ValidationError: If the message is empty or whitespace-only,
                longer than 1000 characters, or rejected by
                ``commitizen_core.validate_message``.
        """
        max_length = ValidationService._MAX_COMMIT_MESSAGE_LENGTH

        # Basic checks
        if not message or not message.strip():
            raise ValidationError(
                "Commit message cannot be empty",
                validation_type="commit_message",
                invalid_value=message,
            )

        # Length check
        length_valid = len(message) <= max_length
        if not length_valid:
            raise ValidationError(
                f"Commit message too long ({len(message)} characters, "
                f"max {max_length})",
                validation_type="commit_message_length",
                invalid_value=message,
                expected_format=f"Maximum {max_length} characters",
            )

        # Format validation using Commitizen. Fail before doing any further
        # work, since nothing below is used when the format is invalid.
        format_valid = commitizen_core.validate_message(message)
        if not format_valid:
            raise ValidationError(
                "Invalid commit message format",
                validation_type="commit_message_format",
                invalid_value=message,
                expected_format="type(scope): subject",
            )

        # Try to extract the type from the conventional "type(scope)!:" prefix
        commit_type = None
        match = ValidationService._COMMIT_TYPE_PATTERN.match(message)
        if match:
            commit_type = match.group(1)

        # Get available types for feedback
        available_types = [t["value"] for t in commitizen_core.get_commit_types()]

        return {
            "is_valid": True,
            "message": message,
            "checks": {
                "not_empty": True,
                "format_valid": format_valid,
                "length_valid": length_valid,
                "commit_type": commit_type,
                # None (rather than False) when no type could be extracted.
                "type_valid": commit_type in available_types if commit_type else None,
            },
            "feedback": {
                "format": "Valid format",
                "length": f"Length: {len(message)} characters",
                "type": f"Type: {commit_type}" if commit_type else "No type detected",
                "available_types": available_types,
            },
        }

    @staticmethod
    @handle_validation_errors
    def validate_repository_state(git_core: GitPythonCore) -> Dict[str, Any]:
        """
        Validate repository is ready for operations.

        Args:
            git_core: GitPythonCore instance for repository checks

        Returns:
            Dict containing repository validation results. ``is_valid`` is
            always True on return, because both conditions it depends on
            raise when they fail.

        Raises:
            RepositoryError: If the status does not report a git repository,
                or if no files are staged.
        """
        # Get repository status
        status = git_core.get_repository_status()

        # Treat a missing key or an explicit None as "no files".
        staged_files = status.get("staged_files") or []
        unstaged_files = status.get("unstaged_files") or []
        untracked_files = status.get("untracked_files") or []

        # Perform checks
        checks = {
            "is_git_repository": status.get("is_git_repository", False),
            "has_commits": status.get("head_commit") is not None,
            "has_staged_files": len(staged_files) > 0,
            "has_unstaged_files": len(unstaged_files) > 0,
            "has_untracked_files": len(untracked_files) > 0,
            "staging_clean": status.get("staging_clean", True),
        }

        # Hard failures: these make the repository unusable for a commit.
        if not checks["is_git_repository"]:
            raise RepositoryError(
                "Not a valid git repository", repo_path=str(git_core.repo_path)
            )
        if not checks["has_staged_files"]:
            raise RepositoryError(
                "No files staged for commit", repo_path=str(git_core.repo_path)
            )

        # Soft findings: reported to the caller but do not block the commit.
        feedback = []
        if not checks["has_commits"]:
            feedback.append("Repository has no commits yet")
        if checks["has_unstaged_files"]:
            feedback.append(f"{len(unstaged_files)} unstaged files present")
        if checks["has_untracked_files"]:
            feedback.append(f"{len(untracked_files)} untracked files present")

        return {
            "is_valid": True,
            "checks": checks,
            "feedback": feedback,
            "repository_status": status,
        }

    @staticmethod
    def _is_within_directory(candidate: Path, root: Path) -> bool:
        """Return True if ``candidate`` is ``root`` or lies beneath it.

        Compares whole path components, so ``/repo-evil`` is not considered
        to be inside ``/repo`` (a plain string-prefix test would accept it).
        """
        try:
            candidate.relative_to(root)
        except ValueError:
            return False
        return True

    @staticmethod
    @handle_validation_errors
    def validate_file_paths(file_paths: List[str], repo_path: str) -> Dict[str, Any]:
        """
        Validate file paths are safe and within repository.

        Args:
            file_paths: List of file paths to validate
            repo_path: Repository root path

        Returns:
            Dict with ``all_valid``, ``valid_count``, ``invalid_count`` and a
            ``files`` list holding one result dict per input path. A path is
            valid when it resolves inside the repository; existence is
            reported but not required.

        Raises:
            RepositoryError: As soon as any path resolves outside the
                repository root. Results gathered so far are discarded.
        """
        repo_root = Path(repo_path).resolve()
        results = {"all_valid": True, "files": [], "invalid_count": 0, "valid_count": 0}

        for file_path in file_paths:
            file_result = {
                "path": file_path,
                "is_valid": False,
                "exists": False,
                "within_repo": False,
                "error": None,
            }
            try:
                if not file_path or not file_path.strip():
                    file_result["error"] = "Empty file path"
                else:
                    # Resolve relative to the repository root; ".." segments
                    # and symlinks are collapsed before the containment check.
                    full_path = (repo_root / file_path).resolve()
                    if not ValidationService._is_within_directory(
                        full_path, repo_root
                    ):
                        raise RepositoryError(
                            f"Path traversal attempt detected: {file_path}",
                            repo_path=str(repo_path),
                        )
                    file_result["within_repo"] = True
                    file_result["exists"] = full_path.exists()
                    # Mark as valid if within repo (existence is optional)
                    file_result["is_valid"] = True
            except RepositoryError:
                # Traversal attempts abort the whole validation.
                raise
            except Exception as e:
                # Best effort per file: any other failure (bad type, OS error
                # while resolving) is recorded on this entry only.
                file_result["error"] = str(e)

            if file_result["is_valid"]:
                results["valid_count"] += 1
            else:
                results["all_valid"] = False
                results["invalid_count"] += 1
            results["files"].append(file_result)

        return results

    @staticmethod
    def _sanitize_value(value: Any) -> Any:
        """Sanitize one value, recursing into lists and dict values.

        Strings have shell metacharacters removed and whitespace runs
        collapsed to single spaces. Dict keys are kept unchanged. Any other
        type, including None, is returned as-is.
        """
        if isinstance(value, str):
            stripped = value.translate(ValidationService._SHELL_METACHAR_TABLE)
            return re.sub(r"\s+", " ", stripped.strip())
        if isinstance(value, list):
            return [ValidationService._sanitize_value(item) for item in value]
        if isinstance(value, dict):
            return {
                key: ValidationService._sanitize_value(item)
                for key, item in value.items()
            }
        return value

    @staticmethod
    def sanitize_inputs(**kwargs) -> Dict[str, Any]:
        """
        Sanitize all inputs for security.

        String values lose the characters `` ` $ ; | & > < `` and NUL, are
        stripped, and have internal whitespace runs collapsed to one space.
        Lists and dicts are sanitized recursively at any nesting depth; other
        types pass through unchanged.

        NOTE(review): this is a character blocklist and it also alters
        legitimate text containing those characters - confirm callers do not
        rely on it as the only protection before invoking a shell.

        Args:
            **kwargs: Input parameters to sanitize

        Returns:
            Dict containing sanitized inputs, keyed like ``kwargs``
        """
        return {
            key: ValidationService._sanitize_value(value)
            for key, value in kwargs.items()
        }

    @staticmethod
    @handle_validation_errors
    def validate_parameters(
        params: Dict[str, Any], schema: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Validate parameters against schema.

        Args:
            params: Parameters to validate
            schema: Schema defining required and optional parameters. Reads
                ``schema["required"]`` (list of names) and
                ``schema["properties"]`` (name -> dict with optional
                ``"type"``). Checked types are string, boolean, integer,
                array and object; any other type name is not checked.

        Returns:
            Dict containing validation results; ``warnings`` lists
            parameters that are not declared in ``properties``.

        Raises:
            ValidationError: If a required parameter is absent or None, or
                if any declared parameter has the wrong type.
        """
        # Check required parameters (absent and None are both "missing")
        missing = [
            param
            for param in schema.get("required", [])
            if param not in params or params[param] is None
        ]
        if missing:
            raise ValidationError(
                f"Missing required parameters: {', '.join(map(str, missing))}",
                validation_type="parameter_validation",
                invalid_value=str(params),
            )

        # Check parameter types
        properties = schema.get("properties", {})
        type_errors = []
        warnings = []
        for param, value in params.items():
            if param not in properties:
                warnings.append(f"Unknown parameter: {param}")
                continue

            expected_type = properties[param].get("type")
            # Only plain string type names are looked up; anything else
            # (absent, unknown, or a non-string spec) is left unchecked.
            python_type = (
                ValidationService._SCHEMA_TYPES.get(expected_type)
                if isinstance(expected_type, str)
                else None
            )
            if python_type is None:
                continue

            type_matches = isinstance(value, python_type)
            # bool is a subclass of int, so reject it explicitly for "integer".
            if expected_type == "integer" and isinstance(value, bool):
                type_matches = False
            if not type_matches:
                type_errors.append(
                    f"Parameter '{param}' should be {expected_type}, "
                    f"got {type(value).__name__}"
                )

        # If type errors exist, raise validation error
        if type_errors:
            raise ValidationError(
                f"Parameter type validation failed: {', '.join(type_errors)}",
                validation_type="parameter_type_validation",
                invalid_value=str(params),
            )

        return {
            "is_valid": True,
            "warnings": warnings,
            "parameters": params,
            "schema": schema,
        }

    @staticmethod
    @handle_validation_errors
    def validate_commit_type(
        commit_type: str, available_types: List[str]
    ) -> Dict[str, Any]:
        """
        Validate commit type against available types.

        Args:
            commit_type: Commit type to validate
            available_types: List of available commit types

        Returns:
            Dict with ``is_valid``, the canonical ``commit_type`` and
            ``match_type`` ("exact" or "case_insensitive"); a
            case-insensitive match also carries ``original``.

        Raises:
            ValidationError: If ``commit_type`` is empty or matches no
                available type. For a non-matching type, similar available
                types are appended to ``expected_format`` as a hint.
        """
        if not commit_type:
            raise ValidationError(
                "Commit type cannot be empty",
                validation_type="commit_type",
                invalid_value="",
                expected_format=f"One of: {', '.join(available_types[:5])}",
            )

        # Exact match
        if commit_type in available_types:
            return {"is_valid": True, "commit_type": commit_type, "match_type": "exact"}

        # Case-insensitive match
        lower_type = commit_type.lower()
        for available in available_types:
            if available.lower() == lower_type:
                return {
                    "is_valid": True,
                    "commit_type": available,
                    "match_type": "case_insensitive",
                    "original": commit_type,
                }

        # Find similar types: substring in either direction, or a shared
        # prefix of up to three characters.
        suggestions = [
            available
            for available in available_types
            if lower_type in available.lower()
            or available.lower() in lower_type
            or available.lower().startswith(lower_type[:3])
        ]

        expected_format = f"One of: {', '.join(available_types)}"
        if suggestions:
            expected_format += f" (did you mean: {', '.join(suggestions)}?)"

        raise ValidationError(
            f"Invalid commit type: {commit_type}",
            validation_type="commit_type",
            invalid_value=commit_type,
            expected_format=expected_format,
        )