# testing_agent.py
from __future__ import annotations

import json
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional

from mcp.server.fastmcp import FastMCP
# ---------------------------------------------------------------------------
# CONFIG / CONSTANTS
# ---------------------------------------------------------------------------
# Directory names pruned from every repository walk: dependency trees,
# build artifacts, VCS metadata, virtualenvs, caches and IDE folders.
EXCLUDED_DIRS = {
    "node_modules",
    ".git",
    ".next",
    ".turbo",
    ".cache",
    "dist",
    "build",
    "coverage",
    "out",
    ".venv",
    "venv",
    ".mcp",
    "site-packages",
    "__pycache__",
    ".vscode",
    ".idea",
    ".pytest_cache",
    ".playwright",
}
# Files we generate for other agents / tooling
SCAN_SNAPSHOT_NAME = ".mcp_scan_snapshot.json"  # language-detection snapshot (JSON)
TEST_REPORT_NAME = ".mcp_test_report.json"  # per-framework test-run report (JSON)
COVERAGE_MAP_NAME = "TEST_COVERAGE_MAP.md"  # markdown coverage summary for humans/AI
TEST_PLAN_NAME = "TEST_PLAN_AI.md"  # AI-extendable test plan (markdown)
# Thresholds removed - tests run as long as test files exist
# Parallel execution configuration
DEFAULT_MAX_WORKERS = 4  # Default number of parallel test runners
# ---------------------------------------------------------------------------
# DATA CLASSES
# ---------------------------------------------------------------------------
@dataclass
class TestRunResult:
    """Result of executing (or deliberately skipping) one framework-level test command."""
    framework: str  # framework label, e.g. "pytest", "jest", "cargo test"
    command: str  # shell command that was (or would have been) run
    exit_code: int  # process exit code; -1 for skipped/error entries
    raw_output: str  # combined stdout+stderr, or a skip/error explanation
    start_time: str  # ISO-8601 UTC timestamp with trailing "Z"
    end_time: str  # ISO-8601 UTC timestamp with trailing "Z"
    status: str  # success / failed / skipped / error
    reason: Optional[str] = None  # human-readable reason for skipped/error runs
@dataclass
class LanguageDetection:
    """Per-language scan result: how many source/test files were found, and where."""
    language: str  # display name, e.g. "javascript_typescript"
    source_files: int  # count of non-test source files
    test_files: int  # count of files classified as tests
    test_roots: List[str]  # repo-relative directories that contain test files
    status: str  # detected / none
    note: Optional[str] = None  # explanation when no sources were found
