Skip to main content
Glama
gerred

MCP Server Replicate

subscribe_to_generation

Subscribe to receive real-time updates for AI image generation tasks, enabling webhook notifications and session-based resource tracking through the Replicate API.

Instructions

Handle resource subscription requests.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
requestYes

Implementation Reference

  • The handler function for the 'subscribe_to_generation' MCP tool. It takes a SubscriptionRequest with uri and session_id, extracts the session, and calls the subscription manager to subscribe to generation updates for the given resource URI.
    @mcp.tool()
    async def subscribe_to_generation(request: SubscriptionRequest) -> EmptyResult:
        """Handle resource subscription requests."""
        if request.uri.startswith("generations://"):
            session = ServerSession(request.session_id)
            await subscription_manager.subscribe(request.uri, session)
        return EmptyResult()
  • Pydantic model defining the input schema for the subscribe_to_generation tool.
    class SubscriptionRequest(BaseModel):
        """Request model for subscription operations."""
    
        uri: str = Field(..., description="Resource URI to subscribe to")
        session_id: str = Field(..., description="ID of the session making the request")
  • The subscribe method of GenerationSubscriptionManager that handles adding the session to the subscriptions dict for the prediction_id extracted from the URI and starts the background polling task if needed.
    async def subscribe(self, uri: str, session: ServerSession):
        """Subscribe a session to generation updates."""
        prediction_id = uri.replace("generations://", "")
        if prediction_id not in self._subscriptions:
            self._subscriptions[prediction_id] = set()
        self._subscriptions[prediction_id].add(session)
    
        # Start checking if not already running
        if not self._check_task:
            self._check_task = asyncio.create_task(self._check_generations())
  • The background polling method _check_generations in GenerationSubscriptionManager that periodically checks the status of subscribed predictions, sends notifications and resources to subscribed sessions when complete, and removes finished subscriptions.
    async def _check_generations(self):
        """Periodically check subscribed generations and notify of updates."""
        async with ReplicateClient(api_token=os.getenv("REPLICATE_API_TOKEN")) as client:
            while True:
                try:
                    for prediction_id, sessions in list(self._subscriptions.items()):
                        try:
                            result = await client.get_prediction_status(prediction_id)
                            # Notify on completion or failure
                            if result["status"] in ["succeeded", "failed", "canceled"]:
                                # For succeeded generations with image output
                                if result["status"] == "succeeded" and result.get("output"):
                                    # For image generation models, output is typically a list with the image URL as first item
                                    image_url = (
                                        result["output"][0] if isinstance(result["output"], list) else result["output"]
                                    )
    
                                    # First send a notification with just the URL and metadata
                                    notification = ResourceUpdatedNotification(
                                        method="notifications/resources/updated",
                                        params={"uri": f"generations://{prediction_id}"},
                                    )
    
                                    # Create text resource with metadata and URL
                                    text_resource = TextResourceContents(
                                        type="text",
                                        uri=f"generations://{prediction_id}",
                                        mimeType="application/json",
                                        text=json.dumps(
                                            {
                                                "status": "succeeded",
                                                "image_url": image_url,
                                                "created_at": result.get("created_at"),
                                                "completed_at": result.get("completed_at"),
                                                "metrics": result.get("metrics", {}),
                                                "urls": result.get("urls", {}),
                                                "input": result.get("input", {}),
                                            },
                                            indent=2,
                                        ),
                                    )
    
                                    # Send notification and text resource to all sessions
                                    for session in sessions:
                                        await session.send_notification(notification)
                                        await session.send_resource(text_resource)
    
                                    # Remove from subscriptions since we've notified
                                    del self._subscriptions[prediction_id]
                                else:
                                    # For failed or canceled generations, create text resource
                                    resource = TextResourceContents(
                                        uri=f"generations://{prediction_id}",
                                        mimeType="application/json",
                                        text=json.dumps(
                                            {
                                                "status": result["status"],
                                                "error": result.get("error"),
                                                "created_at": result.get("created_at"),
                                                "completed_at": result.get("completed_at"),
                                                "metrics": result.get("metrics", {}),
                                                "urls": result.get("urls", {}),
                                            },
                                            indent=2,
                                        ),
                                    )
    
                                # Send notification with the resource
                                notification = ResourceUpdatedNotification(
                                    method="notifications/resources/updated",
                                    params={"uri": AnyUrl(f"generations://{prediction_id}")},
                                )
                                for session in sessions:
                                    await session.send_notification(notification)
                                    # Also send the resource directly
                                    await session.send_resource(resource)
    
                                # Remove completed/failed generation from subscriptions
                                del self._subscriptions[prediction_id]
                        except Exception as e:
                            logger.error(f"Error checking generation {prediction_id}: {e}")
    
                    if not self._subscriptions:
                        break
    
                    await asyncio.sleep(2.0)  # Poll every 2 seconds
                except asyncio.CancelledError:
                    break
                except Exception as e:
                    logger.error(f"Error in generation check loop: {e}")
                    await asyncio.sleep(5.0)  # Back off on errors
  • The @mcp.tool() decorator registers the subscribe_to_generation function as an MCP tool in the FastMCP server instance.
    @mcp.tool()
    async def subscribe_to_generation(request: SubscriptionRequest) -> EmptyResult:
        """Handle resource subscription requests."""
        if request.uri.startswith("generations://"):
            session = ServerSession(request.session_id)
            await subscription_manager.subscribe(request.uri, session)
        return EmptyResult()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gerred/mcp-server-replicate'

If you have feedback or need assistance with the MCP directory API, please join our Discord server