Skip to main content
Glama
verification.py (9 kB)
"""
Verification module for security scanning of tool calls and responses.

Integrates with EventHub to send events to detection engines.
"""

from typing import Any, Dict, List, Optional
import json
import time
import asyncio
import uuid

from state import state, BlockingRequest
from utils import safe_print
from websocket_handler import ws_handler


# Seconds to wait for the user's allow/deny decision on a high severity
# detection before the tool call is blocked by default.
USER_DECISION_TIMEOUT_SECONDS = 60.0

# Engine that is skipped during real-time tool-call analysis.
_TOOLS_POISONING_ENGINE = 'ToolsPoisoningEngine'

# Severities that _run_realtime_analysis reports back to its caller.
_REPORTED_SEVERITIES = ('high', 'medium')

# Substrings (matched case-insensitively) that trigger a "sensitive data"
# warning when found in a serialized tool response.
_SENSITIVE_PATTERNS = ("password", "api_key", "secret", "token", "credential")


class VerificationResult:
    """Result of a verification check.

    Attributes:
        allowed: True if the call/response may proceed.
        reason: Optional human-readable explanation (set when blocked).
    """

    def __init__(self, allowed: bool = True, reason: Optional[str] = None):
        self.allowed = allowed
        self.reason = reason

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}"
            f"(allowed={self.allowed!r}, reason={self.reason!r})"
        )


def _build_tool_call_event(
    tool_name: str,
    tool_args: Dict[str, Any],
    app_name: str,
    server_name: str,
    tool_call_reason: str,
    producer: str
) -> Dict[str, Any]:
    """Build a fresh 'tools/call' SEND event dict for the EventHub/engines.

    A new dict (with its own millisecond timestamp) is created on every call
    so the logging event and the analysis event never share a top-level
    object. When ``tool_call_reason`` is non-empty it is merged into a copy
    of the arguments under the 'tool_call_reason' key; otherwise
    ``tool_args`` is passed through as-is.
    """
    if tool_call_reason:
        arguments = {**tool_args, 'tool_call_reason': tool_call_reason}
    else:
        arguments = tool_args

    return {
        'ts': int(time.time() * 1000),
        'producer': producer,
        'pid': None,
        'pname': app_name,
        'eventType': 'MCP',
        'mcpTag': server_name,
        'data': {
            'task': 'SEND',
            'message': {
                'jsonrpc': '2.0',
                'method': 'tools/call',
                'params': {
                    'name': tool_name,
                    'arguments': arguments
                }
            },
            'mcpTag': server_name
        }
    }


async def _request_user_decision(
    analysis_event: Dict[str, Any],
    high_severity_results: List[Dict[str, Any]],
    server_name: str,
    tool_name: str
) -> Optional[VerificationResult]:
    """Ask the user whether a high severity tool call may proceed.

    Registers a BlockingRequest in the shared state, broadcasts it through
    the websocket handler and waits up to USER_DECISION_TIMEOUT_SECONDS for
    the request's future to be resolved.

    Returns:
        A blocking VerificationResult if the user denied the call or the
        wait timed out; None if the user allowed it.

    Any exception other than the timeout propagates to the caller.
    """
    request_id = str(uuid.uuid4())
    # Presumably resolved with a bool by whatever handles the frontend's
    # answer -- that code is not part of this module.
    future = asyncio.get_event_loop().create_future()

    blocking_request = BlockingRequest(
        request_id=request_id,
        event_data=analysis_event,
        detection_results=high_severity_results,
        engine_name=high_severity_results[0].get('detector', 'Unknown'),
        severity='high',
        server_name=server_name,
        tool_name=tool_name,
        future=future
    )
    await state.add_blocking_request(blocking_request)

    # NOTE(review): if the broadcast below raises, the request registered
    # above is not removed here -- confirm whether state cleans it up.
    await ws_handler.broadcast_blocking_request(
        request_id=request_id,
        event_data=analysis_event,
        detection_results=high_severity_results,
        engine_name=blocking_request.engine_name,
        severity='high',
        server_name=server_name,
        tool_name=tool_name
    )

    try:
        allowed = await asyncio.wait_for(
            future, timeout=USER_DECISION_TIMEOUT_SECONDS
        )
    except asyncio.TimeoutError:
        # Nobody answered in time: drop the pending request and block.
        await state.remove_blocking_request(request_id)
        return VerificationResult(
            allowed=False,
            reason=(
                "Blocked: User decision timeout "
                f"({USER_DECISION_TIMEOUT_SECONDS:g}s)"
            )
        )

    if not allowed:
        return VerificationResult(
            allowed=False,
            reason=(
                f"Blocked by user: {blocking_request.engine_name} "
                "detected high severity threat"
            )
        )
    return None


