Skip to main content
Glama
utils.py (7.07 kB)
import functools from collections import defaultdict from collections.abc import Callable from typing import Any, TypeVar from urllib.parse import quote, urlparse, urlunparse T = TypeVar("T") def normalize_redirect_uri(url: str) -> str: """Normalize redirect URI to meet Spotify's requirements. Converts localhost to 127.0.0.1 for better Spotify API compatibility. """ if not url: return url parsed = urlparse(url) # Convert localhost to 127.0.0.1 if parsed.netloc == "localhost" or parsed.netloc.startswith("localhost:"): port = "" if ":" in parsed.netloc: port = ":" + parsed.netloc.split(":")[1] parsed = parsed._replace(netloc=f"127.0.0.1{port}") return urlunparse(parsed) def parse_track( track_item: dict[str, Any] | None, detailed: bool = False ) -> dict[str, Any] | None: if not track_item: return None narrowed_item = { "name": track_item["name"], "id": track_item["id"], } if "is_playing" in track_item: narrowed_item["is_playing"] = track_item["is_playing"] if detailed: narrowed_item["album"] = parse_album(track_item.get("album")) for k in ["track_number", "duration_ms"]: narrowed_item[k] = track_item.get(k) if not track_item.get("is_playable", True): narrowed_item["is_playable"] = False artists = [a["name"] for a in track_item["artists"]] if detailed: artists = [parse_artist(a) for a in track_item["artists"]] if len(artists) == 1: narrowed_item["artist"] = artists[0] else: narrowed_item["artists"] = artists return narrowed_item def parse_artist( artist_item: dict[str, Any] | None, detailed: bool = False ) -> dict[str, Any] | None: if not artist_item: return None narrowed_item = { "name": artist_item["name"], "id": artist_item["id"], } if detailed: narrowed_item["genres"] = artist_item.get("genres") return narrowed_item def parse_playlist( playlist_item: dict[str, Any] | None, detailed: bool = False ) -> dict[str, Any] | None: if not playlist_item: return None narrowed_item = { "name": playlist_item["name"], "id": playlist_item["id"], "owner": 
playlist_item["owner"]["display_name"], } if detailed: narrowed_item["description"] = playlist_item.get("description") tracks = [] for t in playlist_item["tracks"]["items"]: tracks.append(parse_track(t["track"])) narrowed_item["tracks"] = tracks return narrowed_item def parse_album( album_item: dict[str, Any] | None, detailed: bool = False ) -> dict[str, Any] | None: if not album_item: return None narrowed_item = { "name": album_item["name"], "id": album_item["id"], } artists = [a["name"] for a in album_item["artists"]] if detailed: tracks = [] for t in album_item["tracks"]["items"]: tracks.append(parse_track(t)) narrowed_item["tracks"] = tracks artists = [parse_artist(a) for a in album_item["artists"]] for k in ["total_tracks", "release_date", "genres"]: narrowed_item[k] = album_item.get(k) if len(artists) == 1: narrowed_item["artist"] = artists[0] else: narrowed_item["artists"] = artists return narrowed_item def parse_search_results( results: dict[str, Any] | None, qtype: str ) -> dict[str, list[dict[str, Any]]]: if not results: return {} _results = defaultdict(list) for q in qtype.split(","): match q: case "track": for _idx, item in enumerate(results["tracks"]["items"]): if not item: continue _results["tracks"].append(parse_track(item)) case "artist": for _idx, item in enumerate(results["artists"]["items"]): if not item: continue _results["artists"].append(parse_artist(item)) case "playlist": for _idx, item in enumerate(results["playlists"]["items"]): if not item: continue _results["playlists"].append(parse_playlist(item)) case "album": for _idx, item in enumerate(results["albums"]["items"]): if not item: continue _results["albums"].append(parse_album(item)) case _: raise ValueError(f"uknown qtype {qtype}") # Filter out None values and convert to regular dict filtered_results = { key: [item for item in items if item is not None] for key, items in _results.items() } return dict(filtered_results) def build_search_query( base_query: str, artist: str | None = None, 
track: str | None = None, album: str | None = None, year: str | None = None, year_range: tuple[int, int] | None = None, # upc: Optional[str] = None, # isrc: Optional[str] = None, genre: str | None = None, is_hipster: bool = False, is_new: bool = False, ) -> str: """ Build a search query string with optional filters. Args: base_query: Base search term artist: Artist name filter track: Track name filter album: Album name filter year: Specific year filter year_range: Tuple of (start_year, end_year) for year range filter genre: Genre filter is_hipster: Filter for lowest 10% popularity albums is_new: Filter for albums released in past two weeks Returns: Encoded query string with applied filters """ filters = [] if artist: filters.append(f"artist:{artist}") if track: filters.append(f"track:{track}") if album: filters.append(f"album:{album}") if year: filters.append(f"year:{year}") if year_range: filters.append(f"year:{year_range[0]}-{year_range[1]}") if genre: filters.append(f"genre:{genre}") if is_hipster: filters.append("tag:hipster") if is_new: filters.append("tag:new") query_parts = [base_query] + filters return quote(" ".join(query_parts)) def validate[T](func: Callable[..., T]) -> Callable[..., T]: """ Decorator for Spotify API methods that handles authentication and device validation. - Checks and refreshes authentication if needed - Validates active device and retries with candidate device if needed """ @functools.wraps(func) def wrapper(self: Any, *args: Any, **kwargs: Any) -> T: # Handle authentication if not self.auth_ok(): self.auth_refresh() # Handle device validation if not self.is_active_device(): kwargs["device"] = self._get_candidate_device() return func(self, *args, **kwargs) return wrapper

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jamiew/spotify-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.