from __future__ import annotations
import base64
import json
import logging
import os
import time
import traceback
from collections.abc import AsyncIterator
from contextlib import AsyncExitStack, asynccontextmanager
from contextvars import ContextVar
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional, cast
from urllib.parse import urlparse
import anyio
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
if TYPE_CHECKING:
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
import click
import httpx
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from mcp.server.auth.settings import AuthSettings
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.transport_security import TransportSecuritySettings
from pydantic import AnyHttpUrl
from starlette.applications import Starlette
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import JSONResponse, RedirectResponse
from starlette.routing import Mount, Route
from starlette.staticfiles import StaticFiles
from starlette.types import ASGIApp, Receive, Send
from starlette.types import Scope as StarletteScope
from nextcloud_mcp_server.auth import (
InsufficientScopeError,
discover_all_scopes,
get_access_token_scopes,
has_required_scopes,
is_jwt_token,
)
from nextcloud_mcp_server.auth.unified_verifier import UnifiedTokenVerifier
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import (
DeploymentMode,
Settings,
get_document_processor_config,
get_settings,
)
from nextcloud_mcp_server.config_validators import (
AuthMode,
get_mode_summary,
validate_configuration,
)
from nextcloud_mcp_server.context import get_client as get_nextcloud_client
from nextcloud_mcp_server.document_processors import get_registry
from nextcloud_mcp_server.observability import (
ObservabilityMiddleware,
setup_metrics,
setup_tracing,
)
from nextcloud_mcp_server.observability.metrics import (
record_dependency_check,
set_dependency_health,
)
from nextcloud_mcp_server.server import (
configure_calendar_tools,
configure_contacts_tools,
configure_cookbook_tools,
configure_deck_tools,
configure_news_tools,
configure_notes_tools,
configure_semantic_tools,
configure_sharing_tools,
configure_tables_tools,
configure_webdav_tools,
)
from nextcloud_mcp_server.server.oauth_tools import register_oauth_tools
from nextcloud_mcp_server.vector import processor_task, scanner_task
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Instrument httpx once at import time via OpenTelemetry's HTTPXClientInstrumentor
# (presumably so that outgoing httpx requests made by this process are traced).
HTTPXClientInstrumentor().instrument()
def initialize_document_processors():
    """Register every configured document processor with the global registry.

    The configuration comes from ``get_document_processor_config()``. When
    processing is disabled nothing is registered. Otherwise each configured
    backend (Unstructured, Tesseract, PyMuPDF, Custom HTTP) is imported lazily,
    instantiated and registered with its own priority. A backend that fails to
    import or construct is logged as a warning and skipped, so one broken
    backend does not stop the remaining ones from being registered.
    """
    settings = get_document_processor_config()
    if not settings["enabled"]:
        logger.info("Document processing disabled")
        return

    registry = get_registry()
    active = 0
    configured = settings["processors"]

    # Unstructured backend (remote API), priority 10
    if "unstructured" in configured:
        unstructured_opts = configured["unstructured"]
        try:
            from nextcloud_mcp_server.document_processors.unstructured import (
                UnstructuredProcessor,
            )

            unstructured = UnstructuredProcessor(
                api_url=unstructured_opts["api_url"],
                timeout=unstructured_opts["timeout"],
                default_strategy=unstructured_opts["strategy"],
                default_languages=unstructured_opts["languages"],
                progress_interval=unstructured_opts.get("progress_interval", 10),
            )
            registry.register(unstructured, priority=10)
            logger.info(
                f"Registered Unstructured processor: {unstructured_opts['api_url']}"
            )
            active += 1
        except Exception as exc:
            logger.warning(f"Failed to register Unstructured processor: {exc}")

    # Tesseract backend, priority 5
    if "tesseract" in configured:
        tesseract_opts = configured["tesseract"]
        try:
            from nextcloud_mcp_server.document_processors.tesseract import (
                TesseractProcessor,
            )

            tesseract = TesseractProcessor(
                tesseract_cmd=tesseract_opts.get("tesseract_cmd"),
                default_lang=tesseract_opts["lang"],
            )
            registry.register(tesseract, priority=5)
            logger.info(
                f"Registered Tesseract processor: lang={tesseract_opts['lang']}"
            )
            active += 1
        except Exception as exc:
            logger.warning(f"Failed to register Tesseract processor: {exc}")

    # PyMuPDF backend, priority 15 (the highest priority value used here)
    if "pymupdf" in configured:
        pymupdf_opts = configured["pymupdf"]
        try:
            from nextcloud_mcp_server.document_processors.pymupdf import (
                PyMuPDFProcessor,
            )

            extract_images = pymupdf_opts.get("extract_images", True)
            pymupdf = PyMuPDFProcessor(
                extract_images=extract_images,
                image_dir=pymupdf_opts.get("image_dir"),
            )
            registry.register(pymupdf, priority=15)
            logger.info(
                f"Registered PyMuPDF processor: extract_images={extract_images}"
            )
            active += 1
        except Exception as exc:
            logger.warning(f"Failed to register PyMuPDF processor: {exc}")

    # User-supplied HTTP backend, priority 1 (the lowest priority value used here)
    if "custom" in configured:
        custom_opts = configured["custom"]
        try:
            from nextcloud_mcp_server.document_processors.custom_http import (
                CustomHTTPProcessor,
            )

            custom = CustomHTTPProcessor(
                name=custom_opts["name"],
                api_url=custom_opts["api_url"],
                api_key=custom_opts.get("api_key"),
                timeout=custom_opts["timeout"],
                supported_types=custom_opts["supported_types"],
            )
            registry.register(custom, priority=1)
            logger.info(
                f"Registered Custom processor '{custom_opts['name']}': {custom_opts['api_url']}"
            )
            active += 1
        except Exception as exc:
            logger.warning(f"Failed to register Custom processor: {exc}")

    if active == 0:
        logger.warning("Document processing enabled but no processors registered")
        return

    logger.info(
        f"Document processing initialized with {active} processor(s): "
        f"{', '.join(registry.list_processors())}"
    )
def validate_pkce_support(discovery: dict, discovery_url: str) -> None:
    """
    Validate that the OIDC provider properly advertises PKCE support.

    According to RFC 8414, if code_challenge_methods_supported is absent,
    it means the authorization server does not support PKCE.
    MCP clients require PKCE with S256 and will refuse to connect if this
    field is missing or doesn't include S256.

    This function only reports; it never raises for a missing or incomplete
    advertisement. Diagnostics are written entirely to stderr, and the success
    confirmation is written to stdout.

    Args:
        discovery: Parsed OIDC discovery document.
        discovery_url: URL the document was fetched from (used in messages only).
    """
    code_challenge_methods = discovery.get("code_challenge_methods_supported")
    banner = "=" * 80

    if code_challenge_methods is None:
        diagnostic = [
            banner,
            "ERROR: OIDC CONFIGURATION ERROR - Missing PKCE Support Advertisement",
            banner,
            f"Discovery URL: {discovery_url}",
            "",
            "The OIDC discovery document is missing 'code_challenge_methods_supported'.",
            "According to RFC 8414, this means the server does NOT support PKCE.",
            "",
            "⚠️ MCP clients (like Claude Code) WILL REJECT this provider!",
            "",
            "How to fix:",
            " 1. Ensure PKCE is enabled in Nextcloud OIDC app settings",
            " 2. Update the OIDC app to advertise PKCE support in discovery",
            " 3. See: RFC 8414 Section 2 (Authorization Server Metadata)",
            banner,
            "",
        ]
        # Every line goes to stderr so the banner is not split across streams
        # when stdout and stderr are redirected separately.
        for line in diagnostic:
            click.echo(line, err=True)
        return

    if "S256" not in code_challenge_methods:
        diagnostic = [
            banner,
            "WARNING: OIDC CONFIGURATION WARNING - S256 Challenge Method Not Advertised",
            banner,
            f"Discovery URL: {discovery_url}",
            f"Advertised methods: {code_challenge_methods}",
            "",
            "MCP specification requires S256 code challenge method.",
            "Some clients may reject this provider.",
            banner,
            "",
        ]
        for line in diagnostic:
            click.echo(line, err=True)
        return

    click.echo(f"✓ PKCE support validated: {code_challenge_methods}")
@dataclass
class VectorSyncState:
    """
    Module-level state for vector sync background tasks.

    This singleton bridges the Starlette server lifespan (where background tasks run)
    and FastMCP session lifespans (where MCP tools need access to the streams).

    Every field defaults to None. app_lifespan_basic() copies the current field
    values into each session's AppContext.
    """

    # Send / receive ends of the document stream (None until populated elsewhere).
    document_send_stream: Optional[MemoryObjectSendStream] = None
    document_receive_stream: Optional[MemoryObjectReceiveStream] = None
    # Events shared with the background tasks; presumably used to request shutdown
    # and to wake the scanner early - confirm against the tasks that consume them.
    shutdown_event: Optional[anyio.Event] = None
    scanner_wake_event: Optional[anyio.Event] = None


# Module-level singleton for vector sync state (created empty at import time)
_vector_sync_state = VectorSyncState()
@dataclass
class AppContext:
    """Application context for BasicAuth mode.

    Instances are yielded by app_lifespan_basic() once per FastMCP session.
    """

    # Shared Nextcloud client. NOTE: app_lifespan_basic() passes None here in
    # multi-user mode despite the non-Optional annotation.
    client: NextcloudClient
    # Persistent storage initialized by app_lifespan_basic().
    storage: Optional["RefreshTokenStorage"] = None
    # The remaining fields are copied from the module-level VectorSyncState.
    document_send_stream: Optional[MemoryObjectSendStream] = None
    document_receive_stream: Optional[MemoryObjectReceiveStream] = None
    shutdown_event: Optional[anyio.Event] = None
    scanner_wake_event: Optional[anyio.Event] = None
@dataclass
class OAuthAppContext:
    """Application context for OAuth mode.

    Only nextcloud_host and token_verifier are required; every other field has
    a default.
    """

    nextcloud_host: str
    token_verifier: object  # UnifiedTokenVerifier (ADR-005 compliant)
    refresh_token_storage: Optional["RefreshTokenStorage"] = None
    oauth_client: Optional[object] = None  # NextcloudOAuthClient or KeycloakOAuthClient
    # Provider label. setup_oauth_config() returns "nextcloud" for integrated mode
    # and "external" for any other IdP; NOTE(review): an earlier comment listed
    # "keycloak" as a value - confirm which labels consumers expect.
    oauth_provider: str = "nextcloud"
    server_client_id: Optional[str] = (
        None  # MCP server's OAuth client ID (static or DCR)
    )
@dataclass
class SmitheryAppContext:
    """Application context for Smithery stateless mode.

    ADR-016: No shared client - clients created per-request from session config.
    The class intentionally declares no fields; app_lifespan_smithery() yields
    an empty instance.
    """

    pass  # No shared state needed - everything comes from session config
# ADR-016: Smithery config schema for container runtime
# This schema is served at /.well-known/mcp-config for Smithery discovery
# See: https://smithery.ai/docs/build/session-config
#
# The three required properties are the same keys that SmitheryConfigMiddleware
# reads from the request query string.
SMITHERY_CONFIG_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "$id": "https://server.smithery.ai/nextcloud-mcp-server/.well-known/mcp-config",
    "title": "Nextcloud MCP Server Configuration",
    "description": "Configuration for connecting to your Nextcloud instance via app password authentication",
    "x-query-style": "flat",  # Our schema has no nested objects, so flat style works
    "type": "object",
    "required": ["nextcloud_url", "username", "app_password"],
    "properties": {
        "nextcloud_url": {
            "type": "string",
            "title": "Nextcloud URL",
            "description": "Your Nextcloud instance URL (e.g., https://cloud.example.com). Must be publicly accessible.",
            "pattern": "^https?://.+",
        },
        "username": {
            "type": "string",
            "title": "Username",
            "description": "Your Nextcloud username",
            "minLength": 1,
        },
        "app_password": {
            "type": "string",
            "title": "App Password",
            "description": "Nextcloud app password. Generate at Settings > Security > App passwords. Do NOT use your main password.",
            "minLength": 1,
        },
    },
    # Reject any property other than the three declared above.
    "additionalProperties": False,
}
# ADR-016: Context variable to hold Smithery session config per-request
# This is set by SmitheryConfigMiddleware and accessed in context.py
#
# The variable is created with default=None rather than being initialised by a
# module-level .set(None): a .set() only affects the context that is current at
# import time, so a .get() performed in a context that did not inherit from it
# (e.g. a fresh contextvars.Context) would raise LookupError instead of
# returning None.
_smithery_session_config: ContextVar[dict[str, str] | None] = ContextVar(
    "smithery_session_config", default=None
)
def get_smithery_session_config() -> dict | None:
    """Return the Smithery session config bound to the current context.

    The value is whatever is currently stored in the module's
    ``_smithery_session_config`` context variable: the dict that
    SmitheryConfigMiddleware extracted from the URL query parameters for the
    request in progress, or None when nothing was extracted.
    """
    current_config = _smithery_session_config.get()
    return current_config
