#!/usr/bin/env python3 """Shared core logic for Radio Susan DJ — used by radio_dj.py and generate_daily_playlist.py.""" import json import math import os import random import sqlite3 import time from pathlib import Path # ── Paths ────────────────────────────────────────────────────────────────────── BEETS_DB = "/home/susan/.config/beets/library.db" NAVIDROME_DB = "/var/lib/navidrome/navidrome.db" GENRE_CACHE = "/var/lib/radio/genre_cache.json" FEATURES_DB = "/var/lib/radio/audio_features.db" STATE_DB = "/var/lib/radio/dj_state.db" MUSIC_ROOT = "/disks/Plex/Music/" SUPPORTED = {".flac", ".mp3", ".ogg", ".opus", ".m4a", ".wav"} # ── Energy curve (hour → target energy 1-10) ────────────────────────────────── ENERGY_CURVE = { 0: 1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 2, 8: 3, 9: 4, 10: 5, 11: 5, 12: 4, 13: 4, 14: 5, 15: 6, 16: 6, 17: 5, 18: 6, 19: 7, 20: 8, 21: 7, 22: 4, 23: 2, } # ── Show definitions ─────────────────────────────────────────────────────────── SHOWS = [ {"name": "Night Drift", "start_hour": 0, "end_hour": 7, "energy": "1", "description": "Ambient, drone, sleep"}, {"name": "Morning Calm", "start_hour": 7, "end_hour": 9, "energy": "2-3", "description": "Downtempo, chill, acoustic"}, {"name": "Late Morning", "start_hour": 9, "end_hour": 12, "energy": "4-5", "description": "Indie, jazz, trip-hop"}, {"name": "Lunch Break", "start_hour": 12, "end_hour": 14, "energy": "4", "description": "Relaxed, soul, dub"}, {"name": "Afternoon Session", "start_hour": 14, "end_hour": 17, "energy": "5-6", "description": "Electronic, funk, hip-hop"}, {"name": "Drive Time", "start_hour": 17, "end_hour": 19, "energy": "5-6", "description": "Mixed energy"}, {"name": "Evening Heat", "start_hour": 19, "end_hour": 22, "energy": "7-8", "description": "Techno, house, DnB, dubstep"}, {"name": "Wind Down", "start_hour": 22, "end_hour": 24, "energy": "2-4", "description": "Comedown, ambient"}, ] # ── Mode weights ─────────────────────────────────────────────────────────────── 
# Relative weights used by pick_mode(); they need not sum to 100.
MODE_WEIGHTS = {
    "shuffle": 40,
    "deep_cuts": 20,
    "new_to_library": 15,
    "old_favourites": 15,
    "full_album": 10,
}

# ── Cooldowns ──────────────────────────────────────────────────────────────────
# All values are in seconds.
TRACK_COOLDOWN = 7 * 86400   # 7 days
ARTIST_COOLDOWN = 3 * 3600   # 3 hours
ALBUM_COOLDOWN = 24 * 3600   # 24 hours

# ── Genre energy keywords (fallback) ──────────────────────────────────────────
# Energy level (1-10) → genre keywords that imply it.  Used by keyword_energy()
# when no measured energy is available for a track.
ENERGY_KEYWORDS = {
    1: ["ambient", "drone", "new age", "sleep", "meditation"],
    2: ["downtempo", "chill", "lounge", "easy listening", "bossa nova", "minimal"],
    3: ["classical", "chamber", "piano", "acoustic"],
    4: ["jazz", "soul", "trip hop", "lo-fi", "lofi", "dub", "blues"],
    5: ["indie", "pop", "rock", "shoegaze", "post rock", "alternative", "folk", "dream pop"],
    6: ["funk", "disco", "hip hop", "rap", "reggae", "afrobeat"],
    7: ["house", "electronic", "electro", "garage", "dubstep", "idm"],
    8: ["techno", "trance", "drum and bass", "dnb", "jungle", "breakbeat"],
    9: ["industrial", "noise", "hardcore", "metal", "punk"],
    10: ["speedcore", "gabber", "power electronics", "harsh noise"],
}

# ── Station ID variations ─────────────────────────────────────────────────────
STATION_IDS = [
    "Radio Susan, ninety nine point oh.",
    "You're listening to Radio Susan.",
    "Radio Susan.",
    "Susan Radio, always on.",
    "This is Radio Susan.",
]

TTS_VOICES = [
    "en-US-GuyNeural",
    "en-US-ChristopherNeural",
    "en-GB-RyanNeural",
    "en-US-AriaNeural",
    "en-US-JennyNeural",
]


