Skip to main content
Glama

MCP Agent - AI Expense Tracker

by dev-muhammad
store.py•10.1 kB
"""SQLite data store for transactions""" import sqlite3 from typing import List, Optional from datetime import datetime import json from pathlib import Path from models import Transaction, TransactionCreate, TransactionUpdate, TransactionType, Category from config import settings class TransactionStore: """SQLite-based transaction storage""" def __init__(self, db_path: Path = settings.DB_PATH): self.db_path = db_path self._init_db() def _get_connection(self): """Get database connection""" conn = sqlite3.connect(str(self.db_path)) conn.row_factory = sqlite3.Row return conn def _init_db(self): """Initialize database schema""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS transactions ( id TEXT PRIMARY KEY, title TEXT NOT NULL, amount REAL NOT NULL, type TEXT NOT NULL, category TEXT NOT NULL, description TEXT, tags TEXT, date TEXT NOT NULL, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """) conn.commit() conn.close() def _row_to_transaction(self, row: sqlite3.Row) -> Transaction: """Convert database row to Transaction model""" return Transaction( id=row['id'], title=row['title'], amount=row['amount'], type=TransactionType(row['type']), category=Category(row['category']), description=row['description'], tags=json.loads(row['tags']) if row['tags'] else [], date=datetime.fromisoformat(row['date']), created_at=datetime.fromisoformat(row['created_at']), updated_at=datetime.fromisoformat(row['updated_at']) ) def get_all(self) -> List[Transaction]: """Get all transactions sorted by date (newest first)""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT * FROM transactions ORDER BY date DESC """) transactions = [self._row_to_transaction(row) for row in cursor.fetchall()] conn.close() return transactions def get_by_id(self, transaction_id: str) -> Optional[Transaction]: """Get transaction by ID""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM transactions WHERE id = ?", (transaction_id,)) row = cursor.fetchone() conn.close() return self._row_to_transaction(row) if row else None def create(self, transaction_data: TransactionCreate) -> Transaction: """Create a new transaction""" import uuid transaction_id = str(uuid.uuid4()) now = datetime.now() transaction = Transaction( id=transaction_id, created_at=now, updated_at=now, **transaction_data.model_dump() ) conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO transactions (id, title, amount, type, category, description, tags, date, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( transaction.id, transaction.title, transaction.amount, transaction.type.value, transaction.category.value, transaction.description, json.dumps(transaction.tags), transaction.date.isoformat(), transaction.created_at.isoformat(), transaction.updated_at.isoformat() )) conn.commit() conn.close() return transaction def update(self, transaction_id: str, transaction_data: TransactionUpdate) -> Optional[Transaction]: """Update an existing transaction""" transaction = self.get_by_id(transaction_id) if not transaction: return None update_data = transaction_data.model_dump(exclude_unset=True) # Update fields for field, value in update_data.items(): setattr(transaction, field, value) transaction.updated_at = datetime.now() conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" UPDATE transactions SET title = ?, amount = ?, type = ?, category = ?, description = ?, tags = ?, date = ?, updated_at = ? WHERE id = ? """, ( transaction.title, transaction.amount, transaction.type.value, transaction.category.value, transaction.description, json.dumps(transaction.tags), transaction.date.isoformat(), transaction.updated_at.isoformat(), transaction_id )) conn.commit() conn.close() return transaction def delete(self, transaction_id: str) -> bool: """Delete a transaction""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM transactions WHERE id = ?", (transaction_id,)) deleted = cursor.rowcount > 0 conn.commit() conn.close() return deleted def search(self, query: str) -> List[Transaction]: """Search transactions by title, description, or tags""" query = f"%{query.lower()}%" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT * FROM transactions WHERE LOWER(title) LIKE ? OR LOWER(description) LIKE ? OR LOWER(category) LIKE ? OR LOWER(tags) LIKE ? ORDER BY date DESC """, (query, query, query, query)) transactions = [self._row_to_transaction(row) for row in cursor.fetchall()] conn.close() return transactions def get_by_type(self, transaction_type: TransactionType) -> List[Transaction]: """Get transactions by type (income or expense)""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT * FROM transactions WHERE type = ? ORDER BY date DESC """, (transaction_type.value,)) transactions = [self._row_to_transaction(row) for row in cursor.fetchall()] conn.close() return transactions def get_by_category(self, category: Category) -> List[Transaction]: """Get transactions by category""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT * FROM transactions WHERE category = ? ORDER BY date DESC """, (category.value,)) transactions = [self._row_to_transaction(row) for row in cursor.fetchall()] conn.close() return transactions def get_by_date_range(self, start_date: datetime, end_date: datetime) -> List[Transaction]: """Get transactions within a date range""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT * FROM transactions WHERE date BETWEEN ? AND ? ORDER BY date DESC """, (start_date.isoformat(), end_date.isoformat())) transactions = [self._row_to_transaction(row) for row in cursor.fetchall()] conn.close() return transactions def get_summary(self) -> dict: """Get transaction summary statistics""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT SUM(CASE WHEN type = 'income' THEN amount ELSE 0 END) as total_income, SUM(CASE WHEN type = 'expense' THEN amount ELSE 0 END) as total_expenses, COUNT(CASE WHEN type = 'income' THEN 1 END) as income_count, COUNT(CASE WHEN type = 'expense' THEN 1 END) as expense_count, COUNT(*) as total_count FROM transactions """) row = cursor.fetchone() conn.close() total_income = row['total_income'] or 0 total_expenses = row['total_expenses'] or 0 return { "total_income": total_income, "total_expenses": total_expenses, "net_balance": total_income - total_expenses, "transaction_count": row['total_count'], "income_count": row['income_count'], "expense_count": row['expense_count'] } def get_category_summary(self) -> List[dict]: """Get summary by category""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT category, SUM(amount) as total_amount, COUNT(*) as transaction_count FROM transactions GROUP BY category ORDER BY total_amount DESC """) rows = cursor.fetchall() # Calculate total for percentage total = sum(row['total_amount'] for row in rows) result = [] for row in rows: result.append({ "category": row['category'], "total_amount": row['total_amount'], "transaction_count": row['transaction_count'], "percentage": (row['total_amount'] / total * 100) if total > 0 else 0 }) conn.close() return result

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dev-muhammad/MCPAgent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server