# store.py • 10.1 kB
"""SQLite data store for transactions"""
import sqlite3
from typing import List, Optional
from datetime import datetime
import json
from pathlib import Path
from models import Transaction, TransactionCreate, TransactionUpdate, TransactionType, Category
from config import settings
class TransactionStore:
    """SQLite-backed storage for ``Transaction`` records.

    Each operation opens its own connection to ``db_path`` and closes it
    before returning, including when the statement or the row conversion
    raises. Tags are persisted as a JSON-encoded string and all datetimes
    as ISO-8601 text (``datetime.isoformat()``).
    """

    def __init__(self, db_path: Path = settings.DB_PATH):
        """Remember the database location and make sure the schema exists.

        Args:
            db_path: Location of the SQLite database file.
        """
        self.db_path = db_path
        self._init_db()

    # ------------------------------------------------------------------
    # Connection helpers
    # ------------------------------------------------------------------
    def _get_connection(self):
        """Open a new connection whose rows can be indexed by column name."""
        conn = sqlite3.connect(str(self.db_path))
        conn.row_factory = sqlite3.Row
        return conn

    def _fetch_all(self, sql: str, params: tuple = ()) -> List[sqlite3.Row]:
        """Run a read statement and return every row it produces.

        The connection is closed in ``finally`` so it is released even if
        the statement fails.
        """
        conn = self._get_connection()
        try:
            return conn.execute(sql, params).fetchall()
        finally:
            conn.close()

    def _execute_write(self, sql: str, params: tuple = ()) -> int:
        """Run a modifying statement, commit it, and return ``rowcount``.

        If the statement or the commit raises, nothing is committed and the
        connection is still closed.
        """
        conn = self._get_connection()
        try:
            cursor = conn.execute(sql, params)
            conn.commit()
            return cursor.rowcount
        finally:
            conn.close()

    def _select(self, where: str = "", params: tuple = ()) -> List[Transaction]:
        """Return transactions matching ``where``, newest ``date`` first.

        ``where`` is only ever a literal SQL fragment written in this class;
        caller-supplied values must always travel through ``params``.
        """
        sql = f"SELECT * FROM transactions {where} ORDER BY date DESC"
        return [self._row_to_transaction(row) for row in self._fetch_all(sql, params)]

    @staticmethod
    def _like_pattern(query: str) -> str:
        """Build a lower-cased "contains" LIKE pattern with wildcards escaped.

        ``%`` and ``_`` in ``query`` are prefixed with a backslash (and the
        backslash itself is doubled) so they match literally when the
        statement uses ``ESCAPE '\\'``.
        """
        escaped = (
            query.lower()
            .replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        return f"%{escaped}%"

    # ------------------------------------------------------------------
    # Schema and row mapping
    # ------------------------------------------------------------------
    def _init_db(self):
        """Create the ``transactions`` table if it does not already exist."""
        self._execute_write("""
            CREATE TABLE IF NOT EXISTS transactions (
                id TEXT PRIMARY KEY,
                title TEXT NOT NULL,
                amount REAL NOT NULL,
                type TEXT NOT NULL,
                category TEXT NOT NULL,
                description TEXT,
                tags TEXT,
                date TEXT NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)

    def _row_to_transaction(self, row: sqlite3.Row) -> Transaction:
        """Convert a database row into a ``Transaction`` model.

        A NULL or empty ``tags`` column becomes an empty list; the three
        timestamp columns are parsed with ``datetime.fromisoformat``.
        """
        return Transaction(
            id=row['id'],
            title=row['title'],
            amount=row['amount'],
            type=TransactionType(row['type']),
            category=Category(row['category']),
            description=row['description'],
            tags=json.loads(row['tags']) if row['tags'] else [],
            date=datetime.fromisoformat(row['date']),
            created_at=datetime.fromisoformat(row['created_at']),
            updated_at=datetime.fromisoformat(row['updated_at'])
        )

    # ------------------------------------------------------------------
    # CRUD
    # ------------------------------------------------------------------
    def get_all(self) -> List[Transaction]:
        """Get all transactions sorted by date (newest first)."""
        return self._select()

    def get_by_id(self, transaction_id: str) -> Optional[Transaction]:
        """Get the transaction with the given ID, or ``None`` if absent."""
        rows = self._fetch_all(
            "SELECT * FROM transactions WHERE id = ?", (transaction_id,)
        )
        # ``id`` is the primary key, so there is at most one row.
        return self._row_to_transaction(rows[0]) if rows else None

    def create(self, transaction_data: TransactionCreate) -> Transaction:
        """Insert a new transaction and return the stored model.

        A random UUID4 string is assigned as the ID, and ``created_at`` /
        ``updated_at`` are both set to the current local time.
        """
        import uuid

        now = datetime.now()
        transaction = Transaction(
            id=str(uuid.uuid4()),
            created_at=now,
            updated_at=now,
            **transaction_data.model_dump()
        )
        self._execute_write("""
            INSERT INTO transactions
            (id, title, amount, type, category, description, tags, date, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            transaction.id,
            transaction.title,
            transaction.amount,
            transaction.type.value,
            transaction.category.value,
            transaction.description,
            json.dumps(transaction.tags),
            transaction.date.isoformat(),
            transaction.created_at.isoformat(),
            transaction.updated_at.isoformat(),
        ))
        return transaction

    def update(self, transaction_id: str, transaction_data: TransactionUpdate) -> Optional[Transaction]:
        """Apply the explicitly-set fields of ``transaction_data`` to a row.

        Returns:
            The updated transaction, or ``None`` when no transaction with
            ``transaction_id`` exists.
        """
        transaction = self.get_by_id(transaction_id)
        if not transaction:
            return None

        # Only fields the caller actually supplied overwrite stored values.
        for field, value in transaction_data.model_dump(exclude_unset=True).items():
            setattr(transaction, field, value)
        transaction.updated_at = datetime.now()

        self._execute_write("""
            UPDATE transactions
            SET title = ?, amount = ?, type = ?, category = ?,
                description = ?, tags = ?, date = ?, updated_at = ?
            WHERE id = ?
        """, (
            transaction.title,
            transaction.amount,
            transaction.type.value,
            transaction.category.value,
            transaction.description,
            json.dumps(transaction.tags),
            transaction.date.isoformat(),
            transaction.updated_at.isoformat(),
            transaction_id,
        ))
        return transaction

    def delete(self, transaction_id: str) -> bool:
        """Delete a transaction; return ``True`` if a row was removed."""
        removed = self._execute_write(
            "DELETE FROM transactions WHERE id = ?", (transaction_id,)
        )
        return removed > 0

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def search(self, query: str) -> List[Transaction]:
        """Search transactions by title, description, category, or tags.

        The match is a substring test against the lower-cased columns.
        ``%`` and ``_`` in ``query`` are treated as literal characters, not
        as LIKE wildcards. Tags are matched against their stored JSON text.
        """
        pattern = self._like_pattern(query)
        return self._select("""
            WHERE LOWER(title) LIKE ? ESCAPE '\\'
               OR LOWER(description) LIKE ? ESCAPE '\\'
               OR LOWER(category) LIKE ? ESCAPE '\\'
               OR LOWER(tags) LIKE ? ESCAPE '\\'
        """, (pattern, pattern, pattern, pattern))

    def get_by_type(self, transaction_type: TransactionType) -> List[Transaction]:
        """Get transactions of the given type, newest first."""
        return self._select("WHERE type = ?", (transaction_type.value,))

    def get_by_category(self, category: Category) -> List[Transaction]:
        """Get transactions in the given category, newest first."""
        return self._select("WHERE category = ?", (category.value,))

    def get_by_date_range(self, start_date: datetime, end_date: datetime) -> List[Transaction]:
        """Get transactions whose date lies between the two bounds (inclusive).

        The comparison is done on the stored ISO-8601 text, so it assumes the
        bounds are formatted the same way as the stored values (for example
        both naive or both timezone-aware).
        """
        return self._select(
            "WHERE date BETWEEN ? AND ?",
            (start_date.isoformat(), end_date.isoformat()),
        )

    # ------------------------------------------------------------------
    # Aggregates
    # ------------------------------------------------------------------
    def get_summary(self) -> dict:
        """Get income/expense totals, counts, and the net balance.

        Totals fall back to 0 when the table is empty (SQLite's ``SUM``
        yields NULL over zero rows).
        """
        # An aggregate query without GROUP BY always yields exactly one row.
        row = self._fetch_all("""
            SELECT
                SUM(CASE WHEN type = 'income' THEN amount ELSE 0 END) as total_income,
                SUM(CASE WHEN type = 'expense' THEN amount ELSE 0 END) as total_expenses,
                COUNT(CASE WHEN type = 'income' THEN 1 END) as income_count,
                COUNT(CASE WHEN type = 'expense' THEN 1 END) as expense_count,
                COUNT(*) as total_count
            FROM transactions
        """)[0]
        total_income = row['total_income'] or 0
        total_expenses = row['total_expenses'] or 0
        return {
            "total_income": total_income,
            "total_expenses": total_expenses,
            "net_balance": total_income - total_expenses,
            "transaction_count": row['total_count'],
            "income_count": row['income_count'],
            "expense_count": row['expense_count']
        }

    def get_category_summary(self) -> List[dict]:
        """Get per-category totals, largest total first.

        ``percentage`` is each category's share of the sum of all amounts
        (income and expense rows combined); it is 0 when that sum is not
        positive.
        """
        rows = self._fetch_all("""
            SELECT
                category,
                SUM(amount) as total_amount,
                COUNT(*) as transaction_count
            FROM transactions
            GROUP BY category
            ORDER BY total_amount DESC
        """)
        # Grand total is the denominator for every category's percentage.
        total = sum(row['total_amount'] for row in rows)
        return [
            {
                "category": row['category'],
                "total_amount": row['total_amount'],
                "transaction_count": row['transaction_count'],
                "percentage": (row['total_amount'] / total * 100) if total > 0 else 0
            }
            for row in rows
        ]