#!/usr/bin/env python3
"""
MCP Server for Odoo - Timesheet and Expense Management.
This MCP server allows managing timesheets and expense reports
in Odoo via the XML-RPC protocol.
"""
from __future__ import annotations
import base64
import logging
import mimetypes
import os
import socket
import sys
import xmlrpc.client
from collections.abc import Callable
from datetime import date, timedelta
from pathlib import Path
from enum import StrEnum
from functools import wraps
from typing import Any, ParamSpec, TypeVar
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
# Logger configuration
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# Odoo configuration
ODOO_URL: str = os.getenv("ODOO_URL", "")
ODOO_DB: str = os.getenv("ODOO_DB", "")
ODOO_USERNAME: str = os.getenv("ODOO_USERNAME", "")
ODOO_API_KEY: str = os.getenv("ODOO_API_KEY", "")
# ============== CONSTANTS ==============
class ExpenseState(StrEnum):
"""Possible states of an expense."""
DRAFT = "draft"
REPORTED = "reported"
APPROVED = "approved"
DONE = "done"
REFUSED = "refused"
EXPENSE_STATE_LABELS: dict[ExpenseState | str, str] = {
ExpenseState.DRAFT: "Draft",
ExpenseState.REPORTED: "Submitted",
ExpenseState.APPROVED: "Approved",
ExpenseState.DONE: "Paid",
ExpenseState.REFUSED: "Refused",
}
VALID_EXPENSE_STATES: frozenset[str] = frozenset(state.value for state in ExpenseState)
# ============== TYPES ==============
# Type aliases for Odoo
OdooDomain = list[tuple[str, str, Any]]
OdooRecord = dict[str, Any]
OdooMany2one = list[int | str] | bool # [id, name] or False
# Generics for decorator
P = ParamSpec("P")
R = TypeVar("R")
# ============== EXCEPTIONS ==============
class OdooError(Exception):
"""Custom exception for Odoo errors."""
class OdooConnectionError(OdooError):
"""Odoo connection error."""
class OdooAuthenticationError(OdooError):
"""Odoo authentication error."""
# ============== VALIDATION ==============
def validate_config() -> None:
"""Validate that all configuration variables are present."""
missing: list[str] = []
if not ODOO_URL:
missing.append("ODOO_URL")
if not ODOO_DB:
missing.append("ODOO_DB")
if not ODOO_USERNAME:
missing.append("ODOO_USERNAME")
if not ODOO_API_KEY:
missing.append("ODOO_API_KEY")
if missing:
logger.error("Missing variables in .env: %s", ", ".join(missing))
sys.exit(1)
def validate_date(date_str: str) -> bool:
"""Validate date format (YYYY-MM-DD)."""
try:
date.fromisoformat(date_str)
return True
except ValueError:
return False
# ============== DECORATORS ==============
def handle_odoo_errors(func: Callable[P, str]) -> Callable[P, str]:
"""Decorator to handle Odoo errors uniformly."""
@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> str:
try:
return func(*args, **kwargs)
except OdooError as e:
logger.exception("Odoo error in %s", func.__name__)
return f"Error: {e}"
return wrapper
# Create MCP server
mcp = FastMCP("odoo-timesheet-expenses")
class OdooClient:
"""Client to connect to Odoo via XML-RPC."""
__slots__ = ("url", "db", "username", "api_key", "_uid", "_common", "_models")
def __init__(self) -> None:
self.url: str = ODOO_URL
self.db: str = ODOO_DB
self.username: str = ODOO_USERNAME
self.api_key: str = ODOO_API_KEY
self._uid: int | None = None
self._common: xmlrpc.client.ServerProxy | None = None
self._models: xmlrpc.client.ServerProxy | None = None
@property
def common(self) -> xmlrpc.client.ServerProxy:
"""Return the common proxy (reused)."""
if self._common is None:
self._common = xmlrpc.client.ServerProxy(f"{self.url}/xmlrpc/2/common")
return self._common
@property
def uid(self) -> int:
"""Authenticate and return user UID."""
if self._uid is None:
try:
self._uid = self.common.authenticate(
self.db, self.username, self.api_key, {}
)
if not self._uid:
raise OdooAuthenticationError(
"Odoo authentication failed. Check your credentials."
)
except xmlrpc.client.Fault as e:
raise OdooAuthenticationError(
f"Authentication error: {e.faultString}"
) from e
except (ConnectionError, TimeoutError, socket.error, OSError) as e:
raise OdooConnectionError(f"Connection error: {e}") from e
return self._uid
@property
def models(self) -> xmlrpc.client.ServerProxy:
"""Return the proxy for Odoo models (reused)."""
if self._models is None:
self._models = xmlrpc.client.ServerProxy(f"{self.url}/xmlrpc/2/object")
return self._models
def execute(self, model: str, method: str, *args: Any, **kwargs: Any) -> Any:
"""Execute a method on an Odoo model."""
try:
return self.models.execute_kw(
self.db, self.uid, self.api_key,
model, method, args, kwargs
)
except xmlrpc.client.Fault as e:
raise OdooError(f"Odoo error ({model}.{method}): {e.faultString}") from e
def search_read(
self,
model: str,
domain: OdooDomain | None = None,
fields: list[str] | None = None,
limit: int = 100,
offset: int = 0,
order: str | None = None,
) -> list[OdooRecord]:
"""Search and read records."""
domain = domain or []
params: dict[str, Any] = {"limit": limit, "offset": offset}
if fields:
params["fields"] = fields
if order:
params["order"] = order
return self.execute(model, "search_read", domain, **params)
def read(
self,
model: str,
ids: list[int],
fields: list[str] | None = None,
) -> list[OdooRecord]:
"""Read records by ID."""
params: dict[str, Any] = {}
if fields:
params["fields"] = fields
return self.execute(model, "read", ids, **params)
def create(self, model: str, values: dict[str, Any]) -> int:
"""Create a record."""
return self.execute(model, "create", [values])
def write(self, model: str, ids: list[int], values: dict[str, Any]) -> bool:
"""Update records."""
return self.execute(model, "write", ids, values)
def unlink(self, model: str, ids: list[int]) -> bool:
"""Delete records."""
return self.execute(model, "unlink", ids)
def exists(self, model: str, record_id: int) -> bool:
"""Check if a record exists."""
result: int = self.execute(model, "search_count", [("id", "=", record_id)])
return result > 0
# Global client instance
odoo = OdooClient()
# ============== HELPERS ==============
def _get_many2one_name(field: OdooMany2one, default: str = "N/A") -> str:
"""Extract name from an Odoo Many2one field."""
if isinstance(field, list) and len(field) >= 2:
return str(field[1])
return default
def _format_pagination(count: int, limit: int, offset: int, item_name: str = "entries") -> str:
"""Format pagination message."""
result = f"\n*{count} {item_name} displayed*"
if count == limit:
result += f"\n*Use offset={offset + limit} to see more.*"
return result
# ============== TIMESHEET TOOLS ==============
@mcp.tool()
@handle_odoo_errors
def list_projects(limit: int = 50, offset: int = 0) -> str:
"""
List available projects in Odoo.
Args:
limit: Maximum number of projects to return (default: 50)
offset: Offset for pagination (default: 0)
Returns:
List of projects with their ID and name
"""
projects = odoo.search_read(
"project.project",
[("active", "=", True)],
["id", "name", "partner_id"],
limit=limit,
offset=offset,
order="name"
)
result = "# Available Projects\n\n"
if offset > 0:
result += f"*(Page starting from entry {offset + 1})*\n\n"
for p in projects:
partner_name = _get_many2one_name(p.get("partner_id"))
result += f"- **ID {p['id']}**: {p['name']} (Client: {partner_name})\n"
if not projects:
result += "No projects found.\n"
else:
result += _format_pagination(len(projects), limit, offset, "project(s)")
return result
@mcp.tool()
@handle_odoo_errors
def list_tasks(project_id: int | None = None, limit: int = 50, offset: int = 0) -> str:
"""
List available tasks in Odoo.
Args:
project_id: Filter by project ID (optional)
limit: Maximum number of tasks to return (default: 50)
offset: Offset for pagination (default: 0)
Returns:
List of tasks with their ID, name and project
"""
domain: OdooDomain = [("active", "=", True)]
if project_id is not None:
if not odoo.exists("project.project", project_id):
return f"Error: Project ID {project_id} not found."
domain.append(("project_id", "=", project_id))
tasks = odoo.search_read(
"project.task",
domain,
["id", "name", "project_id", "stage_id"],
limit=limit,
offset=offset,
order="project_id, name"
)
result = "# Available Tasks\n\n"
if offset > 0:
result += f"*(Page starting from entry {offset + 1})*\n\n"
for t in tasks:
project_name = _get_many2one_name(t.get("project_id"))
stage_name = _get_many2one_name(t.get("stage_id"))
result += f"- **ID {t['id']}**: {t['name']}\n - Project: {project_name}\n - Stage: {stage_name}\n"
if not tasks:
result += "No tasks found.\n"
else:
result += _format_pagination(len(tasks), limit, offset, "task(s)")
return result
@mcp.tool()
@handle_odoo_errors
def list_timesheets(
date_from: str | None = None,
date_to: str | None = None,
project_id: int | None = None,
limit: int = 100,
offset: int = 0,
all_users: bool = False,
) -> str:
"""
List existing timesheet entries.
Args:
date_from: Start date (format YYYY-MM-DD, optional)
date_to: End date (format YYYY-MM-DD, optional)
project_id: Filter by project ID (optional)
limit: Maximum number of entries (default: 100)
offset: Offset for pagination (default: 0)
all_users: If False, filter on current user (default: False)
Returns:
List of timesheet entries
"""
# Date validation
if date_from is not None and not validate_date(date_from):
return f"Error: Invalid date format for date_from: {date_from}. Use YYYY-MM-DD."
if date_to is not None and not validate_date(date_to):
return f"Error: Invalid date format for date_to: {date_to}. Use YYYY-MM-DD."
# Default: filter on current user
domain: OdooDomain = [] if all_users else [("user_id", "=", odoo.uid)]
if date_from is not None:
domain.append(("date", ">=", date_from))
if date_to is not None:
domain.append(("date", "<=", date_to))
if project_id is not None:
domain.append(("project_id", "=", project_id))
timesheets = odoo.search_read(
"account.analytic.line",
domain,
["id", "date", "name", "unit_amount", "project_id", "task_id"],
limit=limit,
offset=offset,
order="date desc"
)
result = "# Timesheet Entries\n\n"
if offset > 0:
result += f"*(Page starting from entry {offset + 1})*\n\n"
total_hours: float = 0.0
for ts in timesheets:
project_name = _get_many2one_name(ts.get("project_id"))
task_name = _get_many2one_name(ts.get("task_id"))
hours: float = ts.get("unit_amount", 0)
total_hours += hours
result += f"- **ID {ts['id']}** - {ts['date']}\n"
result += f" - Description: {ts['name']}\n"
result += f" - Hours: {hours}h\n"
result += f" - Project: {project_name}\n"
result += f" - Task: {task_name}\n\n"
if not timesheets:
result += "No entries found.\n"
result += f"\n**Total displayed: {total_hours}h** ({len(timesheets)} entries)"
if len(timesheets) == limit:
result += f"\n*Use offset={offset + limit} to see more entries.*"
return result
@mcp.tool()
@handle_odoo_errors
def get_timesheet_summary_by_employee(
date_from: str,
date_to: str,
expected_hours_per_day: float = 8.0,
) -> str:
"""
Get a summary of hours logged by employee for a given period.
Args:
date_from: Start date (format YYYY-MM-DD)
date_to: End date (format YYYY-MM-DD)
expected_hours_per_day: Expected hours per working day (default: 8.0)
Returns:
Summary of hours by employee with comparison to expected hours
"""
# Date validation
if not validate_date(date_from):
return f"Error: Invalid date format for date_from: {date_from}. Use YYYY-MM-DD."
if not validate_date(date_to):
return f"Error: Invalid date format for date_to: {date_to}. Use YYYY-MM-DD."
# Calculate working days in the period
start_date = date.fromisoformat(date_from)
end_date = date.fromisoformat(date_to)
working_days = 0
current = start_date
while current <= end_date:
if current.weekday() < 5: # Monday to Friday
working_days += 1
current += timedelta(days=1)
expected_hours = working_days * expected_hours_per_day
# Fetch all timesheets for the period with employee
timesheets = odoo.search_read(
"account.analytic.line",
[("date", ">=", date_from), ("date", "<=", date_to), ("project_id", "!=", False)],
["id", "employee_id", "unit_amount"],
limit=5000
)
# Aggregate by employee
hours_by_employee: dict[int, dict[str, Any]] = {}
for ts in timesheets:
emp = ts.get("employee_id")
if emp and isinstance(emp, list) and len(emp) >= 2:
emp_id = emp[0]
emp_name = emp[1]
if emp_id not in hours_by_employee:
hours_by_employee[emp_id] = {"name": emp_name, "hours": 0.0}
hours_by_employee[emp_id]["hours"] += ts.get("unit_amount", 0)
# Sort by hours (descending)
sorted_employees = sorted(
hours_by_employee.items(),
key=lambda x: x[1]["hours"],
reverse=True
)
result = f"# Hours Summary - {date_from} to {date_to}\n\n"
result += f"**Working days:** {working_days} | **Expected hours:** {expected_hours}h\n\n"
result += "| Employee | Hours | Expected | Diff | Status |\n"
result += "|----------|-------|----------|------|--------|\n"
for emp_id, data in sorted_employees:
hours = data["hours"]
diff = hours - expected_hours
pct = (hours / expected_hours * 100) if expected_hours > 0 else 0
# Fun categories
if pct >= 120:
status = "Overachiever"
elif pct >= 100:
status = "Model Employee"
elif pct >= 80:
status = "Almost Perfect"
elif pct >= 60:
status = "Could Do Better"
elif pct >= 40:
status = "Slacker Detected"
elif pct > 0:
status = "Office Ghost"
else:
status = "Missing in Action"
diff_str = f"+{diff:.1f}" if diff >= 0 else f"{diff:.1f}"
result += f"| {data['name']} | {hours:.1f}h | {expected_hours:.0f}h | {diff_str}h | {status} |\n"
if not sorted_employees:
result += "| No data | - | - | - | - |\n"
return result
@mcp.tool()
@handle_odoo_errors
def create_timesheet(
project_id: int,
hours: float,
description: str,
task_id: int | None = None,
date_entry: str | None = None,
) -> str:
"""
Create a new timesheet entry.
Args:
project_id: Project ID
hours: Number of hours worked
description: Description of work done
task_id: Task ID (optional)
date_entry: Entry date in YYYY-MM-DD format (default: today)
Returns:
Confirmation with created entry ID
"""
# Validation
if hours <= 0:
return "Error: Number of hours must be positive."
if not description.strip():
return "Error: Description cannot be empty."
if date_entry is None:
date_entry = date.today().isoformat()
elif not validate_date(date_entry):
return f"Error: Invalid date format: {date_entry}. Use YYYY-MM-DD."
# Check project exists
if not odoo.exists("project.project", project_id):
return f"Error: Project ID {project_id} not found."
# Check task exists if specified
if task_id is not None and not odoo.exists("project.task", task_id):
return f"Error: Task ID {task_id} not found."
values: dict[str, Any] = {
"project_id": project_id,
"unit_amount": hours,
"name": description,
"date": date_entry,
}
if task_id is not None:
values["task_id"] = task_id
entry_id = odoo.create("account.analytic.line", values)
return (
f"Timesheet created successfully! ID: {entry_id}\n"
f"- Date: {date_entry}\n"
f"- Hours: {hours}h\n"
f"- Description: {description}"
)
@mcp.tool()
@handle_odoo_errors
def update_timesheet(
timesheet_id: int,
hours: float | None = None,
description: str | None = None,
date_entry: str | None = None,
) -> str:
"""
Update an existing timesheet entry.
Args:
timesheet_id: ID of entry to update
hours: New number of hours (optional)
description: New description (optional)
date_entry: New date in YYYY-MM-DD format (optional)
Returns:
Update confirmation
"""
# Check entry exists
if not odoo.exists("account.analytic.line", timesheet_id):
return f"Error: Timesheet ID {timesheet_id} not found."
# Validation
if hours is not None and hours <= 0:
return "Error: Number of hours must be positive."
if description is not None and not description.strip():
return "Error: Description cannot be empty."
if date_entry is not None and not validate_date(date_entry):
return f"Error: Invalid date format: {date_entry}. Use YYYY-MM-DD."
values: dict[str, Any] = {}
if hours is not None:
values["unit_amount"] = hours
if description is not None:
values["name"] = description
if date_entry is not None:
values["date"] = date_entry
if not values:
return "No modifications specified."
odoo.write("account.analytic.line", [timesheet_id], values)
return f"Timesheet ID {timesheet_id} updated successfully!\nModifications: {values}"
@mcp.tool()
@handle_odoo_errors
def delete_timesheet(timesheet_id: int) -> str:
"""
Delete a timesheet entry.
Args:
timesheet_id: ID of entry to delete
Returns:
Deletion confirmation
"""
if not odoo.exists("account.analytic.line", timesheet_id):
return f"Error: Timesheet ID {timesheet_id} not found."
odoo.unlink("account.analytic.line", [timesheet_id])
return f"Timesheet ID {timesheet_id} deleted successfully!"
# ============== EXPENSE TOOLS ==============
@mcp.tool()
@handle_odoo_errors
def list_expense_categories(limit: int = 100, offset: int = 0) -> str:
"""
List available expense categories (service type products).
Args:
limit: Maximum number of categories to return (default: 100)
offset: Offset for pagination (default: 0)
Returns:
List of categories with their ID and name
"""
products = odoo.search_read(
"product.product",
[("can_be_expensed", "=", True)],
["id", "name", "standard_price", "uom_id"],
limit=limit,
offset=offset,
order="name"
)
result = "# Available Expense Categories\n\n"
if offset > 0:
result += f"*(Page starting from entry {offset + 1})*\n\n"
for p in products:
uom_name = _get_many2one_name(p.get("uom_id"), "Unit")
result += (
f"- **ID {p['id']}**: {p['name']} "
f"(Standard price: {p.get('standard_price', 0)}, Unit: {uom_name})\n"
)
if not products:
result += (
"No expense category found. "
"Check that you have products with 'can_be_expensed=True'.\n"
)
else:
result += _format_pagination(len(products), limit, offset, "category(ies)")
return result
@mcp.tool()
@handle_odoo_errors
def list_expenses(
date_from: str | None = None,
date_to: str | None = None,
state: str | None = None,
limit: int = 50,
offset: int = 0,
) -> str:
"""
List existing expenses.
Args:
date_from: Start date (format YYYY-MM-DD, optional)
date_to: End date (format YYYY-MM-DD, optional)
state: Filter by state: draft, reported, approved, done, refused (optional)
limit: Maximum number of entries (default: 50)
offset: Offset for pagination (default: 0)
Returns:
List of expenses
"""
# Date validation
if date_from is not None and not validate_date(date_from):
return f"Error: Invalid date format for date_from: {date_from}. Use YYYY-MM-DD."
if date_to is not None and not validate_date(date_to):
return f"Error: Invalid date format for date_to: {date_to}. Use YYYY-MM-DD."
# State validation
if state is not None and state not in VALID_EXPENSE_STATES:
return f"Error: Invalid state '{state}'. Valid states: {', '.join(VALID_EXPENSE_STATES)}"
domain: OdooDomain = []
if date_from is not None:
domain.append(("date", ">=", date_from))
if date_to is not None:
domain.append(("date", "<=", date_to))
if state is not None:
domain.append(("state", "=", state))
expenses = odoo.search_read(
"hr.expense",
domain,
["id", "name", "date", "total_amount", "state", "product_id", "description"],
limit=limit,
offset=offset,
order="date desc"
)
result = "# Expenses\n\n"
if offset > 0:
result += f"*(Page starting from entry {offset + 1})*\n\n"
total_amount: float = 0.0
for exp in expenses:
product_name = _get_many2one_name(exp.get("product_id"))
amount: float = exp.get("total_amount", 0)
total_amount += amount
state_label = EXPENSE_STATE_LABELS.get(exp.get("state", ""), exp.get("state", ""))
result += f"- **ID {exp['id']}** - {exp['date']}\n"
result += f" - Name: {exp['name']}\n"
result += f" - Amount: {amount}\n"
result += f" - Category: {product_name}\n"
result += f" - State: {state_label}\n"
if exp.get("description"):
result += f" - Description: {exp['description']}\n"
result += "\n"
if not expenses:
result += "No expenses found.\n"
result += f"\n**Total displayed: {total_amount}** ({len(expenses)} expenses)"
if len(expenses) == limit:
result += f"\n*Use offset={offset + limit} to see more entries.*"
return result
@mcp.tool()
@handle_odoo_errors
def create_expense(
name: str,
product_id: int,
total_amount: float,
description: str | None = None,
date_expense: str | None = None,
quantity: float = 1.0,
) -> str:
"""
Create a new expense report.
Args:
name: Expense name/title
product_id: Expense category ID (see list_expense_categories)
total_amount: Total expense amount
description: Detailed description (optional)
date_expense: Expense date in YYYY-MM-DD format (default: today)
quantity: Quantity (default: 1.0)
Returns:
Confirmation with created expense ID
"""
# Validation
if not name.strip():
return "Error: Expense name cannot be empty."
if total_amount <= 0:
return "Error: Amount must be positive."
if quantity <= 0:
return "Error: Quantity must be positive."
if date_expense is None:
date_expense = date.today().isoformat()
elif not validate_date(date_expense):
return f"Error: Invalid date format: {date_expense}. Use YYYY-MM-DD."
# Check category exists
if not odoo.exists("product.product", product_id):
return (
f"Error: Expense category ID {product_id} not found. "
"Use list_expense_categories to see available categories."
)
# Get employee linked to connected user
employees = odoo.search_read(
"hr.employee",
[("user_id", "=", odoo.uid)],
["id"],
limit=1
)
if not employees:
return (
"Error: No employee found for your user. "
"Contact your Odoo administrator."
)
employee_id: int = employees[0]["id"]
values: dict[str, Any] = {
"name": name,
"product_id": product_id,
"total_amount": total_amount,
"date": date_expense,
"employee_id": employee_id,
"quantity": quantity,
}
if description is not None:
values["description"] = description
expense_id = odoo.create("hr.expense", values)
return (
f"Expense created successfully! ID: {expense_id}\n"
f"- Name: {name}\n"
f"- Amount: {total_amount}\n"
f"- Date: {date_expense}"
)
@mcp.tool()
@handle_odoo_errors
def update_expense(
expense_id: int,
name: str | None = None,
total_amount: float | None = None,
description: str | None = None,
date_expense: str | None = None,
payment_mode: str | None = None,
currency: str | None = None,
analytic_account_id: int | None = None,
) -> str:
"""
Update an existing expense (only if in draft state).
Args:
expense_id: ID of expense to update
name: New name (optional)
total_amount: New amount (optional)
description: New description (optional)
date_expense: New date in YYYY-MM-DD format (optional)
payment_mode: Payment mode - "company" (paid by company) or "employee" (paid by employee) (optional)
currency: ISO currency code (e.g., "EUR", "USD") (optional)
analytic_account_id: Analytic account ID for distribution (100%) (optional)
Returns:
Update confirmation
"""
# Check expense exists and get its state
expenses = odoo.search_read(
"hr.expense",
[("id", "=", expense_id)],
["state"],
limit=1
)
if not expenses:
return f"Error: Expense ID {expense_id} not found."
state = expenses[0].get("state")
if state != ExpenseState.DRAFT:
state_label = EXPENSE_STATE_LABELS.get(state, state).lower()
return (
f"Error: Cannot modify a {state_label} expense. "
"Only draft expenses can be modified."
)
# Validation
if name is not None and not name.strip():
return "Error: Name cannot be empty."
if total_amount is not None and total_amount <= 0:
return "Error: Amount must be positive."
if date_expense is not None and not validate_date(date_expense):
return f"Error: Invalid date format: {date_expense}. Use YYYY-MM-DD."
if payment_mode is not None and payment_mode not in ("company", "employee"):
return "Error: payment_mode must be 'company' or 'employee'."
# Search currency if specified
currency_id: int | None = None
if currency is not None:
currencies = odoo.search_read(
"res.currency",
[("name", "=", currency.upper())],
["id", "name"],
limit=1
)
if not currencies:
return f"Error: Currency '{currency}' not found. Use an ISO code (EUR, USD, GBP, etc.)."
currency_id = currencies[0]["id"]
values: dict[str, Any] = {}
if name is not None:
values["name"] = name
if total_amount is not None:
values["total_amount"] = total_amount
if description is not None:
values["description"] = description
if date_expense is not None:
values["date"] = date_expense
if payment_mode is not None:
# Odoo uses 'own_account' for employee and 'company_account' for company
values["payment_mode"] = "company_account" if payment_mode == "company" else "own_account"
if currency_id is not None:
values["currency_id"] = currency_id
if analytic_account_id is not None:
# Odoo uses a JSON dict format: {"account_id": percentage}
values["analytic_distribution"] = {str(analytic_account_id): 100}
if not values:
return "No modifications specified."
odoo.write("hr.expense", [expense_id], values)
return f"Expense ID {expense_id} updated successfully!\nModifications: {values}"
@mcp.tool()
@handle_odoo_errors
def delete_expense(expense_id: int) -> str:
"""
Delete an expense (only if in draft state).
Args:
expense_id: ID of expense to delete
Returns:
Deletion confirmation
"""
# Check expense exists and get its state
expenses = odoo.search_read(
"hr.expense",
[("id", "=", expense_id)],
["state", "name"],
limit=1
)
if not expenses:
return f"Error: Expense ID {expense_id} not found."
expense = expenses[0]
state = expense.get("state")
if state != ExpenseState.DRAFT:
state_label = EXPENSE_STATE_LABELS.get(state, state).lower()
return (
f"Error: Cannot delete a {state_label} expense. "
"Only draft expenses can be deleted."
)
odoo.unlink("hr.expense", [expense_id])
return f"Expense '{expense.get('name')}' (ID {expense_id}) deleted successfully!"
@mcp.tool()
@handle_odoo_errors
def add_expense_attachment(
expense_id: int,
file_path: str,
filename: str | None = None,
) -> str:
"""
Add an attachment to an existing expense.
Args:
expense_id: Expense ID
file_path: Absolute path to file to attach
filename: Filename in Odoo (optional, uses original filename by default)
Returns:
Confirmation with created attachment ID
"""
# Check expense exists
if not odoo.exists("hr.expense", expense_id):
return f"Error: Expense ID {expense_id} not found."
# Check file exists
path = Path(file_path)
if not path.exists():
return f"Error: File not found: {file_path}"
if not path.is_file():
return f"Error: Path is not a file: {file_path}"
# Read and encode file in base64
try:
with open(path, "rb") as f:
file_content = f.read()
file_data = base64.b64encode(file_content).decode("utf-8")
except OSError as e:
return f"Error reading file: {e}"
# Determine filename
attachment_name = filename if filename else path.name
# Determine MIME type
mimetype, _ = mimetypes.guess_type(str(path))
if mimetype is None:
mimetype = "application/octet-stream"
# Create attachment in Odoo
values: dict[str, Any] = {
"name": attachment_name,
"datas": file_data,
"res_model": "hr.expense",
"res_id": expense_id,
"type": "binary",
"mimetype": mimetype,
}
attachment_id = odoo.create("ir.attachment", values)
# Calculate file size for display
file_size = len(file_content)
if file_size < 1024:
size_str = f"{file_size} bytes"
elif file_size < 1024 * 1024:
size_str = f"{file_size / 1024:.1f} KB"
else:
size_str = f"{file_size / (1024 * 1024):.1f} MB"
return (
f"Attachment added successfully!\n"
f"- Attachment ID: {attachment_id}\n"
f"- Expense ID: {expense_id}\n"
f"- File: {attachment_name}\n"
f"- Size: {size_str}\n"
f"- Type: {mimetype}"
)
@mcp.tool()
@handle_odoo_errors
def list_expense_attachments(expense_id: int) -> str:
"""
List attachments for an expense.
Args:
expense_id: Expense ID
Returns:
List of attachments with their ID and name
"""
# Check expense exists
if not odoo.exists("hr.expense", expense_id):
return f"Error: Expense ID {expense_id} not found."
attachments = odoo.search_read(
"ir.attachment",
[("res_model", "=", "hr.expense"), ("res_id", "=", expense_id)],
["id", "name", "mimetype", "file_size", "create_date"],
order="create_date desc"
)
result = f"# Attachments for Expense ID {expense_id}\n\n"
if not attachments:
result += "No attachments found.\n"
return result
for att in attachments:
file_size = att.get("file_size", 0)
if file_size < 1024:
size_str = f"{file_size} bytes"
elif file_size < 1024 * 1024:
size_str = f"{file_size / 1024:.1f} KB"
else:
size_str = f"{file_size / (1024 * 1024):.1f} MB"
result += f"- **ID {att['id']}**: {att['name']}\n"
result += f" - Type: {att.get('mimetype', 'N/A')}\n"
result += f" - Size: {size_str}\n"
result += f" - Added: {att.get('create_date', 'N/A')}\n\n"
result += f"**Total: {len(attachments)} attachment(s)**"
return result
# ============== CONTACT TOOLS ==============
@mcp.tool()
@handle_odoo_errors
def list_contacts(
customer: bool | None = None,
supplier: bool | None = None,
company: bool | None = None,
limit: int = 50,
offset: int = 0,
) -> str:
"""
List contacts (partners) in Odoo.
Args:
customer: Filter customers (True/False/None for all)
supplier: Filter suppliers (True/False/None for all)
company: Filter companies (True) or individuals (False)
limit: Maximum number of contacts (default: 50)
offset: Offset for pagination (default: 0)
Returns:
List of contacts with their information
"""
domain: OdooDomain = [("active", "=", True)]
if customer is not None:
domain.append(("customer_rank", ">" if customer else "=", 0))
if supplier is not None:
domain.append(("supplier_rank", ">" if supplier else "=", 0))
if company is not None:
domain.append(("is_company", "=", company))
contacts = odoo.search_read(
"res.partner",
domain,
["id", "name", "email", "phone", "city", "country_id", "is_company", "customer_rank", "supplier_rank"],
limit=limit,
offset=offset,
order="name"
)
result = "# Contacts\n\n"
if offset > 0:
result += f"*(Page starting from entry {offset + 1})*\n\n"
for c in contacts:
contact_type = "Company" if c.get("is_company") else "Individual"
roles = []
if c.get("customer_rank", 0) > 0:
roles.append("Customer")
if c.get("supplier_rank", 0) > 0:
roles.append("Supplier")
role_str = ", ".join(roles) if roles else "Contact"
country_name = _get_many2one_name(c.get("country_id"))
location = f"{c.get('city', '')}, {country_name}" if c.get("city") else country_name
result += f"- **ID {c['id']}**: {c['name']} ({contact_type})\n"
result += f" - Type: {role_str}\n"
if c.get("email"):
result += f" - Email: {c['email']}\n"
if c.get("phone"):
result += f" - Phone: {c['phone']}\n"
if location != "N/A":
result += f" - Location: {location}\n"
result += "\n"
if not contacts:
result += "No contacts found.\n"
else:
result += _format_pagination(len(contacts), limit, offset, "contact(s)")
return result
@mcp.tool()
@handle_odoo_errors
def get_contact(contact_id: int) -> str:
"""
Get details of a specific contact.
Args:
contact_id: Contact ID
Returns:
Complete contact details
"""
contacts = odoo.search_read(
"res.partner",
[("id", "=", contact_id)],
[
"id", "name", "email", "phone", "mobile", "street", "street2",
"city", "zip", "country_id", "state_id", "vat", "website",
"is_company", "parent_id", "child_ids", "customer_rank", "supplier_rank",
"comment", "create_date"
],
limit=1
)
if not contacts:
return f"Error: Contact ID {contact_id} not found."
c = contacts[0]
contact_type = "Company" if c.get("is_company") else "Individual"
result = f"# Contact: {c['name']}\n\n"
result += f"- **ID**: {c['id']}\n"
result += f"- **Type**: {contact_type}\n"
if c.get("parent_id"):
result += f"- **Parent company**: {_get_many2one_name(c['parent_id'])}\n"
if c.get("email"):
result += f"- **Email**: {c['email']}\n"
if c.get("phone"):
result += f"- **Phone**: {c['phone']}\n"
if c.get("mobile"):
result += f"- **Mobile**: {c['mobile']}\n"
if c.get("website"):
result += f"- **Website**: {c['website']}\n"
# Address
address_parts = []
if c.get("street"):
address_parts.append(c["street"])
if c.get("street2"):
address_parts.append(c["street2"])
if c.get("zip") or c.get("city"):
address_parts.append(f"{c.get('zip', '')} {c.get('city', '')}".strip())
if c.get("country_id"):
address_parts.append(_get_many2one_name(c["country_id"]))
if address_parts:
result += f"- **Address**: {', '.join(address_parts)}\n"
if c.get("vat"):
result += f"- **VAT**: {c['vat']}\n"
# Roles
roles = []
if c.get("customer_rank", 0) > 0:
roles.append("Customer")
if c.get("supplier_rank", 0) > 0:
roles.append("Supplier")
if roles:
result += f"- **Roles**: {', '.join(roles)}\n"
if c.get("comment"):
result += f"\n**Notes:**\n{c['comment']}\n"
return result
@mcp.tool()
@handle_odoo_errors
def create_contact(
name: str,
email: str | None = None,
phone: str | None = None,
is_company: bool = False,
customer: bool = True,
supplier: bool = False,
street: str | None = None,
city: str | None = None,
zip_code: str | None = None,
parent_id: int | None = None,
) -> str:
"""
Create a new contact.
Args:
name: Contact name
email: Email address (optional)
phone: Phone number (optional)
is_company: True if it's a company, False for individual
customer: Mark as customer (default: True)
supplier: Mark as supplier (default: False)
street: Address (optional)
city: City (optional)
zip_code: Zip code (optional)
parent_id: Parent company ID for contacts (optional)
Returns:
Confirmation with created contact ID
"""
if not name.strip():
return "Error: Contact name cannot be empty."
if parent_id is not None and not odoo.exists("res.partner", parent_id):
return f"Error: Parent contact ID {parent_id} not found."
values: dict[str, Any] = {
"name": name.strip(),
"is_company": is_company,
"customer_rank": 1 if customer else 0,
"supplier_rank": 1 if supplier else 0,
}
if email is not None:
values["email"] = email
if phone is not None:
values["phone"] = phone
if street is not None:
values["street"] = street
if city is not None:
values["city"] = city
if zip_code is not None:
values["zip"] = zip_code
if parent_id is not None:
values["parent_id"] = parent_id
contact_id = odoo.create("res.partner", values)
contact_type = "Company" if is_company else "Contact"
return (
f"{contact_type} created successfully! ID: {contact_id}\n"
f"- Name: {name}\n"
f"- Email: {email or 'N/A'}\n"
f"- Customer: {'Yes' if customer else 'No'}\n"
f"- Supplier: {'Yes' if supplier else 'No'}"
)
# ============== INVOICE TOOLS ==============
@mcp.tool()
@handle_odoo_errors
def list_invoices(
invoice_type: str | None = None,
state: str | None = None,
partner_id: int | None = None,
date_from: str | None = None,
date_to: str | None = None,
limit: int = 50,
offset: int = 0,
) -> str:
"""
List invoices in Odoo.
Args:
invoice_type: Invoice type (out_invoice=customer, in_invoice=supplier, out_refund=customer credit note, in_refund=supplier credit note)
state: State (draft, posted, cancel)
partner_id: Filter by partner
date_from: Start date (YYYY-MM-DD)
date_to: End date (YYYY-MM-DD)
limit: Maximum number of invoices (default: 50)
offset: Offset for pagination (default: 0)
Returns:
List of invoices
"""
valid_types = ["out_invoice", "in_invoice", "out_refund", "in_refund"]
valid_states = ["draft", "posted", "cancel"]
if invoice_type is not None and invoice_type not in valid_types:
return f"Error: Invalid type '{invoice_type}'. Valid types: {', '.join(valid_types)}"
if state is not None and state not in valid_states:
return f"Error: Invalid state '{state}'. Valid states: {', '.join(valid_states)}"
if date_from is not None and not validate_date(date_from):
return f"Error: Invalid date format for date_from: {date_from}"
if date_to is not None and not validate_date(date_to):
return f"Error: Invalid date format for date_to: {date_to}"
domain: OdooDomain = [("move_type", "in", valid_types)]
if invoice_type is not None:
domain.append(("move_type", "=", invoice_type))
if state is not None:
domain.append(("state", "=", state))
if partner_id is not None:
domain.append(("partner_id", "=", partner_id))
if date_from is not None:
domain.append(("invoice_date", ">=", date_from))
if date_to is not None:
domain.append(("invoice_date", "<=", date_to))
invoices = odoo.search_read(
"account.move",
domain,
["id", "name", "partner_id", "invoice_date", "invoice_date_due", "amount_total", "amount_residual", "state", "move_type", "currency_id"],
limit=limit,
offset=offset,
order="invoice_date desc, id desc"
)
type_labels = {
"out_invoice": "Customer Invoice",
"in_invoice": "Supplier Invoice",
"out_refund": "Customer Credit Note",
"in_refund": "Supplier Credit Note",
}
state_labels = {
"draft": "Draft",
"posted": "Posted",
"cancel": "Cancelled",
}
result = "# Invoices\n\n"
if offset > 0:
result += f"*(Page starting from entry {offset + 1})*\n\n"
total = 0.0
for inv in invoices:
partner_name = _get_many2one_name(inv.get("partner_id"))
currency = _get_many2one_name(inv.get("currency_id"), "EUR")
type_label = type_labels.get(inv.get("move_type"), inv.get("move_type"))
state_label = state_labels.get(inv.get("state"), inv.get("state"))
amount = inv.get("amount_total", 0)
residual = inv.get("amount_residual", 0)
total += amount
result += f"- **{inv['name']}** (ID {inv['id']})\n"
result += f" - Type: {type_label}\n"
result += f" - Partner: {partner_name}\n"
result += f" - Date: {inv.get('invoice_date', 'N/A')}\n"
result += f" - Due date: {inv.get('invoice_date_due', 'N/A')}\n"
result += f" - Amount: {amount} {currency}\n"
if residual > 0:
result += f" - Amount due: {residual} {currency}\n"
result += f" - State: {state_label}\n\n"
if not invoices:
result += "No invoices found.\n"
else:
result += f"\n**Total: {total}** ({len(invoices)} invoices)"
if len(invoices) == limit:
result += f"\n*Use offset={offset + limit} to see more.*"
return result
@mcp.tool()
@handle_odoo_errors
def get_invoice(invoice_id: int) -> str:
"""
Get details of a specific invoice with its lines.
Args:
invoice_id: Invoice ID
Returns:
Complete invoice details
"""
invoices = odoo.search_read(
"account.move",
[("id", "=", invoice_id)],
[
"id", "name", "partner_id", "invoice_date", "invoice_date_due",
"amount_untaxed", "amount_tax", "amount_total", "amount_residual",
"state", "move_type", "currency_id", "ref", "narration", "invoice_line_ids"
],
limit=1
)
if not invoices:
return f"Error: Invoice ID {invoice_id} not found."
inv = invoices[0]
type_labels = {
"out_invoice": "Customer Invoice",
"in_invoice": "Supplier Invoice",
"out_refund": "Customer Credit Note",
"in_refund": "Supplier Credit Note",
}
state_labels = {
"draft": "Draft",
"posted": "Posted",
"cancel": "Cancelled",
}
result = f"# Invoice: {inv['name']}\n\n"
result += f"- **ID**: {inv['id']}\n"
result += f"- **Type**: {type_labels.get(inv.get('move_type'), inv.get('move_type'))}\n"
result += f"- **Partner**: {_get_many2one_name(inv.get('partner_id'))}\n"
result += f"- **Date**: {inv.get('invoice_date', 'N/A')}\n"
result += f"- **Due date**: {inv.get('invoice_date_due', 'N/A')}\n"
result += f"- **State**: {state_labels.get(inv.get('state'), inv.get('state'))}\n"
if inv.get("ref"):
result += f"- **Reference**: {inv['ref']}\n"
currency = _get_many2one_name(inv.get("currency_id"), "EUR")
result += f"\n## Amounts ({currency})\n"
result += f"- Untaxed: {inv.get('amount_untaxed', 0)}\n"
result += f"- Tax: {inv.get('amount_tax', 0)}\n"
result += f"- **Total**: {inv.get('amount_total', 0)}\n"
if inv.get("amount_residual", 0) > 0:
result += f"- **Amount due**: {inv.get('amount_residual', 0)}\n"
# Get invoice lines
if inv.get("invoice_line_ids"):
lines = odoo.search_read(
"account.move.line",
[("id", "in", inv["invoice_line_ids"]), ("display_type", "=", "product")],
["name", "quantity", "price_unit", "price_subtotal", "product_id"],
limit=100
)
if lines:
result += "\n## Lines\n"
for line in lines:
product_name = _get_many2one_name(line.get("product_id"), line.get("name", ""))
result += f"- {product_name}: {line.get('quantity', 1)} x {line.get('price_unit', 0)} = {line.get('price_subtotal', 0)}\n"
return result
# ============== SALE ORDER TOOLS ==============
@mcp.tool()
@handle_odoo_errors
def list_sale_orders(
state: str | None = None,
partner_id: int | None = None,
date_from: str | None = None,
date_to: str | None = None,
limit: int = 50,
offset: int = 0,
) -> str:
"""
List sale orders (quotations/orders).
Args:
state: State (draft=Quotation, sent=Sent, sale=Sales Order, done=Locked, cancel=Cancelled)
partner_id: Filter by customer
date_from: Start date (YYYY-MM-DD)
date_to: End date (YYYY-MM-DD)
limit: Maximum number (default: 50)
offset: Offset for pagination (default: 0)
Returns:
List of orders
"""
valid_states = ["draft", "sent", "sale", "done", "cancel"]
if state is not None and state not in valid_states:
return f"Error: Invalid state '{state}'. Valid states: {', '.join(valid_states)}"
if date_from is not None and not validate_date(date_from):
return f"Error: Invalid date format for date_from: {date_from}"
if date_to is not None and not validate_date(date_to):
return f"Error: Invalid date format for date_to: {date_to}"
domain: OdooDomain = []
if state is not None:
domain.append(("state", "=", state))
if partner_id is not None:
domain.append(("partner_id", "=", partner_id))
if date_from is not None:
domain.append(("date_order", ">=", date_from))
if date_to is not None:
domain.append(("date_order", "<=", date_to))
orders = odoo.search_read(
"sale.order",
domain,
["id", "name", "partner_id", "date_order", "amount_total", "state", "currency_id", "user_id"],
limit=limit,
offset=offset,
order="date_order desc, id desc"
)
state_labels = {
"draft": "Quotation",
"sent": "Quotation Sent",
"sale": "Sales Order",
"done": "Locked",
"cancel": "Cancelled",
}
result = "# Sale Orders\n\n"
if offset > 0:
result += f"*(Page starting from entry {offset + 1})*\n\n"
total = 0.0
for order in orders:
partner_name = _get_many2one_name(order.get("partner_id"))
currency = _get_many2one_name(order.get("currency_id"), "EUR")
user_name = _get_many2one_name(order.get("user_id"))
state_label = state_labels.get(order.get("state"), order.get("state"))
amount = order.get("amount_total", 0)
total += amount
result += f"- **{order['name']}** (ID {order['id']})\n"
result += f" - Customer: {partner_name}\n"
result += f" - Date: {order.get('date_order', 'N/A')[:10] if order.get('date_order') else 'N/A'}\n"
result += f" - Amount: {amount} {currency}\n"
result += f" - State: {state_label}\n"
result += f" - Salesperson: {user_name}\n\n"
if not orders:
result += "No orders found.\n"
else:
result += f"\n**Total: {total}** ({len(orders)} orders)"
if len(orders) == limit:
result += f"\n*Use offset={offset + limit} to see more.*"
return result
@mcp.tool()
@handle_odoo_errors
def get_sale_order(order_id: int) -> str:
"""
Get details of a sale order with its lines.
Args:
order_id: Order ID
Returns:
Complete order details
"""
orders = odoo.search_read(
"sale.order",
[("id", "=", order_id)],
[
"id", "name", "partner_id", "date_order", "validity_date",
"amount_untaxed", "amount_tax", "amount_total", "state",
"currency_id", "user_id", "note", "order_line"
],
limit=1
)
if not orders:
return f"Error: Order ID {order_id} not found."
order = orders[0]
state_labels = {
"draft": "Quotation",
"sent": "Quotation Sent",
"sale": "Sales Order",
"done": "Locked",
"cancel": "Cancelled",
}
result = f"# Order: {order['name']}\n\n"
result += f"- **ID**: {order['id']}\n"
result += f"- **Customer**: {_get_many2one_name(order.get('partner_id'))}\n"
result += f"- **Date**: {order.get('date_order', 'N/A')[:10] if order.get('date_order') else 'N/A'}\n"
if order.get("validity_date"):
result += f"- **Validity**: {order['validity_date']}\n"
result += f"- **Salesperson**: {_get_many2one_name(order.get('user_id'))}\n"
result += f"- **State**: {state_labels.get(order.get('state'), order.get('state'))}\n"
currency = _get_many2one_name(order.get("currency_id"), "EUR")
result += f"\n## Amounts ({currency})\n"
result += f"- Untaxed: {order.get('amount_untaxed', 0)}\n"
result += f"- Tax: {order.get('amount_tax', 0)}\n"
result += f"- **Total**: {order.get('amount_total', 0)}\n"
# Get lines
if order.get("order_line"):
lines = odoo.search_read(
"sale.order.line",
[("id", "in", order["order_line"])],
["name", "product_id", "product_uom_qty", "price_unit", "price_subtotal"],
limit=100
)
if lines:
result += "\n## Lines\n"
for line in lines:
product_name = _get_many2one_name(line.get("product_id"), line.get("name", ""))
result += f"- {product_name}: {line.get('product_uom_qty', 1)} x {line.get('price_unit', 0)} = {line.get('price_subtotal', 0)}\n"
if order.get("note"):
result += f"\n## Notes\n{order['note']}\n"
return result
# ============== PRODUCT TOOLS ==============
@mcp.tool()
@handle_odoo_errors
def list_products(
product_type: str | None = None,
categ_id: int | None = None,
available: bool | None = None,
limit: int = 50,
offset: int = 0,
) -> str:
"""
List products in Odoo.
Args:
product_type: Product type (consu=Consumable, service=Service, product=Storable)
categ_id: Filter by product category
available: Filter products available for sale (True/False)
limit: Maximum number (default: 50)
offset: Offset for pagination (default: 0)
Returns:
List of products
"""
valid_types = ["consu", "service", "product"]
if product_type is not None and product_type not in valid_types:
return f"Error: Invalid type '{product_type}'. Valid types: {', '.join(valid_types)}"
domain: OdooDomain = [("active", "=", True)]
if product_type is not None:
domain.append(("detailed_type", "=", product_type))
if categ_id is not None:
domain.append(("categ_id", "=", categ_id))
if available is not None:
domain.append(("sale_ok", "=", available))
products = odoo.search_read(
"product.product",
domain,
["id", "name", "default_code", "list_price", "standard_price", "qty_available", "detailed_type", "categ_id"],
limit=limit,
offset=offset,
order="name"
)
type_labels = {
"consu": "Consumable",
"service": "Service",
"product": "Storable",
}
result = "# Products\n\n"
if offset > 0:
result += f"*(Page starting from entry {offset + 1})*\n\n"
for p in products:
type_label = type_labels.get(p.get("detailed_type"), p.get("detailed_type"))
categ_name = _get_many2one_name(p.get("categ_id"))
result += f"- **ID {p['id']}**: {p['name']}\n"
if p.get("default_code"):
result += f" - Ref: {p['default_code']}\n"
result += f" - Type: {type_label}\n"
result += f" - Category: {categ_name}\n"
result += f" - Sale price: {p.get('list_price', 0)}\n"
result += f" - Cost: {p.get('standard_price', 0)}\n"
if p.get("detailed_type") == "product":
result += f" - Stock: {p.get('qty_available', 0)}\n"
result += "\n"
if not products:
result += "No products found.\n"
else:
result += _format_pagination(len(products), limit, offset, "product(s)")
return result
@mcp.tool()
@handle_odoo_errors
def get_product(product_id: int) -> str:
"""
Get details of a specific product.
Args:
product_id: Product ID
Returns:
Complete product details
"""
products = odoo.search_read(
"product.product",
[("id", "=", product_id)],
[
"id", "name", "default_code", "barcode", "list_price", "standard_price",
"qty_available", "virtual_available", "detailed_type", "categ_id",
"description", "description_sale", "uom_id", "uom_po_id", "sale_ok", "purchase_ok"
],
limit=1
)
if not products:
return f"Error: Product ID {product_id} not found."
p = products[0]
type_labels = {
"consu": "Consumable",
"service": "Service",
"product": "Storable",
}
result = f"# Product: {p['name']}\n\n"
result += f"- **ID**: {p['id']}\n"
if p.get("default_code"):
result += f"- **Reference**: {p['default_code']}\n"
if p.get("barcode"):
result += f"- **Barcode**: {p['barcode']}\n"
result += f"- **Type**: {type_labels.get(p.get('detailed_type'), p.get('detailed_type'))}\n"
result += f"- **Category**: {_get_many2one_name(p.get('categ_id'))}\n"
result += f"- **Unit of Measure**: {_get_many2one_name(p.get('uom_id'))}\n"
result += "\n## Prices\n"
result += f"- Sale price: {p.get('list_price', 0)}\n"
result += f"- Cost: {p.get('standard_price', 0)}\n"
if p.get("detailed_type") == "product":
result += "\n## Stock\n"
result += f"- On hand: {p.get('qty_available', 0)}\n"
result += f"- Forecasted: {p.get('virtual_available', 0)}\n"
result += "\n## Availability\n"
result += f"- Can be sold: {'Yes' if p.get('sale_ok') else 'No'}\n"
result += f"- Can be purchased: {'Yes' if p.get('purchase_ok') else 'No'}\n"
if p.get("description_sale"):
result += f"\n## Sale Description\n{p['description_sale']}\n"
return result
# ============== EMPLOYEE TOOLS ==============
@mcp.tool()
@handle_odoo_errors
def list_employees(
department_id: int | None = None,
limit: int = 50,
offset: int = 0,
) -> str:
"""
List employees in Odoo.
Args:
department_id: Filter by department
limit: Maximum number (default: 50)
offset: Offset for pagination (default: 0)
Returns:
List of employees
"""
domain: OdooDomain = [("active", "=", True)]
if department_id is not None:
domain.append(("department_id", "=", department_id))
employees = odoo.search_read(
"hr.employee",
domain,
["id", "name", "job_title", "department_id", "work_email", "work_phone", "parent_id"],
limit=limit,
offset=offset,
order="name"
)
result = "# Employees\n\n"
if offset > 0:
result += f"*(Page starting from entry {offset + 1})*\n\n"
for e in employees:
dept_name = _get_many2one_name(e.get("department_id"))
manager_name = _get_many2one_name(e.get("parent_id"))
result += f"- **ID {e['id']}**: {e['name']}\n"
if e.get("job_title"):
result += f" - Position: {e['job_title']}\n"
result += f" - Department: {dept_name}\n"
if manager_name != "N/A":
result += f" - Manager: {manager_name}\n"
if e.get("work_email"):
result += f" - Email: {e['work_email']}\n"
if e.get("work_phone"):
result += f" - Phone: {e['work_phone']}\n"
result += "\n"
if not employees:
result += "No employees found.\n"
else:
result += _format_pagination(len(employees), limit, offset, "employee(s)")
return result
@mcp.tool()
@handle_odoo_errors
def get_employee(employee_id: int) -> str:
"""
Get details of a specific employee.
Args:
employee_id: Employee ID
Returns:
Complete employee details
"""
employees = odoo.search_read(
"hr.employee",
[("id", "=", employee_id)],
[
"id", "name", "job_title", "department_id", "parent_id",
"work_email", "work_phone", "mobile_phone",
"user_id", "coach_id", "address_id"
],
limit=1
)
if not employees:
return f"Error: Employee ID {employee_id} not found."
e = employees[0]
result = f"# Employee: {e['name']}\n\n"
result += f"- **ID**: {e['id']}\n"
if e.get("job_title"):
result += f"- **Position**: {e['job_title']}\n"
result += f"- **Department**: {_get_many2one_name(e.get('department_id'))}\n"
manager_name = _get_many2one_name(e.get("parent_id"))
if manager_name != "N/A":
result += f"- **Manager**: {manager_name}\n"
coach_name = _get_many2one_name(e.get("coach_id"))
if coach_name != "N/A":
result += f"- **Coach**: {coach_name}\n"
result += "\n## Contact\n"
if e.get("work_email"):
result += f"- Work email: {e['work_email']}\n"
if e.get("work_phone"):
result += f"- Work phone: {e['work_phone']}\n"
if e.get("mobile_phone"):
result += f"- Mobile: {e['mobile_phone']}\n"
user_name = _get_many2one_name(e.get("user_id"))
if user_name != "N/A":
result += f"\n- **Odoo user**: {user_name}\n"
return result
@mcp.tool()
@handle_odoo_errors
def list_departments(limit: int = 50, offset: int = 0) -> str:
"""
List departments in Odoo.
Args:
limit: Maximum number (default: 50)
offset: Offset for pagination (default: 0)
Returns:
List of departments
"""
departments = odoo.search_read(
"hr.department",
[("active", "=", True)],
["id", "name", "manager_id", "parent_id", "total_employee"],
limit=limit,
offset=offset,
order="name"
)
result = "# Departments\n\n"
if offset > 0:
result += f"*(Page starting from entry {offset + 1})*\n\n"
for d in departments:
manager_name = _get_many2one_name(d.get("manager_id"))
parent_name = _get_many2one_name(d.get("parent_id"))
result += f"- **ID {d['id']}**: {d['name']}\n"
result += f" - Manager: {manager_name}\n"
if parent_name != "N/A":
result += f" - Parent department: {parent_name}\n"
result += f" - Employee count: {d.get('total_employee', 0)}\n\n"
if not departments:
result += "No departments found.\n"
else:
result += _format_pagination(len(departments), limit, offset, "department(s)")
return result
# ============== LEAVE ALLOCATION TOOLS ==============
@mcp.tool()
@handle_odoo_errors
def list_leave_types(limit: int = 50, offset: int = 0) -> str:
"""
List available leave types in Odoo.
Args:
limit: Maximum number (default: 50)
offset: Offset for pagination (default: 0)
Returns:
List of leave types with their ID and name
"""
leave_types = odoo.search_read(
"hr.leave.type",
[("active", "=", True)],
["id", "name", "requires_allocation", "allocation_validation_type"],
limit=limit,
offset=offset,
order="name"
)
result = "# Leave Types\n\n"
if offset > 0:
result += f"*(Page starting from entry {offset + 1})*\n\n"
for lt in leave_types:
requires_alloc = "Yes" if lt.get("requires_allocation") == "yes" else "No"
result += f"- **ID {lt['id']}**: {lt['name']}\n"
result += f" - Requires allocation: {requires_alloc}\n\n"
if not leave_types:
result += "No leave types found.\n"
else:
result += _format_pagination(len(leave_types), limit, offset, "leave type(s)")
return result
@mcp.tool()
@handle_odoo_errors
def create_leave_allocation(
leave_type_id: int,
number_of_days: float,
name: str,
employee_id: int | None = None,
department_id: int | None = None,
all_employees: bool = False,
date_from: str | None = None,
date_to: str | None = None,
) -> str:
"""
Create a leave allocation for employees.
Args:
leave_type_id: Leave type ID (see list_leave_types)
number_of_days: Number of days to allocate
name: Allocation name/reason
employee_id: Specific employee ID (optional)
department_id: Department ID to allocate to all employees in department (optional)
all_employees: If True, allocate to all employees (default: False)
date_from: Start validity date YYYY-MM-DD (optional)
date_to: End validity date YYYY-MM-DD (optional)
Returns:
Confirmation with created allocation ID(s)
"""
# Validation
if number_of_days <= 0:
return "Error: Number of days must be positive."
if not name.strip():
return "Error: Allocation name cannot be empty."
# Check leave type exists
if not odoo.exists("hr.leave.type", leave_type_id):
return f"Error: Leave type ID {leave_type_id} not found. Use list_leave_types to see available types."
# Date validation
if date_from is not None and not validate_date(date_from):
return f"Error: Invalid date format for date_from: {date_from}. Use YYYY-MM-DD."
if date_to is not None and not validate_date(date_to):
return f"Error: Invalid date format for date_to: {date_to}. Use YYYY-MM-DD."
# Determine which employees to allocate to
employees_to_allocate: list[dict[str, Any]] = []
if employee_id is not None:
# Single employee
if not odoo.exists("hr.employee", employee_id):
return f"Error: Employee ID {employee_id} not found."
employees_to_allocate = [{"id": employee_id}]
elif department_id is not None:
# All employees in department
if not odoo.exists("hr.department", department_id):
return f"Error: Department ID {department_id} not found."
employees_to_allocate = odoo.search_read(
"hr.employee",
[("department_id", "=", department_id), ("active", "=", True)],
["id", "name"],
limit=500
)
elif all_employees:
# All active employees
employees_to_allocate = odoo.search_read(
"hr.employee",
[("active", "=", True)],
["id", "name"],
limit=500
)
else:
return "Error: You must specify employee_id, department_id, or set all_employees=True."
if not employees_to_allocate:
return "Error: No employees found for the specified criteria."
# Create allocations
created_ids: list[int] = []
errors: list[str] = []
for emp in employees_to_allocate:
values: dict[str, Any] = {
"name": name,
"holiday_status_id": leave_type_id,
"number_of_days": number_of_days,
"employee_id": emp["id"],
}
if date_from is not None:
values["date_from"] = date_from
if date_to is not None:
values["date_to"] = date_to
try:
alloc_id = odoo.create("hr.leave.allocation", values)
created_ids.append(alloc_id)
except OdooError as e:
errors.append(f"Employee {emp.get('name', emp['id'])}: {e}")
# Build result message
result = f"# Leave Allocation Results\n\n"
result += f"- **Allocation name**: {name}\n"
result += f"- **Days**: {number_of_days}\n"
result += f"- **Successfully created**: {len(created_ids)} allocation(s)\n"
if created_ids:
result += f"- **Allocation IDs**: {created_ids}\n"
if errors:
result += f"\n## Errors ({len(errors)})\n"
for error in errors[:10]: # Show first 10 errors
result += f"- {error}\n"
if len(errors) > 10:
result += f"- ... and {len(errors) - 10} more errors\n"
return result
@mcp.tool()
@handle_odoo_errors
def list_leave_allocations(
employee_id: int | None = None,
leave_type_id: int | None = None,
state: str | None = None,
limit: int = 50,
offset: int = 0,
) -> str:
"""
List leave allocations.
Args:
employee_id: Filter by employee ID (optional)
leave_type_id: Filter by leave type ID (optional)
state: Filter by state: draft, confirm, validate, refuse (optional)
limit: Maximum number (default: 50)
offset: Offset for pagination (default: 0)
Returns:
List of leave allocations
"""
valid_states = ["draft", "confirm", "validate", "refuse"]
if state is not None and state not in valid_states:
return f"Error: Invalid state '{state}'. Valid states: {', '.join(valid_states)}"
domain: OdooDomain = []
if employee_id is not None:
domain.append(("employee_id", "=", employee_id))
if leave_type_id is not None:
domain.append(("holiday_status_id", "=", leave_type_id))
if state is not None:
domain.append(("state", "=", state))
allocations = odoo.search_read(
"hr.leave.allocation",
domain,
["id", "name", "employee_id", "holiday_status_id", "number_of_days", "state", "date_from", "date_to"],
limit=limit,
offset=offset,
order="create_date desc"
)
state_labels = {
"draft": "Draft",
"confirm": "To Approve",
"validate": "Approved",
"refuse": "Refused",
}
result = "# Leave Allocations\n\n"
if offset > 0:
result += f"*(Page starting from entry {offset + 1})*\n\n"
total_days = 0.0
for alloc in allocations:
emp_name = _get_many2one_name(alloc.get("employee_id"))
leave_type = _get_many2one_name(alloc.get("holiday_status_id"))
state_label = state_labels.get(alloc.get("state", ""), alloc.get("state", ""))
days = alloc.get("number_of_days", 0)
total_days += days
result += f"- **ID {alloc['id']}**: {alloc.get('name', 'N/A')}\n"
result += f" - Employee: {emp_name}\n"
result += f" - Leave type: {leave_type}\n"
result += f" - Days: {days}\n"
result += f" - State: {state_label}\n"
if alloc.get("date_from"):
result += f" - Valid from: {alloc['date_from']}\n"
if alloc.get("date_to"):
result += f" - Valid to: {alloc['date_to']}\n"
result += "\n"
if not allocations:
result += "No allocations found.\n"
else:
result += f"\n**Total: {total_days} days** ({len(allocations)} allocations)"
if len(allocations) == limit:
result += f"\n*Use offset={offset + limit} to see more.*"
return result
@mcp.tool()
@handle_odoo_errors
def approve_leave_allocation(allocation_id: int) -> str:
"""
Approve a leave allocation (change state from draft/confirm to validate).
Args:
allocation_id: Allocation ID to approve
Returns:
Confirmation message
"""
# Check allocation exists
allocations = odoo.search_read(
"hr.leave.allocation",
[("id", "=", allocation_id)],
["id", "name", "state", "employee_id"],
limit=1
)
if not allocations:
return f"Error: Allocation ID {allocation_id} not found."
alloc = allocations[0]
if alloc.get("state") == "validate":
return f"Allocation '{alloc.get('name')}' is already approved."
if alloc.get("state") == "refuse":
return f"Error: Cannot approve a refused allocation. Create a new one instead."
try:
# Call the action_validate method
odoo.execute("hr.leave.allocation", "action_validate", [allocation_id])
emp_name = _get_many2one_name(alloc.get("employee_id"))
return f"Allocation '{alloc.get('name')}' for {emp_name} has been approved!"
except OdooError as e:
return f"Error approving allocation: {e}"
# ============== PUBLIC HOLIDAYS TOOLS ==============
@mcp.tool()
@handle_odoo_errors
def create_public_holiday(
name: str,
date: str,
calendar_id: int = 1,
) -> str:
"""
Create a public holiday in Odoo (global time off on working calendar).
Args:
name: Holiday name (e.g., "Nouvel An", "Fête nationale")
date: Holiday date in YYYY-MM-DD format
calendar_id: Working calendar ID (default: 1 = Standard 40 hours/week)
Returns:
Confirmation with created holiday ID
"""
# Validate date
if not validate_date(date):
return f"Error: Invalid date format: {date}. Use YYYY-MM-DD."
# Check calendar exists
if not odoo.exists("resource.calendar", calendar_id):
return f"Error: Calendar ID {calendar_id} not found."
# Create the global leave (public holiday)
# For all-day events, we need to set time_type to 'leave' and provide full day times
values = {
"name": name,
"calendar_id": calendar_id,
"date_from": f"{date} 00:00:00",
"date_to": f"{date} 23:59:59",
"time_type": "leave",
}
try:
holiday_id = odoo.create("resource.calendar.leaves", values)
return f"Public holiday '{name}' on {date} created successfully (ID: {holiday_id})."
except OdooError as e:
return f"Error creating public holiday: {e}"
@mcp.tool()
@handle_odoo_errors
def list_public_holidays(
calendar_id: int = 1,
year: int | None = None,
limit: int = 50,
offset: int = 0,
) -> str:
"""
List public holidays (global time off) from a working calendar.
Args:
calendar_id: Working calendar ID (default: 1)
year: Filter by year (optional)
limit: Maximum number of entries (default: 50)
offset: Offset for pagination (default: 0)
Returns:
List of public holidays
"""
domain: list[Any] = [
("calendar_id", "=", calendar_id),
("resource_id", "=", False), # Global leaves have no specific resource
]
if year is not None:
domain.append(("date_from", ">=", f"{year}-01-01"))
domain.append(("date_to", "<=", f"{year}-12-31"))
holidays = odoo.search_read(
"resource.calendar.leaves",
domain,
["id", "name", "date_from", "date_to"],
limit=limit,
offset=offset,
order="date_from"
)
result = "# Public Holidays\n\n"
if year:
result += f"*Year: {year}*\n\n"
for h in holidays:
date_from = h.get("date_from", "")[:10] if h.get("date_from") else "N/A"
result += f"- **ID {h['id']}**: {h.get('name', 'N/A')} ({date_from})\n"
if not holidays:
result += "No public holidays found.\n"
else:
result += f"\n*{len(holidays)} public holiday(s) displayed*"
if len(holidays) == limit:
result += f"\n*Use offset={offset + limit} to see more.*"
return result
# ============== UTILITY TOOLS ==============
@mcp.tool()
@handle_odoo_errors
def test_connection() -> str:
"""
Test the connection to Odoo.
Returns:
Connection and user information
"""
version: dict[str, Any] = odoo.common.version()
uid = odoo.uid
# Get user info
users = odoo.search_read(
"res.users",
[("id", "=", uid)],
["name", "login", "company_id"],
limit=1
)
user_info = users[0] if users else {}
company_name = _get_many2one_name(user_info.get("company_id"))
return f"""# Odoo Connection Successful!
- **URL**: {ODOO_URL}
- **Database**: {ODOO_DB}
- **Odoo Version**: {version.get('server_version', 'N/A')}
- **User**: {user_info.get('name', 'N/A')} ({user_info.get('login', 'N/A')})
- **Company**: {company_name}
- **UID**: {uid}
"""
@mcp.tool()
@handle_odoo_errors
def search_records(
model: str,
field: str,
value: str,
limit: int = 20,
offset: int = 0,
) -> str:
"""
Search records in any Odoo model.
Args:
model: Technical model name (e.g., res.partner, sale.order)
field: Field to search on (e.g., name, email)
value: Value to search for (partial match with ilike)
limit: Maximum number of results (default: 20)
offset: Offset for pagination (default: 0)
Returns:
List of found records
"""
# Validation
if not model.strip():
return "Error: Model name cannot be empty."
if not field.strip():
return "Error: Field name cannot be empty."
if not value.strip():
return "Error: Search value cannot be empty."
records = odoo.search_read(
model,
[(field, "ilike", value)],
["id", "name", "display_name"],
limit=limit,
offset=offset
)
result = f"# Results for '{value}' in {model}.{field}\n\n"
if offset > 0:
result += f"*(Page starting from entry {offset + 1})*\n\n"
for r in records:
display = r.get("display_name") or r.get("name") or f"ID {r['id']}"
result += f"- **ID {r['id']}**: {display}\n"
if not records:
result += "No results found.\n"
else:
result += _format_pagination(len(records), limit, offset, "result(s)")
return result
# ============== R&D ANALYTICS TOOLS ==============
@mcp.tool()
@handle_odoo_errors
def get_rd_project_costs(
date_from: str | None = None,
date_to: str | None = None,
) -> str:
"""
Get total costs per R&D project from analytic accounts.
Args:
date_from: Start date (format YYYY-MM-DD, optional)
date_to: End date (format YYYY-MM-DD, optional)
Returns:
Summary of costs by R&D project
"""
# Get all R&D analytic accounts
rd_accounts = odoo.search_read(
"account.analytic.account",
["|", ("name", "ilike", "R&D"), ("name", "ilike", "Research")],
["id", "name"],
limit=100
)
if not rd_accounts:
return "No R&D analytic accounts found."
result = "# R&D Project Costs Summary\n\n"
if date_from or date_to:
result += f"*Period: {date_from or 'start'} to {date_to or 'now'}*\n\n"
total_cost = 0.0
total_hours = 0.0
project_data = []
for account in rd_accounts:
account_id = account["id"]
account_name = account["name"]
# Skip template accounts
if "Template" in account_name:
continue
# Build domain for analytic lines
domain: OdooDomain = [("account_id", "=", account_id)]
if date_from:
domain.append(("date", ">=", date_from))
if date_to:
domain.append(("date", "<=", date_to))
# Get all analytic lines for this account
lines = odoo.search_read(
"account.analytic.line",
domain,
["id", "amount", "unit_amount", "category"],
limit=10000
)
# Calculate totals
project_cost = sum(abs(line.get("amount", 0) or 0) for line in lines if (line.get("amount") or 0) < 0)
project_hours = sum(line.get("unit_amount", 0) or 0 for line in lines)
entry_count = len(lines)
if project_cost > 0 or project_hours > 0:
project_data.append({
"name": account_name,
"cost": project_cost,
"hours": project_hours,
"entries": entry_count
})
total_cost += project_cost
total_hours += project_hours
# Sort by cost descending
project_data.sort(key=lambda x: x["cost"], reverse=True)
# Format output
result += "| Projet | Coût (€) | Heures | Entrées |\n"
result += "|--------|----------|--------|--------|\n"
for p in project_data:
result += f"| {p['name']} | {p['cost']:,.2f} | {p['hours']:,.2f} | {p['entries']} |\n"
result += f"| **TOTAL** | **{total_cost:,.2f}** | **{total_hours:,.2f}** | |\n"
return result
if __name__ == "__main__":
validate_config()
logger.info("Starting MCP Odoo server...")
mcp.run()