Skip to main content
Glama

Adaptive Graph of Thoughts MCP Server

app_setup.pyβ€’19.6 kB
import asyncio import importlib import os import stat import sys import time from contextlib import asynccontextmanager from pathlib import Path import neo4j import yaml from dotenv import load_dotenv, set_key from fastapi import Body, Depends, FastAPI, Form, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from fastapi.templating import Jinja2Templates from loguru import logger # type: ignore from neo4j import GraphDatabase from neo4j.exceptions import ServiceUnavailable, AuthError, Neo4jError from adaptive_graph_of_thoughts.api.routes.explorer import explorer_router from adaptive_graph_of_thoughts.api.routes.mcp import mcp_router from adaptive_graph_of_thoughts.api.routes.mcp_public import mcp_public_router from adaptive_graph_of_thoughts.api.routes.nlq import nlq_router from adaptive_graph_of_thoughts.api.routes.tools import tools_router from adaptive_graph_of_thoughts.config import ( env_settings, RuntimeSettings, runtime_settings, settings, ) from adaptive_graph_of_thoughts.domain.services.got_processor import ( GoTProcessor, ) from adaptive_graph_of_thoughts.services.resource_monitor import ResourceMonitor from adaptive_graph_of_thoughts.services.llm import LLM_QUERY_LOGS, ask_llm security = HTTPBasic() def get_basic_auth(credentials: HTTPBasicCredentials = Depends(security)) -> bool: """ Validates HTTP Basic authentication credentials against environment variables. Raises an HTTP 401 Unauthorized exception if credentials do not match the configured username and password. """ user = os.getenv("BASIC_AUTH_USER") password = os.getenv("BASIC_AUTH_PASS") if user and password: if not (credentials.username == user and credentials.password == password): raise HTTPException( status_code=401, detail="Unauthorized", headers={"WWW-Authenticate": "Basic"}, ) return True def _ask_llm(prompt: str) -> str: """ Forwards the given prompt to the LLM service and returns its response. Parameters: prompt (str): The input prompt to send to the language model. Returns: str: The response generated by the language model. """ return ask_llm(prompt) def _test_conn(uri: str, user: str, password: str, database: str) -> bool: """Attempt to connect to Neo4j with the given parameters.""" try: driver = GraphDatabase.driver(uri, auth=(user, password)) with driver.session(database=database) as session: session.run("RETURN 1") driver.close() return True except ServiceUnavailable as e: logger.warning(f"Neo4j connection failed: Service unavailable. Error: {e}") return False except AuthError as e: logger.warning(f"Neo4j connection failed: Authentication error. Error: {e}") return False except Neo4jError as e: logger.warning(f"Neo4j connection failed: {e}") return False except Exception as e: logger.error(f"An unexpected error occurred during Neo4j connection test: {e}") return False # Add src directory to Python path if not already there # This must be done before other project imports src_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) if src_path not in sys.path: sys.path.insert(0, src_path) @asynccontextmanager async def lifespan(app: FastAPI): """ Lifespan context manager for FastAPI app startup and shutdown events. This replaces the deprecated @app.on_event decorators. """ # Startup logger.info("Application startup sequence initiated.") # Any other async initializations can go here. logger.info("Application startup completed successfully.") yield # This is where the app runs # Shutdown logger.info("Application shutdown sequence initiated.") # Clean up resources if hasattr(app.state, "got_processor") and hasattr( app.state.got_processor, "shutdown_resources" ): try: await app.state.got_processor.shutdown_resources() except Exception as e: logger.error(f"Error shutting down GoTProcessor: {e}") logger.info("Application shutdown completed.") def create_app() -> FastAPI: """ Create and configure the FastAPI application instance for the Adaptive Graph of Thoughts project. Initializes logging, sets up CORS middleware based on application settings, attaches a GoTProcessor to the app state, configures Jinja2 templates, and registers endpoints for setup, configuration management, dashboard, chat, health checks, and debugging. Includes routers for MCP, NLQ, and explorer functionality, all secured with HTTP Basic authentication. Returns: FastAPI: The fully configured FastAPI application instance. """ # Configure logger logger.remove() logger.add( sys.stderr, level=settings.app.log_level.upper(), format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | " "<level>{level: <8}</level> | " "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>", colorize=True, ) logger.info("Logger configured with level: {}", settings.app.log_level.upper()) # Create FastAPI app with lifespan app = FastAPI( title=settings.app.name, version=settings.app.version, description="Adaptive Graph of Thoughts: Intelligent Scientific Reasoning through Graph-of-Thoughts MCP Server", openapi_url="/openapi.json", docs_url="/docs", redoc_url="/redoc", lifespan=lifespan, ) templates = Jinja2Templates(directory=str(Path(__file__).parent / "templates")) # Store GoTProcessor instance on app.state resource_monitor = ResourceMonitor() app.state.got_processor = GoTProcessor( settings=settings, resource_monitor=resource_monitor ) logger.info("GoTProcessor instance created and attached to app state.") # Process allowed origins from settings allowed_origins_str = settings.app.cors_allowed_origins_str if allowed_origins_str == "*": allowed_origins = ["*"] else: allowed_origins = [ origin.strip() for origin in allowed_origins_str.split(",") if origin.strip() ] if not allowed_origins: # Default if empty or only whitespace after split logger.warning( "APP_CORS_ALLOWED_ORIGINS_STR was not '*' and parsed to empty list. Defaulting to ['*']." ) allowed_origins = [ "*" ] # Default to all if configuration is invalid or empty # Configure CORS app.add_middleware( CORSMiddleware, allow_origins=allowed_origins, # Use the parsed list allow_credentials=True, allow_methods=["GET", "POST", "OPTIONS", "PUT", "DELETE", "PATCH"], allow_headers=["*"], ) logger.info(f"CORS middleware configured with origins: {allowed_origins}") # ----------------------- Setup Wizard ----------------------- @app.get("/setup", response_class=HTMLResponse) async def setup_get(request: Request, _=Depends(get_basic_auth)): """ Render the Neo4j setup form with current connection values and missing LLM dependencies. Displays a setup page pre-filled with Neo4j connection parameters from environment variables or runtime settings, and lists any missing LLM-related Python packages required for operation. """ if Path(".env").exists(): load_dotenv(".env") values = { "uri": os.getenv("NEO4J_URI", runtime_settings.neo4j.uri), "user": os.getenv("NEO4J_USER", runtime_settings.neo4j.user), "password": os.getenv("NEO4J_PASSWORD", runtime_settings.neo4j.password), "database": os.getenv("NEO4J_DATABASE", runtime_settings.neo4j.database), } missing = [ pkg for pkg in ("openai", "anthropic") if importlib.util.find_spec(pkg) is None ] return templates.TemplateResponse( "setup_neo4j.html", { "request": request, "values": values, "message": None, "missing_deps": missing, }, ) @app.post("/setup", response_class=HTMLResponse) async def setup_post( request: Request, _=Depends(get_basic_auth), uri: str = Form(...), user: str = Form(...), password: str = Form(...), database: str = Form(...), ): """ Handle POST requests to configure Neo4j connection settings during setup. Validates the provided Neo4j connection parameters. If the connection fails, re-renders the setup form with an error message. On success, securely writes the credentials to the `.env` file and redirects to the settings configuration page. """ if not _test_conn(uri, user, password, database): msg = "Failed to connect to Neo4j" values = { "uri": uri, "user": user, "password": "", # Don't expose password on failure "database": database, } return templates.TemplateResponse( "setup_neo4j.html", { "request": request, "values": values, "message": msg, "success": False, }, ) env_path = Path(".env") env_path.touch(exist_ok=True) env_path.chmod(stat.S_IRUSR | stat.S_IWUSR) os.chown(env_path, os.getuid(), os.getgid()) set_key(str(env_path), "NEO4J_URI", uri) set_key(str(env_path), "NEO4J_USER", user) set_key(str(env_path), "NEO4J_PASSWORD", password) set_key(str(env_path), "NEO4J_DATABASE", database) env_path.chmod(stat.S_IRUSR | stat.S_IWUSR) return RedirectResponse("/setup/settings", status_code=303) yaml_path = Path(__file__).resolve().parents[2] / "config" / "settings.yaml" try: original_settings = yaml.safe_load(yaml_path.read_text()) or {} logger.error(f"Failed to load YAML settings from {yaml_path}: {exc}") logger.error(f"Failed to load YAML settings: {exc}") original_settings = {} def _read_settings() -> dict[str, str]: """ Read and return the application settings from the YAML configuration file. Returns: dict[str, str]: A dictionary containing the application's settings from the "app" section of the YAML file. """ with open(yaml_path) as fh: data = yaml.safe_load(fh) or {} return dict(data.get("app", {})) def _write_settings(data: dict[str, str]) -> None: """ Safely updates the application settings in the YAML configuration file with the provided data. Acquires an exclusive file lock to prevent concurrent writes, merges the new settings into the "app" section, and overwrites the file with the updated configuration. """ import fcntl try: with open(yaml_path, "r+") as fh: # acquire an exclusive lock to prevent concurrent writes fcntl.flock(fh.fileno(), fcntl.LOCK_EX) # read the existing contents fh.seek(0) existing = yaml.safe_load(fh) or {} # merge in the new data existing.setdefault("app", {}).update(data) # overwrite the file with the updated contents fh.seek(0) fh.truncate() yaml.safe_dump(existing, fh) except neo4j.exceptions.ServiceUnavailable as e: logger.warning(f"Neo4j connection failed: Service unavailable. Error: {e}") return False except neo4j.exceptions.AuthError as e: logger.warning(f"Neo4j connection failed: Authentication error. Error: {e}") return False except Exception as e: logger.error( f"An unexpected error occurred during Neo4j connection test: {e}" ) return False @app.get("/setup/settings", response_class=HTMLResponse) async def edit_settings(request: Request, _=Depends(get_basic_auth)): """ Render the settings editing page with the current application settings. Returns an HTML response displaying the editable application settings. """ return templates.TemplateResponse( "setup_settings.html", {"request": request, "settings": _read_settings(), "message": None}, ) @app.post("/setup/settings", response_class=HTMLResponse) async def save_settings(request: Request, _=Depends(get_basic_auth)): """ Process and save application settings from a submitted form, then redirect to the dashboard. Only whitelisted configuration keys are accepted and persisted. """ form = await request.form() # Whitelist allowed configuration keys allowed_keys = {"name", "version", "host", "port", "log_level"} data = {k: form[k] for k in form if k in allowed_keys} _write_settings(data) return RedirectResponse("/dashboard", status_code=303) @app.post("/setup/settings/reset", name="reset_settings") async def reset_settings(_=Depends(get_basic_auth)) -> RedirectResponse: """ Reset the application settings to their original defaults and redirect to the dashboard. Returns: RedirectResponse: Redirects the user to the dashboard page after resetting settings. """ with open(yaml_path, "w") as fh: yaml.safe_dump(original_settings, fh) return RedirectResponse("/dashboard", status_code=303) @app.get("/dashboard", response_class=HTMLResponse) async def dashboard(request: Request, _=Depends(get_basic_auth)): """ Render the dashboard page displaying the current YAML configuration. Returns: TemplateResponse: Rendered HTML dashboard with the application's YAML configuration. """ return templates.TemplateResponse( "dashboard.html", {"request": request, "config_yaml": yaml_path.read_text()}, ) @app.post("/dashboard/save_config") async def dashboard_save_config( payload: dict = Body(...), _=Depends(get_basic_auth) ): """ Validate and save the provided YAML configuration to the settings file. Parameters: payload (dict): Dictionary containing a 'yaml' key with the YAML configuration as a string. Returns: dict: Success message if the configuration is valid and saved. JSONResponse: Error message with HTTP 400 status if validation or saving fails. """ yaml_text = payload.get("yaml", "") try: data = yaml.safe_load(yaml_text) or {} RuntimeSettings(**data) with open(yaml_path, "w") as fh: yaml.safe_dump(data, fh) return {"message": "Saved"} except Exception as e: logger.error( "Error occurred while saving configuration: {}", e ) # Log the exception return JSONResponse( status_code=400, content={"message": "An error occurred while processing your request."}, ) @app.post("/chat") async def chat_endpoint( payload: dict = Body(...), _=Depends(get_basic_auth) ) -> dict[str, str]: """ Handles chat requests by forwarding a question to the language model and returning its answer. Parameters: payload (dict): JSON payload containing the "question" key. Returns: dict[str, str]: A dictionary with the LLM's answer under the "answer" key. """ question = payload.get("question", "") answer = await asyncio.to_thread(ask_llm, question) return {"answer": answer} # Add health check endpoint @app.get("/health", tags=["Health"]) async def health_check(_=Depends(get_basic_auth)): """ Checks the application's health and verifies Neo4j database connectivity. Returns: dict or JSONResponse: A dictionary with overall status and Neo4j status if healthy, or a JSONResponse with HTTP 500 if Neo4j is unreachable. """ logger.debug("Health check endpoint was called.") # type: ignore payload = {"status": "ok"} try: driver = GraphDatabase.driver( runtime_settings.neo4j.uri, auth=(runtime_settings.neo4j.user, runtime_settings.neo4j.password), ) with driver.session(database=runtime_settings.neo4j.database) as session: session.run("RETURN 1") driver.close() payload["neo4j"] = "up" return payload except Exception: payload["neo4j"] = "down" payload["status"] = "unhealthy" # Or a more descriptive status return JSONResponse(status_code=500, content=payload) @app.get("/debug", response_class=HTMLResponse) async def debug_page(_=Depends(get_basic_auth)) -> HTMLResponse: """ Render a debug page displaying Neo4j connection status, latency, and recent LLM query logs. Returns: HTMLResponse: An HTML page showing Neo4j database status and latency, along with a list of recent LLM prompts and responses. """ start = time.time() try: driver = GraphDatabase.driver( runtime_settings.neo4j.uri, auth=(runtime_settings.neo4j.user, runtime_settings.neo4j.password), ) with driver.session(database=runtime_settings.neo4j.database) as session: session.run("RETURN 1") latency = int((time.time() - start) * 1000) status = "up" except Exception: latency = -1 status = "down" logs_html = "".join( f"<li><b>Prompt:</b> {l['prompt'][:50]}<br><b>Response:</b> {l['response'][:50]}</li>" for l in LLM_QUERY_LOGS ) html = ( f"<h1>Debug</h1><p>Neo4j status: {status}, latency: {latency} ms</p>" f"<h2>Last LLM Queries</h2><ul>{logs_html}</ul>" ) return HTMLResponse(content=html) # Include routers app.include_router( mcp_public_router, prefix="/mcp", tags=["MCP"], ) app.include_router( mcp_router, prefix="/admin/mcp", tags=["Admin MCP"], dependencies=[Depends(get_basic_auth)], ) app.include_router(nlq_router, dependencies=[Depends(get_basic_auth)]) app.include_router(explorer_router, dependencies=[Depends(get_basic_auth)]) app.include_router(tools_router, dependencies=[Depends(get_basic_auth)]) logger.info("API routers included. MCP router mounted at /mcp.") logger.info( "{} v{} application instance created.", settings.app.name, settings.app.version ) return app

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SaptaDey/Adaptive-Graph-of-Thoughts-MCP-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server