spotify_mcp.py•22.3 kB
"""
Spotify MCP Server
"""
import asyncio
import json
import os
import sys
from typing import Any, Optional
from urllib.parse import urlencode, parse_qs
import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# Spotify API配置
SPOTIFY_API_BASE = "https://api.spotify.com/v1"
SPOTIFY_AUTH_BASE = "https://accounts.spotify.com"
SPOTIFY_TOKEN_URL = f"{SPOTIFY_AUTH_BASE}/api/token"
# OAuth配置(从环境变量读取)
CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID", "")
CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET", "")
REDIRECT_URI = os.getenv("SPOTIFY_REDIRECT_URI", "http://localhost:8888/callback")
SCOPE = "user-read-playback-state user-modify-playback-state user-read-currently-playing user-library-read playlist-read-private playlist-read-collaborative playlist-modify-private playlist-modify-public"
# Token存储文件
TOKEN_FILE = os.path.expanduser("~/.spotify_mcp_token.json")
# 全局token存储
_access_token: Optional[str] = None
_refresh_token: Optional[str] = None
class SpotifyClient:
"""Spotify API客户端"""
def __init__(self, access_token: str):
self.access_token = access_token
self.headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
async def _request(self, method: str, endpoint: str, **kwargs) -> dict:
"""发送API请求"""
url = f"{SPOTIFY_API_BASE}{endpoint}"
async with httpx.AsyncClient() as client:
response = await client.request(
method, url, headers=self.headers, **kwargs
)
# 有些设备返回纯文本,如 "Command sent"
response.raise_for_status()
text = response.text.strip()
try:
return response.json()
except Exception:
# 返回原始文本,不报错
return {"success": True, "raw": text}
async def get_current_playback(self) -> dict:
"""获取当前播放状态"""
return await self._request("GET", "/me/player")
async def play(self, device_id: Optional[str] = None, context_uri: Optional[str] = None) -> dict:
"""播放音乐"""
data = {}
if context_uri:
data["context_uri"] = context_uri
params = {}
if device_id:
params["device_id"] = device_id
return await self._request("PUT", "/me/player/play", json=data, params=params)
async def pause(self, device_id: Optional[str] = None) -> dict:
"""暂停播放"""
params = {}
if device_id:
params["device_id"] = device_id
return await self._request("PUT", "/me/player/pause", params=params)
async def next_track(self, device_id: Optional[str] = None) -> dict:
"""下一首"""
params = {}
if device_id:
params["device_id"] = device_id
return await self._request("POST", "/me/player/next", params=params)
async def previous_track(self, device_id: Optional[str] = None) -> dict:
"""上一首"""
params = {}
if device_id:
params["device_id"] = device_id
return await self._request("POST", "/me/player/previous", params=params)
async def set_volume(self, volume_percent: int, device_id: Optional[str] = None) -> dict:
"""设置音量 (0-100)"""
params = {"volume_percent": volume_percent}
if device_id:
params["device_id"] = device_id
return await self._request("PUT", "/me/player/volume", params=params)
async def seek(self, position_ms: int, device_id: Optional[str] = None) -> dict:
"""跳转到指定位置(毫秒)"""
params = {"position_ms": position_ms}
if device_id:
params["device_id"] = device_id
return await self._request("PUT", "/me/player/seek", params=params)
async def get_devices(self) -> dict:
"""获取可用设备列表"""
return await self._request("GET", "/me/player/devices")
# 搜索指定艺术家的指定歌曲
async def search_song_by_artist(self, artist: str, song: str) -> Optional[str]:
query = f"track:{song} artist:{artist}"
result = await self._request("GET", "/search", params={
"q": query,
"type": "track",
"limit": 1
})
items = result.get("tracks", {}).get("items", [])
if not items:
return None
return items[0]["uri"]
# 播放指定艺术家的指定歌曲
async def play_song_by_artist(self, artist: str, song: str, device_id=None) -> str:
uri = await self.search_song_by_artist(artist, song)
if not uri:
return f"找不到 {artist} 的《{song}》"
data = {"uris": [uri]}
params = {}
if device_id:
params["device_id"] = device_id
await self._request("PUT", "/me/player/play", json=data, params=params)
return f"正在播放 {artist} 的《{song}》"
# 搜索艺术家id
async def search_artist_id(self, artist: str) -> Optional[str]:
result = await self._request("GET", "/search", params={
"q": artist,
"type": "artist",
"limit": 1
})
items = result.get("artists", {}).get("items", [])
if not items:
return None
return items[0]["id"]
# 根据艺术家id获取top tracks
async def get_artist_top_tracks(self, artist_id: str, limit: int = 10) -> list[str]:
result = await self._request(
"GET",
f"/artists/{artist_id}/top-tracks",
params={"market": "US"},
)
tracks = result.get("tracks", [])[:limit]
return [track["uri"] for track in tracks]
# 播放指定艺术家的top tracks
async def play_artist_top_songs(self, artist: str, limit: int = 10, device_id=None) -> str:
artist_id = await self.search_artist_id(artist)
if not artist_id:
return f"找不到歌手:{artist}"
uris = await self.get_artist_top_tracks(artist_id, limit)
if not uris:
return f"{artist} 没有找到热门歌曲"
data = {"uris": uris}
params = {}
if device_id:
params["device_id"] = device_id
await self._request("PUT", "/me/player/play", json=data, params=params)
return f"正在播放 {artist} 的最热 {len(uris)} 首歌曲"
def load_tokens() -> tuple[Optional[str], Optional[str]]:
"""从文件加载token"""
if os.path.exists(TOKEN_FILE):
try:
with open(TOKEN_FILE, "r") as f:
data = json.load(f)
return data.get("access_token"), data.get("refresh_token")
except Exception:
pass
return None, None
def save_tokens(access_token: str, refresh_token: str):
"""保存token到文件"""
os.makedirs(os.path.dirname(TOKEN_FILE), exist_ok=True)
with open(TOKEN_FILE, "w") as f:
json.dump({
"access_token": access_token,
"refresh_token": refresh_token
}, f)
async def refresh_access_token(refresh_token: str) -> tuple[str, str]:
"""刷新访问token"""
async with httpx.AsyncClient() as client:
response = await client.post(
SPOTIFY_TOKEN_URL,
data={
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
},
)
response.raise_for_status()
data = response.json()
new_access_token = data["access_token"]
new_refresh_token = data.get("refresh_token", refresh_token)
return new_access_token, new_refresh_token
async def get_authenticated_client() -> Optional[SpotifyClient]:
"""获取已认证的Spotify客户端"""
global _access_token, _refresh_token
# 从内存或文件加载token
if not _access_token:
_access_token, _refresh_token = load_tokens()
if not _access_token:
return None
# 尝试使用token,如果失败则刷新
try:
client = SpotifyClient(_access_token)
await client.get_current_playback()
return client
except httpx.HTTPStatusError as e:
if e.response.status_code == 401 and _refresh_token:
# Token过期,尝试刷新
try:
_access_token, _refresh_token = await refresh_access_token(_refresh_token)
save_tokens(_access_token, _refresh_token)
return SpotifyClient(_access_token)
except Exception:
return None
return None
except Exception:
return None
def get_auth_url() -> str:
"""生成OAuth授权URL"""
params = {
"client_id": CLIENT_ID,
"response_type": "code",
"redirect_uri": REDIRECT_URI,
"scope": SCOPE,
}
return f"{SPOTIFY_AUTH_BASE}/authorize?{urlencode(params)}"
async def exchange_code_for_token(code: str) -> tuple[str, str]:
"""用授权码换取token"""
async with httpx.AsyncClient() as client:
response = await client.post(
SPOTIFY_TOKEN_URL,
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
},
)
response.raise_for_status()
data = response.json()
return data["access_token"], data["refresh_token"]
# 创建MCP服务器
server = Server("spotify-mcp")
@server.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有可用的工具"""
return [
Tool(
name="spotify_get_auth_url",
description="获取Spotify OAuth授权URL。用户需要访问此URL进行授权,然后将返回的授权码用于spotify_authenticate工具",
inputSchema={
"type": "object",
"properties": {},
},
),
Tool(
name="spotify_authenticate",
description="使用授权码完成Spotify OAuth认证。需要先调用spotify_get_auth_url获取授权URL",
inputSchema={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "从授权URL回调中获取的授权码"
}
},
"required": ["code"]
},
),
Tool(
name="spotify_get_current_playback",
description="获取当前播放状态,包括当前歌曲、播放进度、设备等信息",
inputSchema={
"type": "object",
"properties": {
"device_id": {
"type": "string",
"description": "设备ID(可选)"
}
},
},
),
Tool(
name="spotify_pause",
description="暂停播放",
inputSchema={
"type": "object",
"properties": {
"device_id": {
"type": "string",
"description": "设备ID(可选)"
}
},
},
),
Tool(
name="spotify_next",
description="播放下一首歌曲",
inputSchema={
"type": "object",
"properties": {
"device_id": {
"type": "string",
"description": "设备ID(可选)"
}
},
},
),
Tool(
name="spotify_previous",
description="播放上一首歌曲",
inputSchema={
"type": "object",
"properties": {
"device_id": {
"type": "string",
"description": "设备ID(可选)"
}
},
},
),
Tool(
name="spotify_set_volume",
description="设置音量,范围0-100",
inputSchema={
"type": "object",
"properties": {
"volume_percent": {
"type": "integer",
"description": "音量百分比(0-100)",
"minimum": 0,
"maximum": 100
},
"device_id": {
"type": "string",
"description": "设备ID(可选)"
}
},
"required": ["volume_percent"]
},
),
Tool(
name="spotify_seek",
description="跳转到指定播放位置",
inputSchema={
"type": "object",
"properties": {
"position_ms": {
"type": "integer",
"description": "目标位置(毫秒)"
},
"device_id": {
"type": "string",
"description": "设备ID(可选)"
}
},
"required": ["position_ms"]
},
),
Tool(
name="spotify_get_devices",
description="获取所有可用的Spotify设备列表",
inputSchema={
"type": "object",
"properties": {},
},
),
Tool(
name="spotify_play_artist",
description="搜索并播放指定艺术家的歌曲,如 'Taylor Swift的歌曲'",
inputSchema={
"type": "object",
"properties": {
"artist": {
"type": "string",
"description": "艺术家名称"
}
},
"required": ["artist"]
},
),
Tool(
name="spotify_play_song",
description="根据歌手和歌曲名播放歌曲。例如:播放 Taylor Swift 的 Love Story",
inputSchema={
"type": "object",
"properties": {
"artist": {"type": "string", "description": "歌手名"},
"song": {"type": "string", "description": "歌曲名"},
"device_id": {"type": "string", "description": "播放设备ID(可选)"}
},
"required": ["artist", "song"]
},
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""处理工具调用"""
try:
if name == "spotify_get_auth_url":
if not CLIENT_ID:
return [TextContent(
type="text",
text="错误:未设置SPOTIFY_CLIENT_ID环境变量。请先配置Spotify OAuth凭据。"
)]
auth_url = get_auth_url()
return [TextContent(
type="text",
text=f"请访问以下URL进行授权:\n{auth_url}\n\n授权后,您将获得一个授权码,请使用spotify_authenticate工具完成认证。"
)]
elif name == "spotify_authenticate":
code = arguments.get("code")
if not code:
return [TextContent(
type="text",
text="错误:缺少授权码参数"
)]
try:
access_token, refresh_token = await exchange_code_for_token(code)
global _access_token, _refresh_token
_access_token = access_token
_refresh_token = refresh_token
save_tokens(access_token, refresh_token)
return [TextContent(
type="text",
text="认证成功!您现在可以使用其他Spotify控制功能了。"
)]
except Exception as e:
return [TextContent(
type="text",
text=f"认证失败:{str(e)}"
)]
# 其他功能需要认证
client = await get_authenticated_client()
if not client:
return [TextContent(
type="text",
text="错误:未认证。请先使用spotify_get_auth_url和spotify_authenticate进行认证。"
)]
if name == "spotify_get_current_playback":
try:
result = await client.get_current_playback()
if not result:
return [TextContent(
type="text",
text="当前没有播放任何内容"
)]
# 格式化输出
track = result.get("item", {})
device = result.get("device", {})
is_playing = result.get("is_playing", False)
progress_ms = result.get("progress_ms", 0)
output = f"播放状态:{'播放中' if is_playing else '已暂停'}\n"
output += f"设备:{device.get('name', '未知')}\n"
if track:
output += f"歌曲:{track.get('name', '未知')}\n"
artists = [a.get('name', '') for a in track.get('artists', [])]
output += f"艺术家:{', '.join(artists)}\n"
output += f"专辑:{track.get('album', {}).get('name', '未知')}\n"
output += f"进度:{progress_ms // 1000}秒 / {track.get('duration_ms', 0) // 1000}秒"
return [TextContent(type="text", text=output)]
except httpx.HTTPStatusError as e:
if e.response.status_code == 204:
return [TextContent(type="text", text="当前没有播放任何内容")]
return [TextContent(type="text", text=f"获取播放状态失败:{str(e)}")]
elif name == "spotify_pause":
try:
await client.pause(device_id=arguments.get("device_id"))
return [TextContent(type="text", text="已暂停")]
except Exception as e:
return [TextContent(type="text", text=f"暂停失败:{str(e)}")]
elif name == "spotify_next":
try:
await client.next_track(device_id=arguments.get("device_id"))
return [TextContent(type="text", text="已切换到下一首")]
except Exception as e:
return [TextContent(type="text", text=f"切换失败:{str(e)}")]
elif name == "spotify_previous":
try:
await client.previous_track(device_id=arguments.get("device_id"))
return [TextContent(type="text", text="已切换到上一首")]
except Exception as e:
return [TextContent(type="text", text=f"切换失败:{str(e)}")]
elif name == "spotify_set_volume":
try:
volume = arguments.get("volume_percent")
await client.set_volume(volume, device_id=arguments.get("device_id"))
return [TextContent(type="text", text=f"音量已设置为 {volume}%")]
except Exception as e:
return [TextContent(type="text", text=f"设置音量失败:{str(e)}")]
elif name == "spotify_seek":
try:
position_ms = arguments.get("position_ms")
await client.seek(position_ms, device_id=arguments.get("device_id"))
return [TextContent(type="text", text=f"已跳转到 {position_ms // 1000}秒")]
except Exception as e:
return [TextContent(type="text", text=f"跳转失败:{str(e)}")]
elif name == "spotify_get_devices":
try:
result = await client.get_devices()
devices = result.get("devices", [])
if not devices:
return [TextContent(type="text", text="没有可用的设备")]
output = "可用设备:\n"
for device in devices:
output += f"- {device.get('name', '未知')} ({device.get('type', '未知')})"
if device.get('is_active'):
output += " [当前活动]"
output += f"\n ID: {device.get('id')}\n"
return [TextContent(type="text", text=output)]
except Exception as e:
return [TextContent(type="text", text=f"获取设备列表失败:{str(e)}")]
elif name == "spotify_play_artist":
artist = arguments["artist"]
msg = await client.play_artist_top_songs(artist)
return [TextContent(type="text", text=msg)]
elif name == "spotify_play_song":
artist = arguments.get("artist")
song = arguments.get("song")
device = arguments.get("device_id")
msg = await client.play_song_by_artist(artist, song, device)
return [TextContent(type="text", text=msg)]
else:
return [TextContent(type="text", text=f"未知工具:{name}")]
except Exception as e:
return [TextContent(type="text", text=f"执行失败:{str(e)}")]
async def main():
"""主函数"""
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())