# src/fctr_okta_mcp/server.py
"""
FastMCP Server Entry Point
This server uses tag-based filtering to control tool visibility.
Two different configurations based on `ENABLE_AGENT_MODE` environment variable:
- Agent mode (ENABLE_AGENT_MODE=true): Only tools with 'agent' tag visible
- Direct Tools mode (ENABLE_AGENT_MODE=false): Only tools with 'direct' tag visible
Transport options (via command-line flags):
- stdio (default): Standard I/O for local MCP clients (Claude Desktop, VS Code)
- http: Streamable HTTP for web deployments (requires --http-transport --i-understand-the-risks)
"""
import os
from typing import Optional
from dotenv import load_dotenv
from fastmcp import FastMCP
from fctr_okta_mcp.tools.meta_tools import register_meta_tools
from fctr_okta_mcp.tools.okta_tools import register_okta_tools
from fctr_okta_mcp.utils.logger import get_logger
# Load environment variables up front so the os.getenv() calls further down
# in this module (ENABLE_AGENT_MODE, MCP_HOST, ...) see them.
load_dotenv()
# Module-level logger, named after this module.
logger = get_logger(__name__)
logger.info("Initializing Okta MCP Server...")
# ============================================================================
# CACHED OPERATIONS CATALOG
# ============================================================================
# Cached catalog - built once at startup, reused for all queries.
# Stays None until initialize_catalog() stores the dict produced by
# _build_operations_catalog(); get_operations_catalog() fills it lazily if it
# is still None when first queried.
_operations_catalog: Optional[dict] = None
def _build_operations_catalog(server) -> dict:
    """
    Derive the operations catalog from the tools registered on ``server``.

    Only tools whose registered name starts with ``okta_`` are considered.
    The prefix is dropped and the first underscore of the remainder becomes a
    dot, e.g. ``okta_user_list`` -> ``user.list`` and
    ``okta_group_list_members`` -> ``group.list_members``. A remainder with no
    underscore is listed unchanged and contributes no category.

    Args:
        server: The FastMCP server instance whose tool registry is read.

    Returns:
        dict with:
        - operations: sorted list of operation names
        - categories: sorted list of unique categories
        - count: total number of operations
    """
    prefix = "okta_"
    op_names = []
    seen_categories = set()

    for registered_name in server._tool_manager._tools:
        if not registered_name.startswith(prefix):
            continue

        bare_name = registered_name[len(prefix):]
        # Split on the first underscore only: category before, action after.
        category, underscore, action = bare_name.partition("_")
        if underscore:
            seen_categories.add(category)
            bare_name = f"{category}.{action}"
        op_names.append(bare_name)

    op_names.sort()
    return {
        "operations": op_names,
        "categories": sorted(seen_categories),
        "count": len(op_names),
    }
def initialize_catalog(server) -> None:
    """
    Build the operations catalog and store it in the module-level cache.

    Intended to run once at startup, after all tools have been registered on
    ``server``; calling it again rebuilds and replaces the cached catalog.

    Args:
        server: The FastMCP server instance whose registered tools are scanned.
    """
    global _operations_catalog
    fresh_catalog = _build_operations_catalog(server)
    _operations_catalog = fresh_catalog
    logger.info(f"Operations catalog initialized: {_operations_catalog['count']} operations in {len(_operations_catalog['categories'])} categories")
def get_operations_catalog(server, category: str = "") -> dict:
    """
    Return the cached operations catalog, optionally narrowed to one category.

    If the cache has not been populated yet it is built on demand from
    ``server``. Without a category the cached dict itself is returned (not a
    copy); with a category a new dict is built on every call.

    Args:
        server: The FastMCP server instance (used only if the cache is empty)
        category: Optional category filter (e.g., 'user', 'group')

    Returns:
        dict with operations, categories, and count. For a category with no
        matching operations, all three are empty / zero.
    """
    # Fallback for callers that arrive before startup initialization ran.
    if _operations_catalog is None:
        initialize_catalog(server)

    catalog = _operations_catalog
    if not category:
        return catalog

    # Match on "<category>." so that 'user' does not also pick up 'usergroup.*'.
    wanted_prefix = f"{category}."
    matching_ops = [name for name in catalog["operations"] if name.startswith(wanted_prefix)]
    matched_categories = [category] if matching_ops else []
    return {
        "operations": matching_ops,
        "categories": matched_categories,
        "count": len(matching_ops),
    }
