282 lines
10 KiB
Python
282 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import base64
|
|
from collections.abc import Iterable
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
from urllib.parse import urlencode
|
|
|
|
import httpx
|
|
|
|
from app.config import Settings
|
|
from app.utils.time import parse_spotify_datetime, to_unix_ms
|
|
|
|
|
|
class SpotifyApiError(RuntimeError):
    """Error raised when a Spotify HTTP call does not succeed.

    Attributes:
        status_code: HTTP status of the failing response, or ``None`` when
            no status was supplied by the raiser.
        payload: Whatever extra detail the raiser attached (the client in
            this module passes the raw response text), or ``None``.
    """

    def __init__(
        self,
        message: str,
        status_code: int | None = None,
        payload: Any | None = None,
    ) -> None:
        # Record the diagnostic fields, then let RuntimeError keep the message.
        self.status_code, self.payload = status_code, payload
        super().__init__(message)
|
|
|
|
|
|
class SpotifyClient:
    """Async client for the Spotify Accounts service and Web API.

    The client holds no token state of its own: every API method takes the
    caller's ``access_token`` and sends it as a bearer token. All HTTP traffic
    goes through the ``httpx.AsyncClient`` supplied at construction time.
    """

    AUTH_BASE = "https://accounts.spotify.com"
    API_BASE = "https://api.spotify.com/v1"

    # Spotify accepts at most five seeds in total across all seed_* parameters
    # of the recommendations endpoint.
    MAX_RECOMMENDATION_SEEDS = 5
    # Number of attempts made for a single API call when rate limited (HTTP 429).
    _MAX_ATTEMPTS = 3

    def __init__(self, settings: Settings, http: httpx.AsyncClient) -> None:
        """Store the application settings and the shared HTTP client."""
        self.settings = settings
        self.http = http

    def build_authorize_url(self, state: str, scopes: Iterable[str]) -> str:
        """Return the Spotify authorization URL for the authorization-code flow.

        Args:
            state: Opaque value sent as the ``state`` query parameter.
            scopes: Scope names; they are joined with single spaces.
        """
        params = {
            "client_id": self.settings.spotify_client_id,
            "response_type": "code",
            "redirect_uri": self.settings.spotify_redirect_uri,
            "scope": " ".join(scopes),
            "state": state,
            "show_dialog": "false",
        }
        return f"{self.AUTH_BASE}/authorize?{urlencode(params)}"

    async def exchange_code(self, code: str) -> dict[str, Any]:
        """Exchange an authorization code for a token payload.

        Raises:
            SpotifyApiError: If the token endpoint answers with status >= 400.
        """
        data = {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": self.settings.spotify_redirect_uri,
        }
        return await self._token_request(data)

    async def refresh_access_token(self, refresh_token: str) -> dict[str, Any]:
        """Request a new token payload using a refresh token.

        Raises:
            SpotifyApiError: If the token endpoint answers with status >= 400.
        """
        data = {
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        }
        return await self._token_request(data)

    async def _token_request(self, data: dict[str, str]) -> dict[str, Any]:
        """POST form data to the token endpoint using HTTP Basic client auth.

        Returns:
            The decoded JSON body of the response.

        Raises:
            SpotifyApiError: If the response status is >= 400; the error
                carries the status code and the raw response text.
        """
        creds = f"{self.settings.spotify_client_id}:{self.settings.spotify_client_secret}".encode()
        auth = base64.b64encode(creds).decode()
        headers = {
            "Authorization": f"Basic {auth}",
            "Content-Type": "application/x-www-form-urlencoded",
        }
        resp = await self.http.post(f"{self.AUTH_BASE}/api/token", data=data, headers=headers, timeout=30)
        if resp.status_code >= 400:
            raise SpotifyApiError("Spotify token request failed", resp.status_code, resp.text)
        return resp.json()

    async def get_current_user(self, access_token: str) -> dict[str, Any]:
        """Return the JSON payload of ``GET /me``."""
        return await self._request_json("GET", "/me", access_token=access_token)

    @staticmethod
    def _track_fields(track: dict[str, Any], track_id: str) -> dict[str, Any]:
        """Build the identity/artist fields shared by every track summary."""
        artists = track.get("artists") or []
        return {
            "id": track_id,
            "uri": track.get("uri") or f"spotify:track:{track_id}",
            "name": track.get("name") or "Unknown",
            "artist_names": [a.get("name") or "Unknown" for a in artists],
            "artist_ids": [a.get("id") for a in artists if a.get("id")],
        }

    async def get_saved_tracks_all(self, access_token: str) -> list[dict[str, Any]]:
        """Page through ``/me/tracks`` and return a summary dict per track.

        Pages of 50 are requested with an increasing ``offset`` until a page
        has no ``next`` link. Entries without a track id are skipped.

        Returns:
            Dicts with the keys ``id``, ``uri``, ``name``, ``artist_names``,
            ``artist_ids``, ``album_name``, ``added_at`` and ``popularity``.

        Raises:
            SpotifyApiError: Propagated from the underlying API request.
        """
        tracks: list[dict[str, Any]] = []
        offset = 0
        limit = 50
        while True:
            payload = await self._request_json(
                "GET",
                "/me/tracks",
                access_token=access_token,
                params={"limit": limit, "offset": offset},
            )
            # ``or []`` also covers an explicit ``"items": null`` in the payload.
            for item in payload.get("items") or []:
                t = item.get("track") or {}
                track_id = t.get("id")
                if not track_id:
                    continue
                summary = self._track_fields(t, track_id)
                summary["album_name"] = (t.get("album") or {}).get("name")
                summary["added_at"] = parse_spotify_datetime(item.get("added_at"))
                summary["popularity"] = t.get("popularity")
                tracks.append(summary)
            if not payload.get("next"):
                break
            offset += limit
        return tracks

    async def get_recently_played(
        self,
        access_token: str,
        *,
        since: datetime,
        max_pages: int = 10,
    ) -> list[dict[str, Any]]:
        """Return recently played tracks whose play time is not before ``since``.

        Pages of 50 are fetched from ``/me/player/recently-played``, following
        the ``before`` cursor of each response. Paging stops after
        ``max_pages`` requests, on an empty page, when no further cursor is
        returned, or once a page contained a play older than ``since``.
        Entries lacking a parsed ``played_at`` or a track id are skipped.

        Returns:
            Dicts with the keys ``id``, ``uri``, ``name``, ``artist_names``,
            ``artist_ids``, ``played_at`` and ``popularity``.

        Raises:
            SpotifyApiError: Propagated from the underlying API request.
        """
        items: list[dict[str, Any]] = []
        # The cursor is passed back exactly as received from the payload.
        before: str | int | None = None
        since_ms = to_unix_ms(since)

        for _ in range(max_pages):
            params: dict[str, Any] = {"limit": 50}
            if before:
                params["before"] = before
            payload = await self._request_json(
                "GET", "/me/player/recently-played", access_token=access_token, params=params
            )
            batch = payload.get("items") or []
            if not batch:
                break
            stop = False
            for item in batch:
                played_at = parse_spotify_datetime(item.get("played_at"))
                t = item.get("track") or {}
                track_id = t.get("id")
                if not played_at or not track_id:
                    continue
                if to_unix_ms(played_at) < since_ms:
                    # Too old: drop it, and stop paging after this batch.
                    stop = True
                    continue
                summary = self._track_fields(t, track_id)
                summary["played_at"] = played_at
                summary["popularity"] = t.get("popularity")
                items.append(summary)
            cursors = payload.get("cursors") or {}
            before = cursors.get("before")
            if stop or not before:
                break
        return items

    async def create_playlist(
        self,
        access_token: str,
        *,
        user_id: str,
        name: str,
        description: str,
        public: bool,
    ) -> dict[str, Any]:
        """Create a playlist for the token's user and return the API payload.

        ``user_id`` is accepted for interface compatibility but is not used:
        the request always targets ``/me/playlists``.

        Raises:
            SpotifyApiError: Propagated from the underlying API request.
        """
        # Prefer /me/playlists: in some accounts/apps /users/{id}/playlists returns 403
        # even with valid playlist-modify scopes, while /me/playlists succeeds.
        return await self._request_json(
            "POST",
            "/me/playlists",
            access_token=access_token,
            json={"name": name, "description": description, "public": public},
        )

    async def delete_playlist(self, access_token: str, playlist_id: str) -> None:
        """Remove a playlist from the user's library by unfollowing it.

        Raises:
            SpotifyApiError: Propagated from the underlying API request.
        """
        # Spotify has no hard delete for playlists; DELETE here unfollows/removes it from the library.
        await self._request_json(
            "DELETE",
            f"/playlists/{playlist_id}/followers",
            access_token=access_token,
        )

    async def add_playlist_items(self, access_token: str, playlist_id: str, uris: list[str]) -> None:
        """Add ``uris`` to a playlist, sending them in batches of 100.

        An empty ``uris`` list results in no request at all.

        Raises:
            SpotifyApiError: Propagated from the underlying API request; earlier
                batches have already been sent when a later one fails.
        """
        for i in range(0, len(uris), 100):
            await self._request_json(
                "POST",
                f"/playlists/{playlist_id}/items",
                access_token=access_token,
                json={"uris": uris[i : i + 100]},
            )

    @classmethod
    def _select_seeds(cls, seed_tracks: list[str], seed_artists: list[str]) -> tuple[list[str], list[str]]:
        """Trim both seed lists so their combined length fits the seed limit.

        When both lists compete for slots, artists keep up to two of the five
        slots and tracks take the rest, so neither kind is dropped entirely.
        Inputs that already fit are returned unchanged (as slices).
        """
        limit = cls.MAX_RECOMMENDATION_SEEDS
        artist_slots = min(len(seed_artists), max(limit - len(seed_tracks), limit // 2))
        track_slots = min(len(seed_tracks), limit - artist_slots)
        return seed_tracks[:track_slots], seed_artists[:artist_slots]

    async def get_recommendations(
        self,
        access_token: str,
        *,
        seed_tracks: list[str],
        seed_artists: list[str],
        limit: int = 100,
        market: str | None = None,
    ) -> list[dict[str, Any]]:
        """Return recommended tracks for the given track and artist seeds.

        The combined number of seeds sent is capped at
        ``MAX_RECOMMENDATION_SEEDS`` because the API rejects requests with
        more seeds in total. With no seeds at all, no request is made and an
        empty list is returned.

        Raises:
            SpotifyApiError: Propagated from the underlying API request.
        """
        tracks, artists = self._select_seeds(seed_tracks, seed_artists)
        if not tracks and not artists:
            return []
        params: dict[str, Any] = {"limit": limit}
        if market:
            params["market"] = market
        if tracks:
            params["seed_tracks"] = ",".join(tracks)
        if artists:
            params["seed_artists"] = ",".join(artists)
        # A comma-joined list of empty ids is still empty: nothing to seed with.
        if not params.get("seed_tracks") and not params.get("seed_artists"):
            return []
        payload = await self._request_json("GET", "/recommendations", access_token=access_token, params=params)
        return payload.get("tracks", [])

    async def get_artist_top_tracks(self, access_token: str, artist_id: str, market: str) -> list[dict[str, Any]]:
        """Return the ``tracks`` list of ``GET /artists/{artist_id}/top-tracks``.

        Raises:
            SpotifyApiError: Propagated from the underlying API request.
        """
        payload = await self._request_json(
            "GET", f"/artists/{artist_id}/top-tracks", access_token=access_token, params={"market": market}
        )
        return payload.get("tracks", [])

    async def search_track(
        self,
        access_token: str,
        *,
        track_name: str,
        artist_name: str | None = None,
        market: str | None = None,
    ) -> list[dict[str, Any]]:
        """Search for tracks by name and, optionally, artist.

        The query is built from ``track:"..."`` and ``artist:"..."`` field
        filters and limited to five results. With neither name given, no
        request is made and an empty list is returned.

        Raises:
            SpotifyApiError: Propagated from the underlying API request.
        """
        q_parts: list[str] = []
        if track_name:
            q_parts.append(f'track:"{track_name}"')
        if artist_name:
            q_parts.append(f'artist:"{artist_name}"')
        if not q_parts:
            return []
        params: dict[str, Any] = {"q": " ".join(q_parts), "type": "track", "limit": 5}
        if market:
            params["market"] = market
        payload = await self._request_json("GET", "/search", access_token=access_token, params=params)
        return (payload.get("tracks") or {}).get("items", [])

    @staticmethod
    def _retry_after_seconds(header_value: str | None) -> int:
        """Convert a ``Retry-After`` header value to a wait of at least 1 second.

        A missing value, or one that is not an integer number of seconds
        (for example an HTTP-date), falls back to one second.
        """
        try:
            seconds = int(header_value) if header_value is not None else 1
        except (TypeError, ValueError):
            seconds = 1
        return max(1, seconds)

    async def _request_json(
        self,
        method: str,
        path: str,
        *,
        access_token: str,
        params: dict[str, Any] | None = None,
        json: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Send an authenticated Web API request and return the decoded JSON.

        A 429 response is retried, up to ``_MAX_ATTEMPTS`` attempts in total,
        waiting the ``Retry-After`` interval between attempts. Any other
        status >= 400 fails immediately without retrying.

        Returns:
            The decoded JSON body, or ``{}`` when the response body is empty.

        Raises:
            SpotifyApiError: On a status >= 400, or when every attempt was
                rate limited; the error carries the status code and the raw
                response text.
        """
        headers = {"Authorization": f"Bearer {access_token}"}
        url = f"{self.API_BASE}{path}"
        rate_limited: SpotifyApiError | None = None
        for attempt in range(self._MAX_ATTEMPTS):
            resp = await self.http.request(
                method,
                url,
                headers=headers,
                params=params,
                json=json,
                timeout=30,
            )
            if resp.status_code == 429:
                rate_limited = SpotifyApiError(
                    f"Spotify API request failed after retries: {method} {path}",
                    resp.status_code,
                    resp.text,
                )
                # Only wait when another attempt will actually follow.
                if attempt < self._MAX_ATTEMPTS - 1:
                    await asyncio.sleep(self._retry_after_seconds(resp.headers.get("Retry-After")))
                continue
            if resp.status_code >= 400:
                raise SpotifyApiError(
                    f"Spotify API request failed: {method} {path}",
                    resp.status_code,
                    resp.text,
                )
            return resp.json() if resp.content else {}
        raise rate_limited or SpotifyApiError(f"Spotify API request failed after retries: {method} {path}")

    @staticmethod
    def token_expiry_from_response(token_payload: dict[str, Any]) -> datetime:
        """Compute a timezone-aware (UTC) expiry time from a token payload.

        ``expires_in`` defaults to 3600 seconds when absent. Sixty seconds are
        subtracted as a safety margin, but the result is never less than
        sixty seconds from now.
        """
        expires_in = int(token_payload.get("expires_in", 3600))
        return datetime.now(timezone.utc) + timedelta(seconds=max(60, expires_in - 60))
|