"""
Tests for Dynamic Client Registration (DCR) lifecycle - register and delete.
These tests verify the complete lifecycle of DCR clients:
1. Registration via RFC 7591
2. Token acquisition and use
3. Deletion via RFC 7592
4. Error handling for deletion edge cases
This is critical for ensuring the fixture cleanup code works reliably.
"""
import logging
import os
import secrets
import time
from urllib.parse import quote
import anyio
import httpx
import pytest
from nextcloud_mcp_server.auth.client_registration import delete_client, register_client
from ...conftest import _handle_oauth_consent_screen
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
async def get_oauth_token_with_client(
browser,
client_id: str,
client_secret: str,
token_endpoint: str,
authorization_endpoint: str,
callback_url: str,
auth_states: dict,
scopes: str = "openid profile email notes:read notes:write",
) -> str:
"""
Helper to obtain OAuth access token using existing client credentials.
Args:
browser: Playwright browser instance
client_id: OAuth client ID
client_secret: OAuth client secret
token_endpoint: Token endpoint URL
authorization_endpoint: Authorization endpoint URL
callback_url: Callback URL for OAuth redirect
auth_states: Dict for storing auth codes (from callback server)
scopes: Space-separated list of scopes to request
Returns:
Access token string
"""
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
username = os.getenv("NEXTCLOUD_USERNAME")
password = os.getenv("NEXTCLOUD_PASSWORD")
if not all([nextcloud_host, username, password]):
pytest.skip(
"OAuth requires NEXTCLOUD_HOST, NEXTCLOUD_USERNAME, and NEXTCLOUD_PASSWORD"
)
# Generate unique state parameter
state = secrets.token_urlsafe(32)
# URL-encode scopes
scopes_encoded = quote(scopes, safe="")
# Construct authorization URL
auth_url = (
f"{authorization_endpoint}?"
f"response_type=code&"
f"client_id={client_id}&"
f"redirect_uri={quote(callback_url, safe='')}&"
f"state={state}&"
f"scope={scopes_encoded}"
)
# Browser automation
context = await browser.new_context(ignore_https_errors=True)
page = await context.new_page()
try:
await page.goto(auth_url, wait_until="networkidle", timeout=60000)
current_url = page.url
# Login if needed
if "/login" in current_url or "/index.php/login" in current_url:
logger.info("Logging in for DCR lifecycle test...")
await page.wait_for_selector('input[name="user"]', timeout=10000)
await page.fill('input[name="user"]', username)
await page.fill('input[name="password"]', password)
await page.click('button[type="submit"]')
await page.wait_for_load_state("networkidle", timeout=60000)
# Handle consent screen if present
try:
await _handle_oauth_consent_screen(page, username)
except Exception as e:
logger.debug(f"No consent screen or already authorized: {e}")
# Wait for callback
logger.info("Waiting for OAuth callback...")
timeout_seconds = 30
start_time = time.time()
while state not in auth_states:
if time.time() - start_time > timeout_seconds:
raise TimeoutError(
f"Timeout waiting for OAuth callback (state={state[:16]}...)"
)
await anyio.sleep(0.5)
auth_code = auth_states[state]
logger.info(f"Got auth code: {auth_code[:20]}...")
finally:
await context.close()
# Exchange code for token
logger.info("Exchanging authorization code for access token...")
async with httpx.AsyncClient(timeout=30.0) as http_client:
token_response = await http_client.post(
token_endpoint,
data={
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": callback_url,
"client_id": client_id,
"client_secret": client_secret,
},
)
token_response.raise_for_status()
token_data = token_response.json()
access_token = token_data.get("access_token")
if not access_token:
raise ValueError(f"No access_token in response: {token_data}")
logger.info("Successfully obtained access token")
return access_token
@pytest.mark.integration
async def test_dcr_register_and_delete_lifecycle(
anyio_backend,
browser,
oauth_callback_server,
):
"""
Test the complete DCR lifecycle: register → use → delete.
This verifies:
1. Client registration succeeds
2. Client can obtain tokens and make API calls
3. Client deletion succeeds (returns 204)
4. Deleted client cannot be used again (tokens are revoked)
"""
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
if not nextcloud_host:
pytest.skip("Test requires NEXTCLOUD_HOST")
auth_states, callback_url = oauth_callback_server
# Discover OIDC endpoints
async with httpx.AsyncClient(timeout=30.0) as client:
discovery_url = f"{nextcloud_host}/.well-known/openid-configuration"
discovery_response = await client.get(discovery_url)
discovery_response.raise_for_status()
oidc_config = discovery_response.json()
registration_endpoint = oidc_config.get("registration_endpoint")
token_endpoint = oidc_config.get("token_endpoint")
authorization_endpoint = oidc_config.get("authorization_endpoint")
# Step 1: Register client (and capture full response including registration_access_token)
logger.info("Step 1: Registering OAuth client...")
# Register manually to capture full response
client_metadata = {
"client_name": "DCR Lifecycle Test Client",
"redirect_uris": [callback_url],
"token_endpoint_auth_method": "client_secret_post",
"grant_types": ["authorization_code", "refresh_token"],
"response_types": ["code"],
"scope": "openid profile email notes:read",
"token_type": "Bearer",
}
async with httpx.AsyncClient(timeout=30.0) as reg_client:
reg_response = await reg_client.post(
registration_endpoint,
json=client_metadata,
headers={"Content-Type": "application/json"},
)
reg_response.raise_for_status()
full_client_info = reg_response.json()
logger.info(f"Full registration response keys: {list(full_client_info.keys())}")
logger.info(f"Registration response: {full_client_info}")
# Use the register_client function for the ClientInfo object
client_info = await register_client(
nextcloud_url=nextcloud_host,
registration_endpoint=registration_endpoint,
client_name="DCR Lifecycle Test Client 2",
redirect_uris=[callback_url],
scopes="openid profile email notes:read",
token_type="Bearer",
)
# Store RFC 7592 fields if present
registration_access_token = full_client_info.get("registration_access_token")
registration_client_uri = full_client_info.get("registration_client_uri")
logger.info(
f"Registration access token present: {registration_access_token is not None}"
)
logger.info(
f"Registration client URI present: {registration_client_uri is not None}"
)
logger.info(f"✅ Client registered: {client_info.client_id[:16]}...")
# Step 2: Obtain token and verify client works
logger.info("Step 2: Obtaining OAuth token with registered client...")
access_token = await get_oauth_token_with_client(
browser=browser,
client_id=client_info.client_id,
client_secret=client_info.client_secret,
token_endpoint=token_endpoint,
authorization_endpoint=authorization_endpoint,
callback_url=callback_url,
auth_states=auth_states,
scopes="openid profile email notes:read",
)
assert access_token, "Failed to obtain access token"
logger.info(f"✅ Access token obtained: {access_token[:30]}...")
# Step 3: Delete the client using RFC 7592
logger.info("Step 3: Deleting OAuth client...")
logger.info(f"Client ID: {client_info.client_id}")
logger.info(f"Client secret (first 16 chars): {client_info.client_secret[:16]}...")
logger.info(
f"Registration access token: {registration_access_token[:16] if registration_access_token else 'None'}..."
)
# Use delete_client() which prefers RFC 7592 Bearer token, falls back to Basic Auth
success = await delete_client(
nextcloud_url=nextcloud_host,
client_id=client_info.client_id,
registration_access_token=registration_access_token,
client_secret=client_info.client_secret,
registration_client_uri=registration_client_uri,
)
assert success, (
"Client deletion should succeed with RFC 7592 Bearer token or Basic Auth"
)
logger.info(f"✅ Client deleted successfully: {client_info.client_id[:16]}...")
# Step 4: Verify deleted client cannot obtain new tokens
logger.info("Step 4: Verifying deleted client cannot obtain new tokens...")
# Try to use the deleted client to get a token
# This should fail because the client no longer exists
async with httpx.AsyncClient(timeout=30.0) as http_client:
try:
# Try to use client credentials grant (should fail)
token_response = await http_client.post(
token_endpoint,
data={
"grant_type": "client_credentials",
"client_id": client_info.client_id,
"client_secret": client_info.client_secret,
},
)
# If we get here, check the status code
# Accept either 400 (Bad Request) or 401 (Unauthorized) as valid rejection
if token_response.status_code in [400, 401]:
logger.info(
f"✅ Deleted client correctly rejected ({token_response.status_code})"
)
else:
# Unexpected success - client should be deleted
pytest.fail(
f"Deleted client should not be able to obtain tokens, "
f"but got status {token_response.status_code}"
)
except httpx.HTTPStatusError as e:
# Expected - client should be rejected
if e.response.status_code == 401:
logger.info("✅ Deleted client correctly rejected (401 Unauthorized)")
else:
# Re-raise if it's a different error
raise
logger.info("✅ Complete DCR lifecycle test passed!")
@pytest.mark.integration
async def test_dcr_delete_with_wrong_credentials(
anyio_backend,
oauth_callback_server,
):
"""
Test that deletion fails with wrong registration_access_token (401 Unauthorized).
This verifies:
1. Client registration succeeds and returns registration_access_token
2. Deletion with wrong registration_access_token returns 401
3. Deletion with correct registration_access_token succeeds (RFC 7592)
"""
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
if not nextcloud_host:
pytest.skip("Test requires NEXTCLOUD_HOST")
auth_states, callback_url = oauth_callback_server
# Discover OIDC endpoints
async with httpx.AsyncClient(timeout=30.0) as client:
discovery_url = f"{nextcloud_host}/.well-known/openid-configuration"
discovery_response = await client.get(discovery_url)
discovery_response.raise_for_status()
oidc_config = discovery_response.json()
registration_endpoint = oidc_config.get("registration_endpoint")
# Register client
logger.info("Registering OAuth client for credential test...")
client_info = await register_client(
nextcloud_url=nextcloud_host,
registration_endpoint=registration_endpoint,
client_name="DCR Wrong Credentials Test",
redirect_uris=[callback_url],
scopes="openid profile email",
token_type="Bearer",
)
logger.info(f"Client registered: {client_info.client_id[:16]}...")
# Try to delete with wrong registration_access_token (RFC 7592 Bearer token)
logger.info("Attempting deletion with wrong registration_access_token...")
wrong_token = "wrong_token_" + secrets.token_urlsafe(32)
success = await delete_client(
nextcloud_url=nextcloud_host,
client_id=client_info.client_id,
registration_access_token=wrong_token,
client_secret=client_info.client_secret, # Should not be used if token is present
)
assert not success, "Deletion with wrong credentials should fail"
logger.info("✅ Deletion correctly failed with wrong credentials")
# Clean up: Delete with correct RFC 7592 Bearer token
logger.info("Cleaning up: deleting with correct registration_access_token...")
success = await delete_client(
nextcloud_url=nextcloud_host,
client_id=client_info.client_id,
registration_access_token=client_info.registration_access_token,
client_secret=client_info.client_secret,
registration_client_uri=client_info.registration_client_uri,
)
assert success, "Deletion with correct credentials should succeed"
logger.info("✅ Cleanup successful with correct credentials")
@pytest.mark.integration
async def test_dcr_delete_nonexistent_client(
anyio_backend,
):
"""
Test that deleting a non-existent client fails gracefully.
This verifies:
1. Deletion of fake client_id returns False (not 204)
2. No exceptions are raised (graceful failure)
"""
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
if not nextcloud_host:
pytest.skip("Test requires NEXTCLOUD_HOST")
# Try to delete a client that doesn't exist
fake_client_id = "nonexistent_" + secrets.token_urlsafe(16)
fake_client_secret = secrets.token_urlsafe(32)
logger.info(f"Attempting to delete non-existent client: {fake_client_id[:16]}...")
success = await delete_client(
nextcloud_url=nextcloud_host,
client_id=fake_client_id,
client_secret=fake_client_secret,
)
assert not success, "Deletion of non-existent client should fail"
logger.info("✅ Non-existent client deletion correctly failed")
@pytest.mark.integration
async def test_dcr_deletion_is_idempotent(
anyio_backend,
oauth_callback_server,
):
"""
Test that deleting the same client twice fails gracefully on second attempt.
This verifies:
1. First deletion succeeds (204)
2. Second deletion fails gracefully (returns False, not an exception)
"""
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
if not nextcloud_host:
pytest.skip("Test requires NEXTCLOUD_HOST")
auth_states, callback_url = oauth_callback_server
# Discover OIDC endpoints
async with httpx.AsyncClient(timeout=30.0) as client:
discovery_url = f"{nextcloud_host}/.well-known/openid-configuration"
discovery_response = await client.get(discovery_url)
discovery_response.raise_for_status()
oidc_config = discovery_response.json()
registration_endpoint = oidc_config.get("registration_endpoint")
# Register client
logger.info("Registering OAuth client for idempotency test...")
client_info = await register_client(
nextcloud_url=nextcloud_host,
registration_endpoint=registration_endpoint,
client_name="DCR Idempotency Test",
redirect_uris=[callback_url],
scopes="openid profile email",
token_type="Bearer",
)
logger.info(f"Client registered: {client_info.client_id[:16]}...")
# First deletion with RFC 7592 Bearer token
logger.info("First deletion attempt...")
success = await delete_client(
nextcloud_url=nextcloud_host,
client_id=client_info.client_id,
registration_access_token=client_info.registration_access_token,
client_secret=client_info.client_secret,
registration_client_uri=client_info.registration_client_uri,
)
assert success, "First deletion should succeed"
logger.info("✅ First deletion succeeded")
# Second deletion (should fail gracefully - token no longer valid after first deletion)
logger.info("Second deletion attempt (should fail)...")
success = await delete_client(
nextcloud_url=nextcloud_host,
client_id=client_info.client_id,
registration_access_token=client_info.registration_access_token,
client_secret=client_info.client_secret,
registration_client_uri=client_info.registration_client_uri,
)
assert not success, "Second deletion should fail (client already deleted)"
logger.info("✅ Second deletion correctly failed (client already deleted)")