Skip to main content
Glama
cbcoutinho

Nextcloud MCP Server

by cbcoutinho
app.py90.2 kB
import logging import os import time from collections.abc import AsyncIterator from contextlib import AsyncExitStack, asynccontextmanager from contextvars import ContextVar from dataclasses import dataclass from typing import TYPE_CHECKING, Optional, cast from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor if TYPE_CHECKING: from nextcloud_mcp_server.auth.storage import RefreshTokenStorage import anyio import click import httpx from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from mcp.server.auth.settings import AuthSettings from mcp.server.fastmcp import Context, FastMCP from mcp.server.transport_security import TransportSecuritySettings from pydantic import AnyHttpUrl from starlette.applications import Starlette from starlette.middleware.authentication import AuthenticationMiddleware from starlette.middleware.cors import CORSMiddleware from starlette.responses import JSONResponse, RedirectResponse from starlette.routing import Mount, Route from starlette.staticfiles import StaticFiles from starlette.types import ASGIApp, Receive, Send from starlette.types import Scope as StarletteScope from nextcloud_mcp_server.auth import ( InsufficientScopeError, discover_all_scopes, get_access_token_scopes, has_required_scopes, is_jwt_token, ) from nextcloud_mcp_server.auth.unified_verifier import UnifiedTokenVerifier from nextcloud_mcp_server.client import NextcloudClient from nextcloud_mcp_server.config import ( DeploymentMode, get_deployment_mode, get_document_processor_config, get_settings, ) from nextcloud_mcp_server.context import get_client as get_nextcloud_client from nextcloud_mcp_server.document_processors import get_registry from nextcloud_mcp_server.observability import ( ObservabilityMiddleware, setup_metrics, setup_tracing, ) from nextcloud_mcp_server.observability.metrics import ( record_dependency_check, set_dependency_health, ) from nextcloud_mcp_server.server import ( configure_calendar_tools, configure_contacts_tools, configure_cookbook_tools, configure_deck_tools, configure_news_tools, configure_notes_tools, configure_semantic_tools, configure_sharing_tools, configure_tables_tools, configure_webdav_tools, ) from nextcloud_mcp_server.server.oauth_tools import register_oauth_tools from nextcloud_mcp_server.vector import processor_task, scanner_task logger = logging.getLogger(__name__) HTTPXClientInstrumentor().instrument() def initialize_document_processors(): """Initialize and register document processors based on configuration. This function reads the environment configuration and registers available processors (Unstructured, Tesseract, Custom HTTP) with the global registry. """ config = get_document_processor_config() if not config["enabled"]: logger.info("Document processing disabled") return registry = get_registry() registered_count = 0 # Register Unstructured processor if "unstructured" in config["processors"]: unst_config = config["processors"]["unstructured"] try: from nextcloud_mcp_server.document_processors.unstructured import ( UnstructuredProcessor, ) processor = UnstructuredProcessor( api_url=unst_config["api_url"], timeout=unst_config["timeout"], default_strategy=unst_config["strategy"], default_languages=unst_config["languages"], progress_interval=unst_config.get("progress_interval", 10), ) registry.register(processor, priority=10) logger.info(f"Registered Unstructured processor: {unst_config['api_url']}") registered_count += 1 except Exception as e: logger.warning(f"Failed to register Unstructured processor: {e}") # Register Tesseract processor if "tesseract" in config["processors"]: tess_config = config["processors"]["tesseract"] try: from nextcloud_mcp_server.document_processors.tesseract import ( TesseractProcessor, ) processor = TesseractProcessor( tesseract_cmd=tess_config.get("tesseract_cmd"), default_lang=tess_config["lang"], ) registry.register(processor, priority=5) logger.info(f"Registered Tesseract processor: lang={tess_config['lang']}") registered_count += 1 except Exception as e: logger.warning(f"Failed to register Tesseract processor: {e}") # Register PyMuPDF processor (high priority, local, no API required) if "pymupdf" in config["processors"]: pymupdf_config = config["processors"]["pymupdf"] try: from nextcloud_mcp_server.document_processors.pymupdf import ( PyMuPDFProcessor, ) processor = PyMuPDFProcessor( extract_images=pymupdf_config.get("extract_images", True), image_dir=pymupdf_config.get("image_dir"), ) registry.register(processor, priority=15) # Higher than unstructured logger.info( f"Registered PyMuPDF processor: extract_images={pymupdf_config.get('extract_images', True)}" ) registered_count += 1 except Exception as e: logger.warning(f"Failed to register PyMuPDF processor: {e}") # Register custom processor if "custom" in config["processors"]: custom_config = config["processors"]["custom"] try: from nextcloud_mcp_server.document_processors.custom_http import ( CustomHTTPProcessor, ) processor = CustomHTTPProcessor( name=custom_config["name"], api_url=custom_config["api_url"], api_key=custom_config.get("api_key"), timeout=custom_config["timeout"], supported_types=custom_config["supported_types"], ) registry.register(processor, priority=1) logger.info( f"Registered Custom processor '{custom_config['name']}': {custom_config['api_url']}" ) registered_count += 1 except Exception as e: logger.warning(f"Failed to register Custom processor: {e}") if registered_count > 0: logger.info( f"Document processing initialized with {registered_count} processor(s): " f"{', '.join(registry.list_processors())}" ) else: logger.warning("Document processing enabled but no processors registered") def validate_pkce_support(discovery: dict, discovery_url: str) -> None: """ Validate that the OIDC provider properly advertises PKCE support. According to RFC 8414, if code_challenge_methods_supported is absent, it means the authorization server does not support PKCE. MCP clients require PKCE with S256 and will refuse to connect if this field is missing or doesn't include S256. """ code_challenge_methods = discovery.get("code_challenge_methods_supported") if code_challenge_methods is None: click.echo("=" * 80, err=True) click.echo( "ERROR: OIDC CONFIGURATION ERROR - Missing PKCE Support Advertisement", err=True, ) click.echo("=" * 80, err=True) click.echo(f"Discovery URL: {discovery_url}", err=True) click.echo("", err=True) click.echo( "The OIDC discovery document is missing 'code_challenge_methods_supported'.", err=True, ) click.echo( "According to RFC 8414, this means the server does NOT support PKCE.", err=True, ) click.echo("", err=True) click.echo("⚠️ MCP clients (like Claude Code) WILL REJECT this provider!") click.echo("", err=True) click.echo("How to fix:", err=True) click.echo( " 1. Ensure PKCE is enabled in Nextcloud OIDC app settings", err=True ) click.echo( " 2. Update the OIDC app to advertise PKCE support in discovery", err=True ) click.echo(" 3. See: RFC 8414 Section 2 (Authorization Server Metadata)") click.echo("=" * 80, err=True) click.echo("", err=True) return if "S256" not in code_challenge_methods: click.echo("=" * 80, err=True) click.echo( "WARNING: OIDC CONFIGURATION WARNING - S256 Challenge Method Not Advertised", err=True, ) click.echo("=" * 80, err=True) click.echo(f"Discovery URL: {discovery_url}", err=True) click.echo(f"Advertised methods: {code_challenge_methods}", err=True) click.echo("", err=True) click.echo("MCP specification requires S256 code challenge method.", err=True) click.echo("Some clients may reject this provider.", err=True) click.echo("=" * 80, err=True) click.echo("", err=True) return click.echo(f"✓ PKCE support validated: {code_challenge_methods}") @dataclass class VectorSyncState: """ Module-level state for vector sync background tasks. This singleton bridges the Starlette server lifespan (where background tasks run) and FastMCP session lifespans (where MCP tools need access to the streams). """ document_send_stream: Optional[MemoryObjectSendStream] = None document_receive_stream: Optional[MemoryObjectReceiveStream] = None shutdown_event: Optional[anyio.Event] = None scanner_wake_event: Optional[anyio.Event] = None # Module-level singleton for vector sync state _vector_sync_state = VectorSyncState() @dataclass class AppContext: """Application context for BasicAuth mode.""" client: NextcloudClient storage: Optional["RefreshTokenStorage"] = None document_send_stream: Optional[MemoryObjectSendStream] = None document_receive_stream: Optional[MemoryObjectReceiveStream] = None shutdown_event: Optional[anyio.Event] = None scanner_wake_event: Optional[anyio.Event] = None @dataclass class OAuthAppContext: """Application context for OAuth mode.""" nextcloud_host: str token_verifier: object # UnifiedTokenVerifier (ADR-005 compliant) refresh_token_storage: Optional["RefreshTokenStorage"] = None oauth_client: Optional[object] = None # NextcloudOAuthClient or KeycloakOAuthClient oauth_provider: str = "nextcloud" # "nextcloud" or "keycloak" server_client_id: Optional[str] = ( None # MCP server's OAuth client ID (static or DCR) ) @dataclass class SmitheryAppContext: """Application context for Smithery stateless mode. ADR-016: No shared client - clients created per-request from session config. """ pass # No shared state needed - everything comes from session config # ADR-016: Smithery config schema for container runtime # This schema is served at /.well-known/mcp-config for Smithery discovery # See: https://smithery.ai/docs/build/session-config SMITHERY_CONFIG_SCHEMA = { "$schema": "http://json-schema.org/draft-07/schema#", "$id": "https://server.smithery.ai/nextcloud-mcp-server/.well-known/mcp-config", "title": "Nextcloud MCP Server Configuration", "description": "Configuration for connecting to your Nextcloud instance via app password authentication", "x-query-style": "flat", # Our schema has no nested objects, so flat style works "type": "object", "required": ["nextcloud_url", "username", "app_password"], "properties": { "nextcloud_url": { "type": "string", "title": "Nextcloud URL", "description": "Your Nextcloud instance URL (e.g., https://cloud.example.com). Must be publicly accessible.", "pattern": "^https?://.+", }, "username": { "type": "string", "title": "Username", "description": "Your Nextcloud username", "minLength": 1, }, "app_password": { "type": "string", "title": "App Password", "description": "Nextcloud app password. Generate at Settings > Security > App passwords. Do NOT use your main password.", "minLength": 1, }, }, "additionalProperties": False, } # ADR-016: Context variable to hold Smithery session config per-request # This is set by SmitheryConfigMiddleware and accessed in context.py _smithery_session_config: ContextVar[dict[str, str] | None] = ContextVar( "smithery_session_config" ) _smithery_session_config.set(None) # Set initial value def get_smithery_session_config() -> dict | None: """Get the current Smithery session config from context variable. Used by context.py to access config extracted from URL query parameters. """ return _smithery_session_config.get() class SmitheryConfigMiddleware: """Middleware to extract Smithery config from URL query parameters. ADR-016: For container runtime, Smithery passes configuration as URL query parameters to the /mcp endpoint. This middleware extracts those parameters and stores them in a context variable for access in tools. Configuration parameters: - nextcloud_url: Nextcloud instance URL - username: Nextcloud username - app_password: Nextcloud app password The extracted config is stored in a ContextVar and can be accessed via get_smithery_session_config() in context.py. """ def __init__(self, app: ASGIApp): self.app = app async def __call__( self, scope: StarletteScope, receive: Receive, send: Send ) -> None: if scope["type"] == "http": # Extract config from query parameters from urllib.parse import parse_qs query_string = scope.get("query_string", b"").decode("utf-8") params = parse_qs(query_string) # Build session config from query parameters # Smithery uses dot notation for nested objects, but our schema is flat session_config = {} for key in ["nextcloud_url", "username", "app_password"]: if key in params: # parse_qs returns lists, take first value session_config[key] = params[key][0] # Store in context variable for access by context.py if session_config: _smithery_session_config.set(session_config) logger.debug( f"Smithery config extracted: nextcloud_url={session_config.get('nextcloud_url')}, " f"username={session_config.get('username')}" ) try: await self.app(scope, receive, send) finally: # Clear context variable after request _smithery_session_config.set(None) @asynccontextmanager async def app_lifespan_smithery(server: FastMCP) -> AsyncIterator[SmitheryAppContext]: """ Manage application lifecycle for Smithery stateless mode. ADR-016: Minimal lifespan with no shared state. - No shared Nextcloud client (created per-request from session config) - No vector sync (disabled in Smithery mode) - No persistent storage (stateless deployment) - No document processors (not enabled in Smithery mode) """ logger.info("Starting MCP server in Smithery stateless mode") logger.info("Clients will be created per-request from session config") try: yield SmitheryAppContext() finally: logger.info("Shutting down Smithery stateless mode") def is_oauth_mode() -> bool: """ Determine if OAuth mode should be used. OAuth mode is enabled when: - NEXTCLOUD_USERNAME and NEXTCLOUD_PASSWORD are NOT set - AND we are NOT in Smithery stateless mode - Or explicitly enabled via configuration Returns: True if OAuth mode, False if BasicAuth mode """ # ADR-016: Smithery stateless mode uses per-request BasicAuth from session config # It's not OAuth mode even though env credentials aren't set deployment_mode = get_deployment_mode() if deployment_mode == DeploymentMode.SMITHERY_STATELESS: logger.info( "BasicAuth mode (Smithery stateless - credentials from session config)" ) return False username = os.getenv("NEXTCLOUD_USERNAME") password = os.getenv("NEXTCLOUD_PASSWORD") # If both username and password are set, use BasicAuth if username and password: logger.info( "BasicAuth mode detected (NEXTCLOUD_USERNAME and NEXTCLOUD_PASSWORD set)" ) return False logger.info("OAuth mode detected (NEXTCLOUD_USERNAME/PASSWORD not set)") return True async def load_oauth_client_credentials( nextcloud_host: str, registration_endpoint: str | None ) -> tuple[str, str]: """ Load OAuth client credentials from environment, storage file, or dynamic registration. This consolidates the client loading logic that was duplicated across multiple functions. Args: nextcloud_host: Nextcloud instance URL registration_endpoint: Dynamic registration endpoint URL (or None if not available) Returns: Tuple of (client_id, client_secret) Raises: ValueError: If credentials cannot be obtained """ # Try environment variables first client_id = os.getenv("NEXTCLOUD_OIDC_CLIENT_ID") client_secret = os.getenv("NEXTCLOUD_OIDC_CLIENT_SECRET") if client_id and client_secret: logger.info("Using pre-configured OAuth client credentials from environment") return (client_id, client_secret) # Try loading from SQLite storage try: from nextcloud_mcp_server.auth.storage import RefreshTokenStorage storage = RefreshTokenStorage.from_env() await storage.initialize() client_data = await storage.get_oauth_client() if client_data: logger.info( f"Loaded OAuth client from SQLite: {client_data['client_id'][:16]}..." ) return (client_data["client_id"], client_data["client_secret"]) except ValueError: # TOKEN_ENCRYPTION_KEY not set, skip SQLite storage check logger.debug("SQLite storage not available (TOKEN_ENCRYPTION_KEY not set)") # Try dynamic registration if available if registration_endpoint: logger.info("Dynamic client registration available") mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000") redirect_uris = [ f"{mcp_server_url}/oauth/callback", # Unified callback (flow determined by query param) ] # MCP server DCR: Register with ALL supported scopes # When we register as a resource server (with resource_url), the allowed_scopes # represent what scopes are AVAILABLE for this resource, not what the server needs. # External clients will request tokens with resource=http://localhost:8001/mcp # and the authorization server will limit them to these allowed scopes. # # The PRM endpoint advertises the same scopes dynamically via @require_scopes decorators. dcr_scopes = "openid profile email notes:read notes:write calendar:read calendar:write todo:read todo:write contacts:read contacts:write cookbook:read cookbook:write deck:read deck:write tables:read tables:write files:read files:write sharing:read sharing:write news:read news:write" # Add offline_access scope if refresh tokens are enabled enable_offline_access = os.getenv("ENABLE_OFFLINE_ACCESS", "false").lower() in ( "true", "1", "yes", ) if enable_offline_access: dcr_scopes = f"{dcr_scopes} offline_access" logger.info("✓ offline_access scope enabled for refresh tokens") logger.info(f"MCP server DCR scopes (resource server): {dcr_scopes}") # Get token type from environment (Bearer or jwt) # Note: Must be lowercase "jwt" to match OIDC app's check token_type = os.getenv("NEXTCLOUD_OIDC_TOKEN_TYPE", "Bearer").lower() # Special case: "bearer" should remain capitalized for compatibility if token_type != "jwt": token_type = "Bearer" logger.info(f"Requesting token type: {token_type}") # Ensure OAuth client in SQLite storage from nextcloud_mcp_server.auth.client_registration import ensure_oauth_client from nextcloud_mcp_server.auth.storage import RefreshTokenStorage storage = RefreshTokenStorage.from_env() await storage.initialize() # RFC 9728: resource_url must be a URL for the protected resource # This URL is used by token introspection to match tokens to this client resource_url = f"{mcp_server_url}/mcp" client_info = await ensure_oauth_client( nextcloud_url=nextcloud_host, registration_endpoint=registration_endpoint, storage=storage, client_name=f"Nextcloud MCP Server ({token_type})", redirect_uris=redirect_uris, scopes=dcr_scopes, # Use DCR-specific scopes (basic OIDC only) token_type=token_type, resource_url=resource_url, # RFC 9728 Protected Resource URL ) logger.info(f"OAuth client ready: {client_info.client_id[:16]}...") return (client_info.client_id, client_info.client_secret) # No credentials available raise ValueError( "OAuth mode requires either:\n" "1. NEXTCLOUD_OIDC_CLIENT_ID and NEXTCLOUD_OIDC_CLIENT_SECRET environment variables, OR\n" "2. Pre-existing client credentials in SQLite storage (TOKEN_STORAGE_DB), OR\n" "3. Dynamic client registration enabled on Nextcloud OIDC app\n\n" "Note: TOKEN_ENCRYPTION_KEY is required for SQLite storage" ) @asynccontextmanager async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]: """ Manage application lifecycle for BasicAuth mode (FastMCP session lifespan). Creates a single Nextcloud client with basic authentication that is shared across all requests within a session. Note: Background tasks (scanner, processor) are started at server level in starlette_lifespan, not here. This lifespan runs per-session. """ logger.info("Starting MCP session in BasicAuth mode") logger.info("Creating Nextcloud client with BasicAuth") client = NextcloudClient.from_env() logger.info("Client initialization complete") # Initialize persistent storage (for webhook tracking and future features) from nextcloud_mcp_server.auth.storage import RefreshTokenStorage storage = RefreshTokenStorage.from_env() await storage.initialize() logger.info("Persistent storage initialized (webhook tracking enabled)") # Initialize document processors initialize_document_processors() # Yield client context - scanner runs at server level (starlette_lifespan) # Include vector sync state from module singleton (set by starlette_lifespan) try: yield AppContext( client=client, storage=storage, document_send_stream=_vector_sync_state.document_send_stream, document_receive_stream=_vector_sync_state.document_receive_stream, shutdown_event=_vector_sync_state.shutdown_event, scanner_wake_event=_vector_sync_state.scanner_wake_event, ) finally: logger.info("Shutting down BasicAuth session") await client.close() async def setup_oauth_config(): """ Setup OAuth configuration by performing OIDC discovery and client registration. Auto-detects OAuth provider mode: - Integrated mode: OIDC_DISCOVERY_URL points to NEXTCLOUD_HOST (or not set) → Nextcloud OIDC app provides both OAuth and API access - External IdP mode: OIDC_DISCOVERY_URL points to external provider → External IdP for OAuth, Nextcloud user_oidc validates tokens and provides API access Uses OIDC environment variables: - OIDC_DISCOVERY_URL: OIDC discovery endpoint (optional, defaults to NEXTCLOUD_HOST) - NEXTCLOUD_OIDC_CLIENT_ID / NEXTCLOUD_OIDC_CLIENT_SECRET: Static credentials (optional, uses DCR if not provided) - NEXTCLOUD_OIDC_SCOPES: Requested OAuth scopes This is done synchronously before FastMCP initialization because FastMCP requires token_verifier at construction time. Returns: Tuple of (nextcloud_host, token_verifier, auth_settings, refresh_token_storage, oauth_client, oauth_provider, client_id, client_secret) """ nextcloud_host = os.getenv("NEXTCLOUD_HOST") if not nextcloud_host: raise ValueError( "NEXTCLOUD_HOST environment variable is required for OAuth mode" ) nextcloud_host = nextcloud_host.rstrip("/") # Get OIDC discovery URL (defaults to Nextcloud integrated mode) discovery_url = os.getenv( "OIDC_DISCOVERY_URL", f"{nextcloud_host}/.well-known/openid-configuration" ) logger.info(f"Performing OIDC discovery: {discovery_url}") # Perform OIDC discovery async with httpx.AsyncClient() as client: response = await client.get(discovery_url) response.raise_for_status() discovery = response.json() logger.info("✓ OIDC discovery successful") # Validate PKCE support validate_pkce_support(discovery, discovery_url) # Extract OIDC endpoints issuer = discovery["issuer"] userinfo_uri = discovery["userinfo_endpoint"] jwks_uri = discovery.get("jwks_uri") introspection_uri = discovery.get("introspection_endpoint") registration_endpoint = discovery.get("registration_endpoint") # Allow overriding JWKS URI (useful when running in Docker with frontendUrl) # Example: frontendUrl=http://localhost:8888 but MCP server needs http://keycloak:8080 jwks_uri_override = os.getenv("OIDC_JWKS_URI") if jwks_uri_override: logger.info(f"OIDC_JWKS_URI override: {jwks_uri} → {jwks_uri_override}") jwks_uri = jwks_uri_override # Rewrite discovered endpoint URLs from public issuer to internal host # This is needed when OIDC discovery returns public URLs (e.g., http://localhost:8080) # but the server needs to access them via internal docker network (e.g., http://app:80) from urllib.parse import urlparse issuer_parsed = urlparse(issuer) nextcloud_parsed = urlparse(nextcloud_host) issuer_base = f"{issuer_parsed.scheme}://{issuer_parsed.netloc}" nextcloud_base = f"{nextcloud_parsed.scheme}://{nextcloud_parsed.netloc}" if issuer_base != nextcloud_base: logger.info(f"Rewriting OIDC endpoints: {issuer_base} → {nextcloud_base}") def rewrite_url(url: str | None) -> str | None: if url and url.startswith(issuer_base): return url.replace(issuer_base, nextcloud_base, 1) return url userinfo_uri = rewrite_url(userinfo_uri) or userinfo_uri jwks_uri = rewrite_url(jwks_uri) introspection_uri = rewrite_url(introspection_uri) registration_endpoint = rewrite_url(registration_endpoint) logger.info("OIDC endpoints discovered:") logger.info(f" Issuer: {issuer}") logger.info(f" Userinfo: {userinfo_uri}") if jwks_uri: logger.info(f" JWKS: {jwks_uri}") if introspection_uri: logger.info(f" Introspection: {introspection_uri}") # Auto-detect provider mode based on issuer # External IdP mode: issuer doesn't match Nextcloud host # Normalize URLs for comparison (handle port differences like :80 for HTTP) def normalize_url(url: str) -> str: """Normalize URL by removing default ports (80 for HTTP, 443 for HTTPS).""" parsed = urlparse(url) # Remove default ports if (parsed.scheme == "http" and parsed.port == 80) or ( parsed.scheme == "https" and parsed.port == 443 ): # Remove explicit default port hostname = parsed.hostname or parsed.netloc.split(":")[0] return f"{parsed.scheme}://{hostname}" return f"{parsed.scheme}://{parsed.netloc}" issuer_normalized = normalize_url(issuer) nextcloud_normalized = normalize_url(nextcloud_host) # Use NEXTCLOUD_PUBLIC_ISSUER_URL for IdP detection when set # This handles the case where MCP server accesses Nextcloud via internal URL (http://app:80) # but the issuer in OIDC discovery is the public URL (http://localhost:8080) public_issuer_for_detection = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL") if public_issuer_for_detection: comparison_issuer = normalize_url(public_issuer_for_detection) else: comparison_issuer = nextcloud_normalized is_external_idp = not issuer_normalized.startswith(comparison_issuer) if is_external_idp: oauth_provider = "external" # Could be Keycloak, Auth0, Okta, etc. logger.info( f"✓ Detected external IdP mode (issuer: {issuer} != Nextcloud: {nextcloud_host})" ) logger.info(" Tokens will be validated via Nextcloud user_oidc app") else: oauth_provider = "nextcloud" logger.info("✓ Detected integrated mode (Nextcloud OIDC app)") # For integrated mode, rewrite OIDC endpoints to use internal URL # The discovery document returns external URLs (http://localhost:8080) # but the MCP server needs internal URLs (http://app:80) for backend requests if jwks_uri and not os.getenv("OIDC_JWKS_URI"): internal_jwks_uri = f"{nextcloud_host}/apps/oidc/jwks" logger.info( f" Auto-rewriting JWKS URI for internal access: {jwks_uri} → {internal_jwks_uri}" ) jwks_uri = internal_jwks_uri if introspection_uri and not os.getenv("OIDC_INTROSPECTION_URI"): internal_introspection_uri = f"{nextcloud_host}/apps/oidc/introspect" logger.info( f" Auto-rewriting introspection URI for internal access: {introspection_uri} → {internal_introspection_uri}" ) introspection_uri = internal_introspection_uri if userinfo_uri: internal_userinfo_uri = f"{nextcloud_host}/apps/oidc/userinfo" logger.info( f" Auto-rewriting userinfo URI for internal access: {userinfo_uri} → {internal_userinfo_uri}" ) userinfo_uri = internal_userinfo_uri # Check if offline access (refresh tokens) is enabled enable_offline_access = os.getenv("ENABLE_OFFLINE_ACCESS", "false").lower() in ( "true", "1", "yes", ) # Initialize refresh token storage if enabled refresh_token_storage = None if enable_offline_access: try: from nextcloud_mcp_server.auth.storage import ( RefreshTokenStorage, ) # Validate encryption key before initializing encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY") if not encryption_key: logger.warning( "ENABLE_OFFLINE_ACCESS=true but TOKEN_ENCRYPTION_KEY not set. " "Refresh tokens will NOT be stored. Generate a key with:\n" ' python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"' ) else: refresh_token_storage = RefreshTokenStorage.from_env() await refresh_token_storage.initialize() logger.info( "✓ Refresh token storage initialized (offline_access enabled)" ) except Exception as e: logger.error(f"Failed to initialize refresh token storage: {e}") logger.warning( "Continuing without refresh token storage - users will need to re-authenticate after token expiration" ) # Load client credentials (static or dynamic registration) client_id = os.getenv("NEXTCLOUD_OIDC_CLIENT_ID") client_secret = os.getenv("NEXTCLOUD_OIDC_CLIENT_SECRET") if client_id and client_secret: logger.info(f"Using static OIDC client credentials: {client_id}") elif registration_endpoint: logger.info( "NEXTCLOUD_OIDC_CLIENT_ID not set, attempting Dynamic Client Registration" ) client_id, client_secret = await load_oauth_client_credentials( nextcloud_host=nextcloud_host, registration_endpoint=registration_endpoint ) else: raise ValueError( "NEXTCLOUD_OIDC_CLIENT_ID and NEXTCLOUD_OIDC_CLIENT_SECRET environment variables are required " "when the OIDC provider does not support Dynamic Client Registration. " f"Discovery URL: {discovery_url}" ) # Handle public issuer override (for clients accessing via different URL) # When clients access Nextcloud via a public URL (e.g., http://127.0.0.1:8080), # but the MCP server accesses via internal URL (e.g., http://app:80), # we need to use the public URL for JWT validation and client configuration public_issuer = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL") if public_issuer: public_issuer = public_issuer.rstrip("/") logger.info( f"Using public issuer URL override for JWT validation: {public_issuer}" ) client_issuer = public_issuer else: client_issuer = issuer # ADR-005: Unified Token Verifier with proper audience validation # Get MCP server URL for audience validation mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000") nextcloud_resource_uri = os.getenv("NEXTCLOUD_RESOURCE_URI", nextcloud_host) # Warn if resource URIs are not configured (required for ADR-005 compliance) if not os.getenv("NEXTCLOUD_MCP_SERVER_URL"): logger.warning( f"NEXTCLOUD_MCP_SERVER_URL not set, defaulting to: {mcp_server_url}. " "This should be set explicitly for proper audience validation." ) if not os.getenv("NEXTCLOUD_RESOURCE_URI"): logger.warning( f"NEXTCLOUD_RESOURCE_URI not set, defaulting to: {nextcloud_resource_uri}. " "This should be set explicitly for proper audience validation." ) # Create settings for UnifiedTokenVerifier from nextcloud_mcp_server.config import get_settings settings = get_settings() # Override with discovered values if not set in environment if not settings.oidc_client_id: settings.oidc_client_id = client_id if not settings.oidc_client_secret: settings.oidc_client_secret = client_secret if not settings.jwks_uri: settings.jwks_uri = jwks_uri if not settings.introspection_uri: settings.introspection_uri = introspection_uri if not settings.userinfo_uri: settings.userinfo_uri = userinfo_uri if not settings.oidc_issuer: # Use client_issuer which handles public URL override settings.oidc_issuer = client_issuer if not settings.nextcloud_mcp_server_url: settings.nextcloud_mcp_server_url = mcp_server_url if not settings.nextcloud_resource_uri: settings.nextcloud_resource_uri = nextcloud_resource_uri # Create Unified Token Verifier (ADR-005 compliant) token_verifier = UnifiedTokenVerifier(settings) # Log the mode enable_token_exchange = ( os.getenv("ENABLE_TOKEN_EXCHANGE", "false").lower() == "true" ) if enable_token_exchange: logger.info( "✓ Token Exchange mode enabled (ADR-005) - exchanging MCP tokens for Nextcloud tokens via RFC 8693" ) logger.info(f" MCP audience: {client_id} or {mcp_server_url}") logger.info(f" Nextcloud audience: {nextcloud_resource_uri}") else: logger.info( "✓ Multi-audience mode enabled (ADR-005) - tokens must contain both MCP and Nextcloud audiences" ) logger.info(f" Required MCP audience: {client_id} or {mcp_server_url}") logger.info(f" Required Nextcloud audience: {nextcloud_resource_uri}") if introspection_uri: logger.info("✓ Opaque token introspection enabled (RFC 7662)") if jwks_uri: logger.info("✓ JWT signature verification enabled (JWKS)") # Progressive Consent mode (for offline access / background jobs) encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY") if enable_offline_access and encryption_key and refresh_token_storage: logger.info("✓ Progressive Consent mode enabled - offline access available") # Note: Token Broker service would be initialized here for background job support # Currently not used in ADR-005 implementation as it's specific to offline access patterns # that are separate from the real-time token exchange flow logger.debug("Token broker available for future offline access features") # Create OAuth client for server-initiated flows (e.g., token exchange, background workers) oauth_client = None if enable_offline_access and refresh_token_storage and is_external_idp: # For external IdP mode, create generic OIDC client for token operations from nextcloud_mcp_server.auth.keycloak_oauth import KeycloakOAuthClient mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000") # Note: This redirect_uri is for OAuth client initialization, not used for actual redirects # since this client is used for backend token operations (exchange, refresh) redirect_uri = f"{mcp_server_url}/oauth/callback" # Extract base URL and realm from discovery URL # Format: http://keycloak:8080/realms/nextcloud-mcp/.well-known/openid-configuration # → base_url: http://keycloak:8080, realm: nextcloud-mcp if "/realms/" in discovery_url: base_url = discovery_url.split("/realms/")[0] realm = discovery_url.split("/realms/")[1].split("/")[0] else: # Fallback: use issuer to extract base URL base_url = ( issuer.rsplit("/realms/", 1)[0] if "/realms/" in issuer else issuer ) realm = issuer.split("/realms/")[1] if "/realms/" in issuer else "" oauth_client = KeycloakOAuthClient( keycloak_url=base_url, realm=realm, client_id=client_id, client_secret=client_secret, redirect_uri=redirect_uri, ) await oauth_client.discover() logger.info( "✓ OIDC client initialized for token operations (token exchange, refresh)" ) elif enable_offline_access and refresh_token_storage: # For integrated mode, OAuth client could be added later # For now, token refresh can use httpx directly with discovered endpoints logger.info( "OAuth client for token refresh not yet implemented for integrated mode" ) # Create auth settings mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000") # Note: We don't set required_scopes here anymore. # Scopes are now advertised via PRM endpoint and enforced per-tool. # This allows dynamic tool filtering based on user's actual token scopes. auth_settings = AuthSettings( issuer_url=AnyHttpUrl( client_issuer ), # Use client issuer (may be public override) resource_server_url=AnyHttpUrl(mcp_server_url), ) logger.info("OAuth configuration complete") return ( nextcloud_host, token_verifier, auth_settings, refresh_token_storage, oauth_client, oauth_provider, client_id, client_secret, ) def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None = None): # Initialize observability (logging will be configured by uvicorn) settings = get_settings() # Setup Prometheus metrics (always enabled by default) if settings.metrics_enabled: setup_metrics(port=settings.metrics_port) logger.info( f"Prometheus metrics enabled on dedicated port {settings.metrics_port}" ) # Setup OpenTelemetry tracing (optional) if settings.otel_exporter_otlp_endpoint: setup_tracing( service_name=settings.otel_service_name, otlp_endpoint=settings.otel_exporter_otlp_endpoint, otlp_verify_ssl=settings.otel_exporter_verify_ssl, sampling_rate=settings.otel_traces_sampler_arg, ) logger.info( f"OpenTelemetry tracing enabled (endpoint: {settings.otel_exporter_otlp_endpoint})" ) else: logger.info( "OpenTelemetry tracing disabled (set OTEL_EXPORTER_OTLP_ENDPOINT to enable)" ) # Determine authentication mode and deployment mode oauth_enabled = is_oauth_mode() deployment_mode = get_deployment_mode() if oauth_enabled: logger.info("Configuring MCP server for OAuth mode") # Asynchronously get the OAuth configuration import anyio ( nextcloud_host, token_verifier, auth_settings, refresh_token_storage, oauth_client, oauth_provider, client_id, client_secret, ) = anyio.run(setup_oauth_config) # Create lifespan function with captured OAuth context (closure) @asynccontextmanager async def oauth_lifespan(server: FastMCP) -> AsyncIterator[OAuthAppContext]: """ Lifespan context for OAuth mode - captures OAuth configuration from outer scope. """ logger.info("Starting MCP server in OAuth mode") logger.info(f"Using OAuth provider: {oauth_provider}") if refresh_token_storage: logger.info("Refresh token storage is available") if oauth_client: logger.info("OAuth client is available for token refresh") # Initialize document processors initialize_document_processors() try: yield OAuthAppContext( nextcloud_host=nextcloud_host, token_verifier=token_verifier, refresh_token_storage=refresh_token_storage, oauth_client=oauth_client, oauth_provider=oauth_provider, server_client_id=client_id, ) finally: logger.info("Shutting down MCP server") # RefreshTokenStorage uses context managers, no close() needed # OAuth client cleanup (if it has a close method) if oauth_client and hasattr(oauth_client, "close"): try: await oauth_client.close() except Exception as e: logger.warning(f"Error closing OAuth client: {e}") logger.info("MCP server shutdown complete") mcp = FastMCP( "Nextcloud MCP", lifespan=oauth_lifespan, token_verifier=token_verifier, auth=auth_settings, # Disable DNS rebinding protection for containerized deployments (k8s, Docker) # MCP 1.23+ auto-enables this for localhost, breaking k8s service DNS names transport_security=TransportSecuritySettings( enable_dns_rebinding_protection=False ), ) else: # ADR-016: Use Smithery lifespan for stateless mode, BasicAuth otherwise if deployment_mode == DeploymentMode.SMITHERY_STATELESS: logger.info("Configuring MCP server for Smithery stateless mode") # json_response=True returns plain JSON-RPC instead of SSE format, # required for Smithery scanner compatibility mcp = FastMCP( "Nextcloud MCP", lifespan=app_lifespan_smithery, json_response=True, # Disable DNS rebinding protection for containerized deployments (k8s, Docker) # MCP 1.23+ auto-enables this for localhost, breaking k8s service DNS names transport_security=TransportSecuritySettings( enable_dns_rebinding_protection=False ), ) else: logger.info("Configuring MCP server for BasicAuth mode") mcp = FastMCP( "Nextcloud MCP", lifespan=app_lifespan_basic, # Disable DNS rebinding protection for containerized deployments (k8s, Docker) # MCP 1.23+ auto-enables this for localhost, breaking k8s service DNS names transport_security=TransportSecuritySettings( enable_dns_rebinding_protection=False ), ) @mcp.resource("nc://capabilities") async def nc_get_capabilities(): """Get the Nextcloud Host capabilities""" ctx: Context = mcp.get_context() client = await get_nextcloud_client(ctx) return await client.capabilities() # Define available apps and their configuration functions available_apps = { "notes": configure_notes_tools, "tables": configure_tables_tools, "webdav": configure_webdav_tools, "sharing": configure_sharing_tools, "calendar": configure_calendar_tools, "contacts": configure_contacts_tools, "cookbook": configure_cookbook_tools, "deck": configure_deck_tools, "news": configure_news_tools, } # If no specific apps are specified, enable all if enabled_apps is None: enabled_apps = list(available_apps.keys()) # Configure only the enabled apps for app_name in enabled_apps: if app_name in available_apps: logger.info(f"Configuring {app_name} tools") available_apps[app_name](mcp) else: logger.warning( f"Unknown app: {app_name}. Available apps: {list(available_apps.keys())}" ) # Register semantic search tools (cross-app feature) # ADR-016: Skip in Smithery stateless mode (no vector database) settings = get_settings() deployment_mode = get_deployment_mode() if deployment_mode == DeploymentMode.SMITHERY_STATELESS: logger.info("Skipping semantic search tools (Smithery stateless mode)") elif settings.vector_sync_enabled: logger.info("Configuring semantic search tools (vector sync enabled)") configure_semantic_tools(mcp) else: logger.info("Skipping semantic search tools (VECTOR_SYNC_ENABLED not set)") # Register OAuth provisioning tools (only when offline access is enabled) # With token exchange enabled (external IdP), provisioning is not needed for MCP operations enable_token_exchange = ( os.getenv("ENABLE_TOKEN_EXCHANGE", "false").lower() == "true" ) enable_offline_access_for_tools = os.getenv( "ENABLE_OFFLINE_ACCESS", "false" ).lower() in ( "true", "1", "yes", ) if oauth_enabled and enable_offline_access_for_tools and not enable_token_exchange: logger.info("Registering OAuth provisioning tools for offline access") register_oauth_tools(mcp) elif oauth_enabled and enable_token_exchange: logger.info("Skipping provisioning tools registration (token exchange enabled)") elif oauth_enabled and not enable_offline_access_for_tools: logger.info( "Skipping provisioning tools registration (offline access not enabled)" ) # Override list_tools to filter based on user's token scopes (OAuth mode only) if oauth_enabled: original_list_tools = mcp._tool_manager.list_tools def list_tools_filtered(): """List tools filtered by user's token scopes (JWT and Bearer tokens).""" # Get user's scopes from token using MCP SDK's contextvar # This works for all request types including list_tools user_scopes = get_access_token_scopes() is_jwt = is_jwt_token() logger.info( f"🔍 list_tools called - Token type: {'JWT' if is_jwt else 'opaque/none'}, " f"User scopes: {user_scopes}" ) # Get all tools all_tools = original_list_tools() # Filter tools based on user's token scopes (both JWT and opaque tokens) # JWT tokens have scopes embedded in payload # Opaque tokens get scopes via introspection endpoint # Claude Code now properly respects PRM endpoint for scope discovery if user_scopes: allowed_tools = [ tool for tool in all_tools if has_required_scopes(tool.fn, user_scopes) ] token_type = "JWT" if is_jwt else "Bearer" logger.info( f"✂️ {token_type} scope filtering: {len(allowed_tools)}/{len(all_tools)} tools " f"available for scopes: {user_scopes}" ) else: # BasicAuth mode or no token - show all tools allowed_tools = all_tools logger.info( f"📋 Showing all {len(all_tools)} tools (no token/BasicAuth)" ) # Return the Tool objects directly (they're already in the correct format) return allowed_tools # Replace the tool manager's list_tools method mcp._tool_manager.list_tools = list_tools_filtered # type: ignore[method-assign] logger.info( "Dynamic tool filtering enabled for OAuth mode (JWT and Bearer tokens)" ) mcp_app = mcp.streamable_http_app() @asynccontextmanager async def starlette_lifespan(app: Starlette): # Set OAuth context for OAuth login routes (ADR-004) if oauth_enabled: # Prepare OAuth config from setup_oauth_config closure variables mcp_server_url = os.getenv( "NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000" ) nextcloud_resource_uri = os.getenv("NEXTCLOUD_RESOURCE_URI", nextcloud_host) discovery_url = os.getenv( "OIDC_DISCOVERY_URL", f"{nextcloud_host}/.well-known/openid-configuration", ) scopes = os.getenv("NEXTCLOUD_OIDC_SCOPES", "") oauth_context_dict = { "storage": refresh_token_storage, "oauth_client": oauth_client, "token_verifier": token_verifier, # For querying IdP userinfo endpoint "config": { "mcp_server_url": mcp_server_url, "discovery_url": discovery_url, "client_id": client_id, # From setup_oauth_config (DCR or static) "client_secret": client_secret, # From setup_oauth_config (DCR or static) "scopes": scopes, "nextcloud_host": nextcloud_host, "nextcloud_resource_uri": nextcloud_resource_uri, "oauth_provider": oauth_provider, }, } app.state.oauth_context = oauth_context_dict # Also set oauth_context on browser_app for session authentication # browser_app is in the same function scope (defined later in create_app) # We need to find it in the mounted routes for route in app.routes: if isinstance(route, Mount) and route.path == "/app": browser_app = cast(Starlette, route.app) browser_app.state.oauth_context = oauth_context_dict logger.info( "OAuth context shared with browser_app for session auth" ) break logger.info( f"OAuth context initialized for login routes (client_id={client_id[:16]}...)" ) else: # BasicAuth mode - share storage with browser_app for webhook management from nextcloud_mcp_server.auth.storage import RefreshTokenStorage storage = RefreshTokenStorage.from_env() await storage.initialize() app.state.storage = storage # Also share with browser_app for webhook routes for route in app.routes: if isinstance(route, Mount) and route.path == "/app": browser_app = cast(Starlette, route.app) browser_app.state.storage = storage logger.info( "Storage shared with browser_app for webhook management" ) break # Start background vector sync tasks (ADR-007) # Scanner runs at server-level (once), not per-session import anyio as anyio_module settings = get_settings() # Check if vector sync is enabled and determine the mode enable_offline_access_for_sync = os.getenv( "ENABLE_OFFLINE_ACCESS", "false" ).lower() in ("true", "1", "yes") encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY") if settings.vector_sync_enabled and not oauth_enabled: # BasicAuth mode - single user sync logger.info("Starting background vector sync tasks for BasicAuth mode") # Get username from environment username = os.getenv("NEXTCLOUD_USERNAME") if not username: raise ValueError( "NEXTCLOUD_USERNAME required for vector sync in BasicAuth mode" ) # Create client for vector sync (server-level, not per-session) client = NextcloudClient.from_env() # Initialize Qdrant collection before starting background tasks logger.info("Initializing Qdrant collection...") from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client try: await get_qdrant_client() # Triggers collection creation if needed logger.info("Qdrant collection ready") except Exception as e: logger.error(f"Failed to initialize Qdrant collection: {e}") raise RuntimeError( f"Cannot start vector sync - Qdrant initialization failed: {e}" ) from e # Initialize shared state send_stream, receive_stream = anyio_module.create_memory_object_stream( max_buffer_size=settings.vector_sync_queue_max_size ) shutdown_event = anyio_module.Event() scanner_wake_event = anyio_module.Event() # Store in app state for access from routes (ADR-007) app.state.document_send_stream = send_stream app.state.document_receive_stream = receive_stream app.state.shutdown_event = shutdown_event app.state.scanner_wake_event = scanner_wake_event # Also store in module singleton for FastMCP session lifespans _vector_sync_state.document_send_stream = send_stream _vector_sync_state.document_receive_stream = receive_stream _vector_sync_state.shutdown_event = shutdown_event _vector_sync_state.scanner_wake_event = scanner_wake_event logger.info("Vector sync state stored in module singleton") # Also share with browser_app for /app route for route in app.routes: if isinstance(route, Mount) and route.path == "/app": browser_app = cast(Starlette, route.app) browser_app.state.document_send_stream = send_stream browser_app.state.document_receive_stream = receive_stream browser_app.state.shutdown_event = shutdown_event browser_app.state.scanner_wake_event = scanner_wake_event logger.info("Vector sync state shared with browser_app for /app") break # Start background tasks using anyio TaskGroup async with anyio_module.create_task_group() as tg: # Start scanner task await tg.start( scanner_task, send_stream, shutdown_event, scanner_wake_event, client, username, ) # Start processor pool (each gets a cloned receive stream) for i in range(settings.vector_sync_processor_workers): await tg.start( processor_task, i, receive_stream.clone(), shutdown_event, client, username, ) logger.info( f"Background sync tasks started: 1 scanner + " f"{settings.vector_sync_processor_workers} processors" ) # Run MCP session manager and yield async with AsyncExitStack() as stack: await stack.enter_async_context(mcp.session_manager.run()) try: yield finally: # Shutdown signal logger.info("Shutting down background sync tasks") shutdown_event.set() await client.close() # TaskGroup automatically cancels all tasks on exit elif ( settings.vector_sync_enabled and oauth_enabled and enable_offline_access_for_sync and refresh_token_storage and encryption_key ): # OAuth mode with offline access - multi-user sync logger.info("Starting background vector sync tasks for OAuth mode") from nextcloud_mcp_server.auth.token_broker import TokenBrokerService from nextcloud_mcp_server.vector.oauth_sync import ( oauth_processor_task, user_manager_task, ) # Get OIDC discovery URL (same as used for OAuth setup) discovery_url = os.getenv( "OIDC_DISCOVERY_URL", f"{nextcloud_host}/.well-known/openid-configuration", ) # Get client credentials from oauth_context (set by setup_oauth_config) # This includes credentials from DCR if dynamic registration was used # Use different variable names to avoid shadowing client_id/client_secret from outer scope oauth_ctx = getattr(app.state, "oauth_context", {}) oauth_config = oauth_ctx.get("config", {}) sync_client_id = oauth_config.get("client_id") sync_client_secret = oauth_config.get("client_secret") if not sync_client_id or not sync_client_secret: logger.error( "Cannot start OAuth vector sync: client credentials not found in oauth_context" ) raise ValueError("OAuth client credentials required for vector sync") # Create token broker for background operations # Note: storage handles encryption internally, no key needed here # Client credentials are needed for token refresh operations token_broker = TokenBrokerService( storage=refresh_token_storage, oidc_discovery_url=discovery_url, nextcloud_host=nextcloud_host, client_id=sync_client_id, client_secret=sync_client_secret, ) # Store token broker in oauth_context for management API (revoke endpoint) if hasattr(app.state, "oauth_context"): app.state.oauth_context["token_broker"] = token_broker logger.info("Token broker added to oauth_context for management API") # Initialize Qdrant collection before starting background tasks logger.info("Initializing Qdrant collection...") from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client try: await get_qdrant_client() # Triggers collection creation if needed logger.info("Qdrant collection ready") except Exception as e: logger.error(f"Failed to initialize Qdrant collection: {e}") raise RuntimeError( f"Cannot start vector sync - Qdrant initialization failed: {e}" ) from e # Initialize shared state send_stream, receive_stream = anyio_module.create_memory_object_stream( max_buffer_size=settings.vector_sync_queue_max_size ) shutdown_event = anyio_module.Event() scanner_wake_event = anyio_module.Event() # User state tracking for user manager user_states: dict = {} # Store in app state for access from routes (ADR-007) app.state.document_send_stream = send_stream app.state.document_receive_stream = receive_stream app.state.shutdown_event = shutdown_event app.state.scanner_wake_event = scanner_wake_event # Also store in module singleton for FastMCP session lifespans _vector_sync_state.document_send_stream = send_stream _vector_sync_state.document_receive_stream = receive_stream _vector_sync_state.shutdown_event = shutdown_event _vector_sync_state.scanner_wake_event = scanner_wake_event logger.info("Vector sync state stored in module singleton") # Also share with browser_app for /app route for route in app.routes: if isinstance(route, Mount) and route.path == "/app": browser_app = cast(Starlette, route.app) browser_app.state.document_send_stream = send_stream browser_app.state.document_receive_stream = receive_stream browser_app.state.shutdown_event = shutdown_event browser_app.state.scanner_wake_event = scanner_wake_event logger.info("Vector sync state shared with browser_app for /app") break # Start background tasks using anyio TaskGroup async with anyio_module.create_task_group() as tg: # Start user manager task (supervises per-user scanners) await tg.start( user_manager_task, send_stream, shutdown_event, scanner_wake_event, token_broker, refresh_token_storage, nextcloud_host, user_states, tg, ) # Start processor pool (each gets a cloned receive stream) for i in range(settings.vector_sync_processor_workers): await tg.start( oauth_processor_task, i, receive_stream.clone(), shutdown_event, token_broker, nextcloud_host, ) logger.info( f"Background sync tasks started: 1 user manager + " f"{settings.vector_sync_processor_workers} processors" ) # Run MCP session manager and yield async with AsyncExitStack() as stack: await stack.enter_async_context(mcp.session_manager.run()) try: yield finally: # Shutdown signal logger.info("Shutting down background sync tasks") shutdown_event.set() # Close token broker HTTP client if token_broker._http_client: await token_broker._http_client.aclose() # TaskGroup automatically cancels all tasks on exit else: # No vector sync - just run MCP session manager if settings.vector_sync_enabled: # Log why vector sync is not starting if oauth_enabled and not enable_offline_access_for_sync: logger.warning( "Vector sync enabled but ENABLE_OFFLINE_ACCESS=false - " "vector sync requires offline access in OAuth mode" ) elif oauth_enabled and not refresh_token_storage: logger.warning( "Vector sync enabled but refresh token storage not available" ) elif oauth_enabled and not encryption_key: logger.warning( "Vector sync enabled but TOKEN_ENCRYPTION_KEY not set" ) async with AsyncExitStack() as stack: await stack.enter_async_context(mcp.session_manager.run()) yield # Health check endpoints for Kubernetes probes def health_live(request): """Liveness probe endpoint. Returns 200 OK if the application process is running. This is a simple check that doesn't verify external dependencies. """ return JSONResponse( { "status": "alive", "mode": "oauth" if oauth_enabled else "basic", } ) async def health_ready(request): """Readiness probe endpoint. Returns 200 OK if the application is ready to serve traffic. Checks that required configuration is present and Qdrant if vector sync enabled. """ checks = {} is_ready = True # Check Nextcloud host configuration and connectivity nextcloud_host = os.getenv("NEXTCLOUD_HOST") if nextcloud_host: checks["nextcloud_configured"] = "ok" # Try to connect to Nextcloud start_time = time.time() try: async with httpx.AsyncClient(timeout=2.0) as client: response = await client.get(f"{nextcloud_host}/status.php") duration = time.time() - start_time if response.status_code == 200: checks["nextcloud_reachable"] = "ok" set_dependency_health("nextcloud", True) else: checks["nextcloud_reachable"] = ( f"error: status {response.status_code}" ) set_dependency_health("nextcloud", False) is_ready = False record_dependency_check("nextcloud", duration) except Exception as e: duration = time.time() - start_time checks["nextcloud_reachable"] = f"error: {str(e)}" set_dependency_health("nextcloud", False) record_dependency_check("nextcloud", duration) is_ready = False else: checks["nextcloud_configured"] = "error: NEXTCLOUD_HOST not set" set_dependency_health("nextcloud", False) is_ready = False # Check authentication configuration if oauth_enabled: # OAuth mode - just verify we got this far (token_verifier initialized in lifespan) checks["auth_mode"] = "oauth" checks["auth_configured"] = "ok" else: # BasicAuth mode - verify credentials are set username = os.getenv("NEXTCLOUD_USERNAME") password = os.getenv("NEXTCLOUD_PASSWORD") if username and password: checks["auth_mode"] = "basic" checks["auth_configured"] = "ok" else: checks["auth_mode"] = "basic" checks["auth_configured"] = "error: credentials not set" is_ready = False # Check Qdrant status if using network mode (external Qdrant service) # In-memory and persistent modes use embedded Qdrant, no external service to check vector_sync_enabled = ( os.getenv("VECTOR_SYNC_ENABLED", "false").lower() == "true" ) qdrant_url = os.getenv("QDRANT_URL") # Only set in network mode if vector_sync_enabled and qdrant_url: start_time = time.time() try: async with httpx.AsyncClient(timeout=2.0) as client: response = await client.get(f"{qdrant_url}/readyz") duration = time.time() - start_time if response.status_code == 200: checks["qdrant"] = "ok" set_dependency_health("qdrant", True) else: checks["qdrant"] = f"error: status {response.status_code}" set_dependency_health("qdrant", False) is_ready = False record_dependency_check("qdrant", duration) except Exception as e: duration = time.time() - start_time checks["qdrant"] = f"error: {str(e)}" set_dependency_health("qdrant", False) record_dependency_check("qdrant", duration) is_ready = False elif vector_sync_enabled: # Using embedded Qdrant (memory or persistent mode) checks["qdrant"] = "embedded" set_dependency_health("qdrant", True) status_code = 200 if is_ready else 503 return JSONResponse( { "status": "ready" if is_ready else "not_ready", "checks": checks, }, status_code=status_code, ) async def handle_nextcloud_webhook(request): """Test webhook endpoint to capture and log Nextcloud webhook payloads. This is a temporary endpoint for testing webhook schemas and payloads. It logs the full payload and returns 200 OK immediately. """ import json try: payload = await request.json() logger.info("=" * 80) logger.info("🔔 Webhook received from Nextcloud:") logger.info(json.dumps(payload, indent=2, sort_keys=True)) logger.info("=" * 80) return JSONResponse( {"status": "received", "timestamp": payload.get("time")}, status_code=200, ) except Exception as e: logger.error(f"❌ Failed to parse webhook payload: {e}") return JSONResponse( {"error": "invalid_payload", "message": str(e)}, status_code=400 ) # Add Protected Resource Metadata (PRM) endpoint for OAuth mode routes = [] # Add health check routes (available in both OAuth and BasicAuth modes) routes.append(Route("/health/live", health_live, methods=["GET"])) routes.append(Route("/health/ready", health_ready, methods=["GET"])) logger.info("Health check endpoints enabled: /health/live, /health/ready") # Add test webhook endpoint (for development/testing) routes.append( Route("/webhooks/nextcloud", handle_nextcloud_webhook, methods=["POST"]) ) logger.info("Test webhook endpoint enabled: /webhooks/nextcloud") # Add management API endpoints for Nextcloud PHP app (OAuth mode only) if oauth_enabled: from nextcloud_mcp_server.api.management import ( create_webhook, delete_webhook, get_chunk_context, get_installed_apps, get_server_status, get_user_session, get_vector_sync_status, list_webhooks, revoke_user_access, unified_search, vector_search, ) routes.append(Route("/api/v1/status", get_server_status, methods=["GET"])) routes.append( Route( "/api/v1/vector-sync/status", get_vector_sync_status, methods=["GET"], ) ) routes.append( Route( "/api/v1/users/{user_id}/session", get_user_session, methods=["GET"], ) ) routes.append( Route( "/api/v1/users/{user_id}/revoke", revoke_user_access, methods=["POST"], ) ) routes.append( Route("/api/v1/vector-viz/search", vector_search, methods=["POST"]) ) routes.append( Route("/api/v1/chunk-context", get_chunk_context, methods=["GET"]) ) # ADR-018: Unified search endpoint for Nextcloud PHP app integration routes.append(Route("/api/v1/search", unified_search, methods=["POST"])) routes.append(Route("/api/v1/apps", get_installed_apps, methods=["GET"])) # Webhook management endpoints routes.append(Route("/api/v1/webhooks", list_webhooks, methods=["GET"])) routes.append(Route("/api/v1/webhooks", create_webhook, methods=["POST"])) routes.append( Route("/api/v1/webhooks/{webhook_id}", delete_webhook, methods=["DELETE"]) ) logger.info( "Management API endpoints enabled: /api/v1/status, /api/v1/vector-sync/status, " "/api/v1/users/{user_id}/session, /api/v1/users/{user_id}/revoke, " "/api/v1/vector-viz/search, /api/v1/search, /api/v1/apps, " "/api/v1/webhooks" ) # ADR-016: Add Smithery well-known config endpoint for container runtime discovery if deployment_mode == DeploymentMode.SMITHERY_STATELESS: def smithery_mcp_config(request): """Smithery MCP configuration endpoint. Returns JSON Schema for Smithery's configuration UI. This endpoint is required for Smithery container runtime discovery. """ return JSONResponse(SMITHERY_CONFIG_SCHEMA) routes.append( Route( "/.well-known/mcp-config", smithery_mcp_config, methods=["GET"], ) ) logger.info("Smithery config endpoint enabled: /.well-known/mcp-config") # Note: Metrics endpoint is NOT exposed on main HTTP port for security reasons. # Metrics are served on dedicated port via setup_metrics() (default: 9090) if oauth_enabled: # Import OAuth routes (ADR-004 Progressive Consent) from nextcloud_mcp_server.auth.oauth_routes import oauth_authorize def oauth_protected_resource_metadata(request): """RFC 9728 Protected Resource Metadata endpoint. Dynamically discovers supported scopes from registered MCP tools. This ensures the advertised scopes always match the actual tool requirements. The 'resource' field is set to the MCP server's public URL (RFC 9728 requires a URL). This is used as the audience in access tokens via the resource parameter (RFC 8707). The introspection controller matches this URL to the MCP server's client via resource_url field. """ # Use PUBLIC_ISSUER_URL for authorization server since external clients # (like Claude) need the publicly accessible URL, not internal Docker URLs public_issuer_url = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL") if not public_issuer_url: # Fallback to NEXTCLOUD_HOST if PUBLIC_ISSUER_URL not set public_issuer_url = os.getenv("NEXTCLOUD_HOST", "") # RFC 9728 requires resource to be a URL (not a client ID) # Use the MCP server's public URL mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL") if not mcp_server_url: # Fallback to constructing from host and port mcp_server_url = f"http://localhost:{os.getenv('PORT', '8000')}" # Dynamically discover all scopes from registered tools # This provides a single source of truth based on @require_scopes decorators supported_scopes = discover_all_scopes(mcp) return JSONResponse( { "resource": f"{mcp_server_url}/mcp", # RFC 9728: must be a URL "scopes_supported": supported_scopes, "authorization_servers": [public_issuer_url], "bearer_methods_supported": ["header"], "resource_signing_alg_values_supported": ["RS256"], } ) # Register PRM endpoint at both path-based and root locations per RFC 9728 # Path-based discovery: /.well-known/oauth-protected-resource{path} routes.append( Route( "/.well-known/oauth-protected-resource/mcp", oauth_protected_resource_metadata, methods=["GET"], ) ) # Root discovery (fallback): /.well-known/oauth-protected-resource routes.append( Route( "/.well-known/oauth-protected-resource", oauth_protected_resource_metadata, methods=["GET"], ) ) logger.info( "Protected Resource Metadata (PRM) endpoints enabled (path-based + root)" ) # Add OAuth login routes (ADR-004 Progressive Consent Flow 1) routes.append(Route("/oauth/authorize", oauth_authorize, methods=["GET"])) logger.info("OAuth login routes enabled: /oauth/authorize (Flow 1)") # Add unified OAuth callback endpoint supporting both flows from nextcloud_mcp_server.auth.oauth_routes import ( oauth_authorize_nextcloud, oauth_callback, oauth_callback_nextcloud, ) routes.append(Route("/oauth/callback", oauth_callback, methods=["GET"])) logger.info( "OAuth unified callback enabled: /oauth/callback?flow={browser|provisioning}" ) # Add OAuth resource provisioning routes (ADR-004 Progressive Consent Flow 2) routes.append( Route( "/oauth/authorize-nextcloud", oauth_authorize_nextcloud, methods=["GET"], ) ) # Keep old callback endpoint as backwards-compatible alias routes.append( Route( "/oauth/callback-nextcloud", oauth_callback_nextcloud, methods=["GET"], ) ) logger.info( "OAuth resource provisioning routes enabled: /oauth/authorize-nextcloud, /oauth/callback-nextcloud (Flow 2, legacy)" ) # Add browser OAuth login routes (OAuth mode only) if oauth_enabled: from nextcloud_mcp_server.auth.browser_oauth_routes import ( oauth_login, oauth_login_callback, oauth_logout, ) routes.append( Route("/oauth/login", oauth_login, methods=["GET"], name="oauth_login") ) # Keep old callback endpoint as backwards-compatible alias routes.append( Route( "/oauth/login-callback", oauth_login_callback, methods=["GET"], name="oauth_login_callback", ) ) routes.append( Route("/oauth/logout", oauth_logout, methods=["GET"], name="oauth_logout") ) logger.info( "Browser OAuth routes enabled: /oauth/login, /oauth/login-callback (legacy), /oauth/logout" ) # Add user info routes (available in both BasicAuth and OAuth modes) # ADR-016: Skip /app admin UI in Smithery stateless mode (no vector sync, webhooks) if deployment_mode != DeploymentMode.SMITHERY_STATELESS: # These require session authentication, so we wrap them in a separate app from nextcloud_mcp_server.auth.session_backend import SessionAuthBackend from nextcloud_mcp_server.auth.userinfo_routes import ( revoke_session, user_info_html, vector_sync_status_fragment, ) from nextcloud_mcp_server.auth.viz_routes import ( chunk_context_endpoint, vector_visualization_html, vector_visualization_search, ) from nextcloud_mcp_server.auth.webhook_routes import ( disable_webhook_preset, enable_webhook_preset, webhook_management_pane, ) # Create a separate Starlette app for browser routes that need session auth # This prevents SessionAuthBackend from interfering with FastMCP's OAuth browser_routes = [ Route( "/", user_info_html, methods=["GET"] ), # /app → user info with all tabs Route( "/revoke", revoke_session, methods=["POST"], name="revoke_session_endpoint", ), # /app/revoke → revoke_session # Vector sync status fragment (htmx polling) Route( "/vector-sync/status", vector_sync_status_fragment, methods=["GET"], ), # /app/vector-sync/status # Vector visualization routes Route( "/vector-viz", vector_visualization_html, methods=["GET"] ), # /app/vector-viz Route( "/vector-viz/search", vector_visualization_search, methods=["GET"], ), # /app/vector-viz/search Route( "/chunk-context", chunk_context_endpoint, methods=["GET"], ), # /app/chunk-context # Webhook management routes (admin-only) Route( "/webhooks", webhook_management_pane, methods=["GET"] ), # /app/webhooks Route( "/webhooks/enable/{preset_id:str}", enable_webhook_preset, methods=["POST"], ), Route( "/webhooks/disable/{preset_id:str}", disable_webhook_preset, methods=["DELETE"], ), ] # Add static files mount if directory exists static_dir = os.path.join(os.path.dirname(__file__), "auth", "static") if os.path.isdir(static_dir): browser_routes.append( Mount("/static", StaticFiles(directory=static_dir), name="static") ) logger.info(f"Mounted static files from {static_dir}") browser_app = Starlette(routes=browser_routes) browser_app.add_middleware( AuthenticationMiddleware, # type: ignore[invalid-argument-type] backend=SessionAuthBackend(oauth_enabled=oauth_enabled), ) # Add redirect from /app to /app/ (Starlette requires trailing slash for mounted apps) routes.append( Route("/app", lambda request: RedirectResponse("/app/", status_code=307)) ) # Mount browser app at /app (webapp and admin routes) routes.append(Mount("/app", app=browser_app)) logger.info("App routes with session auth: /app, /app/webhooks, /app/revoke") else: logger.info("Admin UI (/app) disabled in Smithery stateless mode") # Mount FastMCP at root last (catch-all, handles OAuth via token_verifier) routes.append(Mount("/", app=mcp_app)) app = Starlette(routes=routes, lifespan=starlette_lifespan) logger.info( "Routes: /user/* with SessionAuth, /mcp with FastMCP OAuth Bearer tokens" ) # Add debugging middleware to log Authorization headers and client capabilities @app.middleware("http") async def log_auth_headers(request, call_next): auth_header = request.headers.get("authorization") if request.url.path.startswith("/mcp"): if auth_header: # Log first 50 chars of token for debugging token_preview = ( auth_header[:50] + "..." if len(auth_header) > 50 else auth_header ) logger.info(f"🔑 /mcp request with Authorization: {token_preview}") else: # Only warn about missing Authorization in OAuth mode # In BasicAuth mode, /mcp requests without Authorization are expected if oauth_enabled: logger.warning( f"⚠️ /mcp request WITHOUT Authorization header from {request.client}" ) # Log client capabilities on initialize request if request.method == "POST": # Read body to check for initialize request # Starlette caches the body internally, so it's safe to read here body = await request.body() try: import json data = json.loads(body) # Check if this is an initialize request if data.get("method") == "initialize": params = data.get("params", {}) capabilities = params.get("capabilities", {}) client_info = params.get("clientInfo", {}) logger.info( f"🔌 MCP client connected: {client_info.get('name', 'unknown')} " f"v{client_info.get('version', 'unknown')}" ) # Log capabilities in a structured way cap_summary = [] # Check for presence using 'in' not truthiness (empty dict {} counts as having capability) if "roots" in capabilities: cap_summary.append("roots") if "sampling" in capabilities: cap_summary.append("sampling") if "experimental" in capabilities: cap_summary.append( f"experimental({len(capabilities['experimental'])} features)" ) logger.info( f"📋 Client capabilities: {', '.join(cap_summary) if cap_summary else 'none'}" ) # Log full capabilities at INFO level to diagnose capability issues logger.info( f"Full capabilities JSON: {json.dumps(capabilities)}" ) except Exception as e: # Don't fail the request if logging fails logger.debug( f"Failed to parse MCP request for capability logging: {e}" ) response = await call_next(request) return response # Add CORS middleware to allow browser-based clients like MCP Inspector app.add_middleware( CORSMiddleware, # type: ignore[invalid-argument-type] allow_origins=["*"], # Allow all origins for development allow_credentials=True, allow_methods=["*"], allow_headers=["*"], expose_headers=["*"], ) # Add observability middleware (metrics + tracing) if settings.metrics_enabled or settings.otel_exporter_otlp_endpoint: app.add_middleware(ObservabilityMiddleware) # type: ignore[invalid-argument-type] logger.info("Observability middleware enabled (metrics and/or tracing)") # Add exception handler for scope challenges (OAuth mode only) if oauth_enabled: @app.exception_handler(InsufficientScopeError) async def handle_insufficient_scope(request, exc: InsufficientScopeError): """Return 403 with WWW-Authenticate header for scope challenges.""" resource_url = os.getenv( "NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000" ) scope_str = " ".join(exc.missing_scopes) return JSONResponse( status_code=403, headers={ "WWW-Authenticate": ( f'Bearer error="insufficient_scope", ' f'scope="{scope_str}", ' f'resource_metadata="{resource_url}/.well-known/oauth-protected-resource/mcp"' ) }, content={ "error": "insufficient_scope", "scopes_required": exc.missing_scopes, }, ) logger.info("WWW-Authenticate scope challenge handler enabled") # ADR-016: Apply SmitheryConfigMiddleware in Smithery stateless mode # This must be the outermost middleware to extract config from URL query parameters # before any other middleware processes the request if deployment_mode == DeploymentMode.SMITHERY_STATELESS: app = SmitheryConfigMiddleware(app) logger.info("SmitheryConfigMiddleware enabled for query parameter config") return app

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cbcoutinho/nextcloud-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server