Skip to main content
Glama

Spotify MCP Server

by boristopalov
spotify_api.py (8.73 kB)
import logging import os from typing import Optional, Dict, List import spotipy from dotenv import load_dotenv from spotipy.cache_handler import CacheFileHandler from spotipy.oauth2 import SpotifyOAuth from . import utils load_dotenv() CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID") CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET") REDIRECT_URI = os.getenv("SPOTIFY_REDIRECT_URI") SCOPES = ["user-read-currently-playing", "user-read-playback-state", "user-read-currently-playing", # spotify connect "app-remote-control", "streaming", # playback "playlist-read-private", "playlist-read-collaborative", "playlist-modify-private", "playlist-modify-public", # playlists "user-read-playback-position", "user-top-read", "user-read-recently-played", # listening history "user-library-modify", "user-library-read", # library ] class Client: def __init__(self, logger: logging.Logger): """Initialize Spotify client with necessary permissions""" self.logger = logger scope = "user-library-read,user-read-playback-state,user-modify-playback-state,user-read-currently-playing" try: self.sp = spotipy.Spotify(auth_manager=SpotifyOAuth( scope=scope, client_id=CLIENT_ID, client_secret=CLIENT_SECRET, redirect_uri=REDIRECT_URI)) self.auth_manager: SpotifyOAuth = self.sp.auth_manager self.cache_handler: CacheFileHandler = self.auth_manager.cache_handler except Exception as e: self.logger.error(f"Failed to initialize Spotify client: {str(e)}", exc_info=True) raise def search(self, query: str, qtype: str = 'track', limit=10): """ Searches based of query term. - query: query term - qtype: the types of items to return. One or more of 'artist', 'album', 'track', 'playlist'. If multiple types are desired, pass in a comma separated string; e.g. 
'track,album' - limit: max # items to return """ results = self.sp.search(q=query, limit=limit, type=qtype) return utils.parse_search_results(results, qtype) def recommendations(self, artists: Optional[List] = None, tracks: Optional[List] = None, limit=20): recs = self.sp.recommendations(seed_artists=artists, seed_tracks=tracks, limit=limit) return recs def get_info(self, item_id: str, qtype: str = 'track') -> dict: """ Returns more info about item. - item_id: id. - qtype: Either 'track', 'album', 'artist', or 'playlist'. """ match qtype: case 'track': return utils.parse_track(self.sp.track(item_id), detailed=True) case 'album': album_info = utils.parse_album(self.sp.album(item_id), detailed=True) return album_info case 'artist': artist_info = utils.parse_artist(self.sp.artist(item_id), detailed=True) albums = self.sp.artist_albums(item_id) top_tracks = self.sp.artist_top_tracks(item_id)['tracks'] albums_and_tracks = { 'albums': albums, 'tracks': {'items': top_tracks} } parsed_info = utils.parse_search_results(albums_and_tracks, qtype="album,track") artist_info['top_tracks'] = parsed_info['tracks'] artist_info['albums'] = parsed_info['albums'] return artist_info case 'playlist': playlist = self.sp.playlist(item_id) playlist_info = utils.parse_playlist(playlist, detailed=True) return playlist_info raise ValueError(f"uknown qtype {qtype}") def get_current_track(self) -> Optional[Dict]: """Get information about the currently playing track""" try: # current_playback vs current_user_playing_track? 
current = self.sp.current_user_playing_track() if not current: self.logger.info("No playback session found") return None if current.get('currently_playing_type') != 'track': self.logger.info("Current playback is not a track") return None track_info = utils.parse_track(current['item']) if 'is_playing' in current: track_info['is_playing'] = current['is_playing'] self.logger.info( f"Current track: {track_info.get('name', 'Unknown')} by {track_info.get('artist', 'Unknown')}") return track_info except Exception as e: self.logger.error("Error getting current track info", exc_info=True) raise @utils.validate def start_playback(self, track_id=None, device=None): """ Starts track playback. If track_id is omitted, resumes current playback. - track_id: ID of track to play, or None. """ try: if not track_id: if self.is_track_playing(): self.logger.info("No track_id provided and playback already active.") return if not self.get_current_track(): raise ValueError("No track_id provided and no current playback to resume.") uris = [f'spotify:track:{track_id}'] if track_id else None device_id = device.get('id') if device else None result = self.sp.start_playback(uris=uris, device_id=device_id) self.logger.info(f"Playback started successfully{' for track_id: ' + track_id if track_id else ''}") return result except Exception as e: self.logger.error(f"Error starting playback: {str(e)}", exc_info=True) raise @utils.validate def pause_playback(self, device=None): """Pauses playback.""" playback = self.sp.current_playback() if playback and playback.get('is_playing'): self.sp.pause_playback(device.get('id') if device else None) @utils.validate def add_to_queue(self, track_id: str, device=None): """ Adds track to queue. - track_id: ID of track to play. 
""" self.sp.add_to_queue(track_id, device.get('id') if device else None) @utils.validate def get_queue(self, device=None): """Returns the current queue of tracks.""" queue_info = self.sp.queue() self.logger.info(f"currently playing keys {queue_info['currently_playing'].keys()}") queue_info['currently_playing'] = self.get_current_track() queue_info['queue'] = [utils.parse_track(track) for track in queue_info.pop('queue')] return queue_info def get_liked_songs(self): # todo results = self.sp.current_user_saved_tracks() for idx, item in enumerate(results['items']): track = item['track'] print(idx, track['artists'][0]['name'], " – ", track['name']) def is_track_playing(self) -> bool: """Returns if a track is actively playing.""" curr_track = self.get_current_track() if not curr_track: return False if curr_track.get('is_playing'): return True return False def get_devices(self) -> dict: return self.sp.devices()['devices'] def is_active_device(self): return any([device.get('is_active') for device in self.get_devices()]) def _get_candidate_device(self): devices = self.get_devices() for device in devices: if device.get('is_active'): return device self.logger.info(f"No active device, assigning {devices[0]['name']}.") return devices[0] def auth_ok(self) -> bool: try: result = self.auth_manager.is_token_expired(self.cache_handler.get_cached_token()) self.logger.info(f"Auth check result: {'valid' if not result else 'expired'}") return result except Exception as e: self.logger.error(f"Error checking auth status: {str(e)}", exc_info=True) raise def auth_refresh(self): self.auth_manager.validate_token(self.cache_handler.get_cached_token()) def skip_track(self, n=1): # todo: Better error handling for _ in range(n): self.sp.next_track() def previous_track(self): self.sp.previous_track() def seek_to_position(self, position_ms): self.sp.seek_track(position_ms=position_ms) def set_volume(self, volume_percent): self.sp.volume(volume_percent)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/boristopalov/spotify-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server