named_pipe.py•12.1 kB
"""
Named Pipe Transport Layer - Stretch Goal Implementation
Provides faster startup and persistent context through named pipes.
Cross-platform support for Windows, Linux, and macOS.
"""
import asyncio
import os
import sys
import json
import logging
from typing import Dict, Any, Optional, Callable
from pathlib import Path
from datetime import datetime
logger = logging.getLogger(__name__)
class NamedPipeTransport:
"""Named pipe transport for MCP communication."""
def __init__(self, pipe_name: str = "katamari_mcp"):
self.pipe_name = pipe_name
self.pipe_path = self._get_pipe_path()
self.server = None
self.clients = []
self.message_handlers: Dict[str, Callable] = {}
self.context_store = {} # Persistent context between calls
def _get_pipe_path(self) -> str:
"""Get platform-specific pipe path."""
if sys.platform == "win32":
return f"\\\\.\\pipe\\{self.pipe_name}"
else:
# Use socket file for Unix-like systems
return f"/tmp/{self.pipe_name}.sock"
async def start_server(self):
"""Start the named pipe server."""
try:
# Clean up existing pipe/socket
if os.path.exists(self.pipe_path):
os.unlink(self.pipe_path)
if sys.platform == "win32":
await self._start_windows_server()
else:
await self._start_unix_server()
logger.info(f"Named pipe server started on {self.pipe_path}")
except Exception as e:
logger.error(f"Failed to start named pipe server: {e}")
raise
async def _start_windows_server(self):
"""Start Windows named pipe server."""
import win32pipe
import win32file
import win32api
while True:
try:
# Create named pipe
pipe = win32pipe.CreateNamedPipe(
self.pipe_path,
win32pipe.PIPE_ACCESS_DUPLEX,
win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_WAIT,
win32pipe.PIPE_UNLIMITED_INSTANCES,
4096, # out buffer
4096, # in buffer
0, # timeout
None # security
)
# Wait for client connection
win32pipe.ConnectNamedPipe(pipe, None)
# Handle client in separate task
asyncio.create_task(self._handle_windows_client(pipe))
except Exception as e:
logger.error(f"Windows pipe server error: {e}")
await asyncio.sleep(1)
async def _start_unix_server(self):
"""Start Unix domain socket server."""
import socket
self.server = await asyncio.start_unix_server(
self._handle_unix_client,
self.pipe_path
)
async with self.server:
await self.server.serve_forever()
async def _handle_windows_client(self, pipe):
"""Handle Windows named pipe client."""
import win32file
import win32pipe
try:
while True:
# Read message from client
result, data = win32file.ReadFile(pipe, 4096)
if result == 0: # client disconnected
break
message = data.decode('utf-8').strip()
if message:
response = await self._process_message(message)
# Send response
win32file.WriteFile(pipe, response.encode('utf-8'))
except Exception as e:
logger.error(f"Windows client handler error: {e}")
finally:
win32file.CloseHandle(pipe)
async def _handle_unix_client(self, reader, writer):
"""Handle Unix domain socket client."""
client_addr = writer.get_extra_info('peername')
logger.info(f"Client connected: {client_addr}")
try:
while True:
# Read message from client
data = await reader.read(4096)
if not data: # client disconnected
break
message = data.decode('utf-8').strip()
if message:
response = await self._process_message(message)
# Send response
writer.write(response.encode('utf-8'))
await writer.drain()
except Exception as e:
logger.error(f"Unix client handler error: {e}")
finally:
writer.close()
await writer.wait_closed()
logger.info(f"Client disconnected: {client_addr}")
async def _process_message(self, message: str) -> str:
"""Process incoming message and return response."""
try:
data = json.loads(message)
method = data.get('method')
params = data.get('params', {})
message_id = data.get('id')
# Add context to params
if 'context' not in params:
params['context'] = self.context_store
# Route to appropriate handler
if method in self.message_handlers:
result = await self.message_handlers[method](params)
# Update context if provided
if 'context' in result:
self.context_store.update(result['context'])
response = {
'id': message_id,
'result': result,
'timestamp': datetime.now().isoformat()
}
else:
response = {
'id': message_id,
'error': f"Unknown method: {method}",
'timestamp': datetime.now().isoformat()
}
except json.JSONDecodeError:
response = {
'error': 'Invalid JSON message',
'timestamp': datetime.now().isoformat()
}
except Exception as e:
response = {
'error': str(e),
'timestamp': datetime.now().isoformat()
}
return json.dumps(response) + '\n'
def register_handler(self, method: str, handler: Callable):
"""Register message handler for specific method."""
self.message_handlers[method] = handler
logger.info(f"Registered handler for method: {method}")
def get_context(self, key: str, default: Any = None) -> Any:
"""Get value from context store."""
return self.context_store.get(key, default)
def set_context(self, key: str, value: Any):
"""Set value in context store."""
self.context_store[key] = value
def clear_context(self):
"""Clear all context."""
self.context_store.clear()
class PipeClient:
"""Lightweight client for named pipe communication."""
def __init__(self, pipe_name: str = "katamari_mcp"):
self.pipe_name = pipe_name
self.pipe_path = self._get_pipe_path()
def _get_pipe_path(self) -> str:
"""Get platform-specific pipe path."""
if sys.platform == "win32":
return f"\\\\.\\pipe\\{self.pipe_name}"
else:
return f"/tmp/{self.pipe_name}.sock"
async def send_message(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
"""Send message to pipe server and get response."""
if params is None:
params = {}
message = {
'method': method,
'params': params,
'id': int(datetime.now().timestamp() * 1000) # unique ID
}
try:
if sys.platform == "win32":
return await self._send_windows_message(message)
else:
return await self._send_unix_message(message)
except Exception as e:
logger.error(f"Failed to send message: {e}")
return {'error': str(e)}
async def _send_windows_message(self, message: Dict) -> Dict[str, Any]:
"""Send message via Windows named pipe."""
import win32file
import win32pipe
import win32api
try:
# Connect to pipe
handle = win32file.CreateFile(
self.pipe_path,
win32file.GENERIC_READ | win32file.GENERIC_WRITE,
0,
None,
win32file.OPEN_EXISTING,
0,
None
)
# Send message
message_str = json.dumps(message) + '\n'
win32file.WriteFile(handle, message_str.encode('utf-8'))
# Read response
result, response = win32file.ReadFile(handle, 4096)
response_data = json.loads(response.decode('utf-8').strip())
win32file.CloseHandle(handle)
return response_data
except Exception as e:
if win32api.GetLastError() == 2: # ERROR_FILE_NOT_FOUND
return {'error': 'Pipe server not running'}
raise
async def _send_unix_message(self, message: Dict) -> Dict[str, Any]:
"""Send message via Unix domain socket."""
import socket
try:
# Create socket and connect
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(self.pipe_path)
# Send message
message_str = json.dumps(message) + '\n'
sock.send(message_str.encode('utf-8'))
# Read response
response = sock.recv(4096).decode('utf-8').strip()
response_data = json.loads(response)
sock.close()
return response_data
except FileNotFoundError:
return {'error': 'Pipe server not running'}
except Exception as e:
raise
# Convenience functions for quick usage
async def start_pipe_server(pipe_name: str = "katamari_mcp"):
"""Start named pipe server with default handlers."""
transport = NamedPipeTransport(pipe_name)
# Register default handlers
from katamari_mcp.router.intelligent_router import IntelligentRouter
router = IntelligentRouter()
async def list_capabilities(params):
capabilities = await router.list_capabilities()
return {'capabilities': capabilities}
async def call_tool(params):
tool_name = params.get('tool_name')
arguments = params.get('arguments', {})
result = await router.route_call(tool_name, arguments)
return {'result': result}
async def get_status(params):
return {
'status': 'running',
'pipe_path': transport.pipe_path,
'context_size': len(transport.context_store)
}
transport.register_handler('list_capabilities', list_capabilities)
transport.register_handler('call_tool', call_tool)
transport.register_handler('get_status', get_status)
await transport.start_server()
async def send_pipe_request(method: str, params: Dict[str, Any] = None,
pipe_name: str = "katamari_mcp") -> Dict[str, Any]:
"""Send request to pipe server."""
client = PipeClient(pipe_name)
return await client.send_message(method, params)
if __name__ == "__main__":
# Example usage
async def main():
print("Starting named pipe server...")
await start_pipe_server()
asyncio.run(main())