Skip to main content
Glama

BIG-IP MCP Server

by arunhotra
server.py (64.8 kB)
""" BIG-IP MCP Server An MCP server for managing F5 BIG-IP devices. Provides tools to authenticate and query virtual servers from BIG-IP load balancers. """ from fastmcp import FastMCP, Context from pydantic import BaseModel, Field from typing import Optional, List, Dict import httpx from datetime import datetime, timedelta import json import os import asyncio import re from pathlib import Path # Initialize the FastMCP server mcp = FastMCP("BIG-IP MCP Server") # Configuration file path CONFIG_FILE = Path(__file__).parent / "bigip_config.json" # ============================================================================ # PYDANTIC MODELS # ============================================================================ class BIGIPDevice(BaseModel): """BIG-IP device configuration""" ip_address: str = Field(description="BIG-IP management IP address or hostname") username: str = Field(description="BIG-IP username") password: str = Field(description="BIG-IP password") verify_ssl: bool = Field(default=False, description="Verify SSL certificates (use False for self-signed certs)") description: Optional[str] = Field(default=None, description="Device description") class BIGIPCredentials(BaseModel): """Credentials for connecting to BIG-IP""" ip_address: str = Field(description="BIG-IP management IP address or hostname") username: str = Field(description="BIG-IP username") password: str = Field(description="BIG-IP password") verify_ssl: bool = Field(default=False, description="Verify SSL certificates (use False for self-signed certs)") class AuthToken(BaseModel): """Authentication token with expiry""" token: str expires_at: datetime class VirtualServer(BaseModel): """BIG-IP Virtual Server information""" name: str full_path: str destination: str pool: Optional[str] = None enabled: bool availability_status: str description: Optional[str] = None # ============================================================================ # CONFIGURATION MANAGEMENT # 
# ============================================================================


def load_bigip_config() -> Dict[str, BIGIPDevice]:
    """
    Load BIG-IP device configurations from bigip_config.json.

    Returns:
        Dictionary mapping device names to BIGIPDevice objects

    Raises:
        Exception: If the config file is missing, is not valid JSON, fails
            model validation, or defines no devices
    """
    if not CONFIG_FILE.exists():
        raise Exception(
            f"Configuration file not found: {CONFIG_FILE}\n"
            f"Please create bigip_config.json from bigip_config.example.json"
        )

    try:
        # JSON is UTF-8 by specification; do not depend on the platform default.
        with open(CONFIG_FILE, 'r', encoding="utf-8") as f:
            config_data = json.load(f)

        devices = {
            name: BIGIPDevice(**device_config)
            for name, device_config in config_data.get("devices", {}).items()
        }

        if not devices:
            raise Exception("No devices found in configuration file")

        return devices
    except json.JSONDecodeError as e:
        raise Exception(f"Invalid JSON in configuration file: {str(e)}") from e
    except Exception as e:
        raise Exception(f"Error loading configuration: {str(e)}") from e


def get_device_by_name(device_name: str) -> BIGIPDevice:
    """
    Get a BIG-IP device configuration by name.

    Args:
        device_name: Name of the device in the config file

    Returns:
        BIGIPDevice configuration

    Raises:
        Exception: If the configuration cannot be loaded or the device is
            not present in it
    """
    devices = load_bigip_config()

    if device_name not in devices:
        available = ", ".join(devices.keys())
        raise Exception(
            f"Device '{device_name}' not found in configuration.\n"
            f"Available devices: {available}"
        )

    return devices[device_name]


# ============================================================================
# AUTHENTICATION & API HELPERS
# ============================================================================

def _auth_headers(token: str, content_type: str = "application/json") -> Dict[str, str]:
    """Build the headers for a token-authenticated BIG-IP REST request."""
    return {
        "X-F5-Auth-Token": token,
        "Content-Type": content_type
    }


async def authenticate_bigip(credentials: BIGIPCredentials) -> AuthToken:
    """
    Authenticate to BIG-IP and obtain an auth token.

    Args:
        credentials: BIG-IP connection credentials

    Returns:
        AuthToken with token string and expiry time

    Raises:
        Exception: If authentication fails
    """
    url = f"https://{credentials.ip_address}/mgmt/shared/authn/login"

    payload = {
        "username": credentials.username,
        "password": credentials.password,
        "loginProviderName": "tmos"
    }

    async with httpx.AsyncClient(verify=credentials.verify_ssl) as client:
        try:
            response = await client.post(
                url,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=30.0
            )
            response.raise_for_status()

            data = response.json()
            token = data["token"]["token"]

            # Default token lifetime is 1200 seconds (20 minutes)
            # Set expiry to 19 minutes to be safe
            expires_at = datetime.now() + timedelta(seconds=1140)

            return AuthToken(token=token, expires_at=expires_at)

        except httpx.HTTPStatusError as e:
            raise Exception(
                f"BIG-IP authentication failed: {e.response.status_code} - {e.response.text}"
            ) from e
        except Exception as e:
            raise Exception(
                f"Failed to connect to BIG-IP at {credentials.ip_address}: {str(e)}"
            ) from e


async def get_virtual_servers(
    ip_address: str,
    token: str,
    verify_ssl: bool = False
) -> List[VirtualServer]:
    """
    Retrieve all virtual servers from BIG-IP.

    Args:
        ip_address: BIG-IP management IP
        token: Authentication token
        verify_ssl: Whether to verify SSL certificates

    Returns:
        List of VirtualServer objects

    Raises:
        Exception: If API call fails
    """
    url = f"https://{ip_address}/mgmt/tm/ltm/virtual"
    headers = _auth_headers(token)

    async with httpx.AsyncClient(verify=verify_ssl) as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)
            response.raise_for_status()

            data = response.json()
            virtual_servers = []

            for item in data.get("items", []):
                # Extract pool name from pool reference (format: /Common/pool_name)
                pool_ref = item.get("pool", "")
                pool_name = pool_ref.split("/")[-1] if pool_ref else None

                virtual_servers.append(VirtualServer(
                    name=item.get("name", ""),
                    full_path=item.get("fullPath", ""),
                    destination=item.get("destination", ""),
                    pool=pool_name,
                    enabled=item.get("enabled", False),
                    availability_status=item.get("status", {}).get("availabilityState", "unknown"),
                    description=item.get("description")
                ))

            return virtual_servers

        except httpx.HTTPStatusError as e:
            raise Exception(
                f"Failed to retrieve virtual servers: {e.response.status_code} - {e.response.text}"
            ) from e
        except Exception as e:
            raise Exception(f"Error querying BIG-IP: {str(e)}") from e


# ============================================================================
# AS3 MANAGEMENT HELPERS
# ============================================================================

# Size of each ranged POST when uploading a file to BIG-IP.
# NOTE(review): F5's published upload examples use 512 KiB chunks - confirm the
# per-request limit for the TMOS versions you target.
_UPLOAD_CHUNK_SIZE = 512 * 1024


