Skip to main content
Glama

subscribe_to_generation

Subscribe to receive real-time updates for AI image generation tasks, enabling webhook notifications and session-based resource tracking through the Replicate API.

Instructions

Handle resource subscription requests.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
requestYes

Implementation Reference

  • The handler function for the 'subscribe_to_generation' MCP tool. It takes a SubscriptionRequest with uri and session_id, extracts the session, and calls the subscription manager to subscribe to generation updates for the given resource URI.
    @mcp.tool() async def subscribe_to_generation(request: SubscriptionRequest) -> EmptyResult: """Handle resource subscription requests.""" if request.uri.startswith("generations://"): session = ServerSession(request.session_id) await subscription_manager.subscribe(request.uri, session) return EmptyResult()
  • Pydantic model defining the input schema for the subscribe_to_generation tool.
    class SubscriptionRequest(BaseModel): """Request model for subscription operations.""" uri: str = Field(..., description="Resource URI to subscribe to") session_id: str = Field(..., description="ID of the session making the request")
  • The subscribe method of GenerationSubscriptionManager that handles adding the session to the subscriptions dict for the prediction_id extracted from the URI and starts the background polling task if needed.
    async def subscribe(self, uri: str, session: ServerSession): """Subscribe a session to generation updates.""" prediction_id = uri.replace("generations://", "") if prediction_id not in self._subscriptions: self._subscriptions[prediction_id] = set() self._subscriptions[prediction_id].add(session) # Start checking if not already running if not self._check_task: self._check_task = asyncio.create_task(self._check_generations())
  • The background polling method _check_generations in GenerationSubscriptionManager that periodically checks the status of subscribed predictions, sends notifications and resources to subscribed sessions when complete, and removes finished subscriptions.
    async def _check_generations(self): """Periodically check subscribed generations and notify of updates.""" async with ReplicateClient(api_token=os.getenv("REPLICATE_API_TOKEN")) as client: while True: try: for prediction_id, sessions in list(self._subscriptions.items()): try: result = await client.get_prediction_status(prediction_id) # Notify on completion or failure if result["status"] in ["succeeded", "failed", "canceled"]: # For succeeded generations with image output if result["status"] == "succeeded" and result.get("output"): # For image generation models, output is typically a list with the image URL as first item image_url = ( result["output"][0] if isinstance(result["output"], list) else result["output"] ) # First send a notification with just the URL and metadata notification = ResourceUpdatedNotification( method="notifications/resources/updated", params={"uri": f"generations://{prediction_id}"}, ) # Create text resource with metadata and URL text_resource = TextResourceContents( type="text", uri=f"generations://{prediction_id}", mimeType="application/json", text=json.dumps( { "status": "succeeded", "image_url": image_url, "created_at": result.get("created_at"), "completed_at": result.get("completed_at"), "metrics": result.get("metrics", {}), "urls": result.get("urls", {}), "input": result.get("input", {}), }, indent=2, ), ) # Send notification and text resource to all sessions for session in sessions: await session.send_notification(notification) await session.send_resource(text_resource) # Remove from subscriptions since we've notified del self._subscriptions[prediction_id] else: # For failed or canceled generations, create text resource resource = TextResourceContents( uri=f"generations://{prediction_id}", mimeType="application/json", text=json.dumps( { "status": result["status"], "error": result.get("error"), "created_at": result.get("created_at"), "completed_at": result.get("completed_at"), "metrics": result.get("metrics", {}), "urls": result.get("urls", {}), }, indent=2, ), ) # Send notification with the resource notification = ResourceUpdatedNotification( method="notifications/resources/updated", params={"uri": AnyUrl(f"generations://{prediction_id}")}, ) for session in sessions: await session.send_notification(notification) # Also send the resource directly await session.send_resource(resource) # Remove completed/failed generation from subscriptions del self._subscriptions[prediction_id] except Exception as e: logger.error(f"Error checking generation {prediction_id}: {e}") if not self._subscriptions: break await asyncio.sleep(2.0) # Poll every 2 seconds except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in generation check loop: {e}") await asyncio.sleep(5.0) # Back off on errors
  • The @mcp.tool() decorator registers the subscribe_to_generation function as an MCP tool in the FastMCP server instance.
    @mcp.tool() async def subscribe_to_generation(request: SubscriptionRequest) -> EmptyResult: """Handle resource subscription requests.""" if request.uri.startswith("generations://"): session = ServerSession(request.session_id) await subscription_manager.subscribe(request.uri, session) return EmptyResult()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gerred/mcp-server-replicate'

If you have feedback or need assistance with the MCP directory API, please join our Discord server