Skip to main content
Glama
telemetry_service.py7.31 kB
"""Telemetry service for context protection metrics. Tracks pagination, summarization, and token budget metrics for observability. Based on data-model.md TelemetryRecord entity. """ import time from datetime import datetime from typing import Any from pydantic import BaseModel class TelemetryRecord(BaseModel): """Per-request telemetry record. Attributes: request_id: Correlation ID for request tracing endpoint: API endpoint path timestamp: Request timestamp estimated_tokens: Estimated tokens in response response_bytes: Actual response size in bytes item_count: Number of items returned (for lists) latency_ms: Request processing time in milliseconds pagination_used: Whether pagination was applied summarization_used: Whether summarization was triggered chunking_used: Whether content chunking was applied optimization_metadata: Additional optimization details """ request_id: str endpoint: str timestamp: datetime estimated_tokens: int response_bytes: int item_count: int latency_ms: int pagination_used: bool summarization_used: bool chunking_used: bool optimization_metadata: dict[str, Any] = {} @property def tokens_per_item(self) -> float: """Calculate average tokens per item. Returns: Tokens per item (0.0 if no items) """ if self.item_count == 0: return 0.0 return self.estimated_tokens / self.item_count class TelemetryService: """Service for tracking context protection metrics. Maintains in-memory metrics and provides aggregation for observability. """ def __init__(self, max_records: int = 10000) -> None: """Initialize telemetry service. Args: max_records: Maximum records to keep in memory """ self.max_records = max_records self.records: list[TelemetryRecord] = [] self._start_time = time.time() def record_request( self, request_id: str, endpoint: str, estimated_tokens: int, response_bytes: int, item_count: int, latency_ms: int, pagination_used: bool = False, summarization_used: bool = False, chunking_used: bool = False, optimization_metadata: dict[str, Any] | None = None, ) -> None: """Record request metrics. Args: request_id: Correlation ID endpoint: API endpoint path estimated_tokens: Estimated response tokens response_bytes: Response size in bytes item_count: Number of items latency_ms: Latency in milliseconds pagination_used: Pagination applied summarization_used: Summarization applied chunking_used: Chunking applied optimization_metadata: Additional metadata """ record = TelemetryRecord( request_id=request_id, endpoint=endpoint, timestamp=datetime.now(), estimated_tokens=estimated_tokens, response_bytes=response_bytes, item_count=item_count, latency_ms=latency_ms, pagination_used=pagination_used, summarization_used=summarization_used, chunking_used=chunking_used, optimization_metadata=optimization_metadata or {}, ) self.records.append(record) # Trim old records if exceeding max if len(self.records) > self.max_records: self.records = self.records[-self.max_records :] def get_metrics(self) -> dict[str, Any]: """Get aggregated metrics. Returns: Dict with aggregated telemetry metrics """ if not self.records: return { "total_requests": 0, "pagination_adoption": 0.0, "summarization_adoption": 0.0, "avg_response_size": 0, "avg_latency_ms": 0, "oversized_events": 0, "uptime_seconds": time.time() - self._start_time, } total_requests = len(self.records) pagination_count = sum(1 for r in self.records if r.pagination_used) summarization_count = sum(1 for r in self.records if r.summarization_used) total_bytes = sum(r.response_bytes for r in self.records) total_latency = sum(r.latency_ms for r in self.records) # Count oversized events (>4000 tokens before optimization) oversized_events = sum(1 for r in self.records if r.estimated_tokens > 4000) return { "total_requests": total_requests, "pagination_adoption": pagination_count / total_requests, "summarization_adoption": summarization_count / total_requests, "avg_response_size": total_bytes // total_requests, "avg_latency_ms": total_latency // total_requests, "oversized_events": oversized_events, "uptime_seconds": time.time() - self._start_time, } def get_endpoint_metrics(self, endpoint: str) -> dict[str, Any]: """Get metrics for specific endpoint. Args: endpoint: API endpoint path Returns: Dict with endpoint-specific metrics """ endpoint_records = [r for r in self.records if r.endpoint == endpoint] if not endpoint_records: return { "endpoint": endpoint, "total_requests": 0, "pagination_rate": 0.0, "summarization_rate": 0.0, "avg_tokens": 0, "avg_items": 0, } total_requests = len(endpoint_records) pagination_count = sum(1 for r in endpoint_records if r.pagination_used) summarization_count = sum(1 for r in endpoint_records if r.summarization_used) total_tokens = sum(r.estimated_tokens for r in endpoint_records) total_items = sum(r.item_count for r in endpoint_records) return { "endpoint": endpoint, "total_requests": total_requests, "pagination_rate": pagination_count / total_requests, "summarization_rate": summarization_count / total_requests, "avg_tokens": total_tokens // total_requests, "avg_items": total_items // total_requests, } def get_recent_records(self, limit: int = 100) -> list[dict[str, Any]]: """Get recent telemetry records. Args: limit: Maximum records to return Returns: List of recent records as dicts """ recent = self.records[-limit:] return [r.model_dump() for r in recent] def clear_records(self) -> None: """Clear all telemetry records.""" self.records.clear() # Global singleton instance _telemetry_service: TelemetryService | None = None def get_telemetry_service() -> TelemetryService: """Get global telemetry service instance. Returns: Singleton TelemetryService instance """ global _telemetry_service if _telemetry_service is None: _telemetry_service = TelemetryService() return _telemetry_service

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/darrentmorgan/hostaway-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server