MCP Unified Server
by getfounded
- tools
#!/usr/bin/env python3
import asyncio
import importlib.util
import json
import logging
import os
import re
import signal
import socket
import subprocess
import sys
import tempfile
import threading
import time
from enum import Enum
from pathlib import Path
from typing import List, Dict, Optional, Any, Union

import psutil
# Ensure compatibility with mcp server
from mcp.server.fastmcp import FastMCP, Context
# External MCP reference for tool registration
external_mcp = None  # populated by set_external_mcp()


def set_external_mcp(mcp):
    """Remember *mcp* as the module-level MCP reference used for tool registration.

    The value is stored in the ``external_mcp`` global and an info message
    is logged; nothing is returned.
    """
    global external_mcp
    external_mcp = mcp
    logging.info("Streamlit tools MCP reference set")
class StreamlitTools(str, Enum):
    """Names under which the Streamlit tools are registered.

    Members are also ``str`` instances, so each one compares equal to its
    plain tool-name string.
    """

    # App lifecycle
    CREATE_APP = "streamlit_create_app"
    RUN_APP = "streamlit_run_app"
    STOP_APP = "streamlit_stop_app"

    # Inspection
    LIST_APPS = "streamlit_list_apps"
    GET_APP_URL = "streamlit_get_app_url"

    # Editing and environment checks
    MODIFY_APP = "streamlit_modify_app"
    CHECK_DEPS = "streamlit_check_deps"
