Skip to main content
Glama

Aerospace MCP

by cheesejaguar
main.py•11.5 kB
from __future__ import annotations

# Load environment variables from .env for local/dev runs.
# Deliberately best-effort: python-dotenv is optional and a missing or
# unreadable .env must never prevent the API from starting.
try:
    from dotenv import load_dotenv

    load_dotenv()
except Exception:
    pass

import math
from typing import Any, Literal

import airportsdata
from fastapi import FastAPI, HTTPException, Query
from geographiclib.geodesic import Geodesic
from pydantic import BaseModel, Field

# Optional / graceful import for OpenAP (perf + fuel)
OPENAP_AVAILABLE = True
try:
    from openap import FuelFlow, prop
    from openap.gen import FlightGenerator
except Exception:
    OPENAP_AVAILABLE = False

# Unit conversion factors (1 nm = 1.852 km).
NM_PER_KM = 0.539956803
KM_PER_NM = 1.0 / NM_PER_KM


# ----------------------------
# Models
# ----------------------------
class AirportOut(BaseModel):
    """Airport record returned by the API."""

    iata: str
    icao: str
    name: str
    city: str
    country: str
    lat: float
    lon: float
    tz: str | None = None


class PlanRequest(BaseModel):
    """Request body for ``POST /plan``."""

    # You can pass cities, or override with explicit IATA
    depart_city: str = Field(..., description="e.g., 'San Jose'")
    arrive_city: str = Field(..., description="e.g., 'Tokyo'")
    depart_country: str | None = Field(
        None, description="ISO alpha-2 country code (optional)"
    )
    arrive_country: str | None = Field(
        None, description="ISO alpha-2 country code (optional)"
    )
    prefer_depart_iata: str | None = Field(
        None, description="Force a particular departure airport by IATA"
    )
    prefer_arrive_iata: str | None = Field(
        None, description="Force a particular arrival airport by IATA"
    )

    # Aircraft/performance knobs
    ac_type: str = Field(..., description="ICAO aircraft type (e.g., 'A320', 'B738')")
    cruise_alt_ft: int = Field(35000, ge=8000, le=45000)
    mass_kg: float | None = Field(
        None, description="If not set, defaults to 85% MTOW when available"
    )
    route_step_km: float = Field(
        25.0, gt=1.0, description="Sampling step for polyline points"
    )
    backend: Literal["openap"] = "openap"  # Placeholder for future backends


class SegmentEst(BaseModel):
    """Time / distance / speed / fuel estimate for one flight segment."""

    time_min: float
    distance_km: float
    avg_gs_kts: float
    fuel_kg: float


class PlanResponse(BaseModel):
    """Response body for ``POST /plan``."""

    engine: str
    depart: AirportOut
    arrive: AirportOut
    distance_km: float
    distance_nm: float
    polyline: list[tuple[float, float]]  # [(lat, lon), ...]
    estimates: dict[str, Any]  # {"block": {...}, "climb": {...}, ...}


# ----------------------------
# Airport resolution
# ----------------------------
_AIRPORTS_IATA = airportsdata.load("IATA")  # Fast, in-process dict (no network)
# Shape example: {'SJC': {'iata':'SJC','icao':'KSJC','name':'San Jose Intl',
#                         'city':'San Jose', 'country':'US', 'lat':..., 'lon':...}, ...}


def _airport_out(iata: str, ap: dict[str, Any]) -> AirportOut:
    """Build an ``AirportOut`` from one ``_AIRPORTS_IATA`` record."""
    return AirportOut(
        iata=iata.upper(),
        icao=ap.get("icao", ""),
        name=ap.get("name", ""),
        city=ap.get("city", ""),
        country=ap.get("country", ""),
        lat=float(ap["lat"]),
        lon=float(ap["lon"]),
        tz=ap.get("tz"),
    )


def _airport_from_iata(iata: str) -> AirportOut | None:
    """Look up an airport by IATA code (case-insensitive).

    Returns ``None`` when the code is not present in the airport table.
    """
    ap = _AIRPORTS_IATA.get(iata.upper())
    if not ap:
        return None
    return _airport_out(iata, ap)


def _find_city_airports(city: str, country: str | None = None) -> list[AirportOut]:
    """Return airports whose city equals *city* or whose name contains it.

    Matching is case-insensitive. When *country* is given (and non-blank) only
    airports with that country code are returned. Results are ordered with
    names containing "international" first, then alphabetically by name.
    """
    city_l = city.strip().lower()
    if not city_l:
        # An empty needle is a substring of every airport name; without this
        # guard a blank city would match the whole table.
        return []

    # A blank country means "no filter", consistent with the 'ANY' wording
    # used in the not-found error of _resolve_endpoint.
    country_u = country.strip().upper() if country and country.strip() else None

    out: list[AirportOut] = []
    for iata, ap in _AIRPORTS_IATA.items():
        if not iata or not ap.get("iata"):
            continue
        city_match = ap.get("city", "").strip().lower() == city_l
        name_match = city_l in ap.get("name", "").lower()
        if not (city_match or name_match):
            continue
        if country_u is not None and ap.get("country", "").upper() != country_u:
            continue
        out.append(_airport_out(iata, ap))

    # Heuristic: prefer airports with "International" in the name; ties are
    # broken alphabetically by name.
    out.sort(key=lambda a: ("international" not in a.name.lower(), a.name))
    return out