class BasicAuthMiddleware:
    """Middleware to extract BasicAuth credentials from Authorization header.

    For multi-user BasicAuth pass-through mode, this middleware extracts
    username/password from the Authorization: Basic header and stores them
    in the request state (``scope["state"]["basic_auth"]``) for use by the
    context layer.

    The credentials are NOT stored persistently - they are passed through
    directly to Nextcloud APIs for each request (stateless).

    A missing, non-Basic or malformed header never rejects the request: the
    request is always forwarded to the wrapped application, simply without
    ``basic_auth`` in its state.
    """

    # Length of the "Basic " prefix (scheme name plus the separating space).
    _SCHEME_PREFIX_LEN = 6

    def __init__(self, app: ASGIApp):
        """Wrap *app*, the next ASGI application in the chain."""
        self.app = app

    async def __call__(
        self, scope: StarletteScope, receive: Receive, send: Send
    ) -> None:
        """Extract Basic credentials (HTTP scopes only), then call the wrapped app."""
        if scope["type"] == "http":
            # Extract Authorization header
            headers = dict(scope.get("headers", []))
            auth_header = headers.get(b"authorization", b"")
            prefix = auth_header[: self._SCHEME_PREFIX_LEN]

            # HTTP authentication scheme names are case-insensitive, so accept
            # "basic", "BASIC", ... as well as the canonical "Basic".
            if prefix.lower() == b"basic ":
                try:
                    # Decode base64(username:password)
                    encoded = auth_header[self._SCHEME_PREFIX_LEN :]
                    decoded = base64.b64decode(encoded).decode("utf-8")
                    # Only the first colon separates user and password; the
                    # password itself may contain colons.
                    username, password = decoded.split(":", 1)
                except ValueError as e:
                    # Covers invalid base64 (binascii.Error), invalid UTF-8
                    # (UnicodeDecodeError) and a payload without a colon.
                    logger.warning(f"Failed to extract BasicAuth credentials: {e}")
                else:
                    # Store in request state
                    scope.setdefault("state", {})
                    scope["state"]["basic_auth"] = {
                        "username": username,
                        "password": password,
                    }
                    logger.debug(
                        f"BasicAuth credentials extracted for user: {username}"
                    )

        await self.app(scope, receive, send)
class SmitheryConfigMiddleware:
    """ASGI middleware that lifts Smithery session config out of the query string.

    ADR-016: in the container runtime Smithery passes its configuration as URL
    query parameters on the /mcp endpoint. The recognised parameters are:

    - nextcloud_url: Nextcloud instance URL
    - username: Nextcloud username
    - app_password: Nextcloud app password

    Whatever subset is present is stored in a ContextVar for the duration of
    the request and is readable through get_smithery_session_config(). The
    variable is set back to None once the wrapped application returns.
    """

    def __init__(self, app: ASGIApp):
        self.app = app

    async def __call__(
        self, scope: StarletteScope, receive: Receive, send: Send
    ) -> None:
        if scope["type"] == "http":
            from urllib.parse import parse_qs

            raw_query = scope.get("query_string", b"")
            parsed = parse_qs(raw_query.decode("utf-8"))

            # The schema is flat, so each recognised key maps directly to a
            # query parameter; parse_qs yields lists, of which the first
            # value is used.
            recognised = ("nextcloud_url", "username", "app_password")
            session_config = {
                name: parsed[name][0] for name in recognised if name in parsed
            }

            # Publish only a non-empty config.
            if session_config:
                _smithery_session_config.set(session_config)
                logger.debug(
                    f"Smithery config extracted: nextcloud_url={session_config.get('nextcloud_url')}, "
                    f"username={session_config.get('username')}"
                )

        try:
            await self.app(scope, receive, send)
        finally:
            # Always clear the variable once the request has been handled.
            _smithery_session_config.set(None)
@asynccontextmanager
async def app_lifespan_smithery(server: FastMCP) -> AsyncIterator[SmitheryAppContext]:
    """
    Lifespan for Smithery stateless mode (ADR-016).

    Nothing is shared between requests in this mode, so the lifespan only logs
    start-up and shutdown and yields an empty SmitheryAppContext:
    - no shared Nextcloud client (one is created per request from session config)
    - no vector sync
    - no persistent storage
    - no document processors
    """
    for startup_message in (
        "Starting MCP server in Smithery stateless mode",
        "Clients will be created per-request from session config",
    ):
        logger.info(startup_message)

    try:
        context = SmitheryAppContext()
        yield context
    finally:
        logger.info("Shutting down Smithery stateless mode")
async def load_oauth_client_credentials(
    nextcloud_host: str, registration_endpoint: str | None
) -> tuple[str, str]:
    """
    Resolve the MCP server's OAuth client credentials.

    Sources are tried in this order, and the first one that yields credentials wins:

    1. NEXTCLOUD_OIDC_CLIENT_ID / NEXTCLOUD_OIDC_CLIENT_SECRET environment variables
    2. A client previously saved in the SQLite-backed RefreshTokenStorage
    3. Dynamic client registration against ``registration_endpoint``

    Args:
        nextcloud_host: Nextcloud instance URL
        registration_endpoint: Dynamic registration endpoint URL (or None if not available)

    Returns:
        Tuple of (client_id, client_secret)

    Raises:
        ValueError: If none of the three sources can provide credentials
    """
    # Source 1: static credentials from the environment
    env_client_id = os.getenv("NEXTCLOUD_OIDC_CLIENT_ID")
    env_client_secret = os.getenv("NEXTCLOUD_OIDC_CLIENT_SECRET")
    if env_client_id and env_client_secret:
        logger.info("Using pre-configured OAuth client credentials from environment")
        return (env_client_id, env_client_secret)

    # Source 2: a client persisted in SQLite storage
    try:
        from nextcloud_mcp_server.auth.storage import RefreshTokenStorage

        lookup_storage = RefreshTokenStorage.from_env()
        await lookup_storage.initialize()
        stored_client = await lookup_storage.get_oauth_client()
        if stored_client:
            logger.info(
                f"Loaded OAuth client from SQLite: {stored_client['client_id'][:16]}..."
            )
            return (stored_client["client_id"], stored_client["client_secret"])
    except ValueError:
        # Any ValueError from the lookup is treated as "storage unavailable"
        # (presumably raised when TOKEN_ENCRYPTION_KEY is missing).
        logger.debug("SQLite storage not available (TOKEN_ENCRYPTION_KEY not set)")

    # Without a registration endpoint there is no source left to try.
    if not registration_endpoint:
        raise ValueError(
            "OAuth mode requires either:\n"
            "1. NEXTCLOUD_OIDC_CLIENT_ID and NEXTCLOUD_OIDC_CLIENT_SECRET environment variables, OR\n"
            "2. Pre-existing client credentials in SQLite storage (TOKEN_STORAGE_DB), OR\n"
            "3. Dynamic client registration enabled on Nextcloud OIDC app\n\n"
            "Note: TOKEN_ENCRYPTION_KEY is required for SQLite storage"
        )

    # Source 3: dynamic client registration
    logger.info("Dynamic client registration available")
    server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000")
    # Single unified callback; the flow is determined by a query parameter.
    callback_uris = [f"{server_url}/oauth/callback"]

    # The server registers as a resource server, so the scope list names every
    # scope that is AVAILABLE for this resource rather than what the server
    # itself needs. External clients request tokens for the resource URL and the
    # authorization server limits them to these scopes. The PRM endpoint
    # advertises the same scopes dynamically via @require_scopes decorators.
    requested_scopes = "openid profile email notes:read notes:write calendar:read calendar:write todo:read todo:write contacts:read contacts:write cookbook:read cookbook:write deck:read deck:write tables:read tables:write files:read files:write sharing:read sharing:write news:read news:write"

    # settings.enable_offline_access covers both ENABLE_BACKGROUND_OPERATIONS
    # (new) and ENABLE_OFFLINE_ACCESS (deprecated).
    if get_settings().enable_offline_access:
        requested_scopes = f"{requested_scopes} offline_access"
        logger.info("✓ offline_access scope enabled for refresh tokens")
    logger.info(f"MCP server DCR scopes (resource server): {requested_scopes}")

    # Only the lowercase value "jwt" selects JWT tokens (it must match the OIDC
    # app's check); every other value is normalised to "Bearer".
    configured_type = os.getenv("NEXTCLOUD_OIDC_TOKEN_TYPE", "Bearer").lower()
    token_type = "jwt" if configured_type == "jwt" else "Bearer"
    logger.info(f"Requesting token type: {token_type}")

    from nextcloud_mcp_server.auth.client_registration import ensure_oauth_client
    from nextcloud_mcp_server.auth.storage import RefreshTokenStorage

    registration_storage = RefreshTokenStorage.from_env()
    await registration_storage.initialize()

    # RFC 9728: the resource URL identifies the protected resource and is used
    # by token introspection to match tokens to this client.
    protected_resource_url = f"{server_url}/mcp"
    registered = await ensure_oauth_client(
        nextcloud_url=nextcloud_host,
        registration_endpoint=registration_endpoint,
        storage=registration_storage,
        client_name=f"Nextcloud MCP Server ({token_type})",
        redirect_uris=callback_uris,
        scopes=requested_scopes,  # full resource-server scope list built above
        token_type=token_type,
        resource_url=protected_resource_url,
    )
    logger.info(f"OAuth client ready: {registered.client_id[:16]}...")
    return (registered.client_id, registered.client_secret)
@asynccontextmanager
async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]:
    """
    Manage application lifecycle for BasicAuth mode (FastMCP session lifespan).

    For single-user mode: Creates a single Nextcloud client with basic authentication
    that is shared across all requests within a session.

    For multi-user mode: No shared client - clients created per-request by BasicAuthMiddleware.

    Note: Background tasks (scanner, processor) are started at server level
    in starlette_lifespan, not here. This lifespan runs per-session.

    The whole setup runs inside the try/finally, so a shared client that was
    already created is closed even when a later initialization step (storage,
    document processors) fails before the context is yielded.

    Yields:
        AppContext holding the shared client (None in multi-user mode), the
        persistent storage, and the current vector sync state.
    """
    settings = get_settings()
    is_multi_user = settings.enable_multi_user_basic_auth
    logger.info(
        f"Starting MCP session in {'multi-user' if is_multi_user else 'single-user'} BasicAuth mode"
    )

    client = None
    try:
        # Only create shared client for single-user mode
        if not is_multi_user:
            logger.info("Creating shared Nextcloud client with BasicAuth")
            client = NextcloudClient.from_env()
            logger.info("Client initialization complete")
        else:
            logger.info(
                "Multi-user mode - clients created per-request from BasicAuth headers"
            )

        # Initialize persistent storage (for webhook tracking and future features)
        from nextcloud_mcp_server.auth.storage import RefreshTokenStorage

        storage = RefreshTokenStorage.from_env()
        await storage.initialize()
        logger.info("Persistent storage initialized (webhook tracking enabled)")

        # Initialize document processors
        initialize_document_processors()

        # Yield client context - scanner runs at server level (starlette_lifespan)
        # Include vector sync state from module singleton (set by starlette_lifespan)
        yield AppContext(
            client=client,  # type: ignore[arg-type]  # None in multi-user mode
            storage=storage,
            document_send_stream=_vector_sync_state.document_send_stream,
            document_receive_stream=_vector_sync_state.document_receive_stream,
            shutdown_event=_vector_sync_state.shutdown_event,
            scanner_wake_event=_vector_sync_state.scanner_wake_event,
        )
    finally:
        logger.info("Shutting down BasicAuth session")
        if client is not None:
            await client.close()
