import asyncio
import os
import contextlib
import logging
import signal
import subprocess
import time
from typing import Optional, List, Tuple
# Удаляем импорты Starlette, так как мы не будем использовать ее напрямую
# from starlette.applications import Starlette
# from starlette.routing import Mount
import uvicorn
from starlette.responses import JSONResponse, PlainTextResponse
from starlette.requests import Request
from pydantic import BaseModel
try:
# Fast API for написания MCP-серверов
from mcp.server.fastmcp import FastMCP
except Exception as exc: # pragma: no cover
raise RuntimeError(
"Не удалось импортировать MCP SDK. Установите пакет 'mcp' (pip install modelcontextprotocol)."
) from exc
def _env_bool(name: str, default: bool = False) -> bool:
val = os.environ.get(name)
if val is None:
return default
val_lower = val.strip().lower()
return val_lower in ("1", "true", "yes", "y", "on")
def _env_int(name: str, default: int) -> int:
try:
return int(os.environ.get(name, str(default)))
except Exception:
return default
def _env_list(name: str, default: List[str]) -> List[str]:
raw = os.environ.get(name)
if not raw:
return list(default)
return [item.strip().lower() for item in raw.split(',') if item.strip()]
# Configure logging early
_log_level = os.environ.get("MCP_LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=getattr(logging, _log_level, logging.INFO),
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logger = logging.getLogger("terminal-mcp")
# Runtime configuration (env)
ALLOWED_SHELLS = set(_env_list("MCP_ALLOWED_SHELLS", ["bash", "sh", "pwsh", "powershell", "cmd", "cmd.exe"]))
DEFAULT_SHELL = os.environ.get("MCP_DEFAULT_SHELL", "bash").lower()
MAX_STDOUT_BYTES = _env_int("MCP_MAX_STDOUT_BYTES", 1024 * 1024)
MAX_STDERR_BYTES = _env_int("MCP_MAX_STDERR_BYTES", 512 * 1024)
MAX_COMMAND_LENGTH = _env_int("MCP_MAX_COMMAND_LENGTH", 8192)
MAX_CONCURRENCY = max(1, _env_int("MCP_MAX_CONCURRENCY", 8))
KILL_GRACE_SECONDS = max(0, _env_int("MCP_KILL_GRACE_SECONDS", 3))
MAX_EXEC_SECONDS = max(1, _env_int("MCP_MAX_EXEC_SECONDS", 120))
ALLOWED_CWD_ROOT = os.environ.get("MCP_ALLOWED_CWD_ROOT")
# Concurrency limiter and metrics
_concurrency_semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
START_TIME = time.time()
METRICS = {
"running": 0,
"total": 0,
"success": 0,
"error": 0,
"timeout": 0,
"spawn_failures": 0,
"bytes_stdout": 0,
"bytes_stderr": 0,
"truncated_stdout": 0,
"truncated_stderr": 0,
"duration_total_seconds": 0.0,
}
server = FastMCP(
"terminal-mcp",
instructions=os.environ.get(
"MCP_INSTRUCTIONS",
"Сервер MCP для выполнения терминальных команд. Используйте tools/list и tools/call.",
),
debug=_env_bool("MCP_DEBUG", False),
json_response=_env_bool("MCP_JSON_RESPONSE", False),
stateless_http=_env_bool("MCP_STATELESS_HTTP", False),
)
def _build_shell_command(command: str, shell: str) -> List[str]:
shell_lower = (shell or DEFAULT_SHELL).lower()
if shell_lower in ("powershell", "pwsh"):
exe = "powershell.exe" if os.name == "nt" else "pwsh"
return [exe, "-NoLogo", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", command]
if shell_lower in ("cmd", "cmd.exe"):
if os.name == "nt":
return ["cmd.exe", "/d", "/s", "/c", command]
# На *nix аналогично через sh
return ["sh", "-lc", command]
# По умолчанию bash/sh
if os.name == "nt":
return ["cmd.exe", "/d", "/s", "/c", command]
return ["bash", "-lc", command]
def _is_subpath(base: str, target: str) -> bool:
try:
base_real = os.path.realpath(base)
target_real = os.path.realpath(target)
common = os.path.commonpath([base_real, target_real])
return common == base_real
except Exception:
return False
def _validate_params(command: str, shell: str, cwd: Optional[str], timeout_seconds: int) -> Optional[str]:
if not command or not isinstance(command, str):
return "Ошибка: параметр 'command' должен быть непустой строкой."
if len(command) > MAX_COMMAND_LENGTH:
return f"Ошибка: команда слишком длинная (>{MAX_COMMAND_LENGTH} символов)."
shell_lower = (shell or DEFAULT_SHELL).lower()
if shell_lower not in ALLOWED_SHELLS:
return f"Ошибка: оболочка '{shell}' не разрешена. Разрешено: {', '.join(sorted(ALLOWED_SHELLS))}."
if cwd:
if not os.path.isdir(cwd):
return f"Ошибка: каталог cwd не найден: {cwd}"
if ALLOWED_CWD_ROOT and not _is_subpath(ALLOWED_CWD_ROOT, cwd):
return f"Ошибка: доступ к каталогу вне разрешённого корня: {ALLOWED_CWD_ROOT}"
if timeout_seconds <= 0:
return "Ошибка: timeout_seconds должен быть > 0"
return None
async def _read_stream_with_limit(stream: asyncio.StreamReader, max_bytes: int) -> Tuple[str, bool, int]:
stored = bytearray()
total = 0
truncated = False
try:
while True:
chunk = await stream.read(65536)
if not chunk:
break
total += len(chunk)
if len(stored) < max_bytes:
remaining = max_bytes - len(stored)
if len(chunk) <= remaining:
stored.extend(chunk)
else:
stored.extend(chunk[:remaining])
truncated = True
if len(stored) >= max_bytes and total > max_bytes:
truncated = True
else:
truncated = True
text = stored.decode("utf-8", errors="replace")
return text, truncated, len(stored)
except Exception:
# On read error, mark as error/truncated
return stored.decode("utf-8", errors="replace"), True, len(stored)
async def _terminate_process_tree(process: asyncio.subprocess.Process) -> None:
with contextlib.suppress(Exception):
if os.name != "nt":
# Try terminating the whole process group
os.killpg(process.pid, signal.SIGTERM) # type: ignore[arg-type]
else:
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=KILL_GRACE_SECONDS)
return
except asyncio.TimeoutError:
pass
with contextlib.suppress(Exception):
if os.name != "nt":
os.killpg(process.pid, signal.SIGKILL) # type: ignore[arg-type]
else:
process.kill()
with contextlib.suppress(Exception):
await asyncio.wait_for(process.wait(), timeout=KILL_GRACE_SECONDS)
async def _run_subprocess(command: str, shell: str, cwd: Optional[str], timeout_seconds: int) -> Tuple[int, str, str, bool, bool, float]:
"""Run subprocess with limits. Returns (exit_code, stdout, stderr, stdout_trunc, stderr_trunc, duration_sec)."""
# Clamp timeout to MAX_EXEC_SECONDS
if timeout_seconds > MAX_EXEC_SECONDS:
logger.debug("Clamping timeout_seconds=%s to MAX_EXEC_SECONDS=%s", timeout_seconds, MAX_EXEC_SECONDS)
timeout_seconds = MAX_EXEC_SECONDS
args = _build_shell_command(command, shell)
start_ts = time.perf_counter()
creationflags = 0
start_new_session = False
preexec_fn = None
if os.name == "nt":
creationflags = subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined]
else:
# On POSIX, create new session so we can signal the group
start_new_session = True
async with _concurrency_semaphore:
METRICS["running"] += 1
METRICS["total"] += 1
try:
try:
process = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd or None,
start_new_session=start_new_session,
creationflags=creationflags,
preexec_fn=preexec_fn,
)
except FileNotFoundError:
METRICS["spawn_failures"] += 1
return -1, "", f"Ошибка: оболочка '{shell}' недоступна в текущей системе.", False, False, 0.0
except Exception as e:
METRICS["spawn_failures"] += 1
return -1, "", f"Ошибка запуска процесса: {e}", False, False, 0.0
assert process.stdout is not None and process.stderr is not None
read_stdout_task = asyncio.create_task(_read_stream_with_limit(process.stdout, MAX_STDOUT_BYTES))
read_stderr_task = asyncio.create_task(_read_stream_with_limit(process.stderr, MAX_STDERR_BYTES))
try:
await asyncio.wait_for(process.wait(), timeout=timeout_seconds)
stdout_text, stdout_trunc, stored_out = await read_stdout_task
stderr_text, stderr_trunc, stored_err = await read_stderr_task
duration = time.perf_counter() - start_ts
METRICS["bytes_stdout"] += stored_out
METRICS["bytes_stderr"] += stored_err
if stdout_trunc:
METRICS["truncated_stdout"] += 1
if stderr_trunc:
METRICS["truncated_stderr"] += 1
if process.returncode == 0:
METRICS["success"] += 1
else:
METRICS["error"] += 1
METRICS["duration_total_seconds"] += duration
return process.returncode or 0, stdout_text, stderr_text, stdout_trunc, stderr_trunc, duration
except asyncio.TimeoutError:
METRICS["timeout"] += 1
await _terminate_process_tree(process)
# Ensure streams are consumed after kill
with contextlib.suppress(Exception):
stdout_text, stdout_trunc, stored_out = await read_stdout_task
METRICS["bytes_stdout"] += stored_out
if stdout_trunc:
METRICS["truncated_stdout"] += 1
with contextlib.suppress(Exception):
stderr_text, stderr_trunc, stored_err = await read_stderr_task
METRICS["bytes_stderr"] += stored_err
if stderr_trunc:
METRICS["truncated_stderr"] += 1
duration = time.perf_counter() - start_ts
METRICS["duration_total_seconds"] += duration
return 124, stdout_text if 'stdout_text' in locals() else "", stderr_text if 'stderr_text' in locals() else "", True, True, duration
finally:
METRICS["running"] = max(0, METRICS["running"] - 1)
def _format_summary(exit_code: int, stdout: str, stderr: str, stdout_trunc: bool, stderr_trunc: bool) -> str:
out = []
out.append(f"exit_code={exit_code}")
out.append("--- stdout ---")
if stdout_trunc:
out.append(stdout + "\n[truncated]")
else:
out.append(stdout)
out.append("--- stderr ---")
if stderr_trunc:
out.append(stderr + "\n[truncated]")
else:
out.append(stderr)
return "\n".join(out)
@server.tool(description=
"Выполняет команду в терминале. Поддерживаемые оболочки: powershell, cmd, bash.\n"
"Параметры: command (str), shell (str, optional), cwd (str, optional), timeout_seconds (int, optional)."
)
async def run_command(
command: str,
shell: str = "bash",
cwd: Optional[str] = None,
timeout_seconds: int = 120,
) -> str:
err = _validate_params(command, shell, cwd, timeout_seconds)
if err:
return err
exit_code, stdout_text, stderr_text, out_trunc, err_trunc, _ = await _run_subprocess(command, shell, cwd, timeout_seconds)
return _format_summary(exit_code, stdout_text, stderr_text, out_trunc, err_trunc)
class RunCommandJsonOutput(BaseModel):
exit_code: int
stdout: str
stderr: str
text: str
@server.tool(
name="run_command_json",
description=(
"Выполняет команду и возвращает структурированный JSON: exit_code, stdout, stderr, text.\n"
"Параметры: command (str), shell (str, optional), cwd (str, optional), timeout_seconds (int, optional)."
),
structured_output=True,
)
async def run_command_json(
command: str,
shell: str = "bash",
cwd: Optional[str] = None,
timeout_seconds: int = 120,
) -> RunCommandJsonOutput:
err = _validate_params(command, shell, cwd, timeout_seconds)
if err:
return RunCommandJsonOutput(exit_code=-1, stdout="", stderr=err, text=err)
exit_code, stdout_text, stderr_text, out_trunc, err_trunc, _ = await _run_subprocess(command, shell, cwd, timeout_seconds)
summary = _format_summary(exit_code, stdout_text, stderr_text, out_trunc, err_trunc)
return RunCommandJsonOutput(exit_code=exit_code, stdout=stdout_text, stderr=stderr_text, text=summary)
@server.custom_route("/health", methods=["GET"], name="health")
async def health_check(request):
uptime = time.time() - START_TIME
return JSONResponse({
"status": "ok",
"name": server._mcp_server.name,
"uptime_seconds": int(uptime),
"running": METRICS.get("running", 0),
"max_concurrency": MAX_CONCURRENCY,
})
@server.custom_route("/info", methods=["GET"], name="info")
async def info(request):
try:
from mcp.shared.version import SUPPORTED_PROTOCOL_VERSIONS
from mcp.types import DEFAULT_NEGOTIATED_VERSION
except Exception:
SUPPORTED_PROTOCOL_VERSIONS = []
DEFAULT_NEGOTIATED_VERSION = None
return JSONResponse({
"name": server._mcp_server.name,
"debug": server.settings.debug,
"json_response": server.settings.json_response,
"stateless_http": server.settings.stateless_http,
"streamable_http_path": server.settings.streamable_http_path,
"supported_versions": SUPPORTED_PROTOCOL_VERSIONS,
"default_version": DEFAULT_NEGOTIATED_VERSION,
"config": {
"allowed_shells": sorted(list(ALLOWED_SHELLS)),
"default_shell": DEFAULT_SHELL,
"max_stdout_bytes": MAX_STDOUT_BYTES,
"max_stderr_bytes": MAX_STDERR_BYTES,
"max_command_length": MAX_COMMAND_LENGTH,
"max_concurrency": MAX_CONCURRENCY,
"kill_grace_seconds": KILL_GRACE_SECONDS,
"max_exec_seconds": MAX_EXEC_SECONDS,
"allowed_cwd_root": ALLOWED_CWD_ROOT,
},
})
@server.custom_route("/metrics", methods=["GET"], name="metrics")
async def metrics_route(request):
# Simple Prometheus text format
lines = []
lines.append(f"terminalmcp_running {METRICS.get('running', 0)}")
lines.append(f"terminalmcp_total {METRICS.get('total', 0)}")
lines.append(f"terminalmcp_success {METRICS.get('success', 0)}")
lines.append(f"terminalmcp_error {METRICS.get('error', 0)}")
lines.append(f"terminalmcp_timeout {METRICS.get('timeout', 0)}")
lines.append(f"terminalmcp_spawn_failures {METRICS.get('spawn_failures', 0)}")
lines.append(f"terminalmcp_bytes_stdout {METRICS.get('bytes_stdout', 0)}")
lines.append(f"terminalmcp_bytes_stderr {METRICS.get('bytes_stderr', 0)}")
lines.append(f"terminalmcp_truncated_stdout {METRICS.get('truncated_stdout', 0)}")
lines.append(f"terminalmcp_truncated_stderr {METRICS.get('truncated_stderr', 0)}")
uptime = time.time() - START_TIME
lines.append(f"terminalmcp_uptime_seconds {int(uptime)}")
lines.append(f"terminalmcp_duration_total_seconds {METRICS.get('duration_total_seconds', 0.0)}")
text = "\n".join(lines) + "\n"
return PlainTextResponse(text)
@server.custom_route("/tools/list", methods=["GET"], name="tools_list")
async def tools_list_route(request: Request):
tools = []
try:
for tool in server._mcp_server.tools: # type: ignore[attr-defined]
tools.append({
"name": tool.name,
"description": tool.description,
"structured_output": getattr(tool, "structured_output", False),
})
except Exception:
pass
return JSONResponse({"tools": tools})
@server.custom_route("/tools/run_command", methods=["POST"], name="tools_run_command")
async def tools_run_command_route(request: Request):
try:
body = await request.json()
except Exception:
return JSONResponse({"error": "Invalid JSON body"}, status_code=400)
command = body.get("command")
shell = body.get("shell", DEFAULT_SHELL)
cwd = body.get("cwd")
timeout_seconds = body.get("timeout_seconds", 120)
err = _validate_params(command, shell, cwd, timeout_seconds)
if err:
return JSONResponse({"error": err}, status_code=400)
exit_code, stdout_text, stderr_text, out_trunc, err_trunc, _ = await _run_subprocess(command, shell, cwd, timeout_seconds)
return JSONResponse({
"exit_code": exit_code,
"stdout": stdout_text,
"stderr": stderr_text,
"stdout_truncated": out_trunc,
"stderr_truncated": err_trunc,
})
@server.custom_route("/tools/run_command_json", methods=["POST"], name="tools_run_command_json")
async def tools_run_command_json_route(request: Request):
try:
body = await request.json()
except Exception:
return JSONResponse({"exit_code": -1, "stdout": "", "stderr": "Invalid JSON body", "text": "Invalid JSON body"}, status_code=400)
command = body.get("command")
shell = body.get("shell", DEFAULT_SHELL)
cwd = body.get("cwd")
timeout_seconds = body.get("timeout_seconds", 120)
err = _validate_params(command, shell, cwd, timeout_seconds)
if err:
return JSONResponse({
"exit_code": -1,
"stdout": "",
"stderr": err,
"text": err,
}, status_code=400)
exit_code, stdout_text, stderr_text, out_trunc, err_trunc, _ = await _run_subprocess(command, shell, cwd, timeout_seconds)
summary = _format_summary(exit_code, stdout_text, stderr_text, out_trunc, err_trunc)
return JSONResponse({
"exit_code": exit_code,
"stdout": stdout_text,
"stderr": stderr_text,
"text": summary,
})
@server.custom_route("/help", methods=["GET"], name="help")
async def help_route(request):
text = (
"Streamable HTTP endpoint: POST /mcp\n\n"
"1) Initialize session (no mcp-session-id):\n"
"curl -i -N -X POST 'http://HOST:PORT/mcp' \n"
" -H 'Accept: application/json, text/event-stream' \n"
" -H 'Content-Type: application/json' \n"
" --data '{\"jsonrpc\":\"2.0\",\"id\":\"1\",\"method\":\"initialize\",\"params\":{\"protocolVersion\":\"2025-03-26\",\"capabilities\":{},\"clientInfo\":{\"name\":\"curl\",\"version\":\"0.1.0\"}}}'\n\n"
"2) Use returned mcp-session-id for subsequent requests.\n"
"curl -i -N -X POST 'http://HOST:PORT/mcp' \n"
" -H 'Accept: application/json, text/event-stream' \n"
" -H 'Content-Type: application/json' \n"
" -H 'mcp-session-id: <SID>' \n"
" --data '{\"jsonrpc\":\"2.0\",\"id\":\"2\",\"method\":\"tools/list\",\"params\":{}}'\n\n"
"Direct HTTP helpers:\n"
"GET /tools/list\n"
"POST /tools/run_command JSON: {command, shell?, cwd?, timeout_seconds?}\n"
"POST /tools/run_command_json JSON: {command, shell?, cwd?, timeout_seconds?}\n"
)
return PlainTextResponse(text)
# Удаляем функцию lifespan, так как она больше не нужна
# @contextlib.asynccontextmanager
# async def lifespan(app: Starlette):
# async with server.session_manager.run():
# yield
def main() -> None:
# Запускаем MCP-сервер напрямую через uvicorn
host = os.environ.get("MCP_HOST", "0.0.0.0")
try:
port = int(os.environ.get("MCP_PORT", "3889"))
except Exception:
port = 3889
log_level = "debug" if server.settings.debug else os.environ.get("UVICORN_LOG_LEVEL", "info")
# Try to enable uvloop if available and not on Windows
if os.name != "nt" and _env_bool("MCP_USE_UVLOOP", True):
try:
import uvloop # type: ignore
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
logger.info("uvloop enabled")
except Exception:
logger.debug("uvloop not available")
uvicorn.run(
server.streamable_http_app(),
host=host,
port=port,
log_level=log_level,
timeout_keep_alive=_env_int("UVICORN_TIMEOUT_KEEP_ALIVE", 15),
limit_concurrency=_env_int("UVICORN_LIMIT_CONCURRENCY", 0) or None,
# Note: workers>1 is not compatible with in-memory session state; use external LB for scale-out
)
if __name__ == "__main__":
main()