# Spotify MCP Server
# by jamiew
# Verified
# - spotify-mcp
# - src
# - spotify_mcp
import logging
import os
from typing import Optional, Dict, List, Any
import spotipy
from dotenv import load_dotenv
from spotipy.cache_handler import CacheFileHandler
from spotipy.oauth2 import SpotifyOAuth
from . import utils
# Populate os.environ from a local .env file (if any) before reading settings.
load_dotenv()

# Spotify application credentials; each is None when the variable is unset.
CLIENT_ID = os.environ.get("SPOTIFY_CLIENT_ID")
CLIENT_SECRET = os.environ.get("SPOTIFY_CLIENT_SECRET")
REDIRECT_URI = os.environ.get("SPOTIFY_REDIRECT_URI")

# Every OAuth scope requested by the client, grouped by feature area.
SCOPES = (
    # Playback
    [
        "user-read-currently-playing",
        "user-read-playback-state",
        "user-modify-playback-state",
        "app-remote-control",
        "streaming",
    ]
    # Playlists
    + [
        "playlist-read-private",
        "playlist-read-collaborative",
        "playlist-modify-private",
        "playlist-modify-public",
    ]
    # Library
    + [
        "user-library-read",
        "user-library-modify",
    ]
    # History
    + [
        "user-read-playback-position",
        "user-top-read",
        "user-read-recently-played",
    ]
)
class Client:
sp: spotipy.Spotify
auth_manager: SpotifyOAuth
cache_handler: CacheFileHandler
logger: logging.Logger
def __init__(self, logger: logging.Logger):
"""Initialize Spotify client with necessary permissions"""
self.logger = logger
# Use all defined scopes
scope = ",".join(SCOPES)
self.logger.info(f"Initializing Spotify client with scopes: {scope}")
try:
auth_manager = SpotifyOAuth(
scope=scope,
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
redirect_uri=REDIRECT_URI)
self.sp = spotipy.Spotify(auth_manager=auth_manager)
self.auth_manager = auth_manager
self.cache_handler = auth_manager.cache_handler
self.logger.info("Successfully initialized Spotify client")
except Exception as e:
self.logger.error(f"Failed to initialize Spotify client: {str(e)}", exc_info=True)
raise
def search(self, query: str, qtype: str = 'track', limit: int = 10) -> Dict[str, List[Dict[str, Any]]]:
"""
Searches based of query term.
- query: query term
- qtype: the types of items to return. One or more of 'artist', 'album', 'track', 'playlist'.
If multiple types are desired, pass in a comma separated string; e.g. 'track,album'
- limit: max # items to return
"""
results = self.sp.search(q=query, limit=limit, type=qtype)
search_results = utils.parse_search_results(results, qtype)
return search_results if search_results else {}
def recommendations(self, artists: Optional[List[str]] = None, tracks: Optional[List[str]] = None,
limit: int = 20) -> Dict[str, Any]:
recs = self.sp.recommendations(seed_artists=artists, seed_tracks=tracks, limit=limit)
return recs if recs else {}
def get_info(self, item_id: str, qtype: str = 'track') -> Dict[str, Any]:
"""
Returns more info about item.
- item_id: id.
- qtype: Either 'track', 'album', 'artist', or 'playlist'.
"""
match qtype:
case 'track':
track_info = utils.parse_track(self.sp.track(item_id), detailed=True)
return track_info if track_info else {}
case 'album':
album_info = utils.parse_album(self.sp.album(item_id), detailed=True)
return album_info if album_info else {}
case 'artist':
artist_info = utils.parse_artist(self.sp.artist(item_id), detailed=True)
if not artist_info:
return {}
albums = self.sp.artist_albums(item_id)
top_tracks_response = self.sp.artist_top_tracks(item_id)
if not top_tracks_response:
return artist_info
albums_and_tracks = {
'albums': albums,
'tracks': {'items': top_tracks_response.get('tracks', [])}
}
parsed_info = utils.parse_search_results(albums_and_tracks, qtype="album,track")
artist_info['top_tracks'] = parsed_info.get('tracks', [])
artist_info['albums'] = parsed_info.get('albums', [])
return artist_info
case 'playlist':
playlist = self.sp.playlist(item_id)
playlist_info = utils.parse_playlist(playlist, detailed=True)
return playlist_info if playlist_info else {}
raise ValueError(f"unknown qtype {qtype}")
def get_current_track(self) -> Optional[Dict[str, Any]]:
"""Get information about the currently playing track"""
try:
current = self.sp.current_user_playing_track()
if not current:
self.logger.info("No playback session found")
return None
if current.get('currently_playing_type') != 'track':
self.logger.info("Current playback is not a track")
return None
track_info = utils.parse_track(current.get('item'))
if not track_info:
return None
if 'is_playing' in current:
track_info['is_playing'] = current['is_playing']
self.logger.info(
f"Current track: {track_info.get('name', 'Unknown')} by {track_info.get('artist', 'Unknown')}")
return track_info
except Exception as e:
self.logger.error("Error getting current track info", exc_info=True)
raise
@utils.validate
def start_playback(self, track_id: Optional[str] = None,
device: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
"""
Starts track playback. If track_id is omitted, resumes current playback.
- track_id: ID of track to play, or None.
"""
try:
if not track_id:
if self.is_track_playing():
self.logger.info("No track_id provided and playback already active.")
return None
if not self.get_current_track():
raise ValueError("No track_id provided and no current playback to resume.")
uris = [f'spotify:track:{track_id}'] if track_id else None
device_id = device.get('id') if device else None
result = self.sp.start_playback(uris=uris, device_id=device_id)
self.logger.info(f"Playback started successfully{' for track_id: ' + track_id if track_id else ''}")
return result
except Exception as e:
self.logger.error(f"Error starting playback: {str(e)}", exc_info=True)
raise
@utils.validate
def pause_playback(self, device: Optional[Dict[str, Any]] = None) -> None:
"""Pauses playback."""
playback = self.sp.current_playback()
if playback and playback.get('is_playing'):
self.sp.pause_playback(device.get('id') if device else None)
@utils.validate
def add_to_queue(self, track_id: str, device: Optional[Dict[str, Any]] = None) -> None:
"""
Adds track to queue.
- track_id: ID of track to play.
"""
self.sp.add_to_queue(track_id, device.get('id') if device else None)
@utils.validate
def get_queue(self, device: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Returns the current queue of tracks."""
queue_info = self.sp.queue()
if not queue_info:
return {'currently_playing': None, 'queue': []}
self.logger.info(f"currently playing keys {queue_info.get('currently_playing', {}).keys()}")
queue_info['currently_playing'] = self.get_current_track()
queue = queue_info.pop('queue', [])
queue_info['queue'] = [track_info for track in queue
if (track_info := utils.parse_track(track)) is not None]
return queue_info
def get_liked_songs(self) -> List[Dict[str, Any]]:
results = self.sp.current_user_saved_tracks()
if not results or 'items' not in results:
return []
tracks = []
for item in results['items']:
if track := item.get('track'):
if track_info := utils.parse_track(track):
tracks.append(track_info)
return tracks
def is_track_playing(self) -> bool:
"""Returns if a track is actively playing."""
curr_track = self.get_current_track()
if not curr_track:
return False
return bool(curr_track.get('is_playing'))
def get_devices(self) -> List[Dict[str, Any]]:
"""Get list of available devices"""
devices = self.sp.devices()
return devices.get('devices', []) if devices else []
def is_active_device(self) -> bool:
"""Check if there is an active device"""
return any(device.get('is_active', False) for device in self.get_devices())
def _get_candidate_device(self) -> Optional[Dict[str, Any]]:
"""Get an active device or the first available device"""
devices = self.get_devices()
if not devices:
return None
for device in devices:
if device.get('is_active'):
return device
self.logger.info(f"No active device, assigning {devices[0].get('name', 'Unknown')}.")
return devices[0]
def auth_ok(self) -> bool:
"""Check if authentication is valid"""
try:
result = self.auth_manager.is_token_expired(self.cache_handler.get_cached_token())
self.logger.info(f"Auth check result: {'valid' if not result else 'expired'}")
return result
except Exception as e:
self.logger.error(f"Error checking auth status: {str(e)}", exc_info=True)
raise
def auth_refresh(self) -> None:
"""Refresh authentication token"""
self.auth_manager.validate_token(self.cache_handler.get_cached_token())
def skip_track(self, n: int = 1) -> None:
"""Skip n tracks"""
for _ in range(n):
self.sp.next_track()
def previous_track(self) -> None:
"""Go to previous track"""
self.sp.previous_track()
def seek_to_position(self, position_ms: int) -> None:
"""Seek to position in current track"""
self.sp.seek_track(position_ms=position_ms)
def set_volume(self, volume_percent: int) -> None:
"""Set playback volume"""
self.sp.volume(volume_percent)
# Playlist Methods
def get_playlist(self, playlist_id: str) -> Dict[str, Any]:
"""Get a playlist's details"""
try:
self.logger.info(f"Getting playlist with ID: {playlist_id}")
playlist = self.sp.playlist(playlist_id)
playlist_info = utils.parse_playlist(playlist, detailed=True)
if playlist_info:
self.logger.info(f"Successfully retrieved playlist: {playlist_info.get('name', 'Unknown')}")
else:
self.logger.warning(f"Retrieved empty playlist info for ID: {playlist_id}")
return playlist_info if playlist_info else {}
except Exception as e:
self.logger.error(f"Error getting playlist: {str(e)}", exc_info=True)
raise
def update_playlist_details(self, playlist_id: str, name: Optional[str] = None,
description: Optional[str] = None, public: Optional[bool] = None) -> None:
"""Update a playlist's details"""
try:
self.logger.info(f"Updating playlist {playlist_id} with name: {name}, description: {description}, public: {public}")
self.sp.playlist_change_details(
playlist_id,
name=name,
description=description,
public=public
)
self.logger.info(f"Successfully updated playlist details for ID: {playlist_id}")
except Exception as e:
self.logger.error(f"Error updating playlist: {str(e)}", exc_info=True)
raise
def update_playlist_items(self, playlist_id: str, uris: List[str],
range_start: Optional[int] = None,
insert_before: Optional[int] = None,
range_length: Optional[int] = None,
snapshot_id: Optional[str] = None) -> Dict[str, str]:
"""Update a playlist's items"""
try:
self.logger.info(f"Updating playlist {playlist_id} items. URIs count: {len(uris)}")
self.logger.info(f"Range params - start: {range_start}, insert_before: {insert_before}, length: {range_length}")
result = self.sp.playlist_replace_items(playlist_id, uris)
if range_start is not None and insert_before is not None:
self.logger.info(f"Reordering items in playlist {playlist_id}")
self.sp.playlist_reorder_items(
playlist_id,
range_start=range_start,
insert_before=insert_before,
range_length=range_length or 1,
snapshot_id=snapshot_id
)
snapshot_id = result["snapshot_id"] if result and isinstance(result, dict) else ""
self.logger.info(f"Successfully updated playlist items. Snapshot ID: {snapshot_id}")
return {"snapshot_id": snapshot_id}
except Exception as e:
self.logger.error(f"Error updating playlist items: {str(e)}", exc_info=True)
raise
def add_playlist_items(self, playlist_id: str, uris: List[str],
position: Optional[int] = None) -> Dict[str, str]:
"""Add items to a playlist"""
try:
self.logger.info(f"Adding {len(uris)} items to playlist {playlist_id} at position: {position}")
result = self.sp.playlist_add_items(playlist_id, uris, position=position)
snapshot_id = result["snapshot_id"] if result and isinstance(result, dict) else ""
self.logger.info(f"Successfully added items to playlist. Snapshot ID: {snapshot_id}")
return {"snapshot_id": snapshot_id}
except Exception as e:
self.logger.error(f"Error adding items to playlist: {str(e)}", exc_info=True)
raise
def remove_playlist_items(self, playlist_id: str, uris: List[str],
snapshot_id: Optional[str] = None) -> Dict[str, str]:
"""Remove items from a playlist"""
try:
self.logger.info(f"Removing {len(uris)} items from playlist {playlist_id}")
result = self.sp.playlist_remove_all_occurrences_of_items(
playlist_id,
uris,
snapshot_id=snapshot_id
)
snapshot_id = result["snapshot_id"] if result and isinstance(result, dict) else ""
self.logger.info(f"Successfully removed items from playlist. Snapshot ID: {snapshot_id}")
return {"snapshot_id": snapshot_id}
except Exception as e:
self.logger.error(f"Error removing items from playlist: {str(e)}", exc_info=True)
raise
def get_user_playlists(self, user_id: Optional[str] = None,
limit: int = 20, offset: int = 0) -> Dict[str, Any]:
"""Get a user's playlists"""
try:
self.logger.info(f"Getting playlists for user: {user_id if user_id else 'current user'}")
self.logger.info(f"Limit: {limit}, Offset: {offset}")
if user_id:
playlists = self.sp.user_playlists(user_id, limit=limit, offset=offset)
else:
playlists = self.sp.current_user_playlists(limit=limit, offset=offset)
if not playlists:
self.logger.info("No playlists found")
return {
'items': [],
'total': 0,
'limit': limit,
'offset': offset,
'next': None,
'previous': None
}
# Parse the playlists
result = {
'items': [playlist_info for playlist in playlists.get('items', [])
if (playlist_info := utils.parse_playlist(playlist)) is not None],
'total': playlists.get('total', 0),
'limit': playlists.get('limit', limit),
'offset': playlists.get('offset', offset),
'next': playlists.get('next'),
'previous': playlists.get('previous')
}
self.logger.info(f"Successfully retrieved {len(result['items'])} playlists")
return result
except Exception as e:
self.logger.error(f"Error getting user playlists: {str(e)}", exc_info=True)
raise
def create_playlist(self, user_id: str, name: str, description: str = "",
public: bool = False) -> Dict[str, Any]:
"""Create a new playlist"""
try:
self.logger.info(f"Creating playlist '{name}' for user {user_id}")
self.logger.info(f"Description: {description}, Public: {public}")
playlist = self.sp.user_playlist_create(
user_id,
name,
public=public,
description=description
)
playlist_info = utils.parse_playlist(playlist, detailed=True)
if playlist_info:
self.logger.info(f"Successfully created playlist. ID: {playlist_info.get('id', 'Unknown')}")
else:
self.logger.warning("Created playlist but received empty playlist info")
return playlist_info if playlist_info else {}
except Exception as e:
self.logger.error(f"Error creating playlist: {str(e)}", exc_info=True)
raise
def get_playlist_cover_image(self, playlist_id: str) -> List[Dict[str, Any]]:
"""Get a playlist's cover image"""
try:
self.logger.info(f"Getting cover image for playlist: {playlist_id}")
images = self.sp.playlist_cover_image(playlist_id)
if images:
self.logger.info(f"Successfully retrieved {len(images)} cover images")
else:
self.logger.info("No cover images found")
return images if images else []
except Exception as e:
self.logger.error(f"Error getting playlist cover: {str(e)}", exc_info=True)
raise
def upload_playlist_cover_image(self, playlist_id: str, image_data: str) -> None:
"""Upload a custom playlist cover image"""
try:
self.logger.info(f"Uploading cover image for playlist: {playlist_id}")
self.logger.info(f"Image data length: {len(image_data)} characters")
self.sp.playlist_upload_cover_image(playlist_id, image_data)
self.logger.info("Successfully uploaded playlist cover image")
except Exception as e:
self.logger.error(f"Error uploading playlist cover: {str(e)}", exc_info=True)
raise