Skip to main content
Glama
nesirat

MCP Vulnerability Management System

by nesirat
main.py (35.3 kB)
"""Application entry point for the MCP Vulnerability Management System.

Builds the FastAPI application, registers middleware and routers, and defines
the authentication, vulnerability, statistics, admin, ticket and profile
endpoints.
"""

import asyncio
import json
import logging
import os
import re
import secrets
from datetime import date, datetime, timedelta
from typing import List, Optional

import jwt
import uvicorn
from fastapi import FastAPI, Depends, HTTPException, status, Query, Body, Response, Form
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
# NOTE(review): FastAPI's documented middleware modules are cors, gzip,
# httpsredirect, trustedhost and wsgi. Confirm that the two imports below
# resolve in the deployed environment; if not, the module fails at import time.
from fastapi.middleware.compression import CompressionMiddleware
from fastapi.middleware.http2 import HTTP2Middleware
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.openapi.utils import get_openapi
from fastapi.requests import Request
from fastapi.responses import JSONResponse, StreamingResponse, RedirectResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, EmailStr, constr
from pydantic import field_validator
from sqlalchemy import or_, and_, func, case
from sqlalchemy.orm import Session

from .core.config import settings
from .db.database import SessionLocal, engine, get_db, init_db
from .db import models
from .collectors.bsi_collector import BSICollector
from .collectors.nvd_collector import NVDCollector
from .collectors.mitre_collector import MITRECollector
from .core.security import verify_password, get_password_hash, get_current_user
from .auth.auth import router as auth_router
from .core.security.tokens import TokenManager
from .core.logging import logger
from .routers import dashboard, tickets, auth, api_keys, alerts, notifications, analytics, versioning
from .core.middleware import APIUsageMiddleware, SecurityMiddleware, RateLimitHeadersMiddleware
from .core.middleware.versioning import VersioningMiddleware
from .services.notification import NotificationService
from .middleware.cache import CacheMiddleware
from .core.tasks import celery
from .api.v1.api import api_router
from .middleware.rate_limit import RateLimitMiddleware
from .middleware.api_key import APIKeyMiddleware

# Configure logging
logging.basicConfig(level=logging.INFO)
# NOTE(review): this rebinding replaces the ``logger`` imported from
# ``.core.logging`` above; kept as in the original.
logger = logging.getLogger(__name__)

# Create database tables
init_db()

app = FastAPI(
    title=settings.API_TITLE,
    description=settings.API_DESCRIPTION,
    version=settings.API_VERSION,
    docs_url=settings.API_DOCS_URL,
    redoc_url=settings.API_REDOC_URL,
    openapi_url=settings.API_OPENAPI_URL,
    contact=settings.API_CONTACT,
    license_info=settings.API_LICENSE,
    terms_of_service=settings.API_TERMS_OF_SERVICE
)

# Add middleware
versioning_middleware = VersioningMiddleware(app)
app.add_middleware(APIUsageMiddleware)
app.add_middleware(SecurityMiddleware)
app.add_middleware(RateLimitHeadersMiddleware)

# Mount static files
app.mount("/static", StaticFiles(directory="/app/app/static"), name="static")

# Templates
templates = Jinja2Templates(directory="/app/app/templates")

# Middleware
# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site make credentialed requests; restrict allow_origins before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# NOTE(review): allowed_hosts=["*"] disables Host header validation.
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["*"]
)

# Add cache middleware
app.add_middleware(CacheMiddleware)

# Add compression middleware
app.add_middleware(CompressionMiddleware)

# Add HTTP/2 middleware
app.add_middleware(HTTP2Middleware)

# Add rate limit middleware
app.add_middleware(RateLimitMiddleware)

# Add API key middleware
app.add_middleware(APIKeyMiddleware)

# Security
# NOTE(review): with auto_error=False this dependency yields None when no
# Authorization header is sent, and the endpoints below that depend on it never
# validate the token. Confirm that another layer (e.g. APIKeyMiddleware)
# enforces authentication for those routes, or switch them to get_current_user.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token", auto_error=False)

# Password validation regex
PASSWORD_REGEX = re.compile(r'^.{4,}$')  # Just require 4 or more characters for testing