async def get_latest_as3_release() -> Dict:
    """
    Fetch the latest AS3 release information from GitHub API.

    Returns:
        Dictionary with keys: version, rpm_url, rpm_filename, sha256_url
        (sha256_url is None when the release has no .rpm.sha256 asset)

    Raises:
        Exception: If GitHub API request fails or the release has no AS3 RPM
    """
    github_api_url = "https://api.github.com/repos/F5Networks/f5-appsvcs-extension/releases/latest"

    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(github_api_url, timeout=30.0)
            response.raise_for_status()

            data = response.json()
            # Strip only the leading 'v'; str.replace would also remove any
            # 'v' occurring later in the tag.
            version = data.get("tag_name", "").removeprefix("v")

            # Find the RPM asset and its checksum
            rpm_asset = None
            sha256_asset = None

            for asset in data.get("assets", []):
                name = asset.get("name", "")
                if name.endswith(".rpm") and "f5-appsvcs" in name:
                    rpm_asset = asset
                elif name.endswith(".rpm.sha256"):
                    sha256_asset = asset

            if not rpm_asset:
                raise Exception("Could not find AS3 RPM in latest GitHub release")

            return {
                "version": version,
                "rpm_url": rpm_asset.get("browser_download_url"),
                "rpm_filename": rpm_asset.get("name"),
                "sha256_url": sha256_asset.get("browser_download_url") if sha256_asset else None
            }

        except httpx.HTTPStatusError as e:
            raise Exception(
                f"GitHub API request failed: {e.response.status_code} - {e.response.text}"
            ) from e
        except Exception as e:
            raise Exception(f"Failed to fetch latest AS3 release: {str(e)}") from e


async def check_as3_installed(
    ip_address: str,
    token: str,
    verify_ssl: bool = False
) -> Optional[str]:
    """
    Check if AS3 is installed on BIG-IP and return the version.

    Args:
        ip_address: BIG-IP management IP
        token: Authentication token
        verify_ssl: Whether to verify SSL certificates

    Returns:
        Version string if installed ("unknown" when the info response has
        no version), None if not installed

    Raises:
        Exception: If API call fails (other than 404)
    """
    url = f"https://{ip_address}/mgmt/shared/appsvcs/info"
    headers = _auth_headers(token)

    async with httpx.AsyncClient(verify=verify_ssl) as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)

            # 404 is handled before raise_for_status(), so the error handler
            # below never sees it.
            if response.status_code == 404:
                return None  # AS3 not installed

            response.raise_for_status()
            data = response.json()

            # AS3 returns version in the response
            return data.get("version", "unknown")

        except httpx.HTTPStatusError as e:
            raise Exception(
                f"Failed to check AS3 status: {e.response.status_code} - {e.response.text}"
            ) from e
        except Exception as e:
            raise Exception(f"Error checking AS3 installation: {str(e)}") from e


async def download_as3_rpm(rpm_url: str, rpm_filename: str) -> bytes:
    """
    Download AS3 RPM from GitHub.

    Args:
        rpm_url: Download URL for the RPM
        rpm_filename: Filename for logging purposes (not used by the download)

    Returns:
        RPM file content as bytes

    Raises:
        Exception: If download fails
    """
    async with httpx.AsyncClient(follow_redirects=True) as client:
        try:
            response = await client.get(rpm_url, timeout=300.0)  # 5 minute timeout for large file
            response.raise_for_status()
            return response.content

        except httpx.HTTPStatusError as e:
            raise Exception(f"Failed to download AS3 RPM: {e.response.status_code}") from e
        except Exception as e:
            raise Exception(f"Error downloading AS3 RPM from GitHub: {str(e)}") from e


async def upload_as3_rpm(
    ip_address: str,
    token: str,
    rpm_filename: str,
    rpm_content: bytes,
    verify_ssl: bool = False
) -> bool:
    """
    Upload AS3 RPM to BIG-IP.

    The content is sent as consecutive ranged POSTs of at most
    _UPLOAD_CHUNK_SIZE bytes each; content that fits in one chunk is sent in
    a single request.

    Args:
        ip_address: BIG-IP management IP
        token: Authentication token
        rpm_filename: Name of the RPM file
        rpm_content: RPM file content as bytes
        verify_ssl: Whether to verify SSL certificates

    Returns:
        True if upload successful

    Raises:
        Exception: If the content is empty or any chunk upload fails
    """
    total = len(rpm_content)
    if total == 0:
        # An empty body would produce the invalid range "0--1/0".
        raise Exception("Error uploading AS3 RPM to BIG-IP: RPM content is empty")

    url = f"https://{ip_address}/mgmt/shared/file-transfer/uploads/{rpm_filename}"

    async with httpx.AsyncClient(verify=verify_ssl) as client:
        try:
            for start in range(0, total, _UPLOAD_CHUNK_SIZE):
                chunk = rpm_content[start:start + _UPLOAD_CHUNK_SIZE]
                end = start + len(chunk) - 1

                headers = _auth_headers(token, "application/octet-stream")
                headers["Content-Range"] = f"{start}-{end}/{total}"

                response = await client.post(
                    url,
                    content=chunk,
                    headers=headers,
                    timeout=300.0  # 5 minute timeout for large file upload
                )
                response.raise_for_status()

            return True

        except httpx.HTTPStatusError as e:
            if e.response.status_code == 401:
                raise Exception(
                    "Authentication failed. AS3 installation requires admin account privileges."
                ) from e
            elif e.response.status_code == 403:
                raise Exception(
                    "Permission denied. AS3 installation requires admin account (not just administrator role)."
                ) from e
            raise Exception(
                f"Failed to upload AS3 RPM: {e.response.status_code} - {e.response.text}"
            ) from e
        except Exception as e:
            raise Exception(f"Error uploading AS3 RPM to BIG-IP: {str(e)}") from e


async def install_as3_package(
    ip_address: str,
    token: str,
    rpm_filename: str,
    verify_ssl: bool = False
) -> str:
    """
    Install AS3 package on BIG-IP and wait for completion.

    Args:
        ip_address: BIG-IP management IP
        token: Authentication token
        rpm_filename: Name of the uploaded RPM file
        verify_ssl: Whether to verify SSL certificates

    Returns:
        Success message with installation status

    Raises:
        Exception: If installation fails or times out
    """
    url = f"https://{ip_address}/mgmt/shared/iapp/package-management-tasks"
    headers = _auth_headers(token)

    payload = {
        "operation": "INSTALL",
        "packageFilePath": f"/var/config/rest/downloads/{rpm_filename}"
    }

    async with httpx.AsyncClient(verify=verify_ssl) as client:
        try:
            # Start installation task
            response = await client.post(url, json=payload, headers=headers, timeout=30.0)
            response.raise_for_status()

            task_data = response.json()
            task_id = task_data.get("id")

            if not task_id:
                raise Exception("No task ID returned from installation request")

            # Poll for task completion (max 5 minutes)
            task_url = f"{url}/{task_id}"
            max_attempts = 60  # 5 minutes with 5-second intervals

            for _ in range(max_attempts):
                await asyncio.sleep(5)  # Wait 5 seconds between checks

                status_response = await client.get(task_url, headers=headers, timeout=30.0)
                status_response.raise_for_status()
                status_data = status_response.json()

                status = status_data.get("status")

                if status == "FINISHED":
                    return "AS3 installation completed successfully"
                if status == "FAILED":
                    error_msg = status_data.get("errorMessage", "Unknown error")
                    raise Exception(f"AS3 installation failed: {error_msg}")

            raise Exception("AS3 installation timed out after 5 minutes")

        except httpx.HTTPStatusError as e:
            if e.response.status_code == 401:
                raise Exception(
                    "Authentication failed. AS3 installation requires admin account privileges."
                ) from e
            elif e.response.status_code == 403:
                raise Exception(
                    "Permission denied. AS3 installation requires admin account (not just administrator role)."
                ) from e
            raise Exception(
                f"Failed to install AS3 package: {e.response.status_code} - {e.response.text}"
            ) from e
        except Exception as e:
            raise Exception(f"Error installing AS3 package: {str(e)}") from e


