#!/usr/bin/env python3
"""
Simple Elasticsearch MCP Server using FastMCP
"""
import asyncio
import json
import logging
import os
import sys
from typing import Any, Dict, Optional
from urllib.parse import quote
import httpx
from mcp.server.fastmcp import FastMCP
# Force unbuffered output for Docker logging
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)
# Configure logging with timestamps
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    stream=sys.stdout,
    force=True
)
logger = logging.getLogger("elasticsearch-mcp")
# Elasticsearch configuration - use environment variable
ELASTICSEARCH_URL = os.getenv("ES_URL", "http://localhost:9400")
# Connection configuration for concurrent requests
MAX_CONNECTIONS = int(os.getenv("MAX_CONNECTIONS", "100"))
MAX_KEEPALIVE_CONNECTIONS = int(os.getenv("MAX_KEEPALIVE_CONNECTIONS", "20"))
CONNECTION_TIMEOUT = int(os.getenv("CONNECTION_TIMEOUT", "30"))
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "30"))
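# Example launch with overrides (hypothetical values):
#   ES_URL=https://es.internal:9200 REQUEST_TIMEOUT=60 python server.py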
# Create a shared async client with connection pooling so multiple
# concurrent requests can reuse connections instead of opening new ones.
http_client = httpx.AsyncClient(
    timeout=httpx.Timeout(
        connect=CONNECTION_TIMEOUT,
        read=REQUEST_TIMEOUT,
        write=REQUEST_TIMEOUT,
        pool=REQUEST_TIMEOUT
    ),
    limits=httpx.Limits(
        max_connections=MAX_CONNECTIONS,
        max_keepalive_connections=MAX_KEEPALIVE_CONNECTIONS
    ),
    http2=True,   # Enable HTTP/2 multiplexing; requires the httpx[http2] extra and only takes effect over TLS
    verify=False  # Accept self-signed certs in development; do not use in production
)
# Create FastMCP server instance
mcp = FastMCP("elasticsearch-mcp")
@mcp.tool()
async def list_indices(indexPattern: str = "*") -> str:
    """List all Elasticsearch indices matching the given index pattern."""
    try:
        # _cat/indices/* is equivalent to _cat/indices, so a single call covers
        # both cases; quote the pattern in case it needs URL escaping.
        pattern = quote(indexPattern, safe="*,")
        response = await http_client.get(f"{ELASTICSEARCH_URL}/_cat/indices/{pattern}?format=json")
        response.raise_for_status()
        indices = response.json()
        result = []
        for index in indices:
            result.append({
                "index": index.get("index", ""),
                "docs_count": index.get("docs.count", "0"),
                "store_size": index.get("store.size", "0"),
                "health": index.get("health", "unknown")
            })
        return json.dumps(result, indent=2)
    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error listing indices: {e.response.status_code} - {e.response.text}")
        return f"HTTP Error {e.response.status_code}: {e.response.text}"
    except httpx.RequestError as e:
        logger.error(f"Network error listing indices: {e}")
        return f"Network Error: {str(e)}"
    except json.JSONDecodeError as e:
        logger.error(f"JSON decode error listing indices: {e}")
        return "Invalid JSON response from Elasticsearch"
    except Exception as e:
        logger.error(f"Unexpected error listing indices: {e}")
        return f"Unexpected error: {str(e)}"
@mcp.tool()
async def search(index: str, queryBody: Dict[str, Any]) -> str:
    """Run an Elasticsearch search with the provided query DSL. Highlights are enabled by default."""
    try:
        if not index:
            raise ValueError("index parameter is required")
        if not queryBody:
            raise ValueError("queryBody parameter is required")
        # A shallow copy is enough here: we only add a top-level "highlight"
        # key, so the caller's nested structures are never mutated.
        query_body = queryBody.copy()
        # Enable highlights by default, as described in the official docs
        if "highlight" not in query_body:
            query_body["highlight"] = {
                "fields": {"*": {}}
            }
        response = await http_client.post(
            f"{ELASTICSEARCH_URL}/{quote(index, safe='*,')}/_search",
            json=query_body
        )
        response.raise_for_status()
        result = response.json()
        return json.dumps(result, indent=2)
    except ValueError as e:
        logger.warning(f"Validation error in search: {e}")
        return f"Validation Error: {str(e)}"
    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error searching index {index}: {e.response.status_code} - {e.response.text}")
        return f"HTTP Error {e.response.status_code}: {e.response.text}"
    except httpx.RequestError as e:
        logger.error(f"Network error searching index {index}: {e}")
        return f"Network Error: {str(e)}"
    except json.JSONDecodeError as e:
        logger.error(f"JSON decode error searching index {index}: {e}")
        return "Invalid JSON response from Elasticsearch"
    except Exception as e:
        logger.error(f"Unexpected error searching index {index}: {e}")
        return f"Unexpected error: {str(e)}"
@mcp.tool()
async def get_mappings(index: str) -> str:
    """Get field mappings for a specific Elasticsearch index."""
    try:
        if not index:
            raise ValueError("index parameter is required")
        response = await http_client.get(f"{ELASTICSEARCH_URL}/{quote(index, safe='*,')}/_mapping")
        response.raise_for_status()
        mappings = response.json()
        return json.dumps(mappings, indent=2)
    except ValueError as e:
        logger.warning(f"Validation error getting mappings: {e}")
        return f"Validation Error: {str(e)}"
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            logger.warning(f"Index '{index}' not found")
            return f"Error: Index '{index}' does not exist"
        logger.error(f"HTTP error getting mappings for index {index}: {e.response.status_code} - {e.response.text}")
        return f"HTTP Error {e.response.status_code}: {e.response.text}"
    except httpx.RequestError as e:
        logger.error(f"Network error getting mappings for index {index}: {e}")
        return f"Network Error: {str(e)}"
    except json.JSONDecodeError as e:
        logger.error(f"JSON decode error getting mappings for index {index}: {e}")
        return "Invalid JSON response from Elasticsearch"
    except Exception as e:
        logger.error(f"Unexpected error getting mappings for index {index}: {e}")
        return f"Unexpected error: {str(e)}"
@mcp.tool()
async def get_shards(index: Optional[str] = None) -> str:
    """Get shard information for all indices, or for a specific index if one is given."""
    try:
        if index:
            url = f"{ELASTICSEARCH_URL}/_cat/shards/{quote(index, safe='*,')}?format=json"
        else:
            url = f"{ELASTICSEARCH_URL}/_cat/shards?format=json"
        response = await http_client.get(url)
        response.raise_for_status()
        shards = response.json()
        result = []
        for shard in shards:
            result.append({
                "index": shard.get("index", ""),
                "shard": shard.get("shard", ""),
                "prirep": shard.get("prirep", ""),
                "state": shard.get("state", ""),
                "docs": shard.get("docs", ""),
                "store": shard.get("store", ""),
                "node": shard.get("node", "")
            })
        return json.dumps(result, indent=2)
    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error getting shard information: {e.response.status_code} - {e.response.text}")
        return f"HTTP Error {e.response.status_code}: {e.response.text}"
    except httpx.RequestError as e:
        logger.error(f"Network error getting shard information: {e}")
        return f"Network Error: {str(e)}"
    except json.JSONDecodeError as e:
        logger.error(f"JSON decode error getting shard information: {e}")
        return "Invalid JSON response from Elasticsearch"
    except Exception as e:
        logger.error(f"Unexpected error getting shard information: {e}")
        return f"Unexpected error: {str(e)}"
if __name__ == "__main__":
try:
mcp.run()
except KeyboardInterrupt:
try:
logger.info("Server shutdown requested")
except:
pass # Ignore logging errors during shutdown
finally:
# Close the async client gracefully
asyncio.run(http_client.aclose())
try:
logger.info("Server shutdown complete")
except:
pass # Ignore logging errors during shutdown