Skip to main content
Glama
spotify_mcp.py (22.3 kB)
"""Spotify MCP server.

Exposes Spotify playback control (pause, skip, volume, seek, device listing,
artist / track playback) as MCP tools over stdio. Talks to the Spotify Web API
with an OAuth authorization-code flow; tokens are cached in memory and in a
JSON file in the user's home directory.
"""

import asyncio
import json
import os
import sys
from typing import Any, Optional
from urllib.parse import urlencode, parse_qs

import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

# Spotify API endpoints
SPOTIFY_API_BASE = "https://api.spotify.com/v1"
SPOTIFY_AUTH_BASE = "https://accounts.spotify.com"
SPOTIFY_TOKEN_URL = f"{SPOTIFY_AUTH_BASE}/api/token"

# OAuth configuration (read from environment variables)
CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID", "")
CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET", "")
REDIRECT_URI = os.getenv("SPOTIFY_REDIRECT_URI", "http://localhost:8888/callback")
SCOPE = " ".join(
    (
        "user-read-playback-state",
        "user-modify-playback-state",
        "user-read-currently-playing",
        "user-library-read",
        "playlist-read-private",
        "playlist-read-collaborative",
        "playlist-modify-private",
        "playlist-modify-public",
    )
)

# File used to persist the OAuth tokens between runs
TOKEN_FILE = os.path.expanduser("~/.spotify_mcp_token.json")

# In-memory token cache (module globals)
_access_token: Optional[str] = None
_refresh_token: Optional[str] = None


