#!/usr/bin/env python3 """Generate a full 24-hour playlist for Radio Susan. Run at midnight via cron. Outputs playlist.json and playlist.m3u. """ import asyncio import grp import hashlib import json import os import random import subprocess import sys import tempfile import time from datetime import datetime, timedelta from pathlib import Path sys.path.insert(0, os.path.dirname(__file__)) import dj_core # ── Paths ────────────────────────────────────────────────────────────────────── RADIO_DIR = Path("/var/lib/radio") PLAYLIST_JSON = RADIO_DIR / "playlist.json" PLAYLIST_M3U = RADIO_DIR / "playlist.m3u" JINGLES_DIR = RADIO_DIR / "jingles" ANNOUNCE_DIR = RADIO_DIR / "announcements" VENV_BIN = RADIO_DIR / "venv" / "bin" EDGE_TTS = VENV_BIN / "edge-tts" ANNOUNCE_DIR.mkdir(parents=True, exist_ok=True) DAY_SECONDS = 86400 DEFAULT_ANNOUNCE_DURATION = 8 DEFAULT_JINGLE_DURATION = 10 ANNOUNCE_INTERVAL = 3 JINGLE_INTERVAL = 5 def set_mediaserver_perms(path): """Set file group to mediaserver and perms to 664.""" try: gid = grp.getgrnam("mediaserver").gr_gid os.chown(str(path), -1, gid) os.chmod(str(path), 0o664) except Exception: pass def get_jingle(): """Pick a random jingle, return (path, duration).""" jingles = list(JINGLES_DIR.glob("*.mp3")) if not jingles: return None, 0 j = random.choice(jingles) dur = get_audio_duration(str(j)) return str(j), dur if dur > 0 else DEFAULT_JINGLE_DURATION def get_audio_duration(path): """Get audio duration in seconds via ffprobe.""" try: result = subprocess.run( ["ffprobe", "-v", "quiet", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", path], capture_output=True, text=True, timeout=5 ) return float(result.stdout.strip()) except Exception: return 0 def generate_announcement_tts(text, voice=None): """Generate TTS announcement, return cached path and duration.""" if voice is None: h = int(hashlib.md5(text.encode()).hexdigest(), 16) voice = dj_core.TTS_VOICES[h % len(dj_core.TTS_VOICES)] content_hash = hashlib.md5(text.encode()).hexdigest()[:12] cached = ANNOUNCE_DIR / f"announce_{content_hash}.mp3" if cached.exists(): dur = get_audio_duration(str(cached)) return str(cached), dur if dur > 0 else DEFAULT_ANNOUNCE_DURATION with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp: tmp_path = tmp.name try: result = subprocess.run( [str(EDGE_TTS), "--voice", voice, "--rate", "+10%", "--text", text, "--write-media", tmp_path], capture_output=True, text=True, timeout=15 ) if result.returncode != 0: return None, 0 # Normalize norm_result = subprocess.run( ["ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-i", tmp_path, "-af", "loudnorm=I=-14:TP=-1:LRA=11,aformat=sample_rates=44100:channel_layouts=stereo", "-b:a", "192k", str(cached)], capture_output=True, text=True, timeout=15 ) if norm_result.returncode != 0: os.rename(tmp_path, str(cached)) set_mediaserver_perms(cached) dur = get_audio_duration(str(cached)) return str(cached), dur if dur > 0 else DEFAULT_ANNOUNCE_DURATION except Exception: return None, 0 finally: try: os.unlink(tmp_path) except OSError: pass def format_time(seconds): """Format seconds into HH:MM:SS.""" h = int(seconds // 3600) m = int((seconds % 3600) // 60) s = int(seconds % 60) return f"{h:02d}:{m:02d}:{s:02d}" def build_playlist(): """Generate the full 24-hour playlist.""" print("Loading tracks...") all_tracks = dj_core.load_tracks() if not all_tracks: print("ERROR: No tracks found!") return None nav_data = dj_core.load_navidrome() now = time.time() print(f"Loaded {len(all_tracks)} tracks, {len(nav_data)} with play data") # In-memory cooldown tracking cooled_paths = set() cooled_artists = {} # artist_lower → timestamp of last play cooled_albums = {} # album_lower → timestamp of last play last_genre = None recent_energies = [] entries = [] current_time = 0.0 # seconds from midnight track_count = 0 tracks_since_announce = 0 tracks_since_jingle = 0 jingle_files = list(JINGLES_DIR.glob("*.mp3")) # Recent track info for announcements recent_tracks = [] print("Building playlist...") while current_time < DAY_SECONDS: hour = int(current_time // 3600) % 24 target_energy = dj_core.ENERGY_CURVE.get(hour, 5) show = dj_core.get_show_for_hour(hour) # ── Energy filter ───────────────────────────────────────────── pool = dj_core.filter_by_energy(all_tracks, target_energy) # ── Cooldown filter (in-memory) ─────────────────────────────── sim_now = now + current_time active_paths = cooled_paths # 7-day cooldown, but within a day they're all active active_artists = {a for a, ts in cooled_artists.items() if sim_now - ts < dj_core.ARTIST_COOLDOWN} active_albums = {a for a, ts in cooled_albums.items() if sim_now - ts < dj_core.ALBUM_COOLDOWN} filtered = dj_core.apply_cooldowns(pool, active_paths, active_artists, active_albums, last_genre) if len(filtered) < 5: # Relax cooldowns filtered = dj_core.apply_cooldowns(pool, active_paths, set(), set(), None) if len(filtered) < 5: filtered = pool if not filtered: filtered = all_tracks # ── Mode selection ──────────────────────────────────────────── mode = dj_core.pick_mode() # Handle full_album if mode == "full_album": multi = dj_core.find_album_tracks(filtered) if multi: chosen_key = random.choice(list(multi.keys())) album_tracks = multi[chosen_key] for i, t in enumerate(album_tracks): if current_time >= DAY_SECONDS: break dur = t["duration"] if t["duration"] > 0 else 240 entries.append({ "type": "track", "path": t["path"], "artist": t["artist"], "title": t["title"], "album": t["album"], "genre": t["genre"], "energy": t["energy"], "duration": round(dur, 1), "start_time": round(current_time, 1), "start_formatted": format_time(current_time), "mode": "full_album", "show": show["name"], }) # Update cooldowns cooled_paths.add(t["path"]) if t["artist"]: cooled_artists[t["artist"].lower()] = sim_now if t["album"]: cooled_albums[t["album"].lower()] = sim_now last_genre = dj_core.normalize_genre(t["genre"]) recent_energies = (recent_energies + [t["energy"]])[-3:] recent_tracks.append(t) current_time += dur track_count += 1 tracks_since_announce += 1 tracks_since_jingle += 1 # After album, maybe insert announcement if tracks_since_announce >= ANNOUNCE_INTERVAL: ann_entry = _make_announcement(recent_tracks, current_time) if ann_entry: entries.append(ann_entry) current_time += ann_entry["duration"] tracks_since_announce = 0 continue # Skip normal pick, already added album else: mode = "shuffle" # ── Normal pick ─────────────────────────────────────────────── mode_pool, actual_mode = dj_core.apply_mode(filtered, mode, nav_data, now) pick = dj_core.contrast_pick(mode_pool, recent_energies) if not os.path.isfile(pick["path"]): mode_pool = [t for t in mode_pool if os.path.isfile(t["path"])] if mode_pool: pick = random.choice(mode_pool) else: continue dur = pick["duration"] if pick["duration"] > 0 else 240 entries.append({ "type": "track", "path": pick["path"], "artist": pick["artist"], "title": pick["title"], "album": pick["album"], "genre": pick["genre"], "energy": pick["energy"], "duration": round(dur, 1), "start_time": round(current_time, 1), "start_formatted": format_time(current_time), "mode": actual_mode, "show": show["name"], }) # Update cooldowns cooled_paths.add(pick["path"]) if pick["artist"]: cooled_artists[pick["artist"].lower()] = sim_now if pick["album"]: cooled_albums[pick["album"].lower()] = sim_now last_genre = dj_core.normalize_genre(pick["genre"]) recent_energies = (recent_energies + [pick["energy"]])[-3:] recent_tracks.append(pick) current_time += dur track_count += 1 tracks_since_announce += 1 tracks_since_jingle += 1 # ── Insert announcement ─────────────────────────────────────── if tracks_since_announce >= ANNOUNCE_INTERVAL and current_time < DAY_SECONDS: ann_entry = _make_announcement(recent_tracks, current_time) if ann_entry: entries.append(ann_entry) current_time += ann_entry["duration"] tracks_since_announce = 0 # ── Insert jingle ───────────────────────────────────────────── if tracks_since_jingle >= JINGLE_INTERVAL and jingle_files and current_time < DAY_SECONDS: j_path, j_dur = get_jingle() if j_path: entries.append({ "type": "jingle", "path": j_path, "duration": round(j_dur, 1), "start_time": round(current_time, 1), "start_formatted": format_time(current_time), }) current_time += j_dur tracks_since_jingle = 0 # Build show schedule with formatted times shows = [] for s in dj_core.SHOWS: shows.append({ "name": s["name"], "start": f"{s['start_hour']:02d}:00", "end": f"{s['end_hour']:02d}:00" if s["end_hour"] < 24 else "00:00", "energy": s["energy"], "description": s["description"], }) playlist = { "generated_at": datetime.now().isoformat(timespec="seconds"), "total_tracks": track_count, "total_entries": len(entries), "total_duration_hours": round(current_time / 3600, 1), "shows": shows, "entries": entries, } return playlist def _make_announcement(recent_tracks, current_time): """Build an announcement entry from recent tracks.""" if len(recent_tracks) < 2: return None last = recent_tracks[-1] prev = recent_tracks[-2] last_str = f"{last['artist']} - {last['title']}" if last["artist"] and last["title"] else None prev_str = f"{prev['artist']} - {prev['title']}" if prev["artist"] and prev["title"] else None if not last_str: return None station_id = random.choice(dj_core.STATION_IDS) if prev_str: text = f"You've been listening to {last_str}, and {prev_str}. {station_id}" else: text = f"You've been listening to {last_str}. {station_id}" path, dur = generate_announcement_tts(text) if not path: return None return { "type": "announcement", "path": path, "text": text, "duration": round(dur, 1), "start_time": round(current_time, 1), "start_formatted": format_time(current_time), } def write_outputs(playlist): """Write playlist.json and playlist.m3u.""" # JSON with open(PLAYLIST_JSON, "w") as f: json.dump(playlist, f, indent=2) set_mediaserver_perms(PLAYLIST_JSON) # M3U with open(PLAYLIST_M3U, "w") as f: f.write("#EXTM3U\n") for entry in playlist["entries"]: path = entry["path"] dur = int(entry.get("duration", 0)) if entry["type"] == "track": f.write(f"#EXTINF:{dur},{entry.get('artist', '')} - {entry.get('title', '')}\n") elif entry["type"] == "announcement": f.write(f"#EXTINF:{dur},Announcement\n") elif entry["type"] == "jingle": f.write(f"#EXTINF:{dur},Jingle\n") f.write(f"{path}\n") set_mediaserver_perms(PLAYLIST_M3U) def main(): start = time.time() print(f"Radio Susan — Daily Playlist Generator") print(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print() playlist = build_playlist() if not playlist: sys.exit(1) print(f"\nGenerating announcements...") # Count announcements ann_count = sum(1 for e in playlist["entries"] if e["type"] == "announcement") jingle_count = sum(1 for e in playlist["entries"] if e["type"] == "jingle") print(f"\nWriting outputs...") write_outputs(playlist) elapsed = time.time() - start print(f"\n{'='*60}") print(f" Playlist generated successfully!") print(f" Tracks: {playlist['total_tracks']}") print(f" Announcements: {ann_count}") print(f" Jingles: {jingle_count}") print(f" Total entries: {playlist['total_entries']}") print(f" Duration: {playlist['total_duration_hours']}h") print(f" Time taken: {elapsed:.1f}s") print(f" Output: {PLAYLIST_JSON}") print(f" Output: {PLAYLIST_M3U}") print(f"{'='*60}") if __name__ == "__main__": main()