# odoo_mcp_api.py - VERSIÓN COMPLETA Y CORREGIDA v1.0.1
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import xmlrpc.client
import json
from datetime import datetime, timedelta
from typing import Optional, List
import os
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(
title="Odoo MCP Server for Claude",
description="Marketing & Sales Manager AI - Odoo Data Access",
version="1.0.1"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class OdooConnector:
def __init__(self):
self.url = os.getenv('ODOO_URL')
self.db = os.getenv('ODOO_DB')
self.username = os.getenv('ODOO_USERNAME')
self.api_key = os.getenv('ODOO_API_KEY')
self.uid = None
self.models = None
logger.info(f"🔧 Initializing Odoo connector for {self.url}")
def authenticate(self):
try:
common = xmlrpc.client.ServerProxy(f'{self.url}/xmlrpc/2/common')
self.uid = common.authenticate(self.db, self.username, self.api_key, {})
self.models = xmlrpc.client.ServerProxy(f'{self.url}/xmlrpc/2/object')
logger.info(f"✅ Authenticated with Odoo - UID: {self.uid}")
return self.uid
except Exception as e:
logger.error(f"❌ Authentication failed: {e}")
raise
def execute_kw(self, model: str, method: str, args: list, kwargs: dict = None):
if not self.uid:
self.authenticate()
try:
result = self.models.execute_kw(
self.db, self.uid, self.api_key,
model, method, args, kwargs or {}
)
logger.info(f"📊 Executed {model}.{method} - Returned {len(result) if isinstance(result, list) else 1} records")
return result
except Exception as e:
logger.error(f"❌ Error executing {model}.{method}: {e}")
raise
# Inicializar conector global
odoo = OdooConnector()
# ==================== MODELOS PYDANTIC ====================
class SalesDataRequest(BaseModel):
days_back: int = 30
state: Optional[str] = None
partner_ids: Optional[List[int]] = None
min_amount: Optional[float] = None
class CustomerInsightsRequest(BaseModel):
segment: str = "all" # all, vip, at_risk, new, inactive, regular
min_purchases: Optional[int] = None
min_revenue: Optional[float] = None
class OpportunitiesRequest(BaseModel):
stage: Optional[str] = None
min_probability: Optional[int] = None
days_inactive: Optional[int] = None
class ProductPerformanceRequest(BaseModel):
days_back: int = 90
top_n: int = 20
class CustomerSearchRequest(BaseModel):
query: str
limit: int = 10
# ==================== ENDPOINTS ====================
@app.get("/")
async def root():
return {
"service": "Odoo MCP Server for Claude",
"status": "running",
"description": "Marketing & Sales Manager AI - Intelligent Odoo Data Access",
"version": "1.0.1",
"endpoints": {
"health": "GET /health - Check server health and Odoo connection",
"tools": "GET /tools - List all available tools",
"sales": "POST /get_sales_data - Get sales orders with filters",
"customers": "POST /get_customer_insights - Customer segmentation (RFM analysis)",
"opportunities": "POST /get_crm_opportunities - CRM pipeline data",
"products": "POST /get_product_performance - Product sales performance",
"team": "POST /get_sales_team_performance - Sales team metrics",
"search": "POST /search_customers - Search customers by name/email/phone",
"comprehensive": "POST /get_comprehensive_data - All data for complete analysis"
}
}
@app.get("/health")
async def health():
"""Health check endpoint"""
try:
if not odoo.uid:
odoo.authenticate()
return {
"status": "healthy",
"odoo_connected": True,
"odoo_uid": odoo.uid,
"odoo_url": odoo.url,
"odoo_db": odoo.db,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"status": "unhealthy",
"odoo_connected": False,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
@app.get("/tools")
async def list_tools():
"""Lista todas las herramientas disponibles para Claude"""
return {
"tools": [
{
"name": "get_sales_data",
"description": "Obtiene datos de ventas con filtros opcionales",
"parameters": ["days_back", "state", "partner_ids", "min_amount"]
},
{
"name": "get_customer_insights",
"description": "Analiza comportamiento y segmentación de clientes (RFM)",
"parameters": ["segment", "min_purchases", "min_revenue"]
},
{
"name": "get_crm_opportunities",
"description": "Obtiene oportunidades del pipeline de ventas",
"parameters": ["stage", "min_probability", "days_inactive"]
},
{
"name": "get_product_performance",
"description": "Analiza rendimiento de productos por ventas",
"parameters": ["days_back", "top_n"]
},
{
"name": "get_sales_team_performance",
"description": "Métricas de rendimiento del equipo de ventas",
"parameters": ["days_back"]
},
{
"name": "search_customers",
"description": "Busca clientes por nombre, email o teléfono",
"parameters": ["query", "limit"]
},
{
"name": "get_comprehensive_data",
"description": "Obtiene todos los datos necesarios para análisis completo",
"parameters": ["days_back"]
}
]
}
@app.post("/get_sales_data")
async def get_sales_data(request: SalesDataRequest):
"""Obtiene datos de ventas de Odoo con filtros opcionales"""
try:
date_from = (datetime.now() - timedelta(days=request.days_back)).strftime('%Y-%m-%d')
filters = [['date_order', '>=', date_from]]
if request.state:
filters.append(['state', '=', request.state])
if request.partner_ids:
filters.append(['partner_id', 'in', request.partner_ids])
if request.min_amount:
filters.append(['amount_total', '>=', request.min_amount])
sales = odoo.execute_kw('sale.order', 'search_read', [filters], {
'fields': ['name', 'partner_id', 'date_order', 'amount_total',
'state', 'user_id', 'team_id'],
'order': 'date_order desc',
'limit': 1000
})
# Calcular estadísticas
total_revenue = sum(s.get('amount_total', 0) for s in sales)
avg_order = total_revenue / len(sales) if sales else 0
return {
"success": True,
"count": len(sales),
"data": sales,
"summary": {
"total_revenue": round(total_revenue, 2),
"avg_order_value": round(avg_order, 2),
"period_days": request.days_back,
"date_from": date_from,
"date_to": datetime.now().strftime('%Y-%m-%d')
}
}
except Exception as e:
logger.error(f"Error in get_sales_data: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.post("/get_customer_insights")
async def get_customer_insights(request: CustomerInsightsRequest):
"""Analiza comportamiento y segmentación RFM de clientes"""
try:
partners = odoo.execute_kw('res.partner', 'search_read',
[[['customer_rank', '>', 0]]], {
'fields': ['name', 'email', 'phone', 'create_date'],
'limit': 1000
})
insights = []
for partner in partners:
# Obtener órdenes del cliente
orders = odoo.execute_kw('sale.order', 'search_read',
[[['partner_id', '=', partner['id']], ['state', 'in', ['sale', 'done']]]], {
'fields': ['date_order', 'amount_total']
})
if orders:
total_revenue = sum(o['amount_total'] for o in orders)
num_purchases = len(orders)
last_order_date = max(o['date_order'] for o in orders)
days_since_last = (datetime.now() - datetime.strptime(last_order_date[:10], '%Y-%m-%d')).days
# Segmentación RFM
if total_revenue > 10000 and num_purchases > 5:
segment = "vip"
elif days_since_last > 180 and num_purchases > 2:
segment = "at_risk"
elif num_purchases == 1 and days_since_last < 30:
segment = "new"
elif days_since_last > 365:
segment = "inactive"
else:
segment = "regular"
# Filtrar por segmento solicitado
if request.segment == "all" or request.segment == segment:
insight = {
'partner_id': partner['id'],
'name': partner['name'],
'email': partner['email'],
'phone': partner['phone'],
'total_revenue': round(total_revenue, 2),
'num_purchases': num_purchases,
'avg_order_value': round(total_revenue / num_purchases, 2),
'last_order_date': last_order_date,
'days_since_last': days_since_last,
'segment': segment,
'customer_since': partner['create_date'],
'ltv_score': round(total_revenue * (1 - min(days_since_last / 365, 1)), 2)
}
# Aplicar filtros adicionales
if request.min_purchases and insight['num_purchases'] < request.min_purchases:
continue
if request.min_revenue and insight['total_revenue'] < request.min_revenue:
continue
insights.append(insight)
# Ordenar por revenue
insights.sort(key=lambda x: x['total_revenue'], reverse=True)
return {
"success": True,
"count": len(insights),
"data": insights[:100], # Limitar a top 100
"summary": {
"segments": {
"vip": len([c for c in insights if c['segment'] == 'vip']),
"regular": len([c for c in insights if c['segment'] == 'regular']),
"at_risk": len([c for c in insights if c['segment'] == 'at_risk']),
"new": len([c for c in insights if c['segment'] == 'new']),
"inactive": len([c for c in insights if c['segment'] == 'inactive'])
},
"total_revenue": round(sum(c['total_revenue'] for c in insights), 2),
"avg_revenue_per_customer": round(sum(c['total_revenue'] for c in insights) / len(insights), 2) if insights else 0
}
}
except Exception as e:
logger.error(f"Error in get_customer_insights: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.post("/get_crm_opportunities")
async def get_crm_opportunities(request: OpportunitiesRequest):
"""Obtiene y analiza oportunidades del CRM"""
try:
filters = []
if request.stage:
filters.append(['stage_id.name', '=', request.stage])
if request.min_probability is not None:
filters.append(['probability', '>=', request.min_probability])
if request.days_inactive:
date_limit = (datetime.now() - timedelta(days=request.days_inactive)).strftime('%Y-%m-%d')
filters.append(['write_date', '<', date_limit])
opportunities = odoo.execute_kw('crm.lead', 'search_read', [filters], {
'fields': ['name', 'partner_id', 'expected_revenue', 'probability',
'stage_id', 'user_id', 'team_id', 'date_deadline',
'create_date', 'write_date'],
'order': 'expected_revenue desc',
'limit': 500
})
# Calcular métricas del pipeline
total_pipeline = sum(o.get('expected_revenue', 0) or 0 for o in opportunities)
weighted_pipeline = sum(
(o.get('expected_revenue', 0) or 0) * (o.get('probability', 0) or 0) / 100
for o in opportunities
)
return {
"success": True,
"count": len(opportunities),
"data": opportunities,
"pipeline_metrics": {
"total_opportunities": len(opportunities),
"total_pipeline_value": round(total_pipeline, 2),
"weighted_pipeline_value": round(weighted_pipeline, 2),
"avg_deal_size": round(total_pipeline / len(opportunities), 2) if opportunities else 0,
"avg_probability": round(sum(o.get('probability', 0) or 0 for o in opportunities) / len(opportunities), 2) if opportunities else 0
}
}
except Exception as e:
logger.error(f"Error in get_crm_opportunities: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.post("/get_product_performance")
async def get_product_performance(request: ProductPerformanceRequest):
"""Analiza rendimiento de productos - VERSIÓN CORREGIDA"""
try:
date_from = (datetime.now() - timedelta(days=request.days_back)).strftime('%Y-%m-%d')
order_lines = odoo.execute_kw('sale.order.line', 'search_read',
[[['order_id.date_order', '>=', date_from], ['order_id.state', 'in', ['sale', 'done']]]], {
'fields': ['product_id', 'product_uom_qty', 'price_subtotal'],
'limit': 5000
})
product_stats = {}
skipped_lines = 0
for line in order_lines:
# FIX: Verificar que product_id no sea False o None
if not line.get('product_id') or line['product_id'] is False:
skipped_lines += 1
continue
prod_id = line['product_id'][0]
if prod_id not in product_stats:
product_stats[prod_id] = {
'product_name': line['product_id'][1],
'total_qty': 0,
'total_revenue': 0
}
product_stats[prod_id]['total_qty'] += line.get('product_uom_qty', 0)
product_stats[prod_id]['total_revenue'] += line.get('price_subtotal', 0)
if skipped_lines > 0:
logger.warning(f"Skipped {skipped_lines} order lines without product_id")
# Convertir a lista
performance = [
{
'product_id': pid,
'product_name': stats['product_name'],
'total_qty_sold': stats['total_qty'],
'total_revenue': round(stats['total_revenue'], 2)
}
for pid, stats in product_stats.items()
]
# Ordenar por revenue y limitar
performance.sort(key=lambda x: x['total_revenue'], reverse=True)
return {
"success": True,
"count": len(performance),
"data": performance[:request.top_n],
"summary": {
"total_products": len(performance),
"total_revenue": round(sum(p['total_revenue'] for p in performance), 2),
"period_days": request.days_back,
"skipped_lines": skipped_lines
}
}
except Exception as e:
logger.error(f"Error in get_product_performance: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.post("/get_sales_team_performance")
async def get_sales_team_performance(request: SalesDataRequest):
"""Analiza rendimiento del equipo de ventas"""
try:
date_from = (datetime.now() - timedelta(days=request.days_back)).strftime('%Y-%m-%d')
sales = odoo.execute_kw('sale.order', 'search_read',
[[['date_order', '>=', date_from], ['state', 'in', ['sale', 'done']]]], {
'fields': ['user_id', 'amount_total']
})
team_stats = {}
for sale in sales:
if sale.get('user_id'):
user_id = sale['user_id'][0]
if user_id not in team_stats:
team_stats[user_id] = {
'user_name': sale['user_id'][1],
'total_revenue': 0,
'num_deals': 0
}
team_stats[user_id]['total_revenue'] += sale['amount_total']
team_stats[user_id]['num_deals'] += 1
# Calcular métricas
performance = [
{
'user_id': uid,
'user_name': stats['user_name'],
'total_revenue': round(stats['total_revenue'], 2),
'num_deals': stats['num_deals'],
'avg_deal_size': round(stats['total_revenue'] / stats['num_deals'], 2)
}
for uid, stats in team_stats.items()
]
performance.sort(key=lambda x: x['total_revenue'], reverse=True)
return {
"success": True,
"count": len(performance),
"data": performance,
"team_summary": {
"total_revenue": round(sum(p['total_revenue'] for p in performance), 2),
"total_deals": sum(p['num_deals'] for p in performance),
"avg_deal_size": round(sum(p['total_revenue'] for p in performance) / sum(p['num_deals'] for p in performance), 2) if performance else 0,
"period_days": request.days_back
}
}
except Exception as e:
logger.error(f"Error in get_sales_team_performance: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.post("/search_customers")
async def search_customers(request: CustomerSearchRequest):
"""Busca clientes por nombre, email o teléfono"""
try:
filters = [
'|', '|', '|',
['name', 'ilike', request.query],
['email', 'ilike', request.query],
['phone', 'ilike', request.query],
['ref', 'ilike', request.query]
]
customers = odoo.execute_kw('res.partner', 'search_read', [filters], {
'fields': ['name', 'email', 'phone', 'city', 'country_id',
'customer_rank', 'sale_order_count'],
'limit': request.limit
})
return {
"success": True,
"count": len(customers),
"data": customers,
"query": request.query
}
except Exception as e:
logger.error(f"Error in search_customers: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.post("/get_comprehensive_data")
async def get_comprehensive_data(request: SalesDataRequest):
"""
Endpoint especial que obtiene TODOS los datos necesarios para análisis completo.
Ideal para que Claude haga análisis profundos con una sola llamada.
"""
try:
logger.info(f"📊 Getting comprehensive data for last {request.days_back} days")
# Preparar requests
sales_req = SalesDataRequest(days_back=request.days_back)
customer_req = CustomerInsightsRequest(segment="all")
opp_req = OpportunitiesRequest()
product_req = ProductPerformanceRequest(days_back=request.days_back)
# Obtener todos los datos
sales_data = await get_sales_data(sales_req)
customer_data = await get_customer_insights(customer_req)
opp_data = await get_crm_opportunities(opp_req)
product_data = await get_product_performance(product_req)
team_data = await get_sales_team_performance(sales_req)
return {
"success": True,
"period_days": request.days_back,
"generated_at": datetime.now().isoformat(),
"data": {
"sales": sales_data,
"customers": customer_data,
"opportunities": opp_data,
"products": product_data,
"team": team_data
},
"executive_summary": {
"total_revenue": sales_data["summary"]["total_revenue"],
"num_sales": sales_data["count"],
"total_customers": customer_data["count"],
"vip_customers": customer_data["summary"]["segments"]["vip"],
"at_risk_customers": customer_data["summary"]["segments"]["at_risk"],
"new_customers": customer_data["summary"]["segments"]["new"],
"pipeline_value": opp_data["pipeline_metrics"]["weighted_pipeline_value"],
"total_opportunities": opp_data["count"],
"top_product": product_data["data"][0]["product_name"] if product_data["data"] else "N/A",
"top_product_revenue": product_data["data"][0]["total_revenue"] if product_data["data"] else 0,
"team_size": team_data["count"],
"top_seller": team_data["data"][0]["user_name"] if team_data["data"] else "N/A"
}
}
except Exception as e:
logger.error(f"Error in get_comprehensive_data: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
# Autenticar al inicio
try:
odoo.authenticate()
except Exception as e:
logger.error(f"⚠️ Failed to authenticate on startup: {e}")
# Iniciar servidor
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")