Skip to main content
Glama

mcp-optimizer

fastapi_integration.pyโ€ข16.9 kB
#!/usr/bin/env python3 """ MCP Optimizer - FastAPI Integration Example This module demonstrates how to integrate mcp-optimizer with FastAPI to create a RESTful optimization service. Installation: pip install mcp-optimizer fastapi uvicorn python-multipart Usage: uvicorn fastapi_integration:app --reload --port 8000 Then visit: http://localhost:8000/docs """ import asyncio import logging import time import uuid from contextlib import asynccontextmanager from datetime import datetime from enum import Enum from typing import Any import uvicorn from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, status from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field, field_validator # Import mcp-optimizer components try: from mcp_optimizer import ( AssignmentSolver, KnapsackSolver, LinearProgrammingSolver, OptimizationEngine, OptimizationResult, PortfolioOptimizer, RoutingSolver, SolverConfig, TransportationSolver, ) except ImportError: print("Error: mcp-optimizer package not found. Install with: pip install mcp-optimizer") exit(1) # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Global optimization engine optimization_engine: OptimizationEngine | None = None job_store: dict[str, dict] = {} class ProblemType(str, Enum): """Supported optimization problem types.""" LINEAR_PROGRAMMING = "linear_programming" ASSIGNMENT = "assignment" TRANSPORTATION = "transportation" KNAPSACK = "knapsack" ROUTING = "routing" PORTFOLIO = "portfolio" class JobStatus(str, Enum): """Job execution status.""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" # Pydantic models for request/response class OptimizationRequest(BaseModel): """Base optimization request model.""" problem_type: ProblemType problem_data: dict[str, Any] config: dict[str, Any] | None = None job_id: str | None = None @field_validator("job_id", mode="before") @classmethod def generate_job_id(cls, v: str | None) -> str: return v or str(uuid.uuid4()) class LinearProgrammingRequest(BaseModel): """Linear programming specific request.""" objective: dict[str, Any] = Field(..., description="Objective function") constraints: list[dict[str, Any]] = Field(..., description="Problem constraints") bounds: list[tuple] = Field(..., description="Variable bounds") variable_names: list[str] | None = None class AssignmentRequest(BaseModel): """Assignment problem specific request.""" cost_matrix: list[list[float]] = Field(..., description="Cost matrix") employee_names: list[str] | None = None task_names: list[str] | None = None class TransportationRequest(BaseModel): """Transportation problem specific request.""" supply: list[float] = Field(..., description="Supply capacities") demand: list[float] = Field(..., description="Demand requirements") costs: list[list[float]] = Field(..., description="Transportation costs") supplier_names: list[str] | None = None customer_names: list[str] | None = None class KnapsackRequest(BaseModel): """Knapsack problem specific request.""" items: list[dict[str, Any]] = Field(..., description="Items with weight and value") capacity: float = Field(..., description="Knapsack capacity") knapsack_type: str = Field(default="0-1", description="Knapsack type") class RoutingRequest(BaseModel): """Routing problem specific request.""" distance_matrix: list[list[float]] = Field(..., description="Distance matrix") locations: list[str] = Field(..., description="Location names") start_location: int = Field(default=0, description="Starting location index") problem_type: str = Field(default="TSP", description="Routing problem type") class PortfolioRequest(BaseModel): """Portfolio optimization specific request.""" assets: list[str] = Field(..., description="Asset names") expected_returns: list[float] = Field(..., description="Expected returns") covariance_matrix: list[list[float]] = Field(..., description="Covariance matrix") target_return: float | None = None risk_tolerance: float | None = None constraints: dict[str, Any] | None = None class OptimizationResponse(BaseModel): """Optimization response model.""" job_id: str status: JobStatus problem_type: ProblemType created_at: datetime completed_at: datetime | None = None execution_time: float | None = None result: dict[str, Any] | None = None error: str | None = None class JobListResponse(BaseModel): """Job list response model.""" jobs: list[OptimizationResponse] total: int page: int page_size: int @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager.""" global optimization_engine # Startup logger.info("Starting MCP Optimizer FastAPI service...") optimization_engine = OptimizationEngine( config=SolverConfig(timeout=300, max_iterations=10000, tolerance=1e-6, threads=4) ) logger.info("Optimization engine initialized") yield # Shutdown logger.info("Shutting down MCP Optimizer service...") # Create FastAPI app app = FastAPI( title="MCP Optimizer API", description="RESTful API for mathematical optimization using MCP Optimizer", version="1.0.0", lifespan=lifespan, ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def get_optimization_engine() -> OptimizationEngine: """Dependency to get optimization engine.""" if optimization_engine is None: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Optimization engine not initialized", ) return optimization_engine async def solve_optimization_problem( problem_type: ProblemType, problem_data: dict[str, Any], config: dict[str, Any] | None = None, ) -> OptimizationResult: """Solve optimization problem based on type.""" solver_config = SolverConfig(**(config or {})) if problem_type == ProblemType.LINEAR_PROGRAMMING: solver = LinearProgrammingSolver(config=solver_config) return await asyncio.to_thread(solver.solve, problem_data) elif problem_type == ProblemType.ASSIGNMENT: solver = AssignmentSolver(config=solver_config) return await asyncio.to_thread(solver.solve, problem_data) elif problem_type == ProblemType.TRANSPORTATION: solver = TransportationSolver(config=solver_config) return await asyncio.to_thread(solver.solve, problem_data) elif problem_type == ProblemType.KNAPSACK: solver = KnapsackSolver(config=solver_config) return await asyncio.to_thread(solver.solve, problem_data) elif problem_type == ProblemType.ROUTING: solver = RoutingSolver(config=solver_config) return await asyncio.to_thread(solver.solve, problem_data) elif problem_type == ProblemType.PORTFOLIO: optimizer = PortfolioOptimizer(config=solver_config) return await asyncio.to_thread(optimizer.solve, problem_data) else: raise ValueError(f"Unsupported problem type: {problem_type}") async def execute_optimization_job(job_id: str): """Execute optimization job in background.""" job = job_store[job_id] try: # Update job status job["status"] = JobStatus.RUNNING job["started_at"] = datetime.now() # Solve the problem start_time = time.time() result = await solve_optimization_problem( job["problem_type"], job["problem_data"], job["config"] ) execution_time = time.time() - start_time # Update job with results job["status"] = JobStatus.COMPLETED job["completed_at"] = datetime.now() job["execution_time"] = execution_time job["result"] = { "objective_value": result.objective_value, "variables": result.variables, "status": result.status, "solver_info": result.solver_info, } logger.info(f"Job {job_id} completed successfully in {execution_time:.3f}s") except Exception as e: # Update job with error job["status"] = JobStatus.FAILED job["completed_at"] = datetime.now() job["error"] = str(e) logger.error(f"Job {job_id} failed: {e}") # API Endpoints @app.get("/", response_model=dict[str, str]) async def root(): """Root endpoint with API information.""" return { "name": "MCP Optimizer API", "version": "1.0.0", "description": "RESTful API for mathematical optimization", "docs": "/docs", "health": "/health", } @app.get("/health", response_model=dict[str, str]) async def health_check(): """Health check endpoint.""" return { "status": "healthy", "timestamp": datetime.now().isoformat(), "engine_status": "ready" if optimization_engine else "not_ready", } @app.post("/optimize", response_model=OptimizationResponse) async def optimize( request: OptimizationRequest, background_tasks: BackgroundTasks, engine: OptimizationEngine = Depends(get_optimization_engine), ): """Submit optimization problem for solving.""" job_id = request.job_id # Create job record job_store[job_id] = { "job_id": job_id, "status": JobStatus.PENDING, "problem_type": request.problem_type, "problem_data": request.problem_data, "config": request.config, "created_at": datetime.now(), "completed_at": None, "execution_time": None, "result": None, "error": None, } # Start background task background_tasks.add_task(execute_optimization_job, job_id) return OptimizationResponse(**job_store[job_id]) @app.post("/optimize/sync", response_model=OptimizationResponse) async def optimize_sync( request: OptimizationRequest, engine: OptimizationEngine = Depends(get_optimization_engine), ): """Solve optimization problem synchronously.""" job_id = request.job_id start_time = time.time() try: # Solve the problem result = await solve_optimization_problem( request.problem_type, request.problem_data, request.config ) execution_time = time.time() - start_time # Create response response = OptimizationResponse( job_id=job_id, status=JobStatus.COMPLETED, problem_type=request.problem_type, created_at=datetime.now(), completed_at=datetime.now(), execution_time=execution_time, result={ "objective_value": result.objective_value, "variables": result.variables, "status": result.status, "solver_info": result.solver_info, }, ) # Store in job store job_store[job_id] = response.dict() return response except Exception as e: execution_time = time.time() - start_time response = OptimizationResponse( job_id=job_id, status=JobStatus.FAILED, problem_type=request.problem_type, created_at=datetime.now(), completed_at=datetime.now(), execution_time=execution_time, error=str(e), ) job_store[job_id] = response.dict() raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Optimization failed: {e}" ) from e @app.get("/jobs/{job_id}", response_model=OptimizationResponse) async def get_job(job_id: str): """Get job status and results.""" if job_id not in job_store: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Job {job_id} not found") return OptimizationResponse(**job_store[job_id]) @app.get("/jobs", response_model=JobListResponse) async def list_jobs(page: int = 1, page_size: int = 10, status_filter: JobStatus | None = None): """List all jobs with pagination.""" jobs = list(job_store.values()) # Filter by status if provided if status_filter: jobs = [job for job in jobs if job["status"] == status_filter] # Sort by creation time (newest first) jobs.sort(key=lambda x: x["created_at"], reverse=True) # Pagination total = len(jobs) start_idx = (page - 1) * page_size end_idx = start_idx + page_size page_jobs = jobs[start_idx:end_idx] return JobListResponse( jobs=[OptimizationResponse(**job) for job in page_jobs], total=total, page=page, page_size=page_size, ) @app.delete("/jobs/{job_id}") async def delete_job(job_id: str): """Delete a job.""" if job_id not in job_store: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Job {job_id} not found") del job_store[job_id] return {"message": f"Job {job_id} deleted successfully"} # Problem-specific endpoints @app.post("/optimize/linear-programming", response_model=OptimizationResponse) async def optimize_linear_programming( request: LinearProgrammingRequest, background_tasks: BackgroundTasks ): """Solve linear programming problem.""" opt_request = OptimizationRequest( problem_type=ProblemType.LINEAR_PROGRAMMING, problem_data=request.dict() ) return await optimize(opt_request, background_tasks) @app.post("/optimize/assignment", response_model=OptimizationResponse) async def optimize_assignment(request: AssignmentRequest, background_tasks: BackgroundTasks): """Solve assignment problem.""" opt_request = OptimizationRequest( problem_type=ProblemType.ASSIGNMENT, problem_data=request.dict() ) return await optimize(opt_request, background_tasks) @app.post("/optimize/transportation", response_model=OptimizationResponse) async def optimize_transportation( request: TransportationRequest, background_tasks: BackgroundTasks ): """Solve transportation problem.""" opt_request = OptimizationRequest( problem_type=ProblemType.TRANSPORTATION, problem_data=request.dict() ) return await optimize(opt_request, background_tasks) @app.post("/optimize/knapsack", response_model=OptimizationResponse) async def optimize_knapsack(request: KnapsackRequest, background_tasks: BackgroundTasks): """Solve knapsack problem.""" opt_request = OptimizationRequest( problem_type=ProblemType.KNAPSACK, problem_data=request.dict() ) return await optimize(opt_request, background_tasks) @app.post("/optimize/routing", response_model=OptimizationResponse) async def optimize_routing(request: RoutingRequest, background_tasks: BackgroundTasks): """Solve routing problem.""" opt_request = OptimizationRequest(problem_type=ProblemType.ROUTING, problem_data=request.dict()) return await optimize(opt_request, background_tasks) @app.post("/optimize/portfolio", response_model=OptimizationResponse) async def optimize_portfolio(request: PortfolioRequest, background_tasks: BackgroundTasks): """Solve portfolio optimization problem.""" opt_request = OptimizationRequest( problem_type=ProblemType.PORTFOLIO, problem_data=request.dict() ) return await optimize(opt_request, background_tasks) # Statistics endpoint @app.get("/stats", response_model=dict[str, Any]) async def get_statistics(): """Get API usage statistics.""" jobs = list(job_store.values()) stats = { "total_jobs": len(jobs), "jobs_by_status": {}, "jobs_by_type": {}, "average_execution_time": 0, "total_execution_time": 0, } # Count by status for job_status in JobStatus: stats["jobs_by_status"][job_status.value] = len( [job for job in jobs if job["status"] == job_status] ) # Count by type for problem_type in ProblemType: stats["jobs_by_type"][problem_type.value] = len( [job for job in jobs if job["problem_type"] == problem_type] ) # Calculate execution times completed_jobs = [job for job in jobs if job["execution_time"] is not None] if completed_jobs: execution_times = [job["execution_time"] for job in completed_jobs] stats["average_execution_time"] = sum(execution_times) / len(execution_times) stats["total_execution_time"] = sum(execution_times) return stats if __name__ == "__main__": uvicorn.run( "fastapi_integration:app", host="0.0.0.0", port=8000, reload=True, log_level="info", )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dmitryanchikov/mcp-optimizer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server