# app_setup.py • 19.6 kB
import asyncio
import importlib
import os
import stat
import sys
import time
from contextlib import asynccontextmanager
from pathlib import Path
import neo4j
import yaml
from dotenv import load_dotenv, set_key
from fastapi import Body, Depends, FastAPI, Form, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.templating import Jinja2Templates
from loguru import logger # type: ignore
from neo4j import GraphDatabase
from neo4j.exceptions import ServiceUnavailable, AuthError, Neo4jError
from adaptive_graph_of_thoughts.api.routes.explorer import explorer_router
from adaptive_graph_of_thoughts.api.routes.mcp import mcp_router
from adaptive_graph_of_thoughts.api.routes.mcp_public import mcp_public_router
from adaptive_graph_of_thoughts.api.routes.nlq import nlq_router
from adaptive_graph_of_thoughts.api.routes.tools import tools_router
from adaptive_graph_of_thoughts.config import (
env_settings,
RuntimeSettings,
runtime_settings,
settings,
)
from adaptive_graph_of_thoughts.domain.services.got_processor import (
GoTProcessor,
)
from adaptive_graph_of_thoughts.services.resource_monitor import ResourceMonitor
from adaptive_graph_of_thoughts.services.llm import LLM_QUERY_LOGS, ask_llm
security = HTTPBasic()


def get_basic_auth(credentials: HTTPBasicCredentials = Depends(security)) -> bool:
    """
    Validate HTTP Basic credentials against the BASIC_AUTH_USER / BASIC_AUTH_PASS
    environment variables.

    Authentication is only enforced when BOTH environment variables are set to
    non-empty values; otherwise every request is accepted.

    Parameters:
        credentials (HTTPBasicCredentials): Credentials extracted from the
            request by the ``HTTPBasic`` security scheme.
    Returns:
        bool: Always ``True`` when the request is allowed to proceed.
    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when
            enforcement is active and the credentials do not match.
    """
    # Local import keeps this block self-contained (the module does not import
    # ``secrets`` at file level).
    import secrets

    expected_user = os.getenv("BASIC_AUTH_USER")
    expected_password = os.getenv("BASIC_AUTH_PASS")
    if not (expected_user and expected_password):
        # Auth is not configured: preserve the permissive behaviour.
        return True

    # Constant-time comparison avoids leaking how many leading characters
    # matched. Compare as bytes because compare_digest() rejects non-ASCII str.
    # Both checks are evaluated before combining so that a wrong username does
    # not short-circuit the password comparison.
    user_ok = secrets.compare_digest(
        credentials.username.encode("utf-8"), expected_user.encode("utf-8")
    )
    password_ok = secrets.compare_digest(
        credentials.password.encode("utf-8"), expected_password.encode("utf-8")
    )
    if not (user_ok and password_ok):
        raise HTTPException(
            status_code=401,
            detail="Unauthorized",
            headers={"WWW-Authenticate": "Basic"},
        )
    return True
def _ask_llm(prompt: str) -> str:
    """
    Thin pass-through to ``ask_llm``.

    Parameters:
        prompt (str): Text handed unchanged to ``ask_llm``.
    Returns:
        str: Whatever ``ask_llm`` returns for the prompt.
    """
    llm_reply = ask_llm(prompt)
    return llm_reply
def _test_conn(uri: str, user: str, password: str, database: str) -> bool:
    """
    Attempt to connect to Neo4j with the given parameters.

    Opens a driver, runs ``RETURN 1`` in a session on ``database`` and closes
    the driver again. Never raises: every failure is logged and reported as
    ``False``.

    Parameters:
        uri (str): Neo4j connection URI.
        user (str): Username used for authentication.
        password (str): Password used for authentication.
        database (str): Name of the database the session is opened against.
    Returns:
        bool: ``True`` if the query ran and the driver closed cleanly,
        ``False`` otherwise.
    """
    try:
        driver = GraphDatabase.driver(uri, auth=(user, password))
        try:
            with driver.session(database=database) as session:
                session.run("RETURN 1")
        finally:
            # Always release the driver, even when the session/query fails;
            # previously a failed query skipped close() and leaked the driver.
            # An error raised by close() itself is still handled below.
            driver.close()
        return True
    except ServiceUnavailable as e:
        logger.warning(f"Neo4j connection failed: Service unavailable. Error: {e}")
        return False
    except AuthError as e:
        logger.warning(f"Neo4j connection failed: Authentication error. Error: {e}")
        return False
    except Neo4jError as e:
        logger.warning(f"Neo4j connection failed: {e}")
        return False
    except Exception as e:
        logger.error(f"An unexpected error occurred during Neo4j connection test: {e}")
        return False