# ---------------------------------------------------------------------------
# DETECTION
# ---------------------------------------------------------------------------
def detect_languages(root: Path) -> Dict[str, LanguageDetection]:
    """
    Walk the repo once and detect sources + tests for every supported
    language: Python, JS/TS, Go, Rust, Java and Ruby.

    The previous implementation repeated the same count/classify logic six
    times; this version drives one walk from a per-language rule table.

    Args:
        root: Repository root to scan (EXCLUDED_DIRS are pruned).

    Returns:
        Mapping of language key ("python", "js_ts", ...) to its
        LanguageDetection, in a stable, fixed key order.
    """
    _JS_TEST_SUFFIXES = (
        ".test.js", ".test.jsx", ".test.ts", ".test.tsx",
        ".spec.js", ".spec.jsx", ".spec.ts", ".spec.tsx",
    )

    # Each classifier answers: "is this file a test file?" given its name
    # and the parts of its repo-relative directory.
    def _py_is_test(fname: str, parts) -> bool:
        return (
            fname.startswith("test_")
            or fname.endswith("_test.py")
            or "tests" in parts
            or "__tests__" in parts
        )

    def _js_is_test(fname: str, parts) -> bool:
        return (
            fname.endswith(_JS_TEST_SUFFIXES)
            or "tests" in parts
            or "__tests__" in parts
        )

    def _go_is_test(fname: str, parts) -> bool:
        return fname.endswith("_test.go")

    def _rust_is_test(fname: str, parts) -> bool:
        return "tests" in parts or fname.endswith("_test.rs") or "test" in parts

    def _java_is_test(fname: str, parts) -> bool:
        return (
            fname.endswith("Test.java")
            or fname.endswith("Tests.java")
            or "test" in parts
            or "tests" in parts
        )

    def _ruby_is_test(fname: str, parts) -> bool:
        return (
            fname.endswith("_test.rb")
            or fname.endswith("_spec.rb")
            or "test" in parts
            or "tests" in parts
            or "spec" in parts
        )

    # key -> (source extensions, test classifier, display name, no-sources note).
    # Insertion order defines both the extension-check order and the order
    # of keys in the returned mapping (mirrors the original elif chain).
    rules = {
        "python": ((".py",), _py_is_test, "python",
                   "No Python *.py files detected"),
        "js_ts": ((".js", ".jsx", ".ts", ".tsx"), _js_is_test,
                  "javascript_typescript", "No JS/TS source files detected"),
        "go": ((".go",), _go_is_test, "go", "No Go *.go files detected"),
        "rust": ((".rs",), _rust_is_test, "rust", "No Rust *.rs files detected"),
        "java": ((".java",), _java_is_test, "java", "No Java *.java files detected"),
        "ruby": ((".rb",), _ruby_is_test, "ruby", "No Ruby *.rb files detected"),
    }
    sources = {key: 0 for key in rules}
    tests = {key: 0 for key in rules}
    test_roots = {key: set() for key in rules}
    for dirpath, dirnames, filenames in os.walk(root):
        # Remove excluded dirs in-place so os.walk does not descend into them
        dirnames[:] = [d for d in dirnames if d not in EXCLUDED_DIRS]
        rel_dir = Path(dirpath).relative_to(root)
        parts = rel_dir.parts
        for fname in filenames:
            for key, (exts, is_test, _display, _note) in rules.items():
                if fname.endswith(exts):
                    if is_test(fname, parts):
                        tests[key] += 1
                        test_roots[key].add(str(rel_dir))
                    else:
                        sources[key] += 1
                    break  # a file belongs to at most one language
    return {
        key: LanguageDetection(
            language=display,
            source_files=sources[key],
            test_files=tests[key],
            test_roots=sorted(test_roots[key]),
            status="detected" if sources[key] > 0 else "none",
            note=None if sources[key] > 0 else note,
        )
        for key, (_exts, _is_test, display, note) in rules.items()
    }
def write_scan_snapshot(root: Path, languages: Dict[str, LanguageDetection]) -> Dict[str, Any]:
    """
    Write the detection result to .mcp_scan_snapshot.json inside *root*.

    Args:
        root: Project root; the snapshot file is written here.
        languages: Detection results keyed by language identifier.

    Returns:
        The snapshot dict that was serialized to disk.
    """
    snapshot = {
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # .replace() keeps the historical trailing-"Z" format.
        "scanned_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "root": str(root),
        "excluded_dirs": sorted(EXCLUDED_DIRS),
        # asdict() keeps the JSON in lockstep with the LanguageDetection
        # dataclass instead of duplicating its field list by hand.
        "languages": {name: asdict(det) for name, det in languages.items()},
    }
    snapshot_path = root / SCAN_SNAPSHOT_NAME
    snapshot_path.write_text(json.dumps(snapshot, indent=2), encoding="utf-8")
    return snapshot
def write_all_skipped_report(root: Path, languages: Dict[str, LanguageDetection]) -> Dict[str, Any]:
    """
    Write a test report that marks every framework as skipped.

    Used when literally no sources are detected for any supported language
    (Option B behaviour); the report is still persisted so downstream
    tooling always finds a report file.
    """
    timestamp = datetime.utcnow().isoformat() + "Z"
    # Framework label reported for each language key.
    framework_map = {
        "python": "pytest",
        "js_ts": "jest",
        "go": "go test",
        "rust": "cargo test",
        "java": "maven/gradle",
        "ruby": "rspec/minitest",
    }
    runs = [
        asdict(
            TestRunResult(
                framework=framework,
                command="",
                exit_code=-1,
                raw_output="Skipped: no matching source files detected.",
                start_time=timestamp,
                end_time=timestamp,
                status="skipped",
                reason=languages[lang_key].note,
            )
        )
        for lang_key, framework in framework_map.items()
    ]
    report = {
        "executed_at": timestamp,
        "root": str(root),
        "summary": {
            "total_runs": len(runs),
            "success": 0,
            "failed": 0,
            "skipped": len(runs),
        },
        "runs": runs,
    }
    (root / TEST_REPORT_NAME).write_text(json.dumps(report, indent=2), encoding="utf-8")
    return report
