# build_and_run.py (50.6 kB)
"""
Python CLI replacement for scripts/build_and_run.sh.
Step 1: Provide CLI skeleton with argument parsing, colored output, and
environment checks (uv, Docker). Subsequent steps will implement each shell
feature (local setup, Splunk config, server run, Docker mode, stop, etc.).
"""
from __future__ import annotations
import argparse
import os
import shutil
import signal
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
# ANSI escape sequences used to colorize the bracketed message prefixes
COLOR_RED = "\033[0;31m"
COLOR_GREEN = "\033[0;32m"
COLOR_YELLOW = "\033[1;33m"
COLOR_BLUE = "\033[0;34m"
COLOR_PURPLE = "\033[0;35m"
COLOR_RESET = "\033[0m"


def _print_tagged(color: str, tag: str, message: str) -> None:
    """Print *message* preceded by *tag* rendered in *color*."""
    print(f"{color}{tag}{COLOR_RESET} {message}")


def print_status(message: str) -> None:
    """Print an informational line with a blue ``[INFO]`` prefix."""
    _print_tagged(COLOR_BLUE, "[INFO]", message)


def print_success(message: str) -> None:
    """Print a success line with a green ``[SUCCESS]`` prefix."""
    _print_tagged(COLOR_GREEN, "[SUCCESS]", message)


def print_warning(message: str) -> None:
    """Print a warning line with a yellow ``[WARNING]`` prefix."""
    _print_tagged(COLOR_YELLOW, "[WARNING]", message)


def print_error(message: str) -> None:
    """Print an error line with a red ``[ERROR]`` prefix (written to stdout)."""
    _print_tagged(COLOR_RED, "[ERROR]", message)


def print_local(message: str) -> None:
    """Print a local-mode line with a purple ``[LOCAL]`` prefix."""
    _print_tagged(COLOR_PURPLE, "[LOCAL]", message)
@dataclass
class CliArgs:
    """Typed container for the flags parsed by ``parse_args``.

    Every field is a boolean populated from a ``store_true`` option.
    """

    # --docker: force Docker deployment (skip prompt)
    force_docker: bool
    # --local: force local deployment (skip prompt)
    force_local: bool
    # --stop: stop services and clean up
    stop: bool
    # --restart: stop running processes, then start the local server detached
    restart: bool
    # --detached / -d: run the local server in the background
    detached: bool
    # --no-inspector: do not auto-start MCP Inspector in local mode
    no_inspector: bool
    # --local-only: with --stop, stop only local FastMCP processes
    local_only: bool
    # --docker-only: with --stop, stop only Docker services
    docker_only: bool
    # --test / -t: run the MCP server test
    test: bool
    # --detailed: show detailed test output (used with --test)
    detailed: bool
    # --setup: force the Splunk credential prompt even if .env is configured
    setup: bool
def parse_args(argv: list[str] | None = None) -> CliArgs:
    """Parse command-line flags into a ``CliArgs`` instance.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        A ``CliArgs`` whose fields hold the value of each boolean flag.
    """
    parser = argparse.ArgumentParser(
        prog="build-and-run",
        description=(
            "Build and run MCP Server for Splunk. Either via Docker (full stack) "
            "or local mode (FastMCP server only)."
        ),
    )

    # (option strings, dest, help text) -- registration order drives --help order.
    flag_table = [
        (("--docker",), "force_docker", "Force Docker deployment (skip prompt)"),
        (("--local",), "force_local", "Force local deployment (skip prompt)"),
        (("--stop",), "stop", "Stop Docker services and clean up"),
        (
            ("--restart",),
            "restart",
            (
                "Stop any running MCP processes/services, then start local server detached using current .env (no prompts)"
            ),
        ),
        (
            ("--local-only",),
            "local_only",
            "With --stop: stop only local FastMCP processes (skip Docker)",
        ),
        (
            ("--docker-only",),
            "docker_only",
            "With --stop: stop only Docker services (skip local)",
        ),
        (
            ("--detached", "-d"),
            "detached",
            "Run local server in background (detached). Use with --local or interactive local mode",
        ),
        (
            ("--no-inspector",),
            "no_inspector",
            "Do not auto-start MCP Inspector in local mode",
        ),
        (
            ("--test", "-t"),
            "test",
            "Run MCP server test (standalone or after starting)",
        ),
        (
            ("--detailed",),
            "detailed",
            "Show detailed output in test (use with --test)",
        ),
        (
            ("--setup",),
            "setup",
            "Force Splunk credential setup prompt (even if .env is configured)",
        ),
    ]
    for option_strings, dest_name, help_text in flag_table:
        parser.add_argument(
            *option_strings,
            action="store_true",
            help=help_text,
            dest=dest_name,
        )

    parsed = parser.parse_args(argv)
    field_values = {
        dest_name: getattr(parsed, dest_name) for _opts, dest_name, _help in flag_table
    }
    return CliArgs(**field_values)
def check_uv_available() -> bool:
    """Report whether a ``uv`` executable can be located on ``PATH``."""
    uv_location = shutil.which("uv")
    return uv_location is not None
def check_docker_available() -> bool:
    """Report whether a ``docker`` executable can be located on ``PATH``."""
    docker_location = shutil.which("docker")
    return docker_location is not None