async def setup_oauth_config():
    """
    Setup OAuth configuration by performing OIDC discovery and client registration.

    Auto-detects OAuth provider mode:
    - Integrated mode: OIDC_DISCOVERY_URL points to NEXTCLOUD_HOST (or not set)
      → Nextcloud OIDC app provides both OAuth and API access
    - External IdP mode: OIDC_DISCOVERY_URL points to external provider
      → External IdP for OAuth, Nextcloud user_oidc validates tokens and provides API access

    Uses OIDC environment variables:
    - OIDC_DISCOVERY_URL: OIDC discovery endpoint (optional, defaults to NEXTCLOUD_HOST)
    - NEXTCLOUD_OIDC_CLIENT_ID / NEXTCLOUD_OIDC_CLIENT_SECRET: Static credentials (optional, uses DCR if not provided)
    - NEXTCLOUD_OIDC_SCOPES: Requested OAuth scopes

    Also read directly in this function: NEXTCLOUD_HOST, TOKEN_ENCRYPTION_KEY,
    NEXTCLOUD_PUBLIC_ISSUER_URL, NEXTCLOUD_MCP_SERVER_URL, NEXTCLOUD_RESOURCE_URI
    and ENABLE_TOKEN_EXCHANGE.

    This coroutine must be awaited to completion before FastMCP initialization
    because FastMCP requires token_verifier at construction time.

    Side effect: fields of the object returned by get_settings() that are unset
    are filled in with the discovered/derived values.

    Returns:
        Tuple of (nextcloud_host, token_verifier, auth_settings, refresh_token_storage, oauth_client, oauth_provider, client_id, client_secret)

    Raises:
        ValueError: If NEXTCLOUD_HOST is unset, or if no static client
            credentials are configured and the provider advertises no
            registration endpoint.
        httpx.HTTPStatusError: If the discovery request returns an error status.
        KeyError: If the discovery document lacks "issuer" or "userinfo_endpoint".
    """
    # Get settings for enable_offline_access check (handles both ENABLE_BACKGROUND_OPERATIONS
    # and ENABLE_OFFLINE_ACCESS environment variables)
    settings = get_settings()

    nextcloud_host = os.getenv("NEXTCLOUD_HOST")
    if not nextcloud_host:
        raise ValueError(
            "NEXTCLOUD_HOST environment variable is required for OAuth mode"
        )
    nextcloud_host = nextcloud_host.rstrip("/")

    # Get OIDC discovery URL (defaults to Nextcloud integrated mode)
    discovery_url = os.getenv(
        "OIDC_DISCOVERY_URL", f"{nextcloud_host}/.well-known/openid-configuration"
    )
    logger.info(f"Performing OIDC discovery: {discovery_url}")

    # Perform OIDC discovery
    async with httpx.AsyncClient(follow_redirects=True) as client:
        response = await client.get(discovery_url)
        response.raise_for_status()
        discovery = response.json()
    logger.info("✓ OIDC discovery successful")

    # Validate PKCE support (reports problems on the console; never raises for them)
    validate_pkce_support(discovery, discovery_url)

    # Extract OIDC endpoints: issuer and userinfo are mandatory, the rest optional
    issuer = discovery["issuer"]
    userinfo_uri = discovery["userinfo_endpoint"]
    jwks_uri = discovery.get("jwks_uri")
    introspection_uri = discovery.get("introspection_endpoint")
    registration_endpoint = discovery.get("registration_endpoint")

    logger.info("OIDC endpoints discovered:")
    logger.info(f" Issuer: {issuer}")
    logger.info(f" Userinfo: {userinfo_uri}")
    if jwks_uri:
        logger.info(f" JWKS: {jwks_uri}")
    if introspection_uri:
        logger.info(f" Introspection: {introspection_uri}")

    # Auto-detect provider mode based on issuer
    # External IdP mode: issuer doesn't match Nextcloud host
    # Normalize URLs for comparison (handle port differences like :80 for HTTP)
    def normalize_url(url: str) -> str:
        """Normalize URL by removing default ports (80 for HTTP, 443 for HTTPS)."""
        parsed = urlparse(url)
        # Remove default ports
        if (parsed.scheme == "http" and parsed.port == 80) or (
            parsed.scheme == "https" and parsed.port == 443
        ):
            # Remove explicit default port
            hostname = parsed.hostname or parsed.netloc.split(":")[0]
            return f"{parsed.scheme}://{hostname}"
        # Any path, query or fragment is dropped in both branches.
        return f"{parsed.scheme}://{parsed.netloc}"

    issuer_normalized = normalize_url(issuer)
    nextcloud_normalized = normalize_url(nextcloud_host)

    # Determine if this is an external IdP by comparing discovered issuer with Nextcloud host
    # NOTE(review): this is a string-prefix test on scheme://netloc, so an issuer
    # such as "http://host:8080" or "https://host.example.org" is classified as
    # integrated when NEXTCLOUD_HOST is "http://host" / "https://host" - confirm
    # whether an exact comparison is intended.
    is_external_idp = not issuer_normalized.startswith(nextcloud_normalized)

    if is_external_idp:
        oauth_provider = "external"  # Could be Keycloak, Auth0, Okta, etc.
        logger.info(
            f"✓ Detected external IdP mode (issuer: {issuer} != Nextcloud: {nextcloud_host})"
        )
        logger.info(" Tokens will be validated via Nextcloud user_oidc app")
    else:
        oauth_provider = "nextcloud"
        logger.info("✓ Detected integrated mode (Nextcloud OIDC app)")

    # Check if offline access (refresh tokens) is enabled
    # Use settings.enable_offline_access which handles both ENABLE_BACKGROUND_OPERATIONS (new)
    # and ENABLE_OFFLINE_ACCESS (deprecated) environment variables
    enable_offline_access = settings.enable_offline_access

    # Initialize refresh token storage if enabled
    # Failures here are logged and tolerated: the server continues without storage.
    refresh_token_storage = None
    if enable_offline_access:
        try:
            from nextcloud_mcp_server.auth.storage import (
                RefreshTokenStorage,
            )

            # Validate encryption key before initializing
            encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY")
            if not encryption_key:
                logger.warning(
                    "ENABLE_OFFLINE_ACCESS=true but TOKEN_ENCRYPTION_KEY not set. "
                    "Refresh tokens will NOT be stored. Generate a key with:\n"
                    ' python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"'
                )
            else:
                refresh_token_storage = RefreshTokenStorage.from_env()
                await refresh_token_storage.initialize()
                logger.info(
                    "✓ Refresh token storage initialized (offline_access enabled)"
                )
        except Exception as e:
            logger.error(f"Failed to initialize refresh token storage: {e}")
            logger.warning(
                "Continuing without refresh token storage - users will need to re-authenticate after token expiration"
            )

    # Load client credentials (static or dynamic registration)
    client_id = os.getenv("NEXTCLOUD_OIDC_CLIENT_ID")
    client_secret = os.getenv("NEXTCLOUD_OIDC_CLIENT_SECRET")

    if client_id and client_secret:
        logger.info(f"Using static OIDC client credentials: {client_id}")
    elif registration_endpoint:
        logger.info(
            "NEXTCLOUD_OIDC_CLIENT_ID not set, attempting Dynamic Client Registration"
        )
        client_id, client_secret = await load_oauth_client_credentials(
            nextcloud_host=nextcloud_host, registration_endpoint=registration_endpoint
        )
    else:
        raise ValueError(
            "NEXTCLOUD_OIDC_CLIENT_ID and NEXTCLOUD_OIDC_CLIENT_SECRET environment variables are required "
            "when the OIDC provider does not support Dynamic Client Registration. "
            f"Discovery URL: {discovery_url}"
        )

    # ADR-005: Unified Token Verifier with proper audience validation
    # Use public issuer URL for JWT validation if set (handles Docker internal/external URL mismatch)
    # Tokens are issued with the public URL, but OIDC discovery returns internal URL
    public_issuer_url = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL")
    client_issuer = public_issuer_url if public_issuer_url else issuer

    # Get MCP server URL for audience validation
    mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000")
    nextcloud_resource_uri = os.getenv("NEXTCLOUD_RESOURCE_URI", nextcloud_host)

    # Warn if resource URIs are not configured (required for ADR-005 compliance)
    if not os.getenv("NEXTCLOUD_MCP_SERVER_URL"):
        logger.warning(
            f"NEXTCLOUD_MCP_SERVER_URL not set, defaulting to: {mcp_server_url}. "
            "This should be set explicitly for proper audience validation."
        )
    if not os.getenv("NEXTCLOUD_RESOURCE_URI"):
        logger.warning(
            f"NEXTCLOUD_RESOURCE_URI not set, defaulting to: {nextcloud_resource_uri}. "
            "This should be set explicitly for proper audience validation."
        )

    # Create settings for UnifiedTokenVerifier (use same settings instance from start of function)
    # settings is already set at the start of setup_oauth_config()
    # Override with discovered values if not set in environment
    # (values already present on the settings object always win)
    if not settings.oidc_client_id:
        settings.oidc_client_id = client_id
    if not settings.oidc_client_secret:
        settings.oidc_client_secret = client_secret
    if not settings.jwks_uri:
        settings.jwks_uri = jwks_uri
    if not settings.introspection_uri:
        settings.introspection_uri = introspection_uri
    if not settings.userinfo_uri:
        settings.userinfo_uri = userinfo_uri
    if not settings.oidc_issuer:
        # Use client_issuer which handles public URL override
        settings.oidc_issuer = client_issuer
    if not settings.nextcloud_mcp_server_url:
        settings.nextcloud_mcp_server_url = mcp_server_url
    if not settings.nextcloud_resource_uri:
        settings.nextcloud_resource_uri = nextcloud_resource_uri

    # Create Unified Token Verifier (ADR-005 compliant)
    token_verifier = UnifiedTokenVerifier(settings)

    # Log the mode (ENABLE_TOKEN_EXCHANGE only affects the log output in this function)
    enable_token_exchange = (
        os.getenv("ENABLE_TOKEN_EXCHANGE", "false").lower() == "true"
    )
    if enable_token_exchange:
        logger.info(
            "✓ Token Exchange mode enabled (ADR-005) - exchanging MCP tokens for Nextcloud tokens via RFC 8693"
        )
        logger.info(f" MCP audience: {client_id} or {mcp_server_url}")
        logger.info(f" Nextcloud audience: {nextcloud_resource_uri}")
    else:
        logger.info(
            "✓ Multi-audience mode enabled (ADR-005) - tokens must contain both MCP and Nextcloud audiences"
        )
        logger.info(f" Required MCP audience: {client_id} or {mcp_server_url}")
        logger.info(f" Required Nextcloud audience: {nextcloud_resource_uri}")

    if introspection_uri:
        logger.info("✓ Opaque token introspection enabled (RFC 7662)")
    if jwks_uri:
        logger.info("✓ JWT signature verification enabled (JWKS)")

    # Progressive Consent mode (for offline access / background jobs)
    encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY")
    if enable_offline_access and encryption_key and refresh_token_storage:
        logger.info("✓ Progressive Consent mode enabled - offline access available")
        # Note: Token Broker service would be initialized here for background job support
        # Currently not used in ADR-005 implementation as it's specific to offline access patterns
        # that are separate from the real-time token exchange flow
        logger.debug("Token broker available for future offline access features")

    # Create OAuth client for server-initiated flows (e.g., token exchange, background workers)
    # Only created for external IdPs with working refresh token storage; otherwise stays None.
    oauth_client = None
    if enable_offline_access and refresh_token_storage and is_external_idp:
        # For external IdP mode, create generic OIDC client for token operations
        from nextcloud_mcp_server.auth.keycloak_oauth import KeycloakOAuthClient

        # Re-reads the same environment variable as above; the value is unchanged.
        mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000")
        # Note: This redirect_uri is for OAuth client initialization, not used for actual redirects
        # since this client is used for backend token operations (exchange, refresh)
        redirect_uri = f"{mcp_server_url}/oauth/callback"

        # Extract base URL and realm from discovery URL
        # Format: http://keycloak:8080/realms/nextcloud-mcp/.well-known/openid-configuration
        # → base_url: http://keycloak:8080, realm: nextcloud-mcp
        if "/realms/" in discovery_url:
            base_url = discovery_url.split("/realms/")[0]
            realm = discovery_url.split("/realms/")[1].split("/")[0]
        else:
            # Fallback: use issuer to extract base URL
            # (realm is the empty string when the issuer has no "/realms/" segment)
            base_url = (
                issuer.rsplit("/realms/", 1)[0] if "/realms/" in issuer else issuer
            )
            realm = issuer.split("/realms/")[1] if "/realms/" in issuer else ""

        oauth_client = KeycloakOAuthClient(
            keycloak_url=base_url,
            realm=realm,
            client_id=client_id,
            client_secret=client_secret,
            redirect_uri=redirect_uri,
        )
        await oauth_client.discover()
        logger.info(
            "✓ OIDC client initialized for token operations (token exchange, refresh)"
        )
    elif enable_offline_access and refresh_token_storage:
        # For integrated mode, OAuth client could be added later
        # For now, token refresh can use httpx directly with discovered endpoints
        logger.info(
            "OAuth client for token refresh not yet implemented for integrated mode"
        )

    # Create auth settings
    mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000")

    # Note: We don't set required_scopes here anymore.
    # Scopes are now advertised via PRM endpoint and enforced per-tool.
    # This allows dynamic tool filtering based on user's actual token scopes.
    auth_settings = AuthSettings(
        issuer_url=AnyHttpUrl(
            client_issuer
        ),  # Use client issuer (may be public override)
        resource_server_url=AnyHttpUrl(mcp_server_url),
    )

    logger.info("OAuth configuration complete")
    return (
        nextcloud_host,
        token_verifier,
        auth_settings,
        refresh_token_storage,
        oauth_client,
        oauth_provider,
        client_id,
        client_secret,
    )