# Pattern enforced on registration passwords (see UserCreate.validate_password).
_STRONG_PASSWORD_RE = re.compile(
    r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])[A-Za-z\d@$!%*?&]{8,}$'
)

# Lifetime of "remember me" tokens and cookies.
REMEMBER_ME_DAYS = 30

# Initialize token manager
token_manager = TokenManager(settings.SECRET_KEY)

# Store versioning middleware in app state for access in routes
app.state.versioning_middleware = versioning_middleware

# Include routers
app.include_router(auth_router, prefix="/auth", tags=["authentication"])
app.include_router(dashboard.router, prefix="/api/v1", tags=["dashboard"])
app.include_router(tickets.router, prefix="/api/v1", tags=["tickets"])
app.include_router(auth.router, prefix="/api/auth", tags=["auth"])
app.include_router(api_keys.router, prefix="/api/api-keys", tags=["api-keys"])
app.include_router(alerts.router, prefix="/api/alerts", tags=["alerts"])
app.include_router(notifications.router, prefix="/api/notifications", tags=["notifications"])
app.include_router(analytics.router, prefix="/api/analytics", tags=["analytics"])
app.include_router(versioning.router, prefix="/api/versioning", tags=["versioning"])

# Initialize notification service
notification_service = NotificationService()

# Add notification service to app state
app.state.notification_service = notification_service


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _require_admin(current_user) -> None:
    """Raise HTTP 403 unless ``current_user.is_admin`` is truthy.

    Kept at module level so it always resolves ``status`` to the FastAPI
    status module, even when an endpoint has its own parameter called
    ``status``.
    """
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized to access admin features"
        )


def _vulnerability_to_dict(vuln) -> dict:
    """Return the public fields of a Vulnerability row as a plain dict."""
    return {
        "id": vuln.id,
        "cve_id": vuln.cve_id,
        "title": vuln.title,
        "description": vuln.description,
        "severity": vuln.severity,
        "cvss_score": vuln.cvss_score,
        "published_date": vuln.published_date,
        "last_modified_date": vuln.last_modified_date,
        "source": vuln.source,
        "references": vuln.references
    }


def _json_default(value):
    """``json.dumps`` fallback that renders date/datetime values as ISO 8601.

    Any other unsupported type still raises ``TypeError`` so unexpected
    values are not silently stringified.
    """
    if isinstance(value, date):  # datetime is a subclass of date
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def _store_vulnerabilities(db: Session, vulnerabilities) -> dict:
    """Persist collected vulnerability dicts and return the success payload.

    Each item in ``vulnerabilities`` is passed as keyword arguments to
    ``models.Vulnerability``. On commit failure the session is rolled back,
    the error is logged, and HTTP 500 is raised with a generic detail so
    database internals are not sent to the client.
    """
    db.add_all([models.Vulnerability(**vuln_data) for vuln_data in vulnerabilities])
    try:
        db.commit()
    except Exception as exc:
        db.rollback()
        logger.exception("Failed to store collected vulnerabilities")
        raise HTTPException(
            status_code=500,
            detail="Failed to store collected vulnerabilities"
        ) from exc
    return {"message": f"Successfully collected {len(vulnerabilities)} vulnerabilities"}


# ---------------------------------------------------------------------------
# Authentication
# ---------------------------------------------------------------------------

def create_access_token(data: dict, remember_me: bool = False):
    """Create a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; the dict is copied, not mutated.
        remember_me: When true the token lives for ``REMEMBER_ME_DAYS`` days,
            otherwise for ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token, without a ``Bearer`` prefix.

    Raises:
        ValueError: If ``settings.SECRET_KEY`` is empty.
    """
    if not settings.SECRET_KEY:
        raise ValueError("SECRET_KEY is not set")

    if remember_me:
        lifetime = timedelta(days=REMEMBER_ME_DAYS)
    else:
        lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    to_encode.update({"exp": datetime.utcnow() + lifetime})

    try:
        # The payload and key are deliberately not logged.
        return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
    except Exception:
        logger.exception("Failed to create access token")
        raise


# Token model
class Token(BaseModel):
    """Response shape for a bearer token."""

    access_token: str
    token_type: str


