Skip to main content
Glama
event_hub.py17.6 kB
""" EventHub - Central event processing hub for 82ch Processes events from Observer and routes them to detection engines. No ZeroMQ - direct in-process communication. """ import asyncio from typing import List, Optional, Dict, Any from datetime import datetime from utils import safe_print import sys from pathlib import Path # 현재 실행 중인 파이썬 인터프리터 (mac / linux / windows 모두 공통) PYTHON_CMD = sys.executable # event_hub.py 기준으로 transports/config_finder.py 절대 경로 BASE_DIR = Path(__file__).resolve().parent CONFIG_FINDER_PATH = BASE_DIR / "transports" / "config_finder.py" class EventHub: """ Central event processing hub. Receives events from Observer, stores them in database, and routes them to detection engines for analysis. """ def __init__(self, engines: List, db, ws_handler=None): self.engines = engines self.db = db self.ws_handler = ws_handler # WebSocket handler for real-time updates self.running = False self.background_tasks = set() # 백그라운드 태스크 추적 async def start(self): """Start the EventHub.""" self.running = True safe_print('[EventHub] Started') async def stop(self): """Stop the EventHub.""" self.running = False # 모든 백그라운드 태스크 취소 if self.background_tasks: safe_print(f'[EventHub] Cancelling {len(self.background_tasks)} background tasks...') for task in self.background_tasks: if not task.done(): task.cancel() # 태스크가 완전히 취소될 때까지 대기 try: await asyncio.gather(*self.background_tasks, return_exceptions=True) safe_print('[EventHub] All background tasks cancelled') except Exception as e: safe_print(f'[EventHub] Error cancelling tasks: {e}') self.background_tasks.clear() # Restore Claude & Cursor config on shutdown import subprocess try: # sys.executable + 절대 경로로 플랫폼 독립적으로 실행 subprocess.run( [PYTHON_CMD, str(CONFIG_FINDER_PATH), '--restore', '--app', 'all'], capture_output=True, text=True, timeout=10, ) safe_print('[EventHub] Claude & Cursor config restored') except Exception as e: safe_print(f'[EventHub] Failed to restore config: {e}') safe_print('[EventHub] Stopped') async def process_event(self, event: Dict[str, Any]) -> None: """ Process a single event with optimized pipeline. 1. Save event to database immediately (fast path) 2. Launch engine analysis in background (non-blocking) Args: event: Event dictionary with eventType, producer, data, etc. """ if not self.running: return try: # Step 1: 즉시 DB 저장 (빠른 응답) await self._save_event(event) # Step 2: 백그라운드에서 엔진 분석 실행 asyncio.create_task(self._analyze_event_async(event)) except Exception as e: safe_print(f'[EventHub] Error processing event: {e}') async def process_event_sync(self, event: Dict[str, Any]) -> None: """ 이벤트를 동기적으로 처리 (tools/list 검사 시 사용). Args: event: Event dictionary with eventType, producer, data, etc. """ if not self.running: return try: # Step 1: 즉시 DB 저장 await self._save_event(event) # Step 2: 엔진 분석을 동기적으로 수행 (기다림) await self._analyze_event_async(event, sync_mode=True) except Exception as e: safe_print(f'[EventHub] Error processing event synchronously: {e}') async def _analyze_event_async(self, event: Dict[str, Any], sync_mode: bool = False) -> None: """ 백그라운드에서 엔진 분석 수행 및 결과 일괄 저장. Args: event: 분석할 이벤트 sync_mode: True이면 ToolsPoisoningEngine도 동기적으로 실행 (기다림) """ try: # ToolsPoisoningEngine과 다른 엔진 분리 tools_poisoning_engine = None other_engines = [] for engine in self.engines: if not engine.should_process(event): continue if engine.name == 'ToolsPoisoningEngine': tools_poisoning_engine = engine else: other_engines.append(engine) # 일반 엔진들은 즉시 실행 (빠른 엔진) if other_engines: tasks = [self._process_with_engine(engine, event) for engine in other_engines] results = await asyncio.gather(*tasks, return_exceptions=True) # 결과 수집 all_results = [] for result in results: if result and not isinstance(result, Exception): if isinstance(result, list): all_results.extend(result) else: all_results.append(result) # 결과 일괄 저장 if all_results: await self._save_results_batch(all_results) # ToolsPoisoningEngine 처리 if tools_poisoning_engine: if sync_mode: # 동기 모드: ToolsPoisoningEngine 완료까지 대기 await self._run_tools_poisoning_analysis(tools_poisoning_engine, event) else: # 비동기 모드: 백그라운드 태스크로 실행 task = asyncio.create_task(self._run_tools_poisoning_analysis(tools_poisoning_engine, event)) self.background_tasks.add(task) # 태스크 완료 시 자동으로 제거 task.add_done_callback(self.background_tasks.discard) except Exception as e: safe_print(f'[EventHub] Error in async analysis: {e}') import traceback traceback.print_exc() async def _run_tools_poisoning_analysis(self, engine, event: Dict[str, Any]) -> None: """ ToolsPoisoningEngine 분석을 완전히 독립적으로 실행. 이 함수는 다른 처리를 블로킹하지 않음. Args: engine: ToolsPoisoningEngine 인스턴스 event: 분석할 이벤트 """ try: safe_print(f'[EventHub] _run_tools_poisoning_analysis STARTED') result = await self._process_with_engine(engine, event) safe_print( f'[EventHub] _run_tools_poisoning_analysis COMPLETED ' f'(result: {len(result) if isinstance(result, list) else "None" if result is None else "1"})' ) if result: # 결과 저장 results_list = result if isinstance(result, list) else [result] await self._save_results_batch(results_list) safe_print(f'[EventHub] _run_tools_poisoning_analysis SAVED {len(results_list)} results') except Exception as e: safe_print(f'[EventHub] Error in ToolsPoisoningEngine analysis: {e}') import traceback traceback.print_exc() async def _save_event(self, event: Dict[str, Any]): """Save event to database.""" try: event_type = event.get('eventType', 'Unknown') # Save to raw_events table raw_event_id = await self.db.insert_raw_event(event) if raw_event_id: # Store raw_event_id directly in the event for later use event['_raw_event_id'] = raw_event_id # Save to type-specific tables # Proxy와 MCP 모두 JSON-RPC 프로토콜이므로 rpc_events에 저장 if event_type.lower() in ['rpc', 'jsonrpc', 'mcp', 'proxy']: await self.db.insert_rpc_event(event, raw_event_id) # Extract MCP tool information if present data = event.get('data', {}) message = data.get('message', {}) task = data.get('task', '') if task == 'RECV' and 'tools' in message.get('result', {}): count = await self.db.insert_mcpl() print(f'[EventHub] insert_mcpl returned count: {count}') if count and count > 0: safe_print(f'[EventHub] Extracted {count} tool(s) to mcpl table') # mcpl에 insert된 tools를 백그라운드에서 분석 asyncio.create_task(self._analyze_mcpl_tools(count, event)) # Broadcast server update via WebSocket if self.ws_handler: asyncio.create_task(self.ws_handler.broadcast_server_update()) # Broadcast message update for new events mcp_tag = event.get('mcpTag') if self.ws_handler and mcp_tag: asyncio.create_task(self.ws_handler.broadcast_message_update( raw_event_id, mcp_tag )) except Exception as e: safe_print(f'[EventHub] Error saving event: {e}') async def _save_results_batch(self, results: List[Dict[str, Any]]): """ 엔진 결과를 일괄 저장 (배치 처리). Args: results: 저장할 결과 리스트 """ try: saved_count = 0 for result in results: result_data = result.get('result', {}) original_event = result_data.get('original_event', {}) # Get raw_event_id directly from event raw_event_id = original_event.get('_raw_event_id') # Extract metadata server_name = original_event.get('mcpTag') producer = original_event.get('producer', 'unknown') # Save to DB engine_result_id = await self.db.insert_engine_result( result, raw_event_id, server_name, producer ) if engine_result_id: saved_count += 1 # Broadcast detection result via WebSocket if self.ws_handler and raw_event_id: engine_name = result_data.get('detector', 'unknown') severity = result_data.get('severity', 'none') asyncio.create_task(self.ws_handler.broadcast_detection_result( raw_event_id, engine_name, severity )) if saved_count > 0: safe_print(f'[EventHub] Batch saved {saved_count} detection results') except Exception as e: safe_print(f'[EventHub] Error in batch save: {e}') import traceback traceback.print_exc() async def _save_result(self, result: Dict[str, Any]): """Save single engine detection result to database (legacy method).""" try: result_data = result.get('result', {}) original_event = result_data.get('original_event', {}) # Get raw_event_id directly from event raw_event_id = original_event.get('_raw_event_id') # Extract server name and producer server_name = original_event.get('mcpTag') producer = original_event.get('producer', 'unknown') # Save engine result engine_result_id = await self.db.insert_engine_result( result, raw_event_id, server_name, producer ) if engine_result_id: detector = result_data.get('detector') severity = result_data.get('severity') safe_print( f'[EventHub] Saved detection result (id={engine_result_id}, ' f'detector={detector}, severity={severity}, server={server_name})' ) except Exception as e: safe_print(f'[EventHub] Error saving result: {e}') async def _process_with_engine(self, engine, event: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Process event with a specific engine.""" try: result = await engine.handle_event(event) return result except Exception as e: safe_print(f'[EventHub] [{engine.name}] Error: {e}') return None async def _analyze_mcpl_tools(self, count: int, original_event: Dict[str, Any]): """ mcpl에 insert된 tools를 조회하여 ToolsPoisoningEngine으로 전달 Args: count: insert된 tool 개수 original_event: 원본 이벤트 (mcpTag, producer, raw_event_id 추출용) """ try: # mcpl에서 최근 insert된 tools 조회 tools = await self.db.get_recent_mcpl_tools(limit=count) if not tools: safe_print(f'[EventHub] No tools found in mcpl table') return safe_print(f'[EventHub] Analyzing {len(tools)} tools with ToolsPoisoningEngine') # ToolsPoisoningEngine 찾기 tools_poisoning_engine = None for engine in self.engines: if engine.name == 'ToolsPoisoningEngine': tools_poisoning_engine = engine break if not tools_poisoning_engine: safe_print(f'[EventHub] ToolsPoisoningEngine not found') return # Get raw_event_id from original event parent_raw_event_id = original_event.get('_raw_event_id') # 각 tool에 대해 병렬 분석 수행 tasks = [] for tool_data in tools: # tool_data를 event 형식으로 변환 synthetic_event = { 'eventType': 'MCP', 'producer': tool_data.get('producer', 'unknown'), 'mcpTag': tool_data.get('mcpTag', 'unknown'), 'ts': original_event.get('ts'), # 원본 이벤트의 timestamp 사용 '_raw_event_id': parent_raw_event_id, # 부모 이벤트의 raw_event_id 사용 'data': { 'task': 'RECV', 'message': { 'result': { 'tools': [{ 'name': tool_data.get('tool'), 'description': tool_data.get('tool_description') }] } }, 'mcpTag': tool_data.get('mcpTag', 'unknown') } } # 병렬 처리를 위해 task 추가 task = self._process_with_engine(tools_poisoning_engine, synthetic_event) tasks.append(task) # 모든 분석 완료 대기 results = await asyncio.gather(*tasks, return_exceptions=True) # 결과 수집 all_results = [] for result in results: if result and not isinstance(result, Exception): if isinstance(result, list): all_results.extend(result) else: all_results.append(result) # 결과 일괄 저장 if all_results: await self._save_results_batch(all_results) except Exception as e: safe_print(f'[EventHub] Error analyzing mcpl tools: {e}') async def reload_engine_rules(self, engine_name: str): """ Reload custom rules for a specific engine. Args: engine_name: Name of the engine (e.g., 'pii_leak_engine') """ try: # Map engine_name to class name engine_class_map = { 'pii_leak_engine': 'PIILeakEngine', 'command_injection_engine': 'CommandInjectionEngine', 'file_system_exposure_engine': 'FileSystemExposureEngine', } target_class_name = engine_class_map.get(engine_name) if not target_class_name: safe_print(f'[EventHub] Unknown engine name: {engine_name}') return # Find and reload the engine for engine in self.engines: if engine.name == target_class_name: if hasattr(engine, 'reload_rules'): await engine.reload_rules() safe_print(f'[EventHub] Reloaded rules for {target_class_name}') else: safe_print(f'[EventHub] {target_class_name} does not support rule reloading') break else: safe_print(f'[EventHub] Engine {target_class_name} not found') except Exception as e: safe_print(f'[EventHub] Error reloading engine rules: {e}')

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/seungwon9201/MCP-Dandan'

If you have feedback or need assistance with the MCP directory API, please join our Discord server