"""
Utility tools for Odoo.
Provides general-purpose tools like connection testing and generic search.
"""
from __future__ import annotations
from collections import defaultdict
from typing import TYPE_CHECKING, Any
from ..constants import OdooModel
from ..decorators import handle_odoo_errors
from ..formatters import MarkdownBuilder, format_money
from ..validators import validate_date_range
from .base import get_odoo_client, normalize_pagination
if TYPE_CHECKING:
from mcp.server.fastmcp import FastMCP
def register_tools(mcp: "FastMCP") -> None:
    """Register utility tools with the MCP server.

    Defines three tools on ``mcp``: ``test_connection``, ``search_records``
    and ``get_rd_project_costs``. Each tool is wrapped with
    ``handle_odoo_errors`` and returns text (either the output of
    ``MarkdownBuilder.build()`` or a short plain message).

    Args:
        mcp: Server instance whose ``tool()`` decorator registers each function.
    """

    @mcp.tool()
    @handle_odoo_errors
    def test_connection() -> str:
        """
        Test the connection to Odoo.

        Returns:
            Connection and user information
        """
        client = get_odoo_client()
        info = client.test_connection()

        builder = MarkdownBuilder("Odoo Connection Test")
        builder.add_key_value({
            "Status": info["status"],
            "Server Version": info["server_version"],
            "Database": info["database"],
            "User": info["user"],
            "User ID": info["user_id"],
            "Company": info["company"],
        })
        return builder.build()

    @mcp.tool()
    @handle_odoo_errors
    def search_records(
        model: str,
        field: str,
        value: str,
        limit: int = 20,
        offset: int = 0,
    ) -> str:
        """
        Search records in any Odoo model.

        Args:
            model: Technical model name (e.g., res.partner, sale.order)
            field: Field to search on (e.g., name, email)
            value: Value to search for (partial match with ilike)
            limit: Maximum number of results (default: 20)
            offset: Offset for pagination (default: 0)

        Returns:
            List of found records
        """
        limit, offset = normalize_pagination(limit, offset)
        client = get_odoo_client()

        # Search with ilike for partial matching
        domain = [(field, "ilike", value)]

        try:
            # Try to get common fields
            records = client.search_read(
                model,
                domain,
                ["name", "display_name"],
                limit=limit,
                offset=offset,
            )
        except Exception:
            # Deliberate best-effort fallback: if reading name/display_name
            # fails, retry fetching only IDs. Any failure that also affects the
            # plain search propagates from the call below to handle_odoo_errors.
            ids = client.search(model, domain, limit=limit, offset=offset)
            records = [
                {"id": record_id, "name": f"Record {record_id}"}
                for record_id in ids or []
            ]

        # Single "nothing found" exit shared by both the normal and fallback paths.
        if not records:
            return f"No records found in {model} where {field} matches '{value}'."

        builder = MarkdownBuilder(f"Search Results: {model}")
        builder.add_text(f"*Search: {field} contains '{value}'*\n")

        # Prefer display_name, then name, then a synthetic label built from the ID.
        rows = [
            [rec["id"], rec.get("display_name") or rec.get("name") or f"ID {rec['id']}"]
            for rec in records
        ]

        builder.add_table(
            ["ID", "Name"],
            rows,
            alignments=["right", "left"],
        )
        builder.add_pagination(len(records), limit, offset)
        return builder.build()

    @mcp.tool()
    @handle_odoo_errors
    def get_rd_project_costs(
        date_from: str | None = None,
        date_to: str | None = None,
    ) -> str:
        """
        Get total costs per R&D project from analytic accounts.

        Args:
            date_from: Start date (format YYYY-MM-DD, optional)
            date_to: End date (format YYYY-MM-DD, optional)

        Returns:
            Summary of costs by R&D project
        """
        # Upper bound on analytic lines fetched in one call. When the bound is
        # reached the totals may be partial, so the report says so explicitly.
        max_lines = 500

        date_from, date_to = validate_date_range(date_from, date_to)
        client = get_odoo_client()

        # Build domain; only the date bounds filter the analytic lines.
        domain: list[Any] = []
        if date_from:
            domain.append(("date", ">=", date_from))
        if date_to:
            domain.append(("date", "<=", date_to))

        # Get analytic lines
        lines = client.search_read(
            OdooModel.ANALYTIC_LINE,
            domain,
            ["account_id", "amount", "name", "date"],
            limit=max_lines,
        )

        if not lines:
            msg = "No analytic lines found"
            if date_from or date_to:
                msg += f" (period: {date_from or 'start'} to {date_to or 'end'})"
            return msg + "."

        # Aggregate by account. Lines whose account_id is not an (id, name)
        # pair are skipped. Each amount contributes its absolute value.
        account_totals: dict[str, float] = defaultdict(float)
        account_names: dict[str, str] = {}

        for line in lines:
            account = line.get("account_id")
            if not (account and isinstance(account, (list, tuple)) and len(account) >= 2):
                continue
            acc_id = str(account[0])
            # `or 0.0` also covers a key that is present but holds None/False.
            account_totals[acc_id] += abs(line.get("amount") or 0.0)
            account_names[acc_id] = account[1]

        builder = MarkdownBuilder("R&D Project Costs")

        if date_from or date_to:
            builder.add_text(f"*Period: {date_from or 'start'} to {date_to or 'end'}*\n")

        if len(lines) >= max_lines:
            # Hitting the fetch cap means more lines may exist than were summed.
            builder.add_text(
                f"*Note: only the first {max_lines} analytic lines were aggregated; "
                "totals may be incomplete.*\n"
            )

        # Largest totals first; the sort is stable, so ties keep first-seen order.
        ranked_ids = sorted(account_totals, key=account_totals.__getitem__, reverse=True)

        rows = []
        total = 0.0
        for acc_id in ranked_ids:
            amount = account_totals[acc_id]
            total += amount
            rows.append([
                acc_id,
                account_names.get(acc_id, f"Account {acc_id}"),
                format_money(amount),
            ])

        builder.add_table(
            ["ID", "Project/Account", "Total Cost"],
            rows,
            alignments=["right", "left", "right"],
        )

        builder.add_line()
        builder.add_text(f"**Grand Total: {format_money(total)}**")

        return builder.build()