Skip to main content
Glama

Optimized Memory MCP Server V2

by AgentWong
stdio.py (2.89 kB)
"""
Stdio Server Transport Module

This module provides functionality for creating an stdio-based transport layer
that can be used to communicate with an MCP client through standard input/output
streams.

Example usage:
```
    async def run_server():
        async with stdio_server() as (read_stream, write_stream):
            # read_stream contains incoming JSONRPCMessages from stdin
            # write_stream allows sending JSONRPCMessages to stdout
            server = await create_my_server()
            await server.run(read_stream, write_stream, init_options)

    anyio.run(run_server)
```
"""

import sys
from contextlib import asynccontextmanager
from io import TextIOWrapper

import anyio
import anyio.lowlevel
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

import mcp.types as types


def _utf8_text_wrapper(stream) -> TextIOWrapper | None:
    """Return a UTF-8 text wrapper over *stream*'s binary buffer, if it has one.

    Returns ``None`` when *stream* exposes no ``buffer`` attribute (for example
    when the standard stream was replaced by an in-memory text object), so the
    caller can fall back to using *stream* unchanged.
    """
    buffer = getattr(stream, "buffer", None)
    if buffer is None:
        return None
    return TextIOWrapper(buffer, encoding="utf-8")


@asynccontextmanager
async def stdio_server(
    stdin: anyio.AsyncFile[str] | None = None,
    stdout: anyio.AsyncFile[str] | None = None,
):
    """
    Server transport for stdio: this communicates with an MCP client by reading
    from the current process' stdin and writing to stdout.

    Args:
        stdin: Async text file to read newline-delimited JSON-RPC messages
            from. When ``None``, the process' standard input is used, decoded
            as UTF-8.
        stdout: Async text file to write newline-delimited JSON-RPC messages
            to. When ``None``, the process' standard output is used, encoded
            as UTF-8.

    Yields:
        A ``(read_stream, write_stream)`` pair. ``read_stream`` delivers each
        parsed ``JSONRPCMessage`` (or the exception raised while parsing a
        line); messages sent to ``write_stream`` are serialized and written to
        ``stdout``, one per line.
    """
    # Purposely not using context managers for these, as we don't want to close
    # standard process handles.
    #
    # The default standard streams use the platform's locale encoding, which is
    # not UTF-8 everywhere (e.g. Windows code pages). JSON-RPC traffic must be
    # UTF-8, so re-wrap the underlying binary buffers with an explicit encoding.
    owned_wrappers: list[TextIOWrapper] = []

    def wrap_standard_stream(stream):
        wrapper = _utf8_text_wrapper(stream)
        if wrapper is None:
            # No binary buffer available; use the text stream as it is.
            return anyio.wrap_file(stream)
        owned_wrappers.append(wrapper)
        return anyio.wrap_file(wrapper)

    if stdin is None:
        stdin = wrap_standard_stream(sys.stdin)
    if stdout is None:
        stdout = wrap_standard_stream(sys.stdout)

    read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception]
    read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception]

    write_stream: MemoryObjectSendStream[types.JSONRPCMessage]
    write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage]

    # Zero-sized buffers: every send waits for a matching receive.
    read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
    write_stream, write_stream_reader = anyio.create_memory_object_stream(0)

    async def stdin_reader():
        """Parse each stdin line and forward the message (or the parse error)."""
        try:
            async with read_stream_writer:
                async for line in stdin:
                    try:
                        message = types.JSONRPCMessage.model_validate_json(line)
                    except Exception as exc:
                        # Hand the failure to the consumer instead of stopping
                        # the reader; later lines are still processed.
                        await read_stream_writer.send(exc)
                        continue

                    await read_stream_writer.send(message)
        except anyio.ClosedResourceError:
            await anyio.lowlevel.checkpoint()

    async def stdout_writer():
        """Serialize each outgoing message and write it to stdout as one line."""
        try:
            async with write_stream_reader:
                async for message in write_stream_reader:
                    payload = message.model_dump_json(by_alias=True, exclude_none=True)
                    await stdout.write(payload + "\n")
                    await stdout.flush()
        except anyio.ClosedResourceError:
            await anyio.lowlevel.checkpoint()

    try:
        async with anyio.create_task_group() as tg:
            tg.start_soon(stdin_reader)
            tg.start_soon(stdout_writer)
            yield read_stream, write_stream
    finally:
        # A TextIOWrapper closes its underlying buffer when it is finalized.
        # Detach the wrappers created here so the process' real stdin/stdout
        # stay open after this transport is done.
        for wrapper in owned_wrappers:
            try:
                wrapper.detach()
            except (OSError, ValueError):
                # Best effort at shutdown: detach flushes first, which can fail
                # if the peer is gone or the wrapper is already unusable.
                pass

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AgentWong/optimized-memory-mcp-serverv2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.