# ── Helpers ────────────────────────────────────────────────────────────────────
def normalize_genre(g):
    """Return *g* lower-cased with '-' and '_' turned into spaces and stripped.

    Falsy input (``None``, ``""``) yields ``""``.
    """
    return g.lower().replace("-", " ").replace("_", " ").strip() if g else ""


def keyword_energy(genre):
    """Estimate an energy level (1-10) for *genre* from ENERGY_KEYWORDS.

    Matching is done in three passes, each scanning levels from 10 down to 1,
    so the most specific kind of match always wins:

    1. the normalised genre equals a keyword;
    2. a keyword occurs inside the genre ("dub techno" contains "techno");
    3. the genre occurs inside a keyword ("hip" is part of "hip hop").

    Running the reverse-substring test only as a last resort stops a genre
    such as "electronic" from being captured by "power electronics" (level 10)
    before its own exact entry (level 7) is reached.  Keywords are normalised
    the same way as the genre so that hyphenated entries like "lo-fi" can match.

    Returns 5 when *genre* is empty or nothing matches.
    """
    g = normalize_genre(genre)
    if not g:
        return 5

    levels = [
        (score, [normalize_genre(kw) for kw in ENERGY_KEYWORDS.get(score, ())])
        for score in range(10, 0, -1)
    ]

    for score, keywords in levels:
        if g in keywords:
            return score
    for score, keywords in levels:
        if any(kw and kw in g for kw in keywords):
            return score
    for score, keywords in levels:
        if any(g in kw for kw in keywords):
            return score
    return 5


def essentia_to_scale(e):
    """Map an energy value *e* (nominally 0.0-1.0) onto the integer scale 1-10.

    Values outside the nominal range are clamped to 1 or 10.
    """
    return max(1, min(10, round(e * 9 + 1)))


def open_ro(path):
    """Open the SQLite database at *path* read-only.

    Returns a connection whose ``row_factory`` is ``sqlite3.Row``, or ``None``
    when the file does not exist or cannot be opened.
    """
    if not os.path.exists(path):
        return None
    try:
        # as_uri() percent-escapes characters such as '#', '?' and '%' that
        # would otherwise be parsed as URI syntax and point SQLite at a
        # different file.
        uri = Path(path).absolute().as_uri() + "?mode=ro"
        c = sqlite3.connect(uri, uri=True)
    except (sqlite3.Error, OSError, ValueError):
        return None
    c.row_factory = sqlite3.Row
    return c


def random_file_fallback():
    """Return a random supported audio file under MUSIC_ROOT, or None if there is none."""
    root = Path(MUSIC_ROOT)
    candidates = [
        str(p)
        for p in root.rglob("*")
        if p.suffix.lower() in SUPPORTED and p.is_file()
    ]
    return random.choice(candidates) if candidates else None


def get_show_for_hour(hour):
    """Return the SHOWS entry whose [start_hour, end_hour) range contains *hour*.

    Falls back to the first show when no range matches.
    """
    for show in SHOWS:
        if show["start_hour"] <= hour < show["end_hour"]:
            return show
    return SHOWS[0]


# ── Track loading ──────────────────────────────────────────────────────────────
def _decode_path(raw):
    """Return *raw* as ``str``; None for missing values or bytes that are not UTF-8."""
    if isinstance(raw, bytes):
        try:
            return raw.decode("utf-8")
        except UnicodeDecodeError:
            return None
    return raw


def _load_genre_cache(max_age=48 * 3600):
    """Return the genre cache's ``"tracks"`` mapping, or None if it is unusable.

    The cache is unusable when the file is missing or unreadable, is not the
    expected JSON shape, or its ``"built_at"`` stamp is *max_age* seconds old
    or older.
    """
    try:
        with open(GENRE_CACHE, "r", encoding="utf-8") as f:
            cache = json.load(f)
    except (OSError, ValueError):
        return None
    if not isinstance(cache, dict) or not isinstance(cache.get("tracks"), dict):
        return None
    try:
        age = time.time() - cache.get("built_at", 0)
    except TypeError:
        return None
    return cache["tracks"] if age < max_age else None


def _load_beets_items():
    """Return ``{path: metadata}`` read from the beets ``items`` table (best effort)."""
    conn = open_ro(BEETS_DB)
    if conn is None:
        return {}
    items = {}
    try:
        for row in conn.execute(
            "SELECT path, title, artist, album, albumartist, genre, added, length FROM items"
        ):
            p = _decode_path(row["path"])
            if p is None:
                continue
            items[p] = {
                "title": row["title"] or "",
                "artist": row["artist"] or row["albumartist"] or "",
                "album": row["album"] or "",
                "genre": row["genre"] or "",
                "added": row["added"] or 0,
                "duration": row["length"] or 0,
            }
    except sqlite3.Error:
        # Best effort: keep whatever was read before the failure.
        pass
    finally:
        conn.close()
    return items


