server.py•35.8 kB
#!/usr/bin/env python3
"""
Windows Server Operations MCP Server - Standalone Linux Deployment
Provides MCP tools to access backend API services for IIS, services, OS info, etc.
Transport: HTTP (fastmcp.http_app) – migrated from legacy SSE. The server mounts the
FastMCP ASGI app at root ("/") and exposes a lightweight /health endpoint. SSE paths
(/sse, /mcp/sse) are retained only as compatibility stubs returning guidance.
"""
import os
import sys
import logging
import warnings
import asyncio
from pathlib import Path
from typing import Optional, Any, Dict, List

try:
    from typing import Annotated
except ImportError:
    from typing_extensions import Annotated  # type: ignore

# Attempt to load a local .env file so environment-based installs and user-mode
# starts pick up WIN_DASH_API_BASE and other settings even if the shell didn't
# source the file. This mirrors behavior in config.py.
try:
    from dotenv import load_dotenv

    env_path = Path(__file__).parent / '.env'
    if env_path.exists():
        load_dotenv(env_path)
except Exception:
    # dotenv is optional; if missing we'll rely on the environment provided by
    # the caller (start scripts or systemd EnvironmentFile)
    pass

# --- ensure host/port env vars are set early ---
# These defaults are applied AFTER the .env file is loaded: load_dotenv() does
# not override variables that already exist, so seeding MCP_HOST/MCP_PORT
# first would make any values in .env silently ineffective.
os.environ.setdefault('MCP_HOST', '0.0.0.0')
os.environ.setdefault('MCP_PORT', '3000')
os.environ.setdefault('HOST', os.environ['MCP_HOST'])
os.environ.setdefault('PORT', os.environ['MCP_PORT'])
# ------------------------------------------------------------------------
import paramiko
import json
import httpx
import threading
from functools import wraps
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route, Mount
from fastmcp import FastMCP
# ---- Config via environment (no secrets in code) ----
# Base URL of the backend API and the adapter token forwarded to it.
API_BASE = os.environ.get("WIN_DASH_API_BASE", "http://127.0.0.1:8000")
MCP_TOKEN = os.environ.get("MCP_API_TOKEN", "")

# Default to empty strings so we don't send JSON nulls to backend Pydantic validators
DEFAULT_USER = os.environ.get("WIN_DEFAULT_USER", "")
DEFAULT_PASS = os.environ.get("WIN_DEFAULT_PASS", "")

# Server configuration (bind address and port for this process)
MCP_HOST = os.environ.get("MCP_HOST", "0.0.0.0")
MCP_PORT = int(os.environ.get("MCP_PORT", "3000"))
# ---- logging ----
# `import logging` does not by itself load the `logging.handlers` submodule.
# Import it explicitly so the WatchedFileHandler lookup below cannot fail with
# AttributeError and silently degrade to a plain FileHandler.
import logging.handlers
from pathlib import Path as _Path

# Ensure log directory exists under the install root so start/stop wrappers
# that expect logs/mcp-server.log will find the file.
_LOG_DIR = _Path(__file__).parent / "logs"
try:
    _LOG_DIR.mkdir(parents=True, exist_ok=True)
except Exception:
    # best-effort: if we cannot create the directory, fallback to CWD
    _LOG_DIR = _Path.cwd()
_LOG_FILE = _LOG_DIR / "mcp-server.log"

root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
fmt = logging.Formatter("%(asctime)s %(levelname)-7s %(name)s: %(message)s")

try:
    file_handler = logging.handlers.WatchedFileHandler(str(_LOG_FILE), encoding="utf-8")
except Exception:
    # Keep the original best-effort fallback to a plain file handler.
    file_handler = logging.FileHandler(str(_LOG_FILE), encoding="utf-8")
file_handler.setFormatter(fmt)
file_handler.setLevel(logging.INFO)

stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(fmt)
stream_handler.setLevel(logging.INFO)

# Replace whatever handlers were installed before so records are not
# duplicated; iterate over a copy because removeHandler mutates the list.
for h in list(root_logger.handlers):
    root_logger.removeHandler(h)
root_logger.addHandler(file_handler)
root_logger.addHandler(stream_handler)

# Ensure common uvicorn/ASGI loggers propagate to the root logger so their
# messages also end up in our file. This captures access/error logs from
# uvicorn/uvicorn.access and related libraries.
for _name in ("uvicorn", "uvicorn.error", "uvicorn.access", "uvicorn.config"):
    lg = logging.getLogger(_name)
    lg.propagate = True
    lg.setLevel(logging.INFO)

logger = logging.getLogger("mcp_server")

