# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/routers/observability.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti
Observability API Router.
Provides REST endpoints for querying traces, spans, events, and metrics.
"""
# Standard
from datetime import datetime, timedelta
from typing import List, Optional
# Third-Party
from fastapi import APIRouter, Depends, HTTPException, Query
import orjson
from sqlalchemy import text
from sqlalchemy.orm import Session
# First-Party
from mcpgateway.db import SessionLocal
from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission
from mcpgateway.schemas import ObservabilitySpanRead, ObservabilityTraceRead, ObservabilityTraceWithSpans
from mcpgateway.services.observability_service import ObservabilityService
router = APIRouter(prefix="/observability", tags=["Observability"])
def get_db():
"""Database session dependency.
Commits the transaction on successful completion to avoid implicit rollbacks
for read-only operations. Rolls back explicitly on exception.
Yields:
Session: SQLAlchemy database session
Raises:
Exception: Re-raises any exception after rolling back the transaction.
"""
db = SessionLocal()
try:
yield db
db.commit()
except Exception:
try:
db.rollback()
except Exception:
try:
db.invalidate()
except Exception:
pass # nosec B110 - Best effort cleanup on connection failure
raise
finally:
db.close()
@router.get("/traces", response_model=List[ObservabilityTraceRead])
@require_permission("admin.system_config")
async def list_traces(
start_time: Optional[datetime] = Query(None, description="Filter traces after this time"),
end_time: Optional[datetime] = Query(None, description="Filter traces before this time"),
min_duration_ms: Optional[float] = Query(None, ge=0, description="Minimum duration in milliseconds"),
max_duration_ms: Optional[float] = Query(None, ge=0, description="Maximum duration in milliseconds"),
status: Optional[str] = Query(None, description="Filter by status (ok, error)"),
http_status_code: Optional[int] = Query(None, description="Filter by HTTP status code"),
http_method: Optional[str] = Query(None, description="Filter by HTTP method (GET, POST, etc.)"),
user_email: Optional[str] = Query(None, description="Filter by user email"),
attribute_search: Optional[str] = Query(None, description="Free-text search within trace attributes"),
limit: int = Query(100, ge=1, le=1000, description="Maximum results"),
offset: int = Query(0, ge=0, description="Result offset"),
db: Session = Depends(get_db),
_user=Depends(get_current_user_with_permissions),
):
"""List traces with optional filtering.
Query traces with various filters including time range, duration, status, HTTP method,
HTTP status code, user email, and attribute search. Results are paginated.
Note: For structured attribute filtering (key-value pairs with AND logic),
use a JSON request body via POST endpoint or the Python SDK.
Args:
start_time: Filter traces after this time
end_time: Filter traces before this time
min_duration_ms: Minimum duration in milliseconds
max_duration_ms: Maximum duration in milliseconds
status: Filter by status (ok, error)
http_status_code: Filter by HTTP status code
http_method: Filter by HTTP method (GET, POST, etc.)
user_email: Filter by user email
attribute_search: Free-text search across all trace attributes
limit: Maximum results
offset: Result offset
db: Database session
Returns:
List[ObservabilityTraceRead]: List of traces matching filters
Examples:
>>> import asyncio
>>> import mcpgateway.routers.observability as obs
>>> from mcpgateway.config import settings
>>> class FakeTrace:
... def __init__(self, trace_id='t1'):
... self.trace_id = trace_id
... self.name = 'n'
... self.start_time = None
... self.end_time = None
... self.duration_ms = 100
... self.status = 'ok'
... self.http_method = 'GET'
... self.http_url = '/'
... self.http_status_code = 200
... self.user_email = 'u'
>>> class FakeService:
... def query_traces(self, **kwargs):
... return [FakeTrace('t1')]
>>> obs.ObservabilityService = FakeService
>>> async def run_list_traces():
... traces = await obs.list_traces(db=None, _user={"email": settings.platform_admin_email, "db": None})
... return traces[0].trace_id
>>> asyncio.run(run_list_traces())
't1'
"""
service = ObservabilityService()
traces = service.query_traces(
db=db,
start_time=start_time,
end_time=end_time,
min_duration_ms=min_duration_ms,
max_duration_ms=max_duration_ms,
status=status,
http_status_code=http_status_code,
http_method=http_method,
user_email=user_email,
attribute_search=attribute_search,
limit=limit,
offset=offset,
)
return traces
@router.post("/traces/query", response_model=List[ObservabilityTraceRead])
@require_permission("admin.system_config")
async def query_traces_advanced(
# Third-Party
request_body: dict,
db: Session = Depends(get_db),
_user=Depends(get_current_user_with_permissions),
):
"""Advanced trace querying with attribute filtering.
POST endpoint that accepts a JSON body with complex filtering criteria,
including structured attribute filters with AND logic.
Request Body:
{
"start_time": "2025-01-01T00:00:00Z", # Optional datetime
"end_time": "2025-01-02T00:00:00Z", # Optional datetime
"min_duration_ms": 100.0, # Optional float
"max_duration_ms": 5000.0, # Optional float
"status": "error", # Optional string
"http_status_code": 500, # Optional int
"http_method": "POST", # Optional string
"user_email": "user@example.com", # Optional string
"attribute_filters": { # Optional dict (AND logic)
"http.route": "/api/tools",
"service.name": "mcp-gateway"
},
"attribute_search": "error", # Optional string (OR logic)
"limit": 100, # Optional int
"offset": 0 # Optional int
}
Args:
request_body: JSON request body with filter criteria
db: Database session
Returns:
List[ObservabilityTraceRead]: List of traces matching filters
Raises:
HTTPException: 400 error if request body is invalid
Examples:
>>> import asyncio
>>> from fastapi import HTTPException
>>> from mcpgateway.config import settings
>>> async def run_invalid_query():
... try:
... await query_traces_advanced({"start_time": "not-a-date"}, db=None, _user={"email": settings.platform_admin_email, "db": None})
... except HTTPException as e:
... return (e.status_code, "Invalid request body" in str(e.detail))
>>> asyncio.run(run_invalid_query())
(400, True)
>>> import mcpgateway.routers.observability as obs
>>> class FakeTrace:
... def __init__(self):
... self.trace_id = 'tx'
... self.name = 'n'
>>> class FakeService2:
... def query_traces(self, **kwargs):
... return [FakeTrace()]
>>> obs.ObservabilityService = FakeService2
>>> async def run_query_traces():
... traces = await obs.query_traces_advanced({}, db=None, _user={"email": settings.platform_admin_email, "db": None})
... return traces[0].trace_id
>>> asyncio.run(run_query_traces())
'tx'
"""
# Third-Party
from pydantic import ValidationError
try:
# Extract filters from request body
service = ObservabilityService()
# Parse datetime strings if provided
start_time = request_body.get("start_time")
if start_time and isinstance(start_time, str):
start_time = datetime.fromisoformat(start_time.replace("Z", "+00:00"))
end_time = request_body.get("end_time")
if end_time and isinstance(end_time, str):
end_time = datetime.fromisoformat(end_time.replace("Z", "+00:00"))
traces = service.query_traces(
db=db,
start_time=start_time,
end_time=end_time,
min_duration_ms=request_body.get("min_duration_ms"),
max_duration_ms=request_body.get("max_duration_ms"),
status=request_body.get("status"),
status_in=request_body.get("status_in"),
status_not_in=request_body.get("status_not_in"),
http_status_code=request_body.get("http_status_code"),
http_status_code_in=request_body.get("http_status_code_in"),
http_method=request_body.get("http_method"),
http_method_in=request_body.get("http_method_in"),
user_email=request_body.get("user_email"),
user_email_in=request_body.get("user_email_in"),
attribute_filters=request_body.get("attribute_filters"),
attribute_filters_or=request_body.get("attribute_filters_or"),
attribute_search=request_body.get("attribute_search"),
name_contains=request_body.get("name_contains"),
order_by=request_body.get("order_by", "start_time_desc"),
limit=request_body.get("limit", 100),
offset=request_body.get("offset", 0),
)
return traces
except (ValidationError, ValueError) as e:
raise HTTPException(status_code=400, detail=f"Invalid request body: {e}")
@router.get("/traces/{trace_id}", response_model=ObservabilityTraceWithSpans)
@require_permission("admin.system_config")
async def get_trace(trace_id: str, db: Session = Depends(get_db), _user=Depends(get_current_user_with_permissions)):
"""Get a trace by ID with all its spans and events.
Returns a complete trace with all nested spans and their events,
providing a full view of the request flow.
Args:
trace_id: UUID of the trace to retrieve
db: Database session
Returns:
ObservabilityTraceWithSpans: Complete trace with all spans and events
Raises:
HTTPException: 404 if trace not found
Examples:
>>> import asyncio
>>> import mcpgateway.routers.observability as obs
>>> from mcpgateway.config import settings
>>> class FakeService:
... def get_trace_with_spans(self, db, trace_id):
... return None
>>> obs.ObservabilityService = FakeService
>>> async def run_missing_trace():
... try:
... await obs.get_trace("missing", db=None, _user={"email": settings.platform_admin_email, "db": None})
... except obs.HTTPException as e:
... return e.status_code
>>> asyncio.run(run_missing_trace())
404
>>> class FakeService2:
... def get_trace_with_spans(self, db, trace_id):
... return {'trace_id': trace_id}
>>> obs.ObservabilityService = FakeService2
>>> async def run_found_trace():
... trace = await obs.get_trace("found", db=None, _user={"email": settings.platform_admin_email, "db": None})
... return trace["trace_id"]
>>> asyncio.run(run_found_trace())
'found'
"""
service = ObservabilityService()
trace = service.get_trace_with_spans(db, trace_id)
if not trace:
raise HTTPException(status_code=404, detail="Trace not found")
return trace
@router.get("/spans", response_model=List[ObservabilitySpanRead])
@require_permission("admin.system_config")
async def list_spans(
trace_id: Optional[str] = Query(None, description="Filter by trace ID"),
resource_type: Optional[str] = Query(None, description="Filter by resource type"),
resource_name: Optional[str] = Query(None, description="Filter by resource name"),
start_time: Optional[datetime] = Query(None, description="Filter spans after this time"),
end_time: Optional[datetime] = Query(None, description="Filter spans before this time"),
limit: int = Query(100, ge=1, le=1000, description="Maximum results"),
offset: int = Query(0, ge=0, description="Result offset"),
db: Session = Depends(get_db),
_user=Depends(get_current_user_with_permissions),
):
"""List spans with optional filtering.
Query spans by trace ID, resource type, resource name, or time range.
Useful for analyzing specific operations or resource performance.
Args:
trace_id: Filter by trace ID
resource_type: Filter by resource type
resource_name: Filter by resource name
start_time: Filter spans after this time
end_time: Filter spans before this time
limit: Maximum results
offset: Result offset
db: Database session
Returns:
List[ObservabilitySpanRead]: List of spans matching filters
Examples:
>>> import asyncio
>>> import mcpgateway.routers.observability as obs
>>> from mcpgateway.config import settings
>>> class FakeSpan:
... def __init__(self):
... self.span_id = 's1'
... self.trace_id = 't1'
... self.name = 'op'
>>> class FakeService:
... def query_spans(self, **kwargs):
... return [FakeSpan()]
>>> obs.ObservabilityService = FakeService
>>> async def run_list_spans():
... spans = await obs.list_spans(db=None, _user={"email": settings.platform_admin_email, "db": None})
... return spans[0].span_id
>>> asyncio.run(run_list_spans())
's1'
"""
service = ObservabilityService()
spans = service.query_spans(
db=db,
trace_id=trace_id,
resource_type=resource_type,
resource_name=resource_name,
start_time=start_time,
end_time=end_time,
limit=limit,
offset=offset,
)
return spans
@router.delete("/traces/cleanup")
@require_permission("admin.system_config")
async def cleanup_old_traces(
days: int = Query(7, ge=1, description="Delete traces older than this many days"),
db: Session = Depends(get_db),
_user=Depends(get_current_user_with_permissions),
):
"""Delete traces older than a specified number of days.
Cleans up old trace data to manage storage. Cascading deletes will
also remove associated spans, events, and metrics.
Args:
days: Delete traces older than this many days
db: Database session
Returns:
dict: Number of deleted traces and cutoff time
Examples:
>>> import asyncio
>>> import mcpgateway.routers.observability as obs
>>> from mcpgateway.config import settings
>>> class FakeService:
... def delete_old_traces(self, db, cutoff):
... return 5
>>> obs.ObservabilityService = FakeService
>>> async def run_cleanup():
... res = await obs.cleanup_old_traces(days=7, db=None, _user={"email": settings.platform_admin_email, "db": None})
... return res["deleted"]
>>> asyncio.run(run_cleanup())
5
"""
service = ObservabilityService()
cutoff_time = datetime.now() - timedelta(days=days)
deleted = service.delete_old_traces(db, cutoff_time)
return {"deleted": deleted, "cutoff_time": cutoff_time}
@router.get("/stats")
@require_permission("admin.system_config")
async def get_stats(
hours: int = Query(24, ge=1, le=168, description="Time window in hours"),
db: Session = Depends(get_db),
_user=Depends(get_current_user_with_permissions),
):
"""Get observability statistics.
Returns summary statistics including:
- Total traces in time window
- Success/error counts
- Average response time
- Top slowest endpoints
Args:
hours: Time window in hours
db: Database session
Returns:
dict: Statistics including counts, error rate, and slowest endpoints
"""
# Third-Party
from sqlalchemy import func
# First-Party
from mcpgateway.db import ObservabilityTrace
ObservabilityService()
cutoff_time = datetime.now() - timedelta(hours=hours)
# Get basic counts
total_traces = db.query(func.count(ObservabilityTrace.trace_id)).filter(ObservabilityTrace.start_time >= cutoff_time).scalar()
success_count = db.query(func.count(ObservabilityTrace.trace_id)).filter(ObservabilityTrace.start_time >= cutoff_time, ObservabilityTrace.status == "ok").scalar()
error_count = db.query(func.count(ObservabilityTrace.trace_id)).filter(ObservabilityTrace.start_time >= cutoff_time, ObservabilityTrace.status == "error").scalar()
avg_duration = db.query(func.avg(ObservabilityTrace.duration_ms)).filter(ObservabilityTrace.start_time >= cutoff_time, ObservabilityTrace.duration_ms.isnot(None)).scalar() or 0
# Get slowest endpoints
slowest = (
db.query(ObservabilityTrace.name, func.avg(ObservabilityTrace.duration_ms).label("avg_duration"), func.count(ObservabilityTrace.trace_id).label("count"))
.filter(ObservabilityTrace.start_time >= cutoff_time, ObservabilityTrace.duration_ms.isnot(None))
.group_by(ObservabilityTrace.name)
.order_by(func.avg(ObservabilityTrace.duration_ms).desc())
.limit(10)
.all()
)
return {
"time_window_hours": hours,
"total_traces": total_traces,
"success_count": success_count,
"error_count": error_count,
"error_rate": (error_count / total_traces * 100) if total_traces > 0 else 0,
"avg_duration_ms": round(avg_duration, 2),
"slowest_endpoints": [{"name": row[0], "avg_duration_ms": round(row[1], 2), "count": row[2]} for row in slowest],
}
@router.post("/traces/export")
@require_permission("admin.system_config")
async def export_traces(
request_body: dict,
format: str = Query("json", description="Export format (json, csv, ndjson)"),
db: Session = Depends(get_db),
_user=Depends(get_current_user_with_permissions),
):
"""Export traces in various formats.
POST endpoint that accepts filter criteria (same as /traces/query) and exports
matching traces in the specified format.
Supported formats:
- json: Standard JSON array
- csv: Comma-separated values
- ndjson: Newline-delimited JSON (streaming)
Args:
request_body: JSON request body with filter criteria (same as /traces/query)
format: Export format (json, csv, ndjson)
db: Database session
Returns:
StreamingResponse or JSONResponse with exported data
Raises:
HTTPException: 400 error if format is invalid or export fails
Examples:
>>> import asyncio
>>> from datetime import datetime
>>> from fastapi import HTTPException
>>> import mcpgateway.routers.observability as obs
>>> from mcpgateway.config import settings
>>> async def run_invalid_export():
... try:
... await export_traces({}, format="xml", db=None, _user={"email": settings.platform_admin_email, "db": None})
... except HTTPException as e:
... return (e.status_code, "format must be one of" in str(e.detail))
>>> asyncio.run(run_invalid_export())
(400, True)
>>> class FakeTrace:
... def __init__(self):
... self.trace_id = 'tx'
... self.name = 'name'
... self.start_time = datetime(2025,1,1)
... self.end_time = None
... self.duration_ms = 100
... self.status = 'ok'
... self.http_method = 'GET'
... self.http_url = '/'
... self.http_status_code = 200
... self.user_email = 'u'
>>> class FakeService:
... def query_traces(self, **kwargs):
... return [FakeTrace()]
>>> obs.ObservabilityService = FakeService
>>> async def run_json_export():
... out = await obs.export_traces({}, format="json", db=None, _user={"email": settings.platform_admin_email, "db": None})
... return out[0]["trace_id"]
>>> asyncio.run(run_json_export())
'tx'
>>> async def run_csv_export():
... resp = await obs.export_traces({}, format="csv", db=None, _user={"email": settings.platform_admin_email, "db": None})
... return hasattr(resp, "media_type") and "csv" in resp.media_type
>>> asyncio.run(run_csv_export())
True
>>> async def run_ndjson_export():
... resp2 = await obs.export_traces({}, format="ndjson", db=None, _user={"email": settings.platform_admin_email, "db": None})
... return type(resp2).__name__
>>> asyncio.run(run_ndjson_export())
'StreamingResponse'
"""
# Standard
import csv
import io
# Third-Party
from starlette.responses import Response, StreamingResponse
# Validate format
if format not in ["json", "csv", "ndjson"]:
raise HTTPException(status_code=400, detail="format must be one of: json, csv, ndjson")
try:
service = ObservabilityService()
# Parse datetime strings
start_time = request_body.get("start_time")
if start_time and isinstance(start_time, str):
start_time = datetime.fromisoformat(start_time.replace("Z", "+00:00"))
end_time = request_body.get("end_time")
if end_time and isinstance(end_time, str):
end_time = datetime.fromisoformat(end_time.replace("Z", "+00:00"))
# Query traces
traces = service.query_traces(
db=db,
start_time=start_time,
end_time=end_time,
min_duration_ms=request_body.get("min_duration_ms"),
max_duration_ms=request_body.get("max_duration_ms"),
status=request_body.get("status"),
status_in=request_body.get("status_in"),
http_status_code=request_body.get("http_status_code"),
http_method=request_body.get("http_method"),
user_email=request_body.get("user_email"),
order_by=request_body.get("order_by", "start_time_desc"),
limit=request_body.get("limit", 1000), # Higher limit for export
offset=request_body.get("offset", 0),
)
if format == "json":
# Standard JSON response
return [
{
"trace_id": t.trace_id,
"name": t.name,
"start_time": t.start_time.isoformat() if t.start_time else None,
"end_time": t.end_time.isoformat() if t.end_time else None,
"duration_ms": t.duration_ms,
"status": t.status,
"http_method": t.http_method,
"http_url": t.http_url,
"http_status_code": t.http_status_code,
"user_email": t.user_email,
}
for t in traces
]
elif format == "csv":
# CSV export
output = io.StringIO()
writer = csv.writer(output)
# Write header
writer.writerow(["trace_id", "name", "start_time", "duration_ms", "status", "http_method", "http_status_code", "user_email"])
# Write data
for t in traces:
writer.writerow(
[t.trace_id, t.name, t.start_time.isoformat() if t.start_time else "", t.duration_ms or "", t.status, t.http_method or "", t.http_status_code or "", t.user_email or ""]
)
output.seek(0)
return Response(content=output.getvalue(), media_type="text/csv", headers={"Content-Disposition": "attachment; filename=traces.csv"})
elif format == "ndjson":
# Newline-delimited JSON (streaming)
def generate():
"""Yield newline-delimited JSON strings for each trace.
This nested generator is used to stream NDJSON responses.
Yields:
str: A JSON-encoded line (with trailing newline) for a trace.
"""
for t in traces:
yield orjson.dumps(
{
"trace_id": t.trace_id,
"name": t.name,
"start_time": t.start_time.isoformat() if t.start_time else None,
"duration_ms": t.duration_ms,
"status": t.status,
"http_method": t.http_method,
"http_status_code": t.http_status_code,
"user_email": t.user_email,
}
).decode() + "\n"
return StreamingResponse(generate(), media_type="application/x-ndjson", headers={"Content-Disposition": "attachment; filename=traces.ndjson"})
except (ValueError, Exception) as e:
raise HTTPException(status_code=400, detail=f"Export failed: {e}")
@router.get("/analytics/query-performance")
@require_permission("admin.system_config")
async def get_query_performance(
hours: int = Query(24, ge=1, le=168, description="Time window in hours"),
db: Session = Depends(get_db),
_user=Depends(get_current_user_with_permissions),
):
"""Get query performance analytics.
Returns performance metrics about trace queries including:
- Average, min, max, p50, p95, p99 durations
- Query volume over time
- Error rate trends
Args:
hours: Time window in hours
db: Database session
Returns:
dict: Performance analytics
Examples:
>>> import asyncio
>>> import mcpgateway.routers.observability as obs
>>> from mcpgateway.config import settings
>>> class MockDialect:
... name = "sqlite"
>>> class MockBind:
... dialect = MockDialect()
>>> class EmptyDB:
... def get_bind(self):
... return MockBind()
... def query(self, *a, **k):
... return self
... def filter(self, *a, **k):
... return self
... def all(self):
... return []
>>> async def run_empty_stats():
... return (await obs.get_query_performance(hours=1, db=EmptyDB(), _user={"email": settings.platform_admin_email, "db": None}))["total_traces"]
>>> asyncio.run(run_empty_stats())
0
>>> class SmallDB:
... def get_bind(self):
... return MockBind()
... def query(self, *a, **k):
... return self
... def filter(self, *a, **k):
... return self
... def all(self):
... return [(10,), (20,), (30,), (40,)]
>>> async def run_small_stats():
... return await obs.get_query_performance(hours=1, db=SmallDB(), _user={"email": settings.platform_admin_email, "db": None})
>>> res = asyncio.run(run_small_stats())
>>> res["total_traces"]
4
"""
# First-Party
ObservabilityService()
cutoff_time = datetime.now() - timedelta(hours=hours)
# Use SQL aggregation for PostgreSQL, Python fallback for SQLite
dialect_name = db.get_bind().dialect.name
if dialect_name == "postgresql":
return _get_query_performance_postgresql(db, cutoff_time, hours)
return _get_query_performance_python(db, cutoff_time, hours)
def _get_query_performance_postgresql(db: Session, cutoff_time: datetime, hours: int) -> dict:
"""Compute query performance using PostgreSQL percentile_cont.
Args:
db: Database session
cutoff_time: Start time for analysis
hours: Time window in hours
Returns:
dict: Performance analytics computed via SQL
"""
stats_sql = text(
"""
SELECT
COUNT(*) as total_traces,
percentile_cont(0.50) WITHIN GROUP (ORDER BY duration_ms) as p50,
percentile_cont(0.75) WITHIN GROUP (ORDER BY duration_ms) as p75,
percentile_cont(0.90) WITHIN GROUP (ORDER BY duration_ms) as p90,
percentile_cont(0.95) WITHIN GROUP (ORDER BY duration_ms) as p95,
percentile_cont(0.99) WITHIN GROUP (ORDER BY duration_ms) as p99,
AVG(duration_ms) as avg_duration,
MIN(duration_ms) as min_duration,
MAX(duration_ms) as max_duration
FROM observability_traces
WHERE start_time >= :cutoff_time AND duration_ms IS NOT NULL
"""
)
result = db.execute(stats_sql, {"cutoff_time": cutoff_time}).fetchone()
if not result or result.total_traces == 0:
return {
"time_window_hours": hours,
"total_traces": 0,
"percentiles": {},
"avg_duration_ms": 0,
"min_duration_ms": 0,
"max_duration_ms": 0,
}
return {
"time_window_hours": hours,
"total_traces": result.total_traces,
"percentiles": {
"p50": round(float(result.p50), 2) if result.p50 else 0,
"p75": round(float(result.p75), 2) if result.p75 else 0,
"p90": round(float(result.p90), 2) if result.p90 else 0,
"p95": round(float(result.p95), 2) if result.p95 else 0,
"p99": round(float(result.p99), 2) if result.p99 else 0,
},
"avg_duration_ms": round(float(result.avg_duration), 2) if result.avg_duration else 0,
"min_duration_ms": round(float(result.min_duration), 2) if result.min_duration else 0,
"max_duration_ms": round(float(result.max_duration), 2) if result.max_duration else 0,
}
def _get_query_performance_python(db: Session, cutoff_time: datetime, hours: int) -> dict:
"""Compute query performance using Python (fallback for SQLite).
Args:
db: Database session
cutoff_time: Start time for analysis
hours: Time window in hours
Returns:
dict: Performance analytics computed in Python
"""
# First-Party
from mcpgateway.db import ObservabilityTrace
# Get duration percentiles
traces_with_duration = db.query(ObservabilityTrace.duration_ms).filter(ObservabilityTrace.start_time >= cutoff_time, ObservabilityTrace.duration_ms.isnot(None)).all()
durations = sorted([t[0] for t in traces_with_duration if t[0] is not None])
if not durations:
return {
"time_window_hours": hours,
"total_traces": 0,
"percentiles": {},
"avg_duration_ms": 0,
"min_duration_ms": 0,
"max_duration_ms": 0,
}
def percentile(data, p):
"""Compute percentile using linear interpolation matching PostgreSQL percentile_cont.
Args:
data: Sorted list of numeric values.
p: Percentile value between 0 and 1.
Returns:
Interpolated percentile value.
"""
n = len(data)
if n == 0:
return 0
k = (n - 1) * p
f = int(k)
c = k - f
if f + 1 < n:
return data[f] + (c * (data[f + 1] - data[f]))
return data[f]
return {
"time_window_hours": hours,
"total_traces": len(durations),
"percentiles": {
"p50": round(percentile(durations, 0.50), 2),
"p75": round(percentile(durations, 0.75), 2),
"p90": round(percentile(durations, 0.90), 2),
"p95": round(percentile(durations, 0.95), 2),
"p99": round(percentile(durations, 0.99), 2),
},
"avg_duration_ms": round(sum(durations) / len(durations), 2),
"min_duration_ms": round(durations[0], 2),
"max_duration_ms": round(durations[-1], 2),
}