"""
FastAPI-based MCP server for Active Directory with Streamable HTTP support.
This module provides full HTTP transport with:
- Streamable HTTP Transport (GET/POST/DELETE) for Gemini CLI compatibility
- Bearer Token authentication for security
- Multi-tenant support (Skills, Ramada, Grupo Wink)
- Session management for long-running connections
Skills IT Solucoes em Tecnologia
"""
import os
import sys
import json
import logging
import asyncio
import uuid
import signal
from datetime import datetime
from typing import Optional, List, Dict, Any
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Request, Header, Depends
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field
import uvicorn
from .config.loader import load_config, validate_config
from .core.logging import setup_logging
from .core.ldap_manager import LDAPManager
from .core.client_security import init_security_manager
from .core.client_registry import get_client_registry, list_configured_clients as registry_list_clients
from .tools.user import UserTools
from .tools.group import GroupTools
from .tools.computer import ComputerTools
from .tools.organizational_unit import OrganizationalUnitTools
from .tools.security import SecurityTools
from .tools.prompts import PROMPTS_CATALOG, handle_get_prompt
# ============= MODELS =============
class MCPRequest(BaseModel):
    """Envelope for an incoming MCP call in JSON-RPC form."""

    # Protocol version marker; defaults to "2.0".
    jsonrpc: str = Field(default="2.0")
    # Name of the method being invoked (required).
    method: str
    # Optional mapping of arguments for the method.
    params: Optional[Dict[str, Any]] = Field(default=None)
    # Optional request identifier of any type.
    id: Optional[Any] = Field(default=None)
class MCPResponse(BaseModel):
    """Envelope for an outgoing MCP reply in JSON-RPC form."""

    # Protocol version marker; defaults to "2.0".
    jsonrpc: str = Field(default="2.0")
    # Optional payload of any type.
    result: Optional[Any] = Field(default=None)
    # Optional error object (a mapping).
    error: Optional[Dict[str, Any]] = Field(default=None)
    # Optional identifier of any type.
    id: Optional[Any] = Field(default=None)
# ============= SESSION MANAGEMENT =============
# In-memory session store: session id -> session data dict.
mcp_sessions: Dict[str, Dict] = {}


def cleanup_expired_sessions():
    """Drop every session whose ``created_at`` is more than 900 seconds old.

    Sessions without a ``created_at`` entry are treated as having age zero
    and are therefore kept.
    """
    reference = datetime.utcnow()
    stale_ids = []
    for session_id, record in mcp_sessions.items():
        age = reference - record.get("created_at", reference)
        if age.total_seconds() > 900:
            stale_ids.append(session_id)
    # Removal happens in a second pass so the dict is not mutated while
    # it is being iterated.
    for session_id in stale_ids:
        mcp_sessions.pop(session_id)
# ============= AUTHENTICATION =============
async def verify_bearer_token(authorization: str = Header(None)) -> str:
    """
    Verify the Bearer token carried by the Authorization header.

    The expected token is read from the AD_MCP_API_TOKEN environment
    variable on every call.

    Returns:
        The presented token, or the literal ``"no_auth_configured"`` when no
        expected token is configured.

    Raises:
        HTTPException: 401 when the header is missing or does not start with
            ``"Bearer "``; 403 when the token does not match.
    """
    # Local import keeps this block self-contained.
    import hmac

    expected_token = os.getenv("AD_MCP_API_TOKEN")
    if not expected_token:
        # No token configured: allow access (development).
        return "no_auth_configured"
    if not authorization:
        raise HTTPException(
            status_code=401,
            detail="Authorization header required",
            headers={"WWW-Authenticate": "Bearer"}
        )
    scheme_prefix = "Bearer "
    if not authorization.startswith(scheme_prefix):
        raise HTTPException(
            status_code=401,
            detail="Invalid authorization format. Use: Bearer <token>",
            headers={"WWW-Authenticate": "Bearer"}
        )
    # Strip only the leading scheme; str.replace would also remove any
    # later "Bearer " occurrence inside the token itself.
    token = authorization[len(scheme_prefix):]
    # Constant-time comparison avoids leaking how many leading characters
    # matched. Bytes are compared because compare_digest rejects non-ASCII str.
    if not hmac.compare_digest(token.encode("utf-8"), expected_token.encode("utf-8")):
        raise HTTPException(
            status_code=403,
            detail="Invalid or expired token"
        )
    return token
# ============= SERVER CLASS =============
class ActiveDirectoryMCPFastAPI:
"""
FastAPI-based MCP server for Active Directory.
Supports:
- Streamable HTTP Transport (Claude + Gemini)
- Bearer Token authentication
- Multi-tenant client identification
- Full AD management operations
"""
def __init__(
    self,
    config_path: Optional[str] = None,
    host: str = "0.0.0.0",
    port: int = 8820,
):
    """Wire up configuration, logging, security, LDAP access, tools and the app.

    Args:
        config_path: Path handed to ``load_config``; also re-read as JSON by
            ``_init_client_security`` when it points to an existing file.
        host: Stored on the instance as ``self.host``.
        port: Stored on the instance as ``self.port``.

    The steps run in a fixed order because later ones use attributes set by
    earlier ones (logger, LDAP manager, tool objects).
    """
    self.config_path = config_path
    self.host = host
    self.port = port
    # Load configuration
    self.config = load_config(config_path)
    validate_config(self.config)
    # Setup logging
    self.logger = setup_logging(self.config.logging)
    # Initialize security manager (a failure there is logged and leaves
    # self.security_manager set to None)
    self._init_client_security()
    # Initialize LDAP manager
    self.ldap_manager = LDAPManager(
        self.config.active_directory,
        self.config.security,
        self.config.performance
    )
    # Test connection (the outcome is only logged; startup continues)
    self._test_connection()
    # Initialize tools (each wraps the shared LDAP manager)
    self.user_tools = UserTools(self.ldap_manager)
    self.group_tools = GroupTools(self.ldap_manager)
    self.computer_tools = ComputerTools(self.ldap_manager)
    self.ou_tools = OrganizationalUnitTools(self.ldap_manager)
    self.security_tools = SecurityTools(self.ldap_manager)
    # Build tool registry
    self._build_tool_registry()
    # Create FastAPI app
    self.app = self._create_app()
def _init_client_security(self):
    """Create ``self.security_manager`` from the raw JSON configuration.

    The config file is parsed as JSON when ``self.config_path`` names an
    existing file; otherwise an empty dict is used. Any exception is logged
    as a warning and leaves ``self.security_manager`` set to ``None``.
    """
    try:
        config_dict = {}
        if self.config_path and os.path.exists(self.config_path):
            with open(self.config_path, 'r') as handle:
                config_dict = json.load(handle)
        self.security_manager = init_security_manager(config_dict, self.config_path)
        self.logger.info(f"Client: {self.security_manager.client_name}")
    except Exception as exc:
        self.logger.warning(f"Security manager init failed: {exc}")
        self.security_manager = None