# Suppress noisy third-party deprecation warnings that don't affect runtime
# behavior, especially around websockets legacy APIs used by uvicorn.
# The module argument is a regular expression, hence the raw string.
warnings.filterwarnings("ignore", category=DeprecationWarning, module="websockets")
warnings.filterwarnings("ignore", category=DeprecationWarning, module=r"uvicorn\.protocols\.websockets")
# The FastMCP server instance that every tool in this module is registered on.
mcp = FastMCP("WinServerDashboardMCP")
# ---- module-level registration guard + registry ----
# NOTE(review): _REGISTERED is not read anywhere in the visible code.
_REGISTERED = False
# Local record of registered tools; each entry is {"name": ..., "doc": ...}.
TOOL_REGISTRY: List[Dict[str, Any]] = []
def _register_tool(func, name: Optional[str] = None, doc: Optional[str] = None):
    """
    Register *func* on the FastMCP instance and record it in TOOL_REGISTRY.

    The function's ``__name__`` and ``__doc__`` are overwritten (best effort)
    with the resolved name and description before ``mcp.tool()`` is applied.
    Only the local registry is de-duplicated by name; ``mcp.tool()`` itself
    is invoked on every call. Returns whatever ``mcp.tool()(func)`` returns.
    """
    tool_name = name or func.__name__
    tool_doc = doc or (func.__doc__ or "")

    # Ensure the underlying function has the desired name/doc before registering
    for attribute, value in (("__name__", tool_name), ("__doc__", tool_doc)):
        try:
            setattr(func, attribute, value)
        except Exception:
            pass

    # Use mcp.tool to register (this returns a wrapped callable)
    wrapped = mcp.tool()(func)

    # track in registry (avoid duplicates)
    already_listed = any(entry["name"] == tool_name for entry in TOOL_REGISTRY)
    if not already_listed:
        TOOL_REGISTRY.append({"name": tool_name, "doc": tool_doc})
    return wrapped
# ---- helpers ----
def _get_token_from_context(context: dict) -> Optional[str]:
    """Extract the caller's token from ``context["headers"]``.

    The ``mcp-token`` header takes precedence over ``authorization``; any
    ``"Bearer "`` substring is removed from the value. Returns ``None`` when
    the context is empty or neither header carries a value.
    """
    header_map = (context.get("headers") if context else None) or {}
    raw_value = header_map.get("mcp-token") or header_map.get("authorization")
    return raw_value.replace("Bearer ", "") if raw_value else None
def check_token(context: dict) -> bool:
    """Report whether the caller described by *context* is authorised.

    NOTE(review): every caller is currently accepted and *context* is not
    inspected at all, so token authentication is effectively switched off.
    """
    authorised = True  # Disabled for testing
    return authorised
async def _request(method: str, path: str, params: Optional[dict] = None,
                   json_payload: Optional[dict] = None, headers: Optional[dict] = None,
                   timeout: int = 20) -> Dict[str, Any]:
    """Send one HTTP request to the backend and normalise the outcome.

    Args:
        method: HTTP verb; upper-cased before sending.
        path: Either a path appended to ``API_BASE`` or, when it starts with
            ``"http"``, a complete URL that is used as-is.
        params: Optional query-string parameters.
        json_payload: Optional JSON request body.
        headers: Optional extra headers. The mapping is copied, never mutated.
        timeout: Timeout handed to httpx for this request.

    Returns:
        ``{"ok": True, "status": int, "data": body}`` for a 2xx reply,
        ``{"ok": False, "status": int, "error": body}`` for any other status,
        or ``{"ok": False, "error": str}`` when the request itself failed.
        ``body`` is the decoded JSON when possible, otherwise the raw text.
    """
    url = path if path.startswith("http") else f"{API_BASE}{path}"

    # Copy so the setdefault() calls below never leak tokens into (or otherwise
    # modify) a dict owned by the caller.
    headers = dict(headers or {})

    # NOTE(review): both tokens are attached even when `path` is an absolute
    # URL pointing somewhere other than API_BASE - confirm that is intended.
    backend_token = os.getenv("BACKEND_API_TOKEN")
    if backend_token:
        headers.setdefault("Authorization", f"Bearer {backend_token}")
    if MCP_TOKEN:
        headers.setdefault("X-MCP-Adapter-Token", MCP_TOKEN)

    async with httpx.AsyncClient() as client:
        try:
            resp = await client.request(method.upper(), url, params=params, json=json_payload,
                                        headers=headers, timeout=timeout)
            status = resp.status_code
            try:
                body = resp.json()
            except ValueError:
                # Not JSON (or not decodable): hand back the raw text instead.
                body = resp.text
            if 200 <= status < 300:
                return {"ok": True, "status": status, "data": body}
            logger.warning("Backend returned non-2xx status %s for %s %s", status, method, url)
            return {"ok": False, "status": status, "error": body}
        except httpx.RequestError as e:
            logger.error("Request error %s %s -> %s", method, url, str(e))
            return {"ok": False, "error": f"Network error: {str(e)}"}
        except Exception as e:
            logger.exception("Unexpected error in _request")
            return {"ok": False, "error": f"Unexpected error: {str(e)}"}
