Skip to main content
Glama

Model Control Plane (MCP) Server

#!/usr/bin/env python3 # Generated by MCP Langflow Component Generator # Date: 2025-03-16T01:14:21.176462 import os import json import requests from typing import Dict, List, Any, Optional, Type, Union # Import Langflow components from langflow.custom import Component from langflow.io import ( Input, Output, Select, IntInput, FloatInput, StrInput, TextInput, MultilineInput, UrlInput, SecretStrInput ) from langflow.schema import Data class MCPComponent(Component): """Component for interacting with MCP models""" # Langflow UI display properties display_name = "MCP AI Service" description = "Connect to MCP server for AI model inference, Git analysis, filesystem access, and Prometheus metrics" icon = "🧠" category = "AI Services" def __init__(self): super().__init__() self.mcp_server_url = "http://localhost:8000" self.available_models = [] # Known model IDs from server - prioritize OpenAI models over Azure self.known_models = ["openai-gpt-chat", "openai-gpt-completion", "git-analyzer", "git-diff-analyzer", "filesystem", "prometheus", "azure-gpt-4"] # Model display names self.model_names = { "openai-gpt-chat": "OpenAI gpt-4o-mini", "openai-gpt-completion": "OpenAI gpt-3.5-turbo-instruct", "git-analyzer": "Git Repository Analyzer", "git-diff-analyzer": "Git Diff Analyzer", "filesystem": "Filesystem Access", "prometheus": "Prometheus Metrics", "azure-gpt-4": "Azure GPT-4" } def set_mcp_server_url(self, url: str): """Set the MCP server URL and refresh available models""" self.mcp_server_url = url self.available_models = self._fetch_available_models() def _fetch_available_models(self) -> List[Dict[str, Any]]: """Fetch available models from the MCP server""" try: response = requests.get(f"{self.mcp_server_url}/v1/models") response.raise_for_status() return response.json().get("models", []) except Exception as e: print(f"Error fetching models from MCP server: {e}") return [] def list_models(self) -> List[Dict[str, Any]]: """Return list of available models, with OpenAI models prioritized over Azure models""" # If we have actual models from the server, prioritize them by type if not self.available_models: self.available_models = self._fetch_available_models() if self.available_models: prioritized_models = [] openai_models = [] other_models = [] azure_models = [] # Sort models by type for model in self.available_models: model_id = model.get('id', '') if 'openai' in model_id: openai_models.append(model) elif 'azure' in model_id: azure_models.append(model) else: other_models.append(model) # Combine in priority order: OpenAI first, then others, Azure last prioritized_models = openai_models + other_models + azure_models return prioritized_models # If no models are available, return empty list return self.available_models # Define inputs and outputs def build(self): # Server URL input self.add_input( StrInput( id="mcp_server_url", name="MCP Server URL", description="URL of the MCP server", default="http://localhost:8000", required=True ) ) # Operation selector self.add_input( Select( id="operation", name="Operation", description="Type of operation to perform", options=["chat", "completion", "git", "filesystem", "prometheus"], default="chat", required=True ) ) # Model ID input (optional) self.add_input( Select( id="model_id", name="Model ID", description="ID of the model to use (if not specified, a default will be chosen)", options=self.known_models, default=None, required=False ) ) # Messages input (for chat) self.add_input( MultilineInput( id="messages", name="Messages", description="Chat messages in JSON format: [{'role': 'user', 'content': 'Hello'}]", default="[{\"role\": \"user\", \"content\": \"Hello, how can I help you?\"}]", required=False ) ) # Prompt input (for completion) self.add_input( TextInput( id="prompt", name="Prompt", description="Text prompt for completion operation", default="Complete this text:", required=False ) ) # Repository URL input (for git) self.add_input( UrlInput( id="repo_url", name="Repository URL", description="URL of the Git repository to analyze", default="", required=False ) ) # Path input (for filesystem) self.add_input( StrInput( id="path", name="Path", description="Path for filesystem operations", default=".", required=False ) ) # Query input (for prometheus) self.add_input( StrInput( id="query", name="Query", description="Prometheus query string", default="up", required=False ) ) # Result output self.add_output( Output( id="result", name="Result", description="Operation result" ) ) # Process the inputs and produce outputs def process(self, data: Data) -> Dict[str, Any]: # Extract inputs self.set_mcp_server_url(data["inputs"]["mcp_server_url"]) operation = data["inputs"]["operation"] model_id = data["inputs"].get("model_id") # Process based on operation type if operation == "chat": # Parse messages from JSON string messages_str = data["inputs"].get("messages", "[{\"role\": \"user\", \"content\": \"Hello\"}]") try: messages = json.loads(messages_str) result = self.chat(messages=messages, model_id=model_id) except json.JSONDecodeError: result = {"error": "Invalid JSON format for messages"} elif operation == "completion": prompt = data["inputs"].get("prompt", "Complete this text:") result = self.completion(prompt=prompt, model_id=model_id) elif operation == "git": repo_url = data["inputs"].get("repo_url", "") result = self.git(repo_url=repo_url, model_id=model_id) elif operation == "filesystem": path = data["inputs"].get("path", ".") result = self.filesystem(path=path, model_id=model_id) elif operation == "prometheus": query = data["inputs"].get("query", "up") result = self.prometheus(query=query, model_id=model_id) else: result = {"error": f"Unsupported operation: {operation}"} return {"result": result} def completion(self, prompt: str, model_id: Optional[str] = None, max_tokens: int = 100, temperature: float = 0.7): """ Generate text completion for the given prompt Args: prompt: Text prompt for completion model_id: The ID of the model to use (optional) max_tokens: Maximum number of tokens to generate (1-4000) temperature: Temperature for generation (0.0 to 1.0) Returns: dict: JSON response with generated text and metadata """ supported_models = ['openai-gpt-completion'] # Only use OpenAI model if model_id is None: model_id = supported_models[0] if model_id not in supported_models: raise ValueError(f"Unsupported model_id. Supported models: {supported_models}") # Validate prompt if not isinstance(prompt, str) or not prompt.strip(): raise ValueError("Prompt must be a non-empty string") payload = { 'prompt': prompt, 'max_tokens': max_tokens, 'temperature': temperature } url = f"{self.mcp_server_url}/v1/models/{model_id}/completion" try: response = requests.post(url, json=payload) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"Error making HTTP request: {e}") return {"error": str(e)} def chat(self, messages: List[Dict[str, str]], model_id: Optional[str] = None, max_tokens: int = 100, temperature: float = 0.7): """ Generate a chat response from a series of messages Args: messages: List of message objects with 'role' and 'content' keys Roles can be 'system', 'user', or 'assistant' model_id: The ID of the model to use (optional) max_tokens: Maximum number of tokens to generate (1-4000) temperature: Temperature for generation (0.0 to 1.0) Returns: dict: JSON response with generated message and metadata """ supported_models = ['openai-gpt-chat'] # Only use OpenAI model if model_id is None: model_id = supported_models[0] if model_id not in supported_models: raise ValueError(f"Unsupported model_id. Supported models: {supported_models}") # Validate messages if not isinstance(messages, list) or not messages: raise ValueError("Messages must be a non-empty list of message objects") for msg in messages: if not isinstance(msg, dict) or 'role' not in msg or 'content' not in msg: raise ValueError("Each message must be a dictionary with 'role' and 'content' keys") payload = { 'messages': messages, 'max_tokens': max_tokens, 'temperature': temperature } url = f"{self.mcp_server_url}/v1/models/{model_id}/chat" try: response = requests.post(url, json=payload) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"Error making HTTP request: {e}") return {"error": str(e)} def git(self, repo_url: str = None, branch: str = "main", model_id: Optional[str] = None): """ Analyze a Git repository Args: repo_url: URL of the Git repository to analyze branch: Branch to analyze (default: main) model_id: The ID of the model to use (optional) Returns: dict: JSON response with repository analysis """ supported_models = ['git-analyzer', 'git-diff-analyzer'] if model_id is None: model_id = supported_models[0] if model_id not in supported_models: raise ValueError(f"Unsupported model_id. Supported models: {supported_models}") payload = { 'repo_url': repo_url if repo_url else "", 'branch': branch } # Use the correct endpoint url = f"{self.mcp_server_url}/v1/models/{model_id}/analyze" try: response = requests.post(url, json=payload) response.raise_for_status() return response.json() except requests.RequestException as e: print(f"HTTP request failed: {e}") return {"error": str(e)} def filesystem(self, path: str = None, operation: str = "list", model_id: Optional[str] = None): """ Perform filesystem operations Args: path: Path to operate on operation: Operation to perform (list, read, write, etc.) model_id: The ID of the model to use (optional) Returns: dict: JSON response with filesystem operation results """ supported_models = ['filesystem'] if model_id is None: model_id = supported_models[0] if model_id not in supported_models: raise ValueError(f"Unsupported model_id: {model_id}. Supported models: {supported_models}") # Determine the correct endpoint based on the operation if operation not in ["list", "read", "write", "search", "info", "mkdir", "move", "edit"]: raise ValueError(f"Unsupported operation: {operation}. Supported operations: list, read, write, search, info, mkdir, move, edit") # Prepare payload based on operation payload = {'path': path} if path else {'path': '.'} # Use the appropriate endpoint for the specified operation url = f"{self.mcp_server_url}/v1/models/{model_id}/{operation}" try: response = requests.post(url, json=payload) response.raise_for_status() result = response.json() # For list operation, the response format might need some adjustment if operation == "list" and "entries" in result: # The test is looking for result["files"], so we'll add it result["files"] = result.get("entries", []) return result except requests.exceptions.RequestException as e: return {"error": str(e)} def prometheus(self, query: str = None, start_time: str = None, end_time: str = None, model_id: Optional[str] = None): """ Query Prometheus metrics Args: query: Prometheus query string start_time: Start time for range queries (ISO format) end_time: End time for range queries (ISO format) model_id: The ID of the model to use (optional) Returns: dict: JSON response with Prometheus metrics """ supported_models = ['prometheus'] if model_id is None: model_id = supported_models[0] if model_id not in supported_models: raise ValueError(f"Model '{model_id}' is not supported") payload = {} if query: payload['query'] = query if start_time and end_time: payload['start'] = start_time payload['end'] = end_time # For range queries, we need a step payload['step'] = "1m" # Default to 1 minute steps url = f"{self.mcp_server_url}/v1/models/{model_id}/query_range" else: # For instant queries url = f"{self.mcp_server_url}/v1/models/{model_id}/query" try: response = requests.post(url, json=payload) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"Error making HTTP request: {e}") return {"error": str(e)}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dvladimirov/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server