"""
TTG Scratchpad MCP Server - Database Operations
MongoDB persistence layer with user isolation.
"""
import os
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
from contextlib import asynccontextmanager
from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Retention period in days. It is converted to seconds for the
# expireAfterSeconds option of the TTL indexes and for the ttl_remaining
# countdown reported on completed workspaces.
WORKSPACE_TTL_DAYS = 7
# ============================================================
# DATABASE LIFESPAN MANAGEMENT
# ============================================================
@asynccontextmanager
async def database_lifespan(app):
    """Lifespan context manager for the MongoDB connection.

    Reads the connection string from the ``MONGODB_URI`` environment
    variable. When it is unset, a warning is logged and ``{"db": None}`` is
    yielded. Otherwise a client is opened, the connection is verified with a
    ``ping`` command, indexes are created, and the ``ttg-workspaces``
    database handle is yielded under the ``"db"`` key.

    Args:
        app: Passed by the caller; not used inside this function.

    Yields:
        A dict with a single ``"db"`` key holding the database handle, or
        ``None`` when ``MONGODB_URI`` is not set.

    Raises:
        Exception: Any error raised by the ping is logged and re-raised.
    """
    mongo_uri = os.environ.get("MONGODB_URI")
    if not mongo_uri:
        logger.warning("MONGODB_URI not set - database features disabled")
        yield {"db": None}
        return

    # Connect to MongoDB
    client = AsyncIOMotorClient(
        mongo_uri,
        serverSelectionTimeoutMS=5000,
        connectTimeoutMS=5000,
        maxPoolSize=50,
        minPoolSize=5,
    )
    # The try/finally begins as soon as the client exists so the client is
    # closed on every exit path, including a failed ping.
    try:
        db = client["ttg-workspaces"]

        # Verify connection
        try:
            await client.admin.command('ping')
            logger.info("Connected to MongoDB (ttg-workspaces)")
        except Exception as e:
            logger.error(f"Failed to connect to MongoDB: {e}")
            raise

        # Create indexes
        await create_indexes(db)

        yield {"db": db}
    finally:
        client.close()
        logger.info("Closed MongoDB connection")
async def create_indexes(db: AsyncIOMotorDatabase):
    """Create the indexes used by the workspaces, files and activity_logs collections.

    Indexes are created one after another in a fixed order. Any exception
    raised along the way is logged and not propagated, so the remaining
    indexes in the list are skipped when one fails.

    Args:
        db: Database handle whose ``workspaces``, ``files`` and
            ``activity_logs`` collections receive the indexes.
    """
    ttl_seconds = WORKSPACE_TTL_DAYS * 24 * 60 * 60

    # (collection attribute, key specification, create_index keyword options),
    # listed in creation order.
    index_plan = [
        # Workspaces: lookups by user + status and by user + recency.
        ("workspaces", [("user_id", 1), ("status", 1)],
         {"name": "user_status_idx"}),
        ("workspaces", [("user_id", 1), ("updated_at", -1)],
         {"name": "user_updated_idx"}),
        # TTL index restricted to workspaces whose status is "complete".
        ("workspaces", [("updated_at", 1)],
         {"expireAfterSeconds": ttl_seconds,
          "name": "workspace_ttl_idx",
          "partialFilterExpression": {"status": "complete"}}),
        # Files: lookups by user + workspace, unique path per workspace.
        ("files", [("user_id", 1), ("workspace_id", 1)],
         {"name": "user_workspace_idx"}),
        ("files", [("workspace_id", 1), ("path", 1)],
         {"unique": True, "name": "workspace_path_unique_idx"}),
        # TTL index on the file's updated_at.
        ("files", [("updated_at", 1)],
         {"expireAfterSeconds": ttl_seconds, "name": "files_ttl_idx"}),
        # Activity logs: recent-first lookups by user and by workspace.
        ("activity_logs", [("user_id", 1), ("timestamp", -1)],
         {"name": "user_timestamp_idx"}),
        ("activity_logs", [("workspace_id", 1), ("timestamp", -1)],
         {"name": "workspace_timestamp_idx"}),
        # TTL index on the log entry's timestamp.
        ("activity_logs", [("timestamp", 1)],
         {"expireAfterSeconds": ttl_seconds, "name": "activity_ttl_idx"}),
    ]

    try:
        for collection_name, keys, options in index_plan:
            collection = getattr(db, collection_name)
            await collection.create_index(keys, **options)
        logger.info("Database indexes created/verified")
    except Exception as e:
        logger.error(f"Error creating indexes: {e}")