async def verify_tool_call(
    tool_name: str,
    tool_args: Dict[str, Any],
    server_info: Dict[str, Any],
    tool_call_reason: str = "",
    skip_logging: bool = False,
    producer: str = "local"
) -> VerificationResult:
    """
    Verify a tool call against security policies.

    The call is first logged to the EventHub (unless ``skip_logging``), then
    analyzed by the real-time engines. If any engine reports a 'high'
    severity detection, the user is asked to allow or deny the call.

    Args:
        tool_name: Name of the tool being called
        tool_args: Arguments passed to the tool
        server_info: Information about the MCP server ('appName' and 'name'
            are read; both default to 'unknown')
        tool_call_reason: User's explanation for the tool call
        skip_logging: If True, skip EventHub logging (used when already logged)
        producer: Value stored in the 'producer' field of generated events

    Returns:
        VerificationResult indicating if the call is allowed
    """
    app_name = server_info.get('appName', 'unknown')
    server_name = server_info.get('name', 'unknown')

    safe_print(f"[Verification] Checking tool call: {tool_name}")
    safe_print(f"[Verification] Server: {app_name}/{server_name}")
    if tool_call_reason:
        safe_print(f"[Verification] Tool call reason: {tool_call_reason}")

    # Send event to EventHub for engine analysis (unless already logged)
    if not skip_logging and state.event_hub:
        event = _build_tool_call_event(
            tool_name, tool_args, app_name, server_name,
            tool_call_reason, producer
        )
        # The hub call is awaited; a failure here is logged and must not
        # prevent the blocking analysis below from running.
        try:
            await state.event_hub.process_event(event)
        except Exception as e:
            safe_print(f"[Verification] Error sending event to EventHub: {e}")

    # Run real-time engine analysis for blocking decision
    if state.event_hub:
        # NOTE(review): any unexpected error in this section is logged and
        # the call is then ALLOWED (fail-open). Confirm this is the intended
        # policy for a security gate.
        try:
            analysis_event = _build_tool_call_event(
                tool_name, tool_args, app_name, server_name,
                tool_call_reason, producer
            )
            detection_results = await _run_realtime_analysis(analysis_event)

            # Only 'high' severity detections require a user decision;
            # 'medium' results are returned by the analysis but ignored here.
            high_severity_results = [
                r for r in detection_results if r.get('severity') == 'high'
            ]
            if high_severity_results:
                blocked = await _request_user_decision(
                    analysis_event, high_severity_results,
                    server_name, tool_name
                )
                if blocked is not None:
                    return blocked
        except Exception as e:
            safe_print(f"[Verification] Error in real-time analysis: {e}")

    return VerificationResult(allowed=True)


async def _run_realtime_analysis(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Run real-time engine analysis for blocking decision.

    Every engine registered on the EventHub except the ToolsPoisoning engine
    is given the event, provided its ``should_process`` accepts it. An engine
    that raises is logged and skipped; the remaining engines still run.

    Args:
        event: Event to analyze

    Returns:
        List of detection results (the 'result' payload of each engine
        response) whose severity is 'high' or 'medium'. Empty if there is
        no EventHub.
    """
    if not state.event_hub:
        return []

    results = []
    for engine in state.event_hub.engines:
        if engine.name == _TOOLS_POISONING_ENGINE:
            continue  # Skip - this is for tool descriptions only

        if not engine.should_process(event):
            continue

        try:
            result = await engine.handle_event(event)
            if not result:
                continue
            result_data = result.get('result', {})
            if result_data.get('severity', 'none') in _REPORTED_SEVERITIES:
                results.append(result_data)
        except Exception as e:
            safe_print(f"[Verification] Engine {engine.name} error: {e}")

    return results


async def verify_tool_response(
    tool_name: str,
    response_data: Dict[str, Any],
    server_info: Dict[str, Any],
    skip_logging: bool = False,
    producer: str = "local"
) -> VerificationResult:
    """
    Verify a tool response against security policies.

    The response is logged to the EventHub (unless ``skip_logging``) and
    scanned for a few sensitive-looking substrings. The scan only prints
    warnings; the response is always allowed.

    Args:
        tool_name: Name of the tool that was called
        response_data: Response data from the tool
        server_info: Information about the MCP server ('appName' and 'name'
            are read; both default to 'unknown')
        skip_logging: If True, skip EventHub logging (used when already logged)
        producer: Value stored in the 'producer' field of the generated
            event; defaults to 'local' (STDIO), matching verify_tool_call

    Returns:
        VerificationResult indicating if the response is allowed
    """
    app_name = server_info.get('appName', 'unknown')
    server_name = server_info.get('name', 'unknown')

    safe_print(f"[Verification] Checking tool response: {tool_name}")

    # Send event to EventHub for engine analysis (unless already logged)
    if not skip_logging and state.event_hub:
        event = {
            'ts': int(time.time() * 1000),
            'producer': producer,
            'pid': None,
            'pname': app_name,
            'eventType': 'MCP',
            'mcpTag': server_name,
            'data': {
                'task': 'RECV',
                'message': response_data,
                'mcpTag': server_name
            }
        }

        # The hub call is awaited; failures are logged, not raised.
        try:
            await state.event_hub.process_event(event)
        except Exception as e:
            safe_print(f"[Verification] Error sending event to EventHub: {e}")

    # Basic warning checks. The scan is advisory only, so a payload that
    # cannot be JSON-encoded (non-serializable value, circular reference)
    # falls back to its str() form instead of aborting verification.
    try:
        response_str = json.dumps(response_data).lower()
    except (TypeError, ValueError):
        response_str = str(response_data).lower()

    for pattern in _SENSITIVE_PATTERNS:
        if pattern in response_str:
            safe_print(f"[Verification] Warning: Response may contain sensitive data: {pattern}")

    return VerificationResult(allowed=True)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/seungwon9201/MCP-Dandan'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.