Skip to main content
Glama
N-Erickson

AetherLink SDR MCP

by N-Erickson
server.py35.6 kB
#!/usr/bin/env python3 """ SDR MCP Server - Direct control for RTL-SDR with protocol decoding """ import asyncio import json import logging from typing import Dict, List, Optional, Any, Callable from dataclasses import dataclass, asdict from datetime import datetime import numpy as np from contextlib import asynccontextmanager # MCP imports from mcp.server import Server from mcp.types import Tool, TextContent, Resource import mcp.server.stdio # Import hardware drivers from .hardware.rtlsdr import RTLSDRDevice, RTLSDR_AVAILABLE from .hardware.hackrf import HackRFDevice, HACKRF_AVAILABLE, HackRFMode # Import analysis modules from .analysis.spectrum import SpectrumAnalyzer, SignalRecorder, FrequencyScanner # Protocol decoder imports try: import pyModeS as pms ADSB_AVAILABLE = True except ImportError: ADSB_AVAILABLE = False logging.warning("ADS-B decoder not available. Install with: pip install pyModeS") # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Constants DEFAULT_SAMPLE_RATE = 2.048e6 DEFAULT_GAIN = 'auto' ADSB_FREQUENCY = 1090e6 ADSB_SAMPLE_RATE = 2e6 @dataclass class SDRStatus: """Current SDR hardware status""" connected: bool device_name: str frequency: float sample_rate: float gain: float is_capturing: bool active_decoders: List[str] @dataclass class Aircraft: """Tracked aircraft data""" icao: str callsign: Optional[str] = None altitude: Optional[int] = None speed: Optional[float] = None heading: Optional[float] = None latitude: Optional[float] = None longitude: Optional[float] = None last_seen: datetime = None message_count: int = 0 class ADSBDecoder: """ADS-B protocol decoder""" def __init__(self): self.aircraft: Dict[str, Aircraft] = {} self.message_count = 0 def decode_message(self, msg: bytes) -> Optional[Dict[str, Any]]: """Decode a single ADS-B message""" if not ADSB_AVAILABLE: return None try: if len(msg) != 14: # Standard ADS-B message length return None # Get ICAO address icao = pms.icao(msg) if not icao: return None # Update or create aircraft entry if icao not in self.aircraft: self.aircraft[icao] = Aircraft(icao=icao, last_seen=datetime.now()) aircraft = self.aircraft[icao] aircraft.last_seen = datetime.now() aircraft.message_count += 1 self.message_count += 1 # Decode message type tc = pms.typecode(msg) # Aircraft identification (callsign) if 1 <= tc <= 4: callsign = pms.adsb.callsign(msg) if callsign: aircraft.callsign = callsign.strip() # Airborne position elif tc in [9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 20, 21, 22]: alt = pms.adsb.altitude(msg) if alt: aircraft.altitude = alt # Airborne velocity elif tc == 19: velocity = pms.adsb.velocity(msg) if velocity: speed, heading, _, _ = velocity if speed: aircraft.speed = speed if heading: aircraft.heading = heading return { "icao": icao, "aircraft": asdict(aircraft), "message_type": tc, "raw": msg.hex() } except Exception as e: logger.debug(f"Failed to decode ADS-B message: {e}") return None def get_aircraft_list(self) -> List[Dict[str, Any]]: """Get list of all tracked aircraft""" now = datetime.now() active_aircraft = [] for icao, aircraft in self.aircraft.items(): # Only include aircraft seen in last 60 seconds if (now - aircraft.last_seen).seconds < 60: active_aircraft.append(asdict(aircraft)) return active_aircraft class SDRMCPServer: """MCP Server for SDR control""" def __init__(self): self.server = Server("sdr-mcp") self.sdr: Optional[SDRDevice] = None self.adsb_decoder = ADSBDecoder() self.active_decoders: Dict[str, asyncio.Task] = {} # Analysis modules self.spectrum_analyzer = SpectrumAnalyzer() self.signal_recorder = SignalRecorder() self.frequency_scanner = FrequencyScanner(self.spectrum_analyzer) self.setup_handlers() def setup_handlers(self): """Setup MCP server handlers""" @self.server.list_tools() async def list_tools() -> List[Tool]: """List available SDR tools""" tools = [ Tool( name="sdr_connect", description="Connect to SDR hardware (RTL-SDR or HackRF)", inputSchema={ "type": "object", "properties": { "device_type": { "type": "string", "enum": ["rtlsdr", "hackrf"], "default": "rtlsdr" }, "device_index": { "type": "integer", "description": "Device index if multiple devices connected", "default": 0 } } } ), Tool( name="sdr_disconnect", description="Disconnect from SDR hardware", inputSchema={"type": "object", "properties": {}} ), Tool( name="sdr_set_frequency", description="Set SDR center frequency in Hz", inputSchema={ "type": "object", "properties": { "frequency": { "type": "number", "description": "Frequency in Hz (e.g., 1090000000 for 1090 MHz)" } }, "required": ["frequency"] } ), Tool( name="sdr_set_gain", description="Set SDR gain in dB or 'auto'", inputSchema={ "type": "object", "properties": { "gain": { "oneOf": [ {"type": "number", "description": "Gain in dB"}, {"type": "string", "enum": ["auto"]} ] } }, "required": ["gain"] } ), Tool( name="sdr_get_status", description="Get current SDR status and configuration", inputSchema={"type": "object", "properties": {}} ), Tool( name="aviation_track_aircraft", description="Start tracking aircraft via ADS-B on 1090 MHz", inputSchema={"type": "object", "properties": {}} ), Tool( name="aviation_stop_tracking", description="Stop tracking aircraft", inputSchema={"type": "object", "properties": {}} ), Tool( name="aviation_get_aircraft", description="Get list of currently tracked aircraft", inputSchema={"type": "object", "properties": {}} ), Tool( name="spectrum_analyze", description="Perform advanced spectrum analysis at current frequency", inputSchema={ "type": "object", "properties": { "bandwidth": { "type": "number", "description": "Analysis bandwidth in Hz", "default": 2048000 }, "fft_size": { "type": "integer", "description": "FFT size (power of 2)", "default": 2048 }, "window": { "type": "string", "description": "Window function", "enum": ["hamming", "hann", "blackman", "blackman-harris", "flattop"], "default": "blackman-harris" }, "averaging": { "type": "boolean", "description": "Enable spectrum averaging", "default": True } } } ), Tool( name="spectrum_scan", description="Scan a frequency range for signals", inputSchema={ "type": "object", "properties": { "start_freq": { "type": "number", "description": "Start frequency in Hz" }, "stop_freq": { "type": "number", "description": "Stop frequency in Hz" }, "step": { "type": "number", "description": "Step size in Hz", "default": 1000000 }, "dwell_time": { "type": "number", "description": "Dwell time per frequency in seconds", "default": 0.1 } }, "required": ["start_freq", "stop_freq"] } ), Tool( name="recording_start", description="Start recording IQ samples to file", inputSchema={ "type": "object", "properties": { "description": { "type": "string", "description": "Recording description", "default": "" } } } ), Tool( name="recording_stop", description="Stop current recording", inputSchema={"type": "object", "properties": {}} ), Tool( name="hackrf_set_tx_gain", description="Set HackRF transmit gain (0-47 dB)", inputSchema={ "type": "object", "properties": { "gain": { "type": "integer", "description": "TX VGA gain in dB (0-47)", "minimum": 0, "maximum": 47 } }, "required": ["gain"] } ), Tool( name="signal_generator", description="Generate and transmit a signal (HackRF only)", inputSchema={ "type": "object", "properties": { "frequency": { "type": "number", "description": "Transmit frequency in Hz" }, "signal_type": { "type": "string", "enum": ["cw", "tone", "noise", "sweep"], "description": "Type of signal to generate" }, "duration": { "type": "number", "description": "Duration in seconds", "default": 1.0 }, "tone_freq": { "type": "number", "description": "Tone frequency for 'tone' type (Hz)", "default": 1000 } }, "required": ["frequency", "signal_type"] } ) ] return tools @self.server.call_tool() async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]: """Handle tool calls""" try: if name == "sdr_connect": device_type = arguments.get("device_type", "rtlsdr") device_index = arguments.get("device_index", 0) if device_type == "rtlsdr": self.sdr = RTLSDRDevice() success = await self.sdr.connect() if success: return [TextContent(type="text", text="Successfully connected to RTL-SDR")] else: return [TextContent(type="text", text="Failed to connect to RTL-SDR. Check device connection.")] elif device_type == "hackrf": self.sdr = HackRFDevice(device_index) success = await self.sdr.connect() if success: return [TextContent(type="text", text="Successfully connected to HackRF")] else: return [TextContent(type="text", text="Failed to connect to HackRF. Check device connection.")] else: return [TextContent(type="text", text=f"Unsupported device type: {device_type}")] elif name == "sdr_disconnect": if self.sdr: # Stop all active decoders for decoder_name, task in self.active_decoders.items(): task.cancel() self.active_decoders.clear() await self.sdr.disconnect() self.sdr = None return [TextContent(type="text", text="Disconnected from SDR")] else: return [TextContent(type="text", text="No SDR connected")] elif name == "sdr_set_frequency": if not self.sdr: return [TextContent(type="text", text="No SDR connected")] freq = arguments["frequency"] await self.sdr.set_frequency(freq) return [TextContent(type="text", text=f"Set frequency to {freq/1e6:.3f} MHz")] elif name == "sdr_set_gain": if not self.sdr: return [TextContent(type="text", text="No SDR connected")] gain = arguments["gain"] await self.sdr.set_gain(gain) # Format gain display based on device type if isinstance(self.sdr, HackRFDevice) and isinstance(gain, dict): gain_str = f"LNA: {gain.get('lna_gain', 'N/A')} dB, VGA: {gain.get('vga_gain', 'N/A')} dB" if 'amp_enable' in gain: gain_str += f", Amp: {'ON' if gain['amp_enable'] else 'OFF'}" return [TextContent(type="text", text=f"Set gain to {gain_str}")] else: return [TextContent(type="text", text=f"Set gain to {gain}")] elif name == "sdr_get_status": if not self.sdr: status = SDRStatus( connected=False, device_name="None", frequency=0, sample_rate=0, gain=0, is_capturing=False, active_decoders=[] ) else: status = SDRStatus( connected=True, device_name=self.sdr.device_name, frequency=self.sdr.frequency, sample_rate=self.sdr.sample_rate, gain=self.sdr.gain, is_capturing=self.sdr.is_capturing, active_decoders=list(self.active_decoders.keys()) ) return [TextContent(type="text", text=json.dumps(asdict(status), indent=2))] elif name == "aviation_track_aircraft": if not self.sdr: return [TextContent(type="text", text="No SDR connected")] if "adsb" in self.active_decoders: return [TextContent(type="text", text="ADS-B tracking already active")] # Configure for ADS-B await self.sdr.set_frequency(ADSB_FREQUENCY) await self.sdr.set_sample_rate(ADSB_SAMPLE_RATE) await self.sdr.set_gain('auto') # Start ADS-B decoder task self.active_decoders["adsb"] = asyncio.create_task(self._adsb_decoder_task()) return [TextContent(type="text", text="Started ADS-B aircraft tracking on 1090 MHz")] elif name == "aviation_stop_tracking": if "adsb" in self.active_decoders: self.active_decoders["adsb"].cancel() del self.active_decoders["adsb"] return [TextContent(type="text", text="Stopped ADS-B tracking")] else: return [TextContent(type="text", text="ADS-B tracking not active")] elif name == "aviation_get_aircraft": aircraft_list = self.adsb_decoder.get_aircraft_list() summary = f"Tracking {len(aircraft_list)} aircraft\n" summary += f"Total messages: {self.adsb_decoder.message_count}\n\n" for aircraft in aircraft_list: summary += f"ICAO: {aircraft['icao']}" if aircraft['callsign']: summary += f" ({aircraft['callsign']})" if aircraft['altitude']: summary += f" - Alt: {aircraft['altitude']} ft" if aircraft['speed']: summary += f" - Speed: {aircraft['speed']} kts" summary += f" - Messages: {aircraft['message_count']}\n" return [TextContent(type="text", text=summary)] elif name == "spectrum_analyze": if not self.sdr: return [TextContent(type="text", text="No SDR connected")] bandwidth = arguments.get("bandwidth", 2048000) fft_size = arguments.get("fft_size", 2048) window = arguments.get("window", "blackman-harris") averaging = arguments.get("averaging", True) # Update analyzer settings self.spectrum_analyzer.fft_size = fft_size self.spectrum_analyzer.window_type = window self.spectrum_analyzer.window = self.spectrum_analyzer._get_window(window, fft_size) # Read samples samples = await self.sdr.read_samples(fft_size * 2) # Double for overlap # Analyze spectrum frame = await self.spectrum_analyzer.analyze_spectrum( samples[:fft_size], self.sdr.sample_rate, self.sdr.frequency ) # Format results result = f"Spectrum Analysis at {frame.center_freq/1e6:.3f} MHz\n" result += f"Bandwidth: {bandwidth/1e6:.3f} MHz\n" result += f"Window: {window}\n" result += f"Peak power: {frame.peak_power:.1f} dB\n" result += f"Noise floor: {frame.noise_floor:.1f} dB\n" result += f"Dynamic range: {frame.peak_power - frame.noise_floor:.1f} dB\n" if frame.detected_signals: result += f"\nDetected {len(frame.detected_signals)} signals:\n" for sig in frame.detected_signals: result += f" {sig.frequency/1e6:.3f} MHz: " result += f"{sig.power:.1f} dB, " result += f"BW: {sig.bandwidth/1e3:.1f} kHz, " result += f"SNR: {sig.snr:.1f} dB" if sig.modulation_hint: result += f" [{sig.modulation_hint}]" result += f" (confidence: {sig.confidence*100:.0f}%)\n" else: result += "\nNo signals detected above threshold" return [TextContent(type="text", text=result)] elif name == "spectrum_scan": if not self.sdr: return [TextContent(type="text", text="No SDR connected")] start_freq = arguments["start_freq"] stop_freq = arguments["stop_freq"] step = arguments.get("step", 1e6) dwell_time = arguments.get("dwell_time", 0.1) result = f"Scanning {start_freq/1e6:.1f} - {stop_freq/1e6:.1f} MHz...\n" # Perform scan scan_results = await self.frequency_scanner.scan_range( self.sdr, start_freq, stop_freq, step, dwell_time ) # Get summary summary = self.frequency_scanner.get_activity_summary() result += f"\nScan complete:\n" result += f"- Scanned {summary['scan_points']} frequencies\n" result += f"- Found {summary['total_signals']} signals\n" if summary['signal_types']: result += "\nSignal types detected:\n" for sig_type, count in summary['signal_types'].items(): result += f" - {sig_type}: {count}\n" if summary['strongest_signal']: sig = summary['strongest_signal'] result += f"\nStrongest signal:\n" result += f" {sig['frequency']/1e6:.3f} MHz @ {sig['power']:.1f} dB" if sig.get('type'): result += f" [{sig['type']}]" return [TextContent(type="text", text=result)] elif name == "recording_start": if not self.sdr: return [TextContent(type="text", text="No SDR connected")] description = arguments.get("description", "") # Start recording recording_id = await self.signal_recorder.start_recording( self.sdr.frequency, self.sdr.sample_rate, self.sdr.gain, description ) # Start recording task self.active_decoders["recorder"] = asyncio.create_task( self._recording_task() ) return [TextContent(type="text", text=f"Started recording: {recording_id}")] elif name == "recording_stop": if "recorder" in self.active_decoders: self.active_decoders["recorder"].cancel() del self.active_decoders["recorder"] metadata = await self.signal_recorder.stop_recording() result = f"Recording stopped:\n" result += f"- ID: {metadata.get('id', 'N/A')}\n" result += f"- Duration: {metadata.get('duration', 0):.1f} seconds\n" result += f"- Samples: {metadata.get('samples_recorded', 0):,}\n" return [TextContent(type="text", text=result)] else: return [TextContent(type="text", text="No recording in progress")] elif name == "hackrf_set_tx_gain": if not isinstance(self.sdr, HackRFDevice): return [TextContent(type="text", text="This command requires a HackRF device")] gain = arguments["gain"] await self.sdr.set_tx_gain(gain) return [TextContent(type="text", text=f"Set HackRF TX gain to {gain} dB")] elif name == "signal_generator": if not isinstance(self.sdr, HackRFDevice): return [TextContent(type="text", text="Signal generation requires a HackRF device")] frequency = arguments["frequency"] signal_type = arguments["signal_type"] duration = arguments.get("duration", 1.0) tone_freq = arguments.get("tone_freq", 1000) # Safety check if not self.sdr.validate_tx_safety(frequency): return [TextContent(type="text", text="Cannot transmit on this frequency (safety restriction)")] # Generate signal num_samples = int(self.sdr.sample_rate * duration) t = np.arange(num_samples) / self.sdr.sample_rate if signal_type == "cw": # Continuous wave (carrier only) signal = np.ones(num_samples, dtype=complex) elif signal_type == "tone": # Single tone signal = np.exp(2j * np.pi * tone_freq * t) elif signal_type == "noise": # White noise signal = (np.random.randn(num_samples) + 1j * np.random.randn(num_samples)) / np.sqrt(2) elif signal_type == "sweep": # Frequency sweep sweep_rate = self.sdr.sample_rate / 4 / duration phase = 2 * np.pi * sweep_rate * t**2 / 2 signal = np.exp(1j * phase) else: return [TextContent(type="text", text=f"Unknown signal type: {signal_type}")] # Set frequency and transmit await self.sdr.set_frequency(frequency) await self.sdr.write_samples(signal * 0.8) # Scale for safety await asyncio.sleep(duration) return [TextContent(type="text", text=f"Transmitted {signal_type} signal at {frequency/1e6:.3f} MHz for {duration} seconds")] else: return [TextContent(type="text", text=f"Unknown tool: {name}")] except Exception as e: return [TextContent(type="text", text=f"Error: {str(e)}")] @self.server.list_resources() async def list_resources() -> List[Resource]: """List available resources""" return [ Resource( uri="sdr://status", name="SDR Status", mimeType="application/json", description="Current SDR hardware status" ), Resource( uri="aviation://aircraft", name="Tracked Aircraft", mimeType="application/json", description="Currently tracked aircraft from ADS-B" ), Resource( uri="spectrum://waterfall", name="Waterfall Data", mimeType="application/json", description="Recent waterfall display data" ), Resource( uri="scan://results", name="Scan Results", mimeType="application/json", description="Latest frequency scan results" ) ] @self.server.read_resource() async def read_resource(uri: str) -> str: """Read resource content""" if uri == "sdr://status": if not self.sdr: status = { "connected": False, "message": "No SDR connected" } else: status = asdict(SDRStatus( connected=True, device_name=self.sdr.device_name, frequency=self.sdr.frequency, sample_rate=self.sdr.sample_rate, gain=self.sdr.gain, is_capturing=self.sdr.is_capturing, active_decoders=list(self.active_decoders.keys()) )) return json.dumps(status, indent=2) elif uri == "aviation://aircraft": aircraft_data = { "aircraft": self.adsb_decoder.get_aircraft_list(), "total_messages": self.adsb_decoder.message_count, "decoder_active": "adsb" in self.active_decoders } return json.dumps(aircraft_data, indent=2, default=str) elif uri == "spectrum://waterfall": waterfall_data = self.spectrum_analyzer.get_waterfall_data(50) data = { "lines": waterfall_data.tolist() if len(waterfall_data) > 0 else [], "fft_size": self.spectrum_analyzer.fft_size, "center_freq": self.sdr.frequency if self.sdr else 0, "sample_rate": self.sdr.sample_rate if self.sdr else 0 } return json.dumps(data, indent=2) elif uri == "scan://results": scan_data = { "results": self.frequency_scanner.scan_results, "summary": self.frequency_scanner.get_activity_summary() } return json.dumps(scan_data, indent=2, default=str) else: return f"Unknown resource: {uri}" async def _adsb_decoder_task(self): """Background task for ADS-B decoding""" logger.info("Starting ADS-B decoder task") self.sdr.is_capturing = True try: while True: # This is a simplified decoder - real implementation would need # proper ADS-B frame detection and error checking samples = await self.sdr.read_samples(int(ADSB_SAMPLE_RATE * 0.1)) # 100ms chunks # Here you would implement actual ADS-B demodulation # For now, this is a placeholder await asyncio.sleep(0.1) except asyncio.CancelledError: logger.info("ADS-B decoder task cancelled") self.sdr.is_capturing = False raise async def _recording_task(self): """Background task for recording IQ samples""" logger.info("Starting recording task") try: while True: # Read samples in chunks chunk_size = int(self.sdr.sample_rate * 0.1) # 100ms chunks samples = await self.sdr.read_samples(chunk_size) # Add to recording await self.signal_recorder.add_samples(samples) except asyncio.CancelledError: logger.info("Recording task cancelled") raise async def run(self): """Run the MCP server""" async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await self.server.run( read_stream, write_stream, self.server.create_initialization_options() ) async def main(): """Main entry point""" server = SDRMCPServer() await server.run() if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/N-Erickson/AetherLink-SDR-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server