# ============================================================
# WORKSPACE OPERATIONS
# ============================================================
async def create_workspace(
    db: AsyncIOMotorDatabase,
    user_id: str,
    task_description: str,
    conversation_id: Optional[str] = None
) -> Dict[str, Any]:
    """Insert a new workspace document owned by ``user_id``.

    The document starts with status ``"active"``, a progress of 0 out of 1,
    and identical ``created_at`` / ``updated_at`` timestamps.

    Args:
        db: Database handle; the document goes into ``db.workspaces``.
        user_id: Owner recorded on the workspace.
        task_description: Stored under the ``"task"`` key.
        conversation_id: Optional conversation identifier stored as given.

    Returns:
        The inserted document, including the ``_id`` reported by the insert.
    """
    timestamp = datetime.utcnow()
    document: Dict[str, Any] = {
        "user_id": user_id,
        "task": task_description,
        "status": "active",
        "progress": {"current": 0, "total": 1},
        "working_on": "Initializing...",
        "conversation_id": conversation_id,
        "created_at": timestamp,
        "updated_at": timestamp,
    }
    insert_result = await db.workspaces.insert_one(document)
    document["_id"] = insert_result.inserted_id
    return document
async def get_active_workspace(
    db: AsyncIOMotorDatabase,
    user_id: str
) -> Optional[Dict[str, Any]]:
    """Return the user's active workspace with the latest ``updated_at``.

    Args:
        db: Database handle; ``db.workspaces`` is queried.
        user_id: Owner whose workspaces are searched.

    Returns:
        The matching workspace document, or ``None`` when the user has no
        workspace with status ``"active"``.
    """
    active_filter = {"user_id": user_id, "status": "active"}
    newest_first = [("updated_at", -1)]
    return await db.workspaces.find_one(active_filter, sort=newest_first)
async def get_workspace_by_id(
    db: AsyncIOMotorDatabase,
    user_id: str,
    workspace_id: str
) -> Optional[Dict[str, Any]]:
    """Fetch one workspace by id, restricted to the given owner.

    Args:
        db: Database handle; ``db.workspaces`` is queried.
        user_id: Must match the document's ``user_id`` for it to be returned.
        workspace_id: String form of the workspace ``_id``; converted with
            ``ObjectId``.
            NOTE(review): a malformed id presumably makes ``ObjectId`` raise;
            confirm callers handle that.

    Returns:
        The workspace document, or ``None`` when no document matches both
        the id and the owner.
    """
    lookup = {"_id": ObjectId(workspace_id), "user_id": user_id}
    return await db.workspaces.find_one(lookup)
