#!/usr/bin/env python3
"""
Subscription Tracker MCP Server
Professional MCP server for intelligent subscription management
"""
import sys
import json
import asyncio
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from mcp.server import Server
from mcp.types import Tool, TextContent
import src.gmail_tools as gmail
import src.mysql_tools as mysql
import src.extraction as extraction
import src.gap_detection as gap_detection
import src.alert_system as alerts
def load_config(config_path=None):
    """Load the server configuration from a JSON file.

    Args:
        config_path: Optional path to the JSON file. When omitted, the file
            ``config.json`` located one directory above this module's
            directory is used.

    Returns:
        The parsed JSON document, or ``{'google': {}, 'mysql': {}}`` when the
        file cannot be read or does not contain valid JSON.
    """
    if config_path is None:
        config_path = Path(__file__).parent.parent / 'config.json'
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: malformed JSON
        # (json.JSONDecodeError) or undecodable bytes (UnicodeDecodeError).
        # Anything else (KeyboardInterrupt, SystemExit, programming errors)
        # is deliberately allowed to propagate instead of being swallowed.
        return {'google': {}, 'mysql': {}}
# Configuration is loaded once, at import time. load_config() falls back to
# empty 'google' and 'mysql' sections when the file cannot be loaded.
config = load_config()
# The MCP server instance; the list_tools/call_tool handlers defined below
# register themselves on it through its decorators.
server = Server("subscription-tracker-mcp-server")
@server.list_tools()
async def list_tools():
    """Return the Tool definitions (name, description, JSON input schema)
    for every tool this server exposes."""

    def prop(kind, description=None, **extra):
        # One schema property: "type" first, then the optional
        # "description", then any extra keys in the order given.
        spec = {"type": kind}
        if description is not None:
            spec["description"] = description
        spec.update(extra)
        return spec

    def schema(properties, required=None):
        # Top-level object schema; "required" is emitted only when given.
        spec = {"type": "object", "properties": properties}
        if required is not None:
            spec["required"] = list(required)
        return spec

    email_fields = {
        key: prop("string") for key in ("subject", "body", "from", "date", "id")
    }

    subscription_fields = {
        "service": prop("string"),
        "amount": prop("number"),
        "currency": prop("string", default="USD"),
        "renewalDate": prop("string"),
        "planName": prop("string"),
        "cycle": prop("string"),
        "status": prop("string", default="active"),
        "emailId": prop("string"),
        "emailDate": prop("string"),
        "category": prop("string"),
        "notes": prop("string"),
    }

    # (tool name, description, properties, required-or-None), in the order
    # the tools are advertised.
    catalog = (
        (
            "search_subscription_emails",
            "Search Gmail for subscription-related emails",
            {
                "query": prop(
                    "string",
                    "Gmail search query",
                    default="subscription OR renewal OR billing",
                ),
                "maxResults": prop(
                    "integer",
                    "Maximum number of emails to return",
                    default=50,
                ),
            },
            None,
        ),
        (
            "get_email_detail",
            "Get detailed information about a specific email",
            {"messageId": prop("string", "Gmail message ID")},
            ["messageId"],
        ),
        (
            "send_renewal_alert",
            "Send proactive renewal alert via Gmail",
            {
                "to": prop("string", "Recipient email address"),
                "serviceName": prop("string", "Service name"),
                "amount": prop("number", "Renewal amount"),
                "renewalDate": prop("string", "Renewal date (YYYY-MM-DD)"),
            },
            ["to", "serviceName", "amount", "renewalDate"],
        ),
        (
            "extract_subscription_data",
            "Extract structured subscription data from email content",
            {
                "emailData": prop(
                    "object",
                    "Email data (subject, body, from, date, id)",
                    properties=email_fields,
                ),
            },
            ["emailData"],
        ),
        (
            "create_subscription",
            "Create a new subscription record in MySQL",
            {
                "data": prop(
                    "object",
                    "Subscription data",
                    properties=subscription_fields,
                    required=["service", "amount", "renewalDate"],
                ),
            },
            ["data"],
        ),
        (
            "get_subscription",
            "Get subscription by ID",
            {"subscriptionId": prop("integer", "Subscription ID")},
            ["subscriptionId"],
        ),
        (
            "update_subscription",
            "Update existing subscription",
            {
                "subscriptionId": prop("integer"),
                "data": prop("object", "Fields to update"),
            },
            ["subscriptionId", "data"],
        ),
        (
            "delete_subscription",
            "Delete subscription",
            {"subscriptionId": prop("integer")},
            ["subscriptionId"],
        ),
        (
            "list_all_subscriptions",
            "List all subscriptions with optional filtering",
            {
                "status": prop("string", "Filter by status", default="active"),
                "limit": prop(
                    "integer", "Maximum number of results", default=100
                ),
            },
            None,
        ),
        (
            "get_upcoming_renewals",
            "Get subscriptions renewing within specified days",
            {
                "days": prop(
                    "integer", "Number of days to look ahead", default=7
                ),
            },
            None,
        ),
        (
            "detect_missing_data",
            "Detect subscriptions with incomplete data",
            {},
            None,
        ),
        (
            "detect_duplicate_subscriptions",
            "Detect duplicate subscription entries",
            {},
            None,
        ),
        (
            "detect_price_anomalies",
            "Detect unusual price changes in subscriptions",
            {
                "thresholdPercent": prop(
                    "number",
                    "Percentage change to flag as anomaly",
                    default=20,
                ),
            },
            None,
        ),
        (
            "analyze_subscription_gaps",
            "Comprehensive gap analysis for all subscriptions",
            {},
            None,
        ),
        (
            "check_renewals_3_days_ahead",
            "Check for renewals 3 days ahead and send alerts",
            {"userEmail": prop("string", "Email address to send alerts to")},
            ["userEmail"],
        ),
        (
            "get_alert_history",
            "Get history of sent renewal alerts",
            {
                "subscriptionId": prop(
                    "integer", "Optional subscription ID to filter"
                ),
                "limit": prop(
                    "integer", "Maximum number of alerts", default=50
                ),
            },
            None,
        ),
    )

    return [
        Tool(
            name=tool_name,
            description=summary,
            inputSchema=schema(properties, required),
        )
        for tool_name, summary, properties, required in catalog
    ]
