list_transactions
List transactions for a bank account using cached data from the last 90 days, refreshed daily. Filter by date range, limit results, and optionally force a cache refresh.
Instructions
List transactions for a bank account, served from a local cache that keeps the last ~90 days. The cache is refreshed at most once per 24h (Akahu Personal only refreshes upstream daily); pass force=True to bypass the TTL.
Args:
- account: account id or fuzzy substring match against account name
- start: ISO date (YYYY-MM-DD), inclusive lower bound on transaction date
- end: ISO date (YYYY-MM-DD), inclusive upper bound
- limit: max rows to return (default 100, newest first)
- force: bypass the 24h cache TTL
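For example, an MCP client might invoke the tool with an argument payload like the following (the account name and dates are illustrative, not from the source):

```python
# Illustrative argument payload for list_transactions. "Everyday" relies on
# the fuzzy substring match against the account name rather than an exact id.
args = {
    "account": "Everyday",
    "start": "2024-01-01",   # inclusive lower bound on transaction date
    "end": "2024-03-31",     # inclusive upper bound
    "limit": 50,             # newest first
    "force": False,          # honour the 24h cache TTL
}
```

Omitting `start` and `end` returns the newest cached transactions up to `limit`.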
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| account | Yes | Account id or fuzzy substring match against the account name | |
| start | No | ISO date (YYYY-MM-DD), inclusive lower bound on transaction date | |
| end | No | ISO date (YYYY-MM-DD), inclusive upper bound | |
| limit | No | Maximum rows to return, newest first | 100 |
| force | No | Bypass the 24h cache TTL | false |
Output Schema
| Name | Description |
|---|---|
| account_id | Akahu id of the resolved account |
| account_name | Display name of the account, if any |
| fetched_from_akahu | Rows fetched from Akahu on this call (0 when the cache TTL was honoured) |
| returned | Number of transactions returned |
| transactions | Cached transaction rows, newest first |
| error | Returned instead of the fields above when no account matches |
Implementation Reference
- akahu_mcp/server.py:91-124 (handler): The `list_transactions` MCP tool handler. It takes an account (id or name), optional start/end dates, a limit, and a force flag; it resolves the account, ensures transactions are fresh in the cache, queries the cached transactions, and returns the results.
```python
@mcp.tool()
async def list_transactions(
    account: str,
    start: str | None = None,
    end: str | None = None,
    limit: int = 100,
    force: bool = False,
) -> dict[str, Any]:
    """List transactions for a bank account, served from a local cache that
    keeps the last ~90 days. The cache is refreshed at most once per 24h
    (Akahu Personal only refreshes upstream daily); pass force=True to
    bypass the TTL.

    Args:
        account: account id or fuzzy substring match against account name
        start: ISO date (YYYY-MM-DD), inclusive lower bound on transaction date
        end: ISO date (YYYY-MM-DD), inclusive upper bound
        limit: max rows to return (default 100, newest first)
        force: bypass the 24h cache TTL
    """
    accounts = await sync.ensure_accounts_fresh(force=force)
    acc = _find_account(accounts, account)
    if acc is None:
        return {"error": f"No account matching {account!r}"}
    fetched = await sync.ensure_transactions_fresh(acc["_id"], force=force)
    rows = cache.get_transactions_cached(acc["_id"], start=start, end=end, limit=limit)
    return {
        "account_id": acc["_id"],
        "account_name": acc.get("name"),
        "fetched_from_akahu": fetched,
        "returned": len(rows),
        "transactions": rows,
    }
```

- akahu_mcp/server.py:91-91 (registration): The tool is registered via the `@mcp.tool()` decorator on line 91, which registers `list_transactions` as an MCP tool with FastMCP.
```python
@mcp.tool()
```

- akahu_mcp/server.py:92-110 (schema): The function signature and docstring define the schema: account (str, required), start/end (optional ISO date strings), limit (int, default 100), force (bool, default False).
```python
async def list_transactions(
    account: str,
    start: str | None = None,
    end: str | None = None,
    limit: int = 100,
    force: bool = False,
) -> dict[str, Any]:
    """List transactions for a bank account, served from a local cache that
    keeps the last ~90 days. The cache is refreshed at most once per 24h
    (Akahu Personal only refreshes upstream daily); pass force=True to
    bypass the TTL.

    Args:
        account: account id or fuzzy substring match against account name
        start: ISO date (YYYY-MM-DD), inclusive lower bound on transaction date
        end: ISO date (YYYY-MM-DD), inclusive upper bound
        limit: max rows to return (default 100, newest first)
        force: bypass the 24h cache TTL
    """
```

- akahu_mcp/sync.py:47-86 (helper): The helper `ensure_transactions_fresh` handles the cache TTL logic, calls the Akahu API to fetch transactions, and stores them in the local SQLite cache.
```python
async def ensure_transactions_fresh(
    account_id: str,
    lookback_days: int = DEFAULT_LOOKBACK_DAYS,
    force: bool = False,
) -> int:
    """Refresh transactions for one account. Returns the number of rows
    fetched from Akahu (which is upserted into the cache; many may already
    exist)."""
    cache.init_db()
    state = cache.get_sync_state(account_id)
    now = _now()
    if not force and state and (now - state["last_synced_at"]) < DEFAULT_TTL_SECONDS:
        logger.info(
            "Skipping refresh for %s — last synced %ds ago",
            account_id,
            now - state["last_synced_at"],
        )
        return 0
    end_dt = datetime.now(timezone.utc)
    if state is None:
        start_dt = end_dt - timedelta(days=lookback_days)
    else:
        # Resync from `last_synced_at` minus a small overlap so late-settling
        # transactions are picked up; INSERT OR REPLACE deduplicates.
        start_dt = datetime.fromtimestamp(
            state["last_synced_at"], tz=timezone.utc
        ) - timedelta(days=OVERLAP_DAYS)
    start_iso = _iso_z(start_dt)
    end_iso = _iso_z(end_dt)
    logger.info("Fetching transactions for %s from %s to %s", account_id, start_iso, end_iso)
    client = AkahuClient()
    txns = await client.get_transactions(account_id, start=start_iso, end=end_iso)
    cache.put_transactions(account_id, txns)
    oldest_iso = start_dt.date().isoformat()
    cache.set_sync_state(account_id, now, oldest_iso)
    return len(txns)
```

- akahu_mcp/cache.py:116-138 (helper): The helper `get_transactions_cached` queries the local SQLite cache for transactions by account_id, with optional start/end date filtering and a row limit.
```python
def get_transactions_cached(
    account_id: str,
    start: str | None = None,
    end: str | None = None,
    limit: int | None = None,
) -> list[dict[str, Any]]:
    sql = (
        "SELECT id, account_id, date, amount, description, merchant_name "
        "FROM transactions WHERE account_id = ?"
    )
    params: list[Any] = [account_id]
    if start:
        sql += " AND date >= ?"
        params.append(start)
    if end:
        sql += " AND date <= ?"
        params.append(end)
    sql += " ORDER BY date DESC"
    if limit:
        sql += f" LIMIT {int(limit)}"
    with _connect() as conn:
        rows = conn.execute(sql, params).fetchall()
    return [dict(r) for r in rows]
```
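The cache query builds its SQL incrementally, and both date bounds are inclusive (`>=` / `<=`) with results ordered newest first. A self-contained demonstration of that query shape against an in-memory SQLite table with the same columns (the sample rows are illustrative, not from the source):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE transactions ("
    "id TEXT PRIMARY KEY, account_id TEXT, date TEXT, "
    "amount REAL, description TEXT, merchant_name TEXT)"
)
conn.executemany(
    "INSERT INTO transactions VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("t1", "acc_123", "2024-01-05", -12.50, "Coffee", "Cafe"),
        ("t2", "acc_123", "2024-02-10", -80.00, "Groceries", "Market"),
        ("t3", "acc_123", "2024-03-01", 1500.00, "Salary", None),
    ],
)

# Same shape as get_transactions_cached: inclusive bounds, newest first, LIMIT.
sql = (
    "SELECT id, date, amount FROM transactions "
    "WHERE account_id = ? AND date >= ? AND date <= ? "
    "ORDER BY date DESC LIMIT 2"
)
result = [dict(r) for r in conn.execute(sql, ["acc_123", "2024-01-05", "2024-03-01"])]
# Both boundary dates are eligible; LIMIT 2 then keeps only the two newest rows.
```

ISO `YYYY-MM-DD` strings sort lexicographically in date order, which is why plain string comparison works for the date filters.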