def _load_features():
    """Return ``{path: {"energy", "bpm"}}`` from the features table (best effort).

    Rows without a numeric energy are skipped so callers can always scale it.
    """
    conn = open_ro(FEATURES_DB)
    if conn is None:
        return {}
    feats = {}
    try:
        for row in conn.execute("SELECT path, energy, bpm FROM features"):
            p = _decode_path(row["path"])
            energy = row["energy"]
            if p is None or not isinstance(energy, (int, float)):
                continue
            feats[p] = {"energy": energy, "bpm": row["bpm"]}
    except sqlite3.Error:
        # Best effort: keep whatever was read before the failure.
        pass
    finally:
        conn.close()
    return feats


def _scan_music_root(features):
    """Build ``{path: track}`` by walking MUSIC_ROOT, guessing tags from the layout.

    Artist and album are only taken from directory names: a file needs at
    least one parent directory below the root to get an artist and two to get
    an album, so a file name is never mistaken for either.
    """
    root = Path(MUSIC_ROOT)
    found = {}
    for p in root.rglob("*"):
        if p.suffix.lower() not in SUPPORTED or not p.is_file():
            continue
        sp = str(p)
        parts = p.relative_to(root).parts
        energy = 5
        if sp in features:
            energy = essentia_to_scale(features[sp]["energy"])
        found[sp] = {
            "path": sp,
            "artist": parts[0] if len(parts) >= 2 else "",
            "album": parts[1] if len(parts) >= 3 else "",
            "title": p.stem,
            "genre": "misc",
            "energy": energy,
            "added": 0,
            "duration": 240,
        }
    return found


def load_tracks():
    """Load all track metadata.

    Returns a list of dicts with the keys path, artist, album, genre, energy,
    added, title and duration.

    Sources, in order of preference:

    1. the genre cache (if present and younger than 48 hours), enriched with
       ``added``/``duration`` and missing artist/title from beets;
    2. the beets library alone, with energy guessed from the genre;
    3. a plain scan of MUSIC_ROOT.

    In every case a measured energy from the features database overrides the
    cached or guessed one.  A missing or non-positive duration becomes 240 s.
    """
    cached = _load_genre_cache()
    beets_data = _load_beets_items()
    features = _load_features()

    tracks = {}
    if cached is not None:
        for path, info in cached.items():
            if not isinstance(info, dict):
                continue
            t = {
                "path": path,
                "artist": info.get("artist", ""),
                "album": info.get("album", ""),
                "title": info.get("title", ""),
                "genre": info.get("genre", "misc"),
                "energy": info.get("energy", 5),
                "added": 0,
                "duration": 240,
            }
            bd = beets_data.get(path)
            if bd is not None:
                t["added"] = bd["added"]
                t["duration"] = bd["duration"] if bd["duration"] > 0 else 240
                if not t["artist"]:
                    t["artist"] = bd["artist"]
                if not t["title"]:
                    t["title"] = bd["title"]
            if path in features:
                t["energy"] = essentia_to_scale(features[path]["energy"])
            tracks[path] = t
    elif beets_data:
        for path, bd in beets_data.items():
            if path in features:
                energy = essentia_to_scale(features[path]["energy"])
            else:
                energy = keyword_energy(bd["genre"])
            tracks[path] = {
                "path": path,
                "artist": bd["artist"],
                "album": bd["album"],
                "title": bd["title"],
                "genre": normalize_genre(bd["genre"]) or "misc",
                "energy": energy,
                "added": bd["added"],
                "duration": bd["duration"] if bd["duration"] > 0 else 240,
            }
    else:
        tracks = _scan_music_root(features)

    return list(tracks.values())


def load_navidrome():
    """Return ``{path: {"play_count", "play_date", "starred"}}`` from Navidrome.

    Relative paths are joined onto MUSIC_ROOT.  Returns ``{}`` when the
    database is unavailable or the query fails.
    """
    nav = open_ro(NAVIDROME_DB)
    if nav is None:
        return {}
    data = {}
    try:
        rows = nav.execute("""
            SELECT mf.path, a.play_count, a.play_date, a.starred
            FROM media_file mf
            LEFT JOIN annotation a ON a.item_id = mf.id AND a.item_type = 'media_file'
        """).fetchall()
        for r in rows:
            p = _decode_path(r["path"])
            if p is None:
                continue
            if not p.startswith("/"):
                p = os.path.join(MUSIC_ROOT, p)
            data[p] = {
                "play_count": r["play_count"] or 0,
                "play_date": r["play_date"] or "",
                "starred": bool(r["starred"]),
            }
    except sqlite3.Error:
        return {}
    finally:
        # Runs on both the success and the error path, so the handle never leaks.
        nav.close()
    return data


