#!/usr/bin/env python3
# Generated by MCP Langflow Component Generator
# Date: 2025-03-16T01:14:21.176462
import os
import json
import requests
from typing import Dict, List, Any, Optional, Type, Union
# Import Langflow components
from langflow.custom import Component
from langflow.io import (
Input,
Output,
Select,
IntInput,
FloatInput,
StrInput,
TextInput,
MultilineInput,
UrlInput,
SecretStrInput
)
from langflow.schema import Data
class MCPComponent(Component):
    """Langflow component that forwards operations to an MCP server over HTTP.

    Supported operations are chat, text completion, Git repository analysis,
    filesystem access and Prometheus queries. Each operation is a POST to
    ``{mcp_server_url}/v1/models/{model_id}/<endpoint>``. Transport failures
    are reported to the caller as ``{"error": "<message>"}`` dictionaries
    instead of being raised.
    """

    # Langflow UI display properties
    display_name = "MCP AI Service"
    description = "Connect to MCP server for AI model inference, Git analysis, filesystem access, and Prometheus metrics"
    icon = "🧠"
    category = "AI Services"

    # Upper bound (in seconds) for every HTTP call to the MCP server. Without
    # a timeout, `requests` waits forever on an unresponsive server.
    REQUEST_TIMEOUT_SECONDS = 60

    def __init__(self, **kwargs):
        """Initialise defaults; keyword arguments are forwarded to the base class."""
        super().__init__(**kwargs)
        self.mcp_server_url = "http://localhost:8000"
        self.available_models = []
        # Known model IDs from server - prioritize OpenAI models over Azure
        self.known_models = [
            "openai-gpt-chat",
            "openai-gpt-completion",
            "git-analyzer",
            "git-diff-analyzer",
            "filesystem",
            "prometheus",
            "azure-gpt-4",
        ]
        # Model display names
        self.model_names = {
            "openai-gpt-chat": "OpenAI gpt-4o-mini",
            "openai-gpt-completion": "OpenAI gpt-3.5-turbo-instruct",
            "git-analyzer": "Git Repository Analyzer",
            "git-diff-analyzer": "Git Diff Analyzer",
            "filesystem": "Filesystem Access",
            "prometheus": "Prometheus Metrics",
            "azure-gpt-4": "Azure GPT-4",
        }

    def set_mcp_server_url(self, url: str):
        """Set the MCP server URL and refresh available models.

        A trailing slash is stripped so that endpoint URLs built from this
        value do not contain a double slash.
        """
        self.mcp_server_url = url.rstrip("/") if isinstance(url, str) else url
        self.available_models = self._fetch_available_models()

    def _fetch_available_models(self) -> List[Dict[str, Any]]:
        """Fetch available models from the MCP server.

        Returns:
            The ``models`` entry of the ``/v1/models`` JSON response, or an
            empty list when the request or decoding fails.
        """
        try:
            response = requests.get(
                f"{self.mcp_server_url}/v1/models",
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            return response.json().get("models", [])
        except Exception as e:
            # Deliberate best-effort: model discovery must never break the
            # component, so any failure is logged and treated as "no models".
            print(f"Error fetching models from MCP server: {e}")
            return []

    def list_models(self) -> List[Dict[str, Any]]:
        """Return available models ordered OpenAI first, then others, Azure last.

        The model list is fetched from the server when none is cached yet.
        An empty list is returned when no models are available.
        """
        if not self.available_models:
            self.available_models = self._fetch_available_models()

        openai_models = []
        other_models = []
        azure_models = []
        for model in self.available_models:
            # Guard against a missing or null id so the substring tests
            # below cannot raise.
            model_id = str(model.get('id') or '')
            if 'openai' in model_id:
                openai_models.append(model)
            elif 'azure' in model_id:
                azure_models.append(model)
            else:
                other_models.append(model)
        return openai_models + other_models + azure_models

    def _post_json(self, url: str, payload: Dict[str, Any], log_prefix: Optional[str] = None) -> Any:
        """POST ``payload`` as JSON to ``url`` and return the decoded JSON body.

        Args:
            url: Full endpoint URL.
            payload: JSON-serialisable request body.
            log_prefix: When given, failures are printed as
                ``"<log_prefix>: <error>"``; otherwise nothing is printed.

        Returns:
            The decoded response, or ``{"error": "<message>"}`` when the
            request fails, times out, returns an HTTP error status, or the
            body is not valid JSON.
        """
        try:
            response = requests.post(url, json=payload, timeout=self.REQUEST_TIMEOUT_SECONDS)
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers requests versions whose Response.json() raises
            # a plain JSON decode error rather than a RequestException.
            if log_prefix:
                print(f"{log_prefix}: {e}")
            return {"error": str(e)}

    # Define inputs and outputs
    def build(self):
        """Register the component's inputs and its single ``result`` output."""
        # Server URL input
        self.add_input(
            StrInput(
                id="mcp_server_url",
                name="MCP Server URL",
                description="URL of the MCP server",
                default="http://localhost:8000",
                required=True
            )
        )
        # Operation selector
        self.add_input(
            Select(
                id="operation",
                name="Operation",
                description="Type of operation to perform",
                options=["chat", "completion", "git", "filesystem", "prometheus"],
                default="chat",
                required=True
            )
        )
        # Model ID input (optional)
        self.add_input(
            Select(
                id="model_id",
                name="Model ID",
                description="ID of the model to use (if not specified, a default will be chosen)",
                options=self.known_models,
                default=None,
                required=False
            )
        )
        # Messages input (for chat)
        self.add_input(
            MultilineInput(
                id="messages",
                name="Messages",
                description="Chat messages in JSON format: [{'role': 'user', 'content': 'Hello'}]",
                default="[{\"role\": \"user\", \"content\": \"Hello, how can I help you?\"}]",
                required=False
            )
        )
        # Prompt input (for completion)
        self.add_input(
            TextInput(
                id="prompt",
                name="Prompt",
                description="Text prompt for completion operation",
                default="Complete this text:",
                required=False
            )
        )
        # Repository URL input (for git)
        self.add_input(
            UrlInput(
                id="repo_url",
                name="Repository URL",
                description="URL of the Git repository to analyze",
                default="",
                required=False
            )
        )
        # Path input (for filesystem)
        self.add_input(
            StrInput(
                id="path",
                name="Path",
                description="Path for filesystem operations",
                default=".",
                required=False
            )
        )
        # Query input (for prometheus)
        self.add_input(
            StrInput(
                id="query",
                name="Query",
                description="Prometheus query string",
                default="up",
                required=False
            )
        )
        # Result output
        self.add_output(
            Output(
                id="result",
                name="Result",
                description="Operation result"
            )
        )

    # Process the inputs and produce outputs
    def process(self, data: Data) -> Dict[str, Any]:
        """Run the operation selected in ``data["inputs"]``.

        Args:
            data: Mapping with an ``"inputs"`` entry holding the values of the
                inputs registered in ``build`` (``mcp_server_url`` and
                ``operation`` are read unconditionally).

        Returns:
            ``{"result": <operation result>}``. Invalid input (bad JSON,
            unsupported model or operation, failed validation) is reported as
            ``{"result": {"error": "<message>"}}`` rather than raised.
        """
        inputs = data["inputs"]
        self.set_mcp_server_url(inputs["mcp_server_url"])
        operation = inputs["operation"]
        # An unset selector may arrive as "" rather than None; both mean
        # "use the operation's default model".
        model_id = inputs.get("model_id") or None

        try:
            if operation == "chat":
                messages_str = inputs.get("messages", "[{\"role\": \"user\", \"content\": \"Hello\"}]")
                # Only the parse is guarded here, so a decode error raised
                # later is never misreported as bad user input.
                try:
                    messages = json.loads(messages_str)
                except (json.JSONDecodeError, TypeError):
                    return {"result": {"error": "Invalid JSON format for messages"}}
                result = self.chat(messages=messages, model_id=model_id)
            elif operation == "completion":
                prompt = inputs.get("prompt", "Complete this text:")
                result = self.completion(prompt=prompt, model_id=model_id)
            elif operation == "git":
                repo_url = inputs.get("repo_url", "")
                result = self.git(repo_url=repo_url, model_id=model_id)
            elif operation == "filesystem":
                path = inputs.get("path", ".")
                result = self.filesystem(path=path, model_id=model_id)
            elif operation == "prometheus":
                query = inputs.get("query", "up")
                result = self.prometheus(query=query, model_id=model_id)
            else:
                result = {"error": f"Unsupported operation: {operation}"}
        except ValueError as e:
            # Validation failures from the operation methods are user-input
            # errors; report them like every other error at this boundary.
            result = {"error": str(e)}
        return {"result": result}

    def completion(self, prompt: str, model_id: Optional[str] = None, max_tokens: int = 100, temperature: float = 0.7):
        """
        Generate text completion for the given prompt

        Args:
            prompt: Text prompt for completion
            model_id: The ID of the model to use (optional)
            max_tokens: Maximum number of tokens to generate (1-4000)
            temperature: Temperature for generation (0.0 to 1.0)

        Returns:
            dict: JSON response with generated text and metadata, or
            ``{"error": ...}`` when the request fails

        Raises:
            ValueError: If the model is unsupported or the prompt is not a
                non-empty string
        """
        supported_models = ['openai-gpt-completion']  # Only use OpenAI model
        if not model_id:
            model_id = supported_models[0]
        if model_id not in supported_models:
            raise ValueError(f"Unsupported model_id. Supported models: {supported_models}")
        # Validate prompt
        if not isinstance(prompt, str) or not prompt.strip():
            raise ValueError("Prompt must be a non-empty string")
        payload = {
            'prompt': prompt,
            'max_tokens': max_tokens,
            'temperature': temperature,
        }
        url = f"{self.mcp_server_url}/v1/models/{model_id}/completion"
        return self._post_json(url, payload, log_prefix="Error making HTTP request")

    def chat(self, messages: List[Dict[str, str]], model_id: Optional[str] = None, max_tokens: int = 100, temperature: float = 0.7):
        """
        Generate a chat response from a series of messages

        Args:
            messages: List of message objects with 'role' and 'content' keys
                Roles can be 'system', 'user', or 'assistant'
            model_id: The ID of the model to use (optional)
            max_tokens: Maximum number of tokens to generate (1-4000)
            temperature: Temperature for generation (0.0 to 1.0)

        Returns:
            dict: JSON response with generated message and metadata, or
            ``{"error": ...}`` when the request fails

        Raises:
            ValueError: If the model is unsupported or ``messages`` is not a
                non-empty list of dicts with 'role' and 'content' keys
        """
        supported_models = ['openai-gpt-chat']  # Only use OpenAI model
        if not model_id:
            model_id = supported_models[0]
        if model_id not in supported_models:
            raise ValueError(f"Unsupported model_id. Supported models: {supported_models}")
        # Validate messages
        if not isinstance(messages, list) or not messages:
            raise ValueError("Messages must be a non-empty list of message objects")
        for msg in messages:
            if not isinstance(msg, dict) or 'role' not in msg or 'content' not in msg:
                raise ValueError("Each message must be a dictionary with 'role' and 'content' keys")
        payload = {
            'messages': messages,
            'max_tokens': max_tokens,
            'temperature': temperature,
        }
        url = f"{self.mcp_server_url}/v1/models/{model_id}/chat"
        return self._post_json(url, payload, log_prefix="Error making HTTP request")

    def git(self, repo_url: Optional[str] = None, branch: str = "main", model_id: Optional[str] = None):
        """
        Analyze a Git repository

        Args:
            repo_url: URL of the Git repository to analyze (sent as "" when
                not provided)
            branch: Branch to analyze (default: main)
            model_id: The ID of the model to use (optional)

        Returns:
            dict: JSON response with repository analysis, or ``{"error": ...}``
            when the request fails

        Raises:
            ValueError: If the model is unsupported
        """
        supported_models = ['git-analyzer', 'git-diff-analyzer']
        if not model_id:
            model_id = supported_models[0]
        if model_id not in supported_models:
            raise ValueError(f"Unsupported model_id. Supported models: {supported_models}")
        payload = {
            'repo_url': repo_url if repo_url else "",
            'branch': branch,
        }
        url = f"{self.mcp_server_url}/v1/models/{model_id}/analyze"
        return self._post_json(url, payload, log_prefix="HTTP request failed")

    def filesystem(self, path: Optional[str] = None, operation: str = "list", model_id: Optional[str] = None):
        """
        Perform filesystem operations

        Args:
            path: Path to operate on ("." is sent when not provided)
            operation: Operation to perform (list, read, write, search, info,
                mkdir, move, edit); it is used as the endpoint name
            model_id: The ID of the model to use (optional)

        Returns:
            dict: JSON response with filesystem operation results, or
            ``{"error": ...}`` when the request fails. For ``list``, an
            ``entries`` key in the response is mirrored under ``files``.

        Raises:
            ValueError: If the model or the operation is unsupported
        """
        supported_models = ['filesystem']
        if not model_id:
            model_id = supported_models[0]
        if model_id not in supported_models:
            raise ValueError(f"Unsupported model_id: {model_id}. Supported models: {supported_models}")
        if operation not in ["list", "read", "write", "search", "info", "mkdir", "move", "edit"]:
            raise ValueError(f"Unsupported operation: {operation}. Supported operations: list, read, write, search, info, mkdir, move, edit")
        payload = {'path': path or '.'}
        url = f"{self.mcp_server_url}/v1/models/{model_id}/{operation}"
        # No log prefix: this operation has never printed on failure.
        result = self._post_json(url, payload)
        # Callers look for result["files"], so alias the server's "entries".
        if operation == "list" and isinstance(result, dict) and "entries" in result:
            result["files"] = result.get("entries", [])
        return result

    def prometheus(self, query: Optional[str] = None, start_time: Optional[str] = None, end_time: Optional[str] = None, model_id: Optional[str] = None):
        """
        Query Prometheus metrics

        Args:
            query: Prometheus query string
            start_time: Start time for range queries (ISO format)
            end_time: End time for range queries (ISO format)
            model_id: The ID of the model to use (optional)

        Returns:
            dict: JSON response with Prometheus metrics, or ``{"error": ...}``
            when the request fails

        Raises:
            ValueError: If the model is unsupported

        A range query (``query_range`` endpoint, fixed "1m" step) is issued
        only when both ``start_time`` and ``end_time`` are given; otherwise an
        instant query is sent to the ``query`` endpoint.
        """
        supported_models = ['prometheus']
        if not model_id:
            model_id = supported_models[0]
        if model_id not in supported_models:
            raise ValueError(f"Model '{model_id}' is not supported")
        payload = {}
        if query:
            payload['query'] = query
        if start_time and end_time:
            payload['start'] = start_time
            payload['end'] = end_time
            # For range queries, we need a step
            payload['step'] = "1m"  # Default to 1 minute steps
            endpoint = "query_range"
        else:
            # For instant queries
            endpoint = "query"
        url = f"{self.mcp_server_url}/v1/models/{model_id}/{endpoint}"
        return self._post_json(url, payload, log_prefix="Error making HTTP request")