def _test_connection(self):
"""Test LDAP connection on startup."""
try:
info = self.ldap_manager.test_connection()
if info.get('connected'):
self.logger.info(f"LDAP connected: {info.get('server')}")
else:
self.logger.error(f"LDAP failed: {info.get('error')}")
except Exception as e:
self.logger.error(f"LDAP test error: {e}")
def _check_write_permission(self, operation: str, target: str,
automation_token: Optional[str] = None,
client_confirmation: Optional[str] = None) -> dict:
"""Check if write operation is permitted."""
if not self.security_manager:
return {"permitted": True, "mode": "no_security"}
return self.security_manager.check_write_permission(
operation, target, automation_token, client_confirmation
)
def _build_tool_registry(self):
"""Build registry of all available tools."""
self.tools = {}
# Client identification tools
self.tools["get_client_info"] = {
"description": "Get client/tenant information for this AD instance. ALWAYS call this first to confirm which client's AD you are connected to.",
"inputSchema": {"type": "object", "properties": {}},
"handler": self._handle_get_client_info
}
self.tools["list_ad_clients"] = {
"description": "List all configured AD clients. Use this to check which clients have AD configured before trying to access an AD.",
"inputSchema": {"type": "object", "properties": {}},
"handler": self._handle_list_ad_clients
}
self.tools["check_client_exists"] = {
"description": "Check if a specific client has AD configured. Use this before any AD operation to validate the client exists.",
"inputSchema": {
"type": "object",
"properties": {
"client_name": {"type": "string", "description": "Client name or slug"}
},
"required": ["client_name"]
},
"handler": self._handle_check_client_exists
}
# User tools
self.tools["list_users"] = {
"description": "List users in Active Directory",
"inputSchema": {
"type": "object",
"properties": {
"ou": {"type": "string", "description": "OU DN to search in", "anyOf": [{"type": "string"}, {"type": "null"}]},
"filter_criteria": {"type": "string", "description": "LDAP filter", "anyOf": [{"type": "string"}, {"type": "null"}]},
"attributes": {"type": "array", "items": {}, "description": "Attributes to retrieve", "anyOf": [{"type": "array"}, {"type": "null"}]}
}
},
"handler": self._handle_list_users
}
self.tools["get_user"] = {
"description": "Get detailed information about a specific user",
"inputSchema": {
"type": "object",
"properties": {
"username": {"type": "string", "description": "Username (sAMAccountName)"},
"attributes": {"anyOf": [{"type": "array", "items": {}}, {"type": "null"}]}
},
"required": ["username"]
},
"handler": self._handle_get_user
}
self.tools["create_user"] = {
"description": "Create a new user in Active Directory. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"username": {"type": "string"},
"password": {"type": "string"},
"first_name": {"type": "string"},
"last_name": {"type": "string"},
"email": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"ou": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"additional_attributes": {"anyOf": [{"type": "object"}, {"type": "null"}]},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["username", "password", "first_name", "last_name"]
},
"handler": self._handle_create_user
}
self.tools["modify_user"] = {
"description": "Modify user attributes. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"username": {"type": "string"},
"attributes": {"type": "object"},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["username", "attributes"]
},
"handler": self._handle_modify_user
}
self.tools["delete_user"] = {
"description": "Delete a user from Active Directory. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"username": {"type": "string"},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["username"]
},
"handler": self._handle_delete_user
}
self.tools["enable_user"] = {
"description": "Enable a user account. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"username": {"type": "string"},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["username"]
},
"handler": self._handle_enable_user
}
self.tools["disable_user"] = {
"description": "Disable a user account. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"username": {"type": "string"},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["username"]
},
"handler": self._handle_disable_user
}
self.tools["reset_user_password"] = {
"description": "Reset user password. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"username": {"type": "string"},
"new_password": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"force_change": {"type": "boolean", "default": True},
"password_never_expires": {"type": "boolean", "default": False},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["username"]
},
"handler": self._handle_reset_user_password
}
self.tools["get_user_groups"] = {
"description": "Get groups that a user is member of",
"inputSchema": {
"type": "object",
"properties": {
"username": {"type": "string"}
},
"required": ["username"]
},
"handler": self._handle_get_user_groups
}
# Group tools
self.tools["list_groups"] = {
"description": "List groups in Active Directory",
"inputSchema": {
"type": "object",
"properties": {
"ou": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"filter_criteria": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"attributes": {"anyOf": [{"type": "array", "items": {}}, {"type": "null"}]}
}
},
"handler": self._handle_list_groups
}
self.tools["get_group"] = {
"description": "Get detailed information about a specific group",
"inputSchema": {
"type": "object",
"properties": {
"group_name": {"type": "string"},
"attributes": {"anyOf": [{"type": "array", "items": {}}, {"type": "null"}]}
},
"required": ["group_name"]
},
"handler": self._handle_get_group
}
self.tools["create_group"] = {
"description": "Create a new group in Active Directory. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"group_name": {"type": "string"},
"display_name": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"description": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"ou": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"group_scope": {"type": "string", "default": "Global"},
"group_type": {"type": "string", "default": "Security"},
"additional_attributes": {"anyOf": [{"type": "object"}, {"type": "null"}]},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["group_name"]
},
"handler": self._handle_create_group
}
self.tools["modify_group"] = {
"description": "Modify group attributes. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"group_name": {"type": "string"},
"attributes": {"type": "object"},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["group_name", "attributes"]
},
"handler": self._handle_modify_group
}
self.tools["delete_group"] = {
"description": "Delete a group from Active Directory. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"group_name": {"type": "string"},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["group_name"]
},
"handler": self._handle_delete_group
}
self.tools["add_group_member"] = {
"description": "Add a member to a group. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"group_name": {"type": "string"},
"member_dn": {"type": "string"},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["group_name", "member_dn"]
},
"handler": self._handle_add_group_member
}
self.tools["remove_group_member"] = {
"description": "Remove a member from a group. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"group_name": {"type": "string"},
"member_dn": {"type": "string"},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["group_name", "member_dn"]
},
"handler": self._handle_remove_group_member
}
self.tools["get_group_members"] = {
"description": "Get members of a group",
"inputSchema": {
"type": "object",
"properties": {
"group_name": {"type": "string"},
"recursive": {"type": "boolean", "default": False}
},
"required": ["group_name"]
},
"handler": self._handle_get_group_members
}
# Computer tools
self.tools["list_computers"] = {
"description": "List computer objects in Active Directory",
"inputSchema": {
"type": "object",
"properties": {
"ou": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"filter_criteria": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"attributes": {"anyOf": [{"type": "array", "items": {}}, {"type": "null"}]}
}
},
"handler": self._handle_list_computers
}
self.tools["get_computer"] = {
"description": "Get detailed information about a specific computer",
"inputSchema": {
"type": "object",
"properties": {
"computer_name": {"type": "string"},
"attributes": {"anyOf": [{"type": "array", "items": {}}, {"type": "null"}]}
},
"required": ["computer_name"]
},
"handler": self._handle_get_computer
}
self.tools["create_computer"] = {
"description": "Create a new computer object in Active Directory. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"computer_name": {"type": "string"},
"description": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"ou": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"dns_hostname": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"additional_attributes": {"anyOf": [{"type": "object"}, {"type": "null"}]},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["computer_name"]
},
"handler": self._handle_create_computer
}
self.tools["modify_computer"] = {
"description": "Modify computer attributes. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"computer_name": {"type": "string"},
"attributes": {"type": "object"},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["computer_name", "attributes"]
},
"handler": self._handle_modify_computer
}
self.tools["delete_computer"] = {
"description": "Delete a computer from Active Directory. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"computer_name": {"type": "string"},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["computer_name"]
},
"handler": self._handle_delete_computer
}
self.tools["enable_computer"] = {
"description": "Enable a computer account. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"computer_name": {"type": "string"},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["computer_name"]
},
"handler": self._handle_enable_computer
}
self.tools["disable_computer"] = {
"description": "Disable a computer account. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"computer_name": {"type": "string"},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["computer_name"]
},
"handler": self._handle_disable_computer
}
self.tools["reset_computer_password"] = {
"description": "Reset computer account password. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"computer_name": {"type": "string"},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["computer_name"]
},
"handler": self._handle_reset_computer_password
}
self.tools["get_stale_computers"] = {
"description": "Get stale computers (not logged in for specified days)",
"inputSchema": {
"type": "object",
"properties": {
"days": {"type": "integer", "default": 90}
}
},
"handler": self._handle_get_stale_computers
}
# OU tools
self.tools["list_organizational_units"] = {
"description": "List Organizational Units in Active Directory",
"inputSchema": {
"type": "object",
"properties": {
"parent_ou": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"filter_criteria": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"attributes": {"anyOf": [{"type": "array", "items": {}}, {"type": "null"}]},
"recursive": {"type": "boolean", "default": True}
}
},
"handler": self._handle_list_organizational_units
}
self.tools["get_organizational_unit"] = {
"description": "Get detailed information about a specific Organizational Unit",
"inputSchema": {
"type": "object",
"properties": {
"ou_dn": {"type": "string"},
"attributes": {"anyOf": [{"type": "array", "items": {}}, {"type": "null"}]}
},
"required": ["ou_dn"]
},
"handler": self._handle_get_organizational_unit
}
self.tools["create_organizational_unit"] = {
"description": "Create a new Organizational Unit. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"name": {"type": "string"},
"parent_ou": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"description": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"managed_by": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"additional_attributes": {"anyOf": [{"type": "object"}, {"type": "null"}]},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["name"]
},
"handler": self._handle_create_organizational_unit
}
self.tools["modify_organizational_unit"] = {
"description": "Modify OU attributes. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"ou_dn": {"type": "string"},
"attributes": {"type": "object"},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["ou_dn", "attributes"]
},
"handler": self._handle_modify_organizational_unit
}
self.tools["delete_organizational_unit"] = {
"description": "Delete an Organizational Unit. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"ou_dn": {"type": "string"},
"force": {"type": "boolean", "default": False},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["ou_dn"]
},
"handler": self._handle_delete_organizational_unit
}
self.tools["move_organizational_unit"] = {
"description": "Move an OU to a new parent. IMPORTANT: Confirm client with get_client_info first!",
"inputSchema": {
"type": "object",
"properties": {
"ou_dn": {"type": "string"},
"new_parent_dn": {"type": "string"},
"automation_token": {"anyOf": [{"type": "string"}, {"type": "null"}]},
"client_confirmation": {"anyOf": [{"type": "string"}, {"type": "null"}]}
},
"required": ["ou_dn", "new_parent_dn"]
},
"handler": self._handle_move_organizational_unit
}
self.tools["get_organizational_unit_contents"] = {
"description": "Get contents of an OU",
"inputSchema": {
"type": "object",
"properties": {
"ou_dn": {"type": "string"},
"object_types": {"anyOf": [{"type": "array", "items": {}}, {"type": "null"}]}
},
"required": ["ou_dn"]
},
"handler": self._handle_get_organizational_unit_contents
}
# Security tools
self.tools["get_domain_info"] = {
"description": "Get domain information and security settings",
"inputSchema": {"type": "object", "properties": {}},
"handler": self._handle_get_domain_info
}
self.tools["get_privileged_groups"] = {
"description": "Get information about privileged groups",
"inputSchema": {"type": "object", "properties": {}},
"handler": self._handle_get_privileged_groups
}
self.tools["get_user_permissions"] = {
"description": "Get effective permissions for a user",
"inputSchema": {
"type": "object",
"properties": {
"username": {"type": "string"}
},
"required": ["username"]
},
"handler": self._handle_get_user_permissions
}
self.tools["get_inactive_users"] = {
"description": "Get inactive users",
"inputSchema": {
"type": "object",
"properties": {
"days": {"type": "integer", "default": 90},
"include_disabled": {"type": "boolean", "default": False}
}
},
"handler": self._handle_get_inactive_users
}
self.tools["get_password_policy_violations"] = {
"description": "Get users with password policy violations",
"inputSchema": {"type": "object", "properties": {}},
"handler": self._handle_get_password_policy_violations
}
self.tools["audit_admin_accounts"] = {
"description": "Audit administrative accounts",
"inputSchema": {"type": "object", "properties": {}},
"handler": self._handle_audit_admin_accounts
}
# System tools
self.tools["test_connection"] = {
"description": "Test LDAP connection",
"inputSchema": {"type": "object", "properties": {}},
"handler": self._handle_test_connection
}
self.tools["health"] = {
"description": "Health check for Active Directory MCP server",
"inputSchema": {"type": "object", "properties": {}},
"handler": self._handle_health
}
self.tools["get_schema_info"] = {
"description": "Get schema information for all available tools",
"inputSchema": {"type": "object", "properties": {}},
"handler": self._handle_get_schema_info
}
# ============= TOOL HANDLERS =============
def _format_result(self, result) -> dict:
"""Format tool result for MCP response."""
if isinstance(result, list) and len(result) > 0:
# Handle MCP TextContent format
if hasattr(result[0], 'text'):
try:
return json.loads(result[0].text)
except:
return {"text": result[0].text}
if isinstance(result, dict):
return result
return {"result": str(result)}
def _handle_get_client_info(self, params: dict) -> dict:
if self.security_manager:
return self.security_manager.get_client_info()
return {
"client": {"name": "Unknown", "slug": "unknown"},
"domain": {"name": self.config.active_directory.domain},
"warning": "Security manager not initialized"
}
def _handle_list_ad_clients(self, params: dict) -> dict:
    """Return the registry's client listing, tagged with the current instance.

    When a security manager is present, its name, slug and domain are added
    under ``"current_instance"``. Any exception is reported as
    ``{"error": <message>}``. ``params`` is not read.
    """
    try:
        listing = registry_list_clients()
        manager = self.security_manager
        if manager:
            listing["current_instance"] = dict(
                name=self.security_manager.client_name,
                slug=self.security_manager.client_slug,
                domain=self.security_manager.domain,
            )
        return listing
    except Exception as exc:
        return {"error": str(exc)}
