from __future__ import annotations import asyncio import base64 from collections.abc import Iterable from datetime import datetime, timedelta, timezone from typing import Any from urllib.parse import urlencode import httpx from app.config import Settings from app.utils.time import parse_spotify_datetime, to_unix_ms class SpotifyApiError(RuntimeError): def __init__(self, message: str, status_code: int | None = None, payload: Any | None = None) -> None: super().__init__(message) self.status_code = status_code self.payload = payload class SpotifyClient: AUTH_BASE = "https://accounts.spotify.com" API_BASE = "https://api.spotify.com/v1" def __init__(self, settings: Settings, http: httpx.AsyncClient) -> None: self.settings = settings self.http = http def build_authorize_url(self, state: str, scopes: Iterable[str]) -> str: params = { "client_id": self.settings.spotify_client_id, "response_type": "code", "redirect_uri": self.settings.spotify_redirect_uri, "scope": " ".join(scopes), "state": state, "show_dialog": "false", } return f"{self.AUTH_BASE}/authorize?{urlencode(params)}" async def exchange_code(self, code: str) -> dict[str, Any]: data = { "grant_type": "authorization_code", "code": code, "redirect_uri": self.settings.spotify_redirect_uri, } return await self._token_request(data) async def refresh_access_token(self, refresh_token: str) -> dict[str, Any]: data = { "grant_type": "refresh_token", "refresh_token": refresh_token, } return await self._token_request(data) async def _token_request(self, data: dict[str, str]) -> dict[str, Any]: creds = f"{self.settings.spotify_client_id}:{self.settings.spotify_client_secret}".encode() auth = base64.b64encode(creds).decode() headers = { "Authorization": f"Basic {auth}", "Content-Type": "application/x-www-form-urlencoded", } resp = await self.http.post(f"{self.AUTH_BASE}/api/token", data=data, headers=headers, timeout=30) if resp.status_code >= 400: raise SpotifyApiError("Spotify token request failed", resp.status_code, resp.text) return resp.json() async def get_current_user(self, access_token: str) -> dict[str, Any]: return await self._request_json("GET", "/me", access_token=access_token) async def get_saved_tracks_all(self, access_token: str) -> list[dict[str, Any]]: tracks: list[dict[str, Any]] = [] offset = 0 limit = 50 while True: payload = await self._request_json( "GET", "/me/tracks", access_token=access_token, params={"limit": limit, "offset": offset}, ) items = payload.get("items", []) for item in items: t = item.get("track") or {} track_id = t.get("id") if not track_id: continue artists = t.get("artists") or [] tracks.append( { "id": track_id, "uri": t.get("uri") or f"spotify:track:{track_id}", "name": t.get("name") or "Unknown", "artist_names": [a.get("name") or "Unknown" for a in artists], "artist_ids": [a.get("id") for a in artists if a.get("id")], "album_name": (t.get("album") or {}).get("name"), "added_at": parse_spotify_datetime(item.get("added_at")), "popularity": t.get("popularity"), } ) if not payload.get("next"): break offset += limit return tracks async def get_recently_played( self, access_token: str, *, since: datetime, max_pages: int = 10, ) -> list[dict[str, Any]]: items: list[dict[str, Any]] = [] before: int | None = None pages = 0 since_ms = to_unix_ms(since) while pages < max_pages: params: dict[str, Any] = {"limit": 50} if before: params["before"] = before payload = await self._request_json( "GET", "/me/player/recently-played", access_token=access_token, params=params ) batch = payload.get("items", []) if not batch: break stop = False for item in batch: played_at = parse_spotify_datetime(item.get("played_at")) t = item.get("track") or {} track_id = t.get("id") if not played_at or not track_id: continue if to_unix_ms(played_at) < since_ms: stop = True continue artists = t.get("artists") or [] items.append( { "id": track_id, "uri": t.get("uri") or f"spotify:track:{track_id}", "name": t.get("name") or "Unknown", "artist_names": [a.get("name") or "Unknown" for a in artists], "artist_ids": [a.get("id") for a in artists if a.get("id")], "played_at": played_at, "popularity": t.get("popularity"), } ) cursors = payload.get("cursors") or {} before = cursors.get("before") pages += 1 if stop or not before: break return items async def create_playlist( self, access_token: str, *, user_id: str, name: str, description: str, public: bool, ) -> dict[str, Any]: # Prefer /me/playlists: in some accounts/apps /users/{id}/playlists returns 403 # even with valid playlist-modify scopes, while /me/playlists succeeds. return await self._request_json( "POST", "/me/playlists", access_token=access_token, json={"name": name, "description": description, "public": public}, ) async def delete_playlist(self, access_token: str, playlist_id: str) -> None: # Spotify has no hard delete for playlists; DELETE here unfollows/removes it from the library. await self._request_json( "DELETE", f"/playlists/{playlist_id}/followers", access_token=access_token, ) async def add_playlist_items(self, access_token: str, playlist_id: str, uris: list[str]) -> None: for i in range(0, len(uris), 100): await self._request_json( "POST", f"/playlists/{playlist_id}/items", access_token=access_token, json={"uris": uris[i : i + 100]}, ) async def get_recommendations( self, access_token: str, *, seed_tracks: list[str], seed_artists: list[str], limit: int = 100, market: str | None = None, ) -> list[dict[str, Any]]: params: dict[str, Any] = {"limit": limit} if market: params["market"] = market if seed_tracks: params["seed_tracks"] = ",".join(seed_tracks[:5]) if seed_artists: params["seed_artists"] = ",".join(seed_artists[:5]) if not params.get("seed_tracks") and not params.get("seed_artists"): return [] payload = await self._request_json("GET", "/recommendations", access_token=access_token, params=params) return payload.get("tracks", []) async def get_artist_top_tracks(self, access_token: str, artist_id: str, market: str) -> list[dict[str, Any]]: payload = await self._request_json( "GET", f"/artists/{artist_id}/top-tracks", access_token=access_token, params={"market": market} ) return payload.get("tracks", []) async def search_track( self, access_token: str, *, track_name: str, artist_name: str | None = None, market: str | None = None, ) -> list[dict[str, Any]]: q_parts: list[str] = [] if track_name: q_parts.append(f'track:"{track_name}"') if artist_name: q_parts.append(f'artist:"{artist_name}"') if not q_parts: return [] q = " ".join(q_parts) params: dict[str, Any] = {"q": q, "type": "track", "limit": 5} if market: params["market"] = market payload = await self._request_json("GET", "/search", access_token=access_token, params=params) return (payload.get("tracks") or {}).get("items", []) async def _request_json( self, method: str, path: str, *, access_token: str, params: dict[str, Any] | None = None, json: dict[str, Any] | None = None, ) -> dict[str, Any]: headers = {"Authorization": f"Bearer {access_token}"} last_error: SpotifyApiError | None = None for _ in range(3): resp = await self.http.request( method, f"{self.API_BASE}{path}", headers=headers, params=params, json=json, timeout=30, ) if resp.status_code == 429: retry_after = int(resp.headers.get("Retry-After", "1")) await asyncio.sleep(max(1, retry_after)) continue if resp.status_code >= 400: last_error = SpotifyApiError( f"Spotify API request failed: {method} {path}", resp.status_code, resp.text, ) break return resp.json() if resp.content else {} if last_error: raise last_error raise SpotifyApiError(f"Spotify API request failed after retries: {method} {path}") @staticmethod def token_expiry_from_response(token_payload: dict[str, Any]) -> datetime: expires_in = int(token_payload.get("expires_in", 3600)) return datetime.now(timezone.utc) + timedelta(seconds=max(60, expires_in - 60))