unsubscribe_from_generation
Cancel active subscriptions to AI image generation resources to stop receiving real-time updates and manage session-based connections.
Instructions
Handle resource unsubscribe requests.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| request | Yes | Subscription request object carrying the resource `uri` and the caller's `session_id` | |
Implementation Reference
# The main handler function for the 'unsubscribe_from_generation' tool. It takes a
# SubscriptionRequest, extracts the session, and calls the subscription manager's
# unsubscribe method for generation resources.
@mcp.tool()
async def unsubscribe_from_generation(request: SubscriptionRequest) -> EmptyResult:
    """Handle resource unsubscribe requests."""
    # The docstring above is intentionally left as-is: the same sentence is
    # listed as this tool's instructions, so it is presumably surfaced to clients.
    resource_uri = request.uri

    # Only generation resources are managed here; any other URI is acknowledged
    # with an empty result and no subscription state is touched.
    if not resource_uri.startswith("generations://"):
        return EmptyResult()

    # NOTE(review): a new ServerSession is built from the id on every call, so
    # removal presumably relies on it comparing equal to the session stored at
    # subscribe time - confirm against ServerSession's equality/hash semantics.
    caller_session = ServerSession(request.session_id)
    await subscription_manager.unsubscribe(resource_uri, caller_session)
    return EmptyResult()
# Pydantic model defining the input schema for both subscribe_to_generation and
# unsubscribe_from_generation tools, including uri and session_id fields.
class SubscriptionRequest(BaseModel):
    """Request model for subscription operations."""

    # Both fields pass `...` as the default, i.e. they have no default value and
    # must be supplied by the caller.
    # NOTE(review): the docstring and Field descriptions are runtime strings that
    # presumably end up in the generated input schema - edit them deliberately.

    # Resource URI; the unsubscribe handler only acts on URIs that start with
    # "generations://". The wording "subscribe to" is shared with the unsubscribe tool.
    uri: str = Field(..., description="Resource URI to subscribe to")
    # Identifier the handler wraps in a ServerSession to locate the subscriber.
    session_id: str = Field(..., description="ID of the session making the request")
# Core unsubscribe logic within the GenerationSubscriptionManager class, which removes
# the session from the prediction's subscribers and cleans up if no more subscribers.
async def unsubscribe(self, uri: str, session: ServerSession):
    """Remove ``session`` from the subscribers of the generation named by ``uri``.

    The prediction id is the URI with every "generations://" occurrence
    stripped. Unknown prediction ids and sessions that were never subscribed
    are ignored. When the last subscriber of a prediction leaves, its entry is
    dropped; when no subscriptions remain at all, the polling task is
    cancelled and forgotten.
    """
    key = uri.replace("generations://", "")

    watchers = self._subscriptions.get(key)
    if watchers is not None:
        watchers.discard(session)
        if not watchers:
            # Nobody is waiting on this prediction any more.
            del self._subscriptions[key]

    # With nothing left to watch, stop the background poller.
    if self._check_task and not self._subscriptions:
        self._check_task.cancel()
        self._check_task = None
