Skip to main content
Glama
joelGuerraArias

FGJ Multimedios MCP Server

client.py (9.8 kB)
"""Email auto-responder bot for FGJ Multimedios.

Polls an IMAP inbox for today's unread messages, drafts a reply with an
OpenAI chat model using context fetched from an MCP server, and sends the
reply through the MCP server's ``send_email`` tool.
"""
import asyncio
import email
import imaplib
import os
import re
from datetime import datetime
from email.header import decode_header
from email.utils import parseaddr
from typing import Optional

from dotenv import load_dotenv
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI
from pydantic import BaseModel

load_dotenv()

TARGET_EMAIL = os.getenv("EMAIL_USER")
EMAIL_PASS = os.getenv("EMAIL_PASS")
IMAP_SERVER = "imap.gmail.com"
# SECURITY NOTE(review): a real-looking bearer token is committed here as the
# fallback value. It is kept so existing deployments keep working, but it
# should be rotated and supplied only through the MCP_TOKEN environment
# variable.
MCP_TOKEN = os.getenv("MCP_TOKEN", "NGgba3XYHcwzQpKv9wcC_yor6pfaYSDaBfvxvwxP51k")

# IMAP date criteria require English month abbreviations regardless of the
# process locale, so they are spelled out instead of relying on strftime("%b").
_IMAP_MONTHS = (
    "Jan", "Feb", "Mar", "Apr", "May", "Jun",
    "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
)

# Service catalogue interpolated into the LLM prompt.
_PRODUCTS = [
    {"name": "Monitoreo Digital", "description": "Seguimiento 24/7 de marcas en redes sociales."},
    {"name": "Publicidad Exterior", "description": "Vallas y pantallas LED en puntos estrategicos."},
    {"name": "Produccion Audiovisual", "description": "Videos corporativos, comerciales y contenido para redes."},
    {"name": "Desarrollo Web", "description": "Sitios web, aplicaciones y sistemas a medida."},
]

# Filled with str.format so that braces inside the customer's message are
# never interpreted as placeholders.
_PROMPT_TEMPLATE = """
{context_text}

NOMBRE DEL CLIENTE: {from_name}
EMAIL DEL CLIENTE: {from_email}

Mensaje del cliente:
{message_body}

Servicios disponibles:
{products}

Redacta un correo de respuesta institucional.
- Inicia con "Estimado/a {from_name}," o "Hola {from_name},"
- Responde a su consulta de forma profesional
- Incluye la firma de Belgica Jimenez al final

Devuelve SOLO JSON con:
- subject
- body (HTML bien formateado)
"""


class EmailOutput(BaseModel):
    """Structured reply produced by the LLM."""

    # Subject line of the reply.
    subject: str
    # Reply body; the prompt asks the model for HTML.
    body: str


def _safe_decode(data: bytes, charset: Optional[str] = None) -> str:
    """Decode *data* with *charset*, falling back to UTF-8 if it is unknown."""
    try:
        return data.decode(charset or "utf-8", errors="ignore")
    except LookupError:
        # Charset labels such as "unknown-8bit" are not valid Python codecs.
        return data.decode("utf-8", errors="ignore")


def _decode_mime_header(value) -> str:
    """Return an RFC 2047 encoded header value as plain text."""
    return "".join(
        _safe_decode(chunk, charset) if isinstance(chunk, bytes) else chunk
        for chunk, charset in decode_header(value)
    )


def _imap_date(day: datetime) -> str:
    """Format *day* as an IMAP search date (e.g. ``05-Jan-2025``)."""
    return f"{day.day:02d}-{_IMAP_MONTHS[day.month - 1]}-{day.year}"


def _extract_plain_body(msg) -> str:
    """Return the text body of *msg*, honouring each part's declared charset.

    For multipart messages the first non-empty ``text/plain`` part that is
    not an attachment is used; for single-part messages the payload is
    decoded as-is. Returns an empty string when there is nothing to decode.
    """
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() != "text/plain":
                continue
            if part.get_content_disposition() == "attachment":
                # A .txt attachment is not the message the sender wrote.
                continue
            payload = part.get_payload(decode=True)
            if payload:
                return _safe_decode(payload, part.get_content_charset())
        return ""
    payload = msg.get_payload(decode=True)
    return _safe_decode(payload, msg.get_content_charset()) if payload else ""


