Skip to main content
Glama

Katamari MCP Server

by ciphernaut
named_pipe.py12.1 kB
""" Named Pipe Transport Layer - Stretch Goal Implementation Provides faster startup and persistent context through named pipes. Cross-platform support for Windows, Linux, and macOS. """ import asyncio import os import sys import json import logging from typing import Dict, Any, Optional, Callable from pathlib import Path from datetime import datetime logger = logging.getLogger(__name__) class NamedPipeTransport: """Named pipe transport for MCP communication.""" def __init__(self, pipe_name: str = "katamari_mcp"): self.pipe_name = pipe_name self.pipe_path = self._get_pipe_path() self.server = None self.clients = [] self.message_handlers: Dict[str, Callable] = {} self.context_store = {} # Persistent context between calls def _get_pipe_path(self) -> str: """Get platform-specific pipe path.""" if sys.platform == "win32": return f"\\\\.\\pipe\\{self.pipe_name}" else: # Use socket file for Unix-like systems return f"/tmp/{self.pipe_name}.sock" async def start_server(self): """Start the named pipe server.""" try: # Clean up existing pipe/socket if os.path.exists(self.pipe_path): os.unlink(self.pipe_path) if sys.platform == "win32": await self._start_windows_server() else: await self._start_unix_server() logger.info(f"Named pipe server started on {self.pipe_path}") except Exception as e: logger.error(f"Failed to start named pipe server: {e}") raise async def _start_windows_server(self): """Start Windows named pipe server.""" import win32pipe import win32file import win32api while True: try: # Create named pipe pipe = win32pipe.CreateNamedPipe( self.pipe_path, win32pipe.PIPE_ACCESS_DUPLEX, win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_WAIT, win32pipe.PIPE_UNLIMITED_INSTANCES, 4096, # out buffer 4096, # in buffer 0, # timeout None # security ) # Wait for client connection win32pipe.ConnectNamedPipe(pipe, None) # Handle client in separate task asyncio.create_task(self._handle_windows_client(pipe)) except Exception as e: logger.error(f"Windows pipe server error: {e}") await asyncio.sleep(1) async def _start_unix_server(self): """Start Unix domain socket server.""" import socket self.server = await asyncio.start_unix_server( self._handle_unix_client, self.pipe_path ) async with self.server: await self.server.serve_forever() async def _handle_windows_client(self, pipe): """Handle Windows named pipe client.""" import win32file import win32pipe try: while True: # Read message from client result, data = win32file.ReadFile(pipe, 4096) if result == 0: # client disconnected break message = data.decode('utf-8').strip() if message: response = await self._process_message(message) # Send response win32file.WriteFile(pipe, response.encode('utf-8')) except Exception as e: logger.error(f"Windows client handler error: {e}") finally: win32file.CloseHandle(pipe) async def _handle_unix_client(self, reader, writer): """Handle Unix domain socket client.""" client_addr = writer.get_extra_info('peername') logger.info(f"Client connected: {client_addr}") try: while True: # Read message from client data = await reader.read(4096) if not data: # client disconnected break message = data.decode('utf-8').strip() if message: response = await self._process_message(message) # Send response writer.write(response.encode('utf-8')) await writer.drain() except Exception as e: logger.error(f"Unix client handler error: {e}") finally: writer.close() await writer.wait_closed() logger.info(f"Client disconnected: {client_addr}") async def _process_message(self, message: str) -> str: """Process incoming message and return response.""" try: data = json.loads(message) method = data.get('method') params = data.get('params', {}) message_id = data.get('id') # Add context to params if 'context' not in params: params['context'] = self.context_store # Route to appropriate handler if method in self.message_handlers: result = await self.message_handlers[method](params) # Update context if provided if 'context' in result: self.context_store.update(result['context']) response = { 'id': message_id, 'result': result, 'timestamp': datetime.now().isoformat() } else: response = { 'id': message_id, 'error': f"Unknown method: {method}", 'timestamp': datetime.now().isoformat() } except json.JSONDecodeError: response = { 'error': 'Invalid JSON message', 'timestamp': datetime.now().isoformat() } except Exception as e: response = { 'error': str(e), 'timestamp': datetime.now().isoformat() } return json.dumps(response) + '\n' def register_handler(self, method: str, handler: Callable): """Register message handler for specific method.""" self.message_handlers[method] = handler logger.info(f"Registered handler for method: {method}") def get_context(self, key: str, default: Any = None) -> Any: """Get value from context store.""" return self.context_store.get(key, default) def set_context(self, key: str, value: Any): """Set value in context store.""" self.context_store[key] = value def clear_context(self): """Clear all context.""" self.context_store.clear() class PipeClient: """Lightweight client for named pipe communication.""" def __init__(self, pipe_name: str = "katamari_mcp"): self.pipe_name = pipe_name self.pipe_path = self._get_pipe_path() def _get_pipe_path(self) -> str: """Get platform-specific pipe path.""" if sys.platform == "win32": return f"\\\\.\\pipe\\{self.pipe_name}" else: return f"/tmp/{self.pipe_name}.sock" async def send_message(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]: """Send message to pipe server and get response.""" if params is None: params = {} message = { 'method': method, 'params': params, 'id': int(datetime.now().timestamp() * 1000) # unique ID } try: if sys.platform == "win32": return await self._send_windows_message(message) else: return await self._send_unix_message(message) except Exception as e: logger.error(f"Failed to send message: {e}") return {'error': str(e)} async def _send_windows_message(self, message: Dict) -> Dict[str, Any]: """Send message via Windows named pipe.""" import win32file import win32pipe import win32api try: # Connect to pipe handle = win32file.CreateFile( self.pipe_path, win32file.GENERIC_READ | win32file.GENERIC_WRITE, 0, None, win32file.OPEN_EXISTING, 0, None ) # Send message message_str = json.dumps(message) + '\n' win32file.WriteFile(handle, message_str.encode('utf-8')) # Read response result, response = win32file.ReadFile(handle, 4096) response_data = json.loads(response.decode('utf-8').strip()) win32file.CloseHandle(handle) return response_data except Exception as e: if win32api.GetLastError() == 2: # ERROR_FILE_NOT_FOUND return {'error': 'Pipe server not running'} raise async def _send_unix_message(self, message: Dict) -> Dict[str, Any]: """Send message via Unix domain socket.""" import socket try: # Create socket and connect sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(self.pipe_path) # Send message message_str = json.dumps(message) + '\n' sock.send(message_str.encode('utf-8')) # Read response response = sock.recv(4096).decode('utf-8').strip() response_data = json.loads(response) sock.close() return response_data except FileNotFoundError: return {'error': 'Pipe server not running'} except Exception as e: raise # Convenience functions for quick usage async def start_pipe_server(pipe_name: str = "katamari_mcp"): """Start named pipe server with default handlers.""" transport = NamedPipeTransport(pipe_name) # Register default handlers from katamari_mcp.router.intelligent_router import IntelligentRouter router = IntelligentRouter() async def list_capabilities(params): capabilities = await router.list_capabilities() return {'capabilities': capabilities} async def call_tool(params): tool_name = params.get('tool_name') arguments = params.get('arguments', {}) result = await router.route_call(tool_name, arguments) return {'result': result} async def get_status(params): return { 'status': 'running', 'pipe_path': transport.pipe_path, 'context_size': len(transport.context_store) } transport.register_handler('list_capabilities', list_capabilities) transport.register_handler('call_tool', call_tool) transport.register_handler('get_status', get_status) await transport.start_server() async def send_pipe_request(method: str, params: Dict[str, Any] = None, pipe_name: str = "katamari_mcp") -> Dict[str, Any]: """Send request to pipe server.""" client = PipeClient(pipe_name) return await client.send_message(method, params) if __name__ == "__main__": # Example usage async def main(): print("Starting named pipe server...") await start_pipe_server() asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server