"""Advanced tests for database functionality.
Tests for edge cases, thread safety, and error handling.
"""
import sqlite3
import tempfile
from contextlib import closing
from pathlib import Path

import pytest

from mcp_task_aggregator.storage import Database
class TestDatabaseAdvanced:
    """Advanced database tests for edge cases.

    Each test builds a fresh ``Database`` inside a temporary directory.
    The database is always closed via ``contextlib.closing`` (or an explicit
    ``try``/``finally``) so that a failing assertion cannot leak the open
    connection or block removal of the temporary directory.

    NOTE(review): several tests rely on a ``tags`` table with a ``name``
    column being present right after ``Database(...)`` is constructed; the
    schema itself is defined outside this file.
    """

    def test_database_creates_parent_directory(self):
        """Test database creates parent directory if missing."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create a path with non-existent parent directories.
            db_path = Path(tmpdir) / "subdir" / "nested" / "test.db"

            with closing(Database(db_path)):
                assert db_path.parent.exists()
                assert db_path.exists()

    def test_database_connection_context_manager(self):
        """Test database connection context manager yields a usable connection."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"

            with closing(Database(db_path)) as db:
                with db.connection() as conn:
                    assert conn is not None
                    assert isinstance(conn, sqlite3.Connection)

                    cursor = conn.cursor()
                    cursor.execute("SELECT 1")
                    result = cursor.fetchone()
                    assert result[0] == 1

    def test_database_cursor_context_manager(self):
        """Test database cursor context manager yields a usable cursor."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"

            with closing(Database(db_path)) as db:
                with db.cursor() as cursor:
                    assert cursor is not None
                    cursor.execute("SELECT 2")
                    result = cursor.fetchone()
                    assert result[0] == 2

    def test_database_execute_with_params(self):
        """Test database execute with parameters."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"

            with closing(Database(db_path)) as db:
                # Insert with params.
                db.execute(
                    "INSERT INTO tags (name) VALUES (?)",
                    ("test-tag",),
                )

                # Query the same value back with params.
                row = db.fetchone(
                    "SELECT name FROM tags WHERE name = ?", ("test-tag",)
                )
                assert row is not None
                assert row["name"] == "test-tag"

    def test_database_execute_without_params(self):
        """Test database execute without parameters."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"

            with closing(Database(db_path)) as db:
                # Execute without params; a fresh database has no tags yet.
                cursor = db.execute("SELECT COUNT(*) as count FROM tags")
                row = cursor.fetchone()
                assert row["count"] == 0

    def test_database_executemany(self):
        """Test database executemany for batch operations."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"

            with closing(Database(db_path)) as db:
                # Insert multiple tags in one batch.
                tags = [("tag1",), ("tag2",), ("tag3",)]
                db.executemany("INSERT INTO tags (name) VALUES (?)", tags)

                # Verify all were inserted.
                rows = db.fetchall("SELECT name FROM tags ORDER BY name")
                assert len(rows) == 3
                assert [row["name"] for row in rows] == ["tag1", "tag2", "tag3"]

    def test_database_fetchone_returns_none_when_no_results(self):
        """Test fetchone returns None when no results."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"

            with closing(Database(db_path)) as db:
                row = db.fetchone(
                    "SELECT * FROM tags WHERE name = ?", ("nonexistent",)
                )
                assert row is None

    def test_database_fetchall_returns_empty_list(self):
        """Test fetchall returns empty list when no results."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"

            with closing(Database(db_path)) as db:
                rows = db.fetchall(
                    "SELECT * FROM tags WHERE name = ?", ("nonexistent",)
                )
                assert rows == []

    def test_database_foreign_keys_enabled(self):
        """Test foreign keys are enabled."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"

            with closing(Database(db_path)) as db:
                # Check foreign keys pragma.
                row = db.fetchone("PRAGMA foreign_keys")
                assert row is not None
                # foreign_keys pragma returns integer (0 or 1).
                assert row[0] == 1

    def test_database_rollback_on_error(self):
        """Test database rolls back on error."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"

            with closing(Database(db_path)) as db:
                # Insert a tag.
                db.execute("INSERT INTO tags (name) VALUES (?)", ("test-tag",))

                # Try to insert a duplicate; this is expected to fail with an
                # IntegrityError (presumably a UNIQUE constraint on tags.name
                # -- the schema is defined outside this file).
                with pytest.raises(sqlite3.IntegrityError), db.connection():
                    db.execute(
                        "INSERT INTO tags (name) VALUES (?)", ("test-tag",)
                    )

                # Verify rollback - only one tag exists.
                rows = db.fetchall("SELECT * FROM tags")
                assert len(rows) == 1

    def test_database_path_property(self):
        """Test database path property returns correct path."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"

            with closing(Database(db_path)) as db:
                assert db.path == db_path
                assert isinstance(db.path, Path)

    def test_database_close_connection(self):
        """Test database close properly closes connection."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"
            db = Database(db_path)

            # close() is the behavior under test, so it is called exactly once
            # here via try/finally instead of contextlib.closing; the finally
            # clause still guarantees cleanup if a pre-close assertion fails.
            try:
                # Access connection to create it.
                db._get_connection()
                assert hasattr(db._local, "connection")
                assert db._local.connection is not None
            finally:
                # Close database.
                db.close()

            # Verify connection is cleared.
            assert db._local.connection is None