Skip to main content
Glama

Spark EventLog MCP Server

by yhyyz
server.py41.4 kB
""" Spark Event Log Analysis MCP Server with FastAPI Integration A FastMCP 2.0 based MCP server integrated with FastAPI for comprehensive Spark event log analysis, providing both MCP tools and HTTP API endpoints. """ import os import sys import asyncio from datetime import datetime from typing import Dict, Any, Optional, List from pathlib import Path from contextlib import asynccontextmanager # Import FastAPI and FastMCP from fastapi import FastAPI, HTTPException, Depends from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse, JSONResponse, HTMLResponse from fastmcp import FastMCP # Import our modules from .models.schemas import ( ParseEventLogInput, AnalyzePerformanceInput, GenerateReportInput, GetOptimizationSuggestionsInput, DataSource, AnalysisConfig, ReportConfig, AnalysisResult, GeneratedReport, OptimizationSuggestion ) from .core.mature_data_loader import MatureDataLoader from .tools.mature_analyzer import MatureSparkEventLogAnalyzer from .tools.mature_report_generator import HTMLReportGenerator from .models.mature_models import MatureAnalysisResult from .utils.helpers import setup_logging, load_config_from_env, create_error_response, create_success_response # Load configuration config = load_config_from_env() logger = setup_logging(config["log_level"]) # Initialize components mature_data_loader = MatureDataLoader(config) analyzer = MatureSparkEventLogAnalyzer() report_generator = HTMLReportGenerator() # Server state _current_analysis: Optional[MatureAnalysisResult] = None _current_data_source: Optional[DataSource] = None _server_host: str = "localhost" _server_port: int = 7799 _transport_mode: str = "streamable-http" # Define report data directory PROJECT_ROOT = Path(__file__).parent.parent.parent REPORT_DATA_DIR = PROJECT_ROOT / "report_data" # Ensure report data directory exists REPORT_DATA_DIR.mkdir(parents=True, exist_ok=True) logger.info(f"Report data directory: {REPORT_DATA_DIR}") # Create MCP server with all original tools mcp = FastMCP( name=config["server_name"], version=config["server_version"] ) # ==================== MCP Tools (保留所有原有工具) ==================== async def parse_eventlog(input_data: ParseEventLogInput) -> Dict[str, Any]: """ Parse Spark event logs from various data sources (S3, URL, local files) This tool loads and validates Spark event log data, making it available for subsequent analysis operations. Args: input_data: Configuration specifying the data source Returns: Parsing results with summary statistics """ global _current_data_source try: logger.info(f"Parsing event logs from {input_data.data_source.source_type}: {input_data.data_source.path}") # Validate data source first validation_result = await mature_data_loader.validate_data_source(input_data.data_source) if not validation_result["is_valid"]: return create_error_response( "ValidationError", f"Invalid data source: {validation_result['error_message']}", validation_result ) # Store current data source _current_data_source = input_data.data_source # Generate summary summary = { "source_type": input_data.data_source.source_type, "source_path": input_data.data_source.path, "validation_info": validation_result["info"], "status": "Data source validated successfully" } if validation_result["warnings"]: summary["warnings"] = validation_result["warnings"] logger.info(f"Successfully validated data source: {input_data.data_source.path}") return create_success_response( { "parsed": True, "summary": summary }, { "parse_timestamp": "completed", "data_cached": True } ) except Exception as e: logger.error(f"Failed to parse event logs: {str(e)}") return create_error_response( "ParseError", f"Failed to parse event logs: {str(e)}" ) async def analyze_performance(input_data: AnalyzePerformanceInput) -> Dict[str, Any]: """ Perform comprehensive performance analysis of Spark event logs This tool analyzes parsed event logs to extract performance metrics, resource utilization, shuffle statistics, and task execution patterns. Args: input_data: Analysis configuration and optional data source Returns: Complete analysis results with metrics and insights """ global _current_analysis, _current_data_source try: # Use provided data source or current one data_source = input_data.data_source or _current_data_source if not data_source: return create_error_response( "ConfigurationError", "No data source available. Please run parse_eventlog first or provide a data source." ) logger.info(f"Starting performance analysis with config: {input_data.analysis_config.analysis_depth}") # Load event logs using mature data loader if input_data.data_source: if data_source.source_type == "s3": event_logs_data = await mature_data_loader.load_from_s3(data_source.path) elif data_source.source_type == "url": event_logs_data = await mature_data_loader.load_from_url(data_source.path) elif data_source.source_type == "local": event_logs_data = await mature_data_loader.load_from_upload(data_source.path) else: return create_error_response( "DataError", f"Unsupported data source type: {data_source.source_type}" ) else: return create_error_response( "DataError", "Please provide a data_source parameter for analysis." ) # Perform analysis using mature analyzer analysis_result = analyzer.analyze(event_logs_data) # Store current analysis _current_analysis = analysis_result # Create response with summary summary = analyzer.get_analysis_summary() logger.info(f"Analysis completed for application: {analysis_result.application_id}") return create_success_response( { "analysis_complete": True, "analysis_result": analysis_result.dict(), "summary": summary }, { "analysis_timestamp": analysis_result.analysis_timestamp.isoformat(), "config_used": input_data.analysis_config.dict() } ) except Exception as e: logger.error(f"Performance analysis failed: {str(e)}") return create_error_response( "AnalysisError", f"Performance analysis failed: {str(e)}" ) @mcp.tool() async def generate_report(input_data: GenerateReportInput) -> Dict[str, Any]: """ Generate comprehensive reports with complete end-to-end processing This tool provides complete end-to-end Spark event log processing and report generation: 1. Data parsing and validation from various sources (S3, URL, local files) 2. Comprehensive performance analysis with configurable depth and metrics 3. Optimization suggestions extraction and categorization 4. Formatted report generation with visualizations and recommendations The tool automatically handles the complete pipeline and can work in multiple modes: - End-to-end: Provide data_source for complete processing from raw data - Analysis + Report: Use current session data for analysis and reporting - Report-only: Provide existing analysis_result to skip analysis phase Args: input_data: Complete configuration including: - report_config: Report formatting and content configuration (format, visualizations, etc.) - data_source: Optional data source for end-to-end processing (S3/URL/local file) - analysis_config: Optional analysis configuration (depth, metrics, etc.) - analysis_result: Optional existing analysis result (skips parsing and analysis if provided) Returns: Comprehensive report with content, metadata, analysis results, and optimization suggestions including parsing status, analysis metadata, and suggestion summaries """ global _current_analysis, _current_data_source try: parsing_performed = False analysis_performed = False analysis_result = input_data.analysis_result # Phase 1: Data Parsing (if needed) if not analysis_result: data_source_to_use = input_data.data_source or _current_data_source # If we have a new data source, perform parsing if input_data.data_source and input_data.data_source != _current_data_source: logger.info(f"Starting end-to-end processing with new data source: {input_data.data_source.source_type}://{input_data.data_source.path}") # Create parse input and call parse_eventlog internally parse_input = ParseEventLogInput(data_source=input_data.data_source) parse_response = await parse_eventlog(parse_input) # Check parsing success if not parse_response.get("success", False): return create_error_response( "ParseError", f"Data parsing failed: {parse_response.get('message', 'Unknown error')}" ) parsing_performed = True data_source_to_use = input_data.data_source logger.info("Data parsing completed successfully") # Verify we have a data source for analysis if not data_source_to_use: return create_error_response( "ConfigurationError", "No data source available for processing. " "Please provide data_source parameter or run parse_eventlog first, or provide existing analysis_result." ) # Phase 2: Performance Analysis (if needed) if not analysis_result: logger.info("No existing analysis found, performing new analysis") # Use provided analysis config or default analysis_config = input_data.analysis_config or AnalysisConfig() # Create analysis input and call analyze_performance internally analysis_input = AnalyzePerformanceInput( analysis_config=analysis_config, data_source=data_source_to_use ) analysis_response = await analyze_performance(analysis_input) # Check analysis success if not analysis_response.get("success", False): return create_error_response( "AnalysisError", f"Performance analysis failed: {analysis_response.get('message', 'Unknown error')}" ) # Get analysis result from current session analysis_result = _current_analysis analysis_performed = True if not analysis_result: return create_error_response( "AnalysisError", "Analysis completed but no result available" ) logger.info(f"Performance analysis completed for application: {analysis_result.application_id}") # Phase 3: Optimization Suggestions Extraction logger.info("Extracting optimization suggestions") suggestions_input = GetOptimizationSuggestionsInput( focus_areas=[], # Get all suggestions priority_filter=None # No priority filter ) suggestions_response = await get_optimization_suggestions(suggestions_input) optimization_suggestions = [] if suggestions_response.get("success", False): optimization_suggestions = suggestions_response.get("data", {}).get("suggestions", []) logger.info(f"Retrieved {len(optimization_suggestions)} optimization suggestions") else: logger.warning(f"Failed to get optimization suggestions: {suggestions_response.get('message', 'Unknown error')}") # Phase 4: Report Generation logger.info(f"Generating {input_data.report_config.report_format} report") if input_data.report_config.report_format == "html": # Generate HTML report with server configuration report_address = await report_generator.generate_html_report( analysis_result, server_host=_server_host, server_port=_server_port, transport_mode=_transport_mode ) if _transport_mode.lower() == "streamable-http": # HTTP mode: return HTTP URL for browser access response_data = { "report_generated": True, "report_format": "html", "title": f"Spark Analysis Report - {analysis_result.application_name}", "report_address": report_address, } else: # stdio mode: return file path response_data = { "report_generated": True, "report_format": "html", "title": f"Spark Analysis Report - {analysis_result.application_name}", "report_address": report_address, } elif input_data.report_config.report_format == "json": # Return JSON format with full analysis data and suggestions response_data = { "report_generated": True, "report_format": "json", "report_data": analysis_result.dict(), "optimization_suggestions": optimization_suggestions } else: return create_error_response( "ReportError", f"Unsupported report format: {input_data.report_config.report_format}" ) # Add comprehensive processing metadata response_data["processing_metadata"] = { "end_to_end_processing": parsing_performed or analysis_performed, "phases_completed": { "data_parsing": parsing_performed, "performance_analysis": analysis_performed, "optimization_extraction": len(optimization_suggestions) > 0, "report_generation": True }, "data_source": { "type": _current_data_source.source_type if _current_data_source else None, "path": _current_data_source.path if _current_data_source else None } if _current_data_source else None } # Add analysis metadata response_data["analysis_metadata"] = { "application_id": analysis_result.application_id, "application_name": analysis_result.application_name, "analysis_timestamp": analysis_result.analysis_timestamp.isoformat(), "total_optimization_suggestions": len(optimization_suggestions), "analysis_summary": { "total_jobs": len(analysis_result.jobs), "total_tasks": sum(job.num_tasks for job in analysis_result.jobs), "total_duration_ms": analysis_result.duration_ms, "successful_jobs": analysis_result.successful_jobs, "failed_jobs": analysis_result.failed_jobs, "total_executors": analysis_result.total_executors } } # Add optimization suggestions summary if optimization_suggestions: priority_counts = {"HIGH": 0, "MEDIUM": 0, "LOW": 0} category_counts = {} for suggestion in optimization_suggestions: priority = suggestion.get('priority', 'LOW') category = suggestion.get('category', 'OTHER') priority_counts[priority] = priority_counts.get(priority, 0) + 1 category_counts[category] = category_counts.get(category, 0) + 1 response_data["optimization_summary"] = { "total_suggestions": len(optimization_suggestions), "by_priority": priority_counts, "by_category": category_counts, "high_priority_suggestions": [s for s in optimization_suggestions if s.get('priority') == 'HIGH'] } logger.info(f"End-to-end report generation completed successfully") return create_success_response( response_data, { "generation_timestamp": datetime.now().isoformat(), "config_used": { "report_config": input_data.report_config.dict(), "analysis_config": input_data.analysis_config.dict() if input_data.analysis_config else None }, "processing_summary": { "parsing_performed": parsing_performed, "analysis_performed": analysis_performed, "suggestions_included": len(optimization_suggestions), "total_processing_phases": sum([parsing_performed, analysis_performed, True, True]) # parse, analyze, suggest, report } } ) except Exception as e: logger.error(f"End-to-end report generation failed: {str(e)}") return create_error_response( "ReportError", f"End-to-end report generation failed: {str(e)}" ) async def get_optimization_suggestions( input_data: GetOptimizationSuggestionsInput ) -> Dict[str, Any]: """ Get targeted optimization suggestions based on analysis results This tool extracts and filters optimization recommendations from the current analysis, allowing focus on specific areas or priority levels. Args: input_data: Filter configuration for suggestions Returns: Filtered optimization suggestions with implementation details """ global _current_analysis try: if not _current_analysis: return create_error_response( "ConfigurationError", "No analysis result available. Please run analyze_performance first." ) logger.info(f"Retrieving optimization suggestions with filters: {input_data.focus_areas or 'all'}") # Get filtered suggestions suggestions = analyzer.get_optimization_suggestions( focus_areas=input_data.focus_areas, priority_filter=input_data.priority_filter ) # Group suggestions by category and priority categorized_suggestions = {} priority_counts = {"HIGH": 0, "MEDIUM": 0, "LOW": 0} for suggestion in suggestions: category = suggestion['category'] if category not in categorized_suggestions: categorized_suggestions[category] = [] categorized_suggestions[category].append(suggestion) priority_counts[suggestion['priority']] += 1 response_data = { "suggestions_found": len(suggestions), "suggestions": suggestions, "categorized_suggestions": categorized_suggestions, "priority_breakdown": priority_counts } # Add configuration recommendations summary if suggestions: config_params = {} for suggestion in suggestions: config_params.update(suggestion['config_parameters']) response_data["recommended_spark_config"] = config_params logger.info(f"Retrieved {len(suggestions)} optimization suggestions") return create_success_response( response_data, { "filters_applied": { "focus_areas": input_data.focus_areas, "priority_filter": input_data.priority_filter }, "total_available": len(_current_analysis.optimization_recommendations) } ) except Exception as e: logger.error(f"Failed to get optimization suggestions: {str(e)}") return create_error_response( "SuggestionError", f"Failed to get optimization suggestions: {str(e)}" ) @mcp.tool() async def get_analysis_status() -> Dict[str, Any]: """ Get current analysis session status and summary information Returns information about the current analysis session, including data source, analysis configuration, and key metrics. Returns: Current session status and summary """ global _current_analysis, _current_data_source try: status = { "session_active": _current_analysis is not None, "data_source_loaded": _current_data_source is not None, } if _current_data_source: status["data_source"] = { "type": _current_data_source.source_type, "path": _current_data_source.path } status["data_info"] = { "source_type": _current_data_source.source_type, "path": _current_data_source.path } if _current_analysis: status["analysis_summary"] = analyzer.get_analysis_summary() status["optimization_suggestions_available"] = len(_current_analysis.optimization_recommendations) return create_success_response(status) except Exception as e: logger.error(f"Failed to get analysis status: {str(e)}") return create_error_response( "StatusError", f"Failed to get analysis status: {str(e)}" ) @mcp.tool() async def clear_session() -> Dict[str, Any]: """ Clear current analysis session and cached data This tool resets the server state, clearing all cached analysis results and data sources. Use this to start a fresh analysis session. Returns: Confirmation of session clearing """ global _current_analysis, _current_data_source try: # Clear global state _current_analysis = None _current_data_source = None logger.info("Session cleared successfully") return create_success_response({ "session_cleared": True, "message": "All cached data and analysis results have been cleared" }) except Exception as e: logger.error(f"Failed to clear session: {str(e)}") return create_error_response( "ClearError", f"Failed to clear session: {str(e)}" ) # ==================== MCP Resources (保留所有原有资源) ==================== @mcp.resource("server://info") async def server_info(): """Provide server information and capabilities""" return { "uri": "server://info", "name": "Spark EventLog MCP Server Info", "content": { "name": config["server_name"], "version": config["server_version"], "description": "End-to-end MCP Server for comprehensive Spark event log analysis with integrated processing pipeline", "primary_capability": "Single-command end-to-end Spark event log processing and report generation", "processing_pipeline": [ "Data parsing and validation from multiple sources (S3, URL, local files)", "Comprehensive performance analysis with configurable metrics", "Intelligent optimization suggestions with priority categorization", "Professional report generation with interactive visualizations" ], "key_features": [ "End-to-end processing in single tool call", "Automatic data source detection and validation", "Configurable analysis depth and focus areas", "Multi-format report generation (HTML/JSON)", "Real-time processing metadata and phase tracking", "Comprehensive optimization recommendations", "Interactive HTML reports with FastAPI integration", "Session management and data caching" ], "supported_data_sources": ["s3", "url", "local"], "default_source_type": config["default_source_type"], "supported_report_formats": ["html", "json"], "analysis_capabilities": [ "Application-level metrics and timing analysis", "Stage-by-stage execution breakdown", "Task distribution and execution patterns", "Shuffle performance and data locality analysis", "Resource utilization (CPU, memory, storage)", "Garbage collection impact assessment", "Performance bottleneck identification", "Optimization opportunity detection" ], "configuration": { "cache_enabled": config["cache_enabled"], "cache_ttl": config["cache_ttl"], "end_to_end_processing": True, "automatic_optimization_suggestions": True, "interactive_reports": True } }, "mimeType": "application/json" } @mcp.resource("docs://tools") async def tools_documentation(): """Provide comprehensive tool documentation""" return { "uri": "docs://tools", "name": "Spark EventLog MCP Server Tools Documentation", "content": { "tools": { "generate_report": { "description": "Complete end-to-end Spark event log processing and report generation", "input": "GenerateReportInput with comprehensive configuration options", "output": "Generated report with complete processing metadata, analysis results, and optimization suggestions", "processing_modes": { "end_to_end": "Provide data_source for complete processing from raw data to final report", "analysis_and_report": "Use current session data for analysis and report generation", "report_only": "Provide existing analysis_result to generate report from existing analysis" }, "processing_phases": [ "1. Data Parsing - Load and validate Spark event logs from various sources", "2. Performance Analysis - Extract metrics, resource utilization, and execution patterns", "3. Optimization Suggestions - Generate categorized recommendations for performance improvements", "4. Report Generation - Create formatted reports with visualizations and insights" ], "example_end_to_end": { "data_source": { "source_type": "s3", "path": "s3://my-bucket/spark-logs/application_123/" }, "analysis_config": { "analysis_depth": "detailed", "include_shuffle_analysis": True, "include_resource_analysis": True, "include_task_analysis": True, "include_optimization_suggestions": True }, "report_config": { "report_format": "html", "include_visualizations": True, "include_raw_metrics": False } }, "example_report_only": { "analysis_result": { "application_id": "app-20231201-123456", "jobs": "...", "optimization_recommendations": "..." }, "report_config": { "report_format": "json", "include_visualizations": False, "include_raw_metrics": True } }, "supported_formats": ["html", "json"], "supported_sources": ["s3", "url", "local"], "features": [ "Automatic data source validation and parsing", "Configurable performance analysis depth and metrics", "Intelligent optimization suggestions with priority categorization", "Interactive HTML reports with visualizations", "Comprehensive JSON reports with raw data", "Processing metadata and phase tracking", "Error handling and detailed logging throughout pipeline" ] }, "get_analysis_status": { "description": "Get current session status and processing summary", "input": "No parameters required", "output": "Current session information including data source status, analysis availability, and processing history" }, "clear_session": { "description": "Clear current analysis session and reset all cached data", "input": "No parameters required", "output": "Session clearing confirmation and reset status" } }, "workflow_guide": { "simple_usage": [ "Single call: generate_report with data_source for complete end-to-end processing", "Result: Comprehensive report with all analysis and optimization insights" ], "advanced_usage": [ "1. Configure analysis_config for specific analysis requirements", "2. Use report_config to customize report format and content", "3. Leverage processing_metadata to track pipeline execution", "4. Use optimization_summary for focused performance improvements" ], "internal_functions": { "parse_eventlog": "Internal data parsing and validation (called automatically)", "analyze_performance": "Internal performance analysis engine (called automatically)", "get_optimization_suggestions": "Internal optimization recommendation engine (called automatically)" } }, "data_sources": { "s3": { "format": {"source_type": "s3", "path": "s3://bucket-name/path/to/logs/"}, "requirements": "AWS credentials configured or IAM role permissions" }, "url": { "format": {"source_type": "url", "path": "https://example.com/spark-logs.zip"}, "requirements": "Publicly accessible URL or authenticated endpoint" }, "local": { "format": {"source_type": "local", "path": "/path/to/local/spark-logs/"}, "requirements": "Local file system access to log files" } } }, "mimeType": "application/json" } @mcp.resource("health://components") async def check_components(): """Check health of server components""" try: # Test basic functionality test_config = AnalysisConfig() test_report_config = ReportConfig() health_data = { "status": "healthy", "components": { "data_loader": "operational", "analyzer": "operational", "report_generator": "operational" }, "configuration": { "cache_enabled": config["cache_enabled"], "aws_configured": bool(config.get("aws_access_key_id")) } } return { "uri": "health://components", "name": "MCP Server Health Check", "content": health_data, "mimeType": "application/json" } except Exception as e: return { "uri": "health://components", "name": "MCP Server Health Check", "content": { "status": "unhealthy", "error": str(e) }, "mimeType": "application/json" } # ==================== FastAPI Integration ==================== # Create MCP ASGI app mcp_app = mcp.http_app(path='/mcp') # Define FastAPI lifespan @asynccontextmanager async def fastapi_lifespan(app: FastAPI): """FastAPI lifespan for initialization and cleanup""" logger.info("FastAPI app starting up...") yield logger.info("FastAPI app shutting down...") # Combine lifespans @asynccontextmanager async def combined_lifespan(app: FastAPI): """Combined lifespan for both FastAPI and MCP""" async with fastapi_lifespan(app): async with mcp_app.lifespan(app): yield # Create FastAPI app with combined lifespan fastapi_app = FastAPI( title="Spark EventLog Analysis API", version=config["server_version"], description="RESTful API for Spark event log analysis with MCP integration", lifespan=combined_lifespan ) # Add CORS middleware (if needed) # Note: Only add if your FastAPI endpoints need CORS, not for MCP routes # fastapi_app.add_middleware( # CORSMiddleware, # allow_origins=["*"], # allow_credentials=True, # allow_methods=["*"], # allow_headers=["*"], # ) # ==================== FastAPI HTTP Endpoints ==================== @fastapi_app.get("/") async def root(): """Root endpoint""" return { "service": "Spark EventLog Analysis API", "version": config["server_version"], "description": "RESTful API for Spark event log analysis with MCP integration", "endpoints": { "health": "/health", "api_docs": "/docs", "mcp_endpoint": "/mcp", "reports": { "list_reports": "GET /api/reports - 列出所有报告(JSON)", "view_report_1": "GET /reports/{filename} - 在浏览器中查看报告(HTML)", "view_report_2": "GET /api/reports/{filename} - 在浏览器中查看报告(HTML)", "delete_report": "DELETE /api/reports/{filename} - 删除报告" } } } @fastapi_app.get("/health") async def health_check(): """Health check endpoint""" return { "status": "healthy", "service": "Spark EventLog Analysis API", "version": config["server_version"], "timestamp": datetime.now().isoformat() } # ==================== Report API Endpoints ==================== @fastapi_app.get("/api/reports") async def list_reports(): """ 列出所有可用的报告文件 Returns: List of available HTML report files with metadata """ try: reports = [] # 扫描 report_data 目录 if REPORT_DATA_DIR.exists(): for file_path in REPORT_DATA_DIR.glob("*.html"): file_stat = file_path.stat() reports.append({ "filename": file_path.name, "size": file_stat.st_size, "created": datetime.fromtimestamp(file_stat.st_ctime).isoformat(), "modified": datetime.fromtimestamp(file_stat.st_mtime).isoformat(), "url": f"/reports/{file_path.name}" }) # 按修改时间降序排序(最新的在前面) reports.sort(key=lambda x: x["modified"], reverse=True) return { "total": len(reports), "reports": reports, "report_directory": str(REPORT_DATA_DIR) } except Exception as e: logger.error(f"Failed to list reports: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to list reports: {str(e)}") @fastapi_app.get("/api/reports/{filename}") async def get_report_html(filename: str): """ 直接返回 HTML 报告文件,在浏览器中显示 Args: filename: Report filename Returns: HTML file content """ try: file_path = REPORT_DATA_DIR / filename if not file_path.exists(): raise HTTPException(status_code=404, detail=f"Report not found: {filename}") if not file_path.is_file(): raise HTTPException(status_code=400, detail=f"Not a file: {filename}") # 检查文件扩展名 if file_path.suffix.lower() not in [".html", ".htm"]: raise HTTPException(status_code=400, detail=f"Not an HTML file: {filename}") # 读取 HTML 文件内容并直接返回 with open(file_path, 'r', encoding='utf-8') as f: html_content = f.read() return HTMLResponse(content=html_content, status_code=200) except HTTPException: raise except Exception as e: logger.error(f"Failed to get report: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to get report: {str(e)}") @fastapi_app.delete("/api/reports/{filename}") async def delete_report(filename: str): """ 删除指定的报告文件 Args: filename: Report filename to delete Returns: Deletion confirmation """ try: file_path = REPORT_DATA_DIR / filename if not file_path.exists(): raise HTTPException(status_code=404, detail=f"Report not found: {filename}") if not file_path.is_file(): raise HTTPException(status_code=400, detail=f"Not a file: {filename}") # 删除文件 file_path.unlink() logger.info(f"Deleted report: {filename}") return { "success": True, "message": f"Report deleted: {filename}", "filename": filename } except HTTPException: raise except Exception as e: logger.error(f"Failed to delete report: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to delete report: {str(e)}") # ==================== Static Files ==================== # Mount static files for reports # 注意: StaticFiles 必须在所有其他路由之后挂载,因为它会匹配所有路径 # 移除 html=True 参数,让浏览器根据 content-type 自动处理 fastapi_app.mount("/reports", StaticFiles(directory=str(REPORT_DATA_DIR)), name="reports") # Mount MCP server at root fastapi_app.mount("/", mcp_app) def main(): """Main entry point for the integrated server""" global _server_host, _server_port, _transport_mode logger.info(f"Starting {config['server_name']} v{config['server_version']} with FastAPI integration") logger.info(f"Configuration: Cache={'ON' if config['cache_enabled'] else 'OFF'}, " f"Log Level={config['log_level']}") # Get transport mode configuration transport_mode = os.getenv("MCP_TRANSPORT", "streamable-http") mcp_host = os.getenv("MCP_HOST", "localhost") mcp_port = int(os.getenv("MCP_PORT", "7799")) # Update global state _server_host = mcp_host _server_port = mcp_port _transport_mode = transport_mode try: if transport_mode.lower() == "streamable-http": # Use HTTP transport mode with FastAPI logger.info(f"Starting HTTP server with FastAPI on {mcp_host}:{mcp_port}") logger.info(f"API Documentation available at: http://{mcp_host}:{mcp_port}/docs") logger.info(f"MCP endpoint available at: http://{mcp_host}:{mcp_port}/mcp") logger.info(f"Reports directory: {REPORT_DATA_DIR}") logger.info(f"View reports at: http://{mcp_host}:{mcp_port}/reports/<filename>") logger.info(f"List reports at: http://{mcp_host}:{mcp_port}/api/reports") # Run FastAPI app with uvicorn import uvicorn uvicorn.run( fastapi_app, host=mcp_host, port=mcp_port, log_level=config["log_level"].lower() ) else: # Default to stdio transport mode (original MCP only) mcp.run(transport="stdio") except KeyboardInterrupt: logger.info("Server shutdown requested") except Exception as e: logger.error(f"Server error: {str(e)}") sys.exit(1) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yhyyz/spark-eventlog-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server