NavidromeApp/companion-api/pre_analyze.py

303 lines
11 KiB
Python
Raw Normal View History

2026-04-06 11:57:16 -07:00
"""
Pre-Analyzer (crash-safe)
Location: /home/pi/docker/navidrome/companion_api/pre_analyze.py
Each track is analyzed in a subprocess. If librosa OOMs or hangs on a large FLAC,
only the child process dies; the parent logs the failure and moves on.
Usage:
docker compose exec music-companion python pre_analyze.py
docker compose exec music-companion python pre_analyze.py --force
docker compose exec music-companion python pre_analyze.py --dj
docker compose exec music-companion python pre_analyze.py --vis
docker compose exec music-companion python pre_analyze.py --skip-large 500
"""
import os, sys, json, hashlib, sqlite3, subprocess, time, warnings, multiprocessing
# Suppress every Python warning for the lifetime of this process.
warnings.filterwarnings("ignore")

# Runtime configuration; each value can be overridden through the environment.
MUSIC_DIR = os.environ.get("MUSIC_DIR", "/music")
DB_PATH = os.environ.get("DB_PATH", "/app/data/smart_dj.db")
VIS_CACHE_DIR = os.environ.get("VIS_CACHE_DIR", "/app/data/vis_cache")

# File extensions (lower-case) that the scanner treats as audio tracks.
SUPPORTED = ('.mp3', '.flac', '.m4a', '.ogg', '.opus', '.wav')

# Seconds a single track may spend in its worker process (default: 3 minutes).
TRACK_TIMEOUT = int(os.environ.get("TRACK_TIMEOUT", "180"))
def init_db():
    """Create the SQLite database at ``DB_PATH`` and its ``dj_profiles`` table.

    The parent directory is created when ``DB_PATH`` has one. The table is
    created with ``IF NOT EXISTS``, so calling this repeatedly is harmless.
    """
    db_dir = os.path.dirname(DB_PATH)
    # A bare file name has an empty dirname, and os.makedirs("") raises.
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        # ``with conn`` only commits or rolls back; closing is done below.
        with conn:
            conn.execute("""CREATE TABLE IF NOT EXISTS dj_profiles (
                file_path TEXT PRIMARY KEY, bpm REAL,
                silence_start REAL, silence_end REAL,
                loudness_lufs REAL,
                analyzed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)""")
    finally:
        conn.close()
def is_dj_done(path):
    """Return True when ``dj_profiles`` already holds a row for ``path``.

    A database failure (file cannot be opened, table missing, ...) or a path
    that cannot be bound as text is reported as "not done" (False) instead of
    raising, so the caller simply queues the track for analysis.
    """
    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            row = conn.execute(
                "SELECT 1 FROM dj_profiles WHERE file_path=?", (path,)
            ).fetchone()
        finally:
            conn.close()
    except (sqlite3.Error, UnicodeError):
        # Deliberately narrower than a bare ``except``: Ctrl-C and SystemExit
        # must still be able to stop a long scan.
        return False
    return row is not None
def vis_path(path):
    """Return the vis-cache JSON file path for the track at ``path``.

    The file name is the MD5 hex digest of the track path plus ``.json``,
    placed inside ``VIS_CACHE_DIR``.
    """
    # File names that are not valid UTF-8 reach us with lone surrogates, which
    # the default strict encoding rejects. "surrogateescape" restores the
    # original bytes and produces the same bytes as before for every path the
    # strict encoding could already handle, so existing cache keys are kept.
    digest = hashlib.md5(path.encode("utf-8", "surrogateescape")).hexdigest()
    return os.path.join(VIS_CACHE_DIR, digest + ".json")
def is_vis_done(path):
    """Tell whether the vis-cache file for the track at ``path`` exists."""
    cache_file = vis_path(path)
    return os.path.exists(cache_file)
def fmt(mb):
    """Render a size given in megabytes as a short human-readable string.

    Sizes of 1024 MB and above are shown in GB with two decimals; anything
    smaller is shown in MB with one decimal.
    """
    if mb >= 1024:
        gigabytes = mb / 1024
        return f"{gigabytes:.2f} GB"
    return f"{mb:.1f} MB"