# ---------------------------------------------------------------------------
# TEST PLAN (for Gemini / Claude to work from)
# ---------------------------------------------------------------------------
def build_test_plan(root: Path, languages: Dict[str, LanguageDetection]) -> str:
    """
    Build TEST_PLAN_AI.md with a simple, structured overview
    that Gemini / Claude can extend into real tests.

    Args:
        root: Project root; the plan file is written here.
        languages: Detection results keyed by language identifier.

    Returns:
        The path of the written plan file, as a string.
    """
    lines: List[str] = []
    lines.append("# AI Test Plan (Phase 2)")
    lines.append("")
    lines.append(f"- Generated at: {datetime.utcnow().isoformat()}Z")
    lines.append(f"- Project root: `{root}`")
    lines.append("")
    # One markdown section per supported language.
    for key, det in languages.items():
        lines.append(f"## {det.language.capitalize()} Tests")
        lines.append("")
        lines.append(f"- Status: **{det.status}**")
        lines.append(f"- Source files detected: **{det.source_files}**")
        lines.append(f"- Test files detected: **{det.test_files}**")
        if det.test_roots:
            lines.append("- Test roots:")
            for root_dir in det.test_roots:
                lines.append(f"  - `{root_dir}`")
        else:
            lines.append("- Test roots: *(none detected)*")
        # Suggested tasks based on project state
        if det.source_files > 0:
            lines.append("")
            lines.append("### Suggested AI Test Tasks")
            if det.test_files > 0:
                # Tests already exist: focus the AI on gap analysis.
                lines.append("- Review existing tests and identify coverage gaps.")
                lines.append("- Generate tests for new or recently changed modules.")
                lines.append("- Prioritize critical paths and pure functions first.")
            else:
                # Sources but no tests: bootstrap a suite from scratch.
                lines.append("- Create an initial minimal test suite.")
                lines.append("- Start with smoke tests for critical entrypoints.")
        else:
            lines.append("")
            lines.append("### Suggested AI Test Tasks")
            lines.append("- No sources detected; test generation not applicable yet.")
        lines.append("")
    path = root / TEST_PLAN_NAME
    path.write_text("\n".join(lines), encoding="utf-8")
    return str(path)