def _handle_check_client_exists(self, params: dict) -> dict:
    """Report whether the client registry knows ``params["client_name"]``.

    A missing ``client_name`` is looked up as the empty string. On a miss
    the registry's client names are returned under ``"available"``. Any
    exception yields ``{"error": <message>, "exists": False}``.
    """
    client_name = params.get("client_name", "")
    try:
        registry = get_client_registry()
        match = registry.get_client(client_name)
        if not match:
            names = registry.list_client_names()
            return {"exists": False, "searched_for": client_name, "available": names}
        return {"exists": True, "client": match}
    except Exception as exc:
        return {"error": str(exc), "exists": False}
def _handle_list_users(self, params: dict) -> dict:
result = self.user_tools.list_users(
params.get("ou"), params.get("filter_criteria"), params.get("attributes")
)
return self._format_result(result)
def _handle_get_user(self, params: dict) -> dict:
result = self.user_tools.get_user(params["username"], params.get("attributes"))
return self._format_result(result)
def _handle_create_user(self, params: dict) -> dict:
perm = self._check_write_permission("create_user", params["username"],
params.get("automation_token"),
params.get("client_confirmation"))
if not perm.get("permitted"):
return perm
result = self.user_tools.create_user(
params["username"], params["password"], params["first_name"], params["last_name"],
params.get("email"), params.get("ou"), params.get("additional_attributes")
)
return self._format_result(result)
def _handle_modify_user(self, params: dict) -> dict:
perm = self._check_write_permission("modify_user", params["username"],
params.get("automation_token"),
params.get("client_confirmation"))
if not perm.get("permitted"):
return perm
result = self.user_tools.modify_user(params["username"], params["attributes"])
return self._format_result(result)
def _handle_delete_user(self, params: dict) -> dict:
perm = self._check_write_permission("delete_user", params["username"],
params.get("automation_token"),
params.get("client_confirmation"))
if not perm.get("permitted"):
return perm
result = self.user_tools.delete_user(params["username"])
return self._format_result(result)
def _handle_enable_user(self, params: dict) -> dict:
perm = self._check_write_permission("enable_user", params["username"],
params.get("automation_token"),
params.get("client_confirmation"))
if not perm.get("permitted"):
return perm
result = self.user_tools.enable_user(params["username"])
return self._format_result(result)
def _handle_disable_user(self, params: dict) -> dict:
perm = self._check_write_permission("disable_user", params["username"],
params.get("automation_token"),
params.get("client_confirmation"))
if not perm.get("permitted"):
return perm
result = self.user_tools.disable_user(params["username"])
return self._format_result(result)
def _handle_reset_user_password(self, params: dict) -> dict:
perm = self._check_write_permission("reset_user_password", params["username"],
params.get("automation_token"),
params.get("client_confirmation"))
if not perm.get("permitted"):
return perm
result = self.user_tools.reset_password(
params["username"], params.get("new_password"),
params.get("force_change", True), params.get("password_never_expires", False)
)
return self._format_result(result)
def _handle_get_user_groups(self, params: dict) -> dict:
result = self.user_tools.get_user_groups(params["username"])
return self._format_result(result)
def _handle_list_groups(self, params: dict) -> dict:
result = self.group_tools.list_groups(
params.get("ou"), params.get("filter_criteria"), params.get("attributes")
)
return self._format_result(result)
def _handle_get_group(self, params: dict) -> dict:
result = self.group_tools.get_group(params["group_name"], params.get("attributes"))
return self._format_result(result)
def _handle_create_group(self, params: dict) -> dict:
perm = self._check_write_permission("create_group", params["group_name"],
params.get("automation_token"),
params.get("client_confirmation"))
if not perm.get("permitted"):
return perm
result = self.group_tools.create_group(
params["group_name"], params.get("display_name"), params.get("description"),
params.get("ou"), params.get("group_scope", "Global"),
params.get("group_type", "Security"), params.get("additional_attributes")
)
return self._format_result(result)
def _handle_modify_group(self, params: dict) -> dict:
perm = self._check_write_permission("modify_group", params["group_name"],
params.get("automation_token"),
params.get("client_confirmation"))
if not perm.get("permitted"):
return perm
result = self.group_tools.modify_group(params["group_name"], params["attributes"])
return self._format_result(result)
def _handle_delete_group(self, params: dict) -> dict:
perm = self._check_write_permission("delete_group", params["group_name"],
params.get("automation_token"),
params.get("client_confirmation"))
if not perm.get("permitted"):
return perm
result = self.group_tools.delete_group(params["group_name"])
return self._format_result(result)
def _handle_add_group_member(self, params: dict) -> dict:
perm = self._check_write_permission("add_group_member", params["group_name"],
params.get("automation_token"),
params.get("client_confirmation"))
if not perm.get("permitted"):
return perm
result = self.group_tools.add_member(params["group_name"], params["member_dn"])
return self._format_result(result)
def _handle_remove_group_member(self, params: dict) -> dict:
perm = self._check_write_permission("remove_group_member", params["group_name"],
params.get("automation_token"),
params.get("client_confirmation"))
if not perm.get("permitted"):
return perm
result = self.group_tools.remove_member(params["group_name"], params["member_dn"])
return self._format_result(result)
def _handle_get_group_members(self, params: dict) -> dict:
result = self.group_tools.get_members(params["group_name"], params.get("recursive", False))
return self._format_result(result)
def _handle_list_computers(self, params: dict) -> dict:
result = self.computer_tools.list_computers(
params.get("ou"), params.get("filter_criteria"), params.get("attributes")
)
return self._format_result(result)
def _handle_get_computer(self, params: dict) -> dict:
result = self.computer_tools.get_computer(params["computer_name"], params.get("attributes"))
return self._format_result(result)
def _handle_create_computer(self, params: dict) -> dict:
perm = self._check_write_permission("create_computer", params["computer_name"],
params.get("automation_token"),
params.get("client_confirmation"))
if not perm.get("permitted"):
return perm
result = self.computer_tools.create_computer(
params["computer_name"], params.get("description"), params.get("ou"),
params.get("dns_hostname"), params.get("additional_attributes")
)
return self._format_result(result)
def _handle_modify_computer(self, params: dict) -> dict:
perm = self._check_write_permission("modify_computer", params["computer_name"],
params.get("automation_token"),
params.get("client_confirmation"))
if not perm.get("permitted"):
return perm
result = self.computer_tools.modify_computer(params["computer_name"], params["attributes"])
return self._format_result(result)
def _handle_delete_computer(self, params: dict) -> dict:
perm = self._check_write_permission("delete_computer", params["computer_name"],
params.get("automation_token"),
params.get("client_confirmation"))
if not perm.get("permitted"):
return perm
result = self.computer_tools.delete_computer(params["computer_name"])
return self._format_result(result)
def _handle_enable_computer(self, params: dict) -> dict:
    """Handle the ``enable_computer`` tool call.

    Consults ``_check_write_permission`` first and returns its dict
    unchanged when the ``permitted`` entry is missing or falsy.

    Args:
        params: Tool arguments. ``computer_name`` is required;
            ``automation_token`` and ``client_confirmation`` are optional
            inputs to the permission check.

    Returns:
        The permission dict on refusal, otherwise the outcome of
        ``computer_tools.enable_computer`` passed through
        ``_format_result``.
    """
    target = params["computer_name"]
    token = params.get("automation_token")
    confirmation = params.get("client_confirmation")
    verdict = self._check_write_permission("enable_computer", target, token, confirmation)
    if not verdict.get("permitted"):
        return verdict
    return self._format_result(self.computer_tools.enable_computer(target))
