"""DX Cluster connection handler for telnet-based cluster servers."""
import asyncio
import logging
import re
from datetime import datetime
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, asdict
import telnetlib3
logger = logging.getLogger(__name__)
@dataclass
class DXSpot:
"""Represents a DX spot from the cluster."""
dx_callsign: str
frequency: float
spotter: str
timestamp: datetime
comment: str
band: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert spot to dictionary."""
data = asdict(self)
data['timestamp'] = self.timestamp.isoformat()
return data
class DXClusterClient:
"""Async telnet client for DX Cluster servers."""
def __init__(self, host: str, port: int, callsign: str):
"""Initialize DX Cluster client.
Args:
host: Cluster server hostname
port: Cluster server port (typically 7300)
callsign: Your amateur radio callsign
"""
self.host = host
self.port = port
self.callsign = callsign
self.reader: Optional[telnetlib3.TelnetReader] = None
self.writer: Optional[telnetlib3.TelnetWriter] = None
self._connected = False
self._spots_cache: List[DXSpot] = []
self._lock = asyncio.Lock()
# Regex pattern for parsing DX spots (DXSpider format)
# Example: DX de K1TTT: 14025.0 W1AW CQ 1200Z
self.spot_pattern = re.compile(
r'DX de ([A-Z0-9/]+):\s+(\d+\.\d+)\s+([A-Z0-9/]+)\s+(.+?)\s+(\d{4})Z'
)
async def connect(self) -> bool:
"""Connect to the DX Cluster server.
Returns:
True if connection successful, False otherwise
"""
try:
logger.info(f"Connecting to {self.host}:{self.port}")
self.reader, self.writer = await asyncio.wait_for(
telnetlib3.open_connection(self.host, self.port),
timeout=10.0
)
# Wait for login prompt
await self._read_until("login:", timeout=5.0)
# Send callsign
self.writer.write(f"{self.callsign}\r\n")
await self.writer.drain()
# Wait for welcome message
await asyncio.sleep(2)
self._connected = True
logger.info(f"Connected to {self.host} as {self.callsign}")
return True
except Exception as e:
logger.error(f"Connection failed: {e}")
self._connected = False
return False
async def disconnect(self):
"""Disconnect from the DX Cluster server."""
if self.writer:
self.writer.write("bye\r\n")
await self.writer.drain()
self.writer.close()
await self.writer.wait_closed()
self._connected = False
logger.info("Disconnected from cluster")
async def _read_until(self, marker: str, timeout: float = 30.0) -> str:
"""Read from cluster until marker is found.
Args:
marker: String to search for
timeout: Timeout in seconds
Returns:
Data read until marker
"""
buffer = ""
try:
while True:
data = await asyncio.wait_for(
self.reader.read(1024),
timeout=timeout
)
if not data:
break
buffer += data
if marker.lower() in buffer.lower():
break
except asyncio.TimeoutError:
logger.warning(f"Timeout waiting for '{marker}'")
return buffer
async def send_command(self, command: str) -> str:
"""Send a command to the cluster and get response.
Args:
command: Command to send
Returns:
Response from cluster
"""
if not self._connected:
await self.connect()
async with self._lock:
self.writer.write(f"{command}\r\n")
await self.writer.drain()
# Wait a bit for response
await asyncio.sleep(1)
# Read available data
response = ""
try:
while True:
data = await asyncio.wait_for(
self.reader.read(4096),
timeout=0.5
)
if not data:
break
response += data
except asyncio.TimeoutError:
pass
return response
def _parse_spot(self, line: str) -> Optional[DXSpot]:
"""Parse a DX spot line.
Args:
line: Raw spot line from cluster
Returns:
DXSpot object or None if parsing fails
"""
match = self.spot_pattern.search(line)
if not match:
return None
spotter, freq_str, dx_call, comment, time_str = match.groups()
# Parse frequency
try:
frequency = float(freq_str)
except ValueError:
return None
# Determine band from frequency
band = self._frequency_to_band(frequency)
# Parse time (HHMM format)
try:
now = datetime.utcnow()
hour = int(time_str[:2])
minute = int(time_str[2:])
timestamp = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
except (ValueError, IndexError):
timestamp = datetime.utcnow()
return DXSpot(
dx_callsign=dx_call,
frequency=frequency,
spotter=spotter,
timestamp=timestamp,
comment=comment.strip(),
band=band
)
def _frequency_to_band(self, freq_khz: float) -> str:
"""Convert frequency to ham radio band.
Args:
freq_khz: Frequency in kHz
Returns:
Band name (e.g., "20m", "40m")
"""
if 1800 <= freq_khz < 2000:
return "160m"
elif 3500 <= freq_khz < 4000:
return "80m"
elif 5330 <= freq_khz < 5405:
return "60m"
elif 7000 <= freq_khz < 7300:
return "40m"
elif 10100 <= freq_khz < 10150:
return "30m"
elif 14000 <= freq_khz < 14350:
return "20m"
elif 18068 <= freq_khz < 18168:
return "17m"
elif 21000 <= freq_khz < 21450:
return "15m"
elif 24890 <= freq_khz < 24990:
return "12m"
elif 28000 <= freq_khz < 29700:
return "10m"
elif 50000 <= freq_khz < 54000:
return "6m"
elif 144000 <= freq_khz < 148000:
return "2m"
else:
return "Unknown"
async def get_spots(self, count: int = 10) -> List[DXSpot]:
"""Get recent DX spots from the cluster.
Args:
count: Number of spots to retrieve (default 10)
Returns:
List of DXSpot objects
"""
# Send command to show recent spots
response = await self.send_command(f"show/dx {count}")
# Parse spots from response
spots = []
for line in response.split('\n'):
spot = self._parse_spot(line)
if spot:
spots.append(spot)
# Update cache
self._spots_cache = spots
return spots
async def post_spot(
self,
dx_callsign: str,
frequency: float,
comment: str = ""
) -> bool:
"""Post a DX spot to the cluster.
Args:
dx_callsign: Callsign of the DX station
frequency: Frequency in kHz
comment: Optional comment
Returns:
True if successful, False otherwise
"""
try:
# Format: DX frequency callsign comment
cmd = f"DX {frequency} {dx_callsign}"
if comment:
cmd += f" {comment}"
response = await self.send_command(cmd)
# Check if spot was accepted
# Most clusters respond with confirmation
success = "spot" in response.lower() or len(response) > 0
if success:
logger.info(f"Posted spot: {dx_callsign} on {frequency} kHz")
else:
logger.warning(f"Spot may not have been posted: {response}")
return success
except Exception as e:
logger.error(f"Failed to post spot: {e}")
return False
def get_cached_spots(self) -> List[DXSpot]:
"""Get cached spots without querying cluster.
Returns:
List of cached DXSpot objects
"""
return self._spots_cache.copy()