# ═══════════════════════════════════════════════════════════
# CHILD PROCESS — runs in isolation, can be killed without
# taking down the parent
# ═══════════════════════════════════════════════════════════
def _worker(full_path, do_dj, do_vis, result_dict):
"""Runs in a child process. Writes results to shared dict."""
import re, gc, warnings
warnings.filterwarnings("ignore")
import numpy as np
dj_ok = False
vis_ok = False
error_msg = None
# ── DJ Analysis ───────────────────────────────────────
if do_dj:
try:
import librosa
# ffmpeg for silence + loudness (streams, low memory)
cmd = ["ffmpeg", "-hide_banner", "-i", full_path,
"-af", "silencedetect=noise=-50dB:d=0.5,ebur128",
"-f", "null", "-"]
r = subprocess.run(cmd, capture_output=True, text=True, timeout=90)
out = r.stderr
ss = re.findall(r"silence_start: ([\d\.]+)", out)
se = re.findall(r"silence_end: ([\d\.]+)", out)
lu = re.search(r"I:\s+([\-\d\.]+) LUFS", out)
# trailing silence start = last silence_start (crossfade trigger)
sil_start = float(ss[-1]) if ss else 0.0
# leading silence end = first silence_end (skip-to point)
sil_end = float(se[0]) if se else 0.0
loudness = float(lu.group(1)) if lu else -14.0
# Sanity checks
if sil_end > 10.0:
sil_end = 0.0
dur_match = re.search(r"Duration: (\d+):(\d+):(\d+\.\d+)", out)
if dur_match:
total_dur = int(dur_match.group(1)) * 3600 + int(dur_match.group(2)) * 60 + float(dur_match.group(3))
if sil_start < total_dur * 0.5:
sil_start = total_dur
# BPM — load only 30s at low rate
y, sr = librosa.load(full_path, sr=22050, duration=30)
tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
del y; gc.collect()
try:
bpm = float(tempo)
except TypeError:
bpm = float(tempo[0]) if tempo is not None else 0.0
with sqlite3.connect(DB_PATH) as c:
c.execute(
"INSERT OR REPLACE INTO dj_profiles VALUES (?,?,?,?,?,CURRENT_TIMESTAMP)",
(full_path, round(bpm,1), round(sil_start,3),
round(sil_end,3), round(loudness,1)))
dj_ok = True
except subprocess.TimeoutExpired:
error_msg = "ffmpeg timeout (>90s)"
except MemoryError:
error_msg = "OUT OF MEMORY during DJ analysis"
except Exception as e:
error_msg = f"DJ: {e}"
# ── Vis Frames ────────────────────────────────────────
if do_vis:
cache_file = vis_path(full_path)
if os.path.exists(cache_file):
vis_ok = False # already cached
else:
try:
import librosa
# Load at 22050 to save memory
y, sr = librosa.load(full_path, sr=22050, mono=True)
fps = 30.0; fft_size = 1024; pts = 20
hop = int(sr / fps)
frames = []
for start in range(0, len(y) - fft_size, hop):
chunk = y[start:start+fft_size] * np.hanning(fft_size)
spec = np.sqrt(np.abs(np.fft.rfft(chunk)) / fft_size)
half = len(spec); cutoff = min(half-1, 90)
fp = []
for i in range(pts):
ni = (i+1)/pts; li = np.log10(ni*9+1)
cb = li*cutoff; bw = max(1, cutoff/pts*li)
sb = max(1, int(cb-bw/2)); eb = min(cutoff, int(cb+bw/2))
avg = float(np.mean(spec[sb:eb+1])) if sb<=eb<half else 0
fp.append(avg * (1 + i/pts*3.5))
frames.append(fp)
del y; gc.collect()
# Normalize
vals = sorted(v for f in frames for v in f if v > 0.001)
if vals:
p95 = vals[min(int(len(vals)*0.95), len(vals)-1)]
if p95 > 0.001:
s = 0.8/p95
frames = [[min(1.0, v*s) for v in f] for f in frames]
with open(cache_file, "w") as f:
json.dump(frames, f)
del frames; gc.collect()
vis_ok = True
except MemoryError:
error_msg = (error_msg + " | " if error_msg else "") + "OUT OF MEMORY during vis"
except Exception as e:
error_msg = (error_msg + " | " if error_msg else "") + f"Vis: {e}"
result_dict["dj"] = dj_ok
result_dict["vis"] = vis_ok
result_dict["error"] = error_msg
# ═══════════════════════════════════════════════════════════
# MAIN — runs workers as subprocesses
# ═══════════════════════════════════════════════════════════
def _printable(text):
    """Return ``text`` with unencodable characters replaced so printing it cannot fail.

    File names that are not valid UTF-8 come back from ``os.walk`` with lone
    surrogate code points, which a UTF-8 stdout refuses to encode.
    """
    return text.encode("utf-8", "replace").decode("utf-8")