async def setup_oauth_config_for_multi_user_basic(
    settings: Settings,
    client_id: str,
    client_secret: str,
) -> tuple[UnifiedTokenVerifier, RefreshTokenStorage | None, str, str]:
    """
    Setup minimal OAuth configuration for multi-user BasicAuth mode.

    This is a lightweight version of setup_oauth_config() that:
    - Performs OIDC discovery to get endpoints
    - Creates UnifiedTokenVerifier for management API token validation
    - Creates RefreshTokenStorage for webhook token storage
    - Skips OAuth client creation (not needed for BasicAuth background sync)
    - Skips AuthSettings creation (not needed for BasicAuth MCP operations)

    This enables hybrid authentication mode where:
    - MCP operations use BasicAuth (stateless, simple)
    - Management APIs use OAuth bearer tokens (secure, per-user)
    - Background operations use OAuth refresh tokens (webhook sync)

    Side effects:
        ``settings`` is updated in place. Any of ``oidc_client_id``,
        ``oidc_client_secret``, ``jwks_uri``, ``introspection_uri``,
        ``userinfo_uri``, ``oidc_issuer``, ``nextcloud_mcp_server_url`` and
        ``nextcloud_resource_uri`` that is currently falsy is filled from the
        arguments, the discovery document or the environment. Values that are
        already set are never overwritten.

    Args:
        settings: Application settings
        client_id: OAuth client ID (from DCR or static config)
        client_secret: OAuth client secret

    Returns:
        Tuple of (token_verifier, refresh_token_storage, client_id, client_secret).
        ``refresh_token_storage`` is ``None`` when offline access is disabled,
        when TOKEN_ENCRYPTION_KEY is unset, or when storage initialization
        fails (that failure is logged and swallowed).

    Raises:
        ValueError: If NEXTCLOUD_HOST is not set, or if OIDC discovery fails
            (HTTP error status, connection error, or a response body that is
            not a JSON object). httpx errors are wrapped in ValueError here
            and are not propagated as httpx exceptions.
        KeyError: If the discovery document lacks ``issuer`` or
            ``userinfo_endpoint``.
    """
    nextcloud_host = settings.nextcloud_host
    if not nextcloud_host:
        raise ValueError("NEXTCLOUD_HOST is required for OAuth infrastructure setup")
    nextcloud_host = nextcloud_host.rstrip("/")

    # Get OIDC discovery URL (always Nextcloud integrated mode for multi-user BasicAuth)
    discovery_url = os.getenv(
        "OIDC_DISCOVERY_URL",
        f"{nextcloud_host}/.well-known/openid-configuration",
    )
    logger.info(
        f"Performing OIDC discovery for multi-user BasicAuth hybrid mode: {discovery_url}"
    )

    # Perform OIDC discovery
    try:
        async with httpx.AsyncClient(
            timeout=30.0, follow_redirects=True
        ) as http_client:
            response = await http_client.get(discovery_url)
            response.raise_for_status()
            discovery = response.json()
        # A body that parses as JSON but is not an object (list, string, null)
        # would otherwise fail later with a TypeError on the first key lookup.
        # Reject it here so it is reported through the invalid-response handler.
        if not isinstance(discovery, dict):
            raise ValueError(
                f"expected a JSON object, got {type(discovery).__name__}"
            )
    except httpx.HTTPStatusError as e:
        logger.error(
            f"OIDC discovery failed: HTTP {e.response.status_code} from {discovery_url}"
        )
        raise ValueError(
            f"OIDC discovery failed: HTTP {e.response.status_code} from {discovery_url}. "
            "Ensure Nextcloud OIDC (user_oidc app) is installed and configured."
        ) from e
    except httpx.RequestError as e:
        logger.error(f"OIDC discovery failed: {e}")
        raise ValueError(
            f"OIDC discovery failed: Cannot connect to {discovery_url}. Error: {e}"
        ) from e
    except (KeyError, ValueError) as e:
        # Covers a non-JSON body (json() raises a ValueError subclass) and the
        # non-object check above.
        logger.error(
            f"OIDC discovery failed: Invalid response from {discovery_url}: {e}"
        )
        raise ValueError(
            f"OIDC discovery failed: Invalid response from {discovery_url}. "
            "The endpoint did not return valid OIDC configuration."
        ) from e

    # Check the fields read unconditionally below *before* reporting success,
    # and name them in the error. The exception type stays KeyError because
    # that is what a missing field has always raised from this function.
    missing_fields = [
        name for name in ("issuer", "userinfo_endpoint") if name not in discovery
    ]
    if missing_fields:
        missing_text = ", ".join(missing_fields)
        logger.error(
            f"OIDC discovery failed: response from {discovery_url} is missing "
            f"required field(s): {missing_text}"
        )
        raise KeyError(
            f"OIDC discovery response from {discovery_url} is missing "
            f"required field(s): {missing_text}"
        )

    logger.info("✓ OIDC discovery successful (multi-user BasicAuth)")

    # Extract OIDC endpoints from discovery
    issuer = discovery["issuer"]
    userinfo_uri = discovery["userinfo_endpoint"]
    # Optional endpoints: either may be absent, in which case None is used.
    jwks_uri = discovery.get("jwks_uri")
    introspection_uri = discovery.get("introspection_endpoint")
    logger.info("OIDC endpoints configured for management API:")
    logger.info(f" Issuer: {issuer}")
    logger.info(f" Userinfo: {userinfo_uri}")
    logger.info(f" JWKS: {jwks_uri}")
    logger.info(f" Introspection: {introspection_uri}")

    # Get MCP server URL for audience validation
    mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000")
    nextcloud_resource_uri = os.getenv("NEXTCLOUD_RESOURCE_URI", nextcloud_host)

    # Use public issuer URL for JWT validation if set (handles Docker internal/external URL mismatch)
    # Tokens are issued with the public URL, but OIDC discovery returns internal URL
    public_issuer_url = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL")
    client_issuer = public_issuer_url if public_issuer_url else issuer

    # Update settings with discovered values for UnifiedTokenVerifier.
    # Explicitly configured values always win over discovered ones.
    if not settings.oidc_client_id:
        settings.oidc_client_id = client_id
    if not settings.oidc_client_secret:
        settings.oidc_client_secret = client_secret
    if not settings.jwks_uri:
        settings.jwks_uri = jwks_uri
    if not settings.introspection_uri:
        settings.introspection_uri = introspection_uri
    if not settings.userinfo_uri:
        settings.userinfo_uri = userinfo_uri
    if not settings.oidc_issuer:
        settings.oidc_issuer = client_issuer
    if not settings.nextcloud_mcp_server_url:
        settings.nextcloud_mcp_server_url = mcp_server_url
    if not settings.nextcloud_resource_uri:
        settings.nextcloud_resource_uri = nextcloud_resource_uri

    # Create Unified Token Verifier for management API authentication
    token_verifier = UnifiedTokenVerifier(settings)
    logger.info("✓ Token verifier created for management API (hybrid mode)")
    if introspection_uri:
        logger.info(" Opaque token introspection enabled (RFC 7662)")
    if jwks_uri:
        logger.info(" JWT signature verification enabled (JWKS)")

    # Initialize refresh token storage for background operations.
    # This step is best-effort: any failure is logged and the function still
    # returns, with refresh_token_storage left as None.
    refresh_token_storage = None
    if settings.enable_offline_access:
        try:
            from nextcloud_mcp_server.auth.storage import RefreshTokenStorage

            encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY")
            if not encryption_key:
                logger.warning(
                    "ENABLE_OFFLINE_ACCESS=true but TOKEN_ENCRYPTION_KEY not set. "
                    "Refresh tokens will NOT be stored. Generate a key with:\n"
                    ' python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"'
                )
            else:
                refresh_token_storage = RefreshTokenStorage.from_env()
                await refresh_token_storage.initialize()
                logger.info(
                    "✓ Refresh token storage initialized for background operations (hybrid mode)"
                )
        except Exception as e:
            logger.error(f"Failed to initialize refresh token storage: {e}")
            logger.debug(f"Full traceback:\n{traceback.format_exc()}")
            logger.warning(
                "Continuing without refresh token storage - webhook management may be limited"
            )

    logger.info(
        "OAuth infrastructure setup complete for multi-user BasicAuth hybrid mode"
    )
    return (token_verifier, refresh_token_storage, client_id, client_secret)
def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None = None):
# Initialize observability (logging will be configured by uvicorn)
settings = get_settings()
# Validate configuration and detect deployment mode
mode, config_errors = validate_configuration(settings)
if config_errors:
error_msg = (
f"Configuration validation failed for {mode.value} mode:\n"
+ "\n".join(f" - {err}" for err in config_errors)
+ "\n\n"
+ get_mode_summary(mode)
)
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"✅ Configuration validated successfully for {mode.value} mode")
logger.debug(f"Mode details:\n{get_mode_summary(mode)}")
# Derive helper variables for backward compatibility with existing code
oauth_enabled = mode in (
AuthMode.OAUTH_SINGLE_AUDIENCE,
AuthMode.OAUTH_TOKEN_EXCHANGE,
)
deployment_mode = (
DeploymentMode.SMITHERY_STATELESS
if mode == AuthMode.SMITHERY_STATELESS
else DeploymentMode.SELF_HOSTED
)
# Log hybrid authentication status for multi-user BasicAuth with offline access
if mode == AuthMode.MULTI_USER_BASIC and settings.enable_offline_access:
logger.info(
"🔄 Hybrid authentication mode will be enabled:\n"
" - MCP operations: BasicAuth (stateless, credentials per-request)\n"
" - Management APIs: OAuth bearer tokens (secure, per-user)\n"
" - Background operations: OAuth refresh tokens (webhook sync)"
)
# Setup Prometheus metrics (always enabled by default)
if settings.metrics_enabled:
setup_metrics(port=settings.metrics_port)
logger.info(
f"Prometheus metrics enabled on dedicated port {settings.metrics_port}"
)
# Setup OpenTelemetry tracing (optional)
if settings.otel_exporter_otlp_endpoint:
setup_tracing(
service_name=settings.otel_service_name,
otlp_endpoint=settings.otel_exporter_otlp_endpoint,
otlp_verify_ssl=settings.otel_exporter_verify_ssl,
sampling_rate=settings.otel_traces_sampler_arg,
)
logger.info(
f"OpenTelemetry tracing enabled (endpoint: {settings.otel_exporter_otlp_endpoint})"
)
else:
logger.info(
"OpenTelemetry tracing disabled (set OTEL_EXPORTER_OTLP_ENDPOINT to enable)"
)
# Initialize OAuth credentials for multi-user modes that need background operations
# This must happen BEFORE uvicorn starts (same lifecycle point as OAuth modes)
# to avoid async context issues
multi_user_basic_oauth_creds: tuple[str, str] | None = None
multi_user_token_verifier: UnifiedTokenVerifier | None = None
multi_user_refresh_storage: RefreshTokenStorage | None = None
if (
mode == AuthMode.MULTI_USER_BASIC
and settings.vector_sync_enabled
and settings.enable_background_operations
):
print(
f"DEBUG: Multi-user BasicAuth mode detected, vector_sync={settings.vector_sync_enabled}, background_operations={settings.enable_background_operations}"
)
logger.info(
"Multi-user BasicAuth with vector sync - checking for OAuth/app password credentials"
)
# Check for static credentials first
static_client_id = os.getenv("NEXTCLOUD_OIDC_CLIENT_ID")
static_client_secret = os.getenv("NEXTCLOUD_OIDC_CLIENT_SECRET")
if static_client_id and static_client_secret:
print("DEBUG: Using static OAuth credentials")
logger.info("Using static OAuth credentials for background operations")
multi_user_basic_oauth_creds = (static_client_id, static_client_secret)
else:
# Perform DCR before uvicorn starts (same lifecycle as OAuth modes)
print("DEBUG: No static credentials, attempting DCR...")
logger.info(
"OAuth credentials not configured - attempting Dynamic Client Registration..."
)
async def setup_multi_user_basic_dcr() -> tuple[str, str] | None:
    """Setup DCR for multi-user BasicAuth background operations.

    Reads ``settings`` from the enclosing scope.

    Returns:
        ``(client_id, client_secret)`` when registration succeeds, or ``None``
        when it fails for any reason. Failures are logged, never raised.
    """
    try:
        # Validate with an explicit check rather than ``assert`` (asserts are
        # stripped under ``python -O``), and do it before the host is
        # interpolated into the endpoint URL. Raising inside the ``try`` keeps
        # the outcome the same as before: log the failure and return None.
        host = settings.nextcloud_host
        if not host:
            raise ValueError("NEXTCLOUD_HOST is required")

        # Construct registration endpoint directly from nextcloud_host
        # Standard RFC 7591 endpoint pattern for Nextcloud OIDC
        # This avoids relying on discovery doc which may use public URLs unreachable from containers
        # A trailing slash on the host is dropped so the path has no "//".
        registration_endpoint = f"{host.rstrip('/')}/apps/oidc/register"
        logger.info(
            f"Attempting Dynamic Client Registration at: {registration_endpoint}"
        )

        # Perform DCR
        client_id, client_secret = await load_oauth_client_credentials(
            nextcloud_host=host,
            registration_endpoint=registration_endpoint,
        )
        logger.info(
            f"✓ Dynamic Client Registration successful for background operations "
            f"(client_id: {client_id[:16]}...)"
        )
        return (client_id, client_secret)
    except Exception as e:
        # Deliberately broad: a failed registration only disables background
        # vector sync and must not prevent the server from starting.
        logger.error(f"Dynamic Client Registration failed: {e}")
        logger.debug(f"Full traceback:\n{traceback.format_exc()}")
        logger.warning("Background vector sync will be disabled.")
        return None