class StreamlitService:
    """Service to manage Streamlit applications via MCP.

    Every app is a ``<app_id>.py`` script stored in ``apps_dir``. A running
    app is a ``python -m streamlit run`` subprocess whose stdout/stderr are
    redirected to ``<app_id>.log`` in the same directory.

    Attributes:
        apps_dir: Directory holding the app scripts and their log files.
        port_range: Inclusive ``(first, last)`` range searched for free ports.
        running_apps: ``app_id -> {"process", "port", "url", "log_path",
            "log_file"}`` for every app started by this service.
        app_logs: ``app_id -> log content``; kept for callers, not populated
            by this class.
        used_ports: Ports currently reserved by this service.
        lock: Guards ``used_ports``.
    """

    # Only these characters are allowed in an app ID (it becomes a file name).
    _APP_ID_PATTERN = re.compile(r"[a-zA-Z0-9_-]+")
    # Seconds to wait after launch before checking the process survived.
    _STARTUP_GRACE_SECONDS = 2
    # Seconds to wait for a graceful shutdown before escalating to kill.
    _STOP_TIMEOUT_SECONDS = 10
    # Seconds to wait for a killed process to be reaped.
    _KILL_TIMEOUT_SECONDS = 5
    # Optional packages reported by check_dependencies().
    _OPTIONAL_DEPENDENCIES = ("pandas", "numpy", "matplotlib", "altair", "plotly")

    def __init__(self, apps_dir=None, port_range=(8501, 8599)):
        """Initialize the Streamlit service.

        Args:
            apps_dir: Directory for app scripts; defaults to
                ``~/streamlit_apps``. Created if missing.
            port_range: Inclusive ``(first, last)`` port range to allocate from.
        """
        self.apps_dir = apps_dir or os.path.expanduser("~/streamlit_apps")
        self.port_range = port_range
        self.running_apps = {}  # app_id -> {process, port, url, log_path, log_file}
        self.app_logs = {}  # app_id -> log_content
        os.makedirs(self.apps_dir, exist_ok=True)
        self.used_ports = set()
        self.lock = threading.Lock()
        logging.info(f"Initialized Streamlit service at {self.apps_dir}")

    # ------------------------------------------------------------------
    # Port bookkeeping
    # ------------------------------------------------------------------
    @staticmethod
    def _port_is_free(port):
        """Return True when nothing accepts TCP connections on localhost:port."""
        # The context manager closes the socket even if connect_ex raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            return sock.connect_ex(("localhost", port)) != 0

    def _find_available_port(self):
        """Reserve and return the first free port in ``port_range``.

        Raises:
            ValueError: If every port in the range is reserved or occupied.
        """
        first, last = self.port_range
        with self.lock:
            for port in range(first, last + 1):
                if port in self.used_ports:
                    continue
                # Also make sure no external process is listening on it.
                if self._port_is_free(port):
                    self.used_ports.add(port)
                    return port
        raise ValueError(f"No available ports in range {self.port_range}")

    def _reserve_port(self, port):
        """Reserve an explicitly requested port.

        Raises:
            ValueError: If the port is already reserved by this service.
        """
        with self.lock:
            if port in self.used_ports:
                raise ValueError(
                    f"Port {port} is already in use by another Streamlit app")
            self.used_ports.add(port)

    def _release_port(self, port):
        """Release a port when an app is stopped (no-op if not reserved)."""
        with self.lock:
            self.used_ports.discard(port)

    # ------------------------------------------------------------------
    # Registry helpers
    # ------------------------------------------------------------------
    def _forget_app(self, app_id):
        """Drop *app_id* from ``running_apps``, closing its log and freeing its port."""
        app_info = self.running_apps.pop(app_id, None)
        if app_info is None:
            return
        log_file = app_info.get("log_file")
        if log_file is not None and not log_file.closed:
            log_file.close()
        self._release_port(app_info["port"])

    def _is_running(self, app_id):
        """Return True if *app_id* is registered and its process is still alive.

        A registered app whose process has exited on its own is removed from
        the registry (log closed, port released) so it can be started again.
        """
        app_info = self.running_apps.get(app_id)
        if app_info is None:
            return False
        if app_info["process"].poll() is None:
            return True
        logging.warning(
            f"Streamlit app '{app_id}' is no longer running; cleaning up")
        self._forget_app(app_id)
        return False

    @staticmethod
    def _module_available(module_name):
        """Return True if *module_name* can be located, without importing it."""
        try:
            return importlib.util.find_spec(module_name) is not None
        except (ImportError, ValueError):
            return False

    # ------------------------------------------------------------------
    # Paths and validation
    # ------------------------------------------------------------------
    def validate_app_id(self, app_id):
        """Validate that an app ID is safe and suitable for a filename.

        Returns:
            The unchanged ``app_id``.

        Raises:
            ValueError: If ``app_id`` is not a non-empty string of letters,
                digits, underscores and hyphens.
        """
        # fullmatch (unlike match + "$") also rejects a trailing newline.
        if not isinstance(app_id, str) or not self._APP_ID_PATTERN.fullmatch(app_id):
            raise ValueError(
                "App ID must contain only letters, numbers, underscores, and hyphens")
        return app_id

    def get_app_path(self, app_id):
        """Get the file path for a Streamlit app (``<apps_dir>/<app_id>.py``)."""
        safe_app_id = self.validate_app_id(app_id)
        return os.path.join(self.apps_dir, f"{safe_app_id}.py")

    # ------------------------------------------------------------------
    # App lifecycle
    # ------------------------------------------------------------------
    async def create_app(self, app_id, code, overwrite=False):
        """Create a new Streamlit app with the given code.

        Returns:
            ``{"app_id", "path", "status": "created"}``.

        Raises:
            ValueError: If the ID is invalid, or the app exists and
                ``overwrite`` is false.
        """
        safe_app_id = self.validate_app_id(app_id)
        app_path = self.get_app_path(safe_app_id)
        if os.path.exists(app_path) and not overwrite:
            raise ValueError(
                f"App {app_id} already exists. Use overwrite=True to replace it.")
        # Python source is UTF-8 by default; do not depend on the locale.
        with open(app_path, "w", encoding="utf-8") as f:
            f.write(code)
        logging.info(f"Created Streamlit app '{app_id}' at {app_path}")
        return {
            "app_id": safe_app_id,
            "path": app_path,
            "status": "created",
        }

    async def run_app(self, app_id, port=None, browser=False):
        """Run a Streamlit app as a background process.

        Args:
            app_id: Identifier of an existing app.
            port: Port to serve on; when None a free port from ``port_range``
                is reserved.
            browser: When true the server is started with headless mode off.

        Returns:
            ``{"app_id", "status", "port", "url"}`` where status is
            ``"running"`` or ``"already_running"``.

        Raises:
            ValueError: Invalid ID, missing app, no free port, or the
                requested port is already reserved by this service.
            Exception: The process could not be launched or exited during
                the startup grace period (message includes the log output).
        """
        safe_app_id = self.validate_app_id(app_id)
        app_path = self.get_app_path(safe_app_id)
        if not os.path.exists(app_path):
            raise ValueError(f"App {app_id} does not exist.")

        if self._is_running(safe_app_id):
            app_info = self.running_apps[safe_app_id]
            return {
                "app_id": safe_app_id,
                "status": "already_running",
                "port": app_info["port"],
                "url": app_info["url"],
            }

        # Every port handed to a process is tracked, whether chosen or requested.
        if port is None:
            port = self._find_available_port()
        else:
            self._reserve_port(port)

        log_path = os.path.join(self.apps_dir, f"{safe_app_id}.log")
        cmd = [
            sys.executable, "-m", "streamlit", "run",
            app_path,
            "--server.port", str(port),
            "--server.headless", "false" if browser else "true",
            "--server.enableCORS", "false",
            "--server.enableXsrfProtection", "false",
        ]

        log_file = None
        process = None
        started = False
        try:
            log_file = open(log_path, "w")
            process = subprocess.Popen(
                cmd,
                stdout=log_file,
                stderr=subprocess.STDOUT,
                text=True,
            )
            # Give the server a moment to fail fast, without blocking the loop.
            await asyncio.sleep(self._STARTUP_GRACE_SECONDS)
            if process.poll() is not None:
                log_file.close()
                with open(log_path, "r", encoding="utf-8", errors="replace") as f:
                    error_log = f.read()
                raise RuntimeError(
                    f"Failed to start Streamlit app. Error: {error_log}")
            started = True
        except asyncio.CancelledError:
            raise
        except Exception as e:
            raise Exception(f"Error starting Streamlit app: {str(e)}") from e
        finally:
            if not started:
                # Launch failed or the task was cancelled: leave nothing behind.
                if process is not None and process.poll() is None:
                    process.kill()
                    process.wait()
                if log_file is not None:
                    log_file.close()
                self._release_port(port)

        url = f"http://localhost:{port}"
        self.running_apps[safe_app_id] = {
            "process": process,
            "port": port,
            "url": url,
            "log_path": log_path,
            "log_file": log_file,
        }
        logging.info(f"Started Streamlit app '{app_id}' on port {port}")
        return {
            "app_id": safe_app_id,
            "status": "running",
            "port": port,
            "url": url,
        }

    async def stop_app(self, app_id):
        """Stop a running Streamlit app and its child processes.

        Returns:
            ``{"app_id", "status"}`` with status ``"not_running"``,
            ``"stopped"`` (graceful), ``"killed"`` (forced) or ``"error"``
            (with an ``"error"`` message; the app stays registered).

        Raises:
            ValueError: If the app ID is invalid.
        """
        safe_app_id = self.validate_app_id(app_id)
        if not self._is_running(safe_app_id):
            return {
                "app_id": safe_app_id,
                "status": "not_running",
            }

        process = self.running_apps[safe_app_id]["process"]
        loop = asyncio.get_running_loop()
        children = []
        try:
            try:
                children = psutil.Process(process.pid).children(recursive=True)
            except psutil.NoSuchProcess:
                children = []
            for child in children:
                try:
                    child.terminate()
                except psutil.NoSuchProcess:
                    pass  # child already exited
            process.terminate()
            # Wait in a worker thread so the event loop stays responsive.
            await loop.run_in_executor(
                None, process.wait, self._STOP_TIMEOUT_SECONDS)
            status = "stopped"
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logging.error(f"Error stopping Streamlit app '{app_id}': {str(e)}")
            # Graceful shutdown failed: force-kill the whole tree and reap it.
            try:
                for child in children:
                    try:
                        child.kill()
                    except psutil.NoSuchProcess:
                        pass
                process.kill()
                await loop.run_in_executor(
                    None, process.wait, self._KILL_TIMEOUT_SECONDS)
            except asyncio.CancelledError:
                raise
            except Exception as kill_error:
                return {
                    "app_id": safe_app_id,
                    "status": "error",
                    "error": f"Failed to kill app: {str(kill_error)}",
                }
            status = "killed"

        self._forget_app(safe_app_id)
        if status == "stopped":
            logging.info(f"Stopped Streamlit app '{app_id}'")
        return {
            "app_id": safe_app_id,
            "status": status,
        }

    async def list_apps(self):
        """List all available Streamlit apps, most recently modified first.

        Returns:
            ``{"apps": [...], "count", "apps_dir"}``; each app entry has
            ``app_id``, ``path``, ``size_bytes``, ``modified``, ``running``
            and, when running, ``port`` and ``url``.
        """
        entries = []
        for file_name in os.listdir(self.apps_dir):
            if not file_name.endswith(".py"):
                continue
            app_path = os.path.join(self.apps_dir, file_name)
            entries.append((os.stat(app_path), file_name, app_path))
        entries.sort(key=lambda entry: entry[0].st_mtime, reverse=True)

        apps = []
        for stats, file_name, app_path in entries:
            app_id = file_name[:-3]  # strip ".py"
            is_running = self._is_running(app_id)
            app_info = {
                "app_id": app_id,
                "path": app_path,
                "size_bytes": stats.st_size,
                "modified": time.ctime(stats.st_mtime),
                "running": is_running,
            }
            if is_running:
                app_info["port"] = self.running_apps[app_id]["port"]
                app_info["url"] = self.running_apps[app_id]["url"]
            apps.append(app_info)

        return {
            "apps": apps,
            "count": len(apps),
            "apps_dir": self.apps_dir,
        }

    async def get_app_url(self, app_id):
        """Get the URL for a running Streamlit app.

        Returns:
            ``{"app_id", "status": "running", "port", "url"}`` or
            ``{"app_id", "status": "not_running"}``.

        Raises:
            ValueError: If the app ID is invalid.
        """
        safe_app_id = self.validate_app_id(app_id)
        if not self._is_running(safe_app_id):
            return {
                "app_id": safe_app_id,
                "status": "not_running",
            }
        app_info = self.running_apps[safe_app_id]
        return {
            "app_id": safe_app_id,
            "status": "running",
            "port": app_info["port"],
            "url": app_info["url"],
        }

    async def modify_app(self, app_id, code_updates=None, append_code=None):
        """Modify an existing Streamlit app and restart it if it was running.

        Args:
            app_id: Identifier of the app to modify.
            code_updates: Iterable of ``(old_text, new_text)`` pairs applied
                in order with ``str.replace``.
            append_code: Text appended after a blank line.

        Returns:
            ``{"app_id", "status": "modified", "was_running"}`` plus a
            ``"restart"`` entry with the ``run_app`` result when restarted.

        Raises:
            ValueError: If the ID is invalid or the app does not exist.
        """
        safe_app_id = self.validate_app_id(app_id)
        app_path = self.get_app_path(safe_app_id)
        if not os.path.exists(app_path):
            raise ValueError(f"App {app_id} does not exist.")

        with open(app_path, "r", encoding="utf-8") as f:
            current_code = f.read()

        if code_updates:
            for old_text, new_text in code_updates:
                current_code = current_code.replace(old_text, new_text)
        if append_code:
            current_code += "\n\n" + append_code

        with open(app_path, "w", encoding="utf-8") as f:
            f.write(current_code)
        logging.info(f"Modified Streamlit app '{app_id}'")

        was_running = self._is_running(safe_app_id)
        result = {"app_id": safe_app_id,
                  "status": "modified", "was_running": was_running}
        if was_running:
            # Restart on the same port so the app URL stays valid.
            port = self.running_apps[safe_app_id]["port"]
            await self.stop_app(safe_app_id)
            restart_result = await self.run_app(safe_app_id, port=port)
            result["restart"] = restart_result
        return result

    async def check_dependencies(self):
        """Check if Streamlit and commonly used companion packages are installed.

        Streamlit is probed by running ``python -m streamlit --version``.
        The optional packages are located with ``importlib.util.find_spec``
        so they are not imported into this process; this reports presence,
        not whether the package imports cleanly.

        Returns:
            On success ``{"status": "success", "streamlit_installed": True,
            "streamlit_version", "dependencies", "missing_dependencies"}``;
            otherwise ``{"status": "error", "streamlit_installed": False,
            "error", "install_command"}``.
        """
        try:
            loop = asyncio.get_running_loop()
            # Run the version probe in a worker thread to keep the loop free.
            completed = await loop.run_in_executor(
                None,
                lambda: subprocess.run(
                    [sys.executable, "-m", "streamlit", "--version"],
                    capture_output=True,
                    text=True,
                    check=False,
                ),
            )
            if completed.returncode != 0:
                return {
                    "status": "error",
                    "streamlit_installed": False,
                    "error": completed.stderr.strip() or "Streamlit is not installed correctly",
                    "install_command": "pip install streamlit",
                }
            installed_deps = {
                dep: self._module_available(dep)
                for dep in self._OPTIONAL_DEPENDENCIES
            }
            return {
                "status": "success",
                "streamlit_installed": True,
                "streamlit_version": completed.stdout.strip(),
                "dependencies": installed_deps,
                "missing_dependencies": [
                    dep for dep, installed in installed_deps.items() if not installed
                ],
            }
        except asyncio.CancelledError:
            raise
        except Exception as e:
            return {
                "status": "error",
                "streamlit_installed": False,
                "error": str(e),
                "install_command": "pip install streamlit",
            }