# ---- auth decorator ----
def require_token(func):
    """Wrap an async tool so ``check_token`` runs before the tool body.

    The context is taken from an ``mcp_context`` keyword argument when present,
    otherwise from a ``"__mcp_context__"`` entry in a trailing dict positional
    argument; in both cases it is removed before *func* is called. When
    ``check_token`` rejects the context an error dict is returned instead.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        if "mcp_context" in kwargs:
            context = kwargs.pop("mcp_context") or {}
        elif args and isinstance(args[-1], dict) and args[-1].get("__mcp_context__"):
            context = args[-1].pop("__mcp_context__", {}) or {}
        else:
            context = {}

        if check_token(context):
            return await func(*args, **kwargs)
        return {"ok": False, "error": "Authentication failed: missing/invalid token"}

    return wrapper
# ---- Define tools ----
@require_token
async def list_inventory() -> dict:
    """Fetch the inventory listing via ``GET /api/inventory/list`` (15 s timeout)."""
    logger.info("tool=list_inventory")
    response = await _request("GET", "/api/inventory/list", timeout=15)
    return response


_register_tool(
    list_inventory,
    name="list_inventory",
    doc="List all applications and their servers in the inventory. Returns AppIDs, application names, and server details (name, IP, function). Use this to discover available servers before running operations.",
)
@require_token
async def get_iis_info(
    appId: str,
    serverName: str,
    username: Optional[str] = None,
    password: Optional[str] = None
) -> dict:
    """POST the target server and credentials to ``/api/server/iis``.

    Missing or empty credentials fall back to DEFAULT_USER / DEFAULT_PASS.
    Returns the ``_request`` result dict unchanged.
    """
    logger.info("tool=get_iis_info appId=%s server=%s", appId, serverName)
    user = username or DEFAULT_USER
    secret = password or DEFAULT_PASS
    request_body = {
        "appId": appId,
        "serverName": serverName,
        "username": user,
        "password": secret,
    }
    return await _request("POST", "/api/server/iis", json_payload=request_body, timeout=30)


_register_tool(
    get_iis_info,
    name="get_iis_info",
    doc="Get IIS website information for a Windows server. Returns all websites with their state, bindings, physical paths, and application pool assignments. Use this to check website configuration and health.",
)
@require_token
async def get_iis_apppools(
    appId: str,
    serverName: str,
    username: Optional[str] = None,
    password: Optional[str] = None
) -> dict:
    """POST the target server and credentials to ``/api/server/iis/apppools``.

    Missing or empty credentials fall back to DEFAULT_USER / DEFAULT_PASS.
    Returns the ``_request`` result dict unchanged.
    """
    logger.info("tool=get_iis_apppools appId=%s server=%s", appId, serverName)
    user = username or DEFAULT_USER
    secret = password or DEFAULT_PASS
    request_body = {
        "appId": appId,
        "serverName": serverName,
        "username": user,
        "password": secret,
    }
    return await _request("POST", "/api/server/iis/apppools", json_payload=request_body, timeout=30)


_register_tool(
    get_iis_apppools,
    name="get_iis_apppools",
    doc="Get IIS Application Pool status for a Windows server. Returns all app pools with their state (Started/Stopped), CLR version, pipeline mode, identity, and advanced settings. Use this to check app pool health before restarting.",
)
@require_token
async def manage_iis_apppool(
    appId: str,
    serverName: str,
    apppool_name: str,
    action: str,
    username: Optional[str] = None,
    password: Optional[str] = None
) -> dict:
    """POST an app-pool action to ``/api/server/iis/apppool/manage_stream``.

    The body carries a one-element ``servers`` list, the pool name under the
    ``pool_name`` key and the requested ``action``. Missing or empty
    credentials fall back to DEFAULT_USER / DEFAULT_PASS. Timeout is 120 s.
    """
    logger.info("tool=manage_iis_apppool appId=%s server=%s apppool=%s action=%s",
                appId, serverName, apppool_name, action)
    target = {
        "appId": appId,
        "serverName": serverName,
        "username": username or DEFAULT_USER,
        "password": password or DEFAULT_PASS,
    }
    request_body = {"servers": [target], "pool_name": apppool_name, "action": action}
    return await _request(
        "POST",
        "/api/server/iis/apppool/manage_stream",
        json_payload=request_body,
        timeout=120,
    )


_register_tool(
    manage_iis_apppool,
    name="manage_iis_apppool",
    doc="Manage an IIS Application Pool (start, stop, restart) on a Windows server. Use this to restart app pools when troubleshooting IIS issues or after deployments.",
)
@require_token
async def get_services(
    appId: str,
    serverName: str,
    username: Optional[str] = None,
    password: Optional[str] = None
) -> dict:
    """POST the target server and credentials to ``/api/server/services``.

    Missing or empty credentials fall back to DEFAULT_USER / DEFAULT_PASS.
    Returns the ``_request`` result dict unchanged.
    """
    logger.info("tool=get_services appId=%s server=%s", appId, serverName)
    user = username or DEFAULT_USER
    secret = password or DEFAULT_PASS
    request_body = {
        "appId": appId,
        "serverName": serverName,
        "username": user,
        "password": secret,
    }
    return await _request("POST", "/api/server/services", json_payload=request_body, timeout=30)


_register_tool(
    get_services,
    name="get_services",
    doc="Get all Windows services running on a server with their status (Running/Stopped), startup type, and account info. Use this to check service health like W3SVC (IIS), WAS (Windows Activation), MSSQLSERVER, etc.",
)
@require_token
async def manage_service(
    appId: str,
    serverName: str,
    service_name: str,
    action: str,
    username: Optional[str] = None,
    password: Optional[str] = None
) -> dict:
    """POST a service action to ``/api/server/service/manage_parallel``.

    The body carries a one-element ``servers`` list plus ``service_name`` and
    ``action``. Missing or empty credentials fall back to DEFAULT_USER /
    DEFAULT_PASS. Timeout is 60 s.
    """
    logger.info("tool=manage_service appId=%s server=%s service=%s action=%s", appId, serverName, service_name, action)
    target = {
        "appId": appId,
        "serverName": serverName,
        "username": username or DEFAULT_USER,
        "password": password or DEFAULT_PASS,
    }
    request_body = {"servers": [target], "service_name": service_name, "action": action}
    return await _request(
        "POST",
        "/api/server/service/manage_parallel",
        json_payload=request_body,
        timeout=60,
    )


_register_tool(
    manage_service,
    name="manage_service",
    doc="Manage a Windows service (start, stop, restart) on a server. Use this to control services like W3SVC (IIS), MSSQLSERVER, or custom application services. Provide service name exactly as shown in services list.",
)
@require_token
async def get_os_patching_info(
    appId: str,
    serverName: str,
    username: Optional[str] = None,
    password: Optional[str] = None
) -> dict:
    """POST the target server and credentials to ``/api/server/os_patching``.

    Missing or empty credentials fall back to DEFAULT_USER / DEFAULT_PASS.
    Returns the ``_request`` result dict unchanged.
    """
    logger.info("tool=get_os_patching_info appId=%s server=%s", appId, serverName)
    user = username or DEFAULT_USER
    secret = password or DEFAULT_PASS
    request_body = {
        "appId": appId,
        "serverName": serverName,
        "username": user,
        "password": secret,
    }
    return await _request("POST", "/api/server/os_patching", json_payload=request_body, timeout=30)


_register_tool(
    get_os_patching_info,
    name="get_os_patching_info",
    doc="Get OS patching status and server uptime information. Returns last boot time, uptime, installed patches, pending updates, and Windows Update status. Use this to verify patch compliance and reboot requirements.",
)
# Generic caller and discovery
@require_token
async def call_api(
    method: Annotated[str, "HTTP method: GET/POST/PUT/DELETE"],
    path: Annotated[str, "Path relative to API base (e.g., /api/server/services) or full URL"],
    params: Optional[dict] = None,
    json_payload: Optional[dict] = None,
    headers: Optional[dict] = None,
    timeout: Optional[int] = 20
) -> dict:
    """Forward an arbitrary request to the backend through ``_request``.

    The verb defaults to GET when empty and must be one of GET, POST, PUT,
    DELETE or PATCH; anything else yields an error dict. A path that starts
    with neither ``"http"`` nor ``"/"`` gets a leading slash added.
    """
    logger.info("tool=call_api method=%s path=%s", method, path)
    verb = (method or "GET").upper()
    if verb not in ("GET", "POST", "PUT", "DELETE", "PATCH"):
        return {"ok": False, "error": f"Unsupported method {verb}"}

    # Paths starting with "http" are passed through as complete URLs.
    if not path.startswith(("http", "/")):
        path = "/" + path

    return await _request(
        verb,
        path,
        params=params,
        json_payload=json_payload,
        headers=headers,
        timeout=timeout,
    )


_register_tool(call_api, name="call_api", doc="Generic API caller for any backend endpoint")
@require_token
async def list_backend_endpoints() -> dict:
    """Summarise the backend's OpenAPI document as ``{path: [operation keys]}``.

    Returns:
        ``{"ok": True, "data": {"openapi": <version>, "paths": {...}}}`` on
        success, or ``{"ok": False, "error": ..., "detail": <_request result>}``
        when the document could not be fetched or is not a JSON object.
    """
    logger.info("tool=list_backend_endpoints")
    res = await _request("GET", "/openapi.json", timeout=15)
    if not res.get("ok"):
        return {"ok": False, "error": "Could not fetch OpenAPI spec", "detail": res}

    spec = res.get("data") or {}
    # _request falls back to the raw response text when the body is not JSON,
    # so a 2xx reply can still carry a str here; report it instead of crashing.
    if not isinstance(spec, dict):
        return {"ok": False, "error": "OpenAPI spec is not a JSON object", "detail": res}

    paths = spec.get("paths")
    if not isinstance(paths, dict):
        paths = {}
    # Skip malformed path entries rather than failing on `.keys()`.
    simple = {p: list(ops) for p, ops in paths.items() if isinstance(ops, dict)}
    return {"ok": True, "data": {"openapi": spec.get("openapi"), "paths": simple}}


_register_tool(list_backend_endpoints, name="list_backend_endpoints", doc="Discover backend endpoints via OpenAPI")
@require_token
async def get_openapi_spec() -> dict:
    """Fetch ``/openapi.json`` from the backend and return the ``_request`` result as-is."""
    logger.info("tool=get_openapi_spec")
    spec_response = await _request("GET", "/openapi.json", timeout=20)
    return spec_response


_register_tool(get_openapi_spec, name="get_openapi_spec", doc="Return raw OpenAPI JSON")
@require_token
async def list_all_endpoints() -> dict:
    """Return a fixed list of endpoints extended with any found via OpenAPI.

    The four built-in paths always come first; paths reported by
    ``list_backend_endpoints`` are appended when that call succeeds and the
    path is not already listed. A usage note is included alongside the list.
    """
    logger.info("tool=list_all_endpoints")
    known = [
        "/api/inventory/list",
        "/api/server/iis",
        "/api/server/services",
        "/api/server/os_patching"
    ]

    discovered = await list_backend_endpoints()
    if discovered.get("ok"):
        extra = [p for p in discovered["data"]["paths"] if p not in known]
        known.extend(extra)

    guidance = "Use call_api(method, path, ...) to call any endpoint. Use list_backend_endpoints() to discover more."
    return {"ok": True, "data": {"endpoints": known, "note": guidance}}


_register_tool(list_all_endpoints, name="list_all_endpoints", doc="Curated endpoint list + guidance")
@require_token
async def assistant_agent(instruction: str) -> dict:
    """Map a natural-language instruction onto one of the project tools.

    The instruction is matched with simple keyword/regex rules, in this order:
    IIS info, app-pool listing, service listing, app-pool management, service
    management. The first matching rule is executed.

    Args:
        instruction: Free text such as ``"get iis info appId=1001 server=AD"``
            or ``"restart apppool DefaultAppPool on ET appId=102"``.

    Returns:
        ``{"ok": True, "steps": [...], "result": <tool result>}`` when a rule
        ran, otherwise ``{"ok": False, "error": <message>}``. Exceptions raised
        while executing a rule are caught and reported as an error dict.
    """
    import re

    text = (instruction or "").strip()
    if not text:
        return {"ok": False, "error": "Empty instruction"}
    low = text.lower()
    out = {"ok": False, "steps": [], "result": None}

    # helpers to find appId/server/service/action
    def find_appid(s: str):
        m = re.search(r"appid[:= ]?(\w+)", s, re.I)
        return m.group(1) if m else None

    def find_server(s: str):
        m = re.search(r"server[:= ]?(\S+)", s, re.I)
        if m:
            return m.group(1)
        # \b keeps words that merely end in "on" (e.g. "application pool")
        # from being mistaken for the "on <server>" phrase.
        m = re.search(r"\bon (\S+)", s, re.I)
        return m.group(1) if m else None

    def find_service_name(s: str):
        m = re.search(r"service[: ]?(\S+)", s, re.I)
        if m:
            return m.group(1)
        m = re.search(r"(?:start|stop|restart) (\S+)", s, re.I)
        return m.group(1) if m else None

    def find_action(s: str):
        # "restart" must be tested before "start": the substring "start" is
        # contained in "restart", so the opposite order can never yield it.
        for verb in ("restart", "start", "stop"):
            if verb in s:
                return verb
        return None

    appId = find_appid(text)
    serverName = find_server(text)
    wants_change = any(k in low for k in ("start ", "stop ", "restart "))
    missing_target = {"ok": False, "error": "Please specify appId and server (e.g. 'appId=1001 server=AD')"}

    try:
        # IIS info
        if "iis" in low and ("info" in low or "website" in low):
            if not appId or not serverName:
                return missing_target
            out["steps"].append({"action": "get_iis_info", "appId": appId, "server": serverName})
            res = await get_iis_info(appId=appId, serverName=serverName)
            out.update({"ok": True, "result": res})
            return out

        # IIS app pools (listing only; start/stop/restart is handled further down)
        mentions_pool_loosely = "apppool" in low or "app pool" in low or ("app" in low and "pool" in low)
        if mentions_pool_loosely and not wants_change:
            if not appId or not serverName:
                return missing_target
            out["steps"].append({"action": "get_iis_apppools", "appId": appId, "server": serverName})
            res = await get_iis_apppools(appId=appId, serverName=serverName)
            out.update({"ok": True, "result": res})
            return out

        # Services list/status
        if "service" in low and ("status" in low or "services" in low or "list" in low):
            if not appId or not serverName:
                return missing_target
            out["steps"].append({"action": "get_services", "appId": appId, "server": serverName})
            res = await get_services(appId=appId, serverName=serverName)
            out.update({"ok": True, "result": res})
            return out

        # IIS AppPool management
        if wants_change and ("apppool" in low or "app pool" in low):
            m = re.search(r"(?:restart|start|stop)\s+(?:app ?pool\s+)?(.+?)\s+on", text, re.I)
            pool_name = m.group(1) if m else None
            act = find_action(low)
            if not pool_name:
                return {"ok": False, "error": "Could not parse app pool name; use 'restart apppool DefaultAppPool on ET appId=102'"}
            if not appId or not serverName:
                return {"ok": False, "error": "Please specify appId and server (e.g. 'appId=102 server=ET')"}
            out["steps"].append({
                "action": "manage_iis_apppool",
                "apppool": pool_name,
                "op": act,
                "appId": appId,
                "server": serverName
            })
            # Delegate to the dedicated tool (as the service branch does) so
            # both entry points send the same payload shape and timeout.
            res = await manage_iis_apppool(appId=appId, serverName=serverName,
                                           apppool_name=pool_name, action=act)
            out.update({"ok": True, "result": res})
            return out

        # Start/stop/restart service
        if wants_change:
            svc = find_service_name(text)
            act = find_action(low)
            if not svc:
                return {"ok": False, "error": "Could not parse service name; include 'service <name>' or 'start <name>'"}
            if not appId or not serverName:
                return missing_target
            out["steps"].append({"action": "manage_service", "service": svc, "op": act, "appId": appId, "server": serverName})
            res = await manage_service(appId=appId, serverName=serverName, service_name=svc, action=act)
            out.update({"ok": True, "result": res})
            return out

        return {"ok": False, "error": "Instruction not recognized. Try: 'get iis info appId=1001 server=AD' or 'restart w3svc on AD'"}
    except Exception as e:
        logger.exception("assistant_agent failed")
        return {"ok": False, "error": f"Agent execution failed: {str(e)}"}


_register_tool(assistant_agent, name="assistant_agent", doc="Lightweight in-process agent: map natural-language instructions to project API tools")
# ---- Debug / helper tools ----
def health() -> dict:
    """Report that the adapter is running, along with the configured API_BASE."""
    status = {"status": "running", "api_base": API_BASE}
    return {"ok": True, "data": status}


async def list_tools() -> dict:
    """Expose the current contents of TOOL_REGISTRY."""
    listing = {"tools": TOOL_REGISTRY}
    return {"ok": True, "data": listing}


_register_tool(health, name="health", doc="MCP adapter health check (sync)")
_register_tool(list_tools, name="list_tools", doc="Return the list of registered tools")
@require_token
async def refresh_openapi_tools() -> dict:
    """Run ``register_openapi_tools`` in a worker thread and report the count.

    Any exception is logged with its traceback and returned as an error dict.
    """
    try:
        count = await asyncio.to_thread(register_openapi_tools)
    except Exception as e:
        logger.exception("refresh_openapi_tools failed")
        return {"ok": False, "error": str(e)}
    return {"ok": True, "data": {"registered": count}}


_register_tool(
    refresh_openapi_tools,
    name="refresh_openapi_tools",
    doc="Refresh OpenAPI-derived tools from backend (call when backend becomes available)",
)
def _sanitize_tool_name(method: str, path: str) -> str:
    """Derive an identifier-safe tool name from an HTTP method and a path.

    Slashes inside the path become underscores, braces are dropped, and every
    remaining character outside ``[0-9a-zA-Z_]`` is replaced by ``_``. A result
    starting with a digit is prefixed with ``t_``.
    """
    import re

    path_part = path.strip('/').translate(str.maketrans({'/': '_', '{': None, '}': None}))
    candidate = re.sub(r'[^0-9a-zA-Z_]', '_', f"{method.lower()}_{path_part}")
    if candidate and candidate[0].isdigit():
        return "t_" + candidate
    return candidate
def register_openapi_tools(timeout: int = 3):
    """Fetch the backend OpenAPI spec and register one proxy tool per operation.

    Args:
        timeout: Timeout (seconds) for the synchronous spec download.

    Returns:
        The number of tools newly registered by this call. ``0`` is returned
        when the spec cannot be fetched, is not a JSON object, or contains no
        operation that is not already present in TOOL_REGISTRY.
    """
    import json as _json

    url = API_BASE.rstrip('/') + '/openapi.json'
    try:
        with httpx.Client(timeout=timeout) as client:
            resp = client.get(url)
            if resp.status_code != 200:
                logger.warning("OpenAPI fetch failed: %s %s", resp.status_code, resp.text[:200])
                return 0
            spec = resp.json()
    except Exception as e:
        logger.warning("OpenAPI fetch error for %s: %s", url, str(e))
        return 0
    if not isinstance(spec, dict):
        logger.warning("OpenAPI fetch error for %s: %s", url, "spec is not a JSON object")
        return 0

    # A Path Item may also hold non-operation keys ("parameters", "summary",
    # "description", "servers", "$ref", "x-*"); only these keys are operations.
    http_methods = frozenset(("get", "put", "post", "delete", "options", "head", "patch", "trace"))

    def _build_example_from_schema(sch: dict, components: dict, _seen: frozenset = frozenset()) -> Any:
        """Build a placeholder value shaped like *sch* (used only for docs)."""
        if not isinstance(sch, dict):
            return {}
        if "$ref" in sch:
            ref = sch["$ref"]
            # `_seen` stops self-referential schemas from recursing forever.
            if ref.startswith('#/components/schemas/') and ref not in _seen:
                name = ref.split('/')[-1]
                comp = components.get('schemas', {}).get(name, {})
                return _build_example_from_schema(comp, components, _seen | {ref})
            return {}
        t = sch.get('type') or 'object'
        if t == 'object':
            props = sch.get('properties', {})
            return {k: _build_example_from_schema(v, components, _seen) for k, v in props.items()}
        if t == 'array':
            items = sch.get('items', {})
            return [_build_example_from_schema(items, components, _seen)]
        if t == 'string':
            fmt = sch.get('format', '')
            if fmt == 'date-time':
                return '2025-01-01T00:00:00Z'
            if fmt == 'date':
                return '2025-01-01'
            return sch.get('example') or sch.get('default') or 'string'
        if t == 'integer':
            return sch.get('example') or sch.get('default') or 0
        if t == 'number':
            return sch.get('example') or sch.get('default') or 0.0
        if t == 'boolean':
            return sch.get('example') or sch.get('default') or False
        return None

    def make_proxy(http_method: str, api_path: str):
        """Create a tool that forwards its arguments to one backend operation."""
        @require_token
        async def proxy_tool(json_payload: Optional[dict] = None, params: Optional[dict] = None,
                             headers: Optional[dict] = None, timeout: int = 60):
            logger.info("OpenAPI tool called: %s %s", http_method, api_path)
            return await _request(http_method, api_path, params=params,
                                  json_payload=json_payload, headers=headers, timeout=timeout)
        return proxy_tool

    paths = spec.get('paths')
    if not isinstance(paths, dict):
        paths = {}
    components = spec.get('components') or {}
    registered = 0

    for path, methods in paths.items():
        if not isinstance(methods, dict):
            continue
        for mth, meta in methods.items():
            if mth.lower() not in http_methods:
                continue
            method = mth.upper()
            tool_name = _sanitize_tool_name(method, path)
            if any(t['name'] == tool_name for t in TOOL_REGISTRY):
                logger.debug("Skipping duplicate tool: %s", tool_name)
                continue

            doc = ''
            req_schema = None
            if isinstance(meta, dict):
                doc = meta.get('summary') or meta.get('description') or ''
                try:
                    rb = meta.get('requestBody', {})
                    content = rb.get('content', {})
                    appjson = content.get('application/json') or {}
                    req_schema = appjson.get('schema')
                except Exception:
                    req_schema = None

            example = None
            if req_schema:
                try:
                    example = _build_example_from_schema(req_schema, components)
                except Exception as e:
                    logger.debug("Could not build example for %s: %s", tool_name, str(e))
                    example = None

            if example is not None:
                try:
                    ex_text = _json.dumps(example, indent=2)
                except Exception:
                    ex_text = str(example)
                doc = (doc or f"{method} {path}") + "\n\nRequest example (application/json):\n" + ex_text
            else:
                doc = doc or f"Proxy to {method} {path}"

            try:
                proxy_func = make_proxy(method, path)
                _register_tool(proxy_func, name=tool_name, doc=doc)
                logger.info("✓ Registered OpenAPI tool: %s -> %s %s", tool_name, method, path)
                registered += 1
            except Exception as e:
                logger.warning("✗ Failed to register OpenAPI tool %s: %s", tool_name, str(e), exc_info=True)
    return registered
def start_openapi_watcher(retries: int = 12, interval_seconds: int = 5):
    """Launch a daemon thread that retries OpenAPI tool registration.

    The thread calls ``register_openapi_tools(timeout=3)`` up to *retries*
    times, sleeping *interval_seconds* between attempts, and stops at the
    first attempt that registers at least one tool. Returns the started
    ``threading.Thread``.
    """
    def _watcher():
        import time

        try:
            logger.info("OpenAPI watcher: attempting to register backend tools (retries=%s interval=%ss)", retries, interval_seconds)
            for attempt in range(1, retries + 1):
                try:
                    registered = register_openapi_tools(timeout=3)
                except Exception as e:
                    logger.debug("OpenAPI watcher: registration attempt %s failed: %s", attempt, str(e))
                    registered = 0

                if registered:
                    logger.info("OpenAPI auto-registered %s tools on attempt %s", registered, attempt)
                    break

                logger.debug("OpenAPI watcher: no tools registered on attempt %s", attempt)
                if attempt < retries:
                    time.sleep(interval_seconds)
            else:
                # Reached only when no attempt ended in a `break`.
                logger.info("OpenAPI watcher: exhausted %s attempts; no tools registered", retries)
        except Exception:
            logger.exception("OpenAPI watcher encountered an unexpected error")

    worker = threading.Thread(target=_watcher, name="openapi-watcher", daemon=True)
    worker.start()
    return worker
# ---- Entrypoint ----
def main():
    """Log the configuration, register backend tools and serve over HTTP.

    Steps: log settings and the static tool list; try OpenAPI registration
    (starting the background watcher when nothing was registered or the
    attempt raised); obtain the FastMCP ASGI app; wrap it in a Starlette app
    that adds ``/health`` and the SSE stubs; run uvicorn on MCP_HOST:MCP_PORT.

    Raises:
        RuntimeError: when no ASGI app could be obtained from the FastMCP
            instance. Exceptions from ``uvicorn.run`` are logged and re-raised.
    """
    banner = "=" * 80
    logger.info(banner)
    logger.info("Starting MCP Server (WinServerDashboardMCP) - Standalone Linux Deployment")
    logger.info(banner)
    logger.info("API Backend: %s", API_BASE)
    logger.info("MCP Token: %s", "Configured" if MCP_TOKEN else "Not set (allowing all)")
    logger.info("Default Credentials: %s", "Configured" if DEFAULT_USER else "Not set")
    logger.info("Server: %s:%s", MCP_HOST, MCP_PORT)

    # Print registered static tools
    logger.info("Registered %d static tools:", len(TOOL_REGISTRY))
    for entry in TOOL_REGISTRY:
        summary = entry["doc"][:80] if entry["doc"] else "(no description)"
        logger.info(" - %s: %s", entry["name"], summary)

    # Try to dynamically register backend OpenAPI endpoints
    try:
        registered = register_openapi_tools()
        if registered > 0:
            logger.info("✓ OpenAPI auto-registered %d additional tools", registered)
        else:
            logger.warning("✗ OpenAPI registration returned 0 tools - starting background watcher")
            start_openapi_watcher(retries=12, interval_seconds=5)
    except Exception as e:
        logger.warning("Could not register OpenAPI tools at startup: %s", str(e))
        logger.info("Starting background watcher to retry...")
        start_openapi_watcher(retries=12, interval_seconds=5)

    logger.info("MCP server ready - total tools available: %d", len(TOOL_REGISTRY))
    logger.info(banner)

    # Publish host/port through the environment as well, for any wrapper or
    # library that reads HOST/PORT or MCP_HOST/MCP_PORT instead of arguments.
    for key, value in (("MCP_HOST", MCP_HOST), ("MCP_PORT", MCP_PORT),
                       ("HOST", MCP_HOST), ("PORT", MCP_PORT)):
        os.environ.setdefault(key, str(value))

    # uvicorn is started directly with an explicit host/port below.
    import uvicorn

    asgi_app = None

    # Try callable factories first, in order of preference.
    for method_name in ['http_app', 'streamable_http_app', 'sse_app']:
        method = getattr(mcp, method_name, None)
        if not callable(method):
            continue
        try:
            logger.info("Attempting to obtain ASGI app via '%s'...", method_name)
            if method_name == 'sse_app':
                # Prefer the new factory if available
                try:
                    from fastmcp.server.http import create_sse_app
                    asgi_app = create_sse_app()
                    logger.info("Got ASGI app from fastmcp.server.http.create_sse_app() (SSE)")
                except Exception:
                    asgi_app = method()
                    logger.info("Got ASGI app from mcp.sse_app() (deprecated in fastmcp >= 2.3.2)")
            else:
                asgi_app = method()
                logger.info("Got ASGI app from mcp.%s()", method_name)
            break
        except Exception as e:
            logger.debug("mcp.%s() failed: %s", method_name, e)

    # Fallback: try attribute access (older versions or different patterns)
    if asgi_app is None:
        for attr_name in ['asgi_app', 'app', 'server_app']:
            attr = getattr(mcp, attr_name, None)
            if attr is not None and not callable(attr):
                logger.info("Found ASGI app attribute: mcp.%s", attr_name)
                asgi_app = attr
                break

    if asgi_app is None:
        # Nothing usable was found: abort rather than start without an app.
        logger.error("No ASGI app discovered via http_app/streamable_http_app; aborting instead of falling back to deprecated SSE.")
        raise RuntimeError("FastMCP ASGI app acquisition failed; ensure fastmcp>=2.3.2 installed.")

    # Lightweight endpoints served by the wrapper itself.
    async def _health(request):
        return JSONResponse({"ok": True, "status": "running"})

    async def _deprecated_sse(request):
        return JSONResponse({
            "ok": False,
            "deprecated": True,
            "message": "SSE transport removed. Update client URL to base http://HOST:PORT without /sse.",
            "expected_url": f"http://{MCP_HOST}:{MCP_PORT}",
        }, status_code=410)

    # If the FastMCP app exposes routes (Starlette/FastAPI), log them for troubleshooting
    try:
        for r in getattr(asgi_app, "routes", []):
            try:
                path = getattr(r, "path", getattr(r, "path_regex", "?"))
                methods = list(getattr(r, "methods", []) or [])
                logger.info("FastMCP http_app route: %s methods=%s", path, methods)
            except Exception:
                pass
    except Exception:
        pass

    # Expose /health and SSE compatibility stubs, and mount the FastMCP app at root.
    # The FastMCP app's lifespan (when it has one) is handed to the parent
    # Starlette app so it runs as part of the wrapper's startup/shutdown.
    wrapper_routes = [
        Route("/health", _health, methods=["GET"]),
        Route("/sse", _deprecated_sse, methods=["GET", "POST"]),
        Route("/mcp/sse", _deprecated_sse, methods=["GET", "POST"]),
        Mount("/", asgi_app)
    ]
    wrapper = Starlette(routes=wrapper_routes, lifespan=getattr(asgi_app, "lifespan", None))

    try:
        logger.info("Starting uvicorn with host=%s port=%s (lifespan=on)", MCP_HOST, MCP_PORT)
        uvicorn.run(wrapper, host=MCP_HOST, port=MCP_PORT, log_level="info", lifespan="on")
    except Exception:
        logger.exception("uvicorn.run() failed")
        raise


if __name__ == "__main__":
    main()