Skip to main content
Glama
server.py24.2 kB
""" Rekordbox MCP Server A FastMCP-based server for rekordbox database management with real-time database access. """ import asyncio from pathlib import Path from typing import Optional, List, Dict, Any from fastmcp import FastMCP from loguru import logger from pydantic import BaseModel, Field from .database import RekordboxDatabase from .models import Track, Playlist, SearchOptions, HistorySession, HistoryTrack, HistoryStats # Initialize FastMCP server mcp = FastMCP("Rekordbox Database MCP Server") # Global database instance db: Optional[RekordboxDatabase] = None _db_initialized = False class ServerConfig(BaseModel): """Server configuration model.""" database_path: Optional[Path] = Field( default=None, description="Path to rekordbox database directory" ) auto_detect: bool = Field( default=True, description="Automatically detect rekordbox database location" ) backup_enabled: bool = Field( default=True, description="Enable automatic database backups before mutations" ) @mcp.tool() async def search_tracks( query: str = "", artist: Optional[str] = None, title: Optional[str] = None, genre: Optional[str] = None, key: Optional[str] = None, bpm_min: Optional[float] = None, bpm_max: Optional[float] = None, rating_min: Optional[int] = None, limit: int = 50 ) -> List[Dict[str, Any]]: """ Search tracks in the rekordbox database. Args: query: General search query (searches across multiple fields) artist: Filter by artist name title: Filter by track title genre: Filter by genre key: Filter by musical key (e.g., "5A", "12B") bpm_min: Minimum BPM bpm_max: Maximum BPM rating_min: Minimum rating (0-5) limit: Maximum number of results to return Returns: List of matching tracks with metadata """ await ensure_database_connected() search_options = SearchOptions( query=query, artist=artist, title=title, genre=genre, key=key, bpm_min=bpm_min, bpm_max=bpm_max, rating_min=rating_min, limit=limit ) tracks = await db.search_tracks(search_options) return [track.model_dump() for track in tracks] @mcp.tool() async def get_track_details(track_id: str) -> Dict[str, Any]: """ Get detailed information about a specific track. Args: track_id: The unique track identifier Returns: Detailed track information including metadata, cue points, and play history """ if not db: raise RuntimeError("Database not initialized.") track = await db.get_track_by_id(track_id) if not track: raise ValueError(f"Track with ID {track_id} not found") return track.model_dump() @mcp.tool() async def get_tracks_by_key(key: str) -> List[Dict[str, Any]]: """ Get all tracks in a specific musical key. Args: key: Musical key (e.g., "5A", "12B") Returns: List of tracks in the specified key """ if not db: raise RuntimeError("Database not initialized.") search_options = SearchOptions(key=key, limit=1000) tracks = await db.search_tracks(search_options) return [track.model_dump() for track in tracks] @mcp.tool() async def get_tracks_by_bpm_range(bpm_min: float, bpm_max: float) -> List[Dict[str, Any]]: """ Get tracks within a specific BPM range. Args: bpm_min: Minimum BPM bpm_max: Maximum BPM Returns: List of tracks within the BPM range """ if not db: raise RuntimeError("Database not initialized.") search_options = SearchOptions(bpm_min=bpm_min, bpm_max=bpm_max, limit=1000) tracks = await db.search_tracks(search_options) return [track.model_dump() for track in tracks] @mcp.tool() async def get_most_played_tracks(limit: int = 20) -> List[Dict[str, Any]]: """ Get the most played tracks in the library. Args: limit: Maximum number of tracks to return Returns: List of most played tracks """ if not db: raise RuntimeError("Database not initialized.") tracks = await db.get_most_played_tracks(limit) return [track.model_dump() for track in tracks] @mcp.tool() async def get_top_rated_tracks(limit: int = 20) -> List[Dict[str, Any]]: """ Get the highest rated tracks in the library. Args: limit: Maximum number of tracks to return Returns: List of top rated tracks """ if not db: raise RuntimeError("Database not initialized.") tracks = await db.get_top_rated_tracks(limit) return [track.model_dump() for track in tracks] @mcp.tool() async def get_unplayed_tracks(limit: int = 50) -> List[Dict[str, Any]]: """ Get tracks that have never been played. Args: limit: Maximum number of tracks to return Returns: List of unplayed tracks """ if not db: raise RuntimeError("Database not initialized.") tracks = await db.get_unplayed_tracks(limit) return [track.model_dump() for track in tracks] @mcp.tool() async def get_track_file_path(track_id: str) -> Dict[str, str]: """ Get the file system path for a specific track. Args: track_id: The unique track identifier Returns: Dictionary containing file path information """ if not db: raise RuntimeError("Database not initialized.") track = await db.get_track_by_id(track_id) if not track: raise ValueError(f"Track with ID {track_id} not found") return { "track_id": track_id, "file_path": track.file_path or "", "file_name": track.file_path.split("/")[-1] if track.file_path else "" } @mcp.tool() async def search_tracks_by_filename(filename: str) -> List[Dict[str, Any]]: """ Search for tracks by filename. Args: filename: Filename to search for (partial match) Returns: List of tracks matching the filename """ if not db: raise RuntimeError("Database not initialized.") tracks = await db.search_tracks_by_filename(filename) return [track.model_dump() for track in tracks] @mcp.tool() async def analyze_library( group_by: str = "genre", aggregate_by: str = "count", top_n: int = 10 ) -> Dict[str, Any]: """ Analyze library with grouping and aggregation. Args: group_by: Field to group by (genre, key, year, artist, rating) aggregate_by: Aggregation method (count, playCount, totalTime) top_n: Number of top results to return Returns: Analysis results """ if not db: raise RuntimeError("Database not initialized.") analysis = await db.analyze_library(group_by, aggregate_by, top_n) return analysis @mcp.tool() async def validate_track_ids(track_ids: List[str]) -> Dict[str, Any]: """ Validate a list of track IDs and show which are valid/invalid. Args: track_ids: List of track IDs to validate Returns: Validation results with valid and invalid IDs """ if not db: raise RuntimeError("Database not initialized.") validation = await db.validate_track_ids(track_ids) return validation @mcp.tool() async def get_playlists() -> List[Dict[str, Any]]: """ Get all playlists from the rekordbox database. Returns: List of playlists with metadata """ await ensure_database_connected() playlists = await db.get_playlists() return [playlist.model_dump() for playlist in playlists] @mcp.tool() async def get_playlist_tracks(playlist_id: str) -> List[Dict[str, Any]]: """ Get all tracks in a specific playlist. Args: playlist_id: The unique playlist identifier Returns: List of tracks in the playlist """ if not db: raise RuntimeError("Database not initialized.") tracks = await db.get_playlist_tracks(playlist_id) return [track.model_dump() for track in tracks] @mcp.tool() async def get_library_stats() -> Dict[str, Any]: """ Get comprehensive library statistics. Returns: Dictionary containing various library statistics """ if not db: raise RuntimeError("Database not initialized.") stats = await db.get_library_stats() return stats @mcp.tool() async def connect_database(database_path: Optional[str] = None) -> Dict[str, str]: """ Connect to the rekordbox database. Args: database_path: Optional path to database directory. If not provided, auto-detection is used. Returns: Connection status message """ global db try: db = RekordboxDatabase() path = Path(database_path) if database_path else None await db.connect(database_path=path) return { "status": "success", "message": f"Connected to rekordbox database at {db.database_path}", "total_tracks": str(await db.get_track_count()) } except Exception as e: logger.error(f"Failed to connect to database: {e}") return {"status": "error", "message": f"Failed to connect: {str(e)}"} @mcp.tool() async def get_history_sessions( include_folders: bool = False, limit: int = 100 ) -> List[Dict[str, Any]]: """ Get DJ history sessions from rekordbox. Args: include_folders: Whether to include folder entries (years/months) limit: Maximum number of sessions to return Returns: List of history sessions with metadata """ await ensure_database_connected() sessions = await db.get_history_sessions(include_folders=include_folders) # Sort by date created, most recent first sessions.sort(key=lambda x: x.date_created or "", reverse=True) return [session.model_dump() for session in sessions[:limit]] @mcp.tool() async def get_session_tracks(session_id: str) -> List[Dict[str, Any]]: """ Get all tracks from a specific DJ history session. Args: session_id: The session's unique identifier Returns: List of tracks in the session with performance context """ await ensure_database_connected() tracks = await db.get_session_tracks(session_id) return [track.model_dump() for track in tracks] @mcp.tool() async def get_recent_sessions(days: int = 30) -> List[Dict[str, Any]]: """ Get recent DJ history sessions within the specified number of days. Args: days: Number of days to look back (default: 30) Returns: List of recent history sessions """ await ensure_database_connected() from datetime import datetime, timedelta cutoff_date = datetime.now() - timedelta(days=days) cutoff_str = cutoff_date.strftime("%Y-%m-%d") sessions = await db.get_history_sessions(include_folders=False) # Filter by date recent_sessions = [ s for s in sessions if s.date_created and s.date_created >= cutoff_str ] # Sort by date, most recent first recent_sessions.sort(key=lambda x: x.date_created or "", reverse=True) return [session.model_dump() for session in recent_sessions] @mcp.tool() async def get_history_stats() -> Dict[str, Any]: """ Get comprehensive statistics about DJ history sessions. Returns: Statistics about all history sessions including totals and trends """ await ensure_database_connected() stats = await db.get_history_stats() return stats.model_dump() @mcp.tool() async def search_history_sessions( query: str = "", year: Optional[str] = None, month: Optional[str] = None, min_tracks: Optional[int] = None, limit: int = 50 ) -> List[Dict[str, Any]]: """ Search DJ history sessions with various filters. Args: query: Search query for session names year: Filter by year (e.g., "2025") month: Filter by month (e.g., "08" for August) min_tracks: Minimum number of tracks in session limit: Maximum number of results Returns: List of matching history sessions """ await ensure_database_connected() sessions = await db.get_history_sessions(include_folders=False) # Apply filters filtered_sessions = [] for session in sessions: # Text search if query and query.lower() not in session.name.lower(): continue # Date filters if session.date_created: if year and not session.date_created.startswith(year): continue if month and year: month_str = f"{year}-{month.zfill(2)}" if not session.date_created.startswith(month_str): continue elif year or month: # Skip if date filters specified but no date available continue # Track count filter if min_tracks and session.track_count < min_tracks: continue filtered_sessions.append(session) # Sort by date, most recent first filtered_sessions.sort(key=lambda x: x.date_created or "", reverse=True) return [session.model_dump() for session in filtered_sessions[:limit]] # Playlist Mutation Tools (with safety annotations) @mcp.tool( annotations={ "readOnlyHint": False, "destructiveHint": False, "idempotentHint": False } ) async def create_playlist( name: str, parent_id: Optional[str] = None ) -> Dict[str, Any]: """ Create a new playlist in rekordbox. ⚠️ CAUTION: This modifies your rekordbox database! Args: name: Name for the new playlist parent_id: Optional parent folder ID (omit for root level) Returns: Information about the created playlist """ await ensure_database_connected() if not name.strip(): raise ValueError("Playlist name cannot be empty") try: playlist_id = await db.create_playlist(name.strip(), parent_id) return { "status": "success", "message": f"Created playlist '{name}'", "playlist_id": playlist_id, "playlist_name": name } except Exception as e: return { "status": "error", "message": f"Failed to create playlist: {str(e)}" } @mcp.tool( annotations={ "readOnlyHint": False, "destructiveHint": False, "idempotentHint": True } ) async def add_tracks_to_playlist( playlist_id: str, track_ids: List[str] ) -> Dict[str, Any]: """ Add multiple tracks to an existing playlist in one operation. ⚠️ CAUTION: This modifies your rekordbox database! Args: playlist_id: ID of the playlist to modify track_ids: List of track IDs to add Returns: Detailed results of the batch operation """ await ensure_database_connected() try: results = await db.add_tracks_to_playlist(playlist_id, track_ids) return { "status": "success", "message": f"Batch add completed: {len(results['added'])} added, {len(results['skipped'])} skipped, {len(results['failed'])} failed", "playlist_id": playlist_id, "summary": { "added_count": len(results['added']), "skipped_count": len(results['skipped']), "failed_count": len(results['failed']) }, "details": results } except Exception as e: logger.error(f"Failed to add tracks to playlist: {e}") return { "status": "error", "message": f"Failed to add tracks to playlist: {str(e)}" } @mcp.tool( annotations={ "readOnlyHint": False, "destructiveHint": False, "idempotentHint": True } ) async def add_track_to_playlist( playlist_id: str, track_id: str ) -> Dict[str, Any]: """ Add a track to an existing playlist. ⚠️ CAUTION: This modifies your rekordbox database! Args: playlist_id: ID of the playlist to modify track_id: ID of the track to add Returns: Result of the operation """ await ensure_database_connected() try: success = await db.add_track_to_playlist(playlist_id, track_id) if success: return { "status": "success", "message": f"Added track {track_id} to playlist {playlist_id}", "playlist_id": playlist_id, "track_id": track_id } else: return { "status": "error", "message": "Failed to add track to playlist" } except Exception as e: return { "status": "error", "message": f"Failed to add track to playlist: {str(e)}" } @mcp.tool( annotations={ "readOnlyHint": False, "destructiveHint": False, "idempotentHint": True } ) async def remove_track_from_playlist( playlist_id: str, track_id: str ) -> Dict[str, Any]: """ Remove a track from a playlist. ⚠️ CAUTION: This modifies your rekordbox database! Args: playlist_id: ID of the playlist to modify track_id: ID of the track to remove Returns: Result of the operation """ await ensure_database_connected() try: success = await db.remove_track_from_playlist(playlist_id, track_id) if success: return { "status": "success", "message": f"Removed track {track_id} from playlist {playlist_id}", "playlist_id": playlist_id, "track_id": track_id } else: return { "status": "error", "message": "Failed to remove track from playlist" } except Exception as e: return { "status": "error", "message": f"Failed to remove track from playlist: {str(e)}" } @mcp.tool( annotations={ "readOnlyHint": False, "destructiveHint": True, "idempotentHint": True } ) async def delete_playlist(playlist_id: str) -> Dict[str, Any]: """ Delete a playlist from rekordbox. ⚠️ DANGER: This permanently deletes a playlist and cannot be undone! Args: playlist_id: ID of the playlist to delete Returns: Result of the operation """ await ensure_database_connected() try: # Get playlist info before deletion for confirmation playlists = await db.get_playlists() target_playlist = next((p for p in playlists if p.id == playlist_id), None) if not target_playlist: return { "status": "error", "message": f"Playlist {playlist_id} not found" } # Prevent deletion of smart playlists for safety if target_playlist.is_smart_playlist: return { "status": "error", "message": "Cannot delete smart playlists - they are managed by rekordbox" } success = await db.delete_playlist(playlist_id) if success: return { "status": "success", "message": f"Deleted playlist '{target_playlist.name}' ({playlist_id})", "deleted_playlist": { "id": playlist_id, "name": target_playlist.name, "track_count": target_playlist.track_count } } else: return { "status": "error", "message": "Failed to delete playlist" } except Exception as e: return { "status": "error", "message": f"Failed to delete playlist: {str(e)}" } @mcp.resource("file://database-status") async def database_status() -> str: """Get the current database connection status.""" if not db: return "Database not connected. Use connect_database tool to establish connection." if await db.is_connected(): track_count = await db.get_track_count() return f"Connected to rekordbox database at {db.database_path}. Total tracks: {track_count}" else: return "Database connection lost. Please reconnect." async def ensure_database_connected(): """Ensure database is connected, initialize if not.""" global db, _db_initialized if _db_initialized and db and await db.is_connected(): return if not _db_initialized: logger.info("Initializing database connection...") try: db = RekordboxDatabase() await db.connect() track_count = await db.get_track_count() playlist_count = len(await db.get_playlists()) logger.success(f"✅ Connected to rekordbox database!") logger.info(f"📊 Database contains {track_count} tracks and {playlist_count} playlists") _db_initialized = True except Exception as e: logger.error(f"❌ Failed to connect to rekordbox database: {e}") logger.error("🔧 Please ensure:") logger.error(" - Rekordbox is closed") logger.error(" - Database key is available (run: uv run python -m pyrekordbox download-key)") logger.error(" - Database path is accessible") raise RuntimeError(f"Database initialization failed: {str(e)}") elif db and not await db.is_connected(): # Reconnect if connection was lost await db.connect() def main(): """Main entry point for the MCP server.""" import sys import signal import asyncio # Configure logging logger.remove() logger.add( sink=lambda msg: print(msg, end=""), format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | {message}", level="INFO" ) # Setup signal handlers for graceful shutdown def signal_handler(signum, frame): logger.info(f"Received signal {signum}, shutting down gracefully...") # Cleanup database connection global db if db: try: # Use asyncio to call the async disconnect method if hasattr(asyncio, '_get_running_loop'): try: loop = asyncio.get_running_loop() loop.create_task(db.disconnect()) except RuntimeError: # No running loop, create one asyncio.run(db.disconnect()) else: asyncio.run(db.disconnect()) except Exception as e: logger.warning(f"Error during database cleanup: {e}") sys.exit(0) # Register signal handlers signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) try: # Run the FastMCP server (database will be initialized on first tool call) mcp.run() except KeyboardInterrupt: logger.info("Interrupted by user") except Exception as e: logger.error(f"Server error: {e}") finally: # Final cleanup if db: try: asyncio.run(db.disconnect()) except Exception as e: logger.warning(f"Error during final cleanup: {e}") if __name__ == "__main__": main()

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/davehenke/rekordbox-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server