Spotify MCP Server
by varunneal
- src
- spotify_mcp
import logging
import os
from typing import Optional, Dict, List
import spotipy
from dotenv import load_dotenv
from spotipy.cache_handler import CacheFileHandler
from spotipy.oauth2 import SpotifyOAuth
from . import utils
load_dotenv()
CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
REDIRECT_URI = os.getenv("SPOTIFY_REDIRECT_URI")
SCOPES = ["user-read-currently-playing", "user-read-playback-state", "user-read-currently-playing", # spotify connect
"app-remote-control", "streaming", # playback
"playlist-read-private", "playlist-read-collaborative", "playlist-modify-private", "playlist-modify-public",
# playlists
"user-read-playback-position", "user-top-read", "user-read-recently-played", # listening history
"user-library-modify", "user-library-read", # library
]
class Client:
def __init__(self, logger: logging.Logger):
"""Initialize Spotify client with necessary permissions"""
self.logger = logger
scope = "user-library-read,user-read-playback-state,user-modify-playback-state,user-read-currently-playing"
try:
self.sp = spotipy.Spotify(auth_manager=SpotifyOAuth(
scope=scope,
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
redirect_uri=REDIRECT_URI))
self.auth_manager: SpotifyOAuth = self.sp.auth_manager
self.cache_handler: CacheFileHandler = self.auth_manager.cache_handler
self.username = self.get_username()
except Exception as e:
self.logger.error(f"Failed to initialize Spotify client: {str(e)}")
raise
@utils.validate
def get_username(self, device=None):
return self.sp.current_user()['display_name']
@utils.validate
def search(self, query: str, qtype: str = 'track', limit=10, device=None):
"""
Searches based of query term.
- query: query term
- qtype: the types of items to return. One or more of 'artist', 'album', 'track', 'playlist'.
If multiple types are desired, pass in a comma separated string; e.g. 'track,album'
- limit: max # items to return
"""
results = self.sp.search(q=query, limit=limit, type=qtype)
return utils.parse_search_results(results, qtype, self.username)
def recommendations(self, artists: Optional[List] = None, tracks: Optional[List] = None, limit=20):
# doesnt work
recs = self.sp.recommendations(seed_artists=artists, seed_tracks=tracks, limit=limit)
return recs
def get_info(self, item_uri: str) -> dict:
"""
Returns more info about item.
- item_uri: uri. Looks like 'spotify:track:xxxxxx', 'spotify:album:xxxxxx', etc.
"""
_, qtype, item_id = item_uri.split(":")
match qtype:
case 'track':
return utils.parse_track(self.sp.track(item_id), detailed=True)
case 'album':
album_info = utils.parse_album(self.sp.album(item_id), detailed=True)
return album_info
case 'artist':
artist_info = utils.parse_artist(self.sp.artist(item_id), detailed=True)
albums = self.sp.artist_albums(item_id)
top_tracks = self.sp.artist_top_tracks(item_id)['tracks']
albums_and_tracks = {
'albums': albums,
'tracks': {'items': top_tracks}
}
parsed_info = utils.parse_search_results(albums_and_tracks, qtype="album,track")
artist_info['top_tracks'] = parsed_info['tracks']
artist_info['albums'] = parsed_info['albums']
return artist_info
case 'playlist':
playlist = self.sp.playlist(item_id)
self.logger.info(f"playlist info is {playlist}")
playlist_info = utils.parse_playlist(playlist, self.username, detailed=True)
return playlist_info
raise ValueError(f"Unknown qtype {qtype}")
def get_current_track(self) -> Optional[Dict]:
"""Get information about the currently playing track"""
try:
# current_playback vs current_user_playing_track?
current = self.sp.current_user_playing_track()
if not current:
self.logger.info("No playback session found")
return None
if current.get('currently_playing_type') != 'track':
self.logger.info("Current playback is not a track")
return None
track_info = utils.parse_track(current['item'])
if 'is_playing' in current:
track_info['is_playing'] = current['is_playing']
self.logger.info(
f"Current track: {track_info.get('name', 'Unknown')} by {track_info.get('artist', 'Unknown')}")
return track_info
except Exception as e:
self.logger.error("Error getting current track info")
raise
@utils.validate
def start_playback(self, spotify_uri=None, device=None):
"""
Starts spotify playback of uri. If spotify_uri is omitted, resumes current playback.
- spotify_uri: ID of resource to play, or None. Typically looks like 'spotify:track:xxxxxx' or 'spotify:album:xxxxxx'.
"""
try:
self.logger.info(f"Starting playback for spotify_uri: {spotify_uri} on {device}")
if not spotify_uri:
if self.is_track_playing():
self.logger.info("No track_id provided and playback already active.")
return
if not self.get_current_track():
raise ValueError("No track_id provided and no current playback to resume.")
if spotify_uri is not None:
if spotify_uri.startswith('spotify:track:'):
uris = [spotify_uri]
context_uri = None
else:
uris = None
context_uri = spotify_uri
else:
uris = None
context_uri = None
device_id = device.get('id') if device else None
self.logger.info(f"Starting playback of on {device}: context_uri={context_uri}, uris={uris}")
result = self.sp.start_playback(uris=uris, context_uri=context_uri, device_id=device_id)
self.logger.info(f"Playback result: {result}")
return result
except Exception as e:
self.logger.error(f"Error starting playback: {str(e)}")
raise
@utils.validate
def pause_playback(self, device=None):
"""Pauses playback."""
playback = self.sp.current_playback()
if playback and playback.get('is_playing'):
self.sp.pause_playback(device.get('id') if device else None)
@utils.validate
def add_to_queue(self, track_id: str, device=None):
"""
Adds track to queue.
- track_id: ID of track to play.
"""
self.sp.add_to_queue(track_id, device.get('id') if device else None)
@utils.validate
def get_queue(self, device=None):
"""Returns the current queue of tracks."""
queue_info = self.sp.queue()
self.logger.info(f"currently playing keys {queue_info['currently_playing'].keys()}")
queue_info['currently_playing'] = self.get_current_track()
queue_info['queue'] = [utils.parse_track(track) for track in queue_info.pop('queue')]
return queue_info
def get_liked_songs(self):
# todo
results = self.sp.current_user_saved_tracks()
for idx, item in enumerate(results['items']):
track = item['track']
print(idx, track['artists'][0]['name'], " – ", track['name'])
def is_track_playing(self) -> bool:
"""Returns if a track is actively playing."""
curr_track = self.get_current_track()
if not curr_track:
return False
if curr_track.get('is_playing'):
return True
return False
def get_devices(self) -> dict:
return self.sp.devices()['devices']
def is_active_device(self):
return any([device.get('is_active') for device in self.get_devices()])
def _get_candidate_device(self):
devices = self.get_devices()
for device in devices:
if device.get('is_active'):
return device
self.logger.info(f"No active device, assigning {devices[0]['name']}.")
return devices[0]
def auth_ok(self) -> bool:
try:
token = self.cache_handler.get_cached_token()
if token is None:
self.logger.info("Auth check result: no token exists")
return False
is_expired = self.auth_manager.is_token_expired(token)
self.logger.info(f"Auth check result: {'valid' if not is_expired else 'expired'}")
return not is_expired # Return True if token is NOT expired
except Exception as e:
self.logger.error(f"Error checking auth status: {str(e)}")
return False # Return False on error rather than raising
def auth_refresh(self):
self.auth_manager.validate_token(self.cache_handler.get_cached_token())
def skip_track(self, n=1):
# todo: Better error handling
for _ in range(n):
self.sp.next_track()
def previous_track(self):
self.sp.previous_track()
def seek_to_position(self, position_ms):
self.sp.seek_track(position_ms=position_ms)
def set_volume(self, volume_percent):
self.sp.volume(volume_percent)