# Tool function definitions that will be registered with MCP
async def streamlit_create_app(app_id: str, code: str, overwrite: bool = False, ctx: Context = None) -> str:
    """Create a new Streamlit app with the provided code

    Parameters:
    - app_id: Unique identifier for the app (letters, numbers, underscores, and hyphens only)
    - code: Python code for the Streamlit app
    - overwrite: Whether to overwrite an existing app with the same ID

    Returns a JSON string: the service result on success, or
    {"status": "error", "error": ...} on failure.
    """
    service = _get_streamlit_service()
    if service is None:
        # The service could not be initialized (e.g. Streamlit is missing);
        # report that instead of an AttributeError on None.
        return json.dumps({
            "status": "error",
            "error": "Streamlit service is not available. Install Streamlit with 'pip install streamlit'."
        }, indent=2)
    try:
        result = await service.create_app(app_id, code, overwrite)
        return json.dumps(result, indent=2)
    except Exception as e:
        return json.dumps({"status": "error", "error": str(e)}, indent=2)
async def streamlit_run_app(app_id: str, port: int = None, browser: bool = False, ctx: Context = None) -> str:
    """Run a Streamlit app as a background process

    Parameters:
    - app_id: Identifier of the app to run
    - port: Optional port number (if not specified, an available port will be used)
    - browser: Whether to open the app in a browser window

    Returns a JSON string: the service result on success, or
    {"status": "error", "error": ...} on failure.
    """
    service = _get_streamlit_service()
    if service is None:
        # The service could not be initialized (e.g. Streamlit is missing);
        # report that instead of an AttributeError on None.
        return json.dumps({
            "status": "error",
            "error": "Streamlit service is not available. Install Streamlit with 'pip install streamlit'."
        }, indent=2)
    try:
        result = await service.run_app(app_id, port, browser)
        return json.dumps(result, indent=2)
    except Exception as e:
        return json.dumps({"status": "error", "error": str(e)}, indent=2)
