#!/usr/bin/env python3
"""Analyse audio tracks with Essentia.

Walks the music library, extracts energy / BPM / loudness / danceability for
every track that is not yet in the features database, and stores the results
in SQLite.

Run daily via cron. Use the venv interpreter:

    /var/lib/radio/venv/bin/python3 /var/lib/radio/analyse_tracks.py
"""

import math
import os
import sqlite3
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Optional

FEATURES_DB = "/var/lib/radio/audio_features.db"
MUSIC_ROOT = "/disks/Plex/Music/"
SUPPORTED = {".flac", ".mp3", ".ogg", ".opus", ".m4a", ".wav"}

# Essentia's bundled decoder segfaults on opus and struggles with some
# m4a/ogg files. These are pre-decoded to a temp WAV via ffmpeg.
NEEDS_PREDECODE = {".opus", ".ogg", ".m4a"}

# Every track is analysed at the same sample rate so that features are
# comparable across formats (sum-of-squares energy scales with the number of
# samples, and Essentia's rhythm extractor is documented for 44.1 kHz input).
SAMPLE_RATE = 44100

# Seconds ffmpeg gets to pre-decode one track before it is skipped.
FFMPEG_TIMEOUT = 15

# Number of successful inserts between intermediate commits.
COMMIT_EVERY = 50

# Energy percentile breakpoints from bulk analysis of ~6500 tracks.
# Maps raw log-energy values to an even 0-1 distribution.
# Index 0 = 0th percentile, index 20 = 100th percentile (every 5%).
ENERGY_BREAKPOINTS = [
    0.0, 0.7148, 0.7581, 0.7801, 0.7999, 0.8163, 0.8289,
    0.8401, 0.8493, 0.8587, 0.8667, 0.8744, 0.8811, 0.8877,
    0.8949, 0.9018, 0.9089, 0.9169, 0.9252, 0.9377, 1.0,
]


def normalize_energy(raw_log_energy: float) -> float:
    """Map a raw log-energy value to 0-1 using the library's percentile curve.

    Values at or below the first breakpoint map to 0.0, values at or above
    the last map to 1.0; anything in between is linearly interpolated inside
    its percentile bucket and rounded to four decimals.
    """
    bp = ENERGY_BREAKPOINTS
    if raw_log_energy <= bp[0]:
        return 0.0
    if raw_log_energy >= bp[-1]:
        return 1.0

    buckets = len(bp) - 1
    # Find which bucket the value falls in and interpolate within it.
    for i in range(1, len(bp)):
        if raw_log_energy <= bp[i]:
            width = bp[i] - bp[i - 1]
            # A zero-width bucket cannot be interpolated; use its midpoint.
            frac = (raw_log_energy - bp[i - 1]) / width if width else 0.5
            return round((i - 1 + frac) / buckets, 4)
    # Only reachable when no comparison succeeds (e.g. NaN input).
    return 1.0


