Skip to main content
Glama

Doris MCP Server

Official
by apache
multiworker_app.py25.6 kB
#!/usr/bin/env python3 # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ Multi-worker application module for doris-mcp-server This module provides full MCP functionality with multi-worker support. Each worker process creates its own MCP server and session manager using the same robust architecture as the single-worker mode. """ import os import asyncio from contextlib import asynccontextmanager import json import logging from typing import Any # Import MCP components with compatibility handling # Use the same import strategy as main.py for consistency MCP_VERSION = 'unknown' Server = None InitializationOptions = None Prompt = None Resource = None TextContent = None Tool = None def _import_mcp_with_compatibility(): """Import MCP components with multi-version compatibility""" global MCP_VERSION, Server, InitializationOptions, Prompt, Resource, TextContent, Tool try: # Strategy 1: Try direct server-only imports to avoid client-side issues from mcp.server import Server as _Server from mcp.server.models import InitializationOptions as _InitOptions from mcp.types import ( Prompt as _Prompt, Resource as _Resource, TextContent as _TextContent, Tool as _Tool, ) # Assign to globals Server = _Server InitializationOptions = _InitOptions Prompt = _Prompt Resource = _Resource TextContent = _TextContent Tool = _Tool # Try to get version safely try: import mcp MCP_VERSION = getattr(mcp, '__version__', None) if not MCP_VERSION: # Fallback: try to get version from package metadata try: import importlib.metadata MCP_VERSION = importlib.metadata.version('mcp') except Exception: # Second fallback: try pkg_resources try: import pkg_resources MCP_VERSION = pkg_resources.get_distribution('mcp').version except Exception: MCP_VERSION = 'detected-but-version-unknown' except Exception: # Version detection failed, but imports worked try: import importlib.metadata MCP_VERSION = importlib.metadata.version('mcp') except Exception: try: import pkg_resources MCP_VERSION = pkg_resources.get_distribution('mcp').version except Exception: MCP_VERSION = 'imported-successfully' logger = logging.getLogger(__name__) logger.info(f"MCP components imported successfully in multiworker, version: {MCP_VERSION}") return True except Exception as import_error: logger = logging.getLogger(__name__) # Strategy 2: Handle RequestContext compatibility issues in 1.9.x versions error_str = str(import_error).lower() if 'requestcontext' in error_str and 'too few arguments' in error_str: logger.warning(f"Detected MCP RequestContext compatibility issue: {import_error}") logger.info("Attempting comprehensive workaround for MCP 1.9.x RequestContext issue...") try: # Comprehensive monkey patch approach import sys import types # Create and install mock modules before any MCP imports if 'mcp.shared.context' not in sys.modules: mock_context_module = types.ModuleType('mcp.shared.context') class FlexibleRequestContext: """Flexible RequestContext that accepts variable arguments""" def __init__(self, *args, **kwargs): self.args = args self.kwargs = kwargs def __class_getitem__(cls, params): # Accept any number of parameters and return cls return cls # Add other methods that might be called def __getattr__(self, name): return lambda *args, **kwargs: None mock_context_module.RequestContext = FlexibleRequestContext sys.modules['mcp.shared.context'] = mock_context_module # Also patch the typing system to be more permissive original_check_generic = None try: import typing if hasattr(typing, '_check_generic'): original_check_generic = typing._check_generic def permissive_check_generic(cls, params, elen): # Don't enforce strict parameter count checking return typing._check_generic = permissive_check_generic except Exception: pass # Clear any cached imports that might have failed modules_to_clear = [k for k in sys.modules.keys() if k.startswith('mcp.')] for module in modules_to_clear: if module in sys.modules: del sys.modules[module] # Now try importing again with the patches in place from mcp.server import Server as _Server from mcp.server.models import InitializationOptions as _InitOptions from mcp.types import ( Prompt as _Prompt, Resource as _Resource, TextContent as _TextContent, Tool as _Tool, ) # Assign to globals Server = _Server InitializationOptions = _InitOptions Prompt = _Prompt Resource = _Resource TextContent = _TextContent Tool = _Tool # Try to detect actual version even in compatibility mode try: import importlib.metadata actual_version = importlib.metadata.version('mcp') MCP_VERSION = f'compatibility-mode-{actual_version}' except Exception: try: import pkg_resources actual_version = pkg_resources.get_distribution('mcp').version MCP_VERSION = f'compatibility-mode-{actual_version}' except Exception: MCP_VERSION = 'compatibility-mode-1.9.x' logger.info("MCP 1.9.x compatibility workaround successful in multiworker!") # Restore original typing function if we patched it if original_check_generic: typing._check_generic = original_check_generic return True except Exception as workaround_error: logger.error(f"MCP compatibility workaround failed in multiworker: {workaround_error}") # Restore original typing function if we patched it if original_check_generic: try: import typing typing._check_generic = original_check_generic except Exception: pass logger.error(f"Failed to import MCP components in multiworker: {import_error}") return False # Perform MCP import with compatibility handling if not _import_mcp_with_compatibility(): raise ImportError( "Failed to import MCP components in multiworker. Please ensure MCP is properly installed. " "Supported versions: 1.8.x, 1.9.x" ) from starlette.applications import Starlette from starlette.routing import Route from starlette.responses import JSONResponse, Response # Import Doris MCP components from .tools.tools_manager import DorisToolsManager from .tools.prompts_manager import DorisPromptsManager from .tools.resources_manager import DorisResourcesManager from .utils.config import DorisConfig from .utils.db import DorisConnectionManager from .utils.security import DorisSecurityManager # Global variables for worker-specific instances _worker_server = None _worker_session_manager = None _worker_connection_manager = None _worker_security_manager = None _worker_session_manager_context = None _worker_initialized = False def get_mcp_capabilities(): """Get MCP capabilities for worker - use the same logic as main.py""" try: # For MCP 1.9.x and newer from mcp.server.lowlevel.server import NotificationOptions capabilities = { "resources": {}, "tools": {}, "prompts": {}, "notification_options": { "prompts_changed": True, "resources_changed": True, "tools_changed": True } } return capabilities except Exception as e: # Import logger properly from .utils.logger import get_logger logger = get_logger(__name__) logger.warning(f"Failed to get full capabilities in multiworker: {e}") return { "resources": {}, "tools": {}, "prompts": {} } async def initialize_worker(): """Initialize MCP server and managers for this worker process""" global _worker_server, _worker_session_manager, _worker_connection_manager, _worker_security_manager, _worker_session_manager_context, _worker_initialized, _oauth_handlers, _token_handlers if _worker_initialized: return try: # Import logger properly from .utils.logger import get_logger logger = get_logger(__name__) logger.info(f"Initializing MCP worker process {os.getpid()}") # Create configuration config = DorisConfig.from_env() # Initialize enhanced logging system from .utils.config import ConfigManager config_manager = ConfigManager(config) config_manager.setup_logging() # Create security manager _worker_security_manager = DorisSecurityManager(config) # Initialize security manager first (includes JWT setup if enabled) await _worker_security_manager.initialize() logger.info(f"Worker {os.getpid()} security manager initialization completed") # Create connection manager with token manager for token-bound DB config token_manager = _worker_security_manager.auth_provider.token_manager if hasattr(_worker_security_manager, 'auth_provider') and hasattr(_worker_security_manager.auth_provider, 'token_manager') else None _worker_connection_manager = DorisConnectionManager(config, _worker_security_manager, token_manager) # Set connection manager reference in security manager for database validation _worker_security_manager.connection_manager = _worker_connection_manager await _worker_connection_manager.initialize() # Create MCP server _worker_server = Server("doris-mcp-server") # Create managers resources_manager = DorisResourcesManager(_worker_connection_manager) tools_manager = DorisToolsManager(_worker_connection_manager) prompts_manager = DorisPromptsManager(_worker_connection_manager) # Setup MCP handlers @_worker_server.list_resources() async def handle_list_resources() -> list[Resource]: """Handle resource list request""" try: logger.info("Handling resource list request in worker") resources = await resources_manager.list_resources() logger.info(f"Returning {len(resources)} resources from worker") return resources except Exception as e: logger.error(f"Failed to handle resource list request in worker: {e}") return [] @_worker_server.read_resource() async def handle_read_resource(uri: str) -> str: """Handle resource read request""" try: logger.info(f"Handling resource read request in worker: {uri}") content = await resources_manager.read_resource(uri) return content except Exception as e: logger.error(f"Failed to handle resource read request in worker: {e}") return json.dumps( {"error": f"Failed to read resource: {str(e)}", "uri": uri}, ensure_ascii=False, indent=2, ) @_worker_server.list_tools() async def handle_list_tools() -> list[Tool]: """Handle tool list request""" try: logger.info("Handling tool list request in worker") tools = await tools_manager.list_tools() logger.info(f"Returning {len(tools)} tools from worker") return tools except Exception as e: logger.error(f"Failed to handle tool list request in worker: {e}") return [] @_worker_server.call_tool() async def handle_call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: """Handle tool call request""" try: logger.info(f"Handling tool call request in worker: {name}") result = await tools_manager.call_tool(name, arguments) return [TextContent(type="text", text=result)] except Exception as e: logger.error(f"Failed to handle tool call request in worker: {e}") error_result = json.dumps( { "error": f"Tool call failed: {str(e)}", "tool_name": name, "arguments": arguments, }, ensure_ascii=False, indent=2, ) return [TextContent(type="text", text=error_result)] @_worker_server.list_prompts() async def handle_list_prompts() -> list[Prompt]: """Handle prompt list request""" try: logger.info("Handling prompt list request in worker") prompts = await prompts_manager.list_prompts() logger.info(f"Returning {len(prompts)} prompts from worker") return prompts except Exception as e: logger.error(f"Failed to handle prompt list request in worker: {e}") return [] @_worker_server.get_prompt() async def handle_get_prompt(name: str, arguments: dict[str, Any]) -> str: """Handle prompt get request""" try: logger.info(f"Handling prompt get request in worker: {name}") result = await prompts_manager.get_prompt(name, arguments) return result except Exception as e: logger.error(f"Failed to handle prompt get request in worker: {e}") error_result = json.dumps( { "error": f"Failed to get prompt: {str(e)}", "prompt_name": name, "arguments": arguments, }, ensure_ascii=False, indent=2, ) return error_result # Create session manager for this worker from mcp.server.streamable_http_manager import StreamableHTTPSessionManager _worker_session_manager = StreamableHTTPSessionManager( app=_worker_server, json_response=True, stateless=True # Use stateless mode for multi-worker compatibility ) # Start the session manager context _worker_session_manager_context = _worker_session_manager.run() await _worker_session_manager_context.__aenter__() # Initialize OAuth and Token handlers from .auth.oauth_handlers import OAuthHandlers from .auth.token_handlers import TokenHandlers _oauth_handlers = OAuthHandlers(_worker_security_manager) _token_handlers = TokenHandlers(_worker_security_manager, config) _worker_initialized = True logger.info(f"Worker {os.getpid()} MCP initialization completed successfully") except Exception as e: from .utils.logger import get_logger logger = get_logger(__name__) logger.error(f"Failed to initialize worker {os.getpid()}: {e}") import traceback logger.error("Complete error stack:") logger.error(traceback.format_exc()) raise async def health_check(request): """Health check endpoint that shows worker PID""" return JSONResponse({ "status": "healthy", "service": "doris-mcp-server", "worker_pid": os.getpid(), "worker_mode": "multi-process-full-mcp", "mcp_initialized": _worker_initialized, "mcp_version": MCP_VERSION }) # OAuth and Token handlers (initialize after worker setup) _oauth_handlers = None _token_handlers = None async def oauth_login(request): """OAuth login endpoint""" if not _oauth_handlers: return JSONResponse({"error": "OAuth not initialized"}, status_code=503) return await _oauth_handlers.handle_login(request) async def oauth_callback(request): """OAuth callback endpoint""" if not _oauth_handlers: return JSONResponse({"error": "OAuth not initialized"}, status_code=503) return await _oauth_handlers.handle_callback(request) async def oauth_provider_info(request): """OAuth provider info endpoint""" if not _oauth_handlers: return JSONResponse({"error": "OAuth not initialized"}, status_code=503) return await _oauth_handlers.handle_provider_info(request) async def oauth_demo(request): """OAuth demo page endpoint""" if not _oauth_handlers: from starlette.responses import HTMLResponse return HTMLResponse("<h1>OAuth not initialized</h1>") return await _oauth_handlers.handle_demo_page(request) # Token management endpoints async def token_create(request): """Token creation endpoint""" if not _token_handlers: return JSONResponse({"error": "Token handlers not initialized"}, status_code=503) return await _token_handlers.handle_create_token(request) async def token_revoke(request): """Token revocation endpoint""" if not _token_handlers: return JSONResponse({"error": "Token handlers not initialized"}, status_code=503) return await _token_handlers.handle_revoke_token(request) async def token_list(request): """Token listing endpoint""" if not _token_handlers: return JSONResponse({"error": "Token handlers not initialized"}, status_code=503) return await _token_handlers.handle_list_tokens(request) async def token_stats(request): """Token statistics endpoint""" if not _token_handlers: return JSONResponse({"error": "Token handlers not initialized"}, status_code=503) return await _token_handlers.handle_token_stats(request) async def token_cleanup(request): """Token cleanup endpoint""" if not _token_handlers: return JSONResponse({"error": "Token handlers not initialized"}, status_code=503) return await _token_handlers.handle_cleanup_tokens(request) async def token_management(request): """Token management page endpoint""" if not _token_handlers: from starlette.responses import HTMLResponse return HTMLResponse("<h1>Token handlers not initialized</h1>") return await _token_handlers.handle_management_page(request) async def root_info(request): """Root endpoint""" return JSONResponse({ "service": "doris-mcp-server", "mode": "multi-worker-full-mcp", "worker_pid": os.getpid(), "mcp_initialized": _worker_initialized, "mcp_version": MCP_VERSION, "endpoints": { "health": "/health", "mcp": "/mcp" } }) @asynccontextmanager async def lifespan(app): """Application lifespan manager""" # Startup try: await initialize_worker() # Import logger properly from .utils.logger import get_logger logger = get_logger(__name__) logger.info(f"Worker {os.getpid()} startup completed") yield finally: # Shutdown from .utils.logger import get_logger logger = get_logger(__name__) # Close session manager context if _worker_session_manager_context: try: await _worker_session_manager_context.__aexit__(None, None, None) logger.info(f"Worker {os.getpid()} session manager context closed") except Exception as e: logger.error(f"Error closing worker session manager context: {e}") if _worker_connection_manager: try: await _worker_connection_manager.close() logger.info(f"Worker {os.getpid()} connection manager closed") except Exception as e: logger.error(f"Error closing worker connection manager: {e}") if _worker_security_manager: try: await _worker_security_manager.shutdown() logger.info(f"Worker {os.getpid()} security manager shutdown completed") except Exception as e: logger.error(f"Error shutting down worker security manager: {e}") # Shutdown logging system try: from .utils.logger import shutdown_logging shutdown_logging() except Exception as e: logger.error(f"Error shutting down logging system: {e}") async def mcp_asgi_app(scope, receive, send): """ASGI app that handles MCP requests""" if not _worker_initialized: # Send error response if worker not initialized await send({ 'type': 'http.response.start', 'status': 503, 'headers': [(b'content-type', b'application/json')] }) await send({ 'type': 'http.response.body', 'body': b'{"error": "Worker not initialized"}' }) return # Import logger properly from .utils.logger import get_logger logger = get_logger(__name__) # Get request path for logging path = scope.get('path', '') method = scope.get('method', 'UNKNOWN') logger.debug(f"Worker {os.getpid()} handling MCP request: {method} {path}") # Handle the request directly without nested run context await _worker_session_manager.handle_request(scope, receive, send) # Create Starlette app with basic routes basic_app = Starlette( debug=True, routes=[ Route("/", root_info, methods=["GET"]), Route("/health", health_check, methods=["GET"]), # OAuth endpoints Route("/auth/login", oauth_login, methods=["GET"]), Route("/auth/callback", oauth_callback, methods=["GET"]), Route("/auth/provider", oauth_provider_info, methods=["GET"]), Route("/auth/demo", oauth_demo, methods=["GET"]), # Token management endpoints Route("/token/create", token_create, methods=["GET", "POST"]), Route("/token/revoke", token_revoke, methods=["GET", "DELETE"]), Route("/token/list", token_list, methods=["GET"]), Route("/token/stats", token_stats, methods=["GET"]), Route("/token/cleanup", token_cleanup, methods=["GET", "POST"]), Route("/token/management", token_management, methods=["GET"]), ], lifespan=lifespan ) # Create main ASGI app that routes between basic app and MCP async def app(scope, receive, send): """Main ASGI app that routes requests""" path = scope.get('path', '/') if path == "/mcp" or path.startswith('/mcp/'): # Handle MCP requests with session manager await mcp_asgi_app(scope, receive, send) else: # Handle other requests with basic Starlette app (includes auth endpoints) await basic_app(scope, receive, send)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/apache/doris-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server