Skip to main content
Glama
server_fastmcp.py (11.3 kB)
import argparse
import asyncio
import json
import logging
import os
import smtplib
from email.mime.application import MIMEApplication
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from enum import Enum
from typing import Optional, Set, Union

from dotenv import load_dotenv
from mcp import GetPromptResult, McpError, stdio_server
from mcp.server import FastMCP, Server
from mcp.types import (
    INTERNAL_ERROR,
    INVALID_PARAMS,
    METHOD_NOT_FOUND,
    ErrorData,
    Prompt,
    PromptArgument,
    PromptMessage,
    TextContent,
    Tool,
)
from pydantic import BaseModel, Field

logger = logging.getLogger('mcp_email_server')
logger.info("Starting MCP Email Server")

load_dotenv()

# Credentials of the sending account, read from the environment (.env supported).
sender = os.getenv("SENDER")
password = os.getenv("PASSWORD")

# Get the directory where the service is started
server_dir = os.path.dirname(os.path.abspath(__file__))

mcp = FastMCP("MCP-Email")


def initialization_email_config():
    """Load the SMTP provider table from ``email.json`` next to this file.

    Returns:
        The decoded JSON document. ``get_smtp_info`` iterates it and reads the
        ``domain``, ``server`` and ``port`` keys of each entry.
    """
    with open(os.path.join(server_dir, "email.json"), "r", encoding="UTF-8") as file:
        return json.load(file)


email_config = initialization_email_config()

# Directory that attachments are searched in and read from. Set from ``--dir``
# when the module is run as a script; stays None when no directory was given.
ATTACHMENT_FOLDER = None

# File extensions that may be attached, grouped by how they are MIME-encoded.
_ALLOWED_EXTENSIONS = {
    'document': ['doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx', 'pdf'],
    'archive': ['zip', 'rar', '7z', 'tar', 'gz'],
    'text': ['txt', 'log', 'csv', 'json', 'xml'],
    'image': ['jpg', 'jpeg', 'png', 'gif', 'bmp'],
    'other': ['md'],  # Other special formats allowed
}
_ALLOWED_EXTENSION_SET = frozenset(
    ext for exts in _ALLOWED_EXTENSIONS.values() for ext in exts
)


class EmailMessage(BaseModel):
    """Validated payload describing one outgoing email."""

    receiver: list[str] = Field(description="The list of recipient email addresses, supports multiple recipients")
    body: str = Field(description="The main content of the email")
    subject: str = Field(description="The subject line of the email")
    # Either a list of file names or a JSON-encoded string holding such a list.
    attachments: Union[list[str], str] = Field(default=[], description="Email attachments, just need to get the file name of the attachment")


class EmailTools(str, Enum):
    """Names of the tools exposed by this server."""

    SEND_EMAIL = "send_email"
    SEARCH_ATTACHMENTS = "search_attachments"


def _attachment_root() -> str:
    """Return the normalized attachment directory, or raise if none is configured."""
    if not ATTACHMENT_FOLDER:
        raise ValueError("attachment directory is not configured; start the server with --dir")
    return os.path.abspath(os.path.expanduser(ATTACHMENT_FOLDER))


def _parse_attachments(attachments: Union[list[str], str, None]) -> list[str]:
    """Normalize the ``attachments`` field into a list of file names."""
    if not isinstance(attachments, str):
        return list(attachments or [])
    if not attachments.strip():
        return []
    try:
        # Backslashes are doubled first so Windows-style paths survive JSON decoding.
        parsed = json.loads(attachments.replace("\\", "\\\\"))
    except ValueError as e:
        raise ValueError(f"email message attachments error: {e}") from e
    if isinstance(parsed, str):
        # A single JSON string is one file name, not a sequence of characters.
        return [parsed]
    if not (isinstance(parsed, list) and all(isinstance(item, str) for item in parsed)):
        raise ValueError("email message attachments error: expected a JSON list of file names")
    return parsed


def _resolve_attachment(name: str) -> str:
    """Resolve an attachment name to a real file inside the attachment directory."""
    root = os.path.realpath(_attachment_root())
    # ``name`` may be a bare file name or a full path returned by search_attachments.
    candidate = os.path.realpath(os.path.join(root, name))
    try:
        common = os.path.commonpath([root, candidate])
    except ValueError:
        # Raised e.g. for paths on different drives: definitely not inside root.
        common = None
    if common is None or os.path.normcase(common) != os.path.normcase(root):
        raise ValueError(f"{name} is outside the attachment directory")
    if not os.path.isfile(candidate):
        raise ValueError(f"{candidate} not exists")
    return candidate


