# Odoo MCP Server
# by tuanle96
# Verified
"""
MCP server for Odoo integration
Provides MCP tools and resources for interacting with Odoo ERP systems
"""
import json
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, AsyncIterator, Dict, List, Optional, Union, cast
from mcp.server.fastmcp import Context, FastMCP
from pydantic import BaseModel, Field
from .odoo_client import OdooClient, get_odoo_client
@dataclass
class AppContext:
    """Shared state yielded by the server lifespan and exposed to MCP tools."""

    # Odoo client instance; tools reach it through
    # ctx.request_context.lifespan_context.odoo.
    odoo: OdooClient
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """
    Lifespan hook that builds the application context for the MCP server

    Parameters:
        server: The FastMCP instance the lifespan is attached to (unused here)

    Yields:
        AppContext wrapping the client returned by get_odoo_client()
    """
    # The client is obtained once, when the lifespan is entered.
    context = AppContext(odoo=get_odoo_client())
    try:
        yield context
    finally:
        # The Odoo client requires no explicit teardown.
        pass
# Create the module-level MCP server instance.
# The @mcp.resource / @mcp.tool decorators further down register their
# handlers on this object, and app_lifespan is installed as its lifespan hook.
mcp = FastMCP(
"Odoo MCP Server",
description="MCP Server for interacting with Odoo ERP systems",
dependencies=["requests"],
lifespan=app_lifespan,
)
# ----- MCP Resources -----
@mcp.resource(
    "odoo://models", description="List all available models in the Odoo system"
)
def get_models() -> str:
    """
    List all available models in the Odoo system

    Returns:
        JSON string (indent=2) of whatever the client's get_models() returns,
        or a JSON object {"error": "<message>"} if the lookup or the
        serialization fails.
    """
    odoo_client = get_odoo_client()
    try:
        models = odoo_client.get_models()
        return json.dumps(models, indent=2)
    except Exception as e:
        # Report failures as an error payload, the same contract the other
        # odoo:// resources in this module use, instead of letting the
        # exception escape the resource handler.
        return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
    "odoo://model/{model_name}",
    description="Get detailed information about a specific model including fields",
)
def get_model_info(model_name: str) -> str:
    """
    Describe one Odoo model together with its field definitions

    Parameters:
        model_name: Name of the Odoo model (e.g., 'res.partner')

    Returns:
        JSON string of the model info with an added "fields" entry, or a JSON
        object {"error": "<message>"} if any step raises.
    """
    client = get_odoo_client()
    try:
        info = client.get_model_info(model_name)
        # Attach the field definitions under the "fields" key.
        info["fields"] = client.get_model_fields(model_name)
        payload = json.dumps(info, indent=2)
    except Exception as exc:
        payload = json.dumps({"error": str(exc)}, indent=2)
    return payload
@mcp.resource(
    "odoo://record/{model_name}/{record_id}",
    description="Get detailed information of a specific record by ID",
)
def get_record(model_name: str, record_id: str) -> str:
    """
    Fetch a single record by its ID

    Parameters:
        model_name: Name of the Odoo model (e.g., 'res.partner')
        record_id: ID of the record, as it appears in the URI (converted to int)

    Returns:
        JSON string of the first record returned by the client, or a JSON
        object {"error": "<message>"} when nothing is returned or a step
        raises (including a non-numeric record_id).
    """
    client = get_odoo_client()
    try:
        rows = client.read_records(model_name, [int(record_id)])
        if rows:
            body = rows[0]
        else:
            body = {"error": f"Record not found: {model_name} ID {record_id}"}
        return json.dumps(body, indent=2)
    except Exception as exc:
        return json.dumps({"error": str(exc)}, indent=2)