async def get_user_workspaces(
    db: AsyncIOMotorDatabase,
    user_id: str,
    limit: int = 20,
    include_completed: bool = True
) -> List[Dict[str, Any]]:
    """
    List a user's workspaces, most recently updated first, with TTL countdown.

    Args:
        db: Database handle; reads ``db.workspaces`` and ``db.files``.
        user_id: Owner whose workspaces are listed.
        limit: Maximum number of workspaces returned.
        include_completed: When False, only workspaces with status
            ``"active"`` are returned.

    Returns:
        One summary dict per workspace with the keys ``id``, ``task``,
        ``status``, ``created_at``, ``updated_at``, ``files_count``,
        ``ttl_remaining`` and ``progress``. ``ttl_remaining`` is the number
        of seconds left in the TTL window (never below 0) for workspaces
        whose status is ``"complete"`` and that have an ``updated_at``;
        otherwise it is ``None``.
    """
    if include_completed:
        ws_filter = {"user_id": user_id}
    else:
        ws_filter = {"user_id": user_id, "status": "active"}

    docs = await (
        db.workspaces.find(ws_filter)
        .sort("updated_at", -1)
        .limit(limit)
        .to_list(length=limit)
    )

    reference_time = datetime.utcnow()
    ttl_window = WORKSPACE_TTL_DAYS * 24 * 60 * 60

    summaries = []
    for doc in docs:
        ws_oid = doc["_id"]
        last_update = doc.get("updated_at")

        # One count query per workspace, scoped to the same owner.
        file_total = await db.files.count_documents({
            "workspace_id": ws_oid,
            "user_id": user_id
        })

        # Only completed workspaces carry a countdown.
        if doc.get("status") == "complete" and last_update:
            age_seconds = (reference_time - last_update).total_seconds()
            remaining = max(0, ttl_window - age_seconds)
        else:
            remaining = None

        summaries.append({
            "id": str(ws_oid),
            "task": doc.get("task", ""),
            "status": doc.get("status", "unknown"),
            "created_at": doc.get("created_at"),
            "updated_at": last_update,
            "files_count": file_total,
            "ttl_remaining": remaining,
            "progress": doc.get("progress")
        })
    return summaries
async def update_workspace(
    db: AsyncIOMotorDatabase,
    user_id: str,
    workspace_id: str,
    updates: Dict[str, Any]
) -> bool:
    """Apply ``updates`` to a workspace and refresh its ``updated_at``.

    Args:
        db: Database handle; ``db.workspaces`` is updated.
        user_id: Must match the workspace's ``user_id`` for the update to apply.
        workspace_id: String form of the workspace ``_id``; converted with
            ``ObjectId``.
        updates: Field/value pairs to ``$set``. The dict is not modified;
            an ``updated_at`` entry in it is overridden by the current time.

    Returns:
        True when the update reports a modified document, otherwise False.
    """
    # Build a new dict so the caller's mapping is left untouched.
    changes = {**updates, "updated_at": datetime.utcnow()}
    result = await db.workspaces.update_one(
        {"_id": ObjectId(workspace_id), "user_id": user_id},
        {"$set": changes}
    )
    return result.modified_count > 0
async def complete_workspace(
    db: AsyncIOMotorDatabase,
    user_id: str,
    workspace_id: str,
    cleanup: bool = False
) -> bool:
    """
    Set a workspace's status to "complete" and clear its progress fields.

    Args:
        db: Database handle passed through to the helpers.
        user_id: Owner of the workspace.
        workspace_id: String id of the workspace.
        cleanup: If True, immediately delete files and logs once the status
            update succeeded (default: False, rely on TTL).

    Returns:
        The result of the status update: True when it modified the workspace.
    """
    completion_fields = {"status": "complete", "progress": None, "working_on": None}
    marked = await update_workspace(db, user_id, workspace_id, completion_fields)

    # Nothing more to do unless the update succeeded and cleanup was requested.
    if not (marked and cleanup):
        return marked

    await cleanup_workspace(db, user_id, workspace_id)
    return marked
