Skip to main content
Glama

Claude Slack

test_message_store.pyโ€ข19.2 kB
""" Test suite for MessageStore. Tests coordination between SQLite and Qdrant. """ import pytest import pytest_asyncio import tempfile from pathlib import Path from datetime import datetime, timedelta import json from api.db.message_store import MessageStore from api.ranking import RankingProfiles @pytest_asyncio.fixture async def message_store(): """Provide a clean MessageStore instance for each test.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.db" qdrant_path = Path(tmpdir) / "qdrant" store = MessageStore( db_path=str(db_path), qdrant_config={"qdrant_path": str(qdrant_path)} ) await store.initialize() yield store @pytest_asyncio.fixture async def populated_store(message_store): """Provide MessageStore with test data.""" # Register projects and agents await message_store.register_project("proj1", "/path/to/proj1", "Project 1") await message_store.register_agent( name="alice", project_id="proj1", description="Test agent Alice", dm_policy="open", discoverable="public" ) await message_store.register_agent( name="bob", project_id="proj1", description="Test agent Bob", dm_policy="open", discoverable="public" ) # Create channels await message_store.create_channel( channel_id="global:general", channel_type="channel", access_type="open", scope="global", name="general", description="General discussion" ) await message_store.create_channel( channel_id="proj1:dev", channel_type="channel", access_type="open", scope="project", name="dev", project_id="proj1", description="Development discussion" ) # Add members to channels await message_store.add_channel_member( channel_id="global:general", agent_name="alice", agent_project_id="proj1" ) await message_store.add_channel_member( channel_id="global:general", agent_name="bob", agent_project_id="proj1" ) await message_store.add_channel_member( channel_id="proj1:dev", agent_name="alice", agent_project_id="proj1" ) await message_store.add_channel_member( channel_id="proj1:dev", agent_name="bob", agent_project_id="proj1" ) # Add test messages with different content for search test_messages = [ { "channel_id": "global:general", "sender_id": "alice", "sender_project_id": "proj1", "content": "How to implement async/await in Python?", "metadata": {"confidence": 0.8, "topic": "python"} }, { "channel_id": "global:general", "sender_id": "bob", "sender_project_id": "proj1", "content": "Use async def for coroutine functions", "metadata": {"confidence": 0.95, "topic": "python"} }, { "channel_id": "proj1:dev", "sender_id": "alice", "sender_project_id": "proj1", "content": "Working on the authentication module", "metadata": {"confidence": 0.7, "module": "auth"} }, { "channel_id": "proj1:dev", "sender_id": "bob", "sender_project_id": "proj1", "content": "Fixed the database connection pool issue", "metadata": {"confidence": 0.9, "module": "database"} } ] for msg in test_messages: await message_store.send_message(**msg) yield message_store class TestMessageStoreBasics: """Test basic MessageStore operations.""" @pytest.mark.asyncio async def test_initialization(self, message_store): """Test store initialization.""" assert message_store.sqlite is not None assert message_store.qdrant is not None # Should be initialized with qdrant_config @pytest.mark.asyncio async def test_save_and_retrieve_message(self, populated_store): """Test saving and retrieving messages.""" # Save a new message message_id = await populated_store.send_message( channel_id="global:general", sender_id="alice", sender_project_id="proj1", content="Test message for retrieval", metadata={"test": True} ) assert message_id is not None # Retrieve messages messages = await populated_store.get_messages( channel_ids=["global:general"] ) # Find our test message test_msg = next((m for m in messages if m["id"] == message_id), None) assert test_msg is not None assert test_msg["content"] == "Test message for retrieval" assert test_msg["metadata"]["test"] is True class TestPermissionBasedRetrieval: """Test permission-based message retrieval.""" @pytest.mark.asyncio async def test_agent_message_permissions(self, populated_store): """Test get_agent_messages respects permissions.""" # Create a private channel await populated_store.create_channel( channel_id="proj1:private", channel_type="channel", access_type="members", scope="project", name="private", project_id="proj1", description="Private channel" ) # Add only alice as member await populated_store.add_channel_member( channel_id="proj1:private", agent_name="alice", agent_project_id="proj1", can_send=True, can_invite=False ) # Post a message await populated_store.send_message( channel_id="proj1:private", sender_id="alice", sender_project_id="proj1", content="Private message only Alice can see" ) # Alice should see the message alice_messages = await populated_store.get_agent_messages( agent_name="alice", agent_project_id="proj1" ) private_msgs = [m for m in alice_messages if m["channel_id"] == "proj1:private"] assert len(private_msgs) == 1 # Bob should NOT see the message bob_messages = await populated_store.get_agent_messages( agent_name="bob", agent_project_id="proj1" ) private_msgs = [m for m in bob_messages if m["channel_id"] == "proj1:private"] assert len(private_msgs) == 0 @pytest.mark.asyncio async def test_get_messages_no_permissions(self, populated_store): """Test get_messages (admin) ignores permissions.""" # Create a private channel await populated_store.create_channel( channel_id="proj1:secret", channel_type="channel", access_type="members", scope="project", name="secret", project_id="proj1" ) # Add alice as member first so she can send await populated_store.sqlite.add_channel_member( channel_id="proj1:secret", agent_name="alice", agent_project_id="proj1" ) # Post a message await populated_store.sqlite.send_message( channel_id="proj1:secret", sender_id="alice", sender_project_id="proj1", content="Secret message" ) # Admin get_messages_admin should still see it messages = await populated_store.sqlite.get_messages_admin( channel_ids=["proj1:secret"] ) assert len(messages) == 1 assert messages[0]["content"] == "Secret message" class TestSemanticSearch: """Test semantic search capabilities.""" @pytest.mark.asyncio async def test_search_messages(self, populated_store): """Test basic semantic search.""" results = await populated_store.search_messages( query="Python async programming", limit=5 ) assert len(results) > 0 # Should find messages about Python and async contents = [r["content"] for r in results] assert any("async" in c or "Python" in c for c in contents) @pytest.mark.asyncio async def test_search_with_channel_filter(self, populated_store): """Test search with channel filtering.""" results = await populated_store.search_messages( query="database", channel_ids=["proj1:dev"], limit=5 ) assert len(results) > 0 # All results should be from proj1:dev for result in results: assert result["channel_id"] == "proj1:dev" @pytest.mark.asyncio async def test_search_with_metadata_filter(self, populated_store): """Test search with metadata filtering.""" results = await populated_store.search_messages( query="Python", metadata_filters={"confidence": {"$gte": 0.9}}, limit=5 ) # Should only find high-confidence messages for result in results: assert result["metadata"]["confidence"] >= 0.9 @pytest.mark.asyncio async def test_search_ranking_profiles(self, populated_store): """Test different ranking profiles.""" # Add a recent message await populated_store.send_message( channel_id="global:general", sender_id="alice", sender_project_id="proj1", content="Latest Python tips and tricks", metadata={"confidence": 0.5} ) # Search with RECENT_PRIORITY recent_results = await populated_store.search_messages( query="Python", ranking_profile=RankingProfiles.RECENT_PRIORITY, limit=3 ) # Search with QUALITY_PRIORITY quality_results = await populated_store.search_messages( query="Python", ranking_profile=RankingProfiles.QUALITY_PRIORITY, limit=3 ) # Results should be different due to ranking assert len(recent_results) > 0 assert len(quality_results) > 0 # The exact order depends on the ranking algorithm @pytest.mark.asyncio async def test_search_agent_messages(self, populated_store): """Test permission-scoped search.""" # Create private channel with searchable content await populated_store.create_channel( channel_id="proj1:security", channel_type="channel", access_type="members", scope="project", name="security", project_id="proj1" ) # Only alice has access await populated_store.add_channel_member( channel_id="proj1:security", agent_name="alice", agent_project_id="proj1", can_send=True ) # Add security-related message await populated_store.send_message( channel_id="proj1:security", sender_id="alice", sender_project_id="proj1", content="Critical security vulnerability in authentication", metadata={"severity": "high"} ) # Alice should find it alice_results = await populated_store.search_agent_messages( agent_name="alice", agent_project_id="proj1", query="security vulnerability", limit=5 ) security_msgs = [r for r in alice_results if "security" in r["channel_id"]] assert len(security_msgs) > 0 # Bob should NOT find it bob_results = await populated_store.search_agent_messages( agent_name="bob", agent_project_id="proj1", query="security vulnerability", limit=5 ) security_msgs = [r for r in bob_results if "security" in r["channel_id"]] assert len(security_msgs) == 0 class TestChannelOperations: """Test channel operations through MessageStore.""" @pytest.mark.asyncio async def test_list_channels_for_agent(self, populated_store): """Test listing channels accessible to an agent.""" # Use sqlite's get_agent_channels method channels = await populated_store.sqlite.get_agent_channels( agent_name="alice", agent_project_id="proj1" ) channel_ids = {ch["id"] for ch in channels} # Alice should see global and project channels she's a member of assert "global:general" in channel_ids assert "proj1:dev" in channel_ids @pytest.mark.asyncio async def test_channel_membership(self, populated_store): """Test channel membership operations.""" # Create a members-only channel await populated_store.create_channel( channel_id="proj1:team", channel_type="channel", access_type="members", scope="project", name="team", project_id="proj1" ) # Add alice with full permissions await populated_store.add_channel_member( channel_id="proj1:team", agent_name="alice", agent_project_id="proj1", can_send=True, can_invite=True ) # Check membership members = await populated_store.sqlite.get_channel_members("proj1:team") assert len(members) == 1 assert members[0]["agent_name"] == "alice" class TestEdgeCases: """Test edge cases and error handling.""" @pytest.mark.asyncio async def test_empty_search_results(self, populated_store): """Test search with no matches.""" results = await populated_store.search_messages( query="quantum computing blockchain AI cryptocurrency", limit=5 ) # Should return results even if no perfect match (semantic search) assert isinstance(results, list) @pytest.mark.asyncio async def test_metadata_persistence(self, populated_store): """Test complex metadata persistence.""" complex_metadata = { "nested": { "level1": { "level2": ["item1", "item2"], "data": {"key": "value"} } }, "array": [1, 2, 3], "boolean": True, "number": 42.5 } message_id = await populated_store.send_message( channel_id="global:general", sender_id="alice", sender_project_id="proj1", content="Message with complex metadata", metadata=complex_metadata ) messages = await populated_store.get_messages( message_ids=[message_id] ) assert len(messages) == 1 retrieved_metadata = messages[0]["metadata"] assert retrieved_metadata["nested"]["level1"]["level2"] == ["item1", "item2"] assert retrieved_metadata["array"] == [1, 2, 3] assert retrieved_metadata["boolean"] is True assert retrieved_metadata["number"] == 42.5 @pytest.mark.asyncio async def test_filter_based_search(self, populated_store): """Test search without query (filter-only).""" # Search only with metadata filter, no query # Note: The module:auth message exists in our test data results = await populated_store.search_messages( metadata_filters={"module": "auth"}, limit=10 ) # Should find the authentication module message if len(results) > 0: for result in results: assert result["metadata"]["module"] == "auth" else: # If no results, it's because Qdrant may not support pure filter search # without a query. This is acceptable behavior. pass class TestQdrantIntegration: """Test Qdrant-specific functionality.""" @pytest.mark.asyncio async def test_qdrant_disabled(self): """Test MessageStore without Qdrant.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.db" # Create MessageStore without Qdrant config store = MessageStore( db_path=str(db_path), qdrant_config=None ) await store.initialize() assert store.qdrant is None # Should be None when no config provided # Should still work for basic operations await store.register_project("test", "/test", "Test") projects = await store.list_projects() assert len(projects) == 1 @pytest.mark.asyncio async def test_collection_management(self, message_store): """Test Qdrant collection is properly managed.""" if message_store.qdrant: # The collection should be created during initialization assert message_store.qdrant is not None # Collection operations are internal to QdrantStore # Verify we can index and search await message_store.register_project("test", "/test", "Test") await message_store.create_channel( channel_id="test:chan", channel_type="channel", access_type="open", scope="global", name="test" ) # Register an agent first await message_store.sqlite.register_agent( name="test", project_id="test", description="Test agent" ) # Add agent as channel member await message_store.sqlite.add_channel_member( channel_id="test:chan", agent_name="test", agent_project_id="test" ) # This should index in Qdrant message_id = await message_store.sqlite.send_message( channel_id="test:chan", sender_id="test", sender_project_id="test", content="Test content for Qdrant" ) # If Qdrant is working, this should not fail if message_store.qdrant: from datetime import datetime await message_store.qdrant.index_message( message_id=message_id, content="Test content for Qdrant", metadata={}, channel_id="test:chan", sender_id="test", timestamp=datetime.now() )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/theo-nash/claude-slack'

If you have feedback or need assistance with the MCP directory API, please join our Discord server