def _deliver(message: MIMEMultipart, smtp_server: str, smtp_port: int) -> None:
    """Open an SMTP session, authenticate as ``sender`` and send ``message`` (blocking)."""
    # Port 465 speaks TLS from the first byte; other ports upgrade via STARTTLS.
    smtp_class = smtplib.SMTP_SSL if smtp_port == 465 else smtplib.SMTP
    with smtp_class(smtp_server, smtp_port, timeout=10) as server:
        if smtp_port != 465:
            server.starttls()  # Enable TLS for non-SSL connections
        server.login(sender, password)
        server.send_message(message)


async def send_email(email_message: EmailMessage):
    """Send email asynchronously.

    Args:
        email_message (EmailMessage): Email message object, including email
            content, recipients, etc.

    Returns:
        str: A confirmation message naming the sender address.

    Raises:
        ValueError: If no SMTP configuration matches the sender, an attachment
            is invalid or missing, or authentication fails.
        ConnectionError: If the server disconnects or the connection times out.
        smtplib.SMTPException: For any other SMTP-level failure.
        RuntimeError: For any other unexpected failure while sending.
    """
    smtp_server, smtp_port = get_smtp_info()
    if not (smtp_server and smtp_port):
        raise ValueError("Please check that your email address is entered correctly, or it is not a supported email service")

    # Log the envelope of this message (not the body, and not the model schema).
    logger.info(
        "send email message: receiver=%s subject=%r attachments=%r",
        email_message.receiver,
        email_message.subject,
        email_message.attachments,
    )

    # Build email content
    message = MIMEMultipart()
    message["From"] = sender
    message["To"] = ", ".join(email_message.receiver)  # Comma-delimited recipient list
    message["Subject"] = email_message.subject
    message.attach(MIMEText(email_message.body, "plain"))

    # Attach attachments to email messages
    for name in _parse_attachments(email_message.attachments):
        message.attach(attach_file(_resolve_attachment(name)))

    try:
        # smtplib is blocking; run it in a worker thread so the event loop stays responsive.
        await asyncio.to_thread(_deliver, message, smtp_server, smtp_port)
    except smtplib.SMTPAuthenticationError as e:
        raise ValueError("Authentication failed - check username and password") from e
    except smtplib.SMTPServerDisconnected as e:
        raise ConnectionError("Server disconnected unexpectedly") from e
    except smtplib.SMTPException as e:
        raise smtplib.SMTPException(f"SMTP error occurred: {str(e)}") from e
    except TimeoutError as e:
        raise ConnectionError("Connection timed out") from e
    except Exception as e:
        raise RuntimeError(f"Unexpected error: {str(e)}") from e
    return f"Email sent successfully from {sender}"


def get_smtp_info() -> tuple[str, int] | tuple[None, None]:
    """Get the SMTP server address and port for the sender's email domain.

    Returns:
        tuple[str, int] | tuple[None, None]: The matching SMTP server address
        and port from ``email_config``; ``(None, None)`` if the sender address
        is unset or malformed, or no entry matches its domain.
    """
    if not sender or "@" not in sender:
        return None, None

    # Extract the domain name part of the sender's email
    domain = f"@{sender.split('@')[1]}"

    # Traverse the configuration and find the matching domain name
    for config in email_config:
        if config.get("domain") == domain:
            return config.get("server"), config.get("port")

    # No matching configuration found
    return None, None


def get_lack_params(arguments: dict) -> str | None:
    """Check whether ``arguments`` is missing a required EmailMessage field.

    Args:
        arguments (dict): The supplied parameter dictionary.

    Returns:
        str | None: One "<field> is required" line per missing required field,
        joined by newlines; None if nothing is missing.
    """
    # Fields with a default (e.g. attachments) are optional and must not be reported.
    missing_fields = [
        name
        for name, field in EmailMessage.model_fields.items()
        if field.is_required() and name not in arguments
    ]

    logger.info(f"Missing params: {missing_fields}")

    if missing_fields:
        return "\n".join(f"{field} is required" for field in missing_fields)
    return None


def _find_matching_files(root_dir: str, pattern: str, ignore_case: bool) -> list[str]:
    """Walk ``root_dir`` and return the sorted paths whose file name contains ``pattern``."""
    needle = pattern.lower() if ignore_case else pattern
    matches = []
    # Traverse the directory tree
    for root, _, files in os.walk(root_dir):
        for file in files:
            file_name = file.lower() if ignore_case else file
            if needle in file_name:
                matches.append(os.path.join(root, file))
    # Sorted so repeated searches return a stable, reproducible listing.
    return sorted(matches)