async def cleanup_workspace(
    db: AsyncIOMotorDatabase,
    user_id: str,
    workspace_id: str
) -> Dict[str, int]:
    """Immediately remove a workspace's files and activity logs.

    The workspace document itself is not touched.

    Args:
        db: Database handle; deletes from ``db.files`` and ``db.activity_logs``.
        user_id: Only documents carrying this ``user_id`` are deleted.
        workspace_id: String id of the workspace; converted with ``ObjectId``.

    Returns:
        A dict with ``files_deleted`` and ``logs_deleted`` counts.
    """
    oid = ObjectId(workspace_id)
    scope = {"workspace_id": oid, "user_id": user_id}

    # Files first, then activity logs; each call gets its own filter copy.
    removed_files = await db.files.delete_many(dict(scope))
    removed_logs = await db.activity_logs.delete_many(dict(scope))

    return {
        "files_deleted": removed_files.deleted_count,
        "logs_deleted": removed_logs.deleted_count,
    }
async def delete_workspace(
    db: AsyncIOMotorDatabase,
    user_id: str,
    workspace_id: str
) -> bool:
    """Remove a workspace together with its files and activity logs.

    Files are deleted first, then activity logs, then the workspace
    document; every delete is restricted to ``user_id``.

    Args:
        db: Database handle.
        user_id: Owner whose data is deleted.
        workspace_id: String id of the workspace; converted with ``ObjectId``.

    Returns:
        True when a workspace document was deleted, otherwise False.
    """
    oid = ObjectId(workspace_id)

    # Dependent data: files, then activity logs.
    for collection in (db.files, db.activity_logs):
        await collection.delete_many({"workspace_id": oid, "user_id": user_id})

    # Finally the workspace document itself.
    outcome = await db.workspaces.delete_one({"_id": oid, "user_id": user_id})
    return outcome.deleted_count > 0
# ============================================================
# FILE OPERATIONS
# ============================================================
async def create_or_update_file(
    db: AsyncIOMotorDatabase,
    user_id: str,
    workspace_id: str,
    name: str,
    path: str,
    content: str,
    file_type: str = "file"
) -> Dict[str, Any]:
    """Create a file in the workspace, or overwrite it when the path exists.

    The upsert is keyed on workspace id, path and user id. ``created_at`` is
    written only when a new document is inserted; every call sets
    ``updated`` to True and refreshes ``updated_at``.

    Args:
        db: Database handle; ``db.files`` is written.
        user_id: Owner recorded on the file.
        workspace_id: String id of the workspace; converted with ``ObjectId``.
        name: File name stored under ``"name"``.
        path: File path; together with the workspace it identifies the file.
        content: File content stored under ``"content"``.
        file_type: Stored under ``"type"`` (default ``"file"``).

    Returns:
        The file document. For a newly inserted file this is the written
        fields plus ``created_at`` and ``_id``. For an existing file it is
        the document read back from the database, so ``created_at`` and
        ``_id`` reflect the stored values rather than this call's timestamp.
    """
    now = datetime.utcnow()
    workspace_oid = ObjectId(workspace_id)

    # Fields written on every call (created_at is handled by $setOnInsert).
    update_fields = {
        "user_id": user_id,
        "workspace_id": workspace_oid,
        "name": name,
        "path": path,
        "type": file_type,
        "content": content,
        "updated": True,
        "updated_at": now
    }
    file_filter = {"workspace_id": workspace_oid, "path": path, "user_id": user_id}

    result = await db.files.update_one(
        file_filter,
        {
            "$set": update_fields,
            "$setOnInsert": {"created_at": now}
        },
        upsert=True
    )

    if result.upserted_id is not None:
        # New document: the values written here are exactly what is stored.
        return {**update_fields, "created_at": now, "_id": result.upserted_id}

    # Existing document: its created_at was not changed by this call, so
    # read the stored document instead of reporting `now`.
    stored = await db.files.find_one(file_filter)
    if stored is not None:
        return stored

    # The read-back found nothing (e.g. the file was removed in between);
    # fall back to the fields that were just written.
    return {**update_fields, "created_at": now}
