Skip to main content
Glama
jariass2
by jariass2
odoo_mcp_api.py.backup (22.7 kB)
# odoo_mcp_api.py - COMPLETE AND CORRECTED VERSION v1.0.1
"""FastAPI service exposing Odoo sales/CRM data (read over XML-RPC) as JSON endpoints."""
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import Optional, List
import json
import logging
import os
import xmlrpc.client

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="Odoo MCP Server for Claude",
    description="Marketing & Sales Manager AI - Odoo Data Access",
    version="1.0.1"
)

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; consider restricting allow_origins for production deployments.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class OdooConnector:
    """Thin XML-RPC client for Odoo, configured from environment variables.

    Reads ODOO_URL, ODOO_DB, ODOO_USERNAME and ODOO_API_KEY at construction
    time and authenticates lazily on the first ``execute_kw`` call.
    """

    def __init__(self):
        self.url = os.getenv('ODOO_URL')
        self.db = os.getenv('ODOO_DB')
        self.username = os.getenv('ODOO_USERNAME')
        self.api_key = os.getenv('ODOO_API_KEY')
        self.uid = None
        self.models = None
        logger.info(f"🔧 Initializing Odoo connector for {self.url}")

    def authenticate(self):
        """Authenticate against Odoo and cache the user id and object proxy.

        Returns:
            The authenticated Odoo user id.

        Raises:
            RuntimeError: if a required connection setting is missing.
            PermissionError: if Odoo rejects the credentials.
            Exception: any transport/XML-RPC error is logged and re-raised.
        """
        try:
            missing = [
                name
                for name, value in (
                    ('ODOO_URL', self.url),
                    ('ODOO_DB', self.db),
                    ('ODOO_USERNAME', self.username),
                    ('ODOO_API_KEY', self.api_key),
                )
                if not value
            ]
            if missing:
                raise RuntimeError(f"Missing Odoo configuration: {', '.join(missing)}")

            common = xmlrpc.client.ServerProxy(f'{self.url}/xmlrpc/2/common')
            uid = common.authenticate(self.db, self.username, self.api_key, {})
            # Odoo signals bad credentials by returning False rather than
            # raising, so a falsy uid must be treated as a failure.
            if not uid:
                raise PermissionError("Odoo rejected the supplied credentials")

            self.uid = uid
            self.models = xmlrpc.client.ServerProxy(f'{self.url}/xmlrpc/2/object')
            logger.info(f"✅ Authenticated with Odoo - UID: {self.uid}")
            return self.uid
        except Exception as e:
            logger.error(f"❌ Authentication failed: {e}")
            raise

    def execute_kw(self, model: str, method: str, args: list, kwargs: Optional[dict] = None):
        """Call ``model.method`` on Odoo, authenticating first if needed.

        Args:
            model: Odoo model name, e.g. ``'sale.order'``.
            method: Model method name, e.g. ``'search_read'``.
            args: Positional arguments forwarded to the method.
            kwargs: Keyword arguments forwarded to the method (optional).

        Returns:
            Whatever the remote method returns.

        Raises:
            Exception: authentication or XML-RPC errors are logged and re-raised.
        """
        if not self.uid:
            self.authenticate()

        try:
            result = self.models.execute_kw(
                self.db, self.uid, self.api_key,
                model, method, args, kwargs or {}
            )
            logger.info(f"📊 Executed {model}.{method} - Returned {len(result) if isinstance(result, list) else 1} records")
            return result
        except Exception as e:
            logger.error(f"❌ Error executing {model}.{method}: {e}")
            raise


# Initialize the global connector
odoo = OdooConnector()


# ==================== PYDANTIC MODELS ====================

class SalesDataRequest(BaseModel):
    days_back: int = 30
    state: Optional[str] = None
    partner_ids: Optional[List[int]] = None
    min_amount: Optional[float] = None


class CustomerInsightsRequest(BaseModel):
    segment: str = "all"  # all, vip, at_risk, new, inactive, regular
    min_purchases: Optional[int] = None
    min_revenue: Optional[float] = None


class OpportunitiesRequest(BaseModel):
    stage: Optional[str] = None
    min_probability: Optional[int] = None
    days_inactive: Optional[int] = None


class ProductPerformanceRequest(BaseModel):
    days_back: int = 90
    top_n: int = 20


class CustomerSearchRequest(BaseModel):
    query: str
    limit: int = 10


# ==================== ENDPOINTS ====================
# NOTE(review): the handlers below are ``async def`` but perform blocking
# XML-RPC calls, so a slow Odoo call stalls the event loop. Offloading to
# threads would need a lock because xmlrpc.client.ServerProxy is not
# thread-safe; left unchanged here.