def get_tool_details(server, tool_name: str) -> dict:
    """
    Look up one registered tool and describe it.

    Args:
        server: The FastMCP server instance
        tool_name: The registered tool name (e.g., 'okta_user_list')

    Returns:
        dict with:
        - description: the tool's ``description`` attribute ("" if absent)
        - parameters: the tool's ``parameters`` attribute ({} if absent)
        - docstring: stripped docstring of the tool's underlying function,
          falling back to the tool object's own docstring, else ""

    Raises:
        ValueError: If no tool named ``tool_name`` is registered.
    """
    registry = server._tool_manager._tools
    if tool_name not in registry:
        raise ValueError(f"Tool '{tool_name}' not found in registry")
    entry = registry[tool_name]

    # Prefer the wrapped function's docstring; fall back to the tool object's.
    fn_doc = entry.fn.__doc__ if hasattr(entry, "fn") else None
    doc_text = fn_doc or getattr(entry, "__doc__", None) or ""

    details = {}
    details["description"] = getattr(entry, "description", "")
    details["parameters"] = getattr(entry, "parameters", {})
    details["docstring"] = doc_text.strip()
    return details
# ============================================================================
# SERVER INITIALIZATION
# ============================================================================
# Check environment variable (Direct Mode is default for standard MCP compatibility)
enable_agent_mode = os.getenv("ENABLE_AGENT_MODE", "false").lower() == "true"

# Create server with tag-based filtering. Exactly one tag is exposed:
# - Agent Mode: only tools tagged 'agent'
# - Direct Tools Mode: only tools tagged 'direct'
_visible_tag = "agent" if enable_agent_mode else "direct"
mcp = FastMCP(
    name="fctr-okta-mcp-server",
    include_tags={_visible_tag},
)

# Register ALL tools (filtering happens via tags)
register_meta_tools(mcp)  # 4 tools: 2 with "agent" only, 2 with both "agent" and "direct"
register_okta_tools(mcp)  # 50+ tools with "okta" and "direct" (needed for catalog even in agent mode)

# Initialize the operations catalog cache AFTER tools are registered
initialize_catalog(mcp)

# Log which mode we're running in and how many tools the client can see.
# The registry holds every registered tool regardless of mode, so count only
# the tools carrying the active tag instead of the whole registry.
# NOTE(review): relies on each registered tool exposing a `tags` collection,
# as FastMCP's tag filtering does - confirm against the pinned FastMCP version.
mode_name = "Agent Mode" if enable_agent_mode else "Direct Tools Mode"
visible_tools = [
    name
    for name, tool in mcp._tool_manager._tools.items()
    if _visible_tag in (getattr(tool, "tags", None) or ())
]
logger.info(f"Server running in {mode_name} - {len(visible_tools)} tools visible to MCP client")
# ============================================================================
# CUSTOM HTTP ROUTES (for HTTP transport mode)
# ============================================================================
# These routes are served alongside the MCP endpoint when running in HTTP mode.
# Using @mcp.custom_route per FastMCP docs: https://gofastmcp.com/servers/http
import re
from starlette.requests import Request
from starlette.responses import FileResponse, Response, PlainTextResponse
# UUID pattern for filename validation (security: prevent path traversal).
# Accepts exactly 32 lowercase hex characters followed by ".csv"; no path
# separators or dots other than the extension can match.
# NOTE(review): this is the shape of uuid4().hex - presumably how csv_output
# names its files; confirm against that module.
_UUID_PATTERN = re.compile(r'^[a-f0-9]{32}\.csv$')
@mcp.custom_route("/health", methods=["GET"])
async def health_check(request: Request) -> PlainTextResponse:
    """
    Health check endpoint for monitoring.

    Registered on the MCP server as a GET route at /health.

    Args:
        request: The incoming request; it is not inspected.

    Returns:
        A plain-text response whose body is "OK".
    """
    return PlainTextResponse("OK")