# ============================================================================
# AS3 APPLICATION CREATION HELPERS
# ============================================================================

# Placeholders that count as "unreplaced" after rendering: {{UPPER_SNAKE}}
_PLACEHOLDER_RE = re.compile(r'\{\{([A-Z_]+)\}\}')

# Dotted-quad shape only; the octet range is checked separately
_IPV4_RE = re.compile(r'(\d{1,3}\.){3}\d{1,3}')


def load_as3_template(template_name: str) -> Dict:
    """
    Load AS3 template from templates directory.

    Args:
        template_name: Name of the template (without .json extension)

    Returns:
        Template as Python dictionary

    Raises:
        Exception: If template file not found or invalid JSON
    """
    templates_dir = Path(__file__).parent / "templates"
    template_path = templates_dir / f"{template_name}.json"

    if not template_path.exists():
        if templates_dir.exists():
            # List available templates to help the caller pick a valid name
            available = sorted(f.stem for f in templates_dir.glob("*.json"))
            if available:
                raise Exception(
                    f"Template '{template_name}' not found. "
                    f"Available templates: {', '.join(available)}"
                )
            # The directory exists but holds no templates; say so instead of
            # claiming the directory is missing.
            raise Exception(
                f"Template '{template_name}' not found. "
                f"No templates found in {templates_dir}"
            )
        raise Exception(
            f"Template '{template_name}' not found. "
            f"No templates directory found at {templates_dir}"
        )

    try:
        with open(template_path, 'r', encoding="utf-8") as f:
            return json.load(f)
    except json.JSONDecodeError as e:
        raise Exception(f"Invalid JSON in template '{template_name}': {str(e)}") from e
    except Exception as e:
        raise Exception(f"Error loading template '{template_name}': {str(e)}") from e


def render_as3_template(template: Dict, variables: Dict) -> Dict:
    """
    Render AS3 template by replacing variable placeholders.

    Variable format: {{VARIABLE_NAME}}. Placeholders are replaced in both
    dictionary keys and string values. Substitution works on the parsed
    structure rather than on JSON text, so values containing quotes or
    backslashes cannot corrupt the document.

    Special handling for arrays: a string value that consists of exactly one
    placeholder whose variable is a list (e.g. "{{POOL_MEMBERS}}") is
    replaced by that list. A list placeholder embedded in a longer string is
    replaced by the list's JSON text.

    Args:
        template: Template dictionary with {{VARIABLE}} placeholders
        variables: Dictionary of variable names to values

    Returns:
        Rendered template with all variables substituted (the input template
        is not modified)

    Raises:
        Exception: If {{UPPER_SNAKE}} placeholders remain after substitution
    """
    placeholders = {name: f"{{{{{name}}}}}" for name in variables}

    def substitute(text: str) -> str:
        """Replace every known placeholder inside a string."""
        for name, value in variables.items():
            placeholder = placeholders[name]
            if placeholder in text:
                replacement = json.dumps(value) if isinstance(value, list) else str(value)
                text = text.replace(placeholder, replacement)
        return text

    def render_value(text: str):
        """Render a string value, promoting an exact list placeholder to a list."""
        for name, value in variables.items():
            if isinstance(value, list) and text == placeholders[name]:
                # JSON round trip: deep copy that also verifies serializability
                return json.loads(json.dumps(value))
        return substitute(text)

    def render_node(node):
        if isinstance(node, dict):
            return {
                substitute(key) if isinstance(key, str) else key: render_node(value)
                for key, value in node.items()
            }
        if isinstance(node, list):
            return [render_node(item) for item in node]
        if isinstance(node, str):
            return render_value(node)
        return node

    rendered = render_node(template)

    # Check for any remaining unreplaced variables (keys and values alike)
    remaining_vars = set(_PLACEHOLDER_RE.findall(json.dumps(rendered)))
    if remaining_vars:
        raise Exception(
            f"Template has unreplaced variables: {', '.join(sorted(remaining_vars))}. "
            f"Provided variables: {', '.join(variables.keys())}"
        )

    return rendered


def _is_valid_ipv4(ip_str: str) -> bool:
    """Return True if ip_str is a dotted-quad IPv4 address with octets 0-255."""
    if not _IPV4_RE.fullmatch(ip_str):
        return False
    return all(0 <= int(octet) <= 255 for octet in ip_str.split('.'))


def _check_virtual_addresses(obj, path: str = "") -> tuple[bool, str]:
    """Recursively find and validate every virtualAddresses list under obj."""
    if isinstance(obj, dict):
        if "virtualAddresses" in obj:
            addrs = obj["virtualAddresses"]
            if not isinstance(addrs, list):
                return False, f"virtualAddresses at {path} must be a list"
            for addr in addrs:
                # Only plain strings are checked here; other entry shapes
                # (objects, nested lists) are left for AS3 itself to validate
                # instead of crashing the regex match with a TypeError.
                # NOTE(review): IPv6 and CIDR strings are still rejected.
                if isinstance(addr, str) and not _is_valid_ipv4(addr):
                    return False, f"Invalid IP address '{addr}' at {path}.virtualAddresses"

        for key, value in obj.items():
            is_valid, msg = _check_virtual_addresses(value, f"{path}.{key}")
            if not is_valid:
                return is_valid, msg

    elif isinstance(obj, list):
        for i, item in enumerate(obj):
            is_valid, msg = _check_virtual_addresses(item, f"{path}[{i}]")
            if not is_valid:
                return is_valid, msg

    return True, ""


def validate_as3_declaration(declaration: Dict) -> tuple[bool, str]:
    """
    Validate AS3 declaration structure and content.

    Checks the top-level 'class': 'AS3' wrapper, the nested
    'declaration.class': 'ADC', and every string entry of every
    virtualAddresses list found anywhere inside the declaration.

    Args:
        declaration: AS3 declaration dictionary

    Returns:
        Tuple of (is_valid, error_message)
        If valid: (True, "")
        If invalid: (False, "error description")
    """
    # Check required top-level fields
    if declaration.get("class") != "AS3":
        return False, "Declaration must have 'class': 'AS3'"

    if "declaration" not in declaration:
        return False, "Declaration must have 'declaration' property"

    decl = declaration["declaration"]

    if not isinstance(decl, dict) or decl.get("class") != "ADC":
        return False, "Declaration.class must be 'ADC'"

    # Validate IPs in declaration
    is_valid, error_msg = _check_virtual_addresses(decl)
    if not is_valid:
        return False, error_msg

    return True, ""


def _as3_in_progress(results) -> bool:
    """Return True if any AS3 result entry still reports code 0."""
    # NOTE(review): AS3 reports code 0 ("in progress") for unfinished work -
    # confirm against the AS3 version in use.
    return any(
        isinstance(entry, dict) and entry.get("code") == 0
        for entry in results or []
    )


def _first_as3_failure(results, ok_codes) -> Optional[str]:
    """Return the message of the first result whose code is not in ok_codes."""
    for entry in results:
        if isinstance(entry, dict) and entry.get("code", 0) not in ok_codes:
            return entry.get("message", "Unknown error")
    return None


