app.pyโข14.3 kB
#!/usr/bin/env python3
"""
FastAPI Web Application
Provides web-based administration interface for AnyDocs MCP server.
"""
import os
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from pathlib import Path
from fastapi import FastAPI, HTTPException, Depends, Request, Response
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from ..config import get_config, get_config_manager
from ..database import DatabaseManager, get_database_manager
from ..auth import AuthManager, AuthMethod
from ..utils import get_logger, get_memory_usage
from ..server import AnyDocsMCPServer
logger = get_logger(__name__)
security = HTTPBearer(auto_error=False)
# Pydantic models for API
class ConfigUpdate(BaseModel):
"""Configuration update request."""
key: str
value: Any
save_to: Optional[str] = 'file'
class UserCreate(BaseModel):
"""User creation request."""
username: str
email: str
password: str
role: str = 'user'
class APIKeyCreate(BaseModel):
"""API key creation request."""
name: str
permissions: List[str] = []
expires_at: Optional[datetime] = None
class DocumentSourceCreate(BaseModel):
"""Document source creation request."""
name: str
adapter_type: str
config: Dict[str, Any]
enabled: bool = True
class SystemStats(BaseModel):
"""System statistics."""
uptime: str
memory_usage: Dict[str, Any]
database_stats: Dict[str, Any]
mcp_stats: Dict[str, Any]
active_connections: int
def create_app(
mcp_server: Optional[AnyDocsMCPServer] = None,
db_manager: Optional[DatabaseManager] = None,
auth_manager: Optional[AuthManager] = None
) -> FastAPI:
"""Create FastAPI application.
Args:
mcp_server: MCP server instance
db_manager: Database manager
auth_manager: Authentication manager
Returns:
FastAPI application
"""
config = get_config()
app = FastAPI(
title="AnyDocs MCP Admin",
description="Administration interface for AnyDocs MCP server",
version="1.0.0",
docs_url="/api/docs",
redoc_url="/api/redoc"
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure appropriately for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Static files and templates
static_dir = Path(__file__).parent / "static"
templates_dir = Path(__file__).parent / "templates"
if static_dir.exists():
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
templates = Jinja2Templates(directory=str(templates_dir)) if templates_dir.exists() else None
# Store instances in app state
app.state.mcp_server = mcp_server
app.state.db_manager = db_manager or get_database_manager()
app.state.auth_manager = auth_manager
app.state.start_time = datetime.utcnow()
# Authentication dependency
async def get_current_user(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)):
"""Get current authenticated user."""
if not app.state.auth_manager or not credentials:
raise HTTPException(status_code=401, detail="Authentication required")
try:
user = await app.state.auth_manager.validate_token(credentials.credentials)
if not user:
raise HTTPException(status_code=401, detail="Invalid token")
return user
except Exception as e:
logger.error("Authentication failed", error=str(e))
raise HTTPException(status_code=401, detail="Authentication failed")
# Routes
@app.get("/", response_class=HTMLResponse)
async def dashboard(request: Request):
"""Main dashboard."""
if not templates:
return HTMLResponse("""
<html>
<head><title>AnyDocs MCP Admin</title></head>
<body>
<h1>AnyDocs MCP Administration</h1>
<p>API documentation: <a href="/api/docs">/api/docs</a></p>
</body>
</html>
""")
context = {
"request": request,
"title": "Dashboard",
"config": config.to_dict()
}
return templates.TemplateResponse("dashboard.html", context)
@app.get("/api/health")
async def health_check():
"""Health check endpoint."""
try:
# Check database
db_healthy = False
if app.state.db_manager:
db_healthy = await app.state.db_manager.health_check()
# Check MCP server
mcp_healthy = app.state.mcp_server is not None
status = "healthy" if db_healthy and mcp_healthy else "unhealthy"
return {
"status": status,
"timestamp": datetime.utcnow().isoformat(),
"components": {
"database": "healthy" if db_healthy else "unhealthy",
"mcp_server": "healthy" if mcp_healthy else "unhealthy"
}
}
except Exception as e:
logger.error("Health check failed", error=str(e))
return {
"status": "unhealthy",
"timestamp": datetime.utcnow().isoformat(),
"error": str(e)
}
@app.get("/api/stats", response_model=SystemStats)
async def get_system_stats():
"""Get system statistics."""
try:
uptime = datetime.utcnow() - app.state.start_time
memory_usage = get_memory_usage()
# Database stats
db_stats = {"status": "unknown"}
if app.state.db_manager:
try:
with app.state.db_manager.get_session() as session:
# Get table counts
from ..database.models import User, DocumentSource, Document
db_stats = {
"status": "connected",
"users": session.query(User).count(),
"document_sources": session.query(DocumentSource).count(),
"documents": session.query(Document).count()
}
except Exception as e:
db_stats = {"status": "error", "error": str(e)}
# MCP stats
mcp_stats = {"status": "unknown"}
if app.state.mcp_server:
mcp_stats = {
"status": "running",
"adapters": len(app.state.mcp_server.adapters),
"resources": 0, # Would need to implement resource counting
"tools": 2 # search_docs, get_doc_structure
}
return SystemStats(
uptime=str(uptime),
memory_usage=memory_usage,
database_stats=db_stats,
mcp_stats=mcp_stats,
active_connections=0 # Would need connection tracking
)
except Exception as e:
logger.error("Failed to get system stats", error=str(e))
raise HTTPException(status_code=500, detail="Failed to get system stats")
@app.get("/api/config")
async def get_configuration():
"""Get current configuration."""
try:
config = get_config()
return config.to_dict()
except Exception as e:
logger.error("Failed to get configuration", error=str(e))
raise HTTPException(status_code=500, detail="Failed to get configuration")
@app.post("/api/config")
async def update_configuration(
update: ConfigUpdate,
current_user=Depends(get_current_user)
):
"""Update configuration."""
try:
config_manager = get_config_manager()
if not config_manager:
raise HTTPException(status_code=500, detail="Configuration manager not available")
success = config_manager.set_setting(update.key, update.value, update.save_to)
if not success:
raise HTTPException(status_code=500, detail="Failed to update configuration")
return {"message": "Configuration updated successfully"}
except Exception as e:
logger.error("Failed to update configuration", error=str(e))
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/users")
async def list_users(current_user=Depends(get_current_user)):
"""List all users."""
try:
if not app.state.db_manager:
raise HTTPException(status_code=500, detail="Database not available")
users = app.state.db_manager.list_users()
return [{"id": u.id, "username": u.username, "email": u.email, "role": u.role.value} for u in users]
except Exception as e:
logger.error("Failed to list users", error=str(e))
raise HTTPException(status_code=500, detail="Failed to list users")
@app.post("/api/users")
async def create_user(
user_data: UserCreate,
current_user=Depends(get_current_user)
):
"""Create new user."""
try:
if not app.state.db_manager:
raise HTTPException(status_code=500, detail="Database not available")
user = app.state.db_manager.create_user(
username=user_data.username,
email=user_data.email,
password_hash=user_data.password, # Should be hashed
role=user_data.role
)
if not user:
raise HTTPException(status_code=400, detail="Failed to create user")
return {"id": user.id, "username": user.username, "email": user.email}
except Exception as e:
logger.error("Failed to create user", error=str(e))
raise HTTPException(status_code=500, detail="Failed to create user")
@app.get("/api/document-sources")
async def list_document_sources(current_user=Depends(get_current_user)):
"""List document sources."""
try:
if not app.state.db_manager:
raise HTTPException(status_code=500, detail="Database not available")
sources = app.state.db_manager.list_document_sources()
return [{
"id": s.id,
"name": s.name,
"adapter_type": s.adapter_type.value,
"enabled": s.enabled,
"last_sync": s.last_sync.isoformat() if s.last_sync else None
} for s in sources]
except Exception as e:
logger.error("Failed to list document sources", error=str(e))
raise HTTPException(status_code=500, detail="Failed to list document sources")
@app.post("/api/document-sources")
async def create_document_source(
source_data: DocumentSourceCreate,
current_user=Depends(get_current_user)
):
"""Create document source."""
try:
if not app.state.db_manager:
raise HTTPException(status_code=500, detail="Database not available")
source = app.state.db_manager.create_document_source(
name=source_data.name,
adapter_type=source_data.adapter_type,
config=source_data.config,
enabled=source_data.enabled
)
if not source:
raise HTTPException(status_code=400, detail="Failed to create document source")
return {"id": source.id, "name": source.name}
except Exception as e:
logger.error("Failed to create document source", error=str(e))
raise HTTPException(status_code=500, detail="Failed to create document source")
@app.post("/api/document-sources/{source_id}/sync")
async def sync_document_source(
source_id: int,
current_user=Depends(get_current_user)
):
"""Trigger document source synchronization."""
try:
if not app.state.mcp_server:
raise HTTPException(status_code=500, detail="MCP server not available")
# This would trigger a sync operation
# Implementation depends on how sync is handled
return {"message": "Sync triggered successfully"}
except Exception as e:
logger.error("Failed to sync document source", error=str(e))
raise HTTPException(status_code=500, detail="Failed to sync document source")
@app.get("/api/logs")
async def get_logs(
lines: int = 100,
level: Optional[str] = None,
current_user=Depends(get_current_user)
):
"""Get application logs."""
try:
# This would read from log files
# Implementation depends on logging configuration
return {"logs": [], "message": "Log reading not implemented"}
except Exception as e:
logger.error("Failed to get logs", error=str(e))
raise HTTPException(status_code=500, detail="Failed to get logs")
return app
# Create default app instance
app = create_app()
if __name__ == "__main__":
import uvicorn
config = get_config()
uvicorn.run(
"anydocs_mcp.web.app:app",
host=config.server.host,
port=config.server.port,
reload=config.server.reload,
debug=config.server.debug
)