@mcp.resource(
    "odoo://search/{model_name}/{domain}",
    description="Search for records matching the domain",
)
def search_records_resource(model_name: str, domain: str) -> str:
    """
    Search a model for records matching a domain

    Parameters:
        model_name: Name of the Odoo model (e.g., 'res.partner')
        domain: Search domain in JSON format (e.g., '[["name", "ilike", "test"]]')

    Returns:
        JSON string of the search_read results (at most 10 records are
        requested), or a JSON object {"error": "<message>"} if the domain is
        not valid JSON or the search raises.
    """
    client = get_odoo_client()
    try:
        # The domain arrives as a JSON string; the limit is fixed at 10.
        matches = client.search_read(model_name, json.loads(domain), limit=10)
        return json.dumps(matches, indent=2)
    except Exception as exc:
        return json.dumps({"error": str(exc)}, indent=2)
# ----- Pydantic models for type safety -----
class DomainCondition(BaseModel):
    """A single condition in a search domain"""

    # The three parts of one condition, in the order they are emitted by
    # to_tuple(): field name, comparison operator, comparison value.
    field: str = Field(description="Field name to search")
    operator: str = Field(
        description="Operator (e.g., '=', '!=', '>', '<', 'in', 'not in', 'like', 'ilike')"
    )
    value: Any = Field(description="Value to compare against")

    def to_tuple(self) -> List:
        """Return the condition as a three-item list [field, operator, value]."""
        triple = [self.field, self.operator, self.value]
        return triple
class SearchDomain(BaseModel):
    """Search domain for Odoo models"""

    # Defaults to an empty list, i.e. a domain with no conditions.
    conditions: List[DomainCondition] = Field(
        default_factory=list,
        description="List of conditions for searching. All conditions are combined with AND operator.",
    )

    def to_domain_list(self) -> List[List]:
        """Return every condition converted with DomainCondition.to_tuple(), in order."""
        domain = []
        for condition in self.conditions:
            domain.append(condition.to_tuple())
        return domain
class EmployeeSearchResult(BaseModel):
    """Represents a single employee search result."""

    # search_employee fills these from item[0] and item[1] of each
    # name_search result entry.
    id: int = Field(description="Employee ID")
    name: str = Field(description="Employee name")
class SearchEmployeeResponse(BaseModel):
    """Response model for the search_employee tool."""

    success: bool = Field(description="Indicates if the search was successful")
    # Set on success; left as None when the search failed.
    result: Optional[List[EmployeeSearchResult]] = Field(
        None, description="List of employee search results"
    )
    # Set on failure; left as None when the search succeeded.
    error: Optional[str] = Field(None, description="Error message, if any")
class Holiday(BaseModel):
    """Represents a single holiday."""

    # search_holidays builds instances directly from the dicts returned by
    # search_read on "hr.leave.report.calendar", so the field names here must
    # match keys of those dicts.
    display_name: str = Field(description="Display name of the holiday")
    start_datetime: str = Field(description="Start date and time of the holiday")
    stop_datetime: str = Field(description="End date and time of the holiday")
    # Declared as a list of ints/strs -- presumably Odoo's [id, display name]
    # pair for a relational field; TODO confirm against the server response.
    employee_id: List[Union[int, str]] = Field(
        description="Employee ID associated with the holiday"
    )
    name: str = Field(description="Name of the holiday")
    state: str = Field(description="State of the holiday")
class SearchHolidaysResponse(BaseModel):
    """Response model for the search_holidays tool."""

    success: bool = Field(description="Indicates if the search was successful")
    # Set on success; left as None when the search failed.
    result: Optional[List[Holiday]] = Field(
        None, description="List of holidays found"
    )
    # Set on failure (invalid dates or an exception); None otherwise.
    error: Optional[str] = Field(None, description="Error message, if any")
# ----- MCP Tools -----
# Prefix operators that may appear as bare strings inside a search domain.
_ODOO_DOMAIN_OPERATORS = ("&", "|", "!")


