Skip to main content
Glama
static_analysis.py7.56 kB
""" Processor for static type analysis of Python code using mypy. """ import asyncio import logging import tempfile import os import time from typing import Dict, Any, List from ..jobs.enums import JobStatus from ..jobs.base import JobProcessor, StaticAnalysisJob logger = logging.getLogger("quack") class StaticAnalysisJobProcessor(JobProcessor): """Processor for static analysis jobs using mypy""" async def process(self, job: StaticAnalysisJob) -> None: """ Process a static analysis job using mypy This processor: 1. Creates a temporary file with the code 2. Runs mypy on the file 3. Parses the output into structured data 4. Updates the job with results or error information The job status will be updated to COMPLETED or FAILED based on the outcome of the processing. Args: job: The static analysis job to process """ # Mark job as running job.status = JobStatus.RUNNING job.started_at = time.time() logger.info(f"[{job.job_type.value}:{job.id}] Starting mypy analysis") temp_path = None try: # Create temporary file with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as temp_file: temp_path = temp_file.name temp_file.write(job.code.encode("utf-8")) logger.debug( f"[{job.job_type.value}:{job.id}] Created temporary file at {temp_path}" ) # Run mypy try: # Try up to 3 times with exponential backoff for attempt in range(3): try: if attempt > 0: logger.info( f"[{job.job_type.value}:{job.id}] Retry attempt {attempt + 1}" ) # Wait with exponential backoff await asyncio.sleep(2**attempt) # Run mypy with options for machine-readable output process = await asyncio.create_subprocess_exec( "mypy", "--no-error-summary", "--show-column-numbers", "--show-error-codes", "--no-pretty", temp_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) logger.debug( f"[{job.job_type.value}:{job.id}] Mypy process started with PID: {process.pid}" ) # Set a timeout for the process stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=30.0 ) # If we get here, the process completed without timing out break except (OSError, asyncio.TimeoutError) as e: if attempt == 2: # Last attempt raise # Re-raise the exception logger.warning( f"[{job.job_type.value}:{job.id}] Attempt {attempt + 1} failed: {str(e)}" ) # Process results - mypy returns non-zero if it finds type errors mypy_output = stdout.decode().strip() mypy_errors = stderr.decode().strip() if mypy_errors: logger.error( f"[{job.job_type.value}:{job.id}] Mypy error: {mypy_errors}" ) job.status = JobStatus.FAILED job.error = f"Mypy error: {mypy_errors}" job.completed_at = time.time() return # Parse mypy output issues: List[Dict[str, Any]] = [] if mypy_output: for line in mypy_output.splitlines(): parts = line.split(":", 4) if len(parts) >= 4: file_path, line_num, col_num, message = parts[:4] try: line_num = int(line_num) col_num = int(col_num) # Extract error code if present error_code = None if "[" in message and "]" in message: code_part = message.split("[")[1].split("]")[0] if code_part.startswith("error-"): error_code = code_part # Add line content line_content = None if 0 <= line_num - 1 < len(job.code.splitlines()): line_content = job.code.splitlines()[line_num - 1] issues.append( { "line": line_num, "column": col_num, "message": message.strip(), "error_code": error_code, "line_content": line_content, } ) except (ValueError, IndexError): # Skip malformed output lines logger.warning( f"[{job.job_type.value}:{job.id}] Skipping malformed output line: {line}" ) continue # Create result job.result = { "status": "success", "summary": {"issue_count": len(issues)}, "issues": issues, } logger.info( f"[{job.job_type.value}:{job.id}] Analysis complete with {len(issues)} issues" ) job.status = JobStatus.COMPLETED job.completed_at = time.time() except asyncio.TimeoutError: logger.error(f"[{job.job_type.value}:{job.id}] Process timed out") job.status = JobStatus.FAILED job.error = "Process timed out after 30 seconds" job.completed_at = time.time() except Exception as e: logger.error( f"[{job.job_type.value}:{job.id}] Error: {str(e)}", exc_info=True ) job.status = JobStatus.FAILED job.error = f"Error: {str(e)}" job.completed_at = time.time() finally: # Clean up temporary file if temp_path and os.path.exists(temp_path): try: os.unlink(temp_path) logger.debug( f"[{job.job_type.value}:{job.id}] Cleaned up temporary file: {temp_path}" ) except Exception as e: logger.error( f"[{job.job_type.value}:{job.id}] Failed to clean up temporary file: {str(e)}" )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DXC-Lab-Linkage/quack-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server