Skip to main content
Glama
preview.py22.4 kB
""" Data Preview Module Provides file structure analysis and preview functionality for astronomical data files. """ import json from pathlib import Path from typing import Any, Dict import pandas as pd # Optional imports for additional file types try: from astropy.io import fits ASTROPY_AVAILABLE = True except ImportError: ASTROPY_AVAILABLE = False try: import h5py H5PY_AVAILABLE = True except ImportError: H5PY_AVAILABLE = False def get_json_structure(data, max_depth=3, current_depth=0): """ Get structure of JSON data without values for preview purposes. Args: data: JSON data to analyze max_depth: Maximum depth to traverse current_depth: Current traversal depth Returns: String representation of the data structure """ if current_depth >= max_depth: return "..." if isinstance(data, dict): structure = "{\n" for key in list(data.keys())[:10]: # Show first 10 keys indent = " " * (current_depth + 1) value_type = type(data[key]).__name__ if isinstance(data[key], (dict, list)): sub_structure = get_json_structure(data[key], max_depth, current_depth + 1) structure += f"{indent}{key}: {sub_structure}\n" else: structure += f"{indent}{key}: <{value_type}>\n" if len(data) > 10: structure += f"{indent}... ({len(data) - 10} more keys)\n" structure += " " * current_depth + "}" return structure elif isinstance(data, list): if len(data) == 0: return "[]" first_item_type = type(data[0]).__name__ if isinstance(data[0], (dict, list)): sub_structure = get_json_structure(data[0], max_depth, current_depth + 1) return f"[{len(data)} items of {sub_structure}]" else: return f"[{len(data)} items of <{first_item_type}>]" else: return f"<{type(data).__name__}>" class DataPreviewManager: """ Manager class for previewing astronomical data files. Provides structured preview of various file formats with metadata and loading code examples. """ def __init__(self, registry: Dict[str, Any]): """ Initialize with file registry. Args: registry: File registry dictionary from data source """ self.registry = registry def preview_file(self, identifier: str, preview_rows: int = 10) -> str: """ Generate comprehensive preview of a data file. Args: identifier: File ID or filename to preview preview_rows: Number of rows to show in preview Returns: Formatted preview string with metadata, structure, and loading examples """ # Find file record file_record = None # Check if identifier is a file ID if identifier in self.registry['files']: file_record = self.registry['files'][identifier] else: # Search by filename for fid, record in self.registry['files'].items(): if record['filename'] == identifier or Path(record['filename']).name == identifier: file_record = record break if not file_record: return f"Error: File not found: {identifier}" filename = file_record['filename'] filepath = Path(filename) if not filepath.exists(): return f"Error: File no longer exists: {filepath}" # Build response with metadata response = f""" DATA FILE PREVIEW ================ Filename: {filepath.name} Full Path: {filename} File Type: {file_record['file_type']} Source: {file_record.get('source', 'unknown')} Size: {file_record['size_bytes']:,} bytes ({file_record['size_bytes']/1024/1024:.1f} MB) Created: {file_record['created']} METADATA: """ # Add any stored metadata for key, value in file_record.get('metadata', {}).items(): response += f" {key}: {value}\n" # Preview based on file type file_type = file_record['file_type'] if file_type == 'json': response += self._preview_json(filepath, preview_rows) elif file_type == 'csv': response += self._preview_csv(filepath, preview_rows) elif file_type == 'fits' and ASTROPY_AVAILABLE: response += self._preview_fits(filepath) elif file_type == 'hdf5' and H5PY_AVAILABLE: response += self._preview_hdf5(filepath) else: response += f"\nPreview not available for file type: {file_type}\n" # Add loading code examples response += self._generate_loading_examples(filepath, file_type) return response def _preview_json(self, filepath: Path, preview_rows: int) -> str: """Preview JSON file structure.""" try: with open(filepath, 'r') as f: data = json.load(f) response = "\nJSON STRUCTURE PREVIEW:\n" response += get_json_structure(data, max_depth=3) # If it's array data, show first few items if isinstance(data, list) and len(data) > 0: response += f"\nFirst {min(preview_rows, len(data))} items:\n" for i, item in enumerate(data[:preview_rows]): response += f" [{i}]: {json.dumps(item, indent=2)[:200]}...\n" # If it has 'results' key (common pattern) elif isinstance(data, dict) and 'results' in data: results = data['results'] response += f"\nTotal results: {len(results)}\n" if len(results) > 0: response += f"First result structure:\n{json.dumps(results[0], indent=2)[:500]}...\n" return response except Exception as e: return f"\nError previewing JSON: {e}\n" def _preview_csv(self, filepath: Path, preview_rows: int) -> str: """Preview CSV file structure.""" try: # Use pandas to read just first rows df_preview = pd.read_csv(filepath, nrows=preview_rows) df_info = pd.read_csv(filepath, nrows=0) # Just headers for dtype info response = "\nCSV PREVIEW:\n" response += f"Shape: {sum(1 for _ in open(filepath))-1} rows × {len(df_info.columns)} columns\n\n" response += "COLUMNS:\n" for col in df_info.columns: dtype = df_preview[col].dtype response += f" - {col}: {dtype}\n" response += f"\nFIRST {preview_rows} ROWS:\n" response += df_preview.to_string(max_cols=10) return response except Exception as e: return f"\nError previewing CSV: {e}\n" def _preview_fits(self, filepath: Path) -> str: """Preview FITS file structure with detailed astronomical data analysis.""" try: response = "\nFITS FILE ANALYSIS:\n" response += "==================\n" with fits.open(filepath) as hdul: response += f"Number of HDUs: {len(hdul)}\n\n" # Analyze each HDU for i, hdu in enumerate(hdul): hdu_name = hdu.name or 'PRIMARY' response += f"HDU {i}: {hdu_name}\n" response += f"{'─' * (len(hdu_name) + 7)}\n" response += f"Type: {type(hdu).__name__}\n" # Header analysis if hdu.header: # Key FITS keywords important_keys = { 'SIMPLE': 'FITS format', 'BITPIX': 'Data type', 'NAXIS': 'Number of axes', 'NAXIS1': 'First axis size', 'NAXIS2': 'Second axis size', 'NAXIS3': 'Third axis size', 'OBJECT': 'Object name', 'DATE-OBS': 'Observation date', 'DATE': 'File creation date', 'ORIGIN': 'Data origin', 'SOURCE': 'Data source', 'BTYPE': 'Data type', 'BUNIT': 'Data units', 'COMMENT': 'Comments' } # WCS (World Coordinate System) keywords for spectra wcs_keys = { 'CTYPE1': 'Coordinate type', 'CUNIT1': 'Coordinate unit', 'CRVAL1': 'Reference value', 'CRPIX1': 'Reference pixel', 'CDELT1': 'Coordinate step', 'CNAME1': 'Coordinate name' } # Show important headers found_headers = {} for key, desc in {**important_keys, **wcs_keys}.items(): if key in hdu.header: found_headers[key] = (hdu.header[key], desc) if found_headers: response += "Headers:\n" for key, (value, desc) in found_headers.items(): response += f" {key}: {value} ({desc})\n" # Data analysis if hasattr(hdu, 'data') and hdu.data is not None: data = hdu.data response += f"Data shape: {data.shape}\n" response += f"Data type: {data.dtype}\n" # Statistics for numerical data if hasattr(data, 'min') and hasattr(data, 'max'): # Handle masked arrays if hasattr(data, 'compressed'): valid_data = data.compressed() if len(valid_data) > 0: response += f"Data range: {valid_data.min():.3e} to {valid_data.max():.3e}\n" response += f"Valid pixels: {len(valid_data):,} / {data.size:,}\n" else: response += f"Data range: {data.min():.3e} to {data.max():.3e}\n" # Detect data type by HDU structure and content data_type = self._detect_fits_data_type(hdu, hdul) if data_type: response += f"Detected type: {data_type}\n" # Type-specific analysis if isinstance(hdu, fits.BinTableHDU): response += self._analyze_fits_table(hdu) elif isinstance(hdu, fits.ImageHDU) or isinstance(hdu, fits.PrimaryHDU): response += self._analyze_fits_image_or_spectrum(hdu) response += "\n" # Overall file analysis response += self._analyze_fits_overall_structure(hdul) return response except Exception as e: return f"\nError previewing FITS: {e}\n" def _detect_fits_data_type(self, hdu, hdul): """Detect the type of astronomical data in the FITS file.""" # Check HDU name and structure hdu_name = (hdu.name or '').upper() if hdu_name == 'CATALOG' or 'CATALOG' in hdu_name: return "Astronomical catalog" elif hdu_name == 'WAVELENGTH' or any(name in ['WAVELENGTH', 'FLUX_ERR'] for name in [h.name for h in hdul]): return "Spectrum with extensions" elif isinstance(hdu, fits.BinTableHDU) and hasattr(hdu, 'data') and hdu.data is not None: # Check column names for astronomical indicators if hasattr(hdu, 'columns'): col_names = [col.name.lower() for col in hdu.columns] if any(name in col_names for name in ['ra', 'dec', 'redshift']): return "Astronomical catalog (table)" elif isinstance(hdu, (fits.ImageHDU, fits.PrimaryHDU)) and hasattr(hdu, 'data') and hdu.data is not None: # Check WCS keywords for spectrum if 'CTYPE1' in hdu.header and hdu.header['CTYPE1'] == 'WAVE': return "1D spectrum" elif len(hdu.data.shape) == 2: return "2D image" elif len(hdu.data.shape) == 1: return "1D array (possibly spectrum)" return None def _analyze_fits_table(self, hdu): """Analyze FITS binary table data.""" response = "" if hasattr(hdu, 'data') and hdu.data is not None: data = hdu.data response += f"Table rows: {len(data):,}\n" if hasattr(hdu, 'columns'): response += f"Columns ({len(hdu.columns)}):\n" for col in hdu.columns[:15]: # Show first 15 columns col_info = f" - {col.name}: {col.format}" if col.unit: col_info += f" [{col.unit}]" if hasattr(col, 'disp') and col.disp: col_info += f" (display: {col.disp})" response += col_info + "\n" if len(hdu.columns) > 15: response += f" ... and {len(hdu.columns) - 15} more columns\n" # Show sample data for key astronomical columns astro_cols = ['ra', 'dec', 'redshift', 'spectype', 'targetid', 'sparcl_id'] sample_cols = [col.name for col in hdu.columns if col.name.lower() in astro_cols] if sample_cols: response += "Sample data (first 3 rows):\n" for i in range(min(3, len(data))): row_data = [] for col_name in sample_cols[:5]: # Show up to 5 columns try: value = data[col_name][i] if isinstance(value, float): row_data.append(f"{col_name}={value:.4f}") else: row_data.append(f"{col_name}={value}") except: row_data.append(f"{col_name}=N/A") response += f" [{i}]: {', '.join(row_data)}\n" return response def _analyze_fits_image_or_spectrum(self, hdu): """Analyze FITS image or spectrum data.""" response = "" if hasattr(hdu, 'data') and hdu.data is not None: data = hdu.data # Check for WCS (spectrum) information if 'CTYPE1' in hdu.header and hdu.header['CTYPE1'] == 'WAVE': response += "SPECTRAL WCS DETECTED:\n" if 'CRVAL1' in hdu.header and 'CDELT1' in hdu.header: wave_start = hdu.header['CRVAL1'] wave_step = hdu.header['CDELT1'] wave_end = wave_start + (len(data) - 1) * wave_step unit = hdu.header.get('CUNIT1', 'unknown') response += f" Wavelength range: {wave_start:.1f} to {wave_end:.1f} {unit}\n" response += f" Wavelength step: {wave_step:.3f} {unit}\n" response += f" Number of pixels: {len(data):,}\n" if 'BUNIT' in hdu.header: response += f" Flux units: {hdu.header['BUNIT']}\n" # For 2D images elif len(data.shape) == 2: response += f"Image dimensions: {data.shape[1]} × {data.shape[0]} pixels\n" # Show data sample for 1D arrays (spectra) if len(data.shape) == 1 and len(data) > 0: response += "Data sample (first 10 values):\n" sample_values = data[:10] response += " " + ", ".join([f"{val:.3e}" for val in sample_values]) + "\n" return response def _analyze_fits_overall_structure(self, hdul): """Analyze overall FITS file structure to identify common patterns.""" response = "OVERALL STRUCTURE ANALYSIS:\n" response += "=" * 28 + "\n" # Check for common astronomical patterns hdu_names = [hdu.name or 'PRIMARY' for hdu in hdul] if 'CATALOG' in hdu_names: response += "✓ This appears to be an ASTRONOMICAL CATALOG file\n" response += " - Contains tabular data with astronomical objects\n" response += " - Likely converted from search results or database query\n" elif any(name in hdu_names for name in ['WAVELENGTH', 'FLUX_ERR']): response += "✓ This appears to be a SPECTRUM file\n" response += " - Contains flux data with wavelength information\n" response += " - May include error arrays for analysis\n" elif len(hdul) == 1 and len(hdul[0].data.shape) == 2: response += "✓ This appears to be an IMAGE file\n" response += " - Contains 2D astronomical image data\n" elif len(hdul) == 1 and len(hdul[0].data.shape) == 1: response += "✓ This appears to be a 1D DATA ARRAY\n" response += " - May be spectrum, time series, or other 1D data\n" # Check for astropy MCP origin if any('Astro MCP Server' in str(hdu.header.get('ORIGIN', '')) for hdu in hdul): response += "✓ Created by Astro MCP Server convert_to_fits tool\n" # Find original file info orig_file = None orig_type = None for hdu in hdul: if 'ORIGFILE' in hdu.header: orig_file = hdu.header['ORIGFILE'] if 'ORIGTYPE' in hdu.header: orig_type = hdu.header['ORIGTYPE'] if orig_file and orig_type: response += f" - Converted from: {orig_file} ({orig_type})\n" # Usage recommendations response += "\nRECOMMENDED USAGE:\n" if 'CATALOG' in hdu_names: response += " - Load with: astropy.table.Table.read(filename)\n" response += " - Or: from astropy.io import fits; data = fits.getdata(filename, ext='CATALOG')\n" elif any(name in hdu_names for name in ['WAVELENGTH', 'FLUX_ERR']): response += " - Load flux: from astropy.io import fits; flux = fits.getdata(filename, ext=0)\n" response += " - Load wavelength: wavelength = fits.getdata(filename, ext='WAVELENGTH')\n" else: response += " - Load with: from astropy.io import fits; data = fits.getdata(filename)\n" return response def _preview_hdf5(self, filepath: Path) -> str: """Preview HDF5 file structure.""" try: response = "\nHDF5 FILE STRUCTURE:\n" with h5py.File(filepath, 'r') as f: response += "Groups and datasets:\n" def visit_item(name, obj): nonlocal response if isinstance(obj, h5py.Dataset): response += f" {name}: {obj.shape} {obj.dtype}\n" else: response += f" {name}/ (group)\n" f.visititems(visit_item) return response except Exception as e: return f"\nError previewing HDF5: {e}\n" def _generate_loading_examples(self, filepath: Path, file_type: str) -> str: """Generate code examples for loading the file.""" response = f""" LOADING CODE EXAMPLES: ==================== # Python code to load this file: """ if file_type == 'json': response += f"""import json with open('{filepath}', 'r') as f: data = json.load(f) # Access results: data['results'] if exists""" elif file_type == 'csv': response += f"""import pandas as pd df = pd.read_csv('{filepath}') # For large files, use chunking: # for chunk in pd.read_csv('{filepath}', chunksize=10000): # process(chunk)""" elif file_type == 'fits': response += f"""from astropy.io import fits from astropy.table import Table # Method 1: Open file to explore structure hdul = fits.open('{filepath}') print(hdul.info()) # Show all HDUs hdul.close() # Method 2: Load specific data # For catalogs (tables): if 'CATALOG' in [hdu.name for hdu in fits.open('{filepath}')]: catalog = Table.read('{filepath}', hdu='CATALOG') # Access columns: catalog['ra'], catalog['dec'], etc. # For spectra: primary_data = fits.getdata('{filepath}', ext=0) # Flux data header = fits.getheader('{filepath}', ext=0) if 'WAVELENGTH' in [fits.getheader('{filepath}', i).get('EXTNAME', '') for i in range(len(fits.open('{filepath}')))]: wavelength = fits.getdata('{filepath}', ext='WAVELENGTH') # For images: image_data = fits.getdata('{filepath}') wcs_info = fits.getheader('{filepath}')""" elif file_type == 'hdf5': response += f"""import h5py with h5py.File('{filepath}', 'r') as f: # List all keys: list(f.keys()) # Access dataset: data = f['dataset_name'][:]""" elif file_type == 'npy': response += f"""import numpy as np data = np.load('{filepath}')""" return response

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SandyYuan/astro_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server