Skip to main content
Glama
golfamigo

Odoo MCP Unified Server

by golfamigo
tools_sales.py (14.4 kB)
"""Sales-related tool implementations for the MCP-Odoo server."""

from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta

from fastmcp import FastMCP

from .models import (
    SalesOrderFilter,
    SalesOrderCreate,
    SalesPerformanceInput
)
from .odoo_client import get_odoo_client

# Date format accepted by every tool in this module.
_DATE_FORMAT = "%Y-%m-%d"

# Order states counted as confirmed sales by the performance analysis.
_CONFIRMED_STATES = ["sale", "done"]

# Values accepted by the ``group_by`` argument of analyze_sales_performance.
_GROUP_BY_VALUES = ("product", "customer", "salesperson")

# Maximum number of entries returned for product and customer rankings.
_TOP_N = 10


def _parse_date(value: str) -> Optional[datetime]:
    """Parse *value* as YYYY-MM-DD; return None when the format is wrong.

    Only ``ValueError`` is handled here. A non-string argument still raises
    ``TypeError``, which the calling tool reports through its generic handler.
    """
    try:
        return datetime.strptime(value, _DATE_FORMAT)
    except ValueError:
        return None


def _rank_orders_by(
    orders: List[Dict[str, Any]],
    relation_field: str,
    limit: Optional[int] = None
) -> List[Dict[str, Any]]:
    """Aggregate order count and total amount per value of *relation_field*.

    Each order is expected to carry ``relation_field`` as an ``[id, name]``
    pair or a falsy value; falsy values are pooled under id 0 / "Desconocido".
    The result is sorted by amount, highest first, and truncated to *limit*
    entries when a limit is given.
    """
    totals: Dict[Any, Dict[str, Any]] = {}
    for order in orders:
        relation = order[relation_field]
        key = relation[0] if relation else 0
        bucket = totals.get(key)
        if bucket is None:
            bucket = totals[key] = {
                "name": relation[1] if relation else "Desconocido",
                "order_count": 0,
                "amount": 0
            }
        bucket["order_count"] += 1
        bucket["amount"] += order["amount_total"]

    ranked = sorted(totals.items(), key=lambda item: item[1]["amount"], reverse=True)
    if limit is not None:
        ranked = ranked[:limit]
    return [{"id": key, **values} for key, values in ranked]


def _rank_products(
    order_lines: List[Dict[str, Any]],
    limit: Optional[int] = None
) -> List[Dict[str, Any]]:
    """Aggregate quantity and subtotal per product over *order_lines*.

    Lines without a product are pooled under id 0 / "Desconocido". The result
    is sorted by amount, highest first, and truncated to *limit* entries when
    a limit is given.
    """
    totals: Dict[Any, Dict[str, Any]] = {}
    for line in order_lines:
        product = line["product_id"]
        key = product[0] if product else 0
        bucket = totals.get(key)
        if bucket is None:
            bucket = totals[key] = {
                "name": product[1] if product else "Desconocido",
                "quantity": 0,
                "amount": 0
            }
        bucket["quantity"] += line["product_uom_qty"]
        bucket["amount"] += line["price_subtotal"]

    ranked = sorted(totals.items(), key=lambda item: item[1]["amount"], reverse=True)
    if limit is not None:
        ranked = ranked[:limit]
    return [{"id": key, **values} for key, values in ranked]


