Skip to main content
Glama

CodeAnalysis MCP Server

by 0xjcf
metrics.py17.9 kB
#!/usr/bin/env python3 # MEMORY_ANCHOR: code_health_metrics """Code Health Metrics Calculator This script calculates various code health metrics including complexity, test coverage, and code churn. Maturity: beta Why: Tracking code health metrics helps identify areas of the codebase that need attention, ensuring the codebase remains maintainable and robust. Automated metrics provide an objective measure of code quality that can be tracked over time. """ import os import json import glob import subprocess import datetime import argparse import re from pathlib import Path import pandas as pd import matplotlib.pyplot as plt from radon.complexity import cc_visit from radon.metrics import h_visit from radon.raw import analyze # Define paths BASE_DIR = Path(__file__).parent.parent OUTPUT_DIR = BASE_DIR / "code_health" METRICS_FILE = OUTPUT_DIR / "metrics.json" HISTORY_FILE = OUTPUT_DIR / "history.json" VISUALIZATIONS_DIR = OUTPUT_DIR / "visualizations" def ensure_directories(): """Ensure all necessary directories exist.""" OUTPUT_DIR.mkdir(parents=True, exist_ok=True) VISUALIZATIONS_DIR.mkdir(parents=True, exist_ok=True) def get_git_files(): """Get all files tracked by git.""" try: result = subprocess.run( ["git", "ls-files"], capture_output=True, text=True, check=True ) return [file for file in result.stdout.splitlines() if os.path.exists(file)] except subprocess.CalledProcessError as e: print(f"Error getting git files: {e}") return [] def filter_files(files, extensions): """Filter files by extension.""" return [file for file in files if any(file.endswith(ext) for ext in extensions)] def calculate_complexity(file_path): """Calculate cyclomatic complexity for a file.""" try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() # Use radon to calculate complexity results = cc_visit(content) # Calculate average complexity if results: total_complexity = sum(result.complexity for result in results) avg_complexity = total_complexity / len(results) max_complexity = max(result.complexity for result in results) return { "average": avg_complexity, "maximum": max_complexity, "total": total_complexity, "functions": len(results) } return { "average": 0, "maximum": 0, "total": 0, "functions": 0 } except Exception as e: print(f"Error calculating complexity for {file_path}: {e}") return { "average": 0, "maximum": 0, "total": 0, "functions": 0 } def calculate_maintainability(file_path): """Calculate maintainability index for a file.""" try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() # Use radon to calculate maintainability result = h_visit(content) if result: return { "maintainability_index": result.mi, "rank": result.rank } return { "maintainability_index": 0, "rank": "F" } except Exception as e: print(f"Error calculating maintainability for {file_path}: {e}") return { "maintainability_index": 0, "rank": "F" } def calculate_raw_metrics(file_path): """Calculate raw metrics for a file.""" try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() # Use radon to calculate raw metrics result = analyze(content) return { "loc": result.loc, "lloc": result.lloc, "sloc": result.sloc, "comments": result.comments, "multi": result.multi, "blank": result.blank, "single_comments": result.single_comments } except Exception as e: print(f"Error calculating raw metrics for {file_path}: {e}") return { "loc": 0, "lloc": 0, "sloc": 0, "comments": 0, "multi": 0, "blank": 0, "single_comments": 0 } def calculate_test_coverage(): """Calculate test coverage using coverage.py.""" try: # Run coverage subprocess.run( ["coverage", "run", "-m", "pytest"], capture_output=True, check=False ) # Get coverage report result = subprocess.run( ["coverage", "json", "-o", "coverage.json"], capture_output=True, text=True, check=False ) # Load coverage data if os.path.exists("coverage.json"): with open("coverage.json", 'r') as f: coverage_data = json.load(f) # Clean up os.remove("coverage.json") return { "total": coverage_data.get("totals", {}).get("percent_covered", 0), "files": { file: data.get("summary", {}).get("percent_covered", 0) for file, data in coverage_data.get("files", {}).items() } } return { "total": 0, "files": {} } except Exception as e: print(f"Error calculating test coverage: {e}") return { "total": 0, "files": {} } def calculate_code_churn(): """Calculate code churn (changes over time).""" try: # Get git log with stats result = subprocess.run( ["git", "log", "--numstat", "--format=%H,%at"], capture_output=True, text=True, check=True ) # Parse git log commits = [] current_commit = None current_timestamp = None for line in result.stdout.splitlines(): if line.strip() == "": continue # Check if this is a commit line if "," in line and len(line.split(",")) == 2: if current_commit is not None: commits.append({ "hash": current_commit, "timestamp": current_timestamp, "files": [] }) parts = line.split(",") current_commit = parts[0] current_timestamp = int(parts[1]) continue # Parse file stats parts = line.split("\t") if len(parts) == 3: try: added = int(parts[0]) if parts[0] != "-" else 0 deleted = int(parts[1]) if parts[1] != "-" else 0 filename = parts[2] if current_commit is not None: commits[-1]["files"].append({ "filename": filename, "added": added, "deleted": deleted }) except ValueError: continue # Add the last commit if current_commit is not None and current_commit not in [c["hash"] for c in commits]: commits.append({ "hash": current_commit, "timestamp": current_timestamp, "files": [] }) # Calculate churn by file file_churn = {} for commit in commits: for file_data in commit["files"]: filename = file_data["filename"] if filename not in file_churn: file_churn[filename] = { "added": 0, "deleted": 0, "commits": 0, "last_modified": 0 } file_churn[filename]["added"] += file_data["added"] file_churn[filename]["deleted"] += file_data["deleted"] file_churn[filename]["commits"] += 1 file_churn[filename]["last_modified"] = max( file_churn[filename]["last_modified"], commit["timestamp"] ) # Calculate churn rate (changes per day) for filename, data in file_churn.items(): # Convert timestamp to days since epoch days_since_epoch = data["last_modified"] / (60 * 60 * 24) # Calculate churn rate (changes per day) total_changes = data["added"] + data["deleted"] data["churn_rate"] = total_changes / max(1, data["commits"]) return file_churn except Exception as e: print(f"Error calculating code churn: {e}") return {} def calculate_metrics(): """Calculate all metrics for the codebase.""" # Get all files files = get_git_files() # Filter Python files python_files = filter_files(files, [".py"]) # Calculate metrics for each file file_metrics = {} for file_path in python_files: print(f"Processing {file_path}...") file_metrics[file_path] = { "complexity": calculate_complexity(file_path), "maintainability": calculate_maintainability(file_path), "raw": calculate_raw_metrics(file_path) } # Calculate test coverage coverage = calculate_test_coverage() # Calculate code churn churn = calculate_code_churn() # Add churn and coverage to file metrics for file_path, metrics in file_metrics.items(): metrics["coverage"] = coverage.get("files", {}).get(file_path, 0) metrics["churn"] = churn.get(file_path, { "added": 0, "deleted": 0, "commits": 0, "churn_rate": 0, "last_modified": 0 }) # Calculate overall metrics overall_metrics = { "complexity": { "average": sum(m["complexity"]["average"] for m in file_metrics.values()) / max(1, len(file_metrics)), "maximum": max((m["complexity"]["maximum"] for m in file_metrics.values()), default=0) }, "maintainability": { "average": sum(m["maintainability"]["maintainability_index"] for m in file_metrics.values()) / max(1, len(file_metrics)) }, "raw": { "total_loc": sum(m["raw"]["loc"] for m in file_metrics.values()), "total_lloc": sum(m["raw"]["lloc"] for m in file_metrics.values()), "total_sloc": sum(m["raw"]["sloc"] for m in file_metrics.values()), "total_comments": sum(m["raw"]["comments"] for m in file_metrics.values()), "comment_ratio": sum(m["raw"]["comments"] for m in file_metrics.values()) / max(1, sum(m["raw"]["sloc"] for m in file_metrics.values())) }, "coverage": { "total": coverage.get("total", 0) }, "churn": { "total_added": sum(m["churn"]["added"] for m in file_metrics.values()), "total_deleted": sum(m["churn"]["deleted"] for m in file_metrics.values()), "total_commits": max((m["churn"]["commits"] for m in file_metrics.values()), default=0), "average_churn_rate": sum(m["churn"]["churn_rate"] for m in file_metrics.values()) / max(1, len(file_metrics)) } } # Create final metrics object metrics = { "timestamp": datetime.datetime.now().isoformat(), "overall": overall_metrics, "files": file_metrics } return metrics def save_metrics(metrics): """Save metrics to a file.""" with open(METRICS_FILE, 'w') as f: json.dump(metrics, f, indent=2) # Update history history = [] if os.path.exists(HISTORY_FILE): with open(HISTORY_FILE, 'r') as f: try: history = json.load(f) except json.JSONDecodeError: history = [] # Add current metrics to history history_entry = { "timestamp": metrics["timestamp"], "overall": metrics["overall"] } history.append(history_entry) # Save history with open(HISTORY_FILE, 'w') as f: json.dump(history, f, indent=2) def generate_visualizations(metrics): """Generate visualizations from metrics.""" # Create directory for visualizations os.makedirs(VISUALIZATIONS_DIR, exist_ok=True) # 1. Complexity by file complexity_data = { file_path: data["complexity"]["average"] for file_path, data in metrics["files"].items() } # Sort by complexity complexity_data = dict(sorted(complexity_data.items(), key=lambda x: x[1], reverse=True)[:20]) plt.figure(figsize=(12, 8)) plt.bar(complexity_data.keys(), complexity_data.values()) plt.xticks(rotation=90) plt.title("Average Cyclomatic Complexity by File (Top 20)") plt.tight_layout() plt.savefig(VISUALIZATIONS_DIR / "complexity.png") plt.close() # 2. Maintainability by file maintainability_data = { file_path: data["maintainability"]["maintainability_index"] for file_path, data in metrics["files"].items() } # Sort by maintainability (ascending) maintainability_data = dict(sorted(maintainability_data.items(), key=lambda x: x[1])[:20]) plt.figure(figsize=(12, 8)) plt.bar(maintainability_data.keys(), maintainability_data.values()) plt.xticks(rotation=90) plt.title("Maintainability Index by File (Bottom 20)") plt.tight_layout() plt.savefig(VISUALIZATIONS_DIR / "maintainability.png") plt.close() # 3. Code churn by file churn_data = { file_path: data["churn"]["churn_rate"] for file_path, data in metrics["files"].items() if data["churn"]["commits"] > 0 } # Sort by churn rate churn_data = dict(sorted(churn_data.items(), key=lambda x: x[1], reverse=True)[:20]) plt.figure(figsize=(12, 8)) plt.bar(churn_data.keys(), churn_data.values()) plt.xticks(rotation=90) plt.title("Code Churn Rate by File (Top 20)") plt.tight_layout() plt.savefig(VISUALIZATIONS_DIR / "churn.png") plt.close() # 4. Test coverage by file coverage_data = { file_path: data["coverage"] for file_path, data in metrics["files"].items() if data["coverage"] > 0 } # Sort by coverage (ascending) coverage_data = dict(sorted(coverage_data.items(), key=lambda x: x[1])[:20]) plt.figure(figsize=(12, 8)) plt.bar(coverage_data.keys(), coverage_data.values()) plt.xticks(rotation=90) plt.title("Test Coverage by File (Bottom 20)") plt.tight_layout() plt.savefig(VISUALIZATIONS_DIR / "coverage.png") plt.close() # 5. Metrics over time if os.path.exists(HISTORY_FILE): with open(HISTORY_FILE, 'r') as f: history = json.load(f) # Convert to DataFrame df = pd.DataFrame([ { "timestamp": entry["timestamp"], "complexity": entry["overall"]["complexity"]["average"], "maintainability": entry["overall"]["maintainability"]["average"], "coverage": entry["overall"]["coverage"]["total"], "churn_rate": entry["overall"]["churn"]["average_churn_rate"] } for entry in history ]) # Convert timestamp to datetime df["timestamp"] = pd.to_datetime(df["timestamp"]) # Sort by timestamp df = df.sort_values("timestamp") # Plot metrics over time plt.figure(figsize=(12, 8)) plt.subplot(2, 2, 1) plt.plot(df["timestamp"], df["complexity"]) plt.title("Average Complexity Over Time") plt.xticks(rotation=45) plt.subplot(2, 2, 2) plt.plot(df["timestamp"], df["maintainability"]) plt.title("Average Maintainability Over Time") plt.xticks(rotation=45) plt.subplot(2, 2, 3) plt.plot(df["timestamp"], df["coverage"]) plt.title("Test Coverage Over Time") plt.xticks(rotation=45) plt.subplot(2, 2, 4) plt.plot(df["timestamp"], df["churn_rate"]) plt.title("Average Churn Rate Over Time") plt.xticks(rotation=45) plt.tight_layout() plt.savefig(VISUALIZATIONS_DIR / "metrics_over_time.png") plt.close() def main(): parser = argparse.ArgumentParser(description="Calculate code health metrics") parser.add_argument("--visualize-only", action="store_true", help="Only generate visualizations") args = parser.parse_args() ensure_directories() if not args.visualize_only: print("Calculating metrics...") metrics = calculate_metrics() save_metrics(metrics) print(f"Metrics saved to {METRICS_FILE}") else: # Load existing metrics if os.path.exists(METRICS_FILE): with open(METRICS_FILE, 'r') as f: metrics = json.load(f) else: print(f"Metrics file {METRICS_FILE} not found. Run without --visualize-only first.") return print("Generating visualizations...") generate_visualizations(metrics) print(f"Visualizations saved to {VISUALIZATIONS_DIR}") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/0xjcf/MCP_CodeAnalysis'

If you have feedback or need assistance with the MCP directory API, please join our Discord server