def scan(force=False, dj_only=False, vis_only=False, skip_large_mb=0):
    """Walk ``MUSIC_DIR`` and analyze every supported track in its own process.

    Args:
        force: analyze every track even when a result already exists.
        dj_only: run only the DJ analysis.
        vis_only: run only the vis-frame generation.
        skip_large_mb: when non-zero, files larger than this many MB are
            skipped.

    Each track is handed to ``_worker`` in a child process that is killed
    after ``TRACK_TIMEOUT`` seconds. Progress and a summary are printed.
    """
    init_db()
    os.makedirs(VIS_CACHE_DIR, exist_ok=True)
    print(f"🔍 Scanning {MUSIC_DIR}...")

    tracks = []  # (path, size in bytes)
    total_bytes = 0
    for root, _, files in os.walk(MUSIC_DIR):
        for f in files:
            if not f.lower().endswith(SUPPORTED):
                continue
            fp = os.path.join(root, f)
            try:
                size_bytes = os.path.getsize(fp)
            except OSError:
                # Broken symlink, or the file vanished during the walk: one
                # bad entry must not abort the whole scan.
                print(f" ⚠ Cannot stat, ignoring: {_printable(fp)}")
                continue
            tracks.append((fp, size_bytes))
            total_bytes += size_bytes

    total_mb = total_bytes / 1048576
    mode = "FORCE" if force else "missing"
    what = "DJ only" if dj_only else "Vis only" if vis_only else "DJ + Vis"
    print(f"🚀 {len(tracks)} tracks ({fmt(total_mb)}) — {what} ({mode})")
    print(f" Timeout: {TRACK_TIMEOUT}s per track")
    if skip_large_mb:
        print(f" Skipping files > {skip_large_mb} MB")
    print(f" Each track runs in a subprocess (crash-safe)")
    print()

    dj_n = vis_n = skip_n = fail_n = 0
    t0 = time.time()
    # One manager (one helper server process) serves the whole scan; it is
    # started lazily so a run with nothing to do never spawns it.
    manager = None
    try:
        for idx, (path, size_bytes) in enumerate(tracks):
            name = _printable(os.path.basename(path))
            size_mb = size_bytes / 1048576
            tag = f"[{idx+1}/{len(tracks)}]"

            # Skip oversized
            if skip_large_mb and size_mb > skip_large_mb:
                print(f" {tag} ⏭ SKIP ({fmt(size_mb)} > {skip_large_mb}MB): {name}")
                skip_n += 1
                continue

            # Check what's needed
            need_dj = not vis_only and (force or not is_dj_done(path))
            need_vis = not dj_only and (force or not is_vis_done(path))
            if not need_dj and not need_vis:
                skip_n += 1
                continue

            tasks = []
            if need_dj:
                tasks.append("DJ")
            if need_vis:
                tasks.append("Vis")
            print(f" {tag} 🎵 {'+'.join(tasks)} ({fmt(size_mb)}): {name}", end="", flush=True)
            t1 = time.time()

            # Run in subprocess with a fresh shared dict for the results
            if manager is None:
                manager = multiprocessing.Manager()
            result = manager.dict({"dj": False, "vis": False, "error": None})
            proc = multiprocessing.Process(
                target=_worker,
                args=(path, need_dj, need_vis, result)
            )
            proc.start()
            proc.join(timeout=TRACK_TIMEOUT)
            elapsed = time.time() - t1

            if proc.is_alive():
                # Timed out — kill it
                proc.kill()
                proc.join()
                print(f" ⏰ KILLED after {elapsed:.0f}s (timeout)")
                fail_n += 1
                continue
            if proc.exitcode != 0 and proc.exitcode is not None:
                # Crashed (OOM, segfault, etc.)
                print(f" 💥 CRASHED (exit code {proc.exitcode}, {elapsed:.1f}s)")
                fail_n += 1
                continue

            # Worker exited normally: collect what it reported
            err = result.get("error")
            if result.get("dj"):
                dj_n += 1
            if result.get("vis"):
                vis_n += 1
            # The leading space separates the timing from the file name that
            # was printed without a newline above.
            if err:
                print(f" {elapsed:.1f}s — {err}")
                fail_n += 1
            else:
                print(f" {elapsed:.1f}s")
    finally:
        if manager is not None:
            manager.shutdown()

    total_elapsed = time.time() - t0
    m = int(total_elapsed // 60)
    s = int(total_elapsed % 60)
    print(f"\n✨ Done in {m}m {s}s")
    print(f" DJ profiles: {dj_n} new")
    print(f" Vis frames: {vis_n} new")
    print(f" Skipped: {skip_n} (already done)")
    if fail_n:
        print(f" ⚠ Failed: {fail_n} (see errors above)")
        print(f" Tip: re-run with --skip-large 200 to skip huge FLACs")
if __name__ == "__main__":
args = sys.argv[1:]
force = "--force" in args
dj_only = "--dj" in args
vis_only = "--vis" in args
skip_large = 0
if "--skip-large" in args:
i = args.index("--skip-large")
if i + 1 < len(args):
try: skip_large = int(args[i+1])
except: pass
scan(force=force, dj_only=dj_only, vis_only=vis_only, skip_large_mb=skip_large)