
Scribe MCP Server

by paxocial
integrity_verifier.py (18.8 kB)
"""File system integrity verification and validation.""" from __future__ import annotations import asyncio import hashlib import json import logging import time from dataclasses import dataclass, field from enum import Enum from pathlib import Path from typing import Any, Dict, List, Optional, Set, Tuple from scribe_mcp.storage.base import StorageBackend from scribe_mcp.utils.time import utcnow class IntegrityStatus(Enum): """File integrity status.""" VALID = "valid" # File matches database record CORRUPTED = "corrupted" # File content is corrupted MISSING = "missing" # File exists in database but not on disk UNTRACKED = "untracked" # File exists on disk but not in database MODIFIED = "modified" # File modified since last sync ACCESS_DENIED = "access_denied" # Cannot access file @dataclass class IntegrityCheck: """Result of integrity check for a single file.""" file_path: Path status: IntegrityStatus expected_hash: Optional[str] actual_hash: Optional[str] file_size: int last_modified: float database_timestamp: Optional[float] error_message: Optional[str] = None metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class IntegrityReport: """Summary of integrity verification results.""" timestamp: str total_files_checked: int status_counts: Dict[IntegrityStatus, int] files_with_issues: List[IntegrityCheck] check_duration: float recommendations: List[str] class IntegrityVerifier: """Verifies file system integrity against database records.""" def __init__( self, storage: StorageBackend, project_root: Path, cache_duration: float = 300.0 # 5 minutes ): self.storage = storage self.project_root = Path(project_root) self.cache_duration = cache_duration self._logger = logging.getLogger(__name__) self._integrity_cache: Dict[str, Tuple[float, IntegrityCheck]] = {} self._verification_lock = asyncio.Lock() async def verify_file_integrity(self, file_path: Path) -> IntegrityCheck: """Verify integrity of a single file.""" try: # Check cache first cache_key = str(file_path) if cache_key in self._integrity_cache: cached_time, cached_result = self._integrity_cache[cache_key] if time.time() - cached_time < self.cache_duration: return cached_result # Get database record db_record = await self._get_database_record(file_path) # Check file on disk file_info = await self._get_file_info(file_path) # Perform integrity check check_result = await self._perform_integrity_check(file_path, db_record, file_info) # Cache result self._integrity_cache[cache_key] = (time.time(), check_result) return check_result except Exception as e: self._logger.error(f"Failed to verify integrity for {file_path}: {e}") return IntegrityCheck( file_path=file_path, status=IntegrityStatus.ACCESS_DENIED, expected_hash=None, actual_hash=None, file_size=0, last_modified=0, database_timestamp=None, error_message=str(e) ) async def verify_project_integrity( self, file_patterns: Optional[List[str]] = None, include_untracked: bool = True ) -> IntegrityReport: """Verify integrity of all tracked files in the project.""" start_time = time.time() async with self._verification_lock: try: # Get files to check files_to_check = await self._get_files_to_check(file_patterns, include_untracked) # Check each file checks = [] for file_path in files_to_check: check = await self.verify_file_integrity(file_path) checks.append(check) # Generate report report = await self._generate_integrity_report(checks, time.time() - start_time) self._logger.info(f"Integrity verification completed: {report.total_files_checked} files checked") return report except 
    async def _get_database_record(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """Get database record for a file."""
        try:
            # Try sync_status first
            result = await self.storage._fetchone(
                """
                SELECT last_file_hash, last_sync_at, sync_status
                FROM sync_status
                WHERE file_path = ? AND project_root = ?
                """,
                (str(file_path), str(self.project_root)),
            )
            if result:
                return {
                    'hash': result['last_file_hash'],
                    'timestamp': result['last_sync_at'],
                    'status': result['sync_status'],
                }

            # Try document_sections
            result = await self.storage._fetchone(
                """
                SELECT file_hash, updated_at
                FROM document_sections
                WHERE file_path = ? AND project_root = ?
                ORDER BY updated_at DESC
                LIMIT 1
                """,
                (str(file_path), str(self.project_root)),
            )
            if result:
                return {
                    'hash': result['file_hash'],
                    'timestamp': result['updated_at'],
                    'status': 'synced',
                }

            return None
        except Exception as e:
            self._logger.debug(f"Failed to get database record for {file_path}: {e}")
            return None

    async def _get_file_info(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """Get file information from disk."""
        try:
            if not file_path.exists():
                return None

            stat = file_path.stat()

            # Calculate file hash
            file_hash = await self._calculate_file_hash(file_path)

            return {
                'hash': file_hash,
                'size': stat.st_size,
                'modified': stat.st_mtime,
                'exists': True,
                'accessible': True,
            }
        except (OSError, PermissionError) as e:
            self._logger.debug(f"Cannot access file {file_path}: {e}")
            return {
                'hash': None,
                'size': 0,
                'modified': 0,
                'exists': file_path.exists(),
                'accessible': False,
                'error': str(e),
            }

    async def _perform_integrity_check(
        self,
        file_path: Path,
        db_record: Optional[Dict[str, Any]],
        file_info: Optional[Dict[str, Any]],
    ) -> IntegrityCheck:
        """Perform the actual integrity check."""
        # File doesn't exist on disk
        if not file_info or not file_info.get('exists'):
            if db_record:
                return IntegrityCheck(
                    file_path=file_path,
                    status=IntegrityStatus.MISSING,
                    expected_hash=db_record.get('hash'),
                    actual_hash=None,
                    file_size=0,
                    last_modified=0,
                    database_timestamp=db_record.get('timestamp'),
                )
            # File doesn't exist anywhere
            return IntegrityCheck(
                file_path=file_path,
                status=IntegrityStatus.VALID,
                expected_hash=None,
                actual_hash=None,
                file_size=0,
                last_modified=0,
                database_timestamp=None,
            )

        # File exists but is not accessible
        if not file_info.get('accessible'):
            return IntegrityCheck(
                file_path=file_path,
                status=IntegrityStatus.ACCESS_DENIED,
                expected_hash=db_record.get('hash') if db_record else None,
                actual_hash=None,
                file_size=file_info.get('size', 0),
                last_modified=file_info.get('modified', 0),
                database_timestamp=db_record.get('timestamp') if db_record else None,
                error_message=file_info.get('error', 'Access denied'),
            )

        # File exists but no database record
        if not db_record:
            return IntegrityCheck(
                file_path=file_path,
                status=IntegrityStatus.UNTRACKED,
                expected_hash=None,
                actual_hash=file_info.get('hash'),
                file_size=file_info.get('size', 0),
                last_modified=file_info.get('modified', 0),
                database_timestamp=None,
            )

        # Compare hashes
        expected_hash = db_record.get('hash')
        actual_hash = file_info.get('hash')

        if expected_hash == actual_hash:
            status = IntegrityStatus.VALID
        else:
            # Check if file was modified after last sync
            file_time = float(file_info.get('modified', 0) or 0.0)
            db_raw = db_record.get('timestamp', 0)
            try:
                if isinstance(db_raw, (int, float)):
                    db_time = float(db_raw)
                else:
                    from datetime import datetime
                    db_time = datetime.fromisoformat(str(db_raw)).timestamp()
            except Exception:
                db_time = 0.0
            if file_time > db_time:
                status = IntegrityStatus.MODIFIED
            else:
                status = IntegrityStatus.CORRUPTED

        return IntegrityCheck(
            file_path=file_path,
            status=status,
            expected_hash=expected_hash,
            actual_hash=actual_hash,
            file_size=file_info.get('size', 0),
            last_modified=file_info.get('modified', 0),
            database_timestamp=db_record.get('timestamp'),
        )
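    # Worked example of the timestamp rule in _perform_integrity_check: with
    # expected_hash "abc..." and actual_hash "def...", a file mtime of
    # 1700000100 against a last-sync timestamp of 1700000000 means the file
    # changed after the last sync, so the mismatch is classified MODIFIED.
    # The same mismatch with an mtime of 1699999900 (older than the sync
    # record) cannot be explained by a later edit, so it is flagged CORRUPTED.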
    async def _get_files_to_check(
        self,
        file_patterns: Optional[List[str]],
        include_untracked: bool,
    ) -> List[Path]:
        """Get list of files to check."""
        files = set()

        # Get tracked files from database
        try:
            # From sync_status
            results = await self.storage._fetchall(
                """
                SELECT DISTINCT file_path
                FROM sync_status
                WHERE project_root = ?
                """,
                (str(self.project_root),),
            )
            for result in results:
                files.add(Path(result['file_path']))

            # From document_sections
            results = await self.storage._fetchall(
                """
                SELECT DISTINCT file_path
                FROM document_sections
                WHERE project_root = ?
                """,
                (str(self.project_root),),
            )
            for result in results:
                files.add(Path(result['file_path']))
        except Exception as e:
            self._logger.warning(f"Failed to get tracked files from database: {e}")

        # Add untracked files if requested
        if include_untracked:
            patterns = file_patterns or ['**/*.md', '**/*.txt', '**/*.json']
            for pattern in patterns:
                try:
                    for file_path in self.project_root.glob(pattern):
                        if file_path.is_file():
                            files.add(file_path)
                except Exception as e:
                    self._logger.warning(f"Failed to glob pattern {pattern}: {e}")

        return list(files)

    async def _generate_integrity_report(
        self, checks: List[IntegrityCheck], duration: float
    ) -> IntegrityReport:
        """Generate integrity verification report."""
        # Count statuses
        status_counts = {}
        files_with_issues = []
        for check in checks:
            status_counts[check.status] = status_counts.get(check.status, 0) + 1
            if check.status != IntegrityStatus.VALID:
                files_with_issues.append(check)

        # Generate recommendations
        recommendations = self._generate_recommendations(status_counts, files_with_issues)

        return IntegrityReport(
            timestamp=utcnow().isoformat(),
            total_files_checked=len(checks),
            status_counts=status_counts,
            files_with_issues=files_with_issues,
            check_duration=duration,
            recommendations=recommendations,
        )

    def _generate_recommendations(
        self,
        status_counts: Dict[IntegrityStatus, int],
        files_with_issues: List[IntegrityCheck],
    ) -> List[str]:
        """Generate recommendations based on integrity check results."""
        recommendations = []

        if status_counts.get(IntegrityStatus.CORRUPTED, 0) > 0:
            recommendations.append(
                f"Found {status_counts[IntegrityStatus.CORRUPTED]} corrupted files. "
                "Consider restoring from backup or database."
            )

        if status_counts.get(IntegrityStatus.MISSING, 0) > 0:
            recommendations.append(
                f"Found {status_counts[IntegrityStatus.MISSING]} missing files. "
                "Check if files were intentionally deleted or need restoration."
            )

        if status_counts.get(IntegrityStatus.MODIFIED, 0) > 0:
            recommendations.append(
                f"Found {status_counts[IntegrityStatus.MODIFIED]} modified files. "
                "Run sync to update database with latest changes."
            )

        if status_counts.get(IntegrityStatus.UNTRACKED, 0) > 0:
            recommendations.append(
                f"Found {status_counts[IntegrityStatus.UNTRACKED]} untracked files. "
                "Consider adding them to the database if they should be tracked."
            )
        if status_counts.get(IntegrityStatus.ACCESS_DENIED, 0) > 0:
            recommendations.append(
                f"Found {status_counts[IntegrityStatus.ACCESS_DENIED]} inaccessible files. "
                "Check file permissions and accessibility."
            )

        if not files_with_issues:
            recommendations.append("All files have valid integrity. No action needed.")

        return recommendations

    async def _calculate_file_hash(self, file_path: Path) -> Optional[str]:
        """Calculate SHA-256 hash of file content (fast-path: first 64KB)."""
        try:
            h = hashlib.sha256()
            with open(file_path, 'rb') as f:
                content = f.read(65536)
                h.update(content)
            return h.hexdigest()
        except (OSError, IOError):
            return None

    async def repair_file(self, file_path: Path, repair_strategy: str = "database_wins") -> bool:
        """Attempt to repair a file with integrity issues."""
        try:
            check = await self.verify_file_integrity(file_path)

            if check.status == IntegrityStatus.VALID:
                return True  # No repair needed

            if repair_strategy == "database_wins":
                # Restore from database
                db_record = await self._get_database_record(file_path)
                if db_record and db_record.get('hash'):
                    # Get content from document_sections
                    content_result = await self.storage._fetchone(
                        """
                        SELECT content
                        FROM document_sections
                        WHERE file_path = ? AND file_hash = ? AND project_root = ?
                        ORDER BY updated_at DESC
                        LIMIT 1
                        """,
                        (str(file_path), db_record['hash'], str(self.project_root)),
                    )
                    if content_result:
                        file_path.write_text(content_result['content'], encoding='utf-8')
                        self._logger.info(f"Repaired {file_path} from database")
                        return True
            elif repair_strategy == "file_wins":
                # Update database with file content
                if file_path.exists():
                    content = file_path.read_text(encoding='utf-8')
                    file_hash = await self._calculate_file_hash(file_path)
                    await self.storage._execute(
                        """
                        INSERT OR REPLACE INTO document_sections
                        (project_root, file_path, relative_path, content, file_hash, created_at, updated_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?)
                        """,
                        [
                            str(self.project_root),
                            str(file_path),
                            str(file_path.relative_to(self.project_root)),
                            content,
                            file_hash,
                            utcnow(),
                            utcnow(),
                        ],
                    )
                    self._logger.info(f"Updated database with {file_path} content")
                    return True

            return False
        except Exception as e:
            self._logger.error(f"Failed to repair {file_path}: {e}")
            return False

    async def clear_cache(self):
        """Clear the integrity check cache."""
        self._integrity_cache.clear()
        self._logger.debug("Integrity check cache cleared")

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get statistics about the integrity check cache."""
        return {
            'cached_files': len(self._integrity_cache),
            'cache_duration': self.cache_duration,
            'oldest_entry': min((t for t, _ in self._integrity_cache.values()), default=0),
            'newest_entry': max((t for t, _ in self._integrity_cache.values()), default=0),
        }
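A minimal usage sketch follows. It assumes the module is importable as integrity_verifier and that some concrete StorageBackend implementation exists; the SQLiteBackend import path and constructor below are hypothetical placeholders, while every IntegrityVerifier call is taken from the file above.

import asyncio
from pathlib import Path

# Hypothetical backend: substitute whatever StorageBackend implementation
# your deployment actually provides.
from scribe_mcp.storage.sqlite import SQLiteBackend  # hypothetical import path
from integrity_verifier import IntegrityStatus, IntegrityVerifier  # assumed module path

async def main() -> None:
    storage = SQLiteBackend(Path("scribe.db"))  # hypothetical constructor
    verifier = IntegrityVerifier(storage=storage, project_root=Path("/path/to/project"))

    # Scan tracked files plus untracked *.md/*.txt/*.json (the default patterns).
    report = await verifier.verify_project_integrity()
    print(f"Checked {report.total_files_checked} files in {report.check_duration:.2f}s")
    for recommendation in report.recommendations:
        print("-", recommendation)

    # Prefer the database copy when repairing files flagged as corrupted.
    for check in report.files_with_issues:
        if check.status is IntegrityStatus.CORRUPTED:
            ok = await verifier.repair_file(check.file_path, repair_strategy="database_wins")
            print(f"{check.file_path}: {'repaired' if ok else 'repair failed'}")

if __name__ == "__main__":
    asyncio.run(main())

One caveat visible in the source: verify_file_integrity caches results for cache_duration seconds (300 by default), so re-checking a file immediately after repair_file can return the stale pre-repair status; awaiting clear_cache() first forces a fresh hash.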