def _handle_disable_computer(self, params: dict) -> dict:
    """Handle the ``disable_computer`` tool call.

    Nothing is disabled unless ``_check_write_permission`` returns a dict
    whose ``permitted`` entry is truthy; a refusal dict is returned as-is.

    Args:
        params: Tool arguments. ``computer_name`` is required;
            ``automation_token`` and ``client_confirmation`` are optional
            inputs to the permission check.

    Returns:
        The permission dict on refusal, otherwise the outcome of
        ``computer_tools.disable_computer`` passed through
        ``_format_result``.
    """
    target = params["computer_name"]
    verdict = self._check_write_permission(
        "disable_computer",
        target,
        params.get("automation_token"),
        params.get("client_confirmation"),
    )
    if verdict.get("permitted"):
        outcome = self.computer_tools.disable_computer(target)
        return self._format_result(outcome)
    return verdict
def _handle_reset_computer_password(self, params: dict) -> dict:
    """Handle the ``reset_computer_password`` tool call.

    The reset is attempted only after ``_check_write_permission`` returns a
    dict with a truthy ``permitted`` entry; any other dict is returned
    unchanged.

    Args:
        params: Tool arguments. ``computer_name`` is required;
            ``automation_token`` and ``client_confirmation`` are optional
            inputs to the permission check.

    Returns:
        The permission dict on refusal, otherwise the outcome of
        ``computer_tools.reset_computer_password`` passed through
        ``_format_result``.
    """
    target = params["computer_name"]
    token = params.get("automation_token")
    confirmation = params.get("client_confirmation")
    verdict = self._check_write_permission(
        "reset_computer_password", target, token, confirmation
    )
    if not verdict.get("permitted"):
        return verdict
    outcome = self.computer_tools.reset_computer_password(target)
    return self._format_result(outcome)
