Skip to main content
Glama

Model Control Plane (MCP) Server

#!/usr/bin/env python3 # MCP Connection Test Component import json import requests import socket import os import subprocess import platform from langflow.custom import Component from langflow.io import StrInput, BoolInput, MessageTextInput, Output from langflow.schema import Data from langflow.schema.message import Message class MCPConnectionTest(Component): """Component for testing MCP server connections""" # Langflow UI display properties display_name = "MCP Connection Test" description = "Test different ways to connect to the MCP server and provide diagnostics" icon = "🔌" category = "Tools" name = "MCPConnectionTest" inputs = [ StrInput( name="server_host", display_name="Server Host", info="Host address of the MCP server (try: localhost, 127.0.0.1, host.docker.internal, or your machine's IP)", value="host.docker.internal", required=True ), StrInput( name="server_port", display_name="Server Port", info="Port of the MCP server", value="8000", required=True ), BoolInput( name="run_diagnostics", display_name="Run Network Diagnostics", info="Run additional network diagnostics", value=True ), MessageTextInput( name="user_message", display_name="User Message", info="Optional message to include with the test", required=False ) ] outputs = [ Output(display_name="Test Result", name="result", method="test_connection"), ] def _ping_host(self, host): """Try to ping the host to check connectivity""" try: # Different ping command based on OS param = '-n' if platform.system().lower() == 'windows' else '-c' command = ['ping', param, '1', host] # Execute the ping command result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=2) # Check if the ping was successful return result.returncode == 0 except Exception as e: return False def _check_socket_connection(self, host, port): """Try to establish a direct socket connection""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(2) result = sock.connect_ex((host, int(port))) sock.close() return result == 0, result # 0 means success except Exception as e: return False, str(e) def _try_http_connection(self, url): """Try to make an HTTP connection to the server""" try: response = requests.get(url, timeout=2) return True, response.status_code except requests.exceptions.ConnectionError as e: return False, "Connection error" except requests.exceptions.Timeout: return False, "Timeout" except Exception as e: return False, str(e) def _get_container_network_info(self): """Get network information from inside the container""" try: # Get container's IP address container_ip = socket.gethostbyname(socket.gethostname()) # Try to resolve host.docker.internal host_docker_internal = None try: host_docker_internal = socket.gethostbyname("host.docker.internal") except: pass # Get gateway IP gateway_ip = None try: with open('/proc/net/route') as f: for line in f: fields = line.strip().split() if fields[1] == '00000000': # Default route gateway_ip = socket.inet_ntoa(bytes.fromhex(fields[2].zfill(8))[::-1]) break except: pass return { "container_ip": container_ip, "host_docker_internal": host_docker_internal, "gateway_ip": gateway_ip } except Exception as e: return {"error": str(e)} def test_connection(self) -> Message: """Test connecting to the MCP server with detailed diagnostics""" host = self.server_host port = self.server_port run_diagnostics = self.run_diagnostics user_message = self.user_message if hasattr(self, 'user_message') else "" results = [] # Add user message if provided if user_message: results.append(f"User message: {user_message}") # Store connection results connection_details = {} # Try direct socket connection can_connect, socket_result = self._check_socket_connection(host, port) connection_details["socket_connection"] = { "success": can_connect, "result": socket_result } # Try HTTP connections with different protocols http_url = f"http://{host}:{port}" https_url = f"https://{host}:{port}" http_success, http_result = self._try_http_connection(http_url) connection_details["http_connection"] = { "success": http_success, "result": http_result } https_success, https_result = self._try_http_connection(https_url) connection_details["https_connection"] = { "success": https_success, "result": https_result } # Comprehensive report if can_connect or http_success or https_success: results.append(f"✅ Successfully connected to MCP server at {host}:{port}") if can_connect: results.append("✓ Socket connection: Successful") else: results.append("⚠️ Socket connection: Failed") if http_success: results.append(f"✓ HTTP connection: Successful (Status {http_result})") else: results.append("⚠️ HTTP connection: Failed") if https_success: results.append(f"✓ HTTPS connection: Successful (Status {https_result})") else: results.append("⚠️ HTTPS connection: Failed") else: results.append(f"🚫 Cannot connect to MCP server at {host}:{port}") # Try alternative hosts if the given one failed alternative_hosts = [] if host != "localhost" and host != "127.0.0.1": alternative_hosts.append("localhost") alternative_hosts.append("127.0.0.1") if host != "host.docker.internal": alternative_hosts.append("host.docker.internal") if host != "172.17.0.1": alternative_hosts.append("172.17.0.1") # Try container's gateway (common for Docker) network_info = self._get_container_network_info() if "gateway_ip" in network_info and network_info["gateway_ip"] and network_info["gateway_ip"] not in alternative_hosts: alternative_hosts.append(network_info["gateway_ip"]) # Add results for alternative hosts alt_results = [] for alt_host in alternative_hosts: alt_can_connect, _ = self._check_socket_connection(alt_host, port) if alt_can_connect: alt_results.append(f"✅ Alternative host {alt_host}:{port} is reachable!") alt_results.append(f"⚠️ Try changing your server_host to '{alt_host}' instead") if alt_results: results.append("\n🔍 Alternative hosts tested:") results.extend(alt_results) # Only run extensive diagnostics if requested if run_diagnostics: results.append("\n📊 Network Diagnostics:") # Check if host.docker.internal is configured properly try: docker_internal_ip = socket.gethostbyname("host.docker.internal") results.append(f"✓ host.docker.internal resolves to {docker_internal_ip}") except socket.gaierror: results.append("⚠️ host.docker.internal is not properly configured") # Add container network information results.append("\n🖧 Container Network Information:") for key, value in self._get_container_network_info().items(): results.append(f"- {key}: {value}") # Check ping to the host ping_result = self._ping_host(host) if ping_result: results.append(f"✓ Ping to {host} successful") else: results.append(f"⚠️ Ping to {host} failed") # Add recommendations results.append("\n📋 Recommendations:") results.append("1. Make sure your MCP server is running on 0.0.0.0 instead of just 127.0.0.1") results.append("2. Check for any firewalls blocking connections between Docker and host") results.append("3. Try using your machine's actual LAN IP instead of localhost or host.docker.internal") results.append("4. If using Linux, ensure host.docker.internal is properly configured or use the gateway IP") return Message(text="\n".join(results))

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dvladimirov/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server