#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""MindsDB auto-registration for MCP Context Forge.
Registers MindsDB as a federated gateway, waits for tool discovery,
provisions teams, creates team-scoped virtual servers, provisions the
team Knowledge Bases in MindsDB, and verifies the registration.
Environment variables:
MCPGATEWAY_URL - Gateway base URL (default: http://gateway:4444)
MINDSDB_URL - MindsDB base URL (default: http://mindsdb:47334)
MINDSDB_MCP_URL - MCP SSE URL via nginx sidecar (default: http://mindsdb-mcp-proxy:47336/mcp/sse)
MINDSDB_USERNAME - MindsDB login username
MINDSDB_PASSWORD - MindsDB login password
JWT_ALGORITHM - JWT signing algorithm (default: HS256)
JWT_SECRET_KEY - JWT secret key for HS* algorithms
JWT_PRIVATE_KEY_PATH - Path to private key for RS* algorithms
JWT_PUBLIC_KEY_PATH - Path to public key for RS* algorithms
JWT_AUDIENCE - JWT audience claim (default: mcpgateway-api)
JWT_ISSUER - JWT issuer claim (default: mcpgateway)
PLATFORM_ADMIN_EMAIL - Admin email for JWT (default: admin@apollosai.dev)
TOKEN_EXPIRY - JWT expiry in minutes (default: 10080)
TOOL_DISCOVERY_TIMEOUT - Tool polling timeout in seconds (default: 120)
"""
# Standard
import json
import os
import sys
import time
import urllib.error
import urllib.request
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Base URL of the Context Forge gateway REST API.
GATEWAY_URL = os.getenv("MCPGATEWAY_URL", "http://gateway:4444")

# Base URL of MindsDB's HTTP API (used for /api/status, /api/login, /api/sql/query).
MINDSDB_URL = os.getenv("MINDSDB_URL", "http://mindsdb:47334")

# MCP SSE URL — routed through an nginx sidecar that rewrites the Host header.
# The MCP SDK's DNS-rebinding protection rejects non-localhost Host headers
# (HTTP 421); the sidecar forwards to MindsDB with Host: localhost so the MCP
# SSE endpoint is reachable from Docker service hostnames.
MINDSDB_MCP_URL = os.getenv(
    "MINDSDB_MCP_URL",
    "http://mindsdb-mcp-proxy:47336/mcp/sse",
)

# How long (seconds) tool discovery keeps polling before giving up.
TOOL_DISCOVERY_TIMEOUT = int(os.getenv("TOOL_DISCOVERY_TIMEOUT", "120"))
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def wait_for_service(url: str, name: str, attempts: int = 60, interval: int = 2) -> None:
    """Block until *url* answers with HTTP 200, or exit the process.

    Probes *url* up to *attempts* times, sleeping *interval* seconds between
    probes but not after the last one (there is nothing left to wait for).
    Any error while opening the URL — connection refused, timeout, or an
    HTTPError for a non-2xx status — counts as "not ready yet": this is a
    startup probe, so failures are expected while the service boots.

    Args:
        url: Health endpoint to probe.
        name: Human-readable service name used in progress messages.
        attempts: Maximum number of probes.
        interval: Seconds to sleep between consecutive probes.

    Raises:
        SystemExit: With status 1 if no probe returned HTTP 200.
    """
    for attempt in range(1, attempts + 1):
        try:
            with urllib.request.urlopen(url, timeout=5) as resp:
                if resp.status == 200:
                    print(f" {name} is healthy")
                    return
        except Exception:
            # Expected while the service is still starting; retry below.
            pass
        print(f" Waiting for {name}... ({attempt}/{attempts})")
        if attempt < attempts:
            time.sleep(interval)
    print(f" {name} not healthy after {attempts} attempts")
    sys.exit(1)
def generate_admin_token() -> str:
    """Create a JWT with admin-bypass claims (teams: null + is_admin: true).

    Builds the token directly with PyJWT to avoid depending on mcpgateway
    internals (the package isn't pip-installed in the container image).

    Configuration is read from the environment: PLATFORM_ADMIN_EMAIL,
    TOKEN_EXPIRY (minutes), JWT_ALGORITHM, JWT_ISSUER, JWT_AUDIENCE, and
    either JWT_PRIVATE_KEY_PATH (asymmetric algorithms: RS*, PS*, ES*,
    EdDSA) or JWT_SECRET_KEY (HMAC algorithms such as HS256).

    Returns:
        The encoded JWT as a ``str``.

    Raises:
        KeyError: If an asymmetric algorithm is selected but
            JWT_PRIVATE_KEY_PATH is not set.
    """
    # Standard
    import pathlib
    import uuid

    # Third-Party
    import jwt as pyjwt

    email = os.environ.get("PLATFORM_ADMIN_EMAIL", "admin@apollosai.dev")
    expiry = int(os.environ.get("TOKEN_EXPIRY", "10080"))
    algo = os.environ.get("JWT_ALGORITHM", "HS256")
    issuer = os.environ.get("JWT_ISSUER", "mcpgateway")
    audience = os.environ.get("JWT_AUDIENCE", "mcpgateway-api")
    now = int(time.time())
    payload = {
        "username": email,
        "sub": email,
        "iat": now,
        "iss": issuer,
        "aud": audience,
        "jti": str(uuid.uuid4()),
        "user": {
            "email": email,
            "full_name": "MindsDB Registration",
            "is_admin": True,
            "auth_provider": "cli",
        },
        "exp": now + expiry * 60,
        # Admin bypass requires explicit null — not missing, not empty list
        "teams": None,
    }

    # RSA (RS*), RSA-PSS (PS*), ECDSA (ES*) and EdDSA all sign with a private
    # key; only HMAC algorithms use a shared secret.
    if algo.startswith(("RS", "PS", "ES")) or algo == "EdDSA":
        key = pathlib.Path(os.environ["JWT_PRIVATE_KEY_PATH"]).read_text(encoding="utf-8")
    else:
        key = os.environ.get("JWT_SECRET_KEY")
        if key is None:
            print(" WARNING: JWT_SECRET_KEY not set — using development default key")
            key = "my-test-key"

    token = pyjwt.encode(payload, key, algorithm=algo)
    # PyJWT < 2 returns bytes; normalize so "Bearer {token}" is well-formed.
    return token.decode("utf-8") if isinstance(token, bytes) else token
def login_mindsdb() -> str:
    """Log into MindsDB's HTTP API and return the session token.

    Credentials come from MINDSDB_USERNAME and MINDSDB_PASSWORD (a missing
    variable raises KeyError). The token is taken from the ``token`` field
    of the JSON response, falling back to ``session``; an empty string is
    returned when neither field holds a truthy value.
    """
    credentials = {
        "username": os.environ["MINDSDB_USERNAME"],
        "password": os.environ["MINDSDB_PASSWORD"],
    }
    login_request = urllib.request.Request(
        f"{MINDSDB_URL}/api/login",
        data=json.dumps(credentials).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(login_request, timeout=30) as response:
        reply = json.loads(response.read())

    for field in ("token", "session"):
        if reply.get(field):
            return reply[field]
    return ""
def api(method: str, path: str, data: dict | None = None, token: str = "") -> dict | list:
    """Call the Context Forge Gateway REST API and return the decoded JSON.

    Args:
        method: HTTP verb ("GET", "POST", "PUT", ...).
        path: Path appended to GATEWAY_URL, e.g. "/gateways".
        data: Optional JSON body. Any value other than None — including an
            empty dict — is serialized and sent.
        token: Bearer token placed in the Authorization header.

    Returns:
        The parsed JSON response, or ``{}`` when the response body is empty.

    Raises:
        urllib.error.HTTPError: For non-2xx responses (callers inspect ``.code``).
        urllib.error.URLError: When the gateway cannot be reached.
    """
    body = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(f"{GATEWAY_URL}{path}", data=body, method=method)
    req.add_header("Authorization", f"Bearer {token}")
    req.add_header("Content-Type", "application/json")
    with urllib.request.urlopen(req, timeout=30) as resp:
        raw = resp.read()
    return json.loads(raw) if raw else {}
def mindsdb_sql(token: str, query: str) -> dict:
    """Run *query* through MindsDB's ``/api/sql/query`` endpoint.

    Posts ``{"query": query}`` as JSON, authenticated with a Bearer *token*,
    and returns the decoded JSON response unchanged.
    """
    body = json.dumps({"query": query}).encode()
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }
    sql_request = urllib.request.Request(
        f"{MINDSDB_URL}/api/sql/query",
        data=body,
        headers=headers,
        method="POST",
    )
    with urllib.request.urlopen(sql_request, timeout=30) as response:
        raw = response.read()
    return json.loads(raw)
# ---------------------------------------------------------------------------
# Step 1 — Register or update the MindsDB gateway (idempotent)
# ---------------------------------------------------------------------------
def register_gateway(cf_token: str, mdb_token: str) -> str:
    """Ensure a gateway named 'mindsdb' exists and return its ID.

    An existing gateway is updated in place (PUT) with the current MindsDB
    token and MCP URL, which preserves its discovered tools. If none exists,
    or the update fails, a new gateway is created (POST).

    Exits the process with status 1 if creation fails or the create
    response carries no ID.
    """
    print("Step 1: Registering MindsDB gateway...")

    # Look for an existing gateway by name.
    gateway_id = None
    try:
        listing = api("GET", "/gateways", token=cf_token)
        # Accept both a bare list and a paginated {"items": [...]} envelope;
        # iterating a dict directly would yield its keys, not gateways.
        if isinstance(listing, dict):
            listing = listing.get("items", listing.get("gateways", []))
        gateway_id = next(
            (gw["id"] for gw in listing if isinstance(gw, dict) and gw.get("name") == "mindsdb"),
            None,
        )
    except Exception as exc:
        print(f" Note: {exc}")

    # Update existing (preserves tools, avoids race conditions with replicas)
    if gateway_id:
        try:
            api(
                "PUT",
                f"/gateways/{gateway_id}",
                {
                    "auth_type": "bearer",
                    "auth_token": mdb_token,
                    "url": MINDSDB_MCP_URL,
                },
                token=cf_token,
            )
            print(f" Updated existing gateway: {gateway_id}")
            return gateway_id
        except Exception as exc:
            print(f" Update failed ({exc}), re-creating...")

    # Create new
    gw_payload = {
        "name": "mindsdb",
        "url": MINDSDB_MCP_URL,
        "description": ("MindsDB federated data gateway — query databases, warehouses, " "knowledge bases, and SaaS applications via SQL"),
        "transport": "SSE",
        "auth_type": "bearer",
        "auth_token": mdb_token,
        "tags": ["data-gateway", "knowledge-base", "sql", "mindsdb"],
        "visibility": "public",
    }
    try:
        result = api("POST", "/gateways", gw_payload, token=cf_token)
    except Exception as exc:
        print(f" Gateway registration failed: {exc}")
        sys.exit(1)

    new_id = result.get("id") if isinstance(result, dict) else None
    if not new_id:
        print(" Gateway POST succeeded but returned no ID")
        sys.exit(1)
    print(f" Created gateway: {new_id}")
    return new_id
# ---------------------------------------------------------------------------
# Step 2 — Wait for tool auto-discovery
# ---------------------------------------------------------------------------
def _match_mindsdb_tools(tools: list) -> tuple[str | None, str | None]:
    """Pick (query_id, list_databases_id) out of *tools* by tool name.

    Names match either bare ("query", "list_databases") or with a
    "<prefix>-" in front (e.g. "mindsdb-query"); list_databases also
    accepts the "-list-databases" spelling. Missing tools yield None.
    """
    query_id = None
    list_db_id = None
    for tool in tools:
        name = tool.get("name") or ""
        if name == "query" or name.endswith("-query"):
            query_id = tool.get("id")
        elif name == "list_databases" or name.endswith(("-list-databases", "-list_databases")):
            list_db_id = tool.get("id")
    return query_id, list_db_id


def discover_tools(cf_token: str, gateway_id: str) -> tuple[str | None, str | None]:
    """Poll until the query tool appears; return (query_id, list_databases_id).

    Polls ``GET /tools`` every 2 seconds for roughly TOOL_DISCOVERY_TIMEOUT
    seconds (at least once), considering only tools whose gateway ID equals
    *gateway_id*. Returns as soon as a query tool is found; the
    list_databases ID may still be None at that point.

    Returns (None, None) if the query tool is not discovered within the
    timeout. The caller should continue with remaining steps — the gateway
    health check loop will discover tools later when MindsDB's MCP endpoint
    is ready.
    """
    print("Step 2: Waiting for tool auto-discovery...")
    query_tool_id = None
    list_db_tool_id = None
    poll_interval = 2  # seconds
    max_polls = max(1, TOOL_DISCOVERY_TIMEOUT // poll_interval)
    for poll in range(max_polls):
        elapsed = poll * poll_interval
        try:
            tools = api("GET", "/tools", token=cf_token)
            # Accept a bare list or a paginated {"items": [...]} envelope.
            if isinstance(tools, dict):
                tools = tools.get("items", tools.get("tools", []))
            gw_tools = [
                t
                for t in tools
                if isinstance(t, dict) and (t.get("gatewayId") or t.get("gateway_id")) == gateway_id
            ]
            if poll % 5 == 0:
                print(f" Polling: {len(tools)} total tools, " f"{len(gw_tools)} for gateway {gateway_id[:8]}... ({elapsed}s)")
            found_query, found_list_db = _match_mindsdb_tools(gw_tools)
            # Keep IDs seen on earlier polls if this poll didn't list them.
            query_tool_id = found_query or query_tool_id
            list_db_tool_id = found_list_db or list_db_tool_id
            if query_tool_id:
                print(f" Found query tool: {query_tool_id}")
                if list_db_tool_id:
                    print(f" Found list_databases tool: {list_db_tool_id}")
                return query_tool_id, list_db_tool_id
        except Exception as exc:
            if poll % 5 == 0:
                print(f" Polling error at {elapsed}s: {exc}")
        # No sleep after the final poll — only the timeout warning follows.
        if poll < max_polls - 1:
            time.sleep(poll_interval)
    print(f" WARNING: query tool not discovered after {TOOL_DISCOVERY_TIMEOUT}s")
    print(" Continuing with remaining steps — tools will appear when MindsDB MCP is ready.")
    return None, None
# ---------------------------------------------------------------------------
# Step 3 — Create teams (idempotent)
# ---------------------------------------------------------------------------
def get_or_create_team(cf_token: str, name: str, slug: str, description: str) -> str | None:
    """Return the ID of the team matching *slug* or *name*, creating it if absent.

    A failure while listing teams is reported and falls through to creation.
    Returns None (after printing the error) if creation fails.
    """
    try:
        listing = api("GET", "/teams/", token=cf_token)
        if isinstance(listing, list):
            candidates = listing
        else:
            candidates = listing.get("items", listing.get("teams", []))
        for team in candidates:
            if team.get("slug") == slug or team.get("name") == name:
                existing_id = team["id"]
                print(f" Team exists: {name} ({existing_id})")
                return existing_id
    except Exception as exc:
        print(f" Note checking teams: {exc}")

    new_team = {
        "name": name,
        "slug": slug,
        "description": description,
        "visibility": "private",
    }
    try:
        created = api("POST", "/teams/", new_team, token=cf_token)
        created_id = created.get("id")
        print(f" Created team: {name} ({created_id})")
        return created_id
    except Exception as exc:
        print(f" Failed to create team {name}: {exc}")
        return None
def provision_teams(cf_token: str) -> tuple[str | None, str | None]:
    """Make sure the Legal and HR teams exist and return their IDs as (legal, hr)."""
    print("Step 3: Provisioning teams...")
    team_specs = [
        ("Legal", "legal", "Legal department team"),
        ("HR", "hr", "Human Resources department team"),
    ]
    legal_id, hr_id = [
        get_or_create_team(cf_token, team_name, team_slug, team_desc)
        for team_name, team_slug, team_desc in team_specs
    ]
    return legal_id, hr_id
# ---------------------------------------------------------------------------
# Step 4 — Create team-scoped virtual servers (idempotent)
# ---------------------------------------------------------------------------
def _sync_server_tools(cf_token: str, srv_id: str, name: str, tool_ids: list[str]) -> None:
    """Best-effort PUT of *tool_ids* onto the already-existing server *srv_id*.

    Does nothing when *tool_ids* is empty, so a run made before tool
    discovery never clears associations set earlier. Errors are printed,
    not raised.
    """
    if not tool_ids:
        return
    try:
        # NOTE(review): assumes PUT /servers/{id} accepts an unwrapped partial
        # update body containing "associated_tools" — confirm against the API.
        api("PUT", f"/servers/{srv_id}", {"associated_tools": tool_ids}, token=cf_token)
        print(f" Updated tools for {name}: {', '.join(tool_ids)}")
    except Exception as exc:
        print(f" Failed to update tools for {name}: {exc}")


def create_virtual_servers(
    cf_token: str,
    query_tool_id: str | None,
    list_db_tool_id: str | None,
    legal_team_id: str | None,
    hr_team_id: str | None,
) -> None:
    """Create the three demo virtual servers, or attach tools to existing ones.

    Servers use fixed IDs, so re-runs hit HTTP 409 instead of duplicating.
    If tools haven't been discovered yet (query_tool_id is None), new
    servers are created with empty tool lists. When a server already exists
    and tools are now known, its tool associations are updated via PUT, so
    re-running this script once MindsDB MCP is healthy attaches the tools.
    Every failure is printed and skipped; this step never aborts the run.
    """
    print("Step 4: Creating virtual servers...")
    # Build tool lists — empty if tools not yet discovered
    query_tools = [query_tool_id] if query_tool_id else []
    admin_tools = list(query_tools)
    if list_db_tool_id:
        admin_tools.append(list_db_tool_id)
    if not query_tool_id:
        print(" NOTE: No tools discovered yet — creating servers with empty tool lists.")
        print(" Associate tools later via the Admin UI or re-run this script.")
    servers = [
        {
            "server": {
                "id": "00000000-0000-0000-0000-00006c656731",
                "name": "legal-team-data",
                "description": ("Legal department Knowledge Base access. " "Query tool available for semantic search over legal documents."),
                "tags": ["legal", "knowledge-base"],
                "associated_tools": query_tools,
                "team_id": legal_team_id,
                "visibility": "team" if legal_team_id else "private",
            }
        },
        {
            "server": {
                "id": "00000000-0000-0000-0000-006872303031",
                "name": "hr-team-data",
                "description": ("HR department Knowledge Base access. " "Query tool available for semantic search over HR documents."),
                "tags": ["hr", "knowledge-base"],
                "associated_tools": query_tools,
                "team_id": hr_team_id,
                "visibility": "team" if hr_team_id else "private",
            }
        },
        {
            "server": {
                "id": "00000000-0000-0000-0000-00006d696e64",
                "name": "admin-data-gateway",
                "description": ("Full access to all MindsDB databases and knowledge bases. " "Use list_databases to see available sources, and query to execute any SQL."),
                "tags": ["admin", "data-gateway"],
                "associated_tools": admin_tools,
                "visibility": "public",
            }
        },
    ]
    for srv in servers:
        server = srv["server"]
        name = server["name"]
        srv_id = server["id"]
        # Drop None team_id to avoid 422 validation errors
        if server.get("team_id") is None:
            server.pop("team_id", None)
        try:
            api("POST", "/servers", srv, token=cf_token)
            print(f" Created virtual server: {name} ({srv_id})")
        except urllib.error.HTTPError as exc:
            if exc.code == 409:
                print(f" Virtual server exists: {name} ({srv_id})")
                # Without this, a re-run never attaches newly discovered tools.
                _sync_server_tools(cf_token, srv_id, name, server["associated_tools"])
            else:
                print(f" Failed to create {name}: {exc}")
        except Exception as exc:
            print(f" Failed to create {name}: {exc}")
# ---------------------------------------------------------------------------
# Step 5 — Provision Knowledge Bases in MindsDB (idempotent)
# ---------------------------------------------------------------------------
def provision_knowledge_bases(mdb_token: str) -> None:
    """Create the team Knowledge Bases in MindsDB if they don't already exist.

    MindsDB's SQL endpoint can report a failed statement inside the JSON
    body (``{"type": "error", "error_message": ...}``) instead of via the
    HTTP status, so responses are checked for that shape as well as for
    raised exceptions. All failures are printed and skipped.
    """
    print("Step 5: Provisioning Knowledge Bases...")

    def _sql_error(result: object) -> str | None:
        # Return MindsDB's in-body error message, or None on success.
        if isinstance(result, dict) and result.get("type") == "error":
            return str(result.get("error_message") or "unknown MindsDB error")
        return None

    existing_kbs: set[str] = set()
    try:
        listing = mindsdb_sql(mdb_token, "SHOW KNOWLEDGE_BASES")
        error = _sql_error(listing)
        if error:
            print(f" Could not list KBs: {error}")
        else:
            # Assumes the KB name is the first column of each row; names are
            # lower-cased so a differently-cased listing still matches.
            existing_kbs = {str(row[0]).lower() for row in listing.get("data") or [] if row}
    except Exception as exc:
        print(f" Could not list KBs: {exc}")

    for kb_name in ("legal_kb", "hr_kb"):
        if kb_name in existing_kbs:
            print(f" KB exists: {kb_name}")
            continue
        sql = (
            f"CREATE KNOWLEDGE_BASE {kb_name}"
            " USING"
            " content_columns = ['content'],"
            " metadata_columns = ['source_file', 'department']"
        )
        try:
            error = _sql_error(mindsdb_sql(mdb_token, sql))
        except Exception as exc:
            error = str(exc)
        if error is None:
            print(f" Created KB: {kb_name}")
        elif "already exists" in error.lower():
            # NOTE(review): presumes MindsDB's duplicate-KB message contains
            # "already exists" (covers the case where listing failed) — verify.
            print(f" KB exists: {kb_name}")
        else:
            print(f" Failed to create KB {kb_name}: {error}")
# ---------------------------------------------------------------------------
# Step 6 — Verify registration
# ---------------------------------------------------------------------------
def verify_registration(cf_token: str, gateway_id: str) -> None:
    """Report whether the registered gateway is enabled, reachable and has tools.

    Reads ``/gateways/{gateway_id}`` and prints its ``enabled``/``reachable``
    flags and tool count. If no tools are listed, it requests one manual
    refresh via ``/gateways/{gateway_id}/tools/refresh``, waits 5 seconds
    and re-reads the gateway. Purely diagnostic: every problem is printed as
    a warning and nothing is raised.
    """
    print("Step 6: Verifying registration...")

    def _read_gateway() -> dict:
        # A non-dict response is treated as an empty gateway record.
        gw = api("GET", f"/gateways/{gateway_id}", token=cf_token)
        return gw if isinstance(gw, dict) else {}

    def _count_tools(gw: dict) -> int:
        # "or []" also covers an explicit JSON null, which len() would reject.
        return len(gw.get("tools") or [])

    try:
        gw = _read_gateway()
        reachable = gw.get("reachable", False)
        enabled = gw.get("enabled", False)
        tool_count = _count_tools(gw)
        print(f" Gateway: enabled={enabled}, reachable={reachable}, tools={tool_count}")
        if not reachable:
            print(" WARNING: Gateway registered but not reachable")
        if tool_count == 0:
            print(" WARNING: No tools discovered — attempting manual refresh...")
            try:
                api("POST", f"/gateways/{gateway_id}/tools/refresh", token=cf_token)
                time.sleep(5)
                tool_count = _count_tools(_read_gateway())
                print(f" After manual refresh: tools={tool_count}")
            except Exception as refresh_exc:
                print(f" Manual refresh failed: {refresh_exc}")
            if tool_count == 0:
                print(" WARNING: Still no tools — check MCP SSE connectivity")
    except Exception as exc:
        print(f" Verification failed: {exc}")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
    """Run the MindsDB registration pipeline (steps 1-6) and print a summary.

    Exits with status 1 if a required service never becomes healthy, if the
    admin token cannot be generated, or if MindsDB login fails. Undiscovered
    tools are reported as a warning only; the script still completes.
    """
    print("=== MindsDB Registration ===")
    print()
    # Health checks — only use public endpoints (MCP endpoints require auth)
    wait_for_service(f"{GATEWAY_URL}/health", "Gateway")
    wait_for_service(f"{MINDSDB_URL}/api/status", "MindsDB HTTP")

    # Authentication — report failures as clean messages instead of tracebacks
    # (e.g. PyJWT not installed, JWT_PRIVATE_KEY_PATH / MINDSDB_* unset, HTTP errors).
    print()
    print("Generating admin token...")
    try:
        cf_token = generate_admin_token()
    except Exception as exc:
        print(f" Failed to generate admin token: {type(exc).__name__}: {exc}")
        sys.exit(1)
    print("Logging into MindsDB...")
    try:
        mdb_token = login_mindsdb()
    except Exception as exc:
        print(f" Failed to get MindsDB token: {type(exc).__name__}: {exc}")
        sys.exit(1)
    if not mdb_token:
        print(" Failed to get MindsDB token")
        sys.exit(1)
    print(" MindsDB token obtained")
    print()

    # Registration pipeline — continues through all steps even if tools aren't
    # discovered. The gateway health check loop will discover tools later when
    # MindsDB's MCP endpoint is fully ready.
    gateway_id = register_gateway(cf_token, mdb_token)
    print()
    query_tool_id, list_db_tool_id = discover_tools(cf_token, gateway_id)
    print()
    legal_team_id, hr_team_id = provision_teams(cf_token)
    print()
    create_virtual_servers(cf_token, query_tool_id, list_db_tool_id, legal_team_id, hr_team_id)
    print()
    provision_knowledge_bases(mdb_token)
    print()
    verify_registration(cf_token, gateway_id)

    # Summary
    tools_ok = query_tool_id is not None
    print()
    print("=== MindsDB registration complete ===")
    print(f" Gateway: mindsdb ({gateway_id})")
    print(f" Tools: query={query_tool_id or 'pending'}, list_databases={list_db_tool_id or 'pending'}")
    if legal_team_id:
        print(f" Team: Legal ({legal_team_id})")
    if hr_team_id:
        print(f" Team: HR ({hr_team_id})")
    if not tools_ok:
        print()
        print(" WARNING: Tools not yet discovered — MindsDB MCP may still be starting.")
        print(" Virtual servers were created with empty tool lists.")
        print(" The gateway health check loop will auto-discover tools when ready.")


if __name__ == "__main__":
    main()