# Run DCR synchronously before uvicorn starts
multi_user_basic_oauth_creds = anyio.run(setup_multi_user_basic_dcr)
# Setup OAuth infrastructure for management APIs and background operations
# This creates the UnifiedTokenVerifier needed by management.py and
# RefreshTokenStorage for webhook token persistence
if multi_user_basic_oauth_creds:
sync_client_id, sync_client_secret = multi_user_basic_oauth_creds
logger.info(
"Setting up OAuth infrastructure for management APIs (hybrid mode)..."
)
try:
(
multi_user_token_verifier,
multi_user_refresh_storage,
_,
_,
) = anyio.run(
setup_oauth_config_for_multi_user_basic,
settings,
sync_client_id,
sync_client_secret,
)
logger.info(
"✓ OAuth infrastructure setup complete for multi-user BasicAuth hybrid mode"
)
except (httpx.HTTPError, ValueError, KeyError) as e:
# Expected errors during OAuth infrastructure setup:
# - httpx.HTTPError: Network issues, OIDC discovery failures
# - ValueError: Missing required configuration (NEXTCLOUD_HOST)
# - KeyError: Missing required fields in OIDC discovery response
logger.error(f"Failed to setup OAuth infrastructure: {e}")
logger.debug(f"Full traceback:\n{traceback.format_exc()}")
logger.warning(
"Management API will be unavailable. "
"Webhook management from Astrolabe admin UI will not work."
)
# Set to None to indicate failure
multi_user_token_verifier = None
multi_user_refresh_storage = None
except Exception as e:
# Unexpected error - this is a programming error, re-raise it
logger.error(
f"Unexpected error during OAuth infrastructure setup: {e}. "
"This is likely a programming error that should be fixed."
)
raise
# Create MCP server based on detected mode
if mode in (AuthMode.OAUTH_SINGLE_AUDIENCE, AuthMode.OAUTH_TOKEN_EXCHANGE):
logger.info("Configuring MCP server for OAuth mode")
# Asynchronously get the OAuth configuration
(
nextcloud_host,
token_verifier,
auth_settings,
refresh_token_storage,
oauth_client,
oauth_provider,
client_id,
client_secret,
) = anyio.run(setup_oauth_config)
# Create lifespan function with captured OAuth context (closure)
@asynccontextmanager
async def oauth_lifespan(server: FastMCP) -> AsyncIterator[OAuthAppContext]:
    """
    Manage MCP server startup and shutdown when running in OAuth mode.

    All OAuth state (host, token verifier, refresh token storage, OAuth
    client, provider name and client id) is read from the enclosing scope
    and yielded wrapped in an ``OAuthAppContext``. ``server`` is not used.
    """
    logger.info("Starting MCP server in OAuth mode")
    logger.info(f"Using OAuth provider: {oauth_provider}")

    # Report which optional OAuth components were captured from the outer scope.
    optional_components = (
        (refresh_token_storage, "Refresh token storage is available"),
        (oauth_client, "OAuth client is available for token refresh"),
    )
    for component, availability_note in optional_components:
        if component:
            logger.info(availability_note)

    # Document processors must be ready before the context is handed out.
    initialize_document_processors()

    try:
        yield OAuthAppContext(
            nextcloud_host=nextcloud_host,
            token_verifier=token_verifier,
            refresh_token_storage=refresh_token_storage,
            oauth_client=oauth_client,
            oauth_provider=oauth_provider,
            server_client_id=client_id,
        )
    finally:
        logger.info("Shutting down MCP server")
        # RefreshTokenStorage uses context managers, so only the OAuth client
        # may need closing, and only if it actually exposes close().
        client_is_closable = bool(oauth_client) and hasattr(oauth_client, "close")
        if client_is_closable:
            try:
                await oauth_client.close()
            except Exception as close_error:
                # A failed close must not mask the shutdown itself.
                logger.warning(f"Error closing OAuth client: {close_error}")
        logger.info("MCP server shutdown complete")
mcp = FastMCP(
"Nextcloud MCP",
lifespan=oauth_lifespan,
token_verifier=token_verifier,
auth=auth_settings,
# Disable DNS rebinding protection for containerized deployments (k8s, Docker)
# MCP 1.23+ auto-enables this for localhost, breaking k8s service DNS names
transport_security=TransportSecuritySettings(
enable_dns_rebinding_protection=False
),
)
elif mode == AuthMode.SMITHERY_STATELESS:
logger.info("Configuring MCP server for Smithery stateless mode")
# json_response=True returns plain JSON-RPC instead of SSE format,
# required for Smithery scanner compatibility
mcp = FastMCP(
"Nextcloud MCP",
lifespan=app_lifespan_smithery,
json_response=True,
# Disable DNS rebinding protection for containerized deployments (k8s, Docker)
# MCP 1.23+ auto-enables this for localhost, breaking k8s service DNS names
transport_security=TransportSecuritySettings(
enable_dns_rebinding_protection=False
),
)
else:
# BasicAuth modes (single-user or multi-user)
logger.info(f"Configuring MCP server for {mode.value} mode")
mcp = FastMCP(
"Nextcloud MCP",
lifespan=app_lifespan_basic,
# Disable DNS rebinding protection for containerized deployments (k8s, Docker)
# MCP 1.23+ auto-enables this for localhost, breaking k8s service DNS names
transport_security=TransportSecuritySettings(
enable_dns_rebinding_protection=False
),
)
@mcp.resource("nc://capabilities")
async def nc_get_capabilities():
    """Get the Nextcloud Host capabilities"""
    # NOTE(review): the docstring above is kept verbatim because resource
    # registration may surface it as the resource description; confirm
    # before rewording it.
    # Resolve the Nextcloud client bound to the current MCP request context,
    # then return whatever its capabilities() call reports.
    nextcloud = await get_nextcloud_client(mcp.get_context())
    return await nextcloud.capabilities()
# Define available apps and their configuration functions
available_apps = {
"notes": configure_notes_tools,
"tables": configure_tables_tools,
"webdav": configure_webdav_tools,
"sharing": configure_sharing_tools,
"calendar": configure_calendar_tools,
"contacts": configure_contacts_tools,
"cookbook": configure_cookbook_tools,
"deck": configure_deck_tools,
"news": configure_news_tools,
}
# If no specific apps are specified, enable all
if enabled_apps is None:
enabled_apps = list(available_apps.keys())
# Configure only the enabled apps
for app_name in enabled_apps:
if app_name in available_apps:
logger.info(f"Configuring {app_name} tools")
available_apps[app_name](mcp)
else:
logger.warning(
f"Unknown app: {app_name}. Available apps: {list(available_apps.keys())}"
)
# Register semantic search tools (cross-app feature)
# ADR-016: Skip in Smithery stateless mode (no vector database)
if deployment_mode == DeploymentMode.SMITHERY_STATELESS:
logger.info("Skipping semantic search tools (Smithery stateless mode)")
elif settings.vector_sync_enabled:
logger.info("Configuring semantic search tools (vector sync enabled)")
configure_semantic_tools(mcp)
else:
logger.info("Skipping semantic search tools (VECTOR_SYNC_ENABLED not set)")
# Register OAuth provisioning tools (only when offline access is enabled)
# With token exchange enabled (external IdP), provisioning is not needed for MCP operations
enable_token_exchange = (
os.getenv("ENABLE_TOKEN_EXCHANGE", "false").lower() == "true"
)
# Use settings.enable_offline_access which handles both ENABLE_BACKGROUND_OPERATIONS (new)
# and ENABLE_OFFLINE_ACCESS (deprecated) environment variables
enable_offline_access_for_tools = settings.enable_offline_access
if oauth_enabled and enable_offline_access_for_tools and not enable_token_exchange:
logger.info("Registering OAuth provisioning tools for offline access")
register_oauth_tools(mcp)
elif oauth_enabled and enable_token_exchange:
logger.info("Skipping provisioning tools registration (token exchange enabled)")
elif oauth_enabled and not enable_offline_access_for_tools:
logger.info(
"Skipping provisioning tools registration (offline access not enabled)"
)
# Override list_tools to filter based on user's token scopes (OAuth mode only)
if oauth_enabled:
original_list_tools = mcp._tool_manager.list_tools
def list_tools_filtered():
    """Return the registered tools, narrowed to what the caller's token scopes allow.

    When the current request has no scopes (no token, or BasicAuth), every
    registered tool is returned unfiltered. Otherwise a tool is kept only if
    ``has_required_scopes`` accepts its function for the caller's scopes.
    The same rule applies to JWT and opaque bearer tokens.
    """
    # Scopes are read via the MCP SDK's contextvar, so this works for all
    # request types, list_tools included.
    user_scopes = get_access_token_scopes()
    is_jwt = is_jwt_token()
    token_kind = "JWT" if is_jwt else "opaque/none"
    logger.info(
        f"🔍 list_tools called - Token type: {token_kind}, "
        f"User scopes: {user_scopes}"
    )

    # Always start from the unfiltered tool list of the wrapped manager.
    all_tools = original_list_tools()

    # BasicAuth mode or no token - nothing to filter on, show all tools.
    if not user_scopes:
        logger.info(f"📋 Showing all {len(all_tools)} tools (no token/BasicAuth)")
        return all_tools

    # JWT tokens have scopes embedded in payload; opaque tokens get scopes
    # via the introspection endpoint. Both are filtered the same way.
    allowed_tools = [
        candidate
        for candidate in all_tools
        if has_required_scopes(candidate.fn, user_scopes)
    ]
    token_type = "JWT" if is_jwt else "Bearer"
    logger.info(
        f"✂️ {token_type} scope filtering: {len(allowed_tools)}/{len(all_tools)} tools "
        f"available for scopes: {user_scopes}"
    )
    # The Tool objects are returned as-is (they are already in the correct format).
    return allowed_tools
