Skip to main content
Glama

Spotify MCP Server

by boristopalov
server.py10 kB
import asyncio import os import logging from enum import Enum import json from typing import List, Optional, Tuple from datetime import datetime from pathlib import Path import mcp.types as types from mcp.server import NotificationOptions, Server, stdio_server from pydantic import BaseModel, Field, AnyUrl from spotipy import SpotifyException from . import spotify_api def setup_logger(): # TODO: can use mcp.server.stdio logger = logging.getLogger("spotify_mcp") # Check if LOGGING_PATH environment variable is set logging_path = os.getenv("LOGGING_PATH") if os.getenv("LOGGING_PATH"): log_dir = Path(logging_path) log_dir.mkdir(parents=True, exist_ok=True) formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') log_file = log_dir / f"spotify_mcp_{datetime.now().strftime('%Y%m%d')}.log" file_handler = logging.FileHandler(log_file) file_handler.setFormatter(formatter) error_log_file = log_dir / f"spotify_mcp_errors_{datetime.now().strftime('%Y%m%d')}.log" error_file_handler = logging.FileHandler(error_log_file) error_file_handler.setFormatter(formatter) error_file_handler.setLevel(logging.ERROR) # Configure logger with both handlers logger.setLevel(logging.INFO) logger.addHandler(file_handler) logger.addHandler(error_file_handler) else: # Use default logging configuration logger.setLevel(logging.INFO) return logger logger = setup_logger() server = Server("spotify-mcp") spotify_client = spotify_api.Client(logger) class ToolModel(BaseModel): @classmethod def as_tool(cls): return types.Tool( name="Spotify" + cls.__name__, description=cls.__doc__, inputSchema=cls.model_json_schema() ) class Playback(ToolModel): """Manages the current playback with the following actions: - get: Get information about user's current track. - start: Starts of resumes playback. - pause: Pauses current playback. - skip: Skips current track. """ action: str = Field(description="Action to perform: 'get', 'start', 'pause' or 'skip'.") track_id: Optional[str] = Field(default=None, description="Specifies track to play for 'start' action. If omitted, resumes current playback.") num_skips: Optional[int] = Field(default=1, description="Number of tracks to skip for `skip` action.") class Queue(ToolModel): """Manage the playback queue - get the queue or add tracks.""" action: str = Field(description="Action to perform: 'add' or 'get'.") track_id: Optional[str] = Field(default=None, description="Track ID to add to queue (required for add action)") class GetInfo(ToolModel): """Get detailed information about a Spotify item (track, album, artist, or playlist).""" item_id: str = Field(description="ID of the item to get information about") qtype: str = Field(default="track", description="Type of item: 'track', 'album', 'artist', or 'playlist'. " "If 'playlist' or 'album', returns its tracks. If 'artist'," "returns albums and top tracks.") class Search(ToolModel): """Search for tracks, albums, artists, or playlists on Spotify.""" query: str = Field(description="query term") qtype: Optional[str] = Field(default="track", description="Type of items to search for (track, album, artist, playlist, or comma-separated combination)") limit: Optional[int] = Field(default=10, description="Maximum number of items to return") @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """List available tools.""" logger.info("Listing available tools") tools = [ Playback.as_tool(), Search.as_tool(), Queue.as_tool(), GetInfo.as_tool(), ] logger.info(f"Available tools: {[tool.name for tool in tools]}") return tools @server.call_tool() async def handle_call_tool( name: str, arguments: dict | None ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: """Handle tool execution requests.""" logger.info(f"Tool called: {name} with arguments: {arguments}") assert name[:7] == "Spotify", f"Unknown tool: {name}" try: match name[7:]: case "Playback": action = arguments.get("action") match action: case "get": logger.info("Attempting to get current track") curr_track = spotify_client.get_current_track() if curr_track: logger.info(f"Current track retrieved: {curr_track.get('name', 'Unknown')}") return [types.TextContent( type="text", text=json.dumps(curr_track, indent=2) )] logger.info("No track currently playing") return [types.TextContent( type="text", text="No track playing." )] case "start": logger.info(f"Starting playback with arguments: {arguments}") spotify_client.start_playback(track_id=arguments.get("track_id")) logger.info("Playback started successfully") return [types.TextContent( type="text", text="Playback starting with no errors." )] case "pause": logger.info("Attempting to pause playback") spotify_client.pause_playback() logger.info("Playback paused successfully") return [types.TextContent( type="text", text="Playback paused successfully." )] case "skip": num_skips = int(arguments.get("num_skips", 1)) logger.info(f"Skipping {num_skips} tracks.") spotify_client.skip_track(n=num_skips) return [types.TextContent( type="text", text="Skipped to next track." )] case "Search": logger.info(f"Performing search with arguments: {arguments}") search_results = spotify_client.search( query=arguments.get("query", ""), qtype=arguments.get("qtype", "track"), limit=arguments.get("limit", 10) ) logger.info("Search completed successfully") return [types.TextContent( type="text", text=json.dumps(search_results, indent=2) )] case "Queue": logger.info(f"Queue operation with arguments: {arguments}") action = arguments.get("action") match action: case "add": track_id = arguments.get("track_id") if not track_id: logger.error("track_id is required for add to queue.") return [types.TextContent( type="text", text="track_id is required for add action" )] spotify_client.add_to_queue(track_id) return [types.TextContent( type="text", text=f"Track added to queue successfully." )] case "get": queue = spotify_client.get_queue() return [types.TextContent( type="text", text=json.dumps(queue, indent=2) )] case _: return [types.TextContent( type="text", text=f"Unknown queue action: {action}. Supported actions are: add, remove, and get." )] case "GetInfo": logger.info(f"Getting item info with arguments: {arguments}") item_info = spotify_client.get_info( item_id=arguments.get("item_id"), qtype=arguments.get("qtype", "track") ) return [types.TextContent( type="text", text=json.dumps(item_info, indent=2) )] case _: error_msg = f"Unknown tool: {name}" logger.error(error_msg) raise ValueError(error_msg) except SpotifyException as se: error_msg = f"Spotify Client error occurred: {str(se)}" logger.error(error_msg, exc_info=True) return [types.TextContent( type="text", text=f"An error occurred with the Spotify Client: {str(se)}" )] except Exception as e: error_msg = f"Unexpected error occurred: {str(e)}" logger.error(error_msg, exc_info=True) raise async def main(): logger.info("Starting Spotify MCP server") try: options = server.create_initialization_options() async with stdio_server() as (read_stream, write_stream): logger.info("Server initialized successfully") await server.run( read_stream, write_stream, options ) except Exception as e: logger.error(f"Server error occurred: {str(e)}", exc_info=True) raise

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/boristopalov/spotify-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server