Skip to main content
Glama
itshare4u

Agent Knowledge MCP

setup_elasticsearch

Deploy Elasticsearch with Docker, optionally adding Kibana and recreating containers as needed for search and analytics.

Instructions

Auto-setup Elasticsearch using Docker with optional Kibana and force recreate options

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
include_kibanaNoAlso setup Kibana (default: true)
force_recreateNoForce recreate containers even if they exist

Implementation Reference

  • Primary handler for the 'setup_elasticsearch' tool. Manages Docker container lifecycle for Elasticsearch (+Kibana), calls helper auto_setup_elasticsearch, updates config, and reinitializes client.
    @app.tool(
        description="Auto-setup Elasticsearch using Docker with optional Kibana and force recreate options",
        tags={"admin", "elasticsearch", "setup", "docker", "auto-install"}
    )
    async def setup_elasticsearch(
        include_kibana: Annotated[bool, Field(description="Also setup Kibana (default: true)")] = True,
        force_recreate: Annotated[bool, Field(description="Force recreate containers even if they exist")] = False
    ) -> str:
        """Auto-setup Elasticsearch using Docker with comprehensive Docker container management."""
        try:
            # Get config path for setup
            config_path = Path(__file__).parent.parent / "config.json"
            config = load_config()
    
            # Handle force recreate - stop existing containers first
            if force_recreate:
                try:
                    setup_manager = ElasticsearchSetup(config_path)
                    stop_result = setup_manager.stop_containers()
    
                    # Give containers time to stop gracefully
                    import time
                    time.sleep(5)
    
                    message = "πŸ”„ **Force Recreate Mode:**\n"
                    message += f"   πŸ›‘ Stopped existing containers\n"
                    message += f"   ⏳ Waiting for graceful shutdown...\n\n"
                except Exception as e:
                    # Continue with setup even if stop fails
                    message = f"⚠️ **Container Stop Warning:** {str(e)}\n"
                    message += f"   πŸ’‘ Continuing with setup anyway...\n\n"
            else:
                message = ""
    
            # Run comprehensive auto setup
            result = auto_setup_elasticsearch(config_path, config)
    
            if result["status"] == "already_configured":
                message += "βœ… **Elasticsearch Already Configured!**\n\n"
                message += f"πŸ“ **Running at:** http://{result['host']}:{result['port']}\n"
                message += f"πŸ” **Status:** Ready and operational\n"
                message += f"πŸ’‘ **Action:** No setup needed - already working perfectly\n\n"
                message += f"πŸš€ **Next Steps:**\n"
                message += f"   β€’ Test connection with search tools\n"
                message += f"   β€’ Start indexing your knowledge base\n"
                message += f"   β€’ Use force_recreate=true to rebuild if needed"
    
            elif result["status"] == "setup_completed":
                es_info = result["elasticsearch"]
                kibana_info = result.get("kibana")
    
                message += "πŸŽ‰ **Elasticsearch Setup Completed Successfully!**\n\n"
                message += f"πŸ“ **Elasticsearch:** http://{es_info['host']}:{es_info['port']}\n"
    
                # Handle Kibana setup results
                if kibana_info:
                    if kibana_info.get("status") in ["running", "already_running"]:
                        message += f"πŸ“Š **Kibana:** http://{kibana_info['host']}:{kibana_info['port']}\n"
                        message += f"   βœ… Status: {kibana_info['status'].replace('_', ' ').title()}\n"
                    elif "error" in kibana_info:
                        message += f"⚠️ **Kibana Warning:** {kibana_info['error']}\n"
                        message += f"   πŸ’‘ Elasticsearch is ready, Kibana setup had issues\n"
                else:
                    message += f"πŸ“Š **Kibana:** Skipped (include_kibana=false)\n"
    
                message += f"\nπŸ”§ **Configuration Updated:**\n"
                message += f"   πŸ“ config.json automatically updated\n"
                message += f"   πŸ”„ Elasticsearch client reinitialized\n"
                message += f"   βœ… All components ready for use\n"
    
                message += f"\nπŸš€ **Next Steps:**\n"
                message += f"   β€’ Test search functionality\n"
                message += f"   β€’ Index your first documents\n"
                message += f"   β€’ Explore Kibana dashboard (if enabled)\n"
    
                # Reload configuration to use new Elasticsearch setup
                new_config = load_config()
                init_elasticsearch(new_config)
                reset_es_client()
    
            else:
                # Setup failed
                error_msg = result.get("error", "Unknown setup error")
                message += f"❌ **Elasticsearch Setup Failed!**\n\n"
                message += f"🚨 **Error Details:** {error_msg}\n\n"
                message += f"πŸ”§ **Troubleshooting Steps:**\n"
                message += f"   1. Check Docker is running and accessible\n"
                message += f"   2. Verify ports 9200 (ES) and 5601 (Kibana) are available\n"
                message += f"   3. Try force_recreate=true to rebuild containers\n"
                message += f"   4. Check Docker logs for detailed error information\n"
                message += f"   5. Ensure sufficient disk space and memory\n\n"
                message += f"πŸ’‘ **Need Help?** Check Docker status and container logs for more details"
    
            return message
    
        except ImportError as e:
            return f"❌ Module Error: Missing required dependency\nπŸ” Details: {str(e)}\nπŸ’‘ Ensure all Elasticsearch setup modules are properly installed"
        except FileNotFoundError as e:
            return f"❌ File Error: Required configuration or setup file not found\nπŸ” Details: {str(e)}\nπŸ’‘ Check config.json exists and setup modules are in place"
        except Exception as e:
            return _format_admin_error(e, "setup Elasticsearch", f"Docker setup with include_kibana={include_kibana}, force_recreate={force_recreate}")
  • Pydantic schema definitions for tool inputs using Annotated[Field]. Defines parameters for Kibana inclusion and force recreate.
    async def setup_elasticsearch(
        include_kibana: Annotated[bool, Field(description="Also setup Kibana (default: true)")] = True,
        force_recreate: Annotated[bool, Field(description="Force recreate containers even if they exist")] = False
    ) -> str:
  • Lists 'setup_elasticsearch' as available tool from Admin Server in CLI output, confirming registration via mounting.
    print("    └─ Tools: get_config, update_config, server_status, server_upgrade, setup_elasticsearch, elasticsearch_status, validate_config, reset_config, reload_config")
  • Core helper function called by the tool handler. Performs Docker setup via ElasticsearchSetup class, updates config.json, returns setup status.
    def auto_setup_elasticsearch(config_path: Path, config: Dict[str, Any]) -> Dict[str, Any]:
        """Auto-setup Elasticsearch if not configured or not accessible."""
        
        # Check if Elasticsearch is already configured and accessible
        if check_elasticsearch_config(config):
            host = config["elasticsearch"]["host"]
            port = config["elasticsearch"]["port"]
            print(f"Elasticsearch already running at {host}:{port}")
            return {"status": "already_configured", "host": host, "port": port}
        
        print("Elasticsearch not accessible, starting auto-setup...")
        
        # Setup Elasticsearch using Docker
        setup = ElasticsearchSetup(config_path)
        
        try:
            result = setup.setup_elasticsearch(include_kibana=True)
            
            # Update config file
            es_host = result["elasticsearch"]["host"]
            es_port = result["elasticsearch"]["port"]
            setup.update_config(es_host, es_port)
            
            print("πŸŽ‰ Elasticsearch setup completed!")
            print(f"πŸ“ Elasticsearch: http://{es_host}:{es_port}")
            
            if result["kibana"] and result["kibana"]["status"] in ["running", "already_running"]:
                kibana_host = result["kibana"]["host"]
                kibana_port = result["kibana"]["port"]
                print(f"πŸ“Š Kibana: http://{kibana_host}:{kibana_port}")
            
            return {
                "status": "setup_completed",
                "elasticsearch": result["elasticsearch"],
                "kibana": result["kibana"]
            }
            
        except Exception as e:
            print(f"❌ Failed to setup Elasticsearch: {e}")
            return {"status": "setup_failed", "error": str(e)}
  • Supporting class with Docker operations: container management (start/stop/status), config updates, health checks for ES and Kibana.
    class ElasticsearchSetup:
        """Manage Elasticsearch Docker container setup."""
        
        def __init__(self, config_path: Path):
            self.config_path = config_path
            self.docker_client = None
            self.container_name = "elasticsearch-mcp"
            self.kibana_container_name = "kibana-mcp"
            
        def _get_docker_client(self):
            """Get Docker client."""
            if self.docker_client is None:
                try:
                    self.docker_client = docker.from_env()
                    # Test connection
                    self.docker_client.ping()
                except DockerException as e:
                    raise ConnectionError(f"Cannot connect to Docker. Is Docker running? Error: {e}")
            return self.docker_client
        
        def _is_elasticsearch_running(self, host: str, port: int) -> bool:
            """Check if Elasticsearch is running at the given host:port."""
            try:
                response = requests.get(f"http://{host}:{port}", timeout=5)
                return response.status_code == 200
            except:
                return False
        
        def _wait_for_elasticsearch(self, host: str, port: int, timeout: int = 60) -> bool:
            """Wait for Elasticsearch to be ready."""
            print(f"Waiting for Elasticsearch at {host}:{port}...")
            start_time = time.time()
            
            while time.time() - start_time < timeout:
                if self._is_elasticsearch_running(host, port):
                    print("βœ… Elasticsearch is ready!")
                    return True
                time.sleep(2)
                print("⏳ Still waiting...")
            
            print("❌ Timeout waiting for Elasticsearch")
            return False
        
        def _container_exists(self, container_name: str) -> bool:
            """Check if container exists."""
            try:
                client = self._get_docker_client()
                client.containers.get(container_name)
                return True
            except NotFound:
                return False
        
        def _is_container_running(self, container_name: str) -> bool:
            """Check if container is running."""
            try:
                client = self._get_docker_client()
                container = client.containers.get(container_name)
                return container.status == 'running'
            except NotFound:
                return False
        
        def start_elasticsearch_container(self) -> Dict[str, Any]:
            """Start Elasticsearch container."""
            client = self._get_docker_client()
            
            # Check if container already exists
            if self._container_exists(self.container_name):
                if self._is_container_running(self.container_name):
                    print(f"Container {self.container_name} is already running")
                    return {"status": "already_running", "host": "localhost", "port": 9200}
                else:
                    # Start existing container
                    print(f"πŸ”„ Starting existing container {self.container_name}")
                    container = client.containers.get(self.container_name)
                    container.start()
            else:
                # Create new container
                print(f"πŸš€ Creating new Elasticsearch container {self.container_name}")
                
                environment = {
                    "discovery.type": "single-node",
                    "ES_JAVA_OPTS": "-Xms512m -Xmx512m",
                    "xpack.security.enabled": "false",
                    "xpack.security.enrollment.enabled": "false"
                }
                
                ports = {"9200/tcp": 9200, "9300/tcp": 9300}
                
                container = client.containers.run(
                    "docker.elastic.co/elasticsearch/elasticsearch:8.14.1",
                    name=self.container_name,
                    environment=environment,
                    ports=ports,
                    detach=True,
                    restart_policy={"Name": "unless-stopped"}
                )
                
                print(f"πŸ“¦ Container {self.container_name} created")
            
            # Wait for Elasticsearch to be ready
            if self._wait_for_elasticsearch("localhost", 9200):
                return {"status": "running", "host": "localhost", "port": 9200}
            else:
                raise RuntimeError("Failed to start Elasticsearch within timeout")
        
        def start_kibana_container(self) -> Dict[str, Any]:
            """Start Kibana container."""
            client = self._get_docker_client()
            
            # Check if Elasticsearch is running first
            if not self._is_container_running(self.container_name):
                raise RuntimeError("Elasticsearch container must be running before starting Kibana")
            
            # Check if Kibana container already exists
            if self._container_exists(self.kibana_container_name):
                if self._is_container_running(self.kibana_container_name):
                    print(f"βœ… Container {self.kibana_container_name} is already running")
                    return {"status": "already_running", "host": "localhost", "port": 5601}
                else:
                    # Start existing container
                    print(f"πŸ”„ Starting existing container {self.kibana_container_name}")
                    container = client.containers.get(self.kibana_container_name)
                    container.start()
            else:
                # Create new container
                print(f"πŸš€ Creating new Kibana container {self.kibana_container_name}")
                
                environment = {
                    "ELASTICSEARCH_HOSTS": "http://elasticsearch-mcp:9200"
                }
                
                ports = {"5601/tcp": 5601}
                
                container = client.containers.run(
                    "docker.elastic.co/kibana/kibana:8.14.1",
                    name=self.kibana_container_name,
                    environment=environment,
                    ports=ports,
                    links={self.container_name: "elasticsearch-mcp"},
                    detach=True,
                    restart_policy={"Name": "unless-stopped"}
                )
                
                print(f"πŸ“¦ Container {self.kibana_container_name} created")
            
            # Wait a bit for Kibana to start
            print("⏳ Waiting for Kibana to start...")
            time.sleep(10)
            
            return {"status": "running", "host": "localhost", "port": 5601}
        
        def setup_elasticsearch(self, include_kibana: bool = True) -> Dict[str, Any]:
            """Complete Elasticsearch setup with optional Kibana."""
            print("πŸ”§ Setting up Elasticsearch...")
            
            # Start Elasticsearch
            es_result = self.start_elasticsearch_container()
            
            result = {
                "elasticsearch": es_result,
                "kibana": None
            }
            
            # Start Kibana if requested
            if include_kibana:
                try:
                    kibana_result = self.start_kibana_container()
                    result["kibana"] = kibana_result
                except Exception as e:
                    print(f"⚠️  Warning: Failed to start Kibana: {e}")
                    result["kibana"] = {"status": "failed", "error": str(e)}
            
            return result
        
        def update_config(self, host: str, port: int) -> None:
            """Update configuration file with Elasticsearch connection details."""
            try:
                # Load current config
                with open(self.config_path, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                
                # Update Elasticsearch settings
                config["elasticsearch"]["host"] = host
                config["elasticsearch"]["port"] = port
                config["elasticsearch"]["auto_setup"] = True
                
                # Save updated config
                with open(self.config_path, 'w', encoding='utf-8') as f:
                    json.dump(config, f, indent=2, ensure_ascii=False)
                
                print(f"βœ… Updated config file: {host}:{port}")
                
            except Exception as e:
                print(f"❌ Failed to update config: {e}")
                raise
        
        def get_container_status(self) -> Dict[str, Any]:
            """Get status of Elasticsearch and Kibana containers."""
            try:
                client = self._get_docker_client()
                
                status = {
                    "elasticsearch": {
                        "exists": self._container_exists(self.container_name),
                        "running": self._is_container_running(self.container_name),
                        "container_name": self.container_name
                    },
                    "kibana": {
                        "exists": self._container_exists(self.kibana_container_name),
                        "running": self._is_container_running(self.kibana_container_name),
                        "container_name": self.kibana_container_name
                    }
                }
                
                return status
                
            except Exception as e:
                return {"error": str(e)}
        
        def stop_containers(self) -> Dict[str, Any]:
            """Stop Elasticsearch and Kibana containers."""
            try:
                client = self._get_docker_client()
                results = {}
                
                # Stop Kibana first
                if self._container_exists(self.kibana_container_name):
                    container = client.containers.get(self.kibana_container_name)
                    container.stop()
                    results["kibana"] = "stopped"
                
                # Stop Elasticsearch
                if self._container_exists(self.container_name):
                    container = client.containers.get(self.container_name)
                    container.stop()
                    results["elasticsearch"] = "stopped"
                
                return results
                
            except Exception as e:
                return {"error": str(e)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/itshare4u/AgentKnowledgeMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server