1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
|
#!/usr/bin/env python3
"""Analyse audio tracks with Essentia. Run daily via cron.
Use venv: /var/lib/radio/venv/bin/python3 /var/lib/radio/analyse_tracks.py
"""
import math
import os
import sqlite3
import subprocess
import sys
import tempfile
import time
from pathlib import Path
# SQLite database file that holds the per-track feature rows.
FEATURES_DB = "/var/lib/radio/audio_features.db"
# Root of the directory tree that is scanned for audio files.
MUSIC_ROOT = "/disks/Plex/Music/"
# File suffixes (compared lower-case) that are treated as audio.
SUPPORTED = {
    ".flac",
    ".mp3",
    ".ogg",
    ".opus",
    ".m4a",
    ".wav",
}

# Percentile curve for raw log-energy, taken from a bulk analysis of
# roughly 6500 tracks. It is used to spread raw log-energy values evenly
# over the 0-1 range.
# There is one entry per 5 percentile points: index 0 is the 0th
# percentile and index 20 is the 100th percentile.
ENERGY_BREAKPOINTS = [
    0.0, 0.7148, 0.7581, 0.7801, 0.7999, 0.8163, 0.8289,
    0.8401, 0.8493, 0.8587, 0.8667, 0.8744, 0.8811, 0.8877,
    0.8949, 0.9018, 0.9089, 0.9169, 0.9252, 0.9377, 1.0,
]
def normalize_energy(raw_log_energy: float, *, breakpoints=None) -> float:
    """Map a raw log-energy value to 0-1 using a percentile curve.

    The breakpoints are treated as equally spaced percentiles (first entry
    = 0.0, last entry = 1.0). The value is placed in the first bucket whose
    upper bound is not below it and linearly interpolated inside that bucket.

    Args:
        raw_log_energy: The raw log-energy value to normalise.
        breakpoints: Optional sequence of at least two ascending breakpoint
            values. When omitted, the module-level ``ENERGY_BREAKPOINTS``
            curve is used.

    Returns:
        The normalised value rounded to 4 decimal places. Values at or below
        the first breakpoint give 0.0; values at or above the last give 1.0.

    Raises:
        ValueError: If fewer than two breakpoints are supplied.
    """
    bp = ENERGY_BREAKPOINTS if breakpoints is None else breakpoints
    if len(bp) < 2:
        raise ValueError("breakpoints must contain at least two values")

    if raw_log_energy <= bp[0]:
        return 0.0
    if raw_log_energy >= bp[-1]:
        return 1.0

    bucket_count = len(bp) - 1
    for index, (lower, upper) in enumerate(zip(bp, bp[1:])):
        if raw_log_energy <= upper:
            # Every earlier upper bound (and bp[0]) was below the value, so
            # lower < raw_log_energy <= upper here and the bucket width is
            # strictly positive; no zero-width guard is needed.
            frac = (raw_log_energy - lower) / (upper - lower)
            return round((index + frac) / bucket_count, 4)

    # Only reached when no comparison above succeeds (e.g. a NaN input).
    return 1.0