@app.get("/")
async def root():
    """Return service metadata and a map of the available endpoints."""
    return {
        "service": "Odoo MCP Server for Claude",
        "status": "running",
        "description": "Marketing & Sales Manager AI - Intelligent Odoo Data Access",
        "version": "1.0.1",
        "endpoints": {
            "health": "GET /health - Check server health and Odoo connection",
            "tools": "GET /tools - List all available tools",
            "sales": "POST /get_sales_data - Get sales orders with filters",
            "customers": "POST /get_customer_insights - Customer segmentation (RFM analysis)",
            "opportunities": "POST /get_crm_opportunities - CRM pipeline data",
            "products": "POST /get_product_performance - Product sales performance",
            "team": "POST /get_sales_team_performance - Sales team metrics",
            "search": "POST /search_customers - Search customers by name/email/phone",
            "comprehensive": "POST /get_comprehensive_data - All data for complete analysis"
        }
    }


@app.get("/health")
async def health():
    """Health check endpoint.

    Authenticates against Odoo if no uid is cached yet and reports
    "unhealthy" (instead of raising) when that fails.
    """
    try:
        if not odoo.uid:
            odoo.authenticate()

        return {
            "status": "healthy",
            "odoo_connected": True,
            "odoo_uid": odoo.uid,
            "odoo_url": odoo.url,
            "odoo_db": odoo.db,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        return {
            "status": "unhealthy",
            "odoo_connected": False,
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }


@app.get("/tools")
async def list_tools():
    """List all tools available to Claude."""
    return {
        "tools": [
            {
                "name": "get_sales_data",
                "description": "Obtiene datos de ventas con filtros opcionales",
                "parameters": ["days_back", "state", "partner_ids", "min_amount"]
            },
            {
                "name": "get_customer_insights",
                "description": "Analiza comportamiento y segmentación de clientes (RFM)",
                "parameters": ["segment", "min_purchases", "min_revenue"]
            },
            {
                "name": "get_crm_opportunities",
                "description": "Obtiene oportunidades del pipeline de ventas",
                "parameters": ["stage", "min_probability", "days_inactive"]
            },
            {
                "name": "get_product_performance",
                "description": "Analiza rendimiento de productos por ventas",
                "parameters": ["days_back", "top_n"]
            },
            {
                "name": "get_sales_team_performance",
                "description": "Métricas de rendimiento del equipo de ventas",
                "parameters": ["days_back"]
            },
            {
                "name": "search_customers",
                "description": "Busca clientes por nombre, email o teléfono",
                "parameters": ["query", "limit"]
            },
            {
                "name": "get_comprehensive_data",
                "description": "Obtiene todos los datos necesarios para análisis completo",
                "parameters": ["days_back"]
            }
        ]
    }


@app.post("/get_sales_data")
async def get_sales_data(request: SalesDataRequest):
    """Fetch sale orders from Odoo with optional filters plus summary totals.

    Raises:
        HTTPException: 500 with the error text if the Odoo call fails.
    """
    try:
        date_from = (datetime.now() - timedelta(days=request.days_back)).strftime('%Y-%m-%d')

        filters = [['date_order', '>=', date_from]]
        if request.state:
            filters.append(['state', '=', request.state])
        if request.partner_ids:
            filters.append(['partner_id', 'in', request.partner_ids])
        # "is not None" so that an explicit 0 threshold is honoured.
        if request.min_amount is not None:
            filters.append(['amount_total', '>=', request.min_amount])

        sales = odoo.execute_kw('sale.order', 'search_read', [filters], {
            'fields': ['name', 'partner_id', 'date_order', 'amount_total', 'state', 'user_id', 'team_id'],
            'order': 'date_order desc',
            'limit': 1000
        })

        # Compute statistics
        total_revenue = sum(s.get('amount_total', 0) for s in sales)
        avg_order = total_revenue / len(sales) if sales else 0

        return {
            "success": True,
            "count": len(sales),
            "data": sales,
            "summary": {
                "total_revenue": round(total_revenue, 2),
                "avg_order_value": round(avg_order, 2),
                "period_days": request.days_back,
                "date_from": date_from,
                "date_to": datetime.now().strftime('%Y-%m-%d')
            }
        }
    except Exception as e:
        logger.error(f"Error in get_sales_data: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


def _classify_segment(total_revenue, num_purchases, days_since_last):
    """Map a customer's revenue/frequency/recency figures to a segment name."""
    if total_revenue > 10000 and num_purchases > 5:
        return "vip"
    if days_since_last > 180 and num_purchases > 2:
        return "at_risk"
    if num_purchases == 1 and days_since_last < 30:
        return "new"
    if days_since_last > 365:
        return "inactive"
    return "regular"


@app.post("/get_customer_insights")
async def get_customer_insights(request: CustomerInsightsRequest):
    """Analyse customer behaviour and RFM-style segmentation.

    Only customers with at least one confirmed ('sale'/'done') order are
    reported. ``data`` is capped at the top 100 by revenue; ``count`` and
    ``summary`` cover every matching customer.

    Raises:
        HTTPException: 500 with the error text if an Odoo call fails.
    """
    try:
        partners = odoo.execute_kw('res.partner', 'search_read',
            [[['customer_rank', '>', 0]]], {
                'fields': ['name', 'email', 'phone', 'create_date'],
                'limit': 1000
            })

        # Fetch the confirmed orders of all partners in ONE call and group
        # them locally, instead of one XML-RPC round-trip per partner.
        orders_by_partner = defaultdict(list)
        partner_ids = [partner['id'] for partner in partners]
        if partner_ids:
            all_orders = odoo.execute_kw('sale.order', 'search_read',
                [[['partner_id', 'in', partner_ids], ['state', 'in', ['sale', 'done']]]], {
                    'fields': ['partner_id', 'date_order', 'amount_total']
                })
            for order in all_orders:
                # many2one values come back as [id, display_name]
                orders_by_partner[order['partner_id'][0]].append(order)

        now = datetime.now()
        insights = []
        for partner in partners:
            orders = orders_by_partner.get(partner['id'])
            if not orders:
                continue

            total_revenue = sum(o['amount_total'] for o in orders)
            num_purchases = len(orders)
            last_order_date = max(o['date_order'] for o in orders)
            days_since_last = (now - datetime.strptime(last_order_date[:10], '%Y-%m-%d')).days

            # RFM segmentation
            segment = _classify_segment(total_revenue, num_purchases, days_since_last)

            # Filter by the requested segment
            if request.segment != "all" and request.segment != segment:
                continue

            # Apply the additional filters
            rounded_revenue = round(total_revenue, 2)
            if request.min_purchases is not None and num_purchases < request.min_purchases:
                continue
            if request.min_revenue is not None and rounded_revenue < request.min_revenue:
                continue

            insights.append({
                'partner_id': partner['id'],
                'name': partner['name'],
                'email': partner['email'],
                'phone': partner['phone'],
                'total_revenue': rounded_revenue,
                'num_purchases': num_purchases,
                'avg_order_value': round(total_revenue / num_purchases, 2),
                'last_order_date': last_order_date,
                'days_since_last': days_since_last,
                'segment': segment,
                'customer_since': partner['create_date'],
                'ltv_score': round(total_revenue * (1 - min(days_since_last / 365, 1)), 2)
            })

        # Sort by revenue
        insights.sort(key=lambda x: x['total_revenue'], reverse=True)

        segment_counts = Counter(c['segment'] for c in insights)
        revenue_sum = sum(c['total_revenue'] for c in insights)

        return {
            "success": True,
            "count": len(insights),
            "data": insights[:100],  # Limit to the top 100
            "summary": {
                "segments": {
                    "vip": segment_counts["vip"],
                    "regular": segment_counts["regular"],
                    "at_risk": segment_counts["at_risk"],
                    "new": segment_counts["new"],
                    "inactive": segment_counts["inactive"]
                },
                "total_revenue": round(revenue_sum, 2),
                "avg_revenue_per_customer": round(revenue_sum / len(insights), 2) if insights else 0
            }
        }
    except Exception as e:
        logger.error(f"Error in get_customer_insights: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/get_crm_opportunities")
async def get_crm_opportunities(request: OpportunitiesRequest):
    """Fetch CRM leads/opportunities and compute pipeline metrics.

    Raises:
        HTTPException: 500 with the error text if the Odoo call fails.
    """
    try:
        filters = []
        if request.stage:
            filters.append(['stage_id.name', '=', request.stage])
        if request.min_probability is not None:
            filters.append(['probability', '>=', request.min_probability])
        if request.days_inactive:
            date_limit = (datetime.now() - timedelta(days=request.days_inactive)).strftime('%Y-%m-%d')
            filters.append(['write_date', '<', date_limit])

        opportunities = odoo.execute_kw('crm.lead', 'search_read', [filters], {
            'fields': ['name', 'partner_id', 'expected_revenue', 'probability',
                       'stage_id', 'user_id', 'team_id', 'date_deadline', 'create_date', 'write_date'],
            'order': 'expected_revenue desc',
            'limit': 500
        })

        # Compute pipeline metrics ("or 0" guards against False/None values)
        total_pipeline = sum(o.get('expected_revenue', 0) or 0 for o in opportunities)
        weighted_pipeline = sum(
            (o.get('expected_revenue', 0) or 0) * (o.get('probability', 0) or 0) / 100
            for o in opportunities
        )

        return {
            "success": True,
            "count": len(opportunities),
            "data": opportunities,
            "pipeline_metrics": {
                "total_opportunities": len(opportunities),
                "total_pipeline_value": round(total_pipeline, 2),
                "weighted_pipeline_value": round(weighted_pipeline, 2),
                "avg_deal_size": round(total_pipeline / len(opportunities), 2) if opportunities else 0,
                "avg_probability": round(sum(o.get('probability', 0) or 0 for o in opportunities) / len(opportunities), 2) if opportunities else 0
            }
        }
    except Exception as e:
        logger.error(f"Error in get_crm_opportunities: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/get_product_performance")
async def get_product_performance(request: ProductPerformanceRequest):
    """Aggregate confirmed order lines per product and rank by revenue.

    Lines without a product are skipped and counted in
    ``summary.skipped_lines``.

    Raises:
        HTTPException: 500 with the error text if the Odoo call fails.
    """
    try:
        date_from = (datetime.now() - timedelta(days=request.days_back)).strftime('%Y-%m-%d')

        order_lines = odoo.execute_kw('sale.order.line', 'search_read',
            [[['order_id.date_order', '>=', date_from], ['order_id.state', 'in', ['sale', 'done']]]], {
                'fields': ['product_id', 'product_uom_qty', 'price_subtotal'],
                'limit': 5000
            })

        product_stats = {}
        skipped_lines = 0

        for line in order_lines:
            product = line.get('product_id')
            # Skip lines whose product_id is False/None (no product set)
            if not product:
                skipped_lines += 1
                continue

            prod_id = product[0]
            if prod_id not in product_stats:
                product_stats[prod_id] = {
                    'product_name': product[1],
                    'total_qty': 0,
                    'total_revenue': 0
                }

            product_stats[prod_id]['total_qty'] += line.get('product_uom_qty', 0)
            product_stats[prod_id]['total_revenue'] += line.get('price_subtotal', 0)

        if skipped_lines > 0:
            logger.warning(f"Skipped {skipped_lines} order lines without product_id")

        # Convert to a list
        performance = [
            {
                'product_id': pid,
                'product_name': stats['product_name'],
                'total_qty_sold': stats['total_qty'],
                'total_revenue': round(stats['total_revenue'], 2)
            }
            for pid, stats in product_stats.items()
        ]

        # Sort by revenue and limit
        performance.sort(key=lambda x: x['total_revenue'], reverse=True)
        # Clamp so a negative top_n cannot slice from the end of the list.
        top_n = max(request.top_n, 0)

        return {
            "success": True,
            "count": len(performance),
            "data": performance[:top_n],
            "summary": {
                "total_products": len(performance),
                "total_revenue": round(sum(p['total_revenue'] for p in performance), 2),
                "period_days": request.days_back,
                "skipped_lines": skipped_lines
            }
        }
    except Exception as e:
        logger.error(f"Error in get_product_performance: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/get_sales_team_performance")
async def get_sales_team_performance(request: SalesDataRequest):
    """Aggregate confirmed sale orders per salesperson.

    Only ``request.days_back`` is used; the other request fields are ignored.

    Raises:
        HTTPException: 500 with the error text if the Odoo call fails.
    """
    try:
        date_from = (datetime.now() - timedelta(days=request.days_back)).strftime('%Y-%m-%d')

        sales = odoo.execute_kw('sale.order', 'search_read',
            [[['date_order', '>=', date_from], ['state', 'in', ['sale', 'done']]]], {
                'fields': ['user_id', 'amount_total']
            })

        team_stats = {}
        for sale in sales:
            salesperson = sale.get('user_id')
            # Orders without an assigned salesperson are left out
            if not salesperson:
                continue
            user_id = salesperson[0]
            if user_id not in team_stats:
                team_stats[user_id] = {
                    'user_name': salesperson[1],
                    'total_revenue': 0,
                    'num_deals': 0
                }
            team_stats[user_id]['total_revenue'] += sale['amount_total']
            team_stats[user_id]['num_deals'] += 1

        # Compute metrics
        performance = [
            {
                'user_id': uid,
                'user_name': stats['user_name'],
                'total_revenue': round(stats['total_revenue'], 2),
                'num_deals': stats['num_deals'],
                'avg_deal_size': round(stats['total_revenue'] / stats['num_deals'], 2)
            }
            for uid, stats in team_stats.items()
        ]

        performance.sort(key=lambda x: x['total_revenue'], reverse=True)

        revenue_sum = sum(p['total_revenue'] for p in performance)
        deals_sum = sum(p['num_deals'] for p in performance)

        return {
            "success": True,
            "count": len(performance),
            "data": performance,
            "team_summary": {
                "total_revenue": round(revenue_sum, 2),
                "total_deals": deals_sum,
                "avg_deal_size": round(revenue_sum / deals_sum, 2) if performance else 0,
                "period_days": request.days_back
            }
        }
    except Exception as e:
        logger.error(f"Error in get_sales_team_performance: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/search_customers")
async def search_customers(request: CustomerSearchRequest):
    """Search partners by name, email, phone or reference (case-insensitive).

    Raises:
        HTTPException: 500 with the error text if the Odoo call fails.
    """
    try:
        # Prefix notation: three '|' operators OR together the four terms.
        filters = [
            '|', '|', '|',
            ['name', 'ilike', request.query],
            ['email', 'ilike', request.query],
            ['phone', 'ilike', request.query],
            ['ref', 'ilike', request.query]
        ]

        customers = odoo.execute_kw('res.partner', 'search_read', [filters], {
            'fields': ['name', 'email', 'phone', 'city', 'country_id', 'customer_rank', 'sale_order_count'],
            'limit': request.limit
        })

        return {
            "success": True,
            "count": len(customers),
            "data": customers,
            "query": request.query
        }
    except Exception as e:
        logger.error(f"Error in search_customers: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/get_comprehensive_data")
async def get_comprehensive_data(request: SalesDataRequest):
    """
    Special endpoint that gathers ALL the data needed for a complete analysis.
    Ideal for letting Claude run in-depth analyses with a single call.

    Only ``request.days_back`` is used. Customer and opportunity data are
    fetched with their default (unfiltered) requests.

    Raises:
        HTTPException: the sub-endpoint's own error is propagated unchanged;
            any other failure becomes a 500 with the error text.
    """
    try:
        logger.info(f"📊 Getting comprehensive data for last {request.days_back} days")

        # Prepare the requests
        sales_req = SalesDataRequest(days_back=request.days_back)
        customer_req = CustomerInsightsRequest(segment="all")
        opp_req = OpportunitiesRequest()
        product_req = ProductPerformanceRequest(days_back=request.days_back)

        # Fetch all the data
        sales_data = await get_sales_data(sales_req)
        customer_data = await get_customer_insights(customer_req)
        opp_data = await get_crm_opportunities(opp_req)
        product_data = await get_product_performance(product_req)
        team_data = await get_sales_team_performance(sales_req)

        top_products = product_data["data"]
        top_sellers = team_data["data"]

        return {
            "success": True,
            "period_days": request.days_back,
            "generated_at": datetime.now().isoformat(),
            "data": {
                "sales": sales_data,
                "customers": customer_data,
                "opportunities": opp_data,
                "products": product_data,
                "team": team_data
            },
            "executive_summary": {
                "total_revenue": sales_data["summary"]["total_revenue"],
                "num_sales": sales_data["count"],
                "total_customers": customer_data["count"],
                "vip_customers": customer_data["summary"]["segments"]["vip"],
                "at_risk_customers": customer_data["summary"]["segments"]["at_risk"],
                "new_customers": customer_data["summary"]["segments"]["new"],
                "pipeline_value": opp_data["pipeline_metrics"]["weighted_pipeline_value"],
                "total_opportunities": opp_data["count"],
                "top_product": top_products[0]["product_name"] if top_products else "N/A",
                "top_product_revenue": top_products[0]["total_revenue"] if top_products else 0,
                "team_size": team_data["count"],
                "top_seller": top_sellers[0]["user_name"] if top_sellers else "N/A"
            }
        }
    except HTTPException:
        # A sub-endpoint already logged and wrapped its failure; re-wrapping
        # would turn the detail into "500: <original detail>".
        raise
    except Exception as e:
        logger.error(f"Error in get_comprehensive_data: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn

    # Authenticate at startup; a failure is logged but does not stop the
    # server, since execute_kw/health retry authentication later.
    try:
        odoo.authenticate()
    except Exception as e:
        logger.error(f"⚠️ Failed to authenticate on startup: {e}")

    # Start the server
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jariass2/odoo_mcp_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.