summaryrefslogtreecommitdiff
path: root/generate_daily_playlist.py
diff options
context:
space:
mode:
Diffstat (limited to 'generate_daily_playlist.py')
-rwxr-xr-xgenerate_daily_playlist.py421
1 files changed, 421 insertions, 0 deletions
diff --git a/generate_daily_playlist.py b/generate_daily_playlist.py
new file mode 100755
index 0000000..424984e
--- /dev/null
+++ b/generate_daily_playlist.py
@@ -0,0 +1,421 @@
+#!/usr/bin/env python3
+"""Generate a full 24-hour playlist for Radio Susan.
+Run at midnight via cron. Outputs playlist.json and playlist.m3u.
+"""
+
+import asyncio
+import grp
+import hashlib
+import json
+import os
+import random
+import subprocess
+import sys
+import tempfile
+import time
+from datetime import datetime, timedelta
+from pathlib import Path
+
+sys.path.insert(0, os.path.dirname(__file__))
+import dj_core
+
+# ── Paths ──────────────────────────────────────────────────────────────────────
+RADIO_DIR = Path("/var/lib/radio")
+PLAYLIST_JSON = RADIO_DIR / "playlist.json"
+PLAYLIST_M3U = RADIO_DIR / "playlist.m3u"
+JINGLES_DIR = RADIO_DIR / "jingles"
+ANNOUNCE_DIR = RADIO_DIR / "announcements"
+VENV_BIN = RADIO_DIR / "venv" / "bin"
+EDGE_TTS = VENV_BIN / "edge-tts"
+
+ANNOUNCE_DIR.mkdir(parents=True, exist_ok=True)
+
+DAY_SECONDS = 86400
+DEFAULT_ANNOUNCE_DURATION = 8
+DEFAULT_JINGLE_DURATION = 10
+
+ANNOUNCE_INTERVAL = 3
+JINGLE_INTERVAL = 5
+
+
+def set_mediaserver_perms(path):
+ """Set file group to mediaserver and perms to 664."""
+ try:
+ gid = grp.getgrnam("mediaserver").gr_gid
+ os.chown(str(path), -1, gid)
+ os.chmod(str(path), 0o664)
+ except Exception:
+ pass
+
+
+def get_jingle():
+ """Pick a random jingle, return (path, duration)."""
+ jingles = list(JINGLES_DIR.glob("*.mp3"))
+ if not jingles:
+ return None, 0
+ j = random.choice(jingles)
+ dur = get_audio_duration(str(j))
+ return str(j), dur if dur > 0 else DEFAULT_JINGLE_DURATION
+
+
+def get_audio_duration(path):
+ """Get audio duration in seconds via ffprobe."""
+ try:
+ result = subprocess.run(
+ ["ffprobe", "-v", "quiet", "-show_entries", "format=duration",
+ "-of", "default=noprint_wrappers=1:nokey=1", path],
+ capture_output=True, text=True, timeout=5
+ )
+ return float(result.stdout.strip())
+ except Exception:
+ return 0
+
+
+def generate_announcement_tts(text, voice=None):
+ """Generate TTS announcement, return cached path and duration."""
+ if voice is None:
+ h = int(hashlib.md5(text.encode()).hexdigest(), 16)
+ voice = dj_core.TTS_VOICES[h % len(dj_core.TTS_VOICES)]
+
+ content_hash = hashlib.md5(text.encode()).hexdigest()[:12]
+ cached = ANNOUNCE_DIR / f"announce_{content_hash}.mp3"
+
+ if cached.exists():
+ dur = get_audio_duration(str(cached))
+ return str(cached), dur if dur > 0 else DEFAULT_ANNOUNCE_DURATION
+
+ with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
+ tmp_path = tmp.name
+
+ try:
+ result = subprocess.run(
+ [str(EDGE_TTS), "--voice", voice, "--rate", "+10%",
+ "--text", text, "--write-media", tmp_path],
+ capture_output=True, text=True, timeout=15
+ )
+ if result.returncode != 0:
+ return None, 0
+
+ # Normalize
+ norm_result = subprocess.run(
+ ["ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
+ "-i", tmp_path,
+ "-af", "loudnorm=I=-14:TP=-1:LRA=11,aformat=sample_rates=44100:channel_layouts=stereo",
+ "-b:a", "192k", str(cached)],
+ capture_output=True, text=True, timeout=15
+ )
+ if norm_result.returncode != 0:
+ os.rename(tmp_path, str(cached))
+
+ set_mediaserver_perms(cached)
+ dur = get_audio_duration(str(cached))
+ return str(cached), dur if dur > 0 else DEFAULT_ANNOUNCE_DURATION
+ except Exception:
+ return None, 0
+ finally:
+ try:
+ os.unlink(tmp_path)
+ except OSError:
+ pass
+
+
+def format_time(seconds):
+ """Format seconds into HH:MM:SS."""
+ h = int(seconds // 3600)
+ m = int((seconds % 3600) // 60)
+ s = int(seconds % 60)
+ return f"{h:02d}:{m:02d}:{s:02d}"
+
+
+def build_playlist():
+ """Generate the full 24-hour playlist."""
+ print("Loading tracks...")
+ all_tracks = dj_core.load_tracks()
+ if not all_tracks:
+ print("ERROR: No tracks found!")
+ return None
+
+ nav_data = dj_core.load_navidrome()
+ now = time.time()
+
+ print(f"Loaded {len(all_tracks)} tracks, {len(nav_data)} with play data")
+
+ # In-memory cooldown tracking
+ cooled_paths = set()
+ cooled_artists = {} # artist_lower → timestamp of last play
+ cooled_albums = {} # album_lower → timestamp of last play
+ last_genre = None
+ recent_energies = []
+
+ entries = []
+ current_time = 0.0 # seconds from midnight
+ track_count = 0
+ tracks_since_announce = 0
+ tracks_since_jingle = 0
+
+ jingle_files = list(JINGLES_DIR.glob("*.mp3"))
+
+ # Recent track info for announcements
+ recent_tracks = []
+
+ print("Building playlist...")
+
+ while current_time < DAY_SECONDS:
+ hour = int(current_time // 3600) % 24
+ target_energy = dj_core.ENERGY_CURVE.get(hour, 5)
+ show = dj_core.get_show_for_hour(hour)
+
+ # ── Energy filter ─────────────────────────────────────────────
+ pool = dj_core.filter_by_energy(all_tracks, target_energy)
+
+ # ── Cooldown filter (in-memory) ───────────────────────────────
+ sim_now = now + current_time
+ active_paths = cooled_paths # 7-day cooldown, but within a day they're all active
+ active_artists = {a for a, ts in cooled_artists.items() if sim_now - ts < dj_core.ARTIST_COOLDOWN}
+ active_albums = {a for a, ts in cooled_albums.items() if sim_now - ts < dj_core.ALBUM_COOLDOWN}
+
+ filtered = dj_core.apply_cooldowns(pool, active_paths, active_artists, active_albums, last_genre)
+
+ if len(filtered) < 5:
+ # Relax cooldowns
+ filtered = dj_core.apply_cooldowns(pool, active_paths, set(), set(), None)
+ if len(filtered) < 5:
+ filtered = pool
+ if not filtered:
+ filtered = all_tracks
+
+ # ── Mode selection ────────────────────────────────────────────
+ mode = dj_core.pick_mode()
+
+ # Handle full_album
+ if mode == "full_album":
+ multi = dj_core.find_album_tracks(filtered)
+ if multi:
+ chosen_key = random.choice(list(multi.keys()))
+ album_tracks = multi[chosen_key]
+ for i, t in enumerate(album_tracks):
+ if current_time >= DAY_SECONDS:
+ break
+ dur = t["duration"] if t["duration"] > 0 else 240
+ entries.append({
+ "type": "track",
+ "path": t["path"],
+ "artist": t["artist"],
+ "title": t["title"],
+ "album": t["album"],
+ "genre": t["genre"],
+ "energy": t["energy"],
+ "duration": round(dur, 1),
+ "start_time": round(current_time, 1),
+ "start_formatted": format_time(current_time),
+ "mode": "full_album",
+ "show": show["name"],
+ })
+
+ # Update cooldowns
+ cooled_paths.add(t["path"])
+ if t["artist"]:
+ cooled_artists[t["artist"].lower()] = sim_now
+ if t["album"]:
+ cooled_albums[t["album"].lower()] = sim_now
+ last_genre = dj_core.normalize_genre(t["genre"])
+ recent_energies = (recent_energies + [t["energy"]])[-3:]
+ recent_tracks.append(t)
+
+ current_time += dur
+ track_count += 1
+ tracks_since_announce += 1
+ tracks_since_jingle += 1
+
+ # After album, maybe insert announcement
+ if tracks_since_announce >= ANNOUNCE_INTERVAL:
+ ann_entry = _make_announcement(recent_tracks, current_time)
+ if ann_entry:
+ entries.append(ann_entry)
+ current_time += ann_entry["duration"]
+ tracks_since_announce = 0
+
+ continue # Skip normal pick, already added album
+ else:
+ mode = "shuffle"
+
+ # ── Normal pick ───────────────────────────────────────────────
+ mode_pool, actual_mode = dj_core.apply_mode(filtered, mode, nav_data, now)
+ pick = dj_core.contrast_pick(mode_pool, recent_energies)
+
+ if not os.path.isfile(pick["path"]):
+ mode_pool = [t for t in mode_pool if os.path.isfile(t["path"])]
+ if mode_pool:
+ pick = random.choice(mode_pool)
+ else:
+ continue
+
+ dur = pick["duration"] if pick["duration"] > 0 else 240
+
+ entries.append({
+ "type": "track",
+ "path": pick["path"],
+ "artist": pick["artist"],
+ "title": pick["title"],
+ "album": pick["album"],
+ "genre": pick["genre"],
+ "energy": pick["energy"],
+ "duration": round(dur, 1),
+ "start_time": round(current_time, 1),
+ "start_formatted": format_time(current_time),
+ "mode": actual_mode,
+ "show": show["name"],
+ })
+
+ # Update cooldowns
+ cooled_paths.add(pick["path"])
+ if pick["artist"]:
+ cooled_artists[pick["artist"].lower()] = sim_now
+ if pick["album"]:
+ cooled_albums[pick["album"].lower()] = sim_now
+ last_genre = dj_core.normalize_genre(pick["genre"])
+ recent_energies = (recent_energies + [pick["energy"]])[-3:]
+ recent_tracks.append(pick)
+
+ current_time += dur
+ track_count += 1
+ tracks_since_announce += 1
+ tracks_since_jingle += 1
+
+ # ── Insert announcement ───────────────────────────────────────
+ if tracks_since_announce >= ANNOUNCE_INTERVAL and current_time < DAY_SECONDS:
+ ann_entry = _make_announcement(recent_tracks, current_time)
+ if ann_entry:
+ entries.append(ann_entry)
+ current_time += ann_entry["duration"]
+ tracks_since_announce = 0
+
+ # ── Insert jingle ─────────────────────────────────────────────
+ if tracks_since_jingle >= JINGLE_INTERVAL and jingle_files and current_time < DAY_SECONDS:
+ j_path, j_dur = get_jingle()
+ if j_path:
+ entries.append({
+ "type": "jingle",
+ "path": j_path,
+ "duration": round(j_dur, 1),
+ "start_time": round(current_time, 1),
+ "start_formatted": format_time(current_time),
+ })
+ current_time += j_dur
+ tracks_since_jingle = 0
+
+ # Build show schedule with formatted times
+ shows = []
+ for s in dj_core.SHOWS:
+ shows.append({
+ "name": s["name"],
+ "start": f"{s['start_hour']:02d}:00",
+ "end": f"{s['end_hour']:02d}:00" if s["end_hour"] < 24 else "00:00",
+ "energy": s["energy"],
+ "description": s["description"],
+ })
+
+ playlist = {
+ "generated_at": datetime.now().isoformat(timespec="seconds"),
+ "total_tracks": track_count,
+ "total_entries": len(entries),
+ "total_duration_hours": round(current_time / 3600, 1),
+ "shows": shows,
+ "entries": entries,
+ }
+
+ return playlist
+
+
+def _make_announcement(recent_tracks, current_time):
+ """Build an announcement entry from recent tracks."""
+ if len(recent_tracks) < 2:
+ return None
+
+ last = recent_tracks[-1]
+ prev = recent_tracks[-2]
+
+ last_str = f"{last['artist']} - {last['title']}" if last["artist"] and last["title"] else None
+ prev_str = f"{prev['artist']} - {prev['title']}" if prev["artist"] and prev["title"] else None
+
+ if not last_str:
+ return None
+
+ station_id = random.choice(dj_core.STATION_IDS)
+
+ if prev_str:
+ text = f"You've been listening to {last_str}, and {prev_str}. {station_id}"
+ else:
+ text = f"You've been listening to {last_str}. {station_id}"
+
+ path, dur = generate_announcement_tts(text)
+ if not path:
+ return None
+
+ return {
+ "type": "announcement",
+ "path": path,
+ "text": text,
+ "duration": round(dur, 1),
+ "start_time": round(current_time, 1),
+ "start_formatted": format_time(current_time),
+ }
+
+
+def write_outputs(playlist):
+ """Write playlist.json and playlist.m3u."""
+ # JSON
+ with open(PLAYLIST_JSON, "w") as f:
+ json.dump(playlist, f, indent=2)
+ set_mediaserver_perms(PLAYLIST_JSON)
+
+ # M3U
+ with open(PLAYLIST_M3U, "w") as f:
+ f.write("#EXTM3U\n")
+ for entry in playlist["entries"]:
+ path = entry["path"]
+ dur = int(entry.get("duration", 0))
+ if entry["type"] == "track":
+ f.write(f"#EXTINF:{dur},{entry.get('artist', '')} - {entry.get('title', '')}\n")
+ elif entry["type"] == "announcement":
+ f.write(f"#EXTINF:{dur},Announcement\n")
+ elif entry["type"] == "jingle":
+ f.write(f"#EXTINF:{dur},Jingle\n")
+ f.write(f"{path}\n")
+ set_mediaserver_perms(PLAYLIST_M3U)
+
+
+def main():
+ start = time.time()
+ print(f"Radio Susan — Daily Playlist Generator")
+ print(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+ print()
+
+ playlist = build_playlist()
+ if not playlist:
+ sys.exit(1)
+
+ print(f"\nGenerating announcements...")
+ # Count announcements
+ ann_count = sum(1 for e in playlist["entries"] if e["type"] == "announcement")
+ jingle_count = sum(1 for e in playlist["entries"] if e["type"] == "jingle")
+
+ print(f"\nWriting outputs...")
+ write_outputs(playlist)
+
+ elapsed = time.time() - start
+ print(f"\n{'='*60}")
+ print(f" Playlist generated successfully!")
+ print(f" Tracks: {playlist['total_tracks']}")
+ print(f" Announcements: {ann_count}")
+ print(f" Jingles: {jingle_count}")
+ print(f" Total entries: {playlist['total_entries']}")
+ print(f" Duration: {playlist['total_duration_hours']}h")
+ print(f" Time taken: {elapsed:.1f}s")
+ print(f" Output: {PLAYLIST_JSON}")
+ print(f" Output: {PLAYLIST_M3U}")
+ print(f"{'='*60}")
+
+
+if __name__ == "__main__":
+ main()