def authenticate_user(db: Session, email: str, password: str):
    """Return the user matching ``email``/``password``, or ``None``."""
    user = db.query(models.User).filter(models.User.email == email).first()
    if not user or not verify_password(password, user.hashed_password):
        return None
    return user


@app.get("/")
async def root(request: Request):
    """Root endpoint"""
    logger.info("Root endpoint accessed")
    return templates.TemplateResponse(
        "index.html",
        {"request": request, "settings": settings}
    )


@app.get("/login")
async def login_page(request: Request):
    """Login page"""
    logger.info("Login page accessed")
    return templates.TemplateResponse(
        "login.html",
        {"request": request, "settings": settings}
    )


@app.post("/token")
async def login_for_access_token(
    request: Request,
    response: Response,
    username: str = Form(...),
    password: str = Form(...),
    remember: str = Form(default="false"),  # Handle remember separately
    db: Session = Depends(get_db)
):
    """Authenticate a user from form data and issue an access token.

    On success the token is returned in the body and also set as an
    HTTP-only ``access_token`` cookie. ``remember`` is the string "true" or
    "false" and selects the long-lived token and cookie.

    Raises:
        HTTPException: 401 when the email/password pair is not valid.
    """
    # Convert remember string to boolean
    remember_bool = remember.lower() == "true"

    user = authenticate_user(db, username, password)
    if user is None:
        # Headers, cookies and credentials are intentionally not logged.
        logger.info("Failed login attempt")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = create_access_token(
        data={"sub": user.email},
        remember_me=remember_bool
    )

    # Update last login
    user.last_login = datetime.utcnow()
    db.commit()

    # Set cookie; its lifetime mirrors the token lifetime.
    if remember_bool:
        max_age = REMEMBER_ME_DAYS * 24 * 60 * 60
    else:
        max_age = settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
    response.set_cookie(
        key="access_token",
        value=access_token,
        httponly=True,
        secure=True,
        samesite="lax",
        max_age=max_age
    )
    logger.info("Login successful for user id %s", user.id)
    return {"access_token": access_token, "token_type": "bearer"}


# ---------------------------------------------------------------------------
# Vulnerabilities
# ---------------------------------------------------------------------------

@app.get("/vulnerabilities/", response_model=List[dict])
async def get_vulnerabilities(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    severity: Optional[str] = None,
    source: Optional[str] = None,
    search: Optional[str] = None,
    db: Session = Depends(get_db),
    token: str = Depends(oauth2_scheme)
):
    """List vulnerabilities with optional filters and paging.

    ``severity`` and ``source`` are exact matches; ``search`` is a
    case-insensitive substring match on title, description and CVE id.
    """
    query = db.query(models.Vulnerability)

    if severity:
        query = query.filter(models.Vulnerability.severity == severity)
    if source:
        query = query.filter(models.Vulnerability.source == source)
    if search:
        pattern = f"%{search}%"
        query = query.filter(
            or_(
                models.Vulnerability.title.ilike(pattern),
                models.Vulnerability.description.ilike(pattern),
                models.Vulnerability.cve_id.ilike(pattern)
            )
        )

    vulnerabilities = query.offset(skip).limit(limit).all()
    return [_vulnerability_to_dict(v) for v in vulnerabilities]


@app.get("/vulnerabilities/{cve_id}", response_model=dict)
async def get_vulnerability(
    cve_id: str,
    db: Session = Depends(get_db),
    token: str = Depends(oauth2_scheme)
):
    """Return one vulnerability by CVE id, or 404 if it does not exist."""
    vulnerability = db.query(models.Vulnerability).filter(
        models.Vulnerability.cve_id == cve_id
    ).first()
    if not vulnerability:
        raise HTTPException(status_code=404, detail="Vulnerability not found")
    return _vulnerability_to_dict(vulnerability)


@app.post("/collect/bsi")
async def collect_bsi_data(
    db: Session = Depends(get_db),
    token: str = Depends(oauth2_scheme)
):
    """Run the BSI collector and store the results."""
    collector = BSICollector()
    vulnerabilities = await collector.collect_vulnerabilities()
    return _store_vulnerabilities(db, vulnerabilities)


