Skip to main content
Glama

plan_flight

Calculate flight routes between airports with performance estimates. Generate detailed flight plans using departure and arrival data, aircraft configurations, and route options.

Instructions

Plan a flight route between two airports with performance estimates.

Args: departure: Dict with departure info (city, country, iata) arrival: Dict with arrival info (city, country, iata) aircraft: Optional aircraft config (ac_type, cruise_alt_ft, route_step_km) route_options: Optional route options

Returns: JSON string with flight plan details

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
departureYes
arrivalYes
aircraftNo
route_optionsNo

Implementation Reference

  • The primary handler function for the 'plan_flight' MCP tool. It processes structured input dictionaries for departure, arrival, and aircraft details, resolves airports using internal helpers, computes the great-circle route, generates performance estimates if OpenAP is available, and returns a formatted JSON response.
    def plan_flight(
        departure: dict,
        arrival: dict,
        aircraft: dict | None = None,
        route_options: dict | None = None,
    ) -> str:
        """Plan a flight route between two airports with performance estimates.
    
        Args:
            departure: Dict with departure info (city, country, iata)
            arrival: Dict with arrival info (city, country, iata)
            aircraft: Optional aircraft config (ac_type, cruise_alt_ft, route_step_km)
            route_options: Optional route options
    
        Returns:
            JSON string with flight plan details
        """
        try:
            # Build request object
            request_data = {
                "depart_city": departure["city"],
                "arrive_city": arrival["city"],
            }
    
            # Add optional fields
            if departure.get("country"):
                request_data["depart_country"] = departure["country"]
            if arrival.get("country"):
                request_data["arrive_country"] = arrival["country"]
            if departure.get("iata"):
                request_data["prefer_depart_iata"] = departure["iata"]
            if arrival.get("iata"):
                request_data["prefer_arrive_iata"] = arrival["iata"]
    
            # Add aircraft options (ac_type is required by PlanRequest)
            if aircraft and aircraft.get("ac_type"):
                request_data["ac_type"] = aircraft["ac_type"]
                if aircraft.get("cruise_alt_ft"):
                    request_data["cruise_alt_ft"] = aircraft["cruise_alt_ft"]
                if aircraft.get("route_step_km"):
                    request_data["route_step_km"] = aircraft["route_step_km"]
            else:
                # Use a default aircraft type if none provided
                request_data["ac_type"] = "A320"  # Default aircraft type
    
            # Create and validate request
            try:
                request = PlanRequest(**request_data)
            except Exception as e:
                return f"Invalid request: {str(e)}"
    
            # Resolve airports
            try:
                depart_airport = _resolve_endpoint(
                    request.depart_city, request.depart_country, request.prefer_depart_iata
                )
                arrive_airport = _resolve_endpoint(
                    request.arrive_city, request.arrive_country, request.prefer_arrive_iata
                )
            except AirportResolutionError as e:
                return f"Airport resolution error: {str(e)}"
    
            # Calculate route
            route_points = great_circle_points(
                depart_airport.lat,
                depart_airport.lon,
                arrive_airport.lat,
                arrive_airport.lon,
                step_km=request.route_step_km,
            )
    
            # Build response
            response = {
                "departure": {
                    "airport": {
                        "iata": depart_airport.iata,
                        "icao": depart_airport.icao,
                        "name": depart_airport.name,
                        "city": depart_airport.city,
                        "country": depart_airport.country,
                        "coordinates": {
                            "lat": depart_airport.lat,
                            "lon": depart_airport.lon,
                        },
                    }
                },
                "arrival": {
                    "airport": {
                        "iata": arrive_airport.iata,
                        "icao": arrive_airport.icao,
                        "name": arrive_airport.name,
                        "city": arrive_airport.city,
                        "country": arrive_airport.country,
                        "coordinates": {
                            "lat": arrive_airport.lat,
                            "lon": arrive_airport.lon,
                        },
                    }
                },
                "route": {
                    "distance_km": route_points["distance_km"],
                    "distance_nm": route_points["distance_nm"],
                    "initial_bearing_deg": route_points["initial_bearing_deg"],
                    "final_bearing_deg": route_points["final_bearing_deg"],
                    "points": [
                        {"lat": p[0], "lon": p[1], "distance_km": p[2]}
                        for p in route_points["points"]
                    ],
                },
            }
    
            # Add performance estimates if available
            if request.ac_type and OPENAP_AVAILABLE:
                try:
                    performance, engine_name = estimates_openap(
                        request.ac_type,
                        request.cruise_alt_ft,
                        request.mass_kg,
                        route_points["distance_km"],
                    )
                    response["performance"] = performance
                    response["engine"] = engine_name
                except OpenAPError as e:
                    response["performance_note"] = (
                        f"Performance estimation failed: {str(e)}"
                    )
            elif request.ac_type:
                response["performance_note"] = (
                    f"OpenAP not available - no performance estimates for {request.ac_type}"
                )
    
            return json.dumps(response, indent=2)
    
        except Exception as e:
            logger.error(f"Flight planning error: {str(e)}", exc_info=True)
            return f"Flight planning error: {str(e)}"
  • Registration of the 'plan_flight' tool using FastMCP's mcp.tool(plan_flight) as part of the core flight planning tools.
    mcp = FastMCP("aerospace-mcp")
    
    # Register all tools
    # Core flight planning tools
    mcp.tool(search_airports)
    mcp.tool(plan_flight)
    mcp.tool(calculate_distance)
    mcp.tool(get_aircraft_performance)
    mcp.tool(get_system_status)
  • Pydantic schema (PlanRequest) used internally by the plan_flight handler for input validation and structuring the flight planning request.
    class PlanRequest(BaseModel):
        # You can pass cities, or override with explicit IATA
        depart_city: str = Field(..., description="e.g., 'San Jose'")
        arrive_city: str = Field(..., description="e.g., 'Tokyo'")
        depart_country: str | None = Field(
            None, description="ISO alpha-2 country code (optional)"
        )
        arrive_country: str | None = Field(
            None, description="ISO alpha-2 country code (optional)"
        )
        prefer_depart_iata: str | None = Field(
            None, description="Force a particular departure airport by IATA"
        )
        prefer_arrive_iata: str | None = Field(
            None, description="Force a particular arrival airport by IATA"
        )
    
        # Aircraft/performance knobs
        ac_type: str = Field(..., description="ICAO aircraft type (e.g., 'A320', 'B738')")
        cruise_alt_ft: int = Field(35000, ge=8000, le=45000)
        mass_kg: float | None = Field(
            None, description="If not set, defaults to 85% MTOW when available"
        )
        route_step_km: float = Field(
            25.0, gt=1.0, description="Sampling step for polyline points"
        )
        backend: Literal["openap"] = "openap"  # Placeholder for future backends
  • Helper function to compute great-circle polyline points and total distance between two lat/lon coordinates, used by the plan_flight handler.
    def great_circle_points(
        lat1: float, lon1: float, lat2: float, lon2: float, step_km: float
    ) -> tuple[list[tuple[float, float]], float]:
        g = Geodesic.WGS84.Inverse(lat1, lon1, lat2, lon2)
        dist_m = g["s12"]
        line = Geodesic.WGS84.Line(lat1, lon1, g["azi1"])
        n = max(1, int(math.ceil((dist_m / 1000.0) / step_km)))
        pts = []
        for i in range(n + 1):
            s = min(dist_m, (dist_m * i) / n)
            p = line.Position(s)
            pts.append((p["lat2"], p["lon2"]))
        return pts, dist_m / 1000.0
  • Helper function for generating flight performance estimates (time, fuel, etc.) using the OpenAP library, called within the plan_flight handler.
    def estimates_openap(
        ac_type: str, cruise_alt_ft: int, mass_kg: float | None, route_dist_km: float
    ) -> tuple[dict, str]:
        if not OPENAP_AVAILABLE:
            raise OpenAPError("OpenAP backend unavailable. Please `pip install openap`.")
    
        # Resolve mass default from aircraft properties (fallback to 85% MTOW if present)
        mass = mass_kg
        engine_note = "openap"
        try:
            ac_props = prop.aircraft(ac_type, use_synonym=True)
            mtow = (ac_props.get("limits") or {}).get("MTOW") or ac_props.get("mtow")
            if mass is None and mtow:
                mass = 0.85 * float(mtow)  # conservative default
            elif mass is None:
                # fallback generic narrowbody guess
                mass = 60_000.0
        except Exception:
            mass = mass or 60_000.0
    
        fgen = FlightGenerator(ac=ac_type)
        dt = 10  # seconds
    
        # Generate climb & descent segments at requested cruise altitude (if allowed)
        try:
            climb = fgen.climb(dt=dt, alt_cr=cruise_alt_ft)
        except TypeError:
            climb = fgen.climb(dt=dt)
    
        try:
            descent = fgen.descent(dt=dt, alt_cr=cruise_alt_ft)
        except TypeError:
            descent = fgen.descent(dt=dt)
    
        # Cruise segment baseline
        cruise_seg = fgen.cruise(dt=dt)
    
        def seg(df):
            t_s = float(df["t"].iloc[-1])
            dist_km = float(df["s"].iloc[-1]) / 1000.0  # 's' is meters in OpenAP docs
            alt_ft = float(df["altitude"].mean())
            gs_kts = float(df["groundspeed"].mean())
            vs_fpm = float(df["vertical_rate"].mean())
            return t_s, dist_km, alt_ft, gs_kts, vs_fpm
    
        t_climb, d_climb, a_climb, gs_climb, vs_climb = seg(climb)
        t_des, d_des, a_des, gs_des, vs_des = seg(descent)
        _, _, a_cru, gs_cru, _ = seg(cruise_seg)
    
        # How much cruise distance remains after climb+descent?
        d_remaining = max(0.0, route_dist_km - (d_climb + d_des))
        # Guard for super-short hops
        cruise_time_s = (
            0.0 if gs_cru <= 1e-6 else (d_remaining * KM_PER_NM) / (gs_cru / 3600.0 / 1.852)
        )  # but simpler to compute by kts:
        # Convert properly: kts = nm/hour → km/s = (kts * NM_PER_KM) / 3600
        cruise_time_s = (
            0.0 if gs_cru <= 1e-6 else (d_remaining / ((gs_cru * NM_PER_KM) / 3600.0))
        )
    
        fuelflow = FuelFlow(ac=ac_type)
    
        def fuel_from(
            avg_gs_kts: float, avg_alt_ft: float, vs_fpm: float, time_s: float
        ) -> float:
            # TAS ~ GS (zero-wind assumption for baseline)
            try:
                ff_kg_s = float(
                    fuelflow.enroute(mass=mass, tas=avg_gs_kts, alt=avg_alt_ft, vs=vs_fpm)
                )
            except Exception:
                ff_kg_s = 0.0
            return ff_kg_s * time_s
    
        fuel_climb = fuel_from(gs_climb, a_climb, vs_climb, t_climb)
        fuel_des = fuel_from(gs_des, a_des, vs_des, t_des)
        fuel_cru = fuel_from(gs_cru, cruise_alt_ft, 0.0, cruise_time_s)
    
        # Build segments
        climb_out = SegmentEst(
            time_min=t_climb / 60.0,
            distance_km=d_climb,
            avg_gs_kts=gs_climb,
            fuel_kg=fuel_climb,
        )
        cruise_out = SegmentEst(
            time_min=cruise_time_s / 60.0,
            distance_km=d_remaining,
            avg_gs_kts=gs_cru,
            fuel_kg=fuel_cru,
        )
        des_out = SegmentEst(
            time_min=t_des / 60.0, distance_km=d_des, avg_gs_kts=gs_des, fuel_kg=fuel_des
        )
    
        block_time_min = climb_out.time_min + cruise_out.time_min + des_out.time_min
        block_fuel_kg = climb_out.fuel_kg + cruise_out.fuel_kg + des_out.fuel_kg
    
        estimates = {
            "block": {"time_min": block_time_min, "fuel_kg": block_fuel_kg},
            "climb": climb_out.model_dump(),
            "cruise": cruise_out.model_dump(),
            "descent": des_out.model_dump(),
            "assumptions": {
                "zero_wind": True,
                "mass_kg": mass,
                "cruise_alt_ft": cruise_alt_ft,
            },
        }
        return estimates, engine_note

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cheesejaguar/aerospace-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server