Skip to main content
Glama
test_database_integration.py•21 kB
""" Integration tests for database operations """ import pytest import pytest_asyncio from datetime import datetime, timedelta import uuid import asyncio from src.database_factory import DatabaseFactory from src.models import Project, Todo, CalendarEvent, StatusEntry, PersonalData from tests.conftest import generate_test_project, generate_test_todo, generate_test_event class TestSQLiteDatabaseIntegration: """Integration tests for SQLite database operations""" @pytest_asyncio.fixture async def sqlite_db(self, test_config): """Create SQLite database for testing""" test_config.database_type = "sqlite" db = await DatabaseFactory.create_database(test_config) yield db await db.close() @pytest.mark.asyncio async def test_project_operations(self, sqlite_db): """Test complete project CRUD operations""" # Create project project_data = generate_test_project( name="SQLite Test Project", description="Testing SQLite database operations", priority="high" ) project = Project(**project_data) # Add project await sqlite_db.add_project(project) # Get project by ID retrieved_project = await sqlite_db.get_project_by_id(project.id) assert retrieved_project is not None assert retrieved_project.id == project.id assert retrieved_project.name == project.name assert retrieved_project.description == project.description assert retrieved_project.priority == project.priority # Update project retrieved_project.name = "Updated SQLite Test Project" retrieved_project.status = "completed" retrieved_project.updated_date = datetime.now() await sqlite_db.update_project(retrieved_project) # Verify update updated_project = await sqlite_db.get_project_by_id(project.id) assert updated_project.name == "Updated SQLite Test Project" assert updated_project.status == "completed" # List projects all_projects = await sqlite_db.get_projects() assert len(all_projects) >= 1 assert any(p.id == project.id for p in all_projects) # Delete project await sqlite_db.delete_project(project.id) # Verify deletion deleted_project = await sqlite_db.get_project_by_id(project.id) assert deleted_project is None @pytest.mark.asyncio async def test_todo_operations(self, sqlite_db): """Test complete todo CRUD operations""" # Create project first (for foreign key) project_data = generate_test_project() project = Project(**project_data) await sqlite_db.add_project(project) # Create todo todo_data = generate_test_todo( title="SQLite Test Todo", description="Testing SQLite todo operations", project_id=project.id, priority="high" ) todo = Todo(**todo_data) # Add todo await sqlite_db.add_todo(todo) # Get todo by ID retrieved_todo = await sqlite_db.get_todo_by_id(todo.id) assert retrieved_todo is not None assert retrieved_todo.id == todo.id assert retrieved_todo.title == todo.title assert retrieved_todo.project_id == project.id # Update todo retrieved_todo.title = "Updated SQLite Test Todo" retrieved_todo.completed = True retrieved_todo.updated_date = datetime.now() await sqlite_db.update_todo(retrieved_todo) # Verify update updated_todo = await sqlite_db.get_todo_by_id(todo.id) assert updated_todo.title == "Updated SQLite Test Todo" assert updated_todo.completed is True # List todos all_todos = await sqlite_db.get_todos() assert len(all_todos) >= 1 assert any(t.id == todo.id for t in all_todos) # List todos by project project_todos = await sqlite_db.get_todos(project_id=project.id) assert len(project_todos) >= 1 assert all(t.project_id == project.id for t in project_todos) # Delete todo await sqlite_db.delete_todo(todo.id) # Verify deletion deleted_todo = await sqlite_db.get_todo_by_id(todo.id) assert deleted_todo is None # Cleanup await sqlite_db.delete_project(project.id) @pytest.mark.asyncio async def test_calendar_event_operations(self, sqlite_db): """Test calendar event operations""" # Create event event_data = generate_test_event( title="SQLite Test Event", description="Testing SQLite calendar operations" ) event = CalendarEvent(**event_data) # Add event await sqlite_db.add_calendar_event(event) # Get events in date range start_date = datetime.now() end_date = datetime.now() + timedelta(days=7) events = await sqlite_db.get_calendar_events(start_date, end_date) assert len(events) >= 1 found_event = next((e for e in events if e.id == event.id), None) assert found_event is not None assert found_event.title == event.title assert found_event.description == event.description @pytest.mark.asyncio async def test_status_operations(self, sqlite_db): """Test status operations""" # Set initial status status = StatusEntry( id="test_status_1", status="working", message="Working on SQLite tests", emoji="đź’»", created_date=datetime.now() ) await sqlite_db.set_status(status) # Get status retrieved_status = await sqlite_db.get_status() assert retrieved_status is not None assert retrieved_status.id == status.id assert retrieved_status.status == status.status assert retrieved_status.message == status.message # Update status new_status = StatusEntry( id="test_status_2", status="available", message="Available for new tasks", emoji="âś…", created_date=datetime.now() ) await sqlite_db.set_status(new_status) # Verify new status current_status = await sqlite_db.get_status() assert current_status.id == new_status.id assert current_status.status == "available" @pytest.mark.asyncio async def test_personal_data_operations(self, sqlite_db): """Test personal data operations""" # Set string data string_data = PersonalData( key="test_string", value="test_value", data_type="string", created_date=datetime.now(), updated_date=datetime.now() ) await sqlite_db.set_personal_data(string_data) # Get string data retrieved_data = await sqlite_db.get_personal_data("test_string") assert retrieved_data is not None assert retrieved_data.key == "test_string" assert retrieved_data.value == "test_value" assert retrieved_data.data_type == "string" # Set JSON data json_data = PersonalData( key="test_json", value={"nested": {"key": "value"}, "array": [1, 2, 3]}, data_type="json", created_date=datetime.now(), updated_date=datetime.now() ) await sqlite_db.set_personal_data(json_data) # Get JSON data retrieved_json = await sqlite_db.get_personal_data("test_json") assert retrieved_json is not None assert retrieved_json.value["nested"]["key"] == "value" assert retrieved_json.value["array"] == [1, 2, 3] # Get all personal data all_data = await sqlite_db.get_all_personal_data() assert len(all_data) >= 2 keys = [d.key for d in all_data] assert "test_string" in keys assert "test_json" in keys @pytest.mark.asyncio async def test_concurrent_operations(self, sqlite_db): """Test concurrent database operations""" # Create multiple projects concurrently async def create_project(i): project_data = generate_test_project( name=f"Concurrent Project {i}", description=f"Project created concurrently {i}" ) project = Project(**project_data) await sqlite_db.add_project(project) return project.id # Run concurrent operations tasks = [create_project(i) for i in range(5)] project_ids = await asyncio.gather(*tasks, return_exceptions=True) # Verify all projects were created successful_ids = [pid for pid in project_ids if isinstance(pid, str)] assert len(successful_ids) >= 3 # At least 3 should succeed # Verify projects exist all_projects = await sqlite_db.get_projects() for project_id in successful_ids: project = await sqlite_db.get_project_by_id(project_id) assert project is not None @pytest.mark.asyncio async def test_data_integrity(self, sqlite_db): """Test data integrity and foreign key constraints""" # Create project project_data = generate_test_project() project = Project(**project_data) await sqlite_db.add_project(project) # Create todo with project reference todo_data = generate_test_todo(project_id=project.id) todo = Todo(**todo_data) await sqlite_db.add_todo(todo) # Delete project (should handle foreign key constraint) await sqlite_db.delete_project(project.id) # Todo should still exist but project_id might be null remaining_todo = await sqlite_db.get_todo_by_id(todo.id) assert remaining_todo is not None # Foreign key behavior depends on database configuration # In SQLite with CASCADE or SET NULL, the behavior would be defined @pytest.mark.asyncio async def test_transaction_rollback(self, sqlite_db): """Test transaction rollback behavior""" # This test would require transaction support in the database interface # For now, test basic error handling # Try to add invalid project try: invalid_project = Project( id="", # Invalid empty ID name="Invalid Project", description="This should fail", created_date=datetime.now(), updated_date=datetime.now() ) await sqlite_db.add_project(invalid_project) assert False, "Should have failed with invalid data" except Exception: # Expected to fail pass # Verify database state is consistent all_projects = await sqlite_db.get_projects() # Should not contain the invalid project assert not any(p.name == "Invalid Project" for p in all_projects) class TestTinyDBIntegration: """Integration tests for TinyDB database operations""" @pytest_asyncio.fixture async def tinydb(self, test_config): """Create TinyDB database for testing""" test_config.database_type = "tinydb" db = await DatabaseFactory.create_database(test_config) yield db await db.close() @pytest.mark.asyncio async def test_basic_operations(self, tinydb): """Test basic TinyDB operations""" # Create and add project project_data = generate_test_project( name="TinyDB Test Project", description="Testing TinyDB operations" ) project = Project(**project_data) await tinydb.add_project(project) # Retrieve project retrieved_project = await tinydb.get_project_by_id(project.id) assert retrieved_project is not None assert retrieved_project.name == project.name # List projects all_projects = await tinydb.get_projects() assert len(all_projects) >= 1 assert any(p.id == project.id for p in all_projects) @pytest.mark.asyncio async def test_json_serialization(self, tinydb): """Test JSON serialization with TinyDB""" # Test complex data structure project_data = generate_test_project( tags=["json", "serialization", "test"], ) project_data["metadata"] = { "nested": {"key": "value"}, "array": [1, 2, 3], "boolean": True, "null": None } project = Project(**project_data) await tinydb.add_project(project) # Retrieve and verify serialization retrieved_project = await tinydb.get_project_by_id(project.id) assert retrieved_project is not None assert retrieved_project.tags == ["json", "serialization", "test"] # TinyDB should handle complex data structures if hasattr(retrieved_project, 'metadata'): assert retrieved_project.metadata["nested"]["key"] == "value" @pytest.mark.asyncio async def test_date_handling(self, tinydb): """Test date serialization/deserialization with TinyDB""" now = datetime.now() future_date = now + timedelta(days=7) todo_data = generate_test_todo( title="Date Test Todo", due_date=future_date ) todo = Todo(**todo_data) await tinydb.add_todo(todo) # Retrieve and verify dates retrieved_todo = await tinydb.get_todo_by_id(todo.id) assert retrieved_todo is not None assert retrieved_todo.due_date is not None # Date should be preserved (allowing for serialization differences) if isinstance(retrieved_todo.due_date, datetime): # Allow small differences due to serialization time_diff = abs((retrieved_todo.due_date - future_date).total_seconds()) assert time_diff < 1 # Within 1 second class TestPostgreSQLIntegration: """Integration tests for PostgreSQL database operations""" @pytest_asyncio.fixture async def postgres_db(self, test_config): """Create PostgreSQL database for testing""" # Skip if PostgreSQL not available pytest.skip("PostgreSQL integration tests require running PostgreSQL instance") test_config.database_type = "postgresql" test_config.pgvector_connection_string = "postgresql://test:test@localhost:5432/test_db" try: db = await DatabaseFactory.create_database(test_config) yield db await db.close() except Exception as e: pytest.skip(f"PostgreSQL not available: {e}") @pytest.mark.asyncio async def test_vector_search_operations(self, postgres_db): """Test PostgreSQL vector search operations""" # Create projects with embeddings project1 = Project( id=str(uuid.uuid4()), name="AI Project", description="Artificial intelligence and machine learning project", status="active", priority="high", tags=["ai", "ml", "python"], created_date=datetime.now(), updated_date=datetime.now() ) project2 = Project( id=str(uuid.uuid4()), name="Web Development", description="Frontend web development with React", status="active", priority="medium", tags=["web", "react", "javascript"], created_date=datetime.now(), updated_date=datetime.now() ) # Add mock embeddings project1.__dict__['embedding'] = [0.1] * 1536 # Mock AI-related embedding project2.__dict__['embedding'] = [0.9] * 1536 # Mock web-related embedding await postgres_db.add_project(project1) await postgres_db.add_project(project2) # Test semantic search if hasattr(postgres_db, 'semantic_search_projects'): query_embedding = [0.1] * 1536 # Should be similar to AI project results = await postgres_db.semantic_search_projects( query_embedding, limit=5, similarity_threshold=0.7 ) assert len(results) >= 1 # First result should be the AI project (most similar) best_match, similarity = results[0] assert best_match.id == project1.id assert similarity > 0.7 @pytest.mark.asyncio async def test_schema_isolation(self, postgres_db): """Test schema-based tenant isolation""" # This test would require multiple database connections # with different schema configurations # For now, verify that tables exist in expected schema if hasattr(postgres_db, 'schema'): assert postgres_db.schema is not None # Verify basic operations work with schema project_data = generate_test_project(name="Schema Test Project") project = Project(**project_data) await postgres_db.add_project(project) retrieved_project = await postgres_db.get_project_by_id(project.id) assert retrieved_project is not None assert retrieved_project.name == project.name class TestDatabaseFactory: """Test database factory operations""" @pytest.mark.asyncio async def test_create_sqlite_database(self, test_config): """Test creating SQLite database via factory""" test_config.database_type = "sqlite" db = await DatabaseFactory.create_database(test_config) assert db is not None # Test basic operation project_data = generate_test_project() project = Project(**project_data) await db.add_project(project) retrieved_project = await db.get_project_by_id(project.id) assert retrieved_project is not None assert retrieved_project.id == project.id await db.close() @pytest.mark.asyncio async def test_create_tinydb_database(self, test_config): """Test creating TinyDB database via factory""" test_config.database_type = "tinydb" db = await DatabaseFactory.create_database(test_config) assert db is not None # Test basic operation project_data = generate_test_project() project = Project(**project_data) await db.add_project(project) retrieved_project = await db.get_project_by_id(project.id) assert retrieved_project is not None assert retrieved_project.id == project.id await db.close() @pytest.mark.asyncio async def test_invalid_database_type(self, test_config): """Test factory with invalid database type""" test_config.database_type = "invalid_db_type" with pytest.raises(ValueError, match="Unsupported database type"): await DatabaseFactory.create_database(test_config) @pytest.mark.asyncio async def test_database_connection_management(self, test_config): """Test proper database connection management""" test_config.database_type = "sqlite" # Create multiple databases db1 = await DatabaseFactory.create_database(test_config) db2 = await DatabaseFactory.create_database(test_config) # Both should be functional assert db1 is not None assert db2 is not None # Test operations on both project1_data = generate_test_project(name="DB1 Project") project2_data = generate_test_project(name="DB2 Project") project1 = Project(**project1_data) project2 = Project(**project2_data) await db1.add_project(project1) await db2.add_project(project2) # Verify isolation (if using different database files) db1_projects = await db1.get_projects() db2_projects = await db2.get_projects() # Both should have at least their own project assert len(db1_projects) >= 1 assert len(db2_projects) >= 1 # Close connections await db1.close() await db2.close()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/swapnilsurdi/mcp-pa'

If you have feedback or need assistance with the MCP directory API, please join our Discord server