import asyncio
import os
import email
import re
from email.header import decode_header
import imaplib
from datetime import datetime
from dotenv import load_dotenv
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient
load_dotenv()
TARGET_EMAIL = os.getenv("EMAIL_USER")
EMAIL_PASS = os.getenv("EMAIL_PASS")
IMAP_SERVER = "imap.gmail.com"
MCP_TOKEN = os.getenv("MCP_TOKEN", "NGgba3XYHcwzQpKv9wcC_yor6pfaYSDaBfvxvwxP51k")
class EmailOutput(BaseModel):
subject: str
body: str
def extract_name_from_email(from_header: str) -> tuple:
"""
Extrae el nombre y email del campo From.
Ejemplo: "Joel Garcia <joelg@fvdigital.do>" -> ("Joel Garcia", "joelg@fvdigital.do")
Si no hay nombre, intenta extraerlo del email.
"""
name, email_addr = email.utils.parseaddr(from_header)
# Decodificar el nombre si está codificado
if name:
decoded_parts = decode_header(name)
decoded_name = ""
for part, encoding in decoded_parts:
if isinstance(part, bytes):
decoded_name += part.decode(encoding or "utf-8", errors="ignore")
else:
decoded_name += part
name = decoded_name.strip()
# Si no hay nombre, intentar extraerlo del email
if not name and email_addr:
# Tomar la parte antes del @ y capitalizar
local_part = email_addr.split("@")[0]
# Remover números y caracteres especiales, separar por puntos o guiones
cleaned = re.sub(r'[0-9_]', '', local_part)
cleaned = re.sub(r'[.\-]', ' ', cleaned)
# Capitalizar cada palabra
name = cleaned.title().strip()
# Si el nombre tiene varias palabras, usar solo la primera (nombre de pila)
first_name = name.split()[0] if name else "Estimado/a"
return first_name, email_addr
async def handle_email(client, from_name: str, from_email: str, message_body: str, is_fastmcp_native=False):
print(f"--- PROCESANDO CORREO ---")
print(f"De: {from_name} <{from_email}>")
try:
# Manejar cliente nativo de FastMCP vs langchain
if is_fastmcp_native:
# Cliente nativo de FastMCP
tools_result = await client.list_tools()
send_email_tool = next((t for t in tools_result.tools if t.name == "send_email"), None)
if not send_email_tool:
print("Error: No se encontro la herramienta 'send_email'")
return
# Obtener prompt
prompts_result = await client.list_prompts()
email_context_prompt = next((p for p in prompts_result.prompts if p.name == "email_context"), None)
if not email_context_prompt:
print("Error: No se encontro el prompt 'email_context'")
return
# Obtener el contenido del prompt
prompt_result = await client.get_prompt("email_context", {})
context_text = "\n".join([m.content for m in prompt_result.messages])
else:
# Cliente langchain
tools = await client.get_tools()
send_email = next((t for t in tools if t.name == "send_email"), None)
if not send_email:
print("Error: No se encontro la herramienta 'send_email'")
return
context_messages = await client.get_prompt(
server_name="fgj-server",
prompt_name="email_context"
)
context_text = "\n".join([m.content for m in context_messages])
products = [
{"name": "Monitoreo Digital", "description": "Seguimiento 24/7 de marcas en redes sociales."},
{"name": "Publicidad Exterior", "description": "Vallas y pantallas LED en puntos estrategicos."},
{"name": "Produccion Audiovisual", "description": "Videos corporativos, comerciales y contenido para redes."},
{"name": "Desarrollo Web", "description": "Sitios web, aplicaciones y sistemas a medida."}
]
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.4)
prompt = f"""
{context_text}
NOMBRE DEL CLIENTE: {from_name}
EMAIL DEL CLIENTE: {from_email}
Mensaje del cliente:
{message_body}
Servicios disponibles:
{products}
Redacta un correo de respuesta institucional.
- Inicia con "Estimado/a {from_name}," o "Hola {from_name},"
- Responde a su consulta de forma profesional
- Incluye la firma de Belgica Jimenez al final
Devuelve SOLO JSON con:
- subject
- body (HTML bien formateado)
"""
result: EmailOutput = await llm.with_structured_output(
EmailOutput
).ainvoke(prompt)
# Enviar correo según el tipo de cliente
if is_fastmcp_native:
await client.call_tool("send_email", {
"to": from_email,
"subject": result.subject,
"body": result.body
})
else:
await send_email.ainvoke({
"to": from_email,
"subject": result.subject,
"body": result.body
})
print(f"Respuesta enviada a {from_name} ({from_email})")
except Exception as e:
print(f"Error en handle_email: {e}")
def check_emails_sync():
"""Revisa correos usando imaplib estandar (sincrono)"""
emails_to_process = []
try:
mail = imaplib.IMAP4_SSL(IMAP_SERVER)
mail.login(TARGET_EMAIL, EMAIL_PASS)
mail.select("INBOX")
# Buscar correos de HOY que no esten leidos
today = datetime.now().strftime("%d-%b-%Y")
search_criteria = f'(UNSEEN SINCE {today})'
status, messages = mail.search(None, search_criteria)
if status != "OK":
mail.logout()
return []
mail_ids = messages[0].split()
if not mail_ids:
print("No hay correos nuevos de hoy.")
mail.logout()
return []
# Tomar solo los ultimos 5
recent_ids = mail_ids[-5:]
print(f"Encontrados {len(mail_ids)} correos de hoy. Procesando {len(recent_ids)}...")
for m_id in recent_ids:
status, msg_data = mail.fetch(m_id, "(RFC822)")
if status != "OK":
continue
raw_email = msg_data[0][1]
msg = email.message_from_bytes(raw_email)
# Extraer nombre y email del remitente
from_header = msg["From"] or ""
from_name, from_email_addr = extract_name_from_email(from_header)
subject = msg["Subject"]
# Decodificar asunto si esta codificado
if subject:
decoded_parts = decode_header(subject)
subject = ""
for part, encoding in decoded_parts:
if isinstance(part, bytes):
subject += part.decode(encoding or "utf-8", errors="ignore")
else:
subject += part
body = ""
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
payload = part.get_payload(decode=True)
if payload:
body = payload.decode(errors='ignore')
break
else:
payload = msg.get_payload(decode=True)
if payload:
body = payload.decode(errors='ignore')
emails_to_process.append({
"id": m_id,
"from_name": from_name,
"from_email": from_email_addr,
"subject": subject,
"body": body
})
# Marcar como leido
mail.store(m_id, "+FLAGS", "\\Seen")
mail.logout()
except Exception as e:
print(f"Error en check_emails_sync: {e}")
return emails_to_process
async def listen_emails():
print(f"--- SISTEMA INICIADO ---")
print(f"Escuchando en {TARGET_EMAIL}...")
# Usar FastMCP Cloud en producción, localhost en desarrollo
MCP_URL = os.getenv("MCP_URL", "https://vocal-blue-hawk.fastmcp.app/mcp")
# Usar Bearer token para ambos (local y FastMCP Cloud)
# El servidor usa StaticTokenVerifier que acepta Bearer tokens
client = MultiServerMCPClient({
"fgj-server": {
"transport": "streamable_http",
"url": MCP_URL,
"headers": {
"Authorization": f"Bearer {MCP_TOKEN}"
}
}
})
is_fastmcp_cloud = "fastmcp.app" in MCP_URL
while True:
try:
emails = await asyncio.to_thread(check_emails_sync)
for mail_data in emails:
print(f"Nuevo correo: {mail_data['subject']} (de {mail_data['from_name']})")
await handle_email(
client,
mail_data["from_name"],
mail_data["from_email"],
mail_data["body"],
is_fastmcp_native=False # Usamos langchain siempre ahora
)
except Exception as e:
print(f"Error en el bucle principal: {e}")
print("Esperando 20 segundos...")
await asyncio.sleep(20)
if __name__ == "__main__":
print("Iniciando bot de correos FGJ Multimedios...")
try:
asyncio.run(listen_emails())
except KeyboardInterrupt:
print("\nCerrando...")