"""Core satellite orbital mechanics calculations using Skyfield library."""
import logging
import math
import re
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass, asdict
import numpy as np
from skyfield.api import Topos, load, EarthSatellite, wgs84
from skyfield.timelib import Time
from skyfield.units import Angle
from skyfield.almanac import find_discrete, risings_and_settings
from skyfield.searchlib import find_minima
from .world_cities import lookup_city, search_cities
logger = logging.getLogger(__name__)
def calculate_tle_checksum(line: str) -> int:
"""Calculate TLE checksum."""
checksum = 0
for char in line[:-1]: # Exclude the checksum digit itself
if char.isdigit():
checksum += int(char)
elif char == '-':
checksum += 1
return checksum % 10
# Orbital type definitions with standard parameters
ORBIT_TYPES = {
"LEO": {
"description": "Low Earth Orbit",
"typical_altitude_km": 400,
"altitude_range": (160, 2000),
"inclination_range": (0, 180),
"eccentricity": 0.0001,
"typical_period_minutes": 90
},
"MEO": {
"description": "Medium Earth Orbit",
"typical_altitude_km": 20200,
"altitude_range": (2000, 35786),
"inclination_range": (0, 180),
"eccentricity": 0.001,
"typical_period_minutes": 720
},
"GEO": {
"description": "Geostationary Earth Orbit",
"typical_altitude_km": 35786,
"altitude_range": (35786, 35786),
"inclination_range": (0, 0.1),
"eccentricity": 0.0001,
"typical_period_minutes": 1436.1
},
"SSO": {
"description": "Sun-Synchronous Orbit",
"typical_altitude_km": 800,
"altitude_range": (600, 1500),
"inclination_range": (96, 102),
"eccentricity": 0.001,
"typical_period_minutes": 100
},
"MOLNIYA": {
"description": "Molniya Orbit (highly elliptical)",
"typical_altitude_km": 19100, # Semi-major axis altitude equivalent
"altitude_range": (500, 39300),
"inclination_range": (63, 65),
"eccentricity": 0.74,
"typical_period_minutes": 718
},
"POLAR": {
"description": "Polar Orbit",
"typical_altitude_km": 800,
"altitude_range": (300, 1500),
"inclination_range": (88, 92),
"eccentricity": 0.001,
"typical_period_minutes": 100
}
}
class OrbitalElementsParser:
"""Parse natural language text to extract orbital parameters."""
def __init__(self):
"""Initialize the parser with regex patterns."""
# Altitude patterns
self.altitude_patterns = [
r'(\d+(?:\.\d+)?)\s*km(?:\s+altitude)?',
r'altitude\s+(?:of\s+)?(\d+(?:\.\d+)?)\s*km',
r'at\s+(\d+(?:\.\d+)?)\s*km',
r'(\d+(?:\.\d+)?)\s*kilometers?(?:\s+altitude)?'
]
# Inclination patterns
self.inclination_patterns = [
r'inclination\s+(?:of\s+)?(\d+(?:\.\d+)?)\s*(?:degrees?|°)',
r'(\d+(?:\.\d+)?)\s*(?:degrees?|°)\s+inclination',
r'inclined\s+at\s+(\d+(?:\.\d+)?)\s*(?:degrees?|°)'
]
# Orbit type patterns
self.orbit_type_patterns = {
'LEO': [r'low\s+earth\s+orbit', r'\bLEO\b'],
'MEO': [r'medium\s+earth\s+orbit', r'\bMEO\b'],
'GEO': [r'geostationary', r'\bGEO\b', r'geosynchronous'],
'SSO': [r'sun[\s-]synchronous', r'\bSSO\b'],
'MOLNIYA': [r'molniya', r'highly\s+elliptical'],
'POLAR': [r'polar\s+orbit', r'\bpolar\b']
}
# Location patterns for reference
self.location_patterns = [
r'over\s+([a-zA-Z\s,]+)',
r'above\s+([a-zA-Z\s,]+)',
r'passing\s+over\s+([a-zA-Z\s,]+)'
]
def parse_text(self, text: str) -> Dict[str, Any]:
"""Parse natural language text to extract orbital parameters."""
text = text.lower().strip()
result = {
'altitude_km': None,
'inclination_deg': None,
'orbit_type': None,
'reference_location': None,
'eccentricity': None,
'parsed_elements': []
}
# Extract altitude
for pattern in self.altitude_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
result['altitude_km'] = float(match.group(1))
result['parsed_elements'].append(f"altitude: {result['altitude_km']} km")
break
# Extract inclination
for pattern in self.inclination_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
result['inclination_deg'] = float(match.group(1))
result['parsed_elements'].append(f"inclination: {result['inclination_deg']}°")
break
# Extract orbit type
for orbit_type, patterns in self.orbit_type_patterns.items():
for pattern in patterns:
if re.search(pattern, text, re.IGNORECASE):
result['orbit_type'] = orbit_type
result['parsed_elements'].append(f"orbit type: {orbit_type}")
break
if result['orbit_type']:
break
# Extract reference location
for pattern in self.location_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
result['reference_location'] = match.group(1).strip()
result['parsed_elements'].append(f"reference location: {result['reference_location']}")
break
return result
class TLEGenerator:
"""Generate TLE data from orbital elements."""
def __init__(self):
"""Initialize TLE generator."""
self.earth_radius_km = 6371.0
self.earth_mu = 398600.4418 # km³/s²
self.parser = OrbitalElementsParser()
def altitude_to_semi_major_axis(self, altitude_km: float) -> float:
"""Convert altitude to semi-major axis."""
return altitude_km + self.earth_radius_km
def semi_major_axis_to_mean_motion(self, semi_major_axis_km: float) -> float:
"""Calculate mean motion (revolutions per day) from semi-major axis."""
# Kepler's third law: n = sqrt(μ/a³)
n_rad_per_sec = math.sqrt(self.earth_mu / (semi_major_axis_km ** 3))
n_rev_per_day = n_rad_per_sec * 86400 / (2 * math.pi)
return n_rev_per_day
def calculate_sso_inclination(self, altitude_km: float) -> float:
"""Calculate inclination for sun-synchronous orbit at given altitude."""
# Simplified calculation for SSO inclination
# Real calculation involves J2 perturbation, this is an approximation
a = self.altitude_to_semi_major_axis(altitude_km)
# Approximate formula for SSO inclination
inclination_rad = math.acos(-((a / 12352) ** 3.5))
return math.degrees(inclination_rad)
def generate_orbital_elements(self, parsed_params: Dict[str, Any]) -> Dict[str, Any]:
"""Generate complete orbital elements from parsed parameters."""
elements = {
'altitude_km': 400, # Default LEO altitude
'inclination_deg': 51.6, # Default ISS-like inclination
'eccentricity': 0.0001, # Nearly circular
'argument_of_perigee_deg': 0.0,
'raan_deg': 0.0, # Right Ascension of Ascending Node
'mean_anomaly_deg': 0.0,
'orbit_type': 'LEO'
}
# Use parsed parameters
if parsed_params.get('altitude_km'):
elements['altitude_km'] = parsed_params['altitude_km']
if parsed_params.get('inclination_deg'):
elements['inclination_deg'] = parsed_params['inclination_deg']
if parsed_params.get('orbit_type'):
elements['orbit_type'] = parsed_params['orbit_type']
orbit_def = ORBIT_TYPES.get(parsed_params['orbit_type'], {})
# Apply orbit type defaults if not explicitly specified
if not parsed_params.get('altitude_km'):
elements['altitude_km'] = orbit_def.get('typical_altitude_km', 400)
if not parsed_params.get('inclination_deg'):
if parsed_params['orbit_type'] == 'GEO':
elements['inclination_deg'] = 0.0
elif parsed_params['orbit_type'] == 'SSO':
elements['inclination_deg'] = self.calculate_sso_inclination(elements['altitude_km'])
elif parsed_params['orbit_type'] == 'POLAR':
elements['inclination_deg'] = 90.0
elif parsed_params['orbit_type'] == 'MOLNIYA':
elements['inclination_deg'] = 63.4
else:
# Use range midpoint for other types
inc_range = orbit_def.get('inclination_range', (51, 52))
elements['inclination_deg'] = (inc_range[0] + inc_range[1]) / 2
# Set eccentricity based on orbit type
elements['eccentricity'] = orbit_def.get('eccentricity', 0.0001)
# Calculate derived parameters
elements['semi_major_axis_km'] = self.altitude_to_semi_major_axis(elements['altitude_km'])
elements['mean_motion_rev_per_day'] = self.semi_major_axis_to_mean_motion(elements['semi_major_axis_km'])
elements['orbital_period_minutes'] = 1440.0 / elements['mean_motion_rev_per_day']
return elements
def generate_tle_from_elements(self, elements: Dict[str, Any], satellite_number: int = None) -> Tuple[str, str]:
"""Generate TLE lines from orbital elements."""
# Use dummy satellite number in range 90000-99999 if not provided
if satellite_number is None:
satellite_number = 90000 + hash(str(elements)) % 10000
# Current epoch
now = datetime.now(timezone.utc)
epoch_year = now.year % 100 # Two-digit year
day_of_year = now.timetuple().tm_yday
fraction_of_day = (now.hour * 3600 + now.minute * 60 + now.second) / 86400.0
epoch_day = day_of_year + fraction_of_day
# Format TLE Line 1 (without checksum)
line1_base = f"1 {satellite_number:05d}U 24001A {epoch_year:02d}{epoch_day:012.8f} .00001000 00000-0 10000-4 0 999"
# Format TLE Line 2 (without checksum)
inclination = elements['inclination_deg']
raan = elements['raan_deg']
eccentricity_str = f"{int(elements['eccentricity'] * 10000000):07d}"
arg_perigee = elements['argument_of_perigee_deg']
mean_anomaly = elements['mean_anomaly_deg']
mean_motion = elements['mean_motion_rev_per_day']
rev_number = 1 # Dummy revolution number
line2_base = f"2 {satellite_number:05d} {inclination:8.4f} {raan:8.4f} {eccentricity_str} {arg_perigee:8.4f} {mean_anomaly:8.4f} {mean_motion:11.8f}{rev_number:5d}"
# Calculate and append checksums
checksum1 = calculate_tle_checksum(line1_base + "0")
checksum2 = calculate_tle_checksum(line2_base + "0")
line1_final = line1_base + str(checksum1)
line2_final = line2_base + str(checksum2)
return line1_final, line2_final
def parse_and_generate_tle(self, text: str) -> Dict[str, Any]:
"""Parse natural language text and generate TLE."""
# Parse the text
parsed_params = self.parser.parse_text(text)
# Generate orbital elements
elements = self.generate_orbital_elements(parsed_params)
# Generate TLE
line1, line2 = self.generate_tle_from_elements(elements)
return {
'parsed_text': text,
'parsed_parameters': parsed_params,
'orbital_elements': elements,
'tle': {
'line1': line1,
'line2': line2
},
'summary': {
'satellite_type': elements['orbit_type'],
'altitude_km': elements['altitude_km'],
'inclination_deg': round(elements['inclination_deg'], 2),
'period_minutes': round(elements['orbital_period_minutes'], 2),
'eccentricity': elements['eccentricity']
}
}
@dataclass
class AccessWindow:
"""Represents a satellite access window over a ground station."""
aos_time: datetime
los_time: datetime
culmination_time: datetime
duration_seconds: float
max_elevation_deg: float
aos_azimuth_deg: float
los_azimuth_deg: float
culmination_azimuth_deg: float
# Lighting conditions during culmination
ground_lighting: Dict[str, Any] # Dawn/dusk conditions at ground station
satellite_lighting: Dict[str, Any] # Sunlight/eclipse conditions for satellite
@dataclass
class TLEValidationResult:
"""Result of TLE validation."""
is_valid: bool
errors: List[str]
satellite_number: Optional[int]
classification: Optional[str]
international_designator: Optional[str]
epoch: Optional[datetime]
mean_motion: Optional[float]
eccentricity: Optional[float]
inclination_deg: Optional[float]
orbital_period_minutes: Optional[float]
class SatelliteCalculator:
"""Handles satellite orbital mechanics calculations."""
def __init__(self):
"""Initialize calculator with Skyfield timescale."""
self.ts = load.timescale()
self.eph = load('de421.bsp') # Load planetary ephemeris
self.sun = self.eph['sun']
self.earth = self.eph['earth']
self.tle_generator = TLEGenerator()
def validate_tle(self, line1: str, line2: str) -> TLEValidationResult:
"""Validate Two-Line Element data and extract orbital parameters."""
errors = []
# Basic format validation
if len(line1) != 69:
errors.append(f"Line 1 length {len(line1)}, expected 69 characters")
if len(line2) != 69:
errors.append(f"Line 2 length {len(line2)}, expected 69 characters")
if not line1.startswith('1 '):
errors.append("Line 1 must start with '1 '")
if not line2.startswith('2 '):
errors.append("Line 2 must start with '2 '")
# Checksum validation
def calculate_checksum(line: str) -> int:
"""Calculate TLE line checksum."""
checksum = 0
for char in line[:-1]:
if char.isdigit():
checksum += int(char)
elif char == '-':
checksum += 1
return checksum % 10
try:
line1_checksum = int(line1[-1])
if calculate_checksum(line1) != line1_checksum:
errors.append("Line 1 checksum validation failed")
except (ValueError, IndexError):
errors.append("Line 1 checksum format invalid")
try:
line2_checksum = int(line2[-1])
if calculate_checksum(line2) != line2_checksum:
errors.append("Line 2 checksum validation failed")
except (ValueError, IndexError):
errors.append("Line 2 checksum format invalid")
# Satellite number consistency
try:
sat_num_1 = int(line1[2:7])
sat_num_2 = int(line2[2:7])
if sat_num_1 != sat_num_2:
errors.append(f"Satellite numbers don't match: {sat_num_1} vs {sat_num_2}")
except ValueError:
errors.append("Invalid satellite number format")
sat_num_1 = None
# Extract orbital parameters if basic validation passes
satellite_number = None
classification = None
international_designator = None
epoch = None
mean_motion = None
eccentricity = None
inclination_deg = None
orbital_period_minutes = None
if len(errors) == 0:
try:
# Create satellite object to validate orbital parameters
satellite = EarthSatellite(line1, line2, ts=self.ts)
satellite_number = sat_num_1
classification = line1[7]
international_designator = line1[9:17].strip()
# Extract epoch
epoch_year = int(line1[18:20])
epoch_year += 2000 if epoch_year < 57 else 1900 # Two-digit year logic
epoch_day = float(line1[20:32])
from datetime import timedelta
epoch = datetime(epoch_year, 1, 1, tzinfo=timezone.utc)
epoch = epoch.replace(day=1) + timedelta(days=int(epoch_day - 1))
epoch = epoch + timedelta(milliseconds=int((epoch_day % 1) * 24 * 3600 * 1000))
# Orbital parameters
inclination_deg = float(line2[8:16])
eccentricity = float('0.' + line2[26:33])
mean_motion = float(line2[52:63]) # revolutions per day
orbital_period_minutes = 1440.0 / mean_motion if mean_motion > 0 else None
# Validate parameter ranges
if not (0 <= inclination_deg <= 180):
errors.append(f"Invalid inclination: {inclination_deg}°")
if not (0 <= eccentricity < 1):
errors.append(f"Invalid eccentricity: {eccentricity}")
if mean_motion <= 0:
errors.append(f"Invalid mean motion: {mean_motion}")
except Exception as e:
errors.append(f"Failed to parse orbital parameters: {str(e)}")
return TLEValidationResult(
is_valid=len(errors) == 0,
errors=errors,
satellite_number=satellite_number,
classification=classification,
international_designator=international_designator,
epoch=epoch,
mean_motion=mean_motion,
eccentricity=eccentricity,
inclination_deg=inclination_deg,
orbital_period_minutes=orbital_period_minutes
)
def calculate_ground_lighting(self, latitude: float, longitude: float, timestamp: datetime) -> Dict[str, Any]:
"""Calculate ground lighting conditions at a specific time and location."""
# Convert to Skyfield time
t = self.ts.from_datetime(timestamp.replace(tzinfo=timezone.utc))
# Create ground location object
location = wgs84.latlon(latitude, longitude)
# Calculate sun position
sun_apparent = (self.earth + location).at(t).observe(self.sun).apparent()
sun_alt, sun_az, _ = sun_apparent.altaz()
sun_elevation = sun_alt.degrees
# Determine lighting conditions based on sun elevation
if sun_elevation > -0.833: # Above horizon (accounting for atmospheric refraction)
condition = "daylight"
elif sun_elevation > -6: # Civil twilight
condition = "civil_twilight"
elif sun_elevation > -12: # Nautical twilight
condition = "nautical_twilight"
elif sun_elevation > -18: # Astronomical twilight
condition = "astronomical_twilight"
else:
condition = "night"
return {
"condition": condition,
"sun_elevation_deg": round(sun_elevation, 2),
"sun_azimuth_deg": round(sun_az.degrees, 2),
"civil_twilight": sun_elevation > -6,
"nautical_twilight": sun_elevation > -12,
"astronomical_twilight": sun_elevation > -18,
"is_daylight": sun_elevation > -0.833,
"is_night": sun_elevation <= -18
}
def calculate_satellite_lighting(self, satellite: EarthSatellite, timestamp: datetime) -> Dict[str, Any]:
"""Calculate satellite lighting conditions (sunlight/eclipse) at a specific time."""
# Convert to Skyfield time
t = self.ts.from_datetime(timestamp.replace(tzinfo=timezone.utc))
# Get satellite position
geocentric = satellite.at(t)
# Get satellite's subpoint for ground lighting reference
sat_lat = geocentric.subpoint().latitude.degrees
sat_lon = geocentric.subpoint().longitude.degrees
# Calculate ground lighting at satellite's subpoint
ground_lighting = self.calculate_ground_lighting(sat_lat, sat_lon, timestamp)
# Get satellite position vector
sat_position = geocentric.position.km
sat_distance = np.linalg.norm(sat_position)
# Simplified eclipse calculation:
# Satellite is in eclipse if it's on the night side of Earth
# This is an approximation based on the ground track lighting
# For LEO satellites, use ground track lighting as approximation
# For higher satellites, they're more likely to be in sunlight
in_eclipse = False
if sat_distance < 2000: # LEO satellites (below ~2000km)
in_eclipse = ground_lighting["condition"] in ["night", "astronomical_twilight"]
elif sat_distance < 20000: # MEO satellites
in_eclipse = ground_lighting["condition"] == "night"
# GEO and higher satellites are rarely in eclipse, so default to sunlight
return {
"condition": "eclipse" if in_eclipse else "sunlight",
"in_eclipse": in_eclipse,
"in_sunlight": not in_eclipse,
"satellite_altitude_km": round(sat_distance - 6371, 2), # Altitude above Earth surface
"subpoint_latitude": round(sat_lat, 4),
"subpoint_longitude": round(sat_lon, 4),
"ground_track_lighting": ground_lighting["condition"],
"eclipse_calculation": "simplified_altitude_based"
}
def calculate_access_windows(
self,
latitude: float,
longitude: float,
tle_line1: str,
tle_line2: str,
start_time: datetime,
end_time: datetime,
elevation_threshold: float = 10.0,
time_step_seconds: int = 30
) -> List[AccessWindow]:
"""Calculate satellite access windows for a ground station."""
# Validate inputs
if not (-90 <= latitude <= 90):
raise ValueError(f"Invalid latitude: {latitude}")
if not (-180 <= longitude <= 180):
raise ValueError(f"Invalid longitude: {longitude}")
if elevation_threshold < 0 or elevation_threshold > 90:
raise ValueError(f"Invalid elevation threshold: {elevation_threshold}")
if start_time >= end_time:
raise ValueError("Start time must be before end time")
# Validate TLE
tle_validation = self.validate_tle(tle_line1, tle_line2)
if not tle_validation.is_valid:
raise ValueError(f"Invalid TLE: {'; '.join(tle_validation.errors)}")
# Create ground station and satellite objects
ground_station = Topos(latitude_degrees=latitude, longitude_degrees=longitude)
satellite = EarthSatellite(tle_line1, tle_line2, ts=self.ts)
# Create time range
t_start = self.ts.from_datetime(start_time.replace(tzinfo=timezone.utc))
t_end = self.ts.from_datetime(end_time.replace(tzinfo=timezone.utc))
# Calculate satellite positions at regular intervals
time_points = []
current_time = t_start
while current_time.tt <= t_end.tt:
time_points.append(current_time)
current_time = self.ts.tt_jd(current_time.tt + time_step_seconds / 86400.0)
if not time_points:
return []
times = self.ts.tt_jd([t.tt for t in time_points])
# Calculate topocentric coordinates
difference = satellite - ground_station
topocentric = difference.at(times)
elevation, azimuth, distance = topocentric.altaz()
# Find access windows
access_windows = []
in_access = False
aos_idx = None
culmination_idx = None
max_elevation = -90.0
for i, (elev_deg, az_deg) in enumerate(zip(elevation.degrees, azimuth.degrees)):
if elev_deg >= elevation_threshold and not in_access:
# Access start (AOS)
in_access = True
aos_idx = i
culmination_idx = i
max_elevation = elev_deg
elif elev_deg >= elevation_threshold and in_access:
# Continue access - check for new maximum
if elev_deg > max_elevation:
max_elevation = elev_deg
culmination_idx = i
elif elev_deg < elevation_threshold and in_access:
# Access end (LOS)
in_access = False
los_idx = i - 1 if i > 0 else i
# Create access window
aos_time = time_points[aos_idx].utc_datetime()
los_time = time_points[los_idx].utc_datetime()
culmination_time = time_points[culmination_idx].utc_datetime()
duration_seconds = (los_time - aos_time).total_seconds()
# Calculate lighting conditions at culmination
ground_lighting = self.calculate_ground_lighting(latitude, longitude, culmination_time)
satellite_lighting = self.calculate_satellite_lighting(satellite, culmination_time)
access_window = AccessWindow(
aos_time=aos_time,
los_time=los_time,
culmination_time=culmination_time,
duration_seconds=duration_seconds,
max_elevation_deg=max_elevation,
aos_azimuth_deg=azimuth.degrees[aos_idx],
los_azimuth_deg=azimuth.degrees[los_idx],
culmination_azimuth_deg=azimuth.degrees[culmination_idx],
ground_lighting=ground_lighting,
satellite_lighting=satellite_lighting
)
access_windows.append(access_window)
# Handle case where access window extends beyond end time
if in_access and aos_idx is not None:
los_idx = len(time_points) - 1
aos_time = time_points[aos_idx].utc_datetime()
los_time = time_points[los_idx].utc_datetime()
culmination_time = time_points[culmination_idx].utc_datetime()
duration_seconds = (los_time - aos_time).total_seconds()
# Calculate lighting conditions at culmination
ground_lighting = self.calculate_ground_lighting(latitude, longitude, culmination_time)
satellite_lighting = self.calculate_satellite_lighting(satellite, culmination_time)
access_window = AccessWindow(
aos_time=aos_time,
los_time=los_time,
culmination_time=culmination_time,
duration_seconds=duration_seconds,
max_elevation_deg=max_elevation,
aos_azimuth_deg=azimuth.degrees[aos_idx],
los_azimuth_deg=azimuth.degrees[los_idx],
culmination_azimuth_deg=azimuth.degrees[culmination_idx],
ground_lighting=ground_lighting,
satellite_lighting=satellite_lighting
)
access_windows.append(access_window)
logger.info(f"Found {len(access_windows)} access windows")
return access_windows
def parse_locations_from_csv_content(self, csv_content: str) -> List[Dict[str, Any]]:
"""Parse locations from CSV content with flexible format handling."""
import csv
import io
lines = csv_content.strip().split('\n')
if not lines:
raise ValueError("CSV content is empty")
# Parse header to identify columns
reader = csv.DictReader(io.StringIO(csv_content))
locations = []
for row_num, row in enumerate(reader, 1):
try:
location = {}
# Find latitude column (flexible naming)
lat_keys = ['latitude', 'lat', 'Latitude', 'Lat', 'LATITUDE', 'LAT']
lat_value = None
for key in lat_keys:
if key in row and row[key].strip():
lat_value = float(row[key])
break
if lat_value is None:
raise ValueError(f"No latitude column found in row {row_num}")
# Find longitude column (flexible naming)
lon_keys = ['longitude', 'lon', 'lng', 'Longitude', 'Lon', 'Lng', 'LONGITUDE', 'LON', 'LNG']
lon_value = None
for key in lon_keys:
if key in row and row[key].strip():
lon_value = float(row[key])
break
if lon_value is None:
raise ValueError(f"No longitude column found in row {row_num}")
# Find altitude column (optional, default to sea level)
alt_keys = ['altitude', 'alt', 'elevation', 'elev', 'Altitude', 'Alt', 'Elevation', 'Elev']
alt_value = 0.0 # Default to sea level
for key in alt_keys:
if key in row and row[key].strip():
alt_value = float(row[key])
break
# Find name column (optional)
name_keys = ['name', 'Name', 'NAME', 'site', 'Site', 'SITE', 'station', 'Station', 'STATION']
name_value = f"Location_{row_num}" # Default name
for key in name_keys:
if key in row and row[key].strip():
name_value = row[key].strip()
break
location = {
'name': name_value,
'latitude': lat_value,
'longitude': lon_value,
'altitude': alt_value
}
# Validate coordinates
if not (-90 <= lat_value <= 90):
raise ValueError(f"Invalid latitude {lat_value} in row {row_num}")
if not (-180 <= lon_value <= 180):
raise ValueError(f"Invalid longitude {lon_value} in row {row_num}")
locations.append(location)
except Exception as e:
logger.warning(f"Skipping row {row_num}: {str(e)}")
continue
if not locations:
raise ValueError("No valid locations found in CSV")
logger.info(f"Parsed {len(locations)} locations from CSV")
return locations
def parse_satellites_from_csv_content(self, csv_content: str) -> List[Dict[str, Any]]:
"""Parse satellites from CSV content with TLE data."""
import csv
import io
lines = csv_content.strip().split('\n')
if not lines:
raise ValueError("CSV content is empty")
reader = csv.DictReader(io.StringIO(csv_content))
satellites = []
for row_num, row in enumerate(reader, 1):
try:
satellite = {}
# Find name column
name_keys = ['name', 'Name', 'NAME', 'satellite', 'Satellite', 'SATELLITE', 'sat_name']
name_value = f"Satellite_{row_num}"
for key in name_keys:
if key in row and row[key].strip():
name_value = row[key].strip()
break
# Find TLE line 1
tle1_keys = ['tle_line1', 'tle1', 'line1', 'TLE_LINE1', 'TLE1', 'LINE1']
tle1_value = None
for key in tle1_keys:
if key in row and row[key].strip():
tle1_value = row[key].strip()
break
if not tle1_value:
raise ValueError(f"No TLE line 1 found in row {row_num}")
# Find TLE line 2
tle2_keys = ['tle_line2', 'tle2', 'line2', 'TLE_LINE2', 'TLE2', 'LINE2']
tle2_value = None
for key in tle2_keys:
if key in row and row[key].strip():
tle2_value = row[key].strip()
break
if not tle2_value:
raise ValueError(f"No TLE line 2 found in row {row_num}")
# Validate TLE
tle_validation = self.validate_tle(tle1_value, tle2_value)
if not tle_validation.is_valid:
raise ValueError(f"Invalid TLE: {'; '.join(tle_validation.errors)}")
satellite = {
'name': name_value,
'tle_line1': tle1_value,
'tle_line2': tle2_value
}
satellites.append(satellite)
except Exception as e:
logger.warning(f"Skipping satellite row {row_num}: {str(e)}")
continue
if not satellites:
raise ValueError("No valid satellites found in CSV")
logger.info(f"Parsed {len(satellites)} satellites from CSV")
return satellites
def calculate_bulk_access_windows(
self,
locations_csv: str,
satellites_csv: str,
start_time: datetime,
end_time: datetime,
elevation_threshold: float = 10.0,
time_step_seconds: int = 30
) -> Dict[str, Any]:
"""Calculate access windows for multiple satellites and locations."""
# Parse locations and satellites from CSV content
locations = self.parse_locations_from_csv_content(locations_csv)
satellites = self.parse_satellites_from_csv_content(satellites_csv)
results = {
'summary': {
'total_locations': len(locations),
'total_satellites': len(satellites),
'total_combinations': len(locations) * len(satellites),
'calculation_parameters': {
'start_time': start_time.isoformat(),
'end_time': end_time.isoformat(),
'elevation_threshold': elevation_threshold,
'time_step_seconds': time_step_seconds
}
},
'results': []
}
total_combinations = len(locations) * len(satellites)
current_combination = 0
for location in locations:
for satellite in satellites:
current_combination += 1
logger.info(f"Processing combination {current_combination}/{total_combinations}: {satellite['name']} over {location['name']}")
try:
access_windows = self.calculate_access_windows(
latitude=location['latitude'],
longitude=location['longitude'],
tle_line1=satellite['tle_line1'],
tle_line2=satellite['tle_line2'],
start_time=start_time,
end_time=end_time,
elevation_threshold=elevation_threshold,
time_step_seconds=time_step_seconds
)
# Format windows data
windows_data = []
total_duration = 0.0
max_elevation = 0.0
for window in access_windows:
window_dict = {
"aos_time": window.aos_time.isoformat(),
"los_time": window.los_time.isoformat(),
"culmination_time": window.culmination_time.isoformat(),
"duration_seconds": window.duration_seconds,
"duration_minutes": round(window.duration_seconds / 60.0, 2),
"max_elevation_deg": round(window.max_elevation_deg, 2),
"aos_azimuth_deg": round(window.aos_azimuth_deg, 2),
"los_azimuth_deg": round(window.los_azimuth_deg, 2),
"culmination_azimuth_deg": round(window.culmination_azimuth_deg, 2),
"ground_lighting": window.ground_lighting,
"satellite_lighting": window.satellite_lighting
}
windows_data.append(window_dict)
total_duration += window.duration_seconds
max_elevation = max(max_elevation, window.max_elevation_deg)
combination_result = {
'satellite': satellite,
'location': location,
'access_windows': windows_data,
'summary': {
'total_windows': len(access_windows),
'total_duration_seconds': total_duration,
'total_duration_minutes': round(total_duration / 60.0, 2),
'max_elevation_deg': round(max_elevation, 2)
}
}
results['results'].append(combination_result)
except Exception as e:
logger.error(f"Error calculating windows for {satellite['name']} over {location['name']}: {str(e)}")
# Add error result
combination_result = {
'satellite': satellite,
'location': location,
'access_windows': [],
'error': str(e),
'summary': {
'total_windows': 0,
'total_duration_seconds': 0,
'total_duration_minutes': 0,
'max_elevation_deg': 0
}
}
results['results'].append(combination_result)
# Calculate overall summary statistics
total_windows = sum(r['summary']['total_windows'] for r in results['results'])
total_duration = sum(r['summary']['total_duration_seconds'] for r in results['results'])
results['summary']['total_access_windows'] = total_windows
results['summary']['total_access_duration_seconds'] = total_duration
results['summary']['total_access_duration_minutes'] = round(total_duration / 60.0, 2)
logger.info(f"Bulk calculation completed: {total_windows} total access windows found")
return results
def calculate_access_windows_by_city(
self,
city_name: str,
tle_line1: str,
tle_line2: str,
start_time: datetime,
end_time: datetime,
elevation_threshold: float = 10.0,
time_step_seconds: int = 30
) -> Dict[str, Any]:
"""Calculate satellite access windows for a city by name lookup."""
# Look up city coordinates
city_info = lookup_city(city_name)
if not city_info:
# Try searching for partial matches
search_results = search_cities(city_name, limit=5)
if search_results:
suggested_cities = [city["name"] for city in search_results]
raise ValueError(f"City '{city_name}' not found. Did you mean one of: {', '.join(suggested_cities)}?")
else:
raise ValueError(f"City '{city_name}' not found in database")
# Calculate access windows using city coordinates
access_windows = self.calculate_access_windows(
latitude=city_info["latitude"],
longitude=city_info["longitude"],
tle_line1=tle_line1,
tle_line2=tle_line2,
start_time=start_time,
end_time=end_time,
elevation_threshold=elevation_threshold,
time_step_seconds=time_step_seconds
)
# Format response with city information
windows_data = []
total_duration = 0.0
max_elevation = 0.0
for window in access_windows:
window_dict = {
"aos_time": window.aos_time.isoformat(),
"los_time": window.los_time.isoformat(),
"culmination_time": window.culmination_time.isoformat(),
"duration_seconds": window.duration_seconds,
"duration_minutes": round(window.duration_seconds / 60.0, 2),
"max_elevation_deg": round(window.max_elevation_deg, 2),
"aos_azimuth_deg": round(window.aos_azimuth_deg, 2),
"los_azimuth_deg": round(window.los_azimuth_deg, 2),
"culmination_azimuth_deg": round(window.culmination_azimuth_deg, 2),
"ground_lighting": window.ground_lighting,
"satellite_lighting": window.satellite_lighting
}
windows_data.append(window_dict)
total_duration += window.duration_seconds
max_elevation = max(max_elevation, window.max_elevation_deg)
response = {
"city_info": {
"name": city_info["name"],
"country": city_info["country"],
"latitude": city_info["latitude"],
"longitude": city_info["longitude"],
"altitude": city_info["altitude"],
"type": city_info["type"]
},
"summary": {
"total_windows": len(access_windows),
"total_duration_seconds": total_duration,
"total_duration_minutes": round(total_duration / 60.0, 2),
"max_elevation_deg": round(max_elevation, 2),
"calculation_parameters": {
"start_time": start_time.isoformat(),
"end_time": end_time.isoformat(),
"elevation_threshold": elevation_threshold,
"time_step_seconds": time_step_seconds
}
},
"access_windows": windows_data
}
logger.info(f"Found {len(access_windows)} access windows for {city_info['name']}, {city_info['country']}")
return response
def search_cities_by_name(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Search for cities by name or country."""
return search_cities(query, limit)
def parse_orbital_elements_from_text(self, text: str) -> Dict[str, Any]:
"""Parse natural language text to extract orbital parameters and generate TLE."""
return self.tle_generator.parse_and_generate_tle(text)
def calculate_access_windows_from_orbital_elements(
self,
orbital_text: str,
latitude: float,
longitude: float,
start_time: datetime,
end_time: datetime,
elevation_threshold: float = 10.0,
time_step_seconds: int = 30
) -> Dict[str, Any]:
"""Calculate access windows using orbital elements from natural language text."""
# Parse the orbital elements and generate TLE
orbital_result = self.parse_orbital_elements_from_text(orbital_text)
# Extract TLE lines
tle_line1 = orbital_result['tle']['line1']
tle_line2 = orbital_result['tle']['line2']
# Calculate access windows using the generated TLE
access_windows = self.calculate_access_windows(
latitude=latitude,
longitude=longitude,
tle_line1=tle_line1,
tle_line2=tle_line2,
start_time=start_time,
end_time=end_time,
elevation_threshold=elevation_threshold,
time_step_seconds=time_step_seconds
)
# Format response with orbital elements information
windows_data = []
total_duration = 0.0
max_elevation = 0.0
for window in access_windows:
window_dict = {
"aos_time": window.aos_time.isoformat(),
"los_time": window.los_time.isoformat(),
"culmination_time": window.culmination_time.isoformat(),
"duration_seconds": window.duration_seconds,
"duration_minutes": round(window.duration_seconds / 60.0, 2),
"max_elevation_deg": round(window.max_elevation_deg, 2),
"aos_azimuth_deg": round(window.aos_azimuth_deg, 2),
"los_azimuth_deg": round(window.los_azimuth_deg, 2),
"culmination_azimuth_deg": round(window.culmination_azimuth_deg, 2),
"ground_lighting": window.ground_lighting,
"satellite_lighting": window.satellite_lighting
}
windows_data.append(window_dict)
total_duration += window.duration_seconds
max_elevation = max(max_elevation, window.max_elevation_deg)
response = {
"orbital_request": {
"parsed_text": orbital_result['parsed_text'],
"parsed_parameters": orbital_result['parsed_parameters'],
"generated_tle": orbital_result['tle'],
"orbital_summary": orbital_result['summary']
},
"ground_location": {
"latitude": latitude,
"longitude": longitude
},
"summary": {
"total_windows": len(access_windows),
"total_duration_seconds": total_duration,
"total_duration_minutes": round(total_duration / 60.0, 2),
"max_elevation_deg": round(max_elevation, 2),
"calculation_parameters": {
"start_time": start_time.isoformat(),
"end_time": end_time.isoformat(),
"elevation_threshold": elevation_threshold,
"time_step_seconds": time_step_seconds
}
},
"access_windows": windows_data
}
logger.info(f"Found {len(access_windows)} access windows for orbital elements: {orbital_result['summary']}")
return response
def calculate_access_windows_from_orbital_elements_by_city(
self,
orbital_text: str,
city_name: str,
start_time: datetime,
end_time: datetime,
elevation_threshold: float = 10.0,
time_step_seconds: int = 30
) -> Dict[str, Any]:
"""Calculate access windows using orbital elements and city lookup."""
# Look up city coordinates
city_info = lookup_city(city_name)
if not city_info:
# Try searching for partial matches
search_results = search_cities(city_name, limit=5)
if search_results:
suggested_cities = [city["name"] for city in search_results]
raise ValueError(f"City '{city_name}' not found. Did you mean one of: {', '.join(suggested_cities)}?")
else:
raise ValueError(f"City '{city_name}' not found in database")
# Calculate access windows using orbital elements
result = self.calculate_access_windows_from_orbital_elements(
orbital_text=orbital_text,
latitude=city_info["latitude"],
longitude=city_info["longitude"],
start_time=start_time,
end_time=end_time,
elevation_threshold=elevation_threshold,
time_step_seconds=time_step_seconds
)
# Add city information to the response
result["city_info"] = {
"name": city_info["name"],
"country": city_info["country"],
"latitude": city_info["latitude"],
"longitude": city_info["longitude"],
"altitude": city_info["altitude"],
"type": city_info["type"]
}
# Update ground location to use city info
result["ground_location"] = result["city_info"]
logger.info(f"Found {len(result['access_windows'])} access windows for orbital elements over {city_info['name']}, {city_info['country']}")
return result
def get_orbit_types(self) -> Dict[str, Any]:
"""Get available orbit types and their definitions."""
return {
"orbit_types": ORBIT_TYPES,
"description": "Available orbit types for TLE generation from orbital elements"
}
def _earth_occlusion_check(self, pos1_km: np.ndarray, pos2_km: np.ndarray) -> bool:
"""Check if Earth blocks line of sight between two 3D positions.
Args:
pos1_km: First satellite position [x, y, z] in km
pos2_km: Second satellite position [x, y, z] in km
Returns:
True if Earth blocks line of sight, False if clear
"""
earth_radius_km = 6371.0 # Mean Earth radius
# Vector from pos1 to pos2
direction = pos2_km - pos1_km
distance = np.linalg.norm(direction)
if distance < 1e-6: # Same position
return False
direction_unit = direction / distance
# Find closest point on line to Earth center (origin)
# t is parameter along line: point = pos1 + t * direction_unit
t_closest = -np.dot(pos1_km, direction_unit)
# Clamp t to line segment
t_closest = max(0, min(distance, t_closest))
# Closest point on line segment to Earth center
closest_point = pos1_km + t_closest * direction_unit
closest_distance = np.linalg.norm(closest_point)
# If closest point is inside Earth, line is occluded
return closest_distance < earth_radius_km
def calculate_satellite_to_satellite_access_windows(
self,
satellite_tles: List[Dict[str, str]],
start_time: datetime,
end_time: datetime,
min_separation_deg: float = 0.0,
time_step_seconds: int = 30
) -> Dict[str, Any]:
"""Calculate access windows between satellites.
Args:
satellite_tles: List of dicts with 'name', 'tle_line1', 'tle_line2'
start_time: Start time for calculations
end_time: End time for calculations
min_separation_deg: Minimum angular separation (0 = just line of sight)
time_step_seconds: Time step for calculations
Returns:
Dict with satellite pair access windows
"""
ts = load.timescale()
start_ts = ts.from_datetime(start_time.replace(tzinfo=timezone.utc))
end_ts = ts.from_datetime(end_time.replace(tzinfo=timezone.utc))
# Create satellite objects
satellites = []
for sat_data in satellite_tles:
try:
satellite = EarthSatellite(
sat_data['tle_line1'],
sat_data['tle_line2'],
name=sat_data.get('name', 'Unknown'),
ts=ts
)
satellites.append({'sat': satellite, 'name': sat_data.get('name', 'Unknown')})
except Exception as e:
logger.error(f"Failed to create satellite from TLE: {e}")
continue
if len(satellites) < 2:
return {
"error": "Need at least 2 valid satellites for inter-satellite calculations",
"valid_satellites": len(satellites)
}
# Calculate all pairwise access windows
all_results = []
for i in range(len(satellites)):
for j in range(i + 1, len(satellites)):
sat1 = satellites[i]
sat2 = satellites[j]
logger.info(f"Calculating access windows between {sat1['name']} and {sat2['name']}")
# Generate time array
duration_seconds = (end_time - start_time).total_seconds()
num_steps = int(duration_seconds / time_step_seconds) + 1
times = start_ts + np.linspace(0, duration_seconds / 86400, num_steps)
# Calculate positions for both satellites
pos1 = sat1['sat'].at(times)
pos2 = sat2['sat'].at(times)
# Get positions in km relative to Earth center
pos1_km = pos1.position.km
pos2_km = pos2.position.km
# Calculate separations and check occlusion
access_periods = []
in_access = False
access_start = None
for idx, time in enumerate(times):
# Check Earth occlusion
occluded = self._earth_occlusion_check(pos1_km[:, idx], pos2_km[:, idx])
# Calculate angular separation
# Vector from sat1 to sat2
sep_vector = pos2_km[:, idx] - pos1_km[:, idx]
sep_distance = np.linalg.norm(sep_vector)
# Angular separation as seen from sat1
sat1_distance = np.linalg.norm(pos1_km[:, idx])
if sat1_distance > 0 and sep_distance > 0:
# Angle between sat1 position vector and separation vector
cos_angle = np.dot(pos1_km[:, idx], sep_vector) / (sat1_distance * sep_distance)
cos_angle = np.clip(cos_angle, -1, 1)
separation_deg = np.degrees(np.arccos(np.abs(cos_angle)))
else:
separation_deg = 0
# Check if in access (not occluded and meets separation requirement)
has_access = not occluded and separation_deg >= min_separation_deg
if has_access and not in_access:
# Access start
access_start = time
in_access = True
elif not has_access and in_access:
# Access end
if access_start is not None:
access_periods.append({
'start_time': access_start,
'end_time': times[idx - 1] if idx > 0 else time,
'start_idx': len(access_periods),
'end_idx': idx - 1
})
in_access = False
access_start = None
# Handle case where access period extends to end
if in_access and access_start is not None:
access_periods.append({
'start_time': access_start,
'end_time': times[-1],
'start_idx': len(access_periods),
'end_idx': len(times) - 1
})
# Format access windows
access_windows = []
for period in access_periods:
start_idx = max(0, period['start_idx'])
end_idx = min(len(times) - 1, period['end_idx'])
if start_idx < len(times) and end_idx < len(times):
# Calculate statistics for this window
window_indices = range(start_idx, end_idx + 1)
separations = []
for idx in window_indices:
sep_vector = pos2_km[:, idx] - pos1_km[:, idx]
sep_distance = np.linalg.norm(sep_vector)
sat1_distance = np.linalg.norm(pos1_km[:, idx])
if sat1_distance > 0 and sep_distance > 0:
cos_angle = np.dot(pos1_km[:, idx], sep_vector) / (sat1_distance * sep_distance)
cos_angle = np.clip(cos_angle, -1, 1)
separation_deg = np.degrees(np.arccos(np.abs(cos_angle)))
separations.append(separation_deg)
if separations:
min_sep = min(separations)
max_sep = max(separations)
else:
min_sep = max_sep = 0
# Calculate relative velocity at midpoint
mid_idx = (start_idx + end_idx) // 2
if mid_idx < len(times) - 1:
dt = (times[mid_idx + 1].tt - times[mid_idx].tt) * 86400 # seconds
if dt > 0:
vel1 = (pos1_km[:, mid_idx + 1] - pos1_km[:, mid_idx]) / dt
vel2 = (pos2_km[:, mid_idx + 1] - pos2_km[:, mid_idx]) / dt
rel_velocity = np.linalg.norm(vel2 - vel1)
else:
rel_velocity = 0
else:
rel_velocity = 0
duration_seconds = (period['end_time'].tt - period['start_time'].tt) * 86400
access_windows.append({
"aos_time": period['start_time'].utc_iso(),
"los_time": period['end_time'].utc_iso(),
"duration_seconds": duration_seconds,
"duration_minutes": duration_seconds / 60,
"min_separation_deg": round(min_sep, 2),
"max_separation_deg": round(max_sep, 2),
"relative_velocity_km_s": round(rel_velocity, 2)
})
pair_result = {
"satellite_pair": {
"satellite_1": sat1['name'],
"satellite_2": sat2['name']
},
"summary": {
"total_windows": len(access_windows),
"total_duration_minutes": sum(w["duration_minutes"] for w in access_windows),
"calculation_parameters": {
"min_separation_deg": min_separation_deg,
"time_step_seconds": time_step_seconds
}
},
"access_windows": access_windows
}
all_results.append(pair_result)
logger.info(f"Found {len(access_windows)} access windows between {sat1['name']} and {sat2['name']}")
return {
"calculation_info": {
"start_time": start_time.isoformat(),
"end_time": end_time.isoformat(),
"total_satellites": len(satellites),
"total_pairs": len(all_results)
},
"satellite_pairs": all_results
}