302 lines
11 KiB
Python
302 lines
11 KiB
Python
"""
|
|
Pre-Analyzer (crash-safe)
|
|
Location: /home/pi/docker/navidrome/companion_api/pre_analyze.py
|
|
|
|
Each track is analyzed in a subprocess. If librosa OOMs or hangs on a large FLAC,
|
|
only the child process dies — the parent logs the failure and moves on.
|
|
|
|
Usage:
|
|
docker compose exec music-companion python pre_analyze.py
|
|
docker compose exec music-companion python pre_analyze.py --force
|
|
docker compose exec music-companion python pre_analyze.py --dj
|
|
docker compose exec music-companion python pre_analyze.py --vis
|
|
docker compose exec music-companion python pre_analyze.py --skip-large 500
|
|
"""
|
|
import os, sys, json, hashlib, sqlite3, subprocess, time, warnings, multiprocessing
|
|
|
|
# Suppress every Python warning for the lifetime of this process.
warnings.filterwarnings("ignore")

# Runtime configuration; each value can be overridden through the environment.
MUSIC_DIR = os.environ.get("MUSIC_DIR", "/music")
DB_PATH = os.environ.get("DB_PATH", "/app/data/smart_dj.db")
VIS_CACHE_DIR = os.environ.get("VIS_CACHE_DIR", "/app/data/vis_cache")

# File extensions (lower-case) that the scanner picks up.
SUPPORTED = (
    ".mp3",
    ".flac",
    ".m4a",
    ".ogg",
    ".opus",
    ".wav",
)

# Seconds the parent waits for one track's child process (default: 3 minutes).
TRACK_TIMEOUT = int(os.environ.get("TRACK_TIMEOUT", "180"))
|
|
|
|
|
|
def init_db():
    """Create the SQLite database file and the ``dj_profiles`` table if missing.

    The parent directory of ``DB_PATH`` is created when the path has one; a
    bare filename (empty ``dirname``) is left alone because
    ``os.makedirs("")`` would raise.
    """
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        # ``with conn`` only commits / rolls back the transaction; the
        # connection itself is closed explicitly in ``finally``.
        with conn:
            conn.execute("""CREATE TABLE IF NOT EXISTS dj_profiles (
                file_path TEXT PRIMARY KEY, bpm REAL,
                silence_start REAL, silence_end REAL,
                loudness_lufs REAL,
                analyzed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)""")
    finally:
        conn.close()
|
|
|
|
|
|
def is_dj_done(path):
    """Return True when ``dj_profiles`` already holds a row for *path*.

    The lookup is best-effort: any ordinary failure (missing table,
    unopenable database, a path that cannot be bound as a parameter) is
    reported as "not done" so the track is simply analyzed again.
    ``KeyboardInterrupt`` and ``SystemExit`` are deliberately NOT swallowed.
    """
    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            row = conn.execute(
                "SELECT 1 FROM dj_profiles WHERE file_path=?", (path,)
            ).fetchone()
        finally:
            # The sqlite3 context manager does not close; do it explicitly.
            conn.close()
    except Exception:
        return False
    return row is not None
|
|
|
|
|
|
def vis_path(path):
    """Return the cache file location for *path*'s visualizer frames.

    The file name is the MD5 hex digest of the UTF-8 encoded path plus
    ``.json``, placed inside ``VIS_CACHE_DIR``.

    ``surrogateescape`` is used so that filenames containing undecodable
    bytes (which ``os.walk`` returns as lone surrogates) hash instead of
    raising ``UnicodeEncodeError``. For every path that encodes cleanly the
    bytes, and therefore the digest, are the same as with a plain
    ``path.encode()``.
    """
    digest = hashlib.md5(path.encode("utf-8", "surrogateescape")).hexdigest()
    return os.path.join(VIS_CACHE_DIR, digest + ".json")
|
|
|
|
|
|
def is_vis_done(path):
    """Tell whether the visualizer cache file for *path* is already on disk."""
    cache_file = vis_path(path)
    return os.path.exists(cache_file)
