from __future__ import annotations

import asyncio
import logging
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from datetime import date
from typing import Any

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from app.config import Settings
from app.db.repositories import PlaylistRunRepository, RecommendationHistoryRepository, UserRepository
from app.services.recommendation import RecommendationEngine
from app.services.spotify_auth import SpotifyAuthService
from app.types import PlaylistBuildResult

logger = logging.getLogger(__name__)


@dataclass
class JobOutcome:
    """Result of one playlist-generation attempt for a single user."""

    # Id of the user the job ran for.
    user_id: int
    # True when a playlist is available (freshly built or already built today).
    ok: bool
    # Human-readable status or error text.
    message: str
    # URL of the resulting playlist, when one is known.
    playlist_url: str | None = None


class PlaylistJobService:
    """Builds daily Spotify playlists for users and records each run."""

    def __init__(
        self,
        settings: Settings,
        session_factory: async_sessionmaker[AsyncSession],
        auth_service: SpotifyAuthService,
        recommendation_engine: RecommendationEngine,
        generate_lock: asyncio.Lock,
    ) -> None:
        """Store the collaborators; no notifier is configured initially."""
        self.settings = settings
        self.session_factory = session_factory
        self.auth_service = auth_service
        self.recommendation_engine = recommendation_engine
        self.generate_lock = generate_lock
        self._notify: Callable[..., Awaitable[Any]] | None = None

    def set_notifier(self, notifier: Callable[..., Awaitable[Any]] | None) -> None:
        """Register the async callable used to message users.

        The callable is awaited as ``notifier(chat_id, text)``.
        """
        self._notify = notifier

    async def generate_for_user(self, user_id: int, *, force: bool = False, notify: bool = True) -> JobOutcome:
        """Generate today's playlist for one user and record the run.

        Args:
            user_id: Id of the user to generate for.
            force: When True, generate even if the user's last generated
                date is already today.
            notify: When True and a notifier is registered, message the user
                with the result (success or failure).

        Returns:
            A ``JobOutcome``. Failures during generation are caught, stored
            on the run via ``mark_failed`` and reported with ``ok=False``
            rather than raised.
        """
        async with self.generate_lock, self.session_factory() as session:
            users = UserRepository(session)
            runs = PlaylistRunRepository(session)
            history = RecommendationHistoryRepository(session)

            user = await users.get_by_id(user_id)
            if not user:
                return JobOutcome(user_id=user_id, ok=False, message="User not found")
            if not user.spotify_refresh_token:
                return JobOutcome(user_id=user_id, ok=False, message="Spotify is not connected")

            # Read the date once so the "already generated" check, the run
            # row and the user's last_generated_date all agree.
            today = date.today()
            if not force and user.last_generated_date == today:
                latest = await runs.latest_for_user(user.id)
                return JobOutcome(
                    user_id=user.id,
                    ok=True,
                    message="Already generated today",
                    playlist_url=latest.playlist_url if latest else user.latest_playlist_url,
                )

            run = await runs.create_run(user_id=user.id, run_date=today)
            try:
                access_token = await self.auth_service.ensure_valid_access_token(session, user)
                # Re-sync likes each run so new likes affect next day's picks.
                await self.recommendation_engine.sync_saved_tracks(session, user, access_token)
                build = await self.recommendation_engine.build_daily_playlist(session, user, access_token)
                if not build.tracks:
                    raise RuntimeError("No candidate tracks found. Try listening more or widen sources.")

                playlist_meta = await self._create_spotify_playlist(session, user, access_token, build)
                playlist_url = playlist_meta.get("url")

                # Build the lookup once; membership is then tested per track.
                known_ids = set(await history.list_track_ids(user.id))
                serialized_tracks = [
                    {
                        "id": c.id,
                        "name": c.name,
                        "artist_names": c.artist_names,
                        "source": c.source,
                        "is_new_to_bot": c.id not in known_ids,
                    }
                    for c in build.tracks
                ]
                await runs.add_tracks(run.id, serialized_tracks)
                await history.mark_tracks(user.id, [c.id for c in build.tracks])
                await runs.mark_success(
                    run,
                    playlist_id=playlist_meta["id"],
                    playlist_name=playlist_meta["name"],
                    playlist_url=playlist_url,
                    total_tracks=len(build.tracks),
                    new_tracks=build.new_count,
                    reused_tracks=build.reused_count,
                    notes=build.notes,
                )
                user.last_generated_date = today
                user.latest_playlist_id = playlist_meta["id"]
                user.latest_playlist_url = playlist_url
                await session.commit()
            except Exception as exc:
                logger.exception("Playlist generation failed for user %s", user_id)
                await runs.mark_failed(run, str(exc))
                await session.commit()
                outcome = JobOutcome(user_id=user.id, ok=False, message=str(exc))
                notification = f"Playlist generation failed: {exc}"
            else:
                message = (
                    f"Playlist ready: {playlist_meta['name']} ({len(build.tracks)} tracks, "
                    f"new {build.new_count}/{len(build.tracks)})"
                )
                outcome = JobOutcome(user_id=user.id, ok=True, message=message, playlist_url=playlist_url)
                # The "url" key is always present but may be None; avoid
                # sending the literal text "None" to the user.
                notification = f"{message}\n{playlist_url or ''}".strip()

            # Notify outside the try block: a delivery problem must not turn
            # an already-committed successful run into a failed one.
            if notify and self._notify:
                await self._send_notification(user.telegram_chat_id, notification)
            return outcome

    async def generate_for_all_connected_users(self) -> list[JobOutcome]:
        """Run ``generate_for_user`` sequentially for every active connected user.

        Returns:
            One ``JobOutcome`` per user, in the order the repository listed them.
        """
        async with self.session_factory() as session:
            users = await UserRepository(session).list_active_connected_users()
            # Collect plain ids while the session is still open.
            user_ids = [user.id for user in users]

        outcomes: list[JobOutcome] = []
        for uid in user_ids:
            outcomes.append(await self.generate_for_user(uid, notify=True))
        return outcomes

    async def _send_notification(self, chat_id: Any, text: str) -> None:
        """Deliver ``text`` via the registered notifier on a best-effort basis.

        Delivery errors are logged and swallowed so they cannot change the
        outcome of a generation run or abort a batch.
        """
        if not self._notify:
            return
        try:
            await self._notify(chat_id, text)
        except Exception:
            logger.exception("Failed to deliver notification to chat %s", chat_id)

    async def _create_spotify_playlist(
        self, session: AsyncSession, user, access_token: str, build: PlaylistBuildResult
    ) -> dict[str, str | None]:
        """Create today's playlist on Spotify and add the built tracks to it.

        Returns:
            A dict with the playlist's ``"id"``, ``"name"`` and ``"url"``;
            ``"url"`` is None when the response carries no Spotify external URL.
        """
        public = self.settings.playlist_visibility.lower() == "public"
        name = f"Daily Vibe {date.today().isoformat()}"
        desc = (
            "Auto-generated from your recent listening + liked tracks. "
            f"New-to-bot: {build.new_count}/{len(build.tracks)}."
        )
        playlist = await self.auth_service.spotify.create_playlist(
            access_token,
            user_id=user.spotify_user_id,
            name=name,
            description=desc,
            public=public,
        )
        await self.auth_service.spotify.add_playlist_items(
            access_token, playlist["id"], [c.uri for c in build.tracks]
        )
        return {
            "id": playlist["id"],
            "name": playlist["name"],
            "url": (playlist.get("external_urls") or {}).get("spotify"),
        }