NavidromeApp/companion-api/main.py
Dallas Groot 3385b88270 Audio Tap Infrastructure:
- AudioTapProcessor: shared MTAudioProcessingTap with lock-free PCM ring buffer
- Pre-allocated vDSP FFT (1024-sample, Hann window, log-frequency 30-band output)
- Zero per-frame heap allocation in FFT path
- Shared tap serves both FFT visualizer and Shazam simultaneously

Fixes (blockers for tap to work):
- radioGoLive/radioSeekBack now update self.playerItem (was orphaned)
- Tap reinstalled on every AVPlayerItem swap (seek, live, station change)
- Tap removed on background, reinstalled on foreground
- Tap removed on radio→music transition

Shazam rework:
- Uses shared AudioTapProcessor instead of creating its own tap
- Fixes tap conflict where Shazam overwrote FFT audioMix
- 500ms wait for tapPrepare callback (sourceFormat timing race)
- Fixed pre-existing bug: stopAll() audio session never restored after mic fallback

Debug capture:
- Capture Audio Tap button in Visualizer Settings
- Records 5s of raw tap PCM as playable WAV file
- Uses actual stream sample rate (not hardcoded 44100)
- Share sheet via Notification pattern (survives view dismiss)
- Spinner auto-resets on appear if capture interrupted by background

Also includes from main branch:
- Edit History UI, batch undo, companion API 7-bug fix
- Recently Played tab, Discover section, Play Queue sync, Share links"
2026-04-14 17:15:34 -07:00

3800 lines
157 KiB
Python

"""
Navidrome Companion API
Endpoints (existing - unchanged):
GET /health
POST /reindex
PATCH /edit-metadata
PATCH /batch-edit-metadata
POST /undo-batch-edit/{batch_id}
POST /restore-tags?relative_path=...
GET /batch-edit-history
POST /upload-track
POST /upload-tracks
GET /smart-dj/profile
GET /smart-dj/bulk-profiles
GET /smart-dj/profiles/export
POST /bulk-fix
GET /visualizer/frames
POST /visualizer/precompute
WS /ws/push
Endpoints (Phase 1 - library database):
POST /library/scan
POST /library/sync-navidrome-ids
GET /library/songs
GET /library/albums
GET /library/artists
GET /library/search
GET /library/song/{song_id}
GET /library/cover-art/{song_id}
POST /library/cover-art/{song_id}
POST /library/cover-art-by-path
POST /library/artist-photo
GET /library/artist-photo/{artist_name}
Endpoints (Lyrics):
GET /lyrics/search
GET /lyrics/fetch
GET /lyrics/get
POST /lyrics/embed
"""
import os, re, json, asyncio, hashlib, sqlite3, subprocess, shutil, time, warnings, unicodedata
from pathlib import Path
from typing import Optional, List
from contextlib import asynccontextmanager
from urllib.parse import unquote
from datetime import datetime
import httpx
import numpy as np
from fastapi import (FastAPI, HTTPException, UploadFile, File, Form,
BackgroundTasks, WebSocket, WebSocketDisconnect, Query)
from fastapi.responses import JSONResponse, FileResponse
from pydantic import BaseModel
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
from mutagen import File as MutagenFile
# Filesystem locations and the Navidrome base URL. Each value is read from the
# environment variable of the same name and falls back to the default shown.
MUSIC_DIR = os.getenv("MUSIC_DIR", "/music")
DB_PATH = os.getenv("DB_PATH", "/app/data/smart_dj.db")
VIS_CACHE_DIR = os.getenv("VIS_CACHE_DIR", "/app/data/vis_cache")
COVER_ART_DIR = os.getenv("COVER_ART_DIR", "/app/data/cover_art")
TAG_BACKUP_DIR = os.getenv("TAG_BACKUP_DIR", "/app/data/tag_backups")
ARTIST_PHOTO_DIR = os.getenv("ARTIST_PHOTO_DIR", "/app/data/artist_photos")
NAVIDROME_URL = os.getenv("NAVIDROME_URL", "http://navidrome:4533/navidrome")
# Subsonic credentials have no default: they are None when the variable is unset.
SUBSONIC_USER = os.getenv("SUBSONIC_USER")
SUBSONIC_TOKEN = os.getenv("SUBSONIC_TOKEN")
SUBSONIC_SALT = os.getenv("SUBSONIC_SALT")
# Lower-case suffixes treated as audio; passed to str.endswith() by the scanners.
AUDIO_EXTS = ('.mp3', '.flac', '.m4a', '.ogg', '.opus', '.wav', '.aiff', '.aif')
# Image file names probed first, in this order, when looking for folder cover art.
COVER_NAMES = ('cover.jpg', 'folder.jpg', 'artwork.jpg', 'front.jpg',
'cover.png', 'folder.png', 'artwork.png', 'front.png')
# ── Database connection management ──────────────────────────────────────────
#
# CRITICAL: Python's `with sqlite3.connect(...) as c:` is a TRANSACTION
# manager only — it commits/rolls back but NEVER calls .close().
# Every bare connect() in the original code leaked a file handle until GC.
#
# get_db() fixes four issues at once:
# 1. WAL mode — readers never block writers; writers never block readers
# 2. synchronous=NORMAL — safe with WAL, ~3x faster than default FULL
# 3. busy_timeout — waits up to N seconds instead of raising immediately
# 4. Explicit close — in finally: block; no leaked handles under any path
#
# check_same_thread=False: BackgroundTasks run on a threadpool worker, not the
# asyncio thread. Each get_db() call creates its own connection so there is no
# actual cross-thread sharing — the flag disables an overly conservative check.
from contextlib import contextmanager
@contextmanager
def get_db(path: str = None, timeout: float = 10.0):
    """
    Yield a configured SQLite connection to the Companion DB.

    Usage:
        with get_db() as c:
            rows = c.execute("SELECT ...").fetchall()
            c.execute("INSERT ...")
        # committed and closed here

    path:    database file; DB_PATH is used when None.
    timeout: seconds passed to sqlite3.connect() and, converted to
             milliseconds, to PRAGMA busy_timeout.

    On a clean exit the transaction is committed. If the body (or the
    commit) raises an Exception, the transaction is rolled back and the
    exception is re-raised. The connection is closed in every case.
    """
    db_file = DB_PATH if path is None else path
    connection = sqlite3.connect(db_file, timeout=timeout, check_same_thread=False)
    try:
        # Per-connection setup, applied in the same order on every open.
        for pragma in (
            "PRAGMA journal_mode=WAL",
            "PRAGMA synchronous=NORMAL",
            f"PRAGMA busy_timeout={int(timeout * 1000)}",
        ):
            connection.execute(pragma)
        yield connection
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        connection.close()
@contextmanager
def get_navidrome_db(timeout: float = 10.0):
    """
    Read-only context manager for Navidrome's SQLite database.

    The file location comes from NAVIDROME_DB_PATH (default
    '/navidrome_data/navidrome.db'). The connection is opened through a
    SQLite URI with mode=ro, so writes through it fail, and it is always
    closed on exit.

    timeout: seconds passed to sqlite3.connect() and, in milliseconds, to
             PRAGMA busy_timeout.

    Raises FileNotFoundError if the database file does not exist.
    """
    # Local import: only `unquote` is imported from urllib.parse at file level.
    from urllib.parse import quote

    path = os.getenv("NAVIDROME_DB_PATH", "/navidrome_data/navidrome.db")
    if not os.path.isfile(path):
        raise FileNotFoundError(
            f"Navidrome DB not found at '{path}'. "
            "Check NAVIDROME_DB_PATH and your Docker volume mount."
        )
    # Percent-encode the path: in a URI, '?' starts the query string, '#' the
    # fragment and '%' an escape, so a raw path containing any of them would
    # be parsed as a different file name (and could lose the mode=ro flag).
    uri = f"file:{quote(path)}?mode=ro"
    conn = sqlite3.connect(uri, uri=True, timeout=timeout, check_same_thread=False)
    try:
        conn.execute(f"PRAGMA busy_timeout={int(timeout * 1000)}")
        yield conn
    finally:
        conn.close()
# ── DB init ──────────────────────────────────────────────────────────────────
# ── Database ────────────────────────────────────────────────────────────────
def init_db():
    """
    Initialise the Companion DB schema.

    Creates the data directories (DB parent, COVER_ART_DIR, ARTIST_PHOTO_DIR)
    and then every table and index with IF NOT EXISTS, so it is safe to call
    on each startup.

    get_db() sets WAL mode on first open, making it persistent for the file.
    All subsequent connections automatically inherit WAL mode.
    """
    # os.path.dirname() is '' for a bare file name such as "smart_dj.db";
    # os.makedirs('') raises, so only create the parent when there is one.
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    os.makedirs(COVER_ART_DIR, exist_ok=True)
    os.makedirs(ARTIST_PHOTO_DIR, exist_ok=True)
    with get_db() as c:
        # Existing tables — untouched
        c.execute("""CREATE TABLE IF NOT EXISTS dj_profiles (
            file_path TEXT PRIMARY KEY, bpm REAL,
            silence_start REAL, silence_end REAL, loudness_lufs REAL,
            analyzed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)""")
        c.execute("""CREATE TABLE IF NOT EXISTS file_index (
            basename TEXT, full_path TEXT, title_words TEXT,
            PRIMARY KEY (basename, full_path))""")
        # Phase 1 — authoritative song metadata
        c.execute("""CREATE TABLE IF NOT EXISTS songs (
            id TEXT PRIMARY KEY,
            full_path TEXT UNIQUE NOT NULL,
            relative_path TEXT NOT NULL,
            navidrome_id TEXT,
            navidrome_album_id TEXT, -- Fix #13: track album reassignment
            title TEXT NOT NULL DEFAULT '',
            artist TEXT NOT NULL DEFAULT '',
            album TEXT NOT NULL DEFAULT '',
            album_artist TEXT NOT NULL DEFAULT '',
            genre TEXT NOT NULL DEFAULT '',
            year INTEGER,
            track_number INTEGER,
            disc_number INTEGER,
            duration REAL,
            sort_title TEXT,
            sort_artist TEXT,
            sort_album TEXT,
            sort_album_artist TEXT,
            cover_art_path TEXT,
            file_size INTEGER,
            file_mtime REAL,
            date_added TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            date_modified TIMESTAMP)""")
        c.execute("""CREATE TABLE IF NOT EXISTS artist_photos (
            artist_name TEXT PRIMARY KEY,
            photo_path TEXT NOT NULL,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)""")
        c.execute("""CREATE TABLE IF NOT EXISTS lyrics (
            artist TEXT NOT NULL,
            title TEXT NOT NULL,
            duration REAL,
            synced_lyrics TEXT,
            plain_lyrics TEXT,
            source TEXT DEFAULT 'lrclib',
            fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (artist, title))""")
        # Indexes over the sort columns plus the Navidrome id and genre.
        c.execute("CREATE INDEX IF NOT EXISTS idx_songs_album ON songs(sort_album, disc_number, track_number)")
        c.execute("CREATE INDEX IF NOT EXISTS idx_songs_artist ON songs(sort_artist, sort_album)")
        c.execute("CREATE INDEX IF NOT EXISTS idx_songs_album_artist ON songs(sort_album_artist, sort_album)")
        c.execute("CREATE INDEX IF NOT EXISTS idx_songs_navidrome ON songs(navidrome_id)")
        c.execute("CREATE INDEX IF NOT EXISTS idx_songs_genre ON songs(genre)")
# ── Sort helpers ────────────────────────────────────────────────────────────
# Leading English article followed by whitespace, matched case-insensitively.
_ARTICLES = re.compile(r'^(the|a|an)\s+', re.IGNORECASE)


def sort_key(text: str) -> str:
    """
    Return the lower-cased sort form of *text* with one leading article removed.

    Empty or None input yields ''. Surrounding whitespace is stripped before
    the article is removed; the pattern is anchored at the start of the
    string, so "  The Beatles" would otherwise keep its article and sort
    under "t".
    """
    if not text:
        return ''
    return _ARTICLES.sub('', text.strip()).lower().strip()
# ── Cover art discovery ─────────────────────────────────────────────────────
def find_cover_art(song_path: str) -> Optional[str]:
    """
    Locate cover art for *song_path* and return its path, or None.

    Search order:
      1. A file in the song's directory named as in COVER_NAMES.
      2. The first .jpg/.jpeg/.png file in that directory (sorted by name).
      3. A previously extracted image cached in COVER_ART_DIR.
      4. Art embedded in the file's tags, extracted into that cache.
    """
    directory = os.path.dirname(song_path)
    for name in COVER_NAMES:
        candidate = os.path.join(directory, name)
        if os.path.isfile(candidate):
            return candidate
    try:
        # Sorted so the choice does not depend on os.listdir() ordering, and
        # restricted to regular files so a directory named "scans.jpg" is
        # never returned as an image.
        for entry in sorted(os.listdir(directory)):
            if entry.lower().endswith(('.jpg', '.jpeg', '.png')):
                candidate = os.path.join(directory, entry)
                if os.path.isfile(candidate):
                    return candidate
    except OSError:
        pass
    # Extract embedded art and cache it
    song_id = hashlib.md5(song_path.encode()).hexdigest()
    cached = os.path.join(COVER_ART_DIR, f"{song_id}.jpg")
    if os.path.isfile(cached):
        return cached
    try:
        audio = MutagenFile(song_path)
        if audio is None:
            return None
        # ID3: picture frames are stored under keys starting with 'APIC'.
        if hasattr(audio, 'tags') and audio.tags:
            for key in list(audio.tags.keys()):
                if key.startswith('APIC'):
                    with open(cached, 'wb') as f:
                        f.write(audio.tags[key].data)
                    return cached
        # MP4: covers live under the 'covr' atom.
        try:
            from mutagen.mp4 import MP4
            if isinstance(audio, MP4):
                covers = audio.tags.get('covr', [])
                if covers:
                    with open(cached, 'wb') as f:
                        f.write(bytes(covers[0]))
                    return cached
        except Exception:
            pass
        # Formats exposing a .pictures list (used here as the last attempt).
        if hasattr(audio, 'pictures') and audio.pictures:
            with open(cached, 'wb') as f:
                f.write(audio.pictures[0].data)
            return cached
    except Exception as e:
        # Best effort: a file without readable art is not an error for the
        # caller, but log it so unreadable files can be diagnosed.
        print(f" [cover-art] Embedded art extraction failed for "
              f"{os.path.basename(song_path)}: {e}", flush=True)
    return None
# ── Cover art embedding ─────────────────────────────────────────────────────
def embed_cover_art_in_file(audio_path: str, image_data: bytes, mime: str = "image/jpeg") -> bool:
    """
    Embed cover art into a single audio file's metadata tags using mutagen.

    Handles FLAC, MP3 (ID3), M4A/AAC (MP4), OGG/Opus (VorbisComment), and AIFF.
    Existing front-cover data is replaced. Files that have no tag container
    yet get one created first, so untagged files can receive art too.

    audio_path: file to modify in place.
    image_data: raw image bytes.
    mime:       image MIME type; for MP4 anything other than "image/jpeg"
                is stored as PNG.

    Returns True on success, False on failure (failures are logged, never raised).
    """
    try:
        audio = MutagenFile(audio_path)
        if audio is None:
            print(f" [embed-art] Unsupported format: {audio_path}", flush=True)
            return False
        ext = os.path.splitext(audio_path)[1].lower()
        # ── FLAC ──
        from mutagen.flac import FLAC
        if isinstance(audio, FLAC):
            from mutagen.flac import Picture
            # Remove existing pictures
            audio.clear_pictures()
            pic = Picture()
            pic.type = 3  # Cover (front)
            pic.mime = mime
            pic.desc = "Cover"
            pic.data = image_data
            audio.add_picture(pic)
            audio.save()
            print(f" [embed-art] FLAC embedded: {os.path.basename(audio_path)} ({len(image_data)} bytes)", flush=True)
            return True
        # ── MP3 / AIFF (ID3 tags) ──
        from mutagen.id3 import APIC
        # A file with no ID3 header loads with tags=None; create an empty tag
        # so it is not reported as "No handler" below.
        if getattr(audio, 'tags', None) is None and ext in ('.mp3', '.aiff', '.aif'):
            audio.add_tags()
        if hasattr(audio, 'tags') and audio.tags is not None:
            # Check if it's an ID3-based format
            is_id3 = any(key.startswith('APIC') or key.startswith('TIT2') or key.startswith('TPE1')
                         for key in audio.tags.keys()) or ext in ('.mp3', '.aiff', '.aif')
            if is_id3:
                # Remove existing APIC frames
                to_remove = [k for k in audio.tags.keys() if k.startswith('APIC')]
                for k in to_remove:
                    del audio.tags[k]
                audio.tags.add(APIC(
                    encoding=3,  # UTF-8
                    mime=mime,
                    type=3,  # Cover (front)
                    desc='Cover',
                    data=image_data
                ))
                audio.save()
                print(f" [embed-art] ID3 embedded: {os.path.basename(audio_path)} ({len(image_data)} bytes)", flush=True)
                return True
        # ── M4A / AAC (MP4) ──
        try:
            from mutagen.mp4 import MP4, MP4Cover
            if isinstance(audio, MP4):
                if audio.tags is None:
                    audio.add_tags()
                fmt = MP4Cover.FORMAT_JPEG if mime == "image/jpeg" else MP4Cover.FORMAT_PNG
                audio.tags['covr'] = [MP4Cover(image_data, imageformat=fmt)]
                audio.save()
                print(f" [embed-art] MP4 embedded: {os.path.basename(audio_path)} ({len(image_data)} bytes)", flush=True)
                return True
        except ImportError:
            pass
        # ── OGG / Opus (VorbisComment with METADATA_BLOCK_PICTURE) ──
        if ext in ('.ogg', '.opus', '.oga'):
            if getattr(audio, 'tags', None) is None:
                audio.add_tags()
            if hasattr(audio, 'tags') and hasattr(audio.tags, 'get'):
                from mutagen.flac import Picture
                import base64
                pic = Picture()
                pic.type = 3
                pic.mime = mime
                pic.desc = "Cover"
                pic.data = image_data
                # The picture block is stored base64-encoded in a comment field.
                encoded = base64.b64encode(pic.write()).decode('ascii')
                audio["metadata_block_picture"] = [encoded]
                audio.save()
                print(f" [embed-art] Vorbis embedded: {os.path.basename(audio_path)} ({len(image_data)} bytes)", flush=True)
                return True
        print(f" [embed-art] No handler for format: {os.path.basename(audio_path)} (type={type(audio).__name__})", flush=True)
        return False
    except Exception as e:
        print(f" [embed-art] FAILED {os.path.basename(audio_path)}: {e}", flush=True)
        return False
# Suffixes (lower-case, with dot) of files that receive embedded cover art.
AUDIO_EXTENSIONS = {'.flac', '.mp3', '.m4a', '.aac', '.ogg', '.opus', '.oga', '.aiff', '.aif', '.wav', '.wma'}


def embed_cover_art_in_directory(directory: str, image_data: bytes, mime: str = "image/jpeg") -> dict:
    """
    Run embed_cover_art_in_file() on every audio file directly inside *directory*.

    Entries are visited in sorted name order; only regular files whose suffix
    is in AUDIO_EXTENSIONS are processed. An unreadable directory is logged
    and produces an all-zero result.

    Returns {"succeeded": int, "failed": int, "skipped": int}. The "skipped"
    counter is part of the shape but is never incremented here.
    """
    tally = {"succeeded": 0, "failed": 0, "skipped": 0}
    print(f" [embed-art] Embedding into all audio files in: {directory}", flush=True)
    print(f" [embed-art] Image payload: {len(image_data)} bytes, MIME: {mime}", flush=True)
    try:
        entries = sorted(os.listdir(directory))
    except OSError as e:
        print(f" [embed-art] Cannot list directory: {e}", flush=True)
        return tally
    for entry in entries:
        suffix = os.path.splitext(entry)[1].lower()
        target = os.path.join(directory, entry)
        if suffix in AUDIO_EXTENSIONS and os.path.isfile(target):
            outcome = "succeeded" if embed_cover_art_in_file(target, image_data, mime) else "failed"
            tally[outcome] += 1
    print(f" [embed-art] Done: {tally['succeeded']} embedded, {tally['failed']} failed", flush=True)
    return tally
# ── Tag reader ──────────────────────────────────────────────────────────────
def read_tags(full_path: str) -> dict:
    """
    Read all display tags from an audio file using mutagen.

    Returns a dict with keys title, artist, album, album_artist, genre,
    year, track_number, disc_number and duration. Missing text tags fall
    back to the file stem (title), 'Unknown Artist', 'Unknown Album', the
    artist (album_artist) or '' (genre); missing numbers are None. Read
    errors are swallowed, so an unreadable file still yields the fallbacks.
    """
    audio_easy = None
    try:
        audio_easy = MutagenFile(full_path, easy=True)
    except Exception:
        pass
    # AIFF files don't support easy=True — fall back to raw ID3 tags.
    # The raw object is loaded for every AIFF: a mutagen file object is
    # truthy exactly when it carries tags, so gating this on
    # `not audio_easy` skipped the fallback whenever there were tags to read.
    audio_raw = None
    ext = Path(full_path).suffix.lower()
    if ext in ('.aiff', '.aif'):
        try:
            from mutagen.aiff import AIFF
            audio_raw = AIFF(full_path)
        except Exception:
            pass

    def get_easy(key):
        """First value of an easy-interface key, or ''."""
        if audio_easy is not None and key in audio_easy and audio_easy[key]:
            return audio_easy[key][0]
        return ''

    def get_raw(frame_id):
        """Read a raw ID3 frame value from AIFF."""
        if audio_raw and audio_raw.tags:
            frame = audio_raw.tags.get(frame_id)
            if frame:
                return str(frame.text[0]) if hasattr(frame, 'text') and frame.text else str(frame)
        return ''

    def get(easy_key, raw_id=None):
        """Easy value first, raw ID3 frame as fallback."""
        val = get_easy(easy_key)
        if not val and raw_id:
            val = get_raw(raw_id)
        return val

    title = get('title', 'TIT2') or Path(full_path).stem
    artist = get('artist', 'TPE1') or 'Unknown Artist'
    album = get('album', 'TALB') or 'Unknown Album'
    album_artist = get('albumartist', 'TPE2') or artist
    genre = get('genre', 'TCON') or ''
    # Year: first four-digit run anywhere in the date value.
    year = None
    raw_date = get('date', 'TDRC') or get('date', 'TYER')
    if raw_date:
        m = re.search(r'\d{4}', str(raw_date))
        if m:
            year = int(m.group())
    # Track/disc: leading digits only, so "3/12" yields 3.
    track_number = None
    raw_track = get('tracknumber', 'TRCK')
    if raw_track:
        m = re.match(r'(\d+)', str(raw_track))
        if m:
            track_number = int(m.group(1))
    disc_number = None
    raw_disc = get('discnumber', 'TPOS')
    if raw_disc:
        m = re.match(r'(\d+)', str(raw_disc))
        if m:
            disc_number = int(m.group(1))
    duration = None
    # `is not None`, not truthiness: an untagged file is falsy but still has
    # stream info, and its duration must not be dropped.
    audio_for_info = audio_easy if audio_easy is not None else audio_raw
    if audio_for_info is not None and hasattr(audio_for_info, 'info') and audio_for_info.info:
        try:
            duration = float(audio_for_info.info.length)
        except Exception:
            pass
    return dict(title=title, artist=artist, album=album, album_artist=album_artist,
                genre=genre, year=year, track_number=track_number,
                disc_number=disc_number, duration=duration)
