A VMware ESXi/vCenter management server

import os import json import logging import ssl import argparse from dataclasses import dataclass from typing import Optional, Dict, Any # MCP protocol related imports from mcp.server.lowlevel import Server # MCP server base class from mcp.server.sse import SseServerTransport # SSE transport support from mcp import types # MCP type definitions # pyVmomi VMware API imports from pyVim import connect from pyVmomi import vim, vmodl # Configuration data class for storing configuration options @dataclass class Config: vcenter_host: str vcenter_user: str vcenter_password: str datacenter: Optional[str] = None # Datacenter name (optional) cluster: Optional[str] = None # Cluster name (optional) datastore: Optional[str] = None # Datastore name (optional) network: Optional[str] = None # Virtual network name (optional) insecure: bool = False # Whether to skip SSL certificate verification (default: False) api_key: Optional[str] = None # API access key for authentication log_file: Optional[str] = None # Log file path (if not specified, output to console) log_level: str = "INFO" # Log level # VMware management class, encapsulating pyVmomi operations for vSphere class VMwareManager: def __init__(self, config: Config): self.config = config self.si = None # Service instance (ServiceInstance) self.content = None # vSphere content root self.datacenter_obj = None self.resource_pool = None self.datastore_obj = None self.network_obj = None self.authenticated = False # Authentication flag for API key verification self._connect_vcenter() def _connect_vcenter(self): """Connect to vCenter/ESXi and retrieve main resource object references.""" try: if self.config.insecure: # Connection method without SSL certificate verification context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = False # Disable hostname checking context.verify_mode = ssl.CERT_NONE self.si = connect.SmartConnect( host=self.config.vcenter_host, user=self.config.vcenter_user, pwd=self.config.vcenter_password, sslContext=context) else: # Standard SSL verification connection self.si = connect.SmartConnect( host=self.config.vcenter_host, user=self.config.vcenter_user, pwd=self.config.vcenter_password) except Exception as e: logging.error(f"Failed to connect to vCenter/ESXi: {e}") raise # Retrieve content root object self.content = self.si.RetrieveContent() logging.info("Successfully connected to VMware vCenter/ESXi API") # Retrieve target datacenter object if self.config.datacenter: # Find specified datacenter by name self.datacenter_obj = next((dc for dc in self.content.rootFolder.childEntity if isinstance(dc, vim.Datacenter) and dc.name == self.config.datacenter), None) if not self.datacenter_obj: logging.error(f"Datacenter named {self.config.datacenter} not found") raise Exception(f"Datacenter {self.config.datacenter} not found") else: # Default to the first available datacenter self.datacenter_obj = next((dc for dc in self.content.rootFolder.childEntity if isinstance(dc, vim.Datacenter)), None) if not self.datacenter_obj: raise Exception("No datacenter object found") # Retrieve resource pool (if a cluster is configured, use the cluster's resource pool; otherwise, use the host resource pool) compute_resource = None if self.config.cluster: # Find specified cluster for folder in self.datacenter_obj.hostFolder.childEntity: if isinstance(folder, vim.ClusterComputeResource) and folder.name == self.config.cluster: compute_resource = folder break if not compute_resource: logging.error(f"Cluster named {self.config.cluster} not found") raise Exception(f"Cluster {self.config.cluster} not found") else: # Default to the first ComputeResource (cluster or standalone host) compute_resource = next((cr for cr in self.datacenter_obj.hostFolder.childEntity if isinstance(cr, vim.ComputeResource)), None) if not compute_resource: raise Exception("No compute resource (cluster or host) found") self.resource_pool = compute_resource.resourcePool logging.info(f"Using resource pool: {self.resource_pool.name}") # Retrieve datastore object if self.config.datastore: # Find specified datastore in the datacenter self.datastore_obj = next((ds for ds in self.datacenter_obj.datastoreFolder.childEntity if isinstance(ds, vim.Datastore) and ds.name == self.config.datastore), None) if not self.datastore_obj: logging.error(f"Datastore named {self.config.datastore} not found") raise Exception(f"Datastore {self.config.datastore} not found") else: # Default to the datastore with the largest available capacity datastores = [ds for ds in self.datacenter_obj.datastoreFolder.childEntity if isinstance(ds, vim.Datastore)] if not datastores: raise Exception("No available datastore found in the datacenter") # Select the one with the maximum free space self.datastore_obj = max(datastores, key=lambda ds: ds.summary.freeSpace) logging.info(f"Using datastore: {self.datastore_obj.name}") # Retrieve network object (network or distributed virtual portgroup) if self.config.network: # Find specified network in the datacenter network list networks = self.datacenter_obj.networkFolder.childEntity self.network_obj = next((net for net in networks if net.name == self.config.network), None) if not self.network_obj: logging.error(f"Network {self.config.network} not found") raise Exception(f"Network {self.config.network} not found") logging.info(f"Using network: {self.network_obj.name}") else: self.network_obj = None # If no network is specified, VM creation can choose to not connect to a network def list_vms(self) -> list: """List all virtual machine names.""" vm_list = [] # Create a view to iterate over all virtual machines container = self.content.viewManager.CreateContainerView(self.content.rootFolder, [vim.VirtualMachine], True) for vm in container.view: vm_list.append(vm.name) container.Destroy() return vm_list def find_vm(self, name: str) -> Optional[vim.VirtualMachine]: """Find virtual machine object by name.""" container = self.content.viewManager.CreateContainerView(self.content.rootFolder, [vim.VirtualMachine], True) vm_obj = None for vm in container.view: if vm.name == name: vm_obj = vm break container.Destroy() return vm_obj def get_vm_performance(self, vm_name: str) -> Dict[str, Any]: """Retrieve performance data (CPU, memory, storage, and network) for the specified virtual machine.""" vm = self.find_vm(vm_name) if not vm: raise Exception(f"VM {vm_name} not found") # CPU and memory usage (obtained from quickStats) stats = {} qs = vm.summary.quickStats stats["cpu_usage"] = qs.overallCpuUsage # MHz stats["memory_usage"] = qs.guestMemoryUsage # MB # Storage usage (committed storage, in GB) committed = vm.summary.storage.committed if vm.summary.storage else 0 stats["storage_usage"] = round(committed / (1024**3), 2) # Convert to GB # Network usage (obtained from host or VM NIC statistics, latest sample) # Here we simply obtain the latest performance counter for VM network I/O net_bytes_transmitted = 0 net_bytes_received = 0 try: pm = self.content.perfManager # Define performance counter IDs to query: network transmitted and received bytes counter_ids = [] for c in pm.perfCounter: counter_full_name = f"{c.groupInfo.key}.{c.nameInfo.key}.{c.rollupType}" if counter_full_name in ("net.transmitted.average", "net.received.average"): counter_ids.append(c.key) if counter_ids: query = vim.PerformanceManager.QuerySpec(maxSample=1, entity=vm, metricId=[vim.PerformanceManager.MetricId(counterId=cid, instance="*") for cid in counter_ids]) stats_res = pm.QueryStats(querySpec=[query]) for series in stats_res[0].value: # Sum data from each network interface if series.id.counterId == counter_ids[0]: net_bytes_transmitted = sum(series.value) elif series.id.counterId == counter_ids[1]: net_bytes_received = sum(series.value) stats["network_transmit_KBps"] = net_bytes_transmitted stats["network_receive_KBps"] = net_bytes_received except Exception as e: # If obtaining performance counters fails, log the error but do not terminate logging.warning(f"Failed to retrieve network performance data: {e}") stats["network_transmit_KBps"] = None stats["network_receive_KBps"] = None return stats def create_vm(self, name: str, cpus: int, memory_mb: int, datastore: Optional[str] = None, network: Optional[str] = None) -> str: """Create a new virtual machine (from scratch, with an empty disk and optional network).""" # If a specific datastore or network is provided, update the corresponding object accordingly datastore_obj = self.datastore_obj network_obj = self.network_obj if datastore: datastore_obj = next((ds for ds in self.datacenter_obj.datastoreFolder.childEntity if isinstance(ds, vim.Datastore) and ds.name == datastore), None) if not datastore_obj: raise Exception(f"Specified datastore {datastore} not found") if network: networks = self.datacenter_obj.networkFolder.childEntity network_obj = next((net for net in networks if net.name == network), None) if not network_obj: raise Exception(f"Specified network {network} not found") # Build VM configuration specification vm_spec = vim.vm.ConfigSpec(name=name, memoryMB=memory_mb, numCPUs=cpus, guestId="otherGuest") # guestId can be adjusted as needed device_specs = [] # Add SCSI controller controller_spec = vim.vm.device.VirtualDeviceSpec() controller_spec.operation = vim.vm.device.VirtualDeviceSpec.Operation.add controller_spec.device = vim.vm.device.ParaVirtualSCSIController() # Using ParaVirtual SCSI controller controller_spec.device.deviceInfo = vim.Description(label="SCSI Controller", summary="ParaVirtual SCSI Controller") controller_spec.device.busNumber = 0 controller_spec.device.sharedBus = vim.vm.device.VirtualSCSIController.Sharing.noSharing # Set a temporary negative key for the controller for later reference controller_spec.device.key = -101 device_specs.append(controller_spec) # Add virtual disk disk_spec = vim.vm.device.VirtualDeviceSpec() disk_spec.operation = vim.vm.device.VirtualDeviceSpec.Operation.add disk_spec.fileOperation = vim.vm.device.VirtualDeviceSpec.FileOperation.create disk_spec.device = vim.vm.device.VirtualDisk() disk_spec.device.capacityInKB = 1024 * 1024 * 10 # Create a 10GB disk disk_spec.device.deviceInfo = vim.Description(label="Hard Disk 1", summary="10 GB disk") disk_spec.device.backing = vim.vm.device.VirtualDisk.FlatVer2BackingInfo() disk_spec.device.backing.diskMode = "persistent" disk_spec.device.backing.thinProvisioned = True # Thin provisioning disk_spec.device.backing.datastore = datastore_obj # Attach the disk to the previously created controller disk_spec.device.controllerKey = controller_spec.device.key disk_spec.device.unitNumber = 0 device_specs.append(disk_spec) # If a network is provided, add a virtual network adapter if network_obj: nic_spec = vim.vm.device.VirtualDeviceSpec() nic_spec.operation = vim.vm.device.VirtualDeviceSpec.Operation.add nic_spec.device = vim.vm.device.VirtualVmxnet3() # Using VMXNET3 network adapter nic_spec.device.deviceInfo = vim.Description(label="Network Adapter 1", summary=network_obj.name) if isinstance(network_obj, vim.Network): nic_spec.device.backing = vim.vm.device.VirtualEthernetCard.NetworkBackingInfo(network=network_obj, deviceName=network_obj.name) elif isinstance(network_obj, vim.dvs.DistributedVirtualPortgroup): # Distributed virtual switch portgroup dvs_uuid = network_obj.config.distributedVirtualSwitch.uuid port_key = network_obj.key nic_spec.device.backing = vim.vm.device.VirtualEthernetCard.DistributedVirtualPortBackingInfo( port=vim.dvs.PortConnection(portgroupKey=port_key, switchUuid=dvs_uuid) ) nic_spec.device.connectable = vim.vm.device.VirtualDevice.ConnectInfo(startConnected=True, allowGuestControl=True) device_specs.append(nic_spec) vm_spec.deviceChange = device_specs # Get the folder in which to place the VM (default is the datacenter's vmFolder) vm_folder = self.datacenter_obj.vmFolder # Create the VM in the specified resource pool try: task = vm_folder.CreateVM_Task(config=vm_spec, pool=self.resource_pool) # Wait for the task to complete while task.info.state not in [vim.TaskInfo.State.success, vim.TaskInfo.State.error]: continue if task.info.state == vim.TaskInfo.State.error: raise task.info.error except Exception as e: logging.error(f"Failed to create virtual machine: {e}") raise logging.info(f"Virtual machine created: {name}") return f"VM '{name}' created." def clone_vm(self, template_name: str, new_name: str) -> str: """Clone a new virtual machine from an existing template or VM.""" template_vm = self.find_vm(template_name) if not template_vm: raise Exception(f"Template virtual machine {template_name} not found") vm_folder = template_vm.parent # Place the new VM in the same folder as the template if not isinstance(vm_folder, vim.Folder): vm_folder = self.datacenter_obj.vmFolder # Use the resource pool of the host/cluster where the template is located resource_pool = template_vm.resourcePool or self.resource_pool relocate_spec = vim.vm.RelocateSpec(pool=resource_pool, datastore=self.datastore_obj) clone_spec = vim.vm.CloneSpec(powerOn=False, template=False, location=relocate_spec) try: task = template_vm.Clone(folder=vm_folder, name=new_name, spec=clone_spec) while task.info.state not in [vim.TaskInfo.State.success, vim.TaskInfo.State.error]: continue if task.info.state == vim.TaskInfo.State.error: raise task.info.error except Exception as e: logging.error(f"Failed to clone virtual machine: {e}") raise logging.info(f"Cloned virtual machine {template_name} to new VM: {new_name}") return f"VM '{new_name}' cloned from '{template_name}'." def delete_vm(self, name: str) -> str: """Delete the specified virtual machine.""" vm = self.find_vm(name) if not vm: raise Exception(f"Virtual machine {name} not found") try: task = vm.Destroy_Task() while task.info.state not in [vim.TaskInfo.State.success, vim.TaskInfo.State.error]: continue if task.info.state == vim.TaskInfo.State.error: raise task.info.error except Exception as e: logging.error(f"Failed to delete virtual machine: {e}") raise logging.info(f"Virtual machine deleted: {name}") return f"VM '{name}' deleted." def power_on_vm(self, name: str) -> str: """Power on the specified virtual machine.""" vm = self.find_vm(name) if not vm: raise Exception(f"Virtual machine {name} not found") if vm.runtime.powerState == vim.VirtualMachine.PowerState.poweredOn: return f"VM '{name}' is already powered on." task = vm.PowerOnVM_Task() while task.info.state not in [vim.TaskInfo.State.success, vim.TaskInfo.State.error]: continue if task.info.state == vim.TaskInfo.State.error: raise task.info.error logging.info(f"Virtual machine powered on: {name}") return f"VM '{name}' powered on." def power_off_vm(self, name: str) -> str: """Power off the specified virtual machine.""" vm = self.find_vm(name) if not vm: raise Exception(f"Virtual machine {name} not found") if vm.runtime.powerState == vim.VirtualMachine.PowerState.poweredOff: return f"VM '{name}' is already powered off." task = vm.PowerOffVM_Task() while task.info.state not in [vim.TaskInfo.State.success, vim.TaskInfo.State.error]: continue if task.info.state == vim.TaskInfo.State.error: raise task.info.error logging.info(f"Virtual machine powered off: {name}") return f"VM '{name}' powered off." # ---------------- MCP Server Definition ---------------- # Initialize MCP Server object mcp_server = Server(name="VMware-MCP-Server", version="0.0.1") # Define supported tools (executable operations) and resources (data interfaces) # The implementation of tools and resources will call methods in VMwareManager # Note: For each operation, perform API key authentication check, and only execute sensitive operations if the authenticated flag is True # If not authenticated, an exception is raised # Tool 1: Authentication (via API Key) def tool_authenticate(key: str) -> str: """Validate the API key and enable subsequent operations upon success.""" if config.api_key and key == config.api_key: manager.authenticated = True logging.info("API key verification successful, client is authorized") return "Authentication successful." else: logging.warning("API key verification failed") raise Exception("Authentication failed: invalid API key.") # Tool 2: Create virtual machine def tool_create_vm(name: str, cpu: int, memory: int, datastore: str = None, network: str = None) -> str: """Create a new virtual machine.""" _check_auth() # Check access permissions return manager.create_vm(name, cpu, memory, datastore, network) # Tool 3: Clone virtual machine def tool_clone_vm(template_name: str, new_name: str) -> str: """Clone a virtual machine from a template.""" _check_auth() return manager.clone_vm(template_name, new_name) # Tool 4: Delete virtual machine def tool_delete_vm(name: str) -> str: """Delete the specified virtual machine.""" _check_auth() return manager.delete_vm(name) # Tool 5: Power on virtual machine def tool_power_on(name: str) -> str: """Power on the specified virtual machine.""" _check_auth() return manager.power_on_vm(name) # Tool 6: Power off virtual machine def tool_power_off(name: str) -> str: """Power off the specified virtual machine.""" _check_auth() return manager.power_off_vm(name) # Tool 7: List all virtual machines def tool_list_vms() -> list: """Return a list of all virtual machine names.""" _check_auth() return manager.list_vms() # Resource 1: Retrieve virtual machine performance data def resource_vm_performance(vm_name: str) -> dict: """Retrieve CPU, memory, storage, and network usage for the specified virtual machine.""" _check_auth() return manager.get_vm_performance(vm_name) # Internal helper: Check API access permissions def _check_auth(): if config.api_key: # If an API key is configured, require that manager.authenticated is True if not manager.authenticated: raise Exception("Unauthorized: API key required.") # Register the above functions as tools and resources for the MCP Server # Encapsulate using mcp.types.Tool and mcp.types.Resource tools = { "authenticate": types.Tool( name="authenticate", description="Authenticate using API key to enable privileged operations", parameters={"key": str}, handler=lambda params: tool_authenticate(**params), inputSchema={"type": "object", "properties": {"key": {"type": "string"}}, "required": ["key"]} ), "createVM": types.Tool( name="createVM", description="Create a new virtual machine", parameters={"name": str, "cpu": int, "memory": int, "datastore": Optional[str], "network": Optional[str]}, handler=lambda params: tool_create_vm(**params), inputSchema={ "type": "object", "properties": { "name": {"type": "string"}, "cpu": {"type": "integer"}, "memory": {"type": "integer"}, "datastore": {"type": "string", "nullable": True}, "network": {"type": "string", "nullable": True} }, "required": ["name", "cpu", "memory"] } ), "cloneVM": types.Tool( name="cloneVM", description="Clone a virtual machine from a template or existing VM", parameters={"template_name": str, "new_name": str}, handler=lambda params: tool_clone_vm(**params), inputSchema={ "type": "object", "properties": { "template_name": {"type": "string"}, "new_name": {"type": "string"} }, "required": ["template_name", "new_name"] } ), "deleteVM": types.Tool( name="deleteVM", description="Delete a virtual machine", parameters={"name": str}, handler=lambda params: tool_delete_vm(**params), inputSchema={ "type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"] } ), "powerOn": types.Tool( name="powerOn", description="Power on a virtual machine", parameters={"name": str}, handler=lambda params: tool_power_on(**params), inputSchema={ "type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"] } ), "powerOff": types.Tool( name="powerOff", description="Power off a virtual machine", parameters={"name": str}, handler=lambda params: tool_power_off(**params), inputSchema={ "type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"] } ), "listVMs": types.Tool( name="listVMs", description="List all virtual machines", parameters={}, handler=lambda params: tool_list_vms(), inputSchema={"type": "object", "properties": {}} ) } resources = { "vmStats": types.Resource( name="vmStats", uri="vmstats://{vm_name}", description="Get CPU, memory, storage, network usage of a VM", parameters={"vm_name": str}, handler=lambda params: resource_vm_performance(**params), inputSchema={ "type": "object", "properties": { "vm_name": {"type": "string"} }, "required": ["vm_name"] } ) } # Add tools and resources to the MCP Server object for name, tool in tools.items(): setattr(mcp_server, f"tool_{name}", tool) for name, res in resources.items(): setattr(mcp_server, f"resource_{name}", res) # Set the MCP Server capabilities, declaring that the tools and resources list is available mcp_server.capabilities = { "tools": {"listChanged": True}, "resources": {"listChanged": True} } # Maintain a global SSE transport instance for sending events during POST request processing active_transport: Optional[SseServerTransport] = None # SSE initialization request handler (HTTP GET /sse) async def sse_endpoint(scope, receive, send): """Handle SSE connection initialization requests. Establish an MCP SSE session.""" global active_transport # Construct response headers to establish an event stream headers = [(b"content-type", b"text/event-stream")] # Verify API key: Retrieve from request headers 'Authorization' or 'X-API-Key' headers_dict = {k.lower().decode(): v.decode() for (k, v) in scope.get("headers", [])} provided_key = None if b"authorization" in scope["headers"]: provided_key = headers_dict.get("authorization") elif b"x-api-key" in scope["headers"]: provided_key = headers_dict.get("x-api-key") if config.api_key and provided_key != f"Bearer {config.api_key}" and provided_key != config.api_key: # If the correct API key is not provided, return 401 res_status = b"401 UNAUTHORIZED" await send({"type": "http.response.start", "status": 401, "headers": [(b"content-type", b"text/plain")]}) await send({"type": "http.response.body", "body": b"Unauthorized"}) logging.warning("No valid API key provided, rejecting SSE connection") return # Establish SSE transport and connect to the MCP Server active_transport = SseServerTransport("/sse/messages") logging.info("Established new SSE session") # Send SSE response headers to the client, preparing to start sending events await send({"type": "http.response.start", "status": 200, "headers": headers}) try: async with active_transport.connect_sse(scope, receive, send) as (read_stream, write_stream): init_opts = mcp_server.create_initialization_options() # Run MCP Server, passing the read/write streams to the Server await mcp_server.run(read_stream, write_stream, init_opts) except Exception as e: logging.error(f"SSE session encountered an error: {e}") finally: active_transport = None # SSE session ended, send an empty message to indicate completion await send({"type": "http.response.body", "body": b"", "more_body": False}) # JSON-RPC message handler (HTTP POST /sse/messages) async def messages_endpoint(scope, receive, send): """Handle JSON-RPC requests sent by the client (via POST).""" global active_transport # Read request body data body_bytes = b'' more_body = True while more_body: event = await receive() if event["type"] == "http.request": body_bytes += event.get("body", b'') more_body = event.get("more_body", False) # Parse JSON-RPC request try: body_str = body_bytes.decode('utf-8') msg = json.loads(body_str) except Exception as e: logging.error(f"JSON parsing failed: {e}") await send({"type": "http.response.start", "status": 400, "headers": [(b"content-type", b"text/plain")]}) await send({"type": "http.response.body", "body": b"Invalid JSON"}) return # Only accept requests sent through an established SSE transport if not active_transport: await send({"type": "http.response.start", "status": 400, "headers": [(b"content-type", b"text/plain")]}) await send({"type": "http.response.body", "body": b"No active session"}) return # Pass the POST request content to active_transport to trigger the corresponding MCP Server operation try: # Handle the POST message through SseServerTransport, which injects the request into the MCP session await active_transport.handle_post(scope, body_bytes) status = 200 response_body = b"" except Exception as e: logging.error(f"Error handling POST message: {e}") status = 500 response_body = str(e).encode('utf-8') # Reply to the client with HTTP status await send({"type": "http.response.start", "status": status, "headers": [(b"content-type", b"text/plain")]}) await send({"type": "http.response.body", "body": response_body}) # Simple ASGI application routing: dispatch requests to the appropriate handler based on the path and method async def app(scope, receive, send): if scope["type"] == "http": path = scope.get("path", "") method = scope.get("method", "").upper() if path == "/sse" and method == "GET": # SSE initialization request await sse_endpoint(scope, receive, send) elif path == "/sse/messages" and method in ("POST", "OPTIONS"): # JSON-RPC message request; handle CORS preflight OPTIONS request if method == "OPTIONS": # Return allowed methods headers = [ (b"access-control-allow-methods", b"POST, OPTIONS"), (b"access-control-allow-headers", b"Content-Type, Authorization, X-API-Key"), (b"access-control-allow-origin", b"*") ] await send({"type": "http.response.start", "status": 204, "headers": headers}) await send({"type": "http.response.body", "body": b""}) else: await messages_endpoint(scope, receive, send) else: # Route not found await send({"type": "http.response.start", "status": 404, "headers": [(b"content-type", b"text/plain")]}) await send({"type": "http.response.body", "body": b"Not Found"}) else: # Non-HTTP event, do not process return # Parse command-line arguments and environment variables, and load configuration parser = argparse.ArgumentParser(description="MCP VMware ESXi Management Server") parser.add_argument("--config", "-c", help="Configuration file path (JSON or YAML)", default=None) args = parser.parse_args() # Attempt to load configuration from a file or environment variables config_data = {} config_path = args.config or os.environ.get("MCP_CONFIG_FILE") if config_path: # Parse JSON or YAML based on the file extension if config_path.endswith((".yml", ".yaml")): import yaml with open(config_path, 'r') as f: config_data = yaml.safe_load(f) elif config_path.endswith(".json"): with open(config_path, 'r') as f: config_data = json.load(f) else: raise ValueError("Unsupported configuration file format. Please use JSON or YAML") # Override configuration from environment variables (higher priority than file) env_map = { "VCENTER_HOST": "vcenter_host", "VCENTER_USER": "vcenter_user", "VCENTER_PASSWORD": "vcenter_password", "VCENTER_DATACENTER": "datacenter", "VCENTER_CLUSTER": "cluster", "VCENTER_DATASTORE": "datastore", "VCENTER_NETWORK": "network", "VCENTER_INSECURE": "insecure", "MCP_API_KEY": "api_key", "MCP_LOG_FILE": "log_file", "MCP_LOG_LEVEL": "log_level" } for env_key, cfg_key in env_map.items(): if env_key in os.environ: val = os.environ[env_key] # Boolean type conversion if cfg_key == "insecure": config_data[cfg_key] = val.lower() in ("1", "true", "yes") else: config_data[cfg_key] = val # Construct Config object from config_data required_keys = ["vcenter_host", "vcenter_user", "vcenter_password"] for k in required_keys: if k not in config_data or not config_data[k]: raise Exception(f"Missing required configuration item: {k}") config = Config(**config_data) # Initialize logging log_level = getattr(logging, config.log_level.upper(), logging.INFO) logging.basicConfig(level=log_level, format="%(asctime)s [%(levelname)s] %(message)s", filename=config.log_file if config.log_file else None) if not config.log_file: # If no log file is specified, output logs to the console logging.getLogger().addHandler(logging.StreamHandler()) logging.info("Starting VMware ESXi Management MCP Server...") # Create VMware Manager instance and connect manager = VMwareManager(config) # If an API key is configured, prompt that authentication is required before invoking sensitive operations if config.api_key: logging.info("API key authentication is enabled. Clients must call the authenticate tool to verify the key before invoking sensitive operations") # Start ASGI server to listen for MCP SSE connections if __name__ == "__main__": # Start ASGI application using the built-in uvicorn server (listening on 0.0.0.0:8080) import uvicorn uvicorn.run(app, host="0.0.0.0", port=8080)