Skip to main content
Glama
langfuse.py (12.3 kB)
"""Langfuse provider client for Shepherd MCP.""" from __future__ import annotations import base64 import os from datetime import datetime from typing import Any import httpx from shepherd_mcp.models.langfuse import ( LangfuseObservation, LangfuseObservationsResponse, LangfuseScore, LangfuseScoresResponse, LangfuseSession, LangfuseSessionsResponse, LangfuseTrace, LangfuseTracesResponse, ) from shepherd_mcp.providers.base import ( AuthenticationError, BaseProvider, NotFoundError, ProviderError, RateLimitError, ) class LangfuseClient(BaseProvider): """Client for Langfuse API. Uses Basic Auth with public_key:secret_key. API Reference: https://api.reference.langfuse.com/ """ DEFAULT_HOST = "https://cloud.langfuse.com" def __init__( self, public_key: str | None = None, secret_key: str | None = None, host: str | None = None, ) -> None: """Initialize the client. Args: public_key: Langfuse public API key. If not provided, reads from LANGFUSE_PUBLIC_KEY env var. secret_key: Langfuse secret API key. If not provided, reads from LANGFUSE_SECRET_KEY env var. host: Langfuse host URL. If not provided, reads from LANGFUSE_HOST env var or defaults to cloud.langfuse.com. """ self.public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY") self.secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY") if not self.public_key or not self.secret_key: raise AuthenticationError( "No API keys provided. Set LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY " "environment variables." 
) self.host = (host or os.environ.get("LANGFUSE_HOST", self.DEFAULT_HOST)).rstrip("/") # Create Basic Auth header credentials = f"{self.public_key}:{self.secret_key}" encoded = base64.b64encode(credentials.encode()).decode() self._auth_header = f"Basic {encoded}" self._client = httpx.Client( timeout=30.0, headers={ "Authorization": self._auth_header, "Content-Type": "application/json", }, ) @property def name(self) -> str: """Return the provider name.""" return "langfuse" def _handle_error_response(self, response: httpx.Response) -> None: """Handle error responses from the API.""" if response.status_code == 401: raise AuthenticationError("Authentication failed. Check your API keys.") if response.status_code == 404: try: detail = response.json().get("message", "Resource not found") except Exception: detail = "Resource not found" raise NotFoundError(detail) if response.status_code == 429: raise RateLimitError("Rate limit exceeded. Please try again later.") if response.status_code >= 400: try: detail = response.json().get("message", f"HTTP {response.status_code}") except Exception: detail = f"HTTP {response.status_code}" raise ProviderError(detail) def _parse_timestamp(self, timestamp: str | None) -> str | None: """Parse timestamp to ISO 8601 format.""" if not timestamp: return None # If already in ISO format, return as-is if "T" in timestamp: return timestamp # Try common date formats formats = [ "%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", ] for fmt in formats: try: dt = datetime.strptime(timestamp, fmt) return dt.isoformat() + "Z" except ValueError: continue # Return as-is if no format matches return timestamp def _get(self, path: str, params: dict[str, Any] | None = None) -> dict[str, Any]: """Make a GET request.""" response = self._client.get(f"{self.host}{path}", params=params) self._handle_error_response(response) return response.json() def _post(self, path: str, json: dict[str, Any] | None = None) -> dict[str, Any]: """Make a POST request.""" response = 
self._client.post(f"{self.host}{path}", json=json) self._handle_error_response(response) return response.json() # ======================================================================== # Traces API # ======================================================================== def list_traces( self, limit: int = 50, page: int = 1, user_id: str | None = None, name: str | None = None, session_id: str | None = None, tags: list[str] | None = None, from_timestamp: str | None = None, to_timestamp: str | None = None, ) -> LangfuseTracesResponse: """List traces with pagination and filters. Args: limit: Maximum number of results per page. page: Page number (1-indexed). user_id: Filter by user ID. name: Filter by trace name. session_id: Filter by session ID. tags: Filter by tags. from_timestamp: Filter by start timestamp. to_timestamp: Filter by end timestamp. Returns: LangfuseTracesResponse with traces data and pagination meta. """ params: dict[str, Any] = { "limit": limit, "page": page, } if user_id: params["userId"] = user_id if name: params["name"] = name if session_id: params["sessionId"] = session_id if tags: params["tags"] = tags if from_timestamp: params["fromTimestamp"] = self._parse_timestamp(from_timestamp) if to_timestamp: params["toTimestamp"] = self._parse_timestamp(to_timestamp) data = self._get("/api/public/traces", params) return LangfuseTracesResponse(**data) def get_trace(self, trace_id: str) -> LangfuseTrace: """Get a specific trace with its observations. Args: trace_id: The trace ID to fetch. Returns: LangfuseTrace with full trace data including observations. 
""" data = self._get(f"/api/public/traces/{trace_id}") return LangfuseTrace(**data) # ======================================================================== # Sessions API # ======================================================================== def list_sessions( self, limit: int = 50, page: int = 1, from_timestamp: str | None = None, to_timestamp: str | None = None, ) -> LangfuseSessionsResponse: """List sessions with pagination. Args: limit: Maximum number of results per page. page: Page number (1-indexed). from_timestamp: Filter by start timestamp. to_timestamp: Filter by end timestamp. Returns: LangfuseSessionsResponse with sessions data and pagination meta. """ params: dict[str, Any] = { "limit": limit, "page": page, } if from_timestamp: params["fromTimestamp"] = self._parse_timestamp(from_timestamp) if to_timestamp: params["toTimestamp"] = self._parse_timestamp(to_timestamp) data = self._get("/api/public/sessions", params) return LangfuseSessionsResponse(**data) def get_session(self, session_id: str) -> LangfuseSession: """Get a specific session. Args: session_id: The session ID to fetch. Returns: LangfuseSession with session data. """ data = self._get(f"/api/public/sessions/{session_id}") return LangfuseSession(**data) # ======================================================================== # Observations API # ======================================================================== def list_observations( self, limit: int = 50, page: int = 1, name: str | None = None, user_id: str | None = None, trace_id: str | None = None, obs_type: str | None = None, from_timestamp: str | None = None, to_timestamp: str | None = None, ) -> LangfuseObservationsResponse: """List observations with pagination and filters. Args: limit: Maximum number of results per page. page: Page number (1-indexed). name: Filter by observation name. user_id: Filter by user ID. trace_id: Filter by trace ID. obs_type: Filter by type (GENERATION, SPAN, EVENT). 
from_timestamp: Filter by start timestamp. to_timestamp: Filter by end timestamp. Returns: LangfuseObservationsResponse with observations data. """ params: dict[str, Any] = { "limit": limit, "page": page, } if name: params["name"] = name if user_id: params["userId"] = user_id if trace_id: params["traceId"] = trace_id if obs_type: params["type"] = obs_type if from_timestamp: params["fromTimestamp"] = self._parse_timestamp(from_timestamp) if to_timestamp: params["toTimestamp"] = self._parse_timestamp(to_timestamp) data = self._get("/api/public/observations", params) return LangfuseObservationsResponse(**data) def get_observation(self, observation_id: str) -> LangfuseObservation: """Get a specific observation. Args: observation_id: The observation ID to fetch. Returns: LangfuseObservation with observation data. """ data = self._get(f"/api/public/observations/{observation_id}") return LangfuseObservation(**data) # ======================================================================== # Scores API # ======================================================================== def list_scores( self, limit: int = 50, page: int = 1, name: str | None = None, user_id: str | None = None, trace_id: str | None = None, from_timestamp: str | None = None, to_timestamp: str | None = None, ) -> LangfuseScoresResponse: """List scores with pagination and filters. Args: limit: Maximum number of results per page. page: Page number (1-indexed). name: Filter by score name. user_id: Filter by user ID. trace_id: Filter by trace ID. from_timestamp: Filter by start timestamp. to_timestamp: Filter by end timestamp. Returns: LangfuseScoresResponse with scores data. 
""" params: dict[str, Any] = { "limit": limit, "page": page, } if name: params["name"] = name if user_id: params["userId"] = user_id if trace_id: params["traceId"] = trace_id if from_timestamp: params["fromTimestamp"] = self._parse_timestamp(from_timestamp) if to_timestamp: params["toTimestamp"] = self._parse_timestamp(to_timestamp) data = self._get("/api/public/scores", params) return LangfuseScoresResponse(**data) def get_score(self, score_id: str) -> LangfuseScore: """Get a specific score. Args: score_id: The score ID to fetch. Returns: LangfuseScore with score data. """ data = self._get(f"/api/public/scores/{score_id}") return LangfuseScore(**data) def close(self) -> None: """Close the HTTP client.""" self._client.close() def __enter__(self) -> LangfuseClient: return self def __exit__(self, *args) -> None: self.close()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/neuralis-in/shepherd-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.