def init_db(db_path=None):
    """Open the features database, creating or upgrading its table.

    The ``features`` table is created when missing. The ``energy_raw``
    column is part of the schema because the INSERT issued by ``main()``
    writes it; a table created by an earlier schema without that column is
    upgraded in place with ``ALTER TABLE``.

    Args:
        db_path: Optional database location. Defaults to ``FEATURES_DB``.

    Returns:
        An open ``sqlite3.Connection`` with WAL journalling requested.

    Raises:
        sqlite3.Error: If the database cannot be opened or set up. The
            connection is closed before the error propagates.
    """
    conn = sqlite3.connect(FEATURES_DB if db_path is None else db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("""
            CREATE TABLE IF NOT EXISTS features (
                path TEXT PRIMARY KEY,
                energy REAL,
                energy_raw REAL,
                bpm REAL,
                loudness REAL,
                danceability REAL,
                analysed_at REAL
            )
        """)
        # CREATE TABLE IF NOT EXISTS leaves an existing table untouched, so
        # check the live schema and add the column when it is absent.
        # Column 1 of each PRAGMA table_info row is the column name.
        existing_columns = {
            row[1] for row in conn.execute("PRAGMA table_info(features)")
        }
        if "energy_raw" not in existing_columns:
            conn.execute("ALTER TABLE features ADD COLUMN energy_raw REAL")
        conn.commit()
    except Exception:
        # Do not leak the connection when setup fails.
        conn.close()
        raise
    return conn
def get_analysed_paths(conn):
    """Collect the ``path`` value of every row in ``features`` into a set.

    Args:
        conn: An open database connection that has a ``features`` table.

    Returns:
        A set with one entry per stored path.
    """
    rows = conn.execute("SELECT path FROM features").fetchall()
    return {path for (path,) in rows}
def find_audio_files():
    """Recursively list the supported audio files below ``MUSIC_ROOT``.

    An entry is kept when its lower-cased suffix is in ``SUPPORTED`` and it
    is a regular file.

    Returns:
        A list of path strings, in the order the directory walk yields them.
    """
    candidates = Path(MUSIC_ROOT).rglob("*")
    return [
        str(entry)
        for entry in candidates
        if entry.suffix.lower() in SUPPORTED and entry.is_file()
    ]
# Suffixes that are decoded to a temporary WAV with ffmpeg before loading:
# Essentia's bundled decoder segfaults on opus and struggles with some
# m4a/ogg files.
_NEEDS_PREDECODE = frozenset({".opus", ".ogg", ".m4a"})


def _remove_temp_file(path):
    """Delete *path*, tolerating the case where it is already gone."""
    try:
        os.unlink(path)
    except FileNotFoundError:
        pass


def _predecode_to_wav(filepath):
    """Decode *filepath* with ffmpeg into a temporary mono 22050 Hz s16 WAV.

    Returns:
        The path of the temporary WAV file. The caller must delete it.

    Raises:
        Exception: Whatever the decode attempt raised (ffmpeg missing,
            timeout, or a ``RuntimeError`` for a non-zero exit status). The
            temporary file is removed before the error propagates.
    """
    tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    tmp.close()
    try:
        proc = subprocess.run(
            ["ffmpeg", "-y", "-i", filepath, "-ac", "1", "-ar", "22050", "-sample_fmt", "s16", "-f", "wav", tmp.name],
            capture_output=True, timeout=15,
        )
        if proc.returncode != 0:
            # Report the failed decode here instead of handing an empty or
            # truncated WAV to the loader.
            stderr_lines = proc.stderr.decode("utf-8", errors="replace").strip().splitlines()
            detail = stderr_lines[-1] if stderr_lines else "no output"
            raise RuntimeError(f"ffmpeg exited with status {proc.returncode}: {detail}")
    except BaseException:
        _remove_temp_file(tmp.name)
        raise
    return tmp.name


def analyse_track(filepath):
    """Extract audio features from one file using Essentia.

    Files whose suffix is in ``_NEEDS_PREDECODE`` are first converted to a
    temporary WAV with ffmpeg; the temporary file is always removed once the
    audio has been loaded (or loading has failed).

    Each feature is extracted independently. When one extractor fails, a
    warning is written to stderr and a default is used for that feature
    (0.5 for both energy values, 0.0 for bpm, loudness and danceability).

    Args:
        filepath: Path of the audio file to analyse.

    Returns:
        A dict with the keys ``energy``, ``energy_raw``, ``bpm``,
        ``loudness`` and ``danceability``, or ``None`` when the file could
        not be decoded or loaded.

    Raises:
        SystemExit: With status 1 when the ``essentia`` package cannot be
            imported.
    """
    try:
        import essentia  # noqa: F401  (imported only to verify the install)
        import essentia.standard as es
    except ImportError:
        print("Error: essentia not installed. Install with: pip install essentia", file=sys.stderr)
        sys.exit(1)

    tmp_wav_path = None
    try:
        if Path(filepath).suffix.lower() in _NEEDS_PREDECODE:
            try:
                tmp_wav_path = _predecode_to_wav(filepath)
            except Exception as e:
                print(f" Skipping (ffmpeg decode error): {e}", file=sys.stderr)
                return None

        # Pre-decoded WAVs were written at 22050 Hz by ffmpeg and are loaded
        # at that rate; every other file is loaded at 44100 Hz.
        # NOTE(review): features are therefore computed at different sample
        # rates depending on the source format - confirm this is intended.
        if tmp_wav_path is not None:
            load_path, sr = tmp_wav_path, 22050
        else:
            load_path, sr = filepath, 44100

        try:
            audio = es.MonoLoader(filename=load_path, sampleRate=sr)()
        except Exception as e:
            print(f" Skipping (load error): {e}", file=sys.stderr)
            return None
    finally:
        # Single cleanup point: the samples are in memory (or the attempt
        # failed), so the temporary WAV is no longer needed.
        if tmp_wav_path is not None:
            _remove_temp_file(tmp_wav_path)

    # Energy - raw log-energy mapped through the percentile curve.
    try:
        raw_energy = float(es.Energy()(audio))
        energy_raw = min(1.0, math.log1p(raw_energy) / 15.0)
        energy_norm = normalize_energy(energy_raw)
    except Exception as e:
        print(f" Warning: energy extraction failed, using default: {e}", file=sys.stderr)
        energy_norm = 0.5
        energy_raw = 0.5

    # BPM - values outside 30..300 (including NaN) are stored as 0.0.
    try:
        bpm, _, _, _, _ = es.RhythmExtractor2013()(audio)
        bpm = float(bpm)
        if not 30 <= bpm <= 300:
            bpm = 0.0
    except Exception as e:
        print(f" Warning: BPM extraction failed, using default: {e}", file=sys.stderr)
        bpm = 0.0

    # Loudness
    try:
        loudness = float(es.Loudness()(audio))
    except Exception as e:
        print(f" Warning: loudness extraction failed, using default: {e}", file=sys.stderr)
        loudness = 0.0

    # Danceability
    try:
        danceability, _ = es.Danceability()(audio)
        danceability = float(danceability)
    except Exception as e:
        print(f" Warning: danceability extraction failed, using default: {e}", file=sys.stderr)
        danceability = 0.0

    # Values were coerced to built-in floats above so that rounding and
    # later storage do not depend on the scalar type the extractor returns.
    return {
        "energy": round(energy_norm, 4),
        "energy_raw": round(energy_raw, 4),
        "bpm": round(bpm, 2),
        "loudness": round(loudness, 4),
        "danceability": round(danceability, 4),
    }
def main():
    """Analyse every audio file on disk that has no row in the features DB.

    Opens the database, works out which files are new, runs
    ``analyse_track`` on each and stores the result. Rows are committed in
    batches of 50. The connection is committed and closed even when the run
    is interrupted or fails, so already-analysed tracks are not lost.
    Files that cannot be analysed are counted as errors and not stored.
    """
    print(f"Essentia track analyser — {time.strftime('%Y-%m-%d %H:%M:%S')}")
    conn = init_db()
    done = 0
    errors = 0
    start = time.time()
    try:
        analysed = get_analysed_paths(conn)
        print(f"Already analysed: {len(analysed)} tracks")
        all_files = find_audio_files()
        print(f"Found on disk: {len(all_files)} audio files")
        pending = [f for f in all_files if f not in analysed]
        print(f"To analyse: {len(pending)} new tracks")
        if not pending:
            print("Nothing to do.")
            return

        total = len(pending)
        start = time.time()
        for i, filepath in enumerate(pending):
            # Progress line for the first track and then every 100th.
            if (i + 1) % 100 == 0 or i == 0:
                elapsed = time.time() - start
                rate = (done / elapsed) if elapsed > 0 and done > 0 else 0
                eta = ((total - i) / rate / 60) if rate > 0 else 0
                print(f" [{i+1}/{total}] {rate:.1f} tracks/sec, ETA {eta:.0f} min")

            features = analyse_track(filepath)
            if features is None:
                errors += 1
                continue

            try:
                conn.execute(
                    "INSERT OR REPLACE INTO features (path, energy, energy_raw, bpm, loudness, danceability, analysed_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (filepath, features["energy"], features["energy_raw"], features["bpm"], features["loudness"], features["danceability"], time.time()),
                )
            except Exception as e:
                # Deliberately broad: one bad row (e.g. a path that cannot be
                # bound as text) must not abort the whole run.
                print(f" DB error for {filepath}: {e}", file=sys.stderr)
                errors += 1
                continue

            done += 1
            # Count first, then test, so a commit happens after every 50th
            # stored row rather than right after the first one.
            if done % 50 == 0:
                conn.commit()
    finally:
        # Flush whatever is still pending and release the connection, also
        # on KeyboardInterrupt or an unexpected error.
        try:
            conn.commit()
        finally:
            conn.close()

    elapsed = time.time() - start
    print(f"Done: {done} analysed, {errors} errors, {elapsed:.1f}s total")


if __name__ == "__main__":
    main()
|