Skip to main content
Glama
ShivamPansuriya

Dynamic Per-User Tool Generation MCP Server

test_elasticsearch_data.py5.01 kB
#!/usr/bin/env python3
"""Test to check what data exists in Elasticsearch indices.

Connects to a local Elasticsearch node, reports cluster health and the
``apolo_*`` indices, dumps sample documents from the impact/urgency/priority
indices, and tries several candidate queries for the term "high".
"""
import asyncio
import json
import logging

from elasticsearch import AsyncElasticsearch

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Indices whose contents this script samples; same check runs for each.
_SAMPLE_INDICES = ("apolo_impact", "apolo_urgency", "apolo_priority")


def _banner(title: str) -> None:
    """Log a visually separated section header."""
    logger.info("\n" + "=" * 80)
    logger.info(title)
    logger.info("=" * 80)


async def _dump_index_sample(es: AsyncElasticsearch, index: str, size: int = 10) -> None:
    """Log the total doc count and up to *size* sample docs from *index*.

    Logs a warning (instead of raising) when the index does not exist.
    """
    _banner(f"Checking {index} index")
    if not await es.indices.exists(index=index):
        logger.warning(f"{index} index does not exist!")
        return

    result = await es.search(
        index=index,
        body={
            "size": size,
            "query": {"match_all": {}}
        }
    )
    # Label is the index name minus the "apolo_" prefix (e.g. "impact").
    label = index[len("apolo_"):]
    logger.info(f"Total {label} documents: {result['hits']['total']['value']}")
    logger.info(f"\nSample {label} documents:")
    for hit in result['hits']['hits']:
        doc = hit['_source']
        logger.info(f"  ID: {doc.get('id')}, Name: {doc.get('name')}, Display: {doc.get('displayName')}")


async def _test_search_queries(es: AsyncElasticsearch) -> None:
    """Try several query shapes for 'high' against apolo_impact and log hits.

    Silently skips (matching prior behavior) when the index is absent.
    """
    _banner("Testing search query for 'high' in impact")
    if not await es.indices.exists(index="apolo_impact"):
        return

    # Candidate query shapes — useful to see which analyzer/field matches.
    queries = [
        {"match": {"name": "high"}},
        {"match": {"displayName": "high"}},
        {"multi_match": {"query": "high", "fields": ["name", "displayName"]}},
        {"query_string": {"query": "*high*", "fields": ["name", "displayName"]}},
    ]

    for i, query in enumerate(queries, 1):
        result = await es.search(
            index="apolo_impact",
            body={"size": 5, "query": query}
        )
        logger.info(f"\nQuery {i}: {json.dumps(query)}")
        logger.info(f"Results: {result['hits']['total']['value']} hits")
        for hit in result['hits']['hits'][:3]:
            doc = hit['_source']
            logger.info(f"  - ID: {doc.get('id')}, Name: {doc.get('name')}, Score: {hit['_score']}")


async def check_elasticsearch_data() -> None:
    """Check what data exists in Elasticsearch."""
    es = AsyncElasticsearch(["http://localhost:9200"])
    try:
        # Check cluster health
        health = await es.cluster.health()
        logger.info(f"Cluster health: {health['status']}")

        # List all indices and highlight the apolo_* ones.
        indices = await es.cat.indices(format="json")
        logger.info(f"\nFound {len(indices)} indices")
        apolo_indices = [idx for idx in indices if idx['index'].startswith('apolo_')]
        logger.info(f"Apolo indices: {len(apolo_indices)}")
        for idx in apolo_indices:
            logger.info(f"  - {idx['index']}: {idx['docs.count']} docs")

        # One shared routine replaces the three copy-pasted index checks.
        for index in _SAMPLE_INDICES:
            await _dump_index_sample(es, index)

        await _test_search_queries(es)
    finally:
        # Always release the client's connection pool, even on error.
        await es.close()


if __name__ == "__main__":
    asyncio.run(check_elasticsearch_data())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ShivamPansuriya/MCP-server-Python'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.