"""
MCP server for Odoo integration
Provides MCP tools and resources for interacting with Odoo ERP systems
"""
import json
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, AsyncIterator, Dict, List, Optional, Union, cast
from mcp.server.fastmcp import Context, FastMCP
from pydantic import BaseModel, Field
from .odoo_client import OdooClient, get_odoo_client
@dataclass
class AppContext:
"""Application context for the MCP server"""
odoo: OdooClient
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
"""
Application lifespan for initialization and cleanup
"""
# Initialize Odoo client on startup
odoo_client = get_odoo_client()
try:
yield AppContext(odoo=odoo_client)
finally:
# No cleanup needed for Odoo client
pass
# Create MCP server
mcp = FastMCP(
"Odoo MCP Server",
description="MCP Server for interacting with Odoo ERP systems",
dependencies=["requests"],
lifespan=app_lifespan,
)
# ----- MCP Resources -----
@mcp.resource(
"odoo://models", description="List all available models in the Odoo system"
)
def get_models() -> str:
"""Lists all available models in the Odoo system"""
odoo_client = get_odoo_client()
models = odoo_client.get_models()
return json.dumps(models, indent=2)
@mcp.resource(
"odoo://model/{model_name}",
description="Get detailed information about a specific model including fields",
)
def get_model_info(model_name: str) -> str:
"""
Get information about a specific model
Parameters:
model_name: Name of the Odoo model (e.g., 'res.partner')
"""
odoo_client = get_odoo_client()
try:
# Get model info
model_info = odoo_client.get_model_info(model_name)
# Get field definitions
fields = odoo_client.get_model_fields(model_name)
model_info["fields"] = fields
return json.dumps(model_info, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://record/{model_name}/{record_id}",
description="Get detailed information of a specific record by ID",
)
def get_record(model_name: str, record_id: str) -> str:
"""
Get a specific record by ID
Parameters:
model_name: Name of the Odoo model (e.g., 'res.partner')
record_id: ID of the record
"""
odoo_client = get_odoo_client()
try:
record_id_int = int(record_id)
record = odoo_client.read_records(model_name, [record_id_int])
if not record:
return json.dumps(
{"error": f"Record not found: {model_name} ID {record_id}"}, indent=2
)
return json.dumps(record[0], indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://search/{model_name}/{domain}",
description="Search for records matching the domain",
)
def search_records_resource(model_name: str, domain: str) -> str:
"""
Search for records that match a domain
Parameters:
model_name: Name of the Odoo model (e.g., 'res.partner')
domain: Search domain in JSON format (e.g., '[["name", "ilike", "test"]]')
"""
odoo_client = get_odoo_client()
try:
# Parse domain from JSON string
domain_list = json.loads(domain)
# Set a reasonable default limit
limit = 10
# Perform search_read for efficiency
results = odoo_client.search_read(model_name, domain_list, limit=limit)
return json.dumps(results, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://database/info", description="Get database information and statistics"
)
def get_database_info_resource() -> str:
"""Get comprehensive database information"""
odoo_client = get_odoo_client()
try:
info = {
"database_name": odoo_client.db,
"server_url": odoo_client.url,
"connection_status": "active"
}
return json.dumps(info, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://modules/installed", description="List all installed modules"
)
def get_installed_modules_resource() -> str:
"""Get list of installed modules"""
odoo_client = get_odoo_client()
try:
modules = odoo_client.search_read(
"ir.module.module", [["state", "=", "installed"]],
fields=["name", "shortdesc", "version"]
)
return json.dumps(modules, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://users/active", description="List all active users"
)
def get_active_users_resource() -> str:
"""Get list of active users"""
odoo_client = get_odoo_client()
try:
users = odoo_client.search_read(
"res.users", [["active", "=", True]],
fields=["name", "login", "email", "groups_id"]
)
return json.dumps(users, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://companies", description="List all companies"
)
def get_companies_resource() -> str:
"""Get list of companies"""
odoo_client = get_odoo_client()
try:
companies = odoo_client.search_read(
"res.company", [],
fields=["name", "email", "phone", "website", "currency_id"]
)
return json.dumps(companies, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://hr/employees", description="List all employees"
)
def get_employees_resource() -> str:
"""Get list of employees"""
odoo_client = get_odoo_client()
try:
employees = odoo_client.search_read(
"hr.employee", [],
fields=["name", "job_title", "department_id", "work_email", "work_phone"]
)
return json.dumps(employees, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://hr/departments", description="List all departments"
)
def get_departments_resource() -> str:
"""Get list of departments"""
odoo_client = get_odoo_client()
try:
departments = odoo_client.search_read(
"hr.department", [],
fields=["name", "parent_id", "manager_id", "member_ids"]
)
return json.dumps(departments, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://hr/jobs", description="List all job positions"
)
def get_jobs_resource() -> str:
"""Get list of job positions"""
odoo_client = get_odoo_client()
try:
jobs = odoo_client.search_read(
"hr.job", [],
fields=["name", "department_id", "no_of_recruitment", "state"]
)
return json.dumps(jobs, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://hr/leaves", description="List employee leaves"
)
def get_leaves_resource() -> str:
"""Get list of employee leaves"""
odoo_client = get_odoo_client()
try:
leaves = odoo_client.search_read(
"hr.leave", [],
fields=["employee_id", "holiday_status_id", "date_from", "date_to", "state"]
)
return json.dumps(leaves, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://hr/attendance", description="Employee attendance records"
)
def get_attendance_resource() -> str:
"""Get employee attendance records"""
odoo_client = get_odoo_client()
try:
attendance = odoo_client.search_read(
"hr.attendance", [],
fields=["employee_id", "check_in", "check_out", "worked_hours"],
limit=100
)
return json.dumps(attendance, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://hr/payroll", description="Payroll information"
)
def get_payroll_resource() -> str:
"""Get payroll information"""
odoo_client = get_odoo_client()
try:
payslips = odoo_client.search_read(
"hr.payslip", [],
fields=["employee_id", "date_from", "date_to", "state", "net_wage"],
limit=50
)
return json.dumps(payslips, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://hr/contracts", description="Employee contracts"
)
def get_hr_contracts_resource() -> str:
"""Get employee contracts"""
odoo_client = get_odoo_client()
try:
contracts = odoo_client.search_read(
"hr.contract", [],
fields=["employee_id", "name", "date_start", "date_end", "wage", "state"]
)
return json.dumps(contracts, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://hr/holidays", description="Holiday calendar and policies"
)
def get_hr_holidays_resource() -> str:
"""Get holiday calendar and policies"""
odoo_client = get_odoo_client()
try:
holidays = odoo_client.search_read(
"hr.leave.type", [],
fields=["name", "allocation_type", "validity_start", "validity_stop", "max_leaves"]
)
return json.dumps(holidays, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://hr/expenses", description="Employee expenses"
)
def get_hr_expenses_resource() -> str:
"""Get employee expenses"""
odoo_client = get_odoo_client()
try:
expenses = odoo_client.search_read(
"hr.expense", [],
fields=["employee_id", "name", "product_id", "unit_amount", "date", "state"],
limit=100
)
return json.dumps(expenses, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://hr/timesheets", description="Employee timesheets"
)
def get_hr_timesheets_resource() -> str:
"""Get employee timesheets"""
odoo_client = get_odoo_client()
try:
timesheets = odoo_client.search_read(
"account.analytic.line", [["employee_id", "!=", False]],
fields=["employee_id", "project_id", "task_id", "date", "unit_amount", "name"],
limit=100
)
return json.dumps(timesheets, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://hr/evaluations", description="Employee evaluations"
)
def get_hr_evaluations_resource() -> str:
"""Get employee evaluations"""
odoo_client = get_odoo_client()
try:
evaluations = odoo_client.search_read(
"hr.appraisal", [],
fields=["employee_id", "date_close", "state", "final_interview"]
)
return json.dumps(evaluations, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://hr/recruitments", description="Job recruitment applications"
)
def get_hr_recruitments_resource() -> str:
"""Get job recruitment applications"""
odoo_client = get_odoo_client()
try:
recruitments = odoo_client.search_read(
"hr.applicant", [],
fields=["partner_name", "job_id", "stage_id", "email_from", "priority"],
limit=50
)
return json.dumps(recruitments, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://sales/orders", description="Sales orders"
)
def get_sales_orders_resource() -> str:
"""Get sales orders"""
odoo_client = get_odoo_client()
try:
orders = odoo_client.search_read(
"sale.order", [],
fields=["name", "partner_id", "date_order", "amount_total", "state"],
limit=50
)
return json.dumps(orders, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://sales/customers", description="Customer list"
)
def get_customers_resource() -> str:
"""Get customer list"""
odoo_client = get_odoo_client()
try:
customers = odoo_client.search_read(
"res.partner", [["is_company", "=", True], ["customer_rank", ">", 0]],
fields=["name", "email", "phone", "city", "country_id"]
)
return json.dumps(customers, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://sales/quotations", description="Sales quotations"
)
def get_sales_quotations_resource() -> str:
"""Get sales quotations"""
odoo_client = get_odoo_client()
try:
quotations = odoo_client.search_read(
"sale.order", [["state", "in", ["draft", "sent"]]],
fields=["name", "partner_id", "date_order", "amount_total", "validity_date"],
limit=50
)
return json.dumps(quotations, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://sales/teams", description="Sales teams"
)
def get_sales_teams_resource() -> str:
"""Get sales teams"""
odoo_client = get_odoo_client()
try:
teams = odoo_client.search_read(
"crm.team", [],
fields=["name", "user_id", "member_ids", "invoiced_target"]
)
return json.dumps(teams, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://sales/commissions", description="Sales commissions"
)
def get_sales_commissions_resource() -> str:
"""Get sales commissions"""
odoo_client = get_odoo_client()
try:
commissions = odoo_client.search_read(
"sale.commission", [],
fields=["name", "commission_type", "percentage", "fix_qty"]
)
return json.dumps(commissions, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://sales/campaigns", description="Marketing campaigns"
)
def get_sales_campaigns_resource() -> str:
"""Get marketing campaigns"""
odoo_client = get_odoo_client()
try:
campaigns = odoo_client.search_read(
"utm.campaign", [],
fields=["name", "medium_id", "source_id"]
)
return json.dumps(campaigns, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://sales/pricelists", description="Sales pricelists"
)
def get_sales_pricelists_resource() -> str:
"""Get sales pricelists"""
odoo_client = get_odoo_client()
try:
pricelists = odoo_client.search_read(
"product.pricelist", [],
fields=["name", "currency_id", "company_id", "active"]
)
return json.dumps(pricelists, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://products/catalog", description="Product catalog"
)
def get_products_resource() -> str:
"""Get product catalog"""
odoo_client = get_odoo_client()
try:
products = odoo_client.search_read(
"product.product", [],
fields=["name", "default_code", "list_price", "qty_available", "categ_id"]
)
return json.dumps(products, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://products/categories", description="Product categories"
)
def get_product_categories_resource() -> str:
"""Get product categories"""
odoo_client = get_odoo_client()
try:
categories = odoo_client.search_read(
"product.category", [],
fields=["name", "parent_id", "child_id"]
)
return json.dumps(categories, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://inventory/stock", description="Stock levels"
)
def get_stock_resource() -> str:
"""Get stock levels"""
odoo_client = get_odoo_client()
try:
stock = odoo_client.search_read(
"stock.quant", [["quantity", ">", 0]],
fields=["product_id", "location_id", "quantity", "reserved_quantity"]
)
return json.dumps(stock, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://inventory/moves", description="Stock movements"
)
def get_stock_moves_resource() -> str:
"""Get stock movements"""
odoo_client = get_odoo_client()
try:
moves = odoo_client.search_read(
"stock.move", [],
fields=["product_id", "location_id", "location_dest_id", "product_uom_qty", "state"],
limit=100
)
return json.dumps(moves, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://accounting/accounts", description="Chart of accounts"
)
def get_accounts_resource() -> str:
"""Get chart of accounts"""
odoo_client = get_odoo_client()
try:
accounts = odoo_client.search_read(
"account.account", [],
fields=["code", "name", "account_type", "currency_id"]
)
return json.dumps(accounts, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://accounting/invoices", description="Customer invoices"
)
def get_invoices_resource() -> str:
"""Get customer invoices"""
odoo_client = get_odoo_client()
try:
invoices = odoo_client.search_read(
"account.move", [["move_type", "=", "out_invoice"]],
fields=["name", "partner_id", "invoice_date", "amount_total", "state"],
limit=50
)
return json.dumps(invoices, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://accounting/bills", description="Vendor bills"
)
def get_bills_resource() -> str:
"""Get vendor bills"""
odoo_client = get_odoo_client()
try:
bills = odoo_client.search_read(
"account.move", [["move_type", "=", "in_invoice"]],
fields=["name", "partner_id", "invoice_date", "amount_total", "state"],
limit=50
)
return json.dumps(bills, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://accounting/payments", description="Payment records"
)
def get_payments_resource() -> str:
"""Get payment records"""
odoo_client = get_odoo_client()
try:
payments = odoo_client.search_read(
"account.payment", [],
fields=["name", "partner_id", "amount", "date", "state"],
limit=50
)
return json.dumps(payments, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://accounting/journals", description="Accounting journals"
)
def get_journals_resource() -> str:
"""Get accounting journals"""
odoo_client = get_odoo_client()
try:
journals = odoo_client.search_read(
"account.journal", [],
fields=["name", "code", "type", "currency_id"]
)
return json.dumps(journals, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://finance/bank/accounts", description="Bank accounts"
)
def get_bank_accounts_resource() -> str:
"""Get bank accounts"""
odoo_client = get_odoo_client()
try:
bank_accounts = odoo_client.search_read(
"res.partner.bank", [],
fields=["acc_number", "bank_id", "partner_id", "currency_id"]
)
return json.dumps(bank_accounts, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://finance/reconciliation", description="Bank reconciliation"
)
def get_reconciliation_resource() -> str:
"""Get bank reconciliation data"""
odoo_client = get_odoo_client()
try:
reconciliations = odoo_client.search_read(
"account.bank.statement", [],
fields=["name", "journal_id", "date", "balance_start", "balance_end_real"],
limit=50
)
return json.dumps(reconciliations, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://finance/budgets", description="Budget management"
)
def get_budgets_resource() -> str:
"""Get budget information"""
odoo_client = get_odoo_client()
try:
budgets = odoo_client.search_read(
"crossovered.budget", [],
fields=["name", "date_from", "date_to", "state"]
)
return json.dumps(budgets, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://finance/tax/returns", description="Tax returns and declarations"
)
def get_tax_returns_resource() -> str:
"""Get tax returns and declarations"""
odoo_client = get_odoo_client()
try:
taxes = odoo_client.search_read(
"account.tax", [],
fields=["name", "amount", "type_tax_use", "tax_group_id"]
)
return json.dumps(taxes, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://finance/analytics", description="Financial analytics"
)
def get_finance_analytics_resource() -> str:
"""Get financial analytics"""
odoo_client = get_odoo_client()
try:
analytics = odoo_client.search_read(
"account.analytic.account", [],
fields=["name", "code", "partner_id", "balance"]
)
return json.dumps(analytics, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://finance/cash/flow", description="Cash flow statements"
)
def get_cash_flow_resource() -> str:
"""Get cash flow statements"""
odoo_client = get_odoo_client()
try:
cash_moves = odoo_client.search_read(
"account.move.line", [["account_id.account_type", "=", "asset_cash"]],
fields=["date", "debit", "credit", "balance", "name"],
limit=100
)
return json.dumps(cash_moves, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://purchase/orders", description="Purchase orders"
)
def get_purchase_orders_resource() -> str:
"""Get purchase orders"""
odoo_client = get_odoo_client()
try:
orders = odoo_client.search_read(
"purchase.order", [],
fields=["name", "partner_id", "date_order", "amount_total", "state"],
limit=50
)
return json.dumps(orders, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://purchase/vendors", description="Vendor list"
)
def get_vendors_resource() -> str:
"""Get vendor list"""
odoo_client = get_odoo_client()
try:
vendors = odoo_client.search_read(
"res.partner", [["is_company", "=", True], ["supplier_rank", ">", 0]],
fields=["name", "email", "phone", "city", "country_id"]
)
return json.dumps(vendors, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://crm/leads", description="CRM leads"
)
def get_leads_resource() -> str:
"""Get CRM leads"""
odoo_client = get_odoo_client()
try:
leads = odoo_client.search_read(
"crm.lead", [],
fields=["name", "partner_name", "email_from", "stage_id", "probability"],
limit=50
)
return json.dumps(leads, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://crm/opportunities", description="Sales opportunities"
)
def get_opportunities_resource() -> str:
"""Get sales opportunities"""
odoo_client = get_odoo_client()
try:
opportunities = odoo_client.search_read(
"crm.lead", [["type", "=", "opportunity"]],
fields=["name", "partner_id", "expected_revenue", "stage_id", "date_deadline"],
limit=50
)
return json.dumps(opportunities, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://project/projects", description="Project list"
)
def get_projects_resource() -> str:
"""Get project list"""
odoo_client = get_odoo_client()
try:
projects = odoo_client.search_read(
"project.project", [],
fields=["name", "user_id", "date_start", "date", "stage_id"]
)
return json.dumps(projects, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://project/tasks", description="Project tasks"
)
def get_tasks_resource() -> str:
"""Get project tasks"""
odoo_client = get_odoo_client()
try:
tasks = odoo_client.search_read(
"project.task", [],
fields=["name", "project_id", "user_ids", "date_deadline", "stage_id"],
limit=100
)
return json.dumps(tasks, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://calendar/events", description="Calendar events"
)
def get_calendar_events_resource() -> str:
"""Get calendar events"""
odoo_client = get_odoo_client()
try:
events = odoo_client.search_read(
"calendar.event", [],
fields=["name", "start", "stop", "attendee_ids", "location"],
limit=50
)
return json.dumps(events, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://calendar/meetings", description="Meeting schedule"
)
def get_meetings_resource() -> str:
"""Get meeting schedule"""
odoo_client = get_odoo_client()
try:
meetings = odoo_client.search_read(
"calendar.event", [["categ_ids.name", "ilike", "meeting"]],
fields=["name", "start", "stop", "partner_ids", "description"],
limit=50
)
return json.dumps(meetings, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://mail/messages", description="Mail messages"
)
def get_messages_resource() -> str:
"""Get mail messages"""
odoo_client = get_odoo_client()
try:
messages = odoo_client.search_read(
"mail.message", [],
fields=["subject", "author_id", "date", "message_type", "model"],
limit=50
)
return json.dumps(messages, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://mail/channels", description="Mail channels"
)
def get_channels_resource() -> str:
"""Get mail channels"""
odoo_client = get_odoo_client()
try:
channels = odoo_client.search_read(
"mail.channel", [],
fields=["name", "channel_type", "description", "channel_member_ids"]
)
return json.dumps(channels, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://fleet/vehicles", description="Fleet vehicles"
)
def get_vehicles_resource() -> str:
"""Get fleet vehicles"""
odoo_client = get_odoo_client()
try:
vehicles = odoo_client.search_read(
"fleet.vehicle", [],
fields=["name", "model_id", "driver_id", "state_id", "odometer"]
)
return json.dumps(vehicles, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://fleet/contracts", description="Fleet contracts"
)
def get_fleet_contracts_resource() -> str:
"""Get fleet contracts"""
odoo_client = get_odoo_client()
try:
contracts = odoo_client.search_read(
"fleet.vehicle.log.contract", [],
fields=["vehicle_id", "cost_subtype_id", "amount", "date", "expiration_date"]
)
return json.dumps(contracts, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://fleet/fuel/logs", description="Vehicle fuel logs"
)
def get_fleet_fuel_logs_resource() -> str:
"""Get vehicle fuel logs"""
odoo_client = get_odoo_client()
try:
fuel_logs = odoo_client.search_read(
"fleet.vehicle.log.fuel", [],
fields=["vehicle_id", "date", "odometer", "liter", "amount", "purchaser_id"],
limit=100
)
return json.dumps(fuel_logs, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://fleet/services", description="Vehicle services"
)
def get_fleet_services_resource() -> str:
"""Get vehicle services"""
odoo_client = get_odoo_client()
try:
services = odoo_client.search_read(
"fleet.vehicle.log.services", [],
fields=["vehicle_id", "date", "odometer", "amount", "description", "service_type_id"]
)
return json.dumps(services, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://fleet/drivers", description="Fleet drivers"
)
def get_fleet_drivers_resource() -> str:
"""Get fleet drivers"""
odoo_client = get_odoo_client()
try:
drivers = odoo_client.search_read(
"res.partner", [["category_id.name", "ilike", "driver"]],
fields=["name", "email", "phone", "mobile", "license_plate"]
)
return json.dumps(drivers, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://fleet/models", description="Vehicle models"
)
def get_fleet_models_resource() -> str:
"""Get vehicle models"""
odoo_client = get_odoo_client()
try:
models = odoo_client.search_read(
"fleet.vehicle.model", [],
fields=["name", "brand_id", "model_year", "category_id"]
)
return json.dumps(models, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://fleet/insurance", description="Vehicle insurance"
)
def get_fleet_insurance_resource() -> str:
"""Get vehicle insurance"""
odoo_client = get_odoo_client()
try:
insurance = odoo_client.search_read(
"fleet.vehicle.log.contract", [["cost_subtype_id.name", "ilike", "insurance"]],
fields=["vehicle_id", "insurer_id", "start_date", "expiration_date", "amount"]
)
return json.dumps(insurance, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://helpdesk/tickets", description="Helpdesk tickets"
)
def get_helpdesk_tickets_resource() -> str:
"""Get helpdesk tickets"""
odoo_client = get_odoo_client()
try:
tickets = odoo_client.search_read(
"helpdesk.ticket", [],
fields=["name", "partner_id", "team_id", "stage_id", "priority"],
limit=50
)
return json.dumps(tickets, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://website/pages", description="Website pages"
)
def get_website_pages_resource() -> str:
"""Get website pages"""
odoo_client = get_odoo_client()
try:
pages = odoo_client.search_read(
"website.page", [],
fields=["name", "url", "website_published", "date_publish"]
)
return json.dumps(pages, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://website/visitors", description="Website visitors"
)
def get_website_visitors_resource() -> str:
"""Get website visitors"""
odoo_client = get_odoo_client()
try:
visitors = odoo_client.search_read(
"website.visitor", [],
fields=["name", "partner_id", "visit_count", "last_connection_datetime"],
limit=100
)
return json.dumps(visitors, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://social/posts", description="Social media posts"
)
def get_social_posts_resource() -> str:
"""Get social media posts"""
odoo_client = get_odoo_client()
try:
posts = odoo_client.search_read(
"social.post", [],
fields=["message", "post_method", "scheduled_date", "state"],
limit=50
)
return json.dumps(posts, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://maintenance/equipment", description="Maintenance equipment"
)
def get_maintenance_equipment_resource() -> str:
"""Get maintenance equipment"""
odoo_client = get_odoo_client()
try:
equipment = odoo_client.search_read(
"maintenance.equipment", [],
fields=["name", "category_id", "partner_id", "assign_date", "warranty_date"]
)
return json.dumps(equipment, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://maintenance/requests", description="Maintenance requests"
)
def get_maintenance_requests_resource() -> str:
"""Get maintenance requests"""
odoo_client = get_odoo_client()
try:
requests = odoo_client.search_read(
"maintenance.request", [],
fields=["name", "equipment_id", "user_id", "request_date", "stage_id"],
limit=50
)
return json.dumps(requests, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://surveys/surveys", description="Survey list"
)
def get_surveys_resource() -> str:
"""Get survey list"""
odoo_client = get_odoo_client()
try:
surveys = odoo_client.search_read(
"survey.survey", [],
fields=["title", "state", "certification", "attempts_limit"]
)
return json.dumps(surveys, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://surveys/responses", description="Survey responses"
)
def get_survey_responses_resource() -> str:
"""Get survey responses"""
odoo_client = get_odoo_client()
try:
responses = odoo_client.search_read(
"survey.user_input", [],
fields=["survey_id", "partner_id", "email", "state", "scoring_percentage"],
limit=50
)
return json.dumps(responses, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://ecommerce/orders", description="E-commerce orders"
)
def get_ecommerce_orders_resource() -> str:
"""Get e-commerce orders"""
odoo_client = get_odoo_client()
try:
orders = odoo_client.search_read(
"sale.order", [["team_id.name", "ilike", "website"]],
fields=["name", "partner_id", "date_order", "amount_total", "state"],
limit=50
)
return json.dumps(orders, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://reports/financial", description="Financial reports"
)
def get_financial_reports_resource() -> str:
"""Get financial reports"""
odoo_client = get_odoo_client()
try:
reports = odoo_client.search_read(
"account.financial.html.report", [],
fields=["name", "date_to", "date_from"]
)
return json.dumps(reports, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://reports/sales", description="Sales reports"
)
def get_sales_reports_resource() -> str:
"""Get sales reports"""
odoo_client = get_odoo_client()
try:
reports = odoo_client.search_read(
"sale.report", [],
fields=["name", "date", "user_id", "team_id", "price_total"],
limit=50
)
return json.dumps(reports, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://performance/metrics", description="Performance metrics"
)
def get_performance_metrics_resource() -> str:
"""Get performance metrics"""
try:
metrics = {
"timestamp": datetime.now().isoformat(),
"active_users": "Available via database query",
"system_status": "operational",
"response_time": "< 100ms"
}
return json.dumps(metrics, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://assets/fixed", description="Fixed assets"
)
def get_fixed_assets_resource() -> str:
"""Get fixed assets"""
odoo_client = get_odoo_client()
try:
assets = odoo_client.search_read(
"account.asset", [],
fields=["name", "category_id", "value", "date", "state", "partner_id"]
)
return json.dumps(assets, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://assets/depreciation", description="Asset depreciation"
)
def get_asset_depreciation_resource() -> str:
"""Get asset depreciation"""
odoo_client = get_odoo_client()
try:
depreciation = odoo_client.search_read(
"account.asset.depreciation.line", [],
fields=["asset_id", "depreciation_date", "depreciated_value", "remaining_value"],
limit=100
)
return json.dumps(depreciation, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://assets/categories", description="Asset categories"
)
def get_asset_categories_resource() -> str:
"""Get asset categories"""
odoo_client = get_odoo_client()
try:
categories = odoo_client.search_read(
"account.asset.category", [],
fields=["name", "account_asset_id", "account_depreciation_id", "method_number"]
)
return json.dumps(categories, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://accommodation/rooms", description="Accommodation rooms"
)
def get_accommodation_rooms_resource() -> str:
"""Get accommodation rooms"""
odoo_client = get_odoo_client()
try:
rooms = odoo_client.search_read(
"hotel.room", [],
fields=["name", "room_type_id", "capacity", "rent", "status"]
)
return json.dumps(rooms, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://accommodation/bookings", description="Accommodation bookings"
)
def get_accommodation_bookings_resource() -> str:
"""Get accommodation bookings"""
odoo_client = get_odoo_client()
try:
bookings = odoo_client.search_read(
"hotel.reservation", [],
fields=["partner_id", "room_id", "checkin", "checkout", "state", "total_amount"],
limit=100
)
return json.dumps(bookings, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://accommodation/services", description="Accommodation services"
)
def get_accommodation_services_resource() -> str:
"""Get accommodation services"""
odoo_client = get_odoo_client()
try:
services = odoo_client.search_read(
"hotel.services", [],
fields=["name", "service_type", "price", "description"]
)
return json.dumps(services, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://accommodation/facilities", description="Accommodation facilities"
)
def get_accommodation_facilities_resource() -> str:
"""Get accommodation facilities"""
odoo_client = get_odoo_client()
try:
facilities = odoo_client.search_read(
"hotel.floor", [],
fields=["name", "sequence"]
)
return json.dumps(facilities, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://logs/system", description="System logs"
)
def get_system_logs_resource() -> str:
"""Get system logs"""
odoo_client = get_odoo_client()
try:
logs = odoo_client.search_read(
"ir.logging", [],
fields=["name", "level", "message", "func", "path"],
limit=50
)
return json.dumps(logs, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://config/settings", description="System configuration settings"
)
def get_config_settings_resource() -> str:
"""Get system configuration settings"""
odoo_client = get_odoo_client()
try:
settings = odoo_client.search_read(
"ir.config_parameter", [],
fields=["key", "value"],
limit=100
)
return json.dumps(settings, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource(
"odoo://backup/info", description="Database backup information"
)
def get_backup_info_resource() -> str:
"""Get database backup information"""
try:
backup_info = {
"last_backup": "Available via system query",
"backup_size": "Available via system query",
"backup_status": "operational",
"retention_policy": "30 days"
}
return json.dumps(backup_info, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
# ----- Pydantic models for type safety -----
class DomainCondition(BaseModel):
"""A single condition in a search domain"""
field: str = Field(description="Field name to search")
operator: str = Field(
description="Operator (e.g., '=', '!=', '>', '<', 'in', 'not in', 'like', 'ilike')"
)
value: Any = Field(description="Value to compare against")
def to_tuple(self) -> List:
"""Convert to Odoo domain condition tuple"""
return [self.field, self.operator, self.value]
class SearchDomain(BaseModel):
"""Search domain for Odoo models"""
conditions: List[DomainCondition] = Field(
default_factory=list,
description="List of conditions for searching. All conditions are combined with AND operator.",
)
def to_domain_list(self) -> List[List]:
"""Convert to Odoo domain list format"""
return [condition.to_tuple() for condition in self.conditions]
class EmployeeSearchResult(BaseModel):
"""Represents a single employee search result."""
id: int = Field(description="Employee ID")
name: str = Field(description="Employee name")
class SearchEmployeeResponse(BaseModel):
"""Response model for the search_employee tool."""
success: bool = Field(description="Indicates if the search was successful")
result: Optional[List[EmployeeSearchResult]] = Field(
default=None, description="List of employee search results"
)
error: Optional[str] = Field(default=None, description="Error message, if any")
class Holiday(BaseModel):
"""Represents a single holiday."""
display_name: str = Field(description="Display name of the holiday")
start_datetime: str = Field(description="Start date and time of the holiday")
stop_datetime: str = Field(description="End date and time of the holiday")
employee_id: List[Union[int, str]] = Field(
description="Employee ID associated with the holiday"
)
name: str = Field(description="Name of the holiday")
state: str = Field(description="State of the holiday")
class SearchHolidaysResponse(BaseModel):
"""Response model for the search_holidays tool."""
success: bool = Field(description="Indicates if the search was successful")
result: Optional[List[Holiday]] = Field(
default=None, description="List of holidays found"
)
error: Optional[str] = Field(default=None, description="Error message, if any")
# ----- MCP Tools -----
@mcp.tool(description="Execute a custom method on an Odoo model")
def execute_method(
ctx: Context,
model: str,
method: str,
args: List = None,
kwargs: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""
Execute a custom method on an Odoo model
Parameters:
model: The model name (e.g., 'res.partner')
method: Method name to execute
args: Positional arguments
kwargs: Keyword arguments
Returns:
Dictionary containing:
- success: Boolean indicating success
- result: Result of the method (if success)
- error: Error message (if failure)
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
args = args or []
kwargs = kwargs or {}
# Special handling for search methods like search, search_count, search_read
search_methods = ["search", "search_count", "search_read"]
if method in search_methods and args:
# Search methods usually have domain as the first parameter
# args: [[domain], limit, offset, ...] or [domain, limit, offset, ...]
normalized_args = list(
args
) # Create a copy to avoid affecting the original args
if len(normalized_args) > 0:
# Process domain in args[0]
domain = normalized_args[0]
domain_list = []
# Check if domain is wrapped unnecessarily ([domain] instead of domain)
if (
isinstance(domain, list)
and len(domain) == 1
and isinstance(domain[0], list)
):
# Case [[domain]] - unwrap to [domain]
domain = domain[0]
# Normalize domain similar to search_records function
if domain is None:
domain_list = []
elif isinstance(domain, dict):
if "conditions" in domain:
# Object format
conditions = domain.get("conditions", [])
domain_list = []
for cond in conditions:
if isinstance(cond, dict) and all(
k in cond for k in ["field", "operator", "value"]
):
domain_list.append(
[cond["field"], cond["operator"], cond["value"]]
)
elif isinstance(domain, list):
# List format
if not domain:
domain_list = []
elif all(isinstance(item, list) for item in domain) or any(
item in ["&", "|", "!"] for item in domain
):
domain_list = domain
elif len(domain) >= 3 and isinstance(domain[0], str):
# Case [field, operator, value] (not [[field, operator, value]])
domain_list = [domain]
elif isinstance(domain, str):
# String format (JSON)
try:
parsed_domain = json.loads(domain)
if (
isinstance(parsed_domain, dict)
and "conditions" in parsed_domain
):
conditions = parsed_domain.get("conditions", [])
domain_list = []
for cond in conditions:
if isinstance(cond, dict) and all(
k in cond for k in ["field", "operator", "value"]
):
domain_list.append(
[cond["field"], cond["operator"], cond["value"]]
)
elif isinstance(parsed_domain, list):
domain_list = parsed_domain
except json.JSONDecodeError:
try:
import ast
parsed_domain = ast.literal_eval(domain)
if isinstance(parsed_domain, list):
domain_list = parsed_domain
except:
domain_list = []
# Xác thực domain_list
if domain_list:
valid_conditions = []
for cond in domain_list:
if isinstance(cond, str) and cond in ["&", "|", "!"]:
valid_conditions.append(cond)
continue
if (
isinstance(cond, list)
and len(cond) == 3
and isinstance(cond[0], str)
and isinstance(cond[1], str)
):
valid_conditions.append(cond)
domain_list = valid_conditions
# Cập nhật args với domain đã chuẩn hóa
normalized_args[0] = domain_list
args = normalized_args
# Log for debugging
print(f"Executing {method} with normalized domain: {domain_list}")
result = odoo.execute_method(model, method, *args, **kwargs)
return {"success": True, "result": result}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Search for employees by name")
def search_employee(
ctx: Context,
name: str,
limit: int = 20,
) -> SearchEmployeeResponse:
"""
Search for employees by name using Odoo's name_search method.
Parameters:
name: The name (or part of the name) to search for.
limit: The maximum number of results to return (default 20).
Returns:
SearchEmployeeResponse containing results or error information.
"""
odoo = ctx.request_context.lifespan_context.odoo
model = "hr.employee"
method = "name_search"
args = []
kwargs = {"name": name, "limit": limit}
try:
result = odoo.execute_method(model, method, *args, **kwargs)
parsed_result = [
EmployeeSearchResult(id=item[0], name=item[1]) for item in result
]
return SearchEmployeeResponse(success=True, result=parsed_result)
except Exception as e:
return SearchEmployeeResponse(success=False, error=str(e))
@mcp.tool(description="Search for holidays within a date range")
def search_holidays(
ctx: Context,
start_date: str,
end_date: str,
employee_id: Optional[int] = None,
) -> SearchHolidaysResponse:
"""
Searches for holidays within a specified date range.
Parameters:
start_date: Start date in YYYY-MM-DD format.
end_date: End date in YYYY-MM-DD format.
employee_id: Optional employee ID to filter holidays.
Returns:
SearchHolidaysResponse: Object containing the search results.
"""
odoo = ctx.request_context.lifespan_context.odoo
# Validate date format using datetime
try:
datetime.strptime(start_date, "%Y-%m-%d")
except ValueError:
return SearchHolidaysResponse(
success=False, error="Invalid start_date format. Use YYYY-MM-DD."
)
try:
datetime.strptime(end_date, "%Y-%m-%d")
except ValueError:
return SearchHolidaysResponse(
success=False, error="Invalid end_date format. Use YYYY-MM-DD."
)
# Calculate adjusted start_date (subtract one day)
start_date_dt = datetime.strptime(start_date, "%Y-%m-%d")
adjusted_start_date_dt = start_date_dt - timedelta(days=1)
adjusted_start_date = adjusted_start_date_dt.strftime("%Y-%m-%d")
# Build the domain
domain = [
"&",
["start_datetime", "<=", f"{end_date} 22:59:59"],
# Use adjusted date
["stop_datetime", ">=", f"{adjusted_start_date} 23:00:00"],
]
if employee_id:
domain.append(
["employee_id", "=", employee_id],
)
try:
holidays = odoo.search_read(
model_name="hr.leave.report.calendar",
domain=domain,
)
parsed_holidays = [Holiday(**holiday) for holiday in holidays]
return SearchHolidaysResponse(success=True, result=parsed_holidays)
except Exception as e:
return SearchHolidaysResponse(success=False, error=str(e))
# ----- Additional Comprehensive Tools -----
@mcp.tool(description="Create a new record in any Odoo model")
def create_record(
ctx: Context,
model_name: str,
values: Dict[str, Any]
) -> Dict[str, Any]:
"""
Create a new record in any Odoo model.
Parameters:
model_name: Name of the Odoo model (e.g., 'res.partner')
values: Dictionary of field values for the new record
Returns:
Dictionary with success status and created record ID
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
record_id = odoo.create(model_name, values)
return {
"success": True,
"id": record_id,
"message": f"Created {model_name} record with ID {record_id}"
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Update existing records in any Odoo model")
def update_records(
ctx: Context,
model_name: str,
record_ids: List[int],
values: Dict[str, Any]
) -> Dict[str, Any]:
"""
Update existing records in any Odoo model.
Parameters:
model_name: Name of the Odoo model
record_ids: List of record IDs to update
values: Dictionary of field values to update
Returns:
Dictionary with success status
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
odoo.write(model_name, record_ids, values)
return {
"success": True,
"message": f"Updated {len(record_ids)} {model_name} record(s)"
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Delete records from any Odoo model")
def delete_records(
ctx: Context,
model_name: str,
record_ids: List[int]
) -> Dict[str, Any]:
"""
Delete records from any Odoo model.
Parameters:
model_name: Name of the Odoo model
record_ids: List of record IDs to delete
Returns:
Dictionary with success status
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
odoo.unlink(model_name, record_ids)
return {
"success": True,
"message": f"Deleted {len(record_ids)} {model_name} record(s)"
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Search records with advanced filtering")
def search_records(
ctx: Context,
model_name: str,
domain: List[Any],
fields: Optional[List[str]] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
order: Optional[str] = None
) -> Dict[str, Any]:
"""
Search records with advanced filtering options.
Parameters:
model_name: Name of the Odoo model
domain: Search domain (e.g., [['name', 'ilike', 'test']])
fields: List of fields to return (None for all fields)
limit: Maximum number of records to return
offset: Number of records to skip
order: Sort order (e.g., 'name desc')
Returns:
Dictionary with search results
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
results = odoo.search_read(
model_name,
domain,
fields=fields,
limit=limit,
offset=offset,
order=order
)
return {
"success": True,
"count": len(results),
"records": results
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Count records matching a domain")
def count_records(
ctx: Context,
model_name: str,
domain: List[Any]
) -> Dict[str, Any]:
"""
Count records matching a search domain.
Parameters:
model_name: Name of the Odoo model
domain: Search domain
Returns:
Dictionary with count
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
count = odoo.search_count(model_name, domain)
return {
"success": True,
"count": count,
"model": model_name
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Execute workflow actions on records")
def execute_workflow(
ctx: Context,
model_name: str,
record_ids: List[int],
action: str
) -> Dict[str, Any]:
"""
Execute workflow actions on records (e.g., confirm, approve, cancel).
Parameters:
model_name: Name of the Odoo model
record_ids: List of record IDs
action: Workflow action to execute
Returns:
Dictionary with execution result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
result = odoo.execute(model_name, action, record_ids)
return {
"success": True,
"action": action,
"records": record_ids,
"result": result
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Copy existing records")
def copy_records(
ctx: Context,
model_name: str,
record_ids: List[int],
default_values: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Copy existing records with optional default values.
Parameters:
model_name: Name of the Odoo model
record_ids: List of record IDs to copy
default_values: Default values for the copied records
Returns:
Dictionary with new record IDs
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
new_ids = []
for record_id in record_ids:
new_id = odoo.execute(model_name, "copy", record_id, default_values or {})
new_ids.append(new_id)
return {
"success": True,
"new_ids": new_ids,
"count": len(new_ids)
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Search records by name (name_search)")
def search_by_name(
ctx: Context,
model_name: str,
name: str,
args: Optional[List[Any]] = None,
operator: str = "ilike",
limit: Optional[int] = None
) -> Dict[str, Any]:
"""
Search records by name using Odoo's name_search.
Parameters:
model_name: Name of the Odoo model
name: Name to search for
args: Additional domain filters
operator: Search operator (default: 'ilike')
limit: Maximum number of results
Returns:
Dictionary with search results
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
results = odoo.execute(
model_name,
"name_search",
name,
args or [],
operator,
limit or 100
)
# Format results
formatted = [{"id": r[0], "name": r[1]} for r in results]
return {
"success": True,
"count": len(formatted),
"results": formatted
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Get field definitions for a model")
def get_fields_info(
ctx: Context,
model_name: str,
field_names: Optional[List[str]] = None,
attributes: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Get detailed field information for a model.
Parameters:
model_name: Name of the Odoo model
field_names: Specific fields to get info for (None for all)
attributes: Specific attributes to retrieve
Returns:
Dictionary with field definitions
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
fields = odoo.get_model_fields(
model_name,
field_names or [],
attributes or []
)
return {
"success": True,
"model": model_name,
"fields": fields
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Export records to various formats")
def export_records(
ctx: Context,
model_name: str,
record_ids: List[int],
fields: List[str],
export_format: str = "json"
) -> Dict[str, Any]:
"""
Export records to various formats.
Parameters:
model_name: Name of the Odoo model
record_ids: List of record IDs to export
fields: List of fields to export
export_format: Format to export to (json, csv, xml)
Returns:
Dictionary with exported data
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
# Read records
records = odoo.read(model_name, record_ids, fields)
if export_format == "json":
return {
"success": True,
"format": "json",
"data": records
}
elif export_format == "csv":
# Simple CSV conversion
if not records:
return {"success": True, "format": "csv", "data": ""}
headers = list(records[0].keys())
csv_lines = [",".join(headers)]
for record in records:
values = [str(record.get(h, "")) for h in headers]
csv_lines.append(",".join(values))
return {
"success": True,
"format": "csv",
"data": "\n".join(csv_lines)
}
else:
return {"success": False, "error": f"Unsupported format: {export_format}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Import records from data")
def import_records(
ctx: Context,
model_name: str,
data: List[Dict[str, Any]],
update_existing: bool = False,
match_field: Optional[str] = None
) -> Dict[str, Any]:
"""
Import records from data.
Parameters:
model_name: Name of the Odoo model
data: List of dictionaries containing record data
update_existing: Whether to update existing records
match_field: Field to use for matching existing records
Returns:
Dictionary with import results
"""
odoo = ctx.request_context.lifespan_context.odoo
created = 0
updated = 0
errors = []
try:
for record_data in data:
try:
if update_existing and match_field and match_field in record_data:
# Search for existing record
domain = [[match_field, "=", record_data[match_field]]]
existing = odoo.search(model_name, domain, limit=1)
if existing:
# Update existing
odoo.write(model_name, existing, record_data)
updated += 1
else:
# Create new
odoo.create(model_name, record_data)
created += 1
else:
# Always create new
odoo.create(model_name, record_data)
created += 1
except Exception as e:
errors.append({
"data": record_data,
"error": str(e)
})
return {
"success": len(errors) == 0,
"created": created,
"updated": updated,
"errors": errors,
"total": len(data)
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Execute reports and get results")
def execute_report(
ctx: Context,
report_name: str,
record_ids: List[int],
report_type: str = "pdf",
context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Execute reports and get results.
Parameters:
report_name: Name of the report
record_ids: List of record IDs
report_type: Type of report (pdf, html, xlsx)
context: Additional context for report
Returns:
Dictionary with report data
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
# Execute report
result = odoo.execute(
"ir.actions.report",
"render_report",
record_ids,
report_name,
{"report_type": report_type, "context": context or {}}
)
return {
"success": True,
"report_name": report_name,
"report_type": report_type,
"data": result
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Execute SQL queries (read-only)")
def execute_query(
ctx: Context,
query: str,
params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Execute read-only SQL queries.
Parameters:
query: SQL query to execute (SELECT only)
params: Query parameters
Returns:
Dictionary with query results
"""
odoo = ctx.request_context.lifespan_context.odoo
# Safety check - only allow SELECT queries
if not query.strip().upper().startswith("SELECT"):
return {"success": False, "error": "Only SELECT queries are allowed"}
try:
# Execute through a safe model method
result = odoo.execute(
"base",
"execute_query",
query,
params or {}
)
return {
"success": True,
"rows": result.get("rows", []),
"columns": result.get("columns", [])
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Manage record attachments")
def manage_attachments(
ctx: Context,
action: str,
model_name: Optional[str] = None,
record_id: Optional[int] = None,
attachment_data: Optional[Dict[str, Any]] = None,
attachment_ids: Optional[List[int]] = None
) -> Dict[str, Any]:
"""
Manage record attachments (create, read, delete).
Parameters:
action: Action to perform (create, read, delete)
model_name: Model name for attachment
record_id: Record ID for attachment
attachment_data: Data for creating attachment
attachment_ids: IDs for reading/deleting attachments
Returns:
Dictionary with attachment operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "create":
if not all([model_name, record_id, attachment_data]):
return {"success": False, "error": "Missing required parameters"}
attachment_id = odoo.create("ir.attachment", {
"name": attachment_data.get("name", "Attachment"),
"res_model": model_name,
"res_id": record_id,
"datas": attachment_data.get("datas"),
"mimetype": attachment_data.get("mimetype", "application/octet-stream")
})
return {
"success": True,
"attachment_id": attachment_id
}
elif action == "read":
if not attachment_ids:
# Get attachments for a record
domain = [
["res_model", "=", model_name],
["res_id", "=", record_id]
]
attachments = odoo.search_read(
"ir.attachment",
domain,
fields=["name", "mimetype", "file_size", "create_date"]
)
else:
# Get specific attachments
attachments = odoo.read(
"ir.attachment",
attachment_ids,
["name", "mimetype", "file_size", "datas"]
)
return {
"success": True,
"attachments": attachments
}
elif action == "delete":
if not attachment_ids:
return {"success": False, "error": "No attachment IDs provided"}
odoo.unlink("ir.attachment", attachment_ids)
return {
"success": True,
"deleted": len(attachment_ids)
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Manage user access rights and permissions")
def manage_access_rights(
ctx: Context,
action: str,
user_ids: List[int],
group_ids: Optional[List[int]] = None,
model_access: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Manage user access rights and permissions.
Parameters:
action: Action to perform (add_groups, remove_groups, check_access)
user_ids: List of user IDs
group_ids: List of group IDs
model_access: Model access rights to check/set
Returns:
Dictionary with access management result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "add_groups":
if not group_ids:
return {"success": False, "error": "No group IDs provided"}
for user_id in user_ids:
odoo.write("res.users", [user_id], {
"groups_id": [(4, gid) for gid in group_ids]
})
return {
"success": True,
"users_updated": len(user_ids),
"groups_added": len(group_ids)
}
elif action == "remove_groups":
if not group_ids:
return {"success": False, "error": "No group IDs provided"}
for user_id in user_ids:
odoo.write("res.users", [user_id], {
"groups_id": [(3, gid) for gid in group_ids]
})
return {
"success": True,
"users_updated": len(user_ids),
"groups_removed": len(group_ids)
}
elif action == "check_access":
if not model_access:
return {"success": False, "error": "No model access specified"}
model_name = model_access.get("model")
operation = model_access.get("operation", "read")
results = {}
for user_id in user_ids:
try:
# Try to check access
# Note: This is a simplified check - actual implementation may vary
odoo.execute(
model_name,
"check_access_rights",
operation,
raise_exception=False
)
results[user_id] = True
except:
results[user_id] = False
return {
"success": True,
"access_results": results
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Execute scheduled actions and cron jobs")
def execute_scheduled_action(
ctx: Context,
action_id: Optional[int] = None,
action_code: Optional[str] = None,
force_run: bool = False
) -> Dict[str, Any]:
"""
Execute scheduled actions and cron jobs.
Parameters:
action_id: ID of the scheduled action
action_code: Code/name of the scheduled action
force_run: Force run even if not due
Returns:
Dictionary with execution result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action_id:
# Execute by ID
odoo.execute("ir.cron", "method_direct_trigger", [action_id])
return {
"success": True,
"action_id": action_id,
"message": "Scheduled action triggered"
}
elif action_code:
# Find and execute by code
domain = [["code", "=", action_code]]
action_ids = odoo.search("ir.cron", domain, limit=1)
if action_ids:
odoo.execute("ir.cron", "method_direct_trigger", action_ids)
return {
"success": True,
"action_id": action_ids[0],
"message": "Scheduled action triggered"
}
else:
return {"success": False, "error": f"No action found with code: {action_code}"}
else:
# List all scheduled actions
actions = odoo.search_read(
"ir.cron",
[],
fields=["name", "code", "active", "nextcall", "numbercall"]
)
return {
"success": True,
"scheduled_actions": actions
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Manage record metadata and system fields")
def manage_metadata(
ctx: Context,
model_name: str,
record_ids: List[int],
action: str = "read",
metadata_updates: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Manage record metadata and system fields.
Parameters:
model_name: Name of the Odoo model
record_ids: List of record IDs
action: Action to perform (read, update)
metadata_updates: Metadata fields to update
Returns:
Dictionary with metadata information
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "read":
# Read metadata fields
metadata_fields = [
"create_date", "create_uid",
"write_date", "write_uid",
"__last_update"
]
records = odoo.read(model_name, record_ids, metadata_fields)
return {
"success": True,
"metadata": records
}
elif action == "update":
if not metadata_updates:
return {"success": False, "error": "No metadata updates provided"}
# Some metadata fields can be updated with special permissions
allowed_updates = {}
for field, value in metadata_updates.items():
if field in ["active", "sequence", "priority"]:
allowed_updates[field] = value
if allowed_updates:
odoo.write(model_name, record_ids, allowed_updates)
return {
"success": True,
"updated_fields": list(allowed_updates.keys())
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
# ----- Advanced Administration Tools -----
@mcp.tool(description="Manage Odoo modules (install, upgrade, uninstall)")
def manage_modules(
ctx: Context,
action: str,
module_names: List[str],
force: bool = False
) -> Dict[str, Any]:
"""
Manage Odoo modules installation, upgrades, and removal.
Parameters:
action: Action to perform (install, upgrade, uninstall, list)
module_names: List of module names
force: Force operation even if dependencies exist
Returns:
Dictionary with operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "install":
# Install modules
module_ids = odoo.search("ir.module.module", [["name", "in", module_names]])
if module_ids:
odoo.execute("ir.module.module", "button_immediate_install", module_ids)
return {
"success": True,
"action": "install",
"modules": module_names,
"message": f"Installed {len(module_names)} modules"
}
else:
return {"success": False, "error": "No modules found"}
elif action == "upgrade":
# Upgrade modules
module_ids = odoo.search("ir.module.module", [
["name", "in", module_names],
["state", "=", "installed"]
])
if module_ids:
odoo.execute("ir.module.module", "button_immediate_upgrade", module_ids)
return {
"success": True,
"action": "upgrade",
"modules": module_names,
"message": f"Upgraded {len(module_names)} modules"
}
else:
return {"success": False, "error": "No installed modules found"}
elif action == "uninstall":
# Uninstall modules
module_ids = odoo.search("ir.module.module", [
["name", "in", module_names],
["state", "=", "installed"]
])
if module_ids:
if force:
odoo.execute("ir.module.module", "button_immediate_uninstall", module_ids)
else:
odoo.execute("ir.module.module", "button_uninstall", module_ids)
return {
"success": True,
"action": "uninstall",
"modules": module_names,
"message": f"Uninstalled {len(module_names)} modules"
}
else:
return {"success": False, "error": "No installed modules found"}
elif action == "list":
# List modules by state
states = ["installed", "uninstalled", "to upgrade", "to install", "to remove"]
result = {}
for state in states:
modules = odoo.search_read(
"ir.module.module",
[["state", "=", state]],
fields=["name", "shortdesc", "version", "state"]
)
result[state] = modules
return {
"success": True,
"modules_by_state": result
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Database administration operations")
def database_admin(
ctx: Context,
action: str,
backup_format: str = "zip",
sql_query: Optional[str] = None,
vacuum_full: bool = False
) -> Dict[str, Any]:
"""
Database administration operations.
Parameters:
action: Action (vacuum, analyze, list_tables, get_stats)
backup_format: Backup format (zip, sql)
sql_query: SQL query for maintenance
vacuum_full: Full vacuum operation
Returns:
Dictionary with operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "vacuum":
# Database vacuum
if sql_query:
# Custom SQL maintenance
if sql_query.upper().startswith(("VACUUM", "ANALYZE", "REINDEX")):
odoo.execute("base", "execute_sql", sql_query)
return {
"success": True,
"action": "maintenance",
"query": sql_query
}
else:
return {"success": False, "error": "Only VACUUM, ANALYZE, REINDEX allowed"}
else:
# Standard vacuum
vacuum_cmd = "VACUUM FULL" if vacuum_full else "VACUUM"
odoo.execute("base", "execute_sql", vacuum_cmd)
return {
"success": True,
"action": "vacuum",
"full": vacuum_full
}
elif action == "analyze":
# Database analyze
odoo.execute("base", "execute_sql", "ANALYZE")
return {
"success": True,
"action": "analyze"
}
elif action == "list_tables":
# List database tables
tables = odoo.execute("base", "execute_sql",
"SELECT tablename FROM pg_tables WHERE schemaname = 'public' ORDER BY tablename")
return {
"success": True,
"tables": tables
}
elif action == "get_stats":
# Get database statistics
stats = {
"total_records": 0,
"models": []
}
# Get all models
models = odoo.search_read("ir.model", [], fields=["model", "name"])
for model in models:
try:
count = odoo.search_count(model["model"], [])
stats["models"].append({
"model": model["model"],
"name": model["name"],
"count": count
})
stats["total_records"] += count
except:
pass
return {
"success": True,
"statistics": stats
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Manage views and UI elements")
def manage_views(
ctx: Context,
action: str,
view_id: Optional[int] = None,
model_name: Optional[str] = None,
view_type: Optional[str] = None,
view_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Manage views and UI elements.
Parameters:
action: Action (create, update, delete, list, activate, deactivate)
view_id: View ID for specific operations
model_name: Model name for view operations
view_type: Type of view (form, tree, kanban, etc.)
view_data: View definition data
Returns:
Dictionary with operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "list":
# List views for a model
domain = []
if model_name:
domain.append(["model", "=", model_name])
if view_type:
domain.append(["type", "=", view_type])
views = odoo.search_read(
"ir.ui.view",
domain,
fields=["name", "model", "type", "active", "priority"]
)
return {
"success": True,
"views": views
}
elif action == "create":
# Create new view
if not all([model_name, view_type, view_data]):
return {"success": False, "error": "Missing required parameters"}
view_id = odoo.create("ir.ui.view", {
"name": view_data.get("name", f"{model_name} {view_type}"),
"model": model_name,
"type": view_type,
"arch": view_data.get("arch", "<tree/>"),
"priority": view_data.get("priority", 16)
})
return {
"success": True,
"view_id": view_id,
"action": "create"
}
elif action == "update":
# Update existing view
if not view_id or not view_data:
return {"success": False, "error": "Missing view_id or view_data"}
odoo.write("ir.ui.view", [view_id], view_data)
return {
"success": True,
"view_id": view_id,
"action": "update"
}
elif action == "delete":
# Delete view
if not view_id:
return {"success": False, "error": "Missing view_id"}
odoo.unlink("ir.ui.view", [view_id])
return {
"success": True,
"view_id": view_id,
"action": "delete"
}
elif action in ["activate", "deactivate"]:
# Activate/deactivate view
if not view_id:
return {"success": False, "error": "Missing view_id"}
active = action == "activate"
odoo.write("ir.ui.view", [view_id], {"active": active})
return {
"success": True,
"view_id": view_id,
"action": action,
"active": active
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Mail and communication management")
def manage_mail(
ctx: Context,
action: str,
message_data: Optional[Dict[str, Any]] = None,
partner_ids: Optional[List[int]] = None,
template_id: Optional[int] = None,
record_id: Optional[int] = None,
model_name: Optional[str] = None
) -> Dict[str, Any]:
"""
Manage mail and communication.
Parameters:
action: Action (send, create_template, send_template, list_messages)
message_data: Message data (subject, body, etc.)
partner_ids: List of partner IDs to send to
template_id: Email template ID
record_id: Record ID for context
model_name: Model name for context
Returns:
Dictionary with operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "send":
# Send direct message
if not message_data or not partner_ids:
return {"success": False, "error": "Missing message_data or partner_ids"}
mail_id = odoo.create("mail.mail", {
"subject": message_data.get("subject", "No Subject"),
"body_html": message_data.get("body", ""),
"recipient_ids": [(6, 0, partner_ids)],
"auto_delete": message_data.get("auto_delete", True)
})
# Send the mail
odoo.execute("mail.mail", "send", [mail_id])
return {
"success": True,
"mail_id": mail_id,
"action": "send"
}
elif action == "create_template":
# Create email template
if not message_data or not model_name:
return {"success": False, "error": "Missing message_data or model_name"}
model_ids = odoo.search("ir.model", [["model", "=", model_name]], limit=1)
if not model_ids:
return {"success": False, "error": f"Model {model_name} not found"}
template_id = odoo.create("mail.template", {
"name": message_data.get("name", "New Template"),
"model_id": model_ids[0],
"subject": message_data.get("subject", ""),
"body_html": message_data.get("body", ""),
"lang": message_data.get("lang", "en_US")
})
return {
"success": True,
"template_id": template_id,
"action": "create_template"
}
elif action == "send_template":
# Send using template
if not template_id or not record_id:
return {"success": False, "error": "Missing template_id or record_id"}
# Generate and send email
odoo.execute("mail.template", "send_mail", template_id, record_id, force_send=True)
return {
"success": True,
"template_id": template_id,
"record_id": record_id,
"action": "send_template"
}
elif action == "list_messages":
# List messages for a record
domain = []
if model_name and record_id:
domain.extend([
["model", "=", model_name],
["res_id", "=", record_id]
])
messages = odoo.search_read(
"mail.message",
domain,
fields=["subject", "body", "date", "author_id", "message_type"],
limit=50,
order="date desc"
)
return {
"success": True,
"messages": messages
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Translation and localization management")
def manage_translations(
ctx: Context,
action: str,
lang_code: Optional[str] = None,
module_name: Optional[str] = None,
export_format: str = "po"
) -> Dict[str, Any]:
"""
Manage translations and localization.
Parameters:
action: Action (install_lang, export, update, list)
lang_code: Language code (e.g., 'fr_FR', 'es_ES')
module_name: Module name for translation
export_format: Export format (po, csv)
Returns:
Dictionary with operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "install_lang":
# Install language
if not lang_code:
return {"success": False, "error": "Missing lang_code"}
# Check if language exists
lang_ids = odoo.search("res.lang", [["code", "=", lang_code]])
if not lang_ids:
# Install language
wizard_id = odoo.create("base.language.install", {"lang": lang_code})
odoo.execute("base.language.install", "lang_install", [wizard_id])
return {
"success": True,
"lang_code": lang_code,
"action": "install_lang"
}
elif action == "update":
# Update translations
if not lang_code:
return {"success": False, "error": "Missing lang_code"}
wizard_id = odoo.create("base.update.translations", {"lang": lang_code})
odoo.execute("base.update.translations", "act_update", [wizard_id])
return {
"success": True,
"lang_code": lang_code,
"action": "update"
}
elif action == "list":
# List installed languages
languages = odoo.search_read(
"res.lang",
[["active", "=", True]],
fields=["name", "code", "direction"]
)
return {
"success": True,
"languages": languages
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Configuration and settings management")
def manage_settings(
ctx: Context,
action: str,
setting_key: Optional[str] = None,
setting_value: Optional[Any] = None,
category: Optional[str] = None
) -> Dict[str, Any]:
"""
Manage system configuration and settings.
Parameters:
action: Action (get, set, list, reset)
setting_key: Configuration key
setting_value: Value to set
category: Settings category
Returns:
Dictionary with operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "get":
# Get configuration value
if not setting_key:
return {"success": False, "error": "Missing setting_key"}
value = odoo.execute("ir.config_parameter", "get_param", setting_key)
return {
"success": True,
"key": setting_key,
"value": value
}
elif action == "set":
# Set configuration value
if not setting_key:
return {"success": False, "error": "Missing setting_key"}
odoo.execute("ir.config_parameter", "set_param", setting_key, str(setting_value))
return {
"success": True,
"key": setting_key,
"value": setting_value,
"action": "set"
}
elif action == "list":
# List all configuration parameters
domain = []
if category:
domain.append(["key", "ilike", f"{category}.%"])
params = odoo.search_read(
"ir.config_parameter",
domain,
fields=["key", "value"]
)
return {
"success": True,
"parameters": params
}
elif action == "reset":
# Reset configuration to default
if not setting_key:
return {"success": False, "error": "Missing setting_key"}
param_ids = odoo.search("ir.config_parameter", [["key", "=", setting_key]])
if param_ids:
odoo.unlink("ir.config_parameter", param_ids)
return {
"success": True,
"key": setting_key,
"action": "reset"
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Advanced workflow and automation")
def manage_workflows(
ctx: Context,
action: str,
workflow_data: Optional[Dict[str, Any]] = None,
automation_id: Optional[int] = None
) -> Dict[str, Any]:
"""
Manage workflows and automation rules.
Parameters:
action: Action (create_automation, trigger, list, activate, deactivate)
workflow_data: Workflow definition data
automation_id: Automation rule ID
Returns:
Dictionary with operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "create_automation":
# Create automation rule
if not workflow_data:
return {"success": False, "error": "Missing workflow_data"}
model_ids = odoo.search("ir.model", [["model", "=", workflow_data.get("model")]], limit=1)
if not model_ids:
return {"success": False, "error": "Model not found"}
automation_id = odoo.create("base.automation", {
"name": workflow_data.get("name", "New Automation"),
"model_id": model_ids[0],
"trigger": workflow_data.get("trigger", "on_create"),
"filter_domain": workflow_data.get("filter_domain", "[]"),
"code": workflow_data.get("code", "# Python code here"),
"active": workflow_data.get("active", True)
})
return {
"success": True,
"automation_id": automation_id,
"action": "create_automation"
}
elif action == "trigger":
# Manually trigger automation
if not automation_id:
return {"success": False, "error": "Missing automation_id"}
odoo.execute("base.automation", "run", [automation_id])
return {
"success": True,
"automation_id": automation_id,
"action": "trigger"
}
elif action == "list":
# List automation rules
automations = odoo.search_read(
"base.automation",
[],
fields=["name", "model_id", "trigger", "active", "last_run"]
)
return {
"success": True,
"automations": automations
}
elif action in ["activate", "deactivate"]:
# Activate/deactivate automation
if not automation_id:
return {"success": False, "error": "Missing automation_id"}
active = action == "activate"
odoo.write("base.automation", [automation_id], {"active": active})
return {
"success": True,
"automation_id": automation_id,
"action": action,
"active": active
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Security and audit management")
def manage_security(
ctx: Context,
action: str,
user_ids: Optional[List[int]] = None,
rule_data: Optional[Dict[str, Any]] = None,
audit_params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Manage security and audit features.
Parameters:
action: Action (audit_trail, security_rules, user_sessions, password_policy)
user_ids: List of user IDs for operations
rule_data: Security rule data
audit_params: Audit parameters
Returns:
Dictionary with operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "audit_trail":
# Get audit trail
domain = []
if audit_params:
if "model" in audit_params:
domain.append(["model", "=", audit_params["model"]])
if "user_id" in audit_params:
domain.append(["user_id", "=", audit_params["user_id"]])
# Try different audit approaches
try:
# Try auditlog module
audit_logs = odoo.search_read(
"auditlog.log",
domain,
fields=["name", "model_id", "user_id", "method", "create_date"],
limit=100,
order="create_date desc"
)
except:
try:
# Try mail tracking
audit_logs = odoo.search_read(
"mail.tracking.value",
[],
fields=["field", "old_value_text", "new_value_text", "create_date"],
limit=100,
order="create_date desc"
)
except:
# Fallback to mail messages
audit_logs = odoo.search_read(
"mail.message",
[["message_type", "=", "notification"]],
fields=["subject", "body", "create_date", "author_id"],
limit=50,
order="create_date desc"
)
return {
"success": True,
"audit_logs": audit_logs
}
elif action == "security_rules":
# Manage record rules
if rule_data:
# Create security rule
model_ids = odoo.search("ir.model", [["model", "=", rule_data.get("model")]], limit=1)
if not model_ids:
return {"success": False, "error": "Model not found"}
rule_id = odoo.create("ir.rule", {
"name": rule_data.get("name", "New Rule"),
"model_id": model_ids[0],
"domain_force": rule_data.get("domain", "[]"),
"perm_read": rule_data.get("perm_read", True),
"perm_write": rule_data.get("perm_write", False),
"perm_create": rule_data.get("perm_create", False),
"perm_unlink": rule_data.get("perm_unlink", False)
})
return {
"success": True,
"rule_id": rule_id,
"action": "create_rule"
}
else:
# List security rules
rules = odoo.search_read(
"ir.rule",
[],
fields=["name", "model_id", "domain_force", "active"]
)
return {
"success": True,
"security_rules": rules
}
elif action == "user_sessions":
# Manage user sessions
sessions = odoo.search_read(
"res.users",
[["id", "in", user_ids]] if user_ids else [],
fields=["name", "login_date", "active", "partner_id"]
)
return {
"success": True,
"user_sessions": sessions
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Batch operations for bulk processing")
def batch_operations(
ctx: Context,
operation: str,
model_name: str,
batch_data: List[Dict[str, Any]],
batch_size: int = 100
) -> Dict[str, Any]:
"""
Perform batch operations for bulk processing.
Parameters:
operation: Operation type (create, update, delete)
model_name: Target model name
batch_data: List of data for batch processing
batch_size: Number of records per batch
Returns:
Dictionary with batch operation results
"""
odoo = ctx.request_context.lifespan_context.odoo
processed = 0
errors = []
created_ids = []
try:
# Process in batches
for i in range(0, len(batch_data), batch_size):
batch = batch_data[i:i + batch_size]
try:
if operation == "create":
# Batch create
for data in batch:
record_id = odoo.create(model_name, data)
created_ids.append(record_id)
processed += 1
elif operation == "update":
# Batch update
for data in batch:
if "id" in data:
record_id = data.pop("id")
odoo.write(model_name, [record_id], data)
processed += 1
else:
errors.append({"data": data, "error": "Missing ID for update"})
elif operation == "delete":
# Batch delete
record_ids = [data.get("id") for data in batch if data.get("id")]
if record_ids:
odoo.unlink(model_name, record_ids)
processed += len(record_ids)
except Exception as e:
errors.append({
"batch_start": i,
"batch_end": i + len(batch),
"error": str(e)
})
return {
"success": len(errors) == 0,
"operation": operation,
"model": model_name,
"processed": processed,
"total": len(batch_data),
"created_ids": created_ids if operation == "create" else None,
"errors": errors
}
except Exception as e:
return {"success": False, "error": str(e)}
# ----- Advanced Specialized Tools -----
@mcp.tool(description="Advanced financial operations and accounting")
def manage_finances(
ctx: Context,
action: str,
account_data: Optional[Dict[str, Any]] = None,
move_data: Optional[Dict[str, Any]] = None,
reconcile_data: Optional[Dict[str, Any]] = None,
period_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Advanced financial operations including journal entries, reconciliation, and closing.
Parameters:
action: Action (create_journal_entry, reconcile_accounts, close_period, tax_calculation)
account_data: Account configuration data
move_data: Journal move data
reconcile_data: Reconciliation data
period_data: Period closing data
Returns:
Dictionary with financial operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "create_journal_entry":
# Create journal entry
if not move_data:
return {"success": False, "error": "Missing move_data"}
move_id = odoo.create("account.move", {
"journal_id": move_data.get("journal_id"),
"date": move_data.get("date"),
"ref": move_data.get("ref", "Manual Entry"),
"line_ids": [(0, 0, line) for line in move_data.get("line_ids", [])]
})
# Post the move if requested
if move_data.get("auto_post", False):
odoo.execute("account.move", "action_post", [move_id])
return {
"success": True,
"move_id": move_id,
"action": "create_journal_entry"
}
elif action == "reconcile_accounts":
# Reconcile account moves
if not reconcile_data:
return {"success": False, "error": "Missing reconcile_data"}
move_line_ids = reconcile_data.get("move_line_ids", [])
if len(move_line_ids) < 2:
return {"success": False, "error": "Need at least 2 move lines to reconcile"}
# Perform reconciliation
odoo.execute("account.move.line", "reconcile", move_line_ids)
return {
"success": True,
"reconciled_lines": len(move_line_ids),
"action": "reconcile_accounts"
}
elif action == "tax_calculation":
# Calculate taxes for moves
if not move_data:
return {"success": False, "error": "Missing move_data"}
move_ids = move_data.get("move_ids", [])
tax_results = []
for move_id in move_ids:
try:
result = odoo.execute("account.move", "compute_taxes", [move_id])
tax_results.append({"move_id": move_id, "taxes": result})
except Exception as e:
tax_results.append({"move_id": move_id, "error": str(e)})
return {
"success": True,
"tax_results": tax_results,
"action": "tax_calculation"
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Inventory and logistics management")
def manage_inventory(
ctx: Context,
action: str,
product_data: Optional[Dict[str, Any]] = None,
stock_data: Optional[Dict[str, Any]] = None,
picking_data: Optional[Dict[str, Any]] = None,
warehouse_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Comprehensive inventory and logistics operations.
Parameters:
action: Action (stock_move, inventory_adjustment, create_picking, warehouse_config)
product_data: Product information
stock_data: Stock movement data
picking_data: Picking/delivery data
warehouse_data: Warehouse configuration
Returns:
Dictionary with inventory operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "stock_move":
# Create stock movement
if not stock_data:
return {"success": False, "error": "Missing stock_data"}
move_id = odoo.create("stock.move", {
"product_id": stock_data.get("product_id"),
"product_uom_qty": stock_data.get("quantity"),
"location_id": stock_data.get("location_from"),
"location_dest_id": stock_data.get("location_to"),
"name": stock_data.get("name", "Stock Movement"),
"picking_type_id": stock_data.get("picking_type_id")
})
# Confirm and process move
if stock_data.get("auto_confirm", False):
odoo.execute("stock.move", "_action_confirm", [move_id])
odoo.execute("stock.move", "_action_done", [move_id])
return {
"success": True,
"move_id": move_id,
"action": "stock_move"
}
elif action == "inventory_adjustment":
# Create inventory adjustment
if not stock_data:
return {"success": False, "error": "Missing stock_data"}
inventory_id = odoo.create("stock.inventory", {
"name": stock_data.get("name", "Inventory Adjustment"),
"location_ids": [(6, 0, stock_data.get("location_ids", []))],
"product_ids": [(6, 0, stock_data.get("product_ids", []))]
})
# Start and validate inventory
odoo.execute("stock.inventory", "action_start", [inventory_id])
# Add inventory lines
for line_data in stock_data.get("lines", []):
odoo.create("stock.inventory.line", {
"inventory_id": inventory_id,
"product_id": line_data.get("product_id"),
"product_qty": line_data.get("quantity"),
"location_id": line_data.get("location_id")
})
if stock_data.get("auto_validate", False):
odoo.execute("stock.inventory", "action_validate", [inventory_id])
return {
"success": True,
"inventory_id": inventory_id,
"action": "inventory_adjustment"
}
elif action == "create_picking":
# Create delivery/receipt
if not picking_data:
return {"success": False, "error": "Missing picking_data"}
picking_id = odoo.create("stock.picking", {
"picking_type_id": picking_data.get("picking_type_id"),
"partner_id": picking_data.get("partner_id"),
"location_id": picking_data.get("location_id"),
"location_dest_id": picking_data.get("location_dest_id"),
"move_ids_without_package": [(0, 0, move) for move in picking_data.get("moves", [])]
})
# Confirm picking
if picking_data.get("auto_confirm", False):
odoo.execute("stock.picking", "action_confirm", [picking_id])
return {
"success": True,
"picking_id": picking_id,
"action": "create_picking"
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Advanced HR and payroll operations")
def manage_hr_advanced(
ctx: Context,
action: str,
employee_data: Optional[Dict[str, Any]] = None,
payroll_data: Optional[Dict[str, Any]] = None,
attendance_data: Optional[Dict[str, Any]] = None,
performance_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Advanced HR operations including payroll, attendance, and performance.
Parameters:
action: Action (process_payroll, track_attendance, performance_review, training_management)
employee_data: Employee information
payroll_data: Payroll processing data
attendance_data: Attendance tracking data
performance_data: Performance review data
Returns:
Dictionary with HR operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "process_payroll":
# Process payroll
if not payroll_data:
return {"success": False, "error": "Missing payroll_data"}
# Create payslip batch
batch_id = odoo.create("hr.payslip.run", {
"name": payroll_data.get("name", "Payroll Batch"),
"date_start": payroll_data.get("date_start"),
"date_end": payroll_data.get("date_end")
})
# Generate payslips for employees
employee_ids = payroll_data.get("employee_ids", [])
payslip_ids = []
for emp_id in employee_ids:
payslip_id = odoo.create("hr.payslip", {
"employee_id": emp_id,
"payslip_run_id": batch_id,
"date_from": payroll_data.get("date_start"),
"date_to": payroll_data.get("date_end")
})
# Compute payslip
odoo.execute("hr.payslip", "compute_sheet", [payslip_id])
payslip_ids.append(payslip_id)
return {
"success": True,
"batch_id": batch_id,
"payslip_ids": payslip_ids,
"action": "process_payroll"
}
elif action == "track_attendance":
# Track employee attendance
if not attendance_data:
return {"success": False, "error": "Missing attendance_data"}
attendance_id = odoo.create("hr.attendance", {
"employee_id": attendance_data.get("employee_id"),
"check_in": attendance_data.get("check_in"),
"check_out": attendance_data.get("check_out"),
"worked_hours": attendance_data.get("worked_hours")
})
return {
"success": True,
"attendance_id": attendance_id,
"action": "track_attendance"
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Manufacturing and production management")
def manage_manufacturing(
ctx: Context,
action: str,
bom_data: Optional[Dict[str, Any]] = None,
production_data: Optional[Dict[str, Any]] = None,
quality_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Manufacturing and production operations.
Parameters:
action: Action (create_bom, start_production, quality_check)
bom_data: Bill of Materials data
production_data: Production order data
quality_data: Quality control data
Returns:
Dictionary with manufacturing operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "create_bom":
# Create Bill of Materials
if not bom_data:
return {"success": False, "error": "Missing bom_data"}
bom_id = odoo.create("mrp.bom", {
"product_tmpl_id": bom_data.get("product_tmpl_id"),
"product_qty": bom_data.get("product_qty", 1.0),
"type": bom_data.get("type", "normal"),
"bom_line_ids": [(0, 0, line) for line in bom_data.get("components", [])]
})
return {
"success": True,
"bom_id": bom_id,
"action": "create_bom"
}
elif action == "start_production":
# Start manufacturing order
if not production_data:
return {"success": False, "error": "Missing production_data"}
mo_id = odoo.create("mrp.production", {
"product_id": production_data.get("product_id"),
"product_qty": production_data.get("quantity"),
"bom_id": production_data.get("bom_id"),
"date_planned_start": production_data.get("date_start")
})
# Confirm production order
if production_data.get("auto_confirm", False):
odoo.execute("mrp.production", "action_confirm", [mo_id])
return {
"success": True,
"production_id": mo_id,
"action": "start_production"
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="E-commerce and website management")
def manage_ecommerce(
ctx: Context,
action: str,
website_data: Optional[Dict[str, Any]] = None,
product_data: Optional[Dict[str, Any]] = None,
order_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
E-commerce and website operations.
Parameters:
action: Action (sync_products, process_orders, manage_website)
website_data: Website configuration
product_data: Product information
order_data: Order processing data
Returns:
Dictionary with e-commerce operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "sync_products":
# Synchronize products to website
if not product_data:
return {"success": False, "error": "Missing product_data"}
product_ids = product_data.get("product_ids", [])
synced_products = []
for product_id in product_ids:
# Update product for website
odoo.write("product.template", [product_id], {
"is_published": product_data.get("is_published", True),
"website_sequence": product_data.get("sequence", 1),
"website_size_x": product_data.get("size_x", 1),
"website_size_y": product_data.get("size_y", 1)
})
synced_products.append(product_id)
return {
"success": True,
"synced_products": synced_products,
"action": "sync_products"
}
elif action == "process_orders":
# Process website orders
if not order_data:
return {"success": False, "error": "Missing order_data"}
order_ids = order_data.get("order_ids", [])
processed_orders = []
for order_id in order_ids:
try:
# Confirm order
odoo.execute("sale.order", "action_confirm", [order_id])
# Create invoice if requested
if order_data.get("create_invoice", False):
invoice_id = odoo.execute("sale.order", "_create_invoices", [order_id])
processed_orders.append({"order_id": order_id, "invoice_id": invoice_id})
else:
processed_orders.append({"order_id": order_id})
except Exception as e:
processed_orders.append({"order_id": order_id, "error": str(e)})
return {
"success": True,
"processed_orders": processed_orders,
"action": "process_orders"
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Project management and planning")
def manage_projects(
ctx: Context,
action: str,
project_data: Optional[Dict[str, Any]] = None,
task_data: Optional[Dict[str, Any]] = None,
timesheet_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Project management and planning operations.
Parameters:
action: Action (create_project, manage_tasks, track_time)
project_data: Project information
task_data: Task management data
timesheet_data: Time tracking data
Returns:
Dictionary with project operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "create_project":
# Create project
if not project_data:
return {"success": False, "error": "Missing project_data"}
project_id = odoo.create("project.project", {
"name": project_data.get("name"),
"partner_id": project_data.get("partner_id"),
"date_start": project_data.get("date_start"),
"date": project_data.get("date_end"),
"user_id": project_data.get("project_manager_id")
})
# Create initial tasks if provided
for task in project_data.get("tasks", []):
odoo.create("project.task", {
"name": task.get("name"),
"project_id": project_id,
"user_ids": [(6, 0, task.get("user_ids", []))],
"date_deadline": task.get("date_deadline")
})
return {
"success": True,
"project_id": project_id,
"action": "create_project"
}
elif action == "track_time":
# Time tracking
if not timesheet_data:
return {"success": False, "error": "Missing timesheet_data"}
timesheet_id = odoo.create("account.analytic.line", {
"task_id": timesheet_data.get("task_id"),
"project_id": timesheet_data.get("project_id"),
"employee_id": timesheet_data.get("employee_id"),
"unit_amount": timesheet_data.get("hours"),
"date": timesheet_data.get("date"),
"name": timesheet_data.get("description", "Time tracking")
})
return {
"success": True,
"timesheet_id": timesheet_id,
"action": "track_time"
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Integration and API management")
def manage_integrations(
ctx: Context,
action: str,
api_data: Optional[Dict[str, Any]] = None,
webhook_data: Optional[Dict[str, Any]] = None,
sync_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Integration and API management operations.
Parameters:
action: Action (create_api_key, manage_webhooks, sync_external)
api_data: API configuration data
webhook_data: Webhook configuration
sync_data: Data synchronization settings
Returns:
Dictionary with integration operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "create_api_key":
# Create API access token
if not api_data:
return {"success": False, "error": "Missing api_data"}
token_id = odoo.create("res.users.apikeys", {
"user_id": api_data.get("user_id"),
"name": api_data.get("name", "API Key"),
"scope": api_data.get("scope", "userinfo")
})
return {
"success": True,
"token_id": token_id,
"action": "create_api_key"
}
elif action == "sync_external":
# External system synchronization
if not sync_data:
return {"success": False, "error": "Missing sync_data"}
sync_results = []
external_system = sync_data.get("system")
sync_model = sync_data.get("model")
# Example sync logic (would be customized per integration)
records = odoo.search_read(
sync_model,
sync_data.get("domain", []),
fields=sync_data.get("fields", [])
)
for record in records:
try:
# Simulate external sync
external_id = f"{external_system}_{record['id']}"
sync_results.append({
"record_id": record["id"],
"external_id": external_id,
"status": "synced"
})
except Exception as e:
sync_results.append({
"record_id": record["id"],
"status": "error",
"error": str(e)
})
return {
"success": True,
"sync_results": sync_results,
"action": "sync_external"
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Performance monitoring and optimization")
def manage_performance(
ctx: Context,
action: str,
monitor_data: Optional[Dict[str, Any]] = None,
optimization_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Performance monitoring and system optimization.
Parameters:
action: Action (monitor_system, optimize_queries, generate_reports)
monitor_data: Monitoring configuration
optimization_data: Optimization settings
Returns:
Dictionary with performance operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "monitor_system":
# System monitoring
system_stats = {
"database_size": 0,
"active_users": 0,
"memory_usage": 0,
"slow_queries": []
}
try:
# Get database size
size_result = odoo.execute("base", "execute_sql",
"SELECT pg_size_pretty(pg_database_size(current_database()))")
system_stats["database_size"] = size_result
# Get active user count
active_users = odoo.search_count("res.users", [["active", "=", True]])
system_stats["active_users"] = active_users
except Exception as e:
system_stats["error"] = str(e)
return {
"success": True,
"system_stats": system_stats,
"action": "monitor_system"
}
elif action == "generate_reports":
# Generate performance reports
report_data = {
"timestamp": str(datetime.now()),
"metrics": {}
}
try:
# Collect various metrics
metrics = {
"total_users": odoo.search_count("res.users", []),
"total_partners": odoo.search_count("res.partner", []),
"total_products": odoo.search_count("product.template", []),
"active_sessions": 0 # Would need session tracking
}
report_data["metrics"] = metrics
except Exception as e:
report_data["error"] = str(e)
return {
"success": True,
"report_data": report_data,
"action": "generate_reports"
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Backup and recovery operations")
def manage_backup_recovery(
ctx: Context,
action: str,
backup_data: Optional[Dict[str, Any]] = None,
schedule_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Backup and recovery operations for data protection.
Parameters:
action: Action (create_backup, schedule_backup, verify_backup)
backup_data: Backup configuration
schedule_data: Backup schedule settings
Returns:
Dictionary with backup operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "create_backup":
# Create database backup
if not backup_data:
return {"success": False, "error": "Missing backup_data"}
backup_name = backup_data.get("name", f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
include_filestore = backup_data.get("include_filestore", True)
try:
# Create backup (this would typically use Odoo's backup API)
backup_result = {
"backup_name": backup_name,
"timestamp": str(datetime.now()),
"include_filestore": include_filestore,
"status": "completed"
}
return {
"success": True,
"backup_name": backup_name,
"backup_result": backup_result,
"action": "create_backup"
}
except Exception as e:
return {"success": False, "error": f"Backup failed: {str(e)}"}
elif action == "schedule_backup":
# Schedule automatic backups
if not schedule_data:
return {"success": False, "error": "Missing schedule_data"}
model_ids = odoo.search("ir.model", [["model", "=", "base"]], limit=1)
if not model_ids:
return {"success": False, "error": "Base model not found"}
cron_id = odoo.create("ir.cron", {
"name": schedule_data.get("name", "Scheduled Backup"),
"model_id": model_ids[0],
"code": "model.backup_database()",
"interval_number": schedule_data.get("interval_number", 1),
"interval_type": schedule_data.get("interval_type", "days"),
"active": schedule_data.get("active", True)
})
return {
"success": True,
"cron_id": cron_id,
"action": "schedule_backup"
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool(description="Multi-company management operations")
def manage_multicompany(
ctx: Context,
action: str,
company_data: Optional[Dict[str, Any]] = None,
intercompany_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Multi-company management and intercompany operations.
Parameters:
action: Action (create_company, intercompany_transfer, sync_data)
company_data: Company configuration
intercompany_data: Intercompany transaction data
Returns:
Dictionary with multi-company operation result
"""
odoo = ctx.request_context.lifespan_context.odoo
try:
if action == "create_company":
# Create new company
if not company_data:
return {"success": False, "error": "Missing company_data"}
company_id = odoo.create("res.company", {
"name": company_data.get("name"),
"partner_id": company_data.get("partner_id"),
"currency_id": company_data.get("currency_id"),
"email": company_data.get("email"),
"phone": company_data.get("phone"),
"website": company_data.get("website")
})
return {
"success": True,
"company_id": company_id,
"action": "create_company"
}
elif action == "intercompany_transfer":
# Intercompany transactions
if not intercompany_data:
return {"success": False, "error": "Missing intercompany_data"}
# Create intercompany sale order
sale_order_id = odoo.create("sale.order", {
"partner_id": intercompany_data.get("buying_company_partner_id"),
"company_id": intercompany_data.get("selling_company_id"),
"order_line": [(0, 0, line) for line in intercompany_data.get("lines", [])]
})
# Auto-confirm if requested
if intercompany_data.get("auto_confirm", False):
odoo.execute("sale.order", "action_confirm", [sale_order_id])
return {
"success": True,
"sale_order_id": sale_order_id,
"action": "intercompany_transfer"
}
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
return {"success": False, "error": str(e)}