"""Async Sentry API client with connection pooling."""
from typing import Any
import httpx
from .config import SentryInstanceConfig
class SentryAPIError(Exception):
"""Exception raised for Sentry API errors."""
def __init__(
self,
message: str,
status_code: int | None = None,
response_body: str | None = None,
):
super().__init__(message)
self.status_code = status_code
self.response_body = response_body
class SentryClient:
    """Async client for the Sentry REST API.

    One ``httpx.AsyncClient`` is created lazily on first use and reused for
    all later requests, so connections are pooled. Call :meth:`close` when
    the client is no longer needed.
    """

    def __init__(self, config: SentryInstanceConfig, timeout: float = 30.0):
        """Initialize the Sentry client.

        Args:
            config: Instance configuration with auth token and base URL
            timeout: Request timeout in seconds

        Raises:
            ValueError: If the configuration carries no auth token
        """
        if not config.auth_token:
            raise ValueError(f"No auth token configured for instance '{config.name}'")
        self.config = config
        self.base_url = config.api_base_url
        self.timeout = timeout
        # Created on demand by _get_client(); reset to None by close().
        self._client: httpx.AsyncClient | None = None

    @property
    def headers(self) -> dict[str, str]:
        """Get request headers with authentication."""
        return {
            "Authorization": f"Bearer {self.config.auth_token}",
            "Content-Type": "application/json",
        }

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or create the HTTP client with connection pooling."""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                base_url=self.base_url,
                headers=self.headers,
                timeout=self.timeout,
            )
        return self._client

    async def close(self) -> None:
        """Close the HTTP client (safe to call when nothing is open)."""
        if self._client and not self._client.is_closed:
            await self._client.aclose()
        self._client = None

    def _resolve_org(self, org_slug: str | None) -> str | None:
        """Return ``org_slug`` if given, else the configured organization slug."""
        return org_slug or self.config.org_slug

    @staticmethod
    def _list_or_empty(result: Any) -> list[Any]:
        """Return ``result`` when it is a list, otherwise an empty list."""
        return result if isinstance(result, list) else []

    async def _request(
        self,
        method: str,
        path: str,
        params: dict[str, Any] | None = None,
        json_data: dict[str, Any] | None = None,
        *,
        raw: bool = False,
    ) -> dict[str, Any] | list[Any] | bytes:
        """Make an API request.

        Args:
            method: HTTP method
            path: API path (e.g., "/api/0/organizations/")
            params: Query parameters
            json_data: JSON body for POST/PUT requests
            raw: When true, always return the undecoded response bytes,
                even if the response is labelled as JSON

        Returns:
            Response data (dict, list, or bytes for attachments)

        Raises:
            SentryAPIError: If the request fails, the server answers with an
                error status, or a JSON-labelled body cannot be decoded
        """
        client = await self._get_client()
        try:
            response = await client.request(
                method=method,
                url=path,
                params=params,
                json=json_data,
            )
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            error_body = e.response.text
            error_message = self._parse_error_message(error_body, e.response.status_code)
            raise SentryAPIError(
                message=error_message,
                status_code=e.response.status_code,
                response_body=error_body,
            ) from e
        except httpx.RequestError as e:
            raise SentryAPIError(
                message=f"Request failed: {e}",
            ) from e

        # Binary responses (attachments) and explicit raw requests skip decoding.
        content_type = response.headers.get("content-type", "")
        if raw or "application/json" not in content_type:
            return response.content

        try:
            return response.json()
        except ValueError as e:
            # A malformed body must surface as the documented error type
            # rather than leaking a JSON decoding error to callers.
            raise SentryAPIError(
                message=f"Sentry API returned invalid JSON ({response.status_code})",
                status_code=response.status_code,
                response_body=response.text,
            ) from e

    def _parse_error_message(self, body: str, status_code: int) -> str:
        """Build a readable error message from an error response body.

        If the body is a JSON object containing a ``detail`` or ``error`` key
        (checked in that order), that value is used; otherwise the raw body
        is included verbatim.
        """
        # Imported here so this method does not depend on the file's
        # top-level import list.
        import json

        prefix = f"Sentry API error ({status_code})"
        try:
            data = json.loads(body)
        except (TypeError, ValueError):
            # Not JSON at all (e.g. an HTML error page): fall back to the body.
            return f"{prefix}: {body}"
        if isinstance(data, dict):
            # Sentry error format: {"detail": "message"} or {"error": "message"}
            for key in ("detail", "error"):
                if key in data:
                    return f"{prefix}: {data[key]}"
        return f"{prefix}: {body}"

    async def get(
        self, path: str, params: dict[str, Any] | None = None
    ) -> dict[str, Any] | list[Any] | bytes:
        """Make a GET request."""
        return await self._request("GET", path, params=params)

    async def post(
        self,
        path: str,
        json_data: dict[str, Any] | None = None,
        params: dict[str, Any] | None = None,
    ) -> dict[str, Any] | list[Any]:
        """Make a POST request.

        Raises:
            SentryAPIError: If the request fails or the response is not JSON
        """
        result = await self._request("POST", path, params=params, json_data=json_data)
        if isinstance(result, bytes):
            raise SentryAPIError("Unexpected binary response for POST request")
        return result

    # ===== Organization & Project Discovery =====

    async def list_organizations(self) -> list[dict[str, Any]]:
        """List organizations accessible with this token.

        Note: Internal Integration tokens only have access to their own org.
        """
        return self._list_or_empty(await self.get("/api/0/organizations/"))

    async def list_teams(self, org_slug: str | None = None) -> list[dict[str, Any]]:
        """List teams in an organization (defaults to the configured org)."""
        org = self._resolve_org(org_slug)
        return self._list_or_empty(await self.get(f"/api/0/organizations/{org}/teams/"))

    async def list_projects(self, org_slug: str | None = None) -> list[dict[str, Any]]:
        """List projects in an organization (defaults to the configured org)."""
        org = self._resolve_org(org_slug)
        return self._list_or_empty(await self.get(f"/api/0/organizations/{org}/projects/"))

    async def list_releases(
        self,
        org_slug: str | None = None,
        project_slug: str | None = None,
        query: str | None = None,
    ) -> list[dict[str, Any]]:
        """List releases in an organization or project.

        Args:
            org_slug: Organization slug (defaults to configured org)
            project_slug: Sent as the ``project`` query parameter when given
            query: Sent as the ``query`` query parameter when given
        """
        org = self._resolve_org(org_slug)
        params: dict[str, Any] = {}
        if project_slug:
            params["project"] = project_slug
        if query:
            params["query"] = query
        result = await self.get(f"/api/0/organizations/{org}/releases/", params=params)
        return self._list_or_empty(result)

    # ===== Issue Investigation =====

    async def get_issue(self, issue_id: str) -> dict[str, Any]:
        """Get detailed information about a specific issue.

        Returns an empty dict if the response is not a JSON object.
        """
        result = await self.get(f"/api/0/issues/{issue_id}/")
        return result if isinstance(result, dict) else {}

    async def search_issues(
        self,
        project_slug: str,
        org_slug: str | None = None,
        query: str = "is:unresolved",
        stats_period: str = "24h",
        limit: int = 25,
        cursor: str | None = None,
    ) -> dict[str, Any]:
        """Search for grouped issues in a project.

        Args:
            project_slug: Project slug to search in
            org_slug: Organization slug (defaults to configured org)
            query: Sentry search query (e.g., "is:unresolved level:error")
            stats_period: Time period for stats (e.g., "24h", "14d")
            limit: Maximum number of results
            cursor: Pagination cursor

        Returns:
            Dict with the 'issues' list and 'count', the number of issues
            in that list
        """
        org = self._resolve_org(org_slug)
        params: dict[str, Any] = {
            "query": query,
            "statsPeriod": stats_period,
            "limit": limit,
        }
        if cursor:
            params["cursor"] = cursor
        result = await self.get(f"/api/0/projects/{org}/{project_slug}/issues/", params=params)
        issues = self._list_or_empty(result)
        return {"issues": issues, "count": len(issues)}

    async def search_issue_events(
        self,
        issue_id: str,
        full: bool = True,
        limit: int = 25,
        cursor: str | None = None,
    ) -> dict[str, Any]:
        """Search events within a specific issue.

        Args:
            issue_id: Issue ID to get events for
            full: Include full event details
            limit: Maximum number of results
            cursor: Pagination cursor

        Returns:
            Dict with the 'events' list and 'count', the number of events
            in that list
        """
        params: dict[str, Any] = {
            # Sent as the lowercase strings "true" / "false".
            "full": str(full).lower(),
            "limit": limit,
        }
        if cursor:
            params["cursor"] = cursor
        result = await self.get(f"/api/0/issues/{issue_id}/events/", params=params)
        events = self._list_or_empty(result)
        return {"events": events, "count": len(events)}

    # ===== Event & Trace Analysis =====

    async def search_events(
        self,
        org_slug: str | None = None,
        project: str | None = None,
        query: str | None = None,
        field: list[str] | None = None,
        stats_period: str = "24h",
        per_page: int = 50,
    ) -> dict[str, Any]:
        """Search events and perform aggregations.

        This is the primary tool for counting errors, getting statistics, etc.

        Args:
            org_slug: Organization slug (defaults to configured org)
            project: Project slug to filter by
            query: Sentry search query
            field: Fields to return (e.g., ["count()", "title", "project"])
            stats_period: Time period (e.g., "24h", "7d", "14d")
            per_page: Results per page

        Returns:
            The response dict as returned by the API; a non-dict response is
            wrapped as ``{"data": [...]}``
        """
        org = self._resolve_org(org_slug)
        params: dict[str, Any] = {
            "statsPeriod": stats_period,
            "per_page": per_page,
        }
        if project:
            params["project"] = project
        if query:
            params["query"] = query
        if field:
            params["field"] = field
        result = await self.get(f"/api/0/organizations/{org}/events/", params=params)
        if isinstance(result, dict):
            return result
        return {"data": self._list_or_empty(result)}

    async def get_trace(
        self,
        trace_id: str,
        org_slug: str | None = None,
    ) -> dict[str, Any]:
        """Get trace overview and span breakdown.

        Args:
            trace_id: Trace ID to look up
            org_slug: Organization slug (defaults to configured org)

        Returns:
            The response dict as returned by the API; a non-dict response is
            wrapped as ``{"spans": [...]}``
        """
        org = self._resolve_org(org_slug)
        result = await self.get(f"/api/0/organizations/{org}/events-trace/{trace_id}/")
        if isinstance(result, dict):
            return result
        return {"spans": self._list_or_empty(result)}

    async def get_event_attachment(
        self,
        org_slug: str | None = None,
        project_slug: str | None = None,
        event_id: str | None = None,
        attachment_id: str | None = None,
    ) -> bytes:
        """Download an attachment from an event.

        Args:
            org_slug: Organization slug (defaults to configured org)
            project_slug: Project slug
            event_id: Event ID
            attachment_id: Attachment ID

        Returns:
            Attachment content as bytes

        Raises:
            ValueError: If project_slug, event_id, or attachment_id is missing
            SentryAPIError: If the request fails
        """
        if not all((project_slug, event_id, attachment_id)):
            raise ValueError("project_slug, event_id, and attachment_id are required")
        org = self._resolve_org(org_slug)
        # Without the `download` query parameter Sentry answers this endpoint
        # with JSON metadata about the attachment instead of its content.
        # raw=True keeps attachments that are themselves JSON from being
        # parsed into Python objects.
        result = await self._request(
            "GET",
            f"/api/0/projects/{org}/{project_slug}/events/{event_id}/attachments/{attachment_id}/",
            params={"download": "1"},
            raw=True,
        )
        if isinstance(result, bytes):
            return result
        raise SentryAPIError("Expected binary attachment data")
class SentryClientManager:
    """Manages Sentry clients for multiple instances, keyed by instance name."""

    def __init__(self) -> None:
        """Start with an empty registry."""
        self._clients: dict[str, SentryClient] = {}

    def register(self, config: SentryInstanceConfig) -> None:
        """Register a Sentry instance configuration.

        A client is created only when the configuration has an auth token;
        configurations without one are skipped. Registering a name that is
        already present replaces the stored client.
        """
        if config.auth_token:
            self._clients[config.name] = SentryClient(config)

    def get(self, instance_name: str) -> SentryClient:
        """Get client for an instance.

        Raises:
            ValueError: If no client is registered under ``instance_name``
        """
        try:
            return self._clients[instance_name]
        except KeyError:
            raise ValueError(
                f"Unknown Sentry instance: '{instance_name}'. "
                f"Available: {list(self._clients.keys())}"
            ) from None

    def list_instances(self) -> list[str]:
        """List all registered instance names."""
        return list(self._clients)

    async def close_all(self) -> None:
        """Close all clients.

        Every client is closed even if closing an earlier one fails; the
        first error encountered is re-raised after all have been attempted.
        """
        first_error: Exception | None = None
        # Iterate over a snapshot: the loop awaits, so the registry could be
        # modified by other tasks while clients are being closed.
        for client in tuple(self._clients.values()):
            try:
                await client.close()
            except Exception as exc:
                # Remember the failure but keep closing the remaining
                # clients so their connections are not leaked.
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error