# Replace the tool manager's list_tools method
mcp._tool_manager.list_tools = list_tools_filtered # type: ignore[method-assign]
logger.info(
"Dynamic tool filtering enabled for OAuth mode (JWT and Bearer tokens)"
)
mcp_app = mcp.streamable_http_app()
@asynccontextmanager
async def starlette_lifespan(app: Starlette):
# Set OAuth context for OAuth login routes (ADR-004)
if oauth_enabled:
# Prepare OAuth config from setup_oauth_config closure variables
# Get nextcloud_host from settings (it was validated as required)
nextcloud_host_for_context = settings.nextcloud_host
if not nextcloud_host_for_context:
raise ValueError("NEXTCLOUD_HOST is required for OAuth mode")
mcp_server_url = os.getenv(
"NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000"
)
nextcloud_resource_uri = os.getenv(
"NEXTCLOUD_RESOURCE_URI", nextcloud_host_for_context
)
discovery_url = os.getenv(
"OIDC_DISCOVERY_URL",
f"{nextcloud_host_for_context}/.well-known/openid-configuration",
)
scopes = os.getenv("NEXTCLOUD_OIDC_SCOPES", "")
oauth_context_dict = {
"storage": refresh_token_storage,
"oauth_client": oauth_client,
"token_verifier": token_verifier, # For querying IdP userinfo endpoint
"config": {
"mcp_server_url": mcp_server_url,
"discovery_url": discovery_url,
"client_id": client_id, # From setup_oauth_config (DCR or static)
"client_secret": client_secret, # From setup_oauth_config (DCR or static)
"scopes": scopes,
"nextcloud_host": nextcloud_host_for_context,
"nextcloud_resource_uri": nextcloud_resource_uri,
"oauth_provider": oauth_provider,
},
}
app.state.oauth_context = oauth_context_dict
# Also set oauth_context on browser_app for session authentication
# browser_app is in the same function scope (defined later in create_app)
# We need to find it in the mounted routes
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
browser_app = cast(Starlette, route.app)
browser_app.state.oauth_context = oauth_context_dict
logger.info(
"OAuth context shared with browser_app for session auth"
)
break
logger.info(
f"OAuth context initialized for login routes (client_id={client_id[:16]}...)"
)
else:
# BasicAuth mode - initialize storage for webhook management
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
basic_auth_storage = RefreshTokenStorage.from_env()
await basic_auth_storage.initialize()
logger.info("Initialized refresh token storage for webhook management")
app.state.storage = basic_auth_storage
# For multi-user BasicAuth with offline access, create oauth_context for management APIs
# This allows Astrolabe to use management APIs with OAuth bearer tokens
if settings.enable_multi_user_basic_auth and settings.enable_offline_access:
# Check if we have OAuth credentials AND infrastructure from setup
if (
multi_user_basic_oauth_creds
and multi_user_token_verifier is not None
):
sync_client_id, sync_client_secret = multi_user_basic_oauth_creds
# Create oauth_context for management API authentication
nextcloud_host_for_context = settings.nextcloud_host
mcp_server_url = os.getenv(
"NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000"
)
discovery_url = os.getenv(
"OIDC_DISCOVERY_URL",
f"{nextcloud_host_for_context}/.well-known/openid-configuration",
)
oauth_context_dict = {
# Use OAuth refresh token storage if available, fallback to basic_auth_storage
"storage": multi_user_refresh_storage or basic_auth_storage,
"oauth_client": None, # Not needed for management APIs
"token_verifier": multi_user_token_verifier, # FIXED: Now has real verifier!
"config": {
"mcp_server_url": mcp_server_url,
"discovery_url": discovery_url,
"client_id": sync_client_id,
"client_secret": sync_client_secret,
"scopes": "", # Background sync only
"nextcloud_host": nextcloud_host_for_context,
"nextcloud_resource_uri": nextcloud_host_for_context,
"oauth_provider": "nextcloud", # Always Nextcloud for multi-user BasicAuth
},
}
app.state.oauth_context = oauth_context_dict
logger.info(
f"✓ OAuth context initialized for management APIs (hybrid mode, client_id={sync_client_id[:16]}...)"
)
elif multi_user_basic_oauth_creds and multi_user_token_verifier is None:
logger.warning(
"OAuth infrastructure setup failed - management API will be unavailable. "
"This is expected if OIDC discovery failed or token verifier creation failed. "
"Webhook management from Astrolabe admin UI will not work."
)
else:
logger.warning(
"OAuth credentials not available - management API will be unavailable. "
"This is expected if DCR failed or static credentials were not provided. "
"Webhook management from Astrolabe admin UI will not work."
)
# Also share with browser_app for webhook routes
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
browser_app = cast(Starlette, route.app)
browser_app.state.storage = basic_auth_storage
if (
settings.enable_multi_user_basic_auth
and settings.enable_offline_access
and hasattr(app.state, "oauth_context")
):
browser_app.state.oauth_context = app.state.oauth_context
logger.info(
"OAuth context shared with browser_app for management APIs"
)
logger.info(
"Storage shared with browser_app for webhook management"
)
break
# Start background vector sync tasks (ADR-007)
# Scanner runs at server-level (once), not per-session
# Re-use settings from outer scope (already validated)
# Note: enable_offline_access_for_sync, encryption_key, and refresh_token_storage
# are already defined in outer scope before mode split
# Multi-user BasicAuth uses OAuth-style background sync (with app passwords)
# So skip single-user BasicAuth vector sync if in multi-user mode
if (
settings.vector_sync_enabled
and not oauth_enabled
and not settings.enable_multi_user_basic_auth
):
# BasicAuth mode - single user sync
logger.info("Starting background vector sync tasks for BasicAuth mode")
# Get username from environment
username = os.getenv("NEXTCLOUD_USERNAME")
if not username:
raise ValueError(
"NEXTCLOUD_USERNAME required for vector sync in BasicAuth mode"
)
# Create client for vector sync (server-level, not per-session)
client = NextcloudClient.from_env()
# Initialize Qdrant collection before starting background tasks
logger.info("Initializing Qdrant collection...")
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
try:
await get_qdrant_client() # Triggers collection creation if needed
logger.info("Qdrant collection ready")
except Exception as e:
logger.error(f"Failed to initialize Qdrant collection: {e}")
raise RuntimeError(
f"Cannot start vector sync - Qdrant initialization failed: {e}"
) from e
# Initialize shared state
send_stream, receive_stream = anyio.create_memory_object_stream(
max_buffer_size=settings.vector_sync_queue_max_size
)
shutdown_event = anyio.Event()
scanner_wake_event = anyio.Event()
# Store in app state for access from routes (ADR-007)
app.state.document_send_stream = send_stream
app.state.document_receive_stream = receive_stream
app.state.shutdown_event = shutdown_event
app.state.scanner_wake_event = scanner_wake_event
# Also store in module singleton for FastMCP session lifespans
_vector_sync_state.document_send_stream = send_stream
_vector_sync_state.document_receive_stream = receive_stream
_vector_sync_state.shutdown_event = shutdown_event
_vector_sync_state.scanner_wake_event = scanner_wake_event
logger.info("Vector sync state stored in module singleton")
# Also share with browser_app for /app route
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
browser_app = cast(Starlette, route.app)
browser_app.state.document_send_stream = send_stream
browser_app.state.document_receive_stream = receive_stream
browser_app.state.shutdown_event = shutdown_event
browser_app.state.scanner_wake_event = scanner_wake_event
logger.info("Vector sync state shared with browser_app for /app")
break
# Start background tasks using anyio TaskGroup
async with anyio.create_task_group() as tg:
# Start scanner task
await tg.start(
scanner_task,
send_stream,
shutdown_event,
scanner_wake_event,
client,
username,
)
# Start processor pool (each gets a cloned receive stream)
for i in range(settings.vector_sync_processor_workers):
await tg.start(
processor_task,
i,
receive_stream.clone(),
shutdown_event,
client,
username,
)
logger.info(
f"Background sync tasks started: 1 scanner + "
f"{settings.vector_sync_processor_workers} processors"
)
# Run MCP session manager and yield
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
try:
yield
finally:
# Shutdown signal
logger.info("Shutting down background sync tasks")
shutdown_event.set()
await client.close()
# TaskGroup automatically cancels all tasks on exit
elif (
settings.vector_sync_enabled
and (oauth_enabled or settings.enable_multi_user_basic_auth)
and settings.enable_background_operations
):
# OAuth mode with background operations - multi-user sync
# Also used for multi-user BasicAuth mode (client auth is BasicAuth, background sync uses app passwords or OAuth)
mode_desc = "OAuth mode" if oauth_enabled else "Multi-user BasicAuth mode"
logger.info(f"Starting background vector sync tasks for {mode_desc}")
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
from nextcloud_mcp_server.vector.oauth_sync import (
oauth_processor_task,
user_manager_task,
)
# Get nextcloud_host (from settings - already validated)
nextcloud_host_for_sync = settings.nextcloud_host
if not nextcloud_host_for_sync:
raise ValueError("NEXTCLOUD_HOST required for vector sync")
# Get OIDC discovery URL (same as used for OAuth setup)
discovery_url = os.getenv(
"OIDC_DISCOVERY_URL",
f"{nextcloud_host_for_sync}/.well-known/openid-configuration",
)
# Get client credentials - these were obtained before uvicorn started
# For OAuth modes: from setup_oauth_config()
# For multi-user BasicAuth: from setup_multi_user_basic_dcr()
oauth_ctx = getattr(app.state, "oauth_context", {})
oauth_config = oauth_ctx.get("config", {})
sync_client_id = oauth_config.get("client_id")
sync_client_secret = oauth_config.get("client_secret")
# For multi-user BasicAuth mode, use pre-obtained credentials from outer scope
if not sync_client_id or not sync_client_secret:
if multi_user_basic_oauth_creds:
sync_client_id, sync_client_secret = multi_user_basic_oauth_creds
logger.info(
"Using pre-obtained OAuth credentials for background sync"
)
else:
# No credentials available - DCR was attempted before uvicorn started but failed
sync_client_id = None
sync_client_secret = None
logger.warning(
"OAuth credentials not available for background sync "
"(DCR was attempted during startup but failed)"
)
# Only start vector sync if credentials are available
if sync_client_id and sync_client_secret:
# Get storage - different for OAuth vs multi-user BasicAuth modes
# OAuth mode: refresh_token_storage (from setup_oauth_config)
# Multi-user BasicAuth: app.state.storage (basic_auth_storage)
token_storage = (
refresh_token_storage if oauth_enabled else app.state.storage
)
# Create token broker for background operations
# Note: storage handles encryption internally, no key needed here
# Client credentials are needed for token refresh operations
token_broker = TokenBrokerService(
storage=token_storage,
oidc_discovery_url=discovery_url,
nextcloud_host=nextcloud_host_for_sync,
client_id=sync_client_id,
client_secret=sync_client_secret,
)
# Store token broker in oauth_context for management API (revoke endpoint)
if hasattr(app.state, "oauth_context"):
app.state.oauth_context["token_broker"] = token_broker
logger.info(
"Token broker added to oauth_context for management API"
)
# Initialize Qdrant collection before starting background tasks
logger.info("Initializing Qdrant collection...")
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
try:
await get_qdrant_client() # Triggers collection creation if needed
logger.info("Qdrant collection ready")
except Exception as e:
logger.error(f"Failed to initialize Qdrant collection: {e}")
raise RuntimeError(
f"Cannot start vector sync - Qdrant initialization failed: {e}"
) from e
# Initialize shared state
send_stream, receive_stream = anyio.create_memory_object_stream(
max_buffer_size=settings.vector_sync_queue_max_size
)
shutdown_event = anyio.Event()
scanner_wake_event = anyio.Event()
# User state tracking for user manager
user_states: dict = {}
# Store in app state for access from routes (ADR-007)
app.state.document_send_stream = send_stream
app.state.document_receive_stream = receive_stream
app.state.shutdown_event = shutdown_event
app.state.scanner_wake_event = scanner_wake_event
# Also store in module singleton for FastMCP session lifespans
_vector_sync_state.document_send_stream = send_stream
_vector_sync_state.document_receive_stream = receive_stream
_vector_sync_state.shutdown_event = shutdown_event
_vector_sync_state.scanner_wake_event = scanner_wake_event
logger.info("Vector sync state stored in module singleton")
# Also share with browser_app for /app route
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
browser_app = cast(Starlette, route.app)
browser_app.state.document_send_stream = send_stream
browser_app.state.document_receive_stream = receive_stream
browser_app.state.shutdown_event = shutdown_event
browser_app.state.scanner_wake_event = scanner_wake_event
logger.info(
"Vector sync state shared with browser_app for /app"
)
break
# Determine authentication mode for background sync
# Multi-user BasicAuth: use app passwords via Astrolabe (NOT OAuth)
# OAuth mode: use OAuth refresh tokens (NOT app passwords)
use_basic_auth = not oauth_enabled
# Start background tasks using anyio TaskGroup
async with anyio.create_task_group() as tg:
# Start user manager task (supervises per-user scanners)
await tg.start(
user_manager_task,
send_stream,
shutdown_event,
scanner_wake_event,
token_broker if not use_basic_auth else None,
token_storage, # Use token_storage (works for both OAuth and multi-user BasicAuth)
nextcloud_host_for_sync,
user_states,
tg,
use_basic_auth, # Pass as positional arg (before task_status)
)
# Start processor pool (each gets a cloned receive stream)
for i in range(settings.vector_sync_processor_workers):
await tg.start(
oauth_processor_task,
i,
receive_stream.clone(),
shutdown_event,
token_broker if not use_basic_auth else None,
nextcloud_host_for_sync,
use_basic_auth, # Pass as positional arg (before task_status)
)
logger.info(
f"Background sync tasks started: 1 user manager + "
f"{settings.vector_sync_processor_workers} processors"
)
# Run MCP session manager and yield
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
try:
yield
finally:
# Shutdown signal
logger.info("Shutting down background sync tasks")
shutdown_event.set()
# Close token broker HTTP client
if token_broker._http_client:
await token_broker._http_client.aclose()
# TaskGroup automatically cancels all tasks on exit
else:
# No OAuth credentials available for background sync
logger.warning(
"Skipping background vector sync - OAuth credentials not available. "
"Multi-user BasicAuth mode will run without semantic search background operations. "
"To enable, set NEXTCLOUD_OIDC_CLIENT_ID and NEXTCLOUD_OIDC_CLIENT_SECRET."
)
# Just run MCP session manager without vector sync
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
yield
else:
# No vector sync - just run MCP session manager
if settings.vector_sync_enabled:
# Log why vector sync is not starting
if oauth_enabled and not settings.enable_offline_access:
logger.warning(
"Vector sync enabled but ENABLE_OFFLINE_ACCESS=false - "
"vector sync requires offline access in OAuth mode"
)
elif oauth_enabled and not refresh_token_storage:
logger.warning(
"Vector sync enabled but refresh token storage not available"
)
elif oauth_enabled and not os.getenv("TOKEN_ENCRYPTION_KEY"):
logger.warning(
"Vector sync enabled but TOKEN_ENCRYPTION_KEY not set"
)
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
yield
# Health check endpoints for Kubernetes probes
def health_live(request):
    """Liveness probe for Kubernetes.

    Answers with a small JSON document as long as the process can serve the
    request at all; no external dependency is consulted.

    Args:
        request: The incoming request (unused).

    Returns:
        JSONResponse with ``status`` set to "alive" and ``mode`` set to
        "oauth" or "basic" depending on ``oauth_enabled`` in the enclosing
        scope.
    """
    if oauth_enabled:
        auth_label = "oauth"
    else:
        auth_label = "basic"
    liveness_body = {"status": "alive", "mode": auth_label}
    return JSONResponse(liveness_body)
