Skip to main content
Glama
async_extensions.py9.13 kB
""" Extended BLAST MCP server with async job support """ import asyncio import uuid from typing import Any, Optional from pathlib import Path import httpx from mcp.types import Tool, TextContent, ErrorContent from pydantic import BaseModel, Field # Import the original server from .server import BlastServer as BaseBlastServer class AsyncBlastServer(BaseBlastServer): """Extended BLAST server with async job submission""" def __init__(self, settings=None, queue_url: Optional[str] = None): super().__init__(settings) self.queue_url = queue_url or "http://localhost:8000" self._setup_async_handlers() def _setup_async_handlers(self): """Add async job handlers to the existing tools""" @self.server.list_tools() async def list_tools() -> list[Tool]: # Get base tools base_tools = await super(AsyncBlastServer, self).server.list_tools() # Add async variants async_tools = [ Tool( name="blastn_async", description="Submit nucleotide BLAST search as background job", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Path to query FASTA file" }, "database": { "type": "string", "description": "Database name" }, "evalue": { "type": "number", "default": 10 }, "max_hits": { "type": "integer", "default": 50 }, "notification_email": { "type": "string", "description": "Email for job completion notification" } }, "required": ["query", "database"] } ), Tool( name="get_job_status", description="Check status of async job", inputSchema={ "type": "object", "properties": { "job_id": { "type": "string", "description": "Job ID to check" } }, "required": ["job_id"] } ), Tool( name="get_job_result", description="Retrieve results of completed job", inputSchema={ "type": "object", "properties": { "job_id": { "type": "string", "description": "Job ID to retrieve results for" } }, "required": ["job_id"] } ), ] return base_tools + async_tools @self.server.call_tool() async def call_tool(name: str, arguments: Any) -> list[TextContent | ErrorContent]: # Handle async 
tools if name == "blastn_async": return await self._submit_blast_job(arguments, "blastn") elif name == "get_job_status": return await self._get_job_status(arguments["job_id"]) elif name == "get_job_result": return await self._get_job_result(arguments["job_id"]) else: # Delegate to parent class return await super(AsyncBlastServer, self).server.call_tool(name, arguments) async def _submit_blast_job(self, arguments: dict, blast_type: str) -> list[TextContent | ErrorContent]: """Submit BLAST job to queue""" try: # Generate job ID job_id = str(uuid.uuid4()) # Prepare job request job_request = { "job_id": job_id, "job_type": blast_type, "parameters": arguments, "priority": 5 } # Submit to queue API async with httpx.AsyncClient() as client: response = await client.post( f"{self.queue_url}/jobs/submit", json=job_request, timeout=30.0 ) if response.status_code != 200: return [ErrorContent(text=f"Failed to submit job: {response.text}")] job_info = response.json() return [TextContent( text=f"Job submitted successfully!\n\n" f"Job ID: {job_info['job_id']}\n" f"Status: {job_info['status']}\n" f"Queue: {blast_type}\n\n" f"Use 'get_job_status' with this job ID to check progress." 
)] except Exception as e: return [ErrorContent(text=f"Error submitting job: {str(e)}")] async def _get_job_status(self, job_id: str) -> list[TextContent | ErrorContent]: """Get job status from queue""" try: async with httpx.AsyncClient() as client: response = await client.get( f"{self.queue_url}/jobs/{job_id}/status", timeout=10.0 ) if response.status_code == 404: return [ErrorContent(text=f"Job {job_id} not found")] elif response.status_code != 200: return [ErrorContent(text=f"Failed to get job status: {response.text}")] job_info = response.json() status_text = f"Job ID: {job_info['job_id']}\n" status_text += f"Status: {job_info['status']}\n" status_text += f"Created: {job_info['created_at']}\n" if job_info.get('started_at'): status_text += f"Started: {job_info['started_at']}\n" if job_info.get('progress'): status_text += f"Progress: {job_info['progress']}%\n" if job_info['status'] == 'completed': status_text += f"Completed: {job_info['completed_at']}\n" status_text += "\nUse 'get_job_result' to retrieve the results." elif job_info['status'] == 'failed': status_text += f"Failed: {job_info.get('error', 'Unknown error')}\n" return [TextContent(text=status_text)] except Exception as e: return [ErrorContent(text=f"Error checking job status: {str(e)}")] async def _get_job_result(self, job_id: str) -> list[TextContent | ErrorContent]: """Get job results from storage""" try: async with httpx.AsyncClient() as client: response = await client.get( f"{self.queue_url}/jobs/{job_id}/result", timeout=30.0 ) if response.status_code == 404: return [ErrorContent(text=f"Job {job_id} not found")] elif response.status_code != 200: return [ErrorContent(text=f"Failed to get job result: {response.text}")] result = response.json() if result['status'] != 'completed': return [ErrorContent(text=f"Job is not completed yet. 
Status: {result['status']}")] # For demo, return summary + download link result_text = f"Job {job_id} Results\n" result_text += "=" * 40 + "\n\n" if 'summary' in result: result_text += "Summary:\n" for key, value in result['summary'].items(): result_text += f" {key}: {value}\n" result_text += "\n" result_text += f"Full results available at:\n{result['result_url']}\n" result_text += "\nResults will be available for 7 days." return [TextContent(text=result_text)] except Exception as e: return [ErrorContent(text=f"Error retrieving job results: {str(e)}")]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bio-mcp/bio-mcp-blast'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.