Skip to main content
Glama
replicate.py (1.59 kB)
""" Replicate service for running models """ import os from typing import Any, Dict, Optional try: import replicate REPLICATE_AVAILABLE = True except ImportError: REPLICATE_AVAILABLE = False class ReplicateService: """Service for interacting with the Replicate API""" def __init__(self, api_key: Optional[str] = None): if not REPLICATE_AVAILABLE: raise ImportError("Replicate library not available. Install with: pip install replicate") self.api_key = api_key or os.getenv("REPLICATE_API_TOKEN") if not self.api_key: raise ValueError("Replicate API key is required") # The replicate client will use the provided token for all calls. self.client = replicate.Client(api_token=self.api_key) async def run(self, model: str, input: Dict[str, Any]) -> Any: """Run a model on Replicate using the native async method.""" try: # Use the client's native async_run method output = await self.client.async_run(model, input) return output except Exception as e: print(f"Replicate run error: {e}") return None def get_replicate_service(api_key: Optional[str] = None) -> Optional[ReplicateService]: """ Get a Replicate service instance Args: api_key: Optional API key. If not provided, will try to get from environment Returns: ReplicateService instance or None if not available """ try: return ReplicateService(api_key) except (ImportError, ValueError): return None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/GeorgeStrakhov/mcpeasy'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.