#!/usr/bin/env python3
"""
Claude IPC MCP Server for WSL - Inter-Process Communication between Claude Code instances
Uses TCP sockets for WSL-to-WSL communication on Windows 10
"""
import asyncio
import json
import logging
import socket
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
import sys
import os
import re
import threading
import time
import sqlite3
import hashlib
import secrets
from pathlib import Path

from mcp.server import Server
from mcp.types import (
    Resource,
    Tool,
    TextContent,
    LoggingLevel
)

# Configuration
IPC_HOST = "127.0.0.1"  # Localhost works across WSL instances
IPC_PORT = 9876         # Choose a port that's likely free
HEARTBEAT_INTERVAL = 30  # seconds
# Upper bound on a single broker request; preserves the M-03 DoS guard
# while still allowing multi-chunk reads (a single recv() is NOT guaranteed
# to return a whole JSON document over TCP).
MAX_REQUEST_BYTES = 1024 * 1024

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class RateLimiter:
    """Simple in-memory sliding-window rate limiter (per instance id)."""

    def __init__(self, max_requests: int = 100, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: Dict[str, List[float]] = {}  # instance_id -> list of timestamps
        self.lock = threading.Lock()

    def is_allowed(self, instance_id: str) -> bool:
        """Check if a request is allowed under the rate limit (and record it if so)."""
        with self.lock:
            now = time.time()
            if instance_id not in self.requests:
                self.requests[instance_id] = []
            # Drop timestamps that have aged out of the window
            self.requests[instance_id] = [
                ts for ts in self.requests[instance_id]
                if now - ts < self.window_seconds
            ]
            if len(self.requests[instance_id]) >= self.max_requests:
                return False
            self.requests[instance_id].append(now)
            return True


class MessageBroker:
    """Message broker that runs as a separate thread with SQLite persistence.

    Holds per-instance message queues in memory, mirrors them to SQLite so
    messages survive restarts, and authenticates clients via hashed session
    tokens stored in the database.
    """

    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.queues: Dict[str, List[Dict[str, Any]]] = {}
        self.instances: Dict[str, datetime] = {}
        self.running = False
        self.server_socket = None
        self.lock = threading.Lock()
        # Name change tracking
        self.name_history: Dict[str, Tuple[str, datetime]] = {}  # old_name -> (new_name, when)
        self.last_rename: Dict[str, datetime] = {}  # instance_id -> last rename time
        # Session management for security.
        # NOTE(review): these in-memory maps are only updated on rename, never on
        # register (registration persists hashed tokens to SQLite instead); kept
        # for backward compatibility.
        self.sessions: Dict[str, Dict[str, Any]] = {}  # session_token -> {instance_id, created_at}
        self.instance_sessions: Dict[str, str] = {}    # instance_id -> session_token
        # Rate limiting
        self.rate_limiter = RateLimiter(max_requests=100, window_seconds=60)
        # SQLite persistence - secure location
        self.db_dir = os.path.expanduser("~/.claude-ipc-data")
        self.db_path = os.path.join(self.db_dir, "messages.db")
        self._init_database()
        self._load_from_database()

    def _init_database(self):
        """Initialize SQLite database with required tables."""
        try:
            # Create secure directory if it doesn't exist
            os.makedirs(self.db_dir, 0o700, exist_ok=True)
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            # Set secure permissions on database file
            if os.path.exists(self.db_path):
                os.chmod(self.db_path, 0o600)
            # Messages table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS messages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    from_id TEXT NOT NULL,
                    to_id TEXT NOT NULL,
                    content TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    data TEXT,
                    summary TEXT,
                    large_file_path TEXT,
                    read_flag INTEGER DEFAULT 0
                )
            ''')
            # Instances table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS instances (
                    instance_id TEXT PRIMARY KEY,
                    last_seen TEXT NOT NULL
                )
            ''')
            # Sessions table - now with hashed tokens and expiration
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS sessions (
                    session_token_hash TEXT PRIMARY KEY,
                    instance_id TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    expires_at TEXT NOT NULL
                )
            ''')
            # Name history table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS name_history (
                    old_name TEXT PRIMARY KEY,
                    new_name TEXT NOT NULL,
                    changed_at TEXT NOT NULL
                )
            ''')
            conn.commit()
            conn.close()
            logger.info(f"SQLite database initialized at {self.db_path}")
        except Exception as e:
            logger.error(f"Failed to initialize database: {e}")
            # Fall back to in-memory only if DB fails
            self.db_path = None

    def _load_from_database(self):
        """Load existing unread messages, instances, and name history from the DB."""
        if not self.db_path:
            return
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            # Load unread messages
            cursor.execute('''
                SELECT from_id, to_id, content, timestamp, data, summary, large_file_path
                FROM messages WHERE read_flag = 0 ORDER BY timestamp
            ''')
            for row in cursor.fetchall():
                from_id, to_id, content, timestamp, data, summary, large_file_path = row
                # Reconstruct message in the expected format
                msg_content = {"content": content}
                if data:
                    msg_content["data"] = json.loads(data)
                msg_data = {
                    'from': from_id,
                    'to': to_id,
                    'timestamp': timestamp,
                    'message': msg_content
                }
                # Add extra fields if present
                if summary:
                    msg_data['summary'] = summary
                if large_file_path:
                    msg_data['large_file_path'] = large_file_path
                if to_id not in self.queues:
                    self.queues[to_id] = []
                self.queues[to_id].append(msg_data)
            # Load active instances
            cursor.execute('SELECT instance_id, last_seen FROM instances')
            for row in cursor.fetchall():
                instance_id, last_seen = row
                self.instances[instance_id] = datetime.fromisoformat(last_seen)
            # Sessions store only token hashes, so raw tokens cannot be
            # reloaded here; validation happens against the DB per request.
            # Clean up expired sessions.
            cursor.execute('DELETE FROM sessions WHERE expires_at <= ?',
                           (datetime.now().isoformat(),))
            conn.commit()
            # Load name history
            cursor.execute('SELECT old_name, new_name, changed_at FROM name_history')
            for row in cursor.fetchall():
                old_name, new_name, changed_at = row
                self.name_history[old_name] = (new_name, datetime.fromisoformat(changed_at))
            conn.close()
            logger.info(f"Loaded {sum(len(q) for q in self.queues.values())} messages from database")
        except Exception as e:
            logger.error(f"Failed to load from database: {e}")

    def _save_message_to_db(self, from_id: str, to_id: str, msg_data: Dict[str, Any]):
        """Save one message to the SQLite messages table."""
        if not self.db_path:
            return
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            message = msg_data.get("message", {})
            content = message.get("content", "")
            data = message.get("data")
            # Extract summary and large file path if present
            summary = None
            large_file_path = None
            if data and isinstance(data, dict):
                if "large_message_file" in data:
                    large_file_path = data["large_message_file"]
                    # Extract summary from content
                    if "Full content saved to:" in content:
                        summary = content.split("Full content saved to:")[0].strip()
            cursor.execute('''
                INSERT INTO messages (from_id, to_id, content, timestamp, data, summary, large_file_path)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                from_id,
                to_id,
                content,
                msg_data["timestamp"],
                json.dumps(data) if data else None,
                summary,
                large_file_path
            ))
            conn.commit()
            conn.close()
        except Exception as e:
            logger.error(f"Failed to save message to database: {e}")

    def _mark_messages_as_read(self, instance_id: str, message_ids: List[int]):
        """Mark the given message ids as read for one recipient."""
        if not self.db_path or not message_ids:
            return
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            placeholders = ','.join('?' for _ in message_ids)
            cursor.execute(f'''
                UPDATE messages SET read_flag = 1
                WHERE to_id = ? AND id IN ({placeholders})
            ''', [instance_id] + message_ids)
            conn.commit()
            conn.close()
        except Exception as e:
            logger.error(f"Failed to mark messages as read: {e}")

    def _save_instance_to_db(self, instance_id: str):
        """Save or update an instance's last-seen timestamp in the database."""
        if not self.db_path:
            return
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO instances (instance_id, last_seen)
                VALUES (?, ?)
            ''', (instance_id, datetime.now().isoformat()))
            conn.commit()
            conn.close()
        except Exception as e:
            logger.error(f"Failed to save instance to database: {e}")

    def _save_session_to_db(self, session_token: str, instance_id: str):
        """Persist a session: only the token hash is stored, with a 24h expiry."""
        if not self.db_path:
            return
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            token_hash = self._hash_token(session_token)
            now = datetime.now()
            expires_at = now + timedelta(hours=24)
            cursor.execute('''
                INSERT OR REPLACE INTO sessions (session_token_hash, instance_id, created_at, expires_at)
                VALUES (?, ?, ?, ?)
            ''', (token_hash, instance_id, now.isoformat(), expires_at.isoformat()))
            conn.commit()
            conn.close()
        except Exception as e:
            logger.error(f"Failed to save session to database: {e}")

    def _validate_instance_id(self, instance_id: str) -> bool:
        """Validate instance ID: 1-32 chars of [a-zA-Z0-9_-] (blocks path tricks)."""
        if not instance_id or len(instance_id) > 32:
            return False
        return bool(re.match(r'^[a-zA-Z0-9_-]+$', instance_id))

    def _hash_token(self, token: str) -> str:
        """Hash a session token for secure storage."""
        # Use SHA-256 with a salt for security
        salt = "claude-ipc-mcp-v2"  # In production, use unique salt per deployment
        return hashlib.sha256(f"{salt}:{token}".encode()).hexdigest()

    def start(self):
        """Start the message broker server thread.

        NOTE(review): bind errors surface inside the daemon thread, not here,
        so this call does not raise if the port is already in use.
        """
        self.running = True
        threading.Thread(target=self._run_server, daemon=True).start()

    def stop(self):
        """Stop the message broker server."""
        self.running = False
        if self.server_socket:
            self.server_socket.close()

    def _run_server(self):
        """Accept loop for the TCP server; one handler thread per connection."""
        try:
            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(5)
            logger.info(f"Message broker listening on {self.host}:{self.port}")
            while self.running:
                try:
                    # Timeout so the loop can notice self.running going False
                    self.server_socket.settimeout(1.0)
                    client_socket, address = self.server_socket.accept()
                    threading.Thread(
                        target=self._handle_client,
                        args=(client_socket,),
                        daemon=True
                    ).start()
                except socket.timeout:
                    continue
                except Exception as e:
                    if self.running:
                        logger.error(f"Server error: {e}")
        except Exception as e:
            logger.error(f"Failed to start message broker: {e}")

    def _handle_client(self, client_socket: socket.socket):
        """Handle a single client request/response exchange.

        Fix: a single recv(4096) truncated any request larger than one chunk
        (or split across TCP segments). We now accumulate bounded chunks until
        a complete JSON document parses; MAX_REQUEST_BYTES keeps the original
        DoS protection (M-03).
        """
        try:
            buf = b""
            request = None
            while len(buf) < MAX_REQUEST_BYTES:
                chunk = client_socket.recv(4096)
                if not chunk:  # peer closed before a full document arrived
                    break
                buf += chunk
                try:
                    request = json.loads(buf.decode('utf-8'))
                    break
                except (json.JSONDecodeError, UnicodeDecodeError):
                    continue  # document incomplete so far; keep reading
            if request is None:
                raise ValueError("Invalid or oversized request")
            response = self._process_request(request)
            # sendall: plain send() may write only part of the payload
            client_socket.sendall(json.dumps(response).encode('utf-8'))
        except Exception as e:
            logger.error(f"Client handling error: {e}")
            try:
                error_response = {"status": "error", "message": str(e)}
                client_socket.sendall(json.dumps(error_response).encode('utf-8'))
            except Exception:
                pass  # best-effort error reply; socket may already be dead
        finally:
            client_socket.close()

    def _clean_expired_forwards(self):
        """Remove name forwards older than 2 hours."""
        now = datetime.now()
        expired = []
        for old_name, (new_name, timestamp) in self.name_history.items():
            if (now - timestamp).total_seconds() > 7200:  # 2 hours
                expired.append(old_name)
        for name in expired:
            del self.name_history[name]

    def _clean_expired_messages(self):
        """Remove messages older than 7 days for unregistered instances."""
        now = datetime.now()
        expired_days = 7 * 24 * 3600  # 7 days in seconds
        for instance_id in list(self.queues.keys()):
            # Only clean messages for unregistered instances
            if instance_id not in self.instances:
                unexpired_messages = []
                for msg in self.queues[instance_id]:
                    try:
                        msg_time = datetime.fromisoformat(msg['timestamp'])
                        if (now - msg_time).total_seconds() < expired_days:
                            unexpired_messages.append(msg)
                    except (KeyError, ValueError):
                        # Keep messages with invalid timestamps (safer)
                        unexpired_messages.append(msg)
                # Update queue or remove if empty
                if unexpired_messages:
                    self.queues[instance_id] = unexpired_messages
                else:
                    del self.queues[instance_id]
        # Also clean database
        if self.db_path:
            try:
                conn = sqlite3.connect(self.db_path)
                cursor = conn.cursor()
                # Delete old messages for unregistered instances
                cursor.execute('''
                    DELETE FROM messages
                    WHERE datetime(timestamp) < datetime('now', '-7 days')
                    AND to_id NOT IN (SELECT instance_id FROM instances)
                ''')
                conn.commit()
                conn.close()
            except Exception as e:
                logger.error(f"Failed to clean expired messages from database: {e}")

    def _resolve_name(self, name: str) -> str:
        """Resolve a name through forwarding history (also triggers cleanup)."""
        self._clean_expired_forwards()
        self._clean_expired_messages()
        if name in self.name_history:
            new_name, timestamp = self.name_history[name]
            return new_name
        return name

    def _create_summary(self, content: str, max_length: int = 150) -> str:
        """Create a 2-sentence summary of content."""
        content = content.strip()
        # Try to extract first two sentences
        sentences = []
        temp = ""
        for char in content:
            temp += char
            if char in '.!?' and len(temp.strip()) > 10:
                sentences.append(temp.strip())
                temp = ""
                if len(sentences) >= 2:
                    break
        if sentences:
            summary = " ".join(sentences[:2])
        else:
            # If no sentences found, just truncate
            summary = content[:max_length].strip()
            if len(content) > max_length:
                summary += "..."
        return summary

    def _save_large_message(self, from_id: str, to_id: str, content: str) -> Optional[str]:
        """Save a large message body to a file and return the file path (or None)."""
        # Validate instance IDs first
        if not self._validate_instance_id(from_id) or not self._validate_instance_id(to_id):
            raise ValueError("Invalid instance ID for file path")
        # Additional sanitization for filenames
        safe_from = from_id.replace("/", "_").replace("\\", "_")
        safe_to = to_id.replace("/", "_").replace("\\", "_")
        # Create timestamp-based filename
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        filename = f"{timestamp}_{safe_from}_{safe_to}_message.md"
        # Use secure directory for large messages
        large_msg_dir = os.path.join(self.db_dir, "large-messages")
        os.makedirs(large_msg_dir, 0o700, exist_ok=True)
        filepath = os.path.join(large_msg_dir, filename)
        # Calculate size in KB
        size_kb = len(content.encode('utf-8')) / 1024
        file_content = f"""# IPC Message
From: {from_id}
To: {to_id}
Time: {datetime.now().isoformat()}
Size: {size_kb:.1f}KB

## Content

{content}
"""
        try:
            # Directory already created with secure permissions above
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(file_content)
            # Set secure permissions on the file
            os.chmod(filepath, 0o600)
            return filepath
        except Exception as e:
            logger.error(f"Failed to save large message: {e}")
            return None

    def _validate_session(self, request: Dict[str, Any], action: str) -> Optional[str]:
        """Validate session token and return the owning instance_id if valid."""
        if action == "register":
            # Registration doesn't need session token
            return None
        session_token = request.get("session_token")
        if not session_token:
            return None
        # Hash the provided token to compare with database
        token_hash = self._hash_token(session_token)
        if not self.db_path:
            return None
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            # Check if token exists and is not expired
            cursor.execute('''
                SELECT instance_id FROM sessions
                WHERE session_token_hash = ? AND expires_at > ?
            ''', (token_hash, datetime.now().isoformat()))
            result = cursor.fetchone()
            conn.close()
            if result:
                return result[0]  # Return instance_id
            return None
        except Exception as e:
            logger.error(f"Session validation error: {e}")
            return None

    def _process_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Process a broker request (register/send/broadcast/check/list/rename)."""
        action = request.get("action")
        with self.lock:
            # Validate session for non-registration actions
            if action != "register":
                instance_id = self._validate_session(request, action)
                if not instance_id:
                    return {"status": "error", "message": "Invalid or missing session token"}
                # Override any claimed identity with the session-validated one
                if "from_id" in request:
                    request["from_id"] = instance_id
                if "instance_id" in request:
                    request["instance_id"] = instance_id
                # Check rate limit for authenticated requests
                if not self.rate_limiter.is_allowed(instance_id):
                    return {"status": "error", "message": "Rate limit exceeded. Please wait before sending more requests."}

            if action == "register":
                instance_id = request["instance_id"]
                # Validate instance ID format
                if not self._validate_instance_id(instance_id):
                    return {"status": "error", "message": "Invalid instance ID format. Use 1-32 alphanumeric characters, hyphens, or underscores."}
                # Rate limit registration attempts (use IP or a special key)
                if not self.rate_limiter.is_allowed(f"register_{instance_id}"):
                    return {"status": "error", "message": "Too many registration attempts. Please wait."}
                # Validate auth token (shared secret)
                auth_token = request.get("auth_token")
                shared_secret = os.environ.get("IPC_SHARED_SECRET", "")
                if shared_secret:
                    expected_token = hashlib.sha256(f"{instance_id}:{shared_secret}".encode()).hexdigest()
                    if auth_token != expected_token:
                        return {"status": "error", "message": "Invalid auth token"}
                # Generate session token
                session_token = secrets.token_urlsafe(32)
                # Register instance
                self.instances[instance_id] = datetime.now()
                # Save to database
                self._save_instance_to_db(instance_id)
                self._save_session_to_db(session_token, instance_id)
                # Preserve existing queue or create new one
                if instance_id not in self.queues:
                    self.queues[instance_id] = []
                    queued_count = 0
                else:
                    queued_count = len(self.queues[instance_id])
                response = {
                    "status": "ok",
                    "session_token": session_token,
                    "message": f"Registered {instance_id}"
                }
                if queued_count > 0:
                    response["message"] = f"Registered {instance_id} with {queued_count} queued messages"
                return response

            elif action == "send":
                from_id = request["from_id"]
                to_id = request["to_id"]
                message = request["message"]
                # Validate to_id format
                if not self._validate_instance_id(to_id):
                    return {"status": "error", "message": "Invalid recipient ID format"}
                # Check message size (10KB threshold)
                content = message.get("content", "")
                content_size = len(content.encode('utf-8'))
                size_threshold = 10 * 1024  # 10KB
                if content_size > size_threshold:
                    # Save large message to file
                    filepath = self._save_large_message(from_id, to_id, content)
                    if filepath:
                        # Create summary and update message
                        summary = self._create_summary(content)
                        message = {
                            "content": f"{summary} Full content saved to: {filepath}",
                            "data": message.get("data", {})
                        }
                        message["data"]["large_message_file"] = filepath
                        message["data"]["original_size_kb"] = round(content_size / 1024, 1)
                # Resolve name through forwarding if needed
                resolved_to = self._resolve_name(to_id)
                forwarded = resolved_to != to_id
                # Create queue for future instances if it doesn't exist
                if resolved_to not in self.queues:
                    self.queues[resolved_to] = []
                    future_delivery = True
                else:
                    future_delivery = not (resolved_to in self.instances)
                # Check queue limit (100 messages per instance)
                if len(self.queues[resolved_to]) >= 100:
                    return {"status": "error", "message": f"Message queue full for {resolved_to} (100 message limit)"}
                msg_data = {
                    "from": from_id,
                    "to": resolved_to,
                    "timestamp": datetime.now().isoformat(),
                    "message": message
                }
                self.queues[resolved_to].append(msg_data)
                # Save to SQLite
                self._save_message_to_db(from_id, resolved_to, msg_data)
                if forwarded:
                    return {"status": "ok", "message": f"Message forwarded from {to_id} to {resolved_to}"}
                elif future_delivery:
                    return {"status": "ok", "message": f"Message queued for {resolved_to} (not yet registered)"}
                else:
                    return {"status": "ok", "message": "Message sent"}

            elif action == "broadcast":
                from_id = request["from_id"]
                message = request["message"]
                count = 0
                for instance_id in self.queues:
                    if instance_id != from_id:
                        msg_data = {
                            "from": from_id,
                            "to": instance_id,
                            "timestamp": datetime.now().isoformat(),
                            "message": message
                        }
                        self.queues[instance_id].append(msg_data)
                        # Save to SQLite
                        self._save_message_to_db(from_id, instance_id, msg_data)
                        count += 1
                return {"status": "ok", "message": f"Broadcast to {count} instances"}

            elif action == "check":
                # instance_id already validated and set from session
                instance_id = request["instance_id"]
                # Resolve name through forwarding if needed
                resolved_id = self._resolve_name(instance_id)
                if resolved_id not in self.queues:
                    return {"status": "ok", "messages": []}
                messages = self.queues[resolved_id]
                self.queues[resolved_id] = []
                # Mark messages as read in database
                if self.db_path and messages:
                    try:
                        conn = sqlite3.connect(self.db_path)
                        cursor = conn.cursor()
                        # Match on recipient + timestamp since queue entries
                        # don't carry DB row ids
                        for msg in messages:
                            cursor.execute('''
                                UPDATE messages SET read_flag = 1
                                WHERE to_id = ? AND timestamp = ? AND read_flag = 0
                            ''', (resolved_id, msg.get("timestamp")))
                        conn.commit()
                        conn.close()
                    except Exception as e:
                        logger.error(f"Failed to mark messages as read: {e}")
                return {"status": "ok", "messages": messages}

            elif action == "list":
                instances = [
                    {"id": id, "last_seen": ts.isoformat()}
                    for id, ts in self.instances.items()
                ]
                return {"status": "ok", "instances": instances}

            elif action == "rename":
                # SECURITY FIX: the generic override above only rewrites
                # "from_id"/"instance_id", so a client-supplied "old_id" was
                # trusted as-is, letting any authenticated instance rename any
                # other. The renamed instance must ALWAYS be the one the
                # session token authenticates.
                old_id = instance_id
                new_id = request["new_id"]
                # Validate new_id format
                if not self._validate_instance_id(new_id):
                    return {"status": "error", "message": "Invalid new instance ID format"}
                # Check if old instance exists
                if old_id not in self.instances:
                    return {"status": "error", "message": f"Instance {old_id} not found"}
                if new_id in self.instances:
                    return {"status": "error", "message": f"Instance {new_id} already exists"}
                # Check rate limit (1 hour)
                now = datetime.now()
                if old_id in self.last_rename:
                    time_since_last = (now - self.last_rename[old_id]).total_seconds()
                    if time_since_last < 3600:  # 1 hour in seconds
                        minutes_left = int((3600 - time_since_last) / 60)
                        return {"status": "error", "message": f"Rate limit: can rename again in {minutes_left} minutes"}
                # Move the queue
                if old_id in self.queues:
                    self.queues[new_id] = self.queues.pop(old_id)
                else:
                    self.queues[new_id] = []
                # Update instance record
                self.instances[new_id] = self.instances.pop(old_id)
                # Set up name forwarding
                self.name_history[old_id] = (new_id, now)
                # Update rate limit tracking
                self.last_rename[new_id] = now
                if old_id in self.last_rename:
                    del self.last_rename[old_id]
                # Update session mapping
                if old_id in self.instance_sessions:
                    session_token = self.instance_sessions.pop(old_id)
                    self.instance_sessions[new_id] = session_token
                    # Update session info
                    if session_token in self.sessions:
                        self.sessions[session_token]["instance_id"] = new_id
                # Broadcast rename notification
                for instance_id in self.queues:
                    if instance_id != new_id:
                        notification = {
                            "from": "system",
                            "to": instance_id,
                            "timestamp": now.isoformat(),
                            "message": {"content": f"📝 {old_id} renamed to {new_id}"}
                        }
                        self.queues[instance_id].append(notification)
                return {"status": "ok", "message": f"Renamed {old_id} to {new_id}"}

            else:
                return {"status": "error", "message": f"Unknown action: {action}"}


# Global broker instance
broker = MessageBroker(IPC_HOST, IPC_PORT)

# Session storage for this MCP instance
current_session_token = None
current_instance_id = None

# Try to start broker (only first instance will succeed in binding; a bind
# failure is logged inside the broker thread rather than raised here)
try:
    broker.start()
    logger.info("Started message broker")
except Exception:
    logger.info("Message broker already running")


class BrokerClient:
    """Client for communicating with the message broker."""

    @staticmethod
    def send_request(request: Dict[str, Any]) -> Dict[str, Any]:
        """Send a request to the broker and return its JSON response."""
        try:
            client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client_socket.settimeout(5.0)
            client_socket.connect((IPC_HOST, IPC_PORT))
            # sendall: plain send() may transmit only part of the request
            client_socket.sendall(json.dumps(request).encode('utf-8'))
            # The broker closes the connection after replying, so read to EOF
            # instead of relying on a single recv() returning everything.
            chunks = []
            while True:
                chunk = client_socket.recv(65536)
                if not chunk:
                    break
                chunks.append(chunk)
            response = json.loads(b"".join(chunks).decode('utf-8'))
            client_socket.close()
            return response
        except Exception as e:
            return {"status": "error", "message": f"Broker connection failed: {e}"}


# Create MCP server
app = Server("claude-ipc-wsl")


@app.list_resources()
async def list_resources() -> List[Resource]:
    """List available resources"""
    return [
        Resource(
            uri="ipc://status",
            name="IPC Status",
            mimeType="application/json",
            description="Current status of the IPC system"
        )
    ]


@app.read_resource()
async def read_resource(uri: str) -> str:
    """Read resource content"""
    if uri == "ipc://status":
        response = BrokerClient.send_request({"action": "list"})
        return json.dumps({
            "broker_host": IPC_HOST,
            "broker_port": IPC_PORT,
            "status": response.get("status", "unknown"),
            "instances": response.get("instances", [])
        }, indent=2)
    return json.dumps({"error": "Unknown resource"})


@app.list_tools()
async def list_tools() -> List[Tool]:
    """List available tools"""
    return [
        Tool(
            name="register",
            description="Register this Claude instance with the IPC system",
            inputSchema={
                "type": "object",
                "properties": {
                    "instance_id": {
                        "type": "string",
                        "description": "Unique identifier for this instance (e.g., 'wsl1', 'wsl2')"
                    }
                },
                "required": ["instance_id"]
            }
        ),
        Tool(
            name="send",
            description="Send a message to another Claude instance",
            inputSchema={
                "type": "object",
                "properties": {
                    "from_id": {
                        "type": "string",
                        "description": "Your instance ID"
                    },
                    "to_id": {
                        "type": "string",
                        "description": "Target instance ID"
                    },
                    "content": {
                        "type": "string",
                        "description": "Message content"
                    },
                    "data": {
                        "type": "object",
                        "description": "Optional structured data to send"
                    }
                },
                "required": ["from_id", "to_id", "content"]
            }
        ),
        Tool(
            name="broadcast",
            description="Broadcast a message to all other Claude instances",
            inputSchema={
                "type": "object",
                "properties": {
                    "from_id": {
                        "type": "string",
                        "description": "Your instance ID"
                    },
                    "content": {
                        "type": "string",
                        "description": "Message content"
                    },
                    "data": {
                        "type": "object",
                        "description": "Optional structured data"
                    }
                },
                "required": ["from_id", "content"]
            }
        ),
        Tool(
            name="check",
            description="Check for new messages",
            inputSchema={
                "type": "object",
                "properties": {
                    "instance_id": {
                        "type": "string",
                        "description": "Your instance ID"
                    }
                },
                "required": ["instance_id"]
            }
        ),
        Tool(
            name="list_instances",
            description="List all active Claude instances",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        ),
        Tool(
            name="share_file",
            description="Share file content with another instance",
            inputSchema={
                "type": "object",
                "properties": {
                    "from_id": {
                        "type": "string",
                        "description": "Your instance ID"
                    },
                    "to_id": {
                        "type": "string",
                        "description": "Target instance ID"
                    },
                    "filepath": {
                        "type": "string",
                        "description": "Path to file to share"
                    },
                    "description": {
                        "type": "string",
                        "description": "Description of the file"
                    }
                },
                "required": ["from_id", "to_id", "filepath"]
            }
        ),
        Tool(
            name="share_command",
            description="Execute a command and share output with another instance",
            inputSchema={
                "type": "object",
                "properties": {
                    "from_id": {
                        "type": "string",
                        "description": "Your instance ID"
                    },
                    "to_id": {
                        "type": "string",
                        "description": "Target instance ID"
                    },
                    "command": {
                        "type": "string",
                        "description": "Command to execute"
                    },
                    "description": {
                        "type": "string",
                        "description": "Description of what this command does"
                    }
                },
                "required": ["from_id", "to_id", "command"]
            }
        ),
        Tool(
            name="rename",
            description="Rename your instance ID (rate limited to once per hour)",
            inputSchema={
                "type": "object",
                "properties": {
                    "old_id": {
                        "type": "string",
                        "description": "Your current instance ID"
                    },
                    "new_id": {
                        "type": "string",
                        "description": "The new instance ID you want"
                    }
                },
                "required": ["old_id", "new_id"]
            }
        ),
        Tool(
            name="auto_process",
            description="Automatically check and process IPC messages (for use with auto-check feature)",
            inputSchema={
                "type": "object",
                "properties": {
                    "instance_id": {
                        "type": "string",
                        "description": "Your instance ID"
                    }
                },
                "required": ["instance_id"]
            }
        )
    ]


@app.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
    """Execute tool calls"""
    # Declared once for the whole function (register and rename both write these)
    global current_session_token, current_instance_id

    if name == "register":
        # Calculate auth token
        instance_id = arguments["instance_id"]
        shared_secret = os.environ.get("IPC_SHARED_SECRET", "")
        auth_token = ""
        if shared_secret:
            auth_token = hashlib.sha256(f"{instance_id}:{shared_secret}".encode()).hexdigest()
        response = BrokerClient.send_request({
            "action": "register",
            "instance_id": instance_id,
            "auth_token": auth_token
        })
        # Store session token for this MCP instance
        if response.get("status") == "ok" and response.get("session_token"):
            current_session_token = response["session_token"]
            current_instance_id = instance_id
        return [TextContent(type="text", text=json.dumps(response, indent=2))]

    elif name == "send":
        if not current_session_token:
            return [TextContent(type="text", text="Error: Not registered. Please register first.")]
        message = {
            "content": arguments["content"],
            "data": arguments.get("data", {})
        }
        response = BrokerClient.send_request({
            "action": "send",
            "from_id": arguments["from_id"],
            "to_id": arguments["to_id"],
            "message": message,
            "session_token": current_session_token
        })
        return [TextContent(type="text", text=json.dumps(response, indent=2))]

    elif name == "broadcast":
        if not current_session_token:
            return [TextContent(type="text", text="Error: Not registered. Please register first.")]
        message = {
            "content": arguments["content"],
            "data": arguments.get("data", {})
        }
        response = BrokerClient.send_request({
            "action": "broadcast",
            "from_id": arguments["from_id"],
            "message": message,
            "session_token": current_session_token
        })
        return [TextContent(type="text", text=json.dumps(response, indent=2))]

    elif name == "check":
        if not current_session_token:
            return [TextContent(type="text", text="Error: Not registered. Please register first.")]
        response = BrokerClient.send_request({
            "action": "check",
            "instance_id": arguments["instance_id"],
            "session_token": current_session_token
        })
        if response["status"] == "ok" and response.get("messages"):
            formatted = "New messages:\n"
            for msg in response["messages"]:
                formatted += f"\nFrom: {msg['from']}\n"
                formatted += f"Time: {msg['timestamp']}\n"
                formatted += f"Content: {msg['message']['content']}\n"
                if msg['message'].get('data'):
                    formatted += f"Data: {json.dumps(msg['message']['data'], indent=2)}\n"
            return [TextContent(type="text", text=formatted)]
        else:
            return [TextContent(type="text", text="No new messages")]

    elif name == "list_instances":
        response = BrokerClient.send_request({"action": "list"})
        return [TextContent(type="text", text=json.dumps(response, indent=2))]

    elif name == "share_file":
        if not current_session_token:
            return [TextContent(type="text", text="Error: Not registered. Please register first.")]
        try:
            with open(arguments["filepath"], 'r') as f:
                content = f.read()
            message = {
                "content": f"Shared file: {arguments['filepath']}",
                "data": {
                    "type": "file",
                    "filepath": arguments["filepath"],
                    "content": content,
                    "description": arguments.get("description", "")
                }
            }
            response = BrokerClient.send_request({
                "action": "send",
                "from_id": arguments["from_id"],
                "to_id": arguments["to_id"],
                "message": message,
                "session_token": current_session_token
            })
            return [TextContent(type="text", text=f"File shared: {json.dumps(response, indent=2)}")]
        except Exception as e:
            return [TextContent(type="text", text=f"Error sharing file: {e}")]

    elif name == "share_command":
        if not current_session_token:
            return [TextContent(type="text", text="Error: Not registered. Please register first.")]
        try:
            import subprocess
            import shlex
            # Parse command safely to prevent injection
            try:
                cmd_args = shlex.split(arguments["command"])
            except ValueError as e:
                return [TextContent(type="text", text=f"Invalid command format: {e}")]
            # Run without shell=True for security
            result = subprocess.run(
                cmd_args,
                shell=False,
                capture_output=True,
                text=True,
                timeout=30  # Add timeout to prevent hanging
            )
            message = {
                "content": f"Command output: {arguments['command']}",
                "data": {
                    "type": "command",
                    "command": arguments["command"],
                    "stdout": result.stdout,
                    "stderr": result.stderr,
                    "returncode": result.returncode,
                    "description": arguments.get("description", "")
                }
            }
            response = BrokerClient.send_request({
                "action": "send",
                "from_id": arguments["from_id"],
                "to_id": arguments["to_id"],
                "message": message,
                "session_token": current_session_token
            })
            return [TextContent(type="text", text=f"Command output shared: {json.dumps(response, indent=2)}")]
        except Exception as e:
            return [TextContent(type="text", text=f"Error executing command: {e}")]

    elif name == "rename":
        if not current_session_token:
            return [TextContent(type="text", text="Error: Not registered. Please register first.")]
        response = BrokerClient.send_request({
            "action": "rename",
            "old_id": arguments["old_id"],
            "new_id": arguments["new_id"],
            "session_token": current_session_token
        })
        # Update stored instance_id if rename succeeded
        if response.get("status") == "ok":
            current_instance_id = arguments["new_id"]
        return [TextContent(type="text", text=json.dumps(response, indent=2))]

    elif name == "auto_process":
        if not current_session_token:
            return [TextContent(type="text", text="Error: Not registered. Please register first.")]
        instance_id = arguments["instance_id"]
        # Check for messages using existing check functionality
        check_response = BrokerClient.send_request({
            "action": "check",
            "instance_id": instance_id,
            "session_token": current_session_token
        })
        if check_response.get("status") != "ok":
            return [TextContent(type="text", text=f"Error checking messages: {check_response.get('message')}")]
        messages = check_response.get("messages", [])
        if not messages:
            return [TextContent(type="text", text="No messages to process.")]
        # Process each message
        processed = []
        for msg in messages:
            sender = msg.get("from", "unknown")
            content = msg.get("message", {}).get("content", "")
            timestamp = msg.get("timestamp", "")
            # Log what we're processing
            action_taken = f"From {sender}: {content[:50]}..."
            # Here we could add smart processing logic:
            # - If content contains "read", read the mentioned file
            # - If content contains "urgent", send acknowledgment
            # - etc.
            # For now, just acknowledge receipt
            if sender in ["fred", "claude", "nessa"]:  # Known senders
                ack_response = BrokerClient.send_request({
                    "action": "send",
                    "from_id": instance_id,
                    "to_id": sender,
                    "message": {
                        "content": f"Auto-processed your message from {timestamp}. Content received: '{content[:30]}...'",
                        "data": {"auto_processed": True}
                    },
                    "session_token": current_session_token
                })
                if ack_response.get("status") == "ok":
                    action_taken += " [Acknowledged]"
            processed.append(action_taken)
        # Update last check time
        os.makedirs("/tmp/claude-ipc-mcp", exist_ok=True)
        config_file = "/tmp/claude-ipc-mcp/auto_check_config.json"
        if os.path.exists(config_file):
            with open(config_file, 'r') as f:
                config = json.load(f)
            config["last_check"] = time.strftime("%Y-%m-%dT%H:%M:%S")
            with open(config_file, 'w') as f:
                json.dump(config, f, indent=2)
        # Return summary
        summary = f"Auto-processed {len(messages)} message(s):\n"
        summary += "\n".join(f" {i+1}. {p}" for i, p in enumerate(processed))
        return [TextContent(type="text", text=summary)]

    return [TextContent(type="text", text=f"Unknown tool: {name}")]


async def run_server():
    """Run the MCP server"""
    from mcp.server.stdio import stdio_server
    logger.info("Starting Claude IPC MCP Server for WSL")
    logger.info(f"Broker endpoint: {IPC_HOST}:{IPC_PORT}")
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )


def main():
    """Main entry point"""
    try:
        asyncio.run(run_server())
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
        broker.stop()
    except Exception as e:
        logger.error(f"Server error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jdez427/claude-ipc-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.