Skip to main content
Glama
server.py (11.9 kB)
import sys
import asyncio
from fastmcp import FastMCP
from fastmcp.server.dependencies import get_http_headers
from dotenv import load_dotenv
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import Field
from typing import Optional, List, Dict
from grabba import (
    Grabba, Job, JobResult, JobStats,
    GetJobResponse, GetJobsResponse, JobExecutionStatus,
    GetJobResultResponse, JobExecutionResponse, JobCreationResponse,
    JobEstimatedCostResponse, PuppetRegion, JobStatsResponse
)


def _log(message: str) -> None:
    """Write a status line to stderr.

    Under the stdio transport, stdout carries the MCP protocol stream, so
    human-readable diagnostics must never be printed there.
    """
    print(message, file=sys.stderr)


class ServerConfig(BaseSettings):
    """Server settings, loaded from the environment and an optional ``.env`` file."""

    PORT: int = Field(8283, description="The PORT the MCP server should run on.")
    # Optional: the key may instead arrive per request via an HTTP header
    # (see _get_grabba_service_instance), so a missing value is legal here.
    API_KEY: Optional[str] = Field(None, description="The API key for accessing the Grabba python SDK.")
    MCP_SERVER_TRANSPORT: str = Field("stdio", description="The transport protocol for the MCP mcp.")
    model_config = SettingsConfigDict(
        env_file=".env",
        extra="ignore"
    )


class GrabbaService:
    """Thin async facade over the Grabba SDK client.

    Every method returns a ``(message, payload)`` tuple. Failures are never
    raised to the caller: the exception text is folded into ``message`` and
    ``payload`` is ``None``.

    The SDK methods are called without ``await`` in this module, i.e. they are
    used as blocking calls; they are therefore run in a worker thread via
    ``asyncio.to_thread`` so one slow request does not stall the event loop.
    """

    def __init__(self, api_key: str):
        """Create the SDK client.

        Raises:
            ValueError: if ``api_key`` is empty or ``None``.
        """
        if not api_key:
            raise ValueError("API Key cannot be empty.")
        self.grabba = Grabba(api_key)

    async def fetch_stats_data(self) -> tuple[str, Optional[JobStats]]:
        """Fetch usage stats and user token balance"""
        try:
            result: JobStatsResponse = await asyncio.to_thread(self.grabba.get_stats)
            return result.message, result.job_stats
        except Exception as err:
            return f"Error fetching usage stats: {str(err)}", None

    # NOTE(review): annotated as Dict to agree with estimate_job_cost_tool; the
    # concrete type of `job_estimated_cost` is defined by the SDK - confirm.
    async def estimate_job_cost(self, extraction_data: Job) -> tuple[str, Optional[Dict]]:
        """Get the estimated cost of a job before creating or scheduling it"""
        try:
            result: JobEstimatedCostResponse = await asyncio.to_thread(
                self.grabba.estimate_job_cost, job=extraction_data
            )
            return result.message, result.job_estimated_cost
        except Exception as err:
            return f"Error estimating job cost: {str(err)}", None

    async def extract_data(self, extraction_data: Job) -> tuple[str, Optional[Dict]]:
        """Schedule a new data extraction job.

        [Web Search Tool - when used with markdown tasks]
        """
        try:
            result: JobExecutionResponse = await asyncio.to_thread(
                self.grabba.extract, job=extraction_data
            )
            # The same (message, job_result) pair is returned whatever the
            # execution status, so no status branch is needed.
            return result.message, result.job_result
        except Exception as err:
            return f"Error scheduling job: {str(err)}", None

    async def create_job(self, extraction_data: Job) -> tuple[str, Optional[Job]]:
        """Create a new data extraction job (Without scheduling it)"""
        try:
            result: JobCreationResponse = await asyncio.to_thread(
                self.grabba.create_job, job=extraction_data
            )
            return result.message, result.job
        except Exception as err:
            return f"Error creating job: {str(err)}", None

    async def schedule_job(self, job_id: str) -> tuple[str, Optional[Dict]]:
        """Schedule an existing job to run immediately"""
        try:
            result: JobExecutionResponse = await asyncio.to_thread(
                self.grabba.schedule_job, job_id=job_id
            )
            return result.message, result.job_result
        except Exception as err:
            return f"Error scheduling job: {str(err)}", None

    async def fetch_jobs_data(self) -> tuple[str, Optional[List[Job]]]:
        """Fetch all jobs for the current user"""
        try:
            result: GetJobsResponse = await asyncio.to_thread(self.grabba.get_jobs)
            return result.message, result.jobs
        except Exception as err:
            return f"Error fetching jobs: {str(err)}", None

    async def fetch_job_data(self, job_id: str) -> tuple[str, Optional[Job]]:
        """Fetch details of a specific job"""
        try:
            result: GetJobResponse = await asyncio.to_thread(self.grabba.get_job, job_id)
            return result.message, result.job
        except Exception as err:
            return f"Error fetching job: {str(err)}", None

    async def delete_job_data(self, job_id: str) -> tuple[str, None]:
        """Delete a specific job"""
        try:
            await asyncio.to_thread(self.grabba.delete_job, job_id)
            return f"Successfully deleted job {job_id}", None
        except Exception as err:
            return f"Error deleting job: {str(err)}", None

    async def fetch_job_result_data(self, job_result_id: str) -> tuple[str, Optional[Dict]]:
        """Fetch results of a completed job"""
        try:
            result: GetJobResultResponse = await asyncio.to_thread(
                self.grabba.get_job_result, job_result_id
            )
            return result.message, result.job_result
        except Exception as err:
            return f"Error fetching job results: {str(err)}", None

    async def delete_job_result_data(self, job_result_id: str) -> tuple[str, None]:
        """Delete results of a completed job"""
        try:
            await asyncio.to_thread(self.grabba.delete_job_result, job_result_id)
            return f"Successfully deleted job result {job_result_id}", None
        except Exception as err:
            return f"Error deleting job results: {str(err)}", None

    async def fetch_available_regions(self) -> tuple[str, Optional[List[Dict[str, PuppetRegion]]]]:
        """Fetch all available puppet (web agent) regions for scheduling web data extractions."""
        try:
            regions = await asyncio.to_thread(self.grabba.get_available_regions)
            return "Fetched available regions successfully", regions
        except Exception as err:
            # Was "Error fetching jobs" - a copy-paste slip from fetch_jobs_data.
            return f"Error fetching available regions: {str(err)}", None


