# reporting.py
"""
Enhanced reporting system for Phys-MCP with Markdown generation and artifact linking
"""
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any, List, Optional, Union
import base64
from io import BytesIO
import hashlib
def generate_session_report(
    session_id: str,
    title: Optional[str] = None,
    author: Optional[str] = None,
    include_sections: Optional[List[str]] = None,
    format_type: str = "markdown"
) -> Dict[str, Any]:
    """
    Generate a comprehensive session report with linked artifacts.

    Args:
        session_id: Session identifier
        title: Report title (defaults to a generic title)
        author: Report author (defaults to "Phys-MCP")
        include_sections: Section keys to include (defaults to the standard set)
        format_type: Requested output format; only "markdown" is implemented,
            other values still receive Markdown content as a fallback

    Returns:
        Dict with "success", a "report" payload (None on failure) and a
        human-readable "message" or "error".
    """
    try:
        if include_sections is None:
            include_sections = ["summary", "tools", "artifacts", "performance", "reproduce"]

        # Timezone-aware timestamp keeps the report ID epoch and the
        # "generated_at" field unambiguous.
        timestamp = datetime.now(timezone.utc)
        report_id = f"report_{session_id}_{int(timestamp.timestamp())}"

        report = {
            "id": report_id,
            "session_id": session_id,
            "title": title or "Physics Computation Session Report",
            "author": author or "Phys-MCP",
            "generated_at": timestamp.isoformat(),
            "format": format_type,
            "sections": include_sections,
            "content": "",
            "artifacts": [],
            # Placeholder metrics; not yet populated from session telemetry.
            "metadata": {
                "tool_calls": 0,
                "artifacts_generated": 0,
                "total_duration": 0,
                "success_rate": 0.0
            }
        }

        # Only Markdown rendering exists. Generate it for every requested
        # format so non-markdown requests no longer produce an empty file.
        report["content"] = generate_markdown_report(report, session_id, include_sections)

        # Persist the rendered report alongside the other session artifacts.
        artifacts_dir = Path("artifacts")
        artifacts_dir.mkdir(exist_ok=True)
        report_path = artifacts_dir / f"{report_id}.md"
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(report["content"])
        report["file_path"] = str(report_path)

        return {
            "report": report,
            "success": True,
            "message": f"Report generated successfully: {report_path}"
        }
    except Exception as e:
        # Report generation is best-effort; failures are returned in-band.
        return {
            "success": False,
            "error": f"Report generation failed: {e}",
            "report": None
        }
def generate_markdown_report(
    report: Dict[str, Any],
    session_id: str,
    sections: List[str]
) -> str:
    """
    Render the Markdown body of a session report.

    Args:
        report: Report metadata; the "title", "generated_at" and "author"
            keys are read
        session_id: Session identifier shown in the header
        sections: Ordered section keys; unrecognized keys contribute only
            the trailing divider

    Returns:
        The complete Markdown document as one newline-joined string.
    """
    md_lines = []
    # Header (trailing spaces force Markdown line breaks)
    md_lines.extend([
        f"# {report['title']}",
        "",
        f"**Session ID:** `{session_id}` ",
        f"**Generated:** {report['generated_at']} ",
        f"**Author:** {report['author']} ",
        "",
        "---",
        ""
    ])
    # Table of Contents (only worth emitting for multi-section reports)
    if len(sections) > 1:
        md_lines.extend([
            "## Table of Contents",
            ""
        ])
        for section in sections:
            section_title = section.replace('_', ' ').title()
            md_lines.append(f"- [{section_title}](#{section.replace('_', '-')})")
        md_lines.extend(["", "---", ""])
    # Dispatch each requested section to its generator
    for section in sections:
        if section == "summary":
            md_lines.extend(generate_summary_section(session_id))
        elif section == "tools":
            md_lines.extend(generate_tools_section(session_id))
        elif section == "artifacts":
            md_lines.extend(generate_artifacts_section(session_id))
        elif section == "performance":
            md_lines.extend(generate_performance_section(session_id))
        elif section == "reproduce":
            md_lines.extend(generate_reproduce_section(session_id))
        elif section == "constants":
            md_lines.extend(generate_constants_section(session_id))
        elif section == "units":
            md_lines.extend(generate_units_section(session_id))
        md_lines.extend(["", "---", ""])
    # Footer
    md_lines.extend([
        "",
        "## About This Report",
        "",
        "This report was automatically generated by **Phys-MCP** (Physics Model Context Protocol).",
        "",
        "- **Repository:** [Phys-MCP](https://github.com/BlinkZer0/Phys-MCP)",
        "- **Documentation:** [Tool Reference](docs/tools/index.md)",
        "- **Version:** 2.0",
        "",
        # Fix: the timestamp is labeled "UTC", so generate it in UTC rather
        # than naive local time.
        f"*Report generated on {datetime.now(timezone.utc).strftime('%Y-%m-%d at %H:%M:%S UTC')}*"
    ])
    return "\n".join(md_lines)
