"""
Processor that runs pytest against submitted Python code and reports the results.
"""
import asyncio
import logging
import os
import re
import subprocess
import tempfile

from mcp import JobProcessor
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class TestProcessor(JobProcessor):
    """Job processor that runs pytest on a submitted piece of Python code.

    The code found under ``job["code"]`` is written to a temporary ``.py``
    file, pytest is executed on that file in a subprocess, and a summary of
    the run is stored under ``job["result"]``.
    """

    # Flags passed to pytest after the path of the file under test.
    _PYTEST_FLAGS = ("-q", "--tb=short", "--disable-warnings", "--maxfail=5")
    # Upper bound, in seconds, on how long a single pytest run may take.
    _TIMEOUT_SECONDS = 10

    def __init__(self):
        super().__init__()

    async def process(self, job):
        """Run pytest on ``job["code"]`` and attach the outcome to the job.

        Args:
            job: Mapping that must contain the key ``"code"`` (the Python
                source to test, as a string).

        Returns:
            The same ``job`` object, with ``job["result"]`` set to either
            ``{"status": "success" | "failure", "passed": int,
            "failed": int, "output": str}`` or, when pytest could not be run
            to completion (e.g. timeout, executable missing),
            ``{"status": "error", "error_message": str}``.

        Raises:
            KeyError: If ``job`` has no ``"code"`` entry.
        """
        code = job["code"]
        tmp_file_path = None
        try:
            # delete=False so the file still exists, closed and flushed, when
            # the pytest subprocess opens it; it is removed in ``finally``.
            # UTF-8 is what Python assumes for source files by default.
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".py", delete=False, encoding="utf-8"
            ) as tmp_file:
                tmp_file_path = tmp_file.name
                tmp_file.write(code)
            job["result"] = await self._run_and_summarize(tmp_file_path)
        finally:
            if tmp_file_path is not None:
                try:
                    os.unlink(tmp_file_path)
                except OSError:
                    # Best-effort cleanup: failing to remove the temp file
                    # must not mask the result of the run itself.
                    pass
        return job

    async def _run_and_summarize(self, path):
        """Run pytest on *path* off the event loop and build the result dict."""
        try:
            # subprocess.run blocks, so hand it to the default executor to
            # keep the event loop responsive while pytest is running.
            loop = asyncio.get_running_loop()
            completed = await loop.run_in_executor(None, self._run_pytest, path)
        except Exception as exc:
            # Boundary handler: any failure to run pytest (timeout, missing
            # executable, ...) is reported on the job instead of raised.
            return {"status": "error", "error_message": str(exc)}

        output = completed.stdout.strip()
        # In quiet mode pytest ends its output with a one-line summary such
        # as "1 failed, 2 passed in 0.03s"; per-test "PASSED" markers are not
        # printed, so the counts are read from that summary line.
        summary_line = output.splitlines()[-1] if output else ""
        return {
            "status": "success" if completed.returncode == 0 else "failure",
            "passed": self._count_outcome(summary_line, "passed"),
            "failed": self._count_outcome(summary_line, "failed"),
            "output": output,
        }

    @classmethod
    def _run_pytest(cls, path):
        """Execute pytest on *path* synchronously and return the CompletedProcess."""
        return subprocess.run(
            ["pytest", path, *cls._PYTEST_FLAGS],
            capture_output=True,
            text=True,
            timeout=cls._TIMEOUT_SECONDS,
        )

    @staticmethod
    def _count_outcome(summary_line, outcome):
        """Return the number reported for *outcome* in a pytest summary line, or 0."""
        match = re.search(r"(\d+) " + re.escape(outcome) + r"\b", summary_line)
        return int(match.group(1)) if match else 0