async def get_file(
    db: AsyncIOMotorDatabase,
    user_id: str,
    workspace_id: str,
    path: str
) -> Optional[Dict[str, Any]]:
    """Look up a single file by its path inside a workspace.

    Args:
        db: Database handle; ``db.files`` is queried.
        user_id: Must match the file's ``user_id``.
        workspace_id: String id of the workspace; converted with ``ObjectId``.
        path: Path of the file within the workspace.

    Returns:
        The file document, or ``None`` when nothing matches.
    """
    file_filter = {
        "workspace_id": ObjectId(workspace_id),
        "path": path,
        "user_id": user_id,
    }
    return await db.files.find_one(file_filter)
async def list_workspace_files(
    db: AsyncIOMotorDatabase,
    user_id: str,
    workspace_id: str,
    limit: int = 100
) -> List[Dict[str, Any]]:
    """List the files of a workspace, sorted by path in ascending order.

    Args:
        db: Database handle; ``db.files`` is queried.
        user_id: Only files carrying this ``user_id`` are returned.
        workspace_id: String id of the workspace; converted with ``ObjectId``.
        limit: Maximum number of documents fetched from the cursor. Defaults
            to 100, the cap that was previously hard-coded.

    Returns:
        Up to ``limit`` file documents.
    """
    cursor = db.files.find({
        "workspace_id": ObjectId(workspace_id),
        "user_id": user_id
    }).sort("path", 1)
    return await cursor.to_list(length=limit)
async def delete_file(
    db: AsyncIOMotorDatabase,
    user_id: str,
    workspace_id: str,
    path: str
) -> bool:
    """Remove the file stored at ``path`` in a workspace.

    Args:
        db: Database handle; ``db.files`` is modified.
        user_id: Must match the file's ``user_id``.
        workspace_id: String id of the workspace; converted with ``ObjectId``.
        path: Path of the file to delete.

    Returns:
        True when a document was deleted, otherwise False.
    """
    file_filter = {
        "workspace_id": ObjectId(workspace_id),
        "path": path,
        "user_id": user_id,
    }
    outcome = await db.files.delete_one(file_filter)
    return outcome.deleted_count > 0
async def clear_file_updated_flags(
    db: AsyncIOMotorDatabase,
    user_id: str,
    workspace_id: str
):
    """Set ``updated`` to False on every file of a workspace.

    Args:
        db: Database handle; ``db.files`` is modified.
        user_id: Only files carrying this ``user_id`` are touched.
        workspace_id: String id of the workspace; converted with ``ObjectId``.
    """
    workspace_files = {"workspace_id": ObjectId(workspace_id), "user_id": user_id}
    reset_flag = {"$set": {"updated": False}}
    await db.files.update_many(workspace_files, reset_flag)
# ============================================================
# ACTIVITY LOG OPERATIONS
# ============================================================
async def log_activity(
    db: AsyncIOMotorDatabase,
    user_id: str,
    workspace_id: str,
    action: str,
    icon: str = "read"
) -> Dict[str, Any]:
    """Record one activity entry for a workspace.

    Args:
        db: Database handle; the entry goes into ``db.activity_logs``.
        user_id: Owner recorded on the entry.
        workspace_id: String id of the workspace; converted with ``ObjectId``.
        action: Text stored under ``"action"``.
        icon: Value stored under ``"icon"`` (default ``"read"``).

    Returns:
        The inserted entry, including its ``_id`` and ``timestamp``.
    """
    entry: Dict[str, Any] = {
        "user_id": user_id,
        "workspace_id": ObjectId(workspace_id),
        "action": action,
        "icon": icon,
        "timestamp": datetime.utcnow(),
    }
    insert_result = await db.activity_logs.insert_one(entry)
    entry["_id"] = insert_result.inserted_id
    return entry