|
|
|
|
|
|
def fmt(mb):
    """Render a size given in megabytes as ``"x.xx GB"`` or ``"x.x MB"``.

    Values of 1024 MB and above are shown in GB with two decimals; anything
    smaller stays in MB with one decimal.
    """
    if mb >= 1024:
        gigabytes = mb / 1024
        return f"{gigabytes:.2f} GB"
    return f"{mb:.1f} MB"
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════
# CHILD PROCESS — runs in isolation, can be killed without
# taking down the parent
# ═══════════════════════════════════════════════════════════
|
|
|
|
def _worker(full_path, do_dj, do_vis, result_dict):
|
|
"""Runs in a child process. Writes results to shared dict."""
|
|
import re, gc, warnings
|
|
warnings.filterwarnings("ignore")
|
|
import numpy as np
|
|
|
|
dj_ok = False
|
|
vis_ok = False
|
|
error_msg = None
|
|
|
|
# ── DJ Analysis ───────────────────────────────────────
|
|
if do_dj:
|
|
try:
|
|
import librosa
|
|
|
|
# ffmpeg for silence + loudness (streams, low memory)
|
|
cmd = ["ffmpeg", "-hide_banner", "-i", full_path,
|
|
"-af", "silencedetect=noise=-50dB:d=0.5,ebur128",
|
|
"-f", "null", "-"]
|
|
r = subprocess.run(cmd, capture_output=True, text=True, timeout=90)
|
|
out = r.stderr
|
|
|
|
ss = re.findall(r"silence_start: ([\d\.]+)", out)
|
|
se = re.findall(r"silence_end: ([\d\.]+)", out)
|
|
lu = re.search(r"I:\s+([\-\d\.]+) LUFS", out)
|
|
|
|
# trailing silence start = last silence_start (crossfade trigger)
|
|
sil_start = float(ss[-1]) if ss else 0.0
|
|
# leading silence end = first silence_end (skip-to point)
|
|
sil_end = float(se[0]) if se else 0.0
|
|
loudness = float(lu.group(1)) if lu else -14.0
|
|
|
|
# Sanity checks
|
|
if sil_end > 10.0:
|
|
sil_end = 0.0
|
|
dur_match = re.search(r"Duration: (\d+):(\d+):(\d+\.\d+)", out)
|
|
if dur_match:
|
|
total_dur = int(dur_match.group(1)) * 3600 + int(dur_match.group(2)) * 60 + float(dur_match.group(3))
|
|
if sil_start < total_dur * 0.5:
|
|
sil_start = total_dur
|
|
|
|
# BPM — load only 30s at low rate
|
|
y, sr = librosa.load(full_path, sr=22050, duration=30)
|
|
tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
|
|
del y; gc.collect()
|
|
|
|
try:
|
|
bpm = float(tempo)
|
|
except TypeError:
|
|
bpm = float(tempo[0]) if tempo is not None else 0.0
|
|
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
c.execute(
|
|
"INSERT OR REPLACE INTO dj_profiles VALUES (?,?,?,?,?,CURRENT_TIMESTAMP)",
|
|
(full_path, round(bpm,1), round(sil_start,3),
|
|
round(sil_end,3), round(loudness,1)))
|
|
dj_ok = True
|
|
|
|
except subprocess.TimeoutExpired:
|
|
error_msg = "ffmpeg timeout (>90s)"
|
|
except MemoryError:
|
|
error_msg = "OUT OF MEMORY during DJ analysis"
|
|
except Exception as e:
|
|
error_msg = f"DJ: {e}"
|
|
|
|
# ── Vis Frames ────────────────────────────────────────
|
|
if do_vis:
|
|
cache_file = vis_path(full_path)
|
|
if os.path.exists(cache_file):
|
|
vis_ok = False # already cached
|
|
else:
|
|
try:
|
|
import librosa
|
|
|
|
# Load at 22050 to save memory
|
|
y, sr = librosa.load(full_path, sr=22050, mono=True)
|
|
fps = 30.0; fft_size = 1024; pts = 20
|
|
hop = int(sr / fps)
|
|
frames = []
|
|
|
|
for start in range(0, len(y) - fft_size, hop):
|
|
chunk = y[start:start+fft_size] * np.hanning(fft_size)
|
|
spec = np.sqrt(np.abs(np.fft.rfft(chunk)) / fft_size)
|
|
half = len(spec); cutoff = min(half-1, 90)
|
|
fp = []
|
|
for i in range(pts):
|
|
ni = (i+1)/pts; li = np.log10(ni*9+1)
|
|
cb = li*cutoff; bw = max(1, cutoff/pts*li)
|
|
sb = max(1, int(cb-bw/2)); eb = min(cutoff, int(cb+bw/2))
|
|
avg = float(np.mean(spec[sb:eb+1])) if sb<=eb<half else 0
|
|
fp.append(avg * (1 + i/pts*3.5))
|
|
frames.append(fp)
|
|
|
|
del y; gc.collect()
|
|
|
|
# Normalize
|
|
vals = sorted(v for f in frames for v in f if v > 0.001)
|
|
if vals:
|
|
p95 = vals[min(int(len(vals)*0.95), len(vals)-1)]
|
|
if p95 > 0.001:
|
|
s = 0.8/p95
|
|
frames = [[min(1.0, v*s) for v in f] for f in frames]
|
|
|
|
with open(cache_file, "w") as f:
|
|
json.dump(frames, f)
|
|
del frames; gc.collect()
|
|
vis_ok = True
|
|
|
|
except MemoryError:
|
|
error_msg = (error_msg + " | " if error_msg else "") + "OUT OF MEMORY during vis"
|
|
except Exception as e:
|
|
error_msg = (error_msg + " | " if error_msg else "") + f"Vis: {e}"
|
|
|
|
result_dict["dj"] = dj_ok
|
|
result_dict["vis"] = vis_ok
|
|
result_dict["error"] = error_msg
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════
# MAIN — runs workers as subprocesses
# ═══════════════════════════════════════════════════════════
|
|
|
|
def scan(force=False, dj_only=False, vis_only=False, skip_large_mb=0):
    """Walk ``MUSIC_DIR`` and analyze every supported track in its own process.

    Args:
        force: Re-run analysis even for tracks that already have results.
        dj_only: Only run the DJ analysis (skip visualizer frames).
        vis_only: Only build visualizer frames (skip the DJ analysis).
        skip_large_mb: When non-zero, files larger than this many MB are
            skipped.

    Each track is handed to ``_worker`` in a separate process. A child that
    exceeds ``TRACK_TIMEOUT`` seconds is killed, and a child that exits with
    a non-zero code is reported as crashed; in both cases the scan continues
    with the next track. Progress and a final summary are printed to stdout.
    """
    init_db()
    os.makedirs(VIS_CACHE_DIR, exist_ok=True)

    print(f"🔍 Scanning {MUSIC_DIR}...")
    tracks = []  # (path, size in bytes)
    total_bytes = 0
    for root, _, files in os.walk(MUSIC_DIR):
        for f in files:
            if not f.lower().endswith(SUPPORTED):
                continue
            fp = os.path.join(root, f)
            try:
                size = os.path.getsize(fp)
            except OSError:
                # Broken symlink or file removed while scanning: there is
                # nothing to analyze, and it must not abort the whole run.
                continue
            tracks.append((fp, size))
            total_bytes += size

    total_mb = total_bytes / 1048576
    mode = "FORCE" if force else "missing"
    what = "DJ only" if dj_only else "Vis only" if vis_only else "DJ + Vis"
    print(f"🚀 {len(tracks)} tracks ({fmt(total_mb)}) — {what} ({mode})")
    print(f" Timeout: {TRACK_TIMEOUT}s per track")
    if skip_large_mb:
        print(f" Skipping files > {skip_large_mb} MB")
    print(f" Each track runs in a subprocess (crash-safe)")
    print()

    dj_n = vis_n = skip_n = fail_n = 0
    t0 = time.time()

    # One manager (one helper server process) for the whole run, shut down
    # deterministically when the block exits; each track still gets its own
    # fresh result dict.
    with multiprocessing.Manager() as manager:
        for idx, (path, size) in enumerate(tracks):
            # Display-only name: undecodable filename bytes are replaced so
            # that print() cannot fail on them.
            name = os.path.basename(path).encode("utf-8", "replace").decode("utf-8")
            size_mb = size / 1048576
            tag = f"[{idx+1}/{len(tracks)}]"

            # Skip oversized
            if skip_large_mb and size_mb > skip_large_mb:
                print(f" {tag} ⏭ SKIP ({fmt(size_mb)} > {skip_large_mb}MB): {name}")
                skip_n += 1
                continue

            # Check what's needed
            need_dj = not vis_only and (force or not is_dj_done(path))
            need_vis = not dj_only and (force or not is_vis_done(path))

            if not need_dj and not need_vis:
                skip_n += 1
                continue

            tasks = []
            if need_dj:
                tasks.append("DJ")
            if need_vis:
                tasks.append("Vis")

            print(f" {tag} 🎵 {'+'.join(tasks)} ({fmt(size_mb)}): {name}", end="", flush=True)
            t1 = time.time()

            # Run in subprocess with shared dict for results
            result = manager.dict({"dj": False, "vis": False, "error": None})
            proc = multiprocessing.Process(
                target=_worker,
                args=(path, need_dj, need_vis, result),
            )
            proc.start()
            proc.join(timeout=TRACK_TIMEOUT)

            elapsed = time.time() - t1

            if proc.is_alive():
                # Timed out — kill it
                proc.kill()
                proc.join()
                print(f" ⏰ KILLED after {elapsed:.0f}s (timeout)")
                fail_n += 1
                continue

            if proc.exitcode not in (0, None):
                # Crashed (OOM, segfault, etc.)
                print(f" 💥 CRASHED (exit code {proc.exitcode}, {elapsed:.1f}s)")
                fail_n += 1
                continue

            # Success path
            err = result.get("error")
            if result.get("dj"):
                dj_n += 1
            if result.get("vis"):
                vis_n += 1

            if err:
                print(f" ⚠ {elapsed:.1f}s — {err}")
                fail_n += 1
            else:
                print(f" ✓ {elapsed:.1f}s")

    total_elapsed = time.time() - t0
    m = int(total_elapsed // 60)
    s = int(total_elapsed % 60)

    print(f"\n✨ Done in {m}m {s}s")
    print(f" DJ profiles: {dj_n} new")
    print(f" Vis frames: {vis_n} new")
    print(f" Skipped: {skip_n} (already done)")
    if fail_n:
        print(f" ⚠ Failed: {fail_n} (see errors above)")
        print(f" Tip: re-run with --skip-large 200 to skip huge FLACs")
|
|
|
|
|
|
if __name__ == "__main__":
    # Minimal flag parsing:
    #   --force            re-analyze tracks that already have results
    #   --dj               DJ analysis only
    #   --vis              visualizer frames only
    #   --skip-large <MB>  skip files larger than <MB> megabytes
    args = sys.argv[1:]
    force = "--force" in args
    dj_only = "--dj" in args
    vis_only = "--vis" in args

    skip_large = 0
    if "--skip-large" in args:
        i = args.index("--skip-large")
        if i + 1 < len(args):
            try:
                skip_large = int(args[i + 1])
            except ValueError:
                # Keep going without a size limit, but say so instead of
                # silently ignoring what the user typed.
                print(f"⚠ Ignoring invalid --skip-large value: {args[i + 1]!r}",
                      file=sys.stderr)
        else:
            print("⚠ --skip-large given without a value; no size limit applied",
                  file=sys.stderr)

    scan(force=force, dj_only=dj_only, vis_only=vis_only, skip_large_mb=skip_large)
|