# The GenerationSubscriptionManager class that handles subscription management for
# generation updates, including unsubscribe functionality used by the tool.
class GenerationSubscriptionManager:
    """Manages subscriptions to generation resources.

    Sessions subscribe to ``generations://<prediction_id>`` URIs. While at
    least one subscription exists, a background task polls each prediction
    every 2 seconds and, once it reaches a terminal status, sends every
    subscribed session a resource-updated notification plus a JSON text
    resource describing the outcome, then drops the subscription.
    """

    # Prediction statuses after which subscribers are notified and removed.
    _TERMINAL_STATUSES = ("succeeded", "failed", "canceled")

    def __init__(self):
        # prediction id -> sessions waiting for that prediction to finish.
        self._subscriptions: dict[str, set[ServerSession]] = {}
        # Background polling task; None until first subscribe or after cancel.
        self._check_task: asyncio.Task | None = None

    async def subscribe(self, uri: str, session: ServerSession):
        """Subscribe a session to generation updates.

        Args:
            uri: Resource URI; every "generations://" occurrence is stripped
                to obtain the prediction id.
            session: Session to notify when the prediction finishes.
        """
        prediction_id = uri.replace("generations://", "")
        self._subscriptions.setdefault(prediction_id, set()).add(session)

        # Start polling if no poller is alive. The poller exits by itself once
        # every subscription has been served and leaves a finished task
        # behind, so a finished task must be replaced, not treated as running.
        if self._check_task is None or self._check_task.done():
            self._check_task = asyncio.create_task(self._check_generations())

    async def unsubscribe(self, uri: str, session: ServerSession):
        """Unsubscribe a session from generation updates.

        Unknown prediction ids and sessions that are not subscribed are
        ignored. Cancels the polling task when no subscriptions remain.
        """
        prediction_id = uri.replace("generations://", "")
        if prediction_id in self._subscriptions:
            self._subscriptions[prediction_id].discard(session)
            if not self._subscriptions[prediction_id]:
                del self._subscriptions[prediction_id]

        # Stop checking if no more subscriptions
        if not self._subscriptions and self._check_task:
            self._check_task.cancel()
            self._check_task = None

    @staticmethod
    def _build_update(prediction_id: str, result: dict):
        """Build the (notification, text resource) pair for a finished prediction."""
        uri = f"generations://{prediction_id}"

        if result["status"] == "succeeded" and result.get("output"):
            # For image generation models, output is typically a list with the
            # image URL as first item.
            output = result["output"]
            image_url = output[0] if isinstance(output, list) else output

            # NOTE(review): this branch passes the URI as a plain string and
            # sets type="text", while the branch below wraps it in AnyUrl and
            # omits type - kept as found; confirm which form is intended.
            notification = ResourceUpdatedNotification(
                method="notifications/resources/updated",
                params={"uri": uri},
            )
            resource = TextResourceContents(
                type="text",
                uri=uri,
                mimeType="application/json",
                text=json.dumps(
                    {
                        "status": "succeeded",
                        "image_url": image_url,
                        "created_at": result.get("created_at"),
                        "completed_at": result.get("completed_at"),
                        "metrics": result.get("metrics", {}),
                        "urls": result.get("urls", {}),
                        "input": result.get("input", {}),
                    },
                    indent=2,
                ),
            )
        else:
            # Failed or canceled generations (and successes without output).
            resource = TextResourceContents(
                uri=uri,
                mimeType="application/json",
                text=json.dumps(
                    {
                        "status": result["status"],
                        "error": result.get("error"),
                        "created_at": result.get("created_at"),
                        "completed_at": result.get("completed_at"),
                        "metrics": result.get("metrics", {}),
                        "urls": result.get("urls", {}),
                    },
                    indent=2,
                ),
            )
            notification = ResourceUpdatedNotification(
                method="notifications/resources/updated",
                params={"uri": AnyUrl(uri)},
            )

        return notification, resource

    @staticmethod
    async def _notify_sessions(sessions: set, notification, resource):
        """Send the notification and resource to every session in ``sessions``.

        ``sessions`` is the live subscriber set, which other coroutines may
        change while this one is awaiting. Iterating it directly would raise
        "Set changed size during iteration", so each pass works on a copy and
        the live set is re-read until nobody is left un-notified.
        """
        delivered = set()
        while pending := sessions - delivered:
            for session in pending:
                delivered.add(session)
                if session not in sessions:
                    continue  # unsubscribed while we were awaiting
                await session.send_notification(notification)
                await session.send_resource(resource)

    async def _check_generations(self):
        """Periodically check subscribed generations and notify of updates.

        Runs until cancelled or until no subscriptions remain. An error for a
        single prediction is logged and that prediction is retried on the
        next poll; an error in the loop itself is logged and followed by a
        5 second back-off.
        """
        async with ReplicateClient(api_token=os.getenv("REPLICATE_API_TOKEN")) as client:
            while True:
                try:
                    # Snapshot the mapping: entries are removed while looping.
                    for prediction_id, sessions in list(self._subscriptions.items()):
                        try:
                            result = await client.get_prediction_status(prediction_id)

                            # Notify on completion or failure only.
                            if result["status"] not in self._TERMINAL_STATUSES:
                                continue

                            notification, resource = self._build_update(prediction_id, result)
                            await self._notify_sessions(sessions, notification, resource)

                            # Remove the entry now that everyone was notified,
                            # but only if it is still the set served above: a
                            # concurrent unsubscribe may already have deleted
                            # it (a plain `del` would raise KeyError), and a
                            # later subscribe may have created a fresh set
                            # that still needs its own update.
                            if self._subscriptions.get(prediction_id) is sessions:
                                del self._subscriptions[prediction_id]
                        except Exception as e:
                            logger.error(f"Error checking generation {prediction_id}: {e}")

                    if not self._subscriptions:
                        break

                    await asyncio.sleep(2.0)  # Poll every 2 seconds

                except asyncio.CancelledError:
                    break
                except Exception as e:
                    logger.error(f"Error in generation check loop: {e}")
                    await asyncio.sleep(5.0)  # Back off on errors