def _domain_from_conditions(conditions: Any) -> List[List]:
    """Turn [{"field", "operator", "value"}, ...] into [[field, operator, value], ...]."""
    required_keys = ("field", "operator", "value")
    return [
        [cond["field"], cond["operator"], cond["value"]]
        for cond in conditions
        # Entries that are not dicts or miss a key are skipped, not rejected.
        if isinstance(cond, dict) and all(key in cond for key in required_keys)
    ]


def _parse_domain_string(text: str) -> List:
    """Parse a domain passed as a string: JSON first, then a Python literal."""
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        import ast

        try:
            # literal_eval only evaluates literals; it never executes code.
            parsed = ast.literal_eval(text)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            # Best effort: an unparseable string falls back to an empty domain.
            return []
        # Python-literal domains are commonly written with tuples.
        return list(parsed) if isinstance(parsed, (list, tuple)) else []

    if isinstance(parsed, dict) and "conditions" in parsed:
        return _domain_from_conditions(parsed.get("conditions", []))
    if isinstance(parsed, list):
        return parsed
    return []


def _keep_valid_domain_terms(domain_list: List) -> List:
    """Keep prefix operators and well-formed 3-item conditions; drop the rest."""
    kept = []
    for term in domain_list:
        if isinstance(term, str) and term in _ODOO_DOMAIN_OPERATORS:
            kept.append(term)
            continue
        if (
            isinstance(term, (list, tuple))
            and len(term) == 3
            and isinstance(term[0], str)
            and isinstance(term[1], str)
        ):
            # Tuples (e.g. from a Python-literal string) are emitted as lists.
            kept.append(list(term))
    return kept


def _normalize_search_domain(domain: Any) -> List:
    """Coerce the accepted domain spellings into a list of operators/conditions."""
    # Unwrap a domain that was wrapped once too often: [[...]] -> [...]
    if isinstance(domain, list) and len(domain) == 1 and isinstance(domain[0], list):
        domain = domain[0]

    if isinstance(domain, dict):
        # Object format: {"conditions": [{"field", "operator", "value"}, ...]}
        if "conditions" in domain:
            domain_list = _domain_from_conditions(domain.get("conditions", []))
        else:
            domain_list = []
    elif isinstance(domain, list):
        if (
            len(domain) == 3
            and isinstance(domain[0], str)
            and isinstance(domain[1], str)
            and domain[0] not in _ODOO_DOMAIN_OPERATORS
        ):
            # A bare condition [field, operator, value]. Checked first so that
            # a *value* equal to "&", "|" or "!" is not taken for an operator.
            domain_list = [domain]
        elif all(isinstance(item, list) for item in domain) or any(
            item in _ODOO_DOMAIN_OPERATORS for item in domain
        ):
            # Already a list of conditions, possibly with prefix operators.
            domain_list = domain
        else:
            domain_list = []
    elif isinstance(domain, str):
        domain_list = _parse_domain_string(domain)
    else:
        # None or an unsupported type: search without restrictions.
        domain_list = []

    return _keep_valid_domain_terms(domain_list)