async def streamlit_stop_app(app_id: str, ctx: Context = None) -> str:
    """Stop a running Streamlit app

    Parameters:
    - app_id: Identifier of the app to stop

    Returns a JSON string: the service result on success, or
    {"status": "error", "error": ...} on failure.
    """
    service = _get_streamlit_service()
    if service is None:
        # The service could not be initialized (e.g. Streamlit is missing);
        # report that instead of an AttributeError on None.
        return json.dumps({
            "status": "error",
            "error": "Streamlit service is not available. Install Streamlit with 'pip install streamlit'."
        }, indent=2)
    try:
        result = await service.stop_app(app_id)
        return json.dumps(result, indent=2)
    except Exception as e:
        return json.dumps({"status": "error", "error": str(e)}, indent=2)
async def streamlit_list_apps(ctx: Context = None) -> str:
    """List all available Streamlit apps

    Returns a JSON string: the service result on success, or
    {"status": "error", "error": ...} on failure.
    """
    service = _get_streamlit_service()
    if service is None:
        # The service could not be initialized (e.g. Streamlit is missing);
        # report that instead of an AttributeError on None.
        return json.dumps({
            "status": "error",
            "error": "Streamlit service is not available. Install Streamlit with 'pip install streamlit'."
        }, indent=2)
    try:
        result = await service.list_apps()
        return json.dumps(result, indent=2)
    except Exception as e:
        return json.dumps({"status": "error", "error": str(e)}, indent=2)
