"""Analyze revenue trends and detect anomalies."""
from __future__ import annotations
import logging
from typing import Any
logger = logging.getLogger(__name__)
class AnalyzeTrendCalc:
    """Tool for analyzing revenue trends and detecting anomalies.

    Compares each (email, service_type) revenue total of one billing cycle
    against the average per-cycle total of the preceding cycles and reports
    the pairs whose relative change reaches a configurable threshold.
    """

    # Maximum number of records returned in each of "increases"/"decreases".
    _TOP_N = 10

    def __init__(self, sql_driver) -> None:
        """Store the driver used to run queries.

        Args:
            sql_driver: Object exposing an awaitable
                ``execute_query(sql, params)`` that returns rows whose
                ``cells`` attribute is a mapping of column name to value.
        """
        self.sql_driver = sql_driver

    async def analyze(
        self,
        current_cycle: str,
        comparison_cycles: int = 3,
        threshold_percent: float = 20.0,
        service_type: str | None = None,
        billing_plan: str | None = None,
    ) -> dict[str, Any]:
        """Analyze revenue trends by comparing current cycle with previous cycles.

        Args:
            current_cycle: Current billing cycle to analyze (format: DD-MM-YYYY)
            comparison_cycles: Number of previous cycles to compare against
            threshold_percent: Threshold percentage for anomaly detection
            service_type: Optional filter by service type
            billing_plan: Optional filter by billing plan

        Returns:
            Dictionary with a "summary" of counts, the top "increases" and
            "decreases" (at most ten each, largest change first), and an
            "alert_required" flag that is True when any decrease was found.
            When the current cycle has no rows a "message" key is included;
            when a query or computation fails an "error" key carries the
            exception text and all counts are zero (the exception is logged,
            not raised).
        """
        try:
            # The table name is chosen from two fixed literals (never from
            # caller-supplied text), so interpolating it into SQL is safe.
            table = "report_revenue_by_billing_plan" if billing_plan else "report_new_revenue"

            # Optional filters are shared by both queries; values are always
            # passed as bound parameters.
            filter_clauses: list[str] = []
            filter_params: list[Any] = []
            if service_type:
                filter_clauses.append("service_type = %s")
                filter_params.append(service_type)
            if billing_plan:
                filter_clauses.append("billing_plan_type = %s")
                filter_params.append(billing_plan)

            # Query current cycle revenue
            current_where = " AND ".join(["billing_cycle = %s", *filter_clauses])
            params_current = [current_cycle, *filter_params]
            current_sql = f"""
                SELECT
                    email,
                    service_type,
                    SUM(total) as current_total
                FROM {table}
                WHERE {current_where}
                GROUP BY email, service_type
            """
            current_rows = await self.sql_driver.execute_query(current_sql, params_current)

            if not current_rows:
                return {
                    "summary": {
                        "total_anomalies": 0,
                        "significant_increases": 0,
                        "churn_risks": 0,
                        "threshold_percent": threshold_percent,
                    },
                    "increases": [],
                    "decreases": [],
                    "alert_required": False,
                    "message": f"No data found for cycle {current_cycle}",
                }

            # Query the average per-cycle revenue of the previous N cycles.
            # Revenue is summed per cycle first and then averaged, so the
            # baseline is comparable with the current cycle's SUM(total)
            # even when a customer has several rows within one cycle.
            #
            # NOTE(review): "billing_cycle < %s" with ORDER BY only selects
            # the chronologically preceding cycles if the column is a date
            # type or a sortable string; DD-MM-YYYY text does not sort
            # chronologically - confirm the column type.
            prev_where = " AND " + " AND ".join(filter_clauses) if filter_clauses else ""
            prev_params = [current_cycle, comparison_cycles, *filter_params]
            previous_sql = f"""
                WITH prev_cycles AS (
                    SELECT DISTINCT billing_cycle
                    FROM {table}
                    WHERE billing_cycle < %s
                    ORDER BY billing_cycle DESC
                    LIMIT %s
                ),
                cycle_totals AS (
                    SELECT
                        email,
                        service_type,
                        billing_cycle,
                        SUM(total) AS cycle_total
                    FROM {table}
                    WHERE billing_cycle IN (SELECT billing_cycle FROM prev_cycles)
                    {prev_where}
                    GROUP BY email, service_type, billing_cycle
                )
                SELECT
                    email,
                    service_type,
                    AVG(cycle_total) as avg_total
                FROM cycle_totals
                GROUP BY email, service_type
            """
            previous_rows = await self.sql_driver.execute_query(previous_sql, prev_params)

            # Build lookup for previous revenue keyed by (email, service_type)
            prev_lookup: dict[tuple[str, str], float] = {}
            for row in previous_rows or []:
                key = (row.cells.get("email", ""), row.cells.get("service_type", ""))
                prev_lookup[key] = float(row.cells.get("avg_total", 0) or 0)

            # Calculate trends and detect anomalies
            increases: list[dict[str, Any]] = []
            decreases: list[dict[str, Any]] = []
            for row in current_rows:
                email = row.cells.get("email", "")
                service = row.cells.get("service_type", "")
                current_total = float(row.cells.get("current_total", 0) or 0)
                avg_previous = prev_lookup.get((email, service), 0)

                if avg_previous == 0:
                    # New customer or no previous data: no baseline to
                    # compare against (and no division by zero).
                    continue

                # Divide by the magnitude of the baseline so that the sign of
                # the result always follows the direction of the change, even
                # when the baseline is negative (e.g. net refunds).
                change_percent = ((current_total - avg_previous) / abs(avg_previous)) * 100
                if abs(change_percent) < threshold_percent:
                    continue

                is_increase = change_percent > 0
                record = {
                    "email": email,
                    "service_type": service,
                    "current_revenue": current_total,
                    "avg_previous_revenue": avg_previous,
                    "change_percent": round(change_percent, 2),
                    "trend": "increase" if is_increase else "decrease",
                }
                (increases if is_increase else decreases).append(record)

            # Largest rise first / steepest drop first
            increases.sort(key=lambda rec: rec["change_percent"], reverse=True)
            decreases.sort(key=lambda rec: rec["change_percent"])

            return {
                "summary": {
                    "total_anomalies": len(increases) + len(decreases),
                    "significant_increases": len(increases),
                    "churn_risks": len(decreases),
                    "threshold_percent": threshold_percent,
                    "current_cycle": current_cycle,
                    "comparison_cycles": comparison_cycles,
                },
                # Counts above cover every anomaly; the lists are truncated.
                "increases": increases[: self._TOP_N],
                "decreases": decreases[: self._TOP_N],
                "alert_required": len(decreases) > 0,
                "filters_applied": {
                    "service_type": service_type,
                    "billing_plan": billing_plan,
                },
            }
        except Exception as e:
            # Deliberate best-effort boundary: callers receive a structured
            # error payload instead of an exception; the traceback is logged.
            logger.exception("Error analyzing trend: %s", e)
            return {
                "error": str(e),
                "summary": {
                    "total_anomalies": 0,
                    "significant_increases": 0,
                    "churn_risks": 0,
                    "threshold_percent": threshold_percent,
                },
                "increases": [],
                "decreases": [],
                "alert_required": False,
            }