def register_sales_tools(mcp: FastMCP) -> None:
    """Register the sales-related tools on the given FastMCP server.

    Three tools are registered: ``search_sales_orders``,
    ``create_sales_order`` and ``analyze_sales_performance``. Every tool
    returns a dict with a ``success`` flag plus either ``result`` or
    ``error``; failures (including failing to obtain the Odoo client) are
    reported through that dict rather than raised.
    """

    # Helper function to get the Odoo client
    def _get_odoo():
        return get_odoo_client()

    @mcp.tool(description="Busca pedidos de venta con filtros avanzados")
    def search_sales_orders(
        partner_id: Optional[int] = None,
        date_from: Optional[str] = None,
        date_to: Optional[str] = None,
        state: Optional[str] = None,
        limit: int = 20,
        offset: int = 0,
        order: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Search sales orders matching the given filters.

        Args:
            partner_id: Filter by customer ID (optional)
            date_from: Start date in YYYY-MM-DD format (optional)
            date_to: End date in YYYY-MM-DD format (optional)
            state: Order state, e.g. 'sale', 'draft', 'done' (optional)
            limit: Maximum number of results (default: 20)
            offset: Pagination offset (default: 0)
            order: Sort criterion, e.g. 'date_order DESC' (optional)

        Returns:
            Dict with ``success`` and, on success, ``result`` holding
            ``count`` (orders in this page), ``total_count`` (orders matching
            the filters regardless of limit) and ``orders``.
        """
        try:
            # Build the search domain; inputs are validated before Odoo is contacted.
            domain = []

            if partner_id:
                domain.append(("partner_id", "=", partner_id))

            # NOTE(review): date_order is compared against date-only strings;
            # confirm the server treats the upper bound as end-of-day.
            for operator, value in ((">=", date_from), ("<=", date_to)):
                if not value:
                    continue
                if _parse_date(value) is None:
                    return {"success": False, "error": f"Formato de fecha inválido: {value}. Use YYYY-MM-DD."}
                domain.append(("date_order", operator, value))

            if state:
                domain.append(("state", "=", state))

            # Fields to retrieve
            fields = [
                "name", "partner_id", "date_order", "amount_total",
                "state", "invoice_status", "user_id", "order_line"
            ]

            # Acquired inside the try so connection failures use the same
            # error envelope as every other failure.
            odoo = _get_odoo()

            orders = odoo.search_read(
                "sale.order",
                domain,
                fields=fields,
                limit=limit,
                offset=offset,
                order=order
            )

            # Total number of matches, ignoring limit/offset, for pagination
            total_count = odoo.execute_method("sale.order", "search_count", domain)

            return {
                "success": True,
                "result": {
                    "count": len(orders),
                    "total_count": total_count,
                    "orders": orders
                }
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    @mcp.tool(description="Crear un nuevo pedido de venta")
    def create_sales_order(
        partner_id: int,
        order_lines: List[Dict[str, Any]],
        date_order: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Create a new sales order.

        Args:
            partner_id: Customer ID
            order_lines: List of order lines. Each line must provide:
                - product_id (int): Product ID
                - product_uom_qty (float): Quantity
                - price_unit (float, optional): Unit price
            date_order: Order date in YYYY-MM-DD format (optional)

        Returns:
            Dict with ``success`` and, on success, ``result`` holding the new
            ``order_id`` and ``order_name``.
        """
        try:
            # Prepare the order values; all validation happens before any
            # call to Odoo so a rejected request has no side effects.
            order_vals = {
                "partner_id": partner_id,
                "order_line": []
            }

            if date_order:
                if _parse_date(date_order) is None:
                    return {"success": False, "error": f"Formato de fecha inválido: {date_order}. Use YYYY-MM-DD."}
                order_vals["date_order"] = date_order

            # Prepare the order lines
            for line in order_lines:
                if not isinstance(line, dict):
                    return {"success": False, "error": "Cada línea debe ser un diccionario"}

                if "product_id" not in line or "product_uom_qty" not in line:
                    return {"success": False, "error": "Cada línea debe contener product_id y product_uom_qty"}

                line_values = {
                    "product_id": line["product_id"],
                    "product_uom_qty": line["product_uom_qty"]
                }
                # Only forward an explicit price; otherwise leave it unset.
                if line.get("price_unit") is not None:
                    line_values["price_unit"] = line["price_unit"]

                # [0, 0, values] is the command triple used for a new line record.
                order_vals["order_line"].append([0, 0, line_values])

            odoo = _get_odoo()

            # Create the order
            order_id = odoo.execute_method("sale.order", "create", order_vals)

            # Read back the name assigned to the created order
            order_info = odoo.execute_method("sale.order", "read", [order_id], ["name"])[0]

            return {
                "success": True,
                "result": {
                    "order_id": order_id,
                    "order_name": order_info["name"]
                }
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    @mcp.tool(description="Analiza el rendimiento de ventas en un período")
    def analyze_sales_performance(
        date_from: str,
        date_to: str,
        group_by: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Analyze sales performance over a period.

        Confirmed orders (state 'sale' or 'done') in the period are totalled
        and compared with the immediately preceding period of equal length.

        Args:
            date_from: Start date in YYYY-MM-DD format
            date_to: End date in YYYY-MM-DD format (must not precede date_from)
            group_by: Group results by 'product', 'customer' or
                'salesperson' (optional)

        Returns:
            Dict with ``success`` and, on success, ``result`` holding
            ``period``, ``summary`` and, when grouping produced data,
            ``grouped_data``.
        """
        try:
            # Validate inputs before contacting Odoo
            period_start = _parse_date(date_from)
            period_end = _parse_date(date_to)
            if period_start is None or period_end is None:
                return {"success": False, "error": "Formato de fecha inválido. Use YYYY-MM-DD."}

            # A reversed range would yield a negative span and a comparison
            # period whose start falls after its end.
            if period_start > period_end:
                return {"success": False, "error": "La fecha inicial no puede ser posterior a la fecha final."}

            # Reject unsupported groupings instead of silently ignoring them.
            if group_by and group_by not in _GROUP_BY_VALUES:
                return {
                    "success": False,
                    "error": f"Valor de group_by inválido: {group_by}. Use 'product', 'customer' o 'salesperson'."
                }

            odoo = _get_odoo()

            # Confirmed orders in the requested period
            domain = [
                ("date_order", ">=", date_from),
                ("date_order", "<=", date_to),
                ("state", "in", _CONFIRMED_STATES)
            ]
            sales_data = odoo.search_read(
                "sale.order",
                domain,
                fields=["name", "partner_id", "date_order", "amount_total", "user_id"]
            )

            # Previous period: same length, ending the day before date_from
            span = period_end - period_start
            prev_date_to = period_start - timedelta(days=1)
            prev_date_from = prev_date_to - span
            prev_from_str = prev_date_from.strftime(_DATE_FORMAT)
            prev_to_str = prev_date_to.strftime(_DATE_FORMAT)

            prev_domain = [
                ("date_order", ">=", prev_from_str),
                ("date_order", "<=", prev_to_str),
                ("state", "in", _CONFIRMED_STATES)
            ]
            prev_sales_data = odoo.search_read(
                "sale.order",
                prev_domain,
                fields=["amount_total"]
            )

            # Totals for both periods
            current_total = sum(order["amount_total"] for order in sales_data)
            previous_total = sum(order["amount_total"] for order in prev_sales_data)

            # Percentage change; left at 0 when there is no positive baseline
            percent_change = 0
            if previous_total > 0:
                percent_change = ((current_total - previous_total) / previous_total) * 100

            # Optional grouping
            grouped_data = {}
            if group_by == "product":
                # Product figures come from the order lines of the matched orders
                order_ids = [order["id"] for order in sales_data]
                if order_ids:
                    order_lines = odoo.search_read(
                        "sale.order.line",
                        [("order_id", "in", order_ids)],
                        fields=["product_id", "product_uom_qty", "price_subtotal"]
                    )
                    grouped_data["products"] = _rank_products(order_lines, limit=_TOP_N)
            elif group_by == "customer":
                grouped_data["customers"] = _rank_orders_by(sales_data, "partner_id", limit=_TOP_N)
            elif group_by == "salesperson":
                # Salespeople are returned in full (no top-N cut)
                grouped_data["salespersons"] = _rank_orders_by(sales_data, "user_id")

            # Assemble the result
            result = {
                "period": {
                    "from": date_from,
                    "to": date_to
                },
                "summary": {
                    "order_count": len(sales_data),
                    "total_amount": current_total,
                    "previous_period": {
                        "from": prev_from_str,
                        "to": prev_to_str,
                        "order_count": len(prev_sales_data),
                        "total_amount": previous_total
                    },
                    "percent_change": round(percent_change, 2)
                }
            }

            # Attach grouped data only when there is some
            if grouped_data:
                result["grouped_data"] = grouped_data

            return {"success": True, "result": result}
        except Exception as e:
            return {"success": False, "error": str(e)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/golfamigo/odooMcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.