@mcp.custom_route("/results/{filename:path}", methods=["GET"])
async def serve_csv_results(request: Request) -> Response:
    """
    Serve CSV result files with security checks.

    Only serves files with UUID-based filenames to prevent enumeration.
    Validates the resolved path to prevent directory traversal attacks.

    Args:
        request: Incoming request; the ``filename`` path parameter selects
            the file to serve.

    Returns:
        - 404 "Not Found" if the filename does not match the UUID pattern or
          no regular file exists at the resolved location
        - 403 "Forbidden" if the resolved path escapes the output directory
        - otherwise a ``text/csv`` FileResponse for the file
    """
    from fctr_okta_mcp.resources.csv_output import DEFAULT_OUTPUT_DIR

    filename = request.path_params.get("filename", "")

    # Security: Only allow UUID-based .csv filenames.
    # fullmatch (not match): '$' in the pattern would otherwise also accept a
    # name that ends with a newline character.
    if not _UUID_PATTERN.fullmatch(filename):
        return Response("Not Found", status_code=404)

    filepath = os.path.join(DEFAULT_OUTPUT_DIR, filename)

    # Security: Verify file is within allowed directory (prevent path traversal,
    # including via symlinks). The trailing separator matters: a bare prefix
    # check would treat a sibling such as "<output_dir>_other/..." as inside.
    real_path = os.path.realpath(filepath)
    real_output_dir = os.path.realpath(DEFAULT_OUTPUT_DIR)
    allowed_prefix = os.path.join(real_output_dir, "")
    if not real_path.startswith(allowed_prefix):
        return Response("Forbidden", status_code=403)

    # Only regular files are servable; a directory with a matching name is a 404.
    if not os.path.isfile(real_path):
        return Response("Not Found", status_code=404)

    # Serve the path that was validated rather than the unresolved one.
    return FileResponse(
        real_path,
        media_type="text/csv",
        filename=filename,
    )
def _print_http_risk_error() -> None:
    """Print the banner explaining why HTTP transport needs explicit opt-in."""
    banner_lines = (
        "\n" + "=" * 70,
        "ERROR: HTTP transport requires explicit acknowledgment",
        "=" * 70,
        "\nHTTP transport exposes your MCP server over the network.",
        "This has security implications:",
        " - Anyone with network access can connect to the server",
        " - There is NO built-in authentication in this mode",
        " - Your Okta API credentials could be misused",
        " - CSV result files are accessible via /results/ endpoint",
        "\nTo proceed, you must:",
        " 1. Understand these risks",
        " 2. Use --i-understand-the-risks flag",
        " 3. Consider binding to 127.0.0.1 (localhost only)",
        " 4. Use a reverse proxy with authentication for production",
        "\nExample:",
        " python -m fctr_okta_mcp.server --http-transport --i-understand-the-risks",
        "=" * 70 + "\n",
    )
    for line in banner_lines:
        print(line)


def _print_http_startup_banner(host: str, port: int, expiry_minutes: int) -> None:
    """Print the security notice and endpoint summary shown before HTTP startup."""
    rule = "=" * 60
    banner_lines = (
        "",
        rule,
        " SECURITY NOTICE",
        rule,
        " - HTTP transport exposes your server over the network",
        " - Keep MCP_HOST=127.0.0.1 for local-only access",
        " - Use a reverse proxy with auth for production",
        " - Never expose to 0.0.0.0 without authentication",
        rule,
        "",
        f" CSV Results: http://{host}:{port}/results/",
        f" Health Check: http://{host}:{port}/health",
        f" CSV Expiry: {expiry_minutes} minutes",
        "",
        "Press Ctrl+C to stop",
        "",
    )
    for line in banner_lines:
        print(line)


async def _serve_http_with_cleanup(host: str, port: int, cleanup_old_files) -> None:
    """Run the HTTP server alongside a background task that purges expired CSVs."""
    import asyncio

    async def csv_cleanup_loop() -> None:
        """Periodically cleanup expired CSV files."""
        while True:
            await asyncio.sleep(60)  # Check every 60 seconds
            try:
                deleted = cleanup_old_files()
                if deleted > 0:
                    logger.info(f"CSV cleanup: removed {deleted} expired file(s)")
            except Exception as e:
                # Best effort: a failed sweep must not stop later sweeps.
                logger.error(f"CSV cleanup error: {e}")

    cleanup_task = asyncio.create_task(csv_cleanup_loop())
    logger.info("CSV cleanup background task started")
    try:
        # Run the MCP server (this blocks until shutdown)
        await mcp.run_async(transport="http", host=host, port=port)
    finally:
        # Cancel the sweep on shutdown and wait for the cancellation to land.
        cleanup_task.cancel()
        try:
            await cleanup_task
        except asyncio.CancelledError:
            pass
        logger.info("CSV cleanup background task stopped")


