# main.py
from fastmcp import FastMCP
import os
import re
import subprocess
from typing import List
# Shared FastMCP server instance; the @mcp.tool decorators in this file
# register their functions on it, and the __main__ guard starts it.
mcp = FastMCP("Demo")
def find_java_files(directory: str) -> List[str]:
    """Recursively collect the paths of all ``.java`` files under *directory*.

    Args:
        directory: Root directory to search. A directory that does not exist
            produces an empty list, because ``os.walk`` ignores listing
            errors by default.

    Returns:
        Paths built by joining each visited directory with the file name.
        Directories are visited top-down in sorted order and files within a
        directory are sorted, so the result is the same on every run.
    """
    java_files: List[str] = []
    for root, dirs, files in os.walk(directory):
        # os.walk yields entries in arbitrary, filesystem-dependent order;
        # sorting ``dirs`` in place fixes the order of the remaining traversal.
        dirs.sort()
        java_files.extend(
            os.path.join(root, name)
            for name in sorted(files)
            if name.endswith(".java")
        )
    return java_files
# Matches a stripped source line that begins with the ``public`` keyword and
# opens a parameter list. The "(" has to appear before any "=", ";" or "{",
# which rejects class/interface declarations, plain fields, fields whose
# initialiser contains a call, and identifiers that merely start with "public".
_PUBLIC_CALLABLE_RE = re.compile(r"public\s[^=;({]*\(")


def find_method_signatures(file_path: str) -> List[str]:
    """Return the public method and constructor signature lines of a Java file.

    The file is scanned line by line; a line is reported when, after
    stripping surrounding whitespace, it starts with ``public`` and opens a
    parameter list. Only the line carrying ``public`` is returned, so a
    signature wrapped over several lines is reported by its first line only.

    Args:
        file_path: Path of the Java source file to scan.

    Returns:
        The matching lines, stripped, in file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    # errors="replace" keeps a single badly encoded file from aborting a scan
    # of the whole code base.
    with open(file_path, "r", encoding="UTF-8", errors="replace") as f:
        stripped_lines = (line.strip() for line in f)
        return [line for line in stripped_lines if _PUBLIC_CALLABLE_RE.match(line)]
@mcp.tool()
def get_all_public_methods(dir: str = "A2/src/main/java/") -> List[str]:
    """
    Find all public method signatures in the Java codebase.

    Every Java file that find_java_files() locates under ``dir`` is passed to
    find_method_signatures(); the per-file results are concatenated in the
    order the files were found.
    """
    return [
        signature
        for java_path in find_java_files(dir)
        for signature in find_method_signatures(java_path)
    ]
# Class declaration at the start of a line, optionally preceded by modifiers.
# Anchoring to the line start keeps the word "class" inside Javadoc prose
# (e.g. " * This class represents ...") from being taken as the declaration.
_CLASS_DECL_RE = re.compile(
    r"^[ \t]*(?:(?:public|protected|private|abstract|final|static|strictfp)\s+)*"
    r"class\s+(\w+)",
    re.MULTILINE,
)

# Public method declaration; group 1 is the method name. Constructors do not
# match because a return type must precede the name.
_PUBLIC_METHOD_RE = re.compile(
    r"\bpublic\s+"
    # Modifiers that may sit between "public" and the return type.
    r"(?:(?:static|final|abstract|synchronized|native|strictfp|default)\s+)*"
    # Optional type parameters of a generic method, e.g. "<T>".
    r"(?:<[^;{}()=]*?>\s*)?"
    # Return type: name, optional generic arguments (commas allowed), arrays.
    r"[\w.]+(?:\s*<[^;{}()=]*>)?(?:\s*\[\s*\])*"
    r"\s+(\w+)\s*\("
)


@mcp.tool
def generate_tests(source_file: str) -> str:
    """Generate JUnit test cases for a given Java source file.

    One empty ``test_<method>`` stub is written for each distinct public
    method name found in the file. Overloaded methods share a single stub,
    because two stubs with the same name would not compile.

    Args:
        source_file: Path of the Java source file to read.

    Returns:
        A message naming the generated file, or a message explaining why
        nothing was generated (file missing, no class, no public methods).
    """
    if not os.path.exists(source_file):
        return f"Source file not found: {source_file}"

    with open(source_file, "r", encoding="utf-8", errors="replace") as f:
        code = f.read()

    # Prefer a real declaration; fall back to the first "class <word>" so
    # files whose declaration shares a line with an annotation still work.
    class_match = _CLASS_DECL_RE.search(code) or re.search(r"class\s+(\w+)", code)
    if not class_match:
        return "Could not find a class in this file."
    class_name = class_match.group(1)
    test_class_name = class_name + "Test"

    # dict.fromkeys drops repeated names while keeping first-seen order.
    methods = list(dict.fromkeys(_PUBLIC_METHOD_RE.findall(code)))
    if not methods:
        return f"No public methods found in {class_name}."

    test_methods = "".join(
        f"""
    @Test
    void test_{method}() {{
        // TODO: implement test
    }}
"""
        for method in methods
    )
    test_content = f"""import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