@mcp.tool(description="Execute a custom method on an Odoo model")
def execute_method(
    ctx: Context,
    model: str,
    method: str,
    args: Optional[List] = None,
    kwargs: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Execute a custom method on an Odoo model

    For 'search', 'search_count' and 'search_read' called with positional
    arguments, the first argument is treated as the search domain and is
    normalized before the call. Accepted spellings are a list of conditions,
    a single [field, operator, value] condition, a {"conditions": [...]}
    object, or a string holding JSON or a Python literal. Terms that are
    neither a prefix operator nor a 3-item condition are dropped.

    Parameters:
        ctx: MCP request context; the Odoo client is read from its lifespan context
        model: The model name (e.g., 'res.partner')
        method: Method name to execute
        args: Positional arguments
        kwargs: Keyword arguments
    Returns:
        Dictionary containing:
        - success: Boolean indicating success
        - result: Result of the method (if success)
        - error: Error message (if failure)
    """
    import sys

    odoo = ctx.request_context.lifespan_context.odoo
    try:
        # Work on a copy so the caller's list is never modified.
        call_args = list(args or [])
        call_kwargs = kwargs or {}

        if method in ("search", "search_count", "search_read") and call_args:
            call_args[0] = _normalize_search_domain(call_args[0])
            # Diagnostics go to stderr: on the stdio transport stdout carries
            # the protocol messages and must not receive anything else.
            print(
                f"Executing {method} with normalized domain: {call_args[0]}",
                file=sys.stderr,
            )

        result = odoo.execute_method(model, method, *call_args, **call_kwargs)
        return {"success": True, "result": result}
    except Exception as e:
        return {"success": False, "error": str(e)}
@mcp.tool(description="Search for employees by name")
def search_employee(
    ctx: Context,
    name: str,
    limit: int = 20,
) -> SearchEmployeeResponse:
    """
    Look up employees by name through name_search on the hr.employee model.

    Parameters:
        ctx: MCP request context; the Odoo client is read from its lifespan context.
        name: The name (or part of the name) to search for.
        limit: The maximum number of results to return (default 20).
    Returns:
        SearchEmployeeResponse with the matches on success, or with
        success=False and the exception text on failure.
    """
    odoo = ctx.request_context.lifespan_context.odoo
    try:
        matches = odoo.execute_method(
            "hr.employee", "name_search", name=name, limit=limit
        )
        employees = []
        for entry in matches:
            # Each entry is indexed as entry[0] -> id, entry[1] -> name.
            employees.append(EmployeeSearchResult(id=entry[0], name=entry[1]))
        return SearchEmployeeResponse(success=True, result=employees)
    except Exception as exc:
        return SearchEmployeeResponse(success=False, error=str(exc))
@mcp.tool(description="Search for holidays within a date range")
def search_holidays(
    ctx: Context,
    start_date: str,
    end_date: str,
    employee_id: Optional[int] = None,
) -> SearchHolidaysResponse:
    """
    Find holidays overlapping a date range in hr.leave.report.calendar.

    Parameters:
        ctx: MCP request context; the Odoo client is read from its lifespan context.
        start_date: Start date in YYYY-MM-DD format.
        end_date: End date in YYYY-MM-DD format.
        employee_id: Optional employee ID to filter holidays.
    Returns:
        SearchHolidaysResponse: the holidays found on success; success=False
        with a message when a date is malformed or the search raises.
    """
    odoo = ctx.request_context.lifespan_context.odoo
    date_format = "%Y-%m-%d"

    # Validate both dates; the start date is checked (and reported) first.
    try:
        range_start = datetime.strptime(start_date, date_format)
    except ValueError:
        return SearchHolidaysResponse(
            success=False, error="Invalid start_date format. Use YYYY-MM-DD."
        )
    try:
        datetime.strptime(end_date, date_format)
    except ValueError:
        return SearchHolidaysResponse(
            success=False, error="Invalid end_date format. Use YYYY-MM-DD."
        )

    # The lower bound is 23:00:00 on the day *before* start_date and the upper
    # bound 22:59:59 on end_date.
    # NOTE(review): presumably compensates for a fixed one-hour UTC offset --
    # confirm against the server's timezone handling.
    day_before_start = (range_start - timedelta(days=1)).strftime(date_format)

    domain = [
        "&",
        ["start_datetime", "<=", f"{end_date} 22:59:59"],
        ["stop_datetime", ">=", f"{day_before_start} 23:00:00"],
    ]
    if employee_id:
        # Appended after the explicit "&" pair as an extra trailing condition.
        domain.append(["employee_id", "=", employee_id])

    try:
        rows = odoo.search_read(
            model_name="hr.leave.report.calendar",
            domain=domain,
        )
        return SearchHolidaysResponse(
            success=True, result=[Holiday(**row) for row in rows]
        )
    except Exception as exc:
        return SearchHolidaysResponse(success=False, error=str(exc))