@app.post("/collect/nvd")
async def collect_nvd_data(
    days_back: int = Query(7, ge=1, le=120),
    db: Session = Depends(get_db),
    token: str = Depends(oauth2_scheme)
):
    """Run the NVD collector for the last ``days_back`` days and store the results."""
    collector = NVDCollector()
    vulnerabilities = await collector.collect_vulnerabilities(days_back)
    return _store_vulnerabilities(db, vulnerabilities)


@app.post("/collect/mitre")
async def collect_mitre_data(
    limit: int = Query(100, ge=1, le=1000),
    db: Session = Depends(get_db),
    token: str = Depends(oauth2_scheme)
):
    """Run the MITRE collector for up to ``limit`` entries and store the results."""
    collector = MITRECollector()
    vulnerabilities = await collector.collect_vulnerabilities(limit)
    return _store_vulnerabilities(db, vulnerabilities)


@app.get("/stats")
async def get_vulnerability_stats(
    db: Session = Depends(get_db),
    token: str = Depends(oauth2_scheme)
):
    """Return aggregate vulnerability, API-usage and user-activity statistics."""
    # Basic vulnerability stats
    total_vulns = db.query(models.Vulnerability).count()
    severity_counts = db.query(
        models.Vulnerability.severity,
        func.count(models.Vulnerability.id)
    ).group_by(models.Vulnerability.severity).all()

    source_counts = db.query(
        models.Vulnerability.source,
        func.count(models.Vulnerability.id)
    ).group_by(models.Vulnerability.source).all()

    # Monthly trends
    monthly_trends = db.query(
        func.date_trunc('month', models.Vulnerability.last_modified_date).label('month'),
        func.count(models.Vulnerability.id).label('count')
    ).group_by('month').order_by('month').all()

    # API usage stats
    api_usage = db.query(
        models.APIKeyUsage.key_id,
        func.count(models.APIKeyUsage.id).label('total_requests'),
        func.avg(models.APIKeyUsage.response_time).label('avg_response_time'),
        func.count(case((models.APIKeyUsage.status_code >= 400, 1))).label('error_count')
    ).group_by(models.APIKeyUsage.key_id).all()

    # User activity stats. Joining both tickets and API keys yields one row
    # per (ticket, key) pair, so the ids are counted DISTINCT to avoid
    # inflating either total.
    user_activity = db.query(
        models.User.id,
        models.User.email,
        func.count(models.Ticket.id.distinct()).label('ticket_count'),
        func.count(models.APIKey.id.distinct()).label('api_key_count'),
        func.max(models.User.last_login).label('last_login')
    ).outerjoin(models.Ticket).outerjoin(models.APIKey).group_by(
        models.User.id, models.User.email
    ).all()

    return {
        "total_vulnerabilities": total_vulns,
        "severity_distribution": dict(severity_counts),
        "source_distribution": dict(source_counts),
        # Rows are unpacked positionally: attribute access to a column
        # labelled "count" would return the row's sequence ``count`` method.
        "monthly_trends": [
            {
                "month": month.strftime("%Y-%m") if month else None,
                "count": vuln_count
            }
            for month, vuln_count in monthly_trends
        ],
        "api_usage": [
            {
                "key_id": usage.key_id,
                "total_requests": usage.total_requests,
                "avg_response_time": float(usage.avg_response_time) if usage.avg_response_time else 0,
                "error_count": usage.error_count
            }
            for usage in api_usage
        ],
        "user_activity": [
            {
                "user_id": activity.id,
                "email": activity.email,
                "ticket_count": activity.ticket_count,
                "api_key_count": activity.api_key_count,
                "last_login": activity.last_login.isoformat() if activity.last_login else None
            }
            for activity in user_activity
        ]
    }


@app.get("/health")
async def health_check():
    """Report service status and selected configuration flags."""
    # NOTE(review): ``celery.connection()`` creates a new connection object on
    # every call and it is never released here; confirm that ``.connected``
    # reflects broker reachability for a freshly created connection.
    return {
        "status": "healthy",
        "version": settings.API_VERSION,
        "cache_enabled": settings.CACHE_ENABLED,
        "compression_enabled": settings.COMPRESSION_ENABLED,
        "celery_connected": celery.connection().connected
    }