async def streamlit_get_app_url(app_id: str, ctx: Context = None) -> str:
    """Get the URL for a running Streamlit app

    Parameters:
    - app_id: Identifier of the app

    Returns a JSON string: the service result on success, or
    {"status": "error", "error": ...} on failure.
    """
    service = _get_streamlit_service()
    if service is None:
        # The service could not be initialized (e.g. Streamlit is missing);
        # report that instead of an AttributeError on None.
        return json.dumps({
            "status": "error",
            "error": "Streamlit service is not available. Install Streamlit with 'pip install streamlit'."
        }, indent=2)
    try:
        result = await service.get_app_url(app_id)
        return json.dumps(result, indent=2)
    except Exception as e:
        return json.dumps({"status": "error", "error": str(e)}, indent=2)
async def streamlit_modify_app(app_id: str, code_updates: List[tuple] = None, append_code: str = None, ctx: Context = None) -> str:
    """Modify an existing Streamlit app

    Parameters:
    - app_id: Identifier of the app to modify
    - code_updates: List of tuples (old_text, new_text) for text replacements
    - append_code: Code to append to the end of the app

    Returns a JSON string: the service result on success, or
    {"status": "error", "error": ...} on failure.
    """
    service = _get_streamlit_service()
    if service is None:
        # The service could not be initialized (e.g. Streamlit is missing);
        # report that instead of an AttributeError on None.
        return json.dumps({
            "status": "error",
            "error": "Streamlit service is not available. Install Streamlit with 'pip install streamlit'."
        }, indent=2)
    try:
        result = await service.modify_app(app_id, code_updates, append_code)
        return json.dumps(result, indent=2)
    except Exception as e:
        return json.dumps({"status": "error", "error": str(e)}, indent=2)