# Load environment variables from .env file
load_dotenv()

# Instantiate ServerConfig
server_config = ServerConfig()

# === ARGUMENT PARSING LOGIC ===
# An optional first CLI argument overrides the configured transport; any value
# that is not a known transport is ignored and the configured one is kept.
if len(sys.argv) > 1:
    transport = sys.argv[1]
    # Check if the first argument is a known transport type
    valid_transports = ["stdio", "streamable-http", "sse"]
    if transport in valid_transports:
        server_config.MCP_SERVER_TRANSPORT = transport
        _log(f"Overriding transport protocol from command line: {server_config.MCP_SERVER_TRANSPORT}")

# Initialize the MCP server
mcp = FastMCP(name="grabba-agent")


# Helper function to get GrabbaService instance within tool
async def _get_grabba_service_instance() -> GrabbaService:
    """
    Resolves the GrabbaService dependency.
    This is called by each tool to get an authenticated GrabbaService instance.

    The key is read from the ``api_key`` request header first and falls back
    to the ``API_KEY`` setting.

    Raises:
        ValueError: if neither source provides a key.
    """
    headers = get_http_headers()
    # Get API_KEY from headers
    api_key = headers.get("api_key") or server_config.API_KEY
    if not api_key:
        raise ValueError("API Key is missing. Provide it via API_KEY header or API_KEY env var.")
    return GrabbaService(api_key=api_key)


#############################
#   Tools Specifications    #
#############################
# Each tool resolves a per-call GrabbaService and delegates to the service
# method of the matching name; results are (message, payload) tuples.

@mcp.tool(
    name="fetch_stats_data",
    description="Fetches usage statistics and current user token balance for Grabba. Takes no parameters.",
    tags={"billing", "usage"}
)
async def fetch_stats_data_tool() -> tuple[str, Optional[JobStats]]:
    grabba_service = await _get_grabba_service_instance()
    return await grabba_service.fetch_stats_data()


@mcp.tool(
    name="estimate_job_cost",
    description="Estimates the cost of a Grabba job before creation or scheduling. Requires a 'Job' object detailing the extraction tasks.",
    tags={"billing"}
)
async def estimate_job_cost_tool(extraction_data: Job) -> tuple[str, Optional[Dict]]:
    grabba_service = await _get_grabba_service_instance()
    return await grabba_service.estimate_job_cost(extraction_data)


@mcp.tool(
    name="create_job",
    description="Creates a new data extraction job in Grabba without immediately scheduling it for execution. Requires a 'Job' object detailing the extraction tasks.",
    tags={"management"}
)
async def create_job_tool(extraction_data: Job) -> tuple[str, Optional[Job]]:
    grabba_service = await _get_grabba_service_instance()
    return await grabba_service.create_job(extraction_data)


