email_processor.py (5.95 kB)
from typing import Dict, Any, List, Pattern
import re
import logging
from datetime import datetime

from .base_processor import BaseProcessor
from mcp.context import MCPContext
from models.entity_extractor import BERTEntityExtractor

logger = logging.getLogger(__name__)


class EmailProcessor(BaseProcessor):
    """Processor for handling email documents."""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.entity_extractor = BERTEntityExtractor(
            model_name=config["models"]["entity_extractor"]["model_name"],
            confidence_threshold=config["models"]["entity_extractor"]["confidence_threshold"]
        )

        # Regular expressions for common email fields
        self.patterns = {
            "from": re.compile(r"(?i)From:\s*([^<\n]+)(?:<([^>]+)>)?"),
            "to": re.compile(r"(?i)To:\s*([^<\n]+)(?:<([^>]+)>)?"),
            "subject": re.compile(r"(?i)Subject:\s*(.+)(?:\n|$)"),
            "date": re.compile(r"(?i)Date:\s*(.+)(?:\n|$)"),
            "cc": re.compile(r"(?i)Cc:\s*(.+)(?:\n|$)")
        }

    def can_handle(self, context: MCPContext) -> bool:
        """Check if the document is an email."""
        if not context.raw_text:
            if context.compressed:
                context.decompress()
            if not context.raw_text:
                return False

        # Check if document is already classified
        if context.metadata.get("document_type") == "email":
            return True

        # Look for email indicators in the text
        email_indicators = [
            "from:", "to:", "subject:", "sent:", "cc:", "bcc:",
            "forwarded message", "original message", "reply"
        ]

        text_lower = context.raw_text.lower()
        indicator_count = sum(1 for indicator in email_indicators if indicator in text_lower)

        # If at least 3 indicators are found, consider it an email
        return indicator_count >= 3

    def process(self, context: MCPContext) -> MCPContext:
        """Process an email document."""
        if not self.validate_context(context):
            context.add_to_history(
                processor_name=self.__class__.__name__,
                status="skipped",
                details={"reason": "Invalid context"}
            )
            return context

        if context.compressed:
            context.decompress()

        # Mark document as email
        context.update_metadata({"document_type": "email"})

        # Extract basic fields using regex
        extracted_fields = self._extract_basic_fields(context.raw_text)
        for field, value in extracted_fields.items():
            context.add_extracted_data(field, value, confidence=0.9)  # High confidence for regex matches

        # Use ML-based entity extraction for more complex fields
        entities = self.entity_extractor.extract_entities(context.raw_text)

        # Extract email body (simplified approach)
        body = self._extract_body(context.raw_text)
        if body:
            context.add_extracted_data("body", body, confidence=0.85)

        # Extract people mentioned in the email
        if "person" in entities:
            people = entities["person"]
            if people:
                context.add_extracted_data(
                    "people_mentioned",
                    [person["text"] for person in people],
                    confidence=self.entity_extractor.get_confidence()
                )

        # Extract organizations mentioned
        if "organization" in entities:
            orgs = entities["organization"]
            if orgs:
                context.add_extracted_data(
                    "organizations_mentioned",
                    [org["text"] for org in orgs],
                    confidence=self.entity_extractor.get_confidence()
                )

        return context

    def _extract_basic_fields(self, text: str) -> Dict[str, Any]:
        """Extract basic email fields using regex patterns."""
        results = {}

        for field, pattern in self.patterns.items():
            match = pattern.search(text)
            if match:
                if field in ["from", "to"] and match.group(2):
                    # If email address is captured in second group
                    results[field] = {
                        "name": match.group(1).strip(),
                        "email": match.group(2).strip()
                    }
                else:
                    results[field] = match.group(1).strip()

        return results

    def _extract_body(self, text: str) -> str:
        """Extract the body of the email."""
        # Simple heuristic: look for common email header patterns and extract everything after
        header_patterns = [
            r"(?i)Subject:.+\n\s*\n",
            r"(?i)Date:.+\n\s*\n",
            r"(?i)To:.+\n\s*\n"
        ]

        for pattern in header_patterns:
            match = re.search(pattern, text)
            if match:
                start_idx = match.end()

                # Look for signature or footer
                signature_patterns = [
                    r"\n--\s*\n",  # Common signature delimiter
                    r"\n\s*Regards,",
                    r"\n\s*Sincerely,"
                ]

                end_idx = len(text)
                for sig_pattern in signature_patterns:
                    sig_match = re.search(sig_pattern, text[start_idx:])
                    if sig_match:
                        end_idx = start_idx + sig_match.start()
                        break

                return text[start_idx:end_idx].strip()

        # Fallback: if we can't identify headers, just return the text
        return text