# ── Library scan ─────────────────────────────────────────────────────────────
def scan_library(full_rescan: bool = False) -> int:
    """
    Walk MUSIC_DIR and upsert every audio file into the songs table.

    full_rescan: when False, a file whose stored file_mtime is within one
                 second of the on-disk mtime is skipped without reading tags.

    Existing rows are updated in place and only missing rows are inserted.
    INSERT OR REPLACE deletes the old row first, which reset the columns
    this function does not supply (navidrome_id, navidrome_album_id,
    date_added) every time a file changed.

    Returns the number of rows written (skipped files are not counted).
    """
    update_sql = """UPDATE songs SET
        relative_path=?,
        title=?, artist=?, album=?, album_artist=?, genre=?,
        year=?, track_number=?, disc_number=?, duration=?,
        sort_title=?, sort_artist=?, sort_album=?, sort_album_artist=?,
        cover_art_path=?, file_size=?, file_mtime=?, date_modified=?
        WHERE id=?"""
    insert_sql = """INSERT OR REPLACE INTO songs (
        id, full_path, relative_path,
        title, artist, album, album_artist, genre,
        year, track_number, disc_number, duration,
        sort_title, sort_artist, sort_album, sort_album_artist,
        cover_art_path, file_size, file_mtime, date_modified
        ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"""
    print(f"Library scan started (full={full_rescan})...", flush=True)
    count = skipped = 0
    with get_db() as c:
        for root, dirs, files in os.walk(MUSIC_DIR):
            # Prune hidden directories in place so os.walk does not descend.
            dirs[:] = [d for d in dirs if not d.startswith('.')]
            for filename in files:
                if not filename.lower().endswith(AUDIO_EXTS):
                    continue
                full_path = os.path.join(root, filename)
                try:
                    stat = os.stat(full_path)
                    mtime = stat.st_mtime
                    fsize = stat.st_size
                except OSError:
                    continue
                song_id = hashlib.md5(full_path.encode()).hexdigest()
                relative = unicodedata.normalize("NFC", os.path.relpath(full_path, MUSIC_DIR))
                if not full_rescan:
                    row = c.execute(
                        "SELECT file_mtime FROM songs WHERE id = ?", (song_id,)
                    ).fetchone()
                    if row and row[0] and abs(row[0] - mtime) < 1.0:
                        skipped += 1
                        continue
                tags = read_tags(full_path)
                cover = find_cover_art(full_path)
                values = (
                    tags['title'], tags['artist'], tags['album'], tags['album_artist'],
                    tags['genre'], tags['year'], tags['track_number'], tags['disc_number'],
                    tags['duration'],
                    sort_key(tags['title']), sort_key(tags['artist']),
                    sort_key(tags['album']), sort_key(tags['album_artist']),
                    cover, fsize, mtime, datetime.utcnow().isoformat(),
                )
                cur = c.execute(update_sql, (relative,) + values + (song_id,))
                if cur.rowcount == 0:
                    c.execute(insert_sql, (song_id, full_path, relative) + values)
                count += 1
    print(f"Library scan: {count} upserted, {skipped} unchanged", flush=True)
    return count
def build_file_index():
    """
    Rebuild the file_index table from the audio files under MUSIC_DIR.

    The table is emptied first. Each audio file contributes one row:
    its lower-cased file name, its full path, and the sorted, space-joined
    set of words from the lower-cased stem (split on whitespace, '-', '_'
    and '.'; single-character and all-digit words are dropped).
    """
    with get_db() as c:
        c.execute("DELETE FROM file_index")
        indexed = 0
        for folder, _subdirs, names in os.walk(MUSIC_DIR):
            audio_names = [n for n in names if n.lower().endswith(AUDIO_EXTS)]
            for name in audio_names:
                tokens = re.split(r'[\s\-_\.]+', Path(name).stem.lower())
                keep = {t for t in tokens if len(t) > 1 and not t.isdigit()}
                record = (name.lower(), os.path.join(folder, name), " ".join(sorted(keep)))
                c.execute("INSERT OR REPLACE INTO file_index VALUES (?,?,?)", record)
                indexed += 1
        print(f"File index built: {indexed} files", flush=True)
