add_tracks_to_playlist
Add multiple tracks to an existing rekordbox playlist in one batch operation to modify your DJ database efficiently.
Instructions
Add multiple tracks to an existing playlist in one operation.
⚠️ CAUTION: This modifies your rekordbox database!
Args:
- playlist_id: ID of the playlist to modify
- track_ids: List of track IDs to add
Returns: Detailed results of the batch operation
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| playlist_id | Yes | ID of the playlist to modify | |
| track_ids | Yes | List of track IDs to add | |
Implementation Reference
# rekordbox_mcp/server.py:549-595 (handler)
# Primary MCP tool handler and registration for 'add_tracks_to_playlist'.
# Handles tool invocation, input validation via type hints/Pydantic, database
# connection check, delegates to RekordboxDatabase implementation, and returns
# formatted response with results summary.
@mcp.tool(
    annotations={
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": True,
    }
)
async def add_tracks_to_playlist(
    playlist_id: str,
    track_ids: List[str],
) -> Dict[str, Any]:
    """
    Add multiple tracks to an existing playlist in one operation.

    ⚠️ CAUTION: This modifies your rekordbox database!

    Args:
        playlist_id: ID of the playlist to modify
        track_ids: List of track IDs to add

    Returns:
        Detailed results of the batch operation
    """
    await ensure_database_connected()

    try:
        outcome = await db.add_tracks_to_playlist(playlist_id, track_ids)
        # Tally each bucket once; a missing bucket key is reported through
        # the same error path as a failed database call.
        added_total = len(outcome['added'])
        skipped_total = len(outcome['skipped'])
        failed_total = len(outcome['failed'])
    except Exception as e:
        logger.error(f"Failed to add tracks to playlist: {e}")
        return {
            "status": "error",
            "message": f"Failed to add tracks to playlist: {str(e)}",
        }

    summary = {
        "added_count": added_total,
        "skipped_count": skipped_total,
        "failed_count": failed_total,
    }
    return {
        "status": "success",
        "message": f"Batch add completed: {added_total} added, {skipped_total} skipped, {failed_total} failed",
        "playlist_id": playlist_id,
        "summary": summary,
        "details": outcome,
    }
# rekordbox_mcp/database.py:765-816 (handler)
# Core database implementation of add_tracks_to_playlist in RekordboxDatabase
# class. Creates backup, iterates over track IDs, calls pyrekordbox's
# add_to_playlist for each, commits transaction, tracks added/failed, handles
# rollback on error.
async def add_tracks_to_playlist(self, playlist_id: str, track_ids: List[str]) -> Dict[str, Any]:
    """
    Add multiple tracks to a playlist.

    Args:
        playlist_id: ID of the playlist to modify (must parse as an integer)
        track_ids: List of track IDs to add

    Returns:
        Dictionary with three keys: "added" (track IDs that were added),
        "failed" (dicts holding "track_id" and "reason") and "skipped"
        (never populated by this method; present so callers can rely on it).

    Raises:
        RuntimeError: If the database is not connected, the playlist ID is
            not an integer, or the batch as a whole fails (e.g. on commit).
            The underlying exception is attached as ``__cause__``.
    """
    if not self.db:
        raise RuntimeError("Database not connected")

    results: Dict[str, Any] = {
        "added": [],
        "failed": [],
        "skipped": []
    }

    # Validate the playlist ID before touching disk, so that a bad ID does
    # not leave a pointless backup file behind.
    try:
        playlist_int_id = int(playlist_id)
    except (TypeError, ValueError) as e:
        logger.error("Failed to add tracks to playlist %s: %s", playlist_id, e)
        raise RuntimeError(f"Failed to add tracks to playlist: {str(e)}") from e

    # Nothing to add: skip the backup and the commit entirely.
    if not track_ids:
        return results

    try:
        # Create backup before mutation
        await self._create_backup()

        for track_id in track_ids:
            try:
                # Use the same method as the working single-track function
                self.db.add_to_playlist(playlist_int_id, int(track_id))
            except Exception as e:
                # A single bad track must not abort the rest of the batch.
                results["failed"].append({"track_id": track_id, "reason": str(e)})
                logger.warning("Failed to add track %s: %s", track_id, e)
            else:
                results["added"].append(track_id)
                logger.info("Added track %s to playlist %s", track_id, playlist_id)

        # Commit all changes
        self.db.commit()

        logger.info(
            "Batch add to playlist %s: %d added, %d failed",
            playlist_id, len(results["added"]), len(results["failed"]),
        )
        return results

    except Exception as e:
        logger.error("Failed to add tracks to playlist %s: %s", playlist_id, e)
        # Rollback on error; a failing rollback must not mask the original error.
        if hasattr(self.db, 'rollback'):
            try:
                self.db.rollback()
            except Exception as rollback_error:
                logger.warning("Rollback after failed batch add also failed: %s", rollback_error)
        raise RuntimeError(f"Failed to add tracks to playlist: {str(e)}") from e
# rekordbox_mcp/database.py:926-963 (helper)
# Supporting helper method _create_backup called before mutations like
# add_tracks_to_playlist to create timestamped backup of the master.db file
# for safety.
async def _create_backup(self) -> None:
    """
    Create a backup of the database before performing mutations.

    The backup is written to ``self.database_path`` as
    ``master_backup_<YYYYmmdd_HHMMSS>.db``. This is best-effort: if no
    database path is configured nothing happens, and any failure is logged
    as a warning rather than raised.
    """
    if not self.database_path:
        return

    try:
        import shutil

        root = self.database_path
        backup_prefix = "master_backup_"

        def candidate_files():
            # Yield candidates lazily, so the recursive globs only run when
            # the well-known locations do not hold the database.
            yield root / "master.db"
            yield root / "rekordbox" / "master.db"
            yield from root.glob("**/master.db")
            for path in root.glob("**/*.db"):
                # Never treat a backup written by an earlier call as the
                # live database.
                if not path.name.startswith(backup_prefix):
                    yield path

        db_file = next((path for path in candidate_files() if path.is_file()), None)

        if db_file is not None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_path = root / f"{backup_prefix}{timestamp}.db"
            shutil.copy2(db_file, backup_path)
            logger.info(f"Database backup created: {backup_path}")
        else:
            # List available files for debugging
            db_files = [f for f in root.rglob("*") if f.suffix == '.db']
            logger.warning(f"No database file found for backup. Available .db files: {db_files}")

    except Exception as e:
        logger.warning(f"Failed to create database backup: {e}")