"""
Modern FastMCP-based Spotify MCP Server.
Clean, simple implementation using FastMCP's automatic features.
"""
import json
import logging
from typing import Any
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel
from spotipy import SpotifyException
import spotify_mcp.spotify_api as spotify_api
from spotify_mcp.errors import convert_spotify_error
from spotify_mcp.logging_utils import (
log_pagination_info,
log_tool_execution,
)
# Configure logging
logger = logging.getLogger(__name__)
# Configure structured logging for better observability
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
# Create FastMCP app
mcp = FastMCP("Spotify MCP")
# Initialize Spotify client
_client_wrapper = spotify_api.Client()
spotify_client = _client_wrapper.sp # Use spotipy client directly
# Data models for structured output
class Track(BaseModel):
"""A Spotify track with metadata."""
name: str
id: str
artist: str
artists: list[str] | None = None
album: str | None = None
album_id: str | None = None
release_date: str | None = None
duration_ms: int | None = None
popularity: int | None = None
external_urls: dict[str, str] | None = None
class PlaybackState(BaseModel):
"""Current playback state."""
is_playing: bool
track: Track | None = None
device: str | None = None
volume: int | None = None
shuffle: bool = False
repeat: str = "off"
progress_ms: int | None = None
class Playlist(BaseModel):
"""A Spotify playlist."""
name: str
id: str
owner: str | None = None
description: str | None = None
tracks: list[Track] | None = None
total_tracks: int | None = None
public: bool | None = None
class Artist(BaseModel):
"""A Spotify artist."""
name: str
id: str
genres: list[str] | None = None
popularity: int | None = None
followers: int | None = None
class Album(BaseModel):
"""A Spotify album."""
name: str
id: str
artist: str
artists: list[str] | None = None
release_date: str | None = None
release_date_precision: str | None = None
total_tracks: int | None = None
album_type: str | None = None
label: str | None = None
genres: list[str] | None = None
popularity: int | None = None
external_urls: dict[str, str] | None = None
class AudioFeatures(BaseModel):
"""Audio features for a track."""
id: str
tempo: float | None = None
key: int | None = None
mode: int | None = None
time_signature: int | None = None
danceability: float | None = None
energy: float | None = None
valence: float | None = None
loudness: float | None = None
speechiness: float | None = None
acousticness: float | None = None
instrumentalness: float | None = None
liveness: float | None = None
def parse_track(item: dict[str, Any]) -> Track:
"""Parse Spotify track data into Track model."""
album_data = item.get("album", {})
return Track(
name=item["name"],
id=item["id"],
artist=item["artists"][0]["name"] if item.get("artists") else "Unknown",
artists=[a["name"] for a in item.get("artists", [])],
album=album_data.get("name"),
album_id=album_data.get("id"),
release_date=album_data.get("release_date"),
duration_ms=item.get("duration_ms"),
popularity=item.get("popularity"),
external_urls=item.get("external_urls"),
)
def get_playlist_tracks_paginated(
playlist_id: str, limit: int | None = None, offset: int = 0
) -> list[Track]:
"""Get playlist tracks with proper pagination support.
Args:
playlist_id: Spotify playlist ID
limit: Maximum number of tracks to return (None for all)
offset: Number of tracks to skip
Returns:
List of Track objects
"""
tracks = []
current_offset = offset
batch_size = min(limit, 100) if limit else 100 # Spotify API max is 100 per request
remaining = limit
logger.info(
f"๐ Starting paginated fetch for playlist {playlist_id} (limit={limit}, offset={offset})"
)
while True:
# Determine how many to fetch in this batch
batch_limit = min(batch_size, remaining) if remaining else batch_size
logger.info(f"๐ Fetching batch: offset={current_offset}, limit={batch_limit}")
# Get playlist tracks with pagination
tracks_result = spotify_client.playlist_tracks(
playlist_id, limit=batch_limit, offset=current_offset
)
if not tracks_result or not tracks_result.get("items"):
break
# Parse and add tracks
batch_tracks = []
for item in tracks_result["items"]:
if item and item.get("track"):
batch_tracks.append(parse_track(item["track"]))
tracks.extend(batch_tracks)
logger.info(
f"๐ Batch complete: retrieved {len(batch_tracks)} tracks (total so far: {len(tracks)})"
)
# Update remaining count if we have a limit
if remaining:
remaining -= len(batch_tracks)
if remaining <= 0:
break
# Check if we've reached the end
if len(tracks_result["items"]) < batch_limit or not tracks_result.get("next"):
break
current_offset += len(tracks_result["items"])
# Safety check to prevent infinite loops
if current_offset > 10000:
logger.warning(
f"โ ๏ธ Safety limit reached: stopping at offset {current_offset}"
)
break
logger.info(f"๐ Pagination complete: total {len(tracks)} tracks retrieved")
return tracks
# === TOOLS ===
@mcp.tool()
@log_tool_execution
def playback_control(
action: str, track_id: str | None = None, num_skips: int = 1
) -> PlaybackState:
"""Control Spotify playback.
Args:
action: Action ('get', 'start', 'pause', 'skip')
track_id: Track ID to play (for 'start')
num_skips: Number of tracks to skip
"""
try:
if action == "get":
logger.info("๐ต Getting current playback state")
result = spotify_client.current_user_playing_track()
elif action == "start":
if track_id:
logger.info(f"๐ต Starting playback of track: {track_id}")
spotify_client.start_playback(uris=[f"spotify:track:{track_id}"])
else:
logger.info("๐ต Resuming playback")
spotify_client.start_playback()
result = spotify_client.current_user_playing_track()
elif action == "pause":
logger.info("๐ต Pausing playback")
spotify_client.pause_playback()
result = spotify_client.current_user_playing_track()
elif action == "skip":
logger.info(f"๐ต Skipping {num_skips} track(s)")
for _ in range(num_skips):
spotify_client.next_track()
result = spotify_client.current_user_playing_track()
else:
raise ValueError(f"Invalid action: {action}")
# Parse result
track = None
if result and result.get("item"):
track = parse_track(result["item"])
return PlaybackState(
is_playing=result.get("is_playing", False) if result else False,
track=track,
device=result.get("device", {}).get("name")
if result and result.get("device")
else None,
volume=result.get("device", {}).get("volume_percent")
if result and result.get("device")
else None,
shuffle=result.get("shuffle_state", False) if result else False,
repeat=result.get("repeat_state", "off") if result else "off",
progress_ms=result.get("progress_ms") if result else None,
)
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def search_tracks(
query: str,
qtype: str = "track",
limit: int = 10,
offset: int = 0,
year: str | None = None,
year_range: str | None = None,
genre: str | None = None,
artist: str | None = None,
album: str | None = None,
) -> dict[str, Any]:
"""Search Spotify for tracks, albums, artists, or playlists.
Args:
query: Search query
qtype: Type ('track', 'album', 'artist', 'playlist')
limit: Max results per page (1-50, default 10)
offset: Number of results to skip for pagination (default 0)
year: Filter by year (e.g., '2024')
year_range: Filter by year range (e.g., '2020-2024')
genre: Filter by genre (e.g., 'electronic', 'hip-hop')
artist: Filter by artist name
album: Filter by album name
Returns:
Dict with 'items' (list of tracks) and pagination info ('total', 'limit', 'offset')
Note: Filters use Spotify's search syntax. For large result sets, use offset to paginate.
Example: query='love', year='2024', genre='pop' searches for 'love year:2024 genre:pop'
"""
try:
limit = max(1, min(50, limit))
# Build filtered query
filters = []
if artist:
filters.append(f"artist:{artist}")
if album:
filters.append(f"album:{album}")
if year:
filters.append(f"year:{year}")
if year_range:
filters.append(f"year:{year_range}")
if genre:
filters.append(f"genre:{genre}")
full_query = " ".join([query] + filters) if filters else query
logger.info(
f"๐ Searching {qtype}s: '{full_query}' (limit={limit}, offset={offset})"
)
result = spotify_client.search(q=full_query, type=qtype, limit=limit, offset=offset)
tracks = []
items_key = f"{qtype}s"
result_section = result.get(items_key, {})
if qtype == "track" and result_section.get("items"):
tracks = [parse_track(item) for item in result_section["items"]]
else:
# Convert other types to track-like format for consistency
if result_section.get("items"):
for item in result_section["items"]:
track = Track(
name=item["name"],
id=item["id"],
artist=item.get("artists", [{}])[0].get("name", "Unknown")
if qtype != "artist"
else item["name"],
external_urls=item.get("external_urls"),
)
tracks.append(track)
total_results = result_section.get("total", 0)
logger.info(
f"๐ Search returned {len(tracks)} items (total available: {total_results})"
)
log_pagination_info("search_tracks", total_results, limit, offset)
return {
"items": tracks,
"total": total_results,
"limit": result_section.get("limit", limit),
"offset": result_section.get("offset", offset),
"next": result_section.get("next"),
"previous": result_section.get("previous"),
}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def add_to_queue(track_id: str) -> dict[str, str]:
"""Add a track to the playback queue.
Args:
track_id: Spotify track ID to add to queue
Returns:
Dict with status and message
"""
try:
logger.info(f"๐ต Adding track {track_id} to queue")
spotify_client.add_to_queue(f"spotify:track:{track_id}")
return {"status": "success", "message": "Added track to queue"}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_queue() -> dict[str, Any]:
"""Get the current playback queue.
Returns:
Dict with currently_playing track and queue of upcoming tracks
"""
try:
logger.info("๐ต Getting playback queue")
result = spotify_client.queue()
queue_tracks = []
if result.get("queue"):
queue_tracks = [parse_track(item) for item in result["queue"]]
return {
"currently_playing": parse_track(result["currently_playing"]).model_dump()
if result.get("currently_playing")
else None,
"queue": [track.model_dump() for track in queue_tracks],
}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_track_info(track_ids: str | list[str]) -> dict[str, Any]:
"""Get detailed information about one or more Spotify tracks.
Args:
track_ids: Single track ID or list of track IDs (up to 50)
Returns:
Dict with 'tracks' list containing track metadata including release_date.
For single ID, returns {'tracks': [track]}.
Note: Batch lookup is much more efficient - 50 tracks = 1 API call instead of 50.
"""
try:
# Normalize to list
ids = [track_ids] if isinstance(track_ids, str) else track_ids
if len(ids) > 50:
raise ValueError("Maximum 50 track IDs per request (Spotify API limit)")
logger.info(f"๐ต Getting track info for {len(ids)} track(s)")
if len(ids) == 1:
result = spotify_client.track(ids[0])
tracks = [parse_track(result).model_dump()]
else:
result = spotify_client.tracks(ids)
tracks = [
parse_track(item).model_dump()
for item in result.get("tracks", [])
if item
]
return {"tracks": tracks}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_artist_info(artist_id: str) -> dict[str, Any]:
"""Get detailed information about a Spotify artist.
Args:
artist_id: Spotify artist ID
Returns:
Dict with artist info and top tracks
"""
try:
logger.info(f"๐ค Getting artist info: {artist_id}")
result = spotify_client.artist(artist_id)
top_tracks = spotify_client.artist_top_tracks(artist_id)
artist = Artist(
name=result["name"],
id=result["id"],
genres=result.get("genres", []),
popularity=result.get("popularity"),
followers=result.get("followers", {}).get("total"),
)
tracks = [parse_track(track) for track in top_tracks.get("tracks", [])[:10]]
return {
"artist": artist.model_dump(),
"top_tracks": [track.model_dump() for track in tracks],
}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_playlist_info(playlist_id: str) -> dict[str, Any]:
"""Get basic information about a Spotify playlist.
Args:
playlist_id: Spotify playlist ID
Returns:
Dict with playlist metadata (no tracks - use get_playlist_tracks for tracks)
Note: This returns playlist info only. For tracks, use get_playlist_tracks
which supports full pagination for large playlists.
"""
try:
logger.info(f"๐ Getting playlist info: {playlist_id}")
result = spotify_client.playlist(
playlist_id, fields="id,name,description,owner,public,tracks.total"
)
playlist = Playlist(
name=result["name"],
id=result["id"],
owner=result.get("owner", {}).get("display_name"),
description=result.get("description"),
tracks=None, # No tracks - use get_playlist_tracks
total_tracks=result.get("tracks", {}).get("total"),
public=result.get("public"),
)
return playlist.model_dump()
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def create_playlist(
name: str, description: str = "", public: bool = True
) -> dict[str, Any]:
"""Create a new Spotify playlist.
Args:
name: Playlist name
description: Playlist description (default: empty)
public: Whether playlist is public (default: True)
Returns:
Dict with created playlist information
"""
try:
logger.info(f"๐ง Creating playlist: '{name}' (public={public})")
user = spotify_client.current_user()
result = spotify_client.user_playlist_create(
user["id"], name, public=public, description=description
)
playlist = Playlist(
name=result["name"],
id=result["id"],
owner=result.get("owner", {}).get("display_name"),
description=result.get("description"),
tracks=[],
total_tracks=0,
public=result.get("public"),
)
return playlist.model_dump()
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def add_tracks_to_playlist(playlist_id: str, track_uris: list[str]) -> dict[str, str]:
"""Add tracks to a playlist.
Args:
playlist_id: Playlist ID
track_uris: List of track URIs (up to 100)
"""
try:
# Convert track IDs to URIs if needed
uris = [
uri if uri.startswith("spotify:track:") else f"spotify:track:{uri}"
for uri in track_uris
]
logger.info(f"๐ง Adding {len(uris)} tracks to playlist {playlist_id}")
spotify_client.playlist_add_items(playlist_id, uris)
return {"status": "success", "message": f"Added {len(uris)} tracks to playlist"}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_user_playlists(limit: int = 20, offset: int = 0) -> dict[str, Any]:
"""Get current user's playlists with pagination support.
Args:
limit: Max playlists to return per page (1-50, default 20)
offset: Number of playlists to skip for pagination (default 0)
Returns:
Dict with 'items' (list of playlists) and pagination info ('total', 'limit', 'offset')
Note: For users with many playlists, use offset to paginate through results.
Example: offset=0 gets playlists 1-20, offset=20 gets playlists 21-40, etc.
"""
try:
# Validate limit (Spotify API accepts 1-50)
limit = max(1, min(50, limit))
logger.info(f"๐ Getting user playlists (limit={limit}, offset={offset})")
result = spotify_client.current_user_playlists(limit=limit, offset=offset)
# Log pagination info
log_pagination_info("get_user_playlists", result.get("total", 0), limit, offset)
playlists = []
for item in result.get("items", []):
playlist = Playlist(
name=item["name"],
id=item["id"],
owner=item.get("owner", {}).get("display_name"),
description=item.get("description"),
total_tracks=item.get("tracks", {}).get("total"),
public=item.get("public"),
)
playlists.append(playlist)
return {
"items": playlists,
"total": result.get("total", 0),
"limit": result.get("limit", limit),
"offset": result.get("offset", offset),
"next": result.get("next"),
"previous": result.get("previous"),
}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_playlist_tracks(
playlist_id: str, limit: int | None = None, offset: int = 0
) -> dict[str, Any]:
"""Get tracks from a playlist with full pagination support.
Args:
playlist_id: Playlist ID
limit: Max tracks to return (None for all tracks, up to 10,000 safety limit)
offset: Number of tracks to skip for pagination (default 0)
Returns:
Dict with 'items' (list of tracks), 'total', 'limit', 'offset'
Note: Large playlists require pagination. Use limit/offset to get specific ranges:
- Get first 100: limit=100, offset=0
- Get next 100: limit=100, offset=100
- Get all tracks: limit=None (use with caution on very large playlists)
"""
try:
logger.info(
f"๐ Getting playlist tracks: {playlist_id} (limit={limit}, offset={offset})"
)
tracks = get_playlist_tracks_paginated(playlist_id, limit, offset)
# Get total track count from playlist info
playlist_info = spotify_client.playlist(playlist_id, fields="tracks.total")
total_tracks = playlist_info.get("tracks", {}).get("total", len(tracks))
# Log pagination info
log_pagination_info("get_playlist_tracks", total_tracks, limit, offset)
logger.info(f"๐ Retrieved {len(tracks)} tracks from playlist {playlist_id}")
return {
"items": tracks,
"total": total_tracks,
"limit": limit,
"offset": offset,
"returned": len(tracks),
}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def remove_tracks_from_playlist(
playlist_id: str, track_uris: list[str]
) -> dict[str, str]:
"""Remove tracks from a playlist.
Args:
playlist_id: Playlist ID
track_uris: List of track URIs to remove
"""
try:
# Convert track IDs to URIs if needed
uris = [
uri if uri.startswith("spotify:track:") else f"spotify:track:{uri}"
for uri in track_uris
]
logger.info(f"๐ฎ Removing {len(uris)} tracks from playlist {playlist_id}")
spotify_client.playlist_remove_all_occurrences_of_items(playlist_id, uris)
return {
"status": "success",
"message": f"Removed {len(uris)} tracks from playlist",
}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def modify_playlist_details(
playlist_id: str,
name: str | None = None,
description: str | None = None,
public: bool | None = None,
) -> dict[str, str]:
"""Modify playlist details.
Args:
playlist_id: Playlist ID
name: New playlist name (optional)
description: New playlist description (optional)
public: Whether playlist should be public (optional)
"""
try:
if not name and not description and public is None:
raise ValueError(
"At least one of name, description, or public must be provided"
)
updates = []
if name:
updates.append(f"name='{name}'")
if description:
updates.append(f"description='{description}'")
if public is not None:
updates.append(f"public={public}")
logger.info(f"๐ Modifying playlist {playlist_id}: {', '.join(updates)}")
spotify_client.playlist_change_details(
playlist_id, name=name, description=description, public=public
)
return {"status": "success", "message": "Playlist details updated successfully"}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_album_info(album_id: str) -> dict[str, Any]:
"""Get detailed information about a Spotify album.
Args:
album_id: Spotify album ID
Returns:
Dict with album metadata including release_date, label, tracks
"""
try:
logger.info(f"๐ฟ Getting album info: {album_id}")
result = spotify_client.album(album_id)
album = Album(
name=result["name"],
id=result["id"],
artist=result["artists"][0]["name"] if result.get("artists") else "Unknown",
artists=[a["name"] for a in result.get("artists", [])],
release_date=result.get("release_date"),
release_date_precision=result.get("release_date_precision"),
total_tracks=result.get("total_tracks"),
album_type=result.get("album_type"),
label=result.get("label"),
genres=result.get("genres", []),
popularity=result.get("popularity"),
external_urls=result.get("external_urls"),
)
# Parse album tracks
tracks = []
for item in result.get("tracks", {}).get("items", []):
if item:
# Album track items don't have album info, add it
item["album"] = {
"name": result["name"],
"id": result["id"],
"release_date": result.get("release_date"),
}
tracks.append(parse_track(item).model_dump())
return {
"album": album.model_dump(),
"tracks": tracks,
}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_audio_features(track_ids: str | list[str]) -> dict[str, Any]:
"""Get audio features for one or more tracks (tempo, key, energy, danceability, etc).
Args:
track_ids: Single track ID or list of track IDs (up to 100)
Returns:
Dict with 'features' list containing audio features for each track.
Features include: tempo, key, mode, time_signature, danceability, energy,
valence, loudness, speechiness, acousticness, instrumentalness, liveness.
Note: Batch lookup is efficient - 100 tracks = 1 API call.
"""
try:
# Normalize to list
ids = [track_ids] if isinstance(track_ids, str) else track_ids
if len(ids) > 100:
raise ValueError("Maximum 100 track IDs per request (Spotify API limit)")
logger.info(f"๐ผ Getting audio features for {len(ids)} track(s)")
result = spotify_client.audio_features(ids)
features_list = []
for features in result:
if features:
audio = AudioFeatures(
id=features["id"],
tempo=features.get("tempo"),
key=features.get("key"),
mode=features.get("mode"),
time_signature=features.get("time_signature"),
danceability=features.get("danceability"),
energy=features.get("energy"),
valence=features.get("valence"),
loudness=features.get("loudness"),
speechiness=features.get("speechiness"),
acousticness=features.get("acousticness"),
instrumentalness=features.get("instrumentalness"),
liveness=features.get("liveness"),
)
features_list.append(audio.model_dump())
return {"features": features_list}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_saved_tracks(limit: int = 20, offset: int = 0) -> dict[str, Any]:
"""Get user's saved/liked tracks (Liked Songs library).
Args:
limit: Max tracks to return per page (1-50, default 20)
offset: Number of tracks to skip for pagination (default 0)
Returns:
Dict with 'items' (list of tracks with added_at timestamp) and pagination info
"""
try:
limit = max(1, min(50, limit))
logger.info(f"โค๏ธ Getting saved tracks (limit={limit}, offset={offset})")
result = spotify_client.current_user_saved_tracks(limit=limit, offset=offset)
tracks = []
for item in result.get("items", []):
if item and item.get("track"):
track_data = parse_track(item["track"]).model_dump()
track_data["added_at"] = item.get("added_at")
tracks.append(track_data)
log_pagination_info("get_saved_tracks", result.get("total", 0), limit, offset)
return {
"items": tracks,
"total": result.get("total", 0),
"limit": result.get("limit", limit),
"offset": result.get("offset", offset),
"next": result.get("next"),
"previous": result.get("previous"),
}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_recommendations(
seed_artists: list[str] | None = None,
seed_tracks: list[str] | None = None,
seed_genres: list[str] | None = None,
limit: int = 20,
) -> dict[str, Any]:
"""Get track recommendations based on seed artists, tracks, or genres.
Args:
seed_artists: List of artist IDs (up to 5 total seeds combined)
seed_tracks: List of track IDs (up to 5 total seeds combined)
seed_genres: List of genres (up to 5 total seeds combined)
limit: Number of recommendations to return (1-100, default 20)
Returns:
Dict with 'tracks' list of recommended tracks
Note: Total seeds (artists + tracks + genres) must be between 1 and 5.
Use search_tracks to find seed track/artist IDs, or common genres like:
'pop', 'rock', 'hip-hop', 'electronic', 'jazz', 'classical', 'r-n-b', 'country'
"""
try:
# Validate seeds
total_seeds = (
len(seed_artists or [])
+ len(seed_tracks or [])
+ len(seed_genres or [])
)
if total_seeds == 0:
raise ValueError("At least one seed (artist, track, or genre) is required")
if total_seeds > 5:
raise ValueError("Maximum 5 total seeds allowed (artists + tracks + genres)")
limit = max(1, min(100, limit))
logger.info(
f"๐ฒ Getting recommendations (artists={seed_artists}, "
f"tracks={seed_tracks}, genres={seed_genres}, limit={limit})"
)
result = spotify_client.recommendations(
seed_artists=seed_artists,
seed_tracks=seed_tracks,
seed_genres=seed_genres,
limit=limit,
)
tracks = []
for item in result.get("tracks", []):
if item:
tracks.append(parse_track(item).model_dump())
return {
"tracks": tracks,
"seeds": {
"artists": seed_artists or [],
"tracks": seed_tracks or [],
"genres": seed_genres or [],
},
}
except SpotifyException as e:
raise convert_spotify_error(e) from e
# === RESOURCES ===
@mcp.resource("spotify://user/current")
def current_user() -> str:
"""Current user's profile."""
try:
user = spotify_client.current_user()
return json.dumps(
{
"id": user.get("id"),
"display_name": user.get("display_name"),
"followers": user.get("followers", {}).get("total"),
"country": user.get("country"),
"product": user.get("product"),
}
)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.resource("spotify://playback/current")
def current_playback_resource() -> str:
"""Current playback state."""
try:
playback = spotify_client.current_user_playing_track()
if not playback:
return json.dumps({"status": "no_playback"})
track_info = playback.get("item", {})
return json.dumps(
{
"is_playing": playback.get("is_playing", False),
"track": {
"name": track_info.get("name"),
"artist": track_info.get("artists", [{}])[0].get("name"),
"album": track_info.get("album", {}).get("name"),
"id": track_info.get("id"),
}
if track_info
else None,
"device": playback.get("device", {}).get("name"),
"progress_ms": playback.get("progress_ms"),
}
)
except Exception as e:
return json.dumps({"error": str(e)})
# === PROMPTS ===
@mcp.prompt()
def create_mood_playlist(mood: str, genre: str = "", decade: str = "") -> str:
"""Create a playlist based on mood and preferences."""
prompt = f"Create a Spotify playlist for a {mood} mood"
if genre:
prompt += f" with {genre} music"
if decade:
prompt += f" from the {decade}"
return f"""{prompt}.
Workflow:
1. Use search_tracks with different queries to find diverse songs
- For large search results, use offset parameter to get more options
- Example: search_tracks("upbeat pop", limit=20, offset=0) then offset=20 for more
2. Create playlist with create_playlist
3. Add tracks with add_tracks_to_playlist (supports up to 100 tracks per call)
Pagination Tips:
- Search results are paginated (limit=1-50, use offset for more results)
- For variety, try multiple search queries with different offsets
- Large playlists: batch add tracks in groups of 50-100
Consider:
1. Energy level for {mood} mood
2. {f"Focus on {genre}" if genre else "Genre variety"}
3. {f"Songs from {decade}" if decade else "Mix of eras"}
4. 15-20 songs with good flow"""
@mcp.prompt()
def analyze_large_playlist(playlist_id: str, analysis_type: str = "overview") -> str:
"""Analyze a large playlist efficiently using pagination."""
return f"""Analyze playlist {playlist_id} with focus on {analysis_type}.
For large playlists (>100 tracks), use pagination to analyze efficiently:
Step 1: Get overview
- Use get_item_info(playlist_id, "playlist") for basic info and first 50 tracks
- Check total_tracks to understand playlist size
Step 2: Full analysis (if needed)
- For playlists >100 tracks, use get_playlist_tracks with pagination:
- get_playlist_tracks(playlist_id, limit=100, offset=0) for first 100
- get_playlist_tracks(playlist_id, limit=100, offset=100) for next 100
- Continue until you have all tracks or sufficient sample
Step 3: Analysis
Based on analysis_type:
- "overview": Basic stats, genres, mood distribution
- "detailed": Track-by-track analysis, recommendations
- "duplicates": Find duplicate tracks across large playlist
- "mood": Analyze mood/energy progression through playlist
Pagination Benefits:
- Memory efficient for 1000+ track playlists
- Can stop early if sufficient data collected
- Allows progressive analysis with user feedback"""
@mcp.prompt()
def discover_music_systematically(
seed_query: str, exploration_depth: str = "medium"
) -> str:
"""Systematically discover music using search pagination."""
return f"""Discover music related to "{seed_query}" with {exploration_depth} exploration.
Search Strategy with Pagination:
1. Initial search: search_tracks("{seed_query}", limit=20, offset=0)
2. Diverse results: Use different offsets to explore deeper:
- Popular results: offset=0-20
- Hidden gems: offset=20-40, offset=40-60
- Deep cuts: offset=80-100+
3. Related searches with pagination:
- Artist names from initial results
- Album names from initial results
- Genre + decade combinations
- Similar mood/energy descriptors
Exploration Depth:
- "light": 2-3 search queries, 20 results each
- "medium": 5-6 search queries, explore offsets 0-40
- "deep": 10+ search queries, explore offsets 0-100+
Pagination Best Practices:
- Start with limit=20 for quick overview
- Use offset to avoid duplicate results
- Try different query variations rather than just advancing offset
- Stop when you find enough quality matches
Output: Curated list of 15-25 discovered tracks with variety"""
if __name__ == "__main__":
mcp.run()