def generate_summary_section(session_id: str) -> List[str]:
    """Build the static "Summary" section of the session report."""
    highlights = [
        "- **Computation Type:** Mixed physics calculations",
        "- **Tools Used:** Multiple consolidated tools",
        "- **Artifacts Generated:** Plots, data files, and analysis results",
        "- **Status:** Completed successfully",
    ]
    # Placeholder metrics table; values are not yet fed from telemetry.
    metrics_table = [
        "| Metric | Value |",
        "|--------|-------|",
        "| Tool Calls | 0 |",
        "| Artifacts | 0 |",
        "| Duration | 0ms |",
        "| Success Rate | 100% |",
    ]
    return [
        "## Summary",
        "",
        "This session involved physics computations using the Phys-MCP server.",
        "",
        "### Key Highlights",
        *highlights,
        "",
        "### Session Metrics",
        *metrics_table,
        "",
    ]
def generate_tools_section(session_id: str) -> List[str]:
    """Build the static "Tools Used" section of the session report."""
    tools = [
        ("Computer Algebra System (CAS)", [
            "- **Purpose:** Symbolic mathematics and equation solving",
            "- **Operations:** Differentiation, integration, algebraic manipulation",
        ]),
        ("Plotting & Visualization", [
            "- **Purpose:** Generate high-quality scientific plots",
            "- **Types:** 2D functions, 3D surfaces, vector fields, animations",
        ]),
        ("Units & Constants", [
            "- **Purpose:** Unit conversions and physical constants",
            "- **Features:** Smart evaluation, dimensional analysis",
        ]),
        ("Quantum Computing", [
            "- **Purpose:** Quantum state analysis and visualization",
            "- **Features:** Bloch sphere, operator algebra, time evolution",
        ]),
    ]
    lines = ["## Tools Used", ""]
    for heading, bullets in tools:
        lines.append(f"### {heading}")
        lines.extend(bullets)
        lines.append("")
    return lines
def generate_artifacts_section(session_id: str) -> List[str]:
    """
    Build the "Generated Artifacts" section, linking plot and data files
    found under the local ``artifacts`` directory.

    Args:
        session_id: Session identifier (currently unused; artifacts are not
            yet filtered per session — TODO confirm intended scoping)

    Returns:
        Markdown lines describing up to the 10 most recently modified
        images and data files, or a "no artifacts" note.
    """
    artifacts_dir = Path("artifacts")
    lines = [
        "## Generated Artifacts",
        "",
        "### Plots and Visualizations",
        ""
    ]

    plot_files: List[Path] = []
    data_files: List[Path] = []
    if artifacts_dir.exists():
        plot_files = list(artifacts_dir.glob("*.png")) + list(artifacts_dir.glob("*.jpg"))
        data_files = list(artifacts_dir.glob("*.csv")) + list(artifacts_dir.glob("*.json"))

    def _describe(files: List[Path], heading: str) -> None:
        # Newest-first by mtime (the old name sort did not match the
        # "most recent" intent), capped at 10 entries.
        lines.extend([heading, ""])
        for artifact in sorted(files, key=lambda p: p.stat().st_mtime, reverse=True)[:10]:
            stat = artifact.stat()
            lines.extend([
                f"- **{artifact.stem}**",
                # Glob results are already relative to the working directory;
                # the old relative_to(Path.cwd()) raised ValueError on them.
                f"  - Path: `{artifact}`",
                f"  - Size: {stat.st_size} bytes",
                f"  - Modified: {datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')}",
                ""
            ])

    if plot_files:
        _describe(plot_files, "#### Images")
    if data_files:
        _describe(data_files, "#### Data Files")
    if not plot_files and not data_files:
        # Covers both a missing directory and an empty one (previously the
        # note only appeared when the directory was absent).
        lines.extend([
            "*No artifacts found for this session.*",
            ""
        ])
    return lines