async def deploy_as3_declaration(
    ip_address: str,
    token: str,
    declaration: Dict,
    verify_ssl: bool = False
) -> str:
    """
    Deploy AS3 declaration to BIG-IP.

    A response that carries final per-tenant results is evaluated directly.
    A response that was only accepted for background processing (HTTP 202,
    or results still reporting code 0) is polled on the task endpoint every
    5 seconds for up to 10 minutes.

    Args:
        ip_address: BIG-IP management IP
        token: Authentication token
        declaration: AS3 declaration dictionary
        verify_ssl: Whether to verify SSL certificates

    Returns:
        Status message. Per-tenant failures reported by AS3 are returned as
        an "AS3 deployment failed: ..." message rather than raised.

    Raises:
        Exception: If the HTTP request fails or the task times out
    """
    url = f"https://{ip_address}/mgmt/shared/appsvcs/declare"
    headers = _auth_headers(token)

    async with httpx.AsyncClient(verify=verify_ssl) as client:
        try:
            # Deploy declaration
            response = await client.post(
                url,
                json=declaration,
                headers=headers,
                timeout=60.0  # Initial POST timeout
            )
            response.raise_for_status()

            result = response.json()
            results = result.get("results")
            task_id = result.get("id")

            # Pending work must be polled, not judged: its results carry
            # code 0, which would otherwise be reported as a failure.
            pending = bool(task_id) and (
                response.status_code == 202 or _as3_in_progress(results)
            )

            if results and not pending:
                # Synchronous response
                failure = _first_as3_failure(results, (200, 201))
                if failure is not None:
                    return f"AS3 deployment failed: {failure}"
                return "AS3 declaration deployed successfully"

            # If we get a task ID, poll for completion
            if task_id:
                task_url = f"https://{ip_address}/mgmt/shared/appsvcs/task/{task_id}"
                max_attempts = 120  # 10 minutes with 5-second intervals

                for _ in range(max_attempts):
                    await asyncio.sleep(5)

                    status_response = await client.get(task_url, headers=headers, timeout=30.0)
                    status_response.raise_for_status()
                    status_data = status_response.json()

                    task_results = status_data.get("results", [])
                    if not task_results or _as3_in_progress(task_results):
                        continue  # Task not complete yet

                    failure = _first_as3_failure(task_results, (200, 201, 202))
                    if failure is not None:
                        return f"AS3 deployment failed: {failure}"
                    return "AS3 declaration deployed successfully"

                raise Exception("AS3 deployment timed out after 10 minutes")

            # No task ID and no immediate results
            return "AS3 declaration submitted (status unknown)"

        except httpx.HTTPStatusError as e:
            error_text = e.response.text
            try:
                error_json = json.loads(error_text)
                error_msg = error_json.get("message", error_text)
            except (ValueError, AttributeError):
                # Body is not JSON, or is JSON but not an object
                error_msg = error_text
            raise Exception(
                f"AS3 deployment failed: {e.response.status_code} - {error_msg}"
            ) from e
        except Exception as e:
            raise Exception(f"Error deploying AS3 declaration: {str(e)}") from e


# ============================================================================
# PHPIPAM INTEGRATION HELPERS
# ============================================================================

# Dotted-quad tokens inside free text
_IPV4_TOKEN_RE = re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b')


async def reserve_vip_from_phpipam(subnet_id: str, hostname: str) -> str:
    """
    Placeholder for reserving the first available IP from a phpIPAM subnet.

    This function performs no reservation. It always raises, and the
    exception message tells Claude to call the phpIPAM MCP server's
    reserve_ip_address tool instead.

    Args:
        subnet_id: phpIPAM subnet ID
        hostname: Hostname for the IP reservation

    Returns:
        Never returns

    Raises:
        Exception: Always - this function should not be called directly
    """
    raise Exception(
        "This function is a placeholder. Claude should call the phpIPAM MCP server's "
        f"reserve_ip_address tool with subnet_id='{subnet_id}' and hostname='{hostname}'"
    )


async def get_reserved_ips_from_phpipam(subnet_id: str) -> Dict:
    """
    Placeholder for fetching all reserved IP addresses of a phpIPAM subnet.

    This function performs no lookup. It always raises, and the exception
    message tells Claude to call the phpIPAM MCP server's get_subnet_details
    tool instead.

    Args:
        subnet_id: phpIPAM subnet ID

    Returns:
        Never returns

    Raises:
        Exception: Always - this function should not be called directly
    """
    raise Exception(
        "This function is a placeholder. Claude should call the phpIPAM MCP server's "
        f"get_subnet_details tool with subnet_id='{subnet_id}' and include_addresses=True"
    )


def parse_pool_selection(selection: str, available_ips: List[Dict]) -> List[str]:
    """
    Parse user's pool member selection from text input.

    Supports formats:
    - Numbers: "1 and 2", "1, 2", "1,2"
    - IPs: "172.16.100.10 and 172.16.100.11"
    - Hostnames: "web-mkt-01 and web-mkt-02"
    - Mixed: "1 and 172.16.100.11"

    IP addresses and known hostnames are removed from the text before list
    numbers are extracted, so their digits are never mistaken for indexes.

    Args:
        selection: User's selection text
        available_ips: List of available IP dictionaries from phpIPAM
            Format: [{"ip": "172.16.100.10", "hostname": "web-mkt-01"}, ...]

    Returns:
        List of selected IP addresses, without duplicates, ordered as:
        numbered picks, explicit IPs, then hostnames

    Raises:
        Exception: If selection cannot be parsed or IPs not found
    """
    if not selection or not selection.strip():
        raise Exception("No pool member selection provided")

    selection = selection.lower().strip()

    # Build mapping of index -> IP and hostname -> IP
    index_to_ip = {}
    hostname_to_ip = {}
    for idx, ip_info in enumerate(available_ips, start=1):
        ip = ip_info.get('ip', '')
        # The hostname may be present but None
        hostname = (ip_info.get('hostname') or '').lower()
        index_to_ip[str(idx)] = ip
        if hostname:
            hostname_to_ip[hostname] = ip
    known_ips = set(index_to_ip.values())

    # Extract IP addresses, then blank them out of the text
    ips_in_selection = _IPV4_TOKEN_RE.findall(selection)
    remainder = _IPV4_TOKEN_RE.sub(" ", selection)

    # Match hostnames as whole tokens, longest first, so "web-1" does not
    # match inside "web-10" or steal the prefix of "web-1.example.com".
    matched_hostnames = set()
    for hostname in sorted(hostname_to_ip, key=len, reverse=True):
        pattern = re.compile(r'(?<![\w.\-])' + re.escape(hostname) + r'(?![\w\-])')
        if pattern.search(remainder):
            matched_hostnames.add(hostname)
            remainder = pattern.sub(" ", remainder)

    # Whatever digits are left are candidate list numbers
    numbers = re.findall(r'\d+', remainder)

    selected_ips = []

    def select(ip: str) -> None:
        if ip and ip not in selected_ips:
            selected_ips.append(ip)

    # Process numbers (highest priority)
    for num in numbers:
        if num in index_to_ip:
            select(index_to_ip[num])

    # Process explicit IPs
    for ip in ips_in_selection:
        if ip in known_ips:
            select(ip)

    # Process hostnames, in the order the servers were listed
    for hostname, ip in hostname_to_ip.items():
        if hostname in matched_hostnames:
            select(ip)

    if not selected_ips:
        raise Exception(
            f"Could not parse selection '{selection}'. "
            f"Please provide numbers (e.g., '1 and 2'), "
            f"IPs (e.g., '172.16.100.10 and 172.16.100.11'), "
            f"or hostnames"
        )

    return selected_ips


