Skip to main content
Glama

Katamari MCP Server

by ciphernaut
stdio.py2.82 kB
""" Stdio transport implementation for MCP compatibility. """ import asyncio import json import sys from typing import AsyncGenerator import uuid from .base import BaseTransport, TransportMessage class StdioTransport(BaseTransport): """Stdio transport for MCP compatibility.""" def __init__(self, config=None): super().__init__(config) self.reader = None self.writer = None self.connection_id = f"stdio-{uuid.uuid4().hex[:8]}" async def start(self) -> None: """Start stdio transport.""" self.reader = asyncio.StreamReader() # Create a simple writer for stdout class StdoutWriter: def write(self, data): sys.stdout.buffer.write(data) async def drain(self): sys.stdout.flush() def close(self): pass async def wait_closed(self): pass self.writer = StdoutWriter() self.is_running = True async def stop(self) -> None: """Stop stdio transport.""" self.is_running = False if self.writer: self.writer.close() await self.writer.wait_closed() async def send_message(self, message: TransportMessage) -> None: """Send message via stdout.""" if not self.is_running or not self.writer: return try: data = message.model_dump_json(exclude_none=True) self.writer.write(data.encode('utf-8') + b'\n') await self.writer.drain() except Exception as e: print(f"Error sending message: {e}", file=sys.stderr) async def receive_messages(self) -> AsyncGenerator[TransportMessage, None]: """Receive messages from stdin.""" if not self.is_running: return try: while self.is_running: line = await asyncio.get_event_loop().run_in_executor( None, sys.stdin.readline ) if not line: break line = line.strip() if not line: continue try: data = json.loads(line) message = TransportMessage(**data) yield message except json.JSONDecodeError as e: print(f"Invalid JSON: {e}", file=sys.stderr) continue except Exception as e: print(f"Message parsing error: {e}", file=sys.stderr) continue except Exception as e: print(f"Receive error: {e}", file=sys.stderr)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server