async def streamlit_check_deps(ctx: Context = None) -> str:
    """Check if Streamlit and required dependencies are installed

    Returns a JSON string with the dependency report, or an error payload
    containing "install_command" when Streamlit is unavailable.
    """
    service = _get_streamlit_service()
    if service is None:
        # No service means initialization failed, which is what happens when
        # Streamlit cannot be imported; answer in the same shape the service
        # uses for a missing installation.
        return json.dumps({
            "status": "error",
            "streamlit_installed": False,
            "error": "Streamlit service is not available. Streamlit does not appear to be installed.",
            "install_command": "pip install streamlit"
        }, indent=2)
    try:
        result = await service.check_dependencies()
        return json.dumps(result, indent=2)
    except Exception as e:
        return json.dumps({"status": "error", "error": str(e)}, indent=2)
# Tool registration and initialization
_streamlit_service = None  # module-wide service instance, created on demand


def initialize_streamlit_service(apps_dir=None, port_range=(8501, 8599)):
    """Create the module-wide StreamlitService and return it.

    Returns None (after logging an error) when the ``streamlit`` package
    cannot be imported. When ``apps_dir`` is None, the STREAMLIT_APPS_DIR
    environment variable is used, falling back to ``~/streamlit_apps``.
    """
    global _streamlit_service

    # Bail out early if Streamlit itself is missing.
    try:
        import streamlit
        found_version = streamlit.__version__
        logging.info(f"Found Streamlit version {found_version}")
    except ImportError:
        logging.error(
            "Streamlit is not installed. Please install it with 'pip install streamlit'")
        return None

    # Explicit argument first, then the environment, then the home default.
    if apps_dir is None:
        apps_dir = os.environ.get(
            "STREAMLIT_APPS_DIR", os.path.expanduser("~/streamlit_apps"))

    _streamlit_service = StreamlitService(apps_dir, port_range)
    return _streamlit_service
def _get_streamlit_service():
    """Return the shared Streamlit service, initializing it on first use.

    The result is None when initialization fails; the next call tries again.
    """
    global _streamlit_service
    if _streamlit_service is not None:
        return _streamlit_service
    _streamlit_service = initialize_streamlit_service()
    return _streamlit_service
def get_streamlit_tools():
    """Return a mapping of tool name (StreamlitTools member) to its coroutine.

    Intended for registering every Streamlit tool with MCP.
    """
    tool_table = (
        (StreamlitTools.CREATE_APP, streamlit_create_app),
        (StreamlitTools.RUN_APP, streamlit_run_app),
        (StreamlitTools.STOP_APP, streamlit_stop_app),
        (StreamlitTools.LIST_APPS, streamlit_list_apps),
        (StreamlitTools.GET_APP_URL, streamlit_get_app_url),
        (StreamlitTools.MODIFY_APP, streamlit_modify_app),
        (StreamlitTools.CHECK_DEPS, streamlit_check_deps),
    )
    return dict(tool_table)
# This function will be called by the unified server to initialize the module
def initialize(mcp=None):
    """Set up the Streamlit module, optionally recording an MCP reference.

    Returns True when the Streamlit service was created, False otherwise.
    """
    if mcp:
        set_external_mcp(mcp)

    service = initialize_streamlit_service()
    if not service:
        logging.warning("Failed to initialize Streamlit service")
    else:
        logging.info("Streamlit service initialized successfully")
    return service is not None
if __name__ == "__main__":
    # Direct execution only prints a hint; the module is meant to be loaded
    # by the MCP Unified Server.
    print("Streamlit service module - use with MCP Unified Server")