# ── Selection logic ────────────────────────────────────────────────────────────
def filter_by_energy(tracks, target, tolerance=2):
    """Return the tracks whose energy is within *tolerance* of *target*.

    While fewer than 20 tracks qualify, the tolerance is widened one step at a
    time, up to a maximum of 9.
    """
    pool = [t for t in tracks if abs(t["energy"] - target) <= tolerance]
    while len(pool) < 20 and tolerance < 9:
        tolerance += 1
        pool = [t for t in tracks if abs(t["energy"] - target) <= tolerance]
    return pool


def apply_cooldowns(pool, cooled_paths, cooled_artists, cooled_albums, last_genre):
    """Drop tracks that are cooling down or repeat the previous genre.

    Artist and album are compared lower-cased, so *cooled_artists* and
    *cooled_albums* must hold lower-cased names; *last_genre* is compared
    against the normalised track genre.  Empty artist/album values never match.
    """
    result = []
    for t in pool:
        if t["path"] in cooled_paths:
            continue
        if t["artist"] and t["artist"].lower() in cooled_artists:
            continue
        if t["album"] and t["album"].lower() in cooled_albums:
            continue
        if last_genre and normalize_genre(t["genre"]) == last_genre:
            continue
        result.append(t)
    return result


def pick_mode():
    """Pick a selection mode at random, weighted by MODE_WEIGHTS."""
    modes = list(MODE_WEIGHTS.keys())
    weights = list(MODE_WEIGHTS.values())
    return random.choices(modes, weights=weights, k=1)[0]


def apply_mode(pool, mode, nav_data, now):
    """Narrow *pool* according to *mode*; return ``(tracks, effective_mode)``.

    * ``deep_cuts``      – tracks with a play count of 0 in *nav_data*.
    * ``new_to_library`` – tracks added less than 2 days before *now*.
    * ``old_favourites`` – the most-played tracks (top tenth of the pool, at
      least the top 10), excluding any that were never played.

    Whenever the mode is something else, needs *nav_data* that is missing, or
    yields no tracks, the whole pool is returned with mode ``"shuffle"``.
    """
    if mode == "deep_cuts" and nav_data:
        deep = [t for t in pool if nav_data.get(t["path"], {}).get("play_count", 0) == 0]
        if deep:
            return deep, "deep_cuts"
    elif mode == "new_to_library":
        cutoff = now - 2 * 86400
        new = [t for t in pool if t["added"] > cutoff]
        if new:
            return new, "new_to_library"
    elif mode == "old_favourites" and nav_data:
        scored = [(t, nav_data.get(t["path"], {}).get("play_count", 0)) for t in pool]
        scored.sort(key=lambda x: x[1], reverse=True)
        top = [s[0] for s in scored[:max(10, len(scored) // 10)] if s[1] > 0]
        if top:
            return top, "old_favourites"
    return pool, "shuffle"


def contrast_pick(candidates, recent_energies):
    """Pick a candidate whose energy contrasts with the recently played tracks.

    With no history, or at most one candidate, the choice is uniformly random.
    Otherwise the (up to) three candidates furthest from the mean of
    *recent_energies* are drawn from, weighted by that distance plus 0.5.

    Raises IndexError when *candidates* is empty.
    """
    if not recent_energies or len(candidates) <= 1:
        return random.choice(candidates)
    avg_recent = sum(recent_energies) / len(recent_energies)
    scored = [(t, abs(t["energy"] - avg_recent)) for t in candidates]
    scored.sort(key=lambda x: x[1], reverse=True)
    top = scored[:min(3, len(scored))]
    # The +0.5 keeps every weight positive even when a distance is zero.
    weights = [s[1] + 0.5 for s in top]
    return random.choices([s[0] for s in top], weights=weights, k=1)[0]


def find_album_tracks(pool, min_tracks=3):
    """Group pool by album and return albums with min_tracks+ tracks.

    Keys are ``(artist, album)`` lower-cased; each value is that album's
    tracks sorted by path.  Tracks with an empty album are ignored.
    """
    albums = {}
    for t in pool:
        if t["album"]:
            key = (t["artist"].lower(), t["album"].lower())
            albums.setdefault(key, []).append(t)
    return {
        k: sorted(v, key=lambda t: t["path"])
        for k, v in albums.items()
        if len(v) >= min_tracks
    }