def _resolve_endpoint(
    city: str,
    country: str | None,
    prefer_iata: str | None,
    role: str,
) -> AirportOut:
    """Pick the airport for one end of the route.

    An explicit *prefer_iata* wins; otherwise the first candidate from
    ``_find_city_airports`` is used. *role* only labels error messages.

    Raises:
        HTTPException: 400 if *prefer_iata* is unknown, 404 if no airport
            matches the city/country.
    """
    if prefer_iata:
        ap = _airport_from_iata(prefer_iata)
        if not ap:
            raise HTTPException(
                status_code=400, detail=f"{role}: IATA '{prefer_iata}' not found."
            )
        return ap

    cands = _find_city_airports(city, country)
    if not cands:
        raise HTTPException(
            status_code=404,
            detail=f"{role}: no airport for city='{city}' (country={country or 'ANY'}).",
        )
    return cands[0]


# ----------------------------
# Geodesic / polyline
# ----------------------------
def great_circle_points(
    lat1: float, lon1: float, lat2: float, lon2: float, step_km: float
) -> tuple[list[tuple[float, float]], float]:
    """Sample the WGS84 geodesic between two points.

    Returns ``(points, distance_km)`` where *points* is a list of
    ``(lat, lon)`` tuples including both endpoints, spaced evenly with a
    spacing of at most *step_km*.
    """
    g = Geodesic.WGS84.Inverse(lat1, lon1, lat2, lon2)
    dist_m = g["s12"]
    line = Geodesic.WGS84.Line(lat1, lon1, g["azi1"])
    # At least one interval so that start and end are always emitted.
    n = max(1, int(math.ceil((dist_m / 1000.0) / step_km)))
    pts = []
    for i in range(n + 1):
        # min() guards against float round-off overshooting the endpoint.
        s = min(dist_m, (dist_m * i) / n)
        p = line.Position(s)
        pts.append((p["lat2"], p["lon2"]))
    return pts, dist_m / 1000.0


