"""Query revenue data with flexible filters."""
from __future__ import annotations
import logging
from typing import Any
# Module-level logger named after this module; used below to report query failures.
logger = logging.getLogger(__name__)
class QueryRevenueCalc:
    """Tool for querying aggregated revenue data with flexible filters.

    The class builds one parameterized ``SELECT ... GROUP BY`` statement and
    runs it through the injected ``sql_driver``, which must expose an async
    ``execute_query(sql, params)`` returning rows that carry a ``cells``
    mapping.
    """

    # Available service types
    SERVICE_TYPES = [
        "cloud_server", "dbaas", "vod", "ddos", "call_center",
        "traffic_manager", "auto_scaling", "cloud_watcher",
        "mail_inbox", "simple_storage", "container_registry",
        "kas", "kubernetes_engine", "mps", "load_balancer",
        "cdn", "lms", "vpn", "cloud_storage"
    ]

    # Billing plan types
    BILLING_PLANS = ["on_demand", "subscription"]

    # Aggregation options
    AGGREGATION_OPTIONS = ["account", "service", "billing_plan"]

    def __init__(self, sql_driver: Any) -> None:
        """Store the driver used to execute the generated SQL."""
        self.sql_driver = sql_driver

    @staticmethod
    def _coerce_limit(limit: Any) -> int:
        """Return ``limit`` as a non-negative int, raising ValueError otherwise."""
        try:
            row_limit = int(limit)
        except (TypeError, ValueError, OverflowError) as exc:
            raise ValueError(f"limit must be an integer, got {limit!r}") from exc
        if row_limit < 0:
            raise ValueError(f"limit must be non-negative, got {row_limit}")
        return row_limit

    async def query(
        self,
        billing_cycle: str | None = None,
        service_type: str | None = None,
        billing_plan: str | None = None,
        email: str | None = None,
        customer_type: str | None = None,
        aggregation: str = "account",
        limit: int = 100,
    ) -> dict[str, Any]:
        """Query revenue data with flexible filters.

        Args:
            billing_cycle: Filter by billing cycle (format: DD-MM-YYYY)
            service_type: Filter by service type (e.g., cloud_server, dbaas)
            billing_plan: Filter by billing plan (on_demand or subscription)
            email: Filter by customer email
            customer_type: Filter by customer type
            aggregation: Group by option - account, service, or billing_plan.
                Any other value falls back to account grouping.
            limit: Maximum number of grouped rows to return. Must be
                convertible to a non-negative integer.

        Returns:
            Dictionary with ``total_records``, ``data`` (one dict per grouped
            row), ``summary`` totals, ``filters_applied`` and ``aggregation``.
            The summary is summed over the returned rows only, i.e. after
            ``LIMIT`` has been applied. On any failure (including an invalid
            ``limit``) the error is logged and a dictionary with an ``error``
            message and empty ``data``/``summary`` is returned instead of
            raising.
        """
        try:
            # ``limit`` is embedded in the SQL text, so force it to a plain
            # integer first; anything else would be an injection vector.
            row_limit = self._coerce_limit(limit)

            # ``billing_plan_type`` is only referenced against the
            # billing-plan report, so use that table whenever the column is
            # needed for filtering OR for grouping.
            # NOTE(review): assumes this table exposes the same columns as
            # report_new_revenue, as the filtered path already relied on.
            group_by_plan = aggregation == "billing_plan"
            if billing_plan or group_by_plan:
                table = "report_revenue_by_billing_plan"
            else:
                table = "report_new_revenue"

            # Only truthy filters become WHERE conditions; values are always
            # passed as bound parameters, never interpolated.
            candidate_filters = (
                ("billing_cycle", billing_cycle),
                ("service_type", service_type),
                ("billing_plan_type", billing_plan),
                ("email", email),
                ("customer_type", customer_type),
            )
            active_filters = [
                (column, value) for column, value in candidate_filters if value
            ]
            params = [value for _, value in active_filters]
            where_sql = (
                " AND ".join(f"{column} = %s" for column, _ in active_filters)
                or "1=1"
            )

            # The grouped columns double as the leading SELECT columns.
            if aggregation == "service":
                group_columns = "service_type"
            elif group_by_plan:
                group_columns = "billing_plan_type"
            else:  # default: account
                group_columns = "email, billing_cycle"

            sql = f"""
                SELECT
                    {group_columns},
                    SUM(total) as total_revenue,
                    SUM(total_paid) as paid_revenue,
                    SUM(total_open) as open_revenue,
                    COUNT(*) as record_count
                FROM {table}
                WHERE {where_sql}
                GROUP BY {group_columns}
                ORDER BY total_revenue DESC
                LIMIT {row_limit}
            """

            rows = await self.sql_driver.execute_query(sql, params)

            filters_applied = {
                "billing_cycle": billing_cycle,
                "service_type": service_type,
                "billing_plan": billing_plan,
                "email": email,
                "customer_type": customer_type,
            }

            if not rows:
                return {
                    "total_records": 0,
                    "data": [],
                    "summary": {
                        "total_revenue": 0,
                        "total_paid": 0,
                        "total_open": 0,
                    },
                    "filters_applied": filters_applied,
                    "aggregation": aggregation,
                }

            data = [row.cells for row in rows]

            def column_total(column: str) -> float:
                # Missing or NULL aggregates count as zero.
                return sum(float(cells.get(column, 0) or 0) for cells in data)

            return {
                "total_records": len(data),
                "data": data,
                "summary": {
                    "total_revenue": column_total("total_revenue"),
                    "total_paid": column_total("paid_revenue"),
                    "total_open": column_total("open_revenue"),
                },
                "filters_applied": filters_applied,
                "aggregation": aggregation,
            }

        except Exception as e:
            # Deliberate best-effort boundary: callers get an error payload
            # rather than an exception; the traceback goes to the log.
            logger.exception("Error querying revenue: %s", e)
            return {
                "error": str(e),
                "total_records": 0,
                "data": [],
                "summary": {},
            }