
USPTO Final Petition Decisions MCP Server

by john-walkoe
retention_policy.py • 7.5 kB
""" Log retention and disposal policy implementation. Fixes: - CWE-404: Improper Resource Shutdown or Release - GDPR Article 17 (right to erasure) compliance - SOC 2, PCI-DSS log retention requirements Retention Periods: - Security logs: 90 days (compliance requirement) - Application logs: 30 days (operational needs) - Error logs: 60 days (debugging window) - Audit logs: 365 days (legal/compliance) """ import logging from datetime import datetime, timedelta from pathlib import Path from typing import Optional, Dict, Any class LogRetentionPolicy: """ Implements log retention and automated cleanup. Retention Periods: - Security logs: 90 days (compliance requirement) - Application logs: 30 days (operational needs) - Error logs: 60 days (debugging window) - Audit logs: 365 days (legal/compliance) """ RETENTION_PERIODS = { "fpd-mcp-security.log": 90, # days "fpd-mcp.log": 30, "fpd-mcp-errors.log": 60, "fpd-mcp-audit.log": 365 } def __init__(self, log_dir: Path): """ Initialize retention policy manager. Args: log_dir: Directory containing log files """ self.log_dir = log_dir self.logger = logging.getLogger(__name__) def cleanup_old_logs(self, dry_run: bool = False) -> Dict[str, Any]: """ Remove log files older than retention period. Args: dry_run: If True, only report what would be deleted Returns: Dictionary with cleanup statistics """ now = datetime.now() total_deleted = 0 total_size_freed = 0 for log_file_pattern, retention_days in self.RETENTION_PERIODS.items(): cutoff_date = now - timedelta(days=retention_days) # Find all rotated versions (e.g., fpd-mcp.log.1, fpd-mcp.log.2.gz) for log_file in self.log_dir.glob(f"{log_file_pattern}*"): if not log_file.is_file(): continue # Skip the current active log file (without rotation suffix) if log_file.name == log_file_pattern: continue # Check file modification time mtime = datetime.fromtimestamp(log_file.stat().st_mtime) if mtime < cutoff_date: size = log_file.stat().st_size if dry_run: self.logger.info( f"Would delete: {log_file} (age: {(now - mtime).days} days, " f"size: {size / (1024*1024):.2f}MB)" ) else: self.logger.info( f"Deleting old log: {log_file} (age: {(now - mtime).days} days)" ) try: log_file.unlink() except Exception as e: self.logger.error(f"Failed to delete {log_file}: {e}") continue total_deleted += 1 total_size_freed += size self.logger.info( f"Log cleanup complete: {total_deleted} files, " f"{total_size_freed / (1024*1024):.2f}MB freed" ) return { "files_deleted": total_deleted, "bytes_freed": total_size_freed, "mb_freed": total_size_freed / (1024*1024), "dry_run": dry_run } def get_retention_status(self) -> Dict[str, Any]: """ Get current status of log retention. 
Returns: Dictionary with retention statistics """ now = datetime.now() status = {} for log_file_pattern, retention_days in self.RETENTION_PERIODS.items(): cutoff_date = now - timedelta(days=retention_days) files_to_delete = [] files_to_keep = [] size_to_delete = 0 size_to_keep = 0 for log_file in self.log_dir.glob(f"{log_file_pattern}*"): if not log_file.is_file(): continue # Skip active log file if log_file.name == log_file_pattern: files_to_keep.append(log_file.name) size_to_keep += log_file.stat().st_size continue mtime = datetime.fromtimestamp(log_file.stat().st_mtime) size = log_file.stat().st_size if mtime < cutoff_date: files_to_delete.append({ "file": log_file.name, "age_days": (now - mtime).days, "size_mb": size / (1024*1024) }) size_to_delete += size else: files_to_keep.append(log_file.name) size_to_keep += size status[log_file_pattern] = { "retention_days": retention_days, "files_to_delete": len(files_to_delete), "files_to_keep": len(files_to_keep), "size_to_delete_mb": size_to_delete / (1024*1024), "size_to_keep_mb": size_to_keep / (1024*1024), "details": files_to_delete } return status def verify_compliance(self) -> Dict[str, bool]: """ Verify retention policy compliance. Returns: Dictionary with compliance status for each log type """ now = datetime.now() compliance = {} for log_file_pattern, retention_days in self.RETENTION_PERIODS.items(): cutoff_date = now - timedelta(days=retention_days) # Check if any files older than retention period exist has_old_files = False for log_file in self.log_dir.glob(f"{log_file_pattern}*"): if not log_file.is_file(): continue # Skip active log file if log_file.name == log_file_pattern: continue mtime = datetime.fromtimestamp(log_file.stat().st_mtime) if mtime < cutoff_date: has_old_files = True break # Compliance is True if no old files exist compliance[log_file_pattern] = not has_old_files return compliance def schedule_cleanup(log_dir: Path, dry_run: bool = False): """ Schedule log cleanup based on retention policy. This function should be called periodically (e.g., daily via cron). Args: log_dir: Directory containing log files dry_run: If True, only report what would be deleted Returns: Cleanup statistics """ policy = LogRetentionPolicy(log_dir) logger = logging.getLogger(__name__) logger.info("Starting scheduled log cleanup") # Get retention status before cleanup status_before = policy.get_retention_status() logger.info(f"Retention status before cleanup: {status_before}") # Perform cleanup results = policy.cleanup_old_logs(dry_run=dry_run) # Verify compliance after cleanup compliance = policy.verify_compliance() logger.info(f"Retention compliance after cleanup: {compliance}") return { "cleanup_results": results, "compliance_status": compliance, "status_before": status_before }


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/john-walkoe/uspto_fpd_mcp'
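The same lookup can be scripted. A minimal Python equivalent of the curl call above, using only the standard library (the response schema is not documented here, so the body is printed as-is):

import urllib.request

# Fetch this server's directory entry from the Glama MCP API
url = "https://glama.ai/api/mcp/v1/servers/john-walkoe/uspto_fpd_mcp"
with urllib.request.urlopen(url) as response:
    print(response.read().decode("utf-8"))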

If you have feedback or need assistance with the MCP directory API, please join our Discord server.