class SpotifyClient:
    """Thin async wrapper around the Spotify Web API endpoints used here."""

    def __init__(self, access_token: str):
        self.access_token = access_token
        self.headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

    @staticmethod
    def _device_params(device_id: Optional[str], **params: Any) -> dict:
        """Return query params, adding ``device_id`` only when one was given."""
        if device_id:
            params["device_id"] = device_id
        return params

    async def _request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Send an API request and return the decoded response.

        Returns:
            The parsed JSON body; ``{}`` when the response has no body (for
            example 204 No Content); ``{"success": True, "raw": <text>}`` when
            the body is non-empty but not JSON (some devices answer with plain
            text such as "Command sent").

        Raises:
            httpx.HTTPStatusError: for 4xx / 5xx responses.
        """
        url = f"{SPOTIFY_API_BASE}{endpoint}"
        async with httpx.AsyncClient() as client:
            response = await client.request(
                method,
                url,
                headers=self.headers,
                **kwargs,
            )
            response.raise_for_status()
            text = response.text.strip()
            # An empty body must be falsy so callers can detect "nothing to
            # report" (e.g. no active playback) with a plain truthiness check.
            if response.status_code == 204 or not text:
                return {}
            try:
                return response.json()
            except ValueError:
                # Non-JSON payload: hand back the raw text instead of failing.
                return {"success": True, "raw": text}

    async def get_current_playback(self) -> dict:
        """Get the current playback state (``{}`` when nothing is playing)."""
        return await self._request("GET", "/me/player")

    async def play(self, device_id: Optional[str] = None, context_uri: Optional[str] = None) -> dict:
        """Start or resume playback, optionally of a given context URI."""
        data = {}
        if context_uri:
            data["context_uri"] = context_uri
        return await self._request(
            "PUT", "/me/player/play", json=data, params=self._device_params(device_id)
        )

    async def pause(self, device_id: Optional[str] = None) -> dict:
        """Pause playback."""
        return await self._request(
            "PUT", "/me/player/pause", params=self._device_params(device_id)
        )

    async def next_track(self, device_id: Optional[str] = None) -> dict:
        """Skip to the next track."""
        return await self._request(
            "POST", "/me/player/next", params=self._device_params(device_id)
        )

    async def previous_track(self, device_id: Optional[str] = None) -> dict:
        """Skip to the previous track."""
        return await self._request(
            "POST", "/me/player/previous", params=self._device_params(device_id)
        )

    async def set_volume(self, volume_percent: int, device_id: Optional[str] = None) -> dict:
        """Set the playback volume (0-100)."""
        params = self._device_params(device_id, volume_percent=volume_percent)
        return await self._request("PUT", "/me/player/volume", params=params)

    async def seek(self, position_ms: int, device_id: Optional[str] = None) -> dict:
        """Seek to the given position (milliseconds) in the current track."""
        params = self._device_params(device_id, position_ms=position_ms)
        return await self._request("PUT", "/me/player/seek", params=params)

    async def get_devices(self) -> dict:
        """Get the list of available devices."""
        return await self._request("GET", "/me/player/devices")

    async def search_song_by_artist(self, artist: str, song: str) -> Optional[str]:
        """Search for a song by a given artist; return its URI or ``None``."""
        query = f"track:{song} artist:{artist}"
        result = await self._request(
            "GET",
            "/search",
            params={"q": query, "type": "track", "limit": 1},
        )
        items = result.get("tracks", {}).get("items", [])
        if not items:
            return None
        return items[0]["uri"]

    async def play_song_by_artist(self, artist: str, song: str, device_id=None) -> str:
        """Play a song by a given artist; return a user-facing status message."""
        uri = await self.search_song_by_artist(artist, song)
        if not uri:
            return f"找不到 {artist} 的《{song}》"
        await self._request(
            "PUT",
            "/me/player/play",
            json={"uris": [uri]},
            params=self._device_params(device_id),
        )
        return f"正在播放 {artist} 的《{song}》"

    async def search_artist_id(self, artist: str) -> Optional[str]:
        """Search for an artist by name; return the artist id or ``None``."""
        result = await self._request(
            "GET",
            "/search",
            params={"q": artist, "type": "artist", "limit": 1},
        )
        items = result.get("artists", {}).get("items", [])
        if not items:
            return None
        return items[0]["id"]

    async def get_artist_top_tracks(
        self, artist_id: str, limit: int = 10, market: str = "US"
    ) -> list[str]:
        """Return up to ``limit`` top-track URIs for an artist id.

        ``market`` is passed through as the ``market`` query parameter; it
        defaults to the previously hard-coded "US".
        """
        result = await self._request(
            "GET",
            f"/artists/{artist_id}/top-tracks",
            params={"market": market},
        )
        tracks = result.get("tracks", [])[:limit]
        return [track["uri"] for track in tracks]

    async def play_artist_top_songs(self, artist: str, limit: int = 10, device_id=None) -> str:
        """Play an artist's top tracks; return a user-facing status message."""
        artist_id = await self.search_artist_id(artist)
        if not artist_id:
            return f"找不到歌手:{artist}"
        uris = await self.get_artist_top_tracks(artist_id, limit)
        if not uris:
            return f"{artist} 没有找到热门歌曲"
        await self._request(
            "PUT",
            "/me/player/play",
            json={"uris": uris},
            params=self._device_params(device_id),
        )
        return f"正在播放 {artist} 的最热 {len(uris)} 首歌曲"


def load_tokens() -> tuple[Optional[str], Optional[str]]:
    """Load ``(access_token, refresh_token)`` from the token file.

    Returns ``(None, None)`` when the file is missing, unreadable, or does not
    contain a JSON object.
    """
    if os.path.exists(TOKEN_FILE):
        try:
            with open(TOKEN_FILE, "r") as f:
                data = json.load(f)
            return data.get("access_token"), data.get("refresh_token")
        except (OSError, ValueError, AttributeError):
            # Unreadable / corrupt / wrongly-shaped file: treat as "no tokens".
            pass
    return None, None