async def search_attachments(pattern: str, ignore_case: bool = True) -> str:
    """Search the attachment directory for file names containing ``pattern``.

    Args:
        pattern: the text pattern to search
        ignore_case: whether to ignore case

    Returns:
        The matching file paths, one per line (empty string if none match).

    Raises:
        ValueError: If no attachment directory is configured or it does not exist.
    """
    # Normalize the directory path
    root_dir = _attachment_root()
    if not os.path.exists(root_dir):
        raise ValueError(f"directory no exists: {ATTACHMENT_FOLDER}")

    # The walk is blocking; running it in a thread lets a caller's timeout take effect.
    matches = await asyncio.to_thread(_find_matching_files, root_dir, pattern, ignore_case)
    return "\n".join(matches)


def attach_file(file_path):
    """Build a MIME attachment part for the file at ``file_path``.

    Args:
        file_path: Path of the file to attach.

    Returns:
        A MIME part carrying the file content with a ``Content-Disposition:
        attachment`` header naming the file.

    Raises:
        ValueError: If the file extension is not in the allowed list.
    """
    filename = os.path.basename(file_path)
    ext = filename.lower().split('.')[-1]

    # Check if the file type is allowed before reading anything from disk
    if ext not in _ALLOWED_EXTENSION_SET:
        raise ValueError(f"Unsupported file types: {ext}")

    with open(file_path, 'rb') as f:
        file_data = f.read()

    # Process according to file type
    if ext in _ALLOWED_EXTENSIONS['document']:
        attachment = MIMEApplication(file_data, _subtype=ext)
    elif ext in _ALLOWED_EXTENSIONS['text']:
        try:
            attachment = MIMEText(file_data.decode('UTF-8'), 'plain')
        except UnicodeDecodeError:
            # If UTF-8 decoding is not possible, treat it as a normal attachment
            attachment = MIMEApplication(file_data)
    elif ext in _ALLOWED_EXTENSIONS['image']:
        attachment = MIMEImage(file_data)
    else:
        # Archives and other allowed formats use generic processing
        attachment = MIMEApplication(file_data)

    attachment.add_header('Content-Disposition', 'attachment', filename=filename)
    return attachment


# NOTE: the signature (including the ``[]`` default, which is never mutated) and
# the docstring are kept as-is because FastMCP derives the published tool schema
# and description from them.
@mcp.tool()
async def send_email_tool(receiver: list, body: str, subject: str, attachments: list = []):
    """A function call tool designed to send emails based on the provided recipients, subject, body, and optional attachments.
    This tool ensures secure and accurate email delivery while supporting multiple recipients and customizable content.
    It is ideal for automating email workflows.
    After gathering the necessary information (recipients, subject, body, and attachments), the tool displays the composed email details to the user for review.
    The email is only sent after the user confirms the details, ensuring accuracy and intentionality in communication.

    Args:
        receiver: The list of recipient email addresses, supports multiple recipients.
        body: The main content of the email.
        subject: The subject line of the email.
        attachments: Email attachments, just need to get the file name of the attachment.
    """
    try:
        args = EmailMessage(receiver=receiver, body=body, subject=subject, attachments=attachments)
    except ValueError as e:
        # Validation failures are the caller's fault: report them as invalid params.
        raise McpError(ErrorData(code=INVALID_PARAMS, message=str(e))) from e

    try:
        email_response = await send_email(args)
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=str(e))) from e
    return [TextContent(type="text", text=f"Send email response: \n{email_response}")]


@mcp.tool()
async def search_attachments_tool(pattern: str):
    """Searches for files in a specified directory that match a given pattern.
    The search can be case-insensitive and returns the full paths of all matching files.
    This tool is useful for locating specific files or attachments within a directory structure.

    Args:
        pattern: The text pattern to search for in file names. The search is case-insensitive by default.
    """
    try:
        search_response = await asyncio.wait_for(search_attachments(pattern=pattern), timeout=3)
    except asyncio.TimeoutError as e:
        # str(TimeoutError()) is empty, so supply an explicit message.
        raise McpError(ErrorData(code=INTERNAL_ERROR, message="Search attachments timed out after 3 seconds")) from e
    except Exception as e:
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=str(e))) from e
    return [TextContent(type="text", text=f"Search attachments response: \n{search_response}")]


if __name__ == "__main__":
    # Initialize and run the server
    parser = argparse.ArgumentParser("Search files in a specified directory")
    parser.add_argument("--dir", type=str, help="Specify the directory where attachments are located.")
    args = parser.parse_args()
    ATTACHMENT_FOLDER = args.dir
    mcp.run(transport='stdio')

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Shy2593666979/mcp-server-email'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.