Skip to main content
Glama
hachecito

Odoo MCP Improved

by hachecito
tools_purchase.py11 kB
""" Implementación de herramientas (tools) para compras en MCP-Odoo """ from typing import Dict, List, Any, Optional from datetime import datetime, timedelta from mcp.server.fastmcp import FastMCP, Context from .models import ( PurchaseOrderFilter, PurchaseOrderCreate, SupplierPerformanceInput ) def register_purchase_tools(mcp: FastMCP) -> None: """Registra herramientas relacionadas con compras""" @mcp.tool(description="Busca órdenes de compra con filtros avanzados") def search_purchase_orders( ctx: Context, filters: PurchaseOrderFilter ) -> Dict[str, Any]: """ Busca órdenes de compra según los filtros especificados Args: filters: Filtros para la búsqueda de órdenes Returns: Diccionario con resultados de la búsqueda """ odoo = ctx.request_context.lifespan_context.odoo try: # Construir dominio de búsqueda domain = [] if filters.partner_id: domain.append(("partner_id", "=", filters.partner_id)) if filters.date_from: try: datetime.strptime(filters.date_from, "%Y-%m-%d") domain.append(("date_order", ">=", filters.date_from)) except ValueError: return {"success": False, "error": f"Formato de fecha inválido: {filters.date_from}. Use YYYY-MM-DD."} if filters.date_to: try: datetime.strptime(filters.date_to, "%Y-%m-%d") domain.append(("date_order", "<=", filters.date_to)) except ValueError: return {"success": False, "error": f"Formato de fecha inválido: {filters.date_to}. Use YYYY-MM-DD."} if filters.state: domain.append(("state", "=", filters.state)) # Campos a recuperar fields = [ "name", "partner_id", "date_order", "amount_total", "state", "invoice_status", "user_id", "order_line", "date_planned", "date_approve" ] # Ejecutar búsqueda orders = odoo.search_read( "purchase.order", domain, fields=fields, limit=filters.limit, offset=filters.offset, order=filters.order ) # Obtener el conteo total sin límite para paginación total_count = odoo.execute_method("purchase.order", "search_count", domain) return { "success": True, "result": { "count": len(orders), "total_count": total_count, "orders": orders } } except Exception as e: return {"success": False, "error": str(e)} @mcp.tool(description="Crear una nueva orden de compra") def create_purchase_order( ctx: Context, order: PurchaseOrderCreate ) -> Dict[str, Any]: """ Crea una nueva orden de compra Args: order: Datos de la orden a crear Returns: Respuesta con el resultado de la operación """ odoo = ctx.request_context.lifespan_context.odoo try: # Preparar valores para la orden order_vals = { "partner_id": order.partner_id, "order_line": [] } if order.date_order: try: datetime.strptime(order.date_order, "%Y-%m-%d") order_vals["date_order"] = order.date_order except ValueError: return {"success": False, "error": f"Formato de fecha inválido: {order.date_order}. Use YYYY-MM-DD."} # Preparar líneas de orden for line in order.order_lines: line_vals = [ 0, 0, { "product_id": line.product_id, "product_qty": line.product_qty } ] if line.price_unit is not None: line_vals[2]["price_unit"] = line.price_unit order_vals["order_line"].append(line_vals) # Crear orden order_id = odoo.execute_method("purchase.order", "create", order_vals) # Obtener información de la orden creada order_info = odoo.execute_method("purchase.order", "read", [order_id], ["name"])[0] return { "success": True, "result": { "order_id": order_id, "order_name": order_info["name"] } } except Exception as e: return {"success": False, "error": str(e)} @mcp.tool(description="Analiza el rendimiento de los proveedores") def analyze_supplier_performance( ctx: Context, params: SupplierPerformanceInput ) -> Dict[str, Any]: """ Analiza el rendimiento de los proveedores en un período específico Args: params: Parámetros para el análisis Returns: Diccionario con resultados del análisis """ odoo = ctx.request_context.lifespan_context.odoo try: # Validar fechas try: datetime.strptime(params.date_from, "%Y-%m-%d") datetime.strptime(params.date_to, "%Y-%m-%d") except ValueError: return {"success": False, "error": "Formato de fecha inválido. Use YYYY-MM-DD."} # Construir dominio para órdenes confirmadas domain = [ ("date_order", ">=", params.date_from), ("date_order", "<=", params.date_to), ("state", "in", ["purchase", "done"]) ] # Filtrar por proveedores específicos si se proporcionan if params.supplier_ids: domain.append(("partner_id", "in", params.supplier_ids)) # Obtener datos de compras purchase_data = odoo.search_read( "purchase.order", domain, fields=[ "name", "partner_id", "date_order", "amount_total", "date_approve", "date_planned", "effective_date" ] ) # Agrupar por proveedor supplier_data = {} for order in purchase_data: supplier_id = order["partner_id"][0] if order["partner_id"] else 0 supplier_name = order["partner_id"][1] if order["partner_id"] else "Desconocido" if supplier_id not in supplier_data: supplier_data[supplier_id] = { "name": supplier_name, "order_count": 0, "total_amount": 0, "orders": [], "on_time_delivery_count": 0, "late_delivery_count": 0, "avg_delay_days": 0 } supplier_data[supplier_id]["order_count"] += 1 supplier_data[supplier_id]["total_amount"] += order["amount_total"] # Calcular métricas de entrega a tiempo if order.get("effective_date") and order.get("date_planned"): effective_date = datetime.strptime(order["effective_date"].split(" ")[0], "%Y-%m-%d") planned_date = datetime.strptime(order["date_planned"].split(" ")[0], "%Y-%m-%d") delay_days = (effective_date - planned_date).days order_info = { "id": order["id"], "name": order["name"], "amount": order["amount_total"], "date_order": order["date_order"], "planned_date": order["date_planned"], "effective_date": order["effective_date"], "delay_days": delay_days } supplier_data[supplier_id]["orders"].append(order_info) if delay_days <= 0: supplier_data[supplier_id]["on_time_delivery_count"] += 1 else: supplier_data[supplier_id]["late_delivery_count"] += 1 # Calcular métricas adicionales for supplier_id, data in supplier_data.items(): # Calcular promedio de días de retraso delay_days = [order["delay_days"] for order in data["orders"] if "delay_days" in order] if delay_days: data["avg_delay_days"] = sum(delay_days) / len(delay_days) # Calcular porcentaje de entregas a tiempo total_deliveries = data["on_time_delivery_count"] + data["late_delivery_count"] if total_deliveries > 0: data["on_time_delivery_rate"] = (data["on_time_delivery_count"] / total_deliveries) * 100 else: data["on_time_delivery_rate"] = 0 # Eliminar lista detallada de órdenes para reducir tamaño de respuesta data.pop("orders", None) # Ordenar proveedores por monto total top_suppliers = sorted( supplier_data.items(), key=lambda x: x[1]["total_amount"], reverse=True ) # Preparar resultado result = { "period": { "from": params.date_from, "to": params.date_to }, "summary": { "supplier_count": len(supplier_data), "order_count": len(purchase_data), "total_amount": sum(order["amount_total"] for order in purchase_data) }, "suppliers": [ {"id": k, **v} for k, v in top_suppliers ] } return {"success": True, "result": result} except Exception as e: return {"success": False, "error": str(e)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hachecito/odoo-mcp-improved'

If you have feedback or need assistance with the MCP directory API, please join our Discord server