fastmcp_server.py•26 kB
"""
Modern FastMCP-based Spotify MCP Server.
Clean, simple implementation using FastMCP's automatic features.
"""
import json
import logging
from typing import Any
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel
from spotipy import SpotifyException
import spotify_mcp.spotify_api as spotify_api
from spotify_mcp.errors import convert_spotify_error
from spotify_mcp.logging_utils import (
log_pagination_info,
log_tool_execution,
)
# Configure logging
logger = logging.getLogger(__name__)
# Configure structured logging for better observability
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
# Create FastMCP app
mcp = FastMCP("Spotify MCP")
# Initialize Spotify client
_client_wrapper = spotify_api.Client()
spotify_client = _client_wrapper.sp # Use spotipy client directly
# Data models for structured output
class Track(BaseModel):
"""A Spotify track with metadata."""
name: str
id: str
artist: str
artists: list[str] | None = None
album: str | None = None
duration_ms: int | None = None
popularity: int | None = None
external_urls: dict[str, str] | None = None
class PlaybackState(BaseModel):
"""Current playback state."""
is_playing: bool
track: Track | None = None
device: str | None = None
volume: int | None = None
shuffle: bool = False
repeat: str = "off"
progress_ms: int | None = None
class Playlist(BaseModel):
"""A Spotify playlist."""
name: str
id: str
owner: str | None = None
description: str | None = None
tracks: list[Track] | None = None
total_tracks: int | None = None
public: bool | None = None
class Artist(BaseModel):
"""A Spotify artist."""
name: str
id: str
genres: list[str] | None = None
popularity: int | None = None
followers: int | None = None
def parse_track(item: dict[str, Any]) -> Track:
"""Parse Spotify track data into Track model."""
return Track(
name=item["name"],
id=item["id"],
artist=item["artists"][0]["name"] if item.get("artists") else "Unknown",
artists=[a["name"] for a in item.get("artists", [])],
album=item.get("album", {}).get("name"),
duration_ms=item.get("duration_ms"),
popularity=item.get("popularity"),
external_urls=item.get("external_urls"),
)
def get_playlist_tracks_paginated(
playlist_id: str, limit: int | None = None, offset: int = 0
) -> list[Track]:
"""Get playlist tracks with proper pagination support.
Args:
playlist_id: Spotify playlist ID
limit: Maximum number of tracks to return (None for all)
offset: Number of tracks to skip
Returns:
List of Track objects
"""
tracks = []
current_offset = offset
batch_size = min(limit, 100) if limit else 100 # Spotify API max is 100 per request
remaining = limit
logger.info(
f"📄 Starting paginated fetch for playlist {playlist_id} (limit={limit}, offset={offset})"
)
while True:
# Determine how many to fetch in this batch
batch_limit = min(batch_size, remaining) if remaining else batch_size
logger.info(f"📄 Fetching batch: offset={current_offset}, limit={batch_limit}")
# Get playlist tracks with pagination
tracks_result = spotify_client.playlist_tracks(
playlist_id, limit=batch_limit, offset=current_offset
)
if not tracks_result or not tracks_result.get("items"):
break
# Parse and add tracks
batch_tracks = []
for item in tracks_result["items"]:
if item and item.get("track"):
batch_tracks.append(parse_track(item["track"]))
tracks.extend(batch_tracks)
logger.info(
f"📄 Batch complete: retrieved {len(batch_tracks)} tracks (total so far: {len(tracks)})"
)
# Update remaining count if we have a limit
if remaining:
remaining -= len(batch_tracks)
if remaining <= 0:
break
# Check if we've reached the end
if len(tracks_result["items"]) < batch_limit or not tracks_result.get("next"):
break
current_offset += len(tracks_result["items"])
# Safety check to prevent infinite loops
if current_offset > 10000:
logger.warning(
f"⚠️ Safety limit reached: stopping at offset {current_offset}"
)
break
logger.info(f"📄 Pagination complete: total {len(tracks)} tracks retrieved")
return tracks
# === TOOLS ===
@mcp.tool()
@log_tool_execution
def playback_control(
action: str, track_id: str | None = None, num_skips: int = 1
) -> PlaybackState:
"""Control Spotify playback.
Args:
action: Action ('get', 'start', 'pause', 'skip')
track_id: Track ID to play (for 'start')
num_skips: Number of tracks to skip
"""
try:
if action == "get":
logger.info("🎵 Getting current playback state")
result = spotify_client.current_user_playing_track()
elif action == "start":
if track_id:
logger.info(f"🎵 Starting playback of track: {track_id}")
spotify_client.start_playback(uris=[f"spotify:track:{track_id}"])
else:
logger.info("🎵 Resuming playback")
spotify_client.start_playback()
result = spotify_client.current_user_playing_track()
elif action == "pause":
logger.info("🎵 Pausing playback")
spotify_client.pause_playback()
result = spotify_client.current_user_playing_track()
elif action == "skip":
logger.info(f"🎵 Skipping {num_skips} track(s)")
for _ in range(num_skips):
spotify_client.next_track()
result = spotify_client.current_user_playing_track()
else:
raise ValueError(f"Invalid action: {action}")
# Parse result
track = None
if result and result.get("item"):
track = parse_track(result["item"])
return PlaybackState(
is_playing=result.get("is_playing", False) if result else False,
track=track,
device=result.get("device", {}).get("name")
if result and result.get("device")
else None,
volume=result.get("device", {}).get("volume_percent")
if result and result.get("device")
else None,
shuffle=result.get("shuffle_state", False) if result else False,
repeat=result.get("repeat_state", "off") if result else "off",
progress_ms=result.get("progress_ms") if result else None,
)
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def search_tracks(
query: str, qtype: str = "track", limit: int = 10, offset: int = 0
) -> dict[str, Any]:
"""Search Spotify for tracks, albums, artists, or playlists.
Args:
query: Search query
qtype: Type ('track', 'album', 'artist', 'playlist')
limit: Max results per page (1-50, default 10)
offset: Number of results to skip for pagination (default 0)
Returns:
Dict with 'items' (list of tracks) and pagination info ('total', 'limit', 'offset')
Note: For large result sets, use offset to paginate through results.
Example: offset=0 gets results 1-10, offset=10 gets results 11-20, etc.
"""
try:
# Validate limit (Spotify API accepts 1-50)
limit = max(1, min(50, limit))
logger.info(
f"🔍 Searching {qtype}s: '{query}' (limit={limit}, offset={offset})"
)
result = spotify_client.search(q=query, type=qtype, limit=limit, offset=offset)
tracks = []
items_key = f"{qtype}s"
result_section = result.get(items_key, {})
if qtype == "track" and result_section.get("items"):
tracks = [parse_track(item) for item in result_section["items"]]
else:
# Convert other types to track-like format for consistency
if result_section.get("items"):
for item in result_section["items"]:
track = Track(
name=item["name"],
id=item["id"],
artist=item.get("artists", [{}])[0].get("name", "Unknown")
if qtype != "artist"
else item["name"],
external_urls=item.get("external_urls"),
)
tracks.append(track)
total_results = result_section.get("total", 0)
logger.info(
f"🔍 Search returned {len(tracks)} items (total available: {total_results})"
)
log_pagination_info("search_tracks", total_results, limit, offset)
return {
"items": tracks,
"total": total_results,
"limit": result_section.get("limit", limit),
"offset": result_section.get("offset", offset),
"next": result_section.get("next"),
"previous": result_section.get("previous"),
}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def add_to_queue(track_id: str) -> dict[str, str]:
"""Add a track to the playback queue.
Args:
track_id: Spotify track ID to add to queue
Returns:
Dict with status and message
"""
try:
logger.info(f"🎵 Adding track {track_id} to queue")
spotify_client.add_to_queue(f"spotify:track:{track_id}")
return {"status": "success", "message": "Added track to queue"}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_queue() -> dict[str, Any]:
"""Get the current playback queue.
Returns:
Dict with currently_playing track and queue of upcoming tracks
"""
try:
logger.info("🎵 Getting playback queue")
result = spotify_client.queue()
queue_tracks = []
if result.get("queue"):
queue_tracks = [parse_track(item) for item in result["queue"]]
return {
"currently_playing": parse_track(result["currently_playing"]).model_dump()
if result.get("currently_playing")
else None,
"queue": [track.model_dump() for track in queue_tracks],
}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_track_info(track_id: str) -> dict[str, Any]:
"""Get detailed information about a Spotify track.
Args:
track_id: Spotify track ID
Returns:
Dict with complete track metadata
"""
try:
logger.info(f"🎵 Getting track info: {track_id}")
result = spotify_client.track(track_id)
return parse_track(result).model_dump()
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_artist_info(artist_id: str) -> dict[str, Any]:
"""Get detailed information about a Spotify artist.
Args:
artist_id: Spotify artist ID
Returns:
Dict with artist info and top tracks
"""
try:
logger.info(f"🎤 Getting artist info: {artist_id}")
result = spotify_client.artist(artist_id)
top_tracks = spotify_client.artist_top_tracks(artist_id)
artist = Artist(
name=result["name"],
id=result["id"],
genres=result.get("genres", []),
popularity=result.get("popularity"),
followers=result.get("followers", {}).get("total"),
)
tracks = [parse_track(track) for track in top_tracks.get("tracks", [])[:10]]
return {
"artist": artist.model_dump(),
"top_tracks": [track.model_dump() for track in tracks],
}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_playlist_info(playlist_id: str) -> dict[str, Any]:
"""Get basic information about a Spotify playlist.
Args:
playlist_id: Spotify playlist ID
Returns:
Dict with playlist metadata (no tracks - use get_playlist_tracks for tracks)
Note: This returns playlist info only. For tracks, use get_playlist_tracks
which supports full pagination for large playlists.
"""
try:
logger.info(f"📋 Getting playlist info: {playlist_id}")
result = spotify_client.playlist(
playlist_id, fields="id,name,description,owner,public,tracks.total"
)
playlist = Playlist(
name=result["name"],
id=result["id"],
owner=result.get("owner", {}).get("display_name"),
description=result.get("description"),
tracks=None, # No tracks - use get_playlist_tracks
total_tracks=result.get("tracks", {}).get("total"),
public=result.get("public"),
)
return playlist.model_dump()
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def create_playlist(
name: str, description: str = "", public: bool = True
) -> dict[str, Any]:
"""Create a new Spotify playlist.
Args:
name: Playlist name
description: Playlist description (default: empty)
public: Whether playlist is public (default: True)
Returns:
Dict with created playlist information
"""
try:
logger.info(f"🎧 Creating playlist: '{name}' (public={public})")
user = spotify_client.current_user()
result = spotify_client.user_playlist_create(
user["id"], name, public=public, description=description
)
playlist = Playlist(
name=result["name"],
id=result["id"],
owner=result.get("owner", {}).get("display_name"),
description=result.get("description"),
tracks=[],
total_tracks=0,
public=result.get("public"),
)
return playlist.model_dump()
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def add_tracks_to_playlist(playlist_id: str, track_uris: list[str]) -> dict[str, str]:
"""Add tracks to a playlist.
Args:
playlist_id: Playlist ID
track_uris: List of track URIs (up to 100)
"""
try:
# Convert track IDs to URIs if needed
uris = [
uri if uri.startswith("spotify:track:") else f"spotify:track:{uri}"
for uri in track_uris
]
logger.info(f"🎧 Adding {len(uris)} tracks to playlist {playlist_id}")
spotify_client.playlist_add_items(playlist_id, uris)
return {"status": "success", "message": f"Added {len(uris)} tracks to playlist"}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_user_playlists(limit: int = 20, offset: int = 0) -> dict[str, Any]:
"""Get current user's playlists with pagination support.
Args:
limit: Max playlists to return per page (1-50, default 20)
offset: Number of playlists to skip for pagination (default 0)
Returns:
Dict with 'items' (list of playlists) and pagination info ('total', 'limit', 'offset')
Note: For users with many playlists, use offset to paginate through results.
Example: offset=0 gets playlists 1-20, offset=20 gets playlists 21-40, etc.
"""
try:
# Validate limit (Spotify API accepts 1-50)
limit = max(1, min(50, limit))
logger.info(f"📋 Getting user playlists (limit={limit}, offset={offset})")
result = spotify_client.current_user_playlists(limit=limit, offset=offset)
# Log pagination info
log_pagination_info("get_user_playlists", result.get("total", 0), limit, offset)
playlists = []
for item in result.get("items", []):
playlist = Playlist(
name=item["name"],
id=item["id"],
owner=item.get("owner", {}).get("display_name"),
description=item.get("description"),
total_tracks=item.get("tracks", {}).get("total"),
public=item.get("public"),
)
playlists.append(playlist)
return {
"items": playlists,
"total": result.get("total", 0),
"limit": result.get("limit", limit),
"offset": result.get("offset", offset),
"next": result.get("next"),
"previous": result.get("previous"),
}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def get_playlist_tracks(
playlist_id: str, limit: int | None = None, offset: int = 0
) -> dict[str, Any]:
"""Get tracks from a playlist with full pagination support.
Args:
playlist_id: Playlist ID
limit: Max tracks to return (None for all tracks, up to 10,000 safety limit)
offset: Number of tracks to skip for pagination (default 0)
Returns:
Dict with 'items' (list of tracks), 'total', 'limit', 'offset'
Note: Large playlists require pagination. Use limit/offset to get specific ranges:
- Get first 100: limit=100, offset=0
- Get next 100: limit=100, offset=100
- Get all tracks: limit=None (use with caution on very large playlists)
"""
try:
logger.info(
f"📋 Getting playlist tracks: {playlist_id} (limit={limit}, offset={offset})"
)
tracks = get_playlist_tracks_paginated(playlist_id, limit, offset)
# Get total track count from playlist info
playlist_info = spotify_client.playlist(playlist_id, fields="tracks.total")
total_tracks = playlist_info.get("tracks", {}).get("total", len(tracks))
# Log pagination info
log_pagination_info("get_playlist_tracks", total_tracks, limit, offset)
logger.info(f"📋 Retrieved {len(tracks)} tracks from playlist {playlist_id}")
return {
"items": tracks,
"total": total_tracks,
"limit": limit,
"offset": offset,
"returned": len(tracks),
}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def remove_tracks_from_playlist(
playlist_id: str, track_uris: list[str]
) -> dict[str, str]:
"""Remove tracks from a playlist.
Args:
playlist_id: Playlist ID
track_uris: List of track URIs to remove
"""
try:
# Convert track IDs to URIs if needed
uris = [
uri if uri.startswith("spotify:track:") else f"spotify:track:{uri}"
for uri in track_uris
]
logger.info(f"🚮 Removing {len(uris)} tracks from playlist {playlist_id}")
spotify_client.playlist_remove_all_occurrences_of_items(playlist_id, uris)
return {
"status": "success",
"message": f"Removed {len(uris)} tracks from playlist",
}
except SpotifyException as e:
raise convert_spotify_error(e) from e
@mcp.tool()
@log_tool_execution
def modify_playlist_details(
playlist_id: str,
name: str | None = None,
description: str | None = None,
public: bool | None = None,
) -> dict[str, str]:
"""Modify playlist details.
Args:
playlist_id: Playlist ID
name: New playlist name (optional)
description: New playlist description (optional)
public: Whether playlist should be public (optional)
"""
try:
if not name and not description and public is None:
raise ValueError(
"At least one of name, description, or public must be provided"
)
updates = []
if name:
updates.append(f"name='{name}'")
if description:
updates.append(f"description='{description}'")
if public is not None:
updates.append(f"public={public}")
logger.info(f"📋 Modifying playlist {playlist_id}: {', '.join(updates)}")
spotify_client.playlist_change_details(
playlist_id, name=name, description=description, public=public
)
return {"status": "success", "message": "Playlist details updated successfully"}
except SpotifyException as e:
raise convert_spotify_error(e) from e
# === RESOURCES ===
@mcp.resource("spotify://user/current")
def current_user() -> str:
"""Current user's profile."""
try:
user = spotify_client.current_user()
return json.dumps(
{
"id": user.get("id"),
"display_name": user.get("display_name"),
"followers": user.get("followers", {}).get("total"),
"country": user.get("country"),
"product": user.get("product"),
}
)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.resource("spotify://playback/current")
def current_playback_resource() -> str:
"""Current playback state."""
try:
playback = spotify_client.current_user_playing_track()
if not playback:
return json.dumps({"status": "no_playback"})
track_info = playback.get("item", {})
return json.dumps(
{
"is_playing": playback.get("is_playing", False),
"track": {
"name": track_info.get("name"),
"artist": track_info.get("artists", [{}])[0].get("name"),
"album": track_info.get("album", {}).get("name"),
"id": track_info.get("id"),
}
if track_info
else None,
"device": playback.get("device", {}).get("name"),
"progress_ms": playback.get("progress_ms"),
}
)
except Exception as e:
return json.dumps({"error": str(e)})
# === PROMPTS ===
@mcp.prompt()
def create_mood_playlist(mood: str, genre: str = "", decade: str = "") -> str:
"""Create a playlist based on mood and preferences."""
prompt = f"Create a Spotify playlist for a {mood} mood"
if genre:
prompt += f" with {genre} music"
if decade:
prompt += f" from the {decade}"
return f"""{prompt}.
Workflow:
1. Use search_tracks with different queries to find diverse songs
- For large search results, use offset parameter to get more options
- Example: search_tracks("upbeat pop", limit=20, offset=0) then offset=20 for more
2. Create playlist with create_playlist
3. Add tracks with add_tracks_to_playlist (supports up to 100 tracks per call)
Pagination Tips:
- Search results are paginated (limit=1-50, use offset for more results)
- For variety, try multiple search queries with different offsets
- Large playlists: batch add tracks in groups of 50-100
Consider:
1. Energy level for {mood} mood
2. {f"Focus on {genre}" if genre else "Genre variety"}
3. {f"Songs from {decade}" if decade else "Mix of eras"}
4. 15-20 songs with good flow"""
@mcp.prompt()
def analyze_large_playlist(playlist_id: str, analysis_type: str = "overview") -> str:
"""Analyze a large playlist efficiently using pagination."""
return f"""Analyze playlist {playlist_id} with focus on {analysis_type}.
For large playlists (>100 tracks), use pagination to analyze efficiently:
Step 1: Get overview
- Use get_item_info(playlist_id, "playlist") for basic info and first 50 tracks
- Check total_tracks to understand playlist size
Step 2: Full analysis (if needed)
- For playlists >100 tracks, use get_playlist_tracks with pagination:
- get_playlist_tracks(playlist_id, limit=100, offset=0) for first 100
- get_playlist_tracks(playlist_id, limit=100, offset=100) for next 100
- Continue until you have all tracks or sufficient sample
Step 3: Analysis
Based on analysis_type:
- "overview": Basic stats, genres, mood distribution
- "detailed": Track-by-track analysis, recommendations
- "duplicates": Find duplicate tracks across large playlist
- "mood": Analyze mood/energy progression through playlist
Pagination Benefits:
- Memory efficient for 1000+ track playlists
- Can stop early if sufficient data collected
- Allows progressive analysis with user feedback"""
@mcp.prompt()
def discover_music_systematically(
seed_query: str, exploration_depth: str = "medium"
) -> str:
"""Systematically discover music using search pagination."""
return f"""Discover music related to "{seed_query}" with {exploration_depth} exploration.
Search Strategy with Pagination:
1. Initial search: search_tracks("{seed_query}", limit=20, offset=0)
2. Diverse results: Use different offsets to explore deeper:
- Popular results: offset=0-20
- Hidden gems: offset=20-40, offset=40-60
- Deep cuts: offset=80-100+
3. Related searches with pagination:
- Artist names from initial results
- Album names from initial results
- Genre + decade combinations
- Similar mood/energy descriptors
Exploration Depth:
- "light": 2-3 search queries, 20 results each
- "medium": 5-6 search queries, explore offsets 0-40
- "deep": 10+ search queries, explore offsets 0-100+
Pagination Best Practices:
- Start with limit=20 for quick overview
- Use offset to avoid duplicate results
- Try different query variations rather than just advancing offset
- Stop when you find enough quality matches
Output: Curated list of 15-25 discovered tracks with variety"""
if __name__ == "__main__":
mcp.run()