Skip to main content
Glama
zvika-finally

Marqeta DiVA API MCP Server

local_storage.py11.4 kB
"""Local SQLite storage for complete transaction data.""" import json import sqlite3 import sys from pathlib import Path from typing import Any, Dict, List, Optional class TransactionStorage: """Local SQLite storage for complete transaction data.""" def __init__(self, db_path: str = "./transactions.db"): """ Initialize the transaction storage. Args: db_path: Path to SQLite database file """ self.db_path = Path(db_path) self.db_path.parent.mkdir(parents=True, exist_ok=True) print(f"[Storage] Initializing SQLite database at {self.db_path}...", file=sys.stderr) self.conn = sqlite3.connect(str(self.db_path)) self.conn.row_factory = sqlite3.Row # Return rows as dictionaries self._create_tables() print(f"[Storage] SQLite database ready", file=sys.stderr) def _create_tables(self) -> None: """Create database tables if they don't exist.""" cursor = self.conn.cursor() # Main transactions table cursor.execute(""" CREATE TABLE IF NOT EXISTS transactions ( transaction_token TEXT PRIMARY KEY, view_name TEXT NOT NULL, aggregation TEXT NOT NULL, merchant_name TEXT, transaction_amount REAL, transaction_type TEXT, state TEXT, user_token TEXT, card_token TEXT, business_user_token TEXT, created_time TEXT, transaction_timestamp TEXT, network TEXT, merchant_category_code TEXT, currency_code TEXT, full_data TEXT NOT NULL, indexed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # Create indexes for common queries cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_merchant_name ON transactions(merchant_name) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_transaction_amount ON transactions(transaction_amount) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_user_token ON transactions(user_token) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_business_user_token ON transactions(business_user_token) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_created_time ON transactions(created_time) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_view_name ON transactions(view_name) """) self.conn.commit() def add_transactions( self, transactions: List[Dict[str, Any]], view_name: str, aggregation: str ) -> int: """ Add or update transactions in the database. Args: transactions: List of transaction dictionaries view_name: DiVA view name aggregation: Aggregation level Returns: Number of transactions added/updated """ cursor = self.conn.cursor() count = 0 for txn in transactions: transaction_token = txn.get("transaction_token") if not transaction_token: print(f"[Storage] Warning: Transaction missing token, skipping", file=sys.stderr) continue # Extract common fields for indexing merchant_name = txn.get("merchant_name", txn.get("acquirer_merchant_name")) transaction_amount = txn.get("transaction_amount") transaction_type = txn.get("transaction_type") state = txn.get("state", txn.get("transaction_status")) user_token = txn.get("user_token", txn.get("acting_user_token")) card_token = txn.get("card_token") business_user_token = txn.get("business_user_token") created_time = txn.get("created_time", txn.get("transaction_timestamp")) transaction_timestamp = txn.get("transaction_timestamp") network = txn.get("network") merchant_category_code = txn.get("merchant_category_code") currency_code = txn.get("currency_code") # Store full transaction as JSON full_data = json.dumps(txn) # Upsert (insert or replace) cursor.execute(""" INSERT OR REPLACE INTO transactions ( transaction_token, view_name, aggregation, merchant_name, transaction_amount, transaction_type, state, user_token, card_token, business_user_token, created_time, transaction_timestamp, network, merchant_category_code, currency_code, full_data ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( transaction_token, view_name, aggregation, merchant_name, transaction_amount, transaction_type, state, user_token, card_token, business_user_token, created_time, transaction_timestamp, network, merchant_category_code, currency_code, full_data )) count += 1 self.conn.commit() print(f"[Storage] Added/updated {count} transactions", file=sys.stderr) return count def get_transactions( self, transaction_tokens: List[str] ) -> List[Dict[str, Any]]: """ Get transactions by their tokens. Args: transaction_tokens: List of transaction tokens Returns: List of complete transaction dictionaries """ if not transaction_tokens: return [] cursor = self.conn.cursor() placeholders = ",".join("?" * len(transaction_tokens)) cursor.execute(f""" SELECT full_data FROM transactions WHERE transaction_token IN ({placeholders}) """, transaction_tokens) results = [] for row in cursor.fetchall(): full_data = json.loads(row["full_data"]) results.append(full_data) return results def query_transactions( self, filters: Optional[Dict[str, Any]] = None, limit: int = 100, offset: int = 0, order_by: str = "created_time DESC" ) -> Dict[str, Any]: """ Query transactions with filters. Args: filters: Dictionary of filters (e.g., {"merchant_name": "Starbucks", "transaction_amount": {">": 10}}) limit: Maximum number of results offset: Offset for pagination order_by: SQL ORDER BY clause Returns: Dictionary with results and metadata """ cursor = self.conn.cursor() # Build WHERE clause where_clauses = [] params = [] if filters: for field, value in filters.items(): if isinstance(value, dict): # Handle operators like {"transaction_amount": {">": 10}} for op, val in value.items(): if op in [">", "<", ">=", "<=", "=", "!="]: where_clauses.append(f"{field} {op} ?") params.append(val) elif op == "like": where_clauses.append(f"{field} LIKE ?") params.append(f"%{val}%") else: # Simple equality where_clauses.append(f"{field} = ?") params.append(value) where_clause = " AND ".join(where_clauses) if where_clauses else "1=1" # Get total count cursor.execute(f""" SELECT COUNT(*) as count FROM transactions WHERE {where_clause} """, params) total_count = cursor.fetchone()["count"] # Get results cursor.execute(f""" SELECT full_data FROM transactions WHERE {where_clause} ORDER BY {order_by} LIMIT ? OFFSET ? """, params + [limit, offset]) results = [] for row in cursor.fetchall(): full_data = json.loads(row["full_data"]) results.append(full_data) return { "total": total_count, "count": len(results), "offset": offset, "limit": limit, "is_more": offset + len(results) < total_count, "data": results } def get_stats(self) -> Dict[str, Any]: """ Get statistics about the local storage. Returns: Dictionary with storage statistics """ cursor = self.conn.cursor() # Total transactions cursor.execute("SELECT COUNT(*) as count FROM transactions") total_count = cursor.fetchone()["count"] # By view cursor.execute(""" SELECT view_name, aggregation, COUNT(*) as count FROM transactions GROUP BY view_name, aggregation """) by_view = [dict(row) for row in cursor.fetchall()] # Date range cursor.execute(""" SELECT MIN(created_time) as earliest, MAX(created_time) as latest FROM transactions """) date_range = dict(cursor.fetchone()) # Database file size db_size = self.db_path.stat().st_size if self.db_path.exists() else 0 return { "database_path": str(self.db_path.absolute()), "total_transactions": total_count, "by_view": by_view, "date_range": date_range, "database_size_bytes": db_size, "database_size_mb": round(db_size / (1024 * 1024), 2) } def clear(self, view_name: Optional[str] = None) -> int: """ Clear transactions from the database. Args: view_name: If specified, only clear transactions from this view Returns: Number of transactions deleted """ cursor = self.conn.cursor() if view_name: cursor.execute("DELETE FROM transactions WHERE view_name = ?", (view_name,)) count = cursor.rowcount print(f"[Storage] Cleared {count} transactions from view '{view_name}'", file=sys.stderr) else: cursor.execute("DELETE FROM transactions") count = cursor.rowcount print(f"[Storage] Cleared all {count} transactions", file=sys.stderr) self.conn.commit() return count def close(self) -> None: """Close the database connection.""" self.conn.close() def __enter__(self): """Context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" self.close() # Global storage instance (lazy-loaded) _storage: TransactionStorage | None = None def get_storage(db_path: str = "./transactions.db") -> TransactionStorage: """Get or create the global transaction storage instance.""" global _storage if _storage is None: _storage = TransactionStorage(db_path=db_path) return _storage

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/zvika-finally/marqeta-diva-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server