Skip to main content
Glama
kdkiss

Stepstone Job Search MCP Server

by kdkiss
stdio_transport.py5.15 kB
"""Adaptive stdio transport that supports both newline and Content-Length framing.""" from __future__ import annotations import sys from contextlib import asynccontextmanager from typing import Literal import anyio from anyio.streams.memory import ( MemoryObjectReceiveStream, MemoryObjectSendStream, ) from io import TextIOWrapper import mcp.types as types from mcp.shared.message import SessionMessage FramingMode = Literal["newline", "content-length"] @asynccontextmanager async def adaptive_stdio_server(): """Create a stdio transport that accepts both framing strategies.""" stdin = anyio.wrap_file(TextIOWrapper(sys.stdin.buffer, encoding="utf-8")) stdout = anyio.wrap_file(TextIOWrapper(sys.stdout.buffer, encoding="utf-8")) read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] write_stream: MemoryObjectSendStream[SessionMessage] write_stream_reader: MemoryObjectReceiveStream[SessionMessage] read_stream_writer, read_stream = anyio.create_memory_object_stream(0) write_stream, write_stream_reader = anyio.create_memory_object_stream(0) framing_mode: dict[str, FramingMode] = {} framing_event = anyio.Event() async def stdin_reader() -> None: async with read_stream_writer: async for line in stdin: payload: str | None = None if line.lower().startswith("content-length:"): headers = [line] content_length: int | None = None while True: next_line = await stdin.readline() if next_line in ("", "\n", "\r\n"): break headers.append(next_line) for header_line in headers: if ":" not in header_line: continue name, value = header_line.split(":", 1) if name.strip().lower() == "content-length": try: content_length = int(value.strip()) except ValueError: # pragma: no cover - invalid header value content_length = None break if content_length is None: await read_stream_writer.send(ValueError("Missing Content-Length header")) continue remaining = content_length chunks: list[str] = [] while remaining > 0: chunk = await stdin.read(remaining) if chunk == "": # EOF before full body break chunks.append(chunk) remaining -= len(chunk) body = "".join(chunks) if len(body) != content_length: await read_stream_writer.send(ValueError("Incomplete Content-Length body")) continue payload = body if not framing_event.is_set(): framing_mode["value"] = "content-length" framing_event.set() else: stripped = line.strip() if not stripped: continue payload = stripped if not framing_event.is_set(): framing_mode["value"] = "newline" framing_event.set() try: message = types.JSONRPCMessage.model_validate_json(payload) except Exception as exc: # pragma: no cover - invalid payload await read_stream_writer.send(exc) continue await read_stream_writer.send(SessionMessage(message)) async def stdout_writer() -> None: async with write_stream_reader: async for session_message in write_stream_reader: message_json = session_message.message.model_dump_json( by_alias=True, exclude_none=True ) if not framing_event.is_set(): framing_mode["value"] = "content-length" framing_event.set() mode = framing_mode.get("value", "content-length") try: if mode == "content-length": header = f"Content-Length: {len(message_json.encode('utf-8'))}\r\n\r\n" await stdout.write(header + message_json) else: await stdout.write(message_json + "\n") await stdout.flush() except anyio.ClosedResourceError: # pragma: no cover - closed pipe break async with anyio.create_task_group() as tg: tg.start_soon(stdin_reader) tg.start_soon(stdout_writer) yield read_stream, write_stream

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kdkiss/mcp-stepstone'

If you have feedback or need assistance with the MCP directory API, please join our Discord server