def generate_performance_section(session_id: str) -> List[str]:
    """Build the static "Performance Metrics" section of the session report."""
    # Placeholder timing rows: (tool, avg, min, max, call count).
    timing_rows = [
        ("CAS", "50ms", "10ms", "200ms", 0),
        ("Plot", "150ms", "50ms", "500ms", 0),
        ("Units", "5ms", "1ms", "20ms", 0),
        ("Quantum", "100ms", "30ms", "300ms", 0),
    ]
    section = [
        "## Performance Metrics",
        "",
        "### Execution Times",
        "| Tool | Average Time | Min Time | Max Time | Calls |",
        "|------|-------------|----------|----------|-------|",
    ]
    section += [
        f"| {tool} | {avg} | {lo} | {hi} | {calls} |"
        for tool, avg, lo, hi, calls in timing_rows
    ]
    section += [
        "",
        "### System Resources",
        "- **GPU Acceleration:** Available (CUDA/CPU fallback)",
        "- **Memory Usage:** Peak 256MB",
        "- **Cache Hits:** 85% efficiency",
        "",
        "### Error Analysis",
        "- **Total Errors:** 0",
        "- **Validation Errors:** 0",
        "- **Computation Errors:** 0",
        "- **System Errors:** 0",
        "",
    ]
    return section
def generate_reproduce_section(session_id: str) -> List[str]:
    """Build the "Reproduce in MCP" section: setup steps plus example calls."""

    def fenced(language: str, body: List[str]) -> List[str]:
        # A fenced code block plus the blank line that separates it from
        # whatever follows.
        return [f"```{language}", *body, "```", ""]

    setup_commands = [
        "# Clone and setup Phys-MCP",
        "git clone <repository-url>",
        "cd phys-mcp",
        "pnpm dev:all",
    ]
    cas_example = [
        "{",
        '  "jsonrpc": "2.0",',
        '  "id": "1",',
        '  "method": "cas",',
        '  "params": {',
        '    "action": "diff",',
        '    "expr": "sin(x^2)",',
        '    "symbol": "x"',
        "  }",
        "}",
    ]
    quantum_example = [
        "{",
        '  "jsonrpc": "2.0",',
        '  "id": "2",',
        '  "method": "quantum",',
        '  "params": {',
        '    "action": "visualize",',
        '    "state": "0.707,0.707",',
        '    "kind": "bloch"',
        "  }",
        "}",
    ]
    units_example = [
        "{",
        '  "jsonrpc": "2.0",',
        '  "id": "3",',
        '  "method": "units_smart_eval",',
        '  "params": {',
        '    "expr": "c * 1 ns",',
        '    "constants": {"c": true}',
        "  }",
        "}",
    ]

    section = [
        "## Reproduce in MCP",
        "",
        "To reproduce the calculations from this session, use the following MCP tool calls:",
        "",
        "### Setup",
    ]
    section += fenced("bash", setup_commands)
    section += ["### Example Tool Calls", "", "#### Computer Algebra System"]
    section += fenced("json", cas_example)
    section += ["#### Quantum Visualization"]
    section += fenced("json", quantum_example)
    section += ["#### Smart Units Evaluation"]
    section += fenced("json", units_example)
    return section
def generate_constants_section(session_id: str) -> List[str]:
    """Build the static "Physical Constants Used" section."""
    # (name, symbol, value, unit) — all sourced from CODATA 2018.
    constant_rows = [
        ("Speed of Light", "c", "299,792,458", "m/s"),
        ("Planck Constant", "h", "6.626×10⁻³⁴", "J⋅s"),
        ("Boltzmann Constant", "k_B", "1.381×10⁻²³", "J/K"),
        ("Elementary Charge", "e", "1.602×10⁻¹⁹", "C"),
    ]
    lines = [
        "## Physical Constants Used",
        "",
        "| Constant | Symbol | Value | Unit | Source |",
        "|----------|--------|-------|------|--------|",
    ]
    lines += [
        f"| {name} | {symbol} | {value} | {unit} | CODATA 2018 |"
        for name, symbol, value, unit in constant_rows
    ]
    lines += [
        "",
        "### Uncertainty Information",
        "All CODATA 2018 constants have zero uncertainty by definition (exact values).",
        "",
    ]
    return lines