def check_compose_available() -> tuple[bool, list[str]]:
    """Locate a usable Docker Compose command.

    ``docker compose`` is preferred and is accepted only when
    ``docker compose version`` exits with status 0; otherwise the standalone
    ``docker-compose`` binary is used if it is on ``PATH``.

    Returns:
        ``(True, base_command)`` when a compose command was found, else
        ``(False, [])``.
    """
    docker_on_path = shutil.which("docker") is not None
    if docker_on_path:
        # Probe the plugin quietly; only the exit status matters.
        probe = subprocess.run(
            ["docker", "compose", "version"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
        if probe.returncode == 0:
            return True, ["docker", "compose"]

    if shutil.which("docker-compose") is None:
        return False, []
    return True, ["docker-compose"]
def show_intro() -> None:
    """Print the startup banner followed by a pointer to the installation docs."""
    banner = (
        "🚀 Building and Running MCP Server for Splunk",
        "=============================================",
        "",
        "📚 Need help with prerequisites? See: docs/getting-started/installation.md",
        "",
    )
    for text in banner:
        print(text)
def interactive_choice() -> str:
    """Ask the user which deployment method to use.

    Returns:
        ``"docker"`` for choice 1, an empty answer, or any invalid answer;
        ``"local"`` for choice 2.
    """
    print_status("Both Docker and local development options are available.")
    for text in (
        "Choose deployment method:",
        " 1) Docker (full stack with Splunk, Traefik, MCP Inspector)",
        " 2) Local (FastMCP server only, lighter weight)",
        "",
    ):
        print(text)

    answer = input("Enter your choice (1 or 2, default: 1): ").strip()
    selection = answer if answer else "1"  # empty input means the default
    if selection == "1":
        return "docker"
    if selection == "2":
        return "local"
    print_warning("Invalid choice. Using Docker deployment (default).")
    return "docker"
def not_implemented_yet(topic: str) -> int:
    """Report that *topic* is not available yet and return exit status 2."""
    exit_status = 2
    print_error(f"{topic} is not implemented yet in the Python CLI.")
    print_status("We'll implement this functionality in the next steps.")
    return exit_status
def run_cmd(cmd: list[str], cwd: str | None = None) -> int:
    """Echo and run *cmd*, returning its exit status.

    Args:
        cmd: Program and arguments, executed without a shell.
        cwd: Optional working directory for the child process.

    Returns:
        The child's return code, or 127 when ``FileNotFoundError`` is raised
        while launching it.
    """
    rendered = " ".join(cmd)
    try:
        print_local(f"Running: {rendered}")
        completed = subprocess.run(cmd, cwd=cwd, check=False)
    except FileNotFoundError:
        print_error(f"Command not found: {cmd[0]}")
        return 127
    return completed.returncode
def ensure_logs_dir() -> None:
    """Create the ``logs`` directory in the current working directory if absent."""
    target = Path("logs")
    if target.exists():
        return
    print_local("Creating logs directory...")
    target.mkdir(parents=True, exist_ok=True)
def install_uv_if_missing() -> bool:
    """Attempt to install uv if missing.

    Runs the official install script through ``bash`` using ``curl`` or
    ``wget`` (whichever is found first), then extends ``PATH`` for the current
    process so the freshly installed binary can be located.

    Returns:
        True if ``uv`` is available (already present or installed now),
        False otherwise.
    """
    if check_uv_available():
        return True
    print_warning("uv not found. Attempting to install uv...")

    if shutil.which("curl"):
        installer = ["bash", "-lc", "curl -LsSf https://astral.sh/uv/install.sh | sh"]
    elif shutil.which("wget"):
        installer = ["bash", "-lc", "wget -qO- https://astral.sh/uv/install.sh | sh"]
    else:
        print_error("Neither curl nor wget found. Please install uv manually: pip install uv")
        return False

    try:
        code = subprocess.run(installer, check=False).returncode
    except FileNotFoundError:
        # bash itself is unavailable; treat as a failed install instead of crashing.
        code = 127

    # Make the new binary visible to this process. Current installers place uv
    # in $XDG_BIN_HOME or ~/.local/bin; older ones used ~/.cargo/bin.
    home = os.path.expanduser("~")
    install_dirs = []
    xdg_bin_home = os.environ.get("XDG_BIN_HOME")
    if xdg_bin_home:
        install_dirs.append(xdg_bin_home)
    install_dirs.append(os.path.join(home, ".local", "bin"))
    install_dirs.append(os.path.join(home, ".cargo", "bin"))
    existing_path = os.environ.get("PATH", "")
    os.environ["PATH"] = os.pathsep.join(install_dirs + [existing_path])

    if code == 0 and check_uv_available():
        print_success("uv installed successfully!")
        return True
    print_error("Failed to install uv. Please install manually: https://astral.sh/uv")
    return False
def load_env_file(env_path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from *env_path* into ``os.environ``.

    Blank lines, ``#`` comments, lines without ``=`` and lines with an empty
    key are ignored. Whitespace around the key and value is removed, as are
    surrounding single/double quotes on the value. After loading, a summary of
    the Splunk settings is printed with the password masked.

    When the file does not exist, a warning is printed and only the Splunk
    host from the existing environment (if any) is shown.

    Args:
        env_path: Location of the ``.env`` file.
    """
    if not env_path.exists():
        print_warning("No .env file found. Using system environment variables only.")
        # Show minimal info if present
        sys_host = os.environ.get("SPLUNK_HOST") or os.environ.get("MCP_SPLUNK_HOST")
        if sys_host:
            print_status("📋 System Environment Splunk Configuration:")
            print(f" 🌐 Host: {sys_host}")
        return

    print_local("Loading environment variables from .env file...")
    with env_path.open("r", encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            # Strip the key so "KEY = value" sets KEY (matching update_env_file),
            # and skip empty keys, which os.environ rejects.
            key = key.strip()
            if not key:
                continue
            # Strip quotes
            value = value.strip().strip('"').strip("'")
            os.environ[key] = value
    print_success("Environment variables loaded from .env file!")

    # Show summary (mask passwords)
    print()
    print_status("📋 Splunk Configuration Summary:")
    print(f" 🌐 Host: {os.environ.get('SPLUNK_HOST', 'Not set')}")
    print(f" 🔌 Port: {os.environ.get('SPLUNK_PORT', '8089 (default)')}")
    print(f" 👤 User: {os.environ.get('SPLUNK_USERNAME', 'Not set')}")
    pw = os.environ.get("SPLUNK_PASSWORD")
    if not pw:
        pw_display = "Not set"
    elif len(pw) >= 8:
        pw_display = f"***{pw[-3:]}"
    else:
        # A 3-character hint would expose most or all of a short password.
        pw_display = "***"
    print(f" 🔐 Pass: {pw_display}")
    print(f" 🔒 SSL: {os.environ.get('SPLUNK_VERIFY_SSL', 'Not set')}")
def update_env_file(env_path: Path, updates: dict[str, str]) -> None:
    """Set the given keys in a ``.env`` file, preserving everything else.

    Every existing ``KEY=...`` line whose key is in *updates* is rewritten as
    ``KEY='value'`` (all occurrences, so a later duplicate cannot shadow the
    new value). Keys not present in the file are appended in the order they
    appear in *updates*. Blank lines, comments and unrelated lines are kept
    as-is. The file is created if it does not exist.

    Args:
        env_path: Location of the ``.env`` file.
        updates: Mapping of variable name to new value.
    """
    existing_lines: list[str] = []
    if env_path.exists():
        with env_path.open("r", encoding="utf-8") as f:
            existing_lines = f.readlines()

    # Normalize: ensure last line ends with \n to prevent concatenation on append
    if existing_lines and not existing_lines[-1].endswith("\n"):
        existing_lines[-1] += "\n"

    written: set[str] = set()
    new_lines: list[str] = []
    for line in existing_lines:
        stripped = line.strip()
        if not stripped or stripped.startswith("#") or "=" not in line:
            new_lines.append(line)
            continue
        key = line.split("=", 1)[0].strip()
        if key in updates:
            new_lines.append(f"{key}='{updates[key]}'\n")
            written.add(key)
        else:
            new_lines.append(line)

    # Append keys that were not already present, in the caller's order.
    new_lines.extend(
        f"{key}='{value}'\n" for key, value in updates.items() if key not in written
    )

    with env_path.open("w", encoding="utf-8") as f:
        f.writelines(new_lines)
def prompt_splunk_config(is_docker_mode: bool) -> bool:
    """Interactively review and update the Splunk connection settings.

    Reads SPLUNK_HOST/PORT/USERNAME/PASSWORD from ``.env`` (seeding the file
    from ``env.example`` when it is missing), prompts for new values,
    optionally writes them back via ``update_env_file`` and exports the final
    values to ``os.environ``.

    Args:
        is_docker_mode: When true and the stored host is set to something
            other than ``so1``, offer to restore the Docker defaults; if
            accepted, the prompts are skipped and ``.env`` is updated
            without confirmation.

    Returns:
        True once the final values have been exported to ``os.environ``;
        False when ``.env`` cannot be created (no ``env.example``) or when
        host, username or password ends up empty.
    """
    env_path = Path(".env")
    # Ensure .env exists (seed from env.example if present)
    if not env_path.exists():
        example = Path("env.example")
        if example.exists():
            print_local("Creating .env file from env.example...")
            env_path.write_text(example.read_text(encoding="utf-8"), encoding="utf-8")
            print_success(".env file created from env.example")
        else:
            print_error("env.example not found. Cannot create .env file.")
            return False
    # Read current values
    current: dict[str, str] = {}
    with env_path.open("r", encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            k, v = line.split("=", 1)
            # Drop surrounding whitespace and quotes from the value (the key is used as written)
            v = v.strip().strip('"').strip("'")
            current[k] = v
    # Port and username fall back to defaults when absent from the file
    cur_host = current.get("SPLUNK_HOST", "")
    cur_port = current.get("SPLUNK_PORT", "8089")
    cur_user = current.get("SPLUNK_USERNAME", "admin")
    cur_pass = current.get("SPLUNK_PASSWORD", "")
    print()
    print_status("🔧 Splunk Configuration Setup")
    print("==================================")
    print()
    print("Current Splunk configuration:")
    print(f" Host: {cur_host or 'Not set'}")
    print(f" Port: {cur_port or '8089'}")
    print(f" Username: {cur_user or 'admin'}")
    print(f" Password: {'***' if cur_pass else 'Not set'}")
    print()
    docker_defaults_restored = False
    # Only offered when a host is configured and it is not the Docker default
    if is_docker_mode and cur_host and cur_host != "so1":
        print("🚢 Docker Mode Detected:")
        print(f" Current SPLUNK_HOST ({cur_host}) is different from Docker default (so1)")
        print(" This will use external Splunk instead of the included Docker container")
        print()
        restore = input("Restore Docker defaults (so1) to include Splunk container? (y/N): ").strip()
        if restore.lower() == "y":
            cur_host = "so1"
            cur_port = "8089"
            cur_user = "admin"
            cur_pass = "Chang3d!"
            docker_defaults_restored = True
            print_success("Restored Docker defaults: SPLUNK_HOST=so1, SPLUNK_PORT=8089, SPLUNK_USERNAME=admin, SPLUNK_PASSWORD=Chang3d!")
        print()
    if docker_defaults_restored:
        final_host, final_port, final_user, final_pass = cur_host, cur_port, cur_user, cur_pass
        print_local("Using restored Docker defaults, skipping user input prompts...")
    else:
        # Helper to prompt with enforcement
        def prompt_value(label: str, current: str, display_current: str | None = None, default: str | None = None, required: bool = True, secure_mode: bool = False) -> str:
            # Re-prompts until a value is accepted. Empty input keeps `current`
            # (after confirmation for secure fields) or falls back to `default`.
            import getpass
            value = current
            display = display_current if display_current is not None else current
            while True:
                if current:
                    p = f"Enter {label} (press Enter to keep current: {display}): "
                else:
                    p = f"Enter {label} (required, current: Not set): "
                if secure_mode:
                    # getpass reads without echoing; the input is not stripped
                    new = getpass.getpass(p)
                else:
                    new = input(p).strip()
                if new:
                    value = new
                elif current:
                    if secure_mode and current:  # Confirm keep for secure fields
                        confirm = input("Keep current value? (y/N): ").strip().lower()
                        if confirm != 'y':
                            continue
                    value = current
                elif default:
                    value = default
                if required and not value:
                    print_error(f"{label} is required. Please provide a value.")
                    continue
                return value
        final_host = prompt_value("Splunk host/URL", cur_host)
        final_port = prompt_value("Splunk port", cur_port, default="8089", required=False)
        final_user = prompt_value("Splunk username", cur_user, default="admin")
        try:
            final_pass = prompt_value("Splunk password", cur_pass, display_current="***" if cur_pass else "", required=True, secure_mode=True)
        except KeyboardInterrupt:
            # Ctrl+C at the password prompt keeps the stored password
            print()
            final_pass = cur_pass
    # Strip protocol if URL provided
    if final_host.startswith("http://") or final_host.startswith("https://"):
        final_host = final_host.split("://", 1)[1]
        print_local(f"Extracted hostname from URL: {final_host}")
        print_local("Note: SSL verification setting unchanged (preserves private CA configuration)")
    # Validate
    if not final_host:
        print_error("SPLUNK_HOST is required. Please provide a value.")
        return False
    if not final_user:
        print_error("SPLUNK_USERNAME is required. Please provide a value.")
        return False
    if not final_pass:
        print_error("SPLUNK_PASSWORD is required. Please provide a value.")
        return False
    # Determine whether changes are needed
    has_changes = docker_defaults_restored or any(
        [final_host != cur_host, final_port != cur_port, final_user != cur_user, final_pass != cur_pass]
    )
    print()
    print("Final Splunk configuration:")
    print(f" Host: {final_host}")
    print(f" Port: {final_port}")
    print(f" Username: {final_user}")
    print(" Password: ***")
    print()
    # Note: final_pass contains the actual password value (not masked);
    # the display_current is only for prompt masking, actual value is preserved/updated
    if has_changes:
        if docker_defaults_restored:
            print_status("Automatically updating .env file with restored Docker defaults...")
            update_env_file(env_path, {
                "SPLUNK_HOST": final_host,
                "SPLUNK_PORT": final_port,
                "SPLUNK_USERNAME": final_user,
                "SPLUNK_PASSWORD": final_pass,
            })
            print_success(".env file updated successfully with Docker defaults!")
        else:
            confirm = input("Update .env file with these settings? (y/N): ").strip().lower()
            if confirm == "y":
                update_env_file(env_path, {
                    "SPLUNK_HOST": final_host,
                    "SPLUNK_PORT": final_port,
                    "SPLUNK_USERNAME": final_user,
                    "SPLUNK_PASSWORD": final_pass,
                })
                print_success(".env file updated successfully!")
            else:
                # NOTE(review): the entered values are still exported to
                # os.environ below even though this message says the existing
                # values are used -- confirm which behavior is intended.
                print_warning("Configuration update cancelled. Using existing values.")
    # Export for current session
    os.environ["SPLUNK_HOST"] = final_host
    os.environ["SPLUNK_PORT"] = final_port
    os.environ["SPLUNK_USERNAME"] = final_user
    os.environ["SPLUNK_PASSWORD"] = final_pass
    print_success("Splunk configuration loaded for current session.")
    return True
def _venv_needs_sync(venv_cfg: Path, *manifests: Path) -> bool:
    """Return True if the venv is missing or older than any existing manifest."""
    if not venv_cfg.exists():
        return True
    venv_mtime = venv_cfg.stat().st_mtime
    # A manifest that does not exist cannot be newer than the venv.
    return any(m.exists() and m.stat().st_mtime > venv_mtime for m in manifests)


def setup_local_env(force_setup: bool = False) -> int:
    """Prepare the local development environment.

    Ensures ``uv`` is available, runs ``uv sync --dev`` when ``.venv`` is
    missing or older than ``uv.lock``/``pyproject.toml``, makes sure a
    ``.env`` file exists (copied from ``env.example`` when possible) and then
    loads the Splunk configuration, prompting for it when needed.

    Args:
        force_setup: Always show the Splunk configuration prompt, even when
            ``.env`` already defines host, username and password.

    Returns:
        0 on success; 1 when uv cannot be installed or the configuration
        prompt fails; otherwise the non-zero exit code of ``uv sync``.
    """
    print_local("Setting up local development environment...")
    if not install_uv_if_missing():
        return 1

    # Create or sync venv
    if _venv_needs_sync(Path(".venv/pyvenv.cfg"), Path("uv.lock"), Path("pyproject.toml")):
        print_local("Creating/updating virtual environment and installing dependencies...")
        code = run_cmd(["uv", "sync", "--dev"])
        if code != 0:
            return code
    else:
        print_local("Virtual environment is up to date.")

    # Ensure .env exists
    env_path = Path(".env")
    was_existing = env_path.exists()
    if not was_existing:
        print_warning(".env file not found. Creating from env.example...")
        example = Path("env.example")
        if example.exists():
            env_path.write_text(example.read_text(encoding="utf-8"), encoding="utf-8")
            print_warning("Created .env file from env.example.")
            print_status("Prompting for Splunk configuration to customize...")
        else:
            print_warning("env.example not found. Proceeding without .env.")

    # Check if prompting is needed: a pre-existing, complete .env skips the prompt
    if not force_setup and was_existing:
        current: dict[str, str] = {}
        if env_path.exists():
            with env_path.open("r", encoding="utf-8") as f:
                for raw in f:
                    line = raw.strip()
                    if not line or line.startswith("#") or "=" not in line:
                        continue
                    k, v = line.split("=", 1)
                    # Tolerate "KEY = value" by stripping the key as well
                    current[k.strip()] = v.strip().strip('"').strip("'")
        required_keys = ("SPLUNK_HOST", "SPLUNK_USERNAME", "SPLUNK_PASSWORD")
        if all(current.get(name, "").strip() for name in required_keys):
            print_status("Splunk configuration in .env looks complete. Skipping setup prompt.")
            load_env_file(env_path)
            print_success("Local environment setup complete!")
            return 0

    # Prompt and load configuration if needed or forced
    if not prompt_splunk_config(is_docker_mode=False):
        return 1
    load_env_file(env_path)
    print_success("Local environment setup complete!")
    return 0
def find_available_port(start_port: int, attempts: int = 10) -> int:
    """Return the first port at or after *start_port* that refuses a connection.

    Consecutive ports are probed with a TCP connect to 127.0.0.1; a failed
    connect is taken to mean the port is free.

    Args:
        start_port: First port to probe.
        attempts: Number of consecutive ports to try.

    Returns:
        The first port whose connect attempt failed, or *start_port* when all
        probed ports accepted a connection.
    """
    import socket

    for candidate in range(start_port, start_port + attempts):
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            probe.settimeout(0.2)
            status = probe.connect_ex(("127.0.0.1", candidate))
        finally:
            probe.close()
        if status != 0:  # non-zero means connection failed => likely free
            return candidate
        print_local(f"Port {candidate} is in use, trying next port...")

    print_warning(f"Could not find an available port after {attempts} attempts")
    return start_port
def is_port_listening(port: int) -> bool:
    """Return True if a TCP connection to 127.0.0.1:*port* succeeds within 0.5 s."""
    import socket

    try:
        connection = socket.create_connection(("127.0.0.1", port), timeout=0.5)
    except OSError:
        return False
    connection.close()
    return True
def print_access_points(mcp_port: int) -> None:
    """Print the localhost URLs of the MCP server running on *mcp_port*."""
    base_url = f"http://localhost:{mcp_port}"
    print()
    print_status("📋 Access Points:")
    for text in (
        f" 🔌 MCP Server (HTTP): {base_url}",
        f" 🔌 MCP Server API: {base_url}/mcp/",
        f" 🩺 MCP Health Dashboard: {base_url}",
    ):
        print(text)
def start_mcp_inspector(mcp_port: int) -> bool:
    """Start MCP Inspector (Node 22+ required).

    An inspector already listening on port 6274 is reused. Otherwise the
    inspector is launched through ``npx`` with output redirected to
    ``logs/inspector.log``, its PID is stored in ``.inspector_pid`` and the
    function waits up to about ten seconds for port 6274 to respond.

    Args:
        mcp_port: Port of the local MCP server the inspector should target.

    Returns:
        True if an inspector is already running or became reachable; False if
        Node.js/npx is missing or too old, the launch failed, the process
        exited during startup, or it was not reachable within the wait.
    """
    # If already running on 6274, reuse
    if is_port_listening(6274):
        print_warning("MCP Inspector appears to already be running on port 6274")
        print_success("Using existing MCP Inspector instance")
        return True
    if shutil.which("node") is None or shutil.which("npx") is None:
        print_warning("Node.js/npx not found. MCP Inspector will not be available.")
        return False

    # Check Node version
    out = subprocess.run(["node", "--version"], capture_output=True, text=True, check=False)
    version = out.stdout.strip().lstrip("v")
    try:
        major = int(version.split(".")[0]) if version else 0
    except ValueError:
        major = 0
    if major < 22:
        print_warning(f"Node.js v{version or 'unknown'} detected, but MCP Inspector requires Node.js 22+.")
        return False

    print_local("Node.js v22+ detected. Starting MCP Inspector...")
    ensure_logs_dir()
    inspector_url = f"http://localhost:{mcp_port}/mcp/"
    env = os.environ.copy()
    env["DANGEROUSLY_OMIT_AUTH"] = "true"
    env["MCP_AUTO_OPEN_ENABLED"] = "false"
    cmd = [
        "npx",
        "--yes",
        "@modelcontextprotocol/inspector@0.16.5",
        "--transport",
        "streamable-http",
        "--server-url",
        inspector_url,
    ]
    log_file = Path("logs/inspector.log")
    try:
        with log_file.open("w", encoding="utf-8") as lf:
            proc = subprocess.Popen(cmd, stdout=lf, stderr=lf, env=env)
    except FileNotFoundError:
        print_warning("Failed to start MCP Inspector (npx not available).")
        return False

    # Persist the PID right away so the process can still be stopped if this
    # function is interrupted while waiting for readiness.
    pid_file = Path(".inspector_pid")
    pid_file.write_text(str(proc.pid), encoding="utf-8")

    import time

    try:
        import httpx  # lightweight dependency already in project
    except ImportError:
        httpx = None  # Fallback: port check

    # Wait for readiness
    max_attempts = 10
    ready = False
    for _ in range(max_attempts):
        if proc.poll() is not None:
            break  # the launcher exited; further polling cannot succeed
        if httpx is not None:
            try:
                r = httpx.get("http://localhost:6274", timeout=2.0)
                ready = r.status_code < 500
            except httpx.HTTPError:
                ready = False
        else:
            ready = is_port_listening(6274)
        if ready:
            break
        time.sleep(1)

    if not ready and proc.poll() is not None:
        # Do not leave a PID file pointing at a process that no longer exists.
        pid_file.unlink(missing_ok=True)
        print_warning(
            f"MCP Inspector exited during startup (exit code {proc.returncode}). Check logs: logs/inspector.log"
        )
        return False
    if ready:
        print_success("MCP Inspector started successfully on port 6274 and is accessible!")
        return True
    print_warning("MCP Inspector may not be fully ready yet. Check logs: logs/inspector.log")
    return False
def run_local_server(detached: bool = False, skip_inspector: bool = False, run_test: bool = False, detailed: bool = False) -> int:
    """Start the MCP server locally through ``uv run fastmcp run src/server.py``.

    Stops any existing local processes, picks a port (``MCP_SERVER_PORT`` or
    8003, moving to the next ports if busy), launches the server with output
    redirected to ``logs/mcp_splunk_server.log``, records its PID in
    ``.mcp_local_server.pid`` and waits for the port to accept connections.
    Then either returns immediately (detached) or monitors the process until
    it exits or Ctrl+C is pressed.

    Args:
        detached: Return as soon as the server is up instead of monitoring it.
        skip_inspector: Do not start MCP Inspector.
        run_test: Run ``src/cli/test_mcp_server.py`` after startup; only
            consulted in detached mode.
        detailed: Pass ``--detailed`` to the test script.

    Returns:
        0 after a detached start or a Ctrl+C shutdown; 1 if uv is unavailable
        or the server exits during the initial wait; the exit code of the
        foreground re-run if the port never opens; the server's return code
        (or 1 when that is 0/None) if it exits while being monitored.
    """
    # Ensure any existing local processes are stopped first
    print_status("Checking for and stopping any existing local MCP processes...")
    stop_local_processes()
    print_local("Starting MCP server locally with FastMCP CLI...")
    if not install_uv_if_missing():
        return 1
    ensure_logs_dir()
    preferred_port_str = os.environ.get("MCP_SERVER_PORT", "8003")
    try:
        preferred_port = int(preferred_port_str)
    except ValueError:
        # Non-numeric MCP_SERVER_PORT falls back to the default port
        preferred_port = 8003
    print_local(f"Preferred port from MCP_SERVER_PORT: {preferred_port}")
    mcp_port = find_available_port(preferred_port)
    if mcp_port != preferred_port:
        print_warning(f"Port {preferred_port} is in use. Using port {mcp_port} instead.")
    else:
        print_local(f"Using port {mcp_port} for MCP server.")
    log_file = Path("logs/mcp_splunk_server.log")
    # Preflight: ensure fastmcp is importable; if not, install
    test_code = subprocess.run(["uv", "run", "python", "-c", "import fastmcp; print('ok')"], capture_output=True, text=True, check=False)
    if test_code.returncode != 0:
        print_local("FastMCP import failed. Installing fastmcp...")
        add_code = run_cmd(["uv", "add", "fastmcp"])
        if add_code != 0:
            print_warning("Failed to add fastmcp via uv; continuing, it may already be present via sync.")
        run_cmd(["uv", "sync", "--dev"])  # best-effort
    cmd = [
        "uv",
        "run",
        "fastmcp",
        "run",
        "src/server.py",
        "--transport",
        "http",
        "--port",
        str(mcp_port),
    ]
    print_local(f"Command: {' '.join(cmd)}")
    # Start server
    with log_file.open("w", encoding="utf-8") as lf:
        # start_new_session=True runs the child in its own session
        proc = subprocess.Popen(cmd, stdout=lf, stderr=lf, start_new_session=True)
    # Always write PID file for monitoring/testing
    pid_file = Path('.mcp_local_server.pid')
    pid_file.write_text(str(proc.pid), encoding='utf-8')
    print_local(f"Server PID: {proc.pid} (written to {pid_file})")
    print_local("Waiting for MCP server to start...")
    import time
    time.sleep(3)
    # A return code this early means the server died during startup
    if proc.poll() is not None:
        print_error("MCP server failed to start. Check the logs:")
        if log_file.exists():
            print_error("=== MCP Server Log (last 50 lines) ===")
            try:
                lines = log_file.read_text(encoding="utf-8").splitlines()[-50:]
                for line in lines:
                    print_error(line)
            except (OSError, UnicodeDecodeError):
                print_error("(failed to read log file)")
        # Cleanup PID on early failure
        pid_file.unlink(missing_ok=True)
        return 1
    # Check port listening up to 5 attempts
    attempts = 0
    max_attempts = 5
    server_listening = False
    while attempts < max_attempts:
        if is_port_listening(mcp_port):
            server_listening = True
            break
        time.sleep(2)
        attempts += 1
        print_local(f"Port check attempt {attempts}/{max_attempts}...")
    if not server_listening:
        print_error(f"MCP server is not listening on port {mcp_port}")
        print_error(f"Server process ID: {proc.pid}")
        if log_file.exists():
            print_error("=== MCP Server Log (last 20 lines) ===")
            try:
                lines = log_file.read_text(encoding="utf-8").splitlines()[-20:]
                for line in lines:
                    print_error(line)
            except (OSError, UnicodeDecodeError):
                print_error("(failed to read log file)")
        # Try foreground restart for debugging
        if proc.poll() is None:
            proc.terminate()
        print_error("Attempting to run server in foreground for debugging...")
        # Cleanup PID on failure
        pid_file.unlink(missing_ok=True)
        # Blocks until the foreground server exits; its exit code is returned
        return run_cmd(cmd)
    print_success(f"MCP server is listening on port {mcp_port}!")
    print_status("🎉 Local MCP Server Ready!")
    print_access_points(mcp_port)
    print()
    print_status("📊 Log Files:")
    print(" 📄 MCP Server: logs/mcp_splunk_server.log")
    # Start MCP Inspector (best-effort)
    started_inspector = False
    if skip_inspector:
        print_warning("Skipping MCP Inspector auto-start (flag --no-inspector).")
    else:
        started_inspector = start_mcp_inspector(mcp_port)
        if started_inspector:
            print(" 📊 MCP Inspector: http://localhost:6274")
            print(" 📄 MCP Inspector: logs/inspector.log")
    # Detached mode: exit after start
    if detached:
        print()
        print_success(f"✅ Local server is running detached (PID {proc.pid}).")
        print_access_points(mcp_port)
        if started_inspector:
            print(" 📊 MCP Inspector: http://localhost:6274")
        print()
        print_status("🛑 To stop the server:")
        print(" uv run mcp-server --stop")
        if run_test:
            print_status("Running MCP server test...")
            test_cmd = ["uv", "run", "python", "src/cli/test_mcp_server.py"]
            if detailed:
                test_cmd.append("--detailed")
            # The test's exit code is not propagated
            run_cmd(test_cmd)
        return 0  # Exit main script immediately after starting detached and optional test
    print()
    print_status("🛑 To stop the server: press Ctrl+C")
    print_local("Monitoring server; press Ctrl+C to stop.")
    # Always re-print access points as the final lines for easy copy/paste
    print_access_points(mcp_port)
    try:
        # Poll once per second until the server exits on its own
        while proc.poll() is None:
            time.sleep(1)
    except KeyboardInterrupt:
        print_local("Stopping MCP Server...")
        if proc.poll() is None:
            proc.terminate()
            # Wait for up to 10 seconds for graceful shutdown
            try:
                proc.wait(timeout=10)
            except subprocess.TimeoutExpired:
                print_warning("Process did not exit gracefully; sending SIGKILL...")
                proc.kill()
        # Ensure inspector and pid artifacts are cleaned up on Ctrl+C
        try:
            stop_local_processes()
        except (OSError, ValueError, subprocess.SubprocessError):
            pass
        # Cleanup PID file
        pid_file.unlink(missing_ok=True)
        return 0
    # Handle natural process exit (e.g., crash or manual kill)
    print_error("MCP server process exited unexpectedly.")
    if log_file.exists():
        print_error("=== Recent server logs (last 10 lines) ===")
        try:
            lines = log_file.read_text(encoding="utf-8").splitlines()[-10:]
            for line in lines:
                print_error(line)
        except (OSError, UnicodeDecodeError):
            print_error("(failed to read log file)")
    # Cleanup
    try:
        stop_local_processes()
    except (OSError, ValueError, subprocess.SubprocessError):
        pass
    # Cleanup PID file
    pid_file.unlink(missing_ok=True)
    # A zero/None return code is still reported as failure (1)
    return proc.returncode or 1
def stop_docker_services() -> int:
    """Bring down the compose-managed Docker services if any are running.

    Returns:
        0 when no compose command is available or no containers are listed;
        otherwise the exit code of the compose ``down`` command.
    """
    print_status("Stopping Docker services...")
    available, base_cmd = check_compose_available()
    if not available:
        print_warning("docker-compose or docker compose not found; skipping Docker stop.")
        return 0

    def container_ids(stdout: str) -> list[str]:
        # One container id per non-blank line of `compose ps -q` output.
        return [entry for entry in stdout.strip().splitlines() if entry.strip()]

    # Check if any compose-managed containers are running
    ps_quiet = base_cmd + ["ps", "-q"]
    try:
        listing = subprocess.run(ps_quiet, capture_output=True, text=True, check=False)
    except FileNotFoundError:
        running_ids = []
    else:
        running_ids = container_ids(listing.stdout)

    if not running_ids:
        print_status("No Docker MCP services appear to be running (compose ps is empty).")
        return 0

    print_status(f"Stopping Docker services (found {len(running_ids)} running container(s))...")
    rc = run_cmd(base_cmd + ["down"])
    if rc != 0:
        print_error("Failed to stop some Docker services.")
        return rc

    # Verify after stopping
    recheck = subprocess.run(ps_quiet, capture_output=True, text=True, check=False)
    remaining = container_ids(recheck.stdout)
    if remaining:
        print_warning(f"Some Docker containers may still be running: {len(remaining)}")
    else:
        print_success("Docker services stopped.")
    return rc
def stop_local_processes() -> int:
    """Stop the local MCP server and MCP Inspector processes.

    Changes the working directory to the directory three levels above this
    file, then tries in turn: the PIDs stored in ``.mcp_local_server.pid`` and
    ``.inspector_pid`` (SIGTERM, then SIGKILL if still alive), whatever is
    bound to port 6274 (via ``lsof``, ``fuser``, ``pkill`` or ``pgrep``), and
    finally any process whose command line matches the ``fastmcp run``
    patterns. A summary of stopped/remaining processes is printed.

    Returns:
        Always 0.
    """
    # Three dirname() calls: the directory three levels above this file
    base_dir = Path(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
    os.chdir(base_dir)
    # Stop by PID file if present
    pid_file = base_dir / ".mcp_local_server.pid"
    inspector_pid_file = base_dir / ".inspector_pid"
    # Detect initial processes
    initial_pids: set[int] = set()
    if pid_file.exists():
        try:
            p_str = pid_file.read_text(encoding="utf-8").strip()
            p = int(p_str)
            initial_pids.add(p)
        except (OSError, ValueError):
            pass
    # Command-line substrings passed to pgrep -f / pkill -f
    patterns = [
        "fastmcp run src/server.py",
        "fastmcp run",
    ]
    for pat in patterns:
        if shutil.which("pgrep") is not None:
            out = subprocess.run(["pgrep", "-f", pat], capture_output=True, text=True, check=False)
            if out.returncode == 0 and out.stdout:
                for line in out.stdout.strip().splitlines():
                    try:
                        initial_pids.add(int(line.strip()))
                    except ValueError:
                        continue
    if pid_file.exists():
        try:
            pid_str = pid_file.read_text(encoding="utf-8").strip()
            pid = int(pid_str)
            print_status(f"Stopping local MCP Server (PID {pid})...")
            os.kill(pid, signal.SIGTERM)
            # Wait briefly for termination
            for _ in range(10):
                try:
                    # Signal 0 only checks that the process still exists
                    os.kill(pid, 0)
                    import time as _t
                    _t.sleep(0.3)
                except ProcessLookupError:
                    break
            else:
                # for-else: reached only when the loop never hit `break`
                print_warning("Process did not exit after SIGTERM; sending SIGKILL...")
                os.kill(pid, signal.SIGKILL)
            pid_file.unlink(missing_ok=True)
            print_success("Local MCP Server stopped.")
        except (OSError, ValueError) as e:
            # NOTE(review): unlike the inspector branch below, the PID file is
            # not removed here when the PID is stale or invalid.
            print_warning(f"Could not stop PID from file: {e}")
    # Stop inspector if present
    if inspector_pid_file.exists():
        try:
            ipid_str = inspector_pid_file.read_text(encoding="utf-8").strip()
            ipid = int(ipid_str)
            print_status(f"Stopping MCP Inspector (PID {ipid})...")
            os.kill(ipid, signal.SIGTERM)
            # Wait briefly for inspector termination
            for _ in range(10):
                try:
                    os.kill(ipid, 0)
                    import time as _t
                    _t.sleep(0.3)
                except ProcessLookupError:
                    break
            else:
                print_warning("Inspector did not exit after SIGTERM; sending SIGKILL...")
                os.kill(ipid, signal.SIGKILL)
            inspector_pid_file.unlink(missing_ok=True)
            print_success("MCP Inspector stopped.")
        except (OSError, ValueError) as e:
            # If the process is already gone or PID invalid, still remove stale pid file
            print_warning(f"Could not stop inspector PID from file: {e}")
            inspector_pid_file.unlink(missing_ok=True)
            print_status("Removed stale .inspector_pid file.")
    # Additional inspector stop attempts for port 6274
    try:
        import socket as _socket
        s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
        s.settimeout(0.2)
        # connect_ex() returns 0 when something accepts the connection
        port_open = s.connect_ex(("127.0.0.1", 6274)) == 0
        s.close()
    except OSError:
        port_open = False
    if port_open:
        if shutil.which("lsof"):
            out = subprocess.run(["lsof", "-t", "-i", ":6274"], capture_output=True, text=True, check=False)
            pids = [line.strip() for line in out.stdout.splitlines() if line.strip()]
            for pid_str in pids:
                try:
                    pid = int(pid_str)
                    print_status(f"Stopping MCP Inspector (port 6274) PID {pid}...")
                    os.kill(pid, signal.SIGTERM)
                    print_success("MCP Inspector stop signal sent.")
                except (ValueError, OSError):
                    continue
        else:
            # Without lsof, fall back to the first available alternative tool
            if shutil.which("fuser"):
                print_status("Using fuser to kill processes on port 6274...")
                subprocess.run(["fuser", "-k", "6274/tcp"], check=False)
            elif shutil.which("pkill"):
                print_status("Trying pkill -f '@modelcontextprotocol/inspector'...")
                subprocess.run(["pkill", "-f", "@modelcontextprotocol/inspector"], check=False)
            elif shutil.which("pgrep") and shutil.which("kill"):
                out = subprocess.run(["pgrep", "-f", "@modelcontextprotocol/inspector"], capture_output=True, text=True, check=False)
                if out.returncode == 0 and out.stdout:
                    for line in out.stdout.strip().splitlines():
                        try:
                            pid = int(line.strip())
                            print_status(f"Killing Inspector PID {pid} matching '@modelcontextprotocol/inspector'...")
                            os.kill(pid, signal.SIGTERM)
                        except (ValueError, OSError):
                            continue
        # After attempting to stop inspector by port/name, remove stale pid file if present
        inspector_pid_file.unlink(missing_ok=True)
    # Fall back to pkill for fastmcp patterns
    for pat in patterns:
        if shutil.which("pkill") is not None:
            print_status(f"Trying pkill -f '{pat}'...")
            subprocess.run(["pkill", "-f", pat], check=False)
        else:
            if shutil.which("pgrep") and shutil.which("kill"):
                out = subprocess.run(["pgrep", "-f", pat], capture_output=True, text=True, check=False)
                if out.returncode == 0 and out.stdout:
                    for line in out.stdout.strip().splitlines():
                        try:
                            pid = int(line.strip())
                            print_status(f"Killing PID {pid} matching '{pat}'...")
                            os.kill(pid, signal.SIGTERM)
                        except (ValueError, OSError):
                            continue
    # Small grace period to allow processes to exit cleanly before verification
    try:
        import time as _t
        _t.sleep(0.5)
    except (ImportError, RuntimeError, OSError):
        pass
    # Verify post-state
    remaining_pids: set[int] = set()
    if pid_file.exists():
        try:
            p_str = pid_file.read_text(encoding="utf-8").strip()
            p = int(p_str)
            remaining_pids.add(p)
        except (OSError, ValueError):
            pass
    for pat in patterns:
        if shutil.which("pgrep") is not None:
            out = subprocess.run(["pgrep", "-f", pat], capture_output=True, text=True, check=False)
            if out.returncode == 0 and out.stdout:
                for line in out.stdout.strip().splitlines():
                    try:
                        remaining_pids.add(int(line.strip()))
                    except ValueError:
                        continue
    # Confirm candidates are truly alive (avoid false positives from zombie/exited processes)
    alive_remaining: set[int] = set()
    for pid in list(remaining_pids):
        try:
            os.kill(pid, 0)  # raises ProcessLookupError if not running
            alive_remaining.add(pid)
        except ProcessLookupError:
            continue
        except PermissionError:
            # If we lack permission, assume it's alive to be safe
            alive_remaining.add(pid)
    # The stopped count is the difference of the two set sizes, clamped at zero
    initially_running = len(initial_pids)
    now_running = len(alive_remaining)
    stopped_count = max(0, initially_running - now_running)
    if initially_running == 0:
        print_status("No local MCP processes found.")
        return 0
    if stopped_count > 0:
        print_success(f"Stopped {stopped_count} local MCP process(es).")
    if now_running > 0:
        print_warning(
            f"{now_running} MCP process(es) may still be running: {', '.join(map(str, sorted(alive_remaining)))}"
        )
    return 0
def run_docker_setup(run_test: bool = False) -> int:
    """Build and start the Docker Compose stack, then print usage hints.

    The function runs these steps in order:
      1. Verify a compose command is available.
      2. If uv is available, refresh ``uv.lock`` when it is missing or older
         than ``pyproject.toml``.
      3. Create ``.env`` from ``env.example`` when ``.env`` is missing.
      4. Prompt for Splunk configuration and load ``.env``.
      5. Ask for production vs. development mode, then build and start the
         services and print service URLs and helper commands.

    Args:
        run_test: When True, run ``src/cli/test_mcp_server.py --detailed`` via
            ``uv run`` after the services are started. The test result does
            not affect the return value.

    Returns:
        0 on success; 1 when no compose command is found; otherwise the
        non-zero exit code of the failing ``uv sync``, build, or up command.
    """
    # Local import: the module-level imports do not include ``time``.
    import time

    print_status("Using Docker deployment mode...")

    # Ensure compose available
    available, base_cmd = check_compose_available()
    if not available:
        print_error("docker-compose or docker compose not found. Please install or use local mode.")
        print_error("To install docker-compose: https://docs.docker.com/compose/install/")
        return 1

    # If uv exists (install if missing), ensure uv.lock is up to date
    if install_uv_if_missing():
        print_status("uv detected. Ensuring uv.lock is up to date for Docker build...")
        uv_lock = Path("uv.lock")
        pyproject = Path("pyproject.toml")
        # Only compare mtimes when pyproject.toml exists; calling stat() on a
        # missing file would raise FileNotFoundError and abort the setup.
        lock_is_stale = (not uv_lock.exists()) or (
            pyproject.exists() and pyproject.stat().st_mtime > uv_lock.stat().st_mtime
        )
        if lock_is_stale:
            print_status("Updating uv.lock file...")
            code = run_cmd(["uv", "sync", "--dev"])
            if code != 0:
                return code
            print_success("uv.lock updated successfully!")
        else:
            print_status("uv.lock is already up to date.")
    else:
        print_warning("uv not found. Docker will use existing uv.lock file (if present).")

    # Ensure .env exists
    env_path = Path(".env")
    if not env_path.exists():
        print_warning(".env file not found. Creating from env.example...")
        example = Path("env.example")
        if example.exists():
            env_path.write_text(example.read_text(encoding="utf-8"), encoding="utf-8")
            print_warning("Created .env file. You may want to edit it with your Splunk configuration.")
        else:
            print_warning("env.example not found. Proceeding without .env.")

    # Prompt for Splunk configuration with Docker mode enabled
    prompt_splunk_config(is_docker_mode=True)
    load_env_file(env_path)

    # Choose mode
    print()
    print_status("Choose Docker deployment mode:")
    print(" 1) Production (default) - Optimized for performance, no hot reload")
    print(" 2) Development - Hot reload enabled, enhanced debugging")
    print()
    try:
        docker_choice = input("Enter your choice (1 or 2, default: 1): ").strip() or "1"
    except EOFError:
        # Non-interactive stdin (e.g. CI or a closed pipe): use the default.
        print()
        docker_choice = "1"
    if docker_choice not in ("1", "2"):
        print_warning(f"Unrecognized choice '{docker_choice}'. Using Production mode.")
    # Only an explicit "2" selects development; everything else falls back to
    # the advertised default (production).
    docker_mode = "dev" if docker_choice == "2" else "prod"
    service_name = "mcp-server" if docker_mode == "prod" else "mcp-server-dev"

    # Compose command with file selection (copy so base_cmd is not mutated)
    compose_cmd = list(base_cmd)
    if docker_mode == "dev":
        compose_cmd += ["-f", "docker-compose-dev.yml"]

    # Build
    print_status("Building Docker image...")
    code = run_cmd(compose_cmd + ["build", service_name])
    if code != 0:
        print_error("Failed to build Docker image")
        return code
    print_success("Docker image built successfully!")

    # Up
    print_status("Starting services with docker compose...")
    code = run_cmd(compose_cmd + ["up", "-d"])
    if code != 0:
        print_error("Failed to start services")
        return code
    print_success("Services started successfully!")

    # Brief wait and status
    time.sleep(5)
    print_status("Checking service status...")
    run_cmd(compose_cmd + ["ps"])  # best-effort: exit code intentionally ignored

    # Show service URLs
    splunk_host = os.environ.get("SPLUNK_HOST", "")
    mcp_port = os.environ.get("MCP_SERVER_PORT", "8001")
    logs_cmd = " ".join(compose_cmd + ["logs", "-f", service_name])

    print()
    print_success("🎉 Docker setup complete!")
    print()
    print_status("📋 Service URLs:")
    print(" 🔧 Traefik Dashboard: http://localhost:8080")
    if splunk_host in ("", "so1"):
        print(" 🌐 Splunk Web UI: http://localhost:9000 (admin/Chang3d!)")
    else:
        print(f" 🌐 External Splunk: {splunk_host}")
    print(f" 🔌 MCP Server: http://localhost:{mcp_port}/mcp/")
    print(f" 🩺 MCP Health Dashboard: http://localhost:{mcp_port}")
    print(" 📊 MCP Inspector: http://localhost:6274")
    print()
    print_status("🔍 To check logs:")
    print(" " + logs_cmd)
    print(" " + " ".join(compose_cmd + ["logs", "-f", "mcp-inspector"]))
    print()
    print_status("🛑 To stop all services:")
    print(" " + " ".join(compose_cmd + ["down"]))

    if docker_mode == "dev":
        print()
        print_status("🚀 Development Mode Features:")
        print(" • Hot reload enabled - changes sync automatically")
        print(" • Enhanced debugging and logging")
        print(" • Use: " + logs_cmd)

    if run_test:
        print_status("Running MCP server test...")
        test_cmd = ["uv", "run", "python", "src/cli/test_mcp_server.py", "--detailed"]
        # The test outcome is informational only; setup still reports success.
        run_cmd(test_cmd)

    return 0
def _ask_run_detached() -> bool:
    """Ask whether the local server should run in the background (default: no)."""
    try:
        answer = input("Run local server detached (background)? (y/N): ").strip() or "n"
    except EOFError:
        # Non-interactive stdin: fall back to the documented default (N).
        print()
        return False
    return answer.lower() == "y"


def _setup_and_run_local(args: CliArgs, *, ask_detached: bool) -> int:
    """Prepare the local environment and start the local server.

    When ``ask_detached`` is True and ``--detached`` was not passed, the user
    is asked interactively whether to run the server in the background.
    Returns the non-zero setup code on failure, else the server's return code.
    """
    code = setup_local_env(force_setup=args.setup)
    if code != 0:
        return code
    detached = args.detached
    if ask_detached and not detached:
        detached = _ask_run_detached()
    return run_local_server(
        detached=detached,
        skip_inspector=args.no_inspector,
        run_test=args.test,
        detailed=args.detailed,
    )


def main(argv: list[str] | None = None) -> int:
    """CLI entry point: parse arguments and dispatch to the requested mode.

    Dispatch order: standalone ``--test``, ``--stop``, ``--restart``, forced
    ``--docker`` / ``--local``, then interactive selection based on which of
    Docker and uv are available.

    Args:
        argv: Argument list handed to ``parse_args``; ``None`` is passed
            through unchanged.

    Returns:
        Process exit code: 0 on success, non-zero on failure.
    """
    # Work from the directory two levels above this file's directory
    # (presumably the repository root). abspath() guards against a relative
    # __file__, for which the dirname chain would collapse to "" and make
    # os.chdir raise FileNotFoundError.
    here = os.path.abspath(__file__)
    os.chdir(os.path.dirname(os.path.dirname(os.path.dirname(here))))

    show_intro()
    args = parse_args(argv)

    docker_available = check_docker_available()
    uv_available = check_uv_available()
    if docker_available:
        print_success("Docker is available and may be running.")
    else:
        print_warning("Docker is not available.")
    if uv_available:
        print_success("uv package manager is available.")
    else:
        print_warning("uv package manager is not available.")

    # Handle standalone --test first (no startup)
    if args.test and not (args.force_docker or args.force_local or args.restart or args.stop):
        print_status("Running standalone MCP server test...")
        test_cmd = ["uv", "run", "python", "src/cli/test_mcp_server.py"]
        if args.detailed:
            test_cmd.append("--detailed")
        return run_cmd(test_cmd)

    # Handle forced modes first
    if args.stop:
        if args.local_only and args.docker_only:
            print_error("Cannot use --local-only and --docker-only together.")
            return 1
        rc_local = 0
        rc_docker = 0
        if not args.docker_only:
            rc_local = stop_local_processes()
        if not args.local_only:
            rc_docker = stop_docker_services()
        # Prefer non-zero if any failed
        return rc_local or rc_docker

    if args.restart:
        # Stop everything, then start local server in detached mode using existing .env strictly
        print_status("Restart requested: stopping running services/processes first...")
        rc_local = stop_local_processes()
        rc_docker = stop_docker_services()
        if rc_local or rc_docker:
            print_warning("Some components may not have stopped cleanly; continuing with restart.")
        # Ensure uv available for local run
        if not check_uv_available() and not install_uv_if_missing():
            print_error("uv package manager is required for local restart.")
            return 1
        # Strictly use current .env without prompts
        env_path = Path(".env")
        if not env_path.exists():
            print_error(".env file is required for --restart. Please create it first.")
            return 1
        print_local("Loading existing .env without prompting...")
        load_env_file(env_path)
        # Start local server detached and skip inspector only if user requested via flag
        return run_local_server(
            detached=True,
            skip_inspector=args.no_inspector,
            run_test=args.test,
            detailed=args.detailed,
        )

    if args.force_docker and args.force_local:
        print_error("Cannot force both --docker and --local. Choose one.")
        return 1

    if args.force_docker:
        if not docker_available:
            print_error("Docker deployment requested but Docker is not available.")
            print_error("Please start Docker or install Docker first.")
            return 1
        print_status("Forcing Docker deployment as requested...")
        return run_docker_setup(run_test=args.test)

    if args.force_local:
        if not uv_available:
            print_error("Local deployment requested but uv package manager is not available.")
            print_error("Please install uv first: curl -LsSf https://astral.sh/uv/install.sh | sh")
            return 1
        print_status("Forcing local deployment as requested...")
        # Forced local mode never prompts; --detached alone decides.
        return _setup_and_run_local(args, ask_detached=False)

    # Interactive mode selection
    if docker_available and uv_available:
        selected = interactive_choice()
        if selected == "docker":
            return run_docker_setup(run_test=args.test)
        return _setup_and_run_local(args, ask_detached=True)

    if docker_available:
        print_status("Only Docker is available. Using Docker deployment...")
        return run_docker_setup(run_test=args.test)

    if uv_available:
        print_status("Only local development is available. Setting up local mode...")
        return _setup_and_run_local(args, ask_detached=True)

    print_error("Neither Docker nor uv package manager are available.")
    print_error("Please install one of the following:")
    print_error("1. Docker: https://docs.docker.com/get-docker/")
    print_error("2. uv: curl -LsSf https://astral.sh/uv/install.sh | sh")
    print()
    print_error("📚 For detailed installation instructions, see:")
    print_error(" docs/getting-started/installation.md")
    print_error("")
    print_error("🔧 You can also run our prerequisite checker to see what's missing:")
    print_error(" ./scripts/check-prerequisites.sh")
    return 1
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())