PyGithub MCP Server
by AstroMined
- scripts
- coverage
"""
Test collection and execution with coverage.
This module handles collecting pytest tests and running them with coverage.
"""
import os
import re
import sys
import json
import subprocess
from typing import List, Dict, Tuple, Any
from datetime import datetime
from .models import TestFailure
def get_file_from_nodeid(nodeid: str) -> str:
"""Extract file path from pytest nodeid."""
return nodeid.split("::")[0] if "::" in nodeid else nodeid
def get_line_from_location(location: str) -> int:
"""Extract line number from pytest location string."""
try:
return int(location.split(":")[-1]) if ":" in location else 0
except ValueError:
return 0
def collect_tests(include_integration: bool = False) -> List[str]:
"""
Collect all test nodeids.
Args:
include_integration: Whether to include integration tests
Returns:
List of test nodeids
"""
print("Collecting tests... ")
# First collect all tests to get a count
collect_cmd = ["python", "-m", "pytest", "tests", "--collect-only", "-q"]
if include_integration:
collect_cmd.append("--run-integration")
else:
collect_cmd.extend(["-m", "not integration"])
print(f"Debug: Collection command: {' '.join(collect_cmd)}")
try:
result = subprocess.run(collect_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# Show collection output for debugging
print(f"Collection output: {result.stdout.strip()}")
if result.stderr:
print(f"Collection stderr: {result.stderr.strip()}")
# Each line in the output should be a test ID in the quiet mode
test_list = [line.strip() for line in result.stdout.splitlines() if line.strip()]
# Count total tests
total_tests = len(test_list)
# Print the test count for info
print(f"Collected {total_tests} Tests")
return test_list
except Exception as e:
print(f"Error during test collection: {e}")
return []
def group_tests_by_module(test_list: List[str]) -> Dict[str, Dict]:
"""
Group a list of tests by their module.
Args:
test_list: List of test nodeids
Returns:
Dictionary mapping module names to test information
"""
modules = {}
for test_path in test_list:
if not test_path:
continue
# Extract the module part of the test path
parts = test_path.split('::')
if len(parts) > 0:
# Get module path - categorize by directory and file
test_path = parts[0]
# Extract the category (e.g., unit/errors, integration/client)
if '/' in test_path:
# Extract test category from path like "tests/unit/errors/test_handlers.py"
path_parts = test_path.split('/')
if len(path_parts) >= 3:
# For example: "unit/errors" or "integration/client"
module_key = f"{path_parts[1]}/{path_parts[2]}"
# Add human readable name for the module
module_name = f"{path_parts[1].capitalize()} {path_parts[2].capitalize()}"
# Add to modules dictionary with count
if module_key not in modules:
modules[module_key] = {
"name": module_name,
"tests": [],
}
modules[module_key]["tests"].append(test_path)
# Add counts to each module
for key in modules:
modules[key]["count"] = len(modules[key]["tests"])
return modules
def run_module_tests(module_name: str, test_files: List[str], package_path: str, include_integration: bool) -> Tuple[str, List[TestFailure], int]:
"""
Run tests for a specific module with coverage.
Args:
module_name: Human-readable name of the module
test_files: List of test files to run
package_path: Path to the package to measure coverage for
include_integration: Whether to run integration tests
Returns:
Tuple of (output, failures, test_count)
"""
print(f"Running {module_name} Tests ({len(test_files)})")
# Build the test command for this module
module_cmd = [
"python", "-m", "pytest",
f"--cov={package_path}",
"--cov-append", # Append to coverage data for each module
"--json-report",
f"--json-report-file=test_results.json",
]
# Add test files to the command
module_cmd.extend(test_files)
if include_integration:
module_cmd.append("--run-integration")
print(f"Debug: Running module command: {' '.join(module_cmd)}")
# Run tests for this module
start_time = datetime.now()
result = subprocess.run(
module_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
end_time = datetime.now()
elapsed = (end_time - start_time).total_seconds()
return_code = result.returncode
print(f"Module tests completed in {elapsed:.1f} seconds (Return code: {return_code})")
# If output is too long, truncate it
output_sample = (result.stdout + result.stderr)
if len(output_sample) > 500:
output_sample = output_sample[:500] + "... [truncated]"
if return_code != 0 and return_code != 5: # 5 is test failures
print(f"Warning: Module tests returned non-zero exit code. Output sample:\n{output_sample}")
# Parse failures for this module
failures = []
try:
if os.path.exists("test_results.json"):
with open("test_results.json") as f:
test_results = json.load(f)
for test in test_results.get("tests", []):
if test.get("outcome") != "passed":
# Create failure record
failure = TestFailure(
name=test.get("nodeid", "Unknown"),
outcome=test.get("outcome", "error"),
message=test.get("call", {}).get("longrepr", "No details available"),
duration=test.get("duration", 0.0),
file=get_file_from_nodeid(test.get("nodeid", "")),
line=get_line_from_location(test.get("location", ""))
)
failures.append(failure)
if failures:
print(f"Found {len(failures)} test failures in this module")
except Exception as e:
print(f"Error parsing test failures for module {module_name}: {e}")
return output_sample, failures, len(test_files)
def run_coverage(package_path: str = "src/pygithub_mcp_server", include_integration: bool = False) -> Tuple[str, List[TestFailure]]:
"""
Run pytest with coverage and return the output and test failures.
Args:
package_path: Path to the package to measure coverage for
include_integration: Whether to run integration tests
Returns:
Tuple of (coverage_output, test_failures)
"""
# Install required dependencies if needed
subprocess.run(["python", "-m", "pip", "install", "--quiet", "--upgrade",
"pytest", "pytest-cov", "pytest-json-report"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Clear any existing coverage data
if os.path.exists(".coverage"):
os.remove(".coverage")
# Collect tests
test_list = collect_tests(include_integration)
if not test_list:
print("No tests found or test collection failed!")
return "", []
total_tests = len(test_list)
print(f"Collected {total_tests} Tests")
# Group tests by module
modules = group_tests_by_module(test_list)
if not modules:
print("Warning: Could not group tests by module. Running all tests at once.")
# Fallback to running all tests at once
all_cmd = [
"python", "-m", "pytest",
"tests/",
f"--cov={package_path}",
"--cov-report=term", # Don't use term-missing to avoid stdout clutter
"--json-report",
f"--json-report-file=test_results.json",
]
if include_integration:
all_cmd.append("--run-integration")
print(f"Debug: Running fallback command: {' '.join(all_cmd)}")
result = subprocess.run(
all_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Print return code to help debug issues
print(f"Return code: {result.returncode}")
# If there's an error, show a sample of the output to help diagnose
if result.returncode != 0:
output_sample = result.stdout + result.stderr
if len(output_sample) > 500:
output_sample = output_sample[:500] + "... [truncated]"
print(f"Command output sample:\n{output_sample}")
# Generate coverage report
cov_report = subprocess.run(
["python", "-m", "coverage", "report", f"--include={package_path}/*"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Parse failures
failures = []
try:
if os.path.exists("test_results.json"):
with open("test_results.json") as f:
test_results = json.load(f)
for test in test_results.get("tests", []):
if test.get("outcome") != "passed":
# Create failure record
failure = TestFailure(
name=test.get("nodeid", "Unknown"),
outcome=test.get("outcome", "error"),
message=test.get("call", {}).get("longrepr", "No details available"),
duration=test.get("duration", 0.0),
file=get_file_from_nodeid(test.get("nodeid", "")),
line=get_line_from_location(test.get("location", ""))
)
failures.append(failure)
if failures:
print(f"Found {len(failures)} test failures")
except Exception as e:
print(f"Error parsing test failures: {e}")
return cov_report.stdout, failures
# Run tests module by module with coverage
all_output = ""
all_failures = []
tests_completed = 0
for module_key, module_info in modules.items():
module_name = module_info["name"]
test_list = module_info["tests"]
# Run tests for this module
output, failures, test_count = run_module_tests(
module_name,
test_list,
package_path,
include_integration
)
# Add failures to the list
all_failures.extend(failures)
# Update progress
tests_completed += test_count
print(f"{tests_completed} of {total_tests} Tests Complete")
# Generate a final coverage report
print("Generating final coverage report...")
cov_report_cmd = [
"python", "-m", "coverage", "report",
f"--include={package_path}/*"
]
final_result = subprocess.run(
cov_report_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
all_output = final_result.stdout
print(f"All tests completed. Total failures: {len(all_failures)}")
return all_output, all_failures