Skip to main content
Glama
stdio_handlers.py12 kB
""" STDIO transport verification API handlers. These endpoints are used by the CLI proxy for verifying tool calls and responses. """ import json import time from aiohttp import web from state import state from verification import verify_tool_call, verify_tool_response from utils import safe_print async def handle_verify_request(request): """ Handle verification and logging for all requests from STDIO proxy. POST /verify/request Body: { "message": {...}, // JSON-RPC message "toolName": "...", // Tool name or method name "serverInfo": {...} } Returns: { "blocked": bool, "reason": str, "modified": bool } """ try: data = await request.json() except Exception as e: return web.Response( status=400, text=json.dumps({"error": "Invalid JSON"}), content_type='application/json' ) message = data.get('message') tool_name = data.get('toolName') server_info = data.get('serverInfo', {}) stage = data.get('stage', None) # 구분자 읽기 if not message: return web.Response( status=400, text=json.dumps({"error": "Missing message"}), content_type='application/json' ) try: app_name = server_info.get('appName', 'unknown') server_name = server_info.get('name', 'unknown') method = message.get('method', tool_name) # Determine event type: Proxy only for pre_init stage, MCP otherwise event_type = 'Proxy' if stage == 'pre_init' else 'MCP' # Create event for EventHub (logs all requests) event = { 'ts': int(time.time() * 1000), 'producer': 'local', 'pid': None, 'pname': app_name, 'eventType': event_type, 'mcpTag': server_name, 'data': { 'task': 'SEND', 'message': message, 'mcpTag': server_name } } # Send to EventHub for DB logging if state.event_hub: await state.event_hub.process_event(event) # Only verify tools/call for security if message.get('method') == 'tools/call': tool_args = message.get('params', {}).get('arguments', {}) tool_call_reason = tool_args.get('tool_call_reason', '') tool_args_clean = {k: v for k, v in tool_args.items() if k != 'tool_call_reason'} safe_print(f"[Verify] Tool call: {tool_name} from {app_name}/{server_name}") if tool_call_reason: safe_print(f"[Verify] Tool call reason: {tool_call_reason}") # Verify the tool call (skip logging since we already logged above) verification = await verify_tool_call( tool_name=tool_name, tool_args=tool_args_clean, server_info=server_info, tool_call_reason=tool_call_reason, skip_logging=True ) return web.Response( status=200, text=json.dumps({ "blocked": not verification.allowed, "reason": verification.reason if not verification.allowed else None, "modified": False }), content_type='application/json' ) else: # Other methods: just log, don't block # pre_init 단계면 로그에 표시 if stage == 'pre_init': safe_print(f"[Log] [Pre-Init] Request: {method} from {app_name}/{server_name}") else: safe_print(f"[Log] Request: {method} from {app_name}/{server_name}") return web.Response( status=200, text=json.dumps({ "blocked": False, "reason": None, "modified": False }), content_type='application/json' ) except Exception as e: safe_print(f"[Verify] Error processing request: {e}") import traceback traceback.print_exc() return web.Response( status=500, text=json.dumps({"error": "Processing error", "blocked": False}), content_type='application/json' ) async def handle_verify_response(request): """ Handle verification and logging for all responses from STDIO proxy. POST /verify/response Body: { "message": {...}, // JSON-RPC response message "toolName": "...", // Tool name or method name "serverInfo": {...} } Returns: { "blocked": bool, "reason": str, "modified": bool } """ try: data = await request.json() except Exception as e: return web.Response( status=400, text=json.dumps({"error": "Invalid JSON"}), content_type='application/json' ) message = data.get('message') server_info = data.get('serverInfo', {}) tool_name = data.get('toolName', 'unknown') stage = data.get('stage', None) # 구분자 읽기 skip_analysis = data.get('skip_analysis', False) # 엔진 분석 스킵 플래그 if not message: return web.Response( status=400, text=json.dumps({"error": "Missing message"}), content_type='application/json' ) try: app_name = server_info.get('appName', 'unknown') server_name = server_info.get('name', 'unknown') # Determine event type: Proxy only for pre_init stage, MCP otherwise event_type = 'Proxy' if stage == 'pre_init' else 'MCP' # Create event for EventHub (logs all responses) event = { 'ts': int(time.time() * 1000), 'producer': 'local', 'pid': None, 'pname': app_name, 'eventType': event_type, 'mcpTag': server_name, 'data': { 'task': 'RECV', 'message': message, 'mcpTag': server_name } } # Check if this is a tools/list response - needs synchronous processing is_tools_list = False if message.get('result') and message['result'].get('tools'): is_tools_list = True # Send to EventHub for DB logging and analysis if state.event_hub: if is_tools_list and not skip_analysis: # tools/list: 동기적으로 처리 (엔진 검사 완료까지 대기) stage_label = '[Pre-Init]' if stage == 'pre_init' else '' safe_print(f"[Log] {stage_label} tools/list Response from {app_name}/{server_name} - waiting for engine analysis") await state.event_hub.process_event_sync(event) safe_print(f"[Log] {stage_label} tools/list engine analysis completed for {app_name}/{server_name}") elif is_tools_list and skip_analysis: # tools/list이지만 분석 스킵 (캐시된 응답) safe_print(f"[Log] [Cached] tools/list Response from {app_name}/{server_name} - skipping analysis (already done)") await state.event_hub.process_event(event) else: # 다른 응답: 백그라운드 처리 await state.event_hub.process_event(event) # Only verify tools/call responses for security if tool_name != 'unknown' and tool_name not in ['initialize', 'tools/list', 'notifications/initialized']: safe_print(f"[Verify] Tool response: {tool_name} from {app_name}/{server_name}") # Verify the tool response (skip logging since we already logged above) verification = await verify_tool_response( tool_name=tool_name, response_data=message, server_info=server_info, skip_logging=True ) return web.Response( status=200, text=json.dumps({ "blocked": not verification.allowed, "reason": verification.reason if not verification.allowed else None, "modified": False }), content_type='application/json' ) else: # Other responses: just log, don't block if not is_tools_list: # pre_init 단계면 로그에 표시 if stage == 'pre_init': safe_print(f"[Log] [Pre-Init] Response from {app_name}/{server_name}") else: safe_print(f"[Log] Response from {app_name}/{server_name}") return web.Response( status=200, text=json.dumps({ "blocked": False, "reason": None, "modified": False }), content_type='application/json' ) except Exception as e: safe_print(f"[Verify] Error processing response: {e}") import traceback traceback.print_exc() return web.Response( status=500, text=json.dumps({"error": "Processing error", "blocked": False}), content_type='application/json' ) async def handle_register_tools(request): """ Handle tool registration from CLI proxy. POST /register-tools Body: { "tools": [...], "serverInfo": {...}, "appName": "...", "serverName": "..." } Returns: { "success": bool, "message": str, "stats": {...} } """ try: data = await request.json() except Exception as e: return web.Response( status=400, text=json.dumps({"error": "Invalid JSON"}), content_type='application/json' ) tools = data.get('tools') app_name = data.get('appName') server_name = data.get('serverName') server_info = data.get('serverInfo', {}) if not tools or not isinstance(tools, list): return web.Response( status=400, text=json.dumps({"error": "Invalid tools data"}), content_type='application/json' ) try: safe_print(f"[Register] Registering {len(tools)} tools for {app_name}/{server_name}") # Log tool information for i, tool in enumerate(tools): description = tool.get('description', '(no description)') safe_print(f" {i+1}. {tool.get('name')} - {description}") # Register tools in state await state.register_tools( app_name=app_name, server_name=server_name, tools=tools, server_info=server_info ) # Count tools with/without descriptions with_desc = sum(1 for t in tools if t.get('description')) without_desc = len(tools) - with_desc safe_print(f"[Register] Successfully registered {len(tools)} tools for {app_name}:{server_name}") safe_print(f" {with_desc} with descriptions, {without_desc} without") return web.Response( status=200, text=json.dumps({ "success": True, "message": f"Registered {len(tools)} tools for {app_name}:{server_name}", "stats": { "total": len(tools), "withDescriptions": with_desc, "withoutDescriptions": without_desc } }), content_type='application/json' ) except Exception as e: safe_print(f"[Register] Error registering tools: {e}") return web.Response( status=500, text=json.dumps({"error": "Tool registration error"}), content_type='application/json' )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/seungwon9201/MCP-Dandan'

If you have feedback or need assistance with the MCP directory API, please join our Discord server