def init_db(db_path: Optional[str] = None):
    """Open the features DB, creating or upgrading the table if needed.

    Args:
        db_path: SQLite file to open. Defaults to ``FEATURES_DB``.

    Returns:
        An open ``sqlite3.Connection`` in WAL journal mode.
    """
    conn = sqlite3.connect(FEATURES_DB if db_path is None else db_path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS features (
            path TEXT PRIMARY KEY,
            energy REAL,
            energy_raw REAL,
            bpm REAL,
            loudness REAL,
            danceability REAL,
            analysed_at REAL
        )
    """)
    # Databases created before energy_raw existed lack the column that the
    # INSERT in main() writes to; add it in place so old rows are kept.
    columns = {row[1] for row in conn.execute("PRAGMA table_info(features)")}
    if "energy_raw" not in columns:
        conn.execute("ALTER TABLE features ADD COLUMN energy_raw REAL")
    conn.commit()
    return conn


def get_analysed_paths(conn):
    """Return the set of file paths that already have a features row."""
    return {row[0] for row in conn.execute("SELECT path FROM features")}


def find_audio_files():
    """Walk ``MUSIC_ROOT`` and return all supported audio files as strings."""
    return [
        str(p)
        for p in Path(MUSIC_ROOT).rglob("*")
        if p.suffix.lower() in SUPPORTED and p.is_file()
    ]


def _load_audio(es, filepath):
    """Load *filepath* as mono audio at ``SAMPLE_RATE``; None on failure.

    Formats listed in ``NEEDS_PREDECODE`` are first rendered to a temporary
    WAV with ffmpeg. The temporary file is always removed before returning.
    """
    tmp_path = None
    load_path = filepath
    try:
        if Path(filepath).suffix.lower() in NEEDS_PREDECODE:
            try:
                fd, tmp_path = tempfile.mkstemp(suffix=".wav")
                os.close(fd)
                subprocess.run(
                    ["ffmpeg", "-y", "-i", filepath,
                     "-ac", "1", "-ar", str(SAMPLE_RATE),
                     "-sample_fmt", "s16", "-f", "wav", tmp_path],
                    stdin=subprocess.DEVNULL,
                    capture_output=True,
                    timeout=FFMPEG_TIMEOUT,
                    check=True,  # a failed decode must not look like success
                )
            except (subprocess.SubprocessError, OSError) as e:
                print(f"  Skipping (ffmpeg decode error): {e}", file=sys.stderr)
                return None
            load_path = tmp_path

        try:
            return es.MonoLoader(filename=load_path, sampleRate=SAMPLE_RATE)()
        except Exception as e:  # Essentia raises its own error types
            print(f"  Skipping (load error): {e}", file=sys.stderr)
            return None
    finally:
        if tmp_path is not None and os.path.exists(tmp_path):
            os.unlink(tmp_path)


def analyse_track(filepath):
    """Extract audio features for one track using Essentia.

    Args:
        filepath: Path of the audio file to analyse.

    Returns:
        A dict with keys ``energy``, ``energy_raw``, ``bpm``, ``loudness`` and
        ``danceability``, or None when the file could not be decoded.
        Individual extractors that fail fall back to a neutral default
        (0.5 for energy, 0.0 otherwise) and a warning is written to stderr.

    Exits the process with status 1 if Essentia is not installed.
    """
    try:
        import essentia  # noqa: F401
        import essentia.standard as es
    except ImportError:
        print("Error: essentia not installed. Install with: pip install essentia", file=sys.stderr)
        sys.exit(1)

    audio = _load_audio(es, filepath)
    if audio is None:
        return None

    # Energy — raw log-energy mapped through the percentile curve.
    try:
        raw_energy = float(es.Energy()(audio))
        energy_raw = min(1.0, math.log1p(raw_energy) / 15.0)
        energy_norm = normalize_energy(energy_raw)
    except Exception as e:
        print(f"  Energy failed, using default: {e}", file=sys.stderr)
        energy_norm = 0.5
        energy_raw = 0.5

    # BPM — values outside a plausible tempo range are stored as 0.
    try:
        bpm, _, _, _, _ = es.RhythmExtractor2013()(audio)
        bpm = float(bpm)
        if bpm < 30 or bpm > 300:
            bpm = 0.0
    except Exception as e:
        print(f"  BPM failed, using default: {e}", file=sys.stderr)
        bpm = 0.0

    # Loudness
    try:
        loudness = float(es.Loudness()(audio))
    except Exception as e:
        print(f"  Loudness failed, using default: {e}", file=sys.stderr)
        loudness = 0.0

    # Danceability
    try:
        danceability, _ = es.Danceability()(audio)
        danceability = float(danceability)
    except Exception as e:
        print(f"  Danceability failed, using default: {e}", file=sys.stderr)
        danceability = 0.0

    return {
        "energy": round(energy_norm, 4),
        "energy_raw": round(energy_raw, 4),
        "bpm": round(bpm, 2),
        "loudness": round(loudness, 4),
        "danceability": round(danceability, 4),
    }


def main():
    """Analyse every not-yet-analysed track and store its features."""
    print(f"Essentia track analyser — {time.strftime('%Y-%m-%d %H:%M:%S')}")

    conn = init_db()
    try:
        analysed = get_analysed_paths(conn)
        print(f"Already analysed: {len(analysed)} tracks")

        all_files = find_audio_files()
        print(f"Found on disk: {len(all_files)} audio files")

        pending = [f for f in all_files if f not in analysed]
        print(f"To analyse: {len(pending)} new tracks")

        if not pending:
            print("Nothing to do.")
            return

        total = len(pending)
        done = 0
        errors = 0
        start = time.time()

        for position, filepath in enumerate(pending, start=1):
            if position == 1 or position % 100 == 0:
                elapsed = time.time() - start
                processed = position - 1
                # Rate is based on tracks processed (including failures) so
                # that the ETA stays meaningful when some tracks are skipped.
                rate = processed / elapsed if elapsed > 0 and processed else 0
                eta = (total - processed) / rate / 60 if rate > 0 else 0
                print(f"  [{position}/{total}] {rate:.1f} tracks/sec, ETA {eta:.0f} min")

            features = analyse_track(filepath)
            if features is None:
                errors += 1
                continue

            try:
                conn.execute(
                    "INSERT OR REPLACE INTO features (path, energy, energy_raw, bpm, loudness, danceability, analysed_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (filepath, features["energy"], features["energy_raw"], features["bpm"],
                     features["loudness"], features["danceability"], time.time()),
                )
            except sqlite3.Error as e:
                print(f"  DB error for {filepath}: {e}", file=sys.stderr)
                errors += 1
                continue

            done += 1
            if done % COMMIT_EVERY == 0:
                conn.commit()

        elapsed = time.time() - start
        print(f"Done: {done} analysed, {errors} errors, {elapsed:.1f}s total")
    finally:
        # Persist whatever was analysed even if the run is interrupted.
        conn.commit()
        conn.close()


if __name__ == "__main__":
    main()