Skip to main content
Glama
stdio_server.py30.9 kB
#!/usr/bin/env python3 """ Stdio-based MCP server for Omnispindle using FastMCP. This module provides a standard input/output transport layer for the MCP protocol, allowing the Omnispindle tools to be used by Claude Desktop and other MCP clients that expect stdio communication. Usage: python -m src.Omnispindle.stdio_server """ import asyncio import logging import os import sys from typing import Dict, Any, Optional, Annotated from pydantic import Field from jose import jwt from jose.exceptions import JWTError from .auth_flow import ensure_authenticated, run_async_in_thread from .auth_utils import verify_auth0_token, get_jwks, AUTH_CONFIG from fastmcp import FastMCP from .context import Context from . import tools from .documentation_manager import get_tool_doc, build_tool_docstring # Configure logging to stderr so it doesn't interfere with stdio protocol logging.basicConfig( stream=sys.stderr, level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Tool loadout configurations - same as FastAPI server TOOL_LOADOUTS = { "full": [ "add_todo", "query_todos", "update_todo", "delete_todo", "get_todo", "mark_todo_complete", "list_todos_by_status", "search_todos", "list_project_todos", "add_lesson", "get_lesson", "update_lesson", "delete_lesson", "search_lessons", "grep_lessons", "list_lessons", "query_todo_logs", "list_projects", "explain", "add_explanation", "point_out_obvious", "bring_your_own", "inventorium_sessions_list", "inventorium_sessions_get", "inventorium_sessions_create", "inventorium_sessions_spawn", "inventorium_sessions_fork", "inventorium_sessions_genealogy", "inventorium_sessions_tree", "inventorium_todos_link_session" ], "basic": [ "add_todo", "query_todos", "update_todo", "get_todo", "mark_todo_complete", "list_todos_by_status", "list_project_todos" ], "minimal": [ "add_todo", "query_todos", "get_todo", "mark_todo_complete" ], "lessons": [ "add_lesson", "get_lesson", "update_lesson", "delete_lesson", "search_lessons", "grep_lessons", "list_lessons" ], "admin": [ "query_todos", "update_todo", "delete_todo", "query_todo_logs", "list_projects", "explain", "add_explanation" "query_todos", "update_todo", "delete_todo", "query_todo_logs", "list_projects", "explain", "add_explanation", "inventorium_sessions_list", "inventorium_sessions_get", "inventorium_sessions_create", "inventorium_sessions_fork", "inventorium_sessions_genealogy", "inventorium_sessions_tree", "inventorium_todos_link_session" ] } async def verify_auth0_token(token: str) -> Optional[Dict[str, Any]]: """Verifies an Auth0 token and returns the payload.""" try: unverified_header = jwt.get_unverified_header(token) jwks = get_jwks() rsa_key = {} for key in jwks["keys"]: if key["kid"] == unverified_header["kid"]: rsa_key = { "kty": key["kty"], "kid": key["kid"], "use": key["use"], "n": key["n"], "e": key["e"], } break if not rsa_key: logger.error("Unable to find appropriate key in JWKS") return None payload = jwt.decode( token, rsa_key, algorithms=["RS256"], audience=AUTH_CONFIG.audience, issuer=f"https://{AUTH_CONFIG.domain}/", ) return payload except JWTError as e: logger.error(f"JWT Error: {e}") return None except Exception as e: logger.error(f"An unexpected error occurred during token verification: {e}") return None def _create_context() -> Context: """Create a context object with REQUIRED environment-based user information.""" # Priority 1: Auth0 Token auth0_token = os.getenv("AUTH0_TOKEN") # If no token, trigger browser-based authentication if not auth0_token: logger.info("No AUTH0_TOKEN found, initiating browser-based authentication...") try: # Use run_async_in_thread to handle the async ensure_authenticated call def sync_ensure_auth(): import asyncio try: loop = asyncio.get_running_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop.run_until_complete(ensure_authenticated()) auth0_token = sync_ensure_auth() logger.info("✅ Browser authentication successful!") except Exception as e: logger.error(f"Authentication failed: {e}") # Fall back to other methods pass if auth0_token: logger.info("🔐 Found AUTH0_TOKEN, attempting verification...") # Note: This is a blocking call in an async context. # For stdio server, this is acceptable as it runs once at the start # of each tool call. user_payload = {} async def verify_token_async(): nonlocal user_payload payload = await verify_auth0_token(auth0_token) if payload: user_payload.update(payload) run_async_in_thread(verify_token_async()) if user_payload: user_payload["auth_method"] = "auth0" user_payload["access_token"] = auth0_token logger.info(f"Authenticated via Auth0: {user_payload.get('sub')}") return Context(user=user_payload) else: logger.error("Auth0 token verification failed. Falling back.") # Check for API key first, then fall back to email/user_id api_key = os.getenv("MCP_API_KEY") user_email = os.getenv("MCP_USER_EMAIL") user_id = os.getenv("MCP_USER_ID") if api_key: # Use API key authentication - we'll trust the API key format validation # This allows using API keys from the Inventorium dashboard logger.info(f"🔐 Using API key authentication: {api_key[:12]}...") user = { "email": "api-key-user", # Placeholder - real validation would happen server-side "sub": api_key[:16], # Use key prefix as identifier "auth_method": "api_key", "api_key": api_key } return Context(user=user) if not user_email and not user_id: logger.error("❌ Authentication required for STDIO MCP server") logger.error("💡 Setup authentication with: python -m src.Omnispindle auth --setup") logger.error("🔑 Or manually set: MCP_USER_EMAIL, MCP_USER_ID, or MCP_API_KEY environment variables") logger.error("🔑 Alternatively, provide an AUTH0_TOKEN for secure authentication.") raise ValueError( "Authentication required: MCP_USER_EMAIL, MCP_USER_ID, or MCP_API_KEY must be set. " "Run 'python -m src.Omnispindle auth --setup' to configure authentication." ) user = { "email": user_email, "sub": user_id or user_email, # Use email as fallback ID "auth_method": "environment" } logger.info(f"🔐 Authenticated user: {user_email or user_id}") return Context(user=user) class OmniSpindleStdioServer: """Stdio-based MCP server for Omnispindle tools using FastMCP.""" def __init__(self): self.server = FastMCP(name="omnispindle") self._register_tools() logger.info("OmniSpindleStdioServer initialized with FastMCP") def _register_tools(self): """Register tools based on OMNISPINDLE_TOOL_LOADOUT env var.""" loadout = os.getenv("OMNISPINDLE_TOOL_LOADOUT", "full").lower() if loadout not in TOOL_LOADOUTS: logger.warning(f"Unknown loadout '{loadout}', using 'full'") loadout = "full" enabled = TOOL_LOADOUTS[loadout] logger.info(f"Loading '{loadout}' loadout: {enabled}") # Tool registry with loadout-aware documentation tool_registry = { "add_todo": { "func": tools.add_todo, "doc": get_tool_doc("add_todo") }, "query_todos": { "func": tools.query_todos, "doc": get_tool_doc("query_todos") }, "update_todo": { "func": tools.update_todo, "doc": get_tool_doc("update_todo") }, "delete_todo": { "func": tools.delete_todo, "doc": get_tool_doc("delete_todo") }, "get_todo": { "func": tools.get_todo, "doc": get_tool_doc("get_todo") }, "mark_todo_complete": { "func": tools.mark_todo_complete, "doc": get_tool_doc("mark_todo_complete") }, "list_todos_by_status": { "func": tools.list_todos_by_status, "doc": get_tool_doc("list_todos_by_status") }, "search_todos": { "func": tools.search_todos, "doc": get_tool_doc("search_todos") }, "list_project_todos": { "func": tools.list_project_todos, "doc": get_tool_doc("list_project_todos") }, "add_lesson": { "func": tools.add_lesson, "doc": get_tool_doc("add_lesson") }, "get_lesson": { "func": tools.get_lesson, "doc": get_tool_doc("get_lesson") }, "update_lesson": { "func": tools.update_lesson, "doc": get_tool_doc("update_lesson") }, "delete_lesson": { "func": tools.delete_lesson, "doc": get_tool_doc("delete_lesson") }, "search_lessons": { "func": tools.search_lessons, "doc": get_tool_doc("search_lessons") }, "grep_lessons": { "func": tools.grep_lessons, "doc": get_tool_doc("grep_lessons") }, "list_lessons": { "func": tools.list_lessons, "doc": get_tool_doc("list_lessons") }, "query_todo_logs": { "func": tools.query_todo_logs, "doc": get_tool_doc("query_todo_logs") }, "list_projects": { "func": tools.list_projects, "doc": get_tool_doc("list_projects") }, "explain": { "func": tools.explain_tool, "doc": get_tool_doc("explain") }, "add_explanation": { "func": tools.add_explanation, "doc": get_tool_doc("add_explanation") }, "point_out_obvious": { "func": tools.point_out_obvious, "doc": get_tool_doc("point_out_obvious") }, "bring_your_own": { "func": tools.bring_your_own, "doc": get_tool_doc("bring_your_own") }, "inventorium_sessions_list": { "func": tools.inventorium_sessions_list, "doc": get_tool_doc("inventorium_sessions_list") }, "inventorium_sessions_get": { "func": tools.inventorium_sessions_get, "doc": get_tool_doc("inventorium_sessions_get") }, "inventorium_sessions_create": { "func": tools.inventorium_sessions_create, "doc": get_tool_doc("inventorium_sessions_create") }, "inventorium_sessions_spawn": { "func": tools.inventorium_sessions_spawn, "doc": get_tool_doc("inventorium_sessions_spawn") }, "inventorium_todos_link_session": { "func": tools.inventorium_todos_link_session, "doc": get_tool_doc("inventorium_todos_link_session") }, "inventorium_sessions_fork": { "func": tools.inventorium_sessions_fork, "doc": get_tool_doc("inventorium_sessions_fork") }, "inventorium_sessions_genealogy": { "func": tools.inventorium_sessions_genealogy, "doc": get_tool_doc("inventorium_sessions_genealogy") }, "inventorium_sessions_tree": { "func": tools.inventorium_sessions_tree, "doc": get_tool_doc("inventorium_sessions_tree") } } # Register enabled tools dynamically for tool_name in enabled: if tool_name in tool_registry: tool_info = tool_registry[tool_name] # Create dynamic tool function with proper signature def make_tool(name, func, docstring): def create_wrapper(): if name == "add_todo": @self.server.tool() async def add_todo( description: Annotated[str, Field(description="Task description")], project: Annotated[str, Field(description="Project name")], priority: Annotated[str, Field(description="Critical|High|Medium|Low")] = "Medium", target_agent: Annotated[str, Field(description="user|AI name")] = "user", metadata: Annotated[Optional[Dict[str, Any]], Field(description="{key: value} pairs")] = None ) -> str: """Create task. Returns ID and project stats.""" ctx = _create_context() return await func(description, project, priority, target_agent, metadata, ctx=ctx) return add_todo elif name == "query_todos": @self.server.tool() async def query_todos( filter: Annotated[Optional[Dict[str, Any]], Field(description="{status: 'pending', project: 'name'}")] = None, projection: Annotated[Optional[Dict[str, Any]], Field(description="{field: 1} to include")] = None, limit: Annotated[int, Field(description="Max results")] = 100, ctx: Annotated[Optional[str], Field(description="Additional context")] = None ) -> str: """Query with MongoDB filters. Ex: {status: 'pending', project: 'name'}""" context = _create_context() return await func(filter, projection, limit, ctx=context) return query_todos elif name == "update_todo": @self.server.tool() async def update_todo( todo_id: Annotated[str, Field(description="Todo ID")], updates: Annotated[dict, Field(description="{field: new_value}")] ) -> str: """Update todo. Fields: description, priority, status, metadata.""" ctx = _create_context() return await func(todo_id, updates, ctx=ctx) return update_todo elif name == "delete_todo": @self.server.tool() async def delete_todo(todo_id: str) -> str: ctx = _create_context() return await func(todo_id, ctx=ctx) delete_todo.__doc__ = docstring return delete_todo elif name == "get_todo": @self.server.tool() async def get_todo( todo_id: Annotated[str, Field(description="Todo ID")] ) -> str: """Get todo by ID""" ctx = _create_context() return await func(todo_id, ctx=ctx) return get_todo elif name == "mark_todo_complete": @self.server.tool() async def mark_todo_complete( todo_id: Annotated[str, Field(description="Todo ID")], comment: Annotated[Optional[str], Field(description="Optional completion comment")] = None ) -> str: """Mark completed. Optional comment.""" ctx = _create_context() return await func(todo_id, comment, ctx=ctx) return mark_todo_complete elif name == "list_todos_by_status": @self.server.tool() async def list_todos_by_status( status: Annotated[str, Field(description="pending|completed|initial|blocked|in_progress")], limit: Annotated[int, Field(description="Max results")] = 100 ) -> str: """List by status: pending|completed|initial|blocked|in_progress""" ctx = _create_context() return await func(status, limit, ctx=ctx) return list_todos_by_status elif name == "search_todos": @self.server.tool() async def search_todos(query: str, fields: Optional[list] = None, limit: int = 100, ctx: Optional[str] = None) -> str: context = _create_context() return await func(query, fields, limit, ctx=context) search_todos.__doc__ = docstring return search_todos elif name == "list_project_todos": @self.server.tool() async def list_project_todos( project: Annotated[str, Field(description="Project name")], limit: Annotated[int, Field(description="Max results")] = 5 ) -> str: """List recent pending todos for project""" ctx = _create_context() return await func(project, limit, ctx=ctx) return list_project_todos elif name == "add_lesson": @self.server.tool() async def add_lesson(language: str, topic: str, lesson_learned: str, tags: Optional[list] = None) -> str: ctx = _create_context() return await func(language, topic, lesson_learned, tags, ctx=ctx) add_lesson.__doc__ = docstring return add_lesson elif name == "get_lesson": @self.server.tool() async def get_lesson(lesson_id: str) -> str: ctx = _create_context() return await func(lesson_id, ctx=ctx) get_lesson.__doc__ = docstring return get_lesson elif name == "update_lesson": @self.server.tool() async def update_lesson(lesson_id: str, updates: dict) -> str: ctx = _create_context() return await func(lesson_id, updates, ctx=ctx) update_lesson.__doc__ = docstring return update_lesson elif name == "delete_lesson": @self.server.tool() async def delete_lesson(lesson_id: str) -> str: ctx = _create_context() return await func(lesson_id, ctx=ctx) delete_lesson.__doc__ = docstring return delete_lesson elif name == "search_lessons": @self.server.tool() async def search_lessons(query: str, fields: Optional[list] = None, limit: int = 100) -> str: ctx = _create_context() return await func(query, fields, limit, ctx=ctx) search_lessons.__doc__ = docstring return search_lessons elif name == "grep_lessons": @self.server.tool() async def grep_lessons(pattern: str, limit: int = 20) -> str: ctx = _create_context() return await func(pattern, limit, ctx=ctx) grep_lessons.__doc__ = docstring return grep_lessons elif name == "list_lessons": @self.server.tool() async def list_lessons(limit: int = 100) -> str: ctx = _create_context() return await func(limit, ctx=ctx) list_lessons.__doc__ = docstring return list_lessons elif name == "query_todo_logs": @self.server.tool() async def query_todo_logs(filter_type: str = 'all', project: str = 'all', page: int = 1, page_size: int = 20) -> str: ctx = _create_context() return await func(filter_type, project, page, page_size, ctx=ctx) query_todo_logs.__doc__ = docstring return query_todo_logs elif name == "list_projects": @self.server.tool() async def list_projects(include_details: bool = False, madness_root: str = "/Users/d.edens/lab/madness_interactive") -> str: ctx = _create_context() return await func(include_details, madness_root, ctx=ctx) list_projects.__doc__ = docstring return list_projects elif name == "explain": @self.server.tool() async def explain(topic: str) -> str: ctx = _create_context() return await func(topic, ctx=ctx) explain.__doc__ = docstring return explain elif name == "add_explanation": @self.server.tool() async def add_explanation(topic: str, content: str, kind: str = "concept", author: str = "system") -> str: ctx = _create_context() return await func(topic, content, kind, author, ctx=ctx) add_explanation.__doc__ = docstring return add_explanation elif name == "point_out_obvious": @self.server.tool() async def point_out_obvious(observation: str, sarcasm_level: int = 5) -> str: ctx = _create_context() return await func(observation, sarcasm_level, ctx=ctx) point_out_obvious.__doc__ = docstring return point_out_obvious elif name == "bring_your_own": @self.server.tool() async def bring_your_own(tool_name: str, code: str, runtime: str = "python", timeout: int = 30, args: Optional[Dict[str, Any]] = None, persist: bool = False) -> str: ctx = _create_context() return await func(tool_name, code, runtime, timeout, args, persist, ctx=ctx) bring_your_own.__doc__ = docstring return bring_your_own elif name == "inventorium_sessions_list": @self.server.tool() async def inventorium_sessions_list(project: Optional[str] = None, limit: int = 50) -> str: ctx = _create_context() return await func(project, limit, ctx=ctx) inventorium_sessions_list.__doc__ = docstring return inventorium_sessions_list elif name == "inventorium_sessions_get": @self.server.tool() async def inventorium_sessions_get(session_id: str) -> str: ctx = _create_context() return await func(session_id, ctx=ctx) inventorium_sessions_get.__doc__ = docstring return inventorium_sessions_get elif name == "inventorium_sessions_create": @self.server.tool() async def inventorium_sessions_create(project: str, title: Optional[str] = None, initial_prompt: Optional[str] = None, agentic_tool: str = "claude-code") -> str: ctx = _create_context() return await func(project, title, initial_prompt, agentic_tool, ctx=ctx) inventorium_sessions_create.__doc__ = docstring return inventorium_sessions_create elif name == "inventorium_sessions_spawn": @self.server.tool() async def inventorium_sessions_spawn(parent_session_id: str, prompt: str, todo_id: Optional[str] = None, title: Optional[str] = None) -> str: ctx = _create_context() return await func(parent_session_id, prompt, todo_id, title, ctx=ctx) inventorium_sessions_spawn.__doc__ = docstring return inventorium_sessions_spawn elif name == "inventorium_todos_link_session": @self.server.tool() async def inventorium_todos_link_session(todo_id: str, session_id: str) -> str: ctx = _create_context() return await func(todo_id, session_id, ctx=ctx) inventorium_todos_link_session.__doc__ = docstring return inventorium_todos_link_session elif name == "inventorium_sessions_fork": @self.server.tool() async def inventorium_sessions_fork(session_id: str, title: Optional[str] = None, include_messages: bool = True, inherit_todos: bool = True, initial_status: Optional[str] = None) -> str: ctx = _create_context() return await func(session_id, title, include_messages, inherit_todos, initial_status, ctx=ctx) inventorium_sessions_fork.__doc__ = docstring return inventorium_sessions_fork elif name == "inventorium_sessions_genealogy": @self.server.tool() async def inventorium_sessions_genealogy(session_id: str) -> str: ctx = _create_context() return await func(session_id, ctx=ctx) inventorium_sessions_genealogy.__doc__ = docstring return inventorium_sessions_genealogy elif name == "inventorium_sessions_tree": @self.server.tool() async def inventorium_sessions_tree(project: Optional[str] = None, limit: int = 200) -> str: ctx = _create_context() return await func(project, limit, ctx=ctx) inventorium_sessions_tree.__doc__ = docstring return inventorium_sessions_tree return create_wrapper() make_tool(tool_name, tool_info["func"], tool_info["doc"]) async def run(self): """Run the stdio server.""" logger.info("Starting Omnispindle stdio MCP server with FastMCP") await self.server.run_stdio_async() async def main(): """Main entry point for stdio server.""" server = OmniSpindleStdioServer() await server.run() if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MadnessEngineering/fastmcp-todo-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server