def save_tokens(access_token: str, refresh_token: str):
    """Persist the tokens to the token file, readable by the owner only.

    Raises:
        OSError: if the file cannot be created or written.
    """
    token_dir = os.path.dirname(TOKEN_FILE)
    if token_dir:  # os.makedirs("") would raise
        os.makedirs(token_dir, exist_ok=True)
    # Create with 0600 so the credentials are not exposed to other users.
    fd = os.open(TOKEN_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        try:
            # The mode given to os.open only applies to newly created files;
            # tighten a pre-existing file too (best effort).
            os.chmod(TOKEN_FILE, 0o600)
        except OSError:
            pass
        json.dump(
            {
                "access_token": access_token,
                "refresh_token": refresh_token,
            },
            f,
        )


async def refresh_access_token(refresh_token: str) -> tuple[str, str]:
    """Exchange a refresh token for a new access token.

    Returns ``(access_token, refresh_token)``; the old refresh token is reused
    when the response does not include a new one.
    """
    async with httpx.AsyncClient() as client:
        response = await client.post(
            SPOTIFY_TOKEN_URL,
            data={
                "grant_type": "refresh_token",
                "refresh_token": refresh_token,
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
            },
        )
        response.raise_for_status()
        data = response.json()
        new_access_token = data["access_token"]
        new_refresh_token = data.get("refresh_token", refresh_token)
        return new_access_token, new_refresh_token


async def get_authenticated_client() -> Optional[SpotifyClient]:
    """Return a client with a usable access token, or ``None`` if unauthenticated.

    The cached token is probed with a playback-state request. Only a 401
    response is treated as an expired token (triggering a refresh); any other
    failure of the probe is not evidence of a bad token, so the client is
    returned and the actual tool call reports the real error.
    """
    global _access_token, _refresh_token

    # Load tokens from memory, falling back to the token file.
    if not _access_token:
        _access_token, _refresh_token = load_tokens()
    if not _access_token:
        return None

    client = SpotifyClient(_access_token)
    try:
        await client.get_current_playback()
        return client
    except httpx.HTTPStatusError as e:
        if e.response.status_code != 401:
            return client
        if not _refresh_token:
            return None
    except httpx.RequestError:
        # Transport problem (DNS, timeout, ...): not an authentication issue.
        return client

    # Token expired: try to refresh it.
    try:
        _access_token, _refresh_token = await refresh_access_token(_refresh_token)
    except (httpx.HTTPError, KeyError, ValueError):
        return None
    try:
        save_tokens(_access_token, _refresh_token)
    except OSError as e:
        # The refreshed tokens are still valid in memory; only persistence
        # failed. stdout carries the MCP protocol, so log to stderr.
        print(f"spotify-mcp: could not save tokens: {e}", file=sys.stderr)
    return SpotifyClient(_access_token)


def get_auth_url() -> str:
    """Build the OAuth authorization URL."""
    params = {
        "client_id": CLIENT_ID,
        "response_type": "code",
        "redirect_uri": REDIRECT_URI,
        "scope": SCOPE,
    }
    return f"{SPOTIFY_AUTH_BASE}/authorize?{urlencode(params)}"


async def exchange_code_for_token(code: str) -> tuple[str, str]:
    """Exchange an authorization code for ``(access_token, refresh_token)``."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            SPOTIFY_TOKEN_URL,
            data={
                "grant_type": "authorization_code",
                "code": code,
                "redirect_uri": REDIRECT_URI,
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
            },
        )
        response.raise_for_status()
        data = response.json()
        return data["access_token"], data["refresh_token"]


# Create the MCP server
server = Server("spotify-mcp")


def _device_id_property() -> dict:
    """Return a fresh JSON-schema fragment for the optional ``device_id`` input."""
    return {"type": "string", "description": "设备ID(可选)"}


def _text(message: str) -> list[TextContent]:
    """Wrap a message in the single-item text result every tool returns."""
    return [TextContent(type="text", text=message)]


def _format_playback(result: dict) -> str:
    """Render a playback-state payload as the user-facing summary text."""
    # These keys may be present with a JSON null value, so `.get(key, default)`
    # alone is not enough.
    track = result.get("item") or {}
    device = result.get("device") or {}
    is_playing = result.get("is_playing", False)
    progress_ms = result.get("progress_ms") or 0

    parts = [
        f"播放状态:{'播放中' if is_playing else '已暂停'}\n",
        f"设备:{device.get('name', '未知')}\n",
    ]
    if track:
        artists = [a.get('name', '') for a in track.get('artists', [])]
        parts.append(f"歌曲:{track.get('name', '未知')}\n")
        parts.append(f"艺术家:{', '.join(artists)}\n")
        parts.append(f"专辑:{track.get('album', {}).get('name', '未知')}\n")
        parts.append(
            f"进度:{progress_ms // 1000}秒 / {track.get('duration_ms', 0) // 1000}秒"
        )
    return "".join(parts)


def _format_devices(devices: list) -> str:
    """Render the device list as the user-facing summary text."""
    parts = ["可用设备:\n"]
    for device in devices:
        parts.append(f"- {device.get('name', '未知')} ({device.get('type', '未知')})")
        if device.get('is_active'):
            parts.append(" [当前活动]")
        parts.append(f"\n ID: {device.get('id')}\n")
    return "".join(parts)


@server.list_tools()
async def list_tools() -> list[Tool]:
    """List all available tools."""
    return [
        Tool(
            name="spotify_get_auth_url",
            description="获取Spotify OAuth授权URL。用户需要访问此URL进行授权,然后将返回的授权码用于spotify_authenticate工具",
            inputSchema={
                "type": "object",
                "properties": {},
            },
        ),
        Tool(
            name="spotify_authenticate",
            description="使用授权码完成Spotify OAuth认证。需要先调用spotify_get_auth_url获取授权URL",
            inputSchema={
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "从授权URL回调中获取的授权码",
                    }
                },
                "required": ["code"],
            },
        ),
        Tool(
            name="spotify_get_current_playback",
            description="获取当前播放状态,包括当前歌曲、播放进度、设备等信息",
            inputSchema={
                "type": "object",
                "properties": {"device_id": _device_id_property()},
            },
        ),
        Tool(
            name="spotify_pause",
            description="暂停播放",
            inputSchema={
                "type": "object",
                "properties": {"device_id": _device_id_property()},
            },
        ),
        Tool(
            name="spotify_next",
            description="播放下一首歌曲",
            inputSchema={
                "type": "object",
                "properties": {"device_id": _device_id_property()},
            },
        ),
        Tool(
            name="spotify_previous",
            description="播放上一首歌曲",
            inputSchema={
                "type": "object",
                "properties": {"device_id": _device_id_property()},
            },
        ),
        Tool(
            name="spotify_set_volume",
            description="设置音量,范围0-100",
            inputSchema={
                "type": "object",
                "properties": {
                    "volume_percent": {
                        "type": "integer",
                        "description": "音量百分比(0-100)",
                        "minimum": 0,
                        "maximum": 100,
                    },
                    "device_id": _device_id_property(),
                },
                "required": ["volume_percent"],
            },
        ),
        Tool(
            name="spotify_seek",
            description="跳转到指定播放位置",
            inputSchema={
                "type": "object",
                "properties": {
                    "position_ms": {
                        "type": "integer",
                        "description": "目标位置(毫秒)",
                    },
                    "device_id": _device_id_property(),
                },
                "required": ["position_ms"],
            },
        ),
        Tool(
            name="spotify_get_devices",
            description="获取所有可用的Spotify设备列表",
            inputSchema={
                "type": "object",
                "properties": {},
            },
        ),
        Tool(
            name="spotify_play_artist",
            description="搜索并播放指定艺术家的歌曲,如 'Taylor Swift的歌曲'",
            inputSchema={
                "type": "object",
                "properties": {
                    "artist": {
                        "type": "string",
                        "description": "艺术家名称",
                    },
                    "device_id": _device_id_property(),
                },
                "required": ["artist"],
            },
        ),
        Tool(
            name="spotify_play_song",
            description="根据歌手和歌曲名播放歌曲。例如:播放 Taylor Swift 的 Love Story",
            inputSchema={
                "type": "object",
                "properties": {
                    "artist": {"type": "string", "description": "歌手名"},
                    "song": {"type": "string", "description": "歌曲名"},
                    "device_id": {"type": "string", "description": "播放设备ID(可选)"},
                },
                "required": ["artist", "song"],
            },
        ),
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Dispatch a tool call and return its result as text content.

    Every failure is reported as a text message rather than raised, so the
    MCP client always receives a readable answer.
    """
    global _access_token, _refresh_token

    arguments = arguments or {}
    try:
        if name == "spotify_get_auth_url":
            if not CLIENT_ID:
                return _text("错误:未设置SPOTIFY_CLIENT_ID环境变量。请先配置Spotify OAuth凭据。")
            auth_url = get_auth_url()
            return _text(
                f"请访问以下URL进行授权:\n{auth_url}\n\n授权后,您将获得一个授权码,请使用spotify_authenticate工具完成认证。"
            )

        if name == "spotify_authenticate":
            code = arguments.get("code")
            if not code:
                return _text("错误:缺少授权码参数")
            try:
                access_token, refresh_token = await exchange_code_for_token(code)
                _access_token = access_token
                _refresh_token = refresh_token
                save_tokens(access_token, refresh_token)
                return _text("认证成功!您现在可以使用其他Spotify控制功能了。")
            except Exception as e:
                return _text(f"认证失败:{str(e)}")

        # Every other tool needs an authenticated client.
        client = await get_authenticated_client()
        if not client:
            return _text("错误:未认证。请先使用spotify_get_auth_url和spotify_authenticate进行认证。")

        device_id = arguments.get("device_id")

        if name == "spotify_get_current_playback":
            try:
                result = await client.get_current_playback()
            except httpx.HTTPStatusError as e:
                return _text(f"获取播放状态失败:{str(e)}")
            # An empty result is what a 204 No Content response decodes to,
            # i.e. there is no active playback.
            if not result:
                return _text("当前没有播放任何内容")
            return _text(_format_playback(result))

        elif name == "spotify_pause":
            try:
                await client.pause(device_id=device_id)
                return _text("已暂停")
            except Exception as e:
                return _text(f"暂停失败:{str(e)}")

        elif name == "spotify_next":
            try:
                await client.next_track(device_id=device_id)
                return _text("已切换到下一首")
            except Exception as e:
                return _text(f"切换失败:{str(e)}")

        elif name == "spotify_previous":
            try:
                await client.previous_track(device_id=device_id)
                return _text("已切换到上一首")
            except Exception as e:
                return _text(f"切换失败:{str(e)}")

        elif name == "spotify_set_volume":
            try:
                volume = arguments.get("volume_percent")
                await client.set_volume(volume, device_id=device_id)
                return _text(f"音量已设置为 {volume}%")
            except Exception as e:
                return _text(f"设置音量失败:{str(e)}")

        elif name == "spotify_seek":
            try:
                position_ms = arguments.get("position_ms")
                await client.seek(position_ms, device_id=device_id)
                return _text(f"已跳转到 {position_ms // 1000}秒")
            except Exception as e:
                return _text(f"跳转失败:{str(e)}")

        elif name == "spotify_get_devices":
            try:
                result = await client.get_devices()
                devices = result.get("devices", [])
                if not devices:
                    return _text("没有可用的设备")
                return _text(_format_devices(devices))
            except Exception as e:
                return _text(f"获取设备列表失败:{str(e)}")

        elif name == "spotify_play_artist":
            artist = arguments.get("artist")
            if not artist:
                return _text("错误:缺少艺术家参数")
            msg = await client.play_artist_top_songs(artist, device_id=device_id)
            return _text(msg)

        elif name == "spotify_play_song":
            artist = arguments.get("artist")
            song = arguments.get("song")
            if not artist or not song:
                return _text("错误:缺少歌手或歌曲名参数")
            msg = await client.play_song_by_artist(artist, song, device_id)
            return _text(msg)

        else:
            return _text(f"未知工具:{name}")

    except Exception as e:
        return _text(f"执行失败:{str(e)}")


async def main():
    """Run the MCP server over stdio."""
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options(),
        )


if __name__ == "__main__":
    asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/EvansLR/spotify-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.