def run_server(http_transport: bool = False, i_understand_the_risks: bool = False):
    """
    Run the MCP server with the configured transport.

    Args:
        http_transport: Enable HTTP streaming transport (requires i_understand_the_risks=True)
        i_understand_the_risks: Acknowledge security implications of HTTP transport

    Transport options:
    - stdio (default): Standard I/O for local MCP clients
    - http: Streamable HTTP for web deployments (requires explicit flags)

    HTTP-specific env vars:
    - MCP_HOST: Host to bind to (default: 127.0.0.1)
    - MCP_PORT: Port to listen on (default: 8000)
    - MCP_BASE_URL: Base URL used for CSV download links
      (default: http://<MCP_HOST>:<MCP_PORT>)

    Raises:
        SystemExit: With code 1 if HTTP transport is requested without
            i_understand_the_risks.
        ValueError: If MCP_PORT is not an integer.
    """
    if not http_transport:
        # STDIO transport (default) for local MCP clients
        logger.info("Starting STDIO server")
        mcp.run(transport="stdio")
        return

    if not i_understand_the_risks:
        logger.error("HTTP transport requires --i-understand-the-risks flag")
        _print_http_risk_error()
        raise SystemExit(1)

    # HTTP Streaming transport for web deployments
    host = os.getenv("MCP_HOST", "127.0.0.1")
    raw_port = os.getenv("MCP_PORT", "8000")
    try:
        port = int(raw_port)
    except ValueError as err:
        # Name the offending variable instead of surfacing a bare int() error.
        logger.error(f"Invalid MCP_PORT value: {raw_port!r}")
        raise ValueError(f"MCP_PORT must be an integer, got {raw_port!r}") from err

    # Set base URL for CSV download links
    # Users can override with MCP_BASE_URL for https or custom domains
    # This is picked up by meta_tools.py to construct CSV URLs
    base_url = os.getenv("MCP_BASE_URL", f"http://{host}:{port}")
    os.environ["_MCP_HTTP_BASE_URL"] = base_url

    # Setup results directory and cleanup
    from fctr_okta_mcp.resources.csv_output import DEFAULT_OUTPUT_DIR, FILE_EXPIRY_SECONDS, cleanup_old_files
    from pathlib import Path

    # Ensure results directory exists
    Path(DEFAULT_OUTPUT_DIR).mkdir(parents=True, exist_ok=True)

    # Cleanup old CSV files from previous runs
    deleted = cleanup_old_files()
    if deleted > 0:
        logger.info(f"Startup cleanup: removed {deleted} expired CSV file(s)")

    logger.info(f"Starting HTTP server on {host}:{port}/mcp")
    logger.info(f"CSV results available at http://{host}:{port}/results/ (expires in {FILE_EXPIRY_SECONDS // 60} min)")

    # Print startup info (before FastMCP banner)
    _print_http_startup_banner(host, port, FILE_EXPIRY_SECONDS // 60)

    # Run the server together with the background cleanup task
    import asyncio
    asyncio.run(_serve_http_with_cleanup(host, port, cleanup_old_files))
def main():
    """
    Command-line entry point.

    Parses the two boolean flags ``--http-transport`` and
    ``--i-understand-the-risks`` and hands them to run_server().
    """
    import argparse

    usage_examples = """
Examples:
# STDIO mode (default, for Claude Desktop / VS Code)
python -m fctr_okta_mcp.server
# HTTP streaming mode (for web deployments)
python -m fctr_okta_mcp.server --http-transport --i-understand-the-risks
"""
    cli = argparse.ArgumentParser(
        description="FCTR Okta MCP Server",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    # Both options are plain on/off switches.
    switches = (
        ("--http-transport", "Enable HTTP streaming transport instead of STDIO"),
        ("--i-understand-the-risks", "Acknowledge security implications of HTTP transport"),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, action="store_true", help=help_text)

    options = cli.parse_args()
    run_server(
        http_transport=options.http_transport,
        i_understand_the_risks=options.i_understand_the_risks,
    )
# Run the CLI entry point only when this file is executed directly
# (e.g. `python -m fctr_okta_mcp.server`), not when it is imported.
if __name__ == "__main__":
    main()