stdio.py•2.82 kB
"""
Stdio transport implementation for MCP compatibility.
"""
import asyncio
import json
import sys
from typing import AsyncGenerator
import uuid
from .base import BaseTransport, TransportMessage
class StdioTransport(BaseTransport):
"""Stdio transport for MCP compatibility."""
def __init__(self, config=None):
super().__init__(config)
self.reader = None
self.writer = None
self.connection_id = f"stdio-{uuid.uuid4().hex[:8]}"
async def start(self) -> None:
"""Start stdio transport."""
self.reader = asyncio.StreamReader()
# Create a simple writer for stdout
class StdoutWriter:
def write(self, data):
sys.stdout.buffer.write(data)
async def drain(self):
sys.stdout.flush()
def close(self):
pass
async def wait_closed(self):
pass
self.writer = StdoutWriter()
self.is_running = True
async def stop(self) -> None:
"""Stop stdio transport."""
self.is_running = False
if self.writer:
self.writer.close()
await self.writer.wait_closed()
async def send_message(self, message: TransportMessage) -> None:
"""Send message via stdout."""
if not self.is_running or not self.writer:
return
try:
data = message.model_dump_json(exclude_none=True)
self.writer.write(data.encode('utf-8') + b'\n')
await self.writer.drain()
except Exception as e:
print(f"Error sending message: {e}", file=sys.stderr)
async def receive_messages(self) -> AsyncGenerator[TransportMessage, None]:
"""Receive messages from stdin."""
if not self.is_running:
return
try:
while self.is_running:
line = await asyncio.get_event_loop().run_in_executor(
None, sys.stdin.readline
)
if not line:
break
line = line.strip()
if not line:
continue
try:
data = json.loads(line)
message = TransportMessage(**data)
yield message
except json.JSONDecodeError as e:
print(f"Invalid JSON: {e}", file=sys.stderr)
continue
except Exception as e:
print(f"Message parsing error: {e}", file=sys.stderr)
continue
except Exception as e:
print(f"Receive error: {e}", file=sys.stderr)