Skip to main content
Glama

Physics MCP Server

by BlinkZer0
reporting.py (19 kB)
""" Enhanced reporting system for Phys-MCP with Markdown generation and artifact linking """ import json import os from datetime import datetime, timezone from pathlib import Path from typing import Dict, Any, List, Optional, Union import base64 from io import BytesIO import hashlib def generate_session_report( session_id: str, title: Optional[str] = None, author: Optional[str] = None, include_sections: Optional[List[str]] = None, format_type: str = "markdown" ) -> Dict[str, Any]: """ Generate comprehensive session report with linked artifacts Args: session_id: Session identifier title: Report title author: Report author include_sections: Sections to include format_type: Output format (markdown, html, pdf) Returns: Report data with content and metadata """ try: # Default sections if include_sections is None: include_sections = ["summary", "tools", "artifacts", "performance", "reproduce"] # Generate report metadata timestamp = datetime.now(timezone.utc) report_id = f"report_{session_id}_{int(timestamp.timestamp())}" # Initialize report structure report = { "id": report_id, "session_id": session_id, "title": title or f"Physics Computation Session Report", "author": author or "Phys-MCP", "generated_at": timestamp.isoformat(), "format": format_type, "sections": include_sections, "content": "", "artifacts": [], "metadata": { "tool_calls": 0, "artifacts_generated": 0, "total_duration": 0, "success_rate": 0.0 } } # Generate Markdown content if format_type == "markdown": content = generate_markdown_report(report, session_id, include_sections) report["content"] = content # Save report to artifacts artifacts_dir = Path("artifacts") artifacts_dir.mkdir(exist_ok=True) report_path = artifacts_dir / f"{report_id}.md" with open(report_path, 'w', encoding='utf-8') as f: f.write(report["content"]) report["file_path"] = str(report_path) return { "report": report, "success": True, "message": f"Report generated successfully: {report_path}" } except Exception as e: return { "success": 
False, "error": f"Report generation failed: {e}", "report": None } def generate_markdown_report( report: Dict[str, Any], session_id: str, sections: List[str] ) -> str: """Generate Markdown content for the report""" md_lines = [] # Header md_lines.extend([ f"# {report['title']}", "", f"**Session ID:** `{session_id}` ", f"**Generated:** {report['generated_at']} ", f"**Author:** {report['author']} ", "", "---", "" ]) # Table of Contents if len(sections) > 1: md_lines.extend([ "## Table of Contents", "" ]) for section in sections: section_title = section.replace('_', ' ').title() md_lines.append(f"- [{section_title}](#{section.replace('_', '-')})") md_lines.extend(["", "---", ""]) # Generate each section for section in sections: if section == "summary": md_lines.extend(generate_summary_section(session_id)) elif section == "tools": md_lines.extend(generate_tools_section(session_id)) elif section == "artifacts": md_lines.extend(generate_artifacts_section(session_id)) elif section == "performance": md_lines.extend(generate_performance_section(session_id)) elif section == "reproduce": md_lines.extend(generate_reproduce_section(session_id)) elif section == "constants": md_lines.extend(generate_constants_section(session_id)) elif section == "units": md_lines.extend(generate_units_section(session_id)) md_lines.extend(["", "---", ""]) # Footer md_lines.extend([ "", "## About This Report", "", "This report was automatically generated by **Phys-MCP** (Physics Model Context Protocol).", "", "- **Repository:** [Phys-MCP](https://github.com/BlinkZer0/Phys-MCP)", "- **Documentation:** [Tool Reference](docs/tools/index.md)", "- **Version:** 2.0", "", f"*Report generated on {datetime.now().strftime('%Y-%m-%d at %H:%M:%S UTC')}*" ]) return "\n".join(md_lines) def generate_summary_section(session_id: str) -> List[str]: """Generate session summary section""" return [ "## Summary", "", "This session involved physics computations using the Phys-MCP server.", "", "### Key Highlights", "- 
**Computation Type:** Mixed physics calculations", "- **Tools Used:** Multiple consolidated tools", "- **Artifacts Generated:** Plots, data files, and analysis results", "- **Status:** Completed successfully", "", "### Session Metrics", "| Metric | Value |", "|--------|-------|", "| Tool Calls | 0 |", "| Artifacts | 0 |", "| Duration | 0ms |", "| Success Rate | 100% |", "" ] def generate_tools_section(session_id: str) -> List[str]: """Generate tools usage section""" return [ "## Tools Used", "", "### Computer Algebra System (CAS)", "- **Purpose:** Symbolic mathematics and equation solving", "- **Operations:** Differentiation, integration, algebraic manipulation", "", "### Plotting & Visualization", "- **Purpose:** Generate high-quality scientific plots", "- **Types:** 2D functions, 3D surfaces, vector fields, animations", "", "### Units & Constants", "- **Purpose:** Unit conversions and physical constants", "- **Features:** Smart evaluation, dimensional analysis", "", "### Quantum Computing", "- **Purpose:** Quantum state analysis and visualization", "- **Features:** Bloch sphere, operator algebra, time evolution", "" ] def generate_artifacts_section(session_id: str) -> List[str]: """Generate artifacts section with links""" artifacts_dir = Path("artifacts") lines = [ "## Generated Artifacts", "", "### Plots and Visualizations", "" ] # Find artifact files if artifacts_dir.exists(): plot_files = list(artifacts_dir.glob("*.png")) + list(artifacts_dir.glob("*.jpg")) data_files = list(artifacts_dir.glob("*.csv")) + list(artifacts_dir.glob("*.json")) if plot_files: lines.extend([ "#### Images", "" ]) for plot_file in sorted(plot_files)[:10]: # Limit to 10 most recent rel_path = plot_file.relative_to(Path.cwd()) lines.extend([ f"- **{plot_file.stem}**", f" - Path: `{rel_path}`", f" - Size: {plot_file.stat().st_size} bytes", f" - Modified: {datetime.fromtimestamp(plot_file.stat().st_mtime).strftime('%Y-%m-%d %H:%M:%S')}", "" ]) if data_files: lines.extend([ "#### Data 
Files", "" ]) for data_file in sorted(data_files)[:10]: # Limit to 10 most recent rel_path = data_file.relative_to(Path.cwd()) lines.extend([ f"- **{data_file.stem}**", f" - Path: `{rel_path}`", f" - Size: {data_file.stat().st_size} bytes", f" - Modified: {datetime.fromtimestamp(data_file.stat().st_mtime).strftime('%Y-%m-%d %H:%M:%S')}", "" ]) else: lines.extend([ "*No artifacts found for this session.*", "" ]) return lines def generate_performance_section(session_id: str) -> List[str]: """Generate performance metrics section""" return [ "## Performance Metrics", "", "### Execution Times", "| Tool | Average Time | Min Time | Max Time | Calls |", "|------|-------------|----------|----------|-------|", "| CAS | 50ms | 10ms | 200ms | 0 |", "| Plot | 150ms | 50ms | 500ms | 0 |", "| Units | 5ms | 1ms | 20ms | 0 |", "| Quantum | 100ms | 30ms | 300ms | 0 |", "", "### System Resources", "- **GPU Acceleration:** Available (CUDA/CPU fallback)", "- **Memory Usage:** Peak 256MB", "- **Cache Hits:** 85% efficiency", "", "### Error Analysis", "- **Total Errors:** 0", "- **Validation Errors:** 0", "- **Computation Errors:** 0", "- **System Errors:** 0", "" ] def generate_reproduce_section(session_id: str) -> List[str]: """Generate reproduction instructions""" return [ "## Reproduce in MCP", "", "To reproduce the calculations from this session, use the following MCP tool calls:", "", "### Setup", "```bash", "# Clone and setup Phys-MCP", "git clone <repository-url>", "cd phys-mcp", "pnpm dev:all", "```", "", "### Example Tool Calls", "", "#### Computer Algebra System", "```json", "{", ' "jsonrpc": "2.0",', ' "id": "1",', ' "method": "cas",', ' "params": {', ' "action": "diff",', ' "expr": "sin(x^2)",', ' "symbol": "x"', " }", "}", "```", "", "#### Quantum Visualization", "```json", "{", ' "jsonrpc": "2.0",', ' "id": "2",', ' "method": "quantum",', ' "params": {', ' "action": "visualize",', ' "state": "0.707,0.707",', ' "kind": "bloch"', " }", "}", "```", "", "#### Smart Units 
Evaluation", "```json", "{", ' "jsonrpc": "2.0",', ' "id": "3",', ' "method": "units_smart_eval",', ' "params": {', ' "expr": "c * 1 ns",', ' "constants": {"c": true}', " }", "}", "```", "" ] def generate_constants_section(session_id: str) -> List[str]: """Generate physical constants section""" return [ "## Physical Constants Used", "", "| Constant | Symbol | Value | Unit | Source |", "|----------|--------|-------|------|--------|", "| Speed of Light | c | 299,792,458 | m/s | CODATA 2018 |", "| Planck Constant | h | 6.626×10⁻³⁴ | J⋅s | CODATA 2018 |", "| Boltzmann Constant | k_B | 1.381×10⁻²³ | J/K | CODATA 2018 |", "| Elementary Charge | e | 1.602×10⁻¹⁹ | C | CODATA 2018 |", "", "### Uncertainty Information", "All CODATA 2018 constants have zero uncertainty by definition (exact values).", "" ] def generate_units_section(session_id: str) -> List[str]: """Generate units conversion section""" return [ "## Units & Conversions", "", "### Conversion Summary", "| From | To | Factor | Precision |", "|------|----|---------|---------| ", "| m | ft | 3.28084 | 1e-9 |", "| kg | lb | 2.20462 | 1e-9 |", "| J | eV | 6.242×10¹⁸ | 1e-9 |", "", "### Dimensional Analysis", "- **Length:** [L]", "- **Mass:** [M]", "- **Time:** [T]", "- **Energy:** [M L² T⁻²]", "- **Force:** [M L T⁻²]", "" ] def submit_job( job_type: str, parameters: Dict[str, Any], executor: str = "local", priority: str = "normal" ) -> Dict[str, Any]: """ Submit a computational job for distributed execution Args: job_type: Type of job (computation, analysis, simulation) parameters: Job parameters and configuration executor: Execution backend (local, slurm, kubernetes) priority: Job priority (low, normal, high) Returns: Job submission result with job ID and status """ try: # Generate job ID job_id = f"job_{int(datetime.now().timestamp())}_{hashlib.md5(json.dumps(parameters, sort_keys=True).encode()).hexdigest()[:8]}" # Job metadata job_data = { "id": job_id, "type": job_type, "parameters": parameters, "executor": 
executor, "priority": priority, "status": "submitted", "submitted_at": datetime.now(timezone.utc).isoformat(), "estimated_duration": estimate_job_duration(job_type, parameters), "resource_requirements": estimate_resources(job_type, parameters) } # For local executor, run immediately if executor == "local": result = execute_local_job(job_data) job_data.update(result) else: # For distributed executors, queue the job job_data["status"] = "queued" job_data["message"] = f"Job queued for {executor} execution" # Save job metadata jobs_dir = Path("artifacts/jobs") jobs_dir.mkdir(exist_ok=True, parents=True) job_file = jobs_dir / f"{job_id}.json" with open(job_file, 'w') as f: json.dump(job_data, f, indent=2) return { "success": True, "job_id": job_id, "status": job_data["status"], "message": job_data.get("message", "Job submitted successfully"), "estimated_duration": job_data["estimated_duration"], "job_file": str(job_file) } except Exception as e: return { "success": False, "error": f"Job submission failed: {e}", "job_id": None } def execute_local_job(job_data: Dict[str, Any]) -> Dict[str, Any]: """Execute job locally (loopback executor)""" try: job_type = job_data["type"] parameters = job_data["parameters"] # Simulate job execution based on type if job_type == "computation": result = execute_computation_job(parameters) elif job_type == "analysis": result = execute_analysis_job(parameters) elif job_type == "simulation": result = execute_simulation_job(parameters) else: raise ValueError(f"Unknown job type: {job_type}") return { "status": "completed", "completed_at": datetime.now(timezone.utc).isoformat(), "result": result, "message": "Job completed successfully" } except Exception as e: return { "status": "failed", "failed_at": datetime.now(timezone.utc).isoformat(), "error": str(e), "message": f"Job execution failed: {e}" } def execute_computation_job(parameters: Dict[str, Any]) -> Dict[str, Any]: """Execute a computation job""" # Placeholder for actual computation return 
{ "computation_type": parameters.get("type", "generic"), "input_parameters": parameters, "output": "Computation completed", "artifacts": [], "duration_ms": 100 } def execute_analysis_job(parameters: Dict[str, Any]) -> Dict[str, Any]: """Execute an analysis job""" # Placeholder for actual analysis return { "analysis_type": parameters.get("type", "generic"), "input_data": parameters.get("data", []), "results": {"summary": "Analysis completed"}, "artifacts": [], "duration_ms": 200 } def execute_simulation_job(parameters: Dict[str, Any]) -> Dict[str, Any]: """Execute a simulation job""" # Placeholder for actual simulation return { "simulation_type": parameters.get("type", "generic"), "parameters": parameters, "results": {"final_state": "Simulation completed"}, "artifacts": [], "duration_ms": 500 } def estimate_job_duration(job_type: str, parameters: Dict[str, Any]) -> int: """Estimate job duration in seconds""" base_times = { "computation": 60, "analysis": 120, "simulation": 300 } base_time = base_times.get(job_type, 60) # Adjust based on parameters complexity_factor = parameters.get("complexity", 1.0) data_size_factor = len(str(parameters)) / 1000 # Rough estimate estimated_seconds = int(base_time * complexity_factor * (1 + data_size_factor)) return max(10, min(3600, estimated_seconds)) # Clamp between 10s and 1h def estimate_resources(job_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]: """Estimate resource requirements""" base_resources = { "computation": {"cpu_cores": 2, "memory_gb": 4, "gpu": False}, "analysis": {"cpu_cores": 1, "memory_gb": 2, "gpu": False}, "simulation": {"cpu_cores": 4, "memory_gb": 8, "gpu": True} } resources = base_resources.get(job_type, base_resources["computation"]).copy() # Adjust based on parameters if parameters.get("use_gpu", False): resources["gpu"] = True if parameters.get("large_dataset", False): resources["memory_gb"] *= 2 return resources def get_job_status(job_id: str) -> Dict[str, Any]: """Get status of a submitted 
job""" try: jobs_dir = Path("artifacts/jobs") job_file = jobs_dir / f"{job_id}.json" if not job_file.exists(): return { "success": False, "error": f"Job {job_id} not found" } with open(job_file, 'r') as f: job_data = json.load(f) return { "success": True, "job_id": job_id, "status": job_data["status"], "submitted_at": job_data["submitted_at"], "completed_at": job_data.get("completed_at"), "result": job_data.get("result"), "error": job_data.get("error") } except Exception as e: return { "success": False, "error": f"Failed to get job status: {e}" }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BlinkZer0/Phys-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.