public class {test_class_name} {{
{test_methods}
}}
"""

    output_dir = "A2/src/test/java/org/example"
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"{test_class_name}.java")

    # NOTE(review): mode "w" replaces an existing test file of the same name,
    # including one that has already been filled in by hand.
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(test_content)
    return f"Generated test file at: {output_path}"
@mcp.tool
def run_tests() -> dict:
    """Run ``mvn -f A2/pom.xml test`` and report the outcome.

    The pom path is relative, so it is resolved against the working
    directory of the server process.

    Returns:
        A dict with ``exit_code``, ``stdout`` and ``stderr`` of the Maven run.
    """
    proc = subprocess.run(
        ["mvn", "-f", "A2/pom.xml", "test"],
        capture_output=True,
        text=True,
    )
    # Return plain data in the same shape as the git tools rather than the
    # CompletedProcess object itself, which is not a JSON-friendly result.
    return {"exit_code": proc.returncode, "stdout": proc.stdout, "stderr": proc.stderr}
# Checkout that the git and gh tools operate on. The default is the original
# author's local path; set MCP_REPO_PATH to point the server at another
# checkout without editing this file.
_DEFAULT_REPO_PATH = "/Users/amyfan/Documents/School/SE333/Final-Project-SE333"
REPO_PATH = os.environ.get("MCP_REPO_PATH") or _DEFAULT_REPO_PATH


def run_git(args: List[str]) -> subprocess.CompletedProcess:
    """Run ``git`` with *args* inside REPO_PATH and capture its output.

    Args:
        args: Arguments placed after ``git``, e.g. ``["status", "--short"]``.

    Returns:
        The completed process; ``stdout`` and ``stderr`` are decoded text.
        A non-zero exit status is not raised, callers inspect ``returncode``.
    """
    return subprocess.run(
        ["git", *args],
        cwd=REPO_PATH,
        capture_output=True,
        text=True,
    )
# Two-letter XY codes that the short status format uses for unmerged paths.
_CONFLICT_CODES = frozenset({"DD", "AU", "UD", "UA", "DU", "AA", "UU"})


@mcp.tool
def git_status() -> dict:
    """Return clean status, staged changes, and conflicts.

    Parses ``git status --short``, where each entry is ``XY <path>``: X is
    the index (staged) state and Y the working-tree (unstaged) state.

    Returns:
        A dict with:
          * ``staged`` / ``unstaged``: paths with an index / working-tree
            change; a path that has both appears in both lists.
          * ``untracked``: paths reported as ``??``.
          * ``conflicts``: unmerged paths.
          * ``clean``: True when git succeeded and reported no entries.
          * ``raw``, ``stderr``, ``exit_code``: the unparsed command result.
    """
    proc = run_git(["status", "--short"])
    staged = []
    unstaged = []
    untracked = []
    conflicts = []
    # The output must not be strip()ped as a whole: the leading space of the
    # first line is its X column, and removing it shifts the path by one
    # character and misreports the entry as staged.
    for line in proc.stdout.splitlines():
        # Skip blank lines and the "## branch" header some configs enable.
        if len(line) < 4 or line.startswith("##"):
            continue
        status, path = line[:2], line[3:]
        if status in _CONFLICT_CODES:
            conflicts.append(path)
        elif status == "??":
            untracked.append(path)
        elif status == "!!":
            # Ignored files are not pending changes.
            continue
        else:
            if status[0] != " ":
                staged.append(path)
            if status[1] != " ":
                unstaged.append(path)
    has_entries = bool(staged or unstaged or untracked or conflicts)
    return {
        "staged": staged,
        "unstaged": unstaged,
        "untracked": untracked,
        "conflicts": conflicts,
        "clean": proc.returncode == 0 and not has_entries,
        "raw": proc.stdout,
        "stderr": proc.stderr,
        "exit_code": proc.returncode,
    }
@mcp.tool
def git_add_all() -> dict:
    """Stage all relevant changes (excluding build artifacts).

    Runs ``git add .`` and then unstages the Maven output directories.

    Returns:
        A dict with ``exit_code``, ``stdout`` and ``stderr``. When ``git add``
        fails its result is returned and the reset step is skipped; otherwise
        the exit code is that of the reset and the output of both commands is
        concatenated.
    """
    add_proc = run_git(["add", "."])
    if add_proc.returncode != 0:
        return {
            "exit_code": add_proc.returncode,
            "stdout": add_proc.stdout,
            "stderr": add_proc.stderr,
        }
    # "--" marks what follows as paths; without it git treats a path that
    # does not exist as an ambiguous revision and aborts. A2/target/ is listed
    # because the Maven project (and so its build output) lives under A2/.
    reset_proc = run_git(["reset", "--quiet", "--", "target/", "A2/target/"])
    return {
        "exit_code": reset_proc.returncode,
        "stdout": add_proc.stdout + reset_proc.stdout,
        "stderr": add_proc.stderr + reset_proc.stderr,
    }
@mcp.tool
def git_commit(message: str) -> dict:
    """Commit staged changes with a message.

    Returns the exit code and the captured stdout/stderr of ``git commit``.
    """
    completed = run_git(["commit", "-m", message])
    return dict(
        exit_code=completed.returncode,
        stdout=completed.stdout,
        stderr=completed.stderr,
    )
@mcp.tool
def git_push(remote: str = "origin") -> dict:
    """Push commits to the remote repository.

    Returns the exit code and the captured stdout/stderr of ``git push``.
    """
    push_args = ["push", remote]
    outcome = run_git(push_args)
    report = {}
    report["exit_code"] = outcome.returncode
    report["stdout"] = outcome.stdout
    report["stderr"] = outcome.stderr
    return report
@mcp.tool
def git_pull_request(base: str = "main", title: str = "", body: str = "") -> dict:
    """
    Create a pull request and return the URL.
    Requires GitHub CLI to be configured.

    Args:
        base: Branch the pull request should be merged into.
        title: Pull request title. When empty, ``--fill`` is passed so that
            gh takes the title (and the body, unless one is given) from the
            commits.
        body: Pull request description.

    Returns:
        A dict with ``exit_code``, ``stdout``, ``stderr`` and ``url``.
        ``url`` is the last non-empty line gh printed on success and an empty
        string otherwise.
    """
    args = ["gh", "pr", "create", "--base", base]
    if title:
        args += ["--title", title, "--body", body]
    else:
        # An empty --title cannot produce a valid pull request, so let gh
        # derive the text from the commits instead.
        args.append("--fill")
        if body:
            # An explicit --body takes precedence over the autofilled one.
            args += ["--body", body]
    proc = subprocess.run(args, cwd=REPO_PATH, capture_output=True, text=True)
    url = ""
    if proc.returncode == 0:
        printed = proc.stdout.strip().splitlines()
        url = printed[-1].strip() if printed else ""
    return {
        "exit_code": proc.returncode,
        "stdout": proc.stdout,
        "stderr": proc.stderr,
        "url": url,
    }
@mcp.tool
def analyze_coverage(xml_path: str = "A2/target/site/jacoco/jacoco.xml") -> dict:
    """
    Parse JaCoCo XML and find uncovered methods.

    A method is reported when its INSTRUCTION counter shows less than full
    coverage; methods without that counter are skipped. The result holds the
    reported methods under "uncovered_methods" and their number under
    "count", or a single "error" entry when the report file does not exist.
    """
    import xml.etree.ElementTree as ET
    import os

    if not os.path.exists(xml_path):
        return {"error": f"Coverage report not found at {xml_path}"}

    report_root = ET.parse(xml_path).getroot()
    findings = []
    for package_el in report_root.iter("package"):
        for class_el in package_el.iter("class"):
            owner = class_el.get("name")
            for method_el in class_el.iter("method"):
                instructions = method_el.find("counter[@type='INSTRUCTION']")
                if instructions is None:
                    continue
                missed = int(instructions.get("missed"))
                covered = int(instructions.get("covered"))
                total = missed + covered
                # A method with no instructions counts as 0 coverage.
                if total == 0:
                    ratio = 0
                else:
                    ratio = covered / total
                if ratio >= 1.0:
                    continue
                findings.append({
                    "class": owner,
                    "method": method_el.get("name"),
                    "coverage": ratio,
                    "missed": missed,
                    "covered": covered,
                })
    return {
        "uncovered_methods": findings,
        "count": len(findings),
    }
@mcp.tool()
def run_checkstyle() -> str:
    """Run Maven Checkstyle and generate the XML report.

    Invokes the ``checkstyle:checkstyle`` goal on ``A2/pom.xml``, the same
    project file the test tool uses.

    Returns:
        Maven's standard output; when Maven exits with a non-zero status its
        standard error is appended.
    """
    result = subprocess.run(
        # The previous command, "mvn clean test", ran the tests in the current
        # directory and never invoked Checkstyle.
        ["mvn", "-f", "A2/pom.xml", "checkstyle:checkstyle"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        return result.stdout + result.stderr
    return result.stdout
@mcp.tool()
def analyze_checkstyle(xml_path: str = "A2/target/checkstyle-result.xml") -> dict:
    """
    Parse Checkstyle XML and return a list of violations.

    Each <error> element directly under a top-level <file> element becomes
    one entry carrying the file name and the error's attributes. A missing
    report file yields a dict with a single "error" entry instead.
    """
    import xml.etree.ElementTree as ET

    if not os.path.exists(xml_path):
        return {"error": f"No checkstyle report found at {xml_path}"}

    report_root = ET.parse(xml_path).getroot()
    found = [
        {
            "file": file_node.get("name"),
            "line": issue.get("line"),
            "column": issue.get("column"),
            "severity": issue.get("severity"),
            "message": issue.get("message"),
            "source": issue.get("source"),
        }
        for file_node in report_root.findall("file")
        for issue in file_node.findall("error")
    ]
    return {"violations": found, "count": len(found)}
def _main() -> None:
    """Start the MCP server using the SSE transport."""
    mcp.run(transport="sse")


# Only start the server when the file is executed as a script.
if __name__ == "__main__":
    _main()