async def _probe_http_dependency(name, url):
    """GET *url* with a 2-second timeout and report whether it answered 200.

    The outcome is also published through ``set_dependency_health`` and the
    elapsed time through ``record_dependency_check``, both keyed by *name*.

    Args:
        name: Dependency label used for the health/duration recorders.
        url: Absolute URL to request.

    Returns:
        "ok" when the response status is 200; otherwise a string starting
        with "error: " that carries the status code or the exception text.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments while the request is in flight.
    started = time.perf_counter()
    try:
        async with httpx.AsyncClient(timeout=2.0) as client:
            response = await client.get(url)
        elapsed = time.perf_counter() - started
        if response.status_code == 200:
            outcome = "ok"
        else:
            outcome = f"error: status {response.status_code}"
        set_dependency_health(name, outcome == "ok")
        record_dependency_check(name, elapsed)
        return outcome
    except Exception as e:
        # Probe boundary: any failure (DNS, timeout, bad URL, ...) means
        # "not reachable" rather than a crashed readiness endpoint.
        elapsed = time.perf_counter() - started
        set_dependency_health(name, False)
        record_dependency_check(name, elapsed)
        # Some exceptions stringify to "", which would leave a bare
        # "error: "; fall back to the exception class name in that case.
        return f"error: {str(e) or type(e).__name__}"


async def health_ready(request):
    """Readiness probe endpoint.

    Runs three groups of checks and reports them in one JSON document:

    * Nextcloud: ``NEXTCLOUD_HOST`` must be set and ``/status.php`` must
      answer 200.
    * Authentication: reports the auth mode derived from ``mode``; in
      single-user BasicAuth mode both ``NEXTCLOUD_USERNAME`` and
      ``NEXTCLOUD_PASSWORD`` must be set.
    * Qdrant: only when ``VECTOR_SYNC_ENABLED`` is "true". With ``QDRANT_URL``
      set, ``/readyz`` must answer 200; without it Qdrant is reported as
      "embedded" and treated as healthy.

    Args:
        request: The incoming request (unused).

    Returns:
        JSONResponse ``{"status": ..., "checks": {...}}`` with HTTP 200 when
        every check passed and 503 otherwise.
    """
    checks = {}
    is_ready = True

    # --- Nextcloud configuration and connectivity -------------------------
    nextcloud_host = os.getenv("NEXTCLOUD_HOST")
    if nextcloud_host:
        checks["nextcloud_configured"] = "ok"
        # Strip a trailing slash so the probe never requests "//status.php".
        nextcloud_result = await _probe_http_dependency(
            "nextcloud", f"{nextcloud_host.rstrip('/')}/status.php"
        )
        checks["nextcloud_reachable"] = nextcloud_result
        if nextcloud_result != "ok":
            is_ready = False
    else:
        checks["nextcloud_configured"] = "error: NEXTCLOUD_HOST not set"
        set_dependency_health("nextcloud", False)
        is_ready = False

    # --- Authentication configuration --------------------------------------
    # Report the deployment mode, not just whether OAuth is enabled, so that
    # clients (like Astrolabe) can determine which auth flow to use.
    if mode in (AuthMode.OAUTH_SINGLE_AUDIENCE, AuthMode.OAUTH_TOKEN_EXCHANGE):
        checks["auth_mode"] = "oauth"
        checks["auth_configured"] = "ok"
    elif mode == AuthMode.MULTI_USER_BASIC:
        checks["auth_mode"] = "multi_user_basic"
        checks["auth_configured"] = "ok"
        # Indicate if app passwords are supported (when offline_access enabled)
        checks["supports_app_passwords"] = settings.enable_offline_access
    elif mode == AuthMode.SINGLE_USER_BASIC:
        checks["auth_mode"] = "basic"
        if os.getenv("NEXTCLOUD_USERNAME") and os.getenv("NEXTCLOUD_PASSWORD"):
            checks["auth_configured"] = "ok"
        else:
            checks["auth_configured"] = "error: credentials not set"
            is_ready = False
    elif mode == AuthMode.SMITHERY_STATELESS:
        checks["auth_mode"] = "smithery"
        checks["auth_configured"] = "ok"

    # --- Qdrant -------------------------------------------------------------
    # Only network mode has an external service to probe; in-memory and
    # persistent modes use embedded Qdrant.
    vector_sync_enabled = (
        os.getenv("VECTOR_SYNC_ENABLED", "false").lower() == "true"
    )
    qdrant_url = os.getenv("QDRANT_URL")  # Only set in network mode
    if vector_sync_enabled and qdrant_url:
        qdrant_result = await _probe_http_dependency(
            "qdrant", f"{qdrant_url.rstrip('/')}/readyz"
        )
        checks["qdrant"] = qdrant_result
        if qdrant_result != "ok":
            is_ready = False
    elif vector_sync_enabled:
        # Using embedded Qdrant (memory or persistent mode)
        checks["qdrant"] = "embedded"
        set_dependency_health("qdrant", True)

    return JSONResponse(
        {
            "status": "ready" if is_ready else "not_ready",
            "checks": checks,
        },
        status_code=200 if is_ready else 503,
    )
async def handle_nextcloud_webhook(request):
    """Temporary receiver that logs Nextcloud webhook payloads.

    The JSON body is parsed, pretty-printed to the log between two separator
    lines, and acknowledged. Nothing is stored or acted upon.

    Args:
        request: Incoming request; its body is parsed as JSON.

    Returns:
        JSONResponse with status 200 and the payload's ``time`` value echoed
        as ``timestamp``, or status 400 with the error text when parsing or
        logging the payload raises.
    """
    separator = "=" * 80
    try:
        body = await request.json()
        logger.info(separator)
        logger.info("🔔 Webhook received from Nextcloud:")
        logger.info(json.dumps(body, indent=2, sort_keys=True))
        logger.info(separator)
        acknowledgement = {"status": "received", "timestamp": body.get("time")}
        return JSONResponse(acknowledgement, status_code=200)
    except Exception as e:
        logger.error(f"❌ Failed to parse webhook payload: {e}")
        rejection = {"error": "invalid_payload", "message": str(e)}
        return JSONResponse(rejection, status_code=400)
# Route table for the outer Starlette app. It starts with the health probes
# (served in both OAuth and BasicAuth modes); later sections append to it.
routes = [
    Route("/health/live", health_live, methods=["GET"]),
    Route("/health/ready", health_ready, methods=["GET"]),
]
logger.info("Health check endpoints enabled: /health/live, /health/ready")
# Test webhook receiver (for development/testing)
webhook_test_route = Route(
    "/webhooks/nextcloud", handle_nextcloud_webhook, methods=["POST"]
)
routes.append(webhook_test_route)
logger.info("Test webhook endpoint enabled: /webhooks/nextcloud")
# Management API for the Nextcloud PHP app.
# Served in OAuth modes, or in multi-user BasicAuth when offline access is
# enabled (Astrolabe integration).
enable_management_apis = oauth_enabled or (
    settings.enable_multi_user_basic_auth and settings.enable_offline_access
)
if enable_management_apis:
    from nextcloud_mcp_server.api.management import (
        create_webhook,
        delete_webhook,
        get_chunk_context,
        get_installed_apps,
        get_server_status,
        get_user_session,
        get_vector_sync_status,
        list_webhooks,
        revoke_user_access,
        unified_search,
        vector_search,
    )

    # (path, handler, HTTP method), listed in registration order.
    management_endpoints = (
        ("/api/v1/status", get_server_status, "GET"),
        ("/api/v1/vector-sync/status", get_vector_sync_status, "GET"),
        ("/api/v1/users/{user_id}/session", get_user_session, "GET"),
        ("/api/v1/users/{user_id}/revoke", revoke_user_access, "POST"),
        ("/api/v1/vector-viz/search", vector_search, "POST"),
        ("/api/v1/chunk-context", get_chunk_context, "GET"),
        # ADR-018: Unified search endpoint for Nextcloud PHP app integration
        ("/api/v1/search", unified_search, "POST"),
        ("/api/v1/apps", get_installed_apps, "GET"),
        # Webhook management endpoints
        ("/api/v1/webhooks", list_webhooks, "GET"),
        ("/api/v1/webhooks", create_webhook, "POST"),
        ("/api/v1/webhooks/{webhook_id}", delete_webhook, "DELETE"),
    )
    for api_path, api_handler, http_verb in management_endpoints:
        routes.append(Route(api_path, api_handler, methods=[http_verb]))

    logger.info(
        "Management API endpoints enabled: /api/v1/status, /api/v1/vector-sync/status, "
        "/api/v1/users/{user_id}/session, /api/v1/users/{user_id}/revoke, "
        "/api/v1/vector-viz/search, /api/v1/search, /api/v1/apps, "
        "/api/v1/webhooks"
    )
# ADR-016: in Smithery stateless mode, publish the configuration schema at
# the well-known path used for container runtime discovery.
if deployment_mode == DeploymentMode.SMITHERY_STATELESS:

    def smithery_mcp_config(request):
        """Serve the Smithery configuration schema.

        Args:
            request: The incoming request (unused).

        Returns:
            JSONResponse wrapping ``SMITHERY_CONFIG_SCHEMA``, the JSON Schema
            consumed by Smithery's configuration UI.
        """
        schema_response = JSONResponse(SMITHERY_CONFIG_SCHEMA)
        return schema_response

    well_known_route = Route(
        "/.well-known/mcp-config",
        smithery_mcp_config,
        methods=["GET"],
    )
    routes.append(well_known_route)
    logger.info("Smithery config endpoint enabled: /.well-known/mcp-config")
# Note: Metrics endpoint is NOT exposed on main HTTP port for security reasons.
# Metrics are served on dedicated port via setup_metrics() (default: 9090)
# OAuth provisioning is offered in two situations:
#   1. OAuth modes, where OAuth is the primary auth method for MCP operations
#   2. Hybrid mode: multi-user BasicAuth with offline access enabled and an
#      OAuth token verifier that was set up successfully
hybrid_provisioning = (
    mode == AuthMode.MULTI_USER_BASIC
    and settings.enable_offline_access
    and multi_user_token_verifier is not None  # Ensure OAuth setup succeeded
)
oauth_provisioning_available = oauth_enabled or hybrid_provisioning
# Register the OAuth-related routes: Protected Resource Metadata (RFC 9728),
# the unified callback, the Flow 2 provisioning routes and, in OAuth modes
# only, the Flow 1 login route.
if oauth_provisioning_available:
logger.info(
f"OAuth provisioning routes enabled for mode: {mode.value} "
f"(oauth_enabled={oauth_enabled}, hybrid_mode={not oauth_enabled})"
)
# Import OAuth routes (ADR-004 Progressive Consent)
# NOTE(review): oauth_authorize is imported again right before the Flow 1
# route is registered below; this first import looks redundant - confirm.
from nextcloud_mcp_server.auth.oauth_routes import oauth_authorize
def oauth_protected_resource_metadata(request):
"""RFC 9728 Protected Resource Metadata endpoint.
Dynamically discovers supported scopes from registered MCP tools.
This ensures the advertised scopes always match the actual tool requirements.
The 'resource' field is set to the MCP server's public URL (RFC 9728 requires a URL).
This is used as the audience in access tokens via the resource parameter (RFC 8707).
The introspection controller matches this URL to the MCP server's client via resource_url field.
Returns a JSONResponse with the keys resource, scopes_supported,
authorization_servers, bearer_methods_supported and
resource_signing_alg_values_supported.
"""
# Use PUBLIC_ISSUER_URL for authorization server since external clients
# (like Claude) need the publicly accessible URL, not internal Docker URLs
public_issuer_url = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL")
if not public_issuer_url:
# Fallback to NEXTCLOUD_HOST if PUBLIC_ISSUER_URL not set
# NOTE(review): when NEXTCLOUD_HOST is unset too, authorization_servers
# ends up containing an empty string.
public_issuer_url = os.getenv("NEXTCLOUD_HOST", "")
# RFC 9728 requires resource to be a URL (not a client ID)
# Use the MCP server's public URL
mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL")
if not mcp_server_url:
# Fallback to constructing from host and port
mcp_server_url = f"http://localhost:{os.getenv('PORT', '8000')}"
# Dynamically discover all scopes from registered tools
# This provides a single source of truth based on @require_scopes decorators
supported_scopes = discover_all_scopes(mcp)
return JSONResponse(
{
"resource": f"{mcp_server_url}/mcp", # RFC 9728: must be a URL
"scopes_supported": supported_scopes,
"authorization_servers": [public_issuer_url],
"bearer_methods_supported": ["header"],
"resource_signing_alg_values_supported": ["RS256"],
}
)
# Register PRM endpoint at both path-based and root locations per RFC 9728
# Path-based discovery: /.well-known/oauth-protected-resource{path}
routes.append(
Route(
"/.well-known/oauth-protected-resource/mcp",
oauth_protected_resource_metadata,
methods=["GET"],
)
)
# Root discovery (fallback): /.well-known/oauth-protected-resource
routes.append(
Route(
"/.well-known/oauth-protected-resource",
oauth_protected_resource_metadata,
methods=["GET"],
)
)
logger.info(
"Protected Resource Metadata (PRM) endpoints enabled (path-based + root)"
)
# Add unified OAuth callback endpoint supporting both flows
from nextcloud_mcp_server.auth.oauth_routes import (
oauth_authorize_nextcloud,
oauth_callback,
oauth_callback_nextcloud,
)
routes.append(Route("/oauth/callback", oauth_callback, methods=["GET"]))
logger.info(
"OAuth unified callback enabled: /oauth/callback?flow={browser|provisioning}"
)
# Add OAuth resource provisioning routes (ADR-004 Progressive Consent Flow 2)
routes.append(
Route(
"/oauth/authorize-nextcloud",
oauth_authorize_nextcloud,
methods=["GET"],
)
)
# Keep old callback endpoint as backwards-compatible alias
routes.append(
Route(
"/oauth/callback-nextcloud",
oauth_callback_nextcloud,
methods=["GET"],
)
)
logger.info(
"OAuth resource provisioning routes enabled: /oauth/authorize-nextcloud, /oauth/callback-nextcloud (Flow 2)"
)
# Add OAuth Flow 1 routes (MCP client login) - ONLY for OAuth modes
# Multi-user BasicAuth uses hybrid mode with only Flow 2 (resource provisioning)
if oauth_enabled:
from nextcloud_mcp_server.auth.oauth_routes import oauth_authorize
routes.append(Route("/oauth/authorize", oauth_authorize, methods=["GET"]))
logger.info("OAuth login routes enabled: /oauth/authorize (Flow 1)")
# Browser OAuth login for Management API access.
# Offered in OAuth modes and in hybrid mode (multi-user BasicAuth with
# offline access). This is separate from MCP tool auth.
if oauth_provisioning_available:
    from nextcloud_mcp_server.auth.browser_oauth_routes import (
        oauth_login,
        oauth_login_callback,
        oauth_logout,
    )

    routes.extend(
        [
            Route("/oauth/login", oauth_login, methods=["GET"], name="oauth_login"),
            # Old callback endpoint, kept as a backwards-compatible alias
            Route(
                "/oauth/login-callback",
                oauth_login_callback,
                methods=["GET"],
                name="oauth_login_callback",
            ),
            Route(
                "/oauth/logout", oauth_logout, methods=["GET"], name="oauth_logout"
            ),
        ]
    )
    logger.info(
        "Browser OAuth routes enabled: /oauth/login, /oauth/login-callback (legacy), /oauth/logout"
    )
# Browser-facing admin UI under /app (user info, vector sync status, vector
# visualization, webhook management).
# ADR-016: not served in Smithery stateless mode (no vector sync, webhooks).
if deployment_mode == DeploymentMode.SMITHERY_STATELESS:
    logger.info("Admin UI (/app) disabled in Smithery stateless mode")
else:
    from nextcloud_mcp_server.auth.session_backend import SessionAuthBackend
    from nextcloud_mcp_server.auth.userinfo_routes import (
        revoke_session,
        user_info_html,
        vector_sync_status_fragment,
    )
    from nextcloud_mcp_server.auth.viz_routes import (
        chunk_context_endpoint,
        vector_visualization_html,
        vector_visualization_search,
    )
    from nextcloud_mcp_server.auth.webhook_routes import (
        disable_webhook_preset,
        enable_webhook_preset,
        webhook_management_pane,
    )

    # The session-authenticated pages live in their own Starlette app so that
    # SessionAuthBackend does not interfere with FastMCP's OAuth handling.
    # Paths below are relative to the /app mount point.
    browser_routes = [
        # User info page with all tabs
        Route("/", user_info_html, methods=["GET"]),
        Route(
            "/revoke",
            revoke_session,
            methods=["POST"],
            name="revoke_session_endpoint",
        ),
        # Vector sync status fragment (htmx polling)
        Route("/vector-sync/status", vector_sync_status_fragment, methods=["GET"]),
        # Vector visualization
        Route("/vector-viz", vector_visualization_html, methods=["GET"]),
        Route("/vector-viz/search", vector_visualization_search, methods=["GET"]),
        Route("/chunk-context", chunk_context_endpoint, methods=["GET"]),
        # Webhook management (admin-only)
        Route("/webhooks", webhook_management_pane, methods=["GET"]),
        Route(
            "/webhooks/enable/{preset_id:str}",
            enable_webhook_preset,
            methods=["POST"],
        ),
        Route(
            "/webhooks/disable/{preset_id:str}",
            disable_webhook_preset,
            methods=["DELETE"],
        ),
    ]

    # Serve static assets only when the directory ships with the package.
    static_dir = os.path.join(os.path.dirname(__file__), "auth", "static")
    if os.path.isdir(static_dir):
        static_mount = Mount(
            "/static", StaticFiles(directory=static_dir), name="static"
        )
        browser_routes.append(static_mount)
        logger.info(f"Mounted static files from {static_dir}")

    browser_app = Starlette(routes=browser_routes)
    session_backend = SessionAuthBackend(oauth_enabled=oauth_enabled)
    browser_app.add_middleware(
        AuthenticationMiddleware,  # type: ignore[invalid-argument-type]
        backend=session_backend,
    )

    # /app itself redirects to /app/ (the mounted app expects the trailing
    # slash), then the sub-application is mounted at /app.
    routes.append(
        Route("/app", lambda request: RedirectResponse("/app/", status_code=307))
    )
    routes.append(Mount("/app", app=browser_app))
    logger.info("App routes with session auth: /app, /app/webhooks, /app/revoke")
# Mount FastMCP at root last: it is a catch-all, so every more specific route
# must already be in the table. It handles OAuth via token_verifier.
routes.append(Mount("/", app=mcp_app))
app = Starlette(routes=routes, lifespan=starlette_lifespan)
# The session-authenticated browser app is mounted at /app (not /user), so
# the summary names that prefix.
logger.info(
    "Routes: /app/* with SessionAuth, /mcp with FastMCP OAuth Bearer tokens"
)
# Debugging middleware: logs whether /mcp requests carry an Authorization
# header and which capabilities an MCP client announces on "initialize".
@app.middleware("http")
async def log_auth_headers(request, call_next):
    """Log auth presence and client capabilities for /mcp requests.

    Only logging happens here; the request is always passed on unchanged.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that forwards the request down the stack.

    Returns:
        The response produced by ``call_next(request)``.
    """
    if request.url.path.startswith("/mcp"):
        auth_header = request.headers.get("authorization")
        if auth_header:
            # Never write credential material to the log. A 50-character
            # prefix can hold an entire Basic credential or a short opaque
            # bearer token, so only the scheme and the credential length
            # are recorded.
            scheme, separator, credentials = auth_header.partition(" ")
            if separator:
                auth_summary = f"{scheme} <redacted, {len(credentials)} chars>"
            else:
                # No scheme prefix: the whole value may be the secret.
                auth_summary = f"<redacted, {len(auth_header)} chars>"
            logger.info(f"🔑 /mcp request with Authorization: {auth_summary}")
        elif oauth_enabled:
            # Only warn about missing Authorization in OAuth mode.
            # In BasicAuth mode, /mcp requests without Authorization are expected.
            logger.warning(
                f"⚠️ /mcp request WITHOUT Authorization header from {request.client}"
            )

        # Log client capabilities on initialize request
        if request.method == "POST":
            # Read body to check for initialize request
            # Starlette caches the body internally, so it's safe to read here
            body = await request.body()
            try:
                data = json.loads(body)
                if data.get("method") == "initialize":
                    params = data.get("params", {})
                    capabilities = params.get("capabilities", {})
                    client_info = params.get("clientInfo", {})
                    logger.info(
                        f"🔌 MCP client connected: {client_info.get('name', 'unknown')} "
                        f"v{client_info.get('version', 'unknown')}"
                    )
                    # Presence is tested with 'in', not truthiness: an empty
                    # dict {} still counts as having the capability.
                    cap_summary = []
                    if "roots" in capabilities:
                        cap_summary.append("roots")
                    if "sampling" in capabilities:
                        cap_summary.append("sampling")
                    if "experimental" in capabilities:
                        cap_summary.append(
                            f"experimental({len(capabilities['experimental'])} features)"
                        )
                    logger.info(
                        f"📋 Client capabilities: {', '.join(cap_summary) if cap_summary else 'none'}"
                    )
                    # Log full capabilities at INFO level to diagnose capability issues
                    logger.info(
                        f"Full capabilities JSON: {json.dumps(capabilities)}"
                    )
            except Exception as e:
                # Don't fail the request if logging fails
                logger.debug(
                    f"Failed to parse MCP request for capability logging: {e}"
                )

    response = await call_next(request)
    return response
# Add CORS middleware to allow browser-based clients like MCP Inspector
# NOTE(review): wildcard origins are combined with allow_credentials=True.
# The inline comment marks this as a development setting; since /app relies
# on session authentication, confirm whether production deployments should
# restrict allow_origins to known origins.
app.add_middleware(
CORSMiddleware, # type: ignore[invalid-argument-type]
allow_origins=["*"], # Allow all origins for development
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["*"],
)
# Observability (metrics + tracing) is installed when either metrics are
# enabled or an OTLP exporter endpoint is configured.
observability_requested = (
    settings.metrics_enabled or settings.otel_exporter_otlp_endpoint
)
if observability_requested:
    app.add_middleware(ObservabilityMiddleware)  # type: ignore[invalid-argument-type]
    logger.info("Observability middleware enabled (metrics and/or tracing)")
# Add exception handler for scope challenges (OAuth mode only)
if oauth_enabled:

    @app.exception_handler(InsufficientScopeError)
    async def handle_insufficient_scope(request, exc: InsufficientScopeError):
        """Return 403 with WWW-Authenticate header for scope challenges.

        Args:
            request: The request that triggered the error (unused).
            exc: The raised error; ``exc.missing_scopes`` lists the scopes
                the caller lacks.

        Returns:
            JSONResponse with status 403, an ``insufficient_scope`` error
            body, and a ``WWW-Authenticate: Bearer`` challenge that names the
            missing scopes and points at the protected resource metadata
            document.
        """
        # Resolve the server URL the same way the protected resource metadata
        # endpoint does, so the challenge points at the document that is
        # actually served (a hard-coded port 8000 is wrong when PORT differs).
        resource_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL")
        if not resource_url:
            resource_url = f"http://localhost:{os.getenv('PORT', '8000')}"
        scope_str = " ".join(exc.missing_scopes)
        challenge = (
            f'Bearer error="insufficient_scope", '
            f'scope="{scope_str}", '
            f'resource_metadata="{resource_url}/.well-known/oauth-protected-resource/mcp"'
        )
        return JSONResponse(
            status_code=403,
            headers={"WWW-Authenticate": challenge},
            content={
                "error": "insufficient_scope",
                "scopes_required": exc.missing_scopes,
            },
        )

    logger.info("WWW-Authenticate scope challenge handler enabled")
# Final step: optionally wrap the Starlette app in plain ASGI middleware and
# return it. After a wrap, `app` refers to the wrapper rather than the
# Starlette instance, so the order of these statements matters.
# ADR-016: Apply SmitheryConfigMiddleware in Smithery stateless mode
# This must be the outermost middleware to extract config from URL query parameters
# before any other middleware processes the request
# NOTE(review): if Smithery mode and multi-user BasicAuth were ever enabled
# together, BasicAuthMiddleware below would wrap this one and become the
# outermost layer - presumably the two modes are mutually exclusive; confirm.
if deployment_mode == DeploymentMode.SMITHERY_STATELESS:
app = SmitheryConfigMiddleware(app)
logger.info("SmitheryConfigMiddleware enabled for query parameter config")
# Apply BasicAuthMiddleware for multi-user BasicAuth pass-through mode
if settings.enable_multi_user_basic_auth:
app = BasicAuthMiddleware(app)
logger.info(
"BasicAuthMiddleware enabled - multi-user BasicAuth pass-through mode active"
)
# The (possibly wrapped) ASGI application is the result of the factory.
return app