# Put the directory two levels above this file at the front of sys.path,
# unless it is already listed there.
# NOTE(review): this runs after the project imports at the top of the module,
# so it only affects imports performed later.
src_path = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
if sys.path.count(src_path) == 0:
    sys.path.insert(0, src_path)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    FastAPI lifespan handler covering both startup and shutdown.

    Used in place of the deprecated ``@app.on_event`` hooks. Code before the
    ``yield`` runs at startup; code after it runs at shutdown and, when
    ``app.state.got_processor`` exposes ``shutdown_resources``, awaits it.
    Errors raised by that call are logged and not re-raised.
    """
    # ---- startup ----
    logger.info("Application startup sequence initiated.")
    # Further async initialisation would be placed here.
    logger.info("Application startup completed successfully.")

    yield  # the application serves requests while suspended here

    # ---- shutdown ----
    logger.info("Application shutdown sequence initiated.")
    state = app.state
    can_shutdown = hasattr(state, "got_processor") and hasattr(
        state.got_processor, "shutdown_resources"
    )
    if can_shutdown:
        try:
            await state.got_processor.shutdown_resources()
        except Exception as exc:
            logger.error(f"Error shutting down GoTProcessor: {exc}")
    logger.info("Application shutdown completed.")
def create_app() -> FastAPI:
    """
    Create and configure the FastAPI application instance for the Adaptive Graph of Thoughts project.

    Configures the loguru sink, builds the FastAPI app with the ``lifespan``
    handler, attaches a ``GoTProcessor`` to ``app.state``, installs CORS
    middleware from ``settings.app.cors_allowed_origins_str``, sets up Jinja2
    templates and registers the setup wizard, settings, dashboard, chat,
    health and debug endpoints. The MCP admin, NLQ, explorer and tools routers
    are mounted behind ``get_basic_auth``; the public MCP router is mounted at
    ``/mcp`` without that dependency.

    Returns:
        FastAPI: The fully configured FastAPI application instance.
    """
    # Configure logger: drop existing sinks and log to stderr only.
    logger.remove()
    logger.add(
        sys.stderr,
        level=settings.app.log_level.upper(),
        format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
        "<level>{level: <8}</level> | "
        "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
        colorize=True,
    )
    logger.info("Logger configured with level: {}", settings.app.log_level.upper())

    # Create FastAPI app with lifespan
    app = FastAPI(
        title=settings.app.name,
        version=settings.app.version,
        description="Adaptive Graph of Thoughts: Intelligent Scientific Reasoning through Graph-of-Thoughts MCP Server",
        openapi_url="/openapi.json",
        docs_url="/docs",
        redoc_url="/redoc",
        lifespan=lifespan,
    )
    templates = Jinja2Templates(directory=str(Path(__file__).parent / "templates"))

    # Store GoTProcessor instance on app.state (lifespan shuts it down).
    resource_monitor = ResourceMonitor()
    app.state.got_processor = GoTProcessor(
        settings=settings, resource_monitor=resource_monitor
    )
    logger.info("GoTProcessor instance created and attached to app state.")

    # Process allowed origins from settings: either the literal "*" or a
    # comma-separated list of origins.
    allowed_origins_str = settings.app.cors_allowed_origins_str
    if allowed_origins_str == "*":
        allowed_origins = ["*"]
    else:
        allowed_origins = [
            origin.strip()
            for origin in allowed_origins_str.split(",")
            if origin.strip()
        ]
        if not allowed_origins:  # Empty or whitespace-only configuration
            logger.warning(
                "APP_CORS_ALLOWED_ORIGINS_STR was not '*' and parsed to empty list. Defaulting to ['*']."
            )
            allowed_origins = ["*"]

    # Configure CORS
    # NOTE(review): wildcard origins/headers combined with
    # allow_credentials=True is discouraged by the FastAPI/Starlette CORS
    # documentation; behaviour is left unchanged here, confirm whether
    # credentialed cross-origin requests are really required.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=allowed_origins,
        allow_credentials=True,
        allow_methods=["GET", "POST", "OPTIONS", "PUT", "DELETE", "PATCH"],
        allow_headers=["*"],
    )
    logger.info(f"CORS middleware configured with origins: {allowed_origins}")

    # ----------------------- Setup Wizard -----------------------
    @app.get("/setup", response_class=HTMLResponse)
    async def setup_get(request: Request, _=Depends(get_basic_auth)):
        """
        Render the Neo4j setup form with current connection values and missing LLM dependencies.

        Values come from the NEO4J_* environment variables (after loading
        ``.env`` if it exists), falling back to ``runtime_settings.neo4j``.
        ``missing_deps`` lists which of ``openai`` / ``anthropic`` cannot be
        found by the import system.
        """
        # ``import importlib`` alone does not guarantee the ``util`` submodule
        # is loaded, so import it explicitly before using find_spec().
        import importlib.util

        if Path(".env").exists():
            load_dotenv(".env")
        values = {
            "uri": os.getenv("NEO4J_URI", runtime_settings.neo4j.uri),
            "user": os.getenv("NEO4J_USER", runtime_settings.neo4j.user),
            "password": os.getenv("NEO4J_PASSWORD", runtime_settings.neo4j.password),
            "database": os.getenv("NEO4J_DATABASE", runtime_settings.neo4j.database),
        }
        missing = [
            pkg
            for pkg in ("openai", "anthropic")
            if importlib.util.find_spec(pkg) is None
        ]
        return templates.TemplateResponse(
            "setup_neo4j.html",
            {
                "request": request,
                "values": values,
                "message": None,
                "missing_deps": missing,
            },
        )

    @app.post("/setup", response_class=HTMLResponse)
    async def setup_post(
        request: Request,
        _=Depends(get_basic_auth),
        uri: str = Form(...),
        user: str = Form(...),
        password: str = Form(...),
        database: str = Form(...),
    ):
        """
        Handle POST requests to configure Neo4j connection settings during setup.

        Tests the submitted connection parameters. On failure the setup form
        is re-rendered with an error message (and a blanked password). On
        success the values are written to ``.env`` (owner read/write only) and
        the client is redirected to ``/setup/settings``.
        """
        # _test_conn performs blocking network I/O; run it off the event loop.
        connected = await asyncio.to_thread(_test_conn, uri, user, password, database)
        if not connected:
            msg = "Failed to connect to Neo4j"
            values = {
                "uri": uri,
                "user": user,
                "password": "",  # Don't expose password on failure
                "database": database,
            }
            return templates.TemplateResponse(
                "setup_neo4j.html",
                {
                    "request": request,
                    "values": values,
                    "message": msg,
                    "success": False,
                },
            )

        env_path = Path(".env")
        env_path.touch(exist_ok=True)
        # Restrict to 0600 before any secret is written.
        env_path.chmod(stat.S_IRUSR | stat.S_IWUSR)
        os.chown(env_path, os.getuid(), os.getgid())
        set_key(str(env_path), "NEO4J_URI", uri)
        set_key(str(env_path), "NEO4J_USER", user)
        set_key(str(env_path), "NEO4J_PASSWORD", password)
        set_key(str(env_path), "NEO4J_DATABASE", database)
        # Re-apply 0600 after the writes.
        env_path.chmod(stat.S_IRUSR | stat.S_IWUSR)
        return RedirectResponse("/setup/settings", status_code=303)

    # ----------------------- YAML settings -----------------------
    yaml_path = Path(__file__).resolve().parents[2] / "config" / "settings.yaml"
    # Snapshot of the settings at app-creation time; used by the reset endpoint.
    try:
        original_settings = yaml.safe_load(yaml_path.read_text()) or {}
    except Exception as exc:
        logger.error(f"Failed to load YAML settings from {yaml_path}: {exc}")
        original_settings = {}

    def _read_settings() -> dict[str, str]:
        """
        Read and return the "app" section of the YAML configuration file.

        Returns:
            dict[str, str]: A copy of the "app" mapping, or an empty dict when
            the file is empty or the section is missing/null.
        """
        with open(yaml_path) as fh:
            data = yaml.safe_load(fh) or {}
        # ``or {}`` also covers an explicit ``app:`` key with a null value.
        return dict(data.get("app") or {})

    def _write_settings(data: dict[str, str]) -> bool:
        """
        Merge ``data`` into the "app" section of the YAML configuration file.

        Holds an exclusive ``flock`` on the file while reading, merging and
        rewriting it. Never raises for read/parse/write problems: failures are
        logged and reported through the return value.

        Parameters:
            data (dict[str, str]): Keys/values to merge into the "app" section.
        Returns:
            bool: ``True`` when the file was rewritten, ``False`` on failure.
        """
        import fcntl  # POSIX-only; imported lazily as in the original code

        try:
            with open(yaml_path, "r+") as fh:
                # Exclusive lock; released when the file is closed.
                fcntl.flock(fh.fileno(), fcntl.LOCK_EX)
                fh.seek(0)
                existing = yaml.safe_load(fh) or {}
                # Tolerate a missing or null "app" section.
                app_section = existing.get("app") or {}
                app_section.update(data)
                existing["app"] = app_section
                # Overwrite the file in place with the merged contents.
                fh.seek(0)
                fh.truncate()
                yaml.safe_dump(existing, fh)
        except Exception as exc:
            # Kept non-raising like the original, but with an accurate message
            # (the previous handlers were copied from the Neo4j check).
            logger.error("Failed to update settings file {}: {}", yaml_path, exc)
            return False
        return True

    @app.get("/setup/settings", response_class=HTMLResponse)
    async def edit_settings(request: Request, _=Depends(get_basic_auth)):
        """
        Render the settings editing page with the current application settings.
        """
        return templates.TemplateResponse(
            "setup_settings.html",
            {"request": request, "settings": _read_settings(), "message": None},
        )

    @app.post("/setup/settings", response_class=HTMLResponse)
    async def save_settings(request: Request, _=Depends(get_basic_auth)):
        """
        Persist whitelisted settings from the submitted form.

        Redirects to the dashboard on success. If the settings file could not
        be updated, the form is re-rendered with an error message and HTTP 500
        instead of silently redirecting.
        """
        form = await request.form()
        # Whitelist allowed configuration keys
        allowed_keys = {"name", "version", "host", "port", "log_level"}
        data = {k: form[k] for k in form if k in allowed_keys}
        if not _write_settings(data):
            return templates.TemplateResponse(
                "setup_settings.html",
                {
                    "request": request,
                    "settings": data,
                    "message": "Failed to save settings",
                },
                status_code=500,
            )
        return RedirectResponse("/dashboard", status_code=303)

    @app.post("/setup/settings/reset", name="reset_settings")
    async def reset_settings(_=Depends(get_basic_auth)) -> RedirectResponse:
        """
        Overwrite the YAML file with the settings captured when the app was
        created, then redirect to the dashboard.

        Returns:
            RedirectResponse: 303 redirect to ``/dashboard``.
        """
        with open(yaml_path, "w") as fh:
            yaml.safe_dump(original_settings, fh)
        return RedirectResponse("/dashboard", status_code=303)

    @app.get("/dashboard", response_class=HTMLResponse)
    async def dashboard(request: Request, _=Depends(get_basic_auth)):
        """
        Render the dashboard page displaying the current YAML configuration text.
        """
        return templates.TemplateResponse(
            "dashboard.html",
            {"request": request, "config_yaml": yaml_path.read_text()},
        )

    @app.post("/dashboard/save_config")
    async def dashboard_save_config(
        payload: dict = Body(...), _=Depends(get_basic_auth)
    ):
        """
        Validate and save the provided YAML configuration to the settings file.

        Parameters:
            payload (dict): Dictionary containing a 'yaml' key with the YAML configuration as a string.
        Returns:
            dict: ``{"message": "Saved"}`` if the configuration is valid and saved.
            JSONResponse: Generic error message with HTTP 400 if parsing,
            validation or saving fails (details go to the log only).
        """
        yaml_text = payload.get("yaml", "")
        try:
            data = yaml.safe_load(yaml_text) or {}
            # Constructing RuntimeSettings is the validation step; the file is
            # only touched if this does not raise.
            RuntimeSettings(**data)
            with open(yaml_path, "w") as fh:
                yaml.safe_dump(data, fh)
            return {"message": "Saved"}
        except Exception as e:
            logger.error("Error occurred while saving configuration: {}", e)
            return JSONResponse(
                status_code=400,
                content={"message": "An error occurred while processing your request."},
            )

    @app.post("/chat")
    async def chat_endpoint(
        payload: dict = Body(...), _=Depends(get_basic_auth)
    ) -> dict[str, str]:
        """
        Forward the payload's "question" to the language model and return its answer.

        Parameters:
            payload (dict): JSON payload containing the "question" key.
        Returns:
            dict[str, str]: A dictionary with the LLM's answer under the "answer" key.
        """
        question = payload.get("question", "")
        # ask_llm is synchronous; run it in a worker thread.
        answer = await asyncio.to_thread(ask_llm, question)
        return {"answer": answer}

    def _ping_neo4j() -> None:
        """Run ``RETURN 1`` against the configured Neo4j database; raise on failure."""
        driver = GraphDatabase.driver(
            runtime_settings.neo4j.uri,
            auth=(runtime_settings.neo4j.user, runtime_settings.neo4j.password),
        )
        try:
            with driver.session(database=runtime_settings.neo4j.database) as session:
                session.run("RETURN 1")
        finally:
            # Release the driver whether or not the query succeeded.
            driver.close()

    # Add health check endpoint
    @app.get("/health", tags=["Health"])
    async def health_check(_=Depends(get_basic_auth)):
        """
        Check the application's health and verify Neo4j database connectivity.

        Returns:
            dict or JSONResponse: ``{"status": "ok", "neo4j": "up"}`` if the
            probe query succeeds, otherwise a JSONResponse with HTTP 500 and
            ``{"status": "unhealthy", "neo4j": "down"}``.
        """
        logger.debug("Health check endpoint was called.")  # type: ignore
        payload = {"status": "ok"}
        try:
            # Blocking driver call: keep it off the event loop.
            await asyncio.to_thread(_ping_neo4j)
        except Exception as exc:
            logger.warning("Health check: Neo4j probe failed: {}", exc)
            payload["neo4j"] = "down"
            payload["status"] = "unhealthy"
            return JSONResponse(status_code=500, content=payload)
        payload["neo4j"] = "up"
        return payload

    @app.get("/debug", response_class=HTMLResponse)
    async def debug_page(_=Depends(get_basic_auth)) -> HTMLResponse:
        """
        Render a debug page with Neo4j status/latency and the logged LLM queries.

        Returns:
            HTMLResponse: Neo4j status ("up"/"down"), probe latency in ms
            (-1 when down) and, for each entry in ``LLM_QUERY_LOGS``, the first
            50 characters of its prompt and response, HTML-escaped.
        """
        from html import escape

        start = time.time()
        try:
            await asyncio.to_thread(_ping_neo4j)
            latency = int((time.time() - start) * 1000)
            status = "up"
        except Exception:
            latency = -1
            status = "down"

        # Prompts and responses are arbitrary text: truncate first, then
        # escape, so they cannot inject markup into the page.
        log_items = []
        for entry in LLM_QUERY_LOGS:
            prompt_text = escape(str(entry["prompt"])[:50])
            response_text = escape(str(entry["response"])[:50])
            log_items.append(
                f"<li><b>Prompt:</b> {prompt_text}<br><b>Response:</b> {response_text}</li>"
            )
        logs_html = "".join(log_items)
        page = (
            f"<h1>Debug</h1><p>Neo4j status: {status}, latency: {latency} ms</p>"
            f"<h2>Last LLM Queries</h2><ul>{logs_html}</ul>"
        )
        return HTMLResponse(content=page)

    # Include routers
    app.include_router(
        mcp_public_router,
        prefix="/mcp",
        tags=["MCP"],
    )
    app.include_router(
        mcp_router,
        prefix="/admin/mcp",
        tags=["Admin MCP"],
        dependencies=[Depends(get_basic_auth)],
    )
    app.include_router(nlq_router, dependencies=[Depends(get_basic_auth)])
    app.include_router(explorer_router, dependencies=[Depends(get_basic_auth)])
    app.include_router(tools_router, dependencies=[Depends(get_basic_auth)])
    logger.info("API routers included. MCP router mounted at /mcp.")
    logger.info(
        "{} v{} application instance created.", settings.app.name, settings.app.version
    )
    return app