Skip to main content
Glama
gerred

MCP Server Replicate

unsubscribe_from_generation

Cancel active subscriptions to AI image generation resources to stop receiving real-time updates and manage session-based connections.

Instructions

Handle resource unsubscribe requests.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
requestYes

Implementation Reference

  • The main handler function for the 'unsubscribe_from_generation' tool. It takes a SubscriptionRequest, extracts the session, and calls the subscription manager's unsubscribe method for generation resources.
    @mcp.tool()
    async def unsubscribe_from_generation(request: SubscriptionRequest) -> EmptyResult:
        """Handle resource unsubscribe requests."""
        if request.uri.startswith("generations://"):
            session = ServerSession(request.session_id)
            await subscription_manager.unsubscribe(request.uri, session)
        return EmptyResult()
  • Pydantic model defining the input schema for both subscribe_to_generation and unsubscribe_from_generation tools, including uri and session_id fields.
    class SubscriptionRequest(BaseModel):
        """Request model for subscription operations."""
    
        uri: str = Field(..., description="Resource URI to subscribe to")
        session_id: str = Field(..., description="ID of the session making the request")
  • Core unsubscribe logic within the GenerationSubscriptionManager class, which removes the session from the prediction's subscribers and cleans up if no more subscribers.
    async def unsubscribe(self, uri: str, session: ServerSession):
        """Unsubscribe a session from generation updates."""
        prediction_id = uri.replace("generations://", "")
        if prediction_id in self._subscriptions:
            self._subscriptions[prediction_id].discard(session)
            if not self._subscriptions[prediction_id]:
                del self._subscriptions[prediction_id]
    
        # Stop checking if no more subscriptions
        if not self._subscriptions and self._check_task:
            self._check_task.cancel()
            self._check_task = None
  • The GenerationSubscriptionManager class that handles subscription management for generation updates, including unsubscribe functionality used by the tool.
    class GenerationSubscriptionManager:
        """Manages subscriptions to generation resources."""
    
        def __init__(self):
            self._subscriptions: dict[str, set[ServerSession]] = {}
            self._check_task: asyncio.Task | None = None
    
        async def subscribe(self, uri: str, session: ServerSession):
            """Subscribe a session to generation updates."""
            prediction_id = uri.replace("generations://", "")
            if prediction_id not in self._subscriptions:
                self._subscriptions[prediction_id] = set()
            self._subscriptions[prediction_id].add(session)
    
            # Start checking if not already running
            if not self._check_task:
                self._check_task = asyncio.create_task(self._check_generations())
    
        async def unsubscribe(self, uri: str, session: ServerSession):
            """Unsubscribe a session from generation updates."""
            prediction_id = uri.replace("generations://", "")
            if prediction_id in self._subscriptions:
                self._subscriptions[prediction_id].discard(session)
                if not self._subscriptions[prediction_id]:
                    del self._subscriptions[prediction_id]
    
            # Stop checking if no more subscriptions
            if not self._subscriptions and self._check_task:
                self._check_task.cancel()
                self._check_task = None
    
        async def _check_generations(self):
            """Periodically check subscribed generations and notify of updates."""
            async with ReplicateClient(api_token=os.getenv("REPLICATE_API_TOKEN")) as client:
                while True:
                    try:
                        for prediction_id, sessions in list(self._subscriptions.items()):
                            try:
                                result = await client.get_prediction_status(prediction_id)
                                # Notify on completion or failure
                                if result["status"] in ["succeeded", "failed", "canceled"]:
                                    # For succeeded generations with image output
                                    if result["status"] == "succeeded" and result.get("output"):
                                        # For image generation models, output is typically a list with the image URL as first item
                                        image_url = (
                                            result["output"][0] if isinstance(result["output"], list) else result["output"]
                                        )
    
                                        # First send a notification with just the URL and metadata
                                        notification = ResourceUpdatedNotification(
                                            method="notifications/resources/updated",
                                            params={"uri": f"generations://{prediction_id}"},
                                        )
    
                                        # Create text resource with metadata and URL
                                        text_resource = TextResourceContents(
                                            type="text",
                                            uri=f"generations://{prediction_id}",
                                            mimeType="application/json",
                                            text=json.dumps(
                                                {
                                                    "status": "succeeded",
                                                    "image_url": image_url,
                                                    "created_at": result.get("created_at"),
                                                    "completed_at": result.get("completed_at"),
                                                    "metrics": result.get("metrics", {}),
                                                    "urls": result.get("urls", {}),
                                                    "input": result.get("input", {}),
                                                },
                                                indent=2,
                                            ),
                                        )
    
                                        # Send notification and text resource to all sessions
                                        for session in sessions:
                                            await session.send_notification(notification)
                                            await session.send_resource(text_resource)
    
                                        # Remove from subscriptions since we've notified
                                        del self._subscriptions[prediction_id]
                                    else:
                                        # For failed or canceled generations, create text resource
                                        resource = TextResourceContents(
                                            uri=f"generations://{prediction_id}",
                                            mimeType="application/json",
                                            text=json.dumps(
                                                {
                                                    "status": result["status"],
                                                    "error": result.get("error"),
                                                    "created_at": result.get("created_at"),
                                                    "completed_at": result.get("completed_at"),
                                                    "metrics": result.get("metrics", {}),
                                                    "urls": result.get("urls", {}),
                                                },
                                                indent=2,
                                            ),
                                        )
    
                                    # Send notification with the resource
                                    notification = ResourceUpdatedNotification(
                                        method="notifications/resources/updated",
                                        params={"uri": AnyUrl(f"generations://{prediction_id}")},
                                    )
                                    for session in sessions:
                                        await session.send_notification(notification)
                                        # Also send the resource directly
                                        await session.send_resource(resource)
    
                                    # Remove completed/failed generation from subscriptions
                                    del self._subscriptions[prediction_id]
                            except Exception as e:
                                logger.error(f"Error checking generation {prediction_id}: {e}")
    
                        if not self._subscriptions:
                            break
    
                        await asyncio.sleep(2.0)  # Poll every 2 seconds
                    except asyncio.CancelledError:
                        break
                    except Exception as e:
                        logger.error(f"Error in generation check loop: {e}")
                        await asyncio.sleep(5.0)  # Back off on errors

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gerred/mcp-server-replicate'

If you have feedback or need assistance with the MCP directory API, please join our Discord server