def update_song_in_db(full_path: str):
    """
    Re-read tags and update the songs row. Inserts if missing.

    The row is keyed by the MD5 of *full_path*. An UPDATE is tried first so
    columns not written here keep their values; the INSERT only runs when
    no row matched. If the file cannot be stat'ed, file_size and file_mtime
    are stored as NULL.
    """
    song_id = hashlib.md5(full_path.encode()).hexdigest()
    # NFC-normalise like scan_library() does, so rows inserted here are found
    # by resolve_path()'s relative_path lookups.
    relative = unicodedata.normalize("NFC", os.path.relpath(full_path, MUSIC_DIR))
    tags = read_tags(full_path)
    cover = find_cover_art(full_path)
    try:
        stat = os.stat(full_path)
        mtime = stat.st_mtime
        fsize = stat.st_size
    except OSError:
        mtime = fsize = None
    # Column values shared by the UPDATE and the INSERT, in column order.
    values = (
        tags['title'], tags['artist'], tags['album'], tags['album_artist'],
        tags['genre'], tags['year'], tags['track_number'], tags['disc_number'],
        tags['duration'],
        sort_key(tags['title']), sort_key(tags['artist']),
        sort_key(tags['album']), sort_key(tags['album_artist']),
        cover, fsize, mtime, datetime.utcnow().isoformat(),
    )
    with get_db() as c:
        cur = c.cursor()
        cur.execute("""UPDATE songs SET
            title=?, artist=?, album=?, album_artist=?, genre=?,
            year=?, track_number=?, disc_number=?, duration=?,
            sort_title=?, sort_artist=?, sort_album=?, sort_album_artist=?,
            cover_art_path=?, file_size=?, file_mtime=?, date_modified=?
            WHERE id=?""", values + (song_id,))
        if cur.rowcount == 0:
            cur.execute("""INSERT OR REPLACE INTO songs (
                id, full_path, relative_path,
                title, artist, album, album_artist, genre,
                year, track_number, disc_number, duration,
                sort_title, sort_artist, sort_album, sort_album_artist,
                cover_art_path, file_size, file_mtime, date_modified
                ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                (song_id, full_path, relative) + values)
# ── Startup ─────────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan: start-up work before `yield`, shutdown after it.

    Start-up creates the shared Navidrome httpx client, initialises the DB
    schema, makes sure VIS_CACHE_DIR exists, logs the configuration, then
    builds the file index and scans the library in worker threads.
    Shutdown closes the shared client; the try/finally guarantees this also
    happens when start-up or the application raises.
    """
    global _navidrome_client
    _navidrome_client = httpx.AsyncClient(
        timeout=httpx.Timeout(connect=10.0, read=30.0, write=10.0, pool=5.0),
        limits=httpx.Limits(max_connections=5, max_keepalive_connections=2),
    )
    try:
        init_db()
        os.makedirs(VIS_CACHE_DIR, exist_ok=True)
        print(f"Companion API ready", flush=True)
        print(f" MUSIC_DIR = {MUSIC_DIR}", flush=True)
        print(f" DB_PATH = {DB_PATH}", flush=True)
        print(f" VIS_CACHE = {VIS_CACHE_DIR}", flush=True)
        print(f" COVER_ART = {COVER_ART_DIR}", flush=True)
        print(f" ARTIST_PHOTO = {ARTIST_PHOTO_DIR}", flush=True)
        try:
            dirs = [d for d in os.listdir(MUSIC_DIR)
                    if os.path.isdir(os.path.join(MUSIC_DIR, d))]
            print(f" MUSIC_DIR has {len(dirs)} top-level folders", flush=True)
        except Exception as e:
            print(f" Cannot list MUSIC_DIR: {e}", flush=True)
        # Run blocking startup work in a thread so the event loop stays responsive.
        # Uvicorn accepts connections during lifespan startup but cannot dispatch them
        # until yield — keeping the loop unblocked allows health checks to queue properly.
        await asyncio.to_thread(build_file_index)
        await asyncio.to_thread(scan_library)
        yield
    finally:
        # Graceful shutdown: close the shared httpx client
        await _navidrome_client.aclose()


app = FastAPI(title="Navidrome Companion API", lifespan=lifespan)
# ── Pydantic models ──────────────────────────────────────────────────────────
class MetadataUpdate(BaseModel):
    """Request body describing a tag edit for one file.

    Only relative_path is required; every tag field defaults to None.
    """

    # File to edit.
    relative_path: str
    # Optional tag values.
    title: Optional[str] = None
    artist: Optional[str] = None
    album: Optional[str] = None
    album_artist: Optional[str] = None
    genre: Optional[str] = None
    year: Optional[int] = None
    track_number: Optional[int] = None
class BatchMetadataUpdate(BaseModel):
    """Request body describing one tag edit for a list of files.

    Carries the same optional tag fields as a single-file edit, except that
    there is no track_number field.
    """

    # Files to edit.
    relative_paths: List[str]
    # Optional tag values.
    title: Optional[str] = None
    artist: Optional[str] = None
    album: Optional[str] = None
    album_artist: Optional[str] = None
    genre: Optional[str] = None
    year: Optional[int] = None
class FixConflictRequest(BaseModel):
    """Request body naming a fix action plus a free-form payload for it."""

    # Name of the action to perform.
    action: str
    # Action-specific data; defaults to an empty dict.
    fix_data: dict = {}
class TrackUploadMeta(BaseModel):
    """Metadata accompanying one uploaded track.

    filename, title, artist and album are required; the rest default to None.
    """

    # Required fields.
    filename: str
    title: str
    artist: str
    album: str
    # Optional fields.
    track_number: Optional[int] = None
    genre: Optional[str] = None
    year: Optional[int] = None
    album_artist: Optional[str] = None
# ── Push manager ─────────────────────────────────────────────────────────────
class PushManager:
    """Registry of open WebSocket clients with JSON event fan-out.

    Every message is the JSON text of {"event": <name>, "data": <dict>}.
    """

    def __init__(self):
        self.connections: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        """Accept *ws* and register it."""
        await ws.accept()
        self.connections.append(ws)
        total = len(self.connections)
        # Log only the transition from zero clients, not every reconnect cycle.
        if total == 1:
            print(f"Push: client connected ({total} total)", flush=True)

    def disconnect(self, ws: WebSocket):
        """Unregister *ws*; calling this for an unknown socket is harmless."""
        try:
            self.connections.remove(ws)
        except ValueError:
            pass
        # Log only once no clients are left, not every reconnect cycle.
        if not self.connections:
            print("Push: all clients disconnected", flush=True)

    async def broadcast(self, event: str, data: dict):
        """Send one event to every registered client, dropping any that fail."""
        payload = json.dumps({"event": event, "data": data})
        # Work on a snapshot: concurrent broadcast() coroutines that detect
        # the same dead socket must not mutate the list being iterated
        # (AUDIT-018). disconnect() tolerates a second removal.
        for client in tuple(self.connections):
            try:
                await client.send_text(payload)
            except Exception:
                self.disconnect(client)

    async def send_to(self, ws: WebSocket, event: str, data: dict):
        """Send one event to a single client; send errors propagate."""
        payload = json.dumps({"event": event, "data": data})
        await ws.send_text(payload)


push = PushManager()
# Strong-reference set for fire-and-forget tasks (AUDIT-013).
# asyncio.create_task() returns a Task that the GC can silently collect and
# cancel mid-execution if no reference is held. Store tasks here; the done
# callback removes them automatically when the task completes or raises.
_background_tasks: set = set()
def _create_task(coro):
"""
Safe replacement for bare _create_task().
Keeps a strong reference until the task finishes so the GC cannot
cancel it prematurely. Logs unhandled exceptions instead of silencing them.
"""
task = asyncio.create_task(coro) #hotfix
_background_tasks.add(task)
def _on_done(t):
_background_tasks.discard(t)
if not t.cancelled() and t.exception() is not None:
import traceback
print(f"[background task error] {t.get_name()}:", flush=True)
traceback.print_exception(type(t.exception()), t.exception(),
t.exception().__traceback__)
task.add_done_callback(_on_done)
return task
# ── Path resolution ──────────────────────────────────────────────────────────
def resolve_path(relative: str) -> Optional[str]:
    """
    Resolve a Navidrome-relative path to an absolute filesystem path.

    Resolution order:
    1. Direct join with MUSIC_DIR — works when paths match exactly
    2. Strip leading path components — handles sub-library prefixes
    3. Companion songs table lookup by relative_path — handles Picard
       renames where Navidrome path no longer matches disk structure.
       Uses album+artist folder context so two files with the same title
       (e.g. 'In All the Wrong Places') resolve correctly.
    4. Exact filename match on disk — last resort before fuzzy
    5. Fuzzy title match — lowest confidence, only when nothing else works

    Steps 1 and 2 build a filesystem path straight from the caller-supplied
    string, so they are skipped when that string contains a '..' component;
    otherwise the join could name a file outside MUSIC_DIR. Steps 3-5 only
    return paths taken from the database or from walking MUSIC_DIR.

    Returns the absolute path, or None when nothing matched.
    """
    # Percent-decode until the value stops changing (at most five rounds),
    # which handles input that was encoded more than once.
    decoded = relative
    for _ in range(5):
        next_d = unquote(decoded)
        if next_d == decoded:
            break
        decoded = next_d
    cleaned = decoded.lstrip("/")
    print(f" resolve_path: '{relative}' -> '{cleaned}'", flush=True)
    parts = Path(cleaned).parts
    if '..' in parts:
        print(f" resolve_path: '..' component in '{cleaned}' - skipping direct joins", flush=True)
    else:
        # 1. Direct path join
        direct = os.path.join(MUSIC_DIR, cleaned)
        if os.path.isfile(direct):
            return direct
        # 2. Strip leading path components (handles library folder prefix)
        for i in range(1, len(parts)):
            sub = os.path.join(MUSIC_DIR, *parts[i:])
            if os.path.isfile(sub):
                return sub
    # 3. Companion songs table — look up by relative_path.
    # Also tries matching on title+album+artist to disambiguate files
    # with identical names in different albums (e.g. compilation tracks).
    try:
        with get_db() as c:
            # First try exact relative_path match
            row = c.execute(
                "SELECT full_path FROM songs WHERE relative_path = ?", (cleaned,)
            ).fetchone()
            if row and os.path.isfile(row[0]):
                print(f" resolve: songs table exact -> {os.path.basename(row[0])}", flush=True)
                return row[0]
            # Try normalised path (NFC unicode)
            nfc = unicodedata.normalize("NFC", cleaned)
            row = c.execute(
                "SELECT full_path FROM songs WHERE relative_path = ?", (nfc,)
            ).fetchone()
            if row and os.path.isfile(row[0]):
                print(f" resolve: songs table NFC -> {os.path.basename(row[0])}", flush=True)
                return row[0]
            # Extract context from path: Artist/Album/filename
            path_parts = Path(cleaned).parts  # e.g. ('Artist', 'Album', 'file.flac')
            if len(path_parts) >= 3:
                path_artist = path_parts[0]
                path_album = path_parts[1]
                filename = path_parts[-1]
                stem = Path(filename).stem.lower()
                ext = Path(filename).suffix.lower()
                # Strip leading track number from stem for matching
                clean_stem = re.sub(r'^\d{1,2}[-\s\.]+\d{0,2}[-\s\.]*', '', stem).strip()
                clean_stem = re.sub(r'^\d{1,2}[-\s\.]+', '', clean_stem).strip()
                # Candidate rows: same extension, sort_title containing the
                # first six characters of the cleaned stem.
                rows = c.execute(
                    """SELECT full_path, relative_path FROM songs
                    WHERE full_path LIKE ? AND sort_title LIKE ?""",
                    (f'%{ext}', f'%{clean_stem[:6]}%')
                ).fetchall()
                best_fp, best_score = None, 0.0
                for fp, rp in rows:
                    rp_parts = Path(rp).parts
                    if len(rp_parts) < 2:
                        continue
                    # Score: title match + artist folder match + album folder match
                    score = 0.0
                    rp_stem = re.sub(r'^\d{1,2}[-\s\.]+', '', Path(rp).stem.lower()).strip()
                    if clean_stem and rp_stem:
                        words_q = set(re.split(r'[\s\-_\.]+', clean_stem))
                        words_r = set(re.split(r'[\s\-_\.]+', rp_stem))
                        if words_q:
                            # Fraction of query words found, worth up to 0.6.
                            score += len(words_q & words_r) / len(words_q) * 0.6
                    # Bonus for matching artist folder
                    if path_artist.lower()[:4] in rp_parts[0].lower():
                        score += 0.2
                    # Bonus for matching album folder
                    if path_album.lower()[:4] in (rp_parts[1].lower() if len(rp_parts) > 1 else ''):
                        score += 0.2
                    if score > best_score and os.path.isfile(fp):
                        best_score = score
                        best_fp = fp
                if best_fp and best_score >= 0.7:
                    print(f" resolve: songs table context ({best_score:.0%}) -> {os.path.basename(best_fp)}", flush=True)
                    return best_fp
    except Exception as e:
        print(f" resolve: songs table error: {e}", flush=True)
    # 4. Exact filename match on disk
    target = os.path.basename(cleaned)
    if target:
        for root, _, files in os.walk(MUSIC_DIR):
            if target in files:
                found = os.path.join(root, target)
                print(f" resolve: exact filename -> {found}", flush=True)
                return found
    # 5. Fuzzy title match (lowest confidence — last resort)
    target_stem = Path(target).stem.lower() if target else ""
    target_ext = Path(target).suffix.lower() if target else ""
    title_part = re.sub(r'^\d+[\s\.\-]+', '', target_stem).strip()
    words = {w for w in re.split(r'[\s\-_\.]+', title_part)
             if len(w) > 1 and not w.isdigit()}
    if words:
        try:
            with get_db() as c:
                rows = c.execute(
                    "SELECT basename, full_path, title_words FROM file_index"
                ).fetchall()
            best, best_score = None, 0.0
            for basename, full_path, title_words_str in rows:
                if not basename.endswith(target_ext):
                    continue
                fw = set(title_words_str.split())
                # Fraction of the requested title words present in the index row.
                score = len(words & fw) / len(words) if fw else 0
                if score > best_score:
                    best_score = score
                    best = full_path
            if best and best_score >= 0.5:
                print(f" resolve: fuzzy ({best_score:.0%}) -> {os.path.basename(best)}", flush=True)
                return best
        except Exception as e:
            print(f" resolve: fuzzy error: {e}", flush=True)
    print(f" resolve_path: FAILED for '{cleaned}'", flush=True)
    return None
# ── Navidrome HTTP client ────────────────────────────────────────────────────
# Single shared AsyncClient reuses the TCP connection to Navidrome across all
# trigger_scan() calls (AUDIT-020). Previously a new client — and therefore a
# new connection — was created on every call, adding DNS + TCP + TLS overhead
# on every metadata edit and upload, which is especially costly over Tailscale.
#
# The client is initialised in lifespan() and closed on shutdown.
_navidrome_client: httpx.AsyncClient = None  # set in lifespan


async def trigger_scan():
    """
    Ask Navidrome to start a library scan via the Subsonic startScan endpoint.

    Does nothing (beyond a log line) when any Subsonic credential is unset.
    The shared client is used when it exists and is open; otherwise a
    one-shot client is created for this single request. Request errors are
    logged and never raised.
    """
    if not (SUBSONIC_USER and SUBSONIC_TOKEN and SUBSONIC_SALT):
        print("Subsonic credentials not set - skipping scan")
        return
    scan_url = f"{NAVIDROME_URL}/rest/startScan.view"
    query = {"u": SUBSONIC_USER, "t": SUBSONIC_TOKEN, "s": SUBSONIC_SALT,
             "v": "1.16.1", "c": "CompanionAPI", "f": "json"}
    shared = _navidrome_client
    if shared is not None and not shared.is_closed:
        # Normal path: reuse the shared connection, with a 10s per-request timeout.
        try:
            resp = await shared.get(scan_url, params=query, timeout=10)
            print(f"Navidrome scan: {resp.status_code}")
        except Exception as e:
            print(f"Scan failed: {e}")
        return
    # Fallback: create a one-shot client if the shared one isn't ready
    async with httpx.AsyncClient(timeout=10) as one_shot:
        try:
            resp = await one_shot.get(scan_url, params=query)
            print(f"Navidrome scan (fallback client): {resp.status_code}")
        except Exception as e:
            print(f"Scan failed: {e}")
async def sync_navidrome_ids_task():
    """
    Fetch all songs from Navidrome and match them into our songs table.

    Songs are pulled page by page (500 per request) from the Subsonic
    ``search3`` endpoint. Each one is matched against the rows of the local
    ``songs`` table using in-memory lookup dicts, and the resulting
    (navidrome_id, navidrome_album_id) pairs are written back with a single
    ``executemany``.

    Matching strategy (tried in order per song, first hit wins):
      1. title + artist          (first DB row seen for the key wins)
      2. title + album           (first DB row seen for the key wins)
      3. title only              (only when the title is unique in the DB)
      4. duration bucket + first 8 chars of title (unique)
      5. clean title + artist    (track-number prefix stripped, unique)
      6. duration bucket only    (unique per bucket)
      7. clean title + duration bucket (unique)

    Any exception is caught, logged with a traceback and not re-raised.
    A fetch error part-way through pagination stops fetching, but the pages
    already received are still matched.
    """
    try:
        if not all([SUBSONIC_USER, SUBSONIC_TOKEN, SUBSONIC_SALT]):
            print("Subsonic credentials not set - cannot sync IDs")
            return
        print(f"Syncing Navidrome IDs... URL={NAVIDROME_URL}", flush=True)

        # ── Fetch all songs from Navidrome ────────────────────────────────────
        page_size = 500
        base_params = {
            "u": SUBSONIC_USER, "t": SUBSONIC_TOKEN, "s": SUBSONIC_SALT,
            "v": "1.16.1", "c": "CompanionAPI", "f": "json",
            "albumCount": 0, "artistCount": 0, "songCount": page_size, "query": ""
        }
        all_songs = []
        offset = 0
        # Reuse the shared client; 60s read timeout for large library pagination
        http = _navidrome_client
        while True:
            try:
                r = await http.get(
                    f"{NAVIDROME_URL}/rest/search3.view",
                    params={**base_params, "songOffset": offset},
                    timeout=httpx.Timeout(connect=10.0, read=60.0, write=10.0, pool=5.0)
                )
                resp = r.json().get("subsonic-response", {})
                if resp.get("status") == "failed":
                    print(f" Navidrome auth error: {resp.get('error')}", flush=True)
                    return
                songs = resp.get("searchResult3", {}).get("song", [])
                print(f" Page offset={offset}: {len(songs)} songs", flush=True)
                if not songs:
                    break
                all_songs.extend(songs)
                offset += len(songs)
                # A short page means this was the last one
                if len(songs) < page_size:
                    break
            except Exception as e:
                print(f" Navidrome fetch error: {e}", flush=True)
                break
        print(f" Navidrome total: {len(all_songs)} songs", flush=True)
        if not all_songs:
            return

        # ── Show first 3 Navidrome songs for diagnosis ────────────────────────
        for ns in all_songs[:3]:
            print(f" ND sample: title={repr(ns.get('title',''))} "
                  f"artist={repr(ns.get('artist',''))} "
                  f"album={repr(ns.get('album',''))} "
                  f"duration={ns.get('duration')}", flush=True)

        # ── Build lookup tables from our DB ───────────────────────────────────
        def norm(s: str) -> str:
            """Lowercase, strip, NFC-normalize."""
            return unicodedata.normalize("NFC", (s or "").lower().strip())

        def clean_title(s: str) -> str:
            """Strip leading track/disc number prefix from filename-derived titles.
            e.g. '09 Careless' -> 'careless', '01-02 Song' -> 'song'
            """
            s = norm(s)
            # Strip patterns like "09 ", "09 - ", "1-02 ", "01. " etc
            s = re.sub(r'^\d{1,2}[-\s\.]+\d{0,2}[-\s\.]*', '', s).strip()
            s = re.sub(r'^\d{1,2}[-\s\.]+', '', s).strip()
            return s

        def dur_bucket(seconds) -> Optional[int]:
            """Round to nearest 2-second bucket for fuzzy duration matching."""
            if seconds is None:
                return None
            return int(round(float(seconds) / 2.0))

        def put_unique(table: dict, key, song_id: str) -> None:
            """Store song_id under key; any repeat of the key marks it ambiguous (None)."""
            table[key] = None if key in table else song_id

        with get_db() as c:
            cur = c.cursor()
            db_rows = cur.execute(
                "SELECT id, title, artist, album, duration FROM songs"
            ).fetchall()
        total_db = len(db_rows)
        print(f" DB songs total: {total_db}", flush=True)
        # Show first 3 DB songs for comparison
        for row in db_rows[:3]:
            print(f" DB sample: title={repr(row[1])} "
                  f"artist={repr(row[2])} "
                  f"album={repr(row[3])} "
                  f"duration={row[4]}", flush=True)

        # Strategy 1: title + artist (first row wins)
        by_title_artist: dict[tuple, str] = {}
        # Strategy 2: title + album (first row wins)
        by_title_album: dict[tuple, str] = {}
        # Strategy 3: title only (None when the title is not unique)
        by_title: dict[str, Optional[str]] = {}
        # Strategy 4: duration bucket + first 8 chars of title (unique)
        by_dur: dict[tuple, Optional[str]] = {}
        # Strategy 5: clean_title + artist (strips track number prefix, unique)
        by_clean_artist: dict[tuple, Optional[str]] = {}
        # Strategy 6: duration bucket only (unique per bucket)
        by_dur_only: dict[int, Optional[str]] = {}
        # Strategy 7: clean_title + duration bucket (catches untagged files
        # where artist is missing but filename title + duration identify the song)
        by_clean_dur: dict[tuple, Optional[str]] = {}

        for song_id, title, artist, album, duration in db_rows:
            nt = norm(title)
            na = norm(artist)
            nb = norm(album)
            ct = clean_title(title)
            dk = dur_bucket(duration)
            by_title_artist.setdefault((nt, na), song_id)
            by_title_album.setdefault((nt, nb), song_id)
            put_unique(by_title, nt, song_id)
            put_unique(by_clean_artist, (ct, na), song_id)
            # Duration-keyed tables only ever receive rows with a known duration
            if dk is not None:
                put_unique(by_dur, (dk, nt[:8]), song_id)
                put_unique(by_dur_only, dk, song_id)
                put_unique(by_clean_dur, (ct, dk), song_id)

        print(f" Lookups: title+artist={len(by_title_artist)} "
              f"title+album={len(by_title_album)} "
              f"title_only={sum(1 for v in by_title.values() if v)} "
              f"duration={sum(1 for v in by_dur.values() if v)} "
              f"clean+artist={sum(1 for v in by_clean_artist.values() if v)} "
              f"dur_only={sum(1 for v in by_dur_only.values() if v)} "
              f"clean+dur={sum(1 for v in by_clean_dur.values() if v)}", flush=True)

        # matched[n] counts hits for strategy n (index 0 is unused)
        matched = [0] * 8
        unmatched = 0
        unmatched_samples = []
        # Build the update list entirely in Python (pure dict lookups — fast, no I/O),
        # then write to SQLite in a single executemany call.
        # This avoids holding the DB connection open for the entire iteration AND
        # never blocks the event loop with thousands of individual execute() calls
        # without yielding (AUDIT-023).
        updates = []  # list of (navidrome_id, navidrome_album_id, companion_song_id)
        for ns in all_songs:
            nd_id = ns.get("id", "")
            if not nd_id:
                continue
            nt = norm(ns.get("title", ""))
            na = norm(ns.get("artist", ""))
            nb = norm(ns.get("album", ""))
            ct = clean_title(ns.get("title", ""))
            dk = dur_bucket(ns.get("duration"))
            # Candidates in strategy order. When dk is None the duration-keyed
            # lookups simply miss, because no None-bucket key was ever inserted.
            candidates = (
                by_title_artist.get((nt, na)),
                by_title_album.get((nt, nb)),
                by_title.get(nt),
                by_dur.get((dk, nt[:8])),
                by_clean_artist.get((ct, na)),
                by_dur_only.get(dk),
                by_clean_dur.get((ct, dk)),
            )
            strategy, db_song_id = next(
                ((n, hit) for n, hit in enumerate(candidates, start=1) if hit),
                (0, None),
            )
            if db_song_id:
                nd_album_id = ns.get("albumId", "")
                updates.append((nd_id, nd_album_id, db_song_id))
                matched[strategy] += 1
            else:
                unmatched += 1
                if len(unmatched_samples) < 10:
                    unmatched_samples.append(
                        f"title={repr(ns.get('title',''))} "
                        f"artist={repr(ns.get('artist',''))} "
                        f"duration={ns.get('duration')}"
                    )

        # Single batched write — one connection open, one executemany, one commit
        with get_db() as c:
            c.executemany(
                "UPDATE songs SET navidrome_id = ?, navidrome_album_id = ? WHERE id = ?",
                updates
            )
        print(f" Wrote {len(updates)} navidrome_id matches to DB", flush=True)
        total_matched = sum(matched)
        print(f"Navidrome ID sync complete: {total_matched}/{len(all_songs)} matched", flush=True)
        print(f" Strategy breakdown: "
              f"title+artist={matched[1]} title+album={matched[2]} "
              f"title_only={matched[3]} dur+prefix={matched[4]} "
              f"clean+artist={matched[5]} dur_only={matched[6]} "
              f"clean+dur={matched[7]} unmatched={unmatched}", flush=True)
        if unmatched_samples:
            print(" Unmatched samples:", flush=True)
            for s in unmatched_samples:
                print(f" {s}", flush=True)
    except Exception as e:
        import traceback
        print(f"sync_navidrome_ids_task FAILED: {e}", flush=True)
        traceback.print_exc()
# ── Picard tag cleanup ───────────────────────────────────────────────────────
# NAVIDROME_TAGS (defined further down, after the tag backup helpers) lists the
# ONLY tags Navidrome reads and uses. Everything else is noise that can
# cause album splitting, wrong grouping, and other library issues.
# Using a whitelist (keep only these) is safer than a blacklist (remove known bad ones)
# because it handles any future Picard tags automatically.
# ── Tag Backup System ────────────────────────────────────────────────────────
# Snapshots all tags before any destructive write. Enables per-file and
# batch-level undo. Backups are JSON files keyed by MD5 of the file path.
# Batch edits also create a manifest linking all affected files for bulk undo.
# Runs at import time: make sure the backup directory exists before any backup
# is written. exist_ok=True makes this a no-op when it is already there.
os.makedirs(TAG_BACKUP_DIR, exist_ok=True)
def _backup_key(full_path: str) -> str:
return hashlib.md5(full_path.encode()).hexdigest()
def _backup_key_rel(relative_path: str) -> str:
"""Secondary key based on relative path — survives full_path changes from restructure."""
return "rel_" + hashlib.md5(relative_path.encode()).hexdigest()
def backup_tags(full_path: str) -> Optional[str]:
    """Write a JSON snapshot of the file's current tags before any tag write.

    The same JSON document is written twice under TAG_BACKUP_DIR:
      1. ``<md5(full_path)>.json``          — primary key
      2. ``rel_<md5(relative_path)>.json``  — secondary key, relative to MUSIC_DIR
    so the snapshot can be found by either key later.

    AIFF files (.aiff/.aif) are read through _read_aiff_tags_for_backup();
    everything else is read with MutagenFile(easy=True).

    Returns the primary backup file path, or None when the tags could not be
    read or any step failed (failures are logged, never raised).
    """
    try:
        if Path(full_path).suffix.lower() in ('.aiff', '.aif'):
            tags = _read_aiff_tags_for_backup(full_path)
        else:
            audio = MutagenFile(full_path, easy=True)
            tags = None if audio is None else dict(audio.items())
        if tags is None:
            return None

        relative = os.path.relpath(full_path, MUSIC_DIR)
        data = json.dumps(
            {
                "path": full_path,
                "relative_path": relative,
                "filename": os.path.basename(full_path),
                "timestamp": datetime.utcnow().isoformat(),
                "tags": tags,
            },
            indent=2,
        )
        # Primary key: full absolute path
        primary_path = os.path.join(TAG_BACKUP_DIR, f"{_backup_key(full_path)}.json")
        # Secondary key: relative path (survives restructure)
        secondary_path = os.path.join(TAG_BACKUP_DIR, f"{_backup_key_rel(relative)}.json")
        for destination in (primary_path, secondary_path):
            with open(destination, 'w') as f:
                f.write(data)
        return primary_path
    except Exception as e:
        print(f" backup_tags FAILED for {os.path.basename(full_path)}: {e}", flush=True)
        return None
def _read_aiff_tags_for_backup(full_path: str) -> Optional[dict]:
    """Read AIFF tags via raw ID3 frames and return them as an easy-mode-style dict.

    Returns a dict mapping easy-mode key names (``title``, ``artist``, ...) to
    lists of strings, ``{}`` when the file has no tag block, or None when the
    file can't be read (the error is logged).

    When several frames map to the same easy key (TDRC and TYER both map to
    ``date``), the first one listed wins, so TDRC is never overwritten by the
    legacy TYER frame.
    """
    try:
        from mutagen.aiff import AIFF
        audio = AIFF(full_path)
        if audio.tags is None:
            return {}
        # Map raw ID3 frames to easy-mode key names (order = priority)
        frame_to_easy = {
            'TIT2': 'title',
            'TPE1': 'artist',
            'TALB': 'album',
            'TPE2': 'albumartist',
            'TRCK': 'tracknumber',
            'TPOS': 'discnumber',
            'TDRC': 'date',
            'TYER': 'date',
            'TCON': 'genre',
            'TCOM': 'composer',
        }
        tags = {}
        for frame_id, easy_key in frame_to_easy.items():
            if easy_key in tags:
                # Already captured from a higher-priority frame (e.g. TDRC before TYER)
                continue
            frame = audio.tags.get(frame_id)
            if frame:
                # ID3 text frames store values as lists
                val = [str(t) for t in frame.text] if hasattr(frame, 'text') else [str(frame)]
                if val and val[0]:
                    tags[easy_key] = val
        return tags
    except Exception as e:
        print(f" _read_aiff_tags FAILED for {os.path.basename(full_path)}: {e}", flush=True)
        return None
def _find_backup(full_path: str, original_path: str = None) -> Optional[str]:
    """Locate the backup JSON for a file by trying several keys in turn.

    Lookup order:
      1. key of the current full_path
      2. key of the current path relative to MUSIC_DIR
      3. key of original_path (only when given and different from full_path)
      4. key of original_path relative to MUSIC_DIR

    Returns the first candidate that exists on disk, or None.
    """
    def keyed_by_full(path):
        return os.path.join(TAG_BACKUP_DIR, f"{_backup_key(path)}.json")

    def keyed_by_relative(path):
        # A ValueError while deriving the relative key just means this
        # candidate is unavailable; move on to the next one.
        try:
            rel = os.path.relpath(path, MUSIC_DIR)
            return os.path.join(TAG_BACKUP_DIR, f"{_backup_key_rel(rel)}.json")
        except ValueError:
            return None

    attempts = [(keyed_by_full, full_path), (keyed_by_relative, full_path)]
    if original_path and original_path != full_path:
        attempts.append((keyed_by_full, original_path))
        attempts.append((keyed_by_relative, original_path))

    for build_candidate, path in attempts:
        candidate = build_candidate(path)
        if candidate is not None and os.path.exists(candidate):
            return candidate
    return None
def restore_tags_from_backup(full_path: str, original_path: str = None) -> bool:
    """Restore tags from the most recent backup for this file.

    Uses _find_backup() to locate the backup JSON, trying multiple key strategies
    (current path, relative path, original path). This ensures undo works even
    after restructure has moved the file to a different directory.

    IMPORTANT: Only overwrites text tags (title, artist, album, etc.) that
    easy=True exposes. Does NOT touch binary frames like APIC (album art),
    USLT (lyrics), or COMM (comments). Previous version used audio.delete()
    which nuked everything including embedded art — that was a data-loss bug.

    A backup whose JSON has no ``tags`` object is rejected: treating it as an
    empty tag set would delete every easy-mode tag currently on the file.

    Returns True when the file was saved, False when no usable backup was
    found, the format is unsupported, or an error occurred (logged). Failures
    on individual tags are logged and skipped; they do not change the result.
    """
    backup_path = _find_backup(full_path, original_path=original_path)
    if not backup_path:
        print(f" restore: no backup found for {os.path.basename(full_path)}"
              f" (original: {os.path.basename(original_path) if original_path else 'none'})", flush=True)
        return False
    try:
        with open(backup_path) as f:
            backup = json.load(f)
        backup_tags_dict = backup.get("tags") if isinstance(backup, dict) else None
        if not isinstance(backup_tags_dict, dict):
            print(f" restore: backup for {os.path.basename(full_path)} has no usable"
                  f" 'tags' section — refusing to restore", flush=True)
            return False

        ext = Path(full_path).suffix.lower()
        if ext in ('.aiff', '.aif'):
            return _restore_aiff_from_backup(full_path, backup)
        audio = MutagenFile(full_path, easy=True)
        if audio is None:
            return False

        # 1. Remove any easy-mode tags currently on file that AREN'T in the backup.
        # This undoes tags that were ADDED by the edit.
        # Only touches easy-namespace keys — APIC, USLT, COMM are untouched.
        for k in list(audio.keys()):
            if k in backup_tags_dict:
                continue
            try:
                del audio[k]
            except Exception as e:
                # Best effort: keep going, but leave a trace of what was skipped
                print(f" restore: could not remove tag {k!r} from"
                      f" {os.path.basename(full_path)}: {e}", flush=True)

        # 2. Set every tag from the backup (overwrites edited values, restores deleted ones).
        for k, v in backup_tags_dict.items():
            try:
                audio[k] = v
            except Exception as e:
                print(f" restore: could not restore tag {k!r} on"
                      f" {os.path.basename(full_path)}: {e}", flush=True)
        audio.save()
        return True
    except Exception as e:
        print(f" restore_tags FAILED for {os.path.basename(full_path)}: {e}", flush=True)
        return False
def _restore_aiff_from_backup(full_path: str, backup: dict) -> bool:
    """Restore AIFF tags from backup using raw ID3 frames.

    Each easy-mode key in ``backup["tags"]`` that has a known ID3 frame is
    written back as a UTF-8 text frame (encoding=3). ``composer`` is included
    so that every key the AIFF backup reader stores can also be restored.
    Keys with no mapping are ignored. Frames already on the file that are not
    in the backup are left in place.

    Returns True when the file was saved, False on any error (logged).
    """
    from mutagen.aiff import AIFF
    from mutagen.id3 import TIT2, TPE1, TALB, TPE2, TRCK, TPOS, TDRC, TCON, TCOM
    try:
        audio = AIFF(full_path)
        if audio.tags is None:
            audio.add_tags()
        tags = audio.tags
        # Map easy-mode names back to ID3 frame classes
        frame_map = {
            'title': TIT2,
            'artist': TPE1,
            'album': TALB,
            'albumartist': TPE2,
            'tracknumber': TRCK,
            'discnumber': TPOS,
            'date': TDRC,
            'genre': TCON,
            'composer': TCOM,
        }
        for k, v in backup.get("tags", {}).items():
            frame_cls = frame_map.get(k.lower())
            if frame_cls is None:
                continue
            val = v if isinstance(v, list) else [v]
            tags.add(frame_cls(encoding=3, text=val))
        audio.save()
        return True
    except Exception as e:
        print(f" restore_aiff FAILED for {os.path.basename(full_path)}: {e}", flush=True)
        return False
def save_batch_manifest(batch_id: str, paths: list, tags_changed: dict = None,
                        affected_albums: list = None, affected_artists: list = None,
                        edit_type: str = "batch"):
    """Write ``batch_<batch_id>.json`` into TAG_BACKUP_DIR describing one edit.

    Args:
        batch_id: unique identifier for this edit
        paths: list of absolute file paths that were modified
        tags_changed: dict of field→value that was applied (e.g. {"genre": "Rock"})
        affected_albums: list of album names touched (de-duplicated before saving)
        affected_artists: list of artist names touched (de-duplicated before saving)
        edit_type: "batch", "single", or "restructure"

    Returns:
        The path of the manifest file that was written.
    """
    unique_albums = list(set(affected_albums or []))
    unique_artists = list(set(affected_artists or []))
    manifest = {
        "batch_id": batch_id,
        "timestamp": datetime.utcnow().isoformat(),
        "file_count": len(paths),
        "paths": paths,
        "tags_changed": tags_changed or {},
        "affected_albums": unique_albums,
        "affected_artists": unique_artists,
        "edit_type": edit_type,
        # New manifests always start as not reverted
        "is_reverted": False,
    }
    manifest_path = os.path.join(TAG_BACKUP_DIR, f"batch_{batch_id}.json")
    with open(manifest_path, 'w') as out:
        json.dump(manifest, out, indent=2)
    return manifest_path
def validate_essential_tags(full_path: str, had_artist: bool, had_album: bool, had_title: bool) -> list:
    """Check that essential tags weren't destroyed. Returns list of problems.

    Re-reads the file with MutagenFile(easy=True) and, for each of
    artist/album/title that was present before (``had_*`` is True), reports
    ``"<field> lost"`` when the field is now missing or blank.

    Returns ``["unsupported format"]`` when the file can't be opened as a
    known format. If re-reading raises, the error is logged and whatever
    problems were collected so far are returned (validation never raises).
    """
    problems = []
    try:
        audio = MutagenFile(full_path, easy=True)
        if audio is None:
            return ["unsupported format"]

        def is_present(key: str) -> bool:
            # Missing key and empty value list both count as "not present"
            values = audio.get(key) or ['']
            return bool(str(values[0]).strip())

        checks = (
            (had_artist, 'artist'),
            (had_album, 'album'),
            (had_title, 'title'),
        )
        for was_present, key in checks:
            if was_present and not is_present(key):
                problems.append(f"{key} lost")
    except Exception as e:
        # Deliberately non-fatal, but no longer silent
        print(f" validate_essential_tags: could not re-read"
              f" {os.path.basename(full_path)}: {e}", flush=True)
    return problems
# Whitelist of tag names, written upper-case in Vorbis Comment style.
# enforce_tag_whitelist() copies this set as its "allowed" names and compares
# upper-cased FLAC keys against it; any key not listed is removed.
NAVIDROME_TAGS = {
    'TITLE', 'ARTIST', 'ALBUM', 'ALBUMARTIST',
    'TRACKNUMBER', 'DISCNUMBER', 'DATE',
    'GENRE', 'COMPOSER', 'LYRICS', 'COMMENT', 'ISRC',
    # ReplayGain values are part of the whitelist so existing ones are kept
    'REPLAYGAIN_TRACK_GAIN', 'REPLAYGAIN_TRACK_PEAK',
    'REPLAYGAIN_ALBUM_GAIN', 'REPLAYGAIN_ALBUM_PEAK',
}
# ID3v2 frame IDs that correspond to the Vorbis Comment names above.
# enforce_tag_whitelist uses NAVIDROME_TAGS (Vorbis names) for FLAC,
# but MP3 files use raw ID3 frame IDs — without this mapping, EVERY
# MP3 tag gets deleted because "TPE1" != "ARTIST".
# Membership is tested on the part of the key before the first ':'
# (e.g. "COMM::eng" -> "COMM"), upper-cased.
ID3_FRAME_WHITELIST = {
    'TIT2',  # TITLE
    'TPE1',  # ARTIST
    'TALB',  # ALBUM
    'TPE2',  # ALBUMARTIST
    'TRCK',  # TRACKNUMBER
    'TPOS',  # DISCNUMBER
    'TDRC',  # DATE (ID3v2.4)
    'TYER',  # DATE (ID3v2.3 legacy)
    'TCON',  # GENRE
    'TCOM',  # COMPOSER
    'COMM',  # COMMENT
    'TSRC',  # ISRC
    'APIC',  # Album art — always keep
    'USLT',  # LYRICS (unsynced)
    'SYLT',  # LYRICS (synced)
    'TXXX',  # User-defined; kept only when its description is a ReplayGain name
}
# Keep the blacklist for reference / legacy clean-tags endpoint.
# NOTE(review): clean_picard_tags() delegates to enforce_tag_whitelist() and
# does not read this set — confirm whether anything else in the file still does.
PICARD_TAGS_TO_REMOVE = {
    # MusicBrainz / AcoustID identifiers
    'MUSICBRAINZ_TRACKID', 'MUSICBRAINZ_ALBUMID', 'MUSICBRAINZ_RELEASETRACKID',
    'MUSICBRAINZ_RELEASEGROUPID', 'MUSICBRAINZ_ALBUMARTISTID', 'MUSICBRAINZ_ARTISTID',
    'MUSICBRAINZ_WORKID', 'MUSICBRAINZ_ALBUMCOMMENT', 'MUSICBRAINZ_ALBUMSTATUS',
    'MUSICBRAINZ_ALBUMTYPE', 'ACOUSTID_ID', 'ACOUSTID_FINGERPRINT',
    # Release / catalogue information
    'RELEASECOUNTRY', 'RELEASESTATUS', 'RELEASETYPE', 'RELEASETIME',
    'BARCODE', 'ASIN', 'CATALOGNUMBER', 'SCRIPT',
    # Credit variants and track/disc totals
    'ARTISTS', 'ARTIST_CREDIT', 'ALBUMARTIST_CREDIT',
    'TOTALTRACKS', 'TRACKTOTAL', 'TOTALDISCS', 'DISCTOTAL',
    'DISCC', 'TRACKC', 'DISC', 'TRACK', 'COMPOSERSORT',
    # Encoder / publishing / miscellaneous
    'ENCODER', 'ENCODEDBY', 'GROUPING', 'PUBLISHER', 'DESCRIPTION',
    'DISCSUBTITLE', 'LYRICIST', 'ARRANGER',
    'OFFICIAL_AUDIO_SOURCE_URL', 'OFFICIAL_AUDIO_FILE_URL',
    'DIGEST', 'FILETYPE', 'UPC', 'TYPE',
    # Alternate spellings of ALBUMARTIST, plus BPM
    'ALBUM ARTIST', 'ALBUM_ARTIST', 'BPM',
}
def _vorbis_keys_to_remove(tags, allowed: set) -> list:
    """Return the Vorbis-Comment-style keys of ``tags`` that must be deleted.

    A key is removed when its upper-cased name is not in ``allowed``, or when
    it is COMPOSER/LYRICS with a blank (or empty) first value.
    """
    doomed = []
    for k in list(tags.keys()):
        ku = k.upper()
        if ku not in allowed:
            doomed.append(k)
        elif ku in ('COMPOSER', 'LYRICS'):
            values = tags[k]
            # Allowed name, but a blank value is just noise — drop it
            if not values or not str(values[0]).strip():
                doomed.append(k)
    return doomed


def _id3_keys_to_remove(keys, allowed_frames: set) -> list:
    """Return the ID3 keys whose frame type (or TXXX description) is not allowed.

    ID3 keys look like "TPE1", "TXXX:replaygain_track_gain", "APIC:",
    "COMM::eng", "USLT::eng"; the frame ID is the part before the first ':'.
    """
    replaygain_names = {
        'REPLAYGAIN_TRACK_GAIN', 'REPLAYGAIN_TRACK_PEAK',
        'REPLAYGAIN_ALBUM_GAIN', 'REPLAYGAIN_ALBUM_PEAK',
    }
    doomed = []
    for k in keys:
        frame_id, _, desc = k.partition(':')
        frame_id = frame_id.strip().upper()
        if frame_id not in allowed_frames:
            doomed.append(k)
        elif frame_id == 'TXXX' and desc.upper() not in replaygain_names:
            # TXXX is only kept for ReplayGain descriptions
            doomed.append(k)
    return doomed


def enforce_tag_whitelist(
    full_path: str,
    preserve_composer: bool = True,
    preserve_lyrics: bool = True,
    dry_run: bool = False,
) -> dict:
    """
    Whitelist approach: keep ONLY whitelisted tags, delete everything else.

    The rule set depends on the tag container actually found in the file:
      - Vorbis Comments (FLAC, Ogg Vorbis, Ogg Opus): upper-cased key names are
        checked against NAVIDROME_TAGS; blank COMPOSER/LYRICS are removed too.
      - ID3 (MP3 and any other file whose tags are an ID3 block): frame IDs are
        checked against ID3_FRAME_WHITELIST; TXXX frames survive only for
        ReplayGain descriptions.
      - Any other container: left untouched, because neither whitelist
        describes its key names and applying one would strip every tag.

    Args:
        full_path: file to clean.
        preserve_composer: when False, COMPOSER (Vorbis) / TCOM (ID3) is removed.
        preserve_lyrics: when False, LYRICS (Vorbis) / USLT+SYLT (ID3) is removed.
        dry_run: report what would be removed without writing.

    Returns:
        dict with "path", "removed" (keys deleted or that would be deleted),
        "kept" (remaining keys) and "error" (None, "Unsupported format", or
        the exception text). Never raises.

    Existing ReplayGain values are always preserved.
    """
    allowed = set(NAVIDROME_TAGS)
    allowed_frames = set(ID3_FRAME_WHITELIST)
    if not preserve_composer:
        allowed.discard('COMPOSER')
        allowed_frames.discard('TCOM')
    if not preserve_lyrics:
        allowed.discard('LYRICS')
        allowed_frames -= {'USLT', 'SYLT'}
    result = {"path": full_path, "removed": [], "kept": [], "error": None}
    ext = Path(full_path).suffix.lower()
    try:
        from mutagen.flac import FLAC
        from mutagen.id3 import ID3
        from mutagen.oggopus import OggOpus
        from mutagen.oggvorbis import OggVorbis

        if ext == '.flac':
            audio = FLAC(full_path)
        else:
            audio = MutagenFile(full_path, easy=False)
            if audio is None:
                result["error"] = "Unsupported format"
                return result

        keys = list(audio.keys())
        if isinstance(audio, (FLAC, OggVorbis, OggOpus)):
            to_remove = _vorbis_keys_to_remove(audio, allowed)
        elif isinstance(audio.tags, ID3):
            to_remove = _id3_keys_to_remove(keys, allowed_frames)
        else:
            # Unknown key namespace (or no tags at all): never guess, keep everything
            to_remove = []

        doomed = set(to_remove)
        result["removed"] = to_remove
        result["kept"] = [k for k in keys if k not in doomed]
        if not dry_run and to_remove:
            for k in to_remove:
                try:
                    del audio[k]
                except KeyError:
                    # Key vanished as a side effect of an earlier delete — nothing left to do
                    pass
            audio.save()
    except Exception as e:
        result["error"] = str(e)
    return result
def clean_picard_tags(full_path: str, dry_run: bool = False) -> dict:
    """Legacy entry point: forwards to enforce_tag_whitelist() with default preserve flags.

    Returns the result dict produced by enforce_tag_whitelist().
    """
    report = enforce_tag_whitelist(full_path, dry_run=dry_run)
    return report
# ── Metadata helpers ─────────────────────────────────────────────────────────
def sanitize(name: str) -> str:
    """Strip characters that are unsafe in folder/file names.

    Removes / \\ : * ? " < > | and non-whitespace control characters,
    collapses every whitespace run (tabs and newlines included) to one space,
    then trims leading/trailing spaces and dots. Returns 'Unknown' when
    nothing is left.
    """
    # Remove: / : * ? " < > | \ and control characters. Whitespace controls
    # (tab, newline, ...) are left for the collapse step so they become a space.
    cleaned = re.sub(r'[/\\:*?"<>|\x00-\x08\x0e-\x1b\x7f]', '', name)
    # Collapse multiple spaces
    cleaned = re.sub(r'\s+', ' ', cleaned)
    # Trim spaces and dots together, so "Foo ." cannot leave a trailing space
    cleaned = cleaned.strip(' .')
    return cleaned or 'Unknown'
def build_target_path(full_path: str) -> Optional[str]:
    """
    Work out where a file belongs, based on the tags it currently carries.

    Layout:
      MUSIC_DIR/Album Artist/Album/TT Song Title.ext      (single disc)
      MUSIC_DIR/Album Artist/Album/DD-TT Song Title.ext   (multi disc)

    - Top-level folder comes from Album Artist, falling back to Artist and
      then 'Unknown Artist'.
    - Title falls back to the file stem; album to 'Unknown Album'.
    - Track and disc numbers are zero-padded to 2 digits; a "/total" suffix
      is ignored.
    - The disc prefix is used only when disc > 1.
    - Every path component goes through sanitize().

    Returns the target path, or None when the format is unsupported or an
    error occurs (logged).
    """
    try:
        audio = MutagenFile(full_path, easy=True)
        if audio is None:
            return None

        def first_value(key):
            values = audio[key] if key in audio else None
            return values[0] if values else ''

        def leading_number(raw):
            # "3/12" -> 3; anything without leading digits -> 0
            found = re.match(r'(\d+)', raw) if raw else None
            return int(found.group(1)) if found else 0

        title = sanitize(first_value('title') or Path(full_path).stem)
        album_artist = sanitize(
            first_value('albumartist') or first_value('artist') or 'Unknown Artist'
        )
        album = sanitize(first_value('album') or 'Unknown Album')
        ext = Path(full_path).suffix.lower()

        track_num = leading_number(first_value('tracknumber'))
        disc_num = leading_number(first_value('discnumber'))

        # Fix #6: Disc sanity check.
        # If discnumber > 1 but no siblings in the same directory already have
        # disc-prefixed filenames (e.g. 02-XX), treat this as disc 1.
        # Prevents compilation tracks tagged from vinyl rips (disc 2) from
        # getting 02-13 style filenames after being moved to a single-disc album.
        if disc_num > 1:
            try:
                has_disc_siblings = any(
                    re.match(r'\d{2}-\d{2}', entry)
                    for entry in os.listdir(os.path.dirname(full_path))
                    if entry.lower().endswith(AUDIO_EXTS)
                )
                if not has_disc_siblings:
                    print(f" disc sanity: no disc siblings for {os.path.basename(full_path)}"
                          f" — overriding disc {disc_num} -> 1", flush=True)
                    disc_num = 1
            except Exception:
                # Directory listing failed: keep the tagged disc number
                pass

        # Build track prefix
        if disc_num > 1:
            prefix = f"{disc_num:02d}-{track_num:02d}"
        elif track_num > 0:
            prefix = f"{track_num:02d}"
        else:
            prefix = ''

        if prefix:
            filename = f"{prefix} {title}{ext}".strip()
        else:
            filename = f"{title}{ext}"
        return os.path.join(MUSIC_DIR, album_artist, album, filename)
    except Exception as e:
        print(f" build_target_path error for {os.path.basename(full_path)}: {e}", flush=True)
        return None
def restructure_file(full_path: str) -> Optional[str]:
    """
    Move a file to its canonical location based on current ID3 tags.

    Steps, each isolated so a late failure can't hide an earlier success:
      1. Move the file. On failure nothing has changed; returns None.
      2. Update the songs row (id, paths, sort keys, mtime). The row is looked
         up by the MD5 of the old path first, then by the old ``full_path``.
         A failure is logged; the file stays at its new location.
      3. Remove empty directories left behind, walking up until MUSIC_DIR or
         the first non-empty directory. Best effort; failures are logged.

    Returns the new full path once the file has been moved (even if step 2 or
    3 failed, since the file really is there now), or None if no move was
    needed, the target already exists, or the move itself failed.
    """
    target = build_target_path(full_path)
    if not target:
        return None
    # Already in the right place
    if os.path.normpath(full_path) == os.path.normpath(target):
        return None
    # Avoid overwriting a different existing file
    if os.path.exists(target) and os.path.abspath(target) != os.path.abspath(full_path):
        print(f" restructure: target exists, skipping: {os.path.basename(target)}", flush=True)
        return None

    # ── 1. Move ──────────────────────────────────────────────────────────────
    try:
        os.makedirs(os.path.dirname(target), exist_ok=True)
        shutil.move(full_path, target)
        print(f" restructure: {os.path.relpath(full_path, MUSIC_DIR)}"
              f" -> {os.path.relpath(target, MUSIC_DIR)}", flush=True)
    except Exception as e:
        print(f" restructure FAILED for {os.path.basename(full_path)}: {e}", flush=True)
        return None

    # ── 2. Update DB with new path — re-read tags for accurate sort keys ─────
    try:
        new_relative = os.path.relpath(target, MUSIC_DIR)
        old_id = hashlib.md5(full_path.encode()).hexdigest()
        new_id = hashlib.md5(target.encode()).hexdigest()
        tags = read_tags(target)
        new_values = (
            new_id, target, new_relative,
            sort_key(tags['title']),
            sort_key(tags['artist']),
            sort_key(tags['album']),
            sort_key(tags['album_artist']),
            os.stat(target).st_mtime,
            datetime.utcnow().isoformat(),
        )
        set_clause = (
            "id=?, full_path=?, relative_path=?, "
            "sort_title=?, sort_artist=?, sort_album=?, sort_album_artist=?, "
            "file_mtime=?, date_modified=?"
        )
        with get_db() as c:
            cur = c.cursor()
            cur.execute(f"UPDATE songs SET {set_clause} WHERE id=?", new_values + (old_id,))
            if cur.rowcount == 0:
                # Row id was not the MD5 of the old path — try matching by path
                cur.execute(
                    f"UPDATE songs SET {set_clause} WHERE full_path=?",
                    new_values + (full_path,),
                )
    except Exception as e:
        print(f" restructure: moved {os.path.basename(target)} but DB update FAILED: {e}",
              flush=True)

    # ── 3. Remove empty directories left behind (walk up, stop at MUSIC_DIR) ──
    music_root = os.path.normpath(MUSIC_DIR)
    old_dir = os.path.dirname(full_path)
    try:
        while os.path.normpath(old_dir) != music_root and os.path.isdir(old_dir):
            if os.listdir(old_dir):
                break
            os.rmdir(old_dir)
            old_dir = os.path.dirname(old_dir)
    except OSError as e:
        print(f" restructure: could not clean up {old_dir}: {e}", flush=True)

    return target
def restructure_all() -> dict:
    """Restructure every file in the library to match its tags. Used by bulk-fix.

    For each ``full_path`` in the songs table that still exists on disk:
    snapshot its tags, enforce the tag whitelist, then move it with
    restructure_file().

    Returns a dict of counters:
        moved   — restructure_file() returned a new path
        failed  — not moved, and the whitelist step reported an error
        skipped — missing on disk, or not moved with no error reported
    """
    print("Restructuring library...", flush=True)
    moved = 0
    skipped = 0
    failed = 0
    with get_db() as c:
        rows = c.execute("SELECT full_path FROM songs").fetchall()
    for (full_path,) in rows:
        if not os.path.isfile(full_path):
            skipped += 1
            continue
        # Safety net: snapshot tags BEFORE enforce_tag_whitelist touches them.
        # If whitelist enforcement damages tags (as it did for MP3s before the
        # ID3_FRAME_WHITELIST fix), the backup enables recovery.
        # Non-fatal if backup fails — log and continue (restructure is bulk).
        if not backup_tags(full_path):
            print(f" restructure: backup failed for {os.path.basename(full_path)}, proceeding cautiously", flush=True)
        # Enforce whitelist before restructuring — clean tags first so
        # build_target_path reads clean data and generates the correct path
        cleanup = enforce_tag_whitelist(full_path, preserve_composer=True, preserve_lyrics=True)
        tag_error = cleanup.get("error") if isinstance(cleanup, dict) else None
        if tag_error:
            print(f" restructure: tag cleanup failed for {os.path.basename(full_path)}: {tag_error}",
                  flush=True)
        if restructure_file(full_path):
            moved += 1
        elif tag_error:
            # Previously "failed" was reported but never incremented
            failed += 1
        else:
            skipped += 1
    print(f"Restructure complete: {moved} moved, {skipped} skipped, {failed} failed", flush=True)
    return {"moved": moved, "skipped": skipped, "failed": failed}
def apply_tags(path: str, u, preserve_composer: bool = True, preserve_lyrics: bool = True):
    """Write the truthy fields of ``u`` as tags, then enforce the tag whitelist.

    Sequence: backup_tags() -> write -> enforce_tag_whitelist() ->
    validate_essential_tags(). Nothing is written unless the backup succeeded.
    If validation reports that artist/album/title disappeared, the backup is
    restored before raising.

    Raises:
        RuntimeError: the backup failed, or the write damaged essential tags.
        ValueError: the file format is not supported.
    """
    file_name = os.path.basename(path)
    # Snapshot current state for undo — MUST succeed before we touch anything
    if not backup_tags(path):
        raise RuntimeError(
            f"Cannot edit {file_name}: backup failed. "
            "Tags untouched. Check disk space and file permissions in TAG_BACKUP_DIR."
        )
    audio = MutagenFile(path, easy=True)
    if audio is None:
        raise ValueError(f"Unsupported format: {path}")

    # Record what was present before write
    def has_value(key):
        return bool(audio.get(key, [''])[0].strip())

    had_artist = has_value('artist')
    had_album = has_value('album')
    had_title = has_value('title')

    # (attribute on u, easy tag name, convert with str()?)
    field_table = (
        ('title', 'title', False),
        ('artist', 'artist', False),
        ('album', 'album', False),
        ('album_artist', 'albumartist', False),
        ('genre', 'genre', False),
        ('year', 'date', True),
        ('track_number', 'tracknumber', True),
    )
    for attr, tag_name, as_text in field_table:
        value = getattr(u, attr)
        if value:
            audio[tag_name] = str(value) if as_text else value
    audio.save()

    # Enforce whitelist after writing
    enforce_tag_whitelist(path, preserve_composer=preserve_composer, preserve_lyrics=preserve_lyrics)

    # Validate — if essential tags were destroyed, auto-restore
    problems = validate_essential_tags(path, had_artist, had_album, had_title)
    if not problems:
        return
    print(f" ⚠ TAG DAMAGE DETECTED in {os.path.basename(path)}: {problems} — auto-restoring", flush=True)
    restore_tags_from_backup(path)
    raise RuntimeError(f"Tag write damaged {os.path.basename(path)}: {problems}. Restored from backup.")
def apply_tags_dict(path: str, tags: dict, preserve_composer: bool = True, preserve_lyrics: bool = True):
    """Write tags from a dict, then enforce the tag whitelist.

    Recognised keys: title, artist, album, album_artist, genre, year,
    track_number. A key is written (as ``str(value)``) when its value is
    neither None nor the empty string.

    Sequence: backup_tags() -> write -> enforce_tag_whitelist() ->
    validate_essential_tags(). Nothing is written unless the backup succeeded.
    If validation reports that artist/album/title disappeared, the backup is
    restored before raising.

    Raises:
        RuntimeError: the backup failed, or the write damaged essential tags.
        ValueError: the file format is not supported.
    """
    file_name = os.path.basename(path)
    # Snapshot current state for undo — MUST succeed before we touch anything
    if not backup_tags(path):
        raise RuntimeError(
            f"Cannot edit {file_name}: backup failed. "
            "Tags untouched. Check disk space and file permissions in TAG_BACKUP_DIR."
        )
    audio = MutagenFile(path, easy=True)
    if audio is None:
        raise ValueError(f"Unsupported format: {path}")

    # Record what was present before write
    had_artist, had_album, had_title = (
        bool(audio.get(field, [''])[0].strip())
        for field in ('artist', 'album', 'title')
    )

    # request key -> easy-mode tag name
    mapping = {'title': 'title', 'artist': 'artist', 'album': 'album',
               'album_artist': 'albumartist', 'genre': 'genre',
               'year': 'date', 'track_number': 'tracknumber'}
    for key, tag_name in mapping.items():
        val = tags.get(key)
        if val is None or val == "":
            continue
        audio[tag_name] = str(val)
    audio.save()

    # Enforce whitelist after writing
    enforce_tag_whitelist(path, preserve_composer=preserve_composer, preserve_lyrics=preserve_lyrics)

    # Validate — if essential tags were destroyed, auto-restore
    problems = validate_essential_tags(path, had_artist, had_album, had_title)
    if not problems:
        return
    print(f" ⚠ TAG DAMAGE DETECTED in {os.path.basename(path)}: {problems} — auto-restoring", flush=True)
    restore_tags_from_backup(path)
    raise RuntimeError(f"Tag write damaged {os.path.basename(path)}: {problems}. Restored from backup.")
# ── Analysis ─────────────────────────────────────────────────────────────────
def analyze(full_path: str) -> dict:
    """Compute a DJ profile for one file and store it in ``dj_profiles``.

    Runs ffmpeg with the ``silencedetect`` and ``ebur128`` filters and parses
    its stderr for silence boundaries and integrated loudness, then estimates
    tempo with librosa over the first 30 seconds.

    Returns:
        dict with ``bpm``, ``silence_start`` (start of trailing silence, or the
        track duration when none was found in the second half),
        ``silence_end`` (end of leading silence, 0.0 when none or > 10 s) and
        ``loudness_lufs`` (-14.0 when ffmpeg reported no value).

    Raises:
        RuntimeError: ffmpeg did not finish within 120 seconds.
    """
    import librosa
    cmd = ["ffmpeg", "-hide_banner", "-i", full_path,
           "-af", "silencedetect=noise=-50dB:d=0.5,ebur128", "-f", "null", "-"]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
    except subprocess.TimeoutExpired as exc:
        # subprocess.run() kills the child and waits for it before re-raising
        # TimeoutExpired, so no orphaned ffmpeg is left behind (AUDIT-017).
        # TimeoutExpired has no ``process`` attribute; touching one here would
        # raise AttributeError and mask the error below.
        raise RuntimeError(
            f"ffmpeg timed out after 120s analysing {os.path.basename(full_path)}. "
            "File may be corrupt or extremely long."
        ) from exc
    out = proc.stderr

    all_starts = re.findall(r"silence_start: ([\d\.]+)", out)
    all_ends = re.findall(r"silence_end: ([\d\.]+)", out)
    # ebur128 logs a running "I: … LUFS" per frame and the final integrated
    # value in its closing summary — the LAST match is the one we want.
    loudness_hits = re.findall(r"I:\s+([\-\d\.]+) LUFS", out)
    sil_start = float(all_starts[-1]) if all_starts else 0.0
    sil_end = float(all_ends[0]) if all_ends else 0.0
    loudness = float(loudness_hits[-1]) if loudness_hits else -14.0
    # A first silence ending after 10s is not treated as leading silence
    if sil_end > 10.0:
        sil_end = 0.0
    dur_m = re.search(r"Duration: (\d+):(\d+):(\d+\.\d+)", out)
    if dur_m:
        total = (int(dur_m.group(1)) * 3600 +
                 int(dur_m.group(2)) * 60 +
                 float(dur_m.group(3)))
        # Last silence starting in the first half is not trailing silence
        if sil_start < total * 0.5:
            sil_start = total
    print(f" analyze: trailing={sil_start:.1f}s leading_end={sil_end:.1f}s LUFS={loudness:.1f}",
          flush=True)

    y, sr = librosa.load(full_path, sr=22050, duration=30)
    tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
    del y
    # gc.collect() removed: causes a stop-the-world GC pause on the event loop
    # (AUDIT-014). del y drops the refcount to 0; CPython frees it immediately.
    try:
        bpm = float(tempo)
    except TypeError:
        # tempo came back as a sequence/array rather than a scalar
        bpm = float(tempo[0]) if tempo is not None else 0.0

    profile = {"bpm": round(bpm, 1), "silence_start": round(sil_start, 3),
               "silence_end": round(sil_end, 3), "loudness_lufs": round(loudness, 1)}
    with get_db() as c:
        c.execute("INSERT OR REPLACE INTO dj_profiles VALUES (?,?,?,?,?,CURRENT_TIMESTAMP)",
                  (full_path, profile["bpm"], profile["silence_start"],
                   profile["silence_end"], profile["loudness_lufs"]))
    return profile
# ── Visualizer (fixed: no eqBoost, uniform bin width) ────────────────────────
def gen_vis_frames(path: str, fps: float = 30.0, fft_size: int = 1024, pts: int = 20) -> list:
    """Pre-compute visualizer frames for an audio file.

    Loads at most the first 600 s as 22050 Hz mono, slides a Hann-windowed FFT
    of ``fft_size`` samples across it at ``fps`` frames per second, and reduces
    each spectrum to ``pts`` band averages. Band centres are spaced
    logarithmically over the first 90 bins; every band has the same width.

    Afterwards all values are scaled so the 95th percentile (of values above
    0.001) maps to 0.8, and clipped at 1.0.

    Returns:
        list of frames, each a list of ``pts`` floats.
    """
    import librosa
    # Cap at 600s to prevent OOM on long recordings (AUDIT-015).
    # A 10-min song at 22050 Hz mono = ~26MB; without cap a 1hr file = ~318MB.
    MAX_DURATION = 600.0
    y, sr = librosa.load(path, sr=22050, mono=True, duration=MAX_DURATION)
    hop = max(1, int(sr / fps))

    # Everything below depends only on fft_size/pts, so compute it once
    # instead of once per frame.
    window = np.hanning(fft_size)
    half = fft_size // 2 + 1  # length of rfft() output for fft_size samples
    cutoff = min(half - 1, 90)
    uniform_bw = max(1, cutoff // pts)
    bands = []
    for i in range(pts):
        ni = (i + 1) / pts
        li = np.log10(ni * 9 + 1)
        cb = int(li * cutoff)
        lo = max(1, cb - uniform_bw // 2)
        hi = min(cutoff, cb + uniform_bw // 2)
        # None marks a degenerate band, which contributes 0.0
        bands.append((lo, hi) if lo <= hi < half else None)

    frames = []
    for start in range(0, len(y) - fft_size, hop):
        chunk = y[start:start + fft_size] * window
        spec = np.sqrt(np.abs(np.fft.rfft(chunk)) / fft_size)
        frames.append([
            float(np.mean(spec[band[0]:band[1] + 1])) if band else 0.0
            for band in bands
        ])  # no eqBoost
    del y
    # gc.collect() removed (AUDIT-014) — del y is sufficient for numpy arrays

    vals = sorted(v for f in frames for v in f if v > 0.001)
    if vals:
        p95 = vals[min(int(len(vals) * 0.95), len(vals) - 1)]
        if p95 > 0.001:
            s = 0.8 / p95
            frames = [[min(1.0, v * s) for v in f] for f in frames]
    return frames
def vis_cache_file(path: str) -> str:
    """Return the cache file for ``path``: ``<md5 of path>.json`` inside VIS_CACHE_DIR."""
    digest = hashlib.md5(path.encode()).hexdigest()
    return os.path.join(VIS_CACHE_DIR, f"{digest}.json")
def get_vis(path: str):
    """Return visualizer frames for ``path``, using the on-disk JSON cache.

    A readable cache file is returned as-is. Otherwise the frames are
    generated with gen_vis_frames() and written to the cache.

    Args:
        path: Absolute path of the audio file.

    Returns:
        The list of frames, or None when ``path`` is not a file or frame
        generation fails.
    """
    cf = vis_cache_file(path)
    if os.path.exists(cf):
        try:
            with open(cf) as f:
                return json.load(f)
        except (OSError, ValueError):
            # Unreadable or corrupt cache entry: fall through and regenerate.
            pass
    if not os.path.isfile(path):
        return None
    try:
        frames = gen_vis_frames(path)
    except Exception as e:
        print(f"Vis gen failed for {os.path.basename(path)}: {e}")
        return None
    # The cache is only an optimisation: if it cannot be written (disk full,
    # permissions) the frames that were just computed are still returned.
    try:
        with open(cf, "w") as f:
            json.dump(frames, f)
    except (OSError, TypeError, ValueError) as e:
        print(f"Vis cache write failed for {os.path.basename(path)}: {e}")
    return frames
# ── Library response helpers ─────────────────────────────────────────────────
# Column list used by every SELECT whose rows are handed to song_row_to_dict();
# that function unpacks each row positionally in exactly this order.
SONG_COLS = ("id,full_path,relative_path,navidrome_id,"
             "title,artist,album,album_artist,genre,"
             "year,track_number,disc_number,duration,"
             "sort_title,sort_artist,sort_album,sort_album_artist,"
             "cover_art_path,file_size,file_mtime,date_added,date_modified")
# Column names accepted by the `sort` parameter of GET /library/songs. The
# chosen names are interpolated into ORDER BY, so anything not listed here is
# dropped by that endpoint.
_ALLOWED_SORT = {
    "title", "sort_title", "artist", "sort_artist", "album", "sort_album",
    "album_artist", "sort_album_artist", "track_number", "disc_number",
    "year", "duration", "genre", "date_added",
}
def song_row_to_dict(row) -> dict:
    """Convert one 22-column songs row into the JSON shape sent to clients.

    The row is unpacked positionally, so a row with a different number of
    columns raises ValueError. full_path, file_size, file_mtime and
    date_modified are not exposed; cover_art_path only decides whether a
    cover-art URL is returned.
    """
    (sid, _full_path, rel_path, nd_id,
     title, artist, album, album_artist, genre,
     year, track_no, disc_no, duration,
     sort_title, sort_artist, sort_album, sort_album_artist,
     cover_path, _file_size, _file_mtime,
     added, _modified) = row

    cover_url = None
    if cover_path:
        cover_url = f"/library/cover-art/{sid}"

    return dict(
        id=sid,
        navidrome_id=nd_id,
        title=title,
        artist=artist,
        album=album,
        album_artist=album_artist,
        genre=genre,
        year=year,
        track_number=track_no,
        disc_number=disc_no,
        duration=duration,
        relative_path=rel_path,
        cover_art_url=cover_url,
        sort_title=sort_title,
        sort_artist=sort_artist,
        sort_album=sort_album,
        sort_album_artist=sort_album_artist,
        date_added=added,
    )
# =============================================================================
# ENDPOINTS - existing (behaviour unchanged)
# =============================================================================
@app.get("/health")
async def health():
    """Return a status summary: row counts, cached visualizer files, WebSocket clients.

    The row counts are best effort: on a database error the counts not yet
    read stay at 0 and the endpoint still reports "healthy".
    """
    counts = {"profiles": 0, "file_index": 0, "songs": 0}
    queries = (
        ("profiles", "SELECT COUNT(*) FROM dj_profiles"),
        ("file_index", "SELECT COUNT(*) FROM file_index"),
        ("songs", "SELECT COUNT(*) FROM songs"),
    )
    try:
        with get_db() as c:
            for key, sql in queries:
                counts[key] = c.execute(sql).fetchone()[0]
    except Exception:
        pass
    vis_cached = 0
    if os.path.isdir(VIS_CACHE_DIR):
        vis_cached = len(os.listdir(VIS_CACHE_DIR))
    return {
        "status": "healthy",
        "music_dir": MUSIC_DIR,
        **counts,
        "vis_cached": vis_cached,
        "ws_clients": len(push.connections),
    }
@app.post("/reindex")
async def reindex():
    """Run build_file_index() in a worker thread and confirm once it has finished."""
    rebuild = asyncio.to_thread(build_file_index)
    await rebuild
    return dict(status="reindexed")
@app.patch("/edit-metadata")
async def edit_metadata(update: MetadataUpdate):
    """Apply a metadata edit to one file and record it in the edit history.

    The tags are written, the songs table row is refreshed, a single-file
    manifest is saved under a fresh batch id, a Navidrome scan is triggered
    and a "metadata_updated" event is broadcast.

    Raises:
        HTTPException: 404 when the path cannot be resolved, 500 when any
            step of the edit fails.
    """
    fp = resolve_path(update.relative_path)
    if not fp:
        raise HTTPException(404, f"File not found. raw='{update.relative_path}' MUSIC_DIR='{MUSIC_DIR}'")
    # Generate a batch ID so single-track edits appear in edit history too
    batch_id = hashlib.md5(f"{time.time()}_{update.relative_path}".encode()).hexdigest()[:12]

    def _read_context():
        """Best-effort read of the pre-edit album / artist (for the history UI)."""
        album, artist = "", ""
        try:
            pre = MutagenFile(fp, easy=True)
            if pre:
                album = pre.get('album', [''])[0]
                artist = pre.get('albumartist', pre.get('artist', ['']))[0]
        except Exception:
            pass
        return album, artist

    try:
        # All blocking file I/O runs in worker threads so the event loop stays
        # free (AUDIT-009): the pre-edit tag read, the tag write, the DB
        # refresh and the manifest write.
        album_name, artist_name = await asyncio.to_thread(_read_context)
        await asyncio.to_thread(apply_tags, fp, update)
        await asyncio.to_thread(update_song_in_db, fp)
        new_relative = os.path.relpath(fp, MUSIC_DIR)
        # Build tags_changed from the update fields that were actually set
        tags_changed = {}
        for field in ("title", "artist", "album", "album_artist", "genre"):
            value = getattr(update, field)
            if value:
                tags_changed[field] = value
        if update.year:
            tags_changed["year"] = str(update.year)
        if update.track_number:
            tags_changed["track_number"] = str(update.track_number)
        # Save manifest so single edits appear in edit history
        await asyncio.to_thread(
            save_batch_manifest,
            batch_id, [fp],
            tags_changed=tags_changed,
            affected_albums=[album_name] if album_name else [],
            affected_artists=[artist_name] if artist_name else [],
            edit_type="single",
        )
        await trigger_scan()
        await push.broadcast("metadata_updated", {
            "path": new_relative,
            "title": update.title or "", "artist": update.artist or "",
            "album": update.album or "",
            "batch_id": batch_id,
        })
        return {"status": "success", "file": new_relative, "resolved": fp, "batch_id": batch_id}
    except HTTPException:
        # Keep the status code of an HTTP error raised by a helper instead of
        # rewrapping it as a 500.
        raise
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(500, str(e)) from e
@app.patch("/batch-edit-metadata")
async def batch_edit_metadata(update: BatchMetadataUpdate):
    """Apply the same tag changes to several files as one undoable batch.

    Files are processed in a worker thread. Each file either lands in
    ``succeeded`` (relative path) or ``failed`` (path + error); one failure
    does not stop the batch. A manifest is saved for the files that
    succeeded, then a Navidrome scan is triggered, a
    "batch_metadata_updated" event is broadcast and a conflict check is
    started in the background.

    Returns:
        ``{"succeeded": [...], "failed": [...], "batch_id": ...}``; pass the
        batch id to POST /undo-batch-edit/{batch_id} to revert.
    """
    # Generate a batch ID for undo support
    batch_id = hashlib.md5(f"{time.time()}_{len(update.relative_paths)}".encode()).hexdigest()[:12]
    results = {"succeeded": [], "failed": [], "batch_id": batch_id}
    tags = {}
    if update.title: tags["title"] = update.title
    if update.artist: tags["artist"] = update.artist
    if update.album: tags["album"] = update.album
    if update.album_artist: tags["album_artist"] = update.album_artist
    if update.genre: tags["genre"] = update.genre
    if update.year: tags["year"] = str(update.year)

    def _apply_batch():
        """Blocking tag writes run in a thread so the event loop stays free."""
        resolved_paths = []
        albums_seen = []
        artists_seen = []
        for rp in update.relative_paths:
            fp = resolve_path(rp)
            if not fp:
                results["failed"].append({"path": rp, "error": "File not found"})
                continue
            # Collect album/artist context BEFORE the edit (for history UI).
            # Best effort: an unreadable file simply contributes no context.
            album, artist = "", ""
            try:
                pre = MutagenFile(fp, easy=True)
                if pre:
                    album = pre.get('album', [''])[0]
                    artist = pre.get('albumartist', pre.get('artist', ['']))[0]
            except Exception:
                pass
            try:
                apply_tags_dict(fp, tags)
                update_song_in_db(fp)
                results["succeeded"].append(os.path.relpath(fp, MUSIC_DIR))
                resolved_paths.append(fp)
            except Exception as e:
                results["failed"].append({"path": rp, "error": str(e)})
                continue
            # Only files that were actually edited count as "affected", and
            # each album / artist is listed once even when many of its tracks
            # are in the batch.
            if album and album not in albums_seen:
                albums_seen.append(album)
            if artist and artist not in artists_seen:
                artists_seen.append(artist)
        # Save batch manifest for bulk undo — includes what changed and what was affected
        if resolved_paths:
            save_batch_manifest(
                batch_id, resolved_paths,
                tags_changed=tags,
                affected_albums=albums_seen,
                affected_artists=artists_seen,
                edit_type="batch",
            )

    await asyncio.to_thread(_apply_batch)
    await trigger_scan()
    # Wait for Navidrome to finish scanning before the app re-fetches paths.
    await asyncio.sleep(4)
    await push.broadcast("batch_metadata_updated",
                         {"count": len(results["succeeded"]),
                          "album": update.album or "",
                          "batch_id": batch_id,
                          "paths_changed": "true"})
    # Run conflict check in background after every batch edit
    _create_task(_run_conflict_check_and_broadcast())
    return results
@app.post("/undo-batch-edit/{batch_id}")
async def undo_batch_edit(batch_id: str):
    """Restore all files in a batch edit to their pre-edit tags.

    The batch_id is returned by PATCH /batch-edit-metadata.
    Handles files that moved after restructure: if the manifest path
    no longer exists, resolves the current location and uses the
    original path to find the backup.

    Returns:
        ``{"restored": [relative paths], "failed": [{"path", "error"}]}``.

    Raises:
        HTTPException: 404 when no manifest exists for ``batch_id``, 409 when
            the batch was already reverted.
    """
    manifest_path = os.path.join(TAG_BACKUP_DIR, f"batch_{batch_id}.json")
    if not os.path.exists(manifest_path):
        raise HTTPException(404, f"Batch {batch_id} not found. Backups may have been cleaned up.")
    with open(manifest_path) as f:
        manifest = json.load(f)
    # Prevent double-undo: if already reverted, reject
    if manifest.get("is_reverted"):
        reverted_at = manifest.get("reverted_at", "unknown time")
        raise HTTPException(409, f"Batch {batch_id} was already reverted at {reverted_at}.")
    results = {"restored": [], "failed": []}

    def _restore():
        # basename -> first path found under MUSIC_DIR. Built on the first
        # miss only, so the tree is walked at most once per undo instead of
        # once per missing file.
        name_index = None

        def _find_by_name(fname):
            nonlocal name_index
            if name_index is None:
                name_index = {}
                for root, _dirs, files in os.walk(MUSIC_DIR):
                    for name in files:
                        name_index.setdefault(name, os.path.join(root, name))
            return name_index.get(fname)

        for original_fp in manifest["paths"]:
            # Find the file — it may have moved since the edit
            if os.path.isfile(original_fp):
                current_fp = original_fp
            else:
                # File moved (restructure happened). Try resolve_path.
                old_rel = os.path.relpath(original_fp, MUSIC_DIR) if original_fp.startswith(MUSIC_DIR) else original_fp
                current_fp = resolve_path(old_rel)
                if not current_fp:
                    # Last resort: search by filename anywhere in MUSIC_DIR
                    current_fp = _find_by_name(os.path.basename(original_fp))
            if not current_fp:
                results["failed"].append({
                    "path": os.path.relpath(original_fp, MUSIC_DIR),
                    "error": "File not found (may have been deleted)"
                })
                continue
            try:
                ok = restore_tags_from_backup(current_fp, original_path=original_fp)
                if ok:
                    results["restored"].append(os.path.relpath(current_fp, MUSIC_DIR))
                else:
                    results["failed"].append({
                        "path": os.path.relpath(current_fp, MUSIC_DIR),
                        "error": "No backup found for this file"
                    })
                    continue
            except Exception as e:
                results["failed"].append({
                    "path": os.path.relpath(original_fp, MUSIC_DIR),
                    "error": str(e)
                })
                continue
            # Refresh the songs table from the restored tags, as POST
            # /restore-tags does after a single-file restore. A failure here
            # does not undo the restore, so it is only logged.
            try:
                update_song_in_db(current_fp)
            except Exception as e:
                print(f" [undo] DB refresh failed for {os.path.basename(current_fp)}: {e}", flush=True)

    await asyncio.to_thread(_restore)
    # Mark the manifest as reverted so it can't be undone again
    manifest["is_reverted"] = True
    manifest["reverted_at"] = datetime.utcnow().isoformat()
    manifest["restore_results"] = {
        "restored": len(results["restored"]),
        "failed": len(results["failed"]),
    }
    with open(manifest_path, 'w') as f:
        json.dump(manifest, f, indent=2)
    await trigger_scan()
    await asyncio.sleep(4)
    await push.broadcast("batch_undo_complete", {
        "batch_id": batch_id,
        "restored": len(results["restored"]),
        "failed": len(results["failed"]),
    })
    return results
@app.post("/restore-tags")
async def restore_single_file_tags(relative_path: str = Query(...)):
    """Restore one file's tags from its backup, refresh its DB row and trigger a scan.

    Raises:
        HTTPException: 404 when the path cannot be resolved or no backup
            exists for the file.
    """
    target = resolve_path(relative_path)
    if not target:
        raise HTTPException(404, "File not found")
    restored = await asyncio.to_thread(restore_tags_from_backup, target)
    if restored:
        await asyncio.to_thread(update_song_in_db, target)
        await trigger_scan()
        return {"status": "restored", "path": relative_path}
    raise HTTPException(404, "No backup found for this file")
@app.get("/batch-edit-history")
async def batch_edit_history():
    """List recent edits (batch + single) that can be undone.

    Returns enough data for the Edit History UI to display:
    - what changed (tags_changed pills)
    - what was affected (album/artist names)
    - whether already reverted
    - edit type (batch/single)

    Only the 50 most recent manifests (by timestamp) are returned. Manifest
    files that cannot be read or parsed are left out of the listing.
    """
    manifests = []
    if os.path.isdir(TAG_BACKUP_DIR):
        for f in sorted(os.listdir(TAG_BACKUP_DIR), reverse=True):
            if not (f.startswith("batch_") and f.endswith(".json")):
                continue
            try:
                with open(os.path.join(TAG_BACKUP_DIR, f)) as fh:
                    m = json.load(fh)
                manifests.append({
                    "batch_id": m.get("batch_id", ""),
                    "timestamp": m.get("timestamp", ""),
                    "file_count": m.get("file_count", 0),
                    "tags_changed": m.get("tags_changed", {}),
                    "affected_albums": m.get("affected_albums", []),
                    "affected_artists": m.get("affected_artists", []),
                    "edit_type": m.get("edit_type", "batch"),
                    "is_reverted": m.get("is_reverted", False),
                    "reverted_at": m.get("reverted_at"),
                })
            except Exception:
                # One unreadable / corrupt / non-object manifest must not hide
                # the rest of the history. (Exception, not a bare except, so
                # KeyboardInterrupt and SystemExit still propagate.)
                continue
    # Sort by timestamp descending (filenames aren't guaranteed to sort by time).
    # The key is coerced to str so a manifest with a null timestamp cannot
    # make the comparison raise TypeError.
    manifests.sort(key=lambda x: str(x["timestamp"] or ""), reverse=True)
    return {"batches": manifests[:50]}  # last 50
@app.post("/upload-track")
async def upload_track(
    file: UploadFile = File(...),
    title: str = Form(...),
    artist: str = Form(...),
    album: str = Form(...),
    preserve_composer: bool = Form(False),
    preserve_lyrics: bool = Form(False),
):
    """Store one uploaded audio file under MUSIC_DIR/uploads, tag and analyse it.

    After the file is written its tags are applied, its songs row is
    refreshed, a Smart DJ profile is computed, a Navidrome scan is triggered
    and a "track_uploaded" event is broadcast.

    Raises:
        HTTPException: 400 when the upload has no usable filename, 500 when
            tagging / analysis fails (the stored file and its songs row are
            removed again).
    """
    # file.filename is client-controlled. Keep only its final path component
    # so names such as "../../x" or "/etc/x" cannot place the file outside
    # the uploads directory.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise HTTPException(400, "Invalid filename")
    dest = os.path.join(MUSIC_DIR, "uploads")
    os.makedirs(dest, exist_ok=True)
    fp = os.path.join(dest, safe_name)
    with open(fp, "wb") as buf:
        shutil.copyfileobj(file.file, buf)
    try:
        u = MetadataUpdate(relative_path=f"uploads/{safe_name}",
                           title=title, artist=artist, album=album)
        await asyncio.to_thread(apply_tags, fp, u,
                                preserve_composer=preserve_composer,
                                preserve_lyrics=preserve_lyrics)
        await asyncio.to_thread(update_song_in_db, fp)
        profile = await asyncio.to_thread(analyze, fp)
        await trigger_scan()
        await push.broadcast("track_uploaded",
                             {"filename": safe_name, "profile": json.dumps(profile)})
        return {"status": "uploaded", "path": f"uploads/{safe_name}", "profile": profile}
    except Exception as e:
        # Clean up both the file AND the DB row that update_song_in_db already
        # wrote — leaving an orphaned row pointing to a deleted file (AUDIT-017)
        if os.path.exists(fp):
            os.remove(fp)
        song_id = hashlib.md5(fp.encode()).hexdigest()
        try:
            with get_db() as c:
                c.execute("DELETE FROM songs WHERE id = ?", (song_id,))
        except Exception:
            pass
        raise HTTPException(500, str(e)) from e
@app.post("/upload-tracks")
async def upload_tracks(
    files: List[UploadFile] = File(...),
    metadata_json: str = Form(...),
    cover_art: Optional[UploadFile] = File(None),
    preserve_composer: bool = Form(False),
    preserve_lyrics: bool = Form(False),
):
    """
    Upload multiple audio files with per-track metadata.

    ``metadata_json`` is a JSON list of objects, each with a "filename" key
    matching one uploaded file plus optional title / artist / album /
    album_artist / track_number / genre / year. Files are stored under
    MUSIC_DIR/<artist>/<album>/. A failure on one file is reported in
    ``failed`` and does not stop the others.

    Optional cover_art is saved as cover.jpg in the album directory (the
    directory of the last processed file) and embedded into its audio files.

    Raises:
        HTTPException: 400 when ``metadata_json`` is not a JSON list of
            objects that each carry a "filename".
    """
    try:
        meta_list = json.loads(metadata_json)
    except json.JSONDecodeError:
        raise HTTPException(400, "Invalid metadata_json")
    # Reject malformed metadata up front instead of failing with a 500
    # KeyError / TypeError while building the lookup table.
    if not isinstance(meta_list, list) or not all(
        isinstance(m, dict) and "filename" in m for m in meta_list
    ):
        raise HTTPException(400, "Invalid metadata_json")
    meta_by_name = {m["filename"]: m for m in meta_list}

    def _safe_component(name):
        """Make ``name`` usable as one directory name below MUSIC_DIR."""
        cleaned = re.sub(r'[<>:"/\\|?*]', '_', name)
        # "." and ".." contain no forbidden character but would still walk
        # out of (or stay in) the parent directory.
        return cleaned.replace(".", "_") if cleaned in (".", "..") else cleaned

    results = {"uploaded": [], "failed": []}
    album_dir = None
    for file in files:
        # The filename is client-controlled: keep only the last path component
        # so it cannot escape the album directory.
        safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            results["failed"].append({"filename": file.filename, "error": "Invalid filename"})
            continue
        meta = meta_by_name.get(file.filename) or {
            "filename": file.filename,
            "title": Path(safe_name).stem,
            "artist": "Unknown Artist", "album": "Uploads"
        }
        artist_name = meta.get("artist", "Unknown Artist")
        album_name = meta.get("album", "Uploads")
        dest_dir = os.path.join(MUSIC_DIR,
                                _safe_component(artist_name),
                                _safe_component(album_name))
        os.makedirs(dest_dir, exist_ok=True)
        album_dir = dest_dir
        fp = os.path.join(dest_dir, safe_name)
        try:
            with open(fp, "wb") as buf:
                shutil.copyfileobj(file.file, buf)
            tags = {
                "title": meta.get("title", Path(safe_name).stem),
                "artist": artist_name,
                "album": album_name,
                "album_artist": meta.get("album_artist") or artist_name,
            }
            if meta.get("track_number"): tags["track_number"] = str(meta["track_number"])
            if meta.get("genre"): tags["genre"] = meta["genre"]
            if meta.get("year"): tags["year"] = str(meta["year"])
            # Tag writes are blocking file I/O; keep them off the event loop
            # like the analysis and DB refresh below.
            await asyncio.to_thread(apply_tags_dict, fp, tags,
                                    preserve_composer=preserve_composer,
                                    preserve_lyrics=preserve_lyrics)
            try:
                await asyncio.to_thread(analyze, fp)
            except Exception as e:
                # Analysis is optional: the upload still counts as successful.
                print(f" Analysis failed for {file.filename}: {e}")
            await asyncio.to_thread(update_song_in_db, fp)
            results["uploaded"].append({
                "filename": file.filename,
                "path": os.path.relpath(fp, MUSIC_DIR)
            })
        except Exception as e:
            if os.path.exists(fp):
                os.remove(fp)
            results["failed"].append({"filename": file.filename, "error": str(e)})
    if cover_art and album_dir:
        cover_dest = os.path.join(album_dir, "cover.jpg")
        try:
            cover_data = await cover_art.read()
            with open(cover_dest, "wb") as buf:
                buf.write(cover_data)
            with get_db() as c:
                c.execute("UPDATE songs SET cover_art_path = ? WHERE full_path LIKE ?",
                          (cover_dest, os.path.join(album_dir, "%")))
            # Also embed into each audio file's tags
            embed_result = await asyncio.to_thread(
                embed_cover_art_in_directory, album_dir, cover_data
            )
            print(f" Cover art saved: {cover_dest}, embedded in {embed_result['succeeded']} files", flush=True)
        except Exception as e:
            print(f" Cover art save failed: {e}", flush=True)
    await trigger_scan()
    await push.broadcast("tracks_uploaded", {
        "count": len(results["uploaded"]),
        "album": meta_list[0].get("album", "") if meta_list else ""
    })
    return results
@app.get("/smart-dj/profile")
async def get_profile(relative_path: str):
    """Return the Smart DJ profile (bpm, silence bounds, loudness) for one song.

    When the path resolves to a file, the stored profile is returned, or the
    file is analysed on the spot if none is stored. When it does not resolve,
    the path is URL-decoded (up to five rounds) and the stored profiles are
    searched for a file with the same name.

    Raises:
        HTTPException: 404 when neither lookup finds a profile.
    """
    fp = resolve_path(relative_path)
    if fp:
        try:
            with get_db() as c:
                row = c.execute(
                    "SELECT bpm,silence_start,silence_end,loudness_lufs "
                    "FROM dj_profiles WHERE file_path=?", (fp,)
                ).fetchone()
            if row:
                return {"bpm": row[0], "silence_start": row[1],
                        "silence_end": row[2], "loudness_lufs": row[3]}
        except sqlite3.OperationalError as e:
            print(f"DB error: {e}", flush=True)
        return await asyncio.to_thread(analyze, fp)
    # The client may have sent a multiply-encoded path; decode until stable.
    decoded = relative_path
    for _ in range(5):
        nd = unquote(decoded)
        if nd == decoded: break
        decoded = nd
    target = os.path.basename(decoded).lower()
    if target:
        try:
            with get_db() as c:
                rows = c.execute(
                    "SELECT file_path,bpm,silence_start,silence_end,loudness_lufs "
                    "FROM dj_profiles WHERE LOWER(file_path) LIKE ?", (f"%{target}",)
                ).fetchall()
            # The LIKE suffix match is only a pre-filter: it also matches
            # files whose name merely ends with the target ("banana.mp3" for
            # "a.mp3") and treats "_" / "%" in the name as wildcards. Accept
            # a row only when its file name equals the target.
            for row in rows:
                name = row[0].replace("\\", "/").rsplit("/", 1)[-1].lower()
                if name == target:
                    return {"bpm": row[1], "silence_start": row[2],
                            "silence_end": row[3], "loudness_lufs": row[4]}
        except Exception as e:
            print(f" DB fallback error: {e}", flush=True)
    raise HTTPException(404, f"Not found. decoded='{decoded}' MUSIC_DIR='{MUSIC_DIR}'")
@app.get("/smart-dj/bulk-profiles")
async def bulk_profiles(paths: str = Query(...)):
    """Return Smart DJ profiles for a comma-separated list of relative paths.

    The response maps every requested path to its profile. A path maps to
    None when it cannot be resolved or when lookup / analysis raises. Files
    without a stored profile are analysed in a worker thread.
    """
    requested = [piece.strip() for piece in paths.split(",") if piece.strip()]
    results = {}
    for rel in requested:
        resolved = resolve_path(rel)
        profile = None
        if resolved:
            try:
                with get_db() as c:
                    stored = c.execute(
                        "SELECT bpm,silence_start,silence_end,loudness_lufs "
                        "FROM dj_profiles WHERE file_path=?", (resolved,)
                    ).fetchone()
                if stored:
                    bpm, sil_start, sil_end, lufs = stored[0], stored[1], stored[2], stored[3]
                    profile = {"bpm": bpm, "silence_start": sil_start,
                               "silence_end": sil_end, "loudness_lufs": lufs}
                else:
                    profile = await asyncio.to_thread(analyze, resolved)
            except Exception:
                profile = None
        results[rel] = profile
    return results
@app.get("/smart-dj/profiles/export")
async def export_all_profiles():
    """
    Export every stored Smart DJ profile in one response.

    The result maps each file's path relative to MUSIC_DIR to its bpm,
    silence bounds and loudness, so a client can fill its local cache with a
    single request instead of one call per song. The table is read in a
    worker thread.
    """
    def _read_all():
        with get_db() as c:
            stored = c.execute(
                "SELECT file_path, bpm, silence_start, silence_end, loudness_lufs "
                "FROM dj_profiles"
            ).fetchall()
        exported = {}
        for record in stored:
            try:
                key = os.path.relpath(record[0], MUSIC_DIR)
            except ValueError:
                # relpath() cannot relate paths on different drives (Windows); skip them
                continue
            exported[key] = {
                "bpm": record[1],
                "silence_start": record[2],
                "silence_end": record[3],
                "loudness_lufs": record[4]
            }
        return exported

    profiles = await asyncio.to_thread(_read_all)
    total = len(profiles)
    print(f" [profiles/export] Exported {total} profiles", flush=True)
    return {"count": total, "profiles": profiles}
# ── Lyrics ─────────────────────────────────────────────────────────────────
# Base URL of the LRCLIB HTTP API; the /lyrics/search and /lyrics/fetch
# endpoints append "/search" and "/get" to it.
LRCLIB_BASE = "https://lrclib.net/api"
@app.get("/lyrics/search")
async def lyrics_search(q: str):
    """Query LRCLIB's search endpoint and return the matches as a list.

    Each match carries id, track / artist / album name, duration, the lyrics
    themselves and two flags telling whether synced / plain lyrics exist.

    Raises:
        HTTPException: 502 when the request or the response handling fails.
    """
    def _summarise(hit):
        synced = hit.get("syncedLyrics")
        plain = hit.get("plainLyrics")
        return {
            "id": hit.get("id"),
            "trackName": hit.get("trackName", ""),
            "artistName": hit.get("artistName", ""),
            "albumName": hit.get("albumName", ""),
            "duration": hit.get("duration", 0),
            "hasSynced": synced is not None,
            "hasPlain": plain is not None,
            "syncedLyrics": synced,
            "plainLyrics": plain,
        }

    async with httpx.AsyncClient(timeout=10) as client:
        try:
            resp = await client.get(f"{LRCLIB_BASE}/search", params={"q": q})
            resp.raise_for_status()
            return [_summarise(hit) for hit in resp.json()]
        except Exception as e:
            raise HTTPException(502, f"LRCLIB search failed: {e}")
@app.get("/lyrics/fetch")
async def lyrics_fetch(artist: str, title: str, duration: float = 0):
    """
    Exact-match fetch from LRCLIB by artist + title (+ duration when > 0).

    A row in the local lyrics table (keyed by lower-cased artist and title)
    is returned first if present. Otherwise LRCLIB is queried and a result
    containing lyrics is stored in that table.

    Returns:
        ``syncedLyrics`` / ``plainLyrics`` / ``source`` / ``cached``; fresh
        lookups also carry ``found``. An LRCLIB 404 or other HTTP error status
        yields a "not found" result rather than an error.

    Raises:
        HTTPException: 502 when LRCLIB cannot be reached or its response
            cannot be parsed.
    """
    # Check local cache first
    def _check_cache():
        with get_db() as c:
            row = c.execute(
                "SELECT synced_lyrics, plain_lyrics, source FROM lyrics WHERE artist=? AND title=?",
                (artist.lower(), title.lower())
            ).fetchone()
        if row:
            return {"syncedLyrics": row[0], "plainLyrics": row[1],
                    "source": row[2], "cached": True}
        return None

    cached = await asyncio.to_thread(_check_cache)
    if cached:
        return cached
    # Fetch from LRCLIB
    params = {"artist_name": artist, "track_name": title}
    if duration > 0:
        params["duration"] = int(duration)
    async with httpx.AsyncClient(timeout=10) as client:
        try:
            resp = await client.get(f"{LRCLIB_BASE}/get", params=params)
            if resp.status_code == 404:
                return {"syncedLyrics": None, "plainLyrics": None,
                        "source": "lrclib", "cached": False, "found": False}
            resp.raise_for_status()
            data = resp.json()
        except httpx.HTTPStatusError:
            return {"syncedLyrics": None, "plainLyrics": None,
                    "source": "lrclib", "cached": False, "found": False}
        except Exception as e:
            raise HTTPException(502, f"LRCLIB fetch failed: {e}")
    synced = data.get("syncedLyrics")
    plain = data.get("plainLyrics")
    # Cache the result
    if synced or plain:
        def _cache():
            with get_db() as c:
                c.execute(
                    "INSERT OR REPLACE INTO lyrics (artist, title, duration, synced_lyrics, plain_lyrics, source) "
                    "VALUES (?, ?, ?, ?, ?, 'lrclib')",
                    (artist.lower(), title.lower(), duration, synced, plain)
                )

        # Caching is an optimisation: a database error must not throw away
        # lyrics that were fetched successfully.
        try:
            await asyncio.to_thread(_cache)
            print(f" [lyrics] Cached: {artist} - {title} ({'synced' if synced else 'plain'})", flush=True)
        except sqlite3.Error as e:
            print(f" [lyrics] Cache write failed for {artist} - {title}: {e}", flush=True)
    return {"syncedLyrics": synced, "plainLyrics": plain,
            "source": "lrclib", "cached": False, "found": bool(synced or plain)}
@app.get("/lyrics/get")
async def lyrics_get(relative_path: str):
    """
    Get lyrics for a song. Priority:
    1. Embedded in audio file tags (USLT/SYLT/LYRICS/©lyr)
    2. .lrc sidecar file in same directory
    3. Local DB cache (from previous LRCLIB fetch)

    All three sources are read in a worker thread; a source that raises is
    skipped. When nothing is found every field of the response is None.

    Raises:
        HTTPException: 404 when the path cannot be resolved.
    """
    fp = resolve_path(relative_path)
    if not fp:
        raise HTTPException(404, "File not found")

    def _read_all_sources():
        # 1. Embedded tags via mutagen
        try:
            audio = MutagenFile(fp)
            if audio and audio.tags:
                # MP3: SYLT (synced) or USLT (unsynced)
                synced_text = None
                plain_text = None
                # Iterate keys(), not the tags object itself: Vorbis comment
                # tags (FLAC/OGG) iterate as (key, value) tuples, so
                # key.startswith() would raise and the whole embedded-tag
                # lookup would be skipped for those files.
                for key in audio.tags.keys():
                    if key.startswith("SYLT"):
                        # SYLT contains list of (text, timestamp_ms) tuples
                        sylt = audio.tags[key]
                        lines = []
                        for text, ts in sylt.text:
                            mins = int(ts / 60000)
                            secs = (ts % 60000) / 1000
                            lines.append(f"[{mins:02d}:{secs:05.2f}]{text}")
                        synced_text = "\n".join(lines)
                    elif key.startswith("USLT"):
                        plain_text = str(audio.tags[key])
                # FLAC/OGG: LYRICS tag
                if hasattr(audio, 'tags') and audio.tags:
                    for tag_key in ("lyrics", "LYRICS", "UNSYNCEDLYRICS"):
                        val = audio.tags.get(tag_key)
                        if val:
                            text = val[0] if isinstance(val, list) else str(val)
                            if text.strip():
                                # Check if it looks like LRC (has timestamps)
                                if "[" in text and "]" in text:
                                    synced_text = text
                                else:
                                    plain_text = text
                # M4A: ©lyr
                if hasattr(audio.tags, '__contains__') and '©lyr' in audio.tags:
                    val = audio.tags['©lyr']
                    text = val[0] if isinstance(val, list) else str(val)
                    if text.strip():
                        if "[" in text and "]" in text:
                            synced_text = text
                        else:
                            plain_text = text
                if synced_text or plain_text:
                    return {"syncedLyrics": synced_text, "plainLyrics": plain_text,
                            "source": "embedded"}
        except Exception:
            # Unreadable or unexpected tag layout: fall through to the sidecar.
            pass
        # 2. .lrc sidecar file
        lrc_path = os.path.splitext(fp)[0] + ".lrc"
        if os.path.exists(lrc_path):
            try:
                with open(lrc_path, "r", encoding="utf-8") as f:
                    lrc_content = f.read()
                return {"syncedLyrics": lrc_content, "plainLyrics": None,
                        "source": "lrc_file"}
            except Exception:
                pass
        # 3. DB cache
        try:
            # Extract artist/title from file tags for DB lookup
            audio = MutagenFile(fp, easy=True)
            if audio:
                artist = (audio.get("artist", [None])[0] or "").lower()
                title_tag = (audio.get("title", [None])[0] or "").lower()
                if artist and title_tag:
                    with get_db() as c:
                        row = c.execute(
                            "SELECT synced_lyrics, plain_lyrics, source FROM lyrics "
                            "WHERE artist=? AND title=?",
                            (artist, title_tag)
                        ).fetchone()
                    if row:
                        return {"syncedLyrics": row[0], "plainLyrics": row[1],
                                "source": row[2]}
        except Exception:
            pass
        return None

    result = await asyncio.to_thread(_read_all_sources)
    if result:
        return result
    return {"syncedLyrics": None, "plainLyrics": None, "source": None}
@app.post("/lyrics/embed")
async def lyrics_embed(relative_path: str = Form(...), lrc_content: str = Form(...)):
    """
    Embed synced lyrics (LRC format) into an audio file's metadata tags.
    MP3 → USLT tag (plain text with timestamps in content)
    FLAC/OGG/Opus → LYRICS tag
    M4A/MP4/AAC → ©lyr tag
    The same content is also written to a .lrc sidecar next to the file.
    Then schedules a Navidrome scan and broadcasts "lyrics_updated".

    Raises:
        HTTPException: 404 when the path cannot be resolved, 500 when the
            file cannot be opened, has an unsupported extension or cannot
            be saved.
    """
    fp = resolve_path(relative_path)
    if not fp:
        raise HTTPException(404, "File not found")

    def _embed():
        audio = MutagenFile(fp)
        if audio is None:
            raise ValueError(f"Cannot open: {fp}")
        ext = os.path.splitext(fp)[1].lower()
        if ext == ".mp3":
            from mutagen.id3 import USLT
            # A file that has never been tagged has no tag container yet.
            if audio.tags is None:
                audio.add_tags()
            audio.tags.delall("USLT")
            audio.tags.add(USLT(encoding=3, lang="eng", desc="synced", text=lrc_content))
        elif ext in (".flac", ".ogg", ".opus"):
            audio["LYRICS"] = lrc_content
        elif ext in (".m4a", ".mp4", ".aac"):
            if audio.tags is None:
                audio.add_tags()
            audio.tags["©lyr"] = [lrc_content]
        else:
            raise ValueError(f"Unsupported format: {ext}")
        audio.save()
        # Also write .lrc sidecar for maximum compatibility
        lrc_path = os.path.splitext(fp)[0] + ".lrc"
        with open(lrc_path, "w", encoding="utf-8") as f:
            f.write(lrc_content)

    try:
        await asyncio.to_thread(_embed)
    except Exception as e:
        raise HTTPException(500, f"Embed failed: {e}") from e
    # Trigger Navidrome scan in the background. Scheduled through the file's
    # _create_task helper, as the batch-edit and bulk-fix endpoints do for
    # their background work, instead of a bare asyncio.create_task whose
    # result is dropped (presumably the helper keeps a reference — verify).
    _create_task(_trigger_scan())
    print(f" [lyrics] Embedded LRC into: {os.path.basename(fp)}", flush=True)
    # Notify connected clients
    await push.broadcast("lyrics_updated", {"path": relative_path})
    return {"status": "ok", "path": relative_path}
async def _trigger_scan():
    """Ask Navidrome to rescan its library (best effort).

    POSTs to ``$NAVIDROME_URL/api/scan`` (default http://localhost:4533) with
    ``$NAVIDROME_TOKEN`` as bearer token. Never raises: a failed request or an
    HTTP error status is logged and otherwise ignored.
    """
    navidrome_url = os.getenv("NAVIDROME_URL", "http://localhost:4533")
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.post(f"{navidrome_url}/api/scan", headers={
                "x-nd-authorization": f"Bearer {os.getenv('NAVIDROME_TOKEN', '')}"
            })
        if resp.status_code >= 400:
            # e.g. a missing or expired token: make it visible in the log
            print(f" [scan] Navidrome scan request returned HTTP {resp.status_code}", flush=True)
    except Exception as e:
        print(f" [scan] Navidrome scan request failed: {e}", flush=True)
@app.get("/visualizer/frames")
async def vis_frames(relative_path: str):
    """Return the visualizer frames for one song (from cache, or generated now).

    Raises:
        HTTPException: 404 when the path cannot be resolved, 500 when no
            frames could be produced.
    """
    resolved = resolve_path(relative_path)
    if not resolved:
        raise HTTPException(404, "Not found")
    # Frame generation decodes the audio and runs an FFT per frame, so it is
    # kept off the event loop (AUDIT-008).
    frames = await asyncio.to_thread(get_vis, resolved)
    if frames:
        return {"frame_count": len(frames), "fps": 30.0, "points": 20, "frames": frames}
    raise HTTPException(500, "Generation failed")
@app.post("/visualizer/precompute")
async def precompute(background_tasks: BackgroundTasks, relative_path: str = ""):
    """Schedule visualizer frame generation as a background task.

    With ``relative_path`` only that file is processed; without it every
    audio file under MUSIC_DIR that has no cache file yet is processed.

    Raises:
        HTTPException: 404 when ``relative_path`` is given but cannot be
            resolved.
    """
    def compute_all():
        n = 0
        for root, _, files in os.walk(MUSIC_DIR):
            for f in files:
                if not f.lower().endswith(AUDIO_EXTS):
                    continue
                fp = os.path.join(root, f)
                if os.path.exists(vis_cache_file(fp)):
                    continue
                try:
                    # get_vis() reports failure by returning None rather than
                    # raising, so only count files that produced frames.
                    if get_vis(fp) is not None:
                        n += 1
                except Exception:
                    pass
        print(f"Pre-computed {n} vis caches")

    if relative_path:
        fp = resolve_path(relative_path)
        if not fp:
            raise HTTPException(404, f"Cannot resolve path: {relative_path!r}")
        # Pass fp directly — avoids the lambda capturing relative_path and
        # silently calling get_vis("") if resolve fails later (AUDIT-016)
        background_tasks.add_task(get_vis, fp)
        return {"message": f"Computing: {relative_path}"}
    background_tasks.add_task(compute_all)
    return {"message": "Background vis computation started"}
@app.post("/bulk-fix")
async def bulk_fix(background_tasks: BackgroundTasks, dry_run: bool = False):
    """
    Fix #4: Restructure all files to canonical paths.
    ?dry_run=true returns a list of {from, to} moves without executing them.

    Without dry_run the restructure runs as a background task; when it is
    done a Navidrome scan is triggered, a conflict check is started and a
    "library_restructured" event is broadcast.
    """
    if dry_run:
        def _preview():
            """Collect the moves a restructure would perform, without moving anything."""
            moves = []
            with get_db() as c:
                rows = c.execute("SELECT full_path FROM songs").fetchall()
            for (full_path,) in rows:
                if not os.path.isfile(full_path):
                    continue
                target = build_target_path(full_path)
                if target and os.path.normpath(full_path) != os.path.normpath(target):
                    moves.append({
                        "from": os.path.relpath(full_path, MUSIC_DIR),
                        "to": os.path.relpath(target, MUSIC_DIR)
                    })
            return moves

        # The preview touches the database and the filesystem once per song;
        # run it in a worker thread like the real restructure below so the
        # event loop is not blocked (AUDIT-010).
        moves = await asyncio.to_thread(_preview)
        return {"dry_run": True, "moves": len(moves), "preview": moves}

    async def run():
        # restructure_all() calls shutil.move + MutagenFile + SQLite for every song.
        # Without to_thread this blocks the entire event loop for minutes (AUDIT-010).
        result = await asyncio.to_thread(restructure_all)
        await trigger_scan()
        await asyncio.sleep(4)
        _create_task(_run_conflict_check_and_broadcast())
        await push.broadcast("library_restructured", result)

    background_tasks.add_task(run)
    return {"message": "Library restructure started"}
# =============================================================================
# ENDPOINTS - Phase 1: Library Database
# =============================================================================
@app.post("/library/scan")
async def library_scan(background_tasks: BackgroundTasks, full: bool = False):
    """Queue scan_library(full) as a background task and return immediately.

    ``?full=true`` is passed through to the scan to force a re-read of every file.
    """
    background_tasks.add_task(scan_library, full)
    message = f"Library scan started (full={full})"
    return {"message": message}
@app.post("/library/sync-navidrome-ids")
async def sync_navidrome(background_tasks: BackgroundTasks):
    """Queue the Navidrome ID sync (sync_navidrome_ids_task) as a background task."""
    background_tasks.add_task(sync_navidrome_ids_task)
    return dict(message="Navidrome ID sync started")
@app.get("/library/songs")
async def library_songs(
    page: int = Query(0, ge=0),
    per_page: int = Query(100, ge=1, le=500),
    sort: str = Query("sort_album,disc_number,track_number"),
    album: Optional[str] = Query(None),
    artist: Optional[str] = Query(None),
    album_artist: Optional[str] = Query(None),
    genre: Optional[str] = Query(None),
    year: Optional[int] = Query(None),
):
    """Return one page of songs, optionally filtered and sorted.

    ``sort`` is a comma-separated list of column names; a leading "-" sorts
    that column descending. Names outside _ALLOWED_SORT are ignored, and if
    none remain the default album / disc / track order is used. Every filter
    that is set must match exactly.

    Returns:
        ``total`` (rows matching the filters), ``page``, ``per_page`` and the
        ``songs`` of the requested page.
    """
    # ORDER BY is assembled from whitelisted column names only.
    order_terms = []
    for raw in sort.split(","):
        token = raw.strip()
        column = token.lstrip("-")
        if column not in _ALLOWED_SORT:
            continue
        direction = "DESC" if token.startswith("-") else "ASC"
        order_terms.append(f"{column} {direction}")
    order = ", ".join(order_terms) or "sort_album, disc_number, track_number"

    # WHERE uses placeholders; only filters with a truthy value take part.
    candidates = (
        ("album = ?", album),
        ("artist = ?", artist),
        ("album_artist = ?", album_artist),
        ("genre = ?", genre),
        ("year = ?", year),
    )
    active = [(clause, value) for clause, value in candidates if value]
    wheres = [clause for clause, _ in active]
    params = [value for _, value in active]
    where = f"WHERE {' AND '.join(wheres)}" if wheres else ""

    offset = page * per_page
    with get_db() as c:
        total = c.execute(f"SELECT COUNT(*) FROM songs {where}", params).fetchone()[0]
        rows = c.execute(
            f"SELECT {SONG_COLS} FROM songs {where} ORDER BY {order} LIMIT ? OFFSET ?",
            params + [per_page, offset]
        ).fetchall()
    songs = [song_row_to_dict(r) for r in rows]
    return {"total": total, "page": page, "per_page": per_page, "songs": songs}
@app.get("/library/albums")
async def library_albums(
    artist: Optional[str] = Query(None),
    album_artist: Optional[str] = Query(None),
    genre: Optional[str] = Query(None),
):
    """List albums (grouped by album + album artist), optionally filtered.

    Each entry carries the earliest year among its tracks, the track count
    and, when at least one track has cover art, a cover-art URL.

    Returns:
        ``{"total": <number of albums>, "albums": [...]}`` ordered by
        sort_album_artist, sort_album.
    """
    wheres, params = [], []
    if artist: wheres.append("artist = ?"); params.append(artist)
    if album_artist: wheres.append("album_artist = ?"); params.append(album_artist)
    if genre: wheres.append("genre = ?"); params.append(genre)
    where = f"WHERE {' AND '.join(wheres)}" if wheres else ""
    with get_db() as c:
        # rep_id is the smallest id among the tracks that HAVE cover art.
        # GET /library/cover-art/{song_id} serves that one song's own
        # cover_art_path, so pointing at a track without art would 404 even
        # though another track of the album has a cover.
        rows = c.execute(f"""
            SELECT album, album_artist, sort_album, sort_album_artist,
                   MIN(year) as year, COUNT(*) as track_count,
                   MIN(CASE WHEN cover_art_path IS NOT NULL AND cover_art_path != ''
                            THEN id END) as rep_id
            FROM songs {where}
            GROUP BY album, album_artist
            ORDER BY sort_album_artist, sort_album
        """, params).fetchall()
    albums = []
    for album, aa, sort_alb, sort_aa, year, tc, rep_id in rows:
        albums.append({
            "album": album,
            "album_artist": aa,
            "sort_album": sort_alb,
            "sort_album_artist": sort_aa,
            "year": year,
            "track_count": tc,
            "cover_art_url": f"/library/cover-art/{rep_id}" if rep_id else None,
        })
    return {"total": len(albums), "albums": albums}
@app.get("/library/artists")
async def library_artists():
    """List every artist with track and album counts, ordered by sort_artist.

    ``photo_url`` is set only for artists that have a non-empty photo_path
    in the artist_photos table.
    """
    with get_db() as c:
        rows = c.execute("""
            SELECT artist, sort_artist,
                   COUNT(*) as track_count, COUNT(DISTINCT album) as album_count
            FROM songs GROUP BY artist ORDER BY sort_artist
        """).fetchall()
        photo_rows = c.execute("SELECT artist_name, photo_path FROM artist_photos").fetchall()
    photos = {entry[0]: entry[1] for entry in photo_rows}

    artists = []
    for artist, sort_art, tc, ac in rows:
        photo_url = None
        if photos.get(artist):
            photo_url = f"/library/artist-photo/{artist}"
        artists.append({
            "artist": artist,
            "sort_artist": sort_art,
            "track_count": tc,
            "album_count": ac,
            "photo_url": photo_url,
        })
    return {"total": len(rows), "artists": artists}
@app.get("/library/search")
async def library_search(
    q: str = Query(..., min_length=1),
    limit: int = Query(50, ge=1, le=200)
):
    """
    Substring search over title, artist, album and genre.

    q is matched literally: LIKE metacharacters typed by the user ('%', '_')
    are escaped so that e.g. "100%" or "a_b" do not behave as wildcards.
    Returns at most `limit` songs ordered by artist, album, disc and track.
    """
    # Escape the escape character first, then the two LIKE wildcards.
    escaped = q.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    term = f"%{escaped}%"
    with get_db() as c:
        rows = c.execute(f"""
            SELECT {SONG_COLS} FROM songs
            WHERE title LIKE ? ESCAPE '\\'
               OR artist LIKE ? ESCAPE '\\'
               OR album LIKE ? ESCAPE '\\'
               OR genre LIKE ? ESCAPE '\\'
            ORDER BY sort_artist, sort_album, disc_number, track_number
            LIMIT ?
        """, (term, term, term, term, limit)).fetchall()
    return {"total": len(rows), "songs": [song_row_to_dict(r) for r in rows]}
@app.get("/library/song/{song_id}")
async def library_song(song_id: str):
    """Return one song by its companion id, or 404 when no row matches."""
    query = f"SELECT {SONG_COLS} FROM songs WHERE id = ?"
    with get_db() as c:
        row = c.execute(query, (song_id,)).fetchone()
    if row:
        return song_row_to_dict(row)
    raise HTTPException(404, "Song not found")
@app.get("/library/cover-art/{song_id}")
async def library_cover_art(song_id: str):
    """
    Serve the cover art file recorded for a song.

    Responds 404 when the song is unknown, has no cover_art_path, or the
    recorded file is no longer on disk. Files ending in .png are served as
    image/png; everything else as image/jpeg.
    """
    with get_db() as c:
        row = c.execute(
            "SELECT cover_art_path FROM songs WHERE id = ?", (song_id,)
        ).fetchone()
    art_path = row[0] if row else None
    if not art_path or not os.path.isfile(art_path):
        raise HTTPException(404, "No cover art")
    if art_path.lower().endswith(".png"):
        mt = "image/png"
    else:
        mt = "image/jpeg"
    return FileResponse(art_path, media_type=mt)
@app.post("/library/cover-art/{song_id}")
async def upload_cover_art(song_id: str, file: UploadFile = File(...)):
    """
    Upload cover art — saves as cover.jpg and updates all songs in that directory.

    song_id is looked up as a companion id first, then as a navidrome_id.
    After writing cover.jpg next to the song, every song whose full_path lies
    under that directory gets cover_art_path updated, its cached extracted
    art removed, and the image embedded into the audio files. A Navidrome
    scan is triggered and "cover_art_updated" is broadcast.

    Raises 404 when the song is unknown, 400 when the upload is empty and
    500 on any other failure.
    """
    print(f"  [cover-art] POST /library/cover-art/{song_id}", flush=True)
    print(f"  [cover-art] Upload filename: {file.filename}, content_type: {file.content_type}", flush=True)
    with get_db() as c:
        row = c.execute("SELECT full_path FROM songs WHERE id = ?", (song_id,)).fetchone()
        if not row:
            # Fallback: try navidrome_id (iOS may pass Navidrome ID when companion ID unavailable)
            row = c.execute("SELECT full_path FROM songs WHERE navidrome_id = ?", (song_id,)).fetchone()
            if row:
                print(f"  [cover-art] Found by navidrome_id fallback: {row[0]}", flush=True)
    if not row:
        print(f"  [cover-art] Song not found for id={song_id}", flush=True)
        raise HTTPException(404, "Song not found")
    song_dir = os.path.dirname(row[0])
    cover_dest = os.path.join(song_dir, "cover.jpg")
    # LIKE pattern for "everything under song_dir". The directory name is
    # escaped so '_' and '%' in folder names match literally instead of
    # acting as wildcards and touching sibling directories.
    dir_like = (
        os.path.join(song_dir, "")
        .replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    ) + "%"
    print(f"  [cover-art] Saving to: {cover_dest}", flush=True)
    try:
        file_data = await file.read()
        print(f"  [cover-art] Received {len(file_data)} bytes", flush=True)
        if not file_data:
            # An empty body would overwrite cover.jpg and be embedded into
            # every audio file in the directory.
            raise HTTPException(400, "Uploaded file is empty")
        with open(cover_dest, "wb") as buf:
            buf.write(file_data)
        saved_size = os.path.getsize(cover_dest)
        print(f"  [cover-art] Written to disk: {saved_size} bytes", flush=True)
        with get_db() as c:
            c.execute(
                "UPDATE songs SET cover_art_path = ? WHERE full_path LIKE ? ESCAPE '\\'",
                (cover_dest, dir_like))
            sids = [r[0] for r in c.execute(
                "SELECT id FROM songs WHERE full_path LIKE ? ESCAPE '\\'",
                (dir_like,)
            ).fetchall()]
        # Drop cached extracted art so the new image is picked up.
        for sid in sids:
            cached = os.path.join(COVER_ART_DIR, f"{sid}.jpg")
            if os.path.isfile(cached):
                os.remove(cached)
        # Embed the image into every audio file's metadata tags so Navidrome
        # picks up the new art via getCoverArt (reads embedded, not cover.jpg)
        embed_result = await asyncio.to_thread(
            embed_cover_art_in_directory, song_dir, file_data
        )
        print(f"  [cover-art] Success — updated {len(sids)} songs in DB, "
              f"embedded in {embed_result['succeeded']} files, cleared cached art", flush=True)
        await trigger_scan()
        await push.broadcast("cover_art_updated", {"song_id": song_id})
        return {"status": "saved", "path": cover_dest, "embedded": embed_result}
    except HTTPException:
        # Deliberate client errors must not be rewritten as 500.
        raise
    except Exception as e:
        print(f"  [cover-art] FAILED: {e}", flush=True)
        raise HTTPException(500, str(e))
@app.post("/library/cover-art-by-path")
async def upload_cover_art_by_path(
    relative_path: str = Form(...),
    file: UploadFile = File(...)
):
    """
    Upload cover art using a relative file path instead of a companion song ID.
    Used when the iOS client has Navidrome-sourced songs without companion: prefix.
    Resolves the path to find the album directory, then saves cover.jpg there.

    Every song whose full_path lies under that directory gets cover_art_path
    updated and its cached extracted art removed; the image is embedded into
    the audio files, a Navidrome scan is triggered and "cover_art_updated"
    is broadcast.

    Raises 404 when the path cannot be resolved, 400 when the upload is
    empty and 500 on any other failure.
    """
    print(f"  [cover-art-by-path] POST relative_path='{relative_path}'", flush=True)
    print(f"  [cover-art-by-path] Upload filename: {file.filename}, content_type: {file.content_type}", flush=True)
    fp = resolve_path(relative_path)
    if not fp:
        print(f"  [cover-art-by-path] Could not resolve path: '{relative_path}'", flush=True)
        raise HTTPException(404, f"File not found. relative_path='{relative_path}'")
    song_dir = os.path.dirname(fp)
    cover_dest = os.path.join(song_dir, "cover.jpg")
    # LIKE pattern for "everything under song_dir". The directory name is
    # escaped so '_' and '%' in folder names match literally instead of
    # acting as wildcards and touching sibling directories.
    dir_like = (
        os.path.join(song_dir, "")
        .replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    ) + "%"
    print(f"  [cover-art-by-path] Resolved to: {fp}", flush=True)
    print(f"  [cover-art-by-path] Saving cover to: {cover_dest}", flush=True)
    try:
        file_data = await file.read()
        print(f"  [cover-art-by-path] Received {len(file_data)} bytes", flush=True)
        if not file_data:
            # An empty body would overwrite cover.jpg and be embedded into
            # every audio file in the directory.
            raise HTTPException(400, "Uploaded file is empty")
        with open(cover_dest, "wb") as buf:
            buf.write(file_data)
        saved_size = os.path.getsize(cover_dest)
        print(f"  [cover-art-by-path] Written to disk: {saved_size} bytes", flush=True)
        # Update all songs in this directory
        with get_db() as c:
            c.execute(
                "UPDATE songs SET cover_art_path = ? WHERE full_path LIKE ? ESCAPE '\\'",
                (cover_dest, dir_like))
            sids = [r[0] for r in c.execute(
                "SELECT id FROM songs WHERE full_path LIKE ? ESCAPE '\\'",
                (dir_like,)
            ).fetchall()]
        # Same WHERE clause as the UPDATE, so the id list is the updated set.
        updated = len(sids)
        # Clear any cached extracted cover art
        cleared = 0
        for sid in sids:
            cached = os.path.join(COVER_ART_DIR, f"{sid}.jpg")
            if os.path.isfile(cached):
                os.remove(cached)
                cleared += 1
        # Embed the image into every audio file's metadata tags so Navidrome
        # picks up the new art via getCoverArt (reads embedded, not cover.jpg)
        embed_result = await asyncio.to_thread(
            embed_cover_art_in_directory, song_dir, file_data
        )
        print(f"  [cover-art-by-path] Success — {updated} songs in DB, "
              f"{embed_result['succeeded']} embedded, {cleared} cached cleared", flush=True)
        await trigger_scan()
        await push.broadcast("cover_art_updated", {"path": relative_path})
        return {"status": "saved", "path": cover_dest, "songs_updated": updated, "embedded": embed_result}
    except HTTPException:
        # Deliberate client errors must not be rewritten as 500.
        raise
    except Exception as e:
        print(f"  [cover-art-by-path] FAILED: {e}", flush=True)
        import traceback
        traceback.print_exc()
        raise HTTPException(500, str(e))
@app.delete("/library/cover-art/{song_id}")
async def delete_cover_art(song_id: str):
    """
    Remove cover.jpg from the album directory and clear cover_art_path in DB.

    Applies to every song whose full_path lies under the song's directory,
    removes their cached extracted art and broadcasts "cover_art_updated".
    Raises 404 when the song id is unknown and 500 on any failure.
    """
    with get_db() as c:
        row = c.execute("SELECT full_path FROM songs WHERE id = ?", (song_id,)).fetchone()
    if not row:
        raise HTTPException(404, "Song not found")
    song_dir = os.path.dirname(row[0])
    cover_path = os.path.join(song_dir, "cover.jpg")
    # LIKE pattern for "everything under song_dir". The directory name is
    # escaped so '_' and '%' in folder names match literally instead of
    # acting as wildcards and clearing art for sibling directories.
    dir_like = (
        os.path.join(song_dir, "")
        .replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    ) + "%"
    try:
        if os.path.isfile(cover_path):
            os.remove(cover_path)
        with get_db() as c:
            c.execute(
                "UPDATE songs SET cover_art_path = NULL WHERE full_path LIKE ? ESCAPE '\\'",
                (dir_like,))
            sids = [r[0] for r in c.execute(
                "SELECT id FROM songs WHERE full_path LIKE ? ESCAPE '\\'",
                (dir_like,)
            ).fetchall()]
        for sid in sids:
            cached = os.path.join(COVER_ART_DIR, f"{sid}.jpg")
            if os.path.isfile(cached):
                os.remove(cached)
        await push.broadcast("cover_art_updated", {"song_id": song_id})
        return {"status": "deleted"}
    except Exception as e:
        raise HTTPException(500, str(e))
@app.post("/library/artist-photo")
async def upload_artist_photo(
    artist_name: str = Form(...),
    file: UploadFile = File(...)
):
    """
    Upload an artist photo.

    The file is stored in ARTIST_PHOTO_DIR under a filesystem-safe version of
    the artist name, keeping the upload's extension (".jpg" when the upload
    has no filename or no extension). The path is recorded in artist_photos
    and "artist_photo_updated" is broadcast. Raises 500 on any failure.
    """
    # Replace characters that are path separators or invalid in filenames.
    safe = re.sub(r'[<>:"/\\|?*\s]', '_', artist_name)
    # filename is optional on a multipart part; Path(None) would raise
    # TypeError before the try block and surface as an unhandled error.
    ext = Path(file.filename or "").suffix.lower() or ".jpg"
    dest = os.path.join(ARTIST_PHOTO_DIR, f"{safe}{ext}")
    try:
        with open(dest, "wb") as buf:
            shutil.copyfileobj(file.file, buf)
        with get_db() as c:
            c.execute("""INSERT OR REPLACE INTO artist_photos
                (artist_name, photo_path, updated_at) VALUES (?,?,CURRENT_TIMESTAMP)""",
                (artist_name, dest))
        await push.broadcast("artist_photo_updated", {"artist": artist_name})
        return {"status": "saved", "artist": artist_name}
    except Exception as e:
        raise HTTPException(500, str(e))
@app.get("/library/artist-photo/{artist_name}")
async def get_artist_photo(artist_name: str):
    """
    Serve the stored photo for an artist.

    Responds 404 when there is no artist_photos row, the row has no path, or
    the file is missing on disk. Files ending in .png are served as
    image/png; everything else as image/jpeg.
    """
    with get_db() as c:
        row = c.execute(
            "SELECT photo_path FROM artist_photos WHERE artist_name = ?", (artist_name,)
        ).fetchone()
    photo_path = row[0] if row else None
    if not photo_path or not os.path.isfile(photo_path):
        raise HTTPException(404, "No photo")
    if photo_path.lower().endswith(".png"):
        mt = "image/png"
    else:
        mt = "image/jpeg"
    return FileResponse(photo_path, media_type=mt)
async def auto_fix_duplicate_albums():
    """
    Fix #9: After every Navidrome scan, detect duplicate album entries
    (same name, different album_artist) and rewrite tags on minority files
    so Navidrome groups them correctly on the next scan.
    Uses tag rewrites only — never writes to Navidrome's DB directly.

    The canonical album artist for a name is the one on the Navidrome album
    entry with the highest song_count. All database and tag I/O runs in a
    worker thread so the event loop is not blocked. Best-effort: failures
    are logged and never raised to the caller.
    """
    def _fix_all():
        with get_navidrome_db() as nav:
            rows = nav.execute("""
                SELECT name, COUNT(DISTINCT album_artist) as aa_count,
                       GROUP_CONCAT(id, '|||') as ids,
                       GROUP_CONCAT(album_artist, '|||') as artists,
                       GROUP_CONCAT(song_count, '|||') as counts
                FROM album
                GROUP BY name
                HAVING aa_count > 1
            """).fetchall()
        if not rows:
            return
        print(f"  auto_fix_duplicate_albums: found {len(rows)} duplicate album(s)", flush=True)
        for name, _aa_count, _ids, artists, counts in rows:
            artist_list = (artists or "").split('|||')
            try:
                count_list = [int(n) for n in (counts or "").split('|||')]
            except ValueError:
                count_list = []
            # GROUP_CONCAT drops NULLs, so the two lists can end up with
            # different lengths; an index into one is then meaningless for
            # the other. Skip this album rather than abort the whole pass.
            if not count_list or len(count_list) != len(artist_list):
                print(f"  auto_fix_duplicate_albums: skipping '{name}' "
                      f"(artist/count lists do not line up)", flush=True)
                continue
            # Canonical = album entry with most songs
            canonical_artist = artist_list[count_list.index(max(count_list))]
            # Find all files in Companion DB for this album
            with get_db() as c:
                file_rows = c.execute(
                    "SELECT full_path, album_artist FROM songs WHERE album = ?", (name,)
                ).fetchall()
            fixed = 0
            for full_path, current_aa in file_rows:
                if current_aa == canonical_artist or not os.path.isfile(full_path):
                    continue
                try:
                    if Path(full_path).suffix.lower() == '.flac':
                        from mutagen.flac import FLAC
                        f = FLAC(full_path)
                        # Remove every spelling of the tag before writing the canonical one.
                        for variant in ['ALBUM ARTIST', 'ALBUM_ARTIST', 'ALBUMARTIST', 'albumartist']:
                            if variant in f:
                                del f[variant]
                        f['ALBUMARTIST'] = canonical_artist
                        f.save()
                    else:
                        audio = MutagenFile(full_path, easy=True)
                        if not audio:
                            # Nothing was written, so do not report it as fixed.
                            continue
                        audio['albumartist'] = canonical_artist
                        audio.save()
                    update_song_in_db(full_path)
                    fixed += 1
                    print(f"  auto_fix: {os.path.basename(full_path)}: "
                          f"{current_aa!r} -> {canonical_artist!r}", flush=True)
                except Exception as e:
                    print(f"  auto_fix error: {os.path.basename(full_path)}: {e}", flush=True)
            if fixed:
                print(f"  auto_fix_duplicate_albums: fixed {fixed} tracks for '{name}'", flush=True)

    try:
        await asyncio.to_thread(_fix_all)
    except Exception as e:
        print(f"  auto_fix_duplicate_albums failed: {e}", flush=True)
async def _run_conflict_check_and_broadcast():
    """
    Fix #9: After scan, auto-fix duplicate albums then run conflict check.
    Called as a background task after every edit and scan.

    Broadcasts "conflicts_updated" with stringified total / error / warning
    counts. Any failure is logged and swallowed.
    """
    try:
        # Wait for Navidrome scan to complete
        await asyncio.sleep(6)
        # Auto-fix duplicate albums before reporting conflicts
        await auto_fix_duplicate_albums()
        # Picard tag check opens every FLAC file; stale-path check stats every song.
        # Runs in thread to avoid blocking the event loop (AUDIT-011).
        navidrome_db = os.getenv("NAVIDROME_DB_PATH", "/navidrome_data/navidrome.db")
        issues = await asyncio.to_thread(check_library_conflicts, navidrome_db)
        severities = [issue["severity"] for issue in issues]
        payload = {
            "total": str(len(issues)),
            "errors": str(severities.count("error")),
            "warnings": str(severities.count("warning")),
        }
        await push.broadcast("conflicts_updated", payload)
    except Exception as e:
        print(f"  Background conflict check failed: {e}", flush=True)
# =============================================================================
# LIBRARY CONFLICTS
# =============================================================================
def check_library_conflicts(navidrome_db_path: str = os.getenv("NAVIDROME_DB_PATH", "/navidrome_data/navidrome.db")) -> list:
    """
    Run all conflict checks and return a list of issues.
    Each issue is a dict with: type, severity, title, detail, affected_paths, fix_action

    Every check is independent and best-effort: a failing check is logged and
    skipped, the remaining checks still run. The function does blocking
    database and file I/O, so async callers should run it in a thread.

    navidrome_db_path is accepted for caller compatibility but is not read
    here; the Navidrome connection comes from get_navidrome_db().
    """
    issues = []

    # ── 1. Duplicate albums (same name, multiple album_artist values) ─────────
    try:
        with get_navidrome_db() as nav:
            rows = nav.execute("""
                SELECT name, COUNT(DISTINCT album_artist) as aa_count,
                       GROUP_CONCAT(id, '|||') as ids,
                       GROUP_CONCAT(album_artist, '|||') as artists,
                       GROUP_CONCAT(song_count, '|||') as counts
                FROM album
                GROUP BY name
                HAVING aa_count > 1
                ORDER BY name
            """).fetchall()
        for name, aa_count, ids, artists, counts in rows:
            id_list = ids.split('|||')
            artist_list = artists.split('|||')
            count_list = counts.split('|||')
            issues.append({
                "type": "duplicate_album",
                "severity": "error",
                "title": f"Duplicate album: {name}",
                "detail": f"{aa_count} versions found with different album artists: {', '.join(artist_list)}",
                "affected_paths": [],
                "fix_action": "fix_duplicate_album",
                "fix_data": {
                    "album_name": name,
                    "album_ids": id_list,
                    "album_artists": artist_list,
                    "song_counts": count_list
                }
            })
    except Exception as e:
        print(f"  conflict check 1 failed: {e}", flush=True)

    # ── 2. Missing files (Navidrome knows about them but they're gone) ────────
    try:
        with get_navidrome_db() as nav:
            rows = nav.execute(
                "SELECT path FROM media_file WHERE missing = 1 LIMIT 50"
            ).fetchall()
        if rows:
            issues.append({
                "type": "missing_files",
                "severity": "warning",
                "title": f"{len(rows)} missing file(s)",
                "detail": "Files registered in Navidrome but not found on disk.",
                "affected_paths": [r[0] for r in rows],
                "fix_action": "fix_missing_files",
                "fix_data": {}
            })
    except Exception as e:
        print(f"  conflict check 2 failed: {e}", flush=True)

    # ── 3. Picard legacy tags (FLAC files with conflicting albumartist tags) ──
    try:
        legacy_files = []
        with get_db() as c:
            rows = c.execute("SELECT full_path FROM songs").fetchall()
        from mutagen.flac import FLAC
        for (full_path,) in rows:
            if not full_path.lower().endswith('.flac'):
                continue
            if not os.path.isfile(full_path):
                continue
            try:
                f = FLAC(full_path)
                keys = [k.upper() for k in f.keys()]
                has_legacy = any(k in keys for k in ['ALBUM ARTIST', 'ALBUM_ARTIST'])
                has_canonical = 'ALBUMARTIST' in keys
                if has_legacy and has_canonical:
                    legacy_files.append(os.path.relpath(full_path, MUSIC_DIR))
            except Exception as e:
                # One unreadable file must not stop the scan, but say so
                # instead of hiding it.
                print(f"  conflict check 3: could not read "
                      f"{os.path.basename(full_path)}: {e}", flush=True)
        if legacy_files:
            issues.append({
                "type": "picard_legacy_tags",
                "severity": "error",
                "title": f"{len(legacy_files)} file(s) with conflicting album artist tags",
                "detail": "These FLAC files have both Picard legacy tags (ALBUM ARTIST/ALBUM_ARTIST) and the canonical ALBUMARTIST tag. Navidrome reads the legacy tag and gets the wrong album artist.",
                "affected_paths": legacy_files[:50],
                "fix_action": "fix_picard_tags",
                "fix_data": {}
            })
    except Exception as e:
        print(f"  conflict check 3 failed: {e}", flush=True)

    # ── 4. Orphaned tracks (album_id points to non-existent album) ────────────
    try:
        with get_navidrome_db() as nav:
            rows = nav.execute("""
                SELECT mf.path FROM media_file mf
                LEFT JOIN album a ON mf.album_id = a.id
                WHERE mf.album_id IS NOT NULL AND a.id IS NULL
                LIMIT 50
            """).fetchall()
        if rows:
            issues.append({
                "type": "orphaned_tracks",
                "severity": "warning",
                "title": f"{len(rows)} orphaned track(s)",
                "detail": "Tracks whose album_id points to a non-existent album entry.",
                "affected_paths": [r[0] for r in rows],
                "fix_action": "fix_orphaned_tracks",
                "fix_data": {}
            })
    except Exception as e:
        print(f"  conflict check 4 failed: {e}", flush=True)

    # ── 5. Duplicate tracks (same title+artist+duration appearing >1 time) ───
    try:
        with get_db() as c:
            rows = c.execute("""
                SELECT title, artist, COUNT(*) as cnt,
                       GROUP_CONCAT(relative_path, '|||') as paths
                FROM songs
                WHERE title != '' AND artist != ''
                GROUP BY title, artist, CAST(duration AS INT)
                HAVING cnt > 1
                LIMIT 30
            """).fetchall()
        for title, artist, cnt, paths in rows:
            path_list = paths.split('|||') if paths else []
            issues.append({
                "type": "duplicate_track",
                "severity": "warning",
                # Separator added: title and artist were previously run
                # together into a single unreadable word.
                "title": f"Duplicate: {title} — {artist}",
                "detail": f"Found {cnt} copies of this track.",
                "affected_paths": path_list,
                "fix_action": None,
                "fix_data": {}
            })
    except Exception as e:
        print(f"  conflict check 5 failed: {e}", flush=True)

    # ── 6. Stale Companion paths (full_path no longer exists on disk) ─────────
    try:
        with get_db() as c:
            rows = c.execute("SELECT relative_path, full_path FROM songs").fetchall()
        stale = [rel for rel, full in rows if not os.path.isfile(full)]
        if stale:
            issues.append({
                "type": "stale_companion_paths",
                "severity": "warning",
                "title": f"{len(stale)} stale path(s) in Companion DB",
                "detail": "The Companion's song database has entries whose files no longer exist at the recorded path. Run a library scan to fix.",
                "affected_paths": stale[:50],
                "fix_action": "fix_stale_paths",
                "fix_data": {}
            })
    except Exception as e:
        print(f"  conflict check 6 failed: {e}", flush=True)

    # ── 7. Album identity reassignment (Fix #13) ────────────────────────────
    # Find albums in our DB that map to multiple Navidrome album IDs —
    # indicates Navidrome reassigned some tracks to a different album entry.
    # (An earlier correlated-subquery variant was executed here as well but
    # its result was never used; it has been dropped.)
    try:
        with get_db() as c:
            album_rows = c.execute("""
                SELECT album, album_artist,
                       COUNT(DISTINCT navidrome_album_id) as id_count,
                       GROUP_CONCAT(DISTINCT navidrome_album_id) as ids
                FROM songs
                WHERE navidrome_album_id IS NOT NULL AND navidrome_album_id != ''
                GROUP BY album, album_artist
                HAVING id_count > 1
            """).fetchall()
            for album, aa, id_count, _ids in album_rows:
                path_rows = c.execute(
                    "SELECT relative_path FROM songs WHERE album = ? AND album_artist = ? LIMIT 10",
                    (album, aa)
                ).fetchall()
                issues.append({
                    "type": "album_reassigned",
                    "severity": "warning",
                    "title": f"Album split in Navidrome: {album}",
                    "detail": f"'{album}' by '{aa}' maps to {id_count} different Navidrome album IDs. "
                              f"This usually means a tag edit caused Navidrome to create a duplicate album entry.",
                    "affected_paths": [r[0] for r in path_rows],
                    "fix_action": "fix_duplicate_album",
                    "fix_data": {"album_name": album, "album_artists": [aa], "song_counts": ["0"]}
                })
    except Exception as e:
        print(f"  conflict check 7 failed: {e}", flush=True)

    print(f"  Conflict check complete: {len(issues)} issues found", flush=True)
    return issues
@app.get("/library/conflicts")
async def library_conflicts():
    """
    Run all conflict checks and return structured results.

    The checks run in a worker thread. The response carries the full issue
    list plus counts of "error" and "warning" severities.
    """
    issues = await asyncio.to_thread(check_library_conflicts)
    severities = [issue["severity"] for issue in issues]
    return {
        "total": len(issues),
        "errors": severities.count("error"),
        "warnings": severities.count("warning"),
        "issues": issues,
    }
@app.post("/library/fix-conflict")
async def fix_conflict(request: FixConflictRequest):
    """
    Fix a specific conflict by action type.
    Request body: {"action": "fix_duplicate_album", "fix_data": {...}}

    Supported actions:
      fix_duplicate_album  rewrite album-artist tags on every file of the album
      fix_missing_files    trigger a Navidrome scan
      fix_picard_tags      collapse legacy FLAC album-artist tags into ALBUMARTIST
      fix_stale_paths      rescan the library and rebuild the file index
      fix_orphaned_tracks  trigger a Navidrome scan

    Raises 400 for an unknown action or missing fix_data fields, 500 when a
    fix fails.
    """
    action = request.action
    fix_data = request.fix_data

    if action == "fix_duplicate_album":
        # Strategy: rewrite ALBUMARTIST tags on all affected files so Navidrome's
        # next scan groups them under one canonical album entry.
        # We cannot write to Navidrome's DB directly while it is running.
        album_name = fix_data.get("album_name", "")
        album_artists = fix_data.get("album_artists", [])
        song_counts = fix_data.get("song_counts", [])
        if not album_name:
            # Without a name the query below would match every song with a
            # blank album and retag all of them.
            raise HTTPException(400, "No album name provided")
        if not album_artists:
            raise HTTPException(400, "No album artists provided")
        # Canonical = album artist with the most songs
        try:
            counts = [int(c) for c in song_counts]
            canonical_idx = counts.index(max(counts))
        except Exception:
            canonical_idx = 0
        if canonical_idx >= len(album_artists):
            # song_counts longer than album_artists: fall back to the first
            # artist instead of raising IndexError.
            canonical_idx = 0
        canonical_artist = album_artists[canonical_idx]

        def _retag_album():
            """Rewrite album / album-artist tags; return number of files written."""
            with get_db() as c:
                rows = c.execute(
                    "SELECT full_path FROM songs WHERE album = ?", (album_name,)
                ).fetchall()
            _fixed = 0
            for (full_path,) in rows:
                if not os.path.isfile(full_path):
                    continue
                try:
                    if Path(full_path).suffix.lower() == '.flac':
                        from mutagen.flac import FLAC
                        f = FLAC(full_path)
                        # Remove every spelling of the tag before writing the canonical one.
                        for variant in ['ALBUM ARTIST', 'ALBUM_ARTIST', 'ALBUMARTIST', 'albumartist']:
                            if variant in f:
                                del f[variant]
                        f['ALBUMARTIST'] = canonical_artist
                        f['ALBUM'] = album_name
                        f.save()
                    else:
                        audio = MutagenFile(full_path, easy=True)
                        if not audio:
                            # Nothing was written, so do not count it as fixed.
                            continue
                        audio['albumartist'] = canonical_artist
                        audio['album'] = album_name
                        audio.save()
                    update_song_in_db(full_path)
                    _fixed += 1
                except Exception as e:
                    print(f"  fix_duplicate_album: {os.path.basename(full_path)}: {e}", flush=True)
            return _fixed

        try:
            # Tag reads and writes are blocking I/O — run in a thread, same
            # as fix_picard_tags below.
            fixed = await asyncio.to_thread(_retag_album)
            await trigger_scan()
            await asyncio.sleep(4)
            await push.broadcast("conflicts_updated", {"action": "fix_duplicate_album", "album": album_name})
            return {"status": "fixed", "album": album_name, "fixed": fixed, "canonical_artist": canonical_artist}
        except Exception as e:
            raise HTTPException(500, f"Fix failed: {e}")

    elif action == "fix_missing_files":
        # Trigger a full Navidrome rescan — Navidrome will detect and remove
        # missing files automatically during a full scan. We cannot write to
        # Navidrome's DB directly while it is running (mounted read-only).
        try:
            await trigger_scan()
            return {"status": "triggered_scan", "detail": "Navidrome will remove missing entries on next scan"}
        except Exception as e:
            raise HTTPException(500, f"Fix failed: {e}")

    elif action == "fix_picard_tags":
        legacy_names = ('ALBUM ARTIST', 'ALBUM_ARTIST')

        def _fix_picard_files():
            """Collapse legacy album-artist tags; return (fixed count, failed paths)."""
            with get_db() as c:
                rows = c.execute("SELECT full_path FROM songs WHERE full_path LIKE '%.flac'").fetchall()
            _fixed = 0
            _failed = []
            for (full_path,) in rows:
                if not os.path.isfile(full_path):
                    continue
                try:
                    from mutagen.flac import FLAC
                    f = FLAC(full_path)
                    keys = list(f.keys())
                    if not any(k.upper() in legacy_names for k in keys):
                        continue
                    # Prefer the value of the canonical tag; fall back to the
                    # first legacy tag that has a value.
                    canonical = None
                    for k in keys:
                        if k.upper() == 'ALBUMARTIST':
                            canonical = f[k][0] if f[k] else None
                            break
                    if not canonical:
                        for k in keys:
                            if k.upper() in legacy_names:
                                canonical = f[k][0] if f[k] else None
                                break
                    if not canonical:
                        continue
                    for k in list(f.keys()):
                        if k.upper() in legacy_names or k.upper() == 'ALBUMARTIST':
                            del f[k]
                    f['ALBUMARTIST'] = canonical
                    f.save()
                    update_song_in_db(full_path)
                    _fixed += 1
                except Exception:
                    _failed.append(os.path.relpath(full_path, MUSIC_DIR))
            return _fixed, _failed

        # All FLAC opens + saves are blocking I/O — run in thread (AUDIT-010)
        fixed, failed = await asyncio.to_thread(_fix_picard_files)
        await trigger_scan()
        await push.broadcast("conflicts_updated", {"action": "fix_picard_tags"})
        return {"status": "fixed", "fixed": fixed, "failed": len(failed)}

    elif action == "fix_stale_paths":
        count = await asyncio.to_thread(scan_library, False)
        await asyncio.to_thread(build_file_index)
        return {"status": "fixed", "rescanned": count}

    elif action == "fix_orphaned_tracks":
        await trigger_scan()
        return {"status": "triggered_scan"}

    else:
        raise HTTPException(400, f"Unknown action: {action}")
# =============================================================================
# LIBRARY CLEAN TAGS
# =============================================================================
@app.post("/library/clean-tags")
async def library_clean_tags(background_tasks: BackgroundTasks, dry_run: bool = False):
    """
    Remove all Picard-specific tags from every file in the library.
    ?dry_run=true returns what would be removed without writing.

    Without dry_run the cleaning is scheduled as a background task and the
    request returns immediately; when it finishes a Navidrome scan is
    triggered, a conflict check is started and "tags_cleaned" is broadcast.
    In both modes the per-file work runs in a worker thread.
    """
    if dry_run:
        # enforce_tag_whitelist opens every file — run in thread (AUDIT-011)
        def _preview():
            _results = []
            with get_db() as c:
                rows = c.execute("SELECT full_path FROM songs").fetchall()
            for (full_path,) in rows:
                if not os.path.isfile(full_path):
                    continue
                r = enforce_tag_whitelist(full_path, dry_run=True)
                if r["removed"]:
                    _results.append({
                        "path": os.path.relpath(full_path, MUSIC_DIR),
                        "would_remove": r["removed"]
                    })
            return _results
        results = await asyncio.to_thread(_preview)
        return {"dry_run": True, "files_affected": len(results), "preview": results[:50]}

    def _clean_all():
        """Strip non-whitelisted tags from every file; return (fixed, errors)."""
        fixed = 0
        errors = 0
        with get_db() as c:
            rows = c.execute("SELECT full_path FROM songs").fetchall()
        print(f"clean-tags: scanning {len(rows)} files...", flush=True)
        for (full_path,) in rows:
            if not os.path.isfile(full_path):
                continue
            r = enforce_tag_whitelist(full_path, dry_run=False)
            if r["error"]:
                errors += 1
            elif r["removed"]:
                update_song_in_db(full_path)
                fixed += 1
        print(f"clean-tags: cleaned {fixed} files, {errors} errors", flush=True)
        return fixed, errors

    async def run_clean():
        # The file loop opens and rewrites every file; running it directly in
        # this coroutine would block the event loop for the whole library.
        fixed, errors = await asyncio.to_thread(_clean_all)
        await trigger_scan()
        await asyncio.sleep(4)
        _create_task(_run_conflict_check_and_broadcast())
        await push.broadcast("tags_cleaned", {"fixed": str(fixed), "errors": str(errors)})

    background_tasks.add_task(run_clean)
    return {"message": "Tag cleaning started in background"}
# =============================================================================
# WebSocket Push
# =============================================================================
@app.websocket("/ws/push")
async def ws_push(ws: WebSocket):
    """
    WebSocket endpoint for push notifications and small request/response calls.

    Each incoming text frame must be a JSON object with an "action" key:
      ping         replies "pong" with the server time
      get_profile  replies "profile" for data["path"] from dj_profiles
      get_vis      replies "vis_frames" for data["path"]
    Malformed frames produce an "error" message and the connection stays
    open. The socket is unregistered on disconnect or transport error.
    """
    await push.connect(ws)
    try:
        while True:
            raw = await ws.receive_text()
            # JSON decode failures are recoverable — send error, keep connection.
            # Previously any exception here permanently dropped the client (AUDIT-019).
            try:
                data = json.loads(raw)
            except json.JSONDecodeError as e:
                await push.send_to(ws, "error", {"message": f"Invalid JSON: {e}"})
                continue
            if not isinstance(data, dict):
                # Valid JSON that is not an object (e.g. a list or a bare
                # string) has no .get(); treat it like any other bad frame
                # instead of letting AttributeError drop the connection.
                await push.send_to(ws, "error",
                                   {"message": "Invalid message: expected a JSON object"})
                continue
            act = data.get("action")
            if act == "ping":
                await push.send_to(ws, "pong", {"t": str(time.time())})
            elif act == "get_profile":
                rp = data.get("path", "")
                fp = resolve_path(rp)
                if fp:
                    try:
                        with get_db() as c:
                            row = c.execute(
                                "SELECT bpm,silence_start,silence_end,loudness_lufs "
                                "FROM dj_profiles WHERE file_path=?", (fp,)
                            ).fetchone()
                        if row:
                            await push.send_to(ws, "profile", {
                                "path": rp, "bpm": str(row[0]),
                                "silence_start": str(row[1]),
                                "silence_end": str(row[2]),
                                "loudness_lufs": str(row[3])
                            })
                        else:
                            await push.send_to(ws, "profile",
                                               {"path": rp, "error": "not_analyzed"})
                    except Exception as e:
                        # DB error (e.g. locked during scan) — report but keep connection
                        await push.send_to(ws, "error", {"message": str(e)})
            elif act == "get_vis":
                rp = data.get("path", "")
                fp = resolve_path(rp)
                if fp:
                    try:
                        frames = await asyncio.to_thread(get_vis, fp)
                        if frames:
                            await push.send_to(ws, "vis_frames", {
                                "path": rp, "count": str(len(frames)),
                                "fps": "30", "frames": json.dumps(frames)
                            })
                    except Exception as e:
                        await push.send_to(ws, "error", {"message": str(e)})
    except WebSocketDisconnect:
        push.disconnect(ws)
    except Exception as e:
        # Unrecoverable transport error — log and clean up
        print(f"WS transport error: {e}", flush=True)
        push.disconnect(ws)