# mcp_reminder_server.py (23.1 kB)
import logging
import os
import json
import datetime
import sys
from dataclasses import dataclass
from typing import List, Dict, Any, Union
from dotenv import load_dotenv
from reminder_app import (Reminder, GoogleSheetsReader, ReminderSender,
check_and_send_reminders, generate_reminder_id, load_sent_reminders)
@dataclass
class MCPTool:
    """Static description of one tool offered by the reminder MCP server."""

    # Identifier used to select the tool when it is invoked.
    name: str
    # Human-readable summary of what the tool does.
    description: str
    # Version label attached to the tool definition.
    version: str
    # JSON-Schema-style dict describing the arguments the tool accepts.
    input_schema: dict
@dataclass
class MCPResource:
    """Static description of one readable resource offered by the server."""

    # Address under which the resource is requested.
    uri: str
    # Short display name.
    name: str
    # Human-readable summary of the resource's content.
    description: str
    # Content type of the resource body (camelCase kept for existing callers).
    mimeType: str
class ReminderMCPServer:
    """MCP Server wrapper for reminder_app.py.

    Exposes the reminder application's email sender and Google Sheets reader
    as a set of named tools and resources. Every tool handler returns a plain
    ``dict``; failures are reported inside that dict (``"error"`` key) rather
    than raised to the caller.
    """

    # Arguments the ``send_reminder`` tool cannot work without (mirrors the
    # ``required`` list of its input schema).
    _SEND_REQUIRED = (
        "customer_name",
        "customer_email",
        "message_template",
        "due_date",
    )

    def __init__(self):
        """Load ``.env`` and build the sender/reader from environment variables."""
        load_dotenv()
        # Initialize email sender. ``or 587`` also covers EMAIL_PORT being
        # defined but empty, which would otherwise make int() raise.
        self.reminder_sender = ReminderSender(
            smtp_server=os.getenv("EMAIL_HOST"),
            smtp_port=int(os.getenv("EMAIL_PORT") or 587),
            smtp_username=os.getenv("EMAIL_USERNAME"),
            smtp_password=os.getenv("EMAIL_PASSWORD"),
        )
        # Initialize Google Sheets reader
        self.google_sheets_reader = GoogleSheetsReader(
            credentials_file=os.getenv(
                "GOOGLE_CREDENTIALS_FILE", "credentials.json"),
            spreadsheet_name=os.getenv(
                "GOOGLE_SPREADSHEET_NAME", "Customer_reminder"),
        )
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def get_tools(self) -> "List[MCPTool]":
        """Return the descriptors of every tool this server can execute."""
        return [
            MCPTool(
                name="send_reminder",
                description="Sends a reminder email to a customer",
                version="1.0",
                input_schema={
                    "type": "object",
                    "properties": {
                        "customer_name": {
                            "type": "string",
                            "description": "The name of the customer"
                        },
                        "customer_email": {
                            "type": "string",
                            "description": "The email of the customer"
                        },
                        "message_template": {
                            "type": "string",
                            "description": "The template for the reminder message"
                        },
                        "due_date": {
                            "type": "string",
                            "description": "The due date in YYYY-MM-DD format"
                        },
                        "reminder_type": {
                            "type": "string",
                            "description": "The type of reminder",
                            "enum": ["email"],
                            "default": "email"
                        },
                    },
                    "required": ["customer_name", "customer_email", "message_template", "due_date"],
                },
            ),
            MCPTool(
                name="check_due_reminders",
                description="Checks for and sends all due reminders from Google Sheets",
                version="1.0",
                input_schema={
                    "type": "object",
                    "properties": {
                        "dry_run": {
                            "type": "boolean",
                            "description": "If true, only shows what would be sent without actually sending",
                            "default": False
                        }
                    },
                }
            ),
            MCPTool(
                name="get_customer_reminders",
                description="Retrieves reminders for customers from Google Sheets",
                version="1.0",
                input_schema={
                    "type": "object",
                    "properties": {
                        "include_sent": {
                            "type": "boolean",
                            "description": "Whether to include already sent reminders",
                            "default": False
                        },
                        "days_ahead": {
                            "type": "integer",
                            "description": "Number of days ahead to check for reminders",
                            "default": 0
                        }
                    },
                }
            ),
            MCPTool(
                name="get_sent_reminders",
                description="Retrieves the history of sent reminders",
                version="1.0",
                input_schema={
                    "type": "object",
                    "properties": {},
                }
            ),
            MCPTool(
                name="get_sheet_status",
                description="Retrieves the status of the Google Sheet",
                version="1.0",
                input_schema={
                    "type": "object",
                    "properties": {},
                }
            ),
        ]

    def get_resources(self) -> "List[MCPResource]":
        """Get available resources"""
        return [
            MCPResource(
                uri="reminders://customer-reminders",
                name="Customer Reminders",
                description="Customer reminder data from Google Sheets",
                mimeType="application/json"
            ),
            MCPResource(
                uri="reminders://sent-reminders",
                name="Sent Reminders",
                description="History of sent reminders",
                mimeType="application/json"
            ),
        ]

    def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a tool call.

        Args:
            name: Tool name as listed by :meth:`get_tools`.
            arguments: Tool arguments; ``None`` is treated as "no arguments".

        Returns:
            The handler's result dict, or ``{"error": ...}`` when the tool is
            unknown, the arguments are not an object, or the handler raised.
        """
        handlers = {
            "send_reminder": self._send_reminder,
            "check_due_reminders": self._check_due_reminders,
            "get_customer_reminders": self._get_customer_reminders,
            "get_sent_reminders": self._get_sent_reminders,
            "get_sheet_status": self._get_sheet_status,
        }
        # Guard the lookup: a non-string (e.g. a JSON list) is unhashable.
        handler = handlers.get(name) if isinstance(name, str) else None
        if handler is None:
            return {"error": f"Unknown tool: {name}"}

        # Clients may send "arguments": null; handlers expect a mapping.
        arguments = arguments or {}
        if not isinstance(arguments, dict):
            return {"error": "Tool arguments must be an object"}

        try:
            return handler(arguments)
        except Exception as e:
            self.logger.error(f"Error executing tool {name}: {e}")
            return {"error": str(e)}

    def _send_reminder(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Send a single reminder.

        Validates the required arguments and the ``due_date`` format before
        anything else, so a malformed call yields a readable error. A reminder
        whose id is already in the sent history is not sent again.
        """
        missing = [key for key in self._SEND_REQUIRED if not arguments.get(key)]
        if missing:
            return {
                "success": False,
                "error": "Missing required argument(s): " + ", ".join(missing),
            }

        # Parse due date
        due_date_str = arguments["due_date"]
        try:
            due_date = datetime.datetime.strptime(due_date_str, "%Y-%m-%d")
        except (TypeError, ValueError):
            return {
                "success": False,
                "error": f"Invalid due_date {due_date_str!r}; expected YYYY-MM-DD",
            }

        try:
            # Create reminder object
            reminder = Reminder(
                customer_name=arguments.get("customer_name"),
                customer_email=arguments.get("customer_email"),
                message_template=arguments.get("message_template"),
                due_date=due_date,
                reminder_type=arguments.get("reminder_type", "email")
            )
            # Check if already sent
            reminder_id = generate_reminder_id(
                reminder.customer_email, reminder.due_date)
            if reminder_id in load_sent_reminders():
                return {
                    "success": False,
                    "message": f"Reminder already sent to {reminder.customer_email}",
                    "reminder_id": reminder_id
                }
            # Send reminder
            self.reminder_sender.send_reminder(reminder)
            return {
                "success": True,
                "message": f"Reminder sent to {reminder.customer_email}",
                "reminder_id": reminder_id,
                "customer_name": reminder.customer_name,
                "due_date": due_date_str
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _check_due_reminders(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Check and send all due reminders from Google Sheets.

        With ``dry_run`` set, nothing is sent and each pending reminder is
        reported as ``would_send``. Reminders already present in the sent
        history are reported as ``skipped``.
        """
        try:
            dry_run = arguments.get("dry_run", False)
            # Get reminders from Google Sheets
            reminders_data = self.google_sheets_reader.get_reminders_data()
            if not reminders_data:
                return {
                    "success": True,
                    "message": "No reminders due at this time",
                    "reminders_processed": 0,
                    "reminders_sent": 0
                }

            sent_count = 0
            skipped_count = 0
            results = []
            # Load sent reminders once to avoid duplicates
            sent_reminders = load_sent_reminders()
            for data in reminders_data:
                reminder = Reminder(
                    customer_name=data["customer_name"],
                    customer_email=data["customer_email"],
                    message_template=data["message_template"],
                    due_date=data["due_date"],
                    reminder_type=data["reminder_type"]
                )
                reminder_id = generate_reminder_id(
                    reminder.customer_email, reminder.due_date)
                entry = {
                    "customer_name": reminder.customer_name,
                    "customer_email": reminder.customer_email,
                }

                if reminder_id in sent_reminders:
                    skipped_count += 1
                    entry["status"] = "skipped"
                    entry["reason"] = "already_sent"
                    results.append(entry)
                    continue

                if dry_run:
                    entry["status"] = "would_send"
                else:
                    self.reminder_sender.send_reminder(reminder)
                    sent_count += 1
                    entry["status"] = "sent"
                entry["reminder_id"] = reminder_id
                results.append(entry)

            return {
                "success": True,
                "message": f"Processed {len(reminders_data)} reminders",
                "reminders_processed": len(reminders_data),
                "reminders_sent": sent_count,
                "reminders_skipped": skipped_count,
                "dry_run": dry_run,
                "results": results
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _get_sheet_status(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Report whether the sheet reader holds a sheet and how many rows it has.

        ``arguments`` is accepted for signature parity with the other tool
        handlers and is not read.
        """
        try:
            sheet = getattr(self.google_sheets_reader, "sheet", None)
            connected = sheet is not None
            count = len(sheet.get_all_records()) if connected else 0
            return {
                "success": True,
                "connected": connected,
                "reminder_count": count
            }
        except Exception as e:
            # Same failure shape as the sibling handlers.
            return {"success": False, "error": str(e)}

    def _get_customer_reminders(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Get customer reminders from Google Sheets.

        Args (read from ``arguments``):
            include_sent: When false (default), reminders found in the sent
                history are left out; when true they are returned with
                ``is_sent`` set accordingly.
            days_ahead: When greater than 0, keep only reminders due between
                today and ``today + days_ahead`` (inclusive).
        """
        try:
            include_sent = arguments.get("include_sent", False)
            days_ahead = int(arguments.get("days_ahead") or 0)
            # Get all reminders from Google Sheets (tolerate a None result)
            all_reminders = self.google_sheets_reader.get_reminders_data() or []

            if days_ahead > 0:
                # Filter for future reminders within days_ahead
                today = datetime.datetime.now().date()
                future_date = today + datetime.timedelta(days=days_ahead)
                all_reminders = [
                    reminder_data
                    for reminder_data in all_reminders
                    if today <= reminder_data["due_date"].date() <= future_date
                ]

            # The sent history is needed in BOTH modes: to flag entries when
            # include_sent is true and to filter them out when it is false.
            sent_reminders = load_sent_reminders()

            results = []
            for reminder_data in all_reminders:
                reminder_id = generate_reminder_id(
                    reminder_data["customer_email"],
                    reminder_data["due_date"]
                )
                is_sent = reminder_id in sent_reminders
                if is_sent and not include_sent:
                    continue
                results.append({
                    "customer_name": reminder_data["customer_name"],
                    "customer_email": reminder_data["customer_email"],
                    "message_template": reminder_data["message_template"],
                    "due_date": reminder_data["due_date"].strftime("%Y-%m-%d"),
                    "reminder_type": reminder_data["reminder_type"],
                    "reminder_id": reminder_id,
                    "is_sent": is_sent
                })

            return {
                "success": True,
                "reminders": results,
                "total_count": len(results)
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _get_sent_reminders(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Get history of sent reminders"""
        try:
            sent_reminders = load_sent_reminders()
            return {
                "success": True,
                "sent_reminders": list(sent_reminders),
                "total_count": len(sent_reminders)
            }
        except Exception as e:
            return {"success": False, "error": str(e)}
class MCPTransport:
    """MCP JSON-RPC transport layer over stdio.

    Reads one JSON message per line from stdin, dispatches it to the wrapped
    server, and writes one JSON message per line to stdout. Messages without
    an ``id`` are treated as notifications and never receive a reply.
    """

    # Resource URI -> (tool name, tool arguments) used to produce its content.
    _RESOURCE_TOOLS = {
        "reminders://customer-reminders": (
            "get_customer_reminders", {"include_sent": True}),
        "reminders://sent-reminders": ("get_sent_reminders", {}),
    }

    def __init__(self, server: "ReminderMCPServer"):
        """Wrap ``server``, whose ``call_tool``/``get_tools``/``get_resources`` are used."""
        self.server = server
        # Not read anywhere in this class; kept so existing attribute access
        # on transport instances keeps working.
        self.request_id = 0

    def send_response(self, request_id: Union[str, int, None], result: Any):
        """Send a JSON-RPC response"""
        # Don't send response for notifications (id is None)
        if request_id is None:
            return
        response = {
            "jsonrpc": "2.0",
            "id": request_id,
            "result": result
        }
        print(json.dumps(response), flush=True)

    def send_error(self, request_id: Union[str, int, None], code: int, message: str, data: Any = None):
        """Send a JSON-RPC error response.

        Always writes, even when ``request_id`` is ``None``; that form is
        needed for parse errors and invalid requests whose id is unknown.
        """
        error = {
            "code": code,
            "message": message
        }
        if data is not None:
            error["data"] = data
        response = {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": error
        }
        print(json.dumps(response), flush=True)

    def _reply_error(self, request_id: Union[str, int, None], code: int, message: str):
        """Send an error for a well-formed message, unless it was a notification."""
        if request_id is not None:
            self.send_error(request_id, code, message)

    def handle_initialize(self, request_id: Union[str, int], params: Dict):
        """Handle MCP initialize request"""
        result = {
            "protocolVersion": "2024-11-05",
            "capabilities": {
                "tools": {},
                "resources": {}
            },
            "serverInfo": {
                "name": "reminder-mcp",
                "version": "1.0.0"
            }
        }
        self.send_response(request_id, result)

    def handle_ping(self, request_id: Union[str, int], params: Dict):
        """Handle ping request by replying with an empty result."""
        self.send_response(request_id, {})

    def handle_tools_list(self, request_id: Union[str, int], params: Dict):
        """Handle tools/list request"""
        tools = [
            {
                "name": tool.name,
                "description": tool.description,
                "inputSchema": tool.input_schema
            }
            for tool in self.server.get_tools()
        ]
        self.send_response(request_id, {"tools": tools})

    def handle_tools_call(self, request_id: Union[str, int], params: Dict):
        """Handle tools/call request.

        The tool's result dict is serialized to JSON text inside a
        ``content`` array; results carrying an ``"error"`` key are also
        flagged with ``isError``.
        """
        params = params or {}
        tool_name = params.get("name")
        if not tool_name:
            self._reply_error(request_id, -32602, "Missing tool name")
            return
        # An explicit "arguments": null must not reach the tool as None.
        arguments = params.get("arguments") or {}
        result = self.server.call_tool(tool_name, arguments)
        # Format response for MCP - tools/call expects content array
        mcp_result = {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps(result, indent=2, default=str)
                }
            ]
        }
        if isinstance(result, dict) and "error" in result:
            mcp_result["isError"] = True
        self.send_response(request_id, mcp_result)

    def handle_resources_list(self, request_id: Union[str, int], params: Dict):
        """Handle resources/list request"""
        resources = [
            {
                "uri": resource.uri,
                "name": resource.name,
                "description": resource.description,
                "mimeType": resource.mimeType
            }
            for resource in self.server.get_resources()
        ]
        self.send_response(request_id, {"resources": resources})

    def handle_resources_read(self, request_id: Union[str, int], params: Dict):
        """Handle resources/read request"""
        params = params or {}
        uri = params.get("uri")
        if not uri:
            self._reply_error(request_id, -32602, "Missing resource URI")
            return
        # Non-string URIs (possibly unhashable) can never match a resource.
        target = self._RESOURCE_TOOLS.get(uri) if isinstance(uri, str) else None
        if target is None:
            self._reply_error(request_id, -32602, f"Unknown resource: {uri}")
            return
        try:
            tool_name, tool_args = target
            # Copy the arguments so the shared table is never mutated.
            result = self.server.call_tool(tool_name, dict(tool_args))
            content = json.dumps(result, indent=2, default=str)
            mcp_result = {
                "contents": [
                    {
                        "uri": uri,
                        "mimeType": "application/json",
                        "text": content
                    }
                ]
            }
            self.send_response(request_id, mcp_result)
        except Exception as e:
            self._reply_error(request_id, -32603,
                              f"Failed to read resource: {str(e)}")

    def _method_handlers(self) -> Dict[str, Any]:
        """Map each supported JSON-RPC method name to its handler."""
        return {
            "initialize": self.handle_initialize,
            "ping": self.handle_ping,
            "tools/list": self.handle_tools_list,
            "tools/call": self.handle_tools_call,
            "resources/list": self.handle_resources_list,
            "resources/read": self.handle_resources_read,
            # This is a notification, no response needed
            "notifications/initialized": lambda request_id, params: None,
        }

    def handle_request(self, request: Dict):
        """Handle incoming JSON-RPC request.

        Structurally invalid messages get an Invalid Request error. For
        well-formed messages, errors are only reported when the message has
        an ``id``; notifications are processed silently.
        """
        # Validate basic JSON-RPC structure
        if not isinstance(request, dict):
            self.send_error(None, -32600, "Invalid Request - not an object")
            return
        if request.get("jsonrpc") != "2.0":
            self.send_error(
                None, -32600, "Invalid Request - missing or invalid jsonrpc version")
            return
        request_id = request.get("id")
        method = request.get("method")
        if not method or not isinstance(method, str):
            self.send_error(request_id, -32600,
                            "Invalid Request - missing or invalid method")
            return

        handler = self._method_handlers().get(method)
        if handler is None:
            self._reply_error(request_id, -32601,
                              f"Method not found: {method}")
            return

        params = request.get("params")
        if params is None:
            params = {}
        if not isinstance(params, dict):
            # Handlers read params by key; positional (array) params are
            # rejected explicitly instead of failing inside a handler.
            self._reply_error(request_id, -32602,
                              "Invalid params - expected an object")
            return

        try:
            handler(request_id, params)
        except Exception as e:
            self._reply_error(request_id, -32603, f"Internal error: {str(e)}")

    def run(self):
        """Run the MCP server transport until stdin is exhausted."""
        try:
            for raw_line in sys.stdin:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    self.handle_request(json.loads(line))
                except json.JSONDecodeError as e:
                    # Send parse error for malformed JSON
                    self.send_error(None, -32700, f"Parse error: {str(e)}")
                except Exception as e:
                    # Log unexpected errors but continue running
                    self.server.logger.error(f"Unexpected error: {e}")
        except KeyboardInterrupt:
            pass
        except EOFError:
            # stdin closed
            pass
def main():
    """Main entry point for the MCP server.

    With ``--test`` as the first command-line argument, print the available
    tools and resources and run a few sample tool calls. Otherwise serve MCP
    over stdio, exiting with status 1 if the server fails.
    """
    test_mode = len(sys.argv) > 1 and sys.argv[1] == "--test"

    if not test_mode:
        # MCP mode - run the transport layer
        try:
            server = ReminderMCPServer()
            MCPTransport(server).run()
        except Exception as e:
            logging.error(f"Failed to start MCP server: {e}")
            sys.exit(1)
        return

    # Test mode - show capabilities and run examples
    server = ReminderMCPServer()
    print("=== Customer Reminder MCP Server ===")
    print("Available tools:")
    for tool in server.get_tools():
        print(f" - {tool.name}: {tool.description}")
    print("\nAvailable resources:")
    for resource in server.get_resources():
        print(f" - {resource.name}: {resource.description}")
    print("\nMCP Server initialized and ready!")

    # Example usage: (heading, tool name, tool arguments), run in order
    print("\n=== Example Tool Calls ===")
    examples = (
        ("1. Checking due reminders (dry run):",
         "check_due_reminders", {"dry_run": True}),
        ("\n2. Getting customer reminders:",
         "get_customer_reminders", {"include_sent": False}),
        ("\n3. Getting sheet status:",
         "get_sheet_status", {}),
    )
    for heading, tool_name, tool_args in examples:
        print(heading)
        outcome = server.call_tool(tool_name, tool_args)
        print(json.dumps(outcome, indent=2, default=str))


if __name__ == "__main__":
    main()