Skip to main content
Glama
gerred

MCP Server Replicate

subscribe_to_generation

Subscribe to receive real-time updates for AI image generation tasks, enabling webhook notifications and session-based resource tracking through the Replicate API.

Instructions

Handle resource subscription requests.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
requestYes

Implementation Reference

  • The handler function for the 'subscribe_to_generation' MCP tool. It takes a SubscriptionRequest with uri and session_id, extracts the session, and calls the subscription manager to subscribe to generation updates for the given resource URI.
    @mcp.tool()
    async def subscribe_to_generation(request: SubscriptionRequest) -> EmptyResult:
        """Handle resource subscription requests."""
        if request.uri.startswith("generations://"):
            session = ServerSession(request.session_id)
            await subscription_manager.subscribe(request.uri, session)
        return EmptyResult()
  • Pydantic model defining the input schema for the subscribe_to_generation tool.
    class SubscriptionRequest(BaseModel):
        """Request model for subscription operations."""
    
        uri: str = Field(..., description="Resource URI to subscribe to")
        session_id: str = Field(..., description="ID of the session making the request")
  • The subscribe method of GenerationSubscriptionManager that handles adding the session to the subscriptions dict for the prediction_id extracted from the URI and starts the background polling task if needed.
    async def subscribe(self, uri: str, session: ServerSession):
        """Subscribe a session to generation updates."""
        prediction_id = uri.replace("generations://", "")
        if prediction_id not in self._subscriptions:
            self._subscriptions[prediction_id] = set()
        self._subscriptions[prediction_id].add(session)
    
        # Start checking if not already running
        if not self._check_task:
            self._check_task = asyncio.create_task(self._check_generations())
  • The background polling method _check_generations in GenerationSubscriptionManager that periodically checks the status of subscribed predictions, sends notifications and resources to subscribed sessions when complete, and removes finished subscriptions.
    async def _check_generations(self):
        """Periodically check subscribed generations and notify of updates."""
        async with ReplicateClient(api_token=os.getenv("REPLICATE_API_TOKEN")) as client:
            while True:
                try:
                    for prediction_id, sessions in list(self._subscriptions.items()):
                        try:
                            result = await client.get_prediction_status(prediction_id)
                            # Notify on completion or failure
                            if result["status"] in ["succeeded", "failed", "canceled"]:
                                # For succeeded generations with image output
                                if result["status"] == "succeeded" and result.get("output"):
                                    # For image generation models, output is typically a list with the image URL as first item
                                    image_url = (
                                        result["output"][0] if isinstance(result["output"], list) else result["output"]
                                    )
    
                                    # First send a notification with just the URL and metadata
                                    notification = ResourceUpdatedNotification(
                                        method="notifications/resources/updated",
                                        params={"uri": f"generations://{prediction_id}"},
                                    )
    
                                    # Create text resource with metadata and URL
                                    text_resource = TextResourceContents(
                                        type="text",
                                        uri=f"generations://{prediction_id}",
                                        mimeType="application/json",
                                        text=json.dumps(
                                            {
                                                "status": "succeeded",
                                                "image_url": image_url,
                                                "created_at": result.get("created_at"),
                                                "completed_at": result.get("completed_at"),
                                                "metrics": result.get("metrics", {}),
                                                "urls": result.get("urls", {}),
                                                "input": result.get("input", {}),
                                            },
                                            indent=2,
                                        ),
                                    )
    
                                    # Send notification and text resource to all sessions
                                    for session in sessions:
                                        await session.send_notification(notification)
                                        await session.send_resource(text_resource)
    
                                    # Remove from subscriptions since we've notified
                                    del self._subscriptions[prediction_id]
                                else:
                                    # For failed or canceled generations, create text resource
                                    resource = TextResourceContents(
                                        uri=f"generations://{prediction_id}",
                                        mimeType="application/json",
                                        text=json.dumps(
                                            {
                                                "status": result["status"],
                                                "error": result.get("error"),
                                                "created_at": result.get("created_at"),
                                                "completed_at": result.get("completed_at"),
                                                "metrics": result.get("metrics", {}),
                                                "urls": result.get("urls", {}),
                                            },
                                            indent=2,
                                        ),
                                    )
    
                                # Send notification with the resource
                                notification = ResourceUpdatedNotification(
                                    method="notifications/resources/updated",
                                    params={"uri": AnyUrl(f"generations://{prediction_id}")},
                                )
                                for session in sessions:
                                    await session.send_notification(notification)
                                    # Also send the resource directly
                                    await session.send_resource(resource)
    
                                # Remove completed/failed generation from subscriptions
                                del self._subscriptions[prediction_id]
                        except Exception as e:
                            logger.error(f"Error checking generation {prediction_id}: {e}")
    
                    if not self._subscriptions:
                        break
    
                    await asyncio.sleep(2.0)  # Poll every 2 seconds
                except asyncio.CancelledError:
                    break
                except Exception as e:
                    logger.error(f"Error in generation check loop: {e}")
                    await asyncio.sleep(5.0)  # Back off on errors
  • The @mcp.tool() decorator registers the subscribe_to_generation function as an MCP tool in the FastMCP server instance.
    @mcp.tool()
    async def subscribe_to_generation(request: SubscriptionRequest) -> EmptyResult:
        """Handle resource subscription requests."""
        if request.uri.startswith("generations://"):
            session = ServerSession(request.session_id)
            await subscription_manager.subscribe(request.uri, session)
        return EmptyResult()
Behavior1/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries the full burden of behavioral disclosure but offers almost none. 'Handle resource subscription requests' doesn't indicate whether this creates a persistent subscription, what happens when subscribed, whether authentication is required, potential side effects, or what the tool actually does beyond the vague 'handle' verb. This is inadequate for a tool that presumably establishes some ongoing relationship with resources.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is extremely concise at just 4 words, which could be appropriate if it were more informative. However, this brevity results in under-specification rather than efficient communication. The single sentence is front-loaded but doesn't contain enough substance to be truly helpful.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness1/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

For a subscription tool with no annotations, no output schema, and 0% schema description coverage, the description is completely inadequate. It doesn't explain what 'generation' refers to, what subscribing achieves, what the response looks like, or any behavioral characteristics. Given the complexity implied by the sibling tools (prediction, generation, webhooks), this description leaves critical gaps about the tool's purpose and operation.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The description provides zero information about parameters, while the schema has 0% description coverage for the single parameter 'request'. The schema shows 'request' contains 'uri' and 'session_id' sub-parameters, but neither the description nor schema descriptions explain what format the URI should follow, what constitutes a valid session_id, or what 'SubscriptionRequest' actually means in this context. The description fails to compensate for the complete lack of schema documentation.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose2/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description 'Handle resource subscription requests' is vague and tautological - it essentially restates the tool name 'subscribe_to_generation' without specifying what kind of resources or what 'generation' refers to. While it indicates this is for subscription operations, it doesn't clarify what 'generation' means in this context or distinguish it from sibling tools like 'unsubscribe_from_generation'.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines1/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides no guidance on when to use this tool versus alternatives. There's no mention of prerequisites, when this subscription is needed, or how it differs from related tools like 'unsubscribe_from_generation' or 'create_prediction'. The agent receives no contextual guidance about appropriate use cases.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gerred/mcp-server-replicate'

If you have feedback or need assistance with the MCP directory API, please join our Discord server