@mcp.tool(
    name="extract_data",
    description="Schedules a new data extraction job with Grabba. Requires a 'Job' object detailing the extraction tasks.",
    tags={"catalog", "search"},
)
async def extract_data_tool(extraction_data: Job) -> tuple[str, Optional[Dict]]:
    grabba_service = await _get_grabba_service_instance()
    return await grabba_service.extract_data(extraction_data)


@mcp.tool(
    name="schedule_existing_job",
    description="Schedules an existing Grabba job to run immediately. Requires the 'job_id' of the existing job."
)
async def schedule_job_tool(job_id: str) -> tuple[str, Optional[Dict]]:
    grabba_service = await _get_grabba_service_instance()
    return await grabba_service.schedule_job(job_id)


@mcp.tool(
    name="fetch_all_jobs",
    description="Fetches all Grabba jobs for the current user. Takes no parameters."
)
async def fetch_jobs_data_tool() -> tuple[str, Optional[List[Job]]]:
    grabba_service = await _get_grabba_service_instance()
    return await grabba_service.fetch_jobs_data()


@mcp.tool(
    name="fetch_specific_job",
    description="Fetches details of a specific Grabba job by its ID. Requires the 'job_id' of the job."
)
async def fetch_job_data_tool(job_id: str) -> tuple[str, Optional[Job]]:
    grabba_service = await _get_grabba_service_instance()
    return await grabba_service.fetch_job_data(job_id)


@mcp.tool(
    name="delete_job",
    description="Deletes a specific Grabba job. Requires the 'job_id' of the job to delete."
)
async def delete_job_data_tool(job_id: str) -> tuple[str, None]:
    grabba_service = await _get_grabba_service_instance()
    return await grabba_service.delete_job_data(job_id)


@mcp.tool(
    name="fetch_job_result",
    description="Fetches results of a completed Grabba job by its result ID. Requires the 'job_result_id' of the result."
)
async def fetch_job_result_data_tool(job_result_id: str) -> tuple[str, Optional[Dict]]:
    grabba_service = await _get_grabba_service_instance()
    return await grabba_service.fetch_job_result_data(job_result_id)


@mcp.tool(
    name="delete_job_result",
    description="Deletes results of a completed Grabba job. Requires the 'job_result_id' of the result to delete."
)
async def delete_job_result_data_tool(job_result_id: str) -> tuple[str, None]:
    grabba_service = await _get_grabba_service_instance()
    return await grabba_service.delete_job_result_data(job_result_id)


@mcp.tool(
    name="fetch_available_regions",
    description="Fetches a list of all available puppet (web agent) regions that can be used for scheduling web data extractions. Takes no parameters.",
    tags={"configuration"}
)
async def fetch_available_regions_tool() -> tuple[str, Optional[List[PuppetRegion]]]:
    grabba_service = await _get_grabba_service_instance()
    return await grabba_service.fetch_available_regions()


def main():
    """Run the MCP server on the configured transport.

    ``streamable-http`` and ``sse`` listen on all interfaces at
    ``server_config.PORT``; any other value selects stdio.

    Raises:
        ValueError: on the stdio path when no ``API_KEY`` is configured.
    """
    if server_config.MCP_SERVER_TRANSPORT == "streamable-http":
        # Start the MCP server using FastMCP's built-in run method
        # This will handle HTTP communication protocol (e.g., streamable-http)
        _log("Starting Grabba MCP server (streamable-http transport)...")
        asyncio.run(mcp.run_streamable_http_async(
            host="0.0.0.0",
            port=server_config.PORT,
            path="/"
        ))
    elif server_config.MCP_SERVER_TRANSPORT == "sse":
        # Start the MCP server using FastMCP's built-in run method
        # This will handle SSE communication protocol
        _log("Starting Grabba MCP server (sse transport)...")
        asyncio.run(mcp.run_sse_async(
            host="0.0.0.0",
            port=server_config.PORT,
            path="/"
        ))
    else:
        # No request headers exist on stdio, so the key must come from config.
        if not server_config.API_KEY:
            raise ValueError("API Key required for stdio transport.")
        # Start the MCP server using StdioTransport
        # The banner goes to stderr: stdout is the protocol channel here.
        _log("Starting Grabba MCP server (stdio transport)...")
        asyncio.run(mcp.run_stdio_async())


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/grabba-dev/grabba-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.