Skip to main content
Glama

MockLoop MCP Server

Official
by MockLoop
audit.pyโ€ข10.9 kB
""" SchemaPin Audit Logging Module Provides audit logging capabilities for SchemaPin verification events. """ import json import logging import sqlite3 import uuid from datetime import datetime, UTC from typing import Any from .config import VerificationResult logger = logging.getLogger(__name__) class SchemaPinAuditLogger: """SchemaPin-specific audit logging for MockLoop integration.""" def __init__(self, db_path: str = "mcp_audit.db"): """Initialize audit logger with database path.""" self.db_path = db_path self._ensure_tables_exist() def _ensure_tables_exist(self) -> None: """Ensure SchemaPin audit tables exist.""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Check if schemapin_verification_logs table exists cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name='schemapin_verification_logs' """) if not cursor.fetchone(): # Create the table if it doesn't exist cursor.execute(""" CREATE TABLE IF NOT EXISTS schemapin_verification_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, entry_id TEXT NOT NULL, tool_id TEXT NOT NULL, domain TEXT, verification_result TEXT NOT NULL, signature_valid BOOLEAN, key_pinned BOOLEAN, policy_action TEXT, error_details TEXT, execution_time_ms REAL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_schemapin_verification_entry ON schemapin_verification_logs(entry_id) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_schemapin_verification_tool ON schemapin_verification_logs(tool_id) """) conn.commit() except Exception: logger.exception("Failed to ensure SchemaPin audit tables exist") async def log_verification_attempt(self, tool_id: str, domain: str | None, result: VerificationResult, execution_time_ms: float) -> None: """ Log SchemaPin verification attempts. Args: tool_id: Tool identifier domain: Domain being verified result: Verification result execution_time_ms: Execution time in milliseconds """ try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO schemapin_verification_logs (entry_id, tool_id, domain, verification_result, signature_valid, key_pinned, policy_action, execution_time_ms) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( str(uuid.uuid4()), tool_id, domain, "success" if result.valid else "failure", result.valid, result.key_pinned, "allow" if result.valid else "block", execution_time_ms )) conn.commit() except Exception: logger.exception("Failed to log verification attempt") async def log_verification_error(self, tool_id: str, domain: str | None, error: str) -> None: """ Log SchemaPin verification errors. Args: tool_id: Tool identifier domain: Domain being verified error: Error message """ try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO schemapin_verification_logs (entry_id, tool_id, domain, verification_result, signature_valid, key_pinned, policy_action, error_details) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( str(uuid.uuid4()), tool_id, domain, "error", False, False, "error", error )) conn.commit() except Exception: logger.exception("Failed to log verification error") async def log_key_pinning_event(self, tool_id: str, domain: str, public_key: str, action: str) -> None: """ Log key pinning events (pin, update, revoke). Args: tool_id: Tool identifier domain: Domain the key belongs to public_key: Public key being pinned action: Action being performed (pin, update, revoke) """ try: # For now, log as a verification event with special metadata # In a full implementation, this might have its own table with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO schemapin_verification_logs (entry_id, tool_id, domain, verification_result, signature_valid, key_pinned, policy_action, error_details) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( str(uuid.uuid4()), tool_id, domain, f"key_{action}", True, action == "pin", action, json.dumps({ "action": action, "public_key_hash": hash(public_key), # Don't store full key "timestamp": datetime.now(UTC).isoformat() }) )) conn.commit() except Exception: logger.exception("Failed to log key pinning event") async def log_policy_decision(self, tool_id: str, policy_action: str, reason: str, policy_mode: str) -> None: """ Log policy enforcement decisions. Args: tool_id: Tool identifier policy_action: Action taken by policy reason: Reason for the decision policy_mode: Policy mode in effect """ try: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO schemapin_verification_logs (entry_id, tool_id, verification_result, policy_action, error_details) VALUES (?, ?, ?, ?, ?) """, ( str(uuid.uuid4()), tool_id, f"policy_{policy_action}", policy_action, json.dumps({ "reason": reason, "policy_mode": policy_mode, "timestamp": datetime.now(UTC).isoformat() }) )) conn.commit() except Exception: logger.exception("Failed to log policy decision") def get_verification_stats(self, start_date: str | None = None, end_date: str | None = None) -> dict[str, Any]: """ Get verification statistics for a date range. Args: start_date: Start date in ISO format end_date: End date in ISO format Returns: Dictionary with verification statistics """ try: with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() # Use parameterized queries to avoid SQL injection if start_date and end_date: base_query = """ SELECT COUNT(*) as total_verifications, SUM(CASE WHEN signature_valid = 1 THEN 1 ELSE 0 END) as successful_verifications, SUM(CASE WHEN signature_valid = 0 THEN 1 ELSE 0 END) as failed_verifications, COUNT(DISTINCT tool_id) as unique_tools, COUNT(DISTINCT domain) as unique_domains FROM schemapin_verification_logs WHERE created_at BETWEEN ? AND ? """ policy_query = """ SELECT policy_action, COUNT(*) as count FROM schemapin_verification_logs WHERE created_at BETWEEN ? AND ? GROUP BY policy_action """ params = [start_date, end_date] else: base_query = """ SELECT COUNT(*) as total_verifications, SUM(CASE WHEN signature_valid = 1 THEN 1 ELSE 0 END) as successful_verifications, SUM(CASE WHEN signature_valid = 0 THEN 1 ELSE 0 END) as failed_verifications, COUNT(DISTINCT tool_id) as unique_tools, COUNT(DISTINCT domain) as unique_domains FROM schemapin_verification_logs """ policy_query = """ SELECT policy_action, COUNT(*) as count FROM schemapin_verification_logs GROUP BY policy_action """ params = [] cursor.execute(base_query, params) stats = dict(cursor.fetchone()) # Get policy action breakdown cursor.execute(policy_query, params) policy_stats = {row["policy_action"]: row["count"] for row in cursor.fetchall()} stats["policy_breakdown"] = policy_stats return stats except Exception: logger.exception("Failed to get verification stats") return {}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MockLoop/mockloop-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server