def generate_units_section(session_id: str) -> List[str]:
"""Generate units conversion section"""
return [
"## Units & Conversions",
"",
"### Conversion Summary",
"| From | To | Factor | Precision |",
"|------|----|---------|---------| ",
"| m | ft | 3.28084 | 1e-9 |",
"| kg | lb | 2.20462 | 1e-9 |",
"| J | eV | 6.242×10¹⁸ | 1e-9 |",
"",
"### Dimensional Analysis",
"- **Length:** [L]",
"- **Mass:** [M]",
"- **Time:** [T]",
"- **Energy:** [M L² T⁻²]",
"- **Force:** [M L T⁻²]",
""
]
def submit_job(
    job_type: str,
    parameters: Dict[str, Any],
    executor: str = "local",
    priority: str = "normal"
) -> Dict[str, Any]:
    """
    Submit a computational job for distributed execution.

    Args:
        job_type: Type of job (computation, analysis, simulation)
        parameters: Job parameters and configuration (must be JSON-serializable)
        executor: Execution backend (local, slurm, kubernetes); "local" runs
            the job synchronously, any other value only queues it
        priority: Job priority (low, normal, high)

    Returns:
        Dict with "success", "job_id", "status", "message",
        "estimated_duration" and "job_file" on success, or "success": False
        with an "error" description on failure.
    """
    try:
        # Job ID = UTC epoch seconds + short digest of the parameters.
        # md5 is used as a cheap identifier here, not for security.
        # (Previously a naive local timestamp was used, inconsistent with
        # the tz-aware timestamps elsewhere in this module.)
        param_digest = hashlib.md5(
            json.dumps(parameters, sort_keys=True).encode()
        ).hexdigest()[:8]
        job_id = f"job_{int(datetime.now(timezone.utc).timestamp())}_{param_digest}"

        job_data = {
            "id": job_id,
            "type": job_type,
            "parameters": parameters,
            "executor": executor,
            "priority": priority,
            "status": "submitted",
            "submitted_at": datetime.now(timezone.utc).isoformat(),
            "estimated_duration": estimate_job_duration(job_type, parameters),
            "resource_requirements": estimate_resources(job_type, parameters)
        }

        if executor == "local":
            # Loopback executor: run synchronously and merge the outcome
            # (status/result/error fields) into the job record.
            result = execute_local_job(job_data)
            job_data.update(result)
        else:
            # Distributed executors are not wired up yet; just mark queued.
            job_data["status"] = "queued"
            job_data["message"] = f"Job queued for {executor} execution"

        # Persist job metadata so get_job_status() can find it later.
        jobs_dir = Path("artifacts/jobs")
        jobs_dir.mkdir(exist_ok=True, parents=True)
        job_file = jobs_dir / f"{job_id}.json"
        with open(job_file, 'w', encoding='utf-8') as f:
            json.dump(job_data, f, indent=2)

        return {
            "success": True,
            "job_id": job_id,
            "status": job_data["status"],
            "message": job_data.get("message", "Job submitted successfully"),
            "estimated_duration": job_data["estimated_duration"],
            "job_file": str(job_file)
        }
    except Exception as e:
        # Submission is best-effort; report failures in-band.
        return {
            "success": False,
            "error": f"Job submission failed: {e}",
            "job_id": None
        }
def execute_local_job(job_data: Dict[str, Any]) -> Dict[str, Any]:
    """Run a job synchronously in-process (the loopback executor)."""
    try:
        # Key reads stay inside the try so a malformed payload is reported
        # as a failed job rather than raised to the caller.
        kind = job_data["type"]
        params = job_data["parameters"]
        if kind == "computation":
            outcome = execute_computation_job(params)
        elif kind == "analysis":
            outcome = execute_analysis_job(params)
        elif kind == "simulation":
            outcome = execute_simulation_job(params)
        else:
            raise ValueError(f"Unknown job type: {kind}")
    except Exception as exc:
        return {
            "status": "failed",
            "failed_at": datetime.now(timezone.utc).isoformat(),
            "error": str(exc),
            "message": f"Job execution failed: {exc}"
        }
    else:
        return {
            "status": "completed",
            "completed_at": datetime.now(timezone.utc).isoformat(),
            "result": outcome,
            "message": "Job completed successfully"
        }
