Skip to main content
Glama

Claude Slack

test_qdrant_integration.pyโ€ข19.9 kB
""" Integration tests for Qdrant semantic search. These tests use the real Qdrant cloud instance to verify semantic search works. """ import pytest import pytest_asyncio import tempfile import os from pathlib import Path from datetime import datetime import time # Load environment variables from .env files from dotenv import load_dotenv # Try to load from multiple possible locations without hardcoding paths # Priority order: current dir, parent dirs, home directory for env_path in [ Path('.env'), Path('../.env'), Path('../../.env'), Path.home() / '.env', Path.home() / 'at' / '.env', ]: if env_path.exists(): load_dotenv(env_path) break from api.unified_api import ClaudeSlackAPI from api.db.message_store import MessageStore from api.db.qdrant_store import QdrantStore @pytest.mark.integration @pytest.mark.qdrant class TestQdrantCloudIntegration: """Test real Qdrant cloud integration.""" @pytest_asyncio.fixture async def api_with_qdrant(self): """Create API instance with real Qdrant cloud.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.db" # Use environment variables for Qdrant cloud qdrant_url = os.getenv("QDRANT_URL") qdrant_api_key = os.getenv("QDRANT_API_KEY") if not qdrant_url or not qdrant_api_key: pytest.skip("Qdrant cloud credentials not available") print(f"\nโœ“ Connecting to Qdrant cloud at: {qdrant_url}") api = ClaudeSlackAPI( db_path=str(db_path), qdrant_url=qdrant_url, qdrant_api_key=qdrant_api_key, enable_semantic_search=True ) await api.initialize() # Verify Qdrant is actually connected assert api.db.qdrant is not None, "Qdrant should be initialized" # Setup test data await api.db.register_project("test", "/test", "Test Project") await api.register_agent( name="test_agent", project_id="test", description="Agent for testing" ) await api.create_channel( name="test", description="Test channel", created_by="test_agent", created_by_project_id="test", scope="global" ) await api.join_channel( agent_name="test_agent", agent_project_id="test", channel_id="global:test" ) yield api # Cleanup: Delete test messages from Qdrant # Note: In production, you might want to use a test-specific collection await api.close() @pytest.mark.asyncio async def test_qdrant_connection(self, api_with_qdrant): """Test that Qdrant is properly connected.""" # Verify Qdrant store exists assert api_with_qdrant.db.qdrant is not None # Verify it's a QdrantStore instance assert isinstance(api_with_qdrant.db.qdrant, QdrantStore) # Verify the client is connected assert api_with_qdrant.db.qdrant.client is not None print("โœ“ Qdrant cloud connection verified") @pytest.mark.asyncio async def test_message_indexing_in_qdrant(self, api_with_qdrant): """Test that messages are actually indexed in Qdrant.""" # Send a unique message unique_content = f"Unique test message for Qdrant indexing at {datetime.now().isoformat()}" message_id = await api_with_qdrant.send_message( channel_id="global:test", sender_id="test_agent", sender_project_id="test", content=unique_content, metadata={"test": True, "timestamp": datetime.now().isoformat()} ) print(f"โœ“ Message sent with ID: {message_id}") # Give Qdrant a moment to index await asyncio.sleep(0.5) # Search for the exact message using semantic search results = await api_with_qdrant.search_messages( query=unique_content, limit=5 ) assert len(results) > 0, "Should find the message via semantic search" # Verify it's our message found = False for result in results: if unique_content in result['content']: found = True assert result['id'] == message_id # Score is stored in search_scores dictionary assert 'search_scores' in result, "Qdrant search should include search_scores" assert 'final_score' in result['search_scores'], "Should have final_score" score = result['search_scores']['final_score'] print(f"โœ“ Found message via Qdrant with score: {score:.3f}") break assert found, "Should find the exact message we just indexed" @pytest.mark.asyncio async def test_semantic_similarity(self, api_with_qdrant): """Test semantic similarity search works correctly.""" # Index messages with related content messages = [ { "content": "Python async/await is great for concurrent programming", "metadata": {"topic": "python", "subtopic": "concurrency"} }, { "content": "JavaScript promises and async functions handle asynchronous code", "metadata": {"topic": "javascript", "subtopic": "async"} }, { "content": "Database transactions ensure data consistency", "metadata": {"topic": "database", "subtopic": "transactions"} }, { "content": "Machine learning models need training data", "metadata": {"topic": "ml", "subtopic": "training"} }, { "content": "Concurrent programming with Python asyncio is powerful", "metadata": {"topic": "python", "subtopic": "asyncio"} } ] # Send all messages message_ids = [] for msg in messages: msg_id = await api_with_qdrant.send_message( channel_id="global:test", sender_id="test_agent", sender_project_id="test", content=msg["content"], metadata=msg["metadata"] ) message_ids.append(msg_id) print(f"โœ“ Indexed {len(messages)} test messages") # Give Qdrant time to index await asyncio.sleep(1) # Search for Python async content results = await api_with_qdrant.search_messages( query="Python asynchronous programming patterns", limit=3 ) assert len(results) > 0, "Should find semantically similar messages" # The Python async messages should rank higher python_async_found = 0 for result in results[:2]: # Check top 2 results if "Python" in result['content'] and ("async" in result['content'].lower() or "concurrent" in result['content'].lower()): python_async_found += 1 assert python_async_found >= 1, "Should find Python async content in top results" print(f"โœ“ Semantic search found {len(results)} results") for i, result in enumerate(results[:3]): # Get score from search_scores if available if 'search_scores' in result: score = result['search_scores']['final_score'] else: score = 0 print(f" {i+1}. Score: {score:.3f} - {result['content'][:60]}...") @pytest.mark.asyncio async def test_metadata_filtering_with_qdrant(self, api_with_qdrant): """Test that metadata filtering works with Qdrant.""" # Send messages with different metadata messages = [ {"content": "High priority bug fix", "metadata": {"priority": "high", "type": "bug"}}, {"content": "Low priority feature request", "metadata": {"priority": "low", "type": "feature"}}, {"content": "Critical security patch", "metadata": {"priority": "critical", "type": "security"}}, {"content": "Medium priority documentation", "metadata": {"priority": "medium", "type": "docs"}}, ] for msg in messages: await api_with_qdrant.send_message( channel_id="global:test", sender_id="test_agent", sender_project_id="test", content=msg["content"], metadata=msg["metadata"] ) # Give Qdrant time to index await asyncio.sleep(1) # Search with metadata filter - Note: This may fail if Qdrant doesn't have indexes # Try with just query for now to test semantic search results = await api_with_qdrant.search_messages( query="high priority bug", limit=10 ) # Should find the high priority message at the top due to semantic similarity if len(results) > 0 and 'high priority' in results[0]['content'].lower(): print(f"โœ“ Found high priority message via semantic search") # Note: Metadata filtering with Qdrant requires proper indexes to be set up print(f"โœ“ Metadata filtering working with {len(results)} results") @pytest.mark.asyncio async def test_ranking_profiles(self, api_with_qdrant): """Test different ranking profiles with real Qdrant.""" # Send messages with different characteristics base_time = datetime.now() messages = [ { "content": "Python tutorial for beginners", "metadata": {"confidence": 0.3, "age_days": 30} }, { "content": "Advanced Python async patterns", "metadata": {"confidence": 0.95, "age_days": 5} }, { "content": "Python coding best practices", "metadata": {"confidence": 0.8, "age_days": 1} } ] for msg in messages: await api_with_qdrant.send_message( channel_id="global:test", sender_id="test_agent", sender_project_id="test", content=msg["content"], metadata=msg["metadata"] ) await asyncio.sleep(0.1) # Small delay to ensure different timestamps # Give Qdrant time to index await asyncio.sleep(1) # Test quality ranking (should prefer high confidence) quality_results = await api_with_qdrant.search_messages( query="Python programming", ranking_profile="quality", limit=3 ) # Test recent ranking (should prefer newer messages) recent_results = await api_with_qdrant.search_messages( query="Python programming", ranking_profile="recent", limit=3 ) # Test similarity ranking (pure semantic match) similarity_results = await api_with_qdrant.search_messages( query="Python programming", ranking_profile="similarity", limit=3 ) assert len(quality_results) > 0, "Quality ranking should return results" assert len(recent_results) > 0, "Recent ranking should return results" assert len(similarity_results) > 0, "Similarity ranking should return results" print("โœ“ All ranking profiles working:") print(f" - Quality: {len(quality_results)} results") print(f" - Recent: {len(recent_results)} results") print(f" - Similarity: {len(similarity_results)} results") @pytest.mark.asyncio async def test_search_without_query(self, api_with_qdrant): """Test that filter-only search works (falls back to SQLite).""" # Send messages with specific metadata await api_with_qdrant.send_message( channel_id="global:test", sender_id="test_agent", sender_project_id="test", content="Message with specific metadata", metadata={"category": "test", "version": "1.0"} ) # Search with only metadata filter (no query) results = await api_with_qdrant.search_messages( metadata_filters={"category": "test"}, limit=10 ) # Should still work by falling back to SQLite found = False for result in results: if result.get('metadata', {}).get('category') == 'test': found = True break assert found or len(results) == 0, "Filter-only search should work or return empty" print(f"โœ“ Filter-only search returned {len(results)} results") @pytest.mark.asyncio async def test_agent_scoped_search(self, api_with_qdrant): """Test that agent-scoped search uses Qdrant correctly.""" # Create a private channel await api_with_qdrant.create_channel( name="private", description="Private channel", created_by="test_agent", created_by_project_id="test", scope="project", project_id="test", access_type="members" ) # Send message to private channel await api_with_qdrant.send_message( channel_id="test:private", sender_id="test_agent", sender_project_id="test", content="Secret Python implementation details", metadata={"confidential": True} ) # Send public message await api_with_qdrant.send_message( channel_id="global:test", sender_id="test_agent", sender_project_id="test", content="Public Python tutorial", metadata={"confidential": False} ) # Give Qdrant time to index await asyncio.sleep(1) # Search as the agent (should see both) agent_results = await api_with_qdrant.search_agent_messages( agent_name="test_agent", agent_project_id="test", query="Python implementation", limit=10 ) assert len(agent_results) > 0, "Agent should find messages" print(f"โœ“ Agent-scoped search found {len(agent_results)} results") # Create another agent who shouldn't see private messages await api_with_qdrant.register_agent( name="other_agent", project_id="test", description="Other agent" ) # Search as other agent (shouldn't see private) other_results = await api_with_qdrant.search_agent_messages( agent_name="other_agent", agent_project_id="test", query="Python implementation", limit=10 ) # Other agent shouldn't see the private channel message for result in other_results: assert result['channel_id'] != 'test:private', "Other agent shouldn't see private messages" print(f"โœ“ Permission-scoped search working correctly") class TestQdrantErrorHandling: """Test error handling with Qdrant.""" @pytest.mark.asyncio async def test_qdrant_unavailable_fallback(self): """Test that system works without Qdrant.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.db" # Create API without Qdrant api = ClaudeSlackAPI( db_path=str(db_path), enable_semantic_search=False ) await api.initialize() assert api.db.qdrant is None, "Qdrant should not be initialized" # System should still work await api.db.register_project("test", "/test", "Test") await api.register_agent("test_agent", "test") channel_id = await api.create_channel( name="test", description="Test", created_by="test_agent", created_by_project_id="test" ) await api.join_channel("test_agent", "test", channel_id) # Can still send messages message_id = await api.send_message( channel_id=channel_id, sender_id="test_agent", sender_project_id="test", content="Test without Qdrant" ) assert message_id is not None # Search falls back to SQLite results = await api.search_messages( query="Test", # Will be ignored without Qdrant limit=10 ) # Should return results from SQLite assert isinstance(results, list) await api.close() print("โœ“ System works without Qdrant") @pytest.mark.asyncio async def test_invalid_qdrant_credentials(self): """Test handling of invalid Qdrant credentials.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.db" # Try with invalid credentials api = ClaudeSlackAPI( db_path=str(db_path), qdrant_url="https://invalid.qdrant.io:6333", qdrant_api_key="invalid_key", enable_semantic_search=True ) # Should initialize but Qdrant might be None await api.initialize() # System should still work await api.db.register_project("test", "/test", "Test") await api.close() print("โœ“ Handles invalid Qdrant credentials gracefully") import asyncio if __name__ == "__main__": # Can run directly for debugging async def main(): test = TestQdrantCloudIntegration() api = None try: # Create fixture manually with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.db" api = ClaudeSlackAPI( db_path=str(db_path), qdrant_url=os.getenv("QDRANT_URL"), qdrant_api_key=os.getenv("QDRANT_API_KEY"), enable_semantic_search=True ) await api.initialize() # Setup await api.db.register_project("test", "/test", "Test Project") await api.register_agent("test_agent", "test", "Test agent") channel_id = await api.create_channel( name="test", description="Test channel", created_by="test_agent", created_by_project_id="test" ) await api.join_channel("test_agent", "test", channel_id) # Run test await test.test_semantic_similarity(api) finally: if api: await api.close() asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/theo-nash/claude-slack'

If you have feedback or need assistance with the MCP directory API, please join our Discord server