MedX MCP Server

by yepdama
ARCHITECTURE_EXPLANATION.md

# MCP Server Architecture Explanation

## Overview: Asynchronous Streaming Architecture

The MCP server uses an **asynchronous, event-driven architecture** to handle long-running AI operations efficiently. This design allows the server to:

1. Accept requests immediately (non-blocking)
2. Process AI calls in the background
3. Stream results in real-time to multiple clients
4. Handle cancellations gracefully
5. Support idempotent retries

---

## 📊 Global State Variables: Purpose & Usage

### 1. `CALLS: Dict[str, Dict[str, Any]] = {}`

**Purpose**: Metadata registry for all calls

**Structure:**
```python
CALLS = {
    "call-123": {
        "status": "running",
        "tool": "openai_chat",
        "input": {...},
        "session_id": "session-1",
        "result": "AI response text"  # Added when finished
    }
}
```

**Why needed:**
- **Status tracking**: Know if a call is pending/running/finished/error
- **Result storage**: Store the final output for later retrieval
- **Cancellation**: Mark calls as cancelled
- **Session linking**: Connect calls to sessions

**Usage locations:**
- Line 208: Update status to "running"
- Line 248: Store final result
- Line 276: Mark as cancelled
- Line 392: Check if call exists for cancellation
- Line 412: Retrieve session_id for session tracking

---

### 2. `EVENT_QUEUES: Dict[str, asyncio.Queue] = {}`

**Purpose**: Per-call event buffers for streaming

**Structure:**
```python
# Illustrative only: asyncio.Queue() does not accept initial items;
# the list below just visualizes the events buffered in the queue.
EVENT_QUEUES = {
    "call-123": asyncio.Queue([
        {"type": "partial", "text": "Hello"},
        {"type": "partial", "text": " world"},
        {"type": "final", "text": "Hello world"}
    ])
}
```

**Why needed:**
- **Streaming**: Buffer events for SSE consumers
- **Decoupling**: Separate the producer (background task) from the consumer (stream endpoint)
- **Multiple clients**: Multiple streams can read the same queue

**Usage locations:**
- Line 172: Create a new queue for each call
- Line 237: Producer writes partial tokens
- Line 244: Producer writes the final result
- Line 338: Consumer reads from the queue
- Line 402: Push a cancel event to the queue

---

### 3. `REQUEST_ID_MAP: Dict[str, Dict[str, Any]] = {}`

**Purpose**: Idempotency mapping (request_id → call_id)

**Structure:**
```python
REQUEST_ID_MAP = {
    "req-123": {
        "call_id": "call-456",
        "status": "finished"
    }
}
```

**Why needed:**
- **Idempotency**: Prevent duplicate processing of the same request
- **Lookup speed**: O(1) lookup for existing requests
- **Status tracking**: Know if a request was already processed

**Usage locations:**
- Line 153: Check if the request_id already exists
- Line 176: Register a new request_id mapping
- Line 268: Update status when the call completes

**Lifecycle:**
```
1. Request arrives with request_id="req-123"
2. Check REQUEST_ID_MAP["req-123"] → Not found
3. Create call_id="call-456"
4. Store: REQUEST_ID_MAP["req-123"] = {"call_id": "call-456", "status": "pending"}
5. When call completes: REQUEST_ID_MAP["req-123"]["status"] = "finished"
6. Retry with same request_id: Returns existing mapping instantly!
```
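
To make the relationship between `CALLS`, `EVENT_QUEUES`, and `REQUEST_ID_MAP` concrete, here is a minimal sketch of how a new call could be registered across all three. The helper name `register_call` and the UUID-based call IDs are illustrative assumptions, not the server's actual code; the real server wires this up inside its execute endpoint and call-registry service.

```python
import asyncio
import uuid
from typing import Any, Dict, Optional

CALLS: Dict[str, Dict[str, Any]] = {}
EVENT_QUEUES: Dict[str, asyncio.Queue] = {}
REQUEST_ID_MAP: Dict[str, Dict[str, Any]] = {}

def register_call(tool: str, tool_input: Dict[str, Any],
                  session_id: Optional[str] = None,
                  request_id: Optional[str] = None) -> str:
    """Sketch: create the per-call state used by the three registries above."""
    # Idempotency short-circuit: reuse the existing call if this
    # request_id has already been seen.
    if request_id and request_id in REQUEST_ID_MAP:
        return REQUEST_ID_MAP[request_id]["call_id"]

    call_id = str(uuid.uuid4())
    CALLS[call_id] = {
        "status": "pending",
        "tool": tool,
        "input": tool_input,
        "session_id": session_id,
    }
    EVENT_QUEUES[call_id] = asyncio.Queue()  # per-call event buffer
    if request_id:
        REQUEST_ID_MAP[request_id] = {"call_id": call_id, "status": "pending"}
    return call_id
```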

---

### 4. `SESSION_BUFFERS: Dict[str, list] = {}`

**Purpose**: Conversation history per session

**Structure:**
```python
SESSION_BUFFERS = {
    "session-1": [
        {"call_id": "call-1", "event": "tool_started", "tool": "openai_chat", ...},
        {"call_id": "call-1", "event": "tool_finished", "output": "Hello", ...},
        {"call_id": "call-2", "event": "tool_started", ...},
        ...
    ]  # Last 50 events only
}
```

**Why needed:**
- **Conversation context**: Track all interactions in a session
- **Orchestrator support**: External systems can query session history
- **Debugging**: See full conversation flow
- **Analytics**: Understand user behavior patterns

**Usage locations:**
- Line 212: Record tool start event
- Line 260: Record tool finish event
- Line 283: Record cancellation event
- Line 315: Record error event
- Lines 90-101: `_append_session_event()` manages buffer size (keeps last 50)

**Buffer Management:**
```python
# Lines 100-101: Sliding window (FIFO)
if len(buf) > SESSION_BUFFER_MAX_SIZE:  # 50
    del buf[:-SESSION_BUFFER_MAX_SIZE]  # Remove oldest, keep newest 50
```

---

### 5. `TASKS: Dict[str, asyncio.Task] = {}`

**Purpose**: Active asyncio task registry for cancellation

**Structure:**
```python
TASKS = {
    "call-123": <Task object at 0x...>
}
```

**Why needed:**
- **Cancellation**: Need the Task object to call `.cancel()`
- **Task lifecycle**: Track which tasks are running
- **Cleanup**: Remove completed tasks from the registry

**Usage locations:**
- Line 182: Store task when call starts
- Line 324: Remove task when call completes (cleanup)
- Line 398: Get task to cancel it
- Line 434: Cancel all tasks

**Why asyncio.Task?**
```python
# Lines 179-181: Create background task
task = asyncio.create_task(run_tool_call(...))
# This returns a Task object we can cancel later

# Line 400: Cancel the task
task.cancel()  # Sends CancelledError to the task
```

**Task Lifecycle:**
```
1. Create: task = asyncio.create_task(...)
2. Store: TASKS[call_id] = task
3. Run: Task executes in background
4. Cancel (if needed): task.cancel() → raises CancelledError in task
5. Cleanup: TASKS.pop(call_id) in finally block
```

---

## 🔄 Event Queues: Why They're Critical

### What are Event Queues?

```python
EVENT_QUEUES: Dict[str, asyncio.Queue] = {}
```

Each call gets its own `asyncio.Queue` that acts as a **message buffer** between the producer (background task) and the consumer (streaming endpoint).

### Why Event Queues Are Necessary

#### 1. **Decoupling Producer from Consumer**

```
┌─────────────┐         Queue         ┌──────────────┐
│ Background  │  ────────[ ]────────> │  SSE Stream  │
│    Task     │    produces events    │   Endpoint   │
│ (Producer)  │                       │  (Consumer)  │
└─────────────┘                       └──────────────┘
```

**Problem without queues:**
- Background task generates tokens instantly
- Stream endpoint might connect later (network delay)
- Tokens would be **lost** if there's no buffer

**Solution with queues:**
- Tokens are **buffered** in the queue
- Stream endpoint can read at its own pace
- No data loss even if the consumer is slow

#### 2. **Supporting Multiple Consumers**

```python
# Multiple clients can stream the same call_id
Client A → GET /stream/{call_id}
Client B → GET /stream/{call_id}
# Both get the same events!
```

Each queue acts as a **broadcast channel** - all stream connections reading from the same queue get identical events.

#### 3. **Non-Blocking Execution**

```python
# Line 237: Producer writes without blocking
await EVENT_QUEUES[call_id].put({"type": "partial", "text": token})

# Line 349: Consumer reads without blocking
data = await asyncio.wait_for(queue.get(), timeout=0.2)
```

**Without queues:** The background task would have to wait for the stream endpoint to be ready (blocking).

**With queues:** Producer writes immediately, consumer reads when ready (non-blocking).

#### 4. **Graceful Backpressure Handling**

If the consumer is slow (e.g., slow network), the queue buffers tokens. If the queue gets full (unlikely with token-by-token streaming), `queue.put()` will wait, providing natural backpressure.
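
Putting points 1-4 together, the following is a self-contained toy version of the producer/consumer relationship. The event shapes (`partial`/`final`) match the ones shown above; the function names, token list, and timings are invented purely for illustration.

```python
import asyncio
import json

async def producer(queue: asyncio.Queue) -> None:
    """Stand-in for the background task: emits partial tokens, then a final event."""
    for token in ["Ane", "mia", " is", " a", " low", " red", " cell", " count."]:
        await queue.put({"type": "partial", "text": token})
        await asyncio.sleep(0.05)  # simulate model latency
    await queue.put({"type": "final", "text": "Anemia is a low red cell count."})

async def consumer(queue: asyncio.Queue) -> None:
    """Stand-in for the SSE endpoint: drains the queue at its own pace."""
    while True:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=0.2)
        except asyncio.TimeoutError:
            continue  # nothing buffered yet; poll again (or check for cancellation)
        print(f"event: {event['type']}\ndata: {json.dumps(event)}\n")
        if event["type"] == "final":
            break

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    await asyncio.gather(producer(queue), consumer(queue))

asyncio.run(main())
```

Because the consumer polls with a short timeout, it never blocks forever on an empty queue and can periodically check for cancellation between reads.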
#### 5. **Real-Time Streaming Experience**

```python
# OpenAI sends:  "H" "e" "l" "l" "o"
# Queue buffers: ["H", "e", "l", "l", "o"]
# Stream sends:  "H" → client sees "H" immediately
#                "e" → client sees "He"
#                "l" → client sees "Hel"
# Each token appears in real-time as it arrives
```

---

## 🔁 Idempotency: Why It's Essential

### What is Idempotency?

**Idempotency** means: "Making the same request multiple times produces the same result."

### The Problem We Solve

#### Scenario: Network Issues

```
1. Client sends POST /mcp/execute (request_id="req-123")
2. Request reaches server ✅
3. Server starts processing ✅
4. Network timeout! ❌ (response lost)
5. Client retries: POST /mcp/execute (request_id="req-123")
```

**Without idempotency:**
- Server creates a **duplicate call** (call_id-1, call_id-2)
- OpenAI API called **twice** (costs money!)
- Two identical AI responses generated

**With idempotency:**
- Server recognizes `request_id="req-123"` already exists
- Returns the existing `call_id` and status
- No duplicate processing, no extra costs!

### How Idempotency Works in Our Code

```python
# Lines 151-163: Idempotency check
if req.request_id:
    existing = _call_registry_service.get_existing_call(req.request_id)
    if existing:
        # Return existing result instead of creating a new call
        return JSONResponse({
            "call_id": existing["call_id"],
            "status": existing["status"]
        })

# Lines 175-176: Register for future lookups
if req.request_id:
    _call_registry_service.register_request_id(req.request_id, call_id)
```

### Why Idempotency Matters for Medical AI

1. **Cost control**: Prevents duplicate API calls (OpenAI charges per token)
2. **Consistency**: Same request = same response (critical for medical queries)
3. **Reliability**: Handles network failures gracefully
4. **User experience**: Users can safely retry without side effects

### Request Flow with Idempotency

```
First Request:
┌─────────────────────────────────────┐
│ POST /mcp/execute                   │
│ {request_id: "req-123", ...}        │
└─────────────┬───────────────────────┘
              │
              ▼
     ┌─────────────────┐
     │ Check REQUEST_ID│ ──→ Not found
     │ _MAP            │
     └────────┬────────┘
              │
              ▼
Create new call_id="abc-123"
Store: REQUEST_ID_MAP["req-123"] = {"call_id": "abc-123", ...}
Start processing...

Retry Request (same request_id):
┌─────────────────────────────────────┐
│ POST /mcp/execute                   │
│ {request_id: "req-123", ...}        │
└─────────────┬───────────────────────┘
              │
              ▼
     ┌─────────────────┐
     │ Check REQUEST_ID│ ──→ Found!
     │ _MAP            │
     └────────┬────────┘
              │
              ▼
Return existing: {"call_id": "abc-123", "status": "finished"}

✅ No duplicate processing!
```
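
The same pattern can be condensed into a standalone sketch. The `execute` function below is a plain in-memory stand-in (not the actual FastAPI endpoint) that mirrors the lookup-then-register logic described above:

```python
import uuid
from typing import Any, Dict

REQUEST_ID_MAP: Dict[str, Dict[str, Any]] = {}

def execute(tool: str, tool_input: Dict[str, Any], request_id: str) -> Dict[str, Any]:
    """Idempotent execute: the same request_id always maps to the same call."""
    existing = REQUEST_ID_MAP.get(request_id)
    if existing:
        # Retry path: no new call is created, no second AI invocation.
        return {"call_id": existing["call_id"], "status": existing["status"]}

    call_id = str(uuid.uuid4())
    REQUEST_ID_MAP[request_id] = {"call_id": call_id, "status": "pending"}
    # ... start the background task here ...
    return {"call_id": call_id, "status": "started"}

first = execute("openai_chat", {"messages": []}, request_id="req-123")
retry = execute("openai_chat", {"messages": []}, request_id="req-123")
assert first["call_id"] == retry["call_id"]  # duplicate processing avoided
```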

---

## 🔄 Complete Request Flow

### Example: Medical Query Flow

```
1. Client Request:
   POST /mcp/execute
   {
     "tool": "openai_chat",
     "input": {"messages": [{"role": "user", "content": "What is anemia?"}]},
     "session_id": "patient-123",
     "request_id": "req-456"
   }

2. Server Processing (execute endpoint):
   ├─ Check idempotency: REQUEST_ID_MAP["req-456"] → Not found
   ├─ Create call_id: "call-789"
   ├─ Initialize: CALLS["call-789"] = {"status": "pending", ...}
   ├─ Create queue: EVENT_QUEUES["call-789"] = asyncio.Queue()
   ├─ Store mapping: REQUEST_ID_MAP["req-456"] = {"call_id": "call-789", ...}
   └─ Return: {"call_id": "call-789", "status": "started"}

3. Background Task (run_tool_call):
   ├─ Update status: CALLS["call-789"]["status"] = "running"
   ├─ Session event: SESSION_BUFFERS["patient-123"].append({"event": "tool_started", ...})
   ├─ Call OpenAI API (streaming)
   ├─ For each token:
   │   └─ Put in queue: EVENT_QUEUES["call-789"].put({"type": "partial", "text": token})
   ├─ Final event: EVENT_QUEUES["call-789"].put({"type": "final", "text": "Anemia is..."})
   ├─ Update: CALLS["call-789"]["status"] = "finished"
   ├─ Store result: CALLS["call-789"]["result"] = "Anemia is..."
   ├─ Session event: SESSION_BUFFERS["patient-123"].append({"event": "tool_finished", ...})
   └─ Update idempotency: REQUEST_ID_MAP["req-456"]["status"] = "finished"

4. Client Streaming (separate connection):
   GET /mcp/stream/call-789
   ├─ Get queue: EVENT_QUEUES["call-789"]
   ├─ Poll queue: queue.get() → {"type": "partial", "text": "Anemia"}
   ├─ Send SSE: event: partial, data: {"type": "partial", "text": "Anemia"}
   ├─ Poll queue: queue.get() → {"type": "partial", "text": " is..."}
   ├─ Send SSE: event: partial, data: {"type": "partial", "text": " is..."}
   ├─ Poll queue: queue.get() → {"type": "final", "text": "Anemia is..."}
   ├─ Send SSE: event: final, data: {"type": "final", "text": "Anemia is..."}
   └─ Close stream

5. Retry Scenario (same request_id):
   POST /mcp/execute {"request_id": "req-456", ...}
   ├─ Check: REQUEST_ID_MAP["req-456"] → Found!
   └─ Return: {"call_id": "call-789", "status": "finished"}
   ✅ No duplicate processing!
```

---

## 🎯 Design Decisions Summary

### Why This Architecture?

1. **Asynchronous**: Don't block HTTP requests waiting for slow AI responses
2. **Streaming**: Real-time token delivery improves UX
3. **Idempotent**: Handle network failures gracefully
4. **Cancellable**: Users can cancel long-running operations
5. **Scalable**: Background tasks can process multiple calls concurrently
6. **Traceable**: Session buffers enable conversation tracking

### Production Considerations

**Current (POC):**
- All state in memory (lost on restart)
- Single process (not distributed)
- No persistence

**Production Would Need:**
- Redis/database for state persistence
- Distributed locks for concurrent access
- TTL/cleanup for old state
- Multiple worker processes
- Monitoring and metrics

---

## Key Takeaways

1. **Event Queues** = Decoupling + Buffering + Real-time streaming
2. **Idempotency** = Cost control + Reliability + User experience
3. **CALLS** = Metadata + Status tracking + Results storage
4. **EVENT_QUEUES** = Streaming buffers + Producer-consumer pattern
5. **REQUEST_ID_MAP** = Idempotency lookup + Duplicate prevention
6. **SESSION_BUFFERS** = Conversation history + Context tracking
7. **TASKS** = Cancellation support + Task lifecycle management

All of these work together to create a robust, scalable, production-ready MCP server! 🚀
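
As a closing illustration, here is a hedged client-side sketch of the flow above, assuming the server is reachable at `http://localhost:8000` and that the SSE `data:` payloads carry the `type`/`text` fields shown in the flow diagram:

```python
import json
import httpx

BASE = "http://localhost:8000"  # assumed host/port for a local run

# 1. Kick off the call (idempotent thanks to request_id).
resp = httpx.post(f"{BASE}/mcp/execute", json={
    "tool": "openai_chat",
    "input": {"messages": [{"role": "user", "content": "What is anemia?"}]},
    "session_id": "patient-123",
    "request_id": "req-456",
})
call_id = resp.json()["call_id"]

# 2. Stream tokens as Server-Sent Events.
with httpx.stream("GET", f"{BASE}/mcp/stream/{call_id}", timeout=None) as stream:
    for line in stream.iter_lines():
        if line.startswith("data:"):
            event = json.loads(line[len("data:"):].strip())
            print(event["text"], end="", flush=True)
            if event["type"] == "final":
                break
```

Re-running the first request with the same `request_id` would return the same `call_id` without triggering a second AI call.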