@app.get("/events")
async def events(request: Request, token: str = Depends(oauth2_scheme)):
    """
    Server-Sent Events endpoint for real-time updates
    """
    async def event_generator():
        while True:
            if await request.is_disconnected():
                break

            # Get latest vulnerabilities. A short-lived session is opened per
            # poll and always closed, so the stream does not hold a
            # connection while sleeping.
            db = SessionLocal()
            try:
                vulnerabilities = db.query(models.Vulnerability).order_by(
                    models.Vulnerability.last_modified_date.desc()
                ).limit(10).all()
                vuln_list = [_vulnerability_to_dict(v) for v in vulnerabilities]
            finally:
                db.close()

            # Date columns are not JSON serializable by default.
            yield f"data: {json.dumps(vuln_list, default=_json_default)}\n\n"
            await asyncio.sleep(30)  # Update every 30 seconds

    return StreamingResponse(event_generator(), media_type="text/event-stream")


@app.post("/api/v1/messages")
async def post_message(
    message: dict,
    token: str = Depends(oauth2_scheme)
):
    """
    Endpoint for receiving messages from the client
    """
    # Here you can process the message and trigger appropriate actions
    # For example, you could trigger a vulnerability collection
    if message.get("type") == "collect":
        collector = NVDCollector()
        vulnerabilities = await collector.collect_vulnerabilities(days_back=7)
        db = SessionLocal()
        try:
            return _store_vulnerabilities(db, vulnerabilities)
        finally:
            db.close()

    return {"message": "Message received"}


# ---------------------------------------------------------------------------
# Request / response schemas
# ---------------------------------------------------------------------------

class UserCreate(BaseModel):
    """Registration payload."""

    email: EmailStr
    password: str

    @field_validator('password')
    @classmethod
    def validate_password(cls, v):
        """Reject passwords that do not satisfy the strength pattern."""
        if not _STRONG_PASSWORD_RE.match(v):
            raise ValueError('Password must be at least 8 characters long and contain at least one uppercase letter, one lowercase letter, one number, and one special character')
        return v


class APIKeyCreate(BaseModel):
    """Payload for creating an API key."""

    name: str
    description: Optional[str] = None


class APIKeyResponse(BaseModel):
    """API key representation returned to clients."""

    id: int
    name: str
    description: Optional[str]
    key: str
    is_active: bool
    created_at: datetime
    last_used: Optional[datetime]

    class Config:
        from_attributes = True


class APIKeyUsage(BaseModel):
    """Single API key usage record."""

    key_id: int
    endpoint: str
    timestamp: datetime
    status_code: int
    response_time: float


# Dependency
# A second, local ``get_db`` used to be defined here. It shadowed the one
# imported from ``.db.database`` for every route below this point, so the two
# halves of the file depended on different callables. All routes now share
# the imported dependency.
# NOTE(review): presumably ``get_current_user`` also depends on the imported
# ``get_db``; if so, this also makes ``current_user`` and ``db`` share one
# session per request — confirm against ``core.security``.


@app.post("/register", response_model=dict)
async def register_user(user: UserCreate, db: Session = Depends(get_db)):
    """Create a new user account; 400 if the email is already registered."""
    # Check if user already exists
    db_user = db.query(models.User).filter(models.User.email == user.email).first()
    if db_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )

    try:
        hashed_password = get_password_hash(user.password)
        db_user = models.User(
            email=user.email,
            hashed_password=hashed_password
        )
        db.add(db_user)
        db.commit()
        db.refresh(db_user)
        return {"message": "User created successfully"}
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e)
        )


@app.get("/dashboard")
async def dashboard_page(
    request: Request,
    current_user: models.User = Depends(get_current_user)
):
    """Render the dashboard page for the signed-in user."""
    return templates.TemplateResponse(
        "dashboard.html",
        {
            "request": request,
            "settings": settings,
            "user": current_user
        }
    )


@app.get("/admin")
async def admin_dashboard(request: Request):
    """Render the admin page template."""
    # NOTE(review): unlike the /admin/* data routes, this page has no
    # authentication or admin check; confirm that is intended.
    return templates.TemplateResponse("admin.html", {"request": request})


# ---------------------------------------------------------------------------
# Admin routes
# ---------------------------------------------------------------------------

