#!/usr/bin/env python3
"""
SDR MCP Server - Direct control for RTL-SDR with protocol decoding
"""
import asyncio
import json
import logging
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, asdict
from datetime import datetime
import numpy as np
from contextlib import asynccontextmanager
# MCP imports
from mcp.server import Server
from mcp.types import Tool, TextContent, Resource
import mcp.server.stdio
# Import hardware drivers
from .hardware.rtlsdr import RTLSDRDevice, RTLSDR_AVAILABLE
from .hardware.hackrf import HackRFDevice, HACKRF_AVAILABLE, HackRFMode
# Import analysis modules
from .analysis.spectrum import SpectrumAnalyzer, SignalRecorder, FrequencyScanner, AudioRecorder
# Protocol decoder imports
try:
import pyModeS as pms
ADSB_AVAILABLE = True
except ImportError:
ADSB_AVAILABLE = False
logging.warning("ADS-B decoder not available. Install with: pip install pyModeS")
from .decoders.pocsag import POCSAGDecoder
from .decoders.ais import AISDecoder
from .decoders.noaa_apt import NOAAAPTDecoder
from .decoders.rtl433 import RTL433Decoder
from .decoders.meteor_lrpt import MeteorLRPTDecoder
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Constants
DEFAULT_SAMPLE_RATE = 2.048e6
DEFAULT_GAIN = 'auto'
ADSB_FREQUENCY = 1090e6
ADSB_SAMPLE_RATE = 2e6
@dataclass
class SDRStatus:
"""Current SDR hardware status"""
connected: bool
device_name: str
frequency: float
sample_rate: float
gain: float
is_capturing: bool
active_decoders: List[str]
@dataclass
class Aircraft:
"""Tracked aircraft data"""
icao: str
callsign: Optional[str] = None
altitude: Optional[int] = None
speed: Optional[float] = None
heading: Optional[float] = None
latitude: Optional[float] = None
longitude: Optional[float] = None
last_seen: datetime = None
message_count: int = 0
class ADSBDecoder:
"""ADS-B protocol decoder"""
def __init__(self):
self.aircraft: Dict[str, Aircraft] = {}
self.message_count = 0
def decode_message(self, msg_hex: str) -> Optional[Dict[str, Any]]:
"""Decode a single ADS-B message from hex string"""
if not ADSB_AVAILABLE:
return None
try:
# pyModeS expects hex strings, not bytes
if len(msg_hex) != 28: # 14 bytes * 2 hex chars
return None
# Get ICAO address
icao = pms.adsb.icao(msg_hex)
if not icao:
return None
# Update or create aircraft entry
if icao not in self.aircraft:
self.aircraft[icao] = Aircraft(icao=icao, last_seen=datetime.now())
aircraft = self.aircraft[icao]
aircraft.last_seen = datetime.now()
aircraft.message_count += 1
self.message_count += 1
# Decode message type
tc = pms.adsb.typecode(msg_hex)
# Aircraft identification (callsign)
if 1 <= tc <= 4:
callsign = pms.adsb.callsign(msg_hex)
if callsign:
aircraft.callsign = callsign.strip()
# Airborne position
elif tc in [9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 20, 21, 22]:
alt = pms.adsb.altitude(msg_hex)
if alt:
aircraft.altitude = alt
# Airborne velocity
elif tc == 19:
velocity = pms.adsb.velocity(msg_hex)
if velocity:
speed, heading, _, _ = velocity
if speed:
aircraft.speed = speed
if heading:
aircraft.heading = heading
return {
"icao": icao,
"aircraft": asdict(aircraft),
"message_type": tc,
"raw": msg_hex
}
except Exception as e:
logger.debug(f"Failed to decode ADS-B message: {e}")
return None
def get_aircraft_list(self) -> List[Dict[str, Any]]:
"""Get list of all tracked aircraft"""
now = datetime.now()
active_aircraft = []
for icao, aircraft in self.aircraft.items():
# Only include aircraft seen in last 120 seconds (2 minutes)
time_diff = (now - aircraft.last_seen).total_seconds()
if time_diff < 120:
active_aircraft.append(asdict(aircraft))
return active_aircraft
class SDRMCPServer:
"""MCP Server for SDR control"""
def __init__(self):
self.server = Server("sdr-mcp")
self.sdr: Optional[SDRDevice] = None
self.adsb_decoder = ADSBDecoder()
self.pocsag_decoder = POCSAGDecoder()
self.ais_decoder = AISDecoder()
self.noaa_decoder = NOAAAPTDecoder()
self.rtl433_decoder = RTL433Decoder()
self.meteor_decoder = MeteorLRPTDecoder()
self.active_decoders: Dict[str, asyncio.Task] = {}
# Analysis modules
self.spectrum_analyzer = SpectrumAnalyzer()
self.signal_recorder = SignalRecorder()
self.audio_recorder = AudioRecorder()
self.frequency_scanner = FrequencyScanner(self.spectrum_analyzer)
self.setup_handlers()
def setup_handlers(self):
"""Setup MCP server handlers"""
@self.server.list_tools()
async def list_tools() -> List[Tool]:
"""List available SDR tools"""
tools = [
Tool(
name="sdr_connect",
description="Connect to SDR hardware (RTL-SDR or HackRF)",
inputSchema={
"type": "object",
"properties": {
"device_type": {
"type": "string",
"enum": ["rtlsdr", "hackrf"],
"default": "rtlsdr"
},
"device_index": {
"type": "integer",
"description": "Device index if multiple devices connected",
"default": 0
}
}
}
),
Tool(
name="sdr_disconnect",
description="Disconnect from SDR hardware",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="sdr_set_frequency",
description="Set SDR center frequency in Hz",
inputSchema={
"type": "object",
"properties": {
"frequency": {
"type": "number",
"description": "Frequency in Hz (e.g., 1090000000 for 1090 MHz)"
}
},
"required": ["frequency"]
}
),
Tool(
name="sdr_set_gain",
description="Set SDR gain in dB or 'auto'",
inputSchema={
"type": "object",
"properties": {
"gain": {
"oneOf": [
{"type": "number", "description": "Gain in dB"},
{"type": "string", "enum": ["auto"]}
]
}
},
"required": ["gain"]
}
),
Tool(
name="sdr_get_status",
description="Get current SDR status and configuration",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="aviation_track_aircraft",
description="Start tracking aircraft via ADS-B on 1090 MHz",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="aviation_stop_tracking",
description="Stop tracking aircraft",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="aviation_get_aircraft",
description="Get list of currently tracked aircraft",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="pager_start_decoding",
description="Start decoding POCSAG pager messages on current frequency",
inputSchema={
"type": "object",
"properties": {
"baud_rate": {
"type": "integer",
"enum": [512, 1200, 2400],
"description": "POCSAG baud rate",
"default": 1200
}
}
}
),
Tool(
name="pager_stop_decoding",
description="Stop decoding POCSAG pager messages",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="pager_get_messages",
description="Get decoded pager messages",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="marine_track_vessels",
description="Start tracking ships via AIS on 161.975 MHz or 162.025 MHz",
inputSchema={
"type": "object",
"properties": {
"channel": {
"type": "string",
"enum": ["A", "B"],
"description": "AIS channel (A=161.975 MHz, B=162.025 MHz)",
"default": "A"
}
}
}
),
Tool(
name="marine_stop_tracking",
description="Stop tracking ships",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="marine_get_vessels",
description="Get list of tracked vessels",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="satellite_decode_noaa",
description="Decode NOAA weather satellite APT transmission (DEPRECATED - all NOAA APT satellites decommissioned Aug 2025)",
inputSchema={
"type": "object",
"properties": {
"satellite": {
"type": "string",
"enum": ["NOAA-15", "NOAA-18", "NOAA-19"],
"description": "NOAA satellite identifier"
},
"duration": {
"type": "number",
"description": "Recording duration in seconds (typically 600-900 for full pass)",
"default": 600
}
},
"required": ["satellite"]
}
),
Tool(
name="satellite_decode_meteor",
description="Decode Meteor-M weather satellite LRPT transmission using SatDump",
inputSchema={
"type": "object",
"properties": {
"satellite": {
"type": "string",
"enum": ["METEOR-M2-3", "METEOR-M2-4"],
"description": "Meteor satellite identifier",
"default": "METEOR-M2-4"
},
"duration": {
"type": "number",
"description": "Recording duration in seconds (typically 600-900 for full pass)",
"default": 600
},
"gain": {
"type": "number",
"description": "RTL-SDR gain in dB",
"default": 40
}
}
}
),
Tool(
name="spectrum_analyze",
description="Perform advanced spectrum analysis at current frequency",
inputSchema={
"type": "object",
"properties": {
"bandwidth": {
"type": "number",
"description": "Analysis bandwidth in Hz",
"default": 2048000
},
"fft_size": {
"type": "integer",
"description": "FFT size (power of 2)",
"default": 2048
},
"window": {
"type": "string",
"description": "Window function",
"enum": ["hamming", "hann", "blackman", "blackman-harris", "flattop"],
"default": "blackman-harris"
},
"averaging": {
"type": "boolean",
"description": "Enable spectrum averaging",
"default": True
}
}
}
),
Tool(
name="spectrum_scan",
description="Scan a frequency range for signals",
inputSchema={
"type": "object",
"properties": {
"start_freq": {
"type": "number",
"description": "Start frequency in Hz"
},
"stop_freq": {
"type": "number",
"description": "Stop frequency in Hz"
},
"step": {
"type": "number",
"description": "Step size in Hz",
"default": 1000000
},
"dwell_time": {
"type": "number",
"description": "Dwell time per frequency in seconds",
"default": 0.1
}
},
"required": ["start_freq", "stop_freq"]
}
),
Tool(
name="recording_start",
description="Start recording IQ samples to file",
inputSchema={
"type": "object",
"properties": {
"description": {
"type": "string",
"description": "Recording description",
"default": ""
}
}
}
),
Tool(
name="recording_stop",
description="Stop current recording",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="audio_record_start",
description="Start recording demodulated audio (FM/AM) to WAV file",
inputSchema={
"type": "object",
"properties": {
"modulation": {
"type": "string",
"description": "Modulation type: FM or AM",
"enum": ["FM", "AM"],
"default": "FM"
},
"description": {
"type": "string",
"description": "Recording description",
"default": ""
}
}
}
),
Tool(
name="audio_record_stop",
description="Stop current audio recording",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="hackrf_set_tx_gain",
description="Set HackRF transmit gain (0-47 dB)",
inputSchema={
"type": "object",
"properties": {
"gain": {
"type": "integer",
"description": "TX VGA gain in dB (0-47)",
"minimum": 0,
"maximum": 47
}
},
"required": ["gain"]
}
),
Tool(
name="signal_generator",
description="Generate and transmit a signal (HackRF only)",
inputSchema={
"type": "object",
"properties": {
"frequency": {
"type": "number",
"description": "Transmit frequency in Hz"
},
"signal_type": {
"type": "string",
"enum": ["cw", "tone", "noise", "sweep"],
"description": "Type of signal to generate"
},
"duration": {
"type": "number",
"description": "Duration in seconds",
"default": 1.0
},
"tone_freq": {
"type": "number",
"description": "Tone frequency for 'tone' type (Hz)",
"default": 1000
}
},
"required": ["frequency", "signal_type"]
}
),
Tool(
name="ism_start_scanning",
description="Start scanning ISM bands for devices (433MHz, 315MHz, 868MHz, 915MHz) using rtl_433",
inputSchema={
"type": "object",
"properties": {
"frequencies": {
"type": "array",
"description": "List of frequencies to scan in MHz (e.g., [433.92, 315])",
"items": {"type": "number"},
"default": [433.92, 315]
},
"hop_interval": {
"type": "integer",
"description": "Hop between frequencies every N seconds",
"default": 30
}
}
}
),
Tool(
name="ism_stop_scanning",
description="Stop ISM band scanning",
inputSchema={"type": "object", "properties": {}}
),
Tool(
name="ism_get_devices",
description="Get list of detected ISM band devices (weather stations, sensors, etc.)",
inputSchema={
"type": "object",
"properties": {
"max_age": {
"type": "integer",
"description": "Maximum age of devices in seconds",
"default": 300
}
}
}
)
]
return tools
@self.server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle tool calls"""
try:
if name == "sdr_connect":
device_type = arguments.get("device_type", "rtlsdr")
device_index = arguments.get("device_index", 0)
if device_type == "rtlsdr":
self.sdr = RTLSDRDevice()
success = await self.sdr.connect()
if success:
return [TextContent(type="text", text="Successfully connected to RTL-SDR")]
else:
return [TextContent(type="text", text="Failed to connect to RTL-SDR. Check device connection.")]
elif device_type == "hackrf":
self.sdr = HackRFDevice(device_index)
success = await self.sdr.connect()
if success:
return [TextContent(type="text", text="Successfully connected to HackRF")]
else:
return [TextContent(type="text", text="Failed to connect to HackRF. Check device connection.")]
else:
return [TextContent(type="text", text=f"Unsupported device type: {device_type}")]
elif name == "sdr_disconnect":
if self.sdr:
# Stop all active decoders
for decoder_name, task in self.active_decoders.items():
task.cancel()
self.active_decoders.clear()
await self.sdr.disconnect()
self.sdr = None
return [TextContent(type="text", text="Disconnected from SDR")]
else:
return [TextContent(type="text", text="No SDR connected")]
elif name == "sdr_set_frequency":
if not self.sdr:
return [TextContent(type="text", text="No SDR connected")]
freq = arguments["frequency"]
await self.sdr.set_frequency(freq)
return [TextContent(type="text", text=f"Set frequency to {freq/1e6:.3f} MHz")]
elif name == "sdr_set_gain":
if not self.sdr:
return [TextContent(type="text", text="No SDR connected")]
gain = arguments["gain"]
await self.sdr.set_gain(gain)
# Format gain display based on device type
if isinstance(self.sdr, HackRFDevice) and isinstance(gain, dict):
gain_str = f"LNA: {gain.get('lna_gain', 'N/A')} dB, VGA: {gain.get('vga_gain', 'N/A')} dB"
if 'amp_enable' in gain:
gain_str += f", Amp: {'ON' if gain['amp_enable'] else 'OFF'}"
return [TextContent(type="text", text=f"Set gain to {gain_str}")]
else:
return [TextContent(type="text", text=f"Set gain to {gain}")]
elif name == "sdr_get_status":
if not self.sdr:
status = SDRStatus(
connected=False,
device_name="None",
frequency=0,
sample_rate=0,
gain=0,
is_capturing=False,
active_decoders=[]
)
else:
status = SDRStatus(
connected=True,
device_name=self.sdr.device_name,
frequency=self.sdr.frequency,
sample_rate=self.sdr.sample_rate,
gain=self.sdr.gain,
is_capturing=self.sdr.is_capturing,
active_decoders=list(self.active_decoders.keys())
)
return [TextContent(type="text", text=json.dumps(asdict(status), indent=2))]
elif name == "aviation_track_aircraft":
if "adsb" in self.active_decoders:
return [TextContent(type="text", text="ADS-B tracking already active")]
# Start ADS-B decoder task (will handle SDR access)
try:
self.active_decoders["adsb"] = asyncio.create_task(self._adsb_decoder_task())
await asyncio.sleep(2.0) # Give it time to disconnect SDR and start rtl_adsb
# Check if it failed immediately
if self.active_decoders["adsb"].done():
try:
await self.active_decoders["adsb"]
except Exception as e:
del self.active_decoders["adsb"]
return [TextContent(type="text", text=f"Failed to start ADS-B: {str(e)}\n\nMake sure the RTL-SDR is connected.")]
return [TextContent(type="text", text="Started ADS-B aircraft tracking on 1090 MHz\n\nNOTE: Python SDR control is paused while tracking.\nUse aviation_stop_tracking to regain SDR control.")]
except Exception as e:
return [TextContent(type="text", text=f"Failed to start ADS-B tracking: {str(e)}")]
elif name == "aviation_stop_tracking":
if "adsb" in self.active_decoders:
self.active_decoders["adsb"].cancel()
del self.active_decoders["adsb"]
return [TextContent(type="text", text="Stopped ADS-B tracking")]
else:
return [TextContent(type="text", text="ADS-B tracking not active")]
elif name == "aviation_get_aircraft":
# Get aircraft list and log details
aircraft_list = self.adsb_decoder.get_aircraft_list()
total_tracked = len(self.adsb_decoder.aircraft)
logger.info(f"Total aircraft ever seen: {total_tracked}")
logger.info(f"Active aircraft (last 2 min): {len(aircraft_list)}")
logger.info(f"Total messages decoded: {self.adsb_decoder.message_count}")
summary = f"Tracking {len(aircraft_list)} active aircraft\n"
summary += f"Total messages decoded: {self.adsb_decoder.message_count}\n"
summary += f"Total aircraft seen: {total_tracked}\n\n"
if not aircraft_list and total_tracked > 0:
summary += "⚠️ Aircraft were detected but none are active in the last 2 minutes.\n"
summary += "This is normal - aircraft may have flown out of range.\n\n"
for aircraft in aircraft_list:
summary += f"ICAO: {aircraft['icao']}"
if aircraft['callsign']:
summary += f" ({aircraft['callsign']})"
if aircraft['altitude']:
summary += f" - Alt: {aircraft['altitude']:,.0f} ft"
if aircraft['speed']:
summary += f" - Speed: {aircraft['speed']:.0f} kts"
summary += f" - Messages: {aircraft['message_count']}\n"
return [TextContent(type="text", text=summary)]
elif name == "pager_start_decoding":
if not self.sdr:
return [TextContent(type="text", text="No SDR connected")]
if "pocsag" in self.active_decoders:
return [TextContent(type="text", text="POCSAG decoding already active")]
baud_rate = arguments.get("baud_rate", 1200)
self.pocsag_decoder.baud_rate = baud_rate
# Start decoder task
self.active_decoders["pocsag"] = asyncio.create_task(
self._pocsag_decoder_task()
)
return [TextContent(type="text", text=f"Started POCSAG pager decoding at {baud_rate} baud\nFrequency: {self.sdr.frequency/1e6:.3f} MHz")]
elif name == "pager_stop_decoding":
if "pocsag" in self.active_decoders:
self.active_decoders["pocsag"].cancel()
del self.active_decoders["pocsag"]
return [TextContent(type="text", text="Stopped POCSAG decoding")]
else:
return [TextContent(type="text", text="POCSAG decoding not active")]
elif name == "pager_get_messages":
stats = self.pocsag_decoder.get_statistics()
messages = self.pocsag_decoder.messages
result = f"POCSAG Messages: {stats['total_messages']}\n"
result += f"Messages stored: {stats['messages_stored']}\n"
result += f"Addresses seen: {stats['addresses_seen']}\n\n"
if not messages:
result += "No messages decoded yet\n"
else:
for msg in messages[-20:]: # Show last 20
result += f"Address: {msg['address']} (Function {msg['function']})\n"
result += f"Type: {msg['message_type']}\n"
result += f"Message: {msg['message']}\n"
result += f"Time: {msg['timestamp']}\n\n"
return [TextContent(type="text", text=result)]
elif name == "marine_track_vessels":
if not self.sdr:
return [TextContent(type="text", text="No SDR connected")]
if "ais" in self.active_decoders:
return [TextContent(type="text", text="AIS tracking already active")]
channel = arguments.get("channel", "A")
ais_freq = 161.975e6 if channel == "A" else 162.025e6
# Set frequency for AIS
await self.sdr.set_frequency(ais_freq)
# Start decoder task
self.active_decoders["ais"] = asyncio.create_task(
self._ais_decoder_task()
)
return [TextContent(type="text", text=f"Started AIS vessel tracking on channel {channel} ({ais_freq/1e6:.3f} MHz)")]
elif name == "marine_stop_tracking":
if "ais" in self.active_decoders:
self.active_decoders["ais"].cancel()
del self.active_decoders["ais"]
return [TextContent(type="text", text="Stopped AIS tracking")]
else:
return [TextContent(type="text", text="AIS tracking not active")]
elif name == "marine_get_vessels":
vessels = self.ais_decoder.get_vessel_list()
stats = self.ais_decoder.get_statistics()
result = f"Tracking {len(vessels)} vessels\n"
result += f"Total messages: {stats['total_messages']}\n"
result += f"Total vessels seen: {stats['total_vessels']}\n"
result += f"Active vessels: {stats['active_vessels']}\n\n"
if not vessels:
result += "No vessels tracked yet\n"
else:
for vessel in vessels:
result += f"MMSI: {vessel['mmsi']}"
if vessel.get('name'):
result += f" - {vessel['name']}"
if vessel.get('latitude') and vessel.get('longitude'):
result += f"\nPosition: {vessel['latitude']:.4f}, {vessel['longitude']:.4f}"
if vessel.get('speed'):
result += f" - Speed: {vessel['speed']:.1f} kts"
if vessel.get('heading'):
result += f" - Heading: {vessel['heading']:.0f}°"
if vessel.get('ship_type'):
result += f"\nType: {vessel['ship_type']}"
result += f"\nMessages: {vessel['message_count']}\n\n"
return [TextContent(type="text", text=result)]
elif name == "satellite_decode_noaa":
if not self.sdr:
return [TextContent(type="text", text="No SDR connected")]
satellite = arguments["satellite"]
duration = arguments.get("duration", 600)
# Get NOAA frequency
from .decoders.noaa_apt import NOAA_FREQUENCIES
if satellite not in NOAA_FREQUENCIES:
return [TextContent(type="text", text=f"Unknown satellite: {satellite}")]
freq = NOAA_FREQUENCIES[satellite]
await self.sdr.set_frequency(freq)
result = f"Recording {satellite} APT transmission...\n"
result += f"Frequency: {freq/1e6:.3f} MHz\n"
result += f"Duration: {duration} seconds\n\n"
# Record IQ samples
logger.info(f"Recording {satellite} for {duration} seconds")
samples_list = []
chunks = int(duration / 5) # 5 second chunks
for i in range(chunks):
chunk = await self.sdr.read_samples(int(self.sdr.sample_rate * 5))
samples_list.append(chunk)
if i % 6 == 0: # Log every 30 seconds
logger.info(f"Recording progress: {(i+1)*5}/{duration} seconds")
all_samples = np.concatenate(samples_list)
# Decode APT
logger.info("Decoding NOAA APT image...")
image = self.noaa_decoder.decode_pass(all_samples, int(self.sdr.sample_rate), satellite)
if image:
# Save image to /tmp/sdr_recordings (same as other recording tools)
import os
output_dir = "/tmp/sdr_recordings"
os.makedirs(output_dir, exist_ok=True)
filename = f"{output_dir}/noaa_{satellite}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
self.noaa_decoder.save_image(image, filename)
result += f"✅ Successfully decoded!\n"
result += f"Lines decoded: {image.num_lines}\n"
result += f"Quality: {image.quality*100:.1f}%\n"
result += f"Saved to: {filename}\n"
else:
result += "❌ Failed to decode image - no valid sync patterns found\n"
return [TextContent(type="text", text=result)]
elif name == "satellite_decode_meteor":
satellite = arguments.get("satellite", "METEOR-M2-4")
duration = arguments.get("duration", 600)
gain = arguments.get("gain", 40)
# Get satellite info
sat_info = self.meteor_decoder.get_satellite_info(satellite)
if not sat_info:
return [TextContent(type="text", text=f"Unknown satellite: {satellite}")]
if sat_info["status"] != "active":
return [TextContent(type="text", text=f"Warning: {satellite} status is '{sat_info['status']}'. Decoding may fail.\n\nActive satellites: {', '.join(self.meteor_decoder.get_active_satellites())}")]
freq = sat_info["frequency"]
# Check for SatDump
import shutil
satdump_path = shutil.which('satdump')
if not satdump_path:
return [TextContent(type="text", text="SatDump not found! Install with: brew install satdump\n\nSatDump is required for Meteor-M LRPT decoding.")]
result = f"Decoding {satellite} LRPT transmission...\n"
result += f"Frequency: {freq/1e6:.3f} MHz\n"
result += f"Duration: {duration} seconds\n"
result += f"Gain: {gain} dB\n\n"
# Create output directory
import os
output_dir = f"/tmp/sdr_recordings/meteor_{satellite}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
os.makedirs(output_dir, exist_ok=True)
# Build SatDump command
cmd = self.meteor_decoder.build_satdump_command(
satellite, freq, output_dir, duration, gain
)
result += f"Running: {' '.join(cmd)}\n\n"
result += "This will take several minutes. SatDump will:\n"
result += "1. Tune RTL-SDR to " + f"{freq/1e6:.1f} MHz\n"
result += "2. Demodulate OQPSK signal\n"
result += "3. Decode LRPT frames with error correction\n"
result += "4. Generate channel images (visible, infrared)\n"
result += f"5. Save results to: {output_dir}/\n\n"
# Run SatDump
try:
logger.info(f"Starting SatDump for {satellite}")
logger.info(f"Command: {' '.join(cmd)}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT # Combine stderr with stdout
)
# Wait for completion and capture output
stdout, _ = await process.communicate()
# Decode output
output_text = stdout.decode('utf-8', errors='replace') if stdout else ""
logger.info(f"SatDump return code: {process.returncode}")
if output_text:
logger.info(f"SatDump output (last 500 chars): {output_text[-500:]}")
if process.returncode == 0:
# Parse output
parsed = self.meteor_decoder.parse_satdump_output(output_dir)
if parsed["success"] and parsed["images"]:
result += f"✅ Successfully decoded {satellite}!\n\n"
result += f"Images generated: {len(parsed['images'])}\n"
result += f"Output directory: {output_dir}/\n\n"
result += "Decoded files:\n"
for img in parsed["images"]:
result += f" - {os.path.basename(img)}\n"
# Create pass record
from .decoders.meteor_lrpt import MeteorPass
meteor_pass = MeteorPass(
satellite=satellite,
frequency=freq,
start_time=datetime.now(),
duration=duration,
output_dir=output_dir,
decoded_images=parsed["images"],
success=True,
channels_received=parsed["channels"]
)
self.meteor_decoder.add_pass(meteor_pass)
else:
result += "❌ Decoding completed but no images were generated.\n"
result += "This usually means:\n"
result += "- Satellite was below horizon (check pass prediction)\n"
result += "- Signal too weak (check antenna and gain)\n"
result += "- Incorrect frequency\n"
else:
result += f"❌ SatDump failed with return code {process.returncode}\n"
if output_text:
# Show last part of output which usually contains the error
result += f"\nOutput (last 1000 chars):\n{output_text[-1000:]}\n"
except Exception as e:
result += f"❌ Error running SatDump: {str(e)}\n"
import traceback
result += f"\nTraceback:\n{traceback.format_exc()}\n"
return [TextContent(type="text", text=result)]
elif name == "spectrum_analyze":
if not self.sdr:
return [TextContent(type="text", text="No SDR connected")]
bandwidth = arguments.get("bandwidth", 2048000)
fft_size = arguments.get("fft_size", 2048)
window = arguments.get("window", "blackman-harris")
averaging = arguments.get("averaging", True)
# Update analyzer settings
self.spectrum_analyzer.fft_size = fft_size
self.spectrum_analyzer.window_type = window
self.spectrum_analyzer.window = self.spectrum_analyzer._get_window(window, fft_size)
# Read samples
samples = await self.sdr.read_samples(fft_size * 2) # Double for overlap
# Analyze spectrum
frame = await self.spectrum_analyzer.analyze_spectrum(
samples[:fft_size],
self.sdr.sample_rate,
self.sdr.frequency
)
# Format results
result = f"Spectrum Analysis at {frame.center_freq/1e6:.3f} MHz\n"
result += f"Bandwidth: {bandwidth/1e6:.3f} MHz\n"
result += f"Window: {window}\n"
result += f"Peak power: {frame.peak_power:.1f} dB\n"
result += f"Noise floor: {frame.noise_floor:.1f} dB\n"
result += f"Dynamic range: {frame.peak_power - frame.noise_floor:.1f} dB\n"
if frame.detected_signals:
result += f"\nDetected {len(frame.detected_signals)} signals:\n"
for sig in frame.detected_signals:
result += f" {sig.frequency/1e6:.3f} MHz: "
result += f"{sig.power:.1f} dB, "
result += f"BW: {sig.bandwidth/1e3:.1f} kHz, "
result += f"SNR: {sig.snr:.1f} dB"
if sig.modulation_hint:
result += f" [{sig.modulation_hint}]"
result += f" (confidence: {sig.confidence*100:.0f}%)\n"
else:
result += "\nNo signals detected above threshold"
return [TextContent(type="text", text=result)]
elif name == "spectrum_scan":
if not self.sdr:
return [TextContent(type="text", text="No SDR connected")]
start_freq = arguments["start_freq"]
stop_freq = arguments["stop_freq"]
step = arguments.get("step", 1e6)
dwell_time = arguments.get("dwell_time", 0.1)
result = f"Scanning {start_freq/1e6:.1f} - {stop_freq/1e6:.1f} MHz...\n"
# Perform scan
scan_results = await self.frequency_scanner.scan_range(
self.sdr, start_freq, stop_freq, step, dwell_time
)
# Get summary
summary = self.frequency_scanner.get_activity_summary()
result += f"\nScan complete:\n"
result += f"- Scanned {summary['scan_points']} frequencies\n"
result += f"- Found {summary['total_signals']} signals\n"
if summary['signal_types']:
result += "\nSignal types detected:\n"
for sig_type, count in summary['signal_types'].items():
result += f" - {sig_type}: {count}\n"
if summary['strongest_signal']:
sig = summary['strongest_signal']
result += f"\nStrongest signal:\n"
result += f" {sig['frequency']/1e6:.3f} MHz @ {sig['power']:.1f} dB"
if sig.get('type'):
result += f" [{sig['type']}]"
return [TextContent(type="text", text=result)]
elif name == "recording_start":
if not self.sdr:
return [TextContent(type="text", text="No SDR connected")]
description = arguments.get("description", "")
# Start recording
recording_id = await self.signal_recorder.start_recording(
self.sdr.frequency,
self.sdr.sample_rate,
self.sdr.gain,
description
)
# Start recording task
self.active_decoders["recorder"] = asyncio.create_task(
self._recording_task()
)
return [TextContent(type="text", text=f"Started recording: {recording_id}")]
elif name == "recording_stop":
if "recorder" in self.active_decoders:
self.active_decoders["recorder"].cancel()
del self.active_decoders["recorder"]
metadata = await self.signal_recorder.stop_recording()
result = f"Recording stopped:\n"
result += f"- ID: {metadata.get('id', 'N/A')}\n"
result += f"- Duration: {metadata.get('duration', 0):.1f} seconds\n"
result += f"- Samples: {metadata.get('samples_recorded', 0):,}\n"
return [TextContent(type="text", text=result)]
else:
return [TextContent(type="text", text="No recording in progress")]
elif name == "audio_record_start":
if not self.sdr:
return [TextContent(type="text", text="No SDR connected")]
modulation = arguments.get("modulation", "FM")
description = arguments.get("description", "")
# Start audio recording
recording_id = await self.audio_recorder.start_recording(
self.sdr.frequency,
self.sdr.sample_rate,
modulation,
description
)
# Start audio recording task
self.active_decoders["audio_recorder"] = asyncio.create_task(
self._audio_recording_task(modulation)
)
return [TextContent(type="text", text=f"Started audio recording ({modulation}): {recording_id}\nSaving to: /tmp/sdr_recordings/{recording_id}.wav")]
elif name == "audio_record_stop":
if "audio_recorder" in self.active_decoders:
self.active_decoders["audio_recorder"].cancel()
del self.active_decoders["audio_recorder"]
metadata = await self.audio_recorder.stop_recording()
result = f"Audio recording stopped:\n"
result += f"- ID: {metadata.get('id', 'N/A')}\n"
result += f"- Duration: {metadata.get('duration', 0):.1f} seconds\n"
result += f"- Audio samples: {metadata.get('samples_recorded', 0):,}\n"
result += f"- Modulation: {metadata.get('modulation', 'N/A')}\n"
result += f"- File: /tmp/sdr_recordings/{metadata.get('id', 'N/A')}.wav"
return [TextContent(type="text", text=result)]
else:
return [TextContent(type="text", text="No audio recording in progress")]
elif name == "hackrf_set_tx_gain":
if not isinstance(self.sdr, HackRFDevice):
return [TextContent(type="text", text="This command requires a HackRF device")]
gain = arguments["gain"]
await self.sdr.set_tx_gain(gain)
return [TextContent(type="text", text=f"Set HackRF TX gain to {gain} dB")]
elif name == "signal_generator":
if not isinstance(self.sdr, HackRFDevice):
return [TextContent(type="text", text="Signal generation requires a HackRF device")]
frequency = arguments["frequency"]
signal_type = arguments["signal_type"]
duration = arguments.get("duration", 1.0)
tone_freq = arguments.get("tone_freq", 1000)
# Safety check
if not self.sdr.validate_tx_safety(frequency):
return [TextContent(type="text", text="Cannot transmit on this frequency (safety restriction)")]
# Generate signal
num_samples = int(self.sdr.sample_rate * duration)
t = np.arange(num_samples) / self.sdr.sample_rate
if signal_type == "cw":
# Continuous wave (carrier only)
signal = np.ones(num_samples, dtype=complex)
elif signal_type == "tone":
# Single tone
signal = np.exp(2j * np.pi * tone_freq * t)
elif signal_type == "noise":
# White noise
signal = (np.random.randn(num_samples) +
1j * np.random.randn(num_samples)) / np.sqrt(2)
elif signal_type == "sweep":
# Frequency sweep
sweep_rate = self.sdr.sample_rate / 4 / duration
phase = 2 * np.pi * sweep_rate * t**2 / 2
signal = np.exp(1j * phase)
else:
return [TextContent(type="text", text=f"Unknown signal type: {signal_type}")]
# Set frequency and transmit
await self.sdr.set_frequency(frequency)
await self.sdr.write_samples(signal * 0.8) # Scale for safety
await asyncio.sleep(duration)
return [TextContent(type="text",
text=f"Transmitted {signal_type} signal at {frequency/1e6:.3f} MHz for {duration} seconds")]
elif name == "ism_start_scanning":
if "rtl433" in self.active_decoders:
return [TextContent(type="text", text="ISM scanning already active")]
# Get frequencies and hop interval
freq_mhz_list = arguments.get("frequencies", [433.92, 315])
hop_interval = arguments.get("hop_interval", 30)
# Convert MHz to Hz
frequencies = [f * 1e6 for f in freq_mhz_list]
# Update decoder settings
self.rtl433_decoder.set_frequencies(frequencies)
self.rtl433_decoder.set_hop_interval(hop_interval)
# Start decoder task
try:
self.active_decoders["rtl433"] = asyncio.create_task(self._rtl433_decoder_task())
await asyncio.sleep(2.0) # Give it time to start
# Check if it failed immediately
if self.active_decoders["rtl433"].done():
try:
await self.active_decoders["rtl433"]
except Exception as e:
del self.active_decoders["rtl433"]
return [TextContent(type="text", text=f"Failed to start rtl_433: {str(e)}\n\nMake sure rtl_433 is installed and RTL-SDR is connected.")]
freq_str = ", ".join([f"{f:.2f} MHz" for f in freq_mhz_list])
return [TextContent(type="text", text=f"Started ISM band scanning with rtl_433\n\nFrequencies: {freq_str}\nHop interval: {hop_interval} seconds\n\nNOTE: Python SDR control is paused while scanning.\nUse ism_stop_scanning to regain SDR control.")]
except Exception as e:
return [TextContent(type="text", text=f"Failed to start ISM scanning: {str(e)}")]
elif name == "ism_stop_scanning":
if "rtl433" in self.active_decoders:
self.active_decoders["rtl433"].cancel()
del self.active_decoders["rtl433"]
return [TextContent(type="text", text="Stopped ISM scanning")]
else:
return [TextContent(type="text", text="ISM scanning not active")]
elif name == "ism_get_devices":
max_age = arguments.get("max_age", 300)
devices = self.rtl433_decoder.get_device_list(max_age_seconds=max_age)
stats = self.rtl433_decoder.get_statistics()
result = f"ISM Band Devices Detected\n"
result += f"=" * 50 + "\n\n"
result += f"Total messages: {stats['total_messages']}\n"
result += f"Unique devices: {stats['total_devices_seen']}\n"
result += f"Active devices: {stats['active_devices']}\n"
result += f"Scanning: {', '.join([f'{f:.2f} MHz' for f in stats['frequencies_MHz']])}\n"
result += f"Hop interval: {stats['hop_interval_seconds']}s\n\n"
if stats['device_types']:
result += "Device types seen:\n"
for device_type, count in stats['device_types'].items():
result += f" • {device_type}: {count}\n"
result += "\n"
if not devices:
result += "No devices detected in the last " + str(max_age) + " seconds.\n"
result += "\nTips:\n"
result += "- Make sure devices are transmitting\n"
result += "- Weather stations typically transmit every 30-60 seconds\n"
result += "- Try waiting longer for more results\n"
else:
result += f"Recently Active Devices ({len(devices)}):\n"
result += "-" * 50 + "\n\n"
for device in devices:
result += self.rtl433_decoder.get_device_summary(device) + "\n"
result += f" Last seen: {device['age_seconds']}s ago\n\n"
return [TextContent(type="text", text=result)]
else:
return [TextContent(type="text", text=f"Unknown tool: {name}")]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
@self.server.list_resources()
async def list_resources() -> List[Resource]:
"""List available resources"""
return [
Resource(
uri="sdr://status",
name="SDR Status",
mimeType="application/json",
description="Current SDR hardware status"
),
Resource(
uri="aviation://aircraft",
name="Tracked Aircraft",
mimeType="application/json",
description="Currently tracked aircraft from ADS-B"
),
Resource(
uri="spectrum://waterfall",
name="Waterfall Data",
mimeType="application/json",
description="Recent waterfall display data"
),
Resource(
uri="scan://results",
name="Scan Results",
mimeType="application/json",
description="Latest frequency scan results"
)
]
@self.server.read_resource()
async def read_resource(uri: str) -> str:
"""Read resource content"""
if uri == "sdr://status":
if not self.sdr:
status = {
"connected": False,
"message": "No SDR connected"
}
else:
status = asdict(SDRStatus(
connected=True,
device_name=self.sdr.device_name,
frequency=self.sdr.frequency,
sample_rate=self.sdr.sample_rate,
gain=self.sdr.gain,
is_capturing=self.sdr.is_capturing,
active_decoders=list(self.active_decoders.keys())
))
return json.dumps(status, indent=2)
elif uri == "aviation://aircraft":
aircraft_data = {
"aircraft": self.adsb_decoder.get_aircraft_list(),
"total_messages": self.adsb_decoder.message_count,
"decoder_active": "adsb" in self.active_decoders
}
return json.dumps(aircraft_data, indent=2, default=str)
elif uri == "spectrum://waterfall":
waterfall_data = self.spectrum_analyzer.get_waterfall_data(50)
data = {
"lines": waterfall_data.tolist() if len(waterfall_data) > 0 else [],
"fft_size": self.spectrum_analyzer.fft_size,
"center_freq": self.sdr.frequency if self.sdr else 0,
"sample_rate": self.sdr.sample_rate if self.sdr else 0
}
return json.dumps(data, indent=2)
elif uri == "scan://results":
scan_data = {
"results": self.frequency_scanner.scan_results,
"summary": self.frequency_scanner.get_activity_summary()
}
return json.dumps(scan_data, indent=2, default=str)
else:
return f"Unknown resource: {uri}"
async def _adsb_decoder_task(self):
"""Background task for ADS-B decoding using rtl_adsb subprocess
NOTE: This temporarily disconnects Python SDR control and gives
exclusive access to rtl_adsb. Other SDR functions won't work while this runs.
Stop tracking to regain SDR control.
"""
logger.info("Starting ADS-B decoder with rtl_adsb subprocess")
# Disconnect Python SDR to free the device
python_sdr_was_connected = self.sdr is not None
if self.sdr:
logger.info("Releasing SDR device for rtl_adsb")
await self.sdr.disconnect()
self.sdr = None
# Force garbage collection and wait for USB release
import gc
gc.collect()
await asyncio.sleep(0.5) # Give OS time to release USB device
logger.info("USB device released")
process = None
try:
# Find rtl_adsb binary
import shutil
import os
rtl_adsb_path = shutil.which('rtl_adsb') or '/opt/homebrew/bin/rtl_adsb'
if not rtl_adsb_path or not os.path.exists(rtl_adsb_path):
logger.error(f"rtl_adsb not found! Checked: {rtl_adsb_path}")
raise FileNotFoundError(f"rtl_adsb not found at {rtl_adsb_path}")
logger.info(f"Found rtl_adsb at: {rtl_adsb_path}")
logger.info("Starting rtl_adsb subprocess...")
# Start rtl_adsb subprocess
process = await asyncio.create_subprocess_exec(
rtl_adsb_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
logger.info(f"rtl_adsb started (PID: {process.pid})")
# Monitor stderr for errors
async def check_stderr():
first_lines = []
try:
for _ in range(15):
line = await asyncio.wait_for(process.stderr.readline(), timeout=0.3)
if line:
decoded = line.decode().strip()
first_lines.append(decoded)
logger.info(f"rtl_adsb: {decoded}")
if 'error' in decoded.lower() or 'failed' in decoded.lower():
logger.error(f"rtl_adsb ERROR: {decoded}")
except asyncio.TimeoutError:
pass # No more stderr
return first_lines
logger.info("Reading rtl_adsb startup messages...")
stderr_lines = await check_stderr()
logger.info(f"Got {len(stderr_lines)} stderr lines")
# Check if it started successfully
if any('Failed' in line or 'error -' in line for line in stderr_lines):
error_msg = '\n'.join(stderr_lines)
logger.error(f"rtl_adsb FAILED TO START:\n{error_msg}")
raise RuntimeError(f"rtl_adsb failed: {error_msg}")
msg_count = 0
decode_count = 0
last_log_time = asyncio.get_event_loop().time()
# Read and decode messages
while True:
line = await process.stdout.readline()
if not line:
logger.error("rtl_adsb output ended")
break
msg_line = line.decode().strip()
if msg_line.startswith('*') and msg_line.endswith(';'):
msg_count += 1
msg_hex = msg_line[1:-1] # Remove * and ;
if len(msg_hex) == 28:
decoded = self.adsb_decoder.decode_message(msg_hex)
if decoded:
decode_count += 1
# Log every 10 seconds
current_time = asyncio.get_event_loop().time()
if current_time - last_log_time > 10:
logger.info(f"ADS-B: {msg_count} msgs, {decode_count} decoded, {len(self.adsb_decoder.aircraft)} aircraft")
last_log_time = current_time
except asyncio.CancelledError:
logger.info("ADS-B tracking stopped by user")
raise
except Exception as e:
import traceback
logger.error(f"ADS-B decoder error: {e}")
logger.error(f"Traceback: {traceback.format_exc()}")
raise
finally:
# Cleanup
if process:
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=2.0)
except:
process.kill()
# Reconnect Python SDR if it was connected before
if python_sdr_was_connected:
logger.info("Reconnecting Python SDR control")
from .hardware.rtlsdr import RTLSDRDevice
self.sdr = RTLSDRDevice()
await self.sdr.connect()
async def _pocsag_decoder_task(self):
"""Background task for POCSAG pager decoding"""
logger.info("Starting POCSAG decoder task")
try:
while True:
# Read samples
chunk_size = int(self.sdr.sample_rate * 0.5) # 500ms chunks
samples = await self.sdr.read_samples(chunk_size)
# Demodulate FSK
from scipy import signal
# Simple FSK demodulation using frequency discrimination
instantaneous_phase = np.unwrap(np.angle(samples))
instantaneous_frequency = np.diff(instantaneous_phase)
# Low-pass filter
b, a = signal.butter(5, self.pocsag_decoder.baud_rate * 2 / (self.sdr.sample_rate / 2), 'low')
demod = signal.filtfilt(b, a, instantaneous_frequency)
# Convert to bits (simple threshold)
bits = (demod > np.mean(demod)).astype(int)
# Decode POCSAG frames from bit stream
# Look for sync pattern and decode codewords
# This is simplified - real implementation needs proper frame sync
logger.debug(f"POCSAG: processed {len(bits)} bits")
except asyncio.CancelledError:
logger.info("POCSAG decoder task cancelled")
raise
except Exception as e:
logger.error(f"POCSAG decoder error: {e}")
raise
async def _ais_decoder_task(self):
"""Background task for AIS ship tracking"""
logger.info("Starting AIS decoder task")
try:
while True:
# Read samples
chunk_size = int(self.sdr.sample_rate * 0.5) # 500ms chunks
samples = await self.sdr.read_samples(chunk_size)
# Demodulate GMSK (simplified - AIS uses GMSK modulation)
# This is a placeholder - real AIS decoding requires proper GMSK demodulation
from scipy import signal
# FM demodulation
instantaneous_phase = np.unwrap(np.angle(samples))
instantaneous_frequency = np.diff(instantaneous_phase)
# Low-pass filter for 9600 baud
b, a = signal.butter(5, 9600 * 2 / (self.sdr.sample_rate / 2), 'low')
demod = signal.filtfilt(b, a, instantaneous_frequency)
# Convert to bits
bits = (demod > np.mean(demod)).astype(int)
# In real implementation, would decode HDLC frames here
logger.debug(f"AIS: processed {len(bits)} bits")
except asyncio.CancelledError:
logger.info("AIS decoder task cancelled")
raise
except Exception as e:
logger.error(f"AIS decoder error: {e}")
raise
async def _recording_task(self):
"""Background task for recording IQ samples"""
logger.info("Starting recording task")
try:
while True:
# Read samples in chunks
chunk_size = int(self.sdr.sample_rate * 0.1) # 100ms chunks
samples = await self.sdr.read_samples(chunk_size)
# Add to recording
await self.signal_recorder.add_samples(samples)
except asyncio.CancelledError:
logger.info("Recording task cancelled")
raise
async def _audio_recording_task(self, modulation: str = "FM"):
"""Background task for recording demodulated audio"""
logger.info(f"Starting audio recording task ({modulation})")
try:
while True:
# Read samples in chunks
chunk_size = int(self.sdr.sample_rate * 0.1) # 100ms chunks
samples = await self.sdr.read_samples(chunk_size)
# Demodulate and add to audio recording
await self.audio_recorder.add_samples(
samples,
self.sdr.sample_rate,
modulation
)
except asyncio.CancelledError:
logger.info("Audio recording task cancelled")
raise
async def _rtl433_decoder_task(self):
"""Background task for rtl_433 ISM band decoding using rtl_433 subprocess
NOTE: This temporarily disconnects Python SDR control and gives
exclusive access to rtl_433. Other SDR functions won't work while this runs.
Stop scanning to regain SDR control.
"""
logger.info("Starting rtl_433 decoder with subprocess")
# Disconnect Python SDR to free the device
python_sdr_was_connected = self.sdr is not None
if self.sdr:
logger.info("Releasing SDR device for rtl_433")
await self.sdr.disconnect()
self.sdr = None
# Force garbage collection and wait for USB release
import gc
gc.collect()
await asyncio.sleep(0.5) # Give OS time to release USB device
logger.info("USB device released")
process = None
try:
# Find rtl_433 binary
import shutil
import os
rtl_433_path = shutil.which('rtl_433') or '/opt/homebrew/bin/rtl_433'
if not rtl_433_path or not os.path.exists(rtl_433_path):
logger.error(f"rtl_433 not found! Checked: {rtl_433_path}")
raise FileNotFoundError(f"rtl_433 not found at {rtl_433_path}")
logger.info(f"Found rtl_433 at: {rtl_433_path}")
# Build command with frequency hopping
cmd = [rtl_433_path, "-F", "json"]
# Add frequencies
for freq in self.rtl433_decoder.frequencies:
cmd.extend(["-f", f"{int(freq)}"])
# Add hop interval if multiple frequencies
if len(self.rtl433_decoder.frequencies) > 1:
cmd.extend(["-H", str(self.rtl433_decoder.hop_interval)])
logger.info(f"Starting rtl_433 with command: {' '.join(cmd)}")
# Start rtl_433 subprocess
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
logger.info(f"rtl_433 started (PID: {process.pid})")
# Monitor stderr for errors in background
async def check_stderr():
try:
while True:
line = await process.stderr.readline()
if not line:
break
decoded = line.decode().strip()
if decoded:
logger.info(f"rtl_433: {decoded}")
if 'error' in decoded.lower() or 'failed' in decoded.lower():
logger.error(f"rtl_433 ERROR: {decoded}")
except Exception as e:
logger.debug(f"stderr monitoring ended: {e}")
# Start stderr monitoring
asyncio.create_task(check_stderr())
msg_count = 0
last_log_time = asyncio.get_event_loop().time()
# Read and decode JSON messages
while True:
line = await process.stdout.readline()
if not line:
logger.error("rtl_433 output ended")
break
json_line = line.decode().strip()
if json_line:
device = self.rtl433_decoder.parse_message(json_line)
if device:
msg_count += 1
logger.debug(f"Decoded {device.model}: {self.rtl433_decoder.get_device_summary(device)}")
# Log every 30 seconds
current_time = asyncio.get_event_loop().time()
if current_time - last_log_time > 30:
stats = self.rtl433_decoder.get_statistics()
logger.info(f"RTL_433: {msg_count} msgs, {stats['total_devices_seen']} devices, {stats['active_devices']} active")
last_log_time = current_time
except asyncio.CancelledError:
logger.info("RTL_433 scanning stopped by user")
raise
except Exception as e:
import traceback
logger.error(f"RTL_433 decoder error: {e}")
logger.error(f"Traceback: {traceback.format_exc()}")
raise
finally:
# Cleanup
if process:
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=2.0)
except:
process.kill()
# Reconnect Python SDR if it was connected before
if python_sdr_was_connected:
logger.info("Reconnecting Python SDR control")
from .hardware.rtlsdr import RTLSDRDevice
self.sdr = RTLSDRDevice()
await self.sdr.connect()
async def run(self):
"""Run the MCP server"""
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
self.server.create_initialization_options()
)
async def main():
"""Main entry point"""
server = SDRMCPServer()
await server.run()
if __name__ == "__main__":
asyncio.run(main())