# ---------------------------------------------------------------------------
# SMART FRAMEWORK DETECTION
# ---------------------------------------------------------------------------
def detect_test_frameworks(root: Path) -> Dict[str, str]:
    """
    Detect test frameworks by looking for configuration files.

    Inspects package.json (JS/TS), pom.xml / build.gradle[.kts] (Java),
    Cargo.toml (Rust), top-level *.go files (Go) and Gemfile / spec/ (Ruby).

    Args:
        root: Project root directory to inspect.

    Returns:
        Mapping of language keys (e.g. "js_ts", "rust") to the detected
        test command. Languages with no recognizable config are absent.
    """
    frameworks: Dict[str, str] = {}
    # JavaScript/TypeScript - check package.json for test scripts
    package_json = root / "package.json"
    if package_json.exists():
        try:
            # json is already imported at module level (the old local
            # "import json" shadowed it); read with an explicit encoding.
            pkg_data = json.loads(package_json.read_text(encoding="utf-8"))
            scripts = pkg_data.get("scripts", {})
            # Check for common test scripts
            if "test" in scripts:
                test_cmd = scripts["test"]
                if "jest" in test_cmd:
                    frameworks["js_ts"] = "npx jest --runInBand"
                elif "mocha" in test_cmd:
                    frameworks["js_ts"] = "npm test"
                elif "vitest" in test_cmd:
                    frameworks["js_ts"] = "npx vitest run"
                else:
                    frameworks["js_ts"] = "npm test"
            # No "test" script, but some other script invokes jest.
            # isinstance guard protects against non-string script values.
            elif any("jest" in v for v in scripts.values() if isinstance(v, str)):
                frameworks["js_ts"] = "npx jest --runInBand"
        except (json.JSONDecodeError, OSError, UnicodeDecodeError):
            pass  # unreadable/invalid package.json -> no JS/TS detection
    # Java - Maven takes precedence over Gradle when both files exist
    if (root / "pom.xml").exists():
        frameworks["java"] = "mvn test"
    elif (root / "build.gradle").exists() or (root / "build.gradle.kts").exists():
        frameworks["java"] = "gradle test"
    # Rust - check for Cargo.toml
    if (root / "Cargo.toml").exists():
        frameworks["rust"] = "cargo test --verbose"
    # Go - any top-level *.go file implies the standard toolchain
    if any(root.glob("*.go")):
        frameworks["go"] = "go test ./... -v"
    # Ruby - check for Gemfile and detect RSpec or Minitest
    gemfile = root / "Gemfile"
    if gemfile.exists():
        try:
            content = gemfile.read_text(encoding="utf-8")
            if "rspec" in content.lower():
                frameworks["ruby"] = "bundle exec rspec"
            elif "minitest" in content.lower():
                frameworks["ruby"] = "bundle exec rake test"
        except (OSError, UnicodeDecodeError):
            pass  # unreadable Gemfile -> fall through to spec/ heuristic
    # Check for spec/ directory (RSpec convention)
    if (root / "spec").exists() and not frameworks.get("ruby"):
        frameworks["ruby"] = "rspec"
    return frameworks
# ---------------------------------------------------------------------------
# PLANNING & EXECUTION OF TEST RUNS
# ---------------------------------------------------------------------------
def plan_test_runs(languages: Dict[str, LanguageDetection], root: Optional[Path] = None) -> List[Dict[str, Any]]:
    """
    Create test run plans for detected languages.
    - If source files exist AND test files exist: plan real test execution
    - If source files exist BUT no test files: skip (nothing to test)
    - If no source files: skip (language not detected)
    - Uses smart framework detection when root path is provided

    The previous implementation repeated the same three-way branch six
    times; this version drives it from a per-language spec table while
    producing byte-identical messages.

    Returns:
        A list of entries: either full skipped TestRunResult dicts, or
        {"framework": ..., "command": ...} dicts for real execution.
    """
    planned: List[Dict[str, Any]] = []
    # Timezone-aware replacement for deprecated datetime.utcnow();
    # .replace() keeps the historical trailing-"Z" format.
    now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    # Detect frameworks if root is provided
    detected_frameworks: Dict[str, str] = {}
    if root:
        detected_frameworks = detect_test_frameworks(root)

    def _skip(framework: str, command: str, reason: str, output: str) -> Dict[str, Any]:
        # Synthetic "skipped" entry shaped like a completed TestRunResult.
        return asdict(
            TestRunResult(
                framework=framework,
                command=command,
                exit_code=-1,
                raw_output=output,
                start_time=now,
                end_time=now,
                status="skipped",
                reason=reason,
            )
        )

    # (language key, framework label, default command, long name, short name)
    # long name is used in the "no sources" reason; short name everywhere else.
    specs = [
        ("python", "pytest", "pytest", "Python", "Python"),
        ("js_ts", "jest", "npx jest --runInBand", "JavaScript/TypeScript", "JS/TS"),
        ("go", "go test", "go test ./... -v", "Go", "Go"),
        ("rust", "cargo test", "cargo test --verbose", "Rust", "Rust"),
        ("java", "maven/gradle", "mvn test || gradle test", "Java", "Java"),
        ("ruby", "rspec/minitest", "rspec || rake test", "Ruby", "Ruby"),
    ]
    for key, framework, default_cmd, long_name, short_name in specs:
        det = languages[key]
        cmd = detected_frameworks.get(key, default_cmd)
        if det.source_files == 0:
            planned.append(_skip(
                framework, cmd,
                f"No {long_name} source files detected in repository.",
                f"Skipped: no {short_name} source files detected.",
            ))
        elif det.test_files == 0:
            planned.append(_skip(
                framework, cmd,
                f"Detected {det.source_files} {short_name} source files but no test files to execute.",
                f"Skipped: no {short_name} test files found.",
            ))
        else:
            planned.append({"framework": framework, "command": cmd})
    return planned
