Skip to main content
Glama

Jupyter MCP Server

by datalayer
handlers.py16.7 kB
# Copyright (c) 2023-2024 Datalayer, Inc. # # BSD 3-Clause License """ Tornado request handlers for the Jupyter MCP Server extension. This module provides handlers that bridge between Tornado (Jupyter Server) and FastMCP, managing the MCP protocol lifecycle and request proxying. """ import json import logging import tornado.web from typing import Any from tornado.web import RequestHandler from jupyter_server.base.handlers import JupyterHandler from jupyter_server.extension.handler import ExtensionHandlerMixin from jupyter_mcp_server.jupyter_extension.context import get_server_context from jupyter_mcp_server.jupyter_extension.backends.local_backend import LocalBackend from jupyter_mcp_server.jupyter_extension.backends.remote_backend import RemoteBackend logger = logging.getLogger(__name__) class MCPSSEHandler(RequestHandler): """ Server-Sent Events (SSE) handler for MCP protocol. This handler implements the MCP SSE transport by directly calling the registered MCP tools instead of trying to wrap the Starlette app. The MCP protocol uses SSE for streaming responses from the server to the client. """ def check_xsrf_cookie(self): """Disable XSRF checking for MCP protocol requests.""" pass def set_default_headers(self): """Set headers for SSE and CORS.""" self.set_header("Content-Type", "text/event-stream") self.set_header("Cache-Control", "no-cache") self.set_header("Connection", "keep-alive") self.set_header("Access-Control-Allow-Origin", "*") self.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") self.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization") async def options(self, *args, **kwargs): """Handle CORS preflight requests.""" self.set_status(204) self.finish() async def get(self): """Handle SSE connection establishment.""" # Import here to avoid circular dependency from jupyter_mcp_server.server import mcp # For now, just acknowledge the connection # The actual MCP protocol would be handled via POST self.write("event: connected\ndata: {}\n\n") await self.flush() async def post(self): """Handle MCP protocol messages.""" # Import here to avoid circular dependency from jupyter_mcp_server.server import mcp try: # Parse the JSON-RPC request body = json.loads(self.request.body.decode('utf-8')) method = body.get("method") params = body.get("params", {}) request_id = body.get("id") logger.info(f"MCP request: method={method}, id={request_id}") # Handle notifications (id is None) - these don't require a response per JSON-RPC 2.0 # But in HTTP transport, we need to acknowledge the request if request_id is None: logger.info(f"Received notification: {method} - acknowledging without result") # Return empty response - the client should handle notifications without expecting a result # Some clients may send this as POST and expect HTTP 200 with no JSON-RPC response self.set_status(200) self.finish() return # Handle different MCP methods if method == "initialize": # Return server capabilities response = { "jsonrpc": "2.0", "id": request_id, "result": { "protocolVersion": "2024-11-05", "capabilities": { "tools": {}, "prompts": {}, "resources": {} }, "serverInfo": { "name": "Jupyter MCP Server", "version": "0.14.0" } } } logger.info(f"Sending initialize response: {response}") elif method == "tools/list": # List available tools from FastMCP from jupyter_mcp_server.server import mcp logger.info("Calling mcp.list_tools()...") try: # Use FastMCP's list_tools method - returns list of Tool objects tools_list = await mcp.list_tools() logger.info(f"Got {len(tools_list)} tools from FastMCP") # Convert to MCP protocol format tools = [] for tool in tools_list: tools.append({ "name": tool.name, "description": tool.description, "inputSchema": tool.inputSchema }) logger.info(f"Converted {len(tools)} tools to MCP format") response = { "jsonrpc": "2.0", "id": request_id, "result": { "tools": tools } } except Exception as e: logger.error(f"Error listing tools: {e}", exc_info=True) response = { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32603, "message": f"Internal error listing tools: {str(e)}" } } elif method == "tools/call": # Execute a tool from jupyter_mcp_server.server import mcp tool_name = params.get("name") tool_arguments = params.get("arguments", {}) logger.info(f"Calling tool: {tool_name} with args: {tool_arguments}") try: # Use FastMCP's call_tool method result = await mcp.call_tool(tool_name, tool_arguments) # Handle tuple results from FastMCP if isinstance(result, tuple) and len(result) >= 1: # FastMCP returns (content_list, metadata_dict) content_list = result[0] if isinstance(content_list, list): # Serialize TextContent objects to dicts serialized_content = [] for item in content_list: if hasattr(item, 'model_dump'): serialized_content.append(item.model_dump()) elif hasattr(item, 'dict'): serialized_content.append(item.dict()) elif isinstance(item, dict): serialized_content.append(item) else: serialized_content.append({"type": "text", "text": str(item)}) result_dict = {"content": serialized_content} else: result_dict = {"content": [{"type": "text", "text": str(result)}]} # Convert result to dict - it's a CallToolResult with content list elif hasattr(result, 'model_dump'): result_dict = result.model_dump() elif hasattr(result, 'dict'): result_dict = result.dict() elif hasattr(result, 'content'): # Extract content directly if it has a content attribute result_dict = {"content": result.content} else: # Last resort: check if it's already a string if isinstance(result, str): result_dict = {"content": [{"type": "text", "text": result}]} else: # If it's some other type, try to serialize it result_dict = {"content": [{"type": "text", "text": str(result)}]} logger.warning(f"Used fallback str() conversion for type {type(result)}") logger.info(f"Converted result to dict") response = { "jsonrpc": "2.0", "id": request_id, "result": result_dict } except Exception as e: logger.error(f"Error calling tool: {e}", exc_info=True) response = { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32603, "message": f"Internal error calling tool: {str(e)}" } } elif method == "prompts/list": # List available prompts - return empty list if no prompts defined logger.info("Listing prompts...") response = { "jsonrpc": "2.0", "id": request_id, "result": { "prompts": [] } } elif method == "resources/list": # List available resources - return empty list if no resources defined logger.info("Listing resources...") response = { "jsonrpc": "2.0", "id": request_id, "result": { "resources": [] } } else: # Method not supported response = { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32601, "message": f"Method not found: {method}" } } # Send response self.set_header("Content-Type", "application/json") logger.info(f"Sending response: {json.dumps(response)[:200]}...") self.write(json.dumps(response)) self.finish() except Exception as e: logger.error(f"Error handling MCP request: {e}", exc_info=True) self.set_status(500) self.write(json.dumps({ "jsonrpc": "2.0", "id": body.get("id") if 'body' in locals() else None, "error": { "code": -32603, "message": str(e) } })) self.finish() class MCPHandler(ExtensionHandlerMixin, JupyterHandler): """Base handler for MCP endpoints with common functionality.""" def get_backend(self): """ Get the appropriate backend based on configuration. Returns: Backend instance (LocalBackend or RemoteBackend) """ context = get_server_context() # Check if we should use local backend if context.is_local_document() or context.is_local_runtime(): return LocalBackend(context.serverapp) else: # Use remote backend document_url = self.settings.get("mcp_document_url") document_token = self.settings.get("mcp_document_token", "") runtime_url = self.settings.get("mcp_runtime_url") runtime_token = self.settings.get("mcp_runtime_token", "") return RemoteBackend( document_url=document_url, document_token=document_token, runtime_url=runtime_url, runtime_token=runtime_token ) def set_default_headers(self): """Set CORS headers for MCP clients.""" self.set_header("Access-Control-Allow-Origin", "*") self.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") self.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization") def options(self, *args, **kwargs): """Handle OPTIONS requests for CORS preflight.""" self.set_status(204) self.finish() class MCPHealthHandler(MCPHandler): """ Health check endpoint. GET /mcp/healthz """ @tornado.web.authenticated def get(self): """Handle health check request.""" context = get_server_context() health_info = { "status": "healthy", "context_type": context.context_type, "document_url": context.document_url or self.settings.get("mcp_document_url"), "runtime_url": context.runtime_url or self.settings.get("mcp_runtime_url"), "extension": "jupyter_mcp_server", "version": "0.14.0" } self.set_header("Content-Type", "application/json") self.write(json.dumps(health_info)) self.finish() class MCPToolsListHandler(MCPHandler): """ List available MCP tools. GET /mcp/tools/list """ @tornado.web.authenticated async def get(self): """Return list of available tools dynamically from the tool registry.""" # Import here to avoid circular dependency from jupyter_mcp_server.server import get_registered_tools # Get tools dynamically from the MCP server registry tools = await get_registered_tools() response = { "tools": tools, "count": len(tools) } self.set_header("Content-Type", "application/json") self.write(json.dumps(response)) self.finish() class MCPToolsCallHandler(MCPHandler): """ Execute an MCP tool. POST /mcp/tools/call Body: {"tool_name": "...", "arguments": {...}} """ @tornado.web.authenticated async def post(self): """Handle tool execution request.""" try: # Parse request body body = json.loads(self.request.body.decode('utf-8')) tool_name = body.get("tool_name") arguments = body.get("arguments", {}) if not tool_name: self.set_status(400) self.write(json.dumps({"error": "tool_name is required"})) self.finish() return logger.info(f"Executing tool: {tool_name} with args: {arguments}") # Get backend backend = self.get_backend() # Execute tool based on name # For now, return a placeholder response # TODO: Implement actual tool routing result = await self._execute_tool(tool_name, arguments, backend) response = { "success": True, "result": result } self.set_header("Content-Type", "application/json") self.write(json.dumps(response)) self.finish() except Exception as e: logger.error(f"Error executing tool: {e}", exc_info=True) self.set_status(500) self.write(json.dumps({ "success": False, "error": str(e) })) self.finish() async def _execute_tool(self, tool_name: str, arguments: dict[str, Any], backend): """ Route tool execution to appropriate implementation. Args: tool_name: Name of tool to execute arguments: Tool arguments backend: Backend instance Returns: Tool execution result """ # TODO: Implement actual tool routing # For now, return a simple response if tool_name == "list_notebooks": notebooks = await backend.list_notebooks() return {"notebooks": notebooks} # Placeholder for other tools return f"Tool {tool_name} executed with backend {type(backend).__name__}"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/datalayer/jupyter-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server