def _handle_get_stale_computers(self, params: dict) -> dict:
    """Handle the ``get_stale_computers`` tool call.

    Args:
        params: Tool arguments. ``days`` is optional and defaults to 90.

    Returns:
        The outcome of ``computer_tools.get_stale_computers`` passed
        through ``_format_result``.
    """
    threshold_days = params.get("days", 90)
    return self._format_result(self.computer_tools.get_stale_computers(threshold_days))
def _handle_list_organizational_units(self, params: dict) -> dict:
    """Handle the ``list_organizational_units`` tool call.

    Args:
        params: Tool arguments, all optional: ``parent_ou``,
            ``filter_criteria`` and ``attributes`` (``None`` when absent)
            and ``recursive`` (``True`` when absent).

    Returns:
        The outcome of ``ou_tools.list_ous`` passed through
        ``_format_result``.
    """
    parent = params.get("parent_ou")
    criteria = params.get("filter_criteria")
    wanted_attrs = params.get("attributes")
    recursive = params.get("recursive", True)
    listing = self.ou_tools.list_ous(parent, criteria, wanted_attrs, recursive)
    return self._format_result(listing)
def _handle_get_organizational_unit(self, params: dict) -> dict:
    """Handle the ``get_organizational_unit`` tool call.

    Args:
        params: Tool arguments. ``ou_dn`` is required; ``attributes`` is
            optional (``None`` when absent).

    Returns:
        The outcome of ``ou_tools.get_ou`` passed through
        ``_format_result``.

    Raises:
        KeyError: If ``ou_dn`` is missing from ``params``.
    """
    dn = params["ou_dn"]
    wanted_attrs = params.get("attributes")
    return self._format_result(self.ou_tools.get_ou(dn, wanted_attrs))
def _handle_create_organizational_unit(self, params: dict) -> dict:
    """Handle the ``create_organizational_unit`` tool call.

    ``_check_write_permission`` is consulted with the OU ``name`` as
    target; when its dict lacks a truthy ``permitted`` entry that dict is
    returned unchanged and ``ou_tools.create_ou`` is not called.

    Args:
        params: Tool arguments. ``name`` is required. ``parent_ou``,
            ``description``, ``managed_by`` and ``additional_attributes``
            are optional (``None`` when absent). ``automation_token`` and
            ``client_confirmation`` are optional inputs to the check.

    Returns:
        The permission dict on refusal, otherwise the creation outcome
        passed through ``_format_result``.
    """
    ou_name = params["name"]
    verdict = self._check_write_permission(
        "create_organizational_unit",
        ou_name,
        params.get("automation_token"),
        params.get("client_confirmation"),
    )
    if verdict.get("permitted"):
        optional = [
            params.get(key)
            for key in ("parent_ou", "description", "managed_by", "additional_attributes")
        ]
        return self._format_result(self.ou_tools.create_ou(ou_name, *optional))
    return verdict
