Skip to main content
Glama

Createve.AI Nexus

by spgoodman
app.py • 4.53 kB
"""Main application factory for the Createve.AI Nexus Server."""

import asyncio
from typing import Dict, Any, Optional

import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from ..utils import ConfigManager, LoggingManager
from .security import SecurityManager
from .queue import QueueManager
from ..api import APILoader, APIExecutor, OpenAPIGenerator, RouteGenerator
from ..mcp import MCPServer


async def create_app(config_path: str = "./config.yaml") -> FastAPI:
    """Create a new FastAPI application instance.

    Builds the app in this order: configuration and logging, the optional
    FastMCP SSE sub-application mounted at ``/mcp``, CORS and authentication
    middleware, API loading, queue workers, generated routes, MCP registries
    and, when configured, a background task that polls for API changes.

    Args:
        config_path: Path handed to ``ConfigManager``.

    Returns:
        The configured ``FastAPI`` application.
    """
    # Initialize config
    config = ConfigManager(config_path)

    # Initialize logging
    logging_manager = LoggingManager(config)
    logger = logging_manager.logger

    # Create FastAPI app
    app = FastAPI(
        title="Createve.AI Nexus Server",
        description="API and MCP server for various services",
        version="1.0.0"
    )

    # Initialize MCP if enabled
    if config.mcp_enabled:
        # Imported lazily so the dependency is only needed when MCP is on.
        from mcp.server.fastmcp import FastMCP

        mcp = FastMCP(
            config.mcp_server_name,
            version=config.mcp_server_version,
            description=config.mcp_server_description
        )

        # Mount MCP server
        app.mount("/mcp", app=mcp.sse_app())
        logger.info(f"MCP server initialized: {config.mcp_server_name}")
    else:
        logger.info("MCP server is disabled")

    # Add CORS middleware
    # NOTE(review): wildcard origins together with allow_credentials=True lets
    # any site make credentialed requests -- confirm this is intended before
    # exposing the server beyond a trusted network.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"]
    )

    # Add authentication middleware
    security_manager = SecurityManager(config, logger)
    app.middleware("http")(security_manager.authenticate_request)

    # Load APIs
    api_loader = APILoader(config, logger)
    apis = await api_loader.load_apis()

    # Create API executor
    api_executor = APIExecutor(apis, logger)

    # Create queue manager
    queue_manager = QueueManager(config, logger)
    await queue_manager.start_workers(api_executor)

    # Create OpenAPI generator
    openapi_generator = OpenAPIGenerator(apis, config)

    # Create routes
    route_generator = RouteGenerator(
        app,
        apis,
        api_executor,
        queue_manager,
        security_manager,
        openapi_generator,
        logger
    )
    route_generator.generate_routes()

    # Initialize MCP server (if enabled)
    if config.mcp_enabled:
        logger.info("Initializing MCP server")
        mcp_server = MCPServer(app, config, logger, security_manager)
        await mcp_server.setup_registries(api_executor, queue_manager, apis, mcp)
        logger.info(f"MCP server initialized: {config.mcp_server_name}")
    else:
        logger.info("MCP server is disabled")

    # Strong references to background tasks. The event loop only keeps weak
    # references, so an unreferenced task may be garbage-collected mid-run.
    background_tasks = set()

    # Add file watcher for API changes
    if config.reload_on_api_dir_change or config.reload_on_api_file_change:

        async def api_watcher():
            """Poll for API changes and reload the executor's APIs."""
            while True:
                try:
                    # Check for changes
                    if await api_loader.check_for_changes():
                        # Wait for processing to complete if configured
                        if config.wait_for_processing_before_reload:
                            # TODO: Wait for queue to be empty
                            pass

                        # Reload APIs
                        logger.info("Reloading APIs...")
                        new_apis = await api_loader.load_apis()

                        # Update executor
                        api_executor.apis = new_apis

                        # TODO: Update routes
                        logger.info("APIs reloaded")
                except asyncio.CancelledError:
                    # Never swallow cancellation (it is an Exception subclass
                    # on Python 3.7), otherwise shutdown could not stop us.
                    raise
                except Exception as e:
                    logger.error(f"API watcher error: {str(e)}")

                # Sleep before checking again
                if config.reload_on_api_dir_change:
                    await asyncio.sleep(config.reload_on_api_dir_change_interval_seconds)
                elif config.reload_on_api_file_change:
                    await asyncio.sleep(config.reload_on_api_file_change_interval_seconds)

        @app.on_event("startup")
        async def start_api_watcher():
            task = asyncio.create_task(api_watcher())
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)

    @app.on_event("shutdown")
    async def shutdown_event():
        # Stop the watcher first so it cannot trigger a reload while the
        # queue workers are being torn down.
        pending = list(background_tasks)
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

        await queue_manager.stop_workers()

    return app

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/spgoodman/createveai-nexus-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.