app.py•14.6 kB
#!/usr/bin/env python3
"""FastAPI application for Spotify MCP Server on Vercel."""
import json
import logging
import os
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel

from config import config
from mcp_server import SpotifyMCPServer
# Module logger, plus root logging configured at INFO level
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# The FastAPI application object that every route below is registered on
app = FastAPI(
    version="1.0.0",
    title="Spotify MCP Server",
    description="Model Context Protocol server for Spotify API integration",
)
# Add CORS middleware.
# A wildcard origin combined with allow_credentials=True lets any website make
# credentialed cross-origin calls to this API. Allowed origins are therefore
# read from the CORS_ALLOW_ORIGINS environment variable (comma-separated,
# defaulting to "*"), and credentials are only enabled when an explicit origin
# list has been configured.
_cors_origins = [
    origin.strip()
    for origin in os.environ.get("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Shared MCP server handle; None until startup_event() assigns it
mcp_server: Any = None
class ToolCallRequest(BaseModel):
    """Request body accepted by POST /call_tool."""

    # Name of the tool to invoke
    name: str
    # Arguments forwarded to the tool; an empty dict when omitted
    arguments: Dict[str, Any] = {}
class ToolCallResponse(BaseModel):
    """Response body returned by POST /call_tool."""

    # True when the tool call completed without raising
    success: bool
    # Value returned by the tool handler; left as None on failure
    result: Any = None
    # Error message on failure, None on success. Declared Optional so the
    # annotation (and the schema derived from it) agrees with the None default.
    error: Optional[str] = None
@app.on_event("startup")
async def startup_event():
    """Initialize the global MCP server on application startup.

    Builds a ``SpotifyMCPServer`` and stores it in the module-level
    ``mcp_server``. If construction fails, startup is not aborted: the failure
    is logged with its traceback and a minimal stand-in object is installed.
    The stand-in exposes ``spotify_client = None`` (so the endpoints can still
    create a Spotify client lazily) and a ``_handle_tool_call`` coroutine that
    raises a clear error instead of leaving callers with an AttributeError.
    """
    global mcp_server
    try:
        logger.info("Initializing Spotify MCP Server...")
        mcp_server = SpotifyMCPServer()
        logger.info("MCP Server initialized successfully")
    except Exception as e:
        # logger.exception records the traceback as well as the message
        logger.exception(f"Failed to initialize MCP server: {e}")
        # `e` is unbound once the except block ends, so capture the text now
        init_error = str(e)

        class MockMCPServer:
            """Stand-in used when the real MCP server could not be created."""

            spotify_client = None

            async def _handle_tool_call(self, *args, **kwargs):
                raise RuntimeError(
                    f"MCP server failed to initialize: {init_error}"
                )

        # Don't fail startup - fall back to the minimal server instance
        mcp_server = MockMCPServer()
@app.get("/")
async def root():
    """Return the server's name, version, description and main endpoints."""
    endpoint_map = {
        "health": "/health",
        "tools": "/tools",
        "call_tool": "/call_tool",
    }
    server_info = {
        "name": config.MCP_SERVER_NAME,
        "version": config.MCP_SERVER_VERSION,
        "description": config.MCP_SERVER_DESCRIPTION,
        "status": "running",
        "endpoints": endpoint_map,
    }
    return server_info
@app.get("/health")
async def health_check():
    """Report basic liveness information.

    Returns a "healthy" payload with the configured server name and version
    and a flag telling whether the global ``mcp_server`` has been set. If
    building that payload raises, the error is logged and an "unhealthy"
    payload carrying the error text is returned instead.
    """
    try:
        # Basic check only: being able to answer at all counts as healthy
        payload = {
            "status": "healthy",
            "server": config.MCP_SERVER_NAME,
            "version": config.MCP_SERVER_VERSION,
            "mcp_server_ready": mcp_server is not None,
        }
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return {"status": "unhealthy", "error": str(e)}
    return payload
@app.get("/tools")
async def list_tools():
    """List all available MCP tools.

    Returns:
        A dict with ``tools`` (one ``{"name", "description"}`` entry per known
        tool name) and ``count`` (the number of entries).

    Raises:
        HTTPException: 500 when the MCP server has not been initialized, or
            when building the listing fails unexpectedly.
    """
    # Checked outside the try block so this HTTPException reaches the client
    # unchanged instead of being caught below and re-wrapped with a mangled
    # detail string.
    if mcp_server is None:
        raise HTTPException(status_code=500, detail="MCP server not initialized")
    try:
        expected_tools = (
            'spotify_search', 'spotify_play', 'spotify_pause', 'spotify_resume',
            'spotify_skip_next', 'spotify_skip_previous', 'spotify_set_volume',
            'spotify_get_current_track', 'spotify_get_devices', 'spotify_get_user_playlists',
            'spotify_create_playlist', 'spotify_add_to_playlist', 'spotify_get_user_top_tracks',
            'spotify_get_recently_played', 'spotify_get_user_profile',
        )
        # The description is derived from the tool name itself
        tools = [
            {
                "name": tool_name,
                "description": f"Spotify {tool_name.replace('spotify_', '').replace('_', ' ')} functionality",
            }
            for tool_name in expected_tools
        ]
        return {
            "tools": tools,
            "count": len(tools),
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Error listing tools: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/call_tool", response_model=ToolCallResponse)
async def call_tool(request: ToolCallRequest):
    """Call an MCP tool with the given arguments.

    Args:
        request: Tool name and arguments to forward to the MCP server.

    Returns:
        ``ToolCallResponse`` with ``success=True`` and the handler's result, or
        ``success=False`` and the error text when the tool call raises.

    Raises:
        HTTPException: 500 when the MCP server has not been initialized.
    """
    # Checked outside the try block so the 500 actually reaches the client
    # rather than being folded into a 200 response with success=False.
    if mcp_server is None:
        raise HTTPException(status_code=500, detail="MCP server not initialized")
    try:
        # Initialize Spotify client if needed
        if mcp_server.spotify_client is None:
            from spotify_client_railway import SpotifyClientRailway
            mcp_server.spotify_client = SpotifyClientRailway()
        # Route to appropriate handler
        result = await mcp_server._handle_tool_call(request.name, request.arguments)
    except Exception as e:
        # Tool failures are reported in the response envelope; the traceback
        # is kept in the log for debugging.
        logger.exception(f"Error calling tool {request.name}: {e}")
        return ToolCallResponse(
            success=False,
            error=str(e),
        )
    return ToolCallResponse(
        success=True,
        result=result,
    )
@app.get("/test")
async def test_endpoint():
    """Return a fixed success message together with the configured server name."""
    reply = {"message": "Spotify MCP Server is running!", "status": "success"}
    reply["server"] = config.MCP_SERVER_NAME
    return reply
@app.get("/top-songs")
async def get_top_songs():
    """Get the user's top songs.

    Asks the Spotify client for up to 10 top tracks and reshapes each one
    into a flat dict (rank, name, artists, album, popularity, duration_ms,
    external_urls).

    Returns:
        ``{"success": True, "tracks": [...], "count": <number of tracks>}``.

    Raises:
        HTTPException: 500 when the MCP server has not been initialized or
            when fetching/formatting the tracks fails.
    """
    # Checked outside the try block so this HTTPException is not caught and
    # re-wrapped with a mangled detail string.
    if mcp_server is None:
        raise HTTPException(status_code=500, detail="MCP server not initialized")
    try:
        # Initialize Spotify client if needed
        if mcp_server.spotify_client is None:
            from spotify_client_railway import SpotifyClientRailway
            mcp_server.spotify_client = SpotifyClientRailway()
        # Get top tracks; a None result is treated as "no tracks"
        top_tracks = mcp_server.spotify_client.get_user_top_tracks(limit=10) or []
        # Format the response. `or {}` / `or []` also cover keys that are
        # present but explicitly None.
        formatted_tracks = [
            {
                "rank": rank,
                "name": track.get('name', 'Unknown'),
                "artists": [
                    artist.get('name', 'Unknown')
                    for artist in (track.get('artists') or [])
                ],
                "album": (track.get('album') or {}).get('name', 'Unknown Album'),
                "popularity": track.get('popularity', 0),
                "duration_ms": track.get('duration_ms', 0),
                "external_urls": track.get('external_urls', {}),
            }
            for rank, track in enumerate(top_tracks, 1)
        ]
        return {
            "success": True,
            "tracks": formatted_tracks,
            "count": len(formatted_tracks),
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Error getting top songs: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/recent-songs")
async def get_recent_songs():
    """Get the user's recently played songs.

    Asks the Spotify client for up to 10 recently played items and reshapes
    each one into a flat dict (rank, name, artists, album, played_at,
    popularity, duration_ms, external_urls).

    Returns:
        ``{"success": True, "tracks": [...], "count": <number of tracks>}``.

    Raises:
        HTTPException: 500 when the MCP server has not been initialized or
            when fetching/formatting the tracks fails.
    """
    # Checked outside the try block so this HTTPException is not caught and
    # re-wrapped with a mangled detail string.
    if mcp_server is None:
        raise HTTPException(status_code=500, detail="MCP server not initialized")
    try:
        # Initialize Spotify client if needed
        if mcp_server.spotify_client is None:
            from spotify_client_railway import SpotifyClientRailway
            mcp_server.spotify_client = SpotifyClientRailway()
        # Get recent tracks; a None result is treated as "no tracks"
        recent_tracks = mcp_server.spotify_client.get_recently_played(limit=10) or []
        # Format the response
        formatted_tracks = []
        for rank, item in enumerate(recent_tracks, 1):
            # `or {}` / `or []` also cover keys that are present but None
            track = item.get('track') or {}
            formatted_tracks.append({
                "rank": rank,
                "name": track.get('name', 'Unknown'),
                "artists": [
                    artist.get('name', 'Unknown')
                    for artist in (track.get('artists') or [])
                ],
                "album": (track.get('album') or {}).get('name', 'Unknown Album'),
                "played_at": item.get('played_at', 'Unknown time'),
                "popularity": track.get('popularity', 0),
                "duration_ms": track.get('duration_ms', 0),
                "external_urls": track.get('external_urls', {}),
            })
        return {
            "success": True,
            "tracks": formatted_tracks,
            "count": len(formatted_tracks),
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Error getting recent songs: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/dashboard", response_class=HTMLResponse)
async def dashboard():
    """Serve a self-contained HTML dashboard of top and recently played songs.

    The page's inline script fetches /top-songs and /recent-songs and renders
    the results in the browser. Every value that originates from the API
    (track, artist and album names, timestamps, error messages) is passed
    through ``escapeHtml()`` before being assigned to ``innerHTML``, so markup
    inside those strings is shown as text instead of being interpreted.

    Returns:
        The HTML document as a string.
    """
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>🎧 Spotify Dashboard</title>
        <meta charset="utf-8">
        <meta name="viewport" content="width=device-width, initial-scale=1">
        <style>
            body {
                font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
                margin: 0;
                padding: 20px;
                background: linear-gradient(135deg, #1DB954, #191414);
                color: white;
                min-height: 100vh;
            }
            .container {
                max-width: 1200px;
                margin: 0 auto;
            }
            .header {
                text-align: center;
                margin-bottom: 40px;
            }
            .header h1 {
                font-size: 3em;
                margin: 0;
                text-shadow: 2px 2px 4px rgba(0,0,0,0.5);
            }
            .section {
                background: rgba(255,255,255,0.1);
                border-radius: 15px;
                padding: 30px;
                margin-bottom: 30px;
                backdrop-filter: blur(10px);
            }
            .section h2 {
                color: #1DB954;
                border-bottom: 2px solid #1DB954;
                padding-bottom: 10px;
                margin-bottom: 20px;
            }
            .track {
                background: rgba(255,255,255,0.05);
                border-radius: 10px;
                padding: 15px;
                margin-bottom: 15px;
                border-left: 4px solid #1DB954;
            }
            .track-name {
                font-size: 1.2em;
                font-weight: bold;
                margin-bottom: 5px;
            }
            .track-info {
                color: #ccc;
                font-size: 0.9em;
            }
            .loading {
                text-align: center;
                font-size: 1.2em;
                color: #1DB954;
            }
            .error {
                background: rgba(255,0,0,0.2);
                border-left-color: #ff4444;
                color: #ffcccc;
            }
            .refresh-btn {
                background: #1DB954;
                color: white;
                border: none;
                padding: 10px 20px;
                border-radius: 25px;
                cursor: pointer;
                font-size: 1em;
                margin: 10px;
            }
            .refresh-btn:hover {
                background: #1ed760;
            }
        </style>
    </head>
    <body>
        <div class="container">
            <div class="header">
                <h1>🎧 Your Spotify Dashboard</h1>
                <button class="refresh-btn" onclick="loadData()">🔄 Refresh</button>
            </div>
            <div class="section">
                <h2>🏆 Your Top 10 Songs</h2>
                <div id="top-songs" class="loading">Loading your top songs...</div>
            </div>
            <div class="section">
                <h2>🕒 Recently Played</h2>
                <div id="recent-songs" class="loading">Loading recent songs...</div>
            </div>
        </div>
        <script>
            // Escape text before it is inserted via innerHTML so that
            // API-supplied strings cannot inject markup or script.
            function escapeHtml(value) {
                return String(value)
                    .replace(/&/g, '&amp;')
                    .replace(/</g, '&lt;')
                    .replace(/>/g, '&gt;')
                    .replace(/"/g, '&quot;')
                    .replace(/'/g, '&#39;');
            }
            function formatDuration(ms) {
                const minutes = Math.floor(ms / 60000);
                const seconds = Math.floor((ms % 60000) / 1000);
                return `${minutes}:${seconds.toString().padStart(2, '0')}`;
            }
            function renderTracks(tracks, containerId, showPlayedAt = false) {
                const container = document.getElementById(containerId);
                if (!tracks || tracks.length === 0) {
                    container.innerHTML = '<div class="track error">No tracks found</div>';
                    return;
                }
                container.innerHTML = tracks.map(track => `
                    <div class="track">
                        <div class="track-name">
                            ${escapeHtml(track.rank)}. ${escapeHtml(track.name)}
                        </div>
                        <div class="track-info">
                            👤 ${escapeHtml((track.artists || []).join(', '))} |
                            💿 ${escapeHtml(track.album)} |
                            ⏱️ ${escapeHtml(formatDuration(track.duration_ms))} |
                            📊 ${escapeHtml(track.popularity)}/100
                            ${showPlayedAt && track.played_at ? `<br>🕒 ${escapeHtml(new Date(track.played_at).toLocaleString())}` : ''}
                        </div>
                    </div>
                `).join('');
            }
            async function loadTopSongs() {
                try {
                    const response = await fetch('/top-songs');
                    const data = await response.json();
                    if (data.success) {
                        renderTracks(data.tracks, 'top-songs');
                    } else {
                        document.getElementById('top-songs').innerHTML = '<div class="track error">Failed to load top songs</div>';
                    }
                } catch (error) {
                    document.getElementById('top-songs').innerHTML = '<div class="track error">Error loading top songs: ' + escapeHtml(error.message) + '</div>';
                }
            }
            async function loadRecentSongs() {
                try {
                    const response = await fetch('/recent-songs');
                    const data = await response.json();
                    if (data.success) {
                        renderTracks(data.tracks, 'recent-songs', true);
                    } else {
                        document.getElementById('recent-songs').innerHTML = '<div class="track error">Failed to load recent songs</div>';
                    }
                } catch (error) {
                    document.getElementById('recent-songs').innerHTML = '<div class="track error">Error loading recent songs: ' + escapeHtml(error.message) + '</div>';
                }
            }
            function loadData() {
                loadTopSongs();
                loadRecentSongs();
            }
            // Load data when page loads
            loadData();
        </script>
    </body>
    </html>
    """
    return html_content
# Entry point intended for Vercel deployment
def handler(request):
    """Return the module-level FastAPI ``app``; ``request`` is not used.

    NOTE(review): Vercel's Python runtime may detect ASGI apps through the
    ``app`` variable and may expect ``handler`` to be a request-handler class
    rather than a function -- confirm this function is actually needed.
    """
    application = app
    return application
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, taking
    # the port from the PORT environment variable (8000 when unset).
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 8000)))