tasks_sqlite.py•3.45 kB
import sqlite3
from dataclasses import dataclass, field
from .tasks_interfaces import DatabaseABC
@dataclass
class Task:
id: int
title: str
description: str
status: int
created_at: str
@dataclass
class Database(DatabaseABC[Task]):
path: str = field(default=":memory:")
def __post_init__(self):
self.conn = sqlite3.connect(self.path)
self._create_tables()
def _create_tables(self) -> None:
self.conn.execute(
"""
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
description TEXT,
status INTEGER NOT NULL CHECK (status IN (0, 1, 2)) DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)
"""
)
self.conn.commit()
def add_task(self, title: str, description: str = "") -> Task:
cursor = self.conn.execute(
"INSERT INTO tasks (title, description, status, created_at) VALUES (?, ?, 0, datetime('now'))",
(title, description),
)
self.conn.commit()
lastrowid = cursor.lastrowid
return self.get_task(lastrowid)
def get_tasks(self, limit: int = 10) -> list[Task]:
cursor = self.conn.execute(
"SELECT id, title, description, status, created_at FROM tasks LIMIT ?",
(limit,),
)
rows = cursor.fetchall()
return [Task(*row) for row in rows]
def get_task(self, task_id: int) -> Task | None:
cursor = self.conn.execute(
"SELECT id, title, description, status, created_at FROM tasks WHERE id = ?",
(task_id,),
)
row = cursor.fetchone()
return Task(*row) if row else None
def update_task(
self, task_id: int, title: str, description: str, status: int
) -> Task:
self.conn.execute(
"UPDATE tasks SET title = ?, description = ?, status = ? WHERE id = ?",
(title, description, status, task_id),
)
self.conn.commit()
return self.get_task(task_id)
def delete_task(self, task_id: int) -> None:
self.conn.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
self.conn.commit()
def filter_tasks(
self,
title: str = "",
description: str = "",
status: int = None,
created_at: str = "",
limit: int = 20,
) -> list[Task]:
query = "SELECT id, title, description, status, created_at FROM tasks WHERE 1=1"
params = []
if title:
query += " AND title LIKE ?"
params.append(f"%{title}%")
if description:
query += " AND description LIKE ?"
params.append(f"%{description}%")
if status is not None:
query += " AND status = ?"
params.append(status)
if created_at:
query += " AND created_at LIKE ?"
params.append(f"%{created_at}%")
query += " LIMIT ?"
params.append(limit)
cursor = self.conn.execute(query, tuple(params))
rows = cursor.fetchall()
return [Task(*row) for row in rows]
def get_schema(self) -> dict:
schema = self.conn.execute("PRAGMA table_info(tasks)").fetchall()
return {column[1]: column[2] for column in schema}
def close(self) -> None:
self.conn.close()