def _handle_modify_organizational_unit(self, params: dict) -> dict:
    """Handle the ``modify_organizational_unit`` tool call.

    A dict from ``_check_write_permission`` without a truthy ``permitted``
    entry is returned as-is and no modification is attempted.

    Args:
        params: Tool arguments. ``ou_dn`` and ``attributes`` are required
            (``attributes`` is only read once the permission check has
            passed). ``automation_token`` and ``client_confirmation`` are
            optional inputs to the check.

    Returns:
        The permission dict on refusal, otherwise the outcome of
        ``ou_tools.modify_ou`` passed through ``_format_result``.
    """
    dn = params["ou_dn"]
    token = params.get("automation_token")
    confirmation = params.get("client_confirmation")
    verdict = self._check_write_permission(
        "modify_organizational_unit", dn, token, confirmation
    )
    if not verdict.get("permitted"):
        return verdict
    changes = params["attributes"]
    return self._format_result(self.ou_tools.modify_ou(dn, changes))
def _handle_delete_organizational_unit(self, params: dict) -> dict:
    """Handle the ``delete_organizational_unit`` tool call.

    The deletion only happens when ``_check_write_permission`` returns a
    dict with a truthy ``permitted`` entry; otherwise that dict is returned
    unchanged.

    Args:
        params: Tool arguments. ``ou_dn`` is required; ``force`` is
            optional and defaults to ``False``. ``automation_token`` and
            ``client_confirmation`` are optional inputs to the check.

    Returns:
        The permission dict on refusal, otherwise the outcome of
        ``ou_tools.delete_ou`` passed through ``_format_result``.
    """
    dn = params["ou_dn"]
    verdict = self._check_write_permission(
        "delete_organizational_unit",
        dn,
        params.get("automation_token"),
        params.get("client_confirmation"),
    )
    if verdict.get("permitted"):
        force = params.get("force", False)
        return self._format_result(self.ou_tools.delete_ou(dn, force))
    return verdict
def _handle_move_organizational_unit(self, params: dict) -> dict:
    """Handle the ``move_organizational_unit`` tool call.

    A dict from ``_check_write_permission`` without a truthy ``permitted``
    entry is returned as-is and ``ou_tools.move_ou`` is not called.

    Args:
        params: Tool arguments. ``ou_dn`` and ``new_parent_dn`` are
            required (``new_parent_dn`` is only read once the permission
            check has passed). ``automation_token`` and
            ``client_confirmation`` are optional inputs to the check.

    Returns:
        The permission dict on refusal, otherwise the outcome of
        ``ou_tools.move_ou`` passed through ``_format_result``.
    """
    dn = params["ou_dn"]
    token = params.get("automation_token")
    confirmation = params.get("client_confirmation")
    verdict = self._check_write_permission(
        "move_organizational_unit", dn, token, confirmation
    )
    if not verdict.get("permitted"):
        return verdict
    destination = params["new_parent_dn"]
    return self._format_result(self.ou_tools.move_ou(dn, destination))
def _handle_get_organizational_unit_contents(self, params: dict) -> dict:
    """Handle the ``get_organizational_unit_contents`` tool call.

    Args:
        params: Tool arguments. ``ou_dn`` is required; ``object_types`` is
            optional (``None`` when absent).

    Returns:
        The outcome of ``ou_tools.get_ou_contents`` passed through
        ``_format_result``.

    Raises:
        KeyError: If ``ou_dn`` is missing from ``params``.
    """
    dn = params["ou_dn"]
    kinds = params.get("object_types")
    return self._format_result(self.ou_tools.get_ou_contents(dn, kinds))
def _handle_get_domain_info(self, params: dict) -> dict:
    """Handle the ``get_domain_info`` tool call.

    Args:
        params: Tool arguments; not read by this handler.

    Returns:
        The outcome of ``security_tools.get_domain_info`` passed through
        ``_format_result``.
    """
    return self._format_result(self.security_tools.get_domain_info())
def _handle_get_privileged_groups(self, params: dict) -> dict:
    """Handle the ``get_privileged_groups`` tool call.

    Args:
        params: Tool arguments; not read by this handler.

    Returns:
        The outcome of ``security_tools.get_privileged_groups`` passed
        through ``_format_result``.
    """
    return self._format_result(self.security_tools.get_privileged_groups())
def _handle_get_user_permissions(self, params: dict) -> dict:
    """Handle the ``get_user_permissions`` tool call.

    Args:
        params: Tool arguments. ``username`` is required.

    Returns:
        The outcome of ``security_tools.get_user_permissions`` passed
        through ``_format_result``.

    Raises:
        KeyError: If ``username`` is missing from ``params``.
    """
    account = params["username"]
    return self._format_result(self.security_tools.get_user_permissions(account))
def _handle_get_inactive_users(self, params: dict) -> dict:
    """Handle the ``get_inactive_users`` tool call.

    Args:
        params: Tool arguments, both optional: ``days`` (default 90) and
            ``include_disabled`` (default ``False``).

    Returns:
        The outcome of ``security_tools.get_inactive_users`` passed
        through ``_format_result``.
    """
    threshold_days = params.get("days", 90)
    with_disabled = params.get("include_disabled", False)
    report = self.security_tools.get_inactive_users(threshold_days, with_disabled)
    return self._format_result(report)
def _handle_get_password_policy_violations(self, params: dict) -> dict:
    """Handle the ``get_password_policy_violations`` tool call.

    Args:
        params: Tool arguments; not read by this handler.

    Returns:
        The outcome of ``security_tools.get_password_policy_violations``
        passed through ``_format_result``.
    """
    return self._format_result(self.security_tools.get_password_policy_violations())
def _handle_audit_admin_accounts(self, params: dict) -> dict:
    """Handle the ``audit_admin_accounts`` tool call.

    Args:
        params: Tool arguments; not read by this handler.

    Returns:
        The outcome of ``security_tools.audit_admin_accounts`` passed
        through ``_format_result``.
    """
    return self._format_result(self.security_tools.audit_admin_accounts())
def _handle_test_connection(self, params: dict) -> dict:
    """Handle the ``test_connection`` tool call.

    Args:
        params: Tool arguments; not read by this handler.

    Returns:
        Whatever ``ldap_manager.test_connection`` returns, unmodified. If
        that call raises, ``{"success": False, "error": <message>}`` is
        returned instead of propagating the exception.
    """
    try:
        status = self.ldap_manager.test_connection()
    except Exception as exc:
        # Report the failure as data so the tool call itself still succeeds.
        status = {"success": False, "error": str(exc)}
    return status
