import argparse
import os
import json
from dotenv import load_dotenv
from fastmcp import FastMCP
from .load_tools import ToolLoader
from .database import AstraDBManager
from fastmcp.server.dependencies import get_context
from .logger import get_logger
from .run_tool import RunToolMiddleware
import asyncio
from fastmcp.server.auth.providers.jwt import StaticTokenVerifier, TokenVerifier
from fastmcp.server.dependencies import get_http_headers
from .utils import load_env_variables
# Module-level logger used throughout this file.
# NOTE: LOG_LEVEL and LOG_FILE are read here, when the module is imported,
# i.e. before main() has applied any --env-file / --env-var overrides.
logger = get_logger(
    "agentic_astra",
    level=os.environ.get("LOG_LEVEL"),
    log_file=os.environ.get("LOG_FILE"),
)
def _load_env_overrides() -> None:
    """Apply ``--env-file`` / ``--env-var`` overrides to the process environment.

    This runs before the main argument parser is built because that parser
    takes many of its defaults from ``os.getenv``; the overrides therefore
    have to be in place first.
    """
    preliminary_parser = argparse.ArgumentParser(add_help=False)
    preliminary_parser.add_argument(
        "--env-file", help="Environment variables file to load")
    preliminary_parser.add_argument(
        "--env-var", action="append",
        help="Environment variables in KEY=VALUE format (can be used multiple times)")
    # parse_known_args() tolerates every option that belongs to the main parser.
    preliminary_args, _ = preliminary_parser.parse_known_args()

    if preliminary_args.env_file:
        logger.info(
            f"Loading environment variables from {preliminary_args.env_file}")
        load_dotenv(preliminary_args.env_file, override=True)
    if preliminary_args.env_var:
        load_env_variables(preliminary_args.env_var, logger)


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the full CLI parser; defaults are read from the environment at call time."""
    parser = argparse.ArgumentParser(conflict_handler="resolve")

    # ---- Uvicorn standard params ----
    parser.add_argument("--host", type=str, default=os.getenv("HOST") or "127.0.0.1",
                        help="Bind socket to this host")
    # A string default (from PORT) is converted by argparse through type=int.
    parser.add_argument("--port", type=int, default=os.getenv("PORT") or 8000,
                        help="Bind socket to this port")
    parser.add_argument("--reload", action="store_true",
                        help="Enable auto-reload")
    parser.add_argument("--workers", type=int, default=1,
                        help="Number of worker processes")
    parser.add_argument("--log-level", type=str,
                        default=os.getenv("LOG_LEVEL") or "info", help="Logging level")
    parser.add_argument("--log-file", type=str,
                        default=os.getenv("LOG_FILE") or "logs.log", help="Logging file")
    parser.add_argument("--transport", "-tr", default="stdio")

    # ---- Astra MCP Server params ----
    parser.add_argument("--astra_token", "-t",
                        default=os.getenv("ASTRA_DB_APPLICATION_TOKEN"))
    parser.add_argument("--astra_endpoint", "-e",
                        default=os.getenv("ASTRA_DB_API_ENDPOINT"))
    parser.add_argument("--astra_db_name", "-db",
                        default=os.getenv("ASTRA_DB_DB_NAME"))
    parser.add_argument("--astra_db_audit_table", "-audit",
                        default=os.getenv("ASTRA_DB_AUDIT_TABLE_NAME") or "mcp_audit_trail")
    parser.add_argument("--catalog_file", "-f")
    parser.add_argument("--catalog_collection", "-c",
                        default=os.getenv("ASTRA_DB_CATALOG_COLLECTION") or "tool_catalog")
    parser.add_argument("--tags")  # For filtering tools
    # NOTE(review): with default=True and action="store_true" this flag is
    # always True, and nothing in this function reads args.auth, so it cannot
    # disable anything despite its help text. Left unchanged pending a
    # decision on the intended behaviour.
    parser.add_argument("--auth", default=True,
                        action="store_true", help="Disable authentication")
    parser.add_argument("--audit", default=False,
                        action="store_true", help="Enable audit trail")

    # Declared again here so the main parse accepts them (and shows them in --help);
    # their values were already consumed by the preliminary parse.
    parser.add_argument("--env-file", help="Environment variables file to load")
    parser.add_argument("--env-var", action="append",
                        help="Environment variables in KEY=VALUE format (can be used multiple times)")
    return parser


def _build_token_verifier() -> StaticTokenVerifier:
    """Create the static bearer-token verifier from the configured server token.

    The token is taken from ``AGENTIC_ASTRA_TOKEN``, falling back to
    ``ASTRA_MCP_SERVER_TOKEN``. When neither is set the verifier is created
    with an empty token table (presumably rejecting every bearer token --
    confirm against the FastMCP documentation).
    """
    token = os.getenv("AGENTIC_ASTRA_TOKEN") or os.getenv("ASTRA_MCP_SERVER_TOKEN")
    tokens_dict = {}
    if token:
        tokens_dict[token] = {
            "client_id": "agentic-astra",
            "scopes": ["read:data"],
        }
    return StaticTokenVerifier(
        tokens=tokens_dict,
        required_scopes=["read:data"],
    )


def _load_tools_config(args: argparse.Namespace, astra_db_manager: AstraDBManager):
    """Return the tool catalog from ``--catalog_file`` if given, else from the Astra collection."""
    if args.catalog_file:
        logger.info(f"Loading tools config from {args.catalog_file}")
        # Context manager closes the handle; JSON text is UTF-8 by specification.
        with open(args.catalog_file, encoding="utf-8") as catalog:
            return json.load(catalog)

    logger.info(
        f"Loading tools Astra collection {args.catalog_collection}")
    return astra_db_manager.get_catalog_content(
        collection_name=args.catalog_collection, tags=args.tags)


async def main() -> None:
    """Parse the CLI/environment configuration and run the Agentic Astra MCP server.

    Steps, in order: apply ``--env-file`` / ``--env-var`` overrides, parse the
    command line, validate it, connect to Astra DB (optionally setting up the
    audit trail), load the tool catalog, register the tools on a ``FastMCP``
    instance and finally run it on the requested transport.

    Returns early (after logging an error) when the tool catalog is empty.

    Raises:
        ValueError: if the Astra token is missing, the transport is not one of
            ``http``, ``sse`` or ``stdio``, or the Astra DB manager cannot be
            initialised (the original exception is chained as ``__cause__``).
    """
    logger.info("Starting Agentic Astra MCP Server")

    # Environment overrides must land before the parser reads its defaults.
    _load_env_overrides()

    parser = _build_arg_parser()
    args = parser.parse_args()

    # Validate required arguments
    required_args = []
    if not (args.astra_token or "").strip():
        required_args.append("--astra_token (or ASTRA_DB_APPLICATION_TOKEN env var)")
    if required_args:
        error_msg = f"Missing required arguments: {', '.join(required_args)}"
        logger.error(error_msg)
        parser.print_help()
        raise ValueError(error_msg)

    # Fail fast on an unsupported transport instead of connecting to the
    # database and loading every tool before discovering it.
    if args.transport not in ("http", "sse", "stdio"):
        raise ValueError(f"Invalid transport: {args.transport}")

    try:
        astra_db_manager = AstraDBManager(
            token=args.astra_token,
            endpoint=args.astra_endpoint,
            db_name=args.astra_db_name)
    except Exception as e:
        logger.error(f"Error initializing Astra DB manager: {e}")
        raise ValueError(f"Error initializing Astra DB manager: {e}") from e

    if args.audit:
        astra_db_manager.setup_audit_trail(args.astra_db_audit_table)
        logger.info(f"Audit table name: {args.astra_db_audit_table}")

    # Initialize MCP with the static token verifier
    mcp = FastMCP("Agentic Astra MCP Server", auth=_build_token_verifier())

    # Load tools config content
    tools_config_content = _load_tools_config(args, astra_db_manager)
    logger.info(f"Tools config content: {tools_config_content}")
    if not tools_config_content:
        logger.error(
            "No tools found. Load tools to Astra DB collection or reference the catalog file")
        return

    # Add middleware to process tool calling
    mcp.add_middleware(RunToolMiddleware(
        astra_db_manager, tools_config_content))
    logger.info("Initializing Agentic Astra MCP Server")
    logger.info(f"Starting Agentic Astra MCP Server on port {args.port}")

    # Generate tools based on tools config content
    tool_loader = ToolLoader(mcp, astra_db_manager, tools_config_content)
    tool_loader.load_all_tools()
    logger.info("All tools loaded successfully")

    # Network transports need a bind address; stdio does not.
    if args.transport in ("http", "sse"):
        await mcp.run_async(transport=args.transport, host=args.host,
                            port=args.port, log_level=args.log_level)
    else:
        await mcp.run_async(transport=args.transport, log_level=args.log_level)

    # NOTE(review): run_async() is awaited above, so this line is reached only
    # once it has returned (presumably at server shutdown) -- the wording of
    # this message may be misleading.
    logger.info("Agentic Astra MCP Server started successfully")
def run_server():
    """Blocking entry point for the agentic-astra command.

    Creates the ``main()`` coroutine and drives it to completion with
    ``asyncio.run``.
    """
    server_coroutine = main()
    asyncio.run(server_coroutine)
# Start the server when this module is executed as the main program.
if __name__ == "__main__":
    run_server()