fits_converter.py
""" FITS Converter Converts saved astronomical data files to FITS format using astropy. Supports catalogs, spectra, images, and other astronomical data modes. """ import json import logging import hashlib from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Union import numpy as np import pandas as pd from astropy.io import fits from astropy.table import Table from astropy import units as u from astropy.coordinates import SkyCoord logger = logging.getLogger(__name__) class FITSConverter: """ Converts astronomical data files to FITS format with proper headers and structure. Supports: - Catalog data (as FITS tables) - Spectral data (as FITS arrays with WCS) - Image data (as FITS images) - Generic tabular data """ def __init__(self, registry: Dict[str, Any]): """ Initialize FITS converter with file registry. Args: registry: File registry containing metadata for all saved files """ self.registry = registry def convert_to_fits( self, identifier: str, output_file: str = None, data_type: str = 'auto', preserve_metadata: bool = True ) -> Dict[str, Any]: """ Convert a saved data file to FITS format. Args: identifier: File ID or filename to convert output_file: Custom output filename (auto-generated if None) data_type: Type of data ('catalog', 'spectrum', 'image', 'auto') preserve_metadata: Include original metadata in FITS headers Returns: Dict with conversion result and file information """ # Find the source file file_info = self._find_file(identifier) if file_info['status'] == 'error': return file_info source_path = Path(file_info['metadata']['filename']) if not source_path.exists(): return { 'status': 'error', 'error': f"Source file not found: {source_path}" } # Load source data try: data = self._load_source_data(source_path, file_info['metadata']['file_type']) except Exception as e: return { 'status': 'error', 'error': f"Failed to load source data: {str(e)}" } # Auto-detect data type if needed if data_type == 'auto': data_type = self._detect_data_type(data, file_info['metadata']) # Generate output filename if needed if output_file is None: base_name = source_path.stem output_file = f"{base_name}.fits" # Ensure .fits extension if not output_file.endswith('.fits'): output_file = f"{output_file}.fits" # Convert to FITS based on data type try: if data_type == 'catalog': result = self._convert_catalog_to_fits( data, output_file, file_info['metadata'], preserve_metadata ) elif data_type == 'spectrum': result = self._convert_spectrum_to_fits( data, output_file, file_info['metadata'], preserve_metadata ) elif data_type == 'image': result = self._convert_image_to_fits( data, output_file, file_info['metadata'], preserve_metadata ) else: result = self._convert_generic_to_fits( data, output_file, file_info['metadata'], preserve_metadata ) return result except Exception as e: return { 'status': 'error', 'error': f"FITS conversion failed: {str(e)}" } def _find_file(self, identifier: str) -> Dict[str, Any]: """Find file by ID or filename.""" file_record = None # Check if identifier is a file ID if identifier in self.registry['files']: file_record = self.registry['files'][identifier] else: # Search by filename for fid, record in self.registry['files'].items(): if record['filename'] == identifier or Path(record['filename']).name == identifier: file_record = record break if not file_record: return { 'status': 'error', 'error': f"File not found in registry: {identifier}" } return { 'status': 'success', 'metadata': file_record } def _load_source_data(self, file_path: 
Path, file_type: str) -> Any: """Load data from source file.""" if file_type == 'json': with open(file_path, 'r') as f: return json.load(f) elif file_type == 'csv': return pd.read_csv(file_path) elif file_type == 'npy': return np.load(file_path, allow_pickle=True) else: raise ValueError(f"Unsupported source file type: {file_type}") def _detect_data_type(self, data: Any, metadata: Dict) -> str: """Auto-detect the type of astronomical data.""" source = metadata.get('source', '').lower() description = metadata.get('description', '').lower() # Check for spectral data indicators if any(keyword in description for keyword in ['spectrum', 'flux', 'wavelength']): return 'spectrum' # Check for catalog data indicators if any(keyword in description for keyword in ['catalog', 'search', 'objects', 'sources']): return 'catalog' # Check for image data indicators if any(keyword in description for keyword in ['image', 'map', 'fits']): return 'image' # Check data structure for spectral data if isinstance(data, dict): if any(key in data for key in ['wavelength', 'flux', 'wave', 'spec']): return 'spectrum' if any(key in data for key in ['ra', 'dec', 'redshift', 'magnitude']): return 'catalog' # Check for tabular data if isinstance(data, (list, pd.DataFrame)) or (isinstance(data, dict) and 'results' in data): return 'catalog' # Default to generic return 'generic' def _convert_catalog_to_fits( self, data: Any, output_file: str, metadata: Dict, preserve_metadata: bool ) -> Dict[str, Any]: """Convert catalog data to FITS table.""" # Extract tabular data if isinstance(data, dict): if 'results' in data: table_data = data['results'] else: table_data = data elif isinstance(data, pd.DataFrame): table_data = data.to_dict('records') elif isinstance(data, list): table_data = data else: raise ValueError("Cannot extract tabular data from source") # Convert to astropy Table if isinstance(table_data, list) and len(table_data) > 0: # Create table from list of dictionaries table = Table(table_data) elif isinstance(table_data, pd.DataFrame): table = Table.from_pandas(table_data) else: raise ValueError("Unsupported table data format") # Add coordinate columns as SkyCoord if RA/Dec present if 'ra' in table.colnames and 'dec' in table.colnames: # Add coordinate metadata table['ra'].unit = u.deg table['dec'].unit = u.deg table['ra'].description = 'Right Ascension (J2000)' table['dec'].description = 'Declination (J2000)' # Add redshift units if present if 'redshift' in table.colnames: table['redshift'].description = 'Spectroscopic redshift' # Create primary HDU primary_hdu = fits.PrimaryHDU() # Add metadata to header if requested if preserve_metadata: primary_hdu.header['SOURCE'] = metadata.get('source', 'unknown').upper() primary_hdu.header['ORIGIN'] = 'Astro MCP Server' primary_hdu.header['DATE'] = datetime.now().isoformat() primary_hdu.header['COMMENT'] = metadata.get('description', 'Astronomical catalog data') # Add original file info primary_hdu.header['ORIGFILE'] = Path(metadata['filename']).name primary_hdu.header['ORIGTYPE'] = metadata['file_type'].upper() # Create table HDU table_hdu = fits.BinTableHDU(table, name='CATALOG') # Create HDU list and save hdul = fits.HDUList([primary_hdu, table_hdu]) output_path = Path(metadata['filename']).parent / output_file hdul.writeto(output_path, overwrite=True) # Register the FITS file in the registry conversion_info = { 'n_objects': len(table), 'columns': list(table.colnames) } file_id = self._register_fits_file(output_path, 'catalog', metadata, conversion_info) return { 'status': 
'success', 'output_file': str(output_path), 'data_type': 'catalog', 'n_objects': len(table), 'columns': table.colnames, 'size_bytes': output_path.stat().st_size, 'file_id': file_id } def _convert_spectrum_to_fits( self, data: Any, output_file: str, metadata: Dict, preserve_metadata: bool ) -> Dict[str, Any]: """Convert spectral data to FITS with proper WCS.""" # Extract spectral arrays if isinstance(data, dict): wavelength = np.array(data.get('wavelength', data.get('wave', []))) flux = np.array(data.get('flux', data.get('spectrum', []))) flux_err = np.array(data.get('flux_err', data.get('error', []))) # Get metadata spec_metadata = data.get('metadata', {}) else: raise ValueError("Spectrum data must be a dictionary with wavelength/flux arrays") if len(wavelength) == 0 or len(flux) == 0: raise ValueError("Missing wavelength or flux data") # Create primary HDU with flux data primary_hdu = fits.PrimaryHDU(flux) # Add spectral WCS (World Coordinate System) if len(wavelength) > 1: # Calculate wavelength step wave_step = np.median(np.diff(wavelength)) wave_start = wavelength[0] # Add WCS keywords for wavelength axis primary_hdu.header['CTYPE1'] = 'WAVE' primary_hdu.header['CUNIT1'] = 'Angstrom' primary_hdu.header['CRVAL1'] = wave_start primary_hdu.header['CRPIX1'] = 1.0 primary_hdu.header['CDELT1'] = wave_step primary_hdu.header['CNAME1'] = 'Wavelength' # Add flux metadata primary_hdu.header['BUNIT'] = '10^-17 erg/s/cm^2/Angstrom' primary_hdu.header['BTYPE'] = 'Flux density' # Add metadata if requested if preserve_metadata: primary_hdu.header['SOURCE'] = metadata.get('source', 'unknown').upper() primary_hdu.header['ORIGIN'] = 'Astro MCP Server' primary_hdu.header['DATE'] = datetime.now().isoformat() # Add spectral metadata if spec_metadata: for key, value in spec_metadata.items(): if isinstance(value, (int, float, str)) and len(str(value)) < 68: try: primary_hdu.header[key.upper()[:8]] = value except: pass # Skip problematic headers # Create HDU list hdus = [primary_hdu] # Add wavelength and error as extensions if present if len(wavelength) > 0: wave_hdu = fits.ImageHDU(wavelength, name='WAVELENGTH') wave_hdu.header['BUNIT'] = 'Angstrom' hdus.append(wave_hdu) if len(flux_err) > 0: err_hdu = fits.ImageHDU(flux_err, name='FLUX_ERR') err_hdu.header['BUNIT'] = '10^-17 erg/s/cm^2/Angstrom' hdus.append(err_hdu) hdul = fits.HDUList(hdus) # Save file output_path = Path(metadata['filename']).parent / output_file hdul.writeto(output_path, overwrite=True) # Register the FITS file in the registry conversion_info = { 'n_pixels': len(flux), 'wavelength_range': [float(wavelength.min()), float(wavelength.max())] if len(wavelength) > 0 else None, 'extensions': ['PRIMARY', 'WAVELENGTH'] + (['FLUX_ERR'] if len(flux_err) > 0 else []) } file_id = self._register_fits_file(output_path, 'spectrum', metadata, conversion_info) return { 'status': 'success', 'output_file': str(output_path), 'data_type': 'spectrum', 'n_pixels': len(flux), 'wavelength_range': [float(wavelength.min()), float(wavelength.max())] if len(wavelength) > 0 else None, 'extensions': ['PRIMARY', 'WAVELENGTH'] + (['FLUX_ERR'] if len(flux_err) > 0 else []), 'size_bytes': output_path.stat().st_size, 'file_id': file_id } def _convert_image_to_fits( self, data: Any, output_file: str, metadata: Dict, preserve_metadata: bool ) -> Dict[str, Any]: """Convert image data to FITS.""" # Extract image array if isinstance(data, dict): if 'image' in data: image_data = np.array(data['image']) elif 'data' in data: image_data = np.array(data['data']) else: # Try to find a 
2D array in the data for key, value in data.items(): if isinstance(value, (list, np.ndarray)): arr = np.array(value) if arr.ndim >= 2: image_data = arr break else: raise ValueError("No image data found") elif isinstance(data, (list, np.ndarray)): image_data = np.array(data) else: raise ValueError("Cannot extract image data") # Create primary HDU primary_hdu = fits.PrimaryHDU(image_data) # Add basic image metadata primary_hdu.header['BTYPE'] = 'Intensity' # Add metadata if requested if preserve_metadata: primary_hdu.header['SOURCE'] = metadata.get('source', 'unknown').upper() primary_hdu.header['ORIGIN'] = 'Astro MCP Server' primary_hdu.header['DATE'] = datetime.now().isoformat() primary_hdu.header['COMMENT'] = metadata.get('description', 'Astronomical image data') # Save file output_path = Path(metadata['filename']).parent / output_file primary_hdu.writeto(output_path, overwrite=True) # Register the FITS file in the registry conversion_info = { 'dimensions': image_data.shape } file_id = self._register_fits_file(output_path, 'image', metadata, conversion_info) return { 'status': 'success', 'output_file': str(output_path), 'data_type': 'image', 'dimensions': image_data.shape, 'size_bytes': output_path.stat().st_size, 'file_id': file_id } def _convert_generic_to_fits( self, data: Any, output_file: str, metadata: Dict, preserve_metadata: bool ) -> Dict[str, Any]: """Convert generic data to FITS format.""" # Try to convert to table format first try: if isinstance(data, dict): # Convert dict to table if possible table_data = [] for key, value in data.items(): if isinstance(value, (list, np.ndarray)): table_data.append({'parameter': key, 'value': str(value)}) else: table_data.append({'parameter': key, 'value': str(value)}) table = Table(table_data) # Create HDU primary_hdu = fits.PrimaryHDU() table_hdu = fits.BinTableHDU(table, name='DATA') hdul = fits.HDUList([primary_hdu, table_hdu]) else: # Try to save as array arr = np.array(data) primary_hdu = fits.PrimaryHDU(arr) hdul = fits.HDUList([primary_hdu]) # Add metadata if requested if preserve_metadata: hdul[0].header['SOURCE'] = metadata.get('source', 'unknown').upper() hdul[0].header['ORIGIN'] = 'Astro MCP Server' hdul[0].header['DATE'] = datetime.now().isoformat() hdul[0].header['COMMENT'] = metadata.get('description', 'Generic astronomical data') # Save file output_path = Path(metadata['filename']).parent / output_file hdul.writeto(output_path, overwrite=True) # Register the FITS file in the registry conversion_info = {} file_id = self._register_fits_file(output_path, 'generic', metadata, conversion_info) return { 'status': 'success', 'output_file': str(output_path), 'data_type': 'generic', 'size_bytes': output_path.stat().st_size, 'file_id': file_id } except Exception as e: raise ValueError(f"Cannot convert generic data to FITS: {str(e)}") def _register_fits_file( self, output_path: Path, data_type: str, source_metadata: Dict, conversion_info: Dict ) -> str: """ Register the created FITS file in the registry for discovery and preview. 
Args: output_path: Path to the created FITS file data_type: Type of data ('catalog', 'spectrum', 'image', 'generic') source_metadata: Metadata from the original file conversion_info: Additional information about the conversion Returns: File ID for the registered FITS file """ file_size = output_path.stat().st_size # Generate unique file ID file_id = hashlib.md5(f"{output_path}_{datetime.now().isoformat()}".encode()).hexdigest()[:12] # Create description source_file = Path(source_metadata['filename']).name description = f"FITS {data_type} converted from {source_file} ({source_metadata['file_type'].upper()})" # Create file record file_record = { 'id': file_id, 'filename': str(output_path), 'file_type': 'fits', 'source': source_metadata.get('source', 'unknown'), 'size_bytes': file_size, 'created': datetime.now().isoformat(), 'description': description, 'metadata': { 'data_type': data_type, 'original_file': source_metadata['filename'], 'original_type': source_metadata['file_type'], 'conversion_date': datetime.now().isoformat(), **conversion_info } } # Update registry self.registry['files'][file_id] = file_record self.registry['statistics']['total_files'] += 1 self.registry['statistics']['total_size_bytes'] += file_size # Update by_type stats if 'fits' not in self.registry['statistics']['by_type']: self.registry['statistics']['by_type']['fits'] = 0 self.registry['statistics']['by_type']['fits'] += 1 # Update by_source stats source_name = source_metadata.get('source', 'unknown') if source_name not in self.registry['statistics']['by_source']: self.registry['statistics']['by_source'][source_name] = 0 self.registry['statistics']['by_source'][source_name] += 1 # Save registry (assuming registry is managed by parent class) self._save_registry() return file_id def _save_registry(self): """Save file registry to disk.""" # Get the base directory from any existing file base_dir = None if self.registry['files']: first_file = next(iter(self.registry['files'].values())) base_dir = Path(first_file['filename']).parent.parent if base_dir: registry_path = base_dir / 'file_registry.json' with open(registry_path, 'w') as f: json.dump(self.registry, f, indent=2)
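

# ---------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module).
# It builds the minimal registry structure that the methods above read and
# mutate -- 'files', plus 'statistics' with 'total_files', 'total_size_bytes',
# 'by_type', and 'by_source' -- registers one CSV catalog by hand, and
# converts it. The file ID, paths, and field values are hypothetical.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    import tempfile

    workspace = Path(tempfile.mkdtemp()) / 'data'
    workspace.mkdir(parents=True)

    # A small catalog to convert (coordinates in degrees)
    csv_path = workspace / 'demo_catalog.csv'
    pd.DataFrame({
        'ra': [150.1, 150.2],
        'dec': [2.1, 2.3],
        'redshift': [0.45, 1.02],
    }).to_csv(csv_path, index=False)

    # Minimal registry record for the source file
    registry = {
        'files': {
            'demo00000001': {
                'id': 'demo00000001',
                'filename': str(csv_path),
                'file_type': 'csv',
                'source': 'demo',
                'description': 'demo catalog search results',
            },
        },
        'statistics': {
            'total_files': 1,
            'total_size_bytes': csv_path.stat().st_size,
            'by_type': {'csv': 1},
            'by_source': {'demo': 1},
        },
    }

    converter = FITSConverter(registry)
    result = converter.convert_to_fits('demo_catalog.csv')  # data_type='auto'
    print(result)

    # Inspect the FITS file that was written
    if result['status'] == 'success':
        fits.info(result['output_file'])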
