#!/usr/bin/env python3
"""
Incremental Plaid transaction sync.
Uses cursors to only fetch new/modified transactions since last sync.
"""
import subprocess
import httpx
import asyncio
import json
import os
from datetime import datetime
from typing import Optional, List, Dict, Any
# Base URL of the Plaid production environment; endpoint paths are appended to it.
PLAID_API_URL = "https://production.plaid.com"
# Directory that holds this tool's data files.
# NOTE(review): absolute, user-specific path - the script only works on a
# machine where this directory exists; confirm whether that is intentional.
PROJECT_DIR = "/Users/sfinnerty/CodingProjects/plaid-transactions-mcp"
# JSON file that main() writes the merged transaction data to.
OUTPUT_FILE = f"{PROJECT_DIR}/all_transactions.json"
# JSON file holding the per-institution sync cursors between runs.
CURSOR_FILE = f"{PROJECT_DIR}/sync_cursors.json"
def _get_keychain_value(service_name: str, account_name: Optional[str] = None) -> str:
    """Look up a generic-password secret in the macOS Keychain.

    Runs ``security find-generic-password -s <service> [-a <account>] -w``
    and returns whatever the command printed on stdout, stripped of
    surrounding whitespace.

    Args:
        service_name: Keychain service the item is stored under.
        account_name: Optional account to narrow the lookup; the ``-a``
            flag is omitted when this is empty or ``None``.

    Raises:
        subprocess.CalledProcessError: if ``security`` exits non-zero.
    """
    account_args = ["-a", account_name] if account_name else []
    completed = subprocess.run(
        ["security", "find-generic-password", "-s", service_name, *account_args, "-w"],
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.strip()
def _friendly_name_from_account(account_name: str) -> str:
    """Turn a keychain account name into a display name.

    Removes the ``access_token_`` marker; if an underscore remains, drops
    everything up to and including the first one and replaces the other
    underscores with spaces. Names with no remaining underscore are
    returned unchanged after the marker is removed.
    """
    stripped = account_name.replace("access_token_", "")
    _, separator, remainder = stripped.partition("_")
    return remainder.replace("_", " ") if separator else stripped


def _extract_token_account_name(acct_line: str) -> Optional[str]:
    """Pull the quoted ``access_token_...`` value out of an ``"acct"`` dump line.

    Returns ``None`` when the line holds no quoted value starting with
    ``access_token_``.
    """
    opening = acct_line.find('"access_token_')
    if opening == -1:
        # Without this guard, find() + 1 would silently become index 0 and
        # yield a garbage account name.
        return None
    start = opening + 1
    closing = acct_line.find('"', start)
    return acct_line[start:closing] if closing != -1 else acct_line[start:]


def _get_all_plaid_accounts() -> List[Dict[str, str]]:
    """Collect every Plaid access token stored in the macOS Keychain.

    Scans the output of ``security dump-keychain`` for items whose service
    attribute is ``PlaidTracker``, reads the ``"acct"`` attribute from the
    few lines that follow, and fetches the secret for every account whose
    name starts with ``access_token_``.

    Returns:
        One dict per token with the keys ``account_name`` (the raw keychain
        account), ``friendly_name`` (display name derived from it) and
        ``access_token``.

    Lookup failures for an individual item are reported on stdout and the
    item is skipped, so one unreadable entry does not abort the scan.
    """
    dump = subprocess.run(
        ["security", "dump-keychain"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
    )
    lines = dump.stdout.split('\n')

    accounts = []
    for index, line in enumerate(lines):
        if '0x00000007 <blob>="PlaidTracker"' not in line:
            continue
        # The account attribute is expected within the next five lines of
        # the same keychain item; only the first "acct" line is considered.
        for candidate in lines[index + 1:index + 6]:
            if '"acct"<blob>=' not in candidate:
                continue
            account_name = _extract_token_account_name(candidate)
            if account_name is not None:
                try:
                    token = _get_keychain_value("PlaidTracker", account_name)
                except (subprocess.SubprocessError, OSError) as exc:
                    # Best effort: skip this item, but say so instead of
                    # silently swallowing every possible exception.
                    print(f"WARNING: could not read keychain item {account_name}: {exc}")
                else:
                    accounts.append({
                        "account_name": account_name,
                        "friendly_name": _friendly_name_from_account(account_name),
                        "access_token": token,
                    })
            break
    return accounts
def load_cursors() -> Dict[str, str]:
    """Read the saved sync cursors from CURSOR_FILE.

    Returns the parsed JSON content of the file, or an empty dict when the
    file does not exist yet.
    """
    if not os.path.exists(CURSOR_FILE):
        return {}
    with open(CURSOR_FILE, 'r') as handle:
        saved = json.load(handle)
    return saved
def save_cursors(cursors: Dict[str, str]):
    """Write the given cursor mapping to CURSOR_FILE as indented JSON.

    Any previous content of the file is overwritten.
    """
    with open(CURSOR_FILE, 'w') as handle:
        json.dump(cursors, fp=handle, indent=2)
def load_existing_data() -> Optional[Dict[str, Any]]:
    """Load the previously exported transaction data from OUTPUT_FILE.

    Returns:
        The parsed JSON content of the file, or ``None`` when the file does
        not exist (callers treat ``None`` as "no previous export").
    """
    if not os.path.exists(OUTPUT_FILE):
        return None
    with open(OUTPUT_FILE, 'r') as f:
        return json.load(f)
async def _make_plaid_request(endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """POST a JSON request to the Plaid API and return the decoded response.

    The Plaid ``client_id`` and ``secret`` are read from the keychain
    (service ``PlaidTracker``) and added to the request body.

    Args:
        endpoint: Path appended to PLAID_API_URL, e.g. ``/accounts/get``.
        data: Request body without credentials. It is not modified.

    Returns:
        The response body parsed from JSON.

    Raises:
        httpx.HTTPStatusError: if Plaid answers with a 4xx/5xx status.
        subprocess.CalledProcessError: if a credential lookup fails.
    """
    # Build a separate payload so the credentials never leak into the
    # caller's dict (which callers keep and reuse across requests).
    payload = {
        **data,
        "client_id": _get_keychain_value("PlaidTracker", "plaid_client_id"),
        "secret": _get_keychain_value("PlaidTracker", "plaid_secret"),
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{PLAID_API_URL}{endpoint}",
            json=payload,
            timeout=60.0,
        )
        response.raise_for_status()
        return response.json()
async def get_account_details(access_token: str) -> Dict[str, Any]:
    """Fetch the accounts behind an access token, keyed by account id.

    Calls ``/accounts/get`` and returns a dict mapping each ``account_id``
    to a dict with the keys ``mask``, ``name``, ``official_name``, ``type``
    and ``subtype``. A key missing from the response is filled with ``""``.
    """
    payload = await _make_plaid_request(
        "/accounts/get",
        data={"access_token": access_token},
    )
    copied_fields = ("mask", "name", "official_name", "type", "subtype")
    details = {}
    for account in payload.get("accounts", []):
        details[account["account_id"]] = {
            field: account.get(field, "") for field in copied_fields
        }
    return details
async def sync_transactions(access_token: str, cursor: Optional[str] = None) -> Dict[str, Any]:
    """Page through ``/transactions/sync`` and collect all changes.

    Args:
        access_token: Plaid access token of the institution to sync.
        cursor: Cursor returned by a previous sync; when empty or ``None``
            the request additionally asks for 730 days of history.

    Returns:
        A dict with the accumulated ``added``, ``modified`` and ``removed``
        lists from every page, plus ``cursor`` - the ``next_cursor`` of the
        last page received.
    """
    options = {"include_original_description": True}
    if not cursor:
        # No cursor means this is the first sync: ask for the history too.
        options["days_requested"] = 730
    request_data = {
        "access_token": access_token,
        "count": 500,
        "options": options,
    }

    collected = {"added": [], "modified": [], "removed": []}
    next_cursor = cursor
    while True:
        if next_cursor:
            request_data["cursor"] = next_cursor
        page = await _make_plaid_request("/transactions/sync", data=request_data)

        for kind, bucket in collected.items():
            bucket.extend(page.get(kind, []))
        next_cursor = page.get("next_cursor")

        # Progress line with running totals, printed once anything was seen.
        if any(collected.values()):
            n_added = len(collected["added"])
            n_modified = len(collected["modified"])
            n_removed = len(collected["removed"])
            print(f" +{n_added} added, ~{n_modified} modified, -{n_removed} removed...")

        if not page.get("has_more", False):
            break

    return {**collected, "cursor": next_cursor}
def merge_transactions(
    existing: List[Dict],
    added: List[Dict],
    modified: List[Dict],
    removed: List[Dict]
) -> List[Dict]:
    """Apply one sync result to the stored transactions.

    Transactions are identified by ``transaction_id``. Entries listed in
    ``removed`` are dropped first, then ``modified`` and finally ``added``
    entries replace or extend what is stored.

    Returns:
        The resulting transactions sorted by their ``date`` value, newest
        first; a transaction without a ``date`` key sorts as ``""``.
    """
    def _date_key(txn):
        return txn.get("date", "")

    by_id = {}
    for txn in existing:
        by_id[txn["transaction_id"]] = txn

    # Drop deletions before applying updates and insertions.
    for gone in removed:
        by_id.pop(gone.get("transaction_id"), None)

    for batch in (modified, added):
        for txn in batch:
            by_id[txn["transaction_id"]] = txn

    ordered = list(by_id.values())
    ordered.sort(key=_date_key, reverse=True)
    return ordered
async def main():
    """Sync every linked institution and rewrite the export and cursor files.

    Steps:
      1. Load the previous export (OUTPUT_FILE) and saved cursors (CURSOR_FILE).
      2. Discover linked institutions from the keychain.
      3. For each institution, fetch account details and transaction changes,
         merge them into the stored transactions and advance its cursor.
      4. Write the merged data and the cursors back to disk and print a summary.

    A failure while syncing one institution is printed and does not stop the
    others; that institution keeps its previous data and cursor.
    """
    print("=" * 60)
    print("Plaid Transaction Sync (Incremental)")
    print("=" * 60)
    print()

    # Load existing data and cursors
    existing_data = load_existing_data()
    cursors = load_cursors()
    is_first_sync = existing_data is None
    if is_first_sync:
        print("First sync - fetching full history (up to 2 years)...")
    else:
        print("Incremental sync - fetching only new transactions...")
    print()

    # Get all accounts
    accounts = _get_all_plaid_accounts()
    print(f"Found {len(accounts)} linked institutions:")
    for acc in accounts:
        cursor_status = "has cursor" if acc["friendly_name"] in cursors else "no cursor"
        print(f" - {acc['friendly_name']} ({cursor_status})")
    print()

    # Prepare output data structure
    now = datetime.now().isoformat()
    if existing_data:
        all_data = existing_data
        all_data["last_sync"] = now
    else:
        all_data = {
            "export_date": now,
            "last_sync": now,
            "institutions": []
        }

    # Build institution lookup for merging
    inst_map = {inst["name"]: inst for inst in all_data.get("institutions", [])}
    total_added = 0
    total_modified = 0
    total_removed = 0

    for account in accounts:
        name = account["friendly_name"]
        print(f"Syncing {name}...")
        try:
            account_details = await get_account_details(account["access_token"])

            # A cursor only makes sense together with the data it was saved
            # alongside. If nothing usable is stored for this institution
            # (export deleted, or only an error placeholder from a failed
            # run), an incremental sync would lose the history - so fall
            # back to a full sync.
            stored = inst_map.get(name)
            has_stored_data = stored is not None and "error" not in stored
            existing_txns = stored.get("transactions", []) if has_stored_data else []
            existing_cursor = cursors.get(name) if has_stored_data else None
            if cursors.get(name) and not has_stored_data:
                print(" (saved cursor ignored - no stored data, fetching full history)")

            sync_result = await sync_transactions(
                account["access_token"],
                cursor=existing_cursor
            )

            # Add account info to new transactions
            for txn in sync_result["added"] + sync_result["modified"]:
                info = account_details.get(txn.get("account_id", ""))
                if info is not None:
                    txn["_account_mask"] = info.get("mask", "")
                    txn["_account_name"] = info.get("name", "")

            merged_txns = merge_transactions(
                existing_txns,
                sync_result["added"],
                sync_result["modified"],
                sync_result["removed"]
            )

            inst_map[name] = {
                "name": name,
                "accounts": account_details,
                "transaction_count": len(merged_txns),
                "transactions": merged_txns
            }
            # Advance the cursor only once the changes are merged; doing it
            # earlier would skip these transactions forever if the merge failed.
            cursors[name] = sync_result["cursor"]

            added_count = len(sync_result["added"])
            modified_count = len(sync_result["modified"])
            removed_count = len(sync_result["removed"])
            total_added += added_count
            total_modified += modified_count
            total_removed += removed_count

            if added_count or modified_count or removed_count:
                print(f" -> +{added_count} new, ~{modified_count} updated, -{removed_count} removed")
            else:
                print(" -> No changes (up to date)")
            print(f" -> {len(merged_txns)} total transactions")
        except Exception as e:
            # Boundary handler: report and continue with the next institution.
            print(f" ERROR: {e}")
            if name not in inst_map:
                inst_map[name] = {
                    "name": name,
                    "error": str(e),
                    "transactions": []
                }

    # Update institutions list
    all_data["institutions"] = list(inst_map.values())

    # Count what is actually stored, so institutions whose sync failed this
    # run (but whose earlier transactions are kept) are still included.
    total_transactions = sum(
        len(inst.get("transactions", [])) for inst in inst_map.values()
    )

    # Update summary
    all_data["summary"] = {
        "total_transactions": total_transactions,
        "total_institutions": len(accounts),
        "last_sync": {
            "added": total_added,
            "modified": total_modified,
            "removed": total_removed
        }
    }

    print()
    print("=" * 60)
    print("Sync complete:")
    print(f" +{total_added} added, ~{total_modified} modified, -{total_removed} removed")
    print(f" {total_transactions} total transactions across {len(accounts)} institutions")
    print("=" * 60)

    # Save data first, cursors second: if the run dies in between, the next
    # sync re-fetches from the old cursors and merges by transaction id.
    with open(OUTPUT_FILE, "w") as f:
        json.dump(all_data, f, indent=2, default=str)
    save_cursors(cursors)

    print()
    print(f"Saved to: {OUTPUT_FILE}")
    print(f"Cursors saved to: {CURSOR_FILE}")

    # File size
    size_mb = os.path.getsize(OUTPUT_FILE) / (1024 * 1024)
    print(f"File size: {size_mb:.2f} MB")
# Run the sync only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())