
Trade Surveillance Support MCP Server

by vic3custodio
server.py (19 kB)
#!/usr/bin/env python3
"""
Trade Surveillance Support MCP Server

This MCP server provides tools to automate trade surveillance support workflows:
- Parse user inquiry emails
- Search relevant SQL config files and Java code
- Execute Java processes to generate reports
- Streamline user support and investigation
"""

import asyncio
import logging
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Any

from mcp.server.fastmcp import FastMCP

from .metadata_index import MetadataIndex

# Initialize logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastMCP server
mcp = FastMCP("Trade Surveillance Support")

# Initialize metadata index
metadata_index = MetadataIndex()


@mcp.tool()
async def parse_email_inquiry(email_content: str) -> dict[str, Any]:
    """
    Parse a user inquiry email to extract key information for investigation.

    This tool analyzes email content and extracts:
    - Inquiry type (trade issue, report request, data verification, etc.)
    - Related trade IDs or account numbers
    - Time period of interest
    - Priority level
    - Required actions

    Args:
        email_content: The full text content of the user's inquiry email

    Returns:
        A dictionary containing parsed information including inquiry_type,
        trade_ids, time_period, priority, and suggested_actions
    """
    content_lower = email_content.lower()

    # Extract trade IDs (common patterns: TRD123456, TRADE-123456, T-123456, #123456)
    trade_id_patterns = [
        r'\bTRD[\-_]?\d{5,10}\b',
        r'\bTRADE[\-_]?\d{5,10}\b',
        r'\bT[\-_]\d{5,10}\b',
        r'#\d{5,10}\b',
        r'\btrade\s+(?:id|number|ref)[\s:]+(\d{5,10})\b'
    ]
    trade_ids = []
    for pattern in trade_id_patterns:
        matches = re.findall(pattern, email_content, re.IGNORECASE)
        trade_ids.extend(matches)
    trade_ids = list(set(trade_ids))  # Remove duplicates

    # Extract account numbers (common patterns: ACC123456, ACCT-123456, Account: 123456)
    account_patterns = [
        r'\bACC(?:T)?[\-_]?\d{5,10}\b',
        r'\baccount[\s:]+(\d{5,10})\b'
    ]
    account_numbers = []
    for pattern in account_patterns:
        matches = re.findall(pattern, email_content, re.IGNORECASE)
        account_numbers.extend(matches)
    account_numbers = list(set(account_numbers))

    # Extract time periods (dates, ranges, relative times)
    time_period = None
    date_patterns = [
        r'\b(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})\b',  # MM/DD/YYYY or DD-MM-YYYY
        r'\b(\d{4}[-/]\d{1,2}[-/]\d{1,2})\b',    # YYYY-MM-DD
        r'\b(jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]*\s+\d{1,2},?\s+\d{4}\b',
        r'\b\d{1,2}\s+(jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]*\s+\d{4}\b'
    ]
    dates_found = []
    for pattern in date_patterns:
        dates_found.extend(re.findall(pattern, email_content, re.IGNORECASE))

    # Check for relative time references
    if 'last week' in content_lower or 'past week' in content_lower:
        time_period = "last_week"
    elif 'last month' in content_lower or 'past month' in content_lower:
        time_period = "last_month"
    elif 'yesterday' in content_lower:
        time_period = "yesterday"
    elif 'today' in content_lower or 'this morning' in content_lower:
        time_period = "today"
    elif 'last 7 days' in content_lower or 'past 7 days' in content_lower:
        time_period = "last_7_days"
    elif 'last 30 days' in content_lower or 'past 30 days' in content_lower:
        time_period = "last_30_days"
    elif dates_found:
        time_period = dates_found[0] if isinstance(dates_found[0], str) else str(dates_found[0])

    # Determine inquiry type based on keywords
    inquiry_type = "general_inquiry"
    inquiry_keywords = {
        "trade_issue": ["trade error", "failed trade", "trade problem", "transaction failed", "execution issue"],
        "report_request": ["generate report", "need report", "send report", "report for", "can you provide"],
        "data_verification": ["verify", "check data", "confirm", "validate", "reconcile"],
        "settlement_inquiry": ["settlement", "settle", "payment", "clearing"],
        "compliance_check": ["compliance", "audit", "regulatory", "regulation"],
        "position_inquiry": ["position", "holdings", "balance", "exposure"],
        "transaction_history": ["transaction history", "trade history", "past trades", "historical"]
    }
    for inq_type, keywords in inquiry_keywords.items():
        if any(keyword in content_lower for keyword in keywords):
            inquiry_type = inq_type
            break

    # Determine priority based on urgency indicators
    priority = "medium"
    high_priority_words = ["urgent", "asap", "immediately", "critical", "emergency", "high priority"]
    low_priority_words = ["when you can", "no rush", "low priority", "whenever"]
    if any(word in content_lower for word in high_priority_words):
        priority = "high"
    elif any(word in content_lower for word in low_priority_words):
        priority = "low"

    # Generate suggested actions based on inquiry type
    if inquiry_type == "report_request":
        suggested_actions = [
            "Search for relevant report config files",
            "Identify appropriate Java report generator",
            "Execute report generation",
            "Send report to requester"
        ]
    elif inquiry_type == "trade_issue":
        suggested_actions = [
            "Search trade transaction records",
            "Check for error logs",
            "Verify trade details",
            "Generate diagnostic report"
        ]
    elif inquiry_type == "data_verification":
        suggested_actions = [
            "Query relevant data sources",
            "Run reconciliation checks",
            "Generate verification report"
        ]
    elif inquiry_type == "settlement_inquiry":
        suggested_actions = [
            "Search settlement config files",
            "Check settlement status",
            "Generate settlement report"
        ]
    elif inquiry_type == "compliance_check":
        suggested_actions = [
            "Search compliance config files",
            "Run audit checks",
            "Generate compliance report"
        ]
    else:
        suggested_actions = [
            "Analyze inquiry details",
            "Search for relevant config files",
            "Identify appropriate action"
        ]

    result = {
        "status": "parsed",
        "inquiry_type": inquiry_type,
        "trade_ids": trade_ids,
        "account_numbers": account_numbers,
        "time_period": time_period,
        "priority": priority,
        "suggested_actions": suggested_actions,
        "raw_content_preview": (email_content[:300] + "...") if len(email_content) > 300 else email_content,
        "content_length": len(email_content)
    }

    logger.info(f"Parsed email inquiry: {inquiry_type}, Priority: {priority}, Found {len(trade_ids)} trade IDs")
    return result
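
# Example (illustrative; the email text and values are hypothetical): an
# email reading "URGENT: trade TRD-1234567 failed to settle yesterday"
# would parse to roughly:
#     {"inquiry_type": "settlement_inquiry", "priority": "high",
#      "trade_ids": ["TRD-1234567"], "time_period": "yesterday", ...}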
"report_request": ["generate report", "need report", "send report", "report for", "can you provide"], "data_verification": ["verify", "check data", "confirm", "validate", "reconcile"], "settlement_inquiry": ["settlement", "settle", "payment", "clearing"], "compliance_check": ["compliance", "audit", "regulatory", "regulation"], "position_inquiry": ["position", "holdings", "balance", "exposure"], "transaction_history": ["transaction history", "trade history", "past trades", "historical"] } for inq_type, keywords in inquiry_keywords.items(): if any(keyword in content_lower for keyword in keywords): inquiry_type = inq_type break # Determine priority based on urgency indicators priority = "medium" high_priority_words = ["urgent", "asap", "immediately", "critical", "emergency", "high priority"] low_priority_words = ["when you can", "no rush", "low priority", "whenever"] if any(word in content_lower for word in high_priority_words): priority = "high" elif any(word in content_lower for word in low_priority_words): priority = "low" # Generate suggested actions based on inquiry type suggested_actions = [] if inquiry_type == "report_request": suggested_actions = [ "Search for relevant report config files", "Identify appropriate Java report generator", "Execute report generation", "Send report to requester" ] elif inquiry_type == "trade_issue": suggested_actions = [ "Search trade transaction records", "Check for error logs", "Verify trade details", "Generate diagnostic report" ] elif inquiry_type == "data_verification": suggested_actions = [ "Query relevant data sources", "Run reconciliation checks", "Generate verification report" ] elif inquiry_type == "settlement_inquiry": suggested_actions = [ "Search settlement config files", "Check settlement status", "Generate settlement report" ] elif inquiry_type == "compliance_check": suggested_actions = [ "Search compliance config files", "Run audit checks", "Generate compliance report" ] else: suggested_actions = [ "Analyze inquiry details", "Search for relevant config files", "Identify appropriate action" ] result = { "status": "parsed", "inquiry_type": inquiry_type, "trade_ids": trade_ids, "account_numbers": account_numbers, "time_period": time_period, "priority": priority, "suggested_actions": suggested_actions, "raw_content_preview": email_content[:300] + "..." if len(email_content) > 300 else email_content, "content_length": len(email_content) } logger.info(f"Parsed email inquiry: {inquiry_type}, Priority: {priority}, Found {len(trade_ids)} trade IDs") return result @mcp.tool() async def search_sql_configs( search_keywords: str, config_directory: str = "./configs" ) -> dict[str, Any]: """ Search for SQL configuration files using metadata keywords instead of file paths. This tool searches through indexed SQL config files by their metadata annotations. 


@mcp.tool()
async def search_java_code(
    search_keywords: str,
    code_directory: str = "./src"
) -> dict[str, Any]:
    """
    Search for Java classes using metadata keywords instead of file paths.

    This tool searches through indexed Java files by their javadoc metadata
    annotations. Java classes should include metadata in javadoc comments like:
        /**
         * @keywords trade, settlement, report_generator
         * @type report_engine
         * @description Generates daily settlement reports
         */

    Args:
        search_keywords: Keywords to search for (e.g., "report generator", "trade processor")
        code_directory: Path to the directory containing Java source files (used for initial scan)

    Returns:
        A dictionary containing matching Java files with their metadata and methods
    """
    logger.info(f"Searching Java code for keywords: {search_keywords}")

    # Scan directory if index is empty
    if not metadata_index.index.get("java_classes"):
        logger.info(f"Scanning Java classes in: {code_directory}")
        metadata_index.scan_java_classes(code_directory)

    # Search by keywords
    matches = metadata_index.search(search_keywords, file_type="java")

    result = {
        "status": "success",
        "search_keywords": search_keywords,
        "matches_found": len(matches),
        "java_classes": matches
    }

    logger.info(f"Found {len(matches)} Java class matches")
    return result
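
# Similarly, an annotated Java class carries its metadata in the javadoc
# block above the class declaration (class name and body are hypothetical):
#
#     /**
#      * @keywords trade, settlement, report_generator
#      * @type report_engine
#      * @description Generates daily settlement reports
#      */
#     public class SettlementReportGenerator { ... }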


@mcp.tool()
async def execute_java_report(
    java_class: str,
    config_file: str,
    output_directory: str = "./reports"
) -> dict[str, Any]:
    """
    Execute a Java unit test to generate a trade surveillance report.

    This tool runs the unit test for the specified Java class with the given
    config file to generate the required report or data extract. Running tests
    ensures the report generation is validated during execution.

    Args:
        java_class: The fully qualified Java class name (e.g., "com.trade.SettlementReportGenerator")
        config_file: Path to the SQL config file to use
        output_directory: Directory where the report should be saved

    Returns:
        A dictionary containing execution status, report path, and any errors
    """
    start_time = datetime.now()

    # Derive test class name (e.g., SettlementReportGenerator -> SettlementReportGeneratorTest)
    class_simple_name = java_class.split('.')[-1]
    test_class_name = f"{class_simple_name}Test"

    # Get the package path from the java_class
    if '.' in java_class:
        package_parts = java_class.rsplit('.', 1)[0]
        test_class_full = f"{package_parts}.{test_class_name}"
    else:
        test_class_full = test_class_name

    # Create output directory if it doesn't exist
    output_path = Path(output_directory)
    output_path.mkdir(parents=True, exist_ok=True)

    # Generate report filename based on config and timestamp
    config_name = Path(config_file).stem
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_filename = f"{config_name}_report_{timestamp}.csv"
    report_path = output_path / report_filename

    result = {
        "status": "running",
        "java_class": java_class,
        "test_class": test_class_full,
        "config_file": config_file,
        "output_directory": str(output_directory),
        "report_path": str(report_path),
        "execution_time": None,
        "errors": [],
        "test_output": None
    }

    try:
        logger.info(f"Executing Java test: {test_class_full} for class: {java_class}")

        # Set environment variables for the test to use
        env = os.environ.copy()
        env['CONFIG_FILE'] = str(Path(config_file).absolute())
        env['OUTPUT_FILE'] = str(report_path.absolute())

        # Determine build tool (Maven or Gradle)
        project_root = Path.cwd()
        has_maven = (project_root / "pom.xml").exists()
        has_gradle = (project_root / "build.gradle").exists() or (project_root / "build.gradle.kts").exists()

        if has_maven:
            # Run Maven test for specific test class
            cmd = [
                "mvn", "test",
                f"-Dtest={test_class_name}",
                f"-DconfigFile={config_file}",
                f"-DoutputFile={report_path}"
            ]
            logger.info(f"Running Maven command: {' '.join(cmd)}")
        elif has_gradle:
            # Run Gradle test for specific test class
            # ("--tests" and its pattern must be separate argv elements)
            cmd = [
                "./gradlew", "test",
                "--tests", test_class_name,
                f"-DconfigFile={config_file}",
                f"-DoutputFile={report_path}"
            ]
            logger.info(f"Running Gradle command: {' '.join(cmd)}")
        else:
            # Fallback to direct JUnit execution
            cmd = [
                "java",
                "-cp", "target/test-classes:target/classes:lib/*",  # Adjust classpath as needed (';' separator on Windows)
                "org.junit.runner.JUnitCore",
                test_class_full
            ]
            logger.info(f"Running JUnit command: {' '.join(cmd)}")

        # Execute the test
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env,
            cwd=str(project_root)
        )
        stdout, stderr = await process.communicate()

        # Calculate execution time
        execution_time = (datetime.now() - start_time).total_seconds()
        result["execution_time"] = f"{execution_time:.2f}s"

        # Process output
        stdout_text = stdout.decode('utf-8') if stdout else ""
        stderr_text = stderr.decode('utf-8') if stderr else ""
        result["test_output"] = stdout_text

        if process.returncode == 0:
            result["status"] = "success"
            logger.info(f"Test execution completed successfully in {execution_time:.2f}s")

            # Verify report was created
            if report_path.exists():
                file_size = report_path.stat().st_size
                result["report_size"] = f"{file_size} bytes"
                logger.info(f"Report generated: {report_path} ({file_size} bytes)")
            else:
                result["status"] = "completed_no_output"
                result["errors"].append("Test passed but report file was not created")
                logger.warning("Test passed but no report file found")
        else:
            result["status"] = "failed"
            result["errors"].append(f"Test execution failed with exit code {process.returncode}")
            if stderr_text:
                result["errors"].append(stderr_text)
            logger.error(f"Test execution failed: {stderr_text}")

    except FileNotFoundError as e:
        result["status"] = "error"
        result["errors"].append(f"Build tool not found: {str(e)}")
        result["errors"].append("Ensure Maven (mvn) or Gradle (./gradlew) is installed and in PATH")
        logger.error(f"Build tool not found: {e}")
    except Exception as e:
        result["status"] = "error"
        result["errors"].append(f"Unexpected error: {str(e)}")
        logger.error(f"Unexpected error during test execution: {e}", exc_info=True)

    return result
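
# Contract assumed by execute_java_report (illustrative): the derived *Test
# class, e.g. a hypothetical SettlementReportGeneratorTest, is expected to
# read the config and output locations from the CONFIG_FILE / OUTPUT_FILE
# environment variables (mirrored as -DconfigFile / -DoutputFile flags) and
# write the report to the output path, e.g. in the test:
#
#     String config = System.getenv("CONFIG_FILE");
#     String output = System.getenv("OUTPUT_FILE");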
result["errors"].append(f"Unexpected error: {str(e)}") logger.error(f"Unexpected error during test execution: {e}", exc_info=True) return result @mcp.tool() async def rebuild_metadata_index( config_directory: str = "./configs", code_directory: str = "./src" ) -> dict[str, Any]: """ Rebuild the metadata index by scanning all SQL configs and Java files. Use this tool when you've added new files or updated metadata annotations. The index is automatically built on first search, but you can manually rebuild it with this tool. Args: config_directory: Path to the directory containing SQL config files code_directory: Path to the directory containing Java source files Returns: A summary of the indexing operation """ logger.info("Rebuilding metadata index...") sql_count = len(metadata_index.scan_sql_configs(config_directory)) java_count = len(metadata_index.scan_java_classes(code_directory)) result = { "status": "success", "sql_configs_indexed": sql_count, "java_classes_indexed": java_count, "total_files_indexed": sql_count + java_count, "index_file": str(metadata_index.index_file.absolute()) } logger.info(f"Index rebuilt: {sql_count} SQL configs, {java_count} Java classes") return result @mcp.tool() async def generate_response_summary( parsed_email: dict[str, Any], config_files: list[str], report_path: str ) -> str: """ Generate a summary response for the user inquiry with all relevant information. This tool combines all the gathered information into a clear, actionable response that can be sent back to the user. Args: parsed_email: The parsed email inquiry data config_files: List of config files that were used report_path: Path to the generated report file Returns: A formatted summary string ready to send to the user """ summary = f""" Trade Surveillance Support - Response Summary ============================================== Inquiry Type: {parsed_email.get('inquiry_type', 'Unknown')} Priority: {parsed_email.get('priority', 'Medium')} Actions Taken: - Analyzed inquiry email - Located {len(config_files)} relevant configuration files - Generated report: {report_path} Next Steps: {chr(10).join(f"- {action}" for action in parsed_email.get('suggested_actions', []))} Report Location: {report_path} Please review the generated report and let me know if you need any additional information. """ logger.info("Generated response summary") return summary.strip() def main(): """ Main entry point for the Trade Surveillance MCP server. """ logger.info("Starting Trade Surveillance MCP Server...") mcp.run(transport='stdio') if __name__ == "__main__": main()

Implementation Reference
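
server.py imports MetadataIndex from a sibling metadata_index module that is not reproduced on this page. Judging only from how the server uses it (an index dict with "sql_configs" and "java_classes" buckets, an index_file path, and the scan_sql_configs, scan_java_classes, and search methods), a minimal sketch could look like the following. It is an illustrative assumption, not the actual implementation:

# metadata_index.py (sketch only; the real module may differ)
import json
import re
from pathlib import Path


class MetadataIndex:
    """Keyword index over annotated SQL configs and Java classes."""

    def __init__(self, index_file: str = "metadata_index.json"):
        self.index_file = Path(index_file)
        self.index: dict[str, list[dict]] = {"sql_configs": [], "java_classes": []}

    def scan_sql_configs(self, directory: str) -> list[dict]:
        """Index '-- @key: value' headers from *.sql files."""
        return self._scan(directory, "*.sql", r'--\s*@(\w+):?\s*(.+)', "sql_configs")

    def scan_java_classes(self, directory: str) -> list[dict]:
        """Index '* @key value' javadoc tags from *.java files."""
        return self._scan(directory, "*.java", r'\*\s*@(\w+):?\s*(.+)', "java_classes")

    def search(self, keywords: str, file_type: str = "sql") -> list[dict]:
        """Return entries whose metadata mentions any of the given terms."""
        bucket = "sql_configs" if file_type == "sql" else "java_classes"
        terms = [t.lower() for t in keywords.split()]
        return [
            entry for entry in self.index.get(bucket, [])
            if any(term in json.dumps(entry).lower() for term in terms)
        ]

    def _scan(self, directory: str, glob: str, pattern: str, bucket: str) -> list[dict]:
        """Collect @key/value annotations from matching files into one bucket."""
        entries = []
        for path in Path(directory).glob(f"**/{glob}"):
            text = path.read_text(errors="ignore")
            meta = {key.lower(): value.strip()
                    for key, value in re.findall(pattern, text)}
            meta["path"] = str(path)
            entries.append(meta)
        self.index[bucket] = entries
        self.index_file.write_text(json.dumps(self.index, indent=2))
        return entries

A real implementation would presumably also reload a previously saved index and capture richer per-class metadata (e.g. method names, as the search_java_code docstring implies), which this sketch omits.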


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vic3custodio/mcp_test_2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.