"""FastMCP server for DX Cluster integration with Auth0 authentication."""
import os
import logging
from typing import Optional
from contextlib import asynccontextmanager
from fastmcp import FastMCP
from fastmcp.server.auth.providers.auth0 import Auth0Provider
from dotenv import load_dotenv
from .dx_cluster import DXClusterClient
from .analysis import SpotAnalyzer
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
level=os.getenv("LOG_LEVEL", "INFO"),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Global DX Cluster client
dx_client: Optional[DXClusterClient] = None
def get_dx_client() -> DXClusterClient:
"""Get or create DX Cluster client instance.
Returns:
DXClusterClient instance
"""
global dx_client
if dx_client is None:
host = os.getenv("DX_CLUSTER_HOST", "dxspider.net")
port = int(os.getenv("DX_CLUSTER_PORT", "7300"))
callsign = os.getenv("DX_CLUSTER_CALLSIGN", "")
if not callsign:
raise ValueError(
"DX_CLUSTER_CALLSIGN environment variable must be set"
)
dx_client = DXClusterClient(host, port, callsign)
logger.info(f"Created DX Cluster client for {callsign}@{host}:{port}")
return dx_client
# Initialize Auth0 provider if credentials are provided
auth_provider = None
auth0_domain = os.getenv("AUTH0_DOMAIN")
auth0_client_id = os.getenv("AUTH0_CLIENT_ID")
auth0_client_secret = os.getenv("AUTH0_CLIENT_SECRET")
auth0_audience = os.getenv("AUTH0_AUDIENCE")
server_base_url = os.getenv("SERVER_BASE_URL", "http://localhost:8000")
if auth0_domain and auth0_client_id and auth0_client_secret and auth0_audience:
logger.info("Configuring Auth0 authentication")
# Construct the OIDC config URL
config_url = f"https://{auth0_domain}/.well-known/openid-configuration"
auth_provider = Auth0Provider(
config_url=config_url,
client_id=auth0_client_id,
client_secret=auth0_client_secret,
audience=auth0_audience,
base_url=server_base_url
)
else:
logger.warning(
"Auth0 credentials not found - server will run without authentication. "
"Set AUTH0_DOMAIN, AUTH0_CLIENT_ID, AUTH0_CLIENT_SECRET, and AUTH0_AUDIENCE to enable."
)
# Create FastMCP server
mcp = FastMCP(
"DX Cluster Server",
version="1.0.0",
auth=auth_provider
)
@mcp.tool()
async def read_spots(count: int = 10) -> dict:
"""Read recent DX spots from the cluster.
Args:
count: Number of spots to retrieve (default: 10, max: 100)
Returns:
Dictionary containing list of spots and metadata
"""
try:
# Validate count
if count < 1 or count > 100:
return {
"error": "Count must be between 1 and 100",
"spots": []
}
client = get_dx_client()
# Ensure connected
if not client._connected:
connected = await client.connect()
if not connected:
return {
"error": "Failed to connect to DX Cluster",
"spots": []
}
# Fetch spots
spots = await client.get_spots(count)
return {
"success": True,
"count": len(spots),
"spots": [spot.to_dict() for spot in spots],
"cluster": {
"host": client.host,
"port": client.port,
"callsign": client.callsign
}
}
except Exception as e:
logger.error(f"Error reading spots: {e}")
return {
"error": str(e),
"spots": []
}
@mcp.tool()
async def create_spot(
dx_callsign: str,
frequency: float,
comment: str = ""
) -> dict:
"""Post a new DX spot to the cluster.
Args:
dx_callsign: Callsign of the DX station being spotted
frequency: Frequency in kHz (e.g., 14025.0 for 14.025 MHz)
comment: Optional comment about the spot (e.g., "CQ", "Loud signal")
Returns:
Dictionary containing success status and confirmation
"""
try:
# Validate inputs
if not dx_callsign or len(dx_callsign) < 3:
return {
"success": False,
"error": "Invalid callsign"
}
if frequency < 1800 or frequency > 250000:
return {
"success": False,
"error": "Frequency must be between 1800 and 250000 kHz"
}
client = get_dx_client()
# Ensure connected
if not client._connected:
connected = await client.connect()
if not connected:
return {
"success": False,
"error": "Failed to connect to DX Cluster"
}
# Post spot
success = await client.post_spot(dx_callsign, frequency, comment)
if success:
return {
"success": True,
"message": f"Spot posted: {dx_callsign} on {frequency} kHz",
"spot": {
"dx_callsign": dx_callsign,
"frequency": frequency,
"comment": comment,
"spotter": client.callsign
}
}
else:
return {
"success": False,
"error": "Failed to post spot - cluster may have rejected it"
}
except Exception as e:
logger.error(f"Error creating spot: {e}")
return {
"success": False,
"error": str(e)
}
@mcp.tool()
async def analyze_spots(
spot_count: int = 50,
include_rare_dx: bool = True
) -> dict:
"""Perform comprehensive analysis of recent DX spots.
This tool analyzes recent spots and provides:
- Band activity statistics (which bands are most active)
- DX entity statistics (most spotted countries)
- Time-based propagation trends (activity by hour)
- Rare DX alerts (uncommon entities)
Args:
spot_count: Number of recent spots to analyze (default: 50, max: 200)
include_rare_dx: Whether to include rare DX alerts in analysis
Returns:
Dictionary containing comprehensive analysis results
"""
try:
# Validate spot_count
if spot_count < 10 or spot_count > 200:
return {
"error": "Spot count must be between 10 and 200"
}
client = get_dx_client()
# Ensure connected
if not client._connected:
connected = await client.connect()
if not connected:
return {
"error": "Failed to connect to DX Cluster"
}
# Fetch spots
spots = await client.get_spots(spot_count)
if not spots:
return {
"error": "No spots available for analysis"
}
# Perform analysis
analyzer = SpotAnalyzer(spots)
report = analyzer.generate_comprehensive_report()
# Optionally exclude rare DX
if not include_rare_dx and 'rare_dx_alerts' in report:
del report['rare_dx_alerts']
return {
"success": True,
"analysis": report
}
except Exception as e:
logger.error(f"Error analyzing spots: {e}")
return {
"error": str(e)
}
@mcp.tool()
async def get_band_activity(spot_count: int = 50) -> dict:
"""Get current band activity statistics.
Analyzes recent spots to show which amateur radio bands are most active,
including spot counts, unique DX entities per band, and activity rates.
Args:
spot_count: Number of recent spots to analyze (default: 50)
Returns:
Dictionary containing band activity statistics
"""
try:
client = get_dx_client()
# Ensure connected
if not client._connected:
connected = await client.connect()
if not connected:
return {
"error": "Failed to connect to DX Cluster"
}
# Fetch spots
spots = await client.get_spots(spot_count)
if not spots:
return {
"error": "No spots available"
}
# Analyze band activity
analyzer = SpotAnalyzer(spots)
band_stats = analyzer.analyze_band_activity()
return {
"success": True,
"analyzed_spots": len(spots),
"bands": band_stats
}
except Exception as e:
logger.error(f"Error getting band activity: {e}")
return {
"error": str(e)
}
@mcp.tool()
async def cluster_info() -> dict:
"""Get information about the current DX Cluster connection.
Returns:
Dictionary containing cluster connection details
"""
try:
client = get_dx_client()
return {
"cluster": {
"host": client.host,
"port": client.port,
"callsign": client.callsign,
"connected": client._connected
},
"server": {
"version": "1.0.0",
"auth_enabled": auth_provider is not None
}
}
except Exception as e:
logger.error(f"Error getting cluster info: {e}")
return {
"error": str(e)
}
# Server lifecycle management
@asynccontextmanager
async def lifespan(app):
"""Manage server lifecycle."""
logger.info("Starting DX Cluster MCP Server")
yield
# Cleanup
logger.info("Shutting down DX Cluster MCP Server")
global dx_client
if dx_client and dx_client._connected:
await dx_client.disconnect()
def main():
"""Run the FastMCP server."""
host = os.getenv("SERVER_HOST", "0.0.0.0")
port = int(os.getenv("SERVER_PORT", "8000"))
logger.info(f"Starting server on {host}:{port}")
logger.info(f"Authentication: {'Enabled (Auth0)' if auth_provider else 'Disabled'}")
# Run with uvicorn
mcp.run(
transport="sse",
host=host,
port=port
)
if __name__ == "__main__":
main()