Spotify MCP
by ashwanth1109
import os
from typing import Optional
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from dotenv import load_dotenv
load_dotenv()
# Constants
SPOTIFY_API_BASE = "https://api.spotify.com/v1"
# Spotify setup
SCOPES = [
"user-read-currently-playing",
"user-read-playback-state",
"user-read-currently-playing", # spotify connect
"app-remote-control",
"streaming", # playback
"playlist-read-private",
"playlist-read-collaborative",
"playlist-modify-private",
"playlist-modify-public",
# playlists
"user-read-playback-position",
"user-top-read",
"user-read-recently-played", # listening history
"user-library-modify",
"user-library-read", # library
]
class SpotifyClient:
"""
A class that encapsulates all Spotify API functionality.
"""
def __init__(self) -> None:
self.sp = spotipy.Spotify(
auth_manager=SpotifyOAuth(
client_id=os.getenv("SPOTIFY_CLIENT_ID"),
client_secret=os.getenv("SPOTIFY_CLIENT_SECRET"),
redirect_uri=os.getenv("SPOTIFY_REDIRECT_URI"),
scope=SCOPES,
)
)
async def get_my_playlists(self) -> str:
"""
Get all playlists for the current user.
"""
try:
results = self.sp.current_user_playlists()
if not results["items"]:
return "No playlists found."
playlists = []
for item in results["items"]:
name = item["name"].encode("ascii", "ignore").decode()
playlist = (
f"Name: {name}\n"
f"Tracks: {item['tracks']['total']}\n"
f"ID: {item['id']}"
)
playlists.append(playlist)
return "\n---\n".join(playlists)
except Exception as e:
return f"Error fetching playlists: {str(e)}"
async def start_playback(self) -> str:
"""
Resume playback on the currently active device.
"""
try:
self.sp.start_playback()
return "Playback started successfully"
except Exception as e:
return f"Error starting playback: {str(e)}"
async def pause_playback(self) -> str:
"""
Pause playback on the specified device.
"""
try:
self.sp.pause_playback()
return "Playback paused successfully"
except Exception as e:
return f"Error pausing playback: {str(e)}"
async def recommendations(
self,
seed_artists: Optional[str] = None,
seed_tracks: Optional[str] = None,
seed_genres: Optional[str] = None,
limit: int = 20,
) -> dict:
"""
Get recommendations based on seed artists, tracks, or genres.
"""
try:
recommendations = self.sp.recommendations(
seed_artists=seed_artists,
seed_tracks=seed_tracks,
seed_genres=seed_genres,
limit=limit,
)
return recommendations
except Exception as e:
return f"Error getting recommendations: {str(e)}"
async def get_info(
self,
item_id: str,
qtype: str = "track",
) -> dict:
"""
Get information about a specific item.
- item_id: id.
- qtype: Either "track", "album", "artist", or "playlist".
"""
try:
match qtype:
case "track":
return self.sp.track(item_id)
case "album":
return self.sp.album(item_id)
case "artist":
return self.sp.artist(item_id)
case "playlist":
return self.sp.playlist(item_id)
raise ValueError(f"Invalid query type: {qtype}")
except Exception as e:
return f"Error getting information: {str(e)}"
async def search(self, query: str, qtype: str = "track", limit: int = 20) -> dict:
"""
Search for tracks, artists, albums, or playlists.
- query: Search query
- qtype: Either "track", "artist", "album", or "playlist"
- limit: Max number of results (default 20)
"""
try:
results = self.sp.search(q=query, type=qtype, limit=limit)
return results
except Exception as e:
return f"Error searching: {str(e)}"
async def get_current_playback(self) -> str:
"""
Get information about current playback state.
"""
try:
playback = self.sp.current_playback()
if not playback:
return "No active playback session."
return playback
except Exception as e:
return f"Error getting playback state: {str(e)}"
async def next_track(self) -> str:
"""
Skip to the next track.
"""
try:
self.sp.next_track()
return "Skipped to next track"
except Exception as e:
return f"Error skipping track: {str(e)}"
async def previous_track(self) -> str:
"""
Skip to the previous track.
"""
try:
self.sp.previous_track()
return "Skipped to previous track"
except Exception as e:
return f"Error skipping track: {str(e)}"
async def seek_track(self, position_ms: int) -> str:
"""
Seek to position in currently playing track.
- position_ms: Position in milliseconds
"""
try:
self.sp.seek_track(position_ms=position_ms)
return f"Seeked to position {position_ms}ms"
except Exception as e:
return f"Error seeking: {str(e)}"
async def start_playback_track(
self, track_uri: str, device_id: Optional[str] = None
) -> str:
"""
Start playback of a specific track.
- track_uri: Spotify URI of the track to play
- device_id: Optional device to play on
"""
try:
self.sp.start_playback(device_id=device_id, uris=[track_uri])
return "Started playing track successfully"
except Exception as e:
return f"Error starting track playback: {str(e)}"
async def get_top_artists(
self, limit: int = 20, time_range: str = "medium_term"
) -> dict:
"""
Get the current user's top artists.
- limit: Number of artists to return (default 20, max 50)
- time_range: 'short_term' (last 4 weeks), 'medium_term' (last 6 months),
or 'long_term' (all time) (default: medium_term)
"""
try:
results = self.sp.current_user_top_artists(
limit=limit, offset=0, time_range=time_range
)
return results
except Exception as e:
return f"Error getting top artists: {str(e)}"
async def get_queue(self) -> dict:
"""
Get the current user's queue.
Returns information about the queue including currently playing track and next tracks.
"""
try:
results = self.sp.queue()
return results
except Exception as e:
return f"Error getting queue: {str(e)}"
async def add_to_queue(self, track_id: str) -> str:
"""
Add a track to the user's queue.
- track_id: Spotify track ID to add
"""
try:
self.sp.add_to_queue(uri=f"spotify:track:{track_id}")
return "Track added to queue successfully"
except Exception as e:
return f"Error adding track to queue: {str(e)}"
async def skip_track(self, n: int = 1) -> str:
"""
Skip multiple tracks in the queue.
- n: Number of tracks to skip (default: 1)
"""
try:
for _ in range(n):
self.sp.next_track()
return f"Skipped {n} tracks successfully"
except Exception as e:
return f"Error skipping tracks: {str(e)}"
async def get_current_track(self) -> dict:
"""
Get detailed information about the currently playing track.
Returns None if no track is currently playing.
"""
try:
result = self.sp.current_user_playing_track()
if not result:
return "No track currently playing"
return result
except Exception as e:
return f"Error getting current track: {str(e)}"
async def start_context_playback(
self, context_uri: str, device_id: Optional[str] = None
) -> str:
"""
Start playback of a context (playlist, album, artist)
- context_uri: Spotify URI (e.g. 'spotify:playlist:37i9dQ...')
- device_id: Optional device to play on
"""
try:
self.sp.start_playback(device_id=device_id, context_uri=context_uri)
return "Started playing context successfully"
except Exception as e:
return f"Error starting context playback: {str(e)}"
async def get_artist_top_tracks(self, artist_id: str) -> dict:
"""
Get an artist's top tracks
- artist_id: Spotify artist ID
"""
try:
# Note: market parameter defaults to user's country
results = self.sp.artist_top_tracks(artist_id)
return results
except Exception as e:
return f"Error getting artist top tracks: {str(e)}"
async def set_repeat(self, state: str) -> str:
"""
Set repeat mode for playback
- state: 'track', 'context' or 'off'
"""
try:
self.sp.repeat(state)
return f"Repeat mode set to {state}"
except Exception as e:
return f"Error setting repeat mode: {str(e)}"
async def add_to_playlist(self, playlist_id: str, track_ids: list[str]) -> str:
"""
Add tracks to a playlist
- playlist_id: Spotify playlist ID
- track_ids: List of track IDs to add
"""
try:
# Convert track IDs to full URIs
track_uris = [f"spotify:track:{track_id}" for track_id in track_ids]
self.sp.playlist_add_items(playlist_id, track_uris)
return "Tracks added to playlist successfully"
except Exception as e:
return f"Error adding tracks to playlist: {str(e)}"
async def reorder_queue(self, range_start: int, insert_before: int) -> str:
"""
Reorder tracks in queue by moving a track to a different position
- range_start: Position of track to move (0-based)
- insert_before: Position to insert the track (0-based)
"""
try:
# Note: Spotify's queue reordering is done through a special endpoint
# that requires both positions to be 0-based
self.sp.queue_reorder(
range_start=range_start,
insert_before=insert_before,
range_length=1, # Move one track at a time
)
return "Queue reordered successfully"
except Exception as e:
return f"Error reordering queue: {str(e)}"