def execute_computation_job(parameters: Dict[str, Any]) -> Dict[str, Any]:
    """Stub computation handler: echoes its inputs with a canned result."""
    # Placeholder until a real computation backend is wired in.
    return dict(
        computation_type=parameters.get("type", "generic"),
        input_parameters=parameters,
        output="Computation completed",
        artifacts=[],
        duration_ms=100,
    )
def execute_analysis_job(parameters: Dict[str, Any]) -> Dict[str, Any]:
    """Stub analysis handler: echoes its input data with a canned summary."""
    # Placeholder until a real analysis backend is wired in.
    analysis_kind = parameters.get("type", "generic")
    input_data = parameters.get("data", [])
    return {
        "analysis_type": analysis_kind,
        "input_data": input_data,
        "results": {"summary": "Analysis completed"},
        "artifacts": [],
        "duration_ms": 200,
    }
def execute_simulation_job(parameters: Dict[str, Any]) -> Dict[str, Any]:
    """Stub simulation handler: echoes its parameters with a canned state."""
    # Placeholder until a real simulation backend is wired in.
    return dict(
        simulation_type=parameters.get("type", "generic"),
        parameters=parameters,
        results={"final_state": "Simulation completed"},
        artifacts=[],
        duration_ms=500,
    )
def estimate_job_duration(job_type: str, parameters: Dict[str, Any]) -> int:
    """
    Estimate a job's wall-clock duration in seconds.

    A per-type base time is scaled by the caller-supplied "complexity"
    factor and a rough payload-size factor, then clamped to [10s, 1h].
    """
    base_time = {"computation": 60, "analysis": 120, "simulation": 300}.get(job_type, 60)
    complexity = parameters.get("complexity", 1.0)
    # Rough payload size in KB of its repr, used as a growth factor.
    size_factor = len(str(parameters)) / 1000
    raw_estimate = int(base_time * complexity * (1 + size_factor))
    return min(3600, max(10, raw_estimate))
def estimate_resources(job_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
    """
    Estimate CPU / memory / GPU requirements for a job.

    Starts from a per-type baseline (falling back to the "computation"
    profile for unknown types) and bumps it based on parameter hints.
    """
    profiles = {
        "computation": {"cpu_cores": 2, "memory_gb": 4, "gpu": False},
        "analysis": {"cpu_cores": 1, "memory_gb": 2, "gpu": False},
        "simulation": {"cpu_cores": 4, "memory_gb": 8, "gpu": True},
    }
    requirements = dict(profiles.get(job_type, profiles["computation"]))
    if parameters.get("use_gpu", False):
        requirements["gpu"] = True
    if parameters.get("large_dataset", False):
        # Double the memory budget for large inputs.
        requirements["memory_gb"] = requirements["memory_gb"] * 2
    return requirements
def get_job_status(job_id: str) -> Dict[str, Any]:
    """
    Look up the persisted status of a previously submitted job.

    Args:
        job_id: Identifier returned by submit_job()

    Returns:
        Dict with "success": True plus the job's status, timestamps, result
        and error fields when found; "success": False with an "error"
        message when the job file is missing or unreadable.
    """
    try:
        jobs_dir = Path("artifacts/jobs")
        job_file = jobs_dir / f"{job_id}.json"
        if not job_file.exists():
            return {
                "success": False,
                "error": f"Job {job_id} not found"
            }
        # Job files are written as UTF-8 JSON by submit_job(); read them
        # with an explicit encoding rather than the platform default.
        with open(job_file, 'r', encoding='utf-8') as f:
            job_data = json.load(f)
        return {
            "success": True,
            "job_id": job_id,
            "status": job_data["status"],
            "submitted_at": job_data["submitted_at"],
            # Optional fields: absent until the job finishes or fails.
            "completed_at": job_data.get("completed_at"),
            "result": job_data.get("result"),
            "error": job_data.get("error")
        }
    except Exception as e:
        # Corrupt JSON or unexpected schema is reported in-band.
        return {
            "success": False,
            "error": f"Failed to get job status: {e}"
        }