"""
Cache manager for Splitwise expenses.
Migrated from scripts/splitwise_cache.py for use in MCP server.
Provides atomic reservation pattern to prevent race conditions.
"""
import sqlite3
import uuid
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
@dataclass
class SplitwiseExpense:
"""Represents a Splitwise expense from the cache."""
expense_id: str
description: str
cost: float
date: str
created_at: str
group_id: str
raw_json: str
class CacheManager:
"""
Local cache of Splitwise expenses for duplicate detection.
Implements atomic reservation pattern to prevent race conditions (Dec 13, 2025 fix).
"""
def __init__(self, db_path: str):
"""
Initialize the cache manager.
Args:
db_path: Path to SQLite database
"""
self.db_path = db_path
self.conn = sqlite3.connect(db_path, check_same_thread=False)
self.conn.row_factory = sqlite3.Row
self._create_schema()
def _create_schema(self):
"""Create the splitwise_cache and reservations tables."""
cursor = self.conn.cursor()
# Main cache table
cursor.execute('''
CREATE TABLE IF NOT EXISTS splitwise_cache (
expense_id TEXT PRIMARY KEY,
description TEXT NOT NULL,
cost REAL NOT NULL,
date TEXT NOT NULL,
created_at TEXT NOT NULL,
group_id TEXT NOT NULL,
order_id TEXT,
raw_json TEXT,
synced_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# Reservation table for atomic duplicate prevention
cursor.execute('''
CREATE TABLE IF NOT EXISTS splitwise_reservations (
reservation_id TEXT PRIMARY KEY,
description TEXT NOT NULL,
cost REAL NOT NULL,
date TEXT NOT NULL,
group_id TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
expires_at TEXT NOT NULL,
order_id TEXT
)
''')
# Add order_id column if it doesn't exist (migration for existing DBs)
try:
cursor.execute("ALTER TABLE splitwise_reservations ADD COLUMN order_id TEXT")
except:
pass # Column already exists
# Index for fast date-based queries
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_date
ON splitwise_cache(date)
''')
# Index for reservation expiry cleanup
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_reservation_expiry
ON splitwise_reservations(expires_at)
''')
self.conn.commit()
def sync_expenses(self, expenses: List[Dict[str, Any]]):
"""
Sync expenses from Splitwise API to local cache.
Extracts order_id from details field for indexed duplicate detection.
Filters out soft-deleted expenses and removes them from cache if present.
Args:
expenses: List of expense dictionaries from Splitwise API
"""
cursor = self.conn.cursor()
import json
for expense in expenses:
# Skip and remove soft-deleted expenses from cache
if expense.get('deleted_at') is not None:
cursor.execute(
'DELETE FROM splitwise_cache WHERE expense_id = ?',
(str(expense['id']),)
)
continue
# Extract order_id from details field
# Supports formats:
# - "Order #113-6001239-5817844" (Amazon)
# - "Order #112818652919159448436" (Instacart)
# - Any "Order #<id>" format
details = expense.get('details', '') or ''
order_id = None
if details.startswith('Order #'):
order_id = details.replace('Order #', '').strip()
cursor.execute('''
INSERT OR REPLACE INTO splitwise_cache
(expense_id, description, cost, date, created_at, group_id, order_id, raw_json, synced_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
''', (
str(expense['id']),
expense.get('description', ''),
float(expense.get('cost', 0)),
expense.get('date', ''),
expense.get('created_at', ''),
str(expense.get('group_id', '')),
order_id,
json.dumps(expense)
))
self.conn.commit()
def get_expenses_for_timeframe(
self,
start_date: str,
end_date: Optional[str] = None
) -> List[SplitwiseExpense]:
"""
Get cached expenses for a date range.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD), defaults to today
Returns:
List of SplitwiseExpense objects
"""
cursor = self.conn.cursor()
if end_date is None:
end_date = datetime.now().strftime('%Y-%m-%d')
cursor.execute('''
SELECT expense_id, description, cost, date, created_at, group_id, raw_json
FROM splitwise_cache
WHERE DATE(date) >= ? AND DATE(date) <= ?
ORDER BY date DESC
''', (start_date, end_date))
expenses = []
for row in cursor.fetchall():
expenses.append(SplitwiseExpense(
expense_id=row['expense_id'],
description=row['description'],
cost=row['cost'],
date=row['date'],
created_at=row['created_at'],
group_id=row['group_id'],
raw_json=row['raw_json']
))
return expenses
def check_duplicate(
self,
description: str,
amount: float,
date: str,
days_window: int = 7,
amount_tolerance: float = 1.0,
similarity_threshold: float = 0.7
) -> Dict[str, Any]:
"""
Check if an expense is a duplicate using Splitsage's algorithm.
Algorithm:
1. Check expenses within N days of the target date
2. Amount difference <= $1.00
3. Description similarity > 70% (word-based matching)
4. Also check pending reservations (not yet confirmed)
Args:
description: Expense description to check
amount: Expense amount
date: Expense date (YYYY-MM-DD)
days_window: Number of days to check (default 7)
amount_tolerance: Max amount difference in dollars (default $1.00)
similarity_threshold: Min similarity score (default 0.7)
Returns:
{
'is_duplicate': bool,
'confidence': float,
'existing_expense_id': str or None,
'reason': str
}
"""
# Calculate date range
target_date = datetime.strptime(date, '%Y-%m-%d')
start_date = (target_date - timedelta(days=days_window)).strftime('%Y-%m-%d')
end_date = (target_date + timedelta(days=days_window)).strftime('%Y-%m-%d')
# Get expenses in window
expenses = self.get_expenses_for_timeframe(start_date, end_date)
new_description_lower = description.lower()
# Check existing expenses
for expense in expenses:
amount_diff = abs(amount - expense.cost)
if amount_diff <= amount_tolerance:
similarity = self._calculate_similarity(
new_description_lower,
expense.description.lower()
)
if similarity > similarity_threshold:
return {
'is_duplicate': True,
'confidence': similarity,
'existing_expense_id': expense.expense_id,
'reason': f'Similar expense found: {expense.description} (${expense.cost:.2f}) on {expense.date}'
}
# Check pending reservations
cursor = self.conn.cursor()
now = datetime.now().isoformat()
cursor.execute('''
SELECT reservation_id, description, cost, date
FROM splitwise_reservations
WHERE expires_at >= ?
AND DATE(date) >= ? AND DATE(date) <= ?
''', (now, start_date, end_date))
for row in cursor.fetchall():
amount_diff = abs(amount - row['cost'])
if amount_diff <= amount_tolerance:
similarity = self._calculate_similarity(
new_description_lower,
row['description'].lower()
)
if similarity > similarity_threshold:
return {
'is_duplicate': True,
'confidence': similarity,
'existing_expense_id': None,
'reason': f'Pending reservation found: {row["description"]} (${row["cost"]:.2f}) on {row["date"]}'
}
return {
'is_duplicate': False,
'confidence': 0.0,
'existing_expense_id': None,
'reason': 'No duplicate found'
}
def _calculate_similarity(self, str1: str, str2: str) -> float:
"""
Calculate word-based similarity between two strings.
From Splitsage algorithm:
- Split into words (>2 chars)
- Count common words
- Similarity = common_words / max(words1, words2)
Args:
str1: First string (lowercase)
str2: Second string (lowercase)
Returns:
Similarity score (0.0 to 1.0)
"""
words1 = [w for w in str1.split() if len(w) > 2]
words2 = [w for w in str2.split() if len(w) > 2]
if not words1 or not words2:
return 0.0
words2_set = set(words2)
common_words = sum(1 for w in words1 if w in words2_set)
total_words = max(len(words1), len(words2))
return common_words / total_words if total_words > 0 else 0.0
def reserve_expense(
self,
description: str,
amount: float,
date: str,
group_id: str,
ttl_minutes: int = 5,
order_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Atomically reserve an expense slot to prevent race conditions.
Uses EXCLUSIVE transaction for true atomicity under concurrency.
Args:
description: Expense description
amount: Expense amount
date: Expense date (YYYY-MM-DD)
group_id: Splitwise group ID
ttl_minutes: Reservation time-to-live in minutes (default: 5)
order_id: Optional order ID for idempotent duplicate detection
Returns:
{
'reservation_id': str or None,
'is_duplicate': bool,
'reason': str,
'existing_expense_id': str or None (if duplicate found by order_id)
}
"""
cursor = self.conn.cursor()
# FIRST: Check by order_id if provided (most reliable duplicate detection)
if order_id:
existing = self.find_by_order_id(order_id)
if existing:
return {
'reservation_id': None,
'is_duplicate': True,
'reason': f'Order ID {order_id} already synced as expense {existing["expense_id"]}',
'existing_expense_id': existing['expense_id']
}
# BEGIN EXCLUSIVE locks the database for writing
cursor.execute('BEGIN EXCLUSIVE')
try:
# Clean up expired reservations
now = datetime.now().isoformat()
cursor.execute('DELETE FROM splitwise_reservations WHERE expires_at < ?', (now,))
# Check for duplicates (including pending reservations) - fuzzy matching
dup_check = self.check_duplicate(description, amount, date)
if dup_check['is_duplicate']:
self.conn.rollback()
return {
'reservation_id': None,
'is_duplicate': True,
'reason': dup_check['reason'],
'existing_expense_id': dup_check.get('existing_expense_id')
}
# Check if order_id already has a pending reservation
if order_id:
cursor.execute('''
SELECT reservation_id FROM splitwise_reservations
WHERE order_id = ? AND expires_at >= ?
''', (order_id, now))
existing_reservation = cursor.fetchone()
if existing_reservation:
self.conn.rollback()
return {
'reservation_id': None,
'is_duplicate': True,
'reason': f'Order ID {order_id} already has pending reservation {existing_reservation["reservation_id"]}',
'existing_expense_id': None
}
# Create reservation
reservation_id = f"temp_{uuid.uuid4().hex[:12]}"
expires_at = (datetime.now() + timedelta(minutes=ttl_minutes)).isoformat()
cursor.execute('''
INSERT INTO splitwise_reservations
(reservation_id, description, cost, date, group_id, expires_at, order_id)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (reservation_id, description, amount, date, group_id, expires_at, order_id))
self.conn.commit()
return {
'reservation_id': reservation_id,
'is_duplicate': False,
'reason': 'Reservation created successfully',
'existing_expense_id': None
}
except Exception as e:
self.conn.rollback()
raise
def confirm_reservation(self, reservation_id: str, expense_id: str):
"""
Confirm a reservation by converting it to a real expense.
Args:
reservation_id: The reservation ID to confirm
expense_id: The real Splitwise expense ID
Raises:
ValueError: If reservation not found or expired
"""
cursor = self.conn.cursor()
# Get reservation details
cursor.execute('''
SELECT description, cost, date, group_id, expires_at
FROM splitwise_reservations
WHERE reservation_id = ?
''', (reservation_id,))
row = cursor.fetchone()
if not row:
raise ValueError(f"Reservation {reservation_id} not found")
# Check expiry
expires_at = datetime.fromisoformat(row['expires_at'])
if datetime.now() > expires_at:
cursor.execute('DELETE FROM splitwise_reservations WHERE reservation_id = ?', (reservation_id,))
self.conn.commit()
raise ValueError(f"Reservation {reservation_id} has expired")
# Add to cache
cursor.execute('''
INSERT OR REPLACE INTO splitwise_cache
(expense_id, description, cost, date, created_at, group_id, synced_at)
VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
''', (
str(expense_id),
row['description'],
row['cost'],
row['date'],
datetime.now().isoformat(),
row['group_id']
))
# Delete reservation
cursor.execute('DELETE FROM splitwise_reservations WHERE reservation_id = ?', (reservation_id,))
self.conn.commit()
def cancel_reservation(self, reservation_id: str):
"""
Cancel a reservation (rollback on error).
Args:
reservation_id: The reservation ID to cancel
"""
cursor = self.conn.cursor()
cursor.execute('DELETE FROM splitwise_reservations WHERE reservation_id = ?', (reservation_id,))
self.conn.commit()
def delete_expense(self, expense_id: str):
"""
Delete an expense from the cache.
Args:
expense_id: Splitwise expense ID to delete
"""
cursor = self.conn.cursor()
cursor.execute('DELETE FROM splitwise_cache WHERE expense_id = ?', (str(expense_id),))
self.conn.commit()
def find_by_order_id(self, order_id: str) -> Optional[Dict[str, Any]]:
"""
Find expense by order ID using indexed column (fast lookup).
Searches the cached expenses for a matching order_id.
This is the most reliable way to prevent duplicates.
Args:
order_id: Order ID to search for (e.g., "113-6001239-5817844")
Returns:
Expense dict if found, None otherwise
{
'expense_id': str,
'description': str,
'cost': float,
'date': str,
'order_id': str,
'group_id': str
}
"""
cursor = self.conn.cursor()
# Use indexed column for fast lookup
cursor.execute('''
SELECT expense_id, description, cost, date, group_id, order_id
FROM splitwise_cache
WHERE order_id = ?
LIMIT 1
''', (order_id,))
row = cursor.fetchone()
if not row:
return None
return {
'expense_id': row['expense_id'],
'description': row['description'],
'cost': row['cost'],
'date': row['date'],
'order_id': row['order_id'],
'group_id': row['group_id']
}
def close(self):
"""Close database connection."""
self.conn.close()