"""
Gap Detection Module
Detects missing data, duplicates, and anomalies in subscription records
"""
from src.mysql_db import execute_query
def detect_missing_data():
"""Detect subscriptions with incomplete data"""
try:
query = """
SELECT * FROM subscriptions
WHERE service IS NULL OR service = ''
OR amount IS NULL OR amount = 0
OR renewal_date IS NULL
OR plan_name IS NULL OR plan_name = ''
ORDER BY created_at DESC
"""
results = execute_query(query, fetch_all=True)
gaps = []
for sub in results:
missing_fields = []
if not sub.get('service'):
missing_fields.append('service')
if not sub.get('amount') or sub.get('amount') == 0:
missing_fields.append('amount')
if not sub.get('renewal_date'):
missing_fields.append('renewal_date')
if not sub.get('plan_name'):
missing_fields.append('plan_name')
gaps.append({
'subscription_id': sub['id'],
'service': sub.get('service', 'Unknown'),
'missing_fields': missing_fields
})
return {
'success': True,
'gaps': gaps,
'count': len(gaps)
}
except Exception as error:
return {
'success': False,
'error': str(error)
}
def detect_duplicate_subscriptions():
"""Detect duplicate subscriptions"""
try:
query = """
SELECT service, COUNT(*) as count, GROUP_CONCAT(id) as ids
FROM subscriptions
WHERE status = 'active'
GROUP BY service
HAVING count > 1
"""
results = execute_query(query, fetch_all=True)
duplicates = []
for dup in results:
ids = [int(id) for id in dup['ids'].split(',')]
detail_query = """
SELECT id, service, amount, renewal_date, plan_name, created_at
FROM subscriptions
WHERE id IN ({})
ORDER BY created_at DESC
""".format(','.join(['%s'] * len(ids)))
details = execute_query(detail_query, tuple(ids), fetch_all=True)
duplicates.append({
'service': dup['service'],
'count': dup['count'],
'subscriptions': details
})
return {
'success': True,
'duplicates': duplicates,
'count': len(duplicates)
}
except Exception as error:
return {
'success': False,
'error': str(error)
}
def detect_price_anomalies(threshold_percent=20):
"""
Detect unusual price changes
Args:
threshold_percent: Percentage change to flag as anomaly
"""
try:
query = """
SELECT
s.id, s.service, s.amount, s.updated_at,
ph.amount as previous_amount,
ph.changed_at,
((s.amount - ph.amount) / ph.amount * 100) as change_percent
FROM subscriptions s
JOIN price_history ph ON s.id = ph.subscription_id
WHERE ABS((s.amount - ph.amount) / ph.amount * 100) > %s
ORDER BY ABS((s.amount - ph.amount) / ph.amount * 100) DESC
"""
results = execute_query(query, (threshold_percent,), fetch_all=True)
anomalies = []
for result in results:
anomalies.append({
'subscription_id': result['id'],
'service': result['service'],
'current_amount': result['amount'],
'previous_amount': result['previous_amount'],
'change_percent': result['change_percent'],
'changed_at': result['changed_at']
})
return {
'success': True,
'anomalies': anomalies,
'count': len(anomalies)
}
except Exception as error:
return {
'success': False,
'error': str(error)
}
def analyze_subscription_gaps():
"""Comprehensive gap analysis"""
try:
missing_data_result = detect_missing_data()
duplicate_result = detect_duplicate_subscriptions()
price_anomaly_result = detect_price_anomalies()
total_issues = 0
if missing_data_result.get('success'):
total_issues += missing_data_result.get('count', 0)
if duplicate_result.get('success'):
total_issues += duplicate_result.get('count', 0)
if price_anomaly_result.get('success'):
total_issues += price_anomaly_result.get('count', 0)
return {
'success': True,
'summary': {
'total_issues': total_issues,
'missing_data': missing_data_result.get('count', 0),
'duplicates': duplicate_result.get('count', 0),
'price_anomalies': price_anomaly_result.get('count', 0)
},
'details': {
'missing_data': missing_data_result.get('gaps', []),
'duplicates': duplicate_result.get('duplicates', []),
'price_anomalies': price_anomaly_result.get('anomalies', [])
}
}
except Exception as error:
return {
'success': False,
'error': str(error)
}