MockLoop MCP Server

Official
by MockLoop
mcp_compliance.py • 31.2 kB
""" MCP Compliance Reporter for regulatory compliance and data governance. This module provides comprehensive compliance reporting capabilities including: - Generate compliance reports showing all MCP data usage - Track data sources and transformations for model training transparency - Export audit logs in standard compliance formats (JSON, CSV) - Data retention and purging capabilities per regulatory requirements - GDPR, CCPA, and other data privacy regulation support """ import csv import json import sqlite3 from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Any, Optional, Union from dataclasses import dataclass from enum import Enum import zipfile import hashlib class ComplianceRegulation(Enum): """Supported compliance regulations.""" GDPR = "gdpr" CCPA = "ccpa" HIPAA = "hipaa" SOX = "sox" PCI_DSS = "pci_dss" ISO_27001 = "iso_27001" class ReportFormat(Enum): """Supported report formats.""" JSON = "json" CSV = "csv" XML = "xml" PDF = "pdf" @dataclass class ComplianceReport: """Structured compliance report.""" report_id: str report_type: str regulation: str generated_at: str period_start: str period_end: str total_operations: int data_subjects_count: int data_sources: list[str] processing_purposes: list[str] legal_bases: list[str] retention_policies: list[str] compliance_violations: list[dict[str, Any]] recommendations: list[str] metadata: dict[str, Any] class MCPComplianceReporter: """ Comprehensive compliance reporter for MCP audit logs. Features: - Generate compliance reports for various regulations - Export audit data in multiple formats - Data lineage tracking and visualization - Automated compliance checking - Data retention policy enforcement - Privacy impact assessments """ def __init__( self, audit_db_path: str, reports_output_dir: str = "compliance_reports", default_regulation: ComplianceRegulation = ComplianceRegulation.GDPR, ): """ Initialize the compliance reporter. 
Args: audit_db_path: Path to the MCP audit database reports_output_dir: Directory to store generated reports default_regulation: Default compliance regulation to apply """ self.audit_db_path = Path(audit_db_path) self.reports_output_dir = Path(reports_output_dir) self.default_regulation = default_regulation # Create reports directory if it doesn't exist self.reports_output_dir.mkdir(parents=True, exist_ok=True) # Initialize compliance rules self._init_compliance_rules() def _init_compliance_rules(self) -> None: """Initialize compliance rules for different regulations.""" self.compliance_rules = { ComplianceRegulation.GDPR: { "max_retention_days": 2555, # 7 years "required_legal_bases": [ "consent", "contract", "legal_obligation", "vital_interests", "public_task", "legitimate_interests", ], "data_subject_rights": [ "access", "rectification", "erasure", "portability", "restriction", "objection", "automated_decision_making", ], "breach_notification_hours": 72, "privacy_by_design": True, }, ComplianceRegulation.CCPA: { "max_retention_days": 1825, # 5 years "consumer_rights": ["know", "delete", "opt_out", "non_discrimination"], "sale_disclosure_required": True, "privacy_policy_required": True, }, ComplianceRegulation.HIPAA: { "max_retention_days": 2190, # 6 years "minimum_necessary_standard": True, "encryption_required": True, "audit_controls_required": True, "access_controls_required": True, }, } def generate_compliance_report( self, regulation: ComplianceRegulation | None = None, start_date: str | None = None, end_date: str | None = None, include_violations: bool = True, include_recommendations: bool = True, ) -> ComplianceReport: """ Generate a comprehensive compliance report. Args: regulation: Compliance regulation to report against start_date: Start date for the report period (ISO format) end_date: End date for the report period (ISO format) include_violations: Whether to include compliance violations include_recommendations: Whether to include recommendations Returns: Generated compliance report """ regulation = regulation or self.default_regulation # Set default date range if not provided if not end_date: end_date = datetime.now(timezone.utc).isoformat() # noqa: UP017 if not start_date: start_date = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat() # noqa: UP017 # Query audit logs for the period audit_logs = self._query_audit_logs(start_date, end_date) # Analyze compliance compliance_analysis = self._analyze_compliance(audit_logs, regulation) # Generate report report = ComplianceReport( report_id=self._generate_report_id(), report_type=f"{regulation.value}_compliance", regulation=regulation.value, generated_at=datetime.now(timezone.utc).isoformat(), # noqa: UP017 period_start=start_date, period_end=end_date, total_operations=len(audit_logs), data_subjects_count=compliance_analysis["data_subjects_count"], data_sources=compliance_analysis["data_sources"], processing_purposes=compliance_analysis["processing_purposes"], legal_bases=compliance_analysis["legal_bases"], retention_policies=compliance_analysis["retention_policies"], compliance_violations=compliance_analysis["violations"] if include_violations else [], recommendations=compliance_analysis["recommendations"] if include_recommendations else [], metadata=compliance_analysis["metadata"], ) return report def export_audit_logs( self, start_date: str | None = None, end_date: str | None = None, filter_criteria: dict[str, Any] | None = None, include_sensitive_data: bool = False, export_format: ReportFormat = ReportFormat.JSON, ) 
-> str: """ Export audit logs in specified format. Args: start_date: Start date filter (ISO format) end_date: End date filter (ISO format) filter_criteria: Additional filtering criteria include_sensitive_data: Whether to include sensitive data export_format: Export format (JSON, CSV, XML) Returns: Path to the exported file """ # Query audit logs audit_logs = self._query_audit_logs(start_date, end_date, filter_criteria) # Sanitize data if needed if not include_sensitive_data: audit_logs = self._sanitize_audit_logs(audit_logs) # Generate filename timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"mcp_audit_export_{timestamp}.{export_format.value}" filepath = self.reports_output_dir / filename # Export based on format if export_format == ReportFormat.JSON: self._export_json(audit_logs, filepath) elif export_format == ReportFormat.CSV: self._export_csv(audit_logs, filepath) elif export_format == ReportFormat.XML: self._export_xml(audit_logs, filepath) else: raise ValueError(f"Unsupported export format: {export_format}") return str(filepath) def generate_data_lineage_report( self, data_source: str | None = None, operation_type: str | None = None, start_date: str | None = None, end_date: str | None = None, ) -> dict[str, Any]: """ Generate data lineage report for model training transparency. Args: data_source: Specific data source to trace operation_type: Filter by operation type start_date: Start date filter (ISO format) end_date: End date filter (ISO format) Returns: Data lineage report with source tracking """ with sqlite3.connect(str(self.audit_db_path)) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() # Build query for data lineage query = """ SELECT mal.entry_id, mal.timestamp, mal.operation_type, mal.operation_name, mal.data_sources, mdl.source_type, mdl.source_identifier, mdl.source_metadata, mdl.transformation_applied FROM mcp_audit_logs mal LEFT JOIN mcp_data_lineage mdl ON mal.entry_id = mdl.entry_id WHERE 1=1 """ params = [] if data_source: query += " AND mdl.source_identifier LIKE ?" params.append(f"%{data_source}%") if operation_type: query += " AND mal.operation_type = ?" params.append(operation_type) if start_date: query += " AND mal.timestamp >= ?" params.append(start_date) if end_date: query += " AND mal.timestamp <= ?" 
params.append(end_date) query += " ORDER BY mal.timestamp DESC" cursor.execute(query, params) rows = cursor.fetchall() # Process lineage data lineage_map = {} source_stats = {} for row in rows: entry_id = row["entry_id"] source_id = row["source_identifier"] if entry_id not in lineage_map: lineage_map[entry_id] = { "timestamp": row["timestamp"], "operation_type": row["operation_type"], "operation_name": row["operation_name"], "sources": [], } if source_id: source_info = { "source_type": row["source_type"], "source_identifier": source_id, "metadata": json.loads(row["source_metadata"]) if row["source_metadata"] else {}, "transformation": row["transformation_applied"], } lineage_map[entry_id]["sources"].append(source_info) # Update source statistics if source_id not in source_stats: source_stats[source_id] = { "usage_count": 0, "first_used": row["timestamp"], "last_used": row["timestamp"], "operations": set(), } source_stats[source_id]["usage_count"] += 1 source_stats[source_id]["last_used"] = max( source_stats[source_id]["last_used"], row["timestamp"] ) source_stats[source_id]["operations"].add(row["operation_type"]) # Convert sets to lists for JSON serialization for stats in source_stats.values(): stats["operations"] = list(stats["operations"]) return { "lineage_entries": list(lineage_map.values()), "source_statistics": source_stats, "total_entries": len(lineage_map), "unique_sources": len(source_stats), "generated_at": datetime.now(timezone.utc).isoformat(), # noqa: UP017 } def check_retention_compliance(self) -> dict[str, Any]: """ Check compliance with data retention policies. Returns: Retention compliance report """ current_time = datetime.now(timezone.utc) # noqa: UP017 with sqlite3.connect(str(self.audit_db_path)) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() # Check for expired logs cursor.execute( """ SELECT COUNT(*) as expired_count, MIN(expires_at) as oldest_expired, MAX(expires_at) as newest_expired FROM mcp_audit_logs WHERE expires_at < ? """, (current_time.isoformat(),), ) expired_result = cursor.fetchone() # Check retention policy distribution cursor.execute(""" SELECT retention_policy, COUNT(*) as count, MIN(timestamp) as oldest_entry, MAX(timestamp) as newest_entry FROM mcp_audit_logs GROUP BY retention_policy """) retention_distribution = [dict(row) for row in cursor.fetchall()] # Check for logs without retention policy cursor.execute(""" SELECT COUNT(*) as count FROM mcp_audit_logs WHERE retention_policy IS NULL OR retention_policy = '' """) missing_policy_count = cursor.fetchone()["count"] return { "expired_logs": { "count": expired_result["expired_count"], "oldest_expired": expired_result["oldest_expired"], "newest_expired": expired_result["newest_expired"], }, "retention_distribution": retention_distribution, "missing_retention_policy": missing_policy_count, "compliance_status": "compliant" if expired_result["expired_count"] == 0 else "non_compliant", "checked_at": current_time.isoformat(), } def purge_expired_data(self, dry_run: bool = True) -> dict[str, Any]: """ Purge expired audit data according to retention policies. Args: dry_run: If True, only report what would be deleted Returns: Purge operation results """ current_time = datetime.now(timezone.utc) # noqa: UP017 with sqlite3.connect(str(self.audit_db_path)) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() # Find expired logs cursor.execute( """ SELECT entry_id, timestamp, operation_type, expires_at FROM mcp_audit_logs WHERE expires_at < ? 
ORDER BY expires_at """, (current_time.isoformat(),), ) expired_logs = [dict(row) for row in cursor.fetchall()] if not dry_run and expired_logs: # Delete expired logs and related data expired_ids = [log["entry_id"] for log in expired_logs] if expired_ids: # Create placeholders for safe SQL execution placeholders = ",".join(["?" for _ in expired_ids]) # Delete from data lineage table sql_data_lineage = f"DELETE FROM mcp_data_lineage WHERE entry_id IN ({placeholders})" # noqa: S608 cursor.execute(sql_data_lineage, expired_ids) # Delete from compliance events table sql_compliance_events = f"DELETE FROM mcp_compliance_events WHERE entry_id IN ({placeholders})" # noqa: S608 cursor.execute(sql_compliance_events, expired_ids) # Delete from audit logs table sql_audit_logs = ( f"DELETE FROM mcp_audit_logs WHERE entry_id IN ({placeholders})" # noqa: S608 ) cursor.execute(sql_audit_logs, expired_ids) conn.commit() return { "operation": "purge_expired_data", "dry_run": dry_run, "expired_logs_found": len(expired_logs), "logs_deleted": len(expired_logs) if not dry_run else 0, "oldest_expired": expired_logs[0]["timestamp"] if expired_logs else None, "newest_expired": expired_logs[-1]["timestamp"] if expired_logs else None, "executed_at": current_time.isoformat(), } def generate_privacy_impact_assessment( self, processing_purpose: str, data_types: list[str], start_date: str | None = None, end_date: str | None = None, ) -> dict[str, Any]: """ Generate Privacy Impact Assessment (PIA) report. Args: processing_purpose: Purpose of data processing data_types: Types of data being processed start_date: Assessment period start date end_date: Assessment period end date Returns: Privacy Impact Assessment report """ # Query relevant audit logs filter_criteria = {"processing_purpose": processing_purpose} audit_logs = self._query_audit_logs(start_date, end_date, filter_criteria) # Analyze privacy risks risk_analysis = self._analyze_privacy_risks(audit_logs, data_types) # Generate recommendations recommendations = self._generate_privacy_recommendations(risk_analysis) return { "assessment_id": self._generate_report_id(), "processing_purpose": processing_purpose, "data_types": data_types, "assessment_period": {"start": start_date, "end": end_date}, "operations_analyzed": len(audit_logs), "risk_analysis": risk_analysis, "recommendations": recommendations, "compliance_status": risk_analysis.get("overall_risk_level", "unknown"), "generated_at": datetime.now(timezone.utc).isoformat(), # noqa: UP017 } def _query_audit_logs( self, start_date: str | None = None, end_date: str | None = None, filter_criteria: dict[str, Any] | None = None, ) -> list[dict[str, Any]]: """Query audit logs with optional filtering.""" with sqlite3.connect(str(self.audit_db_path)) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() query = "SELECT * FROM mcp_audit_logs WHERE 1=1" params = [] if start_date: query += " AND timestamp >= ?" params.append(start_date) if end_date: query += " AND timestamp <= ?" params.append(end_date) if filter_criteria: for key, value in filter_criteria.items(): if key in ["operation_type", "user_id", "processing_purpose"]: query += f" AND {key} = ?" 
params.append(value) query += " ORDER BY timestamp DESC" cursor.execute(query, params) return [dict(row) for row in cursor.fetchall()] def _analyze_compliance( self, audit_logs: list[dict[str, Any]], regulation: ComplianceRegulation ) -> dict[str, Any]: """Analyze audit logs for compliance violations.""" rules = self.compliance_rules.get(regulation, {}) violations = [] recommendations = [] # Track unique data subjects and sources data_subjects = set() data_sources = set() processing_purposes = set() legal_bases = set() retention_policies = set() for log in audit_logs: # Parse JSON fields try: data_sources_list = json.loads(log.get("data_sources", "[]")) json.loads(log.get("compliance_tags", "[]")) except (json.JSONDecodeError, TypeError): data_sources_list = [] # Collect metadata if log.get("data_subject_id"): data_subjects.add(log["data_subject_id"]) data_sources.update(data_sources_list) if log.get("processing_purpose"): processing_purposes.add(log["processing_purpose"]) if log.get("legal_basis"): legal_bases.add(log["legal_basis"]) if log.get("retention_policy"): retention_policies.add(log["retention_policy"]) # Check for violations if regulation == ComplianceRegulation.GDPR: violations.extend(self._check_gdpr_compliance(log, rules)) elif regulation == ComplianceRegulation.CCPA: violations.extend(self._check_ccpa_compliance(log, rules)) # Generate recommendations if not legal_bases and regulation == ComplianceRegulation.GDPR: recommendations.append( "Ensure all data processing has a valid legal basis under GDPR" ) if len(data_subjects) > 100 and regulation in [ ComplianceRegulation.GDPR, ComplianceRegulation.CCPA, ]: recommendations.append( "Consider implementing automated data subject rights management" ) return { "data_subjects_count": len(data_subjects), "data_sources": list(data_sources), "processing_purposes": list(processing_purposes), "legal_bases": list(legal_bases), "retention_policies": list(retention_policies), "violations": violations, "recommendations": recommendations, "metadata": { "regulation": regulation.value, "total_logs_analyzed": len(audit_logs), "analysis_timestamp": datetime.now(timezone.utc).isoformat(), # noqa: UP017 }, } def _check_gdpr_compliance( self, log: dict[str, Any], rules: dict[str, Any] ) -> list[dict[str, Any]]: """Check GDPR compliance for a single log entry.""" violations = [] # Check for missing legal basis if log.get("gdpr_applicable") and not log.get("legal_basis"): violations.append( { "type": "missing_legal_basis", "severity": "high", "entry_id": log["entry_id"], "description": "GDPR-applicable operation without legal basis", "regulation": "GDPR Article 6", } ) # Check retention period if log.get("expires_at"): try: expires_at = datetime.fromisoformat( log["expires_at"].replace("Z", "+00:00") ) created_at = datetime.fromisoformat( log["timestamp"].replace("Z", "+00:00") ) retention_days = (expires_at - created_at).days if retention_days > rules.get("max_retention_days", 2555): violations.append( { "type": "excessive_retention", "severity": "medium", "entry_id": log["entry_id"], "description": f"Retention period ({retention_days} days) exceeds GDPR limits", "regulation": "GDPR Article 5(1)(e)", } ) except (ValueError, TypeError): pass return violations def _check_ccpa_compliance( self, log: dict[str, Any], _rules: dict[str, Any] ) -> list[dict[str, Any]]: """Check CCPA compliance for a single log entry.""" violations = [] # Check for missing privacy disclosures if log.get("ccpa_applicable") and not log.get("processing_purpose"): 
violations.append( { "type": "missing_processing_purpose", "severity": "medium", "entry_id": log["entry_id"], "description": "CCPA-applicable operation without clear processing purpose", "regulation": "CCPA Section 1798.100", } ) return violations def _analyze_privacy_risks( self, audit_logs: list[dict[str, Any]], data_types: list[str] ) -> dict[str, Any]: """Analyze privacy risks in audit logs.""" risk_factors = { "high_volume_processing": len(audit_logs) > 10000, "sensitive_data_types": any( dt in ["pii", "health", "financial"] for dt in data_types ), "cross_border_transfers": False, # Would need geolocation analysis "automated_decision_making": any( "automated" in log.get("operation_name", "") for log in audit_logs ), "data_sharing": any( "share" in log.get("processing_purpose", "") for log in audit_logs ), } # Calculate overall risk level risk_score = sum(risk_factors.values()) if risk_score >= 4: risk_level = "high" elif risk_score >= 2: risk_level = "medium" else: risk_level = "low" return { "overall_risk_level": risk_level, "risk_score": risk_score, "risk_factors": risk_factors, "mitigation_required": risk_level in ["high", "medium"], } def _generate_privacy_recommendations( self, risk_analysis: dict[str, Any] ) -> list[str]: """Generate privacy recommendations based on risk analysis.""" recommendations = [] if risk_analysis["risk_factors"]["high_volume_processing"]: recommendations.append( "Implement data minimization techniques to reduce processing volume" ) if risk_analysis["risk_factors"]["sensitive_data_types"]: recommendations.append( "Apply additional security controls for sensitive data processing" ) if risk_analysis["risk_factors"]["automated_decision_making"]: recommendations.append( "Ensure transparency and explainability in automated decision-making" ) if risk_analysis["overall_risk_level"] == "high": recommendations.append( "Conduct formal Data Protection Impact Assessment (DPIA)" ) return recommendations def _sanitize_audit_logs( self, audit_logs: list[dict[str, Any]] ) -> list[dict[str, Any]]: """Remove or mask sensitive data from audit logs.""" sanitized_logs = [] for log in audit_logs.copy(): # Remove sensitive fields sensitive_fields = ["data_subject_id", "input_parameters", "output_data"] for field in sensitive_fields: if field in log: log[field] = "[REDACTED]" # Hash user IDs if log.get("user_id"): log["user_id"] = hashlib.sha256(log["user_id"].encode()).hexdigest()[ :16 ] sanitized_logs.append(log) return sanitized_logs def _export_json(self, data: list[dict[str, Any]], filepath: Path) -> None: """Export data to JSON format.""" with open(filepath, "w") as f: json.dump(data, f, indent=2, default=str) def _export_csv(self, data: list[dict[str, Any]], filepath: Path) -> None: """Export data to CSV format.""" if not data: return fieldnames = data[0].keys() with open(filepath, "w", newline="") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() writer.writerows(data) def _export_xml(self, data: list[dict[str, Any]], filepath: Path) -> None: """Export data to XML format.""" try: import defusedxml.ElementTree as ET except ImportError: # Fallback to standard library with security warning import xml.etree.ElementTree as ET # nosec B405 import warnings warnings.warn( "defusedxml not available. Using xml.etree.ElementTree which may be vulnerable to XML attacks. 
" "Install defusedxml for secure XML processing: pip install defusedxml", UserWarning, stacklevel=2, ) root = ET.Element("audit_logs") for log in data: log_element = ET.SubElement(root, "log_entry") for key, value in log.items(): if value is not None: elem = ET.SubElement(log_element, key) elem.text = str(value) tree = ET.ElementTree(root) tree.write(filepath, encoding="utf-8", xml_declaration=True) def _generate_report_id(self) -> str: """Generate unique report ID.""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") return f"compliance_report_{timestamp}_{hash(timestamp) % 10000:04d}" def create_compliance_reporter( audit_db_path: str, reports_output_dir: str = "compliance_reports", regulation: ComplianceRegulation = ComplianceRegulation.GDPR, ) -> MCPComplianceReporter: """ Factory function to create an MCP compliance reporter instance. Args: audit_db_path: Path to the MCP audit database reports_output_dir: Directory to store generated reports regulation: Default compliance regulation Returns: Configured MCPComplianceReporter instance """ return MCPComplianceReporter( audit_db_path=audit_db_path, reports_output_dir=reports_output_dir, default_regulation=regulation, )

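The retention workflow pairs check_retention_compliance with a dry-run purge before any destructive delete; the sketch below follows that order, again against a hypothetical database path.

# example_retention.py -- sketch of the retention workflow; "mcp_audit.db" is hypothetical.
from mcp_compliance import create_compliance_reporter

reporter = create_compliance_reporter(audit_db_path="mcp_audit.db")

# "compliance_status" flips to "non_compliant" as soon as expired rows remain.
status = reporter.check_retention_compliance()
print(status["compliance_status"], status["expired_logs"]["count"])

# A dry run (the default) only reports what would be deleted.
preview = reporter.purge_expired_data(dry_run=True)
print("would delete:", preview["expired_logs_found"])

# Purge for real only after the preview has been reviewed.
if preview["expired_logs_found"]:
    result = reporter.purge_expired_data(dry_run=False)
    print("deleted:", result["logs_deleted"])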