def format_pool_selection_prompt(subnet_info: Dict, reserved_ips: List[Dict]) -> str:
    """
    Format the pool member selection prompt for user.

    Args:
        subnet_info: Subnet information dict (reads 'subnet', 'mask',
            'description')
        reserved_ips: List of reserved IP dicts from phpIPAM (reads 'ip',
            'hostname', 'description')

    Returns:
        Formatted string prompting user to select pool members, or an error
        message when no reserved IPs are available
    """
    if not reserved_ips:
        return (
            f"❌ No reserved IP addresses found in subnet {subnet_info.get('subnet', 'unknown')}. "
            "Please reserve some IPs first using phpIPAM before creating the application."
        )

    output = [
        f"## Available Web Servers\n",
        f"**Subnet:** {subnet_info.get('subnet', 'unknown')}/{subnet_info.get('mask', '?')}",
        f"**Description:** {subnet_info.get('description', 'N/A')}\n",
        f"Found {len(reserved_ips)} reserved IP(s):\n"
    ]

    for idx, ip_info in enumerate(reserved_ips, start=1):
        ip = ip_info.get('ip', 'unknown')
        # Fall back to N/A for a missing or None hostname
        hostname = ip_info.get('hostname') or 'N/A'
        description = ip_info.get('description', '')

        line = f"{idx}. **{ip}** - {hostname}"
        if description:
            line += f" ({description[:50]})"
        output.append(line)

    output.append("\n**Next Step:** Please specify which servers to include in the pool.")
    output.append("Examples:")
    output.append("- By number: `Use 1 and 2`")
    output.append("- By IP: `Use 172.16.100.10 and 172.16.100.11`")
    output.append("- By hostname: `Use web-mkt-01 and web-mkt-02`")

    return "\n".join(output)


# ============================================================================
# MCP TOOLS
# ============================================================================

# Status column icons keyed by availability state; other states get yellow
_STATUS_ICONS = {
    "available": "🟢",  # Green circle
    "offline": "🔴",  # Red circle
    "unknown": "⚪",  # White circle
}


def _parse_destination(dest: str) -> tuple[str, str]:
    """Parse destination format: /Common/10.1.1.100:80 or 10.1.1.100:80"""
    if not dest:
        return "N/A", "N/A"

    # Remove partition prefix if present
    dest_clean = dest.split("/")[-1]

    # More than one colon means an IPv6 address, whose port follows a dot.
    # NOTE(review): assumes BIG-IP's "addr.port" IPv6 notation - confirm.
    if dest_clean.count(":") > 1:
        ip, sep, port = dest_clean.rpartition(".")
        return (ip, port) if sep else (dest_clean, "N/A")

    # Split IP and port
    if ":" in dest_clean:
        ip, port = dest_clean.rsplit(":", 1)
        return ip, port
    return dest_clean, "N/A"


@mcp.tool
async def list_virtual_servers(
    device_name: str,
    ctx: Context = None
) -> str:
    """
    List all virtual servers configured on the BIG-IP device.

    Args:
        device_name: Name of the BIG-IP device from configuration file (e.g., 'prod-lb-01', 'lab-bigip')

    Returns:
        A formatted list of virtual servers with their status and configuration
    """
    if ctx:
        await ctx.info(f"Starting BIG-IP virtual server query for device: {device_name}")

    # Load device configuration
    try:
        device = get_device_by_name(device_name)
        if ctx:
            await ctx.info(f"Loaded configuration for {device_name} ({device.ip_address})")
    except Exception as e:
        return f"❌ Configuration error: {str(e)}"

    # Create credentials object from config
    credentials = BIGIPCredentials(
        ip_address=device.ip_address,
        username=device.username,
        password=device.password,
        verify_ssl=device.verify_ssl
    )

    # Check if we have a cached valid token for this device
    cached_token = None
    if ctx:
        cached_token = ctx.get_state(f"bigip_token_{device_name}")

    token_obj = None

    # Check if cached token is still valid
    if cached_token:
        token_obj = AuthToken(**cached_token)
        if token_obj.expires_at > datetime.now():
            if ctx:
                await ctx.info("Using cached authentication token")
        else:
            if ctx:
                await ctx.info("Cached token expired, will re-authenticate")
            token_obj = None

    # Authenticate if we don't have a valid token
    if not token_obj:
        if ctx:
            await ctx.info(f"Authenticating to BIG-IP at {credentials.ip_address}")

        try:
            token_obj = await authenticate_bigip(credentials)
            if ctx:
                ctx.set_state(f"bigip_token_{device_name}", token_obj.model_dump())
                await ctx.info("Successfully authenticated to BIG-IP")
        except Exception as e:
            if ctx:
                await ctx.error(f"Authentication failed: {str(e)}")
            return f"❌ Authentication failed: {str(e)}"

    # Query virtual servers
    if ctx:
        await ctx.info("Retrieving virtual servers from BIG-IP")

    try:
        virtual_servers = await get_virtual_servers(
            ip_address=credentials.ip_address,
            token=token_obj.token,
            verify_ssl=credentials.verify_ssl
        )

        if ctx:
            await ctx.info(f"Found {len(virtual_servers)} virtual servers")

        # Format the response as a table
        if not virtual_servers:
            return "No virtual servers found on this BIG-IP device."

        # Build table header
        output = [
            f"## Virtual Servers on {credentials.ip_address} ({len(virtual_servers)} total)\n",
            "| Status | Name | Destination IP | Port | Pool |",
            "|--------|------|----------------|------|------|"
        ]

        # Build table rows
        for vs in virtual_servers:
            # Yellow circle for any state without a dedicated icon
            status = _STATUS_ICONS.get(vs.availability_status, "🟡")
            dest_ip, dest_port = _parse_destination(vs.destination)
            pool_name = vs.pool if vs.pool else "None"

            output.append(f"| {status} | {vs.name} | {dest_ip} | {dest_port} | {pool_name} |")

        return "\n".join(output)

    except Exception as e:
        if ctx:
            await ctx.error(f"Failed to retrieve virtual servers: {str(e)}")
        return f"❌ Failed to retrieve virtual servers: {str(e)}"


@mcp.tool
async def list_bigip_devices(ctx: Context = None) -> str:
    """
    List all configured BIG-IP devices from the configuration file.

    Returns:
        A formatted list of available BIG-IP devices with their details
        (passwords are never included)
    """
    if ctx:
        await ctx.info("Loading BIG-IP device configurations")

    try:
        devices = load_bigip_config()

        if not devices:
            return "No BIG-IP devices configured. Please add devices to bigip_config.json"

        output = [f"Found {len(devices)} configured BIG-IP device(s):\n"]

        for name, device in devices.items():
            output.append(f"\n**{name}**")
            output.append(f" - IP Address: {device.ip_address}")
            output.append(f" - Username: {device.username}")
            output.append(f" - SSL Verification: {'enabled' if device.verify_ssl else 'disabled'}")
            if device.description:
                output.append(f" - Description: {device.description}")

        output.append("\n**Usage:** Use device name with list_virtual_servers tool")
        output.append("Example: list_virtual_servers(device_name='prod-lb-01')")

        return "\n".join(output)

    except Exception as e:
        if ctx:
            await ctx.error(f"Failed to load devices: {str(e)}")
        return f"❌ Error loading devices: {str(e)}"


