Skip to main content
Glama
vzeman
by vzeman
http_server.py35.4 kB
"""FastAPI-based HTTP streaming MCP server for Odoo integration.""" import asyncio import json import time import uuid from typing import Any, Dict, List, Optional, Union from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException, Request, Response from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse, JSONResponse from sse_starlette.sse import EventSourceResponse import uvicorn from mcp.types import Tool, TextContent from .config import get_config from .logger import get_logger from .services.odoo_service import get_odoo_service from .services.cache_service import get_cache_service logger = get_logger(__name__) # Session storage (in production, use Redis or similar) sessions: Dict[str, Dict[str, Any]] = {} # Global start time for uptime calculation _start_time = time.time() def _fix_tool_schema(tool_dict: Dict[str, Any]) -> Dict[str, Any]: """Fix tool schema by adding missing required fields and cleaning annotations.""" # Add title if missing if tool_dict.get("title") is None: tool_dict["title"] = tool_dict["name"].replace("_", " ").title() # Remove outputSchema to avoid validation issues if "outputSchema" in tool_dict: del tool_dict["outputSchema"] # Fix annotations - remove null values and ensure proper types annotations = tool_dict.get("annotations", {}) if annotations is None: annotations = {} # Remove null values from annotations cleaned_annotations = {} for key, value in annotations.items(): if value is not None: cleaned_annotations[key] = value tool_dict["annotations"] = cleaned_annotations return tool_dict async def get_all_tools() -> List[Dict[str, Any]]: """Get all available tools.""" tools = [] # Record management tools record_tools = [ Tool( name="create_record", description="Create new records in Odoo", inputSchema={ "type": "object", "properties": { "model": { "type": "string", "description": "The Odoo model name", }, "values": { "type": "object", "description": "Dictionary of field values", }, }, "required": ["model", "values"], }, ), Tool( name="update_record", description="Update existing records", inputSchema={ "type": "object", "properties": { "model": { "type": "string", "description": "The Odoo model name", }, "ids": { "type": "array", "description": "List of record IDs to update", "items": {"type": "integer"}, }, "values": { "type": "object", "description": "Dictionary of field values to update", }, }, "required": ["model", "ids", "values"], }, ), Tool( name="delete_record", description="Delete records from Odoo (use with caution)", inputSchema={ "type": "object", "properties": { "model": { "type": "string", "description": "The Odoo model name", }, "ids": { "type": "array", "description": "List of record IDs to delete", "items": {"type": "integer"}, }, }, "required": ["model", "ids"], }, ), Tool( name="get_record", description="Get detailed information about specific records", inputSchema={ "type": "object", "properties": { "model": { "type": "string", "description": "The Odoo model name", }, "ids": { "type": "array", "description": "List of record IDs", "items": {"type": "integer"}, }, "fields": { "type": "array", "description": "List of fields to return (optional)", "items": {"type": "string"}, }, }, "required": ["model", "ids"], }, ), Tool( name="execute_method", description="Execute custom methods on Odoo models", inputSchema={ "type": "object", "properties": { "model": { "type": "string", "description": "Odoo model name", }, "method": { "type": "string", "description": "Method name to execute", }, "ids": { "type": "array", "description": "List of record IDs (if method requires)", "items": {"type": "integer"}, }, "args": { "type": "array", "description": "Additional positional arguments", }, "kwargs": { "type": "object", "description": "Additional keyword arguments", }, }, "required": ["model", "method"], }, ), ] # Search tools search_tools = [ Tool( name="search_records", description="Search for records in any Odoo model", inputSchema={ "type": "object", "properties": { "model": { "type": "string", "description": "The Odoo model name (e.g., 'res.partner', 'sale.order')", }, "domain": { "type": "array", "description": "Odoo domain filter (default: [])", "items": {"type": "array"}, "default": [], }, "fields": { "type": "array", "description": "List of fields to return", "items": {"type": "string"}, }, "limit": { "type": "integer", "description": "Maximum number of records", "minimum": 1, "maximum": 1000, }, "offset": { "type": "integer", "description": "Number of records to skip", "minimum": 0, "default": 0, }, "order": { "type": "string", "description": "Sort order (e.g., 'name asc, id desc')", }, }, "required": ["model"], }, ), Tool( name="search_count", description="Count records matching search criteria", inputSchema={ "type": "object", "properties": { "model": { "type": "string", "description": "Odoo model name", }, "domain": { "type": "array", "description": "Search domain in Odoo format", "items": {"type": "array"}, "default": [], }, }, "required": ["model"], }, ), ] # Model tools model_tools = [ Tool( name="list_models", description="Discover available models in your Odoo instance", inputSchema={ "type": "object", "properties": { "transient": { "type": "boolean", "description": "Include transient (wizard) models (default: false)", "default": False, }, "search": { "type": "string", "description": "Filter models by name (case-insensitive)", }, }, }, ), Tool( name="get_model_fields", description="Get field definitions for a model", inputSchema={ "type": "object", "properties": { "model": { "type": "string", "description": "The Odoo model name", }, "fields": { "type": "array", "description": "Specific fields to get info for (optional)", "items": {"type": "string"}, }, "attributes": { "type": "array", "description": "Field attributes to include (optional)", "items": {"type": "string"}, }, }, "required": ["model"], }, ), Tool( name="model_info", description="Get comprehensive information about a model", inputSchema={ "type": "object", "properties": { "model": { "type": "string", "description": "The Odoo model name", }, }, "required": ["model"], }, ), ] # Server management tools server_tools = [ Tool( name="server_status", description="Get server status and health information", inputSchema={ "type": "object", "properties": {}, }, ), Tool( name="cache_stats", description="Get cache statistics and management", inputSchema={ "type": "object", "properties": { "action": { "type": "string", "description": "Cache action: 'stats', 'clear'", "enum": ["stats", "clear"], "default": "stats", }, }, }, ), ] # Combine all tools and fix schemas all_tools = record_tools + search_tools + model_tools + server_tools for tool in all_tools: tool_dict = tool.model_dump() tools.append(_fix_tool_schema(tool_dict)) return tools async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]: """Call a tool and return the result.""" logger.info(f"Tool called: {name} with arguments: {arguments}") try: odoo_service = get_odoo_service() # Record management tools if name == "create_record": model = arguments["model"] values = arguments["values"] logger.info(f"Creating record in {model}") result = odoo_service.create(model, values) return [TextContent(type="text", text=json.dumps({"id": result, "message": f"Created record with ID: {result}"}, indent=2))] elif name == "update_record": model = arguments["model"] ids = arguments["ids"] values = arguments["values"] logger.info(f"Updating {len(ids)} record(s) in {model}") success = odoo_service.write(model, ids, values) return [TextContent(type="text", text=json.dumps({"success": success, "ids": ids, "message": f"Update {'successful' if success else 'failed'} for IDs: {ids}"}, indent=2))] elif name == "delete_record": model = arguments["model"] ids = arguments["ids"] logger.info(f"Deleting {len(ids)} record(s) from {model}") success = odoo_service.unlink(model, ids) return [TextContent(type="text", text=json.dumps({"success": success, "ids": ids, "message": f"Delete {'successful' if success else 'failed'} for IDs: {ids}"}, indent=2))] elif name == "get_record": model = arguments["model"] ids = arguments["ids"] fields = arguments.get("fields") logger.info(f"Getting {len(ids)} record(s) from {model}") result = odoo_service.read(model, ids, fields) return [TextContent(type="text", text=json.dumps({"records": result, "count": len(result)}, indent=2, default=str))] elif name == "execute_method": model = arguments["model"] method = arguments["method"] ids = arguments.get("ids", []) args = arguments.get("args", []) kwargs = arguments.get("kwargs", {}) logger.info(f"Executing method '{method}' on {model} with IDs: {ids}") # Prepare arguments - if ids are provided, add them to args if ids: args = [ids] + list(args) # Execute the method result = odoo_service.execute(model, method, *args, **kwargs) return [TextContent(type="text", text=json.dumps({"result": result}, indent=2, default=str))] # Search tools elif name == "search_records": model = arguments["model"] domain = arguments.get("domain", []) fields = arguments.get("fields") limit = arguments.get("limit") offset = arguments.get("offset", 0) order = arguments.get("order") logger.info(f"Searching records in {model} with domain: {domain}") result = odoo_service.search_read( model=model, domain=domain, fields=fields, offset=offset, limit=limit, order=order, ) return [TextContent(type="text", text=json.dumps({"records": result, "count": len(result)}, indent=2, default=str))] elif name == "search_count": model = arguments["model"] domain = arguments.get("domain", []) logger.info(f"Counting records in {model} with domain: {domain}") ids = odoo_service.search(model=model, domain=domain) count = len(ids) return [TextContent(type="text", text=json.dumps({"count": count, "message": f"Found {count} records matching the criteria"}, indent=2))] # Model tools elif name == "list_models": include_transient = arguments.get("transient", False) search_term = arguments.get("search", "").lower() logger.info(f"Listing models (transient: {include_transient}, search: '{search_term}')") models = odoo_service.get_model_list() # Filter models filtered_models = [] for model in models: if not include_transient and model.get("transient", False): continue if search_term and search_term not in model["model"].lower() and search_term not in model["name"].lower(): continue filtered_models.append(model) # Format output if not filtered_models: return [TextContent(type="text", text=json.dumps({"models": [], "count": 0, "message": "No models found matching the criteria."}, indent=2))] output = f"Found {len(filtered_models)} Odoo models:\n\n" for model in sorted(filtered_models, key=lambda x: x["model"]): transient_marker = " (transient)" if model.get("transient", False) else "" output += f"• **{model['model']}**{transient_marker}\n" output += f" {model['name']}\n\n" return [TextContent(type="text", text=json.dumps({"models": filtered_models, "count": len(filtered_models), "formatted_output": output}, indent=2))] elif name == "get_model_fields": model = arguments["model"] fields = arguments.get("fields") attributes = arguments.get("attributes") logger.info(f"Getting fields for model: {model}") field_info = odoo_service.fields_get( model=model, fields=fields, attributes=attributes ) if not field_info: return [TextContent(type="text", text=json.dumps({"fields": {}, "count": 0, "message": f"No fields found for model: {model}"}, indent=2))] # Format output for better readability output = f"Fields for model **{model}**:\n\n" for field_name, field_def in sorted(field_info.items()): field_type = field_def.get("type", "unknown") field_string = field_def.get("string", field_name) required = " (required)" if field_def.get("required", False) else "" readonly = " (readonly)" if field_def.get("readonly", False) else "" output += f"• **{field_name}** ({field_type}){required}{readonly}\n" output += f" {field_string}\n" if field_def.get("help"): output += f" Help: {field_def['help']}\n" output += "\n" return [TextContent(type="text", text=json.dumps({"fields": field_info, "count": len(field_info), "formatted_output": output}, indent=2))] elif name == "model_info": model = arguments["model"] logger.info(f"Getting comprehensive info for model: {model}") # Get model metadata models = odoo_service.get_model_list() model_info = next((m for m in models if m["model"] == model), None) if not model_info: return [TextContent(type="text", text=json.dumps({"error": f"Model '{model}' not found."}, indent=2))] # Get field count field_info = odoo_service.fields_get(model=model) field_count = len(field_info) # Get record count (sample) try: sample_ids = odoo_service.search(model=model, limit=1) has_records = len(sample_ids) > 0 except: has_records = "Unknown" # Format comprehensive output output = f"# Model Information: **{model}**\n\n" output += f"**Name:** {model_info['name']}\n" output += f"**Technical Name:** {model_info['model']}\n" output += f"**Type:** {'Transient (Wizard)' if model_info.get('transient', False) else 'Persistent'}\n" output += f"**Fields:** {field_count}\n" output += f"**Has Records:** {has_records}\n\n" # Show some key fields key_fields = [] for field_name, field_def in field_info.items(): if field_name in ['id', 'name', 'display_name', 'create_date', 'write_date']: key_fields.append(f"• {field_name} ({field_def.get('type', 'unknown')})") if key_fields: output += "**Key Fields:**\n" output += "\n".join(key_fields[:10]) # Show first 10 if len(key_fields) > 10: output += f"\n... and {len(key_fields) - 10} more" output += "\n\n" output += f"Use `get_model_fields` with model='{model}' to see all field details." return [TextContent(type="text", text=json.dumps({ "model_info": model_info, "field_count": field_count, "has_records": has_records, "key_fields": key_fields[:10], "formatted_output": output }, indent=2))] # Server management tools elif name == "server_status": return await handle_server_status(arguments) elif name == "cache_stats": return await handle_cache_stats(arguments) else: return [TextContent(type="text", text=json.dumps({"error": f"Unknown tool: {name}"}, indent=2))] except Exception as e: logger.error(f"Error handling tool {name}: {e}", exc_info=True) return [TextContent(type="text", text=json.dumps({"error": f"{type(e).__name__}: {str(e)}"}, indent=2))] async def handle_server_status(arguments: Dict[str, Any]) -> List[TextContent]: """Handle server status requests.""" config = get_config() cache_service = get_cache_service() # Test Odoo connection try: odoo_service = get_odoo_service() odoo_service.authenticate() odoo_connected = True except Exception as e: logger.warning(f"Odoo connection test failed: {e}") odoo_connected = False uptime = time.time() - _start_time cache_stats = cache_service.stats() # Format output output = "# Odoo MCP Server Status\n\n" output += f"**Status:** {'HEALTHY' if odoo_connected else 'DEGRADED'}\n" output += f"**Version:** 1.0.0\n" output += f"**Uptime:** {uptime:.2f}s\n\n" output += "## Odoo Connection\n" output += f"**Connected:** {'✅ Yes' if odoo_connected else '❌ No'}\n" output += f"**URL:** {config.odoo.url}\n" output += f"**Database:** {config.odoo.database}\n\n" output += "## Cache\n" output += f"**Enabled:** {'✅ Yes' if cache_stats['enabled'] else '❌ No'}\n" output += f"**Size:** {cache_stats['size']}/{cache_stats['max_size']}\n" output += f"**TTL:** {cache_stats['ttl']}s\n\n" output += "## Server Configuration\n" output += f"**Host:** {config.server.host}\n" output += f"**Port:** {config.server.port}\n" output += f"**Debug:** {config.server.debug}\n" output += f"**Log Level:** {config.server.log_level}\n" return [TextContent(type="text", text=json.dumps({ "status": "HEALTHY" if odoo_connected else "DEGRADED", "version": "1.0.0", "uptime": uptime, "odoo_connected": odoo_connected, "odoo_url": config.odoo.url, "odoo_database": config.odoo.database, "cache_stats": cache_stats, "server_config": { "host": config.server.host, "port": config.server.port, "debug": config.server.debug, "log_level": config.server.log_level }, "formatted_output": output }, indent=2))] async def handle_cache_stats(arguments: Dict[str, Any]) -> List[TextContent]: """Handle cache statistics requests.""" cache_service = get_cache_service() action = arguments.get("action", "stats") if action == "clear": cache_service.clear() return [TextContent(type="text", text=json.dumps({"success": True, "message": "Cache cleared successfully"}, indent=2))] # Get cache statistics stats = cache_service.stats() output = "# Cache Statistics\n\n" output += f"**Enabled:** {'✅ Yes' if stats['enabled'] else '❌ No'}\n" output += f"**Current Size:** {stats['size']}\n" output += f"**Maximum Size:** {stats['max_size']}\n" output += f"**TTL:** {stats['ttl']} seconds\n" output += f"**Usage:** {(stats['size'] / stats['max_size'] * 100):.1f}%\n" return [TextContent(type="text", text=json.dumps({ "cache_stats": stats, "usage_percentage": (stats['size'] / stats['max_size'] * 100), "formatted_output": output }, indent=2))] @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager.""" logger.info("Starting Odoo MCP HTTP Server...") try: # Test Odoo connection on startup odoo_service = get_odoo_service() odoo_service.authenticate() logger.info("Odoo connection test successful") except Exception as e: logger.warning(f"Odoo connection test failed: {e}") yield logger.info("Shutting down Odoo MCP HTTP Server...") # Create FastAPI app app = FastAPI( title="Odoo MCP HTTP Server", description="HTTP streaming MCP server for Odoo integration", version="1.0.0", lifespan=lifespan ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def _stream_response(data: Dict[str, Any]): """Generate SSE stream from data.""" yield f"data: {json.dumps(data)}\n\n" @app.get("/") async def root(): """Root endpoint.""" return {"message": "Odoo MCP HTTP Server", "version": "1.0.0"} @app.post("/") async def root_post(request: Request): """Root POST endpoint - redirects to MCP endpoint for compatibility.""" # Forward the request to the MCP endpoint return await mcp_endpoint(request) @app.get("/health") async def health_check(): """Health check endpoint.""" try: odoo_service = get_odoo_service() odoo_service.authenticate() return {"status": "healthy", "odoo_connected": True} except Exception as e: return {"status": "degraded", "odoo_connected": False, "error": str(e)} @app.post("/mcp") async def mcp_endpoint(request: Request): """Main MCP endpoint for handling JSON-RPC requests.""" try: # Parse request body body = await request.json() # Extract JSON-RPC fields method = body.get("method") params = body.get("params", {}) request_id = body.get("id") # Check for streaming preference wants_streaming = request.headers.get("accept") == "text/event-stream" # Generate session ID if not present session_id = request.headers.get("Mcp-Session-Id", str(uuid.uuid4())) logger.info(f"MCP request: {method} (streaming: {wants_streaming})") # Handle different MCP methods if method == "initialize": response = { "jsonrpc": "2.0", "id": request_id, "result": { "protocolVersion": "2024-11-05", "capabilities": { "tools": {}, "resources": {}, "prompts": {}, "logging": { "setLevel": True } }, "serverInfo": { "name": "odoo-mcp-server", "version": "1.0.0" } } } if wants_streaming: return EventSourceResponse( _stream_response(response), headers={"Mcp-Session-Id": session_id} ) else: return JSONResponse( content=response, headers={"Mcp-Session-Id": session_id} ) elif method == "tools/list": tools = await get_all_tools() response = { "jsonrpc": "2.0", "id": request_id, "result": {"tools": tools} } if wants_streaming: return EventSourceResponse(_stream_response(response)) else: return JSONResponse(content=response) elif method == "tools/call": tool_name = params.get("name") arguments = params.get("arguments", {}) try: result = await call_tool(tool_name, arguments) # Handle different result types if isinstance(result, list): # Result is already a list of TextContent objects content = [] for item in result: try: # Try to parse as JSON and return as formatted text parsed_json = json.loads(item.text) content.append({ "type": "text", "text": json.dumps(parsed_json, indent=2) }) except json.JSONDecodeError: # If not JSON, return as plain text content.append({"type": "text", "text": item.text}) elif isinstance(result, str): # Result is a plain string (error case) content = [{"type": "text", "text": result}] else: # Fallback content = [{"type": "text", "text": str(result)}] response = { "jsonrpc": "2.0", "id": request_id, "result": { "content": content } } if wants_streaming: return EventSourceResponse(_stream_response(response)) else: return JSONResponse(content=response) except Exception as e: logger.error(f"Error calling tool {tool_name}: {e}", exc_info=True) response = { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32603, "message": f"Internal error: {str(e)}" } } if wants_streaming: return EventSourceResponse(_stream_response(response)) else: return JSONResponse(content=response) elif method in ["logging/set_level", "logging/set level", "logging/setLevel", "logging/set-level"]: # Handle logging level setting level = params.get("level", "info") # Validate log level against MCP specification valid_levels = ["debug", "info", "notice", "warning", "error", "critical", "alert", "emergency"] if level not in valid_levels: response = { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32602, "message": "Invalid params", "data": f"Invalid log level: {level}. Valid levels: {', '.join(valid_levels)}" } } if wants_streaming: return EventSourceResponse(_stream_response(response)) else: return JSONResponse(content=response) try: # Update the logger level import logging numeric_level = getattr(logging, level.upper(), logging.INFO) logging.getLogger().setLevel(numeric_level) response = { "jsonrpc": "2.0", "id": request_id, "result": {} } if wants_streaming: return EventSourceResponse(_stream_response(response)) else: return JSONResponse(content=response) except Exception as e: logger.error(f"Logging level setting error: {e}") response = { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32603, "message": f"Failed to set logging level: {str(e)}" } } if wants_streaming: return EventSourceResponse(_stream_response(response)) else: return JSONResponse(content=response) elif method == "notifications/cancelled": # Handle notification cancellation - just acknowledge response = { "jsonrpc": "2.0", "id": request_id, "result": {} } if wants_streaming: return EventSourceResponse(_stream_response(response)) else: return JSONResponse(content=response) else: # Log unknown methods for debugging logger.warning(f"Unknown method requested: {method}") response = { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32601, "message": f"Method not found: {method}" } } if wants_streaming: return EventSourceResponse(_stream_response(response)) else: return JSONResponse(content=response) except Exception as e: logger.error(f"Error processing MCP request: {e}", exc_info=True) return JSONResponse( status_code=500, content={ "jsonrpc": "2.0", "id": None, "error": { "code": -32603, "message": f"Internal error: {str(e)}" } } ) if __name__ == "__main__": config = get_config() uvicorn.run( "mcp_server_odoo.http_server:app", host=config.server.host, port=config.server.port, reload=config.server.debug, log_level=config.server.log_level.lower() )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vzeman/odoo-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server