"""
MCP tool handlers for Splitwise operations.
Each handler processes tool arguments and returns structured results.
"""
from datetime import datetime, timedelta
from typing import Any, Dict
from .cache import CacheManager
from .client import SplitwiseClient, SplitwiseError
async def handle_check_duplicate(
cache_manager: CacheManager,
arguments: Dict[str, Any]
) -> Dict[str, Any]:
"""
Handle splitwise_check_duplicate tool.
Args:
cache_manager: Cache manager instance
arguments: Tool arguments
- description: str
- amount: float
- date: str (YYYY-MM-DD)
- group_id: str
Returns:
{
"is_duplicate": bool,
"reason": str,
"existing_expense_id": str or None,
"similarity_score": float
}
"""
try:
description = arguments["description"]
amount = float(arguments["amount"])
date = arguments["date"]
# group_id = arguments["group_id"] # Not used in check_duplicate currently
result = cache_manager.check_duplicate(description, amount, date)
return {
"is_duplicate": result["is_duplicate"],
"reason": result["reason"],
"existing_expense_id": result.get("existing_expense_id"),
"similarity_score": result.get("confidence", 0.0)
}
except KeyError as e:
return {
"error": f"Missing required argument: {e}",
"status": "error"
}
except Exception as e:
return {
"error": f"Failed to check duplicate: {e}",
"status": "error"
}
async def handle_reserve_expense(
    cache_manager: CacheManager,
    arguments: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Run the splitwise_reserve_expense tool.

    Asks the cache manager to reserve an expense slot with a 5-minute TTL
    and reports the outcome.

    Args:
        cache_manager: Cache manager instance; its ``reserve_expense`` is
            called with keyword arguments.
        arguments: Tool arguments
            - description: str (required)
            - amount: number or numeric string (required, coerced to float)
            - date: str (required, YYYY-MM-DD)
            - group_id: str (required)
            - order_id: str (read as a required key by this handler; used
              for idempotent duplicate detection)

    Returns:
        On success:
            {
                "reservation_id": str or None,
                "expires_at": str (ISO timestamp) or None when no
                              reservation was made,
                "is_duplicate": bool,
                "reason": str,
                "existing_expense_id": str or None
            }
        On failure:
            {"error": str, "status": "error"}
    """
    try:
        # One source for the TTL so the reservation and the reported
        # expiry use the same duration.
        ttl_minutes = 5
        outcome = cache_manager.reserve_expense(
            description=arguments["description"],
            amount=float(arguments["amount"]),
            date=arguments["date"],
            group_id=arguments["group_id"],
            ttl_minutes=ttl_minutes,
            order_id=arguments["order_id"],
        )

        reservation_id = outcome["reservation_id"]
        # The expiry is computed here from the local clock; it is only
        # reported when a reservation was actually made.
        if reservation_id:
            expiry = datetime.now() + timedelta(minutes=ttl_minutes)
            expires_at = expiry.isoformat()
        else:
            expires_at = None

        return {
            "reservation_id": reservation_id,
            "expires_at": expires_at,
            "is_duplicate": outcome["is_duplicate"],
            "reason": outcome["reason"],
            "existing_expense_id": outcome.get("existing_expense_id"),
        }
    except KeyError as missing:
        return {
            "error": f"Missing required argument: {missing}",
            "status": "error",
        }
    except Exception as failure:
        return {
            "error": f"Failed to reserve expense: {failure}",
            "status": "error",
        }
async def handle_create_expense(
    cache_manager: CacheManager,
    splitwise_client: SplitwiseClient,
    arguments: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Handle splitwise_create_expense tool.

    Validates the arguments, creates the expense through the Splitwise
    client and then confirms the cache reservation. All validation,
    including parsing of ``split_ratio``, happens before the Splitwise API
    is called, so an invalid request never leaves a created expense behind
    an error response.

    Args:
        cache_manager: Cache manager instance
        splitwise_client: Splitwise API client
        arguments: Tool arguments
            - reservation_id: str (required)
            - description: str (required)
            - amount: number or numeric string (required, coerced to float)
            - date: str (required, YYYY-MM-DD)
            - group_id: str (accepted but not read by this handler)
            - payer_id: str (required; must equal splitwise_client.payer_id)
            - partner_id: str (required; must equal
              splitwise_client.partner_id)
            - order_id: str (optional)
            - split_ratio: str (optional, default "50/50"); the part before
              "/" is read as the payer's integer percentage

    Returns:
        On success:
            {
                "status": "created",
                "expense_id": str,
                "payer_owes": float,
                "partner_owes": float,
                "total": float,
                "url": str
            }
        When the expense was created but confirming the reservation
        raised ValueError:
            {
                "status": "created_but_not_cached",
                "expense_id": str,
                "warning": str,
                "total": float
            }
        On failure:
            {"error": str, "status": "error"}
    """
    try:
        reservation_id = arguments["reservation_id"]
        description = arguments["description"]
        amount = float(arguments["amount"])
        date = arguments["date"]
        # group_id is not read here; the original notes it is already in
        # the client config.
        order_id = arguments.get("order_id")

        # Verify client user IDs match arguments (validation)
        payer_id = arguments["payer_id"]
        partner_id = arguments["partner_id"]
        if splitwise_client.payer_id != payer_id:
            raise ValueError(
                f"payer_id mismatch: config={splitwise_client.payer_id}, "
                f"argument={payer_id}"
            )
        if splitwise_client.partner_id != partner_id:
            raise ValueError(
                f"partner_id mismatch: config={splitwise_client.partner_id}, "
                f"argument={partner_id}"
            )

        # Get split ratio (default 50/50) and resolve it BEFORE creating
        # anything: parsing it after the API call would turn a malformed
        # ratio into an error response (and a cancelled reservation) for an
        # expense that already exists in Splitwise.
        split_ratio = arguments.get("split_ratio", "50/50")
        try:
            payer_pct = int(split_ratio.split("/")[0])
        except (AttributeError, ValueError) as e:
            raise ValueError(
                f"Invalid split_ratio {split_ratio!r}: expected integer "
                f"percentages such as '50/50'"
            ) from e
        payer_owes = round(amount * payer_pct / 100, 2)
        partner_owes = round(amount - payer_owes, 2)

        # Create expense in Splitwise
        try:
            expense = splitwise_client.create_expense(
                description=description,
                amount=amount,
                date=date,
                split_ratio=split_ratio,
                order_id=order_id
            )
        except SplitwiseError as e:
            # Rollback reservation on API error
            cache_manager.cancel_reservation(reservation_id)
            return {
                "error": f"Splitwise API error: {e}",
                "status": "error"
            }

        expense_id = str(expense["id"])

        # Confirm reservation (converts to real expense in cache)
        try:
            cache_manager.confirm_reservation(reservation_id, expense_id)
        except ValueError as e:
            # Reservation expired or not found. The expense exists in
            # Splitwise but is not cached; the cache can be synced later,
            # so report success with a warning.
            return {
                "status": "created_but_not_cached",
                "expense_id": expense_id,
                "warning": f"Reservation error: {e}. Expense created but not cached. Run sync_cache to update.",
                "total": amount
            }

        return {
            "status": "created",
            "expense_id": expense_id,
            "payer_owes": payer_owes,
            "partner_owes": partner_owes,
            "total": amount,
            "url": f"https://secure.splitwise.com/#/expenses/{expense_id}"
        }
    except KeyError as e:
        return {
            "error": f"Missing required argument: {e}",
            "status": "error"
        }
    except Exception as e:
        # Cancel reservation on any error. This is best-effort: the
        # reservation may already be cancelled or expired. Only Exception
        # is suppressed so KeyboardInterrupt/SystemExit still propagate.
        if "reservation_id" in arguments:
            try:
                cache_manager.cancel_reservation(arguments["reservation_id"])
            except Exception:
                pass
        return {
            "error": f"Failed to create expense: {e}",
            "status": "error"
        }
async def handle_sync_cache(
    cache_manager: CacheManager,
    splitwise_client: SplitwiseClient,
    arguments: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Handle splitwise_sync_cache tool.

    Fetches expenses from the Splitwise client and passes them to the
    cache manager's ``sync_expenses``.

    Args:
        cache_manager: Cache manager instance
        splitwise_client: Splitwise API client
        arguments: Tool arguments (none are required by this handler)
            - group_id: str (accepted but not read by this handler)
            - since_date: str (YYYY-MM-DD, optional; defaults to 7 days
              before the current local time)
            - limit: int (optional, default 100)

    Returns:
        On success:
            {
                "synced_count": int  (number of expenses fetched),
                "last_sync_at": str (ISO timestamp),
                "cache_size": int   (expenses the cache returns for
                                     2020-01-01 onward)
            }
        On failure:
            {"error": str, "status": "error"}
    """
    try:
        since_date = arguments.get("since_date")
        limit = arguments.get("limit", 100)

        # Determine since_date (default to 7 days ago)
        if since_date is None:
            lookback = timedelta(days=7)
            since_date = (datetime.now() - lookback).strftime("%Y-%m-%d")

        # Fetch expenses from Splitwise API (including deleted for proper
        # cache sync)
        try:
            expenses = splitwise_client.get_expenses_for_sync(
                dated_after=since_date,
                limit=limit
            )
        except SplitwiseError as e:
            return {
                "error": f"Splitwise API error: {e}",
                "status": "error"
            }

        # Sync to cache
        cache_manager.sync_expenses(expenses)

        # Get current cache size
        all_expenses = cache_manager.get_expenses_for_timeframe(
            start_date="2020-01-01",  # Far past date to get all
            end_date=None
        )

        return {
            "synced_count": len(expenses),
            "last_sync_at": datetime.now().isoformat(),
            "cache_size": len(all_expenses)
        }
    except Exception as e:
        # No argument is read with arguments[...], so a KeyError here can
        # only come from the client or cache internals. It is reported as a
        # sync failure rather than as a "missing required argument".
        return {
            "error": f"Failed to sync cache: {e}",
            "status": "error"
        }
async def handle_get_expenses(
    cache_manager: CacheManager,
    arguments: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Run the splitwise_get_expenses tool against the cache.

    Args:
        cache_manager: Cache manager instance; its
            ``get_expenses_for_timeframe`` supplies the expenses.
        arguments: Tool arguments
            - group_id: str (accepted but not read by this handler)
            - start_date: str (required, YYYY-MM-DD)
            - end_date: str (YYYY-MM-DD, optional)

    Returns:
        On success:
            {
                "expenses": [
                    {
                        "id": str,
                        "description": str,
                        "amount": float,
                        "date": str,
                        "payer_owes": float,
                        "partner_owes": float
                    }
                ],
                "total_count": int
            }
        On failure:
            {"error": str, "status": "error"}
    """

    def _format(entry: Any) -> Dict[str, Any]:
        # Every cached expense is reported as an even split: the payer's
        # half is rounded to cents and the partner gets the remainder.
        payer_half = round(entry.cost / 2, 2)
        partner_half = round(entry.cost - payer_half, 2)
        return {
            "id": entry.expense_id,
            "description": entry.description,
            "amount": entry.cost,
            "date": entry.date,
            "payer_owes": payer_half,
            "partner_owes": partner_half,
        }

    try:
        window_start = arguments["start_date"]
        window_end = arguments.get("end_date")
        cached = cache_manager.get_expenses_for_timeframe(
            start_date=window_start,
            end_date=window_end,
        )
        rows = [_format(entry) for entry in cached]
        return {"expenses": rows, "total_count": len(rows)}
    except KeyError as missing:
        return {
            "error": f"Missing required argument: {missing}",
            "status": "error",
        }
    except Exception as failure:
        return {
            "error": f"Failed to get expenses: {failure}",
            "status": "error",
        }
async def handle_delete_expense(
    cache_manager: CacheManager,
    splitwise_client: SplitwiseClient,
    arguments: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Run the splitwise_delete_expense tool.

    Deletes the expense through the Splitwise client first; the cache
    entry is removed only if that call did not raise SplitwiseError.

    Args:
        cache_manager: Cache manager instance
        splitwise_client: Splitwise API client
        arguments: Tool arguments
            - expense_id: str (required)
            - group_id: str (accepted but not read by this handler)

    Returns:
        On success:
            {
                "status": "deleted",
                "expense_id": str
            }
        On failure:
            {"error": str, "status": "error"}
    """
    try:
        target_id = arguments["expense_id"]
        try:
            # Remote deletion goes first so the cache is never ahead of
            # Splitwise.
            splitwise_client.delete_expense(target_id)
        except SplitwiseError as api_error:
            outcome = {
                "error": f"Splitwise API error: {api_error}",
                "status": "error",
            }
        else:
            cache_manager.delete_expense(target_id)
            outcome = {
                "status": "deleted",
                "expense_id": target_id,
            }
        return outcome
    except KeyError as missing:
        return {
            "error": f"Missing required argument: {missing}",
            "status": "error",
        }
    except Exception as failure:
        return {
            "error": f"Failed to delete expense: {failure}",
            "status": "error",
        }
async def handle_find_by_order_id(
    cache_manager: CacheManager,
    arguments: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Run the splitwise_find_by_order_id tool.

    Looks up a cached expense through the cache manager's
    ``find_by_order_id`` and reports whether one was found.

    Args:
        cache_manager: Cache manager instance
        arguments: Tool arguments
            - order_id: str (required; Amazon order ID)
            - group_id: str (optional; not read by this handler)

    Returns:
        On success:
            {
                "found": bool,
                "expense": whatever the cache manager returned when it is
                           truthy, otherwise None
            }
        On failure:
            {"error": str, "status": "error"}
    """
    try:
        match = cache_manager.find_by_order_id(arguments["order_id"])
        # Any falsy lookup result counts as "not found" and is reported
        # as None.
        found = bool(match)
        return {
            "found": found,
            "expense": match if found else None,
        }
    except KeyError as missing:
        return {
            "error": f"Missing required argument: {missing}",
            "status": "error",
        }
    except Exception as failure:
        return {
            "error": f"Failed to find expense: {failure}",
            "status": "error",
        }