"""
Usage tracking service.
Tracks token usage and costs for each LLM request.
"""
from sqlalchemy.orm import Session
from sqlalchemy import func, and_
from datetime import datetime, timedelta
from typing import Optional
import uuid
import logging
from app.db.models import UsageEvent, UsageDaily, UsageMonthly, Tenant
from app.billing.pricing import calculate_cost
from app.billing.quota import ensure_tenant_exists
logger = logging.getLogger(__name__)
def track_usage(
db: Session,
tenant_id: str,
user_id: str,
kb_id: str,
provider: str,
model: str,
prompt_tokens: int,
completion_tokens: int,
request_timestamp: Optional[datetime] = None
) -> UsageEvent:
"""
Track a single usage event.
Args:
db: Database session
tenant_id: Tenant ID
user_id: User ID
kb_id: Knowledge base ID
provider: "gemini" or "openai"
model: Model name
prompt_tokens: Input tokens
completion_tokens: Output tokens
request_timestamp: Request timestamp (defaults to now)
Returns:
Created UsageEvent
"""
# Ensure tenant exists
ensure_tenant_exists(db, tenant_id)
# Calculate cost
total_tokens = prompt_tokens + completion_tokens
estimated_cost = calculate_cost(provider, model, prompt_tokens, completion_tokens)
# Create usage event
request_id = f"req_{uuid.uuid4().hex[:16]}"
timestamp = request_timestamp or datetime.utcnow()
usage_event = UsageEvent(
request_id=request_id,
tenant_id=tenant_id,
user_id=user_id,
kb_id=kb_id,
provider=provider,
model=model,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
estimated_cost_usd=estimated_cost,
request_timestamp=timestamp
)
db.add(usage_event)
# Update daily aggregation
_update_daily_usage(db, tenant_id, timestamp, provider, total_tokens, estimated_cost)
# Update monthly aggregation
_update_monthly_usage(db, tenant_id, timestamp, provider, total_tokens, estimated_cost)
db.commit()
db.refresh(usage_event)
logger.info(
f"Tracked usage: tenant={tenant_id}, provider={provider}, "
f"tokens={total_tokens}, cost=${estimated_cost:.6f}"
)
return usage_event
def _update_daily_usage(
db: Session,
tenant_id: str,
timestamp: datetime,
provider: str,
tokens: int,
cost: float
):
"""Update daily usage aggregation."""
date = timestamp.date()
date_start = datetime.combine(date, datetime.min.time())
daily = db.query(UsageDaily).filter(
and_(
UsageDaily.tenant_id == tenant_id,
UsageDaily.date == date_start
)
).first()
if daily:
daily.total_requests += 1
daily.total_tokens += tokens
daily.total_cost_usd += cost
if provider == "gemini":
daily.gemini_requests += 1
elif provider == "openai":
daily.openai_requests += 1
daily.updated_at = datetime.utcnow()
else:
daily = UsageDaily(
tenant_id=tenant_id,
date=date_start,
total_requests=1,
total_tokens=tokens,
total_cost_usd=cost,
gemini_requests=1 if provider == "gemini" else 0,
openai_requests=1 if provider == "openai" else 0
)
db.add(daily)
def _update_monthly_usage(
db: Session,
tenant_id: str,
timestamp: datetime,
provider: str,
tokens: int,
cost: float
):
"""Update monthly usage aggregation."""
year = timestamp.year
month = timestamp.month
monthly = db.query(UsageMonthly).filter(
and_(
UsageMonthly.tenant_id == tenant_id,
UsageMonthly.year == year,
UsageMonthly.month == month
)
).first()
if monthly:
monthly.total_requests += 1
monthly.total_tokens += tokens
monthly.total_cost_usd += cost
if provider == "gemini":
monthly.gemini_requests += 1
elif provider == "openai":
monthly.openai_requests += 1
monthly.updated_at = datetime.utcnow()
else:
monthly = UsageMonthly(
tenant_id=tenant_id,
year=year,
month=month,
total_requests=1,
total_tokens=tokens,
total_cost_usd=cost,
gemini_requests=1 if provider == "gemini" else 0,
openai_requests=1 if provider == "openai" else 0
)
db.add(monthly)