def _handle_health(self, params: dict) -> dict:
    """Build the health report for the server and its LDAP connection.

    Args:
        params: Tool arguments; not read by this handler.

    Returns:
        A dict with fixed ``server``, ``version`` and ``transport`` fields,
        a UTC ``timestamp``, and:

        * ``client``: name/slug/domain taken from ``security_manager``,
          only when a security manager is set;
        * ``ldap_connection``: ``"connected"`` when
          ``ldap_manager.test_connection()`` reports a truthy
          ``connected`` entry, ``"error"`` otherwise;
        * ``ldap_server``: the ``server`` entry of that probe
          (``"unknown"`` when absent);
        * ``ldap_error``: the exception text, only when the probe raised;
        * ``status``: ``"ok"`` only when LDAP is connected, ``"degraded"``
          whenever the probe raised or reported not connected.
    """
    health_info = {
        "status": "ok",
        "server": "ActiveDirectoryMCP-FastAPI",
        "version": "1.0.0",
        "timestamp": datetime.utcnow().isoformat(),
        "transport": "streamable-http",
        "ldap_connection": "unknown"
    }
    if self.security_manager:
        health_info["client"] = {
            "name": self.security_manager.client_name,
            "slug": self.security_manager.client_slug,
            "domain": self.security_manager.domain
        }
    try:
        conn_info = self.ldap_manager.test_connection()
        connected = bool(conn_info.get("connected"))
        health_info["ldap_connection"] = "connected" if connected else "error"
        health_info["ldap_server"] = conn_info.get("server", "unknown")
        if not connected:
            # A probe that returns normally but is not connected is just as
            # unhealthy as one that raised; report it the same way.
            health_info["status"] = "degraded"
    except Exception as e:
        health_info["ldap_connection"] = "error"
        health_info["ldap_error"] = str(e)
        health_info["status"] = "degraded"
    return health_info
def _handle_get_schema_info(self, params: dict) -> dict:
    """Handle the ``get_schema_info`` tool call.

    Args:
        params: Tool arguments; not read by this handler.

    Returns:
        A dict with fixed ``server``, ``version``, ``transport`` and
        ``multi_tenant`` fields plus ``total_tools`` (number of entries in
        ``self.tools``) and ``tools`` (their names, in registry order).
    """
    tool_names = list(self.tools)
    summary = {
        "server": "ActiveDirectoryMCP-FastAPI",
        "version": "1.0.0",
        "transport": "streamable-http",
        "multi_tenant": True,
    }
    summary["total_tools"] = len(tool_names)
    summary["tools"] = tool_names
    return summary
# ============= MCP PROTOCOL HANDLERS =============
async def handle_initialize(self, params: dict) -> dict:
    """Answer the MCP ``initialize`` method.

    Args:
        params: Request parameters; not read by this handler.

    Returns:
        A fixed dict carrying the protocol version, the server name and
        version, and empty ``tools`` and ``prompts`` capability entries.
    """
    server_info = {"name": "mcp-active-directory", "version": "1.0.0"}
    capabilities = {"tools": {}, "prompts": {}}
    return {
        "protocolVersion": "2024-11-05",
        "serverInfo": server_info,
        "capabilities": capabilities,
    }
async def handle_tools_list(self, params: dict) -> dict:
    """Answer the MCP ``tools/list`` method.

    Args:
        params: Request parameters; not read by this handler.

    Returns:
        ``{"tools": [...]}`` with one ``name`` / ``description`` /
        ``inputSchema`` entry per item in ``self.tools``, in registry
        order. Other keys of a registry entry are not exposed.
    """
    return {
        "tools": [
            {
                "name": tool_name,
                "description": spec["description"],
                "inputSchema": spec["inputSchema"],
            }
            for tool_name, spec in self.tools.items()
        ]
    }
