Skip to main content
Glama
server.py8.48 kB
""" MCP server implementation for Quack. """ import logging from collections.abc import AsyncIterator from contextlib import asynccontextmanager from typing import Any, Dict, Optional from mcp.server.fastmcp import Context, FastMCP from .jobs.enums import JobStatus, JobType from .jobs.factory import JobFactory from .jobs.manager import JobManager from .processors.basedpyright import BasedPyrightJobProcessor from .processors.lint import LintJobProcessor from .processors.static_analysis import StaticAnalysisJobProcessor logger = logging.getLogger("quack") # Lifespan context manager for initializing the job manager @asynccontextmanager async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]: """ Manage server startup and shutdown lifecycle. Args: server: The FastMCP server instance Yields: Dictionary with initialized resources """ # Initialize resources on startup job_manager = JobManager() logger.info("[Server] Job manager initialized") try: yield {"job_manager": job_manager} finally: # Clean up on shutdown (if needed) logger.info("[Server] Shutting down") def create_server() -> FastMCP: """ Create and configure the Quack MCP server Returns: Configured FastMCP server instance """ # Create the MCP server with lifespan mcp = FastMCP("Quack", lifespan=server_lifespan) # Register processors JobFactory.register_processor(JobType.LINT, LintJobProcessor()) JobFactory.register_processor( JobType.STATIC_ANALYSIS, StaticAnalysisJobProcessor() ) JobFactory.register_processor( JobType.BASEDPYRIGHT, BasedPyrightJobProcessor() ) # Generic job submission tool @mcp.tool() async def submit_code( job_type: str, code: str, ctx: Context ) -> Dict[str, Any]: """ Submit Python code for analysis Args: job_type: Type of analysis to perform ("lint", "static_analysis", or "basedpyright") code: Python code content to analyze Returns: Dictionary with job ID for checking results later """ job_manager = ctx.request_context.lifespan_context["job_manager"] # Validate job type try: job_type_enum = JobType.from_string(job_type) except ValueError as e: logger.warning(f"[Server] Invalid job type: {job_type}") return {"status": "error", "message": str(e)} # Submit job job = job_manager.submit_job(job_type_enum, code) logger.info( f"[{job.job_type.value}:{job.id}] Submitted new job ({len(code)} bytes)" ) return { "status": "accepted", "job_id": job.id, "job_type": job.job_type.value, "message": f"Code submitted for {job_type}. Use get_job_results to check status.", } # Convenience tools for specific types @mcp.tool() async def submit_code_for_linting( code: str, ctx: Context ) -> Dict[str, Any]: """ Submit Python code for linting analysis Args: code: Python code content to analyze Returns: Dictionary with job ID for checking results later """ # Reuse generic submit_code tool with "lint" type return await submit_code("lint", code, ctx) @mcp.tool() async def submit_code_for_static_analysis( code: str, ctx: Context ) -> Dict[str, Any]: """ Submit Python code for static type analysis Args: code: Python code content to analyze Returns: Dictionary with job ID for checking results later """ # Reuse generic submit_code tool with "static_analysis" type return await submit_code("static_analysis", code, ctx) @mcp.tool() async def submit_code_for_basedpyright( code: str, ctx: Context, severity: str = "all", top_n: int = -1 ) -> Dict[str, Any]: """ Submit Python code for basedpyright static type analysis Args: code: Python code content to analyze severity: Severity filter ("error", "warning", "info", or "all") top_n: Maximum number of issues to return (-1 for all) Returns: Dictionary with job ID for checking results later """ job_manager = ctx.request_context.lifespan_context["job_manager"] # Validate severity parameter valid_severities = ["error", "warning", "info", "all"] if severity not in valid_severities: logger.warning(f"[Server] Invalid severity: {severity}") return { "status": "error", "message": f"Invalid severity. Must be one of: {valid_severities}", } # Validate top_n parameter if not isinstance(top_n, int) or top_n < -1 or top_n == 0: logger.warning(f"[Server] Invalid top_n: {top_n}") return { "status": "error", "message": "top_n must be a positive integer or -1 for all", } # Convert -1 to None for internal use top_n_internal = None if top_n == -1 else top_n # Submit job with filtering parameters job = job_manager.submit_job( JobType.BASEDPYRIGHT, code, severity, top_n_internal ) logger.info( f"[{job.job_type.value}:{job.id}] Submitted new job ({len(code)} bytes, severity={severity}, top_n={top_n})" ) return { "status": "accepted", "job_id": job.id, "job_type": job.job_type.value, "severity": severity, "top_n": top_n, "message": "Code submitted for basedpyright analysis. Use get_job_results to check status.", } # Get job results tool @mcp.tool() async def get_job_results(job_id: str, ctx: Context) -> Dict[str, Any]: """ Get the results of a previously submitted job Args: job_id: ID of the job Returns: Dictionary with job status and results if available """ job_manager = ctx.request_context.lifespan_context["job_manager"] job = job_manager.get_job(job_id) if not job: logger.warning(f"[Job] Requested unknown job: {job_id}") return { "status": "error", "message": f"No job found with ID: {job_id}", } logger.info( f"[{job.job_type.value}:{job_id}] Status check: {job.status.value}" ) # Return appropriate response based on job status if job.status == JobStatus.COMPLETED: return { "status": "completed", "job_type": job.job_type.value, "results": job.result, "execution_time": job.execution_time, } elif job.status == JobStatus.FAILED: return { "status": "failed", "job_type": job.job_type.value, "error": job.error, "execution_time": job.execution_time, } else: # Still in progress return { "status": job.status.value, "job_type": job.job_type.value, "message": f"Job is {job.status.value}. Please check again later.", } # List jobs tool @mcp.tool() async def list_jobs( ctx: Context, job_type: Optional[str] = None ) -> Dict[str, Any]: """ List all jobs and their statuses Args: ctx: Context object job_type: Optional filter for job type Returns: Dictionary with list of jobs and their statuses """ job_manager = ctx.request_context.lifespan_context["job_manager"] # Convert string job type to enum if provided job_type_enum = None if job_type: try: job_type_enum = JobType.from_string(job_type) except ValueError: return { "status": "error", "message": f"Invalid job type: {job_type}", } return { "jobs": job_manager.list_jobs(job_type_enum), "stats": job_manager.get_stats(), } return mcp

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DXC-Lab-Linkage/quack-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server