async def get_recent_activities(
    db: AsyncIOMotorDatabase,
    user_id: str,
    workspace_id: str,
    limit: int = 10
) -> List[Dict[str, Any]]:
    """Return a workspace's latest activity entries, newest first.

    Args:
        db: Database handle; ``db.activity_logs`` is queried.
        user_id: Only entries carrying this ``user_id`` are returned.
        workspace_id: String id of the workspace; converted with ``ObjectId``.
        limit: Maximum number of entries returned (default 10).

    Returns:
        Up to ``limit`` activity documents sorted by descending ``timestamp``.
    """
    activity_filter = {
        "workspace_id": ObjectId(workspace_id),
        "user_id": user_id,
    }
    newest_first = db.activity_logs.find(activity_filter).sort("timestamp", -1)
    return await newest_first.limit(limit).to_list(length=limit)
# ============================================================
# RESPONSE FORMATTERS
# ============================================================
def format_file_for_response(file_doc: Dict[str, Any]) -> Dict[str, Any]:
    """Reduce a file document to the fields exposed in API responses.

    Args:
        file_doc: File document; missing keys fall back to defaults.

    Returns:
        A dict with ``name``, ``type``, ``path`` and ``updated``. Other keys
        of ``file_doc`` (such as the content) are not included.
    """
    # (key, value used when the key is absent), in output order.
    exposed_fields = (
        ("name", ""),
        ("type", "file"),
        ("path", ""),
        ("updated", False),
    )
    return {key: file_doc.get(key, fallback) for key, fallback in exposed_fields}
def format_activity_for_response(activity_doc: Dict[str, Any]) -> Dict[str, Any]:
    """Reduce an activity document to the fields exposed in API responses.

    Args:
        activity_doc: Activity document; missing keys fall back to defaults.

    Returns:
        A dict with ``action``, ``icon`` and ``timestamp``. The timestamp is
        the ISO-8601 string of the stored value, or ``None`` when the
        document has no (truthy) timestamp.
    """
    stamp = activity_doc.get("timestamp")
    if stamp:
        iso_stamp = stamp.isoformat()
    else:
        iso_stamp = None

    return {
        "action": activity_doc.get("action", ""),
        "icon": activity_doc.get("icon", "read"),
        "timestamp": iso_stamp,
    }
def format_workspace_for_list(workspace: Dict[str, Any]) -> Dict[str, Any]:
    """Format a workspace summary for the list-workspaces response.

    Args:
        workspace: Summary dict with optional keys ``id``, ``task``,
            ``status``, ``files_count``, ``created_at``, ``updated_at``,
            ``ttl_remaining`` (seconds) and ``progress``.

    Returns:
        A dict with the task truncated to 100 characters, timestamps as
        ISO-8601 strings (or ``None``), the raw TTL under
        ``ttl_remaining_seconds`` and a human-readable ``ttl_display``
        (``"<d>d <h>h remaining"``, ``"<h>h remaining"`` or
        ``"<m>m remaining"``; ``None`` when there is no TTL).
    """
    ttl_remaining = workspace.get("ttl_remaining")

    # Format TTL as human-readable, using the largest applicable unit.
    ttl_display = None
    if ttl_remaining is not None:
        days = int(ttl_remaining // (24 * 60 * 60))
        hours = int((ttl_remaining % (24 * 60 * 60)) // 3600)
        if days > 0:
            ttl_display = f"{days}d {hours}h remaining"
        elif hours > 0:
            ttl_display = f"{hours}h remaining"
        else:
            minutes = int(ttl_remaining // 60)
            ttl_display = f"{minutes}m remaining"

    # A task key that is present but None must not break the slice below.
    task = workspace.get("task") or ""
    created_at = workspace.get("created_at")
    updated_at = workspace.get("updated_at")

    return {
        "id": workspace.get("id"),
        "task": task[:100],  # Truncate for display
        "status": workspace.get("status"),
        "files_count": workspace.get("files_count", 0),
        "created_at": created_at.isoformat() if created_at else None,
        "updated_at": updated_at.isoformat() if updated_at else None,
        "ttl_remaining_seconds": ttl_remaining,
        "ttl_display": ttl_display,
        "progress": workspace.get("progress")
    }