def _build_subscription_record(data):
    """Map the camelCase ``create_subscription`` payload onto the snake_case
    keys handed to ``mysql.create_subscription``, filling in defaults."""
    return {
        'service': data.get('service'),
        'amount': data.get('amount'),
        'currency': data.get('currency', 'USD'),
        'renewal_date': data.get('renewalDate'),
        'plan_name': data.get('planName', ''),
        'cycle': data.get('cycle', 'monthly'),
        'status': data.get('status', 'active'),
        'email_id': data.get('emailId', ''),
        'email_date': data.get('emailDate'),
        'category': data.get('category', ''),
        'notes': data.get('notes', ''),
    }


# Top-level arguments each tool cannot run without. Checked before dispatch so
# a missing one is reported by name instead of as a bare KeyError repr.
_REQUIRED_ARGUMENTS = {
    "get_email_detail": ("messageId",),
    "send_renewal_alert": ("to", "serviceName", "amount", "renewalDate"),
    "extract_subscription_data": ("emailData",),
    "create_subscription": ("data",),
    "get_subscription": ("subscriptionId",),
    "update_subscription": ("subscriptionId", "data"),
    "delete_subscription": ("subscriptionId",),
    "check_renewals_3_days_ahead": ("userEmail",),
}

# Tool name -> callable taking the arguments dict. The lambdas look up the
# backend function at call time, not when this table is built.
_TOOL_HANDLERS = {
    "search_subscription_emails": lambda args: gmail.search_subscription_emails(
        args.get("query", "subscription OR renewal OR billing"),
        args.get("maxResults", 50),
    ),
    "get_email_detail": lambda args: gmail.get_email_detail(args["messageId"]),
    "send_renewal_alert": lambda args: gmail.send_renewal_alert(
        args["to"],
        args["serviceName"],
        args["amount"],
        args["renewalDate"],
    ),
    "extract_subscription_data": lambda args: extraction.extract_subscription_data(
        args["emailData"]
    ),
    "create_subscription": lambda args: mysql.create_subscription(
        _build_subscription_record(args["data"])
    ),
    "get_subscription": lambda args: mysql.get_subscription(
        args["subscriptionId"]
    ),
    "update_subscription": lambda args: mysql.update_subscription(
        args["subscriptionId"],
        args["data"],
    ),
    "delete_subscription": lambda args: mysql.delete_subscription(
        args["subscriptionId"]
    ),
    "list_all_subscriptions": lambda args: mysql.list_all_subscriptions(
        args.get("status", "active"),
        args.get("limit", 100),
    ),
    "get_upcoming_renewals": lambda args: mysql.get_upcoming_renewals(
        args.get("days", 7)
    ),
    "detect_missing_data": lambda args: gap_detection.detect_missing_data(),
    "detect_duplicate_subscriptions": lambda args: (
        gap_detection.detect_duplicate_subscriptions()
    ),
    "detect_price_anomalies": lambda args: gap_detection.detect_price_anomalies(
        args.get("thresholdPercent", 20)
    ),
    "analyze_subscription_gaps": lambda args: (
        gap_detection.analyze_subscription_gaps()
    ),
    "check_renewals_3_days_ahead": lambda args: alerts.check_renewals_3_days_ahead(
        args["userEmail"]
    ),
    "get_alert_history": lambda args: alerts.get_alert_history(
        args.get("subscriptionId"),
        args.get("limit", 50),
    ),
}


@server.call_tool()
async def call_tool(name: str, arguments: dict):
    """Run the tool called ``name`` and return its result as JSON text.

    Args:
        name: Tool name, one of the keys of ``_TOOL_HANDLERS``.
        arguments: Tool arguments; ``None`` is treated as an empty dict.

    Returns:
        A single-element list holding a ``TextContent`` whose text is the
        JSON-encoded result. Unknown tools, missing required arguments and
        exceptions raised while running the tool are all reported as
        ``{"success": false, "error": ...}`` rather than raised.
    """
    try:
        arguments = arguments or {}
        handler = _TOOL_HANDLERS.get(name)
        if handler is None:
            result = {"success": False, "error": f"Unknown tool: {name}"}
        else:
            missing = [
                key
                for key in _REQUIRED_ARGUMENTS.get(name, ())
                if key not in arguments
            ]
            if missing:
                result = {
                    "success": False,
                    "error": "Missing required argument(s): " + ", ".join(missing),
                }
            else:
                # NOTE(review): the backend functions are called synchronously
                # inside this coroutine; if they block on network or database
                # I/O they hold the event loop for that time. Confirm before
                # offloading them to a thread.
                result = handler(arguments)
        # default=str stringifies values json cannot encode natively.
        return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
    except Exception as error:
        return [TextContent(type="text", text=json.dumps({"success": False, "error": str(error)}, indent=2))]
async def main():
    """Run ``server`` on the stream pair provided by ``stdio_server()``."""
    from mcp.server.stdio import stdio_server

    async with stdio_server() as streams:
        incoming, outgoing = streams
        init_options = server.create_initialization_options()
        await server.run(incoming, outgoing, init_options)


# Script entry point: start the event loop only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())