def _message_text(message) -> str:
    """Return the text of a prompt message.

    ``content`` is used directly when it is a string; otherwise its ``text``
    attribute is used when present.
    NOTE(review): the ``.text`` fallback assumes MCP-style text content
    objects in the native-client branch -- confirm against the client used.
    """
    content = message.content
    if isinstance(content, str):
        return content
    text = getattr(content, "text", None)
    return text if isinstance(text, str) else str(content)


def extract_name_from_email(from_header: str) -> tuple[str, str]:
    """Extract the sender's first name and address from a ``From`` header.

    Example: ``"Joel Garcia <joelg@fvdigital.do>"`` ->
    ``("Joel", "joelg@fvdigital.do")``.

    When the header carries no display name, one is derived from the local
    part of the address. When no name can be derived at all, the first
    element is the generic salutation ``"Estimado/a"``.

    Args:
        from_header: Raw value of the ``From`` header.

    Returns:
        A ``(first_name, email_address)`` tuple; the address is an empty
        string when the header cannot be parsed.
    """
    name, email_addr = parseaddr(from_header)

    # The display name may be RFC 2047 encoded.
    if name:
        name = _decode_mime_header(name).strip()

    # No display name: derive one from the part before the "@".
    if not name and email_addr:
        local_part = email_addr.split("@")[0]
        # Drop digits and underscores, then treat dots/hyphens as separators.
        cleaned = re.sub(r"[0-9_]", "", local_part)
        cleaned = re.sub(r"[.\-]", " ", cleaned)
        name = cleaned.title().strip()

    # Only the first word (given name) is used in the greeting.
    first_name = name.split()[0] if name else "Estimado/a"
    return first_name, email_addr


async def handle_email(client, from_name: str, from_email: str, message_body: str, is_fastmcp_native=False):
    """Draft a reply to one incoming message and send it via the MCP server.

    Args:
        client: MCP client. With ``is_fastmcp_native=False`` it is used
            through ``get_tools`` / ``get_prompt(server_name=..., prompt_name=...)``;
            with ``True`` through ``list_tools`` / ``list_prompts`` /
            ``get_prompt`` / ``call_tool``.
        from_name: Sender's name used in the greeting.
        from_email: Address the reply is sent to.
        message_body: Plain-text body of the incoming message.
        is_fastmcp_native: Selects which client API shape is used.

    Errors are printed rather than raised so that one failing message does
    not stop the caller's loop.
    """
    print("--- PROCESANDO CORREO ---")
    print(f"De: {from_name} <{from_email}>")

    # An auto-responder must never answer a message with no usable sender, nor
    # its own messages: the latter would create an endless reply loop.
    if not from_email:
        print("Error: El correo no tiene un remitente valido; no se envia respuesta")
        return
    if TARGET_EMAIL and from_email.lower() == TARGET_EMAIL.lower():
        print("Aviso: El remitente es la propia cuenta; no se envia respuesta")
        return

    try:
        send_email = None
        if is_fastmcp_native:
            # Native client: verify the tool and the prompt exist first.
            tools_result = await client.list_tools()
            send_email_tool = next((t for t in tools_result.tools if t.name == "send_email"), None)
            if not send_email_tool:
                print("Error: No se encontro la herramienta 'send_email'")
                return

            prompts_result = await client.list_prompts()
            email_context_prompt = next(
                (p for p in prompts_result.prompts if p.name == "email_context"), None
            )
            if not email_context_prompt:
                print("Error: No se encontro el prompt 'email_context'")
                return

            prompt_result = await client.get_prompt("email_context", {})
            context_text = "\n".join(_message_text(m) for m in prompt_result.messages)
        else:
            # langchain client: tools are invocable objects.
            tools = await client.get_tools()
            send_email = next((t for t in tools if t.name == "send_email"), None)
            if not send_email:
                print("Error: No se encontro la herramienta 'send_email'")
                return

            context_messages = await client.get_prompt(
                server_name="fgj-server",
                prompt_name="email_context",
            )
            context_text = "\n".join(_message_text(m) for m in context_messages)

        llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.4)
        prompt = _PROMPT_TEMPLATE.format(
            context_text=context_text,
            from_name=from_name,
            from_email=from_email,
            message_body=message_body,
            products=_PRODUCTS,
        )

        result: EmailOutput = await llm.with_structured_output(EmailOutput).ainvoke(prompt)

        reply = {
            "to": from_email,
            "subject": result.subject,
            "body": result.body,
        }
        # Send through whichever client API is in use.
        if is_fastmcp_native:
            await client.call_tool("send_email", reply)
        else:
            await send_email.ainvoke(reply)

        print(f"Respuesta enviada a {from_name} ({from_email})")
    except Exception as e:
        print(f"Error en handle_email: {e}")


