#!/usr/bin/env python3
"""
Minecraft RCON MCP Server
An MCP (Model Context Protocol) server that bridges AI agents with Minecraft servers
through RCON commands, log monitoring, SQLite database queries, and plugin health checks.
Battle-tested in real AI-assisted Minecraft plugin development workflows.
"""
from __future__ import annotations
import asyncio
import os
import re
import sqlite3
import sys
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from aiomcrcon import Client as RconClient
from mcp.server.fastmcp import FastMCP
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class Config:
"""Server configuration loaded from environment variables."""
rcon_host: str
rcon_port: int
rcon_password: str
server_dir: Path
log_file: Path
database_path: Path | None
log_prefix: str
@classmethod
def from_env(cls) -> Config:
"""Build config from environment variables.
Required:
RCON_HOST — RCON server host (default: localhost)
RCON_PORT — RCON server port (default: 25575)
RCON_PASSWORD — RCON password
SERVER_DIR — Minecraft server root directory
Optional:
DATABASE_PATH — SQLite database path (auto-detected if empty)
LOG_PREFIX — Prefix for filtered log reading (default: [TEST])
"""
server_dir = Path(os.environ.get("SERVER_DIR", ""))
db_path_env = os.environ.get("DATABASE_PATH", "")
database_path = Path(db_path_env) if db_path_env else None
return cls(
rcon_host=os.environ.get("RCON_HOST", "localhost"),
rcon_port=int(os.environ.get("RCON_PORT", "25575")),
rcon_password=os.environ.get("RCON_PASSWORD", ""),
server_dir=server_dir,
log_file=server_dir / "logs" / "latest.log",
database_path=database_path,
log_prefix=os.environ.get("LOG_PREFIX", "[TEST]"),
)
# ---------------------------------------------------------------------------
# RCON Manager — connection lifecycle + auto-reconnect
# ---------------------------------------------------------------------------
_MAX_RETRIES = 3
class RconManager:
"""Manages the RCON connection with automatic reconnection."""
def __init__(self, host: str, port: int, password: str) -> None:
self._host = host
self._port = port
self._password = password
self._client: RconClient | None = None
self._connected = False
async def connect(self) -> None:
"""Establish the RCON connection."""
self._client = RconClient(self._host, self._port, self._password)
await self._client.connect()
self._connected = True
_log(f"RCON connected to {self._host}:{self._port}")
async def disconnect(self) -> None:
"""Close the RCON connection."""
if self._client:
try:
await self._client.close()
except Exception:
pass
self._client = None
self._connected = False
async def send(self, command: str) -> str:
"""Send a command with auto-reconnect on failure.
Retries up to ``_MAX_RETRIES`` times with a 1-second delay between
attempts. Returns the server response text.
"""
last_error: Exception | None = None
for attempt in range(_MAX_RETRIES):
try:
if not self._connected or self._client is None:
await self.connect()
assert self._client is not None
response = await self._client.send_cmd(command)
return response[0] if response else ""
except Exception as exc:
last_error = exc
self._connected = False
if attempt < _MAX_RETRIES - 1:
_log(f"RCON command failed (attempt {attempt + 1}), reconnecting...")
await asyncio.sleep(1)
raise RuntimeError(f"RCON command failed after {_MAX_RETRIES} attempts: {last_error}")
def _log(msg: str) -> None:
"""Write a log line to *stderr* (stdout is reserved for MCP stdio transport)."""
print(msg, file=sys.stderr, flush=True)
# ---------------------------------------------------------------------------
# Log utilities
# ---------------------------------------------------------------------------
def _read_log_tail(log_file: Path, lines: int = 50) -> str:
"""Return the last *lines* lines from *log_file*."""
if not log_file.exists():
return "Error: Log file not found."
with open(log_file, "r", encoding="utf-8", errors="ignore") as fh:
all_lines = fh.readlines()
return "".join(all_lines[-lines:])
def _get_log_position(log_file: Path) -> int:
"""Return the current byte-size of *log_file* (used as a position cursor)."""
if not log_file.exists():
return 0
return log_file.stat().st_size
def _read_log_since(log_file: Path, position: int) -> tuple[str, int]:
"""Read new content appended after *position* bytes."""
if not log_file.exists():
return "Error: Log file not found.", 0
with open(log_file, "r", encoding="utf-8", errors="ignore") as fh:
fh.seek(position)
content = fh.read()
new_position = fh.tell()
return content, new_position
def _read_filtered_log(
log_file: Path,
prefix: str,
since_position: int = 0,
event_filter: str | None = None,
) -> dict[str, Any]:
"""Read log lines matching *prefix*, optionally filtered by event type.
Returns a dict with ``logs`` (str), ``position`` (int), and ``count`` (int).
The *position* value can be passed back as *since_position* for incremental
reads.
"""
if not log_file.exists():
return {"logs": "Error: Log file not found.", "position": 0, "count": 0}
with open(log_file, "r", encoding="utf-8", errors="ignore") as fh:
if since_position > 0:
fh.seek(since_position)
lines = fh.readlines()
new_position = fh.tell()
matched = [ln for ln in lines if prefix in ln]
if event_filter:
matched = [ln for ln in matched if f"{prefix} {event_filter}" in ln]
return {
"logs": "".join(matched) if matched else "(no matching log entries)",
"position": new_position,
"count": len(matched),
}
# ---------------------------------------------------------------------------
# Database utilities
# ---------------------------------------------------------------------------
_SQL_WRITE_RE = re.compile(
r"\b(INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|REPLACE|TRUNCATE|ATTACH|DETACH)\b",
re.IGNORECASE,
)
def _query_database(db_path: Path, sql: str) -> str:
"""Execute a read-only SELECT against a SQLite database.
Write operations are rejected before execution. Results are returned as a
formatted ASCII table.
"""
if not db_path.exists():
return f"Error: Database file not found ({db_path})"
if _SQL_WRITE_RE.search(sql):
return "Error: Only SELECT queries are allowed. Write operations are forbidden."
normalized = sql.strip()
if not normalized.upper().startswith("SELECT"):
return "Error: Query must start with SELECT."
try:
conn = sqlite3.connect(str(db_path), timeout=5)
conn.row_factory = sqlite3.Row
cursor = conn.execute(normalized)
rows = cursor.fetchall()
conn.close()
except sqlite3.Error as exc:
return f"Error: SQL execution failed — {exc}"
except Exception as exc:
return f"Error: Unexpected query error — {exc}"
if not rows:
return "Query returned 0 rows."
columns = rows[0].keys()
widths = {
col: max(len(col), *(len(str(row[col])) for row in rows))
for col in columns
}
header = " | ".join(col.ljust(widths[col]) for col in columns)
sep = "-+-".join("-" * widths[col] for col in columns)
data = "\n".join(
" | ".join(str(row[col]).ljust(widths[col]) for col in columns)
for row in rows
)
return f"{header}\n{sep}\n{data}\n\n{len(rows)} row(s)"
# ---------------------------------------------------------------------------
# FastMCP server
# ---------------------------------------------------------------------------
# Globals populated during lifespan
_config: Config | None = None
_rcon: RconManager | None = None
@asynccontextmanager
async def _lifespan(server: FastMCP):
"""Initialise config and RCON connection; tear down on exit."""
global _config, _rcon
_config = Config.from_env()
if not _config.rcon_password:
_log("WARNING: RCON_PASSWORD is not set. Commands will fail.")
if not _config.server_dir.exists():
_log(f"WARNING: SERVER_DIR does not exist: {_config.server_dir}")
_rcon = RconManager(_config.rcon_host, _config.rcon_port, _config.rcon_password)
try:
await _rcon.connect()
except Exception as exc:
_log(f"WARNING: Initial RCON connection failed ({exc}). Will retry on first command.")
try:
yield {}
finally:
if _rcon:
await _rcon.disconnect()
mcp = FastMCP("minecraft-rcon", lifespan=_lifespan)
# ---------------------------------------------------------------------------
# Tools
# ---------------------------------------------------------------------------
@mcp.tool(
annotations={
"title": "Run RCON Command",
"readOnlyHint": False,
"destructiveHint": False,
"idempotentHint": False,
"openWorldHint": True,
}
)
async def run_command(command: str) -> str:
"""Execute an RCON command on the Minecraft server (without leading slash).
Examples: ``say Hello``, ``gamemode creative Steve``, ``tp Steve 0 64 0``
"""
assert _rcon is not None
if not command:
return "Error: Command cannot be empty."
try:
result = await _rcon.send(command)
return result if result else "(command executed, no response)"
except Exception as exc:
return f"Error: Failed to execute command — {exc}"
@mcp.tool(
annotations={
"title": "Run Command with Log Capture",
"readOnlyHint": False,
"destructiveHint": False,
"idempotentHint": False,
"openWorldHint": True,
}
)
async def run_command_with_log(command: str, wait_seconds: float = 2.0) -> str:
"""Execute an RCON command and capture server log output produced afterwards.
Useful for commands whose feedback appears in the server log rather than the
RCON response (e.g. plugin commands that print results via logger).
Args:
command: The Minecraft command to execute.
wait_seconds: Seconds to wait for log output after execution (default 2).
"""
assert _rcon is not None and _config is not None
if not command:
return "Error: Command cannot be empty."
try:
log_pos = _get_log_position(_config.log_file)
result = await _rcon.send(command)
await asyncio.sleep(wait_seconds)
new_logs, _ = _read_log_since(_config.log_file, log_pos)
return (
f"=== RCON Response ===\n{result or '(no response)'}\n\n"
f"=== Server Log ===\n{new_logs or '(no new log output)'}"
)
except Exception as exc:
return f"Error: Failed to execute command — {exc}"
@mcp.tool(
annotations={
"title": "Read Server Log",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False,
}
)
async def read_server_log(lines: int = 50) -> str:
"""Read the last N lines from the Minecraft server log file.
Args:
lines: Number of lines to read from the end of the log (default 50).
"""
assert _config is not None
return _read_log_tail(_config.log_file, lines)
@mcp.tool(
annotations={
"title": "Read Filtered Test Log",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False,
}
)
async def read_test_log(since_position: int = 0, event_filter: str | None = None) -> str:
"""Read plugin test/debug logs filtered by a configurable prefix.
Only lines containing the configured LOG_PREFIX (default ``[TEST]``) are
returned. Supports incremental reading: pass the ``position`` value from a
previous call as ``since_position`` to only get new entries.
Args:
since_position: Byte offset to start reading from (0 = beginning).
event_filter: Optional event type filter (e.g. ``PASS``, ``FAIL``,
``SUMMARY``). Only lines matching ``{prefix} {event_filter}`` are
returned.
"""
assert _config is not None
result = _read_filtered_log(
_config.log_file, _config.log_prefix, since_position, event_filter
)
return (
f"=== Filtered Log ({result['count']} entries) ===\n"
f"{result['logs']}\n\n"
f"position={result['position']}"
)
@mcp.tool(
annotations={
"title": "Query SQLite Database",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False,
}
)
async def query_database(sql: str) -> str:
"""Execute a read-only SELECT query against the plugin's SQLite database.
Write operations (INSERT, UPDATE, DELETE, DROP, etc.) are blocked. Results
are returned as a formatted ASCII table.
Args:
sql: A SELECT SQL statement.
"""
assert _config is not None
if not sql:
return "Error: SQL query cannot be empty."
db = _config.database_path
if db is None:
return "Error: No DATABASE_PATH configured."
return _query_database(db, sql)
@mcp.tool(
annotations={
"title": "Check Plugin Status",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True,
}
)
async def check_plugin_status() -> str:
"""Check overall plugin health: RCON connectivity, recent errors in the
server log, and database file status.
Recommended to call before running tests or debugging.
"""
assert _rcon is not None and _config is not None
sections: list[str] = []
# 1. RCON / plugin list
try:
plugins = await _rcon.send("plugins")
sections.append(f"Plugins: {plugins}")
except Exception as exc:
sections.append(f"RCON: connection failed ({exc})")
# 2. Recent errors in log
try:
recent = _read_log_tail(_config.log_file, 200)
keywords = ("ERROR", "WARN", "Exception", "Traceback")
errors = [
ln.strip() for ln in recent.splitlines()
if any(kw in ln for kw in keywords)
]
if errors:
sections.append(f"Recent errors ({len(errors)}):")
for ln in errors[-10:]:
sections.append(f" {ln}")
else:
sections.append("Recent errors: none")
except Exception as exc:
sections.append(f"Log scan failed: {exc}")
# 3. Database file
db = _config.database_path
if db and db.exists():
size_kb = db.stat().st_size / 1024
sections.append(f"Database: exists ({size_kb:.1f} KB)")
elif db:
sections.append(f"Database: file not found ({db})")
else:
sections.append("Database: not configured")
return "\n".join(sections)