# ----------------------------
# OpenAP estimates (climb / cruise / descent)
# ----------------------------
def estimates_openap(
    ac_type: str, cruise_alt_ft: int, mass_kg: float | None, route_dist_km: float
) -> tuple[dict[str, Any], str]:
    """Estimate block time and fuel with OpenAP under a zero-wind assumption.

    Climb and descent come from OpenAP's ``FlightGenerator``; the cruise
    segment covers whatever route distance remains, flown at the mean
    groundspeed of a generated cruise profile.

    Returns:
        ``(estimates, engine_name)`` where *estimates* has the keys
        ``block``, ``climb``, ``cruise``, ``descent`` and ``assumptions``.

    Raises:
        HTTPException: 501 if OpenAP is not installed.
    """
    if not OPENAP_AVAILABLE:
        raise HTTPException(
            status_code=501,
            detail="OpenAP backend unavailable. Please `pip install openap`.",
        )

    engine_note = "openap"

    # Resolve mass: explicit value wins, else 85% MTOW from the aircraft
    # properties, else a generic narrowbody guess.
    mass = mass_kg
    if mass is None:
        mass = 60_000.0
        try:
            ac_props = prop.aircraft(ac_type, use_synonym=True)
            mtow = (ac_props.get("limits") or {}).get("MTOW") or ac_props.get("mtow")
            if mtow:
                mass = 0.85 * float(mtow)  # conservative default
        except Exception:
            # Best-effort lookup: keep the generic fallback mass.
            pass

    fgen = FlightGenerator(ac=ac_type)
    dt = 10  # seconds

    # Generate climb & descent segments at requested cruise altitude (if the
    # installed OpenAP version accepts the keyword).
    try:
        climb = fgen.climb(dt=dt, alt_cr=cruise_alt_ft)
    except TypeError:
        climb = fgen.climb(dt=dt)
    try:
        descent = fgen.descent(dt=dt, alt_cr=cruise_alt_ft)
    except TypeError:
        descent = fgen.descent(dt=dt)

    # Cruise segment baseline (only its mean groundspeed is used).
    cruise_seg = fgen.cruise(dt=dt)

    def seg(df: Any) -> tuple[float, float, float, float, float]:
        """Summarise a generated profile: duration, distance and mean values."""
        t_s = float(df["t"].iloc[-1])
        dist_km = float(df["s"].iloc[-1]) / 1000.0  # 's' is meters in OpenAP docs
        alt_ft = float(df["altitude"].mean())
        gs_kts = float(df["groundspeed"].mean())
        vs_fpm = float(df["vertical_rate"].mean())
        return t_s, dist_km, alt_ft, gs_kts, vs_fpm

    t_climb, d_climb, a_climb, gs_climb, vs_climb = seg(climb)
    t_des, d_des, a_des, gs_des, vs_des = seg(descent)
    _, _, _, gs_cru, _ = seg(cruise_seg)

    # How much cruise distance remains after climb+descent?
    # Clamped at zero for super-short hops.
    d_remaining = max(0.0, route_dist_km - (d_climb + d_des))

    # kts = nm/hour → km/s = (kts * KM_PER_NM) / 3600.
    # (Multiplying by NM_PER_KM here would inflate cruise time by ~3.43x.)
    gs_cru_km_s = (gs_cru * KM_PER_NM) / 3600.0
    cruise_time_s = 0.0 if gs_cru <= 1e-6 else d_remaining / gs_cru_km_s

    fuelflow = FuelFlow(ac=ac_type)

    def fuel_from(
        avg_gs_kts: float, avg_alt_ft: float, vs_fpm: float, time_s: float
    ) -> float:
        """Fuel burned (kg) over *time_s* at the given average conditions."""
        # TAS ~ GS (zero-wind assumption for baseline)
        try:
            ff_kg_s = float(
                fuelflow.enroute(mass=mass, tas=avg_gs_kts, alt=avg_alt_ft, vs=vs_fpm)
            )
        except Exception:
            # Best-effort: report zero fuel rather than failing the whole plan.
            ff_kg_s = 0.0
        return ff_kg_s * time_s

    fuel_climb = fuel_from(gs_climb, a_climb, vs_climb, t_climb)
    fuel_des = fuel_from(gs_des, a_des, vs_des, t_des)
    fuel_cru = fuel_from(gs_cru, cruise_alt_ft, 0.0, cruise_time_s)

    # Build segments
    climb_out = SegmentEst(
        time_min=t_climb / 60.0,
        distance_km=d_climb,
        avg_gs_kts=gs_climb,
        fuel_kg=fuel_climb,
    )
    cruise_out = SegmentEst(
        time_min=cruise_time_s / 60.0,
        distance_km=d_remaining,
        avg_gs_kts=gs_cru,
        fuel_kg=fuel_cru,
    )
    des_out = SegmentEst(
        time_min=t_des / 60.0, distance_km=d_des, avg_gs_kts=gs_des, fuel_kg=fuel_des
    )

    block_time_min = climb_out.time_min + cruise_out.time_min + des_out.time_min
    block_fuel_kg = climb_out.fuel_kg + cruise_out.fuel_kg + des_out.fuel_kg

    estimates = {
        "block": {"time_min": block_time_min, "fuel_kg": block_fuel_kg},
        "climb": climb_out.model_dump(),
        "cruise": cruise_out.model_dump(),
        "descent": des_out.model_dump(),
        "assumptions": {
            "zero_wind": True,
            "mass_kg": mass,
            "cruise_alt_ft": cruise_alt_ft,
        },
    }
    return estimates, engine_note


# ----------------------------
# FastAPI
# ----------------------------
app = FastAPI(title="Flight Planner API", version="0.1.0")


@app.get("/health")
def health() -> dict[str, Any]:
    """Report service status, OpenAP availability and airport table size."""
    return {
        "status": "ok",
        "openap": OPENAP_AVAILABLE,
        "airports_count": len(_AIRPORTS_IATA),
    }


@app.get("/airports/by_city", response_model=list[AirportOut])
def airports_by_city(
    city: str = Query(...), country: str | None = Query(None)
) -> list[AirportOut]:
    """List airports matching a city name, optionally filtered by country."""
    return _find_city_airports(city, country)


@app.post("/plan", response_model=PlanResponse)
def plan(req: PlanRequest) -> PlanResponse:
    """Resolve both airports, build the great-circle route and estimate it.

    Raises:
        HTTPException: 400 when both cities are the same and no explicit
            IATA override is given, plus any error raised while resolving
            airports or computing estimates.
    """
    if (
        req.depart_city.strip().lower() == req.arrive_city.strip().lower()
        and not req.prefer_arrive_iata
        and not req.prefer_depart_iata
    ):
        raise HTTPException(
            status_code=400,
            detail="Departure and arrival look identical—please specify airports explicitly.",
        )

    dep = _resolve_endpoint(
        req.depart_city, req.depart_country, req.prefer_depart_iata, role="departure"
    )
    arr = _resolve_endpoint(
        req.arrive_city, req.arrive_country, req.prefer_arrive_iata, role="arrival"
    )

    # Great-circle route
    poly, dist_km = great_circle_points(
        dep.lat, dep.lon, arr.lat, arr.lon, req.route_step_km
    )

    # Estimates
    if req.backend == "openap":
        est, engine_name = estimates_openap(
            req.ac_type, req.cruise_alt_ft, req.mass_kg, dist_km
        )
    else:
        raise HTTPException(status_code=400, detail=f"Unknown backend: {req.backend}")

    return PlanResponse(
        engine=engine_name,
        depart=dep,
        arrive=arr,
        distance_km=dist_km,
        distance_nm=dist_km * NM_PER_KM,
        polyline=poly,
        estimates=est,
    )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cheesejaguar/aerospace-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server