test_simple.py•6.43 kB
#!/usr/bin/env python3
"""
Script de prueba simple basado en simulate_recomendation.py que funciona
Este script usa exactamente la misma lógica que funciona
"""
import os
import sys
import asyncio
from pathlib import Path
# Agregar el directorio src al path
sys.path.insert(0, str(Path(__file__).parent / 'src'))
from dotenv import load_dotenv
from src.config import config
from src.gemini import gemini_client
from src.supabase_client import supabase_client
async def test_simple_rag():
    """Prueba simple del flujo RAG usando la lógica que funciona"""
    
    print("="*60)
    print("TEST RAG - Usando lógica de simulate_recomendation.py")
    print("="*60)
    
    # Perfil de prueba (igual al de simulate_recomendation.py)
    profile = {
        "actividad": "Diseñador gráfico freelance",
        "ingresos_anuales": 450000,
        "empleados": 0,
        "metodos_pago": ["transferencia", "efectivo"],
        "estado": "Ciudad de México",
        "has_rfc": True,
        "has_efirma": True,
        "emite_cfdi": True,
        "declara_mensual": True
    }
    
    print(f"\n1. PERFIL:")
    print(f"   Actividad: {profile['actividad']}")
    print(f"   Ingresos: ${profile['ingresos_anuales']:,}")
    print(f"   Estado: {profile['estado']}")
    
    # Generar query semántica (igual a profile_to_query)
    parts = []
    parts.append(f"Actividad: {profile.get('actividad','')}")
    parts.append(f"Ingresos anuales estimados: {profile.get('ingresos_anuales','')}")
    parts.append(f"Número de empleados: {profile.get('empleados','')}")
    parts.append(f"Métodos de pago: {', '.join(profile.get('metodos_pago', []))}")
    parts.append(f"Entidad federativa: {profile.get('estado','')}")
    parts.append(f"¿Tiene RFC?: {profile.get('has_rfc')}")
    parts.append(f"¿Tiene e.firma?: {profile.get('has_efirma')}")
    parts.append(f"¿Emite CFDI?: {profile.get('emite_cfdi')}")
    parts.append(f"¿Presenta declaraciones mensuales?: {profile.get('declara_mensual')}")
    
    query = ("Perfil fiscal. Necesito sugerir régimen, pasos de formalización, "
             "obligaciones y calendario básico, citando fuentes del SAT: \n" + "\n".join(parts))
    
    print(f"\n2. QUERY GENERADA:")
    print(f"   {query[:150]}...")
    
    # Generar embedding
    print(f"\n3. GENERANDO EMBEDDING...")
    print(f"   Modelo: {config.GEMINI_EMBED_MODEL}")
    print(f"   Dimensión: {config.EMBED_DIM}")
    
    try:
        embedding = await gemini_client.generate_embedding(query)
        print(f"   ✅ Embedding generado: {len(embedding)} dimensiones")
        print(f"   Primeros 5 valores: {embedding[:5]}")
    except Exception as e:
        print(f"   ❌ Error: {e}")
        return
    
    # Buscar documentos
    print(f"\n4. BUSCANDO DOCUMENTOS EN SUPABASE...")
    print(f"   Threshold: {config.SIMILARITY_THRESHOLD}")
    print(f"   Top-K: {config.TOPK_DOCUMENTS}")
    
    try:
        documents = await supabase_client.search_similar_documents(
            embedding,
            limit=config.TOPK_DOCUMENTS,
            threshold=config.SIMILARITY_THRESHOLD
        )
        
        if documents:
            print(f"   ✅ Encontrados {len(documents)} documentos")
            for i, doc in enumerate(documents, 1):
                print(f"\n   Documento {i}:")
                print(f"   - Título: {doc.get('title', 'N/A')}")
                print(f"   - Scope: {doc.get('scope', 'N/A')}")
                print(f"   - Similitud: {doc.get('similarity', 0):.4f}")
                print(f"   - URL: {doc.get('source_url', 'N/A')[:50]}...")
        else:
            print(f"   ⚠️  No se encontraron documentos")
            print(f"\n   DIAGNÓSTICO:")
            print(f"   - Verifica que la tabla fiscai_documents tenga datos")
            print(f"   - Ejecuta: SELECT COUNT(*) FROM fiscai_documents;")
            print(f"   - Si está vacía, necesitas insertar documentos primero")
            return
            
    except Exception as e:
        print(f"   ❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return
    
    # Construir contexto (igual a build_context)
    print(f"\n5. CONSTRUYENDO CONTEXTO...")
    blocks = []
    for i, r in enumerate(documents, start=1):
        title = r.get("title","")
        scope = r.get("scope","")
        url   = r.get("source_url","")
        txt   = r.get("content","")
        blocks.append(f"[{i}] {title} — {scope}\nFuente: {url}\n{txt}")
    
    context = "\n\n".join(blocks)
    print(f"   ✅ Contexto construido: {len(context)} caracteres")
    
    # Generar recomendación
    print(f"\n6. GENERANDO RECOMENDACIÓN CON GEMINI...")
    print(f"   Modelo: {config.GEMINI_MODEL}")
    
    try:
        recommendation = await gemini_client.generate_recommendation(profile, context)
        
        print(f"\n{'='*60}")
        print("RECOMENDACIÓN GENERADA:")
        print('='*60)
        print(recommendation)
        print('='*60)
        
        print(f"\n✅ ÉXITO:")
        print(f"   - Longitud: {len(recommendation)} caracteres")
        print(f"   - Fuentes utilizadas: {len(documents)}")
        print(f"   - Modelo generación: {config.GEMINI_MODEL}")
        print(f"   - Modelo embedding: {config.GEMINI_EMBED_MODEL}")
        
        return {
            'success': True,
            'recommendation': recommendation,
            'sources': documents,
            'matches_count': len(documents)
        }
        
    except Exception as e:
        print(f"   ❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return None
async def main():
    load_dotenv()
    
    print("\n" + "="*60)
    print("CONFIGURACIÓN:")
    print("="*60)
    print(f"SUPABASE_URL: {config.SUPABASE_URL[:30]}...")
    print(f"GEMINI_MODEL: {config.GEMINI_MODEL}")
    print(f"GEMINI_EMBED_MODEL: {config.GEMINI_EMBED_MODEL}")
    print(f"EMBED_DIM: {config.EMBED_DIM}")
    print(f"SIMILARITY_THRESHOLD: {config.SIMILARITY_THRESHOLD}")
    print(f"TOPK_DOCUMENTS: {config.TOPK_DOCUMENTS}")
    print()
    
    result = await test_simple_rag()
    
    if result and result.get('success'):
        print("\n" + "="*60)
        print("✅ PRUEBA COMPLETADA EXITOSAMENTE")
        print("="*60)
    else:
        print("\n" + "="*60)
        print("❌ PRUEBA FALLÓ")
        print("="*60)
if __name__ == "__main__":
    asyncio.run(main())