@mcp.tool
async def manage_as3(
    device_name: str,
    action: str = "check",
    auto_install: bool = False,
    ctx: Context = None
) -> str:
    """
    Check, install, or upgrade F5 AS3 (Application Services 3 Extension) on BIG-IP.

    Args:
        device_name: Name of the BIG-IP device from configuration file
        action: Action to perform - "check" (default), "install", or "upgrade"
        auto_install: Set to True to proceed with installation/upgrade (default: False)

    Returns:
        Status message with AS3 version information and installation results
    """
    if ctx:
        await ctx.info(f"Starting AS3 management for device: {device_name}")

    # Validate action parameter
    if action not in ["check", "install", "upgrade"]:
        return f"❌ Invalid action '{action}'. Must be 'check', 'install', or 'upgrade'."
# Load device configuration try: device = get_device_by_name(device_name) if ctx: await ctx.info(f"Loaded configuration for {device_name} ({device.ip_address})") except Exception as e: return f"❌ Configuration error: {str(e)}" # Create credentials object from config credentials = BIGIPCredentials( ip_address=device.ip_address, username=device.username, password=device.password, verify_ssl=device.verify_ssl ) # Check if we have a cached valid token for this device cached_token = None if ctx: cached_token = ctx.get_state(f"bigip_token_{device_name}") token_obj = None # Check if cached token is still valid if cached_token: token_obj = AuthToken(**cached_token) if token_obj.expires_at > datetime.now(): if ctx: await ctx.info("Using cached authentication token") else: if ctx: await ctx.info("Cached token expired, will re-authenticate") token_obj = None # Authenticate if we don't have a valid token if not token_obj: if ctx: await ctx.info(f"Authenticating to BIG-IP at {credentials.ip_address}") try: token_obj = await authenticate_bigip(credentials) if ctx: ctx.set_state(f"bigip_token_{device_name}", token_obj.model_dump()) await ctx.info("Successfully authenticated to BIG-IP") except Exception as e: if ctx: await ctx.error(f"Authentication failed: {str(e)}") return f"❌ Authentication failed: {str(e)}" # Check current AS3 installation status if ctx: await ctx.info("Checking AS3 installation status") try: installed_version = await check_as3_installed( ip_address=credentials.ip_address, token=token_obj.token, verify_ssl=credentials.verify_ssl ) if ctx: if installed_version: await ctx.info(f"AS3 version {installed_version} is currently installed") else: await ctx.info("AS3 is not currently installed") except Exception as e: if ctx: await ctx.error(f"Failed to check AS3 status: {str(e)}") return f"❌ Failed to check AS3 status: {str(e)}" # Fetch latest AS3 version from GitHub if ctx: await ctx.info("Fetching latest AS3 release information from GitHub") try: latest_release = 
await get_latest_as3_release() latest_version = latest_release["version"] if ctx: await ctx.info(f"Latest AS3 version available: {latest_version}") except Exception as e: if ctx: await ctx.error(f"Failed to fetch latest AS3 release: {str(e)}") return f"❌ Failed to fetch latest AS3 release: {str(e)}" # Handle "check" action if action == "check": output = [f"## AS3 Status on {device_name} ({credentials.ip_address})\n"] if installed_version: output.append(f"**Installed Version:** {installed_version}") else: output.append("**Installed Version:** Not installed") output.append(f"**Latest Available:** {latest_version}") # Determine recommendation if not installed_version: output.append("\n**Recommendation:** AS3 is not installed.") output.append(f"To install: `manage_as3(device_name='{device_name}', action='install', auto_install=True)`") elif installed_version != latest_version: output.append(f"\n**Recommendation:** Newer version available ({latest_version}).") output.append(f"To upgrade: `manage_as3(device_name='{device_name}', action='upgrade', auto_install=True)`") else: output.append("\n✅ **Status:** AS3 is up to date!") return "\n".join(output) # Handle "install" action if action == "install": if installed_version: return (f"ℹ️ AS3 version {installed_version} is already installed on {device_name}. " f"Use action='upgrade' to update to version {latest_version}.") if not auto_install: return (f"⚠️ AS3 is not installed. 
Latest version: {latest_version}\n\n" f"To proceed with installation, run:\n" f"`manage_as3(device_name='{device_name}', action='install', auto_install=True)`\n\n" f"**Note:** AS3 installation requires admin account privileges.") # Proceed with installation if ctx: await ctx.info(f"Starting AS3 {latest_version} installation") try: # Download RPM if ctx: await ctx.info(f"Downloading AS3 RPM from GitHub: {latest_release['rpm_filename']}") rpm_content = await download_as3_rpm( rpm_url=latest_release["rpm_url"], rpm_filename=latest_release["rpm_filename"] ) if ctx: await ctx.info(f"Downloaded {len(rpm_content)} bytes") # Upload RPM to BIG-IP if ctx: await ctx.info(f"Uploading RPM to BIG-IP at {credentials.ip_address}") await upload_as3_rpm( ip_address=credentials.ip_address, token=token_obj.token, rpm_filename=latest_release["rpm_filename"], rpm_content=rpm_content, verify_ssl=credentials.verify_ssl ) if ctx: await ctx.info("RPM upload completed successfully") # Install package if ctx: await ctx.info("Starting AS3 package installation (this may take a few minutes)") install_result = await install_as3_package( ip_address=credentials.ip_address, token=token_obj.token, rpm_filename=latest_release["rpm_filename"], verify_ssl=credentials.verify_ssl ) if ctx: await ctx.info(install_result) # Verify installation if ctx: await ctx.info("Verifying AS3 installation") # Wait a moment for AS3 to fully initialize await asyncio.sleep(5) verified_version = await check_as3_installed( ip_address=credentials.ip_address, token=token_obj.token, verify_ssl=credentials.verify_ssl ) if verified_version: return (f"✅ **AS3 Installation Successful!**\n\n" f"Device: {device_name} ({credentials.ip_address})\n" f"Installed Version: {verified_version}\n" f"Status: AS3 is now ready to use") else: return (f"⚠️ Installation completed but AS3 verification failed. " f"The package may still be initializing. 
Please wait a moment and check again.") except Exception as e: if ctx: await ctx.error(f"AS3 installation failed: {str(e)}") return f"❌ AS3 installation failed: {str(e)}" # Handle "upgrade" action if action == "upgrade": if not installed_version: return (f"ℹ️ AS3 is not currently installed on {device_name}. " f"Use action='install' to install version {latest_version}.") if installed_version == latest_version: return f"✅ AS3 is already at the latest version ({latest_version}) on {device_name}." if not auto_install: return (f"⚠️ AS3 upgrade available: {installed_version} → {latest_version}\n\n" f"To proceed with upgrade, run:\n" f"`manage_as3(device_name='{device_name}', action='upgrade', auto_install=True)`\n\n" f"**Note:** AS3 installation requires admin account privileges.") # Proceed with upgrade (same process as install) if ctx: await ctx.info(f"Starting AS3 upgrade from {installed_version} to {latest_version}") try: # Download RPM if ctx: await ctx.info(f"Downloading AS3 RPM from GitHub: {latest_release['rpm_filename']}") rpm_content = await download_as3_rpm( rpm_url=latest_release["rpm_url"], rpm_filename=latest_release["rpm_filename"] ) if ctx: await ctx.info(f"Downloaded {len(rpm_content)} bytes") # Upload RPM to BIG-IP if ctx: await ctx.info(f"Uploading RPM to BIG-IP at {credentials.ip_address}") await upload_as3_rpm( ip_address=credentials.ip_address, token=token_obj.token, rpm_filename=latest_release["rpm_filename"], rpm_content=rpm_content, verify_ssl=credentials.verify_ssl ) if ctx: await ctx.info("RPM upload completed successfully") # Install package if ctx: await ctx.info("Starting AS3 package upgrade (this may take a few minutes)") install_result = await install_as3_package( ip_address=credentials.ip_address, token=token_obj.token, rpm_filename=latest_release["rpm_filename"], verify_ssl=credentials.verify_ssl ) if ctx: await ctx.info(install_result) # Verify installation if ctx: await ctx.info("Verifying AS3 upgrade") # Wait a moment for AS3 to fully 
initialize await asyncio.sleep(5) verified_version = await check_as3_installed( ip_address=credentials.ip_address, token=token_obj.token, verify_ssl=credentials.verify_ssl ) if verified_version == latest_version: return (f"✅ **AS3 Upgrade Successful!**\n\n" f"Device: {device_name} ({credentials.ip_address})\n" f"Previous Version: {installed_version}\n" f"New Version: {verified_version}\n" f"Status: AS3 is now ready to use") else: return (f"⚠️ Upgrade completed but verification returned version {verified_version}. " f"The package may still be initializing. Please wait a moment and check again.") except Exception as e: if ctx: await ctx.error(f"AS3 upgrade failed: {str(e)}") return f"❌ AS3 upgrade failed: {str(e)}" @mcp.tool async def create_as3_app( device_name: str, section_name: str, vip_subnet_id: str, pool_subnet_id: str, pool_member_selection: str = None, virtual_ip: str = None, pool_members: List[str] = None, app_name: str = None, template_name: str = None, auto_deploy: bool = False, ctx: Context = None ) -> str: """ Create AS3 application with IP allocation from phpIPAM. This tool demonstrates cross-MCP-server integration with phpIPAM MCP server. 
Two usage modes: Mode 1: Direct deployment (when IPs are already known) - Provide virtual_ip and pool_members directly - Set auto_deploy=True to deploy immediately Mode 2: Guided workflow (Claude orchestrates phpIPAM calls) - Claude calls phpIPAM to reserve VIP and get pool options - User selects pool members - Claude calls this tool with results Args: device_name: BIG-IP device from config file (e.g., 'lab-bigip') section_name: phpIPAM section name (e.g., 'marketing') vip_subnet_id: phpIPAM subnet ID for virtual server IP (e.g., '8') pool_subnet_id: phpIPAM subnet ID for pool member IPs (e.g., '7') pool_member_selection: User's selection of pool members (e.g., '1 and 2', '172.16.100.3 and 172.16.100.4') Leave empty for guided mode virtual_ip: Virtual server IP address (optional, for direct deployment) pool_members: List of pool member IPs (optional, for direct deployment) app_name: Application name (optional, defaults to '{section_name}_app') template_name: Template to use (optional, defaults to section_name) auto_deploy: Set to True to deploy without preview (default: False) Returns: - Pool selection prompt (guided mode without pool_member_selection) - Preview (if pool_member_selection provided or direct mode, auto_deploy=False) - Deployment status (if auto_deploy=True) Example - Direct deployment: create_as3_app( device_name='lab-bigip', section_name='marketing', vip_subnet_id='8', pool_subnet_id='7', virtual_ip='192.168.50.3', pool_members=['172.16.100.3', '172.16.100.4'], auto_deploy=True ) """ state_key = f"as3_app_{section_name}" if ctx: await ctx.info(f"AS3 app creation for section: {section_name}") # Determine mode: Direct (IPs provided) or Guided (orchestrated via Claude) if virtual_ip and pool_members: # Mode 1: Direct deployment with provided IPs if ctx: await ctx.info(f"Direct deployment mode - VIP: {virtual_ip}, Pool: {pool_members}") selected_ips = pool_members else: # Mode 2: Guided workflow with phpIPAM orchestration # Step 1: Handle VIP 
reservation (auto or from cache) app_state = ctx.get_state(state_key) if ctx else None if app_state and app_state.get("virtual_ip"): virtual_ip = app_state["virtual_ip"] if ctx: await ctx.info(f"Using cached VIP: {virtual_ip}") else: # Need to tell Claude to call phpIPAM to reserve VIP return f"""## Step 1: Reserve Virtual IP To proceed, please use the phpIPAM MCP server to reserve a virtual IP: **Action needed:** Call phpIPAM's `reserve_ip_address` tool with: - `subnet_id`: `{vip_subnet_id}` - `hostname`: `{section_name}-vip` Once you have the reserved IP, call this tool again with the same parameters. **Note:** I'll cache the VIP for subsequent calls.""" # Step 2: Handle pool member selection if not pool_member_selection: # Need to tell Claude to get subnet details from phpIPAM return f"""## Step 2: Show Pool Member Options **Virtual IP Reserved:** {virtual_ip} ✓ To see available pool members, please use the phpIPAM MCP server: **Action needed:** Call phpIPAM's `get_subnet_details` tool with: - `subnet_id`: `{pool_subnet_id}` - `include_addresses`: `True` This will show all reserved IPs in the web servers subnet. Then specify which servers to use. **Example selections:** - By number: "1 and 2" - By IP: "172.16.100.10 and 172.16.100.11" - By hostname: "web-mkt-01 and web-mkt-02" """ # Step 3: Parse pool selection (guided mode) # This requires Claude to have already shown the user the list # For now, we'll parse the selection text try: # Extract IPs from selection (support "1 and 2" format or IP format) # Simple parsing: look for IP addresses in the selection ip_pattern = r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b' selected_ips = re.findall(ip_pattern, pool_member_selection) if not selected_ips: # If no IPs found, assume user provided description like "1 and 2" # We need the actual IPs from phpIPAM context return f"""❌ Could not parse IP addresses from selection: '{pool_member_selection}' Please provide the actual IP addresses of the pool members you want to use. 
**Example:** "172.16.100.10 and 172.16.100.11" Or first show the available servers using phpIPAM's `get_subnet_details` tool.""" if len(selected_ips) < 2: return f"❌ Please select at least 2 pool members. Found only: {selected_ips}" selected_ips = selected_ips[:2] # Use first 2 IPs except Exception as e: return f"❌ Error parsing pool selection: {str(e)}" # Step 4: Load device config and authenticate try: device = get_device_by_name(device_name) if ctx: await ctx.info(f"Loaded BIG-IP config: {device_name}") except Exception as e: return f"❌ Configuration error: {str(e)}" credentials = BIGIPCredentials( ip_address=device.ip_address, username=device.username, password=device.password, verify_ssl=device.verify_ssl ) # Check for cached token cached_token = ctx.get_state(f"bigip_token_{device_name}") if ctx else None token_obj = None if cached_token: token_obj = AuthToken(**cached_token) if token_obj.expires_at <= datetime.now(): token_obj = None if not token_obj: try: token_obj = await authenticate_bigip(credentials) if ctx: ctx.set_state(f"bigip_token_{device_name}", token_obj.model_dump()) except Exception as e: return f"❌ Authentication failed: {str(e)}" # Step 5: Load and render template template_to_use = template_name if template_name else section_name try: template = load_as3_template(template_to_use) except Exception as e: return f"❌ Failed to load template: {str(e)}" app_name_final = app_name if app_name else f"{section_name}_app" variables = { "TENANT_NAME": section_name.title(), "APP_NAME": app_name_final, "VIRTUAL_IP": virtual_ip, } # Add individual pool member variables for i, member_ip in enumerate(selected_ips, start=1): variables[f"POOL_MEMBER_{i}"] = member_ip try: declaration = render_as3_template(template, variables) except Exception as e: return f"❌ Failed to render template: {str(e)}" # Validate declaration is_valid, error_msg = validate_as3_declaration(declaration) if not is_valid: return f"❌ Declaration validation failed: {error_msg}" # Step 6: 
Preview or deploy if not auto_deploy: pool_members_str = "\n - ".join([f"{ip}:80" for ip in selected_ips]) preview = f"""## AS3 Application Preview **Device:** {device_name} ({credentials.ip_address}) **Tenant:** {variables['TENANT_NAME']} **Application:** {variables['APP_NAME']} ### Configuration: - **Virtual Server IP:** {virtual_ip}:80 - **Pool Members:** - {pool_members_str} - **Load Balancing:** Round-robin - **Health Monitor:** HTTP ### Template: {template_to_use} **To deploy:** Call this tool again with `auto_deploy=True` """ return preview # Deploy if ctx: await ctx.info("Deploying AS3 declaration") try: result = await deploy_as3_declaration( ip_address=credentials.ip_address, token=token_obj.token, declaration=declaration, verify_ssl=credentials.verify_ssl ) # Clear cached state after successful deployment if ctx: ctx.set_state(state_key, None) pool_members_str = ", ".join(selected_ips) return f"""✅ **AS3 Application Deployed Successfully!** **Device:** {device_name} ({credentials.ip_address}) **Tenant:** {variables['TENANT_NAME']} **Application:** {variables['APP_NAME']} ### Configuration: - **Virtual Server:** {virtual_ip}:80 - **Pool Members:** {pool_members_str} - **Status:** Active and ready to receive traffic The application is now live and will distribute traffic across the pool members. 
""" except Exception as e: return f"❌ Deployment failed: {str(e)}" # ============================================================================ # RESOURCES # ============================================================================ @mcp.resource("config://server") def get_server_config() -> str: """Get server configuration information""" config = { "name": "BIG-IP MCP Server", "version": "1.0.0", "description": "MCP server for F5 BIG-IP management", "capabilities": [ "List virtual servers", "Token-based authentication", "Credential caching" ] } return json.dumps(config, indent=2) @mcp.resource("templates://available") def list_available_templates() -> str: """List all available AS3 application templates""" templates_dir = Path(__file__).parent / "templates" if not templates_dir.exists(): return json.dumps({ "templates": [], "message": "No templates directory found" }, indent=2) template_files = list(templates_dir.glob("*.json")) templates = [] for template_file in template_files: template_name = template_file.stem try: with open(template_file, 'r') as f: template_data = json.load(f) # Extract variables from template template_str = json.dumps(template_data) variables = re.findall(r'\{\{([A-Z_]+)\}\}', template_str) unique_vars = list(set(variables)) templates.append({ "name": template_name, "file": template_file.name, "variables": unique_vars, "description": f"AS3 template for {template_name} applications" }) except Exception as e: templates.append({ "name": template_name, "file": template_file.name, "error": f"Failed to load: {str(e)}" }) result = { "count": len(templates), "templates": templates } return json.dumps(result, indent=2) @mcp.resource("templates://{template_name}") def get_template_content(template_name: str) -> str: """Get the content of a specific AS3 template""" try: template = load_as3_template(template_name) # Extract variables template_str = json.dumps(template) variables = re.findall(r'\{\{([A-Z_]+)\}\}', template_str) unique_vars = 
list(set(variables)) result = { "name": template_name, "variables": unique_vars, "template": template } return json.dumps(result, indent=2) except Exception as e: return json.dumps({ "error": str(e) }, indent=2) # ============================================================================ # PROMPTS # ============================================================================ @mcp.prompt def help_prompt() -> str: """Get help about available BIG-IP server capabilities""" return """ This BIG-IP MCP server provides tools to manage F5 BIG-IP devices: **Available Tools:** - `list_virtual_servers`: Query all virtual servers from a BIG-IP device - Requires: IP address, username, and password - Caches authentication token for subsequent requests to the same device - Returns detailed status of each virtual server - Optional: verify_ssl parameter (default: False for self-signed certs) **Usage:** Ask Claude to "list virtual servers on my BIG-IP" and provide: - BIG-IP IP address or hostname - Username - Password **Security:** - Authentication tokens are cached per-device for efficiency - Tokens automatically expire after 19 minutes - SSL verification can be enabled for production environments """ @mcp.prompt def create_app_workflow() -> str: """Guide for creating AS3 applications with phpIPAM integration""" return """ # AS3 Application Creation Workflow This workflow demonstrates cross-MCP-server integration between phpIPAM and BIG-IP servers. ## Prerequisites - Both phpIPAM and BIG-IP MCP servers must be connected to Claude Desktop - AS3 must be installed on the BIG-IP device - phpIPAM must be configured with sections and subnets ## Step-by-Step Process ### 1. List Available Sections (phpIPAM) Ask Claude: "List all sections from phpIPAM" - This uses the phpIPAM server's `list_sections()` tool - Returns: Section IDs, names, and descriptions ### 2. Select Your Section Choose the section for your application (e.g., "marketing", "engineering") ### 3. 
Reserve Virtual IP (phpIPAM) The workflow will: - Find the appropriate subnet for virtual servers in your section - Reserve the first available IP for your virtual server - Tool: `reserve_ip_address(subnet_id, hostname)` ### 4. Get Pool Member IPs (phpIPAM) Ask Claude: "Show me web servers in the marketing section" - Tool: `search_subnets(query="marketing web")` - Tool: `get_subnet_details(subnet_id, include_addresses=true)` - Select 2 or more IPs for your pool members ### 5. Create Application (BIG-IP) Ask Claude: "Create AS3 application for marketing using these IPs" - Tool: `create_as3_app(device_name, section_name, virtual_ip, pool_members)` - Claude will generate AS3 declaration from template - Preview declaration before deployment - Confirm deployment ## Example Interaction ``` User: "I want to create a load balancer for the marketing department" Claude: [Calls phpIPAM list_sections()] "Found these sections: marketing, engineering, finance" User: "Use marketing" Claude: [Reserves VIP] "Reserved 10.1.100.5 for virtual server" [Lists web servers] "Found: 192.168.10.10, 192.168.10.11, 192.168.10.12" User: "Use the first two" Claude: [Shows preview] "Will create: VIP: 10.1.100.5 Pool: 192.168.10.10, 192.168.10.11 Deploy to lab-bigip?" User: "Yes" Claude: [Deploys] "✅ Application created successfully!" ``` ## MCP Features Demonstrated - **Cross-Server Orchestration**: Claude coordinates between two MCP servers - **Resources**: View available templates via `templates://available` - **Prompts**: This workflow guide - **Context State**: Cached authentication tokens - **Human-in-the-Loop**: Preview before deployment """ # ============================================================================ # SERVER EXECUTION # ============================================================================ if __name__ == "__main__": # Run the server using stdio transport (default for MCP) mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/arunhotra/mcp-bigip'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.