async def handle_tools_call(self, params: dict) -> dict:
    """Answer the MCP ``tools/call`` method.

    Args:
        params: Request parameters. ``name`` selects the tool;
            ``arguments`` holds its arguments and may be absent or
            ``None``, both of which are treated as "no arguments".

    Returns:
        ``{"content": [{"type": "text", "text": ...}]}`` where the text is
        the handler's return value serialised as indented JSON without
        ASCII escaping.

    Raises:
        ValueError: If ``name`` is not a key of ``self.tools``.
    """
    tool_name = params.get("name")
    # A JSON null for "arguments" arrives as None and would bypass a plain
    # .get() default; handlers index into this value, so normalise it.
    tool_args = params.get("arguments") or {}
    if tool_name not in self.tools:
        raise ValueError(f"Unknown tool: {tool_name}")
    handler = self.tools[tool_name]["handler"]
    result = handler(tool_args)
    return {
        "content": [
            {"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}
        ]
    }
async def handle_prompts_list(self, params: dict) -> dict:
    """Answer the MCP ``prompts/list`` method.

    Args:
        params: Request parameters; not read by this handler.

    Returns:
        ``{"prompts": PROMPTS_CATALOG}``; the module-level catalog object
        itself is returned, not a copy.
    """
    catalog = PROMPTS_CATALOG
    return {"prompts": catalog}
async def handle_prompts_get(self, params: dict) -> dict:
    """Answer the MCP ``prompts/get`` method.

    Args:
        params: Request parameters. ``name`` is required and must match
            the ``name`` of an entry in ``PROMPTS_CATALOG``; ``arguments``
            may be absent or ``None``, both treated as "no arguments".

    Returns:
        Whatever ``handle_get_prompt`` returns for the prompt name, the
        arguments and ``self.ldap_manager``.

    Raises:
        ValueError: If ``name`` is missing/empty or not in the catalog.
    """
    prompt_name = params.get("name")
    if not prompt_name:
        raise ValueError("Prompt name is required")
    # A JSON null for "arguments" arrives as None and would bypass a plain
    # .get() default, so normalise it to an empty dict here.
    prompt_args = params.get("arguments") or {}
    # Check that the requested prompt exists in the catalog.
    if not any(entry["name"] == prompt_name for entry in PROMPTS_CATALOG):
        raise ValueError(f"Prompt não encontrado: {prompt_name}")
    # Delegate to the prompts handler, handing it the LDAP manager.
    return await handle_get_prompt(prompt_name, prompt_args, self.ldap_manager)
async def handle_request(self, request_data: dict) -> dict:
    """Route one MCP JSON-RPC request to its handler and wrap the outcome.

    Args:
        request_data: The request as a plain dict with a ``method`` entry
            and optional ``params`` and ``id`` entries. ``params`` may be
            missing or ``None``; both are treated as an empty dict.

    Returns:
        A JSON-RPC 2.0 response dict echoing ``id``. It carries ``result``
        on success (``{}`` for ``notifications/initialized``), or
        ``error`` with code ``-32601`` for an unrecognised method and
        ``-32000`` for any exception raised by a handler. Handler
        exceptions are logged and reported, never propagated.
    """
    # Looked up by attribute name at dispatch time so only the handler that
    # is actually needed gets resolved.
    routes = {
        "initialize": "handle_initialize",
        "tools/list": "handle_tools_list",
        "tools/call": "handle_tools_call",
        "prompts/list": "handle_prompts_list",
        "prompts/get": "handle_prompts_get",
    }
    method = request_data.get("method", "")
    # The POST endpoint passes MCPRequest.model_dump(), which always
    # contains a "params" key (None when the client sent none), so a plain
    # .get() default would let None through to handlers that call .get().
    params = request_data.get("params") or {}
    request_id = request_data.get("id")

    if method == "notifications/initialized":
        return {"jsonrpc": "2.0", "result": {}, "id": request_id}

    handler_name = routes.get(method) if isinstance(method, str) else None
    if handler_name is None:
        message = f"Unknown method: {method}"
        self.logger.error(f"Error handling {method}: {message}")
        return {
            "jsonrpc": "2.0",
            # -32601 is JSON-RPC's reserved "Method not found" code.
            "error": {"code": -32601, "message": message},
            "id": request_id
        }

    try:
        result = await getattr(self, handler_name)(params)
    except Exception as e:
        self.logger.error(f"Error handling {method}: {e}")
        return {
            "jsonrpc": "2.0",
            "error": {"code": -32000, "message": str(e)},
            "id": request_id
        }
    return {"jsonrpc": "2.0", "result": result, "id": request_id}
# ============= FASTAPI APP CREATION =============
def _create_app(self) -> FastAPI:
    """Build the FastAPI application exposing the MCP endpoints.

    Registers ``GET /health`` (no bearer-token dependency) and three
    ``/mcp`` routes that each depend on ``verify_bearer_token``:

    * ``GET``: a ``text/event-stream`` response that announces the
      endpoint and then emits a keepalive comment every 30 seconds;
    * ``DELETE``: removes the session named by the ``mcp-session-id``
      header from ``mcp_sessions``;
    * ``POST``: passes the JSON-RPC body to ``handle_request``.

    Returns:
        The configured ``FastAPI`` instance.
    """
    # The nested route functions are closures; they reach the instance
    # through this local name.
    srv = self

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Everything before ``yield`` runs at startup, everything after it
        # at shutdown.
        srv.logger.info(f"Starting AD MCP on port {srv.port}")
        yield
        srv.logger.info("Shutting down AD MCP")
        srv.ldap_manager.disconnect()

    app = FastAPI(
        lifespan=lifespan,
        title="MCP Active Directory Server",
        description="MCP server for Active Directory with Streamable HTTP support",
        version="1.0.0",
    )

    # Health check (no auth required)
    @app.get("/health")
    async def health_check():
        return srv._handle_health({})

    # ============= STREAMABLE HTTP ENDPOINTS =============

    # GET /mcp - SSE endpoint for Gemini
    @app.get("/mcp")
    async def mcp_sse_endpoint(
        request: Request,
        token: str = Depends(verify_bearer_token)
    ):
        # Reuse the caller's session id when one is supplied, otherwise
        # mint a fresh one.
        session_id = request.headers.get("mcp-session-id") or str(uuid.uuid4())
        srv.logger.info(f"SSE connection opened: {session_id}")
        mcp_sessions[session_id] = {"created_at": datetime.utcnow(), "active": True}

        async def event_generator():
            try:
                yield "event: endpoint\ndata: /mcp\n\n"
                # Keep streaming until the client goes away or the session
                # entry is removed from the registry.
                while not await request.is_disconnected() and session_id in mcp_sessions:
                    yield ":keepalive\n\n"
                    await asyncio.sleep(30)
            except asyncio.CancelledError:
                pass
            finally:
                # Drop the session entry however the stream ended.
                mcp_sessions.pop(session_id, None)
                srv.logger.info(f"SSE connection closed: {session_id}")

        stream_headers = {
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Mcp-Session-Id": session_id,
        }
        return StreamingResponse(
            event_generator(),
            media_type="text/event-stream",
            headers=stream_headers,
        )

    # DELETE /mcp - Session termination
    @app.delete("/mcp")
    async def mcp_session_terminate(
        request: Request,
        token: str = Depends(verify_bearer_token)
    ):
        session_id = request.headers.get("mcp-session-id")
        known = bool(session_id) and session_id in mcp_sessions
        if known:
            mcp_sessions.pop(session_id)
            srv.logger.info(f"Session terminated: {session_id}")
        # The same body is returned whether or not a session was removed.
        return {"status": "session_terminated", "session_id": session_id}

    # POST /mcp - JSON-RPC handler
    @app.post("/mcp")
    async def mcp_handler_http(
        request: Request,
        mcp_request: MCPRequest,
        token: str = Depends(verify_bearer_token)
    ):
        session_id = request.headers.get("mcp-session-id") or str(uuid.uuid4())
        srv.logger.info(f"MCP request: method={mcp_request.method}")
        payload = await srv.handle_request(mcp_request.model_dump())
        return JSONResponse(content=payload, headers={"Mcp-Session-Id": session_id})

    return app
def run(self):
    """Serve ``self.app`` with uvicorn on ``self.host``:``self.port``.

    The uvicorn log level is fixed to ``"info"``.
    """
    options = {"host": self.host, "port": self.port, "log_level": "info"}
    uvicorn.run(self.app, **options)
def main(argv: Optional[List[str]] = None):
    """Command-line entry point: parse options and start the server.

    The configuration path comes from ``--config`` or, failing that, from
    the ``AD_MCP_CONFIG`` environment variable. When neither is set an
    error is written to stderr and the process exits with status 1.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, so calling ``main()`` behaves as before.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Active Directory MCP FastAPI Server')
    parser.add_argument('--host', default='0.0.0.0', help='Host (default: 0.0.0.0)')
    parser.add_argument('--port', type=int, default=8820, help='Port (default: 8820)')
    parser.add_argument('--config', help='Config file path')
    args = parser.parse_args(argv)

    config_path = args.config or os.getenv("AD_MCP_CONFIG")
    if not config_path:
        # Diagnostics belong on stderr; also point out both ways to fix it.
        print(
            "Error: AD_MCP_CONFIG not set "
            "(pass --config or set the AD_MCP_CONFIG environment variable)",
            file=sys.stderr,
        )
        sys.exit(1)

    server = ActiveDirectoryMCPFastAPI(
        config_path=config_path,
        host=args.host,
        port=args.port
    )
    server.run()


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()