def check_emails_sync():
    """Fetch today's unread messages with the blocking ``imaplib`` client.

    At most the five most recent matches are fetched, and each one is flagged
    ``\\Seen`` once it has been parsed.

    Returns:
        A list of dicts with keys ``id``, ``from_name``, ``from_email``,
        ``subject`` and ``body``. Errors are printed and whatever was
        collected before the failure is returned.
    """
    emails_to_process = []

    if not TARGET_EMAIL or not EMAIL_PASS:
        print("Error en check_emails_sync: faltan EMAIL_USER o EMAIL_PASS")
        return emails_to_process

    try:
        # The context manager logs out even when a step below raises.
        with imaplib.IMAP4_SSL(IMAP_SERVER) as mail:
            mail.login(TARGET_EMAIL, EMAIL_PASS)
            mail.select("INBOX")

            # Unread messages received today.
            search_criteria = f"(UNSEEN SINCE {_imap_date(datetime.now())})"
            status, messages = mail.search(None, search_criteria)
            if status != "OK":
                return []

            mail_ids = messages[0].split()
            if not mail_ids:
                print("No hay correos nuevos de hoy.")
                return []

            # Only the last five matches are processed per poll.
            recent_ids = mail_ids[-5:]
            print(f"Encontrados {len(mail_ids)} correos de hoy. Procesando {len(recent_ids)}...")

            for m_id in recent_ids:
                status, msg_data = mail.fetch(m_id, "(RFC822)")
                if status != "OK":
                    continue
                # A well-formed response starts with an (envelope, bytes) tuple.
                if not msg_data or not isinstance(msg_data[0], tuple):
                    continue

                msg = email.message_from_bytes(msg_data[0][1])

                from_header = msg["From"] or ""
                from_name, from_email_addr = extract_name_from_email(from_header)

                subject = msg["Subject"]
                if subject:
                    subject = _decode_mime_header(subject)

                emails_to_process.append({
                    "id": m_id,
                    "from_name": from_name,
                    "from_email": from_email_addr,
                    "subject": subject,
                    "body": _extract_plain_body(msg),
                })

                # Mark as read so the next poll does not pick it up again.
                mail.store(m_id, "+FLAGS", "\\Seen")
    except Exception as e:
        print(f"Error en check_emails_sync: {e}")

    return emails_to_process


async def listen_emails():
    """Poll the inbox forever, replying to each new message.

    The blocking IMAP check runs in a worker thread; the loop sleeps 20
    seconds between polls and keeps running after any error.
    """
    print("--- SISTEMA INICIADO ---")
    print(f"Escuchando en {TARGET_EMAIL}...")

    # Defaults to the FastMCP Cloud deployment; override MCP_URL for local use.
    mcp_url = os.getenv("MCP_URL", "https://vocal-blue-hawk.fastmcp.app/mcp")

    # The same bearer token is sent for both local and cloud servers.
    client = MultiServerMCPClient({
        "fgj-server": {
            "transport": "streamable_http",
            "url": mcp_url,
            "headers": {
                "Authorization": f"Bearer {MCP_TOKEN}",
            },
        },
    })

    while True:
        try:
            emails = await asyncio.to_thread(check_emails_sync)
            for mail_data in emails:
                print(f"Nuevo correo: {mail_data['subject']} (de {mail_data['from_name']})")
                await handle_email(
                    client,
                    mail_data["from_name"],
                    mail_data["from_email"],
                    mail_data["body"],
                    is_fastmcp_native=False,  # the langchain client is always used
                )
        except Exception as e:
            print(f"Error en el bucle principal: {e}")

        print("Esperando 20 segundos...")
        await asyncio.sleep(20)


if __name__ == "__main__":
    print("Iniciando bot de correos FGJ Multimedios...")
    try:
        asyncio.run(listen_emails())
    except KeyboardInterrupt:
        print("\nCerrando...")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/joelGuerraArias/mcpcorreos'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.