def execute_planned_runs(root: Path, planned: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Execute the real commands and pass through the synthetic 'skipped' runs.

    Delegates per-command execution to execute_single_test_run so the
    sequential and parallel paths share one subprocess implementation
    (the previous version duplicated it inline).

    Args:
        root: Project root; commands run with this cwd and the JSON report
            is written here.
        planned: Output of plan_test_runs — a mix of {"framework",
            "command"} dicts and full skipped TestRunResult dicts.

    Returns:
        The report dict that was also written to TEST_REPORT_NAME.
    """
    results: List[Dict[str, Any]] = []
    success = failed = skipped = 0
    for item in planned:
        # Synthetic skip (already a full TestRunResult dict)
        if "status" in item and "framework" in item and "command" in item:
            results.append(item)
            if item.get("status") == "skipped":
                skipped += 1
            continue
        result = execute_single_test_run(root, item)
        results.append(result)
        status = result["status"]
        if status == "success":
            success += 1
        elif status == "failed":
            failed += 1
        else:
            # "error": the command could not be launched at all; the
            # summary historically counts these under "skipped".
            skipped += 1
    report: Dict[str, Any] = {
        # Timezone-aware replacement for deprecated datetime.utcnow();
        # .replace() keeps the historical trailing-"Z" format.
        "executed_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "root": str(root),
        "summary": {
            "total_runs": len(results),
            "success": success,
            "failed": failed,
            "skipped": skipped,
        },
        "runs": results,
    }
    (root / TEST_REPORT_NAME).write_text(json.dumps(report, indent=2), encoding="utf-8")
    return report
def execute_single_test_run(root: Path, item: Dict[str, Any]) -> Dict[str, Any]:
    """
    Run one planned test command and return its TestRunResult as a dict.

    Never raises: a command that cannot even be launched is reported with
    status "error" and exit_code -1. Used both directly and as the worker
    for parallel execution.
    """
    framework, cmd = item["framework"], item["command"]
    started = datetime.utcnow().isoformat() + "Z"
    try:
        completed = subprocess.run(
            cmd,
            shell=True,
            cwd=str(root),
            capture_output=True,
            text=True,
        )
        finished = datetime.utcnow().isoformat() + "Z"
        combined = ((completed.stdout or "") + "\n" + (completed.stderr or "")).strip()
        outcome = TestRunResult(
            framework=framework,
            command=cmd,
            exit_code=completed.returncode,
            raw_output=combined,
            start_time=started,
            end_time=finished,
            status="success" if completed.returncode == 0 else "failed",
        )
    except Exception as exc:
        # Launch failure (missing shell, OS error, ...): report, don't raise.
        outcome = TestRunResult(
            framework=framework,
            command=cmd,
            exit_code=-1,
            raw_output=str(exc),
            start_time=started,
            end_time=datetime.utcnow().isoformat() + "Z",
            status="error",
            reason=str(exc),
        )
    return asdict(outcome)
def execute_planned_runs_parallel(
    root: Path,
    planned: List[Dict[str, Any]],
    max_workers: int = DEFAULT_MAX_WORKERS
) -> Dict[str, Any]:
    """
    Execute test commands in parallel using a ThreadPoolExecutor, passing
    synthetic 'skipped' entries through unchanged.

    Args:
        root: Project root directory.
        planned: Mix of actual run plans and full skipped-run dicts.
        max_workers: Maximum number of concurrent test runners.

    Returns:
        The report dict, which is also written to TEST_REPORT_NAME.
    """
    counts = {"success": 0, "failed": 0, "skipped": 0}
    passthrough: List[Dict[str, Any]] = []
    runnable: List[Dict[str, Any]] = []
    # Partition the plan: entries that already carry a status are synthetic
    # results from the planner; everything else is a real command to run.
    for entry in planned:
        if "status" in entry and "framework" in entry and "command" in entry:
            passthrough.append(entry)
            if entry.get("status") == "skipped":
                counts["skipped"] += 1
        else:
            runnable.append(entry)
    executed: List[Dict[str, Any]] = []
    if runnable:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = [
                pool.submit(execute_single_test_run, root, entry)
                for entry in runnable
            ]
            # Collect in completion order; per-run timestamps record timing.
            for future in as_completed(futures):
                outcome = future.result()
                executed.append(outcome)
                status = outcome.get("status")
                if status == "success":
                    counts["success"] += 1
                elif status == "failed":
                    counts["failed"] += 1
                elif status in ("error", "skipped"):
                    # Launch errors are tallied under "skipped" in the summary.
                    counts["skipped"] += 1
    runs = executed + passthrough
    report: Dict[str, Any] = {
        "executed_at": datetime.utcnow().isoformat() + "Z",
        "root": str(root),
        "summary": {
            "total_runs": len(runs),
            "success": counts["success"],
            "failed": counts["failed"],
            "skipped": counts["skipped"],
        },
        "runs": runs,
    }
    (root / TEST_REPORT_NAME).write_text(json.dumps(report, indent=2), encoding="utf-8")
    return report
# ---------------------------------------------------------------------------
# COVERAGE MAP (Markdown for humans/Claude)
# ---------------------------------------------------------------------------
def build_coverage_map(root: Path, snapshot: Dict[str, Any], report: Dict[str, Any]) -> str:
    """
    Write TEST_COVERAGE_MAP.md, merging the detection snapshot with the
    latest test-run report into one markdown document.

    Returns the path of the written file as a string.
    """
    md: List[str] = [
        "# Test Coverage Map (Phase 2)",
        "",
        f"- Generated at: {datetime.utcnow().isoformat()}Z",
        f"- Project root: `{root}`",
        "",
        "## Languages",
    ]
    # One section per language from the scan snapshot.
    for lang_key, det in snapshot.get("languages", {}).items():
        md.append(f"### {det.get('language', lang_key)}")
        md.append(f"- Source files: **{det.get('source_files', 0)}**")
        md.append(f"- Test files: **{det.get('test_files', 0)}**")
        md.append(f"- Status: **{det.get('status', 'unknown')}**")
        note = det.get("note")
        if note:
            md.append(f"- Note: {note}")
        md.append("")
    summary = report.get("summary", {})
    md.extend([
        "## Test Runs",
        f"- Total runs: **{summary.get('total_runs', 0)}**",
        f"- Success: **{summary.get('success', 0)}**",
        f"- Failed: **{summary.get('failed', 0)}**",
        f"- Skipped: **{summary.get('skipped', 0)}**",
        "",
        "| Framework | Status | Exit Code | Reason |",
        "|----------|--------|-----------|--------|",
    ])
    for run in report.get("runs", []):
        framework = run.get("framework", "")
        status = run.get("status", "")
        exit_code = run.get("exit_code", "")
        # Pipes would break the markdown table; replace them.
        reason = (run.get("reason", "") or "").replace("|", "/")
        md.append(f"| {framework} | {status} | {exit_code} | {reason} |")
    out_path = root / COVERAGE_MAP_NAME
    out_path.write_text("\n".join(md), encoding="utf-8")
    return str(out_path)
# ---------------------------------------------------------------------------
# MCP SERVER + TOOLS (PHASE 2)
# ---------------------------------------------------------------------------
# Single FastMCP server instance; the @mcp.tool() decorators below register
# the Phase 2 tools on it.
mcp = FastMCP("local-testing-agent-phase2")
@mcp.tool()
def scan_repository(project_root: str = ".") -> Dict[str, Any]:
    """
    Phase 2 – Option B:
    Scan only — detect languages, write the scan snapshot and the AI test
    plan. No tests are executed.
    """
    root = Path(project_root).resolve()
    detections = detect_languages(root)
    snapshot = write_scan_snapshot(root, detections)
    plan_file = build_test_plan(root, detections)
    return {
        "project_root": str(root),
        "snapshot_file": str(root / SCAN_SNAPSHOT_NAME),
        "test_plan_file": plan_file,
        "languages": snapshot["languages"],
    }
@mcp.tool()
def run_all_tests(project_root: str = ".") -> Dict[str, Any]:
    """
    Phase 2 – Option B:
    - Detect languages
    - Write snapshot
    - Build AI test plan
    - If nothing testable: write skipped report
    - Else: plan + execute tests (tests run whenever test files exist;
      the old MIN_SOURCE_FILES / MIN_TEST_FILES thresholds were removed)
    - Always write coverage map
    """
    root = Path(project_root).resolve()
    languages = detect_languages(root)
    snapshot = write_scan_snapshot(root, languages)
    plan_path = build_test_plan(root, languages)
    # No sources for any language → everything skipped, but still logged
    if all(det.source_files == 0 for det in languages.values()):
        report = write_all_skipped_report(root, languages)
        coverage_path = build_coverage_map(root, snapshot, report)
        return {
            "project_root": str(root),
            "snapshot_file": str(root / SCAN_SNAPSHOT_NAME),
            "test_plan_file": plan_path,
            "report_file": str(root / TEST_REPORT_NAME),
            "coverage_map_file": coverage_path,
            "note": "No source files detected for any supported language; all tests skipped.",
            "report": report,
        }
    # There are sources somewhere → plan per language and run what we can
    # (languages with sources but no test files are planned as "skipped")
    planned = plan_test_runs(languages, root)
    report = execute_planned_runs(root, planned)
    coverage_path = build_coverage_map(root, snapshot, report)
    return {
        "project_root": str(root),
        "snapshot_file": str(root / SCAN_SNAPSHOT_NAME),
        "test_plan_file": plan_path,
        "report_file": str(root / TEST_REPORT_NAME),
        "coverage_map_file": coverage_path,
        "report": report,
    }
@mcp.tool()
def full_phase2_pipeline(project_root: str = ".") -> Dict[str, Any]:
    """
    Convenience tool:
    scan → plan → run → coverage in one call.
    Same as run_all_tests, exposed under a more explicit name.
    """
    # Thin alias: delegates entirely to run_all_tests.
    return run_all_tests(project_root=project_root)
@mcp.tool()
def run_all_tests_parallel(project_root: str = ".", max_workers: int = DEFAULT_MAX_WORKERS) -> Dict[str, Any]:
    """
    Phase 2 with parallel execution:
    - Detect languages
    - Write snapshot
    - Build AI test plan
    - Execute tests in parallel (max_workers threads)
    - Always write coverage map

    Args:
        project_root: Root directory of the project to test (default: current directory)
        max_workers: Maximum number of parallel test runners (default: 4)

    Returns:
        Dictionary with test results and file paths
    """
    root = Path(project_root).resolve()
    detections = detect_languages(root)
    snapshot = write_scan_snapshot(root, detections)
    plan_file = build_test_plan(root, detections)
    # When no language has any sources everything is skipped (but logged);
    # otherwise plan per language and execute in parallel.
    nothing_detected = all(d.source_files == 0 for d in detections.values())
    if nothing_detected:
        report = write_all_skipped_report(root, detections)
    else:
        planned = plan_test_runs(detections, root)
        report = execute_planned_runs_parallel(root, planned, max_workers)
    coverage_file = build_coverage_map(root, snapshot, report)
    response: Dict[str, Any] = {
        "project_root": str(root),
        "snapshot_file": str(root / SCAN_SNAPSHOT_NAME),
        "test_plan_file": plan_file,
        "report_file": str(root / TEST_REPORT_NAME),
        "coverage_map_file": coverage_file,
    }
    if nothing_detected:
        response["note"] = "No source files detected for any supported language; all tests skipped."
    response["report"] = report
    return response
# Start the MCP server (blocks, serving tool requests) when run as a script.
if __name__ == "__main__":
    mcp.run()