We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/cheesejaguar/aerospace-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""MCP Server implementation for Aerospace flight planning tools.
Ensures .env is loaded very early so environment-driven options are
available when importing submodules and defining tools.
"""
import asyncio
import json
import logging
import math
from dataclasses import asdict
# Load environment from .env before other imports that read env
try:
from dotenv import load_dotenv
load_dotenv()
except Exception:
pass
import mcp.server.sse
import mcp.server.stdio
from mcp.server import Server
from mcp.types import (
EmbeddedResource,
ImageContent,
TextContent,
Tool,
)
from pydantic import ValidationError
from .core import (
NM_PER_KM,
OPENAP_AVAILABLE,
AirportResolutionError,
OpenAPError,
PlanRequest,
_airport_from_iata,
_find_city_airports,
_resolve_endpoint,
estimates_openap,
great_circle_points,
)
from .integrations.orbits import vector_magnitude
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize MCP server
server = Server("aerospace-mcp")
# Tool definitions
TOOLS = [
Tool(
name="search_airports",
description="Search for airports by IATA code or city name",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "IATA code (e.g., 'SJC') or city name (e.g., 'San Jose')",
},
"country": {
"type": "string",
"description": "Optional ISO country code to filter by (e.g., 'US', 'JP')",
},
"query_type": {
"type": "string",
"enum": ["iata", "city", "auto"],
"description": "Type of query - 'iata' for IATA codes, 'city' for city names, 'auto' to detect",
"default": "auto",
},
},
"required": ["query"],
},
),
Tool(
name="plan_flight",
description="Plan a flight route between two airports with performance estimates",
inputSchema={
"type": "object",
"properties": {
"departure": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "Departure city name",
},
"country": {
"type": "string",
"description": "Departure country code (optional)",
},
"iata": {
"type": "string",
"description": "Preferred departure IATA code (optional)",
},
},
"required": ["city"],
},
"arrival": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "Arrival city name"},
"country": {
"type": "string",
"description": "Arrival country code (optional)",
},
"iata": {
"type": "string",
"description": "Preferred arrival IATA code (optional)",
},
},
"required": ["city"],
},
"aircraft": {
"type": "object",
"properties": {
"type": {
"type": "string",
"description": "ICAO aircraft type (e.g., 'A320', 'B738', 'A359')",
},
"cruise_altitude": {
"type": "integer",
"description": "Cruise altitude in feet",
"minimum": 8000,
"maximum": 45000,
"default": 35000,
},
"mass_kg": {
"type": "number",
"description": "Aircraft mass in kg (optional, uses 85% MTOW if not specified)",
},
},
"required": ["type"],
},
"route_options": {
"type": "object",
"properties": {
"step_km": {
"type": "number",
"description": "Distance between polyline points in km",
"minimum": 1.0,
"default": 25.0,
}
},
},
},
"required": ["departure", "arrival", "aircraft"],
},
),
Tool(
name="calculate_distance",
description="Calculate great circle distance between two points",
inputSchema={
"type": "object",
"properties": {
"origin": {
"type": "object",
"properties": {
"latitude": {"type": "number", "minimum": -90, "maximum": 90},
"longitude": {
"type": "number",
"minimum": -180,
"maximum": 180,
},
},
"required": ["latitude", "longitude"],
},
"destination": {
"type": "object",
"properties": {
"latitude": {"type": "number", "minimum": -90, "maximum": 90},
"longitude": {
"type": "number",
"minimum": -180,
"maximum": 180,
},
},
"required": ["latitude", "longitude"],
},
"step_km": {
"type": "number",
"description": "Step size for polyline generation in km",
"minimum": 1.0,
"default": 50.0,
},
},
"required": ["origin", "destination"],
},
),
Tool(
name="get_aircraft_performance",
description="Get performance estimates for an aircraft type (requires OpenAP)",
inputSchema={
"type": "object",
"properties": {
"aircraft_type": {
"type": "string",
"description": "ICAO aircraft type code (e.g., 'A320', 'B738')",
},
"distance_km": {
"type": "number",
"description": "Route distance in kilometers",
"minimum": 1.0,
},
"cruise_altitude": {
"type": "integer",
"description": "Cruise altitude in feet",
"minimum": 8000,
"maximum": 45000,
"default": 35000,
},
"mass_kg": {
"type": "number",
"description": "Aircraft mass in kg (optional)",
},
},
"required": ["aircraft_type", "distance_km"],
},
),
Tool(
name="get_system_status",
description="Get system status and capabilities",
inputSchema={"type": "object", "properties": {}, "additionalProperties": False},
),
Tool(
name="get_atmosphere_profile",
description="Get atmospheric properties (pressure, temperature, density) at specified altitudes using ISA model",
inputSchema={
"type": "object",
"properties": {
"altitudes_m": {
"type": "array",
"items": {"type": "number", "minimum": 0, "maximum": 86000},
"description": "List of altitudes in meters (0-86000m)",
"minItems": 1,
},
"model_type": {
"type": "string",
"enum": ["ISA", "COESA"],
"default": "ISA",
"description": "Atmosphere model type",
},
},
"required": ["altitudes_m"],
"additionalProperties": False,
},
),
Tool(
name="wind_model_simple",
description="Calculate wind speeds at different altitudes using logarithmic or power law models",
inputSchema={
"type": "object",
"properties": {
"altitudes_m": {
"type": "array",
"items": {"type": "number", "minimum": 0},
"description": "List of altitudes in meters",
"minItems": 1,
},
"surface_wind_mps": {
"type": "number",
"minimum": 0,
"description": "Wind speed at reference height (m/s)",
},
"surface_altitude_m": {
"type": "number",
"default": 0.0,
"description": "Surface elevation in meters",
},
"model": {
"type": "string",
"enum": ["logarithmic", "power"],
"default": "logarithmic",
"description": "Wind profile model",
},
"roughness_length_m": {
"type": "number",
"minimum": 0.001,
"maximum": 10.0,
"default": 0.1,
"description": "Surface roughness length for logarithmic model",
},
},
"required": ["altitudes_m", "surface_wind_mps"],
"additionalProperties": False,
},
),
Tool(
name="transform_frames",
description="Transform coordinates between reference frames (ECEF, ECI, ITRF, GCRS, GEODETIC)",
inputSchema={
"type": "object",
"properties": {
"xyz": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
"description": "Coordinates [x, y, z] in meters (or lat, lon, alt for GEODETIC)",
},
"from_frame": {
"type": "string",
"enum": ["ECEF", "ECI", "ITRF", "GCRS", "GEODETIC"],
"description": "Source coordinate frame",
},
"to_frame": {
"type": "string",
"enum": ["ECEF", "ECI", "ITRF", "GCRS", "GEODETIC"],
"description": "Target coordinate frame",
},
"epoch_iso": {
"type": "string",
"default": "2000-01-01T12:00:00",
"description": "Reference epoch in ISO format",
},
},
"required": ["xyz", "from_frame", "to_frame"],
"additionalProperties": False,
},
),
Tool(
name="geodetic_to_ecef",
description="Convert geodetic coordinates (lat/lon/alt) to Earth-centered Earth-fixed (ECEF) coordinates",
inputSchema={
"type": "object",
"properties": {
"latitude_deg": {
"type": "number",
"minimum": -90,
"maximum": 90,
"description": "Latitude in degrees",
},
"longitude_deg": {
"type": "number",
"minimum": -180,
"maximum": 180,
"description": "Longitude in degrees",
},
"altitude_m": {
"type": "number",
"description": "Height above WGS84 ellipsoid in meters",
},
},
"required": ["latitude_deg", "longitude_deg", "altitude_m"],
"additionalProperties": False,
},
),
Tool(
name="ecef_to_geodetic",
description="Convert ECEF coordinates to geodetic (lat/lon/alt) coordinates",
inputSchema={
"type": "object",
"properties": {
"x": {"type": "number", "description": "ECEF X coordinate in meters"},
"y": {"type": "number", "description": "ECEF Y coordinate in meters"},
"z": {"type": "number", "description": "ECEF Z coordinate in meters"},
},
"required": ["x", "y", "z"],
"additionalProperties": False,
},
),
Tool(
name="wing_vlm_analysis",
description="Analyze wing aerodynamics using Vortex Lattice Method or simplified lifting line theory",
inputSchema={
"type": "object",
"properties": {
"geometry": {
"type": "object",
"properties": {
"span_m": {"type": "number", "minimum": 0.1, "maximum": 100},
"chord_root_m": {
"type": "number",
"minimum": 0.05,
"maximum": 10,
},
"chord_tip_m": {
"type": "number",
"minimum": 0.05,
"maximum": 10,
},
"sweep_deg": {
"type": "number",
"minimum": -45,
"maximum": 45,
"default": 0,
},
"dihedral_deg": {
"type": "number",
"minimum": -15,
"maximum": 15,
"default": 0,
},
"twist_deg": {
"type": "number",
"minimum": -10,
"maximum": 10,
"default": 0,
},
"airfoil_root": {"type": "string", "default": "NACA2412"},
"airfoil_tip": {"type": "string", "default": "NACA2412"},
},
"required": ["span_m", "chord_root_m", "chord_tip_m"],
"description": "Wing planform geometry",
},
"alpha_deg_list": {
"type": "array",
"items": {"type": "number", "minimum": -30, "maximum": 30},
"minItems": 1,
"description": "List of angles of attack to analyze (degrees)",
},
"mach": {
"type": "number",
"minimum": 0.05,
"maximum": 0.8,
"default": 0.2,
"description": "Mach number",
},
},
"required": ["geometry", "alpha_deg_list"],
"additionalProperties": False,
},
),
Tool(
name="airfoil_polar_analysis",
description="Generate airfoil polar data (CL, CD, CM vs alpha) using database or advanced methods",
inputSchema={
"type": "object",
"properties": {
"airfoil_name": {
"type": "string",
"description": "Airfoil name (e.g., NACA2412, NACA0012, CLARKY)",
},
"alpha_deg_list": {
"type": "array",
"items": {"type": "number", "minimum": -25, "maximum": 25},
"minItems": 1,
"description": "Angles of attack to analyze (degrees)",
},
"reynolds": {
"type": "number",
"minimum": 50000,
"maximum": 10000000,
"default": 1000000,
"description": "Reynolds number",
},
"mach": {
"type": "number",
"minimum": 0.05,
"maximum": 0.7,
"default": 0.1,
"description": "Mach number",
},
},
"required": ["airfoil_name", "alpha_deg_list"],
"additionalProperties": False,
},
),
Tool(
name="calculate_stability_derivatives",
description="Calculate basic longitudinal stability derivatives for a wing",
inputSchema={
"type": "object",
"properties": {
"geometry": {
"type": "object",
"properties": {
"span_m": {"type": "number", "minimum": 0.1, "maximum": 100},
"chord_root_m": {
"type": "number",
"minimum": 0.05,
"maximum": 10,
},
"chord_tip_m": {
"type": "number",
"minimum": 0.05,
"maximum": 10,
},
"sweep_deg": {
"type": "number",
"minimum": -45,
"maximum": 45,
"default": 0,
},
"airfoil_root": {"type": "string", "default": "NACA2412"},
},
"required": ["span_m", "chord_root_m", "chord_tip_m"],
"description": "Wing planform geometry",
},
"alpha_deg": {
"type": "number",
"minimum": -10,
"maximum": 10,
"default": 2.0,
"description": "Reference angle of attack",
},
"mach": {
"type": "number",
"minimum": 0.05,
"maximum": 0.8,
"default": 0.2,
"description": "Mach number",
},
},
"required": ["geometry"],
"additionalProperties": False,
},
),
Tool(
name="propeller_bemt_analysis",
description="Analyze propeller performance using Blade Element Momentum Theory",
inputSchema={
"type": "object",
"properties": {
"geometry": {
"type": "object",
"properties": {
"diameter_m": {
"type": "number",
"minimum": 0.05,
"maximum": 5.0,
},
"pitch_m": {"type": "number", "minimum": 0.02, "maximum": 3.0},
"num_blades": {"type": "integer", "minimum": 2, "maximum": 6},
"activity_factor": {
"type": "number",
"minimum": 50,
"maximum": 200,
"default": 100,
},
"cl_design": {
"type": "number",
"minimum": 0.1,
"maximum": 1.5,
"default": 0.5,
},
"cd_design": {
"type": "number",
"minimum": 0.005,
"maximum": 0.1,
"default": 0.02,
},
},
"required": ["diameter_m", "pitch_m", "num_blades"],
"description": "Propeller geometry parameters",
},
"rpm_list": {
"type": "array",
"items": {"type": "number", "minimum": 100, "maximum": 15000},
"minItems": 1,
"description": "List of RPM values to analyze",
},
"velocity_ms": {
"type": "number",
"minimum": 0,
"maximum": 100,
"default": 0,
"description": "Forward velocity in m/s (0 for static thrust)",
},
"altitude_m": {
"type": "number",
"minimum": 0,
"maximum": 15000,
"default": 0,
"description": "Altitude in meters",
},
},
"required": ["geometry", "rpm_list"],
"additionalProperties": False,
},
),
Tool(
name="uav_energy_estimate",
description="Estimate UAV flight time and energy consumption for mission planning",
inputSchema={
"type": "object",
"properties": {
"uav_config": {
"type": "object",
"properties": {
"mass_kg": {"type": "number", "minimum": 0.1, "maximum": 100},
"wing_area_m2": {"type": "number", "minimum": 0, "maximum": 50},
"disk_area_m2": {"type": "number", "minimum": 0, "maximum": 10},
"cd0": {
"type": "number",
"minimum": 0.01,
"maximum": 0.2,
"default": 0.03,
},
"cl_cruise": {"type": "number", "minimum": 0.1, "maximum": 2.0},
"num_motors": {
"type": "integer",
"minimum": 1,
"maximum": 8,
"default": 1,
},
"motor_efficiency": {
"type": "number",
"minimum": 0.5,
"maximum": 1.0,
"default": 0.85,
},
"esc_efficiency": {
"type": "number",
"minimum": 0.8,
"maximum": 1.0,
"default": 0.95,
},
},
"required": ["mass_kg"],
"description": "UAV configuration parameters",
},
"battery_config": {
"type": "object",
"properties": {
"capacity_ah": {
"type": "number",
"minimum": 0.1,
"maximum": 50,
},
"voltage_nominal_v": {
"type": "number",
"minimum": 3,
"maximum": 50,
},
"mass_kg": {"type": "number", "minimum": 0.01, "maximum": 20},
"energy_density_wh_kg": {
"type": "number",
"minimum": 50,
"maximum": 300,
"default": 150,
},
"discharge_efficiency": {
"type": "number",
"minimum": 0.8,
"maximum": 1.0,
"default": 0.95,
},
},
"required": ["capacity_ah", "voltage_nominal_v", "mass_kg"],
"description": "Battery configuration",
},
"mission_profile": {
"type": "object",
"properties": {
"velocity_ms": {
"type": "number",
"minimum": 1,
"maximum": 50,
"default": 15,
},
"altitude_m": {
"type": "number",
"minimum": 0,
"maximum": 5000,
"default": 100,
},
},
"description": "Mission parameters",
},
},
"required": ["uav_config", "battery_config"],
"additionalProperties": False,
},
),
Tool(
name="get_airfoil_database",
description="Get available airfoil database with aerodynamic coefficients",
inputSchema={"type": "object", "properties": {}, "additionalProperties": False},
),
Tool(
name="get_propeller_database",
description="Get available propeller database with geometric and performance data",
inputSchema={"type": "object", "properties": {}, "additionalProperties": False},
),
Tool(
name="rocket_3dof_trajectory",
description="Calculate 3DOF rocket trajectory using numerical integration",
inputSchema={
"type": "object",
"properties": {
"geometry": {
"type": "object",
"properties": {
"dry_mass_kg": {
"type": "number",
"minimum": 0.1,
"maximum": 100000,
},
"propellant_mass_kg": {
"type": "number",
"minimum": 0.1,
"maximum": 500000,
},
"diameter_m": {
"type": "number",
"minimum": 0.01,
"maximum": 10,
},
"length_m": {"type": "number", "minimum": 0.1, "maximum": 100},
"cd": {
"type": "number",
"minimum": 0.1,
"maximum": 2.0,
"default": 0.3,
},
"thrust_curve": {
"type": "array",
"items": {
"type": "array",
"items": {"type": "number"},
"minItems": 2,
"maxItems": 2,
},
"description": "Array of [time_s, thrust_N] points",
},
},
"required": [
"dry_mass_kg",
"propellant_mass_kg",
"diameter_m",
"length_m",
],
"description": "Rocket geometry and mass properties",
},
"dt_s": {
"type": "number",
"minimum": 0.01,
"maximum": 1.0,
"default": 0.1,
"description": "Time step for integration (seconds)",
},
"max_time_s": {
"type": "number",
"minimum": 10,
"maximum": 1000,
"default": 300,
"description": "Maximum simulation time (seconds)",
},
"launch_angle_deg": {
"type": "number",
"minimum": 45,
"maximum": 90,
"default": 90,
"description": "Launch angle from horizontal (degrees)",
},
},
"required": ["geometry"],
"additionalProperties": False,
},
),
Tool(
name="estimate_rocket_sizing",
description="Estimate rocket sizing requirements for target altitude and payload",
inputSchema={
"type": "object",
"properties": {
"target_altitude_m": {
"type": "number",
"minimum": 100,
"maximum": 100000,
"description": "Target apogee altitude in meters",
},
"payload_mass_kg": {
"type": "number",
"minimum": 0.01,
"maximum": 10000,
"description": "Payload mass in kg",
},
"propellant_type": {
"type": "string",
"enum": ["solid", "liquid"],
"default": "solid",
"description": "Propellant type",
},
},
"required": ["target_altitude_m", "payload_mass_kg"],
"additionalProperties": False,
},
),
Tool(
name="optimize_launch_angle",
description="Optimize rocket launch angle for maximum altitude or range",
inputSchema={
"type": "object",
"properties": {
"geometry": {
"type": "object",
"properties": {
"dry_mass_kg": {
"type": "number",
"minimum": 0.1,
"maximum": 100000,
},
"propellant_mass_kg": {
"type": "number",
"minimum": 0.1,
"maximum": 500000,
},
"diameter_m": {
"type": "number",
"minimum": 0.01,
"maximum": 10,
},
"length_m": {"type": "number", "minimum": 0.1, "maximum": 100},
"cd": {
"type": "number",
"minimum": 0.1,
"maximum": 2.0,
"default": 0.3,
},
"thrust_curve": {
"type": "array",
"items": {
"type": "array",
"items": {"type": "number"},
"minItems": 2,
"maxItems": 2,
},
"description": "Array of [time_s, thrust_N] points",
},
},
"required": [
"dry_mass_kg",
"propellant_mass_kg",
"diameter_m",
"length_m",
],
"description": "Rocket geometry and mass properties",
},
"objective": {
"type": "string",
"enum": ["max_altitude", "max_range"],
"default": "max_altitude",
"description": "Optimization objective",
},
"angle_bounds": {
"type": "array",
"items": {"type": "number", "minimum": 45, "maximum": 90},
"minItems": 2,
"maxItems": 2,
"default": [80, 90],
"description": "Launch angle bounds [min_deg, max_deg]",
},
},
"required": ["geometry"],
"additionalProperties": False,
},
),
Tool(
name="optimize_thrust_profile",
description="Optimize rocket thrust profile for better performance using trajectory optimization",
inputSchema={
"type": "object",
"properties": {
"geometry": {
"type": "object",
"properties": {
"dry_mass_kg": {
"type": "number",
"minimum": 0.1,
"maximum": 100000,
},
"propellant_mass_kg": {
"type": "number",
"minimum": 0.1,
"maximum": 500000,
},
"diameter_m": {
"type": "number",
"minimum": 0.01,
"maximum": 10,
},
"length_m": {"type": "number", "minimum": 0.1, "maximum": 100},
"cd": {
"type": "number",
"minimum": 0.1,
"maximum": 2.0,
"default": 0.3,
},
},
"required": [
"dry_mass_kg",
"propellant_mass_kg",
"diameter_m",
"length_m",
],
"description": "Base rocket geometry (thrust curve will be optimized)",
},
"burn_time_s": {
"type": "number",
"minimum": 1,
"maximum": 300,
"description": "Total burn time in seconds",
},
"total_impulse_target": {
"type": "number",
"minimum": 100,
"maximum": 10000000,
"description": "Target total impulse (N·s)",
},
"n_segments": {
"type": "integer",
"minimum": 3,
"maximum": 10,
"default": 5,
"description": "Number of thrust profile segments",
},
"objective": {
"type": "string",
"enum": ["max_altitude", "min_max_q", "min_gravity_loss"],
"default": "max_altitude",
"description": "Optimization objective",
},
},
"required": ["geometry", "burn_time_s", "total_impulse_target"],
"additionalProperties": False,
},
),
Tool(
name="trajectory_sensitivity_analysis",
description="Perform sensitivity analysis on rocket trajectory parameters",
inputSchema={
"type": "object",
"properties": {
"base_geometry": {
"type": "object",
"properties": {
"dry_mass_kg": {
"type": "number",
"minimum": 0.1,
"maximum": 100000,
},
"propellant_mass_kg": {
"type": "number",
"minimum": 0.1,
"maximum": 500000,
},
"diameter_m": {
"type": "number",
"minimum": 0.01,
"maximum": 10,
},
"length_m": {"type": "number", "minimum": 0.1, "maximum": 100},
"cd": {
"type": "number",
"minimum": 0.1,
"maximum": 2.0,
"default": 0.3,
},
"thrust_curve": {
"type": "array",
"items": {
"type": "array",
"items": {"type": "number"},
"minItems": 2,
"maxItems": 2,
},
"description": "Array of [time_s, thrust_N] points",
},
},
"required": [
"dry_mass_kg",
"propellant_mass_kg",
"diameter_m",
"length_m",
],
"description": "Baseline rocket geometry",
},
"parameter_variations": {
"type": "object",
"description": "Parameters to vary with their variation ranges",
"additionalProperties": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
},
},
"objective": {
"type": "string",
"enum": ["max_altitude", "max_velocity", "specific_impulse"],
"default": "max_altitude",
"description": "Objective metric for sensitivity analysis",
},
},
"required": ["base_geometry", "parameter_variations"],
"additionalProperties": False,
},
),
Tool(
name="elements_to_state_vector",
description="Convert orbital elements to state vector in J2000 frame",
inputSchema={
"type": "object",
"properties": {
"elements": {
"type": "object",
"properties": {
"semi_major_axis_m": {
"type": "number",
"minimum": 6.6e6,
"maximum": 1e12,
},
"eccentricity": {
"type": "number",
"minimum": 0.0,
"maximum": 0.99,
},
"inclination_deg": {
"type": "number",
"minimum": 0.0,
"maximum": 180.0,
},
"raan_deg": {
"type": "number",
"minimum": 0.0,
"maximum": 360.0,
},
"arg_periapsis_deg": {
"type": "number",
"minimum": 0.0,
"maximum": 360.0,
},
"true_anomaly_deg": {
"type": "number",
"minimum": 0.0,
"maximum": 360.0,
},
"epoch_utc": {
"type": "string",
"description": "Epoch in UTC ISO format",
},
},
"required": [
"semi_major_axis_m",
"eccentricity",
"inclination_deg",
"raan_deg",
"arg_periapsis_deg",
"true_anomaly_deg",
"epoch_utc",
],
"description": "Classical orbital elements",
}
},
"required": ["elements"],
"additionalProperties": False,
},
),
Tool(
name="state_vector_to_elements",
description="Convert state vector to classical orbital elements",
inputSchema={
"type": "object",
"properties": {
"state": {
"type": "object",
"properties": {
"position_m": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
"description": "Position vector [x, y, z] in meters",
},
"velocity_ms": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
"description": "Velocity vector [vx, vy, vz] in m/s",
},
"epoch_utc": {
"type": "string",
"description": "Epoch in UTC ISO format",
},
"frame": {
"type": "string",
"default": "J2000",
"description": "Reference frame",
},
},
"required": ["position_m", "velocity_ms", "epoch_utc"],
"description": "Spacecraft state vector",
}
},
"required": ["state"],
"additionalProperties": False,
},
),
Tool(
name="propagate_orbit_j2",
description="Propagate orbit with J2 perturbations using numerical integration",
inputSchema={
"type": "object",
"properties": {
"initial_state": {
"type": "object",
"properties": {
"position_m": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
"description": "Position vector [x, y, z] in meters",
},
"velocity_ms": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
"description": "Velocity vector [vx, vy, vz] in m/s",
},
"epoch_utc": {
"type": "string",
"description": "Epoch in UTC ISO format",
},
"frame": {
"type": "string",
"default": "J2000",
"description": "Reference frame",
},
},
"required": ["position_m", "velocity_ms", "epoch_utc"],
"description": "Initial spacecraft state",
},
"time_span_s": {
"type": "number",
"minimum": 60,
"maximum": 86400 * 30,
"description": "Propagation time span (seconds)",
},
"time_step_s": {
"type": "number",
"minimum": 1,
"maximum": 3600,
"default": 60,
"description": "Integration time step (seconds)",
},
},
"required": ["initial_state", "time_span_s"],
"additionalProperties": False,
},
),
Tool(
name="calculate_ground_track",
description="Calculate ground track from orbital state vectors",
inputSchema={
"type": "object",
"properties": {
"orbit_states": {
"type": "array",
"items": {
"type": "object",
"properties": {
"position_m": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
},
"velocity_ms": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
},
"epoch_utc": {"type": "string"},
"frame": {"type": "string", "default": "J2000"},
},
"required": ["position_m", "velocity_ms", "epoch_utc"],
},
"minItems": 1,
"description": "Array of orbital state vectors",
},
"time_step_s": {
"type": "number",
"minimum": 1,
"maximum": 3600,
"default": 60,
"description": "Time step between states (seconds)",
},
},
"required": ["orbit_states"],
"additionalProperties": False,
},
),
Tool(
name="hohmann_transfer",
description="Calculate Hohmann transfer orbit parameters between two circular orbits",
inputSchema={
"type": "object",
"properties": {
"r1_m": {
"type": "number",
"minimum": 6.6e6,
"maximum": 1e12,
"description": "Initial circular orbit radius from Earth center (m)",
},
"r2_m": {
"type": "number",
"minimum": 6.6e6,
"maximum": 1e12,
"description": "Final circular orbit radius from Earth center (m)",
},
},
"required": ["r1_m", "r2_m"],
"additionalProperties": False,
},
),
Tool(
name="orbital_rendezvous_planning",
description="Plan orbital rendezvous maneuvers between two spacecraft",
inputSchema={
"type": "object",
"properties": {
"chaser_elements": {
"type": "object",
"properties": {
"semi_major_axis_m": {
"type": "number",
"minimum": 6.6e6,
"maximum": 1e12,
},
"eccentricity": {
"type": "number",
"minimum": 0.0,
"maximum": 0.99,
},
"inclination_deg": {
"type": "number",
"minimum": 0.0,
"maximum": 180.0,
},
"raan_deg": {
"type": "number",
"minimum": 0.0,
"maximum": 360.0,
},
"arg_periapsis_deg": {
"type": "number",
"minimum": 0.0,
"maximum": 360.0,
},
"true_anomaly_deg": {
"type": "number",
"minimum": 0.0,
"maximum": 360.0,
},
"epoch_utc": {"type": "string"},
},
"required": [
"semi_major_axis_m",
"eccentricity",
"inclination_deg",
"raan_deg",
"arg_periapsis_deg",
"true_anomaly_deg",
"epoch_utc",
],
"description": "Chaser spacecraft orbital elements",
},
"target_elements": {
"type": "object",
"properties": {
"semi_major_axis_m": {
"type": "number",
"minimum": 6.6e6,
"maximum": 1e12,
},
"eccentricity": {
"type": "number",
"minimum": 0.0,
"maximum": 0.99,
},
"inclination_deg": {
"type": "number",
"minimum": 0.0,
"maximum": 180.0,
},
"raan_deg": {
"type": "number",
"minimum": 0.0,
"maximum": 360.0,
},
"arg_periapsis_deg": {
"type": "number",
"minimum": 0.0,
"maximum": 360.0,
},
"true_anomaly_deg": {
"type": "number",
"minimum": 0.0,
"maximum": 360.0,
},
"epoch_utc": {"type": "string"},
},
"required": [
"semi_major_axis_m",
"eccentricity",
"inclination_deg",
"raan_deg",
"arg_periapsis_deg",
"true_anomaly_deg",
"epoch_utc",
],
"description": "Target spacecraft orbital elements",
},
},
"required": ["chaser_elements", "target_elements"],
"additionalProperties": False,
},
),
Tool(
name="genetic_algorithm_optimization",
description="Optimize spacecraft trajectory using genetic algorithm",
inputSchema={
"type": "object",
"properties": {
"initial_trajectory": {
"type": "array",
"items": {
"type": "object",
"properties": {
"time_s": {"type": "number"},
"position_m": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
},
"velocity_ms": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
},
"thrust_n": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
},
"mass_kg": {
"type": "number",
"minimum": 100,
"maximum": 10000,
},
},
"required": ["time_s", "position_m", "velocity_ms", "mass_kg"],
},
"minItems": 2,
"maxItems": 20,
"description": "Initial trajectory waypoints",
},
"objective": {
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": [
"minimize_fuel",
"minimize_time",
"minimize_delta_v",
"maximize_payload",
],
},
"target_state": {
"type": "array",
"items": {"type": "number"},
"minItems": 6,
"maxItems": 6,
"description": "Target state [x, y, z, vx, vy, vz] if applicable",
},
},
"required": ["type"],
"description": "Optimization objective",
},
"constraints": {
"type": "object",
"properties": {
"max_thrust_n": {
"type": "number",
"minimum": 1,
"maximum": 100000,
"default": 10000,
},
"max_acceleration_ms2": {
"type": "number",
"minimum": 0.1,
"maximum": 100,
"default": 50,
},
"min_altitude_m": {
"type": "number",
"minimum": 150000,
"maximum": 1000000,
"default": 200000,
},
"max_delta_v_ms": {
"type": "number",
"minimum": 100,
"maximum": 15000,
"default": 5000,
},
},
"description": "Optimization constraints",
},
"ga_params": {
"type": "object",
"properties": {
"population_size": {
"type": "integer",
"minimum": 10,
"maximum": 200,
"default": 50,
},
"generations": {
"type": "integer",
"minimum": 10,
"maximum": 500,
"default": 100,
},
"mutation_rate": {
"type": "number",
"minimum": 0.01,
"maximum": 0.5,
"default": 0.1,
},
"crossover_rate": {
"type": "number",
"minimum": 0.1,
"maximum": 1.0,
"default": 0.8,
},
},
"description": "Genetic algorithm parameters",
},
},
"required": ["initial_trajectory", "objective"],
"additionalProperties": False,
},
),
Tool(
name="particle_swarm_optimization",
description="Optimize spacecraft trajectory using particle swarm optimization",
inputSchema={
"type": "object",
"properties": {
"initial_trajectory": {
"type": "array",
"items": {
"type": "object",
"properties": {
"time_s": {"type": "number"},
"position_m": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
},
"velocity_ms": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
},
"thrust_n": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
},
"mass_kg": {
"type": "number",
"minimum": 100,
"maximum": 10000,
},
},
"required": ["time_s", "position_m", "velocity_ms", "mass_kg"],
},
"minItems": 2,
"maxItems": 20,
"description": "Initial trajectory waypoints",
},
"objective": {
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": [
"minimize_fuel",
"minimize_time",
"minimize_delta_v",
"maximize_payload",
],
},
"target_state": {
"type": "array",
"items": {"type": "number"},
"minItems": 6,
"maxItems": 6,
"description": "Target state [x, y, z, vx, vy, vz] if applicable",
},
},
"required": ["type"],
"description": "Optimization objective",
},
"constraints": {
"type": "object",
"properties": {
"max_thrust_n": {
"type": "number",
"minimum": 1,
"maximum": 100000,
"default": 10000,
},
"max_acceleration_ms2": {
"type": "number",
"minimum": 0.1,
"maximum": 100,
"default": 50,
},
"min_altitude_m": {
"type": "number",
"minimum": 150000,
"maximum": 1000000,
"default": 200000,
},
"max_delta_v_ms": {
"type": "number",
"minimum": 100,
"maximum": 15000,
"default": 5000,
},
},
"description": "Optimization constraints",
},
"pso_params": {
"type": "object",
"properties": {
"num_particles": {
"type": "integer",
"minimum": 10,
"maximum": 100,
"default": 30,
},
"max_iterations": {
"type": "integer",
"minimum": 10,
"maximum": 300,
"default": 100,
},
"w": {
"type": "number",
"minimum": 0.1,
"maximum": 1.0,
"default": 0.7,
},
"c1": {
"type": "number",
"minimum": 0.5,
"maximum": 3.0,
"default": 1.5,
},
"c2": {
"type": "number",
"minimum": 0.5,
"maximum": 3.0,
"default": 1.5,
},
},
"description": "Particle swarm optimization parameters",
},
},
"required": ["initial_trajectory", "objective"],
"additionalProperties": False,
},
),
Tool(
name="porkchop_plot_analysis",
description="Generate porkchop plot for interplanetary transfer opportunities",
inputSchema={
"type": "object",
"properties": {
"departure_body": {
"type": "string",
"enum": ["Earth", "Mars", "Venus", "Jupiter"],
"default": "Earth",
"description": "Departure celestial body",
},
"arrival_body": {
"type": "string",
"enum": ["Earth", "Mars", "Venus", "Jupiter"],
"default": "Mars",
"description": "Arrival celestial body",
},
"departure_dates": {
"type": "array",
"items": {"type": "string"},
"description": "Optional list of departure dates (ISO format)",
},
"arrival_dates": {
"type": "array",
"items": {"type": "string"},
"description": "Optional list of arrival dates (ISO format)",
},
"min_tof_days": {
"type": "integer",
"minimum": 30,
"maximum": 1000,
"default": 100,
"description": "Minimum time of flight in days",
},
"max_tof_days": {
"type": "integer",
"minimum": 50,
"maximum": 2000,
"default": 400,
"description": "Maximum time of flight in days",
},
},
"required": [],
},
),
Tool(
name="monte_carlo_uncertainty_analysis",
description="Perform Monte Carlo uncertainty analysis on spacecraft trajectory",
inputSchema={
"type": "object",
"properties": {
"nominal_trajectory": {
"type": "array",
"items": {
"type": "object",
"properties": {
"time_s": {"type": "number"},
"position_m": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
},
"velocity_ms": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
},
"thrust_n": {
"type": "array",
"items": {"type": "number"},
"minItems": 3,
"maxItems": 3,
},
"mass_kg": {"type": "number"},
},
"required": ["time_s", "position_m", "velocity_ms", "mass_kg"],
},
"minItems": 2,
"description": "Nominal trajectory waypoints",
},
"uncertainty_params": {
"type": "object",
"properties": {
"position_m": {
"type": "object",
"properties": {
"std": {
"type": "number",
"minimum": 1,
"maximum": 10000,
"default": 100,
}
},
"description": "Position uncertainty parameters",
},
"velocity_ms": {
"type": "object",
"properties": {
"std": {
"type": "number",
"minimum": 0.1,
"maximum": 100,
"default": 10,
}
},
"description": "Velocity uncertainty parameters",
},
"thrust_n": {
"type": "object",
"properties": {
"std": {
"type": "number",
"minimum": 1,
"maximum": 1000,
"default": 50,
}
},
"description": "Thrust uncertainty parameters",
},
},
"description": "Uncertainty parameter definitions",
},
"n_samples": {
"type": "integer",
"minimum": 100,
"maximum": 10000,
"default": 1000,
"description": "Number of Monte Carlo samples",
},
},
"required": ["nominal_trajectory"],
"additionalProperties": False,
},
),
]
@server.list_tools()
async def handle_list_tools() -> list[Tool]:
"""List all available tools."""
return TOOLS
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict
) -> list[TextContent | ImageContent | EmbeddedResource]:
"""Handle tool calls."""
try:
if name == "search_airports":
return await _handle_search_airports(arguments)
elif name == "plan_flight":
return await _handle_plan_flight(arguments)
elif name == "calculate_distance":
return await _handle_calculate_distance(arguments)
elif name == "get_aircraft_performance":
return await _handle_get_aircraft_performance(arguments)
elif name == "get_system_status":
return await _handle_get_system_status(arguments)
elif name == "get_atmosphere_profile":
return await _handle_get_atmosphere_profile(arguments)
elif name == "wind_model_simple":
return await _handle_wind_model_simple(arguments)
elif name == "transform_frames":
return await _handle_transform_frames(arguments)
elif name == "geodetic_to_ecef":
return await _handle_geodetic_to_ecef(arguments)
elif name == "ecef_to_geodetic":
return await _handle_ecef_to_geodetic(arguments)
elif name == "wing_vlm_analysis":
return await _handle_wing_vlm_analysis(arguments)
elif name == "airfoil_polar_analysis":
return await _handle_airfoil_polar_analysis(arguments)
elif name == "calculate_stability_derivatives":
return await _handle_calculate_stability_derivatives(arguments)
elif name == "propeller_bemt_analysis":
return await _handle_propeller_bemt_analysis(arguments)
elif name == "uav_energy_estimate":
return await _handle_uav_energy_estimate(arguments)
elif name == "get_airfoil_database":
return await _handle_get_airfoil_database(arguments)
elif name == "get_propeller_database":
return await _handle_get_propeller_database(arguments)
elif name == "rocket_3dof_trajectory":
return await _handle_rocket_3dof_trajectory(arguments)
elif name == "estimate_rocket_sizing":
return await _handle_estimate_rocket_sizing(arguments)
elif name == "optimize_launch_angle":
return await _handle_optimize_launch_angle(arguments)
elif name == "optimize_thrust_profile":
return await _handle_optimize_thrust_profile(arguments)
elif name == "trajectory_sensitivity_analysis":
return await _handle_trajectory_sensitivity_analysis(arguments)
elif name == "elements_to_state_vector":
return await _handle_elements_to_state_vector(arguments)
elif name == "state_vector_to_elements":
return await _handle_state_vector_to_elements(arguments)
elif name == "propagate_orbit_j2":
return await _handle_propagate_orbit_j2(arguments)
elif name == "calculate_ground_track":
return await _handle_calculate_ground_track(arguments)
elif name == "hohmann_transfer":
return await _handle_hohmann_transfer(arguments)
elif name == "orbital_rendezvous_planning":
return await _handle_orbital_rendezvous_planning(arguments)
elif name == "genetic_algorithm_optimization":
return await _handle_genetic_algorithm_optimization(arguments)
elif name == "particle_swarm_optimization":
return await _handle_particle_swarm_optimization(arguments)
elif name == "porkchop_plot_analysis":
return await _handle_porkchop_plot_analysis(arguments)
elif name == "monte_carlo_uncertainty_analysis":
return await _handle_monte_carlo_uncertainty_analysis(arguments)
else:
raise ValueError(f"Unknown tool: {name}")
except Exception as e:
logger.error(f"Error in tool {name}: {str(e)}", exc_info=True)
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def _handle_search_airports(arguments: dict) -> list[TextContent]:
"""Handle airport search requests."""
query = arguments.get("query", "").strip()
country = arguments.get("country")
query_type = arguments.get("query_type", "auto")
if not query:
return [TextContent(type="text", text="Error: Query parameter is required")]
results = []
# Auto-detect query type if needed
if query_type == "auto":
query_type = "iata" if len(query) == 3 and query.isalpha() else "city"
try:
if query_type == "iata":
# Search by IATA code
airport = _airport_from_iata(query)
if airport:
results = [airport]
else:
# Search by city name
results = _find_city_airports(query, country)
if not results:
message = f"No airports found for {query_type} '{query}'"
if country:
message += f" in country '{country}'"
return [TextContent(type="text", text=message)]
# Format results
response_lines = [f"Found {len(results)} airport(s):"]
for airport in results:
line = f"• {airport.iata} ({airport.icao}) - {airport.name}"
line += f"\n City: {airport.city}, {airport.country}"
line += f"\n Coordinates: {airport.lat:.4f}, {airport.lon:.4f}"
if airport.tz:
line += f"\n Timezone: {airport.tz}"
response_lines.append(line)
return [TextContent(type="text", text="\n\n".join(response_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Search error: {str(e)}")]
async def _handle_plan_flight(arguments: dict) -> list[TextContent]:
"""Handle flight planning requests."""
try:
# Extract parameters
departure = arguments.get("departure", {})
arrival = arguments.get("arrival", {})
aircraft = arguments.get("aircraft", {})
route_options = arguments.get("route_options", {})
# Build PlanRequest
plan_request = PlanRequest(
depart_city=departure.get("city", ""),
arrive_city=arrival.get("city", ""),
depart_country=departure.get("country"),
arrive_country=arrival.get("country"),
prefer_depart_iata=departure.get("iata"),
prefer_arrive_iata=arrival.get("iata"),
ac_type=aircraft.get("type", ""),
cruise_alt_ft=aircraft.get("cruise_altitude", 35000),
mass_kg=aircraft.get("mass_kg"),
route_step_km=route_options.get("step_km", 25.0),
)
# Validate same city check
if (
plan_request.depart_city.strip().lower()
== plan_request.arrive_city.strip().lower()
and not plan_request.prefer_arrive_iata
and not plan_request.prefer_depart_iata
):
return [
TextContent(
type="text",
text="Error: Departure and arrival cities are identical. Please specify airports explicitly.",
)
]
# Resolve airports
try:
dep = _resolve_endpoint(
plan_request.depart_city,
plan_request.depart_country,
plan_request.prefer_depart_iata,
"departure",
)
arr = _resolve_endpoint(
plan_request.arrive_city,
plan_request.arrive_country,
plan_request.prefer_arrive_iata,
"arrival",
)
except AirportResolutionError as e:
return [
TextContent(type="text", text=f"Airport resolution error: {str(e)}")
]
# Calculate route
polyline, distance_km = great_circle_points(
dep.lat, dep.lon, arr.lat, arr.lon, plan_request.route_step_km
)
distance_nm = distance_km * NM_PER_KM
# Get performance estimates
try:
estimates, engine_name = estimates_openap(
plan_request.ac_type,
plan_request.cruise_alt_ft,
plan_request.mass_kg,
distance_km,
)
except OpenAPError as e:
return [
TextContent(type="text", text=f"Performance estimation error: {str(e)}")
]
# Format response
response_lines = [
f"Flight Plan: {dep.iata} → {arr.iata}",
f"Route: {dep.name} → {arr.name}",
f"Distance: {distance_km:.0f} km ({distance_nm:.0f} NM)",
f"Aircraft: {plan_request.ac_type}",
f"Cruise Altitude: {plan_request.cruise_alt_ft:,} ft",
"",
"Performance Estimates (OpenAP):",
f"• Block Time: {estimates['block']['time_min']:.0f} minutes ({estimates['block']['time_min'] / 60:.1f} hours)",
f"• Block Fuel: {estimates['block']['fuel_kg']:.0f} kg",
"",
"Flight Segments:",
f"• Climb: {estimates['climb']['time_min']:.0f} min, {estimates['climb']['distance_km']:.0f} km, {estimates['climb']['fuel_kg']:.0f} kg fuel",
f"• Cruise: {estimates['cruise']['time_min']:.0f} min, {estimates['cruise']['distance_km']:.0f} km, {estimates['cruise']['fuel_kg']:.0f} kg fuel",
f"• Descent: {estimates['descent']['time_min']:.0f} min, {estimates['descent']['distance_km']:.0f} km, {estimates['descent']['fuel_kg']:.0f} kg fuel",
"",
f"Route Polyline: {len(polyline)} points (every {plan_request.route_step_km} km)",
f"Mass Assumption: {estimates['assumptions']['mass_kg']:.0f} kg",
]
return [TextContent(type="text", text="\n".join(response_lines))]
except ValidationError as e:
return [TextContent(type="text", text=f"Validation error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Flight planning error: {str(e)}")]
async def _handle_calculate_distance(arguments: dict) -> list[TextContent]:
"""Handle distance calculation requests."""
try:
origin = arguments.get("origin", {})
destination = arguments.get("destination", {})
step_km = arguments.get("step_km", 50.0)
lat1 = origin.get("latitude")
lon1 = origin.get("longitude")
lat2 = destination.get("latitude")
lon2 = destination.get("longitude")
if None in [lat1, lon1, lat2, lon2]:
return [
TextContent(
type="text",
text="Error: Origin and destination coordinates are required",
)
]
polyline, distance_km = great_circle_points(lat1, lon1, lat2, lon2, step_km)
distance_nm = distance_km * NM_PER_KM
response_lines = [
"Great Circle Distance Calculation",
f"Origin: {lat1:.4f}, {lon1:.4f}",
f"Destination: {lat2:.4f}, {lon2:.4f}",
f"Distance: {distance_km:.2f} km ({distance_nm:.2f} NM)",
f"Polyline Points: {len(polyline)} (every {step_km} km)",
]
return [TextContent(type="text", text="\n".join(response_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Distance calculation error: {str(e)}")]
async def _handle_get_aircraft_performance(arguments: dict) -> list[TextContent]:
"""Handle aircraft performance requests."""
try:
aircraft_type = arguments.get("aircraft_type", "")
distance_km = arguments.get("distance_km", 0)
cruise_altitude = arguments.get("cruise_altitude", 35000)
mass_kg = arguments.get("mass_kg")
if not aircraft_type:
return [TextContent(type="text", text="Error: Aircraft type is required")]
if distance_km <= 0:
return [TextContent(type="text", text="Error: Distance must be positive")]
estimates, engine_name = estimates_openap(
aircraft_type, cruise_altitude, mass_kg, distance_km
)
response_lines = [
f"Aircraft Performance Estimates ({engine_name})",
f"Aircraft: {aircraft_type}",
f"Distance: {distance_km:.0f} km",
f"Cruise Altitude: {cruise_altitude:,} ft",
f"Mass: {estimates['assumptions']['mass_kg']:.0f} kg",
"",
"Block Estimates:",
f"• Time: {estimates['block']['time_min']:.0f} minutes ({estimates['block']['time_min'] / 60:.1f} hours)",
f"• Fuel: {estimates['block']['fuel_kg']:.0f} kg",
"",
"Segment Breakdown:",
f"• Climb: {estimates['climb']['time_min']:.0f} min, {estimates['climb']['distance_km']:.0f} km, {estimates['climb']['fuel_kg']:.0f} kg",
f"• Cruise: {estimates['cruise']['time_min']:.0f} min, {estimates['cruise']['distance_km']:.0f} km, {estimates['cruise']['fuel_kg']:.0f} kg",
f"• Descent: {estimates['descent']['time_min']:.0f} min, {estimates['descent']['distance_km']:.0f} km, {estimates['descent']['fuel_kg']:.0f} kg",
]
return [TextContent(type="text", text="\n".join(response_lines))]
except OpenAPError as e:
return [
TextContent(type="text", text=f"Performance estimation error: {str(e)}")
]
except Exception as e:
return [TextContent(type="text", text=f"Aircraft performance error: {str(e)}")]
async def _handle_get_system_status(arguments: dict) -> list[TextContent]:
"""Handle system status requests."""
try:
from .core import _AIRPORTS_IATA
status_lines = [
"Aerospace MCP Server Status",
f"OpenAP Available: {'Yes' if OPENAP_AVAILABLE else 'No'}",
f"Airports Loaded: {len(_AIRPORTS_IATA):,}",
"",
"Available Tools:",
]
# Add all available tools dynamically
for tool in TOOLS:
status_lines.append(f"• {tool.name} - {tool.description}")
status_lines.append("")
if not OPENAP_AVAILABLE:
status_lines.extend(
[
"",
"Note: OpenAP is not available. Flight performance estimates will not work.",
"Install with: pip install openap",
]
)
return [TextContent(type="text", text="\n".join(status_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Status error: {str(e)}")]
async def _handle_get_atmosphere_profile(arguments: dict) -> list[TextContent]:
"""Handle atmosphere profile requests."""
try:
from .integrations.atmosphere import get_atmosphere_profile
altitudes_m = arguments.get("altitudes_m", [])
model_type = arguments.get("model_type", "ISA")
if not altitudes_m:
return [TextContent(type="text", text="Error: altitudes_m is required")]
profile = get_atmosphere_profile(altitudes_m, model_type)
# Format response
result_lines = [f"Atmospheric Profile ({model_type})", "=" * 50]
result_lines.append(
f"{'Alt (m)':>8} {'Press (Pa)':>12} {'Temp (K)':>9} {'Density':>10} {'Sound (m/s)':>12}"
)
result_lines.append("-" * 60)
for point in profile:
result_lines.append(
f"{point.altitude_m:8.0f} {point.pressure_pa:12.1f} {point.temperature_k:9.2f} "
f"{point.density_kg_m3:10.6f} {point.speed_of_sound_mps:12.1f}"
)
# Add JSON data for programmatic use
json_data = json.dumps([p.model_dump() for p in profile], indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Atmosphere profile error: {str(e)}")]
async def _handle_wind_model_simple(arguments: dict) -> list[TextContent]:
"""Handle simple wind model requests."""
try:
from .integrations.atmosphere import wind_model_simple
altitudes_m = arguments.get("altitudes_m", [])
surface_wind_mps = arguments.get("surface_wind_mps")
surface_altitude_m = arguments.get("surface_altitude_m", 0.0)
model = arguments.get("model", "logarithmic")
roughness_length_m = arguments.get("roughness_length_m", 0.1)
if not altitudes_m or surface_wind_mps is None:
return [
TextContent(
type="text",
text="Error: altitudes_m and surface_wind_mps are required",
)
]
wind_profile = wind_model_simple(
altitudes_m, surface_wind_mps, surface_altitude_m, model, roughness_length_m
)
# Format response
result_lines = [f"Wind Profile ({model} model)", "=" * 40]
result_lines.append(f"Surface wind: {surface_wind_mps} m/s")
if model == "logarithmic":
result_lines.append(f"Roughness length: {roughness_length_m} m")
result_lines.extend(["", f"{'Alt (m)':>8} {'Wind (m/s)':>12}", "-" * 25])
for point in wind_profile:
result_lines.append(f"{point.altitude_m:8.0f} {point.wind_speed_mps:12.2f}")
# Add JSON data
json_data = json.dumps([p.model_dump() for p in wind_profile], indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Wind model error: {str(e)}")]
async def _handle_transform_frames(arguments: dict) -> list[TextContent]:
"""Handle coordinate frame transformation requests."""
try:
from .integrations.frames import transform_frames
xyz = arguments.get("xyz", [])
from_frame = arguments.get("from_frame")
to_frame = arguments.get("to_frame")
epoch_iso = arguments.get("epoch_iso", "2000-01-01T12:00:00")
if not xyz or not from_frame or not to_frame:
return [
TextContent(
type="text",
text="Error: xyz, from_frame, and to_frame are required",
)
]
result = transform_frames(xyz, from_frame, to_frame, epoch_iso)
# Format response
result_lines = ["Coordinate Frame Transformation", "=" * 40]
result_lines.append(f"From: {from_frame}")
result_lines.append(f"To: {to_frame}")
result_lines.append(f"Epoch: {epoch_iso}")
result_lines.extend(["", "Input coordinates:"])
if from_frame == "GEODETIC":
result_lines.append(f" Latitude: {xyz[0]:11.6f}°")
result_lines.append(f" Longitude: {xyz[1]:11.6f}°")
result_lines.append(f" Altitude: {xyz[2]:11.2f} m")
else:
result_lines.append(f" X: {xyz[0]:15.2f} m")
result_lines.append(f" Y: {xyz[1]:15.2f} m")
result_lines.append(f" Z: {xyz[2]:15.2f} m")
result_lines.extend(["", "Transformed coordinates:"])
if to_frame == "GEODETIC":
result_lines.append(f" Latitude: {result.x:11.6f}°")
result_lines.append(f" Longitude: {result.y:11.6f}°")
result_lines.append(f" Altitude: {result.z:11.2f} m")
else:
result_lines.append(f" X: {result.x:15.2f} m")
result_lines.append(f" Y: {result.y:15.2f} m")
result_lines.append(f" Z: {result.z:15.2f} m")
# Add JSON data
json_data = json.dumps(result.model_dump(), indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Frame transformation error: {str(e)}")]
async def _handle_geodetic_to_ecef(arguments: dict) -> list[TextContent]:
"""Handle geodetic to ECEF conversion."""
try:
from .integrations.frames import geodetic_to_ecef
latitude_deg = arguments.get("latitude_deg")
longitude_deg = arguments.get("longitude_deg")
altitude_m = arguments.get("altitude_m")
if latitude_deg is None or longitude_deg is None or altitude_m is None:
return [
TextContent(
type="text",
text="Error: latitude_deg, longitude_deg, and altitude_m are required",
)
]
result = geodetic_to_ecef(latitude_deg, longitude_deg, altitude_m)
# Format response
result_lines = [
"Geodetic to ECEF Conversion",
"=" * 35,
f"Latitude: {latitude_deg:11.6f}°",
f"Longitude: {longitude_deg:11.6f}°",
f"Altitude: {altitude_m:11.2f} m",
"",
"ECEF Coordinates:",
f" X: {result.x:15.2f} m",
f" Y: {result.y:15.2f} m",
f" Z: {result.z:15.2f} m",
]
# Add JSON data
json_data = json.dumps(result.model_dump(), indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Geodetic to ECEF error: {str(e)}")]
async def _handle_ecef_to_geodetic(arguments: dict) -> list[TextContent]:
"""Handle ECEF to geodetic conversion."""
try:
from .integrations.frames import ecef_to_geodetic
x = arguments.get("x")
y = arguments.get("y")
z = arguments.get("z")
if x is None or y is None or z is None:
return [
TextContent(
type="text", text="Error: x, y, and z coordinates are required"
)
]
result = ecef_to_geodetic(x, y, z)
# Format response
result_lines = [
"ECEF to Geodetic Conversion",
"=" * 35,
f"ECEF X: {x:15.2f} m",
f"ECEF Y: {y:15.2f} m",
f"ECEF Z: {z:15.2f} m",
"",
"Geodetic Coordinates:",
f" Latitude: {result.latitude_deg:11.6f}°",
f" Longitude: {result.longitude_deg:11.6f}°",
f" Altitude: {result.altitude_m:11.2f} m",
]
# Add JSON data
json_data = json.dumps(result.model_dump(), indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"ECEF to geodetic error: {str(e)}")]
async def _handle_wing_vlm_analysis(arguments: dict) -> list[TextContent]:
"""Handle wing VLM analysis requests."""
try:
from .integrations.aero import WingGeometry, wing_vlm_analysis
geometry_data = arguments.get("geometry", {})
alpha_deg_list = arguments.get("alpha_deg_list", [])
mach = arguments.get("mach", 0.2)
if not geometry_data or not alpha_deg_list:
return [
TextContent(
type="text", text="Error: geometry and alpha_deg_list are required"
)
]
# Create geometry object
geometry = WingGeometry(**geometry_data)
# Run analysis
results = wing_vlm_analysis(geometry, alpha_deg_list, mach)
# Format response
result_lines = [
f"Wing VLM Analysis (Mach {mach})",
"=" * 50,
f"Geometry: {geometry.span_m:.2f}m span, AR={geometry.span_m**2 / ((geometry.chord_root_m + geometry.chord_tip_m) * geometry.span_m / 2):.1f}",
f"Airfoils: {geometry.airfoil_root} (root) -> {geometry.airfoil_tip} (tip)",
"",
f"{'Alpha (°)':>8} {'CL':>8} {'CD':>8} {'CM':>8} {'L/D':>8} {'Eff':>8}",
"-" * 55,
]
for point in results:
eff_str = f"{point.span_efficiency:.3f}" if point.span_efficiency else "N/A"
result_lines.append(
f"{point.alpha_deg:8.1f} {point.CL:8.4f} {point.CD:8.5f} {point.CM:8.4f} "
f"{point.L_D_ratio:8.1f} {eff_str:>8}"
)
# Add JSON data
json_data = json.dumps([p.model_dump() for p in results], indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Wing analysis error: {str(e)}")]
async def _handle_airfoil_polar_analysis(arguments: dict) -> list[TextContent]:
"""Handle airfoil polar analysis requests."""
try:
from .integrations.aero import airfoil_polar_analysis
airfoil_name = arguments.get("airfoil_name", "NACA2412")
alpha_deg_list = arguments.get("alpha_deg_list", [])
reynolds = arguments.get("reynolds", 1000000)
mach = arguments.get("mach", 0.1)
if not alpha_deg_list:
return [TextContent(type="text", text="Error: alpha_deg_list is required")]
# Run analysis
results = airfoil_polar_analysis(airfoil_name, alpha_deg_list, reynolds, mach)
# Format response
result_lines = [
f"Airfoil Polar Analysis: {airfoil_name}",
"=" * 50,
f"Reynolds: {reynolds:.0e}, Mach: {mach:.3f}",
"",
f"{'Alpha (°)':>8} {'CL':>8} {'CD':>8} {'CM':>8} {'L/D':>8}",
"-" * 45,
]
for point in results:
result_lines.append(
f"{point.alpha_deg:8.1f} {point.cl:8.4f} {point.cd:8.5f} "
f"{point.cm:8.4f} {point.cl_cd_ratio:8.1f}"
)
# Add JSON data
json_data = json.dumps([p.model_dump() for p in results], indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Airfoil polar error: {str(e)}")]
async def _handle_calculate_stability_derivatives(arguments: dict) -> list[TextContent]:
"""Handle stability derivatives calculation requests."""
try:
from .integrations.aero import (
WingGeometry,
calculate_stability_derivatives,
estimate_wing_area,
)
geometry_data = arguments.get("geometry", {})
alpha_deg = arguments.get("alpha_deg", 2.0)
mach = arguments.get("mach", 0.2)
if not geometry_data:
return [TextContent(type="text", text="Error: geometry is required")]
# Create geometry object
geometry = WingGeometry(**geometry_data)
# Calculate stability derivatives
stability = calculate_stability_derivatives(geometry, alpha_deg, mach)
# Calculate wing properties
wing_props = estimate_wing_area(geometry)
# Format response
result_lines = [
"Stability Derivatives Analysis",
"=" * 40,
f"Reference: α = {alpha_deg}°, M = {mach}",
"",
"Wing Geometry:",
f" Area: {wing_props['wing_area_m2']:.2f} m²",
f" Aspect Ratio: {wing_props['aspect_ratio']:.1f}",
f" Taper Ratio: {wing_props['taper_ratio']:.2f}",
f" MAC: {wing_props['mean_aerodynamic_chord_m']:.3f} m",
"",
"Stability Derivatives:",
f" CL_α = {stability.CL_alpha:.3f} /rad ({math.degrees(stability.CL_alpha):.3f} /deg)",
f" CM_α = {stability.CM_alpha:.4f} /rad ({math.degrees(stability.CM_alpha):.4f} /deg)",
]
if stability.CL_alpha_dot:
result_lines.append(f" CL_α̇ = {stability.CL_alpha_dot:.4f}")
if stability.CM_alpha_dot:
result_lines.append(f" CM_α̇ = {stability.CM_alpha_dot:.4f}")
# Stability assessment
result_lines.extend(["", "Stability Assessment:"])
if stability.CM_alpha < 0:
result_lines.append(" ✓ Statically stable (CM_α < 0)")
else:
result_lines.append(" ⚠ Statically unstable (CM_α > 0)")
# Add JSON data
combined_data = {
"stability_derivatives": stability.model_dump(),
"wing_properties": wing_props,
}
json_data = json.dumps(combined_data, indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Stability derivatives error: {str(e)}")]
async def _handle_propeller_bemt_analysis(arguments: dict) -> list[TextContent]:
"""Handle propeller BEMT analysis requests."""
try:
from .integrations.propellers import PropellerGeometry, propeller_bemt_analysis
geometry_data = arguments.get("geometry", {})
rpm_list = arguments.get("rpm_list", [])
velocity_ms = arguments.get("velocity_ms", 0.0)
altitude_m = arguments.get("altitude_m", 0.0)
if not geometry_data or not rpm_list:
return [
TextContent(
type="text", text="Error: geometry and rpm_list are required"
)
]
# Create geometry object
geometry = PropellerGeometry(**geometry_data)
# Run analysis
results = propeller_bemt_analysis(geometry, rpm_list, velocity_ms, altitude_m)
# Format response
result_lines = [
"Propeller BEMT Analysis",
"=" * 60,
f"Propeller: {geometry.diameter_m:.3f}m dia × {geometry.pitch_m:.3f}m pitch, {geometry.num_blades} blades",
f"Conditions: V = {velocity_ms:.1f} m/s, Alt = {altitude_m:.0f} m",
"",
f"{'RPM':>6} {'Thrust(N)':>10} {'Power(W)':>10} {'Torque(Nm)':>11} {'Efficiency':>10} {'J':>6} {'CT':>8} {'CP':>8}",
"-" * 85,
]
for point in results:
result_lines.append(
f"{point.rpm:6.0f} {point.thrust_n:10.1f} {point.power_w:10.1f} "
f"{point.torque_nm:11.3f} {point.efficiency:10.3f} {point.advance_ratio:6.3f} "
f"{point.thrust_coefficient:8.4f} {point.power_coefficient:8.4f}"
)
# Find peak efficiency point
if results:
peak_eff_point = max(results, key=lambda x: x.efficiency)
result_lines.extend(
[
"",
f"Peak Efficiency: {peak_eff_point.efficiency:.1%} at {peak_eff_point.rpm:.0f} RPM",
f" Thrust: {peak_eff_point.thrust_n:.1f} N, Power: {peak_eff_point.power_w:.1f} W",
]
)
# Add JSON data
json_data = json.dumps([p.model_dump() for p in results], indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Propeller analysis error: {str(e)}")]
async def _handle_uav_energy_estimate(arguments: dict) -> list[TextContent]:
"""Handle UAV energy estimation requests."""
try:
from .integrations.propellers import (
BatteryConfiguration,
UAVConfiguration,
uav_energy_estimate,
)
uav_data = arguments.get("uav_config", {})
battery_data = arguments.get("battery_config", {})
mission_data = arguments.get("mission_profile", {})
if not uav_data or not battery_data:
return [
TextContent(
type="text",
text="Error: uav_config and battery_config are required",
)
]
# Create configuration objects
uav_config = UAVConfiguration(**uav_data)
battery_config = BatteryConfiguration(**battery_data)
# Run analysis
result = uav_energy_estimate(uav_config, battery_config, mission_data)
# Determine aircraft type
aircraft_type = "Fixed-Wing" if uav_config.wing_area_m2 else "Multirotor"
# Format response
result_lines = [
f"UAV Energy Analysis ({aircraft_type})",
"=" * 45,
f"Aircraft Mass: {uav_config.mass_kg:.1f} kg",
f"Battery: {battery_config.capacity_ah:.1f} Ah @ {battery_config.voltage_nominal_v:.1f}V ({battery_config.mass_kg:.2f} kg)",
]
if uav_config.wing_area_m2:
result_lines.extend(
[
f"Wing Area: {uav_config.wing_area_m2:.2f} m²",
f"Wing Loading: {uav_config.mass_kg * 9.81 / uav_config.wing_area_m2:.1f} N/m²",
]
)
elif uav_config.disk_area_m2:
result_lines.extend(
[
f"Rotor Disk Area: {uav_config.disk_area_m2:.2f} m²",
f"Disk Loading: {uav_config.mass_kg * 9.81 / uav_config.disk_area_m2:.1f} N/m²",
]
)
result_lines.extend(
[
"",
"Energy Analysis:",
f" Battery Energy: {result.battery_energy_wh:.0f} Wh",
f" Usable Energy: {result.energy_consumed_wh:.0f} Wh",
f" Power Required: {result.power_required_w:.0f} W",
f" System Efficiency: {result.efficiency_overall:.1%}",
"",
"Mission Performance:",
f" Flight Time: {result.flight_time_min:.1f} minutes ({result.flight_time_min / 60:.1f} hours)",
]
)
if result.range_km:
result_lines.append(f" Range: {result.range_km:.1f} km")
if result.hover_time_min:
result_lines.append(f" Hover Time: {result.hover_time_min:.1f} minutes")
# Add recommendations
result_lines.extend(["", "Recommendations:"])
if result.flight_time_min < 10:
result_lines.append(
" ⚠ Very short flight time - consider larger battery or lighter aircraft"
)
elif result.flight_time_min < 20:
result_lines.append(
" ⚠ Short flight time - optimize for efficiency or add battery capacity"
)
elif result.flight_time_min > 120:
result_lines.append(
" ✓ Excellent endurance - well optimized configuration"
)
else:
result_lines.append(" ✓ Good flight time for mission requirements")
if result.efficiency_overall < 0.7:
result_lines.append(
" ⚠ Low system efficiency - check motor/propeller matching"
)
else:
result_lines.append(" ✓ Good system efficiency")
# Add JSON data
json_data = json.dumps(result.model_dump(), indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"UAV energy analysis error: {str(e)}")]
async def _handle_get_airfoil_database(arguments: dict) -> list[TextContent]:
"""Handle airfoil database requests."""
try:
from .integrations.aero import get_airfoil_database
database = get_airfoil_database()
# Format response
result_lines = [
"Available Airfoil Database",
"=" * 35,
f"{'Airfoil':>12} {'CL_α':>8} {'CD0':>8} {'CL_max':>8} {'α_stall':>8}",
"-" * 50,
]
for name, data in database.items():
result_lines.append(
f"{name:>12} {data['cl_alpha']:8.2f} {data['cd0']:8.4f} "
f"{data['cl_max']:8.2f} {data['alpha_stall_deg']:8.1f}°"
)
result_lines.extend(
[
"",
"Notes:",
" CL_α: Lift curve slope (per radian)",
" CD0: Zero-lift drag coefficient",
" CL_max: Maximum lift coefficient",
" α_stall: Stall angle of attack",
]
)
# Add JSON data
json_data = json.dumps(database, indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Airfoil database error: {str(e)}")]
async def _handle_get_propeller_database(arguments: dict) -> list[TextContent]:
"""Handle propeller database requests."""
try:
from .integrations.propellers import get_propeller_database
database = get_propeller_database()
# Format response
result_lines = [
"Available Propeller Database",
"=" * 40,
f"{'Propeller':>15} {'Diameter':>9} {'Pitch':>7} {'Blades':>7} {'η_max':>7}",
"-" * 50,
]
for name, data in database.items():
diameter_in = data["diameter_m"] * 39.37 # Convert to inches
pitch_in = data["pitch_m"] * 39.37
result_lines.append(
f'{name:>15} {diameter_in:9.1f}" {pitch_in:7.1f}" '
f"{data['num_blades']:7d} {data['efficiency_max']:7.1%}"
)
result_lines.extend(
[
"",
"Notes:",
" Diameter and pitch shown in inches",
" η_max: Maximum efficiency (estimated)",
" Data includes activity factor and design coefficients",
]
)
# Add JSON data
json_data = json.dumps(database, indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Propeller database error: {str(e)}")]
async def _handle_rocket_3dof_trajectory(arguments: dict) -> list[TextContent]:
"""Handle rocket trajectory calculation requests."""
try:
from .integrations.rockets import (
RocketGeometry,
analyze_rocket_performance,
rocket_3dof_trajectory,
)
geometry_data = arguments.get("geometry", {})
dt_s = arguments.get("dt_s", 0.1)
max_time_s = arguments.get("max_time_s", 300.0)
launch_angle_deg = arguments.get("launch_angle_deg", 90.0)
if not geometry_data:
return [TextContent(type="text", text="Error: geometry is required")]
# Create geometry object
geometry = RocketGeometry(**geometry_data)
# Run trajectory calculation
trajectory = rocket_3dof_trajectory(
geometry, dt_s, max_time_s, launch_angle_deg
)
performance = analyze_rocket_performance(trajectory)
# Format response
result_lines = [
"3DOF Rocket Trajectory Analysis",
"=" * 50,
f"Launch Angle: {launch_angle_deg}°",
f"Rocket: {geometry.dry_mass_kg + geometry.propellant_mass_kg:.0f} kg total ({geometry.dry_mass_kg:.0f} kg dry)",
f"Geometry: {geometry.diameter_m:.2f}m × {geometry.length_m:.1f}m (CD = {geometry.cd:.2f})",
"",
"Performance Summary:",
f" Max Altitude: {performance.max_altitude_m / 1000:.2f} km ({performance.max_altitude_m:.0f} m)",
f" Apogee Time: {performance.apogee_time_s:.1f} seconds",
f" Max Velocity: {performance.max_velocity_ms:.0f} m/s (Mach {performance.max_mach:.2f})",
f" Max Dynamic Pressure: {performance.max_q_pa / 1000:.1f} kPa",
"",
"Engine Performance:",
f" Burnout Altitude: {performance.burnout_altitude_m / 1000:.2f} km",
f" Burnout Velocity: {performance.burnout_velocity_ms:.0f} m/s",
f" Burnout Time: {performance.burnout_time_s:.1f} s",
f" Total Impulse: {performance.total_impulse_ns / 1000:.1f} kN·s",
f" Specific Impulse: {performance.specific_impulse_s:.0f} s",
"",
f"Trajectory Points: {len(trajectory)} (Δt = {dt_s:.2f} s)",
]
# Add JSON data for trajectory points (sample every 10th point to avoid huge output)
sample_trajectory = trajectory[
:: max(1, len(trajectory) // 50)
] # Max 50 points
json_data = {
"performance": (
performance.model_dump()
if hasattr(performance, "model_dump")
else asdict(performance)
),
"trajectory_sample": [asdict(point) for point in sample_trajectory],
}
result_lines.extend(
["", "JSON Data (sampled trajectory):", json.dumps(json_data, indent=2)]
)
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Rocket trajectory error: {str(e)}")]
async def _handle_estimate_rocket_sizing(arguments: dict) -> list[TextContent]:
"""Handle rocket sizing estimation requests."""
try:
from .integrations.rockets import estimate_rocket_sizing
target_altitude_m = arguments.get("target_altitude_m")
payload_mass_kg = arguments.get("payload_mass_kg")
propellant_type = arguments.get("propellant_type", "solid")
if target_altitude_m is None or payload_mass_kg is None:
return [
TextContent(
type="text",
text="Error: target_altitude_m and payload_mass_kg are required",
)
]
# Run sizing analysis
sizing = estimate_rocket_sizing(
target_altitude_m, payload_mass_kg, propellant_type
)
# Format response
result_lines = [
f"Rocket Sizing Estimate ({propellant_type.title()} Propellant)",
"=" * 50,
"Mission Requirements:",
f" Target Altitude: {target_altitude_m / 1000:.1f} km",
f" Payload Mass: {payload_mass_kg:.2f} kg",
"",
f"Mission Feasible: {'✓ Yes' if sizing['feasible'] else '✗ No (requires staging)'}",
]
if sizing["feasible"]:
result_lines.extend(
[
"",
"Rocket Sizing:",
f" Total Mass: {sizing['total_mass_kg']:.0f} kg",
f" Propellant Mass: {sizing['propellant_mass_kg']:.0f} kg ({sizing['propellant_mass_kg'] / sizing['total_mass_kg'] * 100:.1f}%)",
f" Structure Mass: {sizing['structure_mass_kg']:.0f} kg ({sizing['structure_mass_kg'] / sizing['total_mass_kg'] * 100:.1f}%)",
f" Payload Mass: {payload_mass_kg:.0f} kg ({payload_mass_kg / sizing['total_mass_kg'] * 100:.1f}%)",
"",
"Performance:",
f" Mass Ratio: {sizing['mass_ratio']:.2f}",
f" Delta-V Required: {sizing['delta_v_ms']:.0f} m/s",
f" Specific Impulse: {sizing['specific_impulse_s']:.0f} s",
f" Thrust Required: {sizing['thrust_n'] / 1000:.1f} kN",
f" Thrust-to-Weight: {sizing['thrust_to_weight']:.1f}",
"",
"Geometry Estimates:",
f" Diameter: {sizing['diameter_m']:.2f} m",
f" Length: {sizing['length_m']:.1f} m",
f" L/D Ratio: {sizing['length_m'] / sizing['diameter_m']:.1f}",
]
)
else:
result_lines.extend(
[
"",
"⚠ Mission requires staging or different propellant",
"Consider:",
" - Multi-stage rocket design",
" - Higher performance propellant (liquid vs solid)",
" - Reduced payload mass",
" - Lower target altitude",
]
)
# Add JSON data
json_data = json.dumps(sizing, indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Rocket sizing error: {str(e)}")]
async def _handle_optimize_launch_angle(arguments: dict) -> list[TextContent]:
"""Handle launch angle optimization requests."""
try:
from .integrations.rockets import RocketGeometry
from .integrations.trajopt import (
optimize_launch_angle,
)
geometry_data = arguments.get("geometry", {})
objective = arguments.get("objective", "max_altitude")
angle_bounds = arguments.get("angle_bounds", [80.0, 90.0])
if not geometry_data:
return [TextContent(type="text", text="Error: geometry is required")]
# Create geometry object
geometry = RocketGeometry(**geometry_data)
# Run optimization
result = optimize_launch_angle(geometry, objective, tuple(angle_bounds))
# Format response
result_lines = [
f"Launch Angle Optimization ({objective})",
"=" * 50,
f"Rocket: {geometry.dry_mass_kg + geometry.propellant_mass_kg:.0f} kg total",
f"Search Range: {angle_bounds[0]:.1f}° to {angle_bounds[1]:.1f}°",
"",
"Optimization Results:",
f" Optimal Launch Angle: {result.optimal_parameters['launch_angle_deg']:.2f}°",
f" Optimal {objective.replace('_', ' ').title()}: {result.optimal_objective:.1f} {'m' if 'altitude' in objective else 'units'}",
f" Converged: {'✓ Yes' if result.converged else '✗ No'}",
f" Iterations: {result.iterations}",
"",
"Performance at Optimal Angle:",
f" Max Altitude: {result.performance.max_altitude_m / 1000:.2f} km",
f" Max Velocity: {result.performance.max_velocity_ms:.0f} m/s",
f" Burnout Time: {result.performance.burnout_time_s:.1f} s",
f" Total Impulse: {result.performance.total_impulse_ns / 1000:.1f} kN·s",
]
# Add JSON data
json_data = {
"optimization_result": {
"optimal_parameters": result.optimal_parameters,
"optimal_objective": result.optimal_objective,
"converged": result.converged,
"iterations": result.iterations,
},
"performance": asdict(result.performance),
}
result_lines.extend(["", "JSON Data:", json.dumps(json_data, indent=2)])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [
TextContent(type="text", text=f"Launch angle optimization error: {str(e)}")
]
async def _handle_optimize_thrust_profile(arguments: dict) -> list[TextContent]:
"""Handle thrust profile optimization requests."""
try:
from .integrations.rockets import RocketGeometry
from .integrations.trajopt import optimize_thrust_profile
geometry_data = arguments.get("geometry", {})
burn_time_s = arguments.get("burn_time_s")
total_impulse_target = arguments.get("total_impulse_target")
n_segments = arguments.get("n_segments", 5)
objective = arguments.get("objective", "max_altitude")
if not geometry_data or burn_time_s is None or total_impulse_target is None:
return [
TextContent(
type="text",
text="Error: geometry, burn_time_s, and total_impulse_target are required",
)
]
# Create geometry object
geometry = RocketGeometry(**geometry_data)
# Run optimization
result = optimize_thrust_profile(
geometry, burn_time_s, total_impulse_target, n_segments, objective
)
# Format response
result_lines = [
f"Thrust Profile Optimization ({objective})",
"=" * 60,
f"Target: {objective.replace('_', ' ').title()}",
f"Burn Time: {burn_time_s:.1f} s, Target Impulse: {total_impulse_target / 1000:.1f} kN·s",
f"Segments: {n_segments}",
"",
"Optimization Results:",
f" Converged: {'✓ Yes' if result.converged else '✗ No'}",
f" Iterations: {result.iterations}",
f" Optimal {objective.replace('_', ' ').title()}: {result.optimal_objective:.1f} {'m' if 'altitude' in objective else 'units'}",
"",
"Optimized Performance:",
f" Max Altitude: {result.performance.max_altitude_m / 1000:.2f} km",
f" Max Velocity: {result.performance.max_velocity_ms:.0f} m/s",
f" Max Dynamic Pressure: {result.performance.max_q_pa / 1000:.1f} kPa",
f" Actual Total Impulse: {result.performance.total_impulse_ns / 1000:.1f} kN·s",
"",
"Thrust Profile (segment multipliers):",
]
# Show thrust multipliers
for i in range(n_segments):
param_key = f"thrust_mult_seg_{i + 1}"
if param_key in result.optimal_parameters:
multiplier = result.optimal_parameters[param_key]
time_start = i * burn_time_s / n_segments
time_end = (i + 1) * burn_time_s / n_segments
result_lines.append(
f" Segment {i + 1} ({time_start:.1f}-{time_end:.1f}s): {multiplier:.2f}x"
)
# Add JSON data (excluding large thrust_curve for brevity)
json_data = {
"optimization_result": {
"converged": result.converged,
"iterations": result.iterations,
"optimal_objective": result.optimal_objective,
"thrust_multipliers": {
k: v
for k, v in result.optimal_parameters.items()
if k.startswith("thrust_mult")
},
},
"performance": asdict(result.performance),
}
result_lines.extend(["", "JSON Data:", json.dumps(json_data, indent=2)])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [
TextContent(
type="text", text=f"Thrust profile optimization error: {str(e)}"
)
]
async def _handle_trajectory_sensitivity_analysis(arguments: dict) -> list[TextContent]:
"""Handle trajectory sensitivity analysis requests."""
try:
from .integrations.rockets import RocketGeometry
from .integrations.trajopt import trajectory_sensitivity_analysis
geometry_data = arguments.get("base_geometry", {})
parameter_variations = arguments.get("parameter_variations", {})
objective = arguments.get("objective", "max_altitude")
if not geometry_data or not parameter_variations:
return [
TextContent(
type="text",
text="Error: base_geometry and parameter_variations are required",
)
]
# Create geometry object
base_geometry = RocketGeometry(**geometry_data)
# Run sensitivity analysis
result = trajectory_sensitivity_analysis(
base_geometry, parameter_variations, objective
)
# Format response
result_lines = [
f"Trajectory Sensitivity Analysis ({objective})",
"=" * 60,
f"Baseline {objective.replace('_', ' ').title()}: {result['baseline_value']:.1f}",
f"Parameters Analyzed: {len(parameter_variations)}",
"",
]
# Show sensitivity for each parameter
for param_name, param_results in result["parameter_sensitivities"].items():
result_lines.extend([f"Parameter: {param_name}", "-" * 40])
# Calculate average sensitivity (excluding failed simulations)
valid_results = [
r
for r in param_results
if "sensitivity" in r and r["sensitivity"] is not None
]
if valid_results:
avg_sensitivity = sum(
abs(r["sensitivity"]) for r in valid_results
) / len(valid_results)
result_lines.append(
f" Average |Sensitivity|: {avg_sensitivity:.3f} %/% change"
)
# Show most sensitive point
max_sens = max(
valid_results, key=lambda x: abs(x.get("sensitivity", 0))
)
result_lines.append(
f" Max Sensitivity: {max_sens['sensitivity']:.3f} at {param_name} = {max_sens['parameter_value']:.3f}"
)
# Classification
if avg_sensitivity > 2.0:
sensitivity_class = "HIGH"
elif avg_sensitivity > 0.5:
sensitivity_class = "MEDIUM"
else:
sensitivity_class = "LOW"
result_lines.append(f" Sensitivity Class: {sensitivity_class}")
else:
result_lines.append(" No valid sensitivity data")
result_lines.append("")
# Ranking by average sensitivity
param_sensitivities = {}
for param_name, param_results in result["parameter_sensitivities"].items():
valid_results = [
r
for r in param_results
if "sensitivity" in r and r["sensitivity"] is not None
]
if valid_results:
param_sensitivities[param_name] = sum(
abs(r["sensitivity"]) for r in valid_results
) / len(valid_results)
if param_sensitivities:
sorted_params = sorted(
param_sensitivities.items(), key=lambda x: x[1], reverse=True
)
result_lines.extend(["Parameter Sensitivity Ranking:", "-" * 35])
for i, (param, sens) in enumerate(sorted_params, 1):
result_lines.append(f" {i}. {param}: {sens:.3f}")
# Add JSON data
json_data = json.dumps(result, indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [
TextContent(
type="text", text=f"Trajectory sensitivity analysis error: {str(e)}"
)
]
async def _handle_elements_to_state_vector(arguments: dict) -> list[TextContent]:
"""Handle orbital elements to state vector conversion."""
try:
from .integrations.orbits import OrbitElements, elements_to_state_vector
elements_data = arguments.get("elements", {})
if not elements_data:
return [TextContent(type="text", text="Error: elements are required")]
# Create elements object
elements = OrbitElements(**elements_data)
# Convert to state vector
state = elements_to_state_vector(elements)
# Format response
result_lines = [
"Orbital Elements → State Vector Conversion",
"=" * 55,
"Input Elements:",
f" Semi-major axis: {elements.semi_major_axis_m / 1000:.1f} km",
f" Eccentricity: {elements.eccentricity:.4f}",
f" Inclination: {elements.inclination_deg:.2f}°",
f" RAAN: {elements.raan_deg:.2f}°",
f" Arg. Periapsis: {elements.arg_periapsis_deg:.2f}°",
f" True Anomaly: {elements.true_anomaly_deg:.2f}°",
"",
"Output State Vector (J2000):",
f" Position: [{state.position_m[0]:.0f}, {state.position_m[1]:.0f}, {state.position_m[2]:.0f}] m",
f" Velocity: [{state.velocity_ms[0]:.3f}, {state.velocity_ms[1]:.3f}, {state.velocity_ms[2]:.3f}] m/s",
f" Epoch: {state.epoch_utc}",
f" Frame: {state.frame}",
]
# Add JSON data
json_data = json.dumps(asdict(state), indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [
TextContent(type="text", text=f"Elements to state vector error: {str(e)}")
]
async def _handle_state_vector_to_elements(arguments: dict) -> list[TextContent]:
"""Handle state vector to orbital elements conversion."""
try:
from .integrations.orbits import (
StateVector,
calculate_orbit_properties,
state_vector_to_elements,
)
state_data = arguments.get("state", {})
if not state_data:
return [TextContent(type="text", text="Error: state is required")]
# Create state vector object
state = StateVector(**state_data)
# Convert to elements
elements = state_vector_to_elements(state)
# Calculate orbital properties
properties = calculate_orbit_properties(elements)
# Format response
result_lines = [
"State Vector → Orbital Elements Conversion",
"=" * 55,
"Input State Vector:",
f" Position: [{state.position_m[0]:.0f}, {state.position_m[1]:.0f}, {state.position_m[2]:.0f}] m",
f" Velocity: [{state.velocity_ms[0]:.3f}, {state.velocity_ms[1]:.3f}, {state.velocity_ms[2]:.3f}] m/s",
"",
"Output Elements:",
f" Semi-major axis: {elements.semi_major_axis_m / 1000:.1f} km",
f" Eccentricity: {elements.eccentricity:.4f}",
f" Inclination: {elements.inclination_deg:.2f}°",
f" RAAN: {elements.raan_deg:.2f}°",
f" Arg. Periapsis: {elements.arg_periapsis_deg:.2f}°",
f" True Anomaly: {elements.true_anomaly_deg:.2f}°",
"",
"Orbital Properties:",
f" Period: {properties.period_s / 3600:.2f} hours",
f" Apoapsis: {properties.apoapsis_m / 1000:.1f} km altitude",
f" Periapsis: {properties.periapsis_m / 1000:.1f} km altitude",
f" Energy: {properties.energy_j_kg / 1000:.1f} kJ/kg",
]
# Add JSON data
combined_data = {"elements": asdict(elements), "properties": asdict(properties)}
json_data = json.dumps(combined_data, indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [
TextContent(type="text", text=f"State vector to elements error: {str(e)}")
]
async def _handle_propagate_orbit_j2(arguments: dict) -> list[TextContent]:
"""Handle J2 orbit propagation."""
try:
from .integrations.orbits import StateVector, propagate_orbit_j2
initial_state_data = arguments.get("initial_state", {})
time_span_s = arguments.get("time_span_s", 3600.0)
time_step_s = arguments.get("time_step_s", 60.0)
if not initial_state_data:
return [TextContent(type="text", text="Error: initial_state is required")]
# Create initial state
initial_state = StateVector(**initial_state_data)
# Propagate orbit
states = propagate_orbit_j2(initial_state, time_span_s, time_step_s)
# Format response
result_lines = [
"J2 Orbit Propagation",
"=" * 40,
f"Time Span: {time_span_s / 3600:.2f} hours ({len(states)} states)",
f"Time Step: {time_step_s:.0f} seconds",
"",
"Initial State:",
f" Position: {vector_magnitude(initial_state.position_m) / 1000:.1f} km altitude",
f" Velocity: {vector_magnitude(initial_state.velocity_ms):.3f} m/s",
"",
"Final State:",
f" Position: {vector_magnitude(states[-1].position_m) / 1000:.1f} km altitude",
f" Velocity: {vector_magnitude(states[-1].velocity_ms):.3f} m/s",
"",
"Propagation Summary:",
f" Total States: {len(states)}",
f" Position Change: {vector_magnitude([states[-1].position_m[i] - states[0].position_m[i] for i in range(3)]) / 1000:.1f} km",
f" Velocity Change: {vector_magnitude([states[-1].velocity_ms[i] - states[0].velocity_ms[i] for i in range(3)]):.3f} m/s",
]
# Sample states for JSON (every 10th state to avoid huge output)
sample_states = states[:: max(1, len(states) // 20)] # Max 20 states
json_data = {
"propagation_summary": {
"total_states": len(states),
"time_span_s": time_span_s,
"time_step_s": time_step_s,
},
"sample_states": [asdict(state) for state in sample_states],
}
result_lines.extend(
["", "JSON Data (sampled states):", json.dumps(json_data, indent=2)]
)
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Orbit propagation error: {str(e)}")]
async def _handle_calculate_ground_track(arguments: dict) -> list[TextContent]:
"""Handle ground track calculation."""
try:
from .integrations.orbits import StateVector, calculate_ground_track
orbit_states_data = arguments.get("orbit_states", [])
time_step_s = arguments.get("time_step_s", 60.0)
if not orbit_states_data:
return [TextContent(type="text", text="Error: orbit_states are required")]
# Create state vector objects
orbit_states = [StateVector(**state_data) for state_data in orbit_states_data]
# Calculate ground track
ground_track = calculate_ground_track(orbit_states, time_step_s)
# Format response
result_lines = [
"Ground Track Calculation",
"=" * 35,
f"Orbital States: {len(orbit_states)}",
f"Ground Track Points: {len(ground_track)}",
"",
"Ground Track Summary:",
f" Latitude Range: {min(p.latitude_deg for p in ground_track):.2f}° to {max(p.latitude_deg for p in ground_track):.2f}°",
f" Longitude Range: {min(p.longitude_deg for p in ground_track):.2f}° to {max(p.longitude_deg for p in ground_track):.2f}°",
f" Altitude Range: {min(p.altitude_m for p in ground_track) / 1000:.1f} to {max(p.altitude_m for p in ground_track) / 1000:.1f} km",
"",
"Sample Ground Track Points:",
]
# Show sample points
sample_points = ground_track[
:: max(1, len(ground_track) // 10)
] # Max 10 points
for i, point in enumerate(sample_points):
result_lines.append(
f" Point {i + 1}: {point.latitude_deg:.2f}°N, {point.longitude_deg:.2f}°E, {point.altitude_m / 1000:.1f}km"
)
# Add JSON data
json_data = {
"ground_track_summary": {
"total_points": len(ground_track),
"latitude_range": [
min(p.latitude_deg for p in ground_track),
max(p.latitude_deg for p in ground_track),
],
"longitude_range": [
min(p.longitude_deg for p in ground_track),
max(p.longitude_deg for p in ground_track),
],
},
"ground_track_points": [asdict(point) for point in ground_track],
}
result_lines.extend(["", "JSON Data:", json.dumps(json_data, indent=2)])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [
TextContent(type="text", text=f"Ground track calculation error: {str(e)}")
]
async def _handle_hohmann_transfer(arguments: dict) -> list[TextContent]:
"""Handle Hohmann transfer calculation."""
try:
from .integrations.orbits import hohmann_transfer
r1_m = arguments.get("r1_m")
r2_m = arguments.get("r2_m")
if r1_m is None or r2_m is None:
return [TextContent(type="text", text="Error: r1_m and r2_m are required")]
# Calculate Hohmann transfer
transfer = hohmann_transfer(r1_m, r2_m)
# Format response
result_lines = [
"Hohmann Transfer Analysis",
"=" * 35,
f"Initial Orbit: {r1_m / 1000:.0f} km radius ({(r1_m - 6.378137e6) / 1000:.0f} km altitude)",
f"Final Orbit: {r2_m / 1000:.0f} km radius ({(r2_m - 6.378137e6) / 1000:.0f} km altitude)",
"",
"Transfer Requirements:",
f" First Burn (ΔV₁): {transfer['delta_v_1_ms']:.1f} m/s",
f" Second Burn (ΔV₂): {transfer['delta_v_2_ms']:.1f} m/s",
f" Total ΔV: {transfer['delta_v_total_ms']:.1f} m/s",
"",
"Transfer Orbit:",
f" Semi-major Axis: {transfer['semi_major_axis_m'] / 1000:.0f} km",
f" Transfer Time: {transfer['transfer_time_h']:.2f} hours",
"",
"Mission Summary:",
f" Orbit Ratio: {r2_m / r1_m:.2f}",
f" Altitude Change: {(r2_m - r1_m) / 1000:.0f} km",
f" Transfer Efficiency: {transfer['delta_v_total_ms'] / (abs(math.sqrt(3.986004418e14 / r1_m) - math.sqrt(3.986004418e14 / r2_m))):.2f}",
]
# Add JSON data
json_data = json.dumps(transfer, indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Hohmann transfer error: {str(e)}")]
async def _handle_orbital_rendezvous_planning(arguments: dict) -> list[TextContent]:
"""Handle orbital rendezvous planning."""
try:
from .integrations.orbits import OrbitElements, orbital_rendezvous_planning
chaser_data = arguments.get("chaser_elements", {})
target_data = arguments.get("target_elements", {})
if not chaser_data or not target_data:
return [
TextContent(
type="text",
text="Error: both chaser_elements and target_elements are required",
)
]
# Create elements objects
chaser_elements = OrbitElements(**chaser_data)
target_elements = OrbitElements(**target_data)
# Plan rendezvous
plan = orbital_rendezvous_planning(chaser_elements, target_elements)
# Format response
result_lines = [
"Orbital Rendezvous Planning",
"=" * 40,
"Spacecraft Orbits:",
f" Chaser: {chaser_elements.semi_major_axis_m / 1000:.1f} km × {chaser_elements.eccentricity:.3f} e × {chaser_elements.inclination_deg:.1f}° i",
f" Target: {target_elements.semi_major_axis_m / 1000:.1f} km × {target_elements.eccentricity:.3f} e × {target_elements.inclination_deg:.1f}° i",
"",
"Rendezvous Analysis:",
f" Relative Distance: {plan['relative_distance_km']:.1f} km",
f" Phase Angle: {plan['phase_angle_deg']:.1f}°",
f" Altitude Difference: {plan['altitude_difference_m'] / 1000:.1f} km",
"",
"Timing Analysis:",
f" Chaser Period: {plan['chaser_period_s'] / 3600:.2f} hours",
f" Target Period: {plan['target_period_s'] / 3600:.2f} hours",
f" Period Difference: {plan['period_difference_s']:.0f} seconds",
]
if plan["phasing_time_h"] != float("inf"):
result_lines.append(f" Phasing Time: {plan['phasing_time_h']:.1f} hours")
else:
result_lines.append(" Phasing Time: ∞ (similar periods)")
result_lines.extend(
[
"",
"Mission Assessment:",
f" Feasibility: {plan['feasibility']}",
f" Est. Circularization ΔV: {plan['estimated_circularization_dv_ms']:.1f} m/s",
]
)
# Add recommendations
if plan["feasibility"] == "Good":
result_lines.append(" ✓ Favorable rendezvous conditions")
else:
result_lines.append(" ⚠ Challenging rendezvous - consider phasing orbits")
# Add JSON data
json_data = json.dumps(plan, indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Rendezvous planning error: {str(e)}")]
async def _handle_genetic_algorithm_optimization(arguments: dict) -> list[TextContent]:
"""Handle genetic algorithm trajectory optimization."""
try:
from .integrations.gnc import (
GeneticAlgorithm,
GeneticAlgorithmParams,
OptimizationConstraints,
OptimizationObjective,
TrajectoryWaypoint,
)
initial_trajectory_data = arguments.get("initial_trajectory", [])
objective_data = arguments.get("objective", {})
constraints_data = arguments.get("constraints", {})
ga_params_data = arguments.get("ga_params", {})
if not initial_trajectory_data or not objective_data:
return [
TextContent(
type="text",
text="Error: initial_trajectory and objective are required",
)
]
# Create objects
initial_trajectory = [
TrajectoryWaypoint(**wp) for wp in initial_trajectory_data
]
objective = OptimizationObjective(**objective_data)
constraints = OptimizationConstraints(**constraints_data)
ga_params = GeneticAlgorithmParams(**ga_params_data)
# Run optimization
ga = GeneticAlgorithm(ga_params)
result = ga.optimize(initial_trajectory, objective, constraints)
# Format response
result_lines = [
"Genetic Algorithm Trajectory Optimization",
"=" * 55,
f"Objective: {objective.type.replace('_', ' ').title()}",
f"Waypoints: {len(initial_trajectory)}",
"",
"Algorithm Parameters:",
f" Population Size: {ga_params.population_size}",
f" Generations: {ga_params.generations}",
f" Mutation Rate: {ga_params.mutation_rate:.2f}",
f" Crossover Rate: {ga_params.crossover_rate:.2f}",
"",
"Optimization Results:",
f" Converged: {'✓ Yes' if result.converged else '✗ No'}",
f" Iterations: {result.iterations}",
f" Computation Time: {result.computation_time_s:.2f} seconds",
f" Optimal Cost: {result.optimal_cost:.3f}",
"",
"Trajectory Performance:",
f" Total ΔV: {result.delta_v_total_ms:.1f} m/s",
f" Fuel Mass: {result.fuel_mass_kg:.2f} kg",
f" Flight Time: {result.optimal_trajectory[-1].time_s - result.optimal_trajectory[0].time_s:.0f} seconds",
]
if result.converged:
result_lines.append(" ✓ Optimization successful")
else:
result_lines.append(" ⚠ Optimization may not have converged fully")
# Add JSON data (summary only to avoid huge output)
json_data = {
"optimization_summary": {
"algorithm": result.algorithm,
"converged": result.converged,
"iterations": result.iterations,
"computation_time_s": result.computation_time_s,
"optimal_cost": result.optimal_cost,
"delta_v_total_ms": result.delta_v_total_ms,
"fuel_mass_kg": result.fuel_mass_kg,
},
"trajectory_length": len(result.optimal_trajectory),
}
result_lines.extend(
["", "JSON Data (summary):", json.dumps(json_data, indent=2)]
)
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [
TextContent(
type="text", text=f"Genetic algorithm optimization error: {str(e)}"
)
]
async def _handle_particle_swarm_optimization(arguments: dict) -> list[TextContent]:
"""Handle particle swarm optimization."""
try:
from .integrations.gnc import (
OptimizationConstraints,
OptimizationObjective,
ParticleSwarmOptimizer,
ParticleSwarmParams,
TrajectoryWaypoint,
)
initial_trajectory_data = arguments.get("initial_trajectory", [])
objective_data = arguments.get("objective", {})
constraints_data = arguments.get("constraints", {})
pso_params_data = arguments.get("pso_params", {})
if not initial_trajectory_data or not objective_data:
return [
TextContent(
type="text",
text="Error: initial_trajectory and objective are required",
)
]
# Create objects
initial_trajectory = [
TrajectoryWaypoint(**wp) for wp in initial_trajectory_data
]
objective = OptimizationObjective(**objective_data)
constraints = OptimizationConstraints(**constraints_data)
pso_params = ParticleSwarmParams(**pso_params_data)
# Run optimization
pso = ParticleSwarmOptimizer(pso_params)
result = pso.optimize(initial_trajectory, objective, constraints)
# Format response
result_lines = [
"Particle Swarm Trajectory Optimization",
"=" * 50,
f"Objective: {objective.type.replace('_', ' ').title()}",
f"Waypoints: {len(initial_trajectory)}",
"",
"Algorithm Parameters:",
f" Particles: {pso_params.num_particles}",
f" Max Iterations: {pso_params.max_iterations}",
f" Inertia Weight (w): {pso_params.w:.2f}",
f" Cognitive (c1): {pso_params.c1:.2f}",
f" Social (c2): {pso_params.c2:.2f}",
"",
"Optimization Results:",
f" Converged: {'✓ Yes' if result.converged else '✗ No'}",
f" Iterations: {result.iterations}",
f" Computation Time: {result.computation_time_s:.2f} seconds",
f" Optimal Cost: {result.optimal_cost:.3f}",
"",
"Trajectory Performance:",
f" Total ΔV: {result.delta_v_total_ms:.1f} m/s",
f" Fuel Mass: {result.fuel_mass_kg:.2f} kg",
f" Flight Time: {result.optimal_trajectory[-1].time_s - result.optimal_trajectory[0].time_s:.0f} seconds",
]
# Add JSON data (summary only)
json_data = {
"optimization_summary": {
"algorithm": result.algorithm,
"converged": result.converged,
"iterations": result.iterations,
"computation_time_s": result.computation_time_s,
"optimal_cost": result.optimal_cost,
"delta_v_total_ms": result.delta_v_total_ms,
"fuel_mass_kg": result.fuel_mass_kg,
},
"trajectory_length": len(result.optimal_trajectory),
}
result_lines.extend(
["", "JSON Data (summary):", json.dumps(json_data, indent=2)]
)
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [
TextContent(
type="text", text=f"Particle swarm optimization error: {str(e)}"
)
]
async def _handle_porkchop_plot_analysis(arguments: dict) -> list[TextContent]:
"""Handle porkchop plot analysis requests."""
try:
from .integrations.orbits import porkchop_plot_analysis
departure_body = arguments.get("departure_body", "Earth")
arrival_body = arguments.get("arrival_body", "Mars")
departure_dates = arguments.get("departure_dates")
arrival_dates = arguments.get("arrival_dates")
min_tof_days = arguments.get("min_tof_days", 100)
max_tof_days = arguments.get("max_tof_days", 400)
# Run porkchop analysis
analysis = porkchop_plot_analysis(
departure_body,
arrival_body,
departure_dates,
arrival_dates,
min_tof_days,
max_tof_days,
)
# Format response
result_lines = [
f"Porkchop Plot Analysis: {departure_body} to {arrival_body}",
"=" * 60,
"Transfer Opportunities Analysis",
f"Time of Flight Range: {min_tof_days} - {max_tof_days} days",
"",
]
stats = analysis["summary_statistics"]
if stats["feasible_transfers"] > 0:
result_lines.extend(
[
f"Feasible Transfers: {stats['feasible_transfers']} of {stats['total_transfers_computed']} computed",
f"C3 Range: {stats['min_c3_km2_s2']:.2f} - {stats['max_c3_km2_s2']:.2f} km²/s²",
f"Mean C3: {stats['mean_c3_km2_s2']:.2f} km²/s²",
f"TOF Range: {stats['min_tof_days']:.0f} - {stats['max_tof_days']:.0f} days",
"",
]
)
if analysis["optimal_transfer"]:
opt = analysis["optimal_transfer"]
result_lines.extend(
[
"Optimal Transfer (Minimum C3):",
f" Departure: {opt['departure_date']}",
f" Arrival: {opt['arrival_date']}",
f" Time of Flight: {opt['time_of_flight_days']:.1f} days",
f" C3: {opt['c3_km2_s2']:.2f} km²/s²",
f" Delta-V Estimate: {opt['delta_v_ms'] / 1000:.2f} km/s",
"",
]
)
else:
result_lines.extend(
[
"⚠ No feasible transfers found in the specified date range.",
"Consider adjusting time-of-flight constraints or date ranges.",
"",
]
)
# Add sample of transfer opportunities
if analysis["transfer_grid"]:
result_lines.extend(
[
"Transfer Grid Sample (first 10 opportunities):",
f"{'Departure':>12} {'Arrival':>12} {'TOF(d)':>8} {'C3':>8} {'Feasible':>10}",
"-" * 65,
]
)
for _i, transfer in enumerate(analysis["transfer_grid"][:10]):
dep_short = transfer["departure_date"][:10] # Just the date part
arr_short = transfer["arrival_date"][:10]
tof = transfer["time_of_flight_days"]
c3 = (
transfer["c3_km2_s2"]
if transfer["c3_km2_s2"] != float("inf")
else 999.9
)
feasible = "Yes" if transfer["transfer_feasible"] else "No"
result_lines.append(
f"{dep_short:>12} {arr_short:>12} {tof:8.1f} {c3:8.2f} {feasible:>10}"
)
result_lines.extend(["", analysis["note"]])
# Add JSON data for the full transfer grid
import json
json_data = json.dumps(analysis, indent=2, default=str)
result_lines.extend(["", "Full Transfer Grid JSON:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [TextContent(type="text", text=f"Porkchop analysis error: {str(e)}")]
async def _handle_monte_carlo_uncertainty_analysis(
arguments: dict,
) -> list[TextContent]:
"""Handle Monte Carlo uncertainty analysis."""
try:
from .integrations.gnc import (
TrajectoryWaypoint,
monte_carlo_uncertainty_analysis,
)
nominal_trajectory_data = arguments.get("nominal_trajectory", [])
uncertainty_params = arguments.get("uncertainty_params", {})
n_samples = arguments.get("n_samples", 1000)
if not nominal_trajectory_data:
return [
TextContent(type="text", text="Error: nominal_trajectory is required")
]
# Create trajectory waypoints
nominal_trajectory = [
TrajectoryWaypoint(**wp) for wp in nominal_trajectory_data
]
# Run Monte Carlo analysis
analysis = monte_carlo_uncertainty_analysis(
nominal_trajectory, uncertainty_params, n_samples
)
if "error" in analysis:
return [
TextContent(
type="text", text=f"Monte Carlo analysis error: {analysis['error']}"
)
]
# Format response
result_lines = [
"Monte Carlo Uncertainty Analysis",
"=" * 45,
f"Nominal Trajectory: {len(nominal_trajectory)} waypoints",
f"Monte Carlo Samples: {analysis['n_samples']}",
"",
"Delta-V Uncertainty:",
f" Mean: {analysis['delta_v_statistics']['mean_ms']:.1f} ± {analysis['delta_v_statistics']['std_ms']:.1f} m/s",
f" Range: {analysis['delta_v_statistics']['min_ms']:.1f} to {analysis['delta_v_statistics']['max_ms']:.1f} m/s",
f" 95% Confidence: [{analysis['confidence_intervals']['delta_v_95_ms'][0]:.1f}, {analysis['confidence_intervals']['delta_v_95_ms'][1]:.1f}] m/s",
"",
"Flight Time Uncertainty:",
f" Mean: {analysis['flight_time_statistics']['mean_s']:.0f} ± {analysis['flight_time_statistics']['std_s']:.0f} seconds",
f" Range: {analysis['flight_time_statistics']['min_s']:.0f} to {analysis['flight_time_statistics']['max_s']:.0f} seconds",
"",
"Position Error Statistics:",
f" Mean Error: {analysis['position_error_statistics']['mean_m']:.0f} ± {analysis['position_error_statistics']['std_m']:.0f} m",
f" Maximum Error: {analysis['position_error_statistics']['max_m']:.0f} m",
f" 95% Confidence: [{analysis['confidence_intervals']['position_95_m'][0]:.0f}, {analysis['confidence_intervals']['position_95_m'][1]:.0f}] m",
]
# Assessment
delta_v_uncertainty = (
analysis["delta_v_statistics"]["std_ms"]
/ analysis["delta_v_statistics"]["mean_ms"]
* 100
)
pos_uncertainty = analysis["position_error_statistics"]["std_m"]
result_lines.extend(
[
"",
"Uncertainty Assessment:",
f" Delta-V Uncertainty: {delta_v_uncertainty:.1f}%",
f" Position Uncertainty: {pos_uncertainty / 1000:.1f} km (1σ)",
]
)
if delta_v_uncertainty < 5.0:
result_lines.append(" ✓ Low delta-V uncertainty")
elif delta_v_uncertainty < 15.0:
result_lines.append(" ⚠ Moderate delta-V uncertainty")
else:
result_lines.append(" ⚠ High delta-V uncertainty - review mission design")
if pos_uncertainty < 1000:
result_lines.append(" ✓ Good position accuracy")
elif pos_uncertainty < 10000:
result_lines.append(" ⚠ Moderate position accuracy")
else:
result_lines.append(" ⚠ Large position uncertainties")
# Add JSON data
json_data = json.dumps(analysis, indent=2)
result_lines.extend(["", "JSON Data:", json_data])
return [TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
return [
TextContent(
type="text", text=f"Monte Carlo uncertainty analysis error: {str(e)}"
)
]
def run_stdio():
"""Run the MCP server with stdio transport."""
async def _main():
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream, write_stream, server.create_initialization_options()
)
asyncio.run(_main())
def run_sse(host: str = "localhost", port: int = 8001):
"""Run the MCP server with SSE transport."""
import mcp.server.sse
sse_app = mcp.server.sse.SseServerTransport("/sse")
async def _main():
async with sse_app.run_server() as server_context:
await server.run(
server_context.read_stream,
server_context.write_stream,
server.create_initialization_options(),
)
asyncio.run(_main())
def run():
"""Main entry point - defaults to stdio."""
import sys
if len(sys.argv) > 1 and sys.argv[1] == "sse":
host = sys.argv[2] if len(sys.argv) > 2 else "localhost"
port = int(sys.argv[3]) if len(sys.argv) > 3 else 8001
run_sse(host, port)
else:
run_stdio()
if __name__ == "__main__":
run()