@app.get("/admin/users", response_model=List[dict])
async def get_users(
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """List all users with their API key and ticket counts (admin only)."""
    _require_admin(current_user)

    users = db.query(models.User).all()
    return [
        {
            "id": user.id,
            "email": user.email,
            "is_active": user.is_active,
            "is_admin": user.is_admin,
            "created_at": user.created_at,
            "last_login": user.last_login,
            "api_key_count": len(user.api_keys),
            "ticket_count": len(user.tickets)
        }
        for user in users
    ]


@app.get("/admin/api-keys", response_model=List[dict])
async def get_all_api_keys(
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """List every API key with its owner's email (admin only)."""
    _require_admin(current_user)

    key_rows = db.query(models.APIKey).all()
    return [
        {
            "id": key.id,
            "user_email": key.user.email,
            "name": key.name,
            "description": key.description,
            "is_active": key.is_active,
            "created_at": key.created_at,
            "last_used": key.last_used,
            "usage_count": key.usage_count
        }
        for key in key_rows
    ]


@app.get("/admin/tickets", response_model=List[dict])
async def get_all_tickets(
    status: Optional[str] = None,
    priority: Optional[str] = None,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """List all tickets, newest first, optionally filtered (admin only).

    The ``status`` query parameter shadows ``fastapi.status`` inside this
    function, so the admin check is delegated to ``_require_admin``.
    """
    _require_admin(current_user)

    query = db.query(models.Ticket)
    if status:
        query = query.filter(models.Ticket.status == status)
    if priority:
        query = query.filter(models.Ticket.priority == priority)

    ticket_rows = query.order_by(models.Ticket.created_at.desc()).all()
    return [
        {
            "id": ticket.id,
            "user_email": ticket.user.email,
            "subject": ticket.subject,
            "status": ticket.status,
            "priority": ticket.priority,
            "created_at": ticket.created_at,
            "updated_at": ticket.updated_at,
            "response_count": len(ticket.responses)
        }
        for ticket in ticket_rows
    ]


@app.get("/admin/tickets/{ticket_id}", response_model=dict)
async def get_ticket_details(
    ticket_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Return one ticket with all of its responses (admin only)."""
    _require_admin(current_user)

    ticket = db.query(models.Ticket).filter(models.Ticket.id == ticket_id).first()
    if not ticket:
        raise HTTPException(status_code=404, detail="Ticket not found")

    return {
        "id": ticket.id,
        "user_email": ticket.user.email,
        "subject": ticket.subject,
        "description": ticket.description,
        "status": ticket.status,
        "priority": ticket.priority,
        "created_at": ticket.created_at,
        "updated_at": ticket.updated_at,
        "responses": [
            {
                "id": response.id,
                "message": response.message,
                "created_at": response.created_at,
                "is_admin": response.is_admin,
                "user_email": response.user.email
            }
            for response in ticket.responses
        ]
    }


@app.post("/admin/tickets/{ticket_id}/respond")
async def respond_to_ticket(
    ticket_id: int,
    message: str = Body(...),
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Add an admin response to a ticket (admin only)."""
    _require_admin(current_user)

    ticket = db.query(models.Ticket).filter(models.Ticket.id == ticket_id).first()
    if not ticket:
        raise HTTPException(status_code=404, detail="Ticket not found")

    response = models.TicketResponse(
        ticket_id=ticket_id,
        user_id=current_user.id,
        message=message,
        is_admin=True
    )
    db.add(response)
    db.commit()

    return {"message": "Response added successfully"}


@app.put("/admin/tickets/{ticket_id}/status")
async def update_ticket_status(
    ticket_id: int,
    status: str = Body(...),
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Set a ticket's status (admin only).

    The ``status`` body parameter shadows ``fastapi.status`` inside this
    function, so the admin check is delegated to ``_require_admin``.
    """
    _require_admin(current_user)

    ticket = db.query(models.Ticket).filter(models.Ticket.id == ticket_id).first()
    if not ticket:
        raise HTTPException(status_code=404, detail="Ticket not found")

    ticket.status = status
    db.commit()

    return {"message": "Ticket status updated successfully"}


# ---------------------------------------------------------------------------
# Customer ticket routes
# ---------------------------------------------------------------------------

@app.post("/tickets", response_model=dict)
async def create_ticket(
    subject: str = Body(...),
    description: str = Body(...),
    priority: str = Body("medium"),
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Create a ticket owned by the current user."""
    ticket = models.Ticket(
        user_id=current_user.id,
        subject=subject,
        description=description,
        priority=priority
    )
    db.add(ticket)
    db.commit()
    db.refresh(ticket)

    return {
        "id": ticket.id,
        "subject": ticket.subject,
        "status": ticket.status,
        "created_at": ticket.created_at
    }


@app.get("/tickets", response_model=List[dict])
async def get_user_tickets(
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """List the current user's tickets, newest first."""
    ticket_rows = db.query(models.Ticket).filter(
        models.Ticket.user_id == current_user.id
    ).order_by(models.Ticket.created_at.desc()).all()

    return [
        {
            "id": ticket.id,
            "subject": ticket.subject,
            "status": ticket.status,
            "priority": ticket.priority,
            "created_at": ticket.created_at,
            "updated_at": ticket.updated_at,
            "response_count": len(ticket.responses)
        }
        for ticket in ticket_rows
    ]


@app.get("/tickets/{ticket_id}", response_model=dict)
async def get_ticket(
    ticket_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Return one of the current user's tickets with its responses."""
    ticket = db.query(models.Ticket).filter(
        models.Ticket.id == ticket_id,
        models.Ticket.user_id == current_user.id
    ).first()

    if not ticket:
        raise HTTPException(status_code=404, detail="Ticket not found")

    return {
        "id": ticket.id,
        "subject": ticket.subject,
        "description": ticket.description,
        "status": ticket.status,
        "priority": ticket.priority,
        "created_at": ticket.created_at,
        "updated_at": ticket.updated_at,
        "responses": [
            {
                "id": response.id,
                "message": response.message,
                "created_at": response.created_at,
                "is_admin": response.is_admin
            }
            for response in ticket.responses
        ]
    }


@app.post("/tickets/{ticket_id}/respond")
async def respond_to_ticket_user(
    ticket_id: int,
    message: str = Body(...),
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Add a response from the ticket's owner."""
    ticket = db.query(models.Ticket).filter(
        models.Ticket.id == ticket_id,
        models.Ticket.user_id == current_user.id
    ).first()

    if not ticket:
        raise HTTPException(status_code=404, detail="Ticket not found")

    response = models.TicketResponse(
        ticket_id=ticket_id,
        user_id=current_user.id,
        message=message,
        is_admin=False
    )
    db.add(response)
    db.commit()

    return {"message": "Response added successfully"}


# ---------------------------------------------------------------------------
# Profile
# ---------------------------------------------------------------------------

class UserProfileUpdate(BaseModel):
    """Optional profile fields; only fields present in the request are applied."""

    name: Optional[str] = None
    phone: Optional[str] = None
    company: Optional[str] = None
    newsletter_subscribed: Optional[bool] = None
    email_frequency: Optional[str] = None


@app.post("/profile", response_model=dict)
async def update_profile(
    profile: UserProfileUpdate,
    current_user: models.User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Update user profile information"""
    try:
        # Update only provided fields
        for field, value in profile.model_dump(exclude_unset=True).items():
            setattr(current_user, field, value)

        db.commit()
        return {"message": "Profile updated successfully"}
    except Exception as exc:
        db.rollback()
        # Log the cause server-side; do not send database error text to the client.
        logger.exception("Failed to update profile")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to update profile"
        ) from exc


@app.get("/profile")
async def profile_page(
    request: Request,
    current_user: models.User = Depends(get_current_user)
):
    """Display user profile page"""
    context = {
        "request": request,
        "user": current_user
    }
    return templates.TemplateResponse("profile.html", context)


@app.get("/logout")
async def logout():
    """Clear the auth cookie and redirect to the login page."""
    # The login page is served at "/login"; this file defines no
    # "/login-page" route.
    response = RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
    response.delete_cookie(key="access_token")
    return response


# NOTE(review): GET "/tickets" and GET "/tickets/{ticket_id}" are already
# registered above by get_user_tickets / get_ticket. Routes match in
# registration order, so the two HTML page handlers below are never reached.
# Decide which variant should own these paths and move the other.
@app.get("/tickets")
async def tickets_page(
    request: Request,
    current_user: models.User = Depends(get_current_user)
):
    """Display the tickets list page"""
    return templates.TemplateResponse(
        "tickets/list.html",
        {"request": request, "user": current_user}
    )


@app.get("/tickets/{ticket_id}")
async def ticket_detail_page(
    request: Request,
    ticket_id: int,
    current_user: models.User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Display the ticket detail page"""
    ticket = db.query(models.Ticket).filter(
        models.Ticket.id == ticket_id,
        models.Ticket.user_id == current_user.id
    ).first()

    if not ticket:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Ticket not found"
        )

    return templates.TemplateResponse(
        "tickets/detail.html",
        {
            "request": request,
            "user": current_user,
            "ticket": ticket,
            "get_status_color": lambda status: {
                "open": "primary",
                "in_progress": "warning",
                "resolved": "success",
                "closed": "secondary"
            }.get(status, "secondary"),
            "get_priority_color": lambda priority: {
                "low": "info",
                "medium": "primary",
                "high": "warning",
                "critical": "danger"
            }.get(priority, "secondary"),
            "format_date": lambda date: date.strftime("%Y-%m-%d %H:%M:%S")
        }
    )


@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Log any unhandled exception and return a generic 500 response."""
    logger.error(f"Unhandled exception: {str(exc)}", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error"}
    )


# Custom OpenAPI schema
def custom_openapi():
    """Build the OpenAPI schema once, add bearer auth and version info, and cache it."""
    if app.openapi_schema:
        return app.openapi_schema

    openapi_schema = get_openapi(
        title=settings.API_TITLE,
        version=settings.API_VERSION,
        description=settings.API_DESCRIPTION,
        routes=app.routes,
        contact=settings.API_CONTACT,
        license_info=settings.API_LICENSE,
        terms_of_service=settings.API_TERMS_OF_SERVICE,
    )

    # Add security schemes ("components" may be absent from the generated schema)
    openapi_schema.setdefault("components", {})["securitySchemes"] = {
        "BearerAuth": {
            "type": "http",
            "scheme": "bearer",
            "bearerFormat": "JWT",
        }
    }

    # Add security requirements
    openapi_schema["security"] = [{"BearerAuth": []}]

    # Add API version information
    openapi_schema["info"]["x-api-version"] = settings.CURRENT_API_VERSION
    openapi_schema["info"]["x-supported-versions"] = settings.SUPPORTED_API_VERSIONS

    app.openapi_schema = openapi_schema
    return app.openapi_schema


app.openapi = custom_openapi


# Custom docs endpoints
# NOTE(review): the app is created with docs_url=settings.API_DOCS_URL, so the
# built-in docs route for this path is registered first; confirm this custom
# handler is actually reachable.
@app.get(settings.API_DOCS_URL, include_in_schema=False)
async def custom_swagger_ui_html():
    """Serve Swagger UI using the locally hosted static assets."""
    return get_swagger_ui_html(
        openapi_url=settings.API_OPENAPI_URL,
        title=f"{settings.API_TITLE} - Swagger UI",
        swagger_js_url="/static/swagger-ui-bundle.js",
        swagger_css_url="/static/swagger-ui.css",
    )


# NOTE(review): GET "/" is already registered by the template-rendering
# ``root`` near the top of this file, so this handler is never reached, and
# this definition rebinds the module-level name ``root``. Kept for
# compatibility; pick one handler and give the other its own path and name.
@app.get("/", include_in_schema=False)
async def root():
    """Return API welcome metadata (documentation URLs and version info)."""
    return {
        "message": "Welcome to MCP API",
        "docs_url": settings.API_DOCS_URL,
        "redoc_url": settings.API_REDOC_URL,
        "openapi_url": settings.API_OPENAPI_URL,
        "current_version": settings.CURRENT_API_VERSION,
        "supported_versions": settings.SUPPORTED_API_VERSIONS,
    }


if __name__ == "__main__":
    uvicorn.run(
        "app.main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        http="h11",  # Use h11 for HTTP/1.1
        # For HTTP/2 support, use:
        # http="h2",
        # ssl_keyfile="path/to/key.pem",
        # ssl_certfile="path/to/cert.pem"
    )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nesirat/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.