"""
Persistent Storage for MCP Server State
This module provides SQLite-based storage for multiple concerns across both
BasicAuth and OAuth authentication modes:
1. **Refresh Tokens** (OAuth mode only, for background jobs)
- Securely stores encrypted refresh tokens for offline access
- Used ONLY by background jobs to obtain access tokens
- NEVER used within MCP client sessions or browser sessions
2. **User Profile Cache** (OAuth mode only, for browser UI display)
- Caches IdP user profile data for browser-based admin UI
- Queried ONCE at login, displayed from cache thereafter
- NOT used for authorization decisions or background jobs
3. **Webhook Registration Tracking** (both modes, for webhook management)
- Tracks registered webhook IDs mapped to presets
- Enables persistent webhook state across restarts
- Avoids redundant Nextcloud API calls for webhook status
IMPORTANT: The database is initialized in both BasicAuth and OAuth modes.
Token storage requires TOKEN_ENCRYPTION_KEY, but webhook tracking does not.
Sensitive data (tokens, secrets) is encrypted at rest using Fernet symmetric encryption.
"""
import json
import logging
import os
import time
from pathlib import Path
from typing import Any, Optional
import aiosqlite
from cryptography.fernet import Fernet
from nextcloud_mcp_server.observability.metrics import record_db_operation
logger = logging.getLogger(__name__)
class RefreshTokenStorage:
"""Persistent storage for MCP server state (tokens, webhooks, and future features).
This class manages multiple concerns across both BasicAuth and OAuth modes:
**OAuth-specific concerns**:
- Refresh tokens: Encrypted storage for background job access (requires encryption key)
- User profiles: Plain JSON cache for browser UI display
- OAuth client credentials: Encrypted client secrets from DCR
- OAuth sessions: Temporary session state for progressive consent flow
**Both modes**:
- Webhook registration: Track registered webhooks mapped to presets
- Schema versioning: Handle database migrations automatically
Token-related operations require TOKEN_ENCRYPTION_KEY, but webhook operations do not.
"""
def __init__(self, db_path: str, encryption_key: bytes | None = None):
"""
Initialize persistent storage.
Args:
db_path: Path to SQLite database file
encryption_key: Optional Fernet encryption key (32 bytes, base64-encoded).
Required for token storage operations, not required for webhook tracking.
"""
self.db_path = db_path
self.cipher = Fernet(encryption_key) if encryption_key else None
self._initialized = False
@classmethod
def from_env(cls) -> "RefreshTokenStorage":
"""
Create storage instance from environment variables.
Environment variables:
TOKEN_STORAGE_DB: Path to database file (default: /app/data/tokens.db)
TOKEN_ENCRYPTION_KEY: Optional base64-encoded Fernet key (required for token storage)
Returns:
RefreshTokenStorage instance
Note:
If TOKEN_ENCRYPTION_KEY is not set, token storage operations will fail,
but webhook tracking will still work.
"""
db_path = os.getenv("TOKEN_STORAGE_DB", "/app/data/tokens.db")
encryption_key_b64 = os.getenv("TOKEN_ENCRYPTION_KEY")
encryption_key = None
if encryption_key_b64:
# Fernet expects a base64url-encoded key as bytes, not decoded bytes
# The key from Fernet.generate_key() is already base64url-encoded
try:
# Convert string to bytes if needed
if isinstance(encryption_key_b64, str):
encryption_key = encryption_key_b64.encode()
else:
encryption_key = encryption_key_b64
# Validate the key by trying to create a Fernet instance
Fernet(encryption_key)
except Exception as e:
raise ValueError(
f"Invalid TOKEN_ENCRYPTION_KEY: {e}. "
"Must be a valid Fernet key (base64url-encoded 32 bytes)."
) from e
else:
logger.info(
"TOKEN_ENCRYPTION_KEY not set - token storage operations will be unavailable, "
"but webhook tracking will still work"
)
return cls(db_path=db_path, encryption_key=encryption_key)
async def initialize(self) -> None:
"""
Initialize database schema using Alembic migrations.
This method handles three scenarios:
1. New database: Run migrations from scratch
2. Pre-Alembic database: Stamp with initial revision (no changes)
3. Alembic-managed database: Upgrade to latest version
"""
if self._initialized:
return
# Ensure directory exists
db_dir = Path(self.db_path).parent
db_dir.mkdir(parents=True, exist_ok=True)
# Set restrictive permissions on database file if it exists
if Path(self.db_path).exists():
os.chmod(self.db_path, 0o600)
# Check database state and run appropriate migration strategy
async with aiosqlite.connect(self.db_path) as db:
# Check if database is managed by Alembic
cursor = await db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='alembic_version'"
)
has_alembic = await cursor.fetchone() is not None
if not has_alembic:
# Check if this is a pre-Alembic database with existing schema
cursor = await db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='refresh_tokens'"
)
has_schema = await cursor.fetchone() is not None
if has_schema:
logger.info(
f"Detected pre-Alembic database at {self.db_path}, "
"stamping with initial revision"
)
else:
logger.info(
f"Initializing new database at {self.db_path} with migrations"
)
# Run migrations in a worker thread using anyio.to_thread
# This allows Alembic to run its own async operations in a separate context
from anyio import to_thread
from nextcloud_mcp_server.migrations import stamp_database, upgrade_database
if not has_alembic:
if has_schema:
# Stamp existing database without running migrations
await to_thread.run_sync(stamp_database, self.db_path, "001")
logger.info(
"Pre-Alembic database stamped successfully. "
"Future schema changes will use migrations."
)
else:
# New database - run migrations
await to_thread.run_sync(upgrade_database, self.db_path, "head")
logger.info("Database initialized with migrations")
else:
# Alembic-managed database - upgrade to latest
await to_thread.run_sync(upgrade_database, self.db_path, "head")
logger.info("Database upgraded to latest version")
# Set restrictive permissions after initialization
os.chmod(self.db_path, 0o600)
self._initialized = True
logger.info(f"Initialized refresh token storage at {self.db_path}")
async def store_refresh_token(
self,
user_id: str,
refresh_token: str,
expires_at: Optional[int] = None,
flow_type: str = "hybrid",
token_audience: str = "nextcloud",
provisioning_client_id: Optional[str] = None,
scopes: Optional[list[str]] = None,
) -> None:
"""
Store encrypted refresh token for user.
Args:
user_id: User identifier (from OIDC 'sub' claim)
refresh_token: Refresh token to store
expires_at: Token expiration timestamp (Unix epoch), if known
flow_type: Type of flow ('hybrid', 'flow1', 'flow2')
token_audience: Token audience ('mcp-server' or 'nextcloud')
provisioning_client_id: Client ID that initiated Flow 1
scopes: List of granted scopes
"""
if not self._initialized:
await self.initialize()
# Type narrowing: cipher is set after initialize()
assert self.cipher is not None
encrypted_token = self.cipher.encrypt(refresh_token.encode())
now = int(time.time())
scopes_json = json.dumps(scopes) if scopes else None
# For Flow 2, set provisioned_at timestamp
provisioned_at = now if flow_type == "flow2" else None
start_time = time.time()
try:
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
INSERT OR REPLACE INTO refresh_tokens
(user_id, encrypted_token, expires_at, created_at, updated_at,
flow_type, token_audience, provisioned_at, provisioning_client_id, scopes)
VALUES (?, ?, ?, COALESCE((SELECT created_at FROM refresh_tokens WHERE user_id = ?), ?), ?,
?, ?, ?, ?, ?)
""",
(
user_id,
encrypted_token,
expires_at,
user_id,
now,
now,
flow_type,
token_audience,
provisioned_at,
provisioning_client_id,
scopes_json,
),
)
await db.commit()
duration = time.time() - start_time
record_db_operation("sqlite", "insert", duration, "success")
logger.info(
f"Stored refresh token for user {user_id}"
+ (f" (expires at {expires_at})" if expires_at else "")
)
except Exception:
duration = time.time() - start_time
record_db_operation("sqlite", "insert", duration, "error")
raise
# Audit log
await self._audit_log(
event="store_refresh_token",
user_id=user_id,
auth_method="offline_access",
)
async def store_user_profile(
self, user_id: str, profile_data: dict[str, Any]
) -> None:
"""
Store user profile data (cached from IdP userinfo endpoint).
This profile is cached ONLY for browser UI display purposes, not for
authorization decisions. Background jobs should NOT rely on this data.
Args:
user_id: User identifier (must match refresh_tokens.user_id)
profile_data: User profile dict from IdP userinfo endpoint
"""
if not self._initialized:
await self.initialize()
profile_json = json.dumps(profile_data)
now = int(time.time())
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
UPDATE refresh_tokens
SET user_profile = ?, profile_cached_at = ?
WHERE user_id = ?
""",
(profile_json, now, user_id),
)
await db.commit()
logger.debug(f"Cached user profile for {user_id}")
async def get_user_profile(self, user_id: str) -> Optional[dict[str, Any]]:
"""
Retrieve cached user profile data.
This returns cached profile data from the initial OAuth login,
NOT fresh data from the IdP. Use this for browser UI display only.
Args:
user_id: User identifier
Returns:
User profile dict or None if not cached
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
async with db.execute(
"""
SELECT user_profile, profile_cached_at
FROM refresh_tokens
WHERE user_id = ?
""",
(user_id,),
) as cursor:
row = await cursor.fetchone()
if not row or not row[0]:
return None
profile_json, cached_at = row
profile_data = json.loads(profile_json)
# Optionally add cache metadata
profile_data["_cached_at"] = cached_at
return profile_data
async def get_refresh_token(self, user_id: str) -> Optional[dict]:
"""
Retrieve and decrypt refresh token for user.
Args:
user_id: User identifier
Returns:
Dictionary with token data including ADR-004 fields:
{
"refresh_token": str,
"expires_at": int | None,
"flow_type": str,
"token_audience": str,
"provisioned_at": int | None,
"provisioning_client_id": str | None,
"scopes": list[str] | None
}
or None if not found or expired
"""
if not self._initialized:
await self.initialize()
# Type narrowing: cipher is set after initialize()
assert self.cipher is not None
start_time = time.time()
try:
async with aiosqlite.connect(self.db_path) as db:
async with db.execute(
"""
SELECT encrypted_token, expires_at, flow_type, token_audience,
provisioned_at, provisioning_client_id, scopes
FROM refresh_tokens WHERE user_id = ?
""",
(user_id,),
) as cursor:
row = await cursor.fetchone()
if not row:
logger.debug(f"No refresh token found for user {user_id}")
duration = time.time() - start_time
record_db_operation("sqlite", "select", duration, "success")
return None
(
encrypted_token,
expires_at,
flow_type,
token_audience,
provisioned_at,
provisioning_client_id,
scopes_json,
) = row
# Check expiration
if expires_at is not None and expires_at < time.time():
logger.warning(
f"Refresh token for user {user_id} has expired (expired at {expires_at})"
)
await self.delete_refresh_token(user_id)
duration = time.time() - start_time
record_db_operation("sqlite", "select", duration, "success")
return None
decrypted_token = self.cipher.decrypt(encrypted_token).decode()
scopes = json.loads(scopes_json) if scopes_json else None
logger.debug(
f"Retrieved refresh token for user {user_id} (flow_type: {flow_type})"
)
duration = time.time() - start_time
record_db_operation("sqlite", "select", duration, "success")
return {
"refresh_token": decrypted_token,
"expires_at": expires_at,
"flow_type": flow_type or "hybrid", # Default for existing tokens
"token_audience": token_audience
or "nextcloud", # Default for existing tokens
"provisioned_at": provisioned_at,
"provisioning_client_id": provisioning_client_id,
"scopes": scopes,
}
except Exception as e:
duration = time.time() - start_time
record_db_operation("sqlite", "select", duration, "error")
logger.error(f"Failed to decrypt refresh token for user {user_id}: {e}")
return None
async def get_refresh_token_by_provisioning_client_id(
self, provisioning_client_id: str
) -> Optional[dict]:
"""
Retrieve and decrypt refresh token by provisioning_client_id (state parameter).
This is used to check if an OAuth Flow 2 login completed successfully
by looking up the refresh token using the state parameter that was generated
during the authorization request.
Args:
provisioning_client_id: OAuth state parameter from the authorization request
Returns:
Dictionary with token data or None if not found
"""
if not self._initialized:
await self.initialize()
# Type narrowing: cipher is set after initialize()
assert self.cipher is not None
async with aiosqlite.connect(self.db_path) as db:
async with db.execute(
"""
SELECT user_id, encrypted_token, expires_at, flow_type, token_audience,
provisioned_at, provisioning_client_id, scopes
FROM refresh_tokens WHERE provisioning_client_id = ?
""",
(provisioning_client_id,),
) as cursor:
row = await cursor.fetchone()
if not row:
logger.debug(
f"No refresh token found for provisioning_client_id {provisioning_client_id[:16]}..."
)
return None
(
user_id,
encrypted_token,
expires_at,
flow_type,
token_audience,
provisioned_at,
prov_client_id,
scopes_json,
) = row
# Check expiration
if expires_at is not None and expires_at < time.time():
logger.warning(
f"Refresh token for provisioning_client_id {provisioning_client_id[:16]}... has expired"
)
return None
try:
decrypted_token = self.cipher.decrypt(encrypted_token).decode()
scopes = json.loads(scopes_json) if scopes_json else None
logger.debug(
f"Retrieved refresh token for provisioning_client_id {provisioning_client_id[:16]}... (user_id: {user_id})"
)
return {
"user_id": user_id,
"refresh_token": decrypted_token,
"expires_at": expires_at,
"flow_type": flow_type or "hybrid",
"token_audience": token_audience or "nextcloud",
"provisioned_at": provisioned_at,
"provisioning_client_id": prov_client_id,
"scopes": scopes,
}
except Exception as e:
logger.error(
f"Failed to decrypt refresh token for provisioning_client_id {provisioning_client_id[:16]}...: {e}"
)
return None
async def delete_refresh_token(self, user_id: str) -> bool:
"""
Delete refresh token for user.
Args:
user_id: User identifier
Returns:
True if token was deleted, False if not found
"""
if not self._initialized:
await self.initialize()
start_time = time.time()
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"DELETE FROM refresh_tokens WHERE user_id = ?",
(user_id,),
)
await db.commit()
deleted = cursor.rowcount > 0
duration = time.time() - start_time
record_db_operation("sqlite", "delete", duration, "success")
if deleted:
logger.info(f"Deleted refresh token for user {user_id}")
await self._audit_log(
event="delete_refresh_token",
user_id=user_id,
auth_method="offline_access",
)
else:
logger.debug(f"No refresh token to delete for user {user_id}")
return deleted
except Exception:
duration = time.time() - start_time
record_db_operation("sqlite", "delete", duration, "error")
raise
async def get_all_user_ids(self) -> list[str]:
"""
Get list of all user IDs with stored refresh tokens.
Returns:
List of user IDs
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
async with db.execute(
"SELECT user_id FROM refresh_tokens ORDER BY updated_at DESC"
) as cursor:
rows = await cursor.fetchall()
user_ids = [row[0] for row in rows]
logger.debug(f"Found {len(user_ids)} users with refresh tokens")
return user_ids
async def cleanup_expired_tokens(self) -> int:
"""
Remove expired refresh tokens from storage.
Returns:
Number of tokens deleted
"""
if not self._initialized:
await self.initialize()
now = int(time.time())
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"DELETE FROM refresh_tokens WHERE expires_at IS NOT NULL AND expires_at < ?",
(now,),
)
await db.commit()
deleted = cursor.rowcount
if deleted > 0:
logger.info(f"Cleaned up {deleted} expired refresh token(s)")
return deleted
async def store_oauth_client(
self,
client_id: str,
client_secret: str,
client_id_issued_at: int,
client_secret_expires_at: int,
redirect_uris: list[str],
registration_access_token: Optional[str] = None,
registration_client_uri: Optional[str] = None,
) -> None:
"""
Store encrypted OAuth client credentials.
Args:
client_id: OAuth client identifier
client_secret: OAuth client secret (will be encrypted)
client_id_issued_at: Unix timestamp when client was issued
client_secret_expires_at: Unix timestamp when secret expires
redirect_uris: List of redirect URIs
registration_access_token: RFC 7592 registration token (will be encrypted)
registration_client_uri: RFC 7592 client management URI
"""
if not self._initialized:
await self.initialize()
# Type narrowing: cipher is set after initialize()
assert self.cipher is not None
# Encrypt sensitive data
encrypted_secret = self.cipher.encrypt(client_secret.encode())
encrypted_reg_token = (
self.cipher.encrypt(registration_access_token.encode())
if registration_access_token
else None
)
# Serialize redirect_uris as JSON
redirect_uris_json = json.dumps(redirect_uris)
now = int(time.time())
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
INSERT OR REPLACE INTO oauth_clients
(id, client_id, encrypted_client_secret, client_id_issued_at,
client_secret_expires_at, redirect_uris, encrypted_registration_access_token,
registration_client_uri, created_at, updated_at)
VALUES (
1, ?, ?, ?, ?, ?, ?, ?,
COALESCE((SELECT created_at FROM oauth_clients WHERE id = 1), ?),
?
)
""",
(
client_id,
encrypted_secret,
client_id_issued_at,
client_secret_expires_at,
redirect_uris_json,
encrypted_reg_token,
registration_client_uri,
now,
now,
),
)
await db.commit()
logger.info(
f"Stored OAuth client credentials (client_id: {client_id[:16]}..., "
f"expires at {client_secret_expires_at})"
)
# Audit log
await self._audit_log(
event="store_oauth_client",
user_id="system",
auth_method="oauth",
)
async def get_oauth_client(self) -> Optional[dict]:
"""
Retrieve and decrypt OAuth client credentials.
Returns:
Dictionary with client credentials, or None if not found or expired:
{
"client_id": str,
"client_secret": str,
"client_id_issued_at": int,
"client_secret_expires_at": int,
"redirect_uris": list[str],
"registration_access_token": str | None,
"registration_client_uri": str | None,
}
"""
if not self._initialized:
await self.initialize()
# Type narrowing: cipher is set after initialize()
assert self.cipher is not None
async with aiosqlite.connect(self.db_path) as db:
async with db.execute(
"""
SELECT client_id, encrypted_client_secret, client_id_issued_at,
client_secret_expires_at, redirect_uris,
encrypted_registration_access_token, registration_client_uri
FROM oauth_clients WHERE id = 1
"""
) as cursor:
row = await cursor.fetchone()
if not row:
logger.debug("No OAuth client credentials found in storage")
return None
(
client_id,
encrypted_secret,
issued_at,
expires_at,
redirect_uris_json,
encrypted_reg_token,
reg_client_uri,
) = row
# Check expiration
if expires_at < time.time():
logger.warning(
f"OAuth client has expired (expired at {expires_at}), deleting"
)
await self.delete_oauth_client()
return None
try:
# Decrypt sensitive data
client_secret = self.cipher.decrypt(encrypted_secret).decode()
reg_token = (
self.cipher.decrypt(encrypted_reg_token).decode()
if encrypted_reg_token
else None
)
# Deserialize redirect_uris
redirect_uris = json.loads(redirect_uris_json)
logger.debug(
f"Retrieved OAuth client credentials (client_id: {client_id[:16]}...)"
)
return {
"client_id": client_id,
"client_secret": client_secret,
"client_id_issued_at": issued_at,
"client_secret_expires_at": expires_at,
"redirect_uris": redirect_uris,
"registration_access_token": reg_token,
"registration_client_uri": reg_client_uri,
}
except Exception as e:
logger.error(f"Failed to decrypt OAuth client credentials: {e}")
return None
async def delete_oauth_client(self) -> bool:
"""
Delete OAuth client credentials.
Returns:
True if client was deleted, False if not found
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("DELETE FROM oauth_clients WHERE id = 1")
await db.commit()
deleted = cursor.rowcount > 0
if deleted:
logger.info("Deleted OAuth client credentials from storage")
await self._audit_log(
event="delete_oauth_client",
user_id="system",
auth_method="oauth",
)
else:
logger.debug("No OAuth client credentials to delete")
return deleted
async def has_oauth_client(self) -> bool:
"""
Check if OAuth client credentials exist (and are not expired).
Returns:
True if valid client exists, False otherwise
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
async with db.execute(
"SELECT client_secret_expires_at FROM oauth_clients WHERE id = 1"
) as cursor:
row = await cursor.fetchone()
if not row:
return False
expires_at = row[0]
return expires_at >= time.time()
async def _audit_log(
self,
event: str,
user_id: str,
resource_type: Optional[str] = None,
resource_id: Optional[str] = None,
auth_method: Optional[str] = None,
) -> None:
"""
Log operation to audit log.
Args:
event: Event name (e.g., "store_refresh_token", "token_refresh")
user_id: User identifier
resource_type: Resource type (e.g., "note", "file")
resource_id: Resource identifier
auth_method: Authentication method used
"""
import socket
hostname = socket.gethostname()
timestamp = int(time.time())
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
INSERT INTO audit_logs
(timestamp, event, user_id, resource_type, resource_id, auth_method, hostname)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
timestamp,
event,
user_id,
resource_type,
resource_id,
auth_method,
hostname,
),
)
await db.commit()
async def get_audit_logs(
self,
user_id: Optional[str] = None,
since: Optional[int] = None,
limit: int = 100,
) -> list[dict]:
"""
Retrieve audit logs.
Args:
user_id: Filter by user ID (optional)
since: Filter by timestamp (Unix epoch, optional)
limit: Maximum number of logs to return
Returns:
List of audit log entries
"""
if not self._initialized:
await self.initialize()
query = "SELECT * FROM audit_logs WHERE 1=1"
params = []
if user_id:
query += " AND user_id = ?"
params.append(user_id)
if since:
query += " AND timestamp >= ?"
params.append(since)
query += " ORDER BY timestamp DESC LIMIT ?"
params.append(limit)
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(query, params) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def store_oauth_session(
self,
session_id: str,
client_redirect_uri: str,
state: Optional[str] = None,
code_challenge: Optional[str] = None,
code_challenge_method: Optional[str] = None,
mcp_authorization_code: Optional[str] = None,
client_id: Optional[str] = None,
flow_type: str = "hybrid",
is_provisioning: bool = False,
requested_scopes: Optional[str] = None,
ttl_seconds: int = 600, # 10 minutes
) -> None:
"""
Store OAuth session for ADR-004 Progressive Consent.
Args:
session_id: Unique session identifier
client_redirect_uri: Client's localhost redirect URI
state: CSRF protection state parameter
code_challenge: PKCE code challenge
code_challenge_method: PKCE method (S256)
mcp_authorization_code: Pre-generated MCP authorization code
client_id: Client identifier (for Flow 1)
flow_type: Type of flow ('hybrid', 'flow1', 'flow2')
is_provisioning: Whether this is a Flow 2 provisioning session
requested_scopes: Requested OAuth scopes
ttl_seconds: Session TTL in seconds
"""
if not self._initialized:
await self.initialize()
now = int(time.time())
expires_at = now + ttl_seconds
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
INSERT INTO oauth_sessions
(session_id, client_id, client_redirect_uri, state, code_challenge,
code_challenge_method, mcp_authorization_code, flow_type,
is_provisioning, requested_scopes, created_at, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
session_id,
client_id,
client_redirect_uri,
state,
code_challenge,
code_challenge_method,
mcp_authorization_code,
flow_type,
is_provisioning,
requested_scopes,
now,
expires_at,
),
)
await db.commit()
logger.debug(f"Stored OAuth session {session_id} (expires in {ttl_seconds}s)")
async def get_oauth_session(self, session_id: str) -> Optional[dict]:
"""
Retrieve OAuth session by session ID.
Returns:
Session dictionary or None if not found/expired
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM oauth_sessions WHERE session_id = ?", (session_id,)
) as cursor:
row = await cursor.fetchone()
if not row:
return None
session = dict(row)
# Check expiration
if session["expires_at"] < time.time():
logger.debug(f"OAuth session {session_id} has expired")
await self.delete_oauth_session(session_id)
return None
return session
async def get_oauth_session_by_mcp_code(
self, mcp_authorization_code: str
) -> Optional[dict]:
"""
Retrieve OAuth session by MCP authorization code.
Returns:
Session dictionary or None if not found/expired
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM oauth_sessions WHERE mcp_authorization_code = ?",
(mcp_authorization_code,),
) as cursor:
row = await cursor.fetchone()
if not row:
return None
session = dict(row)
# Check expiration
if session["expires_at"] < time.time():
logger.debug(
f"OAuth session with MCP code {mcp_authorization_code[:16]}... has expired"
)
await self.delete_oauth_session(session["session_id"])
return None
return session
async def update_oauth_session(
self,
session_id: str,
user_id: Optional[str] = None,
idp_access_token: Optional[str] = None,
idp_refresh_token: Optional[str] = None,
) -> bool:
"""
Update OAuth session with IdP token data.
Returns:
True if session was updated, False if not found
"""
if not self._initialized:
await self.initialize()
update_fields = []
params = []
if user_id is not None:
update_fields.append("user_id = ?")
params.append(user_id)
if idp_access_token is not None:
update_fields.append("idp_access_token = ?")
params.append(idp_access_token)
if idp_refresh_token is not None:
update_fields.append("idp_refresh_token = ?")
params.append(idp_refresh_token)
if not update_fields:
return False
params.append(session_id)
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
f"""
UPDATE oauth_sessions
SET {", ".join(update_fields)}
WHERE session_id = ?
""",
params,
)
await db.commit()
updated = cursor.rowcount > 0
if updated:
logger.debug(f"Updated OAuth session {session_id}")
return updated
async def delete_oauth_session(self, session_id: str) -> bool:
"""
Delete OAuth session.
Returns:
True if session was deleted, False if not found
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"DELETE FROM oauth_sessions WHERE session_id = ?", (session_id,)
)
await db.commit()
deleted = cursor.rowcount > 0
if deleted:
logger.debug(f"Deleted OAuth session {session_id}")
return deleted
async def cleanup_expired_sessions(self) -> int:
"""
Remove expired OAuth sessions from storage.
Returns:
Number of sessions deleted
"""
if not self._initialized:
await self.initialize()
now = int(time.time())
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"DELETE FROM oauth_sessions WHERE expires_at < ?", (now,)
)
await db.commit()
deleted = cursor.rowcount
if deleted > 0:
logger.info(f"Cleaned up {deleted} expired OAuth session(s)")
return deleted
# ============================================================================
# Webhook Registration Tracking (both BasicAuth and OAuth modes)
# ============================================================================
async def store_webhook(self, webhook_id: int, preset_id: str) -> None:
"""
Store registered webhook ID for tracking.
Args:
webhook_id: Nextcloud webhook ID
preset_id: Preset identifier (e.g., "notes_sync", "calendar_sync")
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"INSERT OR REPLACE INTO registered_webhooks (webhook_id, preset_id, created_at) VALUES (?, ?, ?)",
(webhook_id, preset_id, time.time()),
)
await db.commit()
logger.debug(f"Stored webhook {webhook_id} for preset '{preset_id}'")
async def get_webhooks_by_preset(self, preset_id: str) -> list[int]:
"""
Get all webhook IDs registered for a preset.
Args:
preset_id: Preset identifier
Returns:
List of webhook IDs
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT webhook_id FROM registered_webhooks WHERE preset_id = ?",
(preset_id,),
)
rows = await cursor.fetchall()
return [row[0] for row in rows]
async def delete_webhook(self, webhook_id: int) -> bool:
"""
Remove webhook from tracking.
Args:
webhook_id: Nextcloud webhook ID to remove
Returns:
True if webhook was deleted, False if not found
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"DELETE FROM registered_webhooks WHERE webhook_id = ?", (webhook_id,)
)
await db.commit()
deleted = cursor.rowcount > 0
if deleted:
logger.debug(f"Deleted webhook {webhook_id} from tracking")
return deleted
async def list_all_webhooks(self) -> list[dict]:
"""
List all tracked webhooks with metadata.
Returns:
List of webhook dictionaries with keys: webhook_id, preset_id, created_at
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT webhook_id, preset_id, created_at FROM registered_webhooks ORDER BY created_at DESC"
)
rows = await cursor.fetchall()
return [
{"webhook_id": row[0], "preset_id": row[1], "created_at": row[2]}
for row in rows
]
async def clear_preset_webhooks(self, preset_id: str) -> int:
"""
Delete all webhooks for a preset (bulk operation).
Args:
preset_id: Preset identifier
Returns:
Number of webhooks deleted
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"DELETE FROM registered_webhooks WHERE preset_id = ?", (preset_id,)
)
await db.commit()
deleted = cursor.rowcount
if deleted > 0:
logger.debug(f"Cleared {deleted} webhook(s) for preset '{preset_id}'")
return deleted
async def generate_encryption_key() -> str:
"""
Generate a new Fernet encryption key.
Returns:
Base64-encoded encryption key suitable for TOKEN_ENCRYPTION_KEY env var
"""
return Fernet.generate_key().decode()
# Example usage
if __name__ == "__main__":
import anyio
async def main():
# Generate a key for testing
key = await generate_encryption_key()
print(f"Generated encryption key: {key}")
print(f"Set this in your environment: export TOKEN_ENCRYPTION_KEY='{key}'")
anyio.run(main)