MockLoop MCP Server

Official
by MockLoop
mcp_audit_logger.py (36.1 kB)
""" MCP Audit Logger Module Provides comprehensive audit logging capabilities for MCP (Model Context Protocol) operations. Tracks tool executions, data access, compliance requirements, and performance metrics. """ import json import sqlite3 import time import uuid from datetime import datetime, timezone from pathlib import Path from typing import Any, Union import hashlib import logging from enum import Enum logger = logging.getLogger(__name__) class MCPOperationType(Enum): """Enumeration of MCP operation types for audit logging.""" TOOL_EXECUTION = "tool_execution" RESOURCE_ACCESS = "resource_access" CONTEXT_OPERATION = "context_operation" PROMPT_EXECUTION = "prompt_execution" SERVER_OPERATION = "server_operation" class MCPAuditLogger: """ Comprehensive audit logger for MCP operations. Provides detailed logging of tool executions, data access patterns, compliance tracking, and performance monitoring for MCP servers. """ def __init__( self, db_path: str = "mcp_audit.db", session_id: str | None = None, user_id: str | None = None, enable_performance_tracking: bool = True, enable_content_hashing: bool = True, auto_log_session: bool = False, ): """ Initialize the MCP audit logger. Args: db_path: Path to the SQLite database file session_id: Unique session identifier user_id: User identifier for the session enable_performance_tracking: Enable performance metrics collection enable_content_hashing: Enable content hashing for integrity verification auto_log_session: Whether to automatically log session start/end """ self.db_path = Path(db_path) self.session_id = session_id or str(uuid.uuid4()) self.user_id = user_id or "anonymous" self.enable_performance_tracking = enable_performance_tracking self.enable_content_hashing = enable_content_hashing self.auto_log_session = auto_log_session # Initialize database self._init_database() # Log session start if enabled if self.auto_log_session: self._log_session_start() def _init_database(self) -> None: """Initialize the audit database with required tables and handle schema migrations.""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Create mcp_audit_logs table (main audit table) cursor.execute(""" CREATE TABLE IF NOT EXISTS mcp_audit_logs ( entry_id TEXT PRIMARY KEY, session_id TEXT NOT NULL, user_id TEXT, timestamp TEXT NOT NULL, operation_type TEXT NOT NULL, operation_name TEXT NOT NULL, input_parameters TEXT, output_data TEXT, execution_time_ms REAL, data_sources TEXT, compliance_tags TEXT, processing_purpose TEXT, legal_basis TEXT, content_hash TEXT, gdpr_applicable BOOLEAN DEFAULT FALSE, ccpa_applicable BOOLEAN DEFAULT FALSE, data_subject_id TEXT, context_state_before TEXT, context_state_after TEXT, expires_at TEXT, retention_policy TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Create mcp_data_lineage table cursor.execute(""" CREATE TABLE IF NOT EXISTS mcp_data_lineage ( id INTEGER PRIMARY KEY AUTOINCREMENT, entry_id TEXT NOT NULL, source_uri TEXT NOT NULL, source_type TEXT, source_identifier TEXT, source_metadata TEXT, transformation_applied TEXT, destination_uri TEXT, transformation_type TEXT, data_flow_direction TEXT, timestamp TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (entry_id) REFERENCES mcp_audit_logs (entry_id) ) """) # Create mcp_compliance_events table cursor.execute(""" CREATE TABLE IF NOT EXISTS mcp_compliance_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT NOT NULL, event_type TEXT NOT NULL, compliance_framework TEXT NOT NULL, event_details TEXT, 
risk_level TEXT, mitigation_actions TEXT, timestamp TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Check table schemas and handle migrations self._handle_schema_migrations(cursor) # Create indexes for better performance cursor.execute( "CREATE INDEX IF NOT EXISTS idx_mcp_audit_logs_session ON mcp_audit_logs(session_id)" ) cursor.execute( "CREATE INDEX IF NOT EXISTS idx_mcp_audit_logs_operation ON mcp_audit_logs(operation_type)" ) cursor.execute( "CREATE INDEX IF NOT EXISTS idx_mcp_audit_logs_timestamp ON mcp_audit_logs(timestamp)" ) cursor.execute( "CREATE INDEX IF NOT EXISTS idx_mcp_data_lineage_entry ON mcp_data_lineage(entry_id)" ) # Check if session_id column exists in mcp_compliance_events before creating index cursor.execute("PRAGMA table_info(mcp_compliance_events)") compliance_columns = [column[1] for column in cursor.fetchall()] if "session_id" in compliance_columns: cursor.execute( "CREATE INDEX IF NOT EXISTS idx_mcp_compliance_events_session ON mcp_compliance_events(session_id)" ) conn.commit() except Exception: logger.exception("Failed to initialize audit database") raise def _handle_schema_migrations(self, cursor) -> None: """Handle database schema migrations for backward compatibility.""" try: # Check mcp_data_lineage table schema cursor.execute("PRAGMA table_info(mcp_data_lineage)") lineage_columns = [column[1] for column in cursor.fetchall()] # If source_uri column doesn't exist, recreate the table if "source_uri" not in lineage_columns: logger.info("Migrating mcp_data_lineage table schema") # Backup existing data if any cursor.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='mcp_data_lineage'" ) if cursor.fetchone(): cursor.execute( "ALTER TABLE mcp_data_lineage RENAME TO mcp_data_lineage_backup" ) # Create new table with correct schema cursor.execute(""" CREATE TABLE mcp_data_lineage ( id INTEGER PRIMARY KEY AUTOINCREMENT, entry_id TEXT NOT NULL, source_uri TEXT NOT NULL, source_type TEXT, source_identifier TEXT, source_metadata TEXT, transformation_applied TEXT, destination_uri TEXT, transformation_type TEXT, data_flow_direction TEXT, timestamp TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (entry_id) REFERENCES mcp_audit_logs (entry_id) ) """) # Try to migrate data from backup if it exists and has compatible columns try: cursor.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='mcp_data_lineage_backup'" ) if cursor.fetchone(): cursor.execute("PRAGMA table_info(mcp_data_lineage_backup)") backup_columns = [column[1] for column in cursor.fetchall()] # Find common columns for migration common_columns = set(backup_columns) & set(lineage_columns) if common_columns and "entry_id" in common_columns: # Validate column names to prevent SQL injection valid_columns = { "id", "entry_id", "source_uri", "source_type", "source_identifier", "source_metadata", "transformation_applied", "destination_uri", "transformation_type", "data_flow_direction", "timestamp", "created_at", } safe_columns = [ col for col in common_columns if col in valid_columns ] if safe_columns: columns_str = ", ".join(safe_columns) # Safe SQL construction with validated column names from database schema sql_query = f""" INSERT INTO mcp_data_lineage ({columns_str}) SELECT {columns_str} FROM mcp_data_lineage_backup """ # noqa: S608 cursor.execute(sql_query) # Drop backup table cursor.execute("DROP TABLE mcp_data_lineage_backup") except Exception as e: logger.warning(f"Could not migrate data from backup table: {e}") # Continue 
without migration - better to have working schema except Exception as e: logger.warning(f"Schema migration failed: {e}") # Continue - the CREATE TABLE IF NOT EXISTS should handle basic cases def _log_session_start(self) -> None: """Log the start of a new audit session.""" try: # Create a session start entry in the audit logs self.log_tool_execution( tool_name="session_start", input_parameters={ "session_id": self.session_id, "user_id": self.user_id, }, processing_purpose="session_management", legal_basis="legitimate_interests", ) except Exception: logger.exception("Failed to log session start") def log_tool_execution( self, tool_name: str, input_parameters: dict[str, Any] | None = None, execution_result: dict[str, Any] | None = None, execution_time_ms: float | None = None, data_sources: list[str] | None = None, compliance_tags: list[str] | None = None, processing_purpose: str | None = None, legal_basis: str | None = None, user_id: str | None = None, gdpr_applicable: bool = False, ccpa_applicable: bool = False, data_subject_id: str | None = None, retention_policy: str | None = None, ) -> str: """ Log a tool execution event. Args: tool_name: Name of the MCP tool being executed input_parameters: Input parameters passed to the tool execution_result: Result returned by the tool execution_time_ms: Execution time in milliseconds data_sources: List of data sources accessed compliance_tags: Compliance-related tags processing_purpose: Purpose of data processing legal_basis: Legal basis for data processing error_details: Error details if execution failed user_id: Override user ID for this operation gdpr_applicable: Whether GDPR applies to this operation ccpa_applicable: Whether CCPA applies to this operation data_subject_id: ID of the data subject if applicable Returns: Unique entry ID for the logged event """ entry_id = str(uuid.uuid4()) timestamp = datetime.now(timezone.utc).isoformat() # noqa: UP017 # Generate content hash if enabled content_hash = None if self.enable_content_hashing: content_data = { "tool_name": tool_name, "input_parameters": input_parameters, "execution_result": execution_result, } content_hash = hashlib.sha256( json.dumps(content_data, sort_keys=True).encode() ).hexdigest() try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Insert audit log entry cursor.execute( """ INSERT INTO mcp_audit_logs ( entry_id, session_id, user_id, timestamp, operation_type, operation_name, input_parameters, output_data, execution_time_ms, data_sources, compliance_tags, processing_purpose, legal_basis, content_hash, gdpr_applicable, ccpa_applicable, data_subject_id, retention_policy ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( entry_id, self.session_id, user_id or self.user_id, timestamp, MCPOperationType.TOOL_EXECUTION.value, tool_name, json.dumps(input_parameters) if input_parameters else None, json.dumps(execution_result) if execution_result else None, execution_time_ms, json.dumps(data_sources) if data_sources else None, json.dumps(compliance_tags) if compliance_tags else None, processing_purpose, legal_basis, content_hash, gdpr_applicable, ccpa_applicable, data_subject_id, retention_policy, ), ) # Log data lineage if data sources are provided if data_sources: for source in data_sources: cursor.execute( """ INSERT INTO mcp_data_lineage ( entry_id, source_uri, source_type, source_identifier, source_metadata, transformation_applied, transformation_type, data_flow_direction, timestamp ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( entry_id, source, "external_api", source, json.dumps({"tool_name": tool_name}), "tool_execution", "tool_execution", "input", timestamp, ), ) conn.commit() except Exception: logger.exception("Failed to log tool execution") raise return entry_id def log_resource_access( self, resource_uri: str, access_type: str, metadata: dict[str, Any] | None = None, content_preview: str | None = None, data_sources: list[str] | None = None, compliance_tags: list[str] | None = None, processing_purpose: str | None = None, legal_basis: str | None = None, user_id: str | None = None, gdpr_applicable: bool = False, ccpa_applicable: bool = False, data_subject_id: str | None = None, retention_policy: str | None = None, execution_time_ms: float | None = None, ) -> str: """ Log a resource access event. Args: resource_uri: URI of the resource being accessed access_type: Type of access (read, write, delete, etc.) metadata: Resource metadata content_preview: Preview of resource content data_sources: List of data sources accessed compliance_tags: Compliance-related tags processing_purpose: Purpose of data processing legal_basis: Legal basis for data processing user_id: Override user ID for this operation gdpr_applicable: Whether GDPR applies to this operation ccpa_applicable: Whether CCPA applies to this operation data_subject_id: ID of the data subject if applicable Returns: Unique entry ID for the logged event """ entry_id = str(uuid.uuid4()) timestamp = datetime.now(timezone.utc).isoformat() # noqa: UP017 # Generate content hash if enabled content_hash = None if self.enable_content_hashing and content_preview: content_hash = hashlib.sha256(content_preview.encode()).hexdigest() try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Insert audit log entry cursor.execute( """ INSERT INTO mcp_audit_logs ( entry_id, session_id, user_id, timestamp, operation_type, operation_name, input_parameters, output_data, data_sources, compliance_tags, processing_purpose, legal_basis, content_hash, gdpr_applicable, ccpa_applicable, data_subject_id, retention_policy ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( entry_id, self.session_id, user_id or self.user_id, timestamp, MCPOperationType.RESOURCE_ACCESS.value, f"resource_access_{access_type}", json.dumps( { "uri": resource_uri, "access_type": access_type, "execution_time_ms": execution_time_ms, } ), json.dumps( {"metadata": metadata, "content_preview": content_preview} ), json.dumps(data_sources) if data_sources else None, json.dumps(compliance_tags) if compliance_tags else None, processing_purpose, legal_basis, content_hash, gdpr_applicable, ccpa_applicable, data_subject_id, retention_policy, ), ) # Log data lineage cursor.execute( """ INSERT INTO mcp_data_lineage ( entry_id, source_uri, source_type, source_identifier, source_metadata, transformation_applied, transformation_type, data_flow_direction, timestamp ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( entry_id, resource_uri, "resource", resource_uri, json.dumps(metadata) if metadata else None, "resource_access", "resource_access", access_type, timestamp, ), ) conn.commit() except Exception: logger.exception("Failed to log resource access") raise return entry_id def log_context_operation( self, operation_type: str, context_key: str, state_before: dict[str, Any] | None = None, state_after: dict[str, Any] | None = None, compliance_tags: list[str] | None = None, processing_purpose: str | None = None, legal_basis: str | None = None, user_id: str | None = None, ) -> str: """ Log a context operation event. Args: operation_type: Type of context operation (set, get, update, delete) context_key: Key of the context being operated on state_before: Context state before the operation state_after: Context state after the operation compliance_tags: Compliance-related tags processing_purpose: Purpose of data processing legal_basis: Legal basis for data processing user_id: Override user ID for this operation Returns: Unique entry ID for the logged event """ entry_id = str(uuid.uuid4()) timestamp = datetime.now(timezone.utc).isoformat() # noqa: UP017 try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Insert audit log entry cursor.execute( """ INSERT INTO mcp_audit_logs ( entry_id, session_id, user_id, timestamp, operation_type, operation_name, input_parameters, context_state_before, context_state_after, compliance_tags, processing_purpose, legal_basis ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( entry_id, self.session_id, user_id or self.user_id, timestamp, MCPOperationType.CONTEXT_OPERATION.value, f"context_{operation_type}", json.dumps( {"context_key": context_key, "operation": operation_type} ), json.dumps(state_before) if state_before else None, json.dumps(state_after) if state_after else None, json.dumps(compliance_tags) if compliance_tags else None, processing_purpose, legal_basis, ), ) conn.commit() except Exception: logger.exception("Failed to log context operation") raise return entry_id def log_prompt_invocation( self, prompt_name: str, input_parameters: dict[str, Any] | None = None, execution_result: dict[str, Any] | None = None, execution_time_ms: float | None = None, data_sources: list[str] | None = None, compliance_tags: list[str] | None = None, processing_purpose: str | None = None, legal_basis: str | None = None, user_id: str | None = None, gdpr_applicable: bool = False, ccpa_applicable: bool = False, data_subject_id: str | None = None, retention_policy: str | None = None, generated_output: Any = None, ) -> str: """ Log a prompt invocation event. 
Args: prompt_name: Name of the prompt being invoked input_parameters: Input parameters passed to the prompt execution_result: Result returned by the prompt execution_time_ms: Execution time in milliseconds data_sources: List of data sources accessed compliance_tags: Compliance-related tags processing_purpose: Purpose of data processing legal_basis: Legal basis for data processing user_id: Override user ID for this operation gdpr_applicable: Whether GDPR applies to this operation ccpa_applicable: Whether CCPA applies to this operation data_subject_id: ID of the data subject if applicable retention_policy: Data retention policy generated_output: Output generated by the prompt (alternative to execution_result) Returns: Unique entry ID for the logged event """ entry_id = str(uuid.uuid4()) timestamp = datetime.now(timezone.utc).isoformat() # noqa: UP017 # Use generated_output if provided, otherwise use execution_result output_data = ( generated_output if generated_output is not None else execution_result ) # Generate content hash if enabled content_hash = None if self.enable_content_hashing: content_data = { "prompt_name": prompt_name, "input_parameters": input_parameters, "output_data": output_data, } content_hash = hashlib.sha256( json.dumps(content_data, sort_keys=True).encode() ).hexdigest() try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Insert audit log entry cursor.execute( """ INSERT INTO mcp_audit_logs ( entry_id, session_id, user_id, timestamp, operation_type, operation_name, input_parameters, output_data, execution_time_ms, data_sources, compliance_tags, processing_purpose, legal_basis, content_hash, gdpr_applicable, ccpa_applicable, data_subject_id, retention_policy ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( entry_id, self.session_id, user_id or self.user_id, timestamp, MCPOperationType.PROMPT_EXECUTION.value, prompt_name, json.dumps(input_parameters) if input_parameters else None, json.dumps(output_data) if output_data else None, execution_time_ms, json.dumps(data_sources) if data_sources else None, json.dumps(compliance_tags) if compliance_tags else None, processing_purpose, legal_basis, content_hash, gdpr_applicable, ccpa_applicable, data_subject_id, retention_policy, ), ) # Log data lineage if data sources are provided if data_sources: for source in data_sources: cursor.execute( """ INSERT INTO mcp_data_lineage ( entry_id, source_uri, source_type, source_identifier, source_metadata, transformation_applied, transformation_type, data_flow_direction, timestamp ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( entry_id, source, "prompt_source", source, json.dumps({"prompt_name": prompt_name}), "prompt_execution", "prompt_execution", "input", timestamp, ), ) conn.commit() except Exception: logger.exception("Failed to log prompt invocation") raise return entry_id def query_audit_logs( self, operation_type: str | None = None, user_id: str | None = None, start_time: str | None = None, end_time: str | None = None, limit: int = 100, offset: int = 0, ) -> list[dict[str, Any]]: """ Query audit logs with optional filters. 
Args: operation_type: Filter by operation type user_id: Filter by user ID start_time: Filter by start time (ISO format) end_time: Filter by end time (ISO format) limit: Maximum number of results offset: Number of results to skip Returns: List of audit log entries """ try: with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() # Build query with filters query = "SELECT * FROM mcp_audit_logs WHERE 1=1" params = [] if operation_type: query += " AND operation_type = ?" params.append(operation_type) if user_id: query += " AND user_id = ?" params.append(user_id) if start_time: query += " AND timestamp >= ?" params.append(start_time) if end_time: query += " AND timestamp <= ?" params.append(end_time) query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) cursor.execute(query, params) rows = cursor.fetchall() # Convert to list of dictionaries return [dict(row) for row in rows] except Exception: logger.exception("Failed to query audit logs") return [] def cleanup_expired_logs(self) -> int: """ Clean up expired audit logs. Returns: Number of deleted log entries """ try: current_time = datetime.now(timezone.utc).isoformat() # noqa: UP017 with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Delete expired logs cursor.execute( """ DELETE FROM mcp_audit_logs WHERE expires_at IS NOT NULL AND expires_at < ? """, (current_time,), ) deleted_count = cursor.rowcount conn.commit() return deleted_count except Exception: logger.exception("Failed to cleanup expired logs") return 0 def get_session_summary(self) -> dict[str, Any]: """Get a summary of the current audit session.""" try: with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() # Get session statistics cursor.execute( """ SELECT COUNT(*) as total_operations, COUNT(CASE WHEN operation_type = ? THEN 1 END) as tool_executions, COUNT(CASE WHEN operation_type = ? THEN 1 END) as resource_accesses, COUNT(CASE WHEN operation_type = ? THEN 1 END) as context_operations, MIN(timestamp) as session_start, MAX(timestamp) as last_activity FROM mcp_audit_logs WHERE session_id = ? """, ( MCPOperationType.TOOL_EXECUTION.value, MCPOperationType.RESOURCE_ACCESS.value, MCPOperationType.CONTEXT_OPERATION.value, self.session_id, ), ) stats = cursor.fetchone() return { "session_id": self.session_id, "user_id": self.user_id, "total_operations": stats["total_operations"], "tool_executions": stats["tool_executions"], "resource_accesses": stats["resource_accesses"], "context_operations": stats["context_operations"], "session_start": stats["session_start"], "last_activity": stats["last_activity"], } except Exception as e: logger.exception("Failed to get session summary") return {"error": str(e)} def close_session(self) -> None: """Close the current audit session.""" try: # Log session end self.log_tool_execution( tool_name="session_end", input_parameters={"session_id": self.session_id}, processing_purpose="session_management", legal_basis="legitimate_interests", ) except Exception: logger.exception("Failed to close session") def create_audit_logger( db_path: str = "mcp_audit.db", session_id: str | None = None, user_id: str | None = None, enable_performance_tracking: bool = True, enable_content_hashing: bool = True, ) -> MCPAuditLogger: """ Create and configure an MCP audit logger instance. 
Args: db_path: Path to the SQLite database file session_id: Unique session identifier user_id: User identifier for the session enable_performance_tracking: Enable performance metrics collection enable_content_hashing: Enable content hashing for integrity verification Returns: Configured MCPAuditLogger instance """ return MCPAuditLogger( db_path=db_path, session_id=session_id, user_id=user_id, enable_performance_tracking=enable_performance_tracking, enable_content_hashing=enable_content_hashing, )
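Example usage

A short usage sketch, illustrative rather than part of the module: it assumes the listing above is saved as mcp_audit_logger.py on the import path, and the tool name, query string, and data-source URL below are invented placeholders.

import hashlib
import json
import time

from mcp_audit_logger import MCPOperationType, create_audit_logger

audit = create_audit_logger(db_path="demo_audit.db", user_id="alice")

# Time a (stand-in) tool call and record it with compliance metadata.
start = time.perf_counter()
result = {"status": "ok", "matches": 3}  # placeholder for a real tool result
elapsed_ms = (time.perf_counter() - start) * 1000

audit.log_tool_execution(
    tool_name="search_logs",  # hypothetical tool
    input_parameters={"query": "status=500"},
    execution_result=result,
    execution_time_ms=elapsed_ms,
    data_sources=["https://api.example.com/logs"],  # hypothetical source
    processing_purpose="debugging",
    legal_basis="legitimate_interests",
)

# Query back the most recent tool executions.
recent = audit.query_audit_logs(
    operation_type=MCPOperationType.TOOL_EXECUTION.value, limit=10
)
row = recent[0]

# Optional integrity re-check: recompute the SHA-256 the same way
# log_tool_execution does and compare against the stored content_hash.
expected = hashlib.sha256(
    json.dumps(
        {
            "tool_name": row["operation_name"],
            "input_parameters": json.loads(row["input_parameters"]),
            "execution_result": json.loads(row["output_data"]),
        },
        sort_keys=True,
    ).encode()
).hexdigest()
assert expected == row["content_hash"]

print(audit.get_session_summary())
print("expired entries deleted:", audit.cleanup_expired_logs())
audit.close_session()

Note that query_audit_logs returns plain dicts, so JSON-encoded columns such as input_parameters, output_data, and data_sources come back as strings and need json.loads before use, as shown in the integrity check above.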
