setup_elasticsearch
Deploy Elasticsearch with Docker, optionally adding Kibana and recreating containers as needed for search and analytics.
Instructions
Auto-setup Elasticsearch using Docker with optional Kibana and force recreate options
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| include_kibana | No | Also setup Kibana (default: true) | |
| force_recreate | No | Force recreate containers even if they exist |
Implementation Reference
- src/admin/admin_server.py:352-453 (handler)Primary handler for the 'setup_elasticsearch' tool. Manages Docker container lifecycle for Elasticsearch (+Kibana), calls helper auto_setup_elasticsearch, updates config, and reinitializes client.@app.tool( description="Auto-setup Elasticsearch using Docker with optional Kibana and force recreate options", tags={"admin", "elasticsearch", "setup", "docker", "auto-install"} ) async def setup_elasticsearch( include_kibana: Annotated[bool, Field(description="Also setup Kibana (default: true)")] = True, force_recreate: Annotated[bool, Field(description="Force recreate containers even if they exist")] = False ) -> str: """Auto-setup Elasticsearch using Docker with comprehensive Docker container management.""" try: # Get config path for setup config_path = Path(__file__).parent.parent / "config.json" config = load_config() # Handle force recreate - stop existing containers first if force_recreate: try: setup_manager = ElasticsearchSetup(config_path) stop_result = setup_manager.stop_containers() # Give containers time to stop gracefully import time time.sleep(5) message = "š **Force Recreate Mode:**\n" message += f" š Stopped existing containers\n" message += f" ā³ Waiting for graceful shutdown...\n\n" except Exception as e: # Continue with setup even if stop fails message = f"ā ļø **Container Stop Warning:** {str(e)}\n" message += f" š” Continuing with setup anyway...\n\n" else: message = "" # Run comprehensive auto setup result = auto_setup_elasticsearch(config_path, config) if result["status"] == "already_configured": message += "ā **Elasticsearch Already Configured!**\n\n" message += f"š **Running at:** http://{result['host']}:{result['port']}\n" message += f"š **Status:** Ready and operational\n" message += f"š” **Action:** No setup needed - already working perfectly\n\n" message += f"š **Next Steps:**\n" message += f" ⢠Test connection with search tools\n" message += f" ⢠Start indexing your knowledge base\n" message += f" ⢠Use force_recreate=true to rebuild if needed" elif result["status"] == "setup_completed": es_info = result["elasticsearch"] kibana_info = result.get("kibana") message += "š **Elasticsearch Setup Completed Successfully!**\n\n" message += f"š **Elasticsearch:** http://{es_info['host']}:{es_info['port']}\n" # Handle Kibana setup results if kibana_info: if kibana_info.get("status") in ["running", "already_running"]: message += f"š **Kibana:** http://{kibana_info['host']}:{kibana_info['port']}\n" message += f" ā Status: {kibana_info['status'].replace('_', ' ').title()}\n" elif "error" in kibana_info: message += f"ā ļø **Kibana Warning:** {kibana_info['error']}\n" message += f" š” Elasticsearch is ready, Kibana setup had issues\n" else: message += f"š **Kibana:** Skipped (include_kibana=false)\n" message += f"\nš§ **Configuration Updated:**\n" message += f" š config.json automatically updated\n" message += f" š Elasticsearch client reinitialized\n" message += f" ā All components ready for use\n" message += f"\nš **Next Steps:**\n" message += f" ⢠Test search functionality\n" message += f" ⢠Index your first documents\n" message += f" ⢠Explore Kibana dashboard (if enabled)\n" # Reload configuration to use new Elasticsearch setup new_config = load_config() init_elasticsearch(new_config) reset_es_client() else: # Setup failed error_msg = result.get("error", "Unknown setup error") message += f"ā **Elasticsearch Setup Failed!**\n\n" message += f"šØ **Error Details:** {error_msg}\n\n" message += f"š§ **Troubleshooting Steps:**\n" message += f" 1. Check Docker is running and accessible\n" message += f" 2. Verify ports 9200 (ES) and 5601 (Kibana) are available\n" message += f" 3. Try force_recreate=true to rebuild containers\n" message += f" 4. Check Docker logs for detailed error information\n" message += f" 5. Ensure sufficient disk space and memory\n\n" message += f"š” **Need Help?** Check Docker status and container logs for more details" return message except ImportError as e: return f"ā Module Error: Missing required dependency\nš Details: {str(e)}\nš” Ensure all Elasticsearch setup modules are properly installed" except FileNotFoundError as e: return f"ā File Error: Required configuration or setup file not found\nš Details: {str(e)}\nš” Check config.json exists and setup modules are in place" except Exception as e: return _format_admin_error(e, "setup Elasticsearch", f"Docker setup with include_kibana={include_kibana}, force_recreate={force_recreate}")
- src/admin/admin_server.py:356-359 (schema)Pydantic schema definitions for tool inputs using Annotated[Field]. Defines parameters for Kibana inclusion and force recreate.async def setup_elasticsearch( include_kibana: Annotated[bool, Field(description="Also setup Kibana (default: true)")] = True, force_recreate: Annotated[bool, Field(description="Force recreate containers even if they exist")] = False ) -> str:
- src/main_server.py:105-105 (registration)Lists 'setup_elasticsearch' as available tool from Admin Server in CLI output, confirming registration via mounting.print(" āā Tools: get_config, update_config, server_status, server_upgrade, setup_elasticsearch, elasticsearch_status, validate_config, reset_config, reload_config")
- Core helper function called by the tool handler. Performs Docker setup via ElasticsearchSetup class, updates config.json, returns setup status.def auto_setup_elasticsearch(config_path: Path, config: Dict[str, Any]) -> Dict[str, Any]: """Auto-setup Elasticsearch if not configured or not accessible.""" # Check if Elasticsearch is already configured and accessible if check_elasticsearch_config(config): host = config["elasticsearch"]["host"] port = config["elasticsearch"]["port"] print(f"Elasticsearch already running at {host}:{port}") return {"status": "already_configured", "host": host, "port": port} print("Elasticsearch not accessible, starting auto-setup...") # Setup Elasticsearch using Docker setup = ElasticsearchSetup(config_path) try: result = setup.setup_elasticsearch(include_kibana=True) # Update config file es_host = result["elasticsearch"]["host"] es_port = result["elasticsearch"]["port"] setup.update_config(es_host, es_port) print("š Elasticsearch setup completed!") print(f"š Elasticsearch: http://{es_host}:{es_port}") if result["kibana"] and result["kibana"]["status"] in ["running", "already_running"]: kibana_host = result["kibana"]["host"] kibana_port = result["kibana"]["port"] print(f"š Kibana: http://{kibana_host}:{kibana_port}") return { "status": "setup_completed", "elasticsearch": result["elasticsearch"], "kibana": result["kibana"] } except Exception as e: print(f"ā Failed to setup Elasticsearch: {e}") return {"status": "setup_failed", "error": str(e)}
- Supporting class with Docker operations: container management (start/stop/status), config updates, health checks for ES and Kibana.class ElasticsearchSetup: """Manage Elasticsearch Docker container setup.""" def __init__(self, config_path: Path): self.config_path = config_path self.docker_client = None self.container_name = "elasticsearch-mcp" self.kibana_container_name = "kibana-mcp" def _get_docker_client(self): """Get Docker client.""" if self.docker_client is None: try: self.docker_client = docker.from_env() # Test connection self.docker_client.ping() except DockerException as e: raise ConnectionError(f"Cannot connect to Docker. Is Docker running? Error: {e}") return self.docker_client def _is_elasticsearch_running(self, host: str, port: int) -> bool: """Check if Elasticsearch is running at the given host:port.""" try: response = requests.get(f"http://{host}:{port}", timeout=5) return response.status_code == 200 except: return False def _wait_for_elasticsearch(self, host: str, port: int, timeout: int = 60) -> bool: """Wait for Elasticsearch to be ready.""" print(f"Waiting for Elasticsearch at {host}:{port}...") start_time = time.time() while time.time() - start_time < timeout: if self._is_elasticsearch_running(host, port): print("ā Elasticsearch is ready!") return True time.sleep(2) print("ā³ Still waiting...") print("ā Timeout waiting for Elasticsearch") return False def _container_exists(self, container_name: str) -> bool: """Check if container exists.""" try: client = self._get_docker_client() client.containers.get(container_name) return True except NotFound: return False def _is_container_running(self, container_name: str) -> bool: """Check if container is running.""" try: client = self._get_docker_client() container = client.containers.get(container_name) return container.status == 'running' except NotFound: return False def start_elasticsearch_container(self) -> Dict[str, Any]: """Start Elasticsearch container.""" client = self._get_docker_client() # Check if container already exists if self._container_exists(self.container_name): if self._is_container_running(self.container_name): print(f"Container {self.container_name} is already running") return {"status": "already_running", "host": "localhost", "port": 9200} else: # Start existing container print(f"š Starting existing container {self.container_name}") container = client.containers.get(self.container_name) container.start() else: # Create new container print(f"š Creating new Elasticsearch container {self.container_name}") environment = { "discovery.type": "single-node", "ES_JAVA_OPTS": "-Xms512m -Xmx512m", "xpack.security.enabled": "false", "xpack.security.enrollment.enabled": "false" } ports = {"9200/tcp": 9200, "9300/tcp": 9300} container = client.containers.run( "docker.elastic.co/elasticsearch/elasticsearch:8.14.1", name=self.container_name, environment=environment, ports=ports, detach=True, restart_policy={"Name": "unless-stopped"} ) print(f"š¦ Container {self.container_name} created") # Wait for Elasticsearch to be ready if self._wait_for_elasticsearch("localhost", 9200): return {"status": "running", "host": "localhost", "port": 9200} else: raise RuntimeError("Failed to start Elasticsearch within timeout") def start_kibana_container(self) -> Dict[str, Any]: """Start Kibana container.""" client = self._get_docker_client() # Check if Elasticsearch is running first if not self._is_container_running(self.container_name): raise RuntimeError("Elasticsearch container must be running before starting Kibana") # Check if Kibana container already exists if self._container_exists(self.kibana_container_name): if self._is_container_running(self.kibana_container_name): print(f"ā Container {self.kibana_container_name} is already running") return {"status": "already_running", "host": "localhost", "port": 5601} else: # Start existing container print(f"š Starting existing container {self.kibana_container_name}") container = client.containers.get(self.kibana_container_name) container.start() else: # Create new container print(f"š Creating new Kibana container {self.kibana_container_name}") environment = { "ELASTICSEARCH_HOSTS": "http://elasticsearch-mcp:9200" } ports = {"5601/tcp": 5601} container = client.containers.run( "docker.elastic.co/kibana/kibana:8.14.1", name=self.kibana_container_name, environment=environment, ports=ports, links={self.container_name: "elasticsearch-mcp"}, detach=True, restart_policy={"Name": "unless-stopped"} ) print(f"š¦ Container {self.kibana_container_name} created") # Wait a bit for Kibana to start print("ā³ Waiting for Kibana to start...") time.sleep(10) return {"status": "running", "host": "localhost", "port": 5601} def setup_elasticsearch(self, include_kibana: bool = True) -> Dict[str, Any]: """Complete Elasticsearch setup with optional Kibana.""" print("š§ Setting up Elasticsearch...") # Start Elasticsearch es_result = self.start_elasticsearch_container() result = { "elasticsearch": es_result, "kibana": None } # Start Kibana if requested if include_kibana: try: kibana_result = self.start_kibana_container() result["kibana"] = kibana_result except Exception as e: print(f"ā ļø Warning: Failed to start Kibana: {e}") result["kibana"] = {"status": "failed", "error": str(e)} return result def update_config(self, host: str, port: int) -> None: """Update configuration file with Elasticsearch connection details.""" try: # Load current config with open(self.config_path, 'r', encoding='utf-8') as f: config = json.load(f) # Update Elasticsearch settings config["elasticsearch"]["host"] = host config["elasticsearch"]["port"] = port config["elasticsearch"]["auto_setup"] = True # Save updated config with open(self.config_path, 'w', encoding='utf-8') as f: json.dump(config, f, indent=2, ensure_ascii=False) print(f"ā Updated config file: {host}:{port}") except Exception as e: print(f"ā Failed to update config: {e}") raise def get_container_status(self) -> Dict[str, Any]: """Get status of Elasticsearch and Kibana containers.""" try: client = self._get_docker_client() status = { "elasticsearch": { "exists": self._container_exists(self.container_name), "running": self._is_container_running(self.container_name), "container_name": self.container_name }, "kibana": { "exists": self._container_exists(self.kibana_container_name), "running": self._is_container_running(self.kibana_container_name), "container_name": self.kibana_container_name } } return status except Exception as e: return {"error": str(e)} def stop_containers(self) -> Dict[str, Any]: """Stop Elasticsearch and Kibana containers.""" try: client = self._get_docker_client() results = {} # Stop Kibana first if self._container_exists(self.kibana_container_name): container = client.containers.get(self.kibana_container_name) container.stop() results["kibana"] = "stopped" # Stop Elasticsearch if self._container_exists(self.container_name): container = client.containers.get(self.container_name) container.stop() results["elasticsearch"] = "stopped" return results except Exception as e: return {"error": str(e)}