Skip to main content
Glama
N-Erickson

AetherLink SDR MCP

by N-Erickson
rtlsdr.py (6.78 kB)
""" RTL-SDR hardware support for SDR-MCP """ import asyncio import numpy as np import logging from typing import Optional, Any try: import rtlsdr RTLSDR_AVAILABLE = True except ImportError: RTLSDR_AVAILABLE = False logging.warning("RTL-SDR library not available. Install with: pip install pyrtlsdr") from .base import SDRDevice logger = logging.getLogger(__name__) class RTLSDRDevice(SDRDevice): """RTL-SDR hardware implementation""" def __init__(self): super().__init__() self.device_name = "RTL-SDR" # RTL-SDR specific limits self.min_frequency = 24e6 # 24 MHz self.max_frequency = 1.766e9 # 1.766 GHz self.min_sample_rate = 225e3 # 225 ksps self.max_sample_rate = 3.2e6 # 3.2 Msps async def connect(self) -> bool: """Connect to RTL-SDR device""" if not RTLSDR_AVAILABLE: logger.error("RTL-SDR library not available") return False try: # Find and open first available device self.device = rtlsdr.RtlSdr() # Set initial parameters self.device.sample_rate = self.sample_rate self.device.center_freq = self.frequency self.device.gain = self.gain # Get device info logger.info(f"Connected to RTL-SDR: {self.device.get_tuner_type()}") return True except Exception as e: logger.error(f"Failed to connect to RTL-SDR: {e}") self.device = None return False async def disconnect(self): """Disconnect from RTL-SDR device""" if self.device: try: self.device.close() self.device = None logger.info("Disconnected from RTL-SDR") except Exception as e: logger.error(f"Error disconnecting RTL-SDR: {e}") async def set_frequency(self, freq: float): """Set center frequency in Hz""" # Validate frequency range if not (self.min_frequency <= freq <= self.max_frequency): raise ValueError( f"Frequency {freq/1e6:.3f} MHz out of RTL-SDR range " f"({self.min_frequency/1e6:.1f}-{self.max_frequency/1e6:.1f} MHz)" ) self.frequency = freq if self.device: self.device.center_freq = freq logger.debug(f"Set frequency to {freq/1e6:.3f} MHz") async def set_sample_rate(self, rate: float): """Set sample rate in Hz""" # Validate 
sample rate if not (self.min_sample_rate <= rate <= self.max_sample_rate): raise ValueError( f"Sample rate {rate/1e6:.3f} Msps out of RTL-SDR range " f"({self.min_sample_rate/1e6:.3f}-{self.max_sample_rate/1e6:.1f} Msps)" ) self.sample_rate = rate if self.device: self.device.sample_rate = rate logger.debug(f"Set sample rate to {rate/1e6:.3f} Msps") async def set_gain(self, gain: Any): """Set gain (dB or 'auto')""" self.gain = gain if self.device: self.device.gain = gain # Log actual gain if manual mode if gain != 'auto': actual_gain = self.device.gain logger.debug(f"Set gain to {actual_gain} dB") else: logger.debug("Set gain to auto") async def read_samples(self, num_samples: int) -> np.ndarray: """Read IQ samples from RTL-SDR""" if not self.device: raise RuntimeError("Device not connected") # RTL-SDR returns complex64 samples self.is_capturing = True try: # Use asyncio to avoid blocking samples = await asyncio.to_thread( self.device.read_samples, num_samples ) return samples finally: self.is_capturing = False async def get_info(self) -> dict: """Get extended device information""" info = await super().get_info() if self.device: try: info.update({ "tuner_type": self.device.get_tuner_type(), "tuner_gains": self.device.get_gains(), "freq_correction": self.device.freq_correction, "sample_rate_range": f"{self.min_sample_rate/1e6:.3f}-{self.max_sample_rate/1e6:.1f} Msps", "frequency_range": f"{self.min_frequency/1e6:.1f}-{self.max_frequency/1e6:.1f} MHz" }) except Exception as e: logger.debug(f"Could not get extended info: {e}") return info # Mock implementation for testing without hardware class MockRTLSDRDevice(SDRDevice): """Mock RTL-SDR for testing""" def __init__(self): super().__init__() self.device_name = "RTL-SDR (Mock)" logger.info("Using mock RTL-SDR implementation") async def connect(self) -> bool: self.device = True # Fake connection logger.info("Mock RTL-SDR connected") return True async def disconnect(self): self.device = None logger.info("Mock RTL-SDR 
disconnected") async def set_frequency(self, freq: float): self.frequency = freq logger.debug(f"Mock RTL-SDR set frequency to {freq/1e6:.3f} MHz") async def set_sample_rate(self, rate: float): self.sample_rate = rate logger.debug(f"Mock RTL-SDR set sample rate to {rate/1e6:.3f} Msps") async def set_gain(self, gain: Any): self.gain = gain logger.debug(f"Mock RTL-SDR set gain to {gain}") async def read_samples(self, num_samples: int) -> np.ndarray: """Generate mock samples with some signals""" # Generate noise noise = (np.random.randn(num_samples) + 1j * np.random.randn(num_samples)) * 0.1 # Add a few tones for testing t = np.arange(num_samples) / self.sample_rate # Add some signals at different frequencies signal = noise signal += 0.5 * np.exp(2j * np.pi * 1e3 * t) # 1 kHz tone signal += 0.3 * np.exp(2j * np.pi * 10e3 * t) # 10 kHz tone signal += 0.2 * np.exp(2j * np.pi * -5e3 * t) # -5 kHz tone return signal.astype(np.complex64) # Use mock if hardware not available if not RTLSDR_AVAILABLE: RTLSDRDevice = MockRTLSDRDevice

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/N-Erickson/AetherLink-SDR-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.