Skip to main content
Glama
golfamigo

Odoo MCP Unified Server

by golfamigo
tools_inventory.py18.5 kB
""" Implementación de herramientas (tools) para inventario en MCP-Odoo """ from typing import Dict, List, Any, Optional from datetime import datetime, timedelta from fastmcp import FastMCP from .models import ( ProductAvailabilityInput, InventoryAdjustmentCreate, InventoryTurnoverInput ) from .odoo_client import get_odoo_client def register_inventory_tools(mcp: FastMCP) -> None: """Registra herramientas relacionadas con inventario""" # Helper function to get Odoo client def _get_odoo(): return get_odoo_client() @mcp.tool(description="Verifica la disponibilidad de stock para uno o más productos") def check_product_availability( product_ids: List[int], location_id: Optional[int] = None ) -> Dict[str, Any]: """ Verifica la disponibilidad de stock para uno o más productos Args: product_ids: Lista de IDs de productos a verificar location_id: ID de la ubicación específica (opcional) Returns: Diccionario con información de disponibilidad """ odoo = _get_odoo() try: # Verificar que los productos existen products = odoo.search_read( "product.product", [("id", "in", product_ids)], fields=["name", "default_code", "type", "uom_id"] ) if not products: return {"success": False, "error": "No se encontraron productos con los IDs proporcionados"} # Mapear IDs a nombres para referencia product_names = {p["id"]: p["name"] for p in products} # Obtener disponibilidad availability = {} for product_id in product_ids: # Construir contexto para la consulta context = {} if location_id: context["location"] = location_id # Obtener cantidad disponible usando el método qty_available try: product_data = odoo.execute_method( "product.product", "read", [product_id], ["qty_available", "virtual_available", "incoming_qty", "outgoing_qty"], context ) if product_data: product_info = product_data[0] availability[product_id] = { "name": product_names.get(product_id, f"Producto {product_id}"), "qty_available": product_info["qty_available"], "virtual_available": product_info["virtual_available"], "incoming_qty": product_info["incoming_qty"], "outgoing_qty": product_info["outgoing_qty"] } else: availability[product_id] = { "name": product_names.get(product_id, f"Producto {product_id}"), "error": "Producto no encontrado" } except Exception as e: availability[product_id] = { "name": product_names.get(product_id, f"Producto {product_id}"), "error": str(e) } # Obtener información de la ubicación si se especificó location_info = None if location_id: try: location_data = odoo.search_read( "stock.location", [("id", "=", location_id)], fields=["name", "complete_name"] ) if location_data: location_info = location_data[0] except Exception: location_info = {"id": location_id, "name": "Ubicación desconocida"} return { "success": True, "result": { "products": availability, "location": location_info } } except Exception as e: return {"success": False, "error": str(e)} @mcp.tool(description="Crea un ajuste de inventario para corregir el stock") def create_inventory_adjustment( name: str, adjustment_lines: List[Dict[str, Any]], date: Optional[str] = None ) -> Dict[str, Any]: """ Crea un ajuste de inventario para corregir el stock Args: name: Nombre o descripción del ajuste adjustment_lines: Lista de líneas de ajuste. Cada línea debe tener: - product_id (int): ID del producto - location_id (int): ID de la ubicación - product_qty (float): Cantidad contada real date: Fecha del ajuste en formato YYYY-MM-DD (opcional) Returns: Respuesta con el resultado de la operación """ odoo = _get_odoo() try: # Verificar la versión de Odoo para determinar el modelo correcto # En Odoo 13.0+, se usa 'stock.inventory' # En Odoo 15.0+, se usa 'stock.quant' directamente # Intentar obtener el modelo stock.inventory inventory_model_exists = odoo.execute_method( "ir.model", "search_count", [("model", "=", "stock.inventory")] ) > 0 if inventory_model_exists: # Usar el flujo de stock.inventory (Odoo 13.0, 14.0) # Crear el inventario inventory_vals = { "name": name, "line_ids": [] } if date: try: datetime.strptime(date, "%Y-%m-%d") inventory_vals["date"] = date except ValueError: return {"success": False, "error": f"Formato de fecha inválido: {date}. Use YYYY-MM-DD."} # Crear el inventario inventory_id = odoo.execute_method("stock.inventory", "create", inventory_vals) # Añadir líneas al inventario for line in adjustment_lines: if not isinstance(line, dict): return {"success": False, "error": "Cada línea debe ser un diccionario"} if "product_id" not in line or "location_id" not in line or "product_qty" not in line: return {"success": False, "error": "Cada línea debe contener product_id, location_id y product_qty"} line_vals = { "inventory_id": inventory_id, "product_id": line["product_id"], "location_id": line["location_id"], "product_qty": line["product_qty"] } odoo.execute_method("stock.inventory.line", "create", line_vals) # Confirmar el inventario odoo.execute_method("stock.inventory", "action_validate", [inventory_id]) return { "success": True, "result": { "inventory_id": inventory_id, "name": name } } else: # Usar el flujo de stock.quant (Odoo 15.0+) result_ids = [] for line in adjustment_lines: if not isinstance(line, dict): return {"success": False, "error": "Cada línea debe ser un diccionario"} if "product_id" not in line or "location_id" not in line or "product_qty" not in line: return {"success": False, "error": "Cada línea debe contener product_id, location_id y product_qty"} # Buscar el quant existente quant_domain = [ ("product_id", "=", line["product_id"]), ("location_id", "=", line["location_id"]) ] quants = odoo.search_read( "stock.quant", quant_domain, fields=["id", "quantity"] ) if quants: # Actualizar quant existente quant_id = quants[0]["id"] odoo.execute_method( "stock.quant", "write", [quant_id], {"inventory_quantity": line["product_qty"]} ) result_ids.append(quant_id) else: # Crear nuevo quant quant_vals = { "product_id": line["product_id"], "location_id": line["location_id"], "inventory_quantity": line["product_qty"] } quant_id = odoo.execute_method("stock.quant", "create", quant_vals) result_ids.append(quant_id) # Aplicar el inventario odoo.execute_method("stock.quant", "action_apply_inventory", result_ids) return { "success": True, "result": { "quant_ids": result_ids, "name": name } } except Exception as e: return {"success": False, "error": str(e)} @mcp.tool(description="Calcula y analiza la rotación de inventario") def analyze_inventory_turnover( date_from: str, date_to: str, product_ids: Optional[List[int]] = None, category_id: Optional[int] = None ) -> Dict[str, Any]: """ Calcula y analiza la rotación de inventario Args: date_from: Fecha inicial en formato YYYY-MM-DD date_to: Fecha final en formato YYYY-MM-DD product_ids: Lista de IDs de productos a analizar (opcional) category_id: ID de categoría de producto para filtrar (opcional) Returns: Diccionario con resultados del análisis """ odoo = _get_odoo() try: # Validar fechas try: date_from_dt = datetime.strptime(date_from, "%Y-%m-%d") date_to_dt = datetime.strptime(date_to, "%Y-%m-%d") except ValueError: return {"success": False, "error": "Formato de fecha inválido. Use YYYY-MM-DD."} # Construir dominio para productos product_domain = [("type", "=", "product")] # Solo productos almacenables if product_ids: product_domain.append(("id", "in", product_ids)) if category_id: product_domain.append(("categ_id", "=", category_id)) # Obtener productos products = odoo.search_read( "product.product", product_domain, fields=["name", "default_code", "categ_id", "standard_price"] ) if not products: return {"success": False, "error": "No se encontraron productos con los criterios especificados"} # Calcular rotación para cada producto product_turnover = {} for product in products: product_id = product["id"] # 1. Obtener movimientos de salida (ventas) en el período outgoing_domain = [ ("product_id", "=", product_id), ("date", ">=", date_from), ("date", "<=", date_to), ("location_dest_id.usage", "=", "customer") # Destino: cliente ] outgoing_moves = odoo.search_read( "stock.move", outgoing_domain, fields=["product_uom_qty", "price_unit"] ) # Calcular costo de ventas cogs = sum(move["product_uom_qty"] * (move.get("price_unit") or product["standard_price"]) for move in outgoing_moves) # 2. Obtener valor de inventario promedio # Intentar obtener valoración de inventario al inicio y fin del período # Método 1: Usar informes de valoración si están disponibles try: # Valoración al inicio del período context_start = { "to_date": date_from } valuation_start = odoo.execute_method( "product.product", "read", [product_id], ["stock_value"], context_start ) # Valoración al final del período context_end = { "to_date": date_to } valuation_end = odoo.execute_method( "product.product", "read", [product_id], ["stock_value"], context_end ) start_value = valuation_start[0]["stock_value"] if valuation_start else 0 end_value = valuation_end[0]["stock_value"] if valuation_end else 0 avg_inventory_value = (start_value + end_value) / 2 except Exception: # Método 2: Estimación basada en precio estándar y cantidad # Obtener cantidad al inicio context_start = { "to_date": date_from } qty_start = odoo.execute_method( "product.product", "read", [product_id], ["qty_available"], context_start ) # Obtener cantidad al final context_end = { "to_date": date_to } qty_end = odoo.execute_method( "product.product", "read", [product_id], ["qty_available"], context_end ) start_qty = qty_start[0]["qty_available"] if qty_start else 0 end_qty = qty_end[0]["qty_available"] if qty_end else 0 avg_qty = (start_qty + end_qty) / 2 avg_inventory_value = avg_qty * product["standard_price"] # 3. Calcular métricas de rotación turnover_ratio = 0 days_inventory = 0 if avg_inventory_value > 0: turnover_ratio = cogs / avg_inventory_value # Días de inventario (basado en el período analizado) days_in_period = (date_to_dt - date_from_dt).days + 1 if turnover_ratio > 0: days_inventory = days_in_period / turnover_ratio # Guardar resultados product_turnover[product_id] = { "name": product["name"], "default_code": product["default_code"], "category": product["categ_id"][1] if product["categ_id"] else "Sin categoría", "cogs": cogs, "avg_inventory_value": avg_inventory_value, "turnover_ratio": turnover_ratio, "days_inventory": days_inventory } # Ordenar productos por rotación (de mayor a menor) sorted_products = sorted( product_turnover.items(), key=lambda x: x[1]["turnover_ratio"], reverse=True ) # Calcular promedios generales total_cogs = sum(data["cogs"] for _, data in product_turnover.items()) total_avg_value = sum(data["avg_inventory_value"] for _, data in product_turnover.items()) overall_turnover = 0 overall_days = 0 if total_avg_value > 0: overall_turnover = total_cogs / total_avg_value days_in_period = (date_to_dt - date_from_dt).days + 1 if overall_turnover > 0: overall_days = days_in_period / overall_turnover # Preparar resultado result = { "period": { "from": date_from, "to": date_to, "days": (date_to_dt - date_from_dt).days + 1 }, "summary": { "product_count": len(products), "total_cogs": total_cogs, "total_avg_inventory_value": total_avg_value, "overall_turnover_ratio": overall_turnover, "overall_days_inventory": overall_days }, "products": [ {"id": k, **v} for k, v in sorted_products ] } return {"success": True, "result": result} except Exception as e: return {"success": False, "error": str(e)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/golfamigo/odooMcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server