2170 lines
88 KiB
Python
2170 lines
88 KiB
Python
"""
|
|
Navidrome Companion API
|
|
|
|
Endpoints (existing - unchanged):
|
|
GET /health
|
|
POST /reindex
|
|
PATCH /edit-metadata
|
|
PATCH /batch-edit-metadata
|
|
POST /upload-track
|
|
POST /upload-tracks
|
|
GET /smart-dj/profile
|
|
GET /smart-dj/bulk-profiles
|
|
POST /bulk-fix
|
|
GET /visualizer/frames
|
|
POST /visualizer/precompute
|
|
WS /ws/push
|
|
|
|
Endpoints (Phase 1 - library database):
|
|
POST /library/scan
|
|
POST /library/sync-navidrome-ids
|
|
GET /library/songs
|
|
GET /library/albums
|
|
GET /library/artists
|
|
GET /library/search
|
|
GET /library/song/{song_id}
|
|
GET /library/cover-art/{song_id}
|
|
POST /library/cover-art/{song_id}
|
|
POST /library/artist-photo
|
|
GET /library/artist-photo/{artist_name}
|
|
"""
|
|
import os, re, json, asyncio, hashlib, sqlite3, subprocess, shutil, time, warnings, unicodedata
|
|
from pathlib import Path
|
|
from typing import Optional, List
|
|
from contextlib import asynccontextmanager
|
|
from urllib.parse import unquote
|
|
from datetime import datetime
|
|
|
|
import httpx
|
|
import numpy as np
|
|
from fastapi import (FastAPI, HTTPException, UploadFile, File, Form,
|
|
BackgroundTasks, WebSocket, WebSocketDisconnect, Query)
|
|
from fastapi.responses import JSONResponse, FileResponse
|
|
from pydantic import BaseModel
|
|
|
|
warnings.filterwarnings("ignore", category=UserWarning)
|
|
warnings.filterwarnings("ignore", category=FutureWarning)
|
|
from mutagen import File as MutagenFile
|
|
|
|
MUSIC_DIR = os.getenv("MUSIC_DIR", "/music")
|
|
DB_PATH = os.getenv("DB_PATH", "/app/data/smart_dj.db")
|
|
VIS_CACHE_DIR = os.getenv("VIS_CACHE_DIR", "/app/data/vis_cache")
|
|
COVER_ART_DIR = os.getenv("COVER_ART_DIR", "/app/data/cover_art")
|
|
ARTIST_PHOTO_DIR = os.getenv("ARTIST_PHOTO_DIR", "/app/data/artist_photos")
|
|
NAVIDROME_URL = os.getenv("NAVIDROME_URL", "http://navidrome:4533/navidrome")
|
|
SUBSONIC_USER = os.getenv("SUBSONIC_USER")
|
|
SUBSONIC_TOKEN = os.getenv("SUBSONIC_TOKEN")
|
|
SUBSONIC_SALT = os.getenv("SUBSONIC_SALT")
|
|
|
|
AUDIO_EXTS = ('.mp3', '.flac', '.m4a', '.ogg', '.opus', '.wav', '.aiff', '.aif')
|
|
COVER_NAMES = ('folder.jpg', 'cover.jpg', 'artwork.jpg', 'front.jpg',
|
|
'folder.png', 'cover.png', 'artwork.png', 'front.png')
|
|
|
|
|
|
# ── Database ────────────────────────────────────────────────────────────────
|
|
|
|
def init_db():
|
|
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
|
|
os.makedirs(COVER_ART_DIR, exist_ok=True)
|
|
os.makedirs(ARTIST_PHOTO_DIR, exist_ok=True)
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
# Existing tables -- untouched
|
|
c.execute("""CREATE TABLE IF NOT EXISTS dj_profiles (
|
|
file_path TEXT PRIMARY KEY, bpm REAL,
|
|
silence_start REAL, silence_end REAL, loudness_lufs REAL,
|
|
analyzed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)""")
|
|
c.execute("""CREATE TABLE IF NOT EXISTS file_index (
|
|
basename TEXT, full_path TEXT, title_words TEXT,
|
|
PRIMARY KEY (basename, full_path))""")
|
|
|
|
# Phase 1 -- authoritative song metadata
|
|
c.execute("""CREATE TABLE IF NOT EXISTS songs (
|
|
id TEXT PRIMARY KEY,
|
|
full_path TEXT UNIQUE NOT NULL,
|
|
relative_path TEXT NOT NULL,
|
|
navidrome_id TEXT,
|
|
title TEXT NOT NULL DEFAULT '',
|
|
artist TEXT NOT NULL DEFAULT '',
|
|
album TEXT NOT NULL DEFAULT '',
|
|
album_artist TEXT NOT NULL DEFAULT '',
|
|
genre TEXT NOT NULL DEFAULT '',
|
|
year INTEGER,
|
|
track_number INTEGER,
|
|
disc_number INTEGER,
|
|
duration REAL,
|
|
sort_title TEXT,
|
|
sort_artist TEXT,
|
|
sort_album TEXT,
|
|
sort_album_artist TEXT,
|
|
cover_art_path TEXT,
|
|
file_size INTEGER,
|
|
file_mtime REAL,
|
|
date_added TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
date_modified TIMESTAMP)""")
|
|
|
|
c.execute("""CREATE TABLE IF NOT EXISTS artist_photos (
|
|
artist_name TEXT PRIMARY KEY,
|
|
photo_path TEXT NOT NULL,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)""")
|
|
|
|
c.execute("CREATE INDEX IF NOT EXISTS idx_songs_album ON songs(sort_album, disc_number, track_number)")
|
|
c.execute("CREATE INDEX IF NOT EXISTS idx_songs_artist ON songs(sort_artist, sort_album)")
|
|
c.execute("CREATE INDEX IF NOT EXISTS idx_songs_album_artist ON songs(sort_album_artist, sort_album)")
|
|
c.execute("CREATE INDEX IF NOT EXISTS idx_songs_navidrome ON songs(navidrome_id)")
|
|
c.execute("CREATE INDEX IF NOT EXISTS idx_songs_genre ON songs(genre)")
|
|
|
|
|
|
# ── Sort helpers ────────────────────────────────────────────────────────────
|
|
|
|
_ARTICLES = re.compile(r'^(the|a|an)\s+', re.IGNORECASE)
|
|
|
|
def sort_key(text: str) -> str:
|
|
if not text:
|
|
return ''
|
|
return _ARTICLES.sub('', text).lower().strip()
|
|
|
|
|
|
# ── Cover art discovery ─────────────────────────────────────────────────────
|
|
|
|
def find_cover_art(song_path: str) -> Optional[str]:
|
|
directory = os.path.dirname(song_path)
|
|
for name in COVER_NAMES:
|
|
candidate = os.path.join(directory, name)
|
|
if os.path.isfile(candidate):
|
|
return candidate
|
|
try:
|
|
for f in os.listdir(directory):
|
|
if f.lower().endswith(('.jpg', '.jpeg', '.png')):
|
|
return os.path.join(directory, f)
|
|
except OSError:
|
|
pass
|
|
# Extract embedded art and cache it
|
|
song_id = hashlib.md5(song_path.encode()).hexdigest()
|
|
cached = os.path.join(COVER_ART_DIR, f"{song_id}.jpg")
|
|
if os.path.isfile(cached):
|
|
return cached
|
|
try:
|
|
audio = MutagenFile(song_path)
|
|
if audio is None:
|
|
return None
|
|
if hasattr(audio, 'tags') and audio.tags:
|
|
for key in list(audio.tags.keys()):
|
|
if key.startswith('APIC'):
|
|
with open(cached, 'wb') as f:
|
|
f.write(audio.tags[key].data)
|
|
return cached
|
|
try:
|
|
from mutagen.mp4 import MP4
|
|
if isinstance(audio, MP4):
|
|
covers = audio.tags.get('covr', [])
|
|
if covers:
|
|
with open(cached, 'wb') as f:
|
|
f.write(bytes(covers[0]))
|
|
return cached
|
|
except Exception:
|
|
pass
|
|
if hasattr(audio, 'pictures') and audio.pictures:
|
|
with open(cached, 'wb') as f:
|
|
f.write(audio.pictures[0].data)
|
|
return cached
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
# ── Tag reader ──────────────────────────────────────────────────────────────
|
|
|
|
def read_tags(full_path: str) -> dict:
|
|
"""Read all display tags from an audio file using mutagen."""
|
|
audio_easy = None
|
|
try:
|
|
audio_easy = MutagenFile(full_path, easy=True)
|
|
except Exception:
|
|
pass
|
|
|
|
# AIFF files don't support easy=True -- fall back to raw ID3 tags
|
|
audio_raw = None
|
|
ext = Path(full_path).suffix.lower()
|
|
if ext in ('.aiff', '.aif') and (audio_easy is None or not audio_easy):
|
|
try:
|
|
from mutagen.aiff import AIFF
|
|
audio_raw = AIFF(full_path)
|
|
except Exception:
|
|
pass
|
|
|
|
def get_easy(key):
|
|
if audio_easy and key in audio_easy and audio_easy[key]:
|
|
return audio_easy[key][0]
|
|
return ''
|
|
|
|
def get_raw(frame_id):
|
|
"""Read a raw ID3 frame value from AIFF."""
|
|
if audio_raw and audio_raw.tags:
|
|
frame = audio_raw.tags.get(frame_id)
|
|
if frame:
|
|
return str(frame.text[0]) if hasattr(frame, 'text') and frame.text else str(frame)
|
|
return ''
|
|
|
|
def get(easy_key, raw_id=None):
|
|
val = get_easy(easy_key)
|
|
if not val and raw_id:
|
|
val = get_raw(raw_id)
|
|
return val
|
|
|
|
title = get('title', 'TIT2') or Path(full_path).stem
|
|
artist = get('artist', 'TPE1') or 'Unknown Artist'
|
|
album = get('album', 'TALB') or 'Unknown Album'
|
|
album_artist = get('albumartist', 'TPE2') or artist
|
|
genre = get('genre', 'TCON') or ''
|
|
|
|
year = None
|
|
raw_date = get('date', 'TDRC') or get('date', 'TYER')
|
|
if raw_date:
|
|
m = re.search(r'\d{4}', str(raw_date))
|
|
if m:
|
|
year = int(m.group())
|
|
|
|
track_number = None
|
|
raw_track = get('tracknumber', 'TRCK')
|
|
if raw_track:
|
|
m = re.match(r'(\d+)', str(raw_track))
|
|
if m:
|
|
track_number = int(m.group(1))
|
|
|
|
disc_number = None
|
|
raw_disc = get('discnumber', 'TPOS')
|
|
if raw_disc:
|
|
m = re.match(r'(\d+)', str(raw_disc))
|
|
if m:
|
|
disc_number = int(m.group(1))
|
|
|
|
duration = None
|
|
audio_for_info = audio_easy or audio_raw
|
|
if audio_for_info and hasattr(audio_for_info, 'info') and audio_for_info.info:
|
|
try:
|
|
duration = float(audio_for_info.info.length)
|
|
except Exception:
|
|
pass
|
|
|
|
return dict(title=title, artist=artist, album=album, album_artist=album_artist,
|
|
genre=genre, year=year, track_number=track_number,
|
|
disc_number=disc_number, duration=duration)
|
|
|
|
|
|
# ── Library scan ─────────────────────────────────────────────────────────────
|
|
|
|
def scan_library(full_rescan: bool = False) -> int:
|
|
"""Walk MUSIC_DIR and upsert every audio file into the songs table."""
|
|
print(f"Library scan started (full={full_rescan})...", flush=True)
|
|
count = skipped = 0
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
for root, dirs, files in os.walk(MUSIC_DIR):
|
|
dirs[:] = [d for d in dirs if not d.startswith('.')]
|
|
for filename in files:
|
|
if not filename.lower().endswith(AUDIO_EXTS):
|
|
continue
|
|
full_path = os.path.join(root, filename)
|
|
try:
|
|
stat = os.stat(full_path)
|
|
mtime = stat.st_mtime
|
|
fsize = stat.st_size
|
|
except OSError:
|
|
continue
|
|
|
|
song_id = hashlib.md5(full_path.encode()).hexdigest()
|
|
relative = unicodedata.normalize("NFC", os.path.relpath(full_path, MUSIC_DIR))
|
|
|
|
if not full_rescan:
|
|
row = c.execute(
|
|
"SELECT file_mtime FROM songs WHERE id = ?", (song_id,)
|
|
).fetchone()
|
|
if row and row[0] and abs(row[0] - mtime) < 1.0:
|
|
skipped += 1
|
|
continue
|
|
|
|
tags = read_tags(full_path)
|
|
cover = find_cover_art(full_path)
|
|
|
|
c.execute("""INSERT OR REPLACE INTO songs (
|
|
id, full_path, relative_path,
|
|
title, artist, album, album_artist, genre,
|
|
year, track_number, disc_number, duration,
|
|
sort_title, sort_artist, sort_album, sort_album_artist,
|
|
cover_art_path, file_size, file_mtime, date_modified
|
|
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", (
|
|
song_id, full_path, relative,
|
|
tags['title'], tags['artist'], tags['album'], tags['album_artist'],
|
|
tags['genre'], tags['year'], tags['track_number'], tags['disc_number'],
|
|
tags['duration'],
|
|
sort_key(tags['title']), sort_key(tags['artist']),
|
|
sort_key(tags['album']), sort_key(tags['album_artist']),
|
|
cover, fsize, mtime, datetime.utcnow().isoformat()
|
|
))
|
|
count += 1
|
|
|
|
print(f"Library scan: {count} upserted, {skipped} unchanged", flush=True)
|
|
return count
|
|
|
|
|
|
def build_file_index():
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
c.execute("DELETE FROM file_index")
|
|
count = 0
|
|
for root, _, files in os.walk(MUSIC_DIR):
|
|
for f in files:
|
|
if f.lower().endswith(AUDIO_EXTS):
|
|
fp = os.path.join(root, f)
|
|
stem = Path(f).stem.lower()
|
|
words = {w for w in re.split(r'[\s\-_\.]+', stem)
|
|
if len(w) > 1 and not w.isdigit()}
|
|
c.execute("INSERT OR REPLACE INTO file_index VALUES (?,?,?)",
|
|
(f.lower(), fp, " ".join(sorted(words))))
|
|
count += 1
|
|
print(f"File index built: {count} files", flush=True)
|
|
|
|
|
|
def update_song_in_db(full_path: str):
|
|
"""Re-read tags and update the songs row. Inserts if missing."""
|
|
song_id = hashlib.md5(full_path.encode()).hexdigest()
|
|
relative = os.path.relpath(full_path, MUSIC_DIR)
|
|
tags = read_tags(full_path)
|
|
cover = find_cover_art(full_path)
|
|
try:
|
|
stat = os.stat(full_path)
|
|
mtime = stat.st_mtime
|
|
fsize = stat.st_size
|
|
except OSError:
|
|
mtime = fsize = None
|
|
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
cur = c.cursor()
|
|
cur.execute("""UPDATE songs SET
|
|
title=?, artist=?, album=?, album_artist=?, genre=?,
|
|
year=?, track_number=?, disc_number=?, duration=?,
|
|
sort_title=?, sort_artist=?, sort_album=?, sort_album_artist=?,
|
|
cover_art_path=?, file_size=?, file_mtime=?, date_modified=?
|
|
WHERE id=?""", (
|
|
tags['title'], tags['artist'], tags['album'], tags['album_artist'],
|
|
tags['genre'], tags['year'], tags['track_number'], tags['disc_number'],
|
|
tags['duration'],
|
|
sort_key(tags['title']), sort_key(tags['artist']),
|
|
sort_key(tags['album']), sort_key(tags['album_artist']),
|
|
cover, fsize, mtime, datetime.utcnow().isoformat(), song_id
|
|
))
|
|
if cur.rowcount == 0:
|
|
cur.execute("""INSERT OR REPLACE INTO songs (
|
|
id, full_path, relative_path,
|
|
title, artist, album, album_artist, genre,
|
|
year, track_number, disc_number, duration,
|
|
sort_title, sort_artist, sort_album, sort_album_artist,
|
|
cover_art_path, file_size, file_mtime, date_modified
|
|
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", (
|
|
song_id, full_path, relative,
|
|
tags['title'], tags['artist'], tags['album'], tags['album_artist'],
|
|
tags['genre'], tags['year'], tags['track_number'], tags['disc_number'],
|
|
tags['duration'],
|
|
sort_key(tags['title']), sort_key(tags['artist']),
|
|
sort_key(tags['album']), sort_key(tags['album_artist']),
|
|
cover, fsize, mtime, datetime.utcnow().isoformat()
|
|
))
|
|
|
|
|
|
# ── Startup ─────────────────────────────────────────────────────────────────
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
init_db()
|
|
os.makedirs(VIS_CACHE_DIR, exist_ok=True)
|
|
print(f"Companion API ready", flush=True)
|
|
print(f" MUSIC_DIR = {MUSIC_DIR}", flush=True)
|
|
print(f" DB_PATH = {DB_PATH}", flush=True)
|
|
print(f" VIS_CACHE = {VIS_CACHE_DIR}", flush=True)
|
|
print(f" COVER_ART = {COVER_ART_DIR}", flush=True)
|
|
print(f" ARTIST_PHOTO = {ARTIST_PHOTO_DIR}", flush=True)
|
|
try:
|
|
dirs = [d for d in os.listdir(MUSIC_DIR)
|
|
if os.path.isdir(os.path.join(MUSIC_DIR, d))]
|
|
print(f" MUSIC_DIR has {len(dirs)} top-level folders", flush=True)
|
|
except Exception as e:
|
|
print(f" Cannot list MUSIC_DIR: {e}", flush=True)
|
|
build_file_index()
|
|
scan_library()
|
|
yield
|
|
|
|
app = FastAPI(title="Navidrome Companion API", lifespan=lifespan)
|
|
|
|
|
|
# ── Pydantic models ──────────────────────────────────────────────────────────
|
|
|
|
class MetadataUpdate(BaseModel):
|
|
relative_path: str
|
|
title: Optional[str] = None
|
|
artist: Optional[str] = None
|
|
album: Optional[str] = None
|
|
album_artist: Optional[str] = None
|
|
genre: Optional[str] = None
|
|
year: Optional[int] = None
|
|
track_number: Optional[int] = None
|
|
|
|
class BatchMetadataUpdate(BaseModel):
|
|
relative_paths: List[str]
|
|
title: Optional[str] = None
|
|
artist: Optional[str] = None
|
|
album: Optional[str] = None
|
|
album_artist: Optional[str] = None
|
|
genre: Optional[str] = None
|
|
year: Optional[int] = None
|
|
|
|
class FixConflictRequest(BaseModel):
|
|
action: str
|
|
fix_data: dict = {}
|
|
|
|
|
|
class TrackUploadMeta(BaseModel):
|
|
filename: str
|
|
title: str
|
|
artist: str
|
|
album: str
|
|
track_number: Optional[int] = None
|
|
genre: Optional[str] = None
|
|
year: Optional[int] = None
|
|
album_artist: Optional[str] = None
|
|
|
|
|
|
# ── Push manager ─────────────────────────────────────────────────────────────
|
|
|
|
class PushManager:
|
|
def __init__(self):
|
|
self.connections: list[WebSocket] = []
|
|
|
|
async def connect(self, ws: WebSocket):
|
|
await ws.accept()
|
|
self.connections.append(ws)
|
|
print(f"Client connected ({len(self.connections)} total)")
|
|
|
|
def disconnect(self, ws: WebSocket):
|
|
if ws in self.connections:
|
|
self.connections.remove(ws)
|
|
print(f"Client disconnected ({len(self.connections)} total)")
|
|
|
|
async def broadcast(self, event: str, data: dict):
|
|
msg = json.dumps({"event": event, "data": data})
|
|
dead = []
|
|
for ws in self.connections:
|
|
try:
|
|
await ws.send_text(msg)
|
|
except Exception:
|
|
dead.append(ws)
|
|
for ws in dead:
|
|
self.connections.remove(ws)
|
|
|
|
async def send_to(self, ws: WebSocket, event: str, data: dict):
|
|
await ws.send_text(json.dumps({"event": event, "data": data}))
|
|
|
|
push = PushManager()
|
|
|
|
|
|
# ── Path resolution ──────────────────────────────────────────────────────────
|
|
|
|
def resolve_path(relative: str) -> Optional[str]:
|
|
"""
|
|
Resolve a Navidrome-relative path to an absolute filesystem path.
|
|
|
|
Resolution order:
|
|
1. Direct join with MUSIC_DIR -- works when paths match exactly
|
|
2. Strip leading path components -- handles sub-library prefixes
|
|
3. Companion songs table lookup by relative_path -- handles Picard
|
|
renames where Navidrome path no longer matches disk structure.
|
|
This is the key fix: uses album+artist context so two files with
|
|
the same title (e.g. 'In All the Wrong Places') resolve correctly.
|
|
4. Exact filename match on disk -- last resort before fuzzy
|
|
5. Fuzzy title match -- lowest confidence, only when nothing else works
|
|
"""
|
|
decoded = relative
|
|
for _ in range(5):
|
|
next_d = unquote(decoded)
|
|
if next_d == decoded:
|
|
break
|
|
decoded = next_d
|
|
cleaned = decoded.lstrip("/")
|
|
print(f" resolve_path: '{relative}' -> '{cleaned}'", flush=True)
|
|
|
|
# 1. Direct path join
|
|
direct = os.path.join(MUSIC_DIR, cleaned)
|
|
if os.path.isfile(direct):
|
|
return direct
|
|
|
|
# 2. Strip leading path components (handles library folder prefix)
|
|
parts = Path(cleaned).parts
|
|
for i in range(1, len(parts)):
|
|
sub = os.path.join(MUSIC_DIR, *parts[i:])
|
|
if os.path.isfile(sub):
|
|
return sub
|
|
|
|
# 3. Companion songs table -- look up by relative_path.
|
|
# Also tries matching on title+album+artist to disambiguate files
|
|
# with identical names in different albums (e.g. compilation tracks).
|
|
try:
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
# First try exact relative_path match
|
|
row = c.execute(
|
|
"SELECT full_path FROM songs WHERE relative_path = ?", (cleaned,)
|
|
).fetchone()
|
|
if row and os.path.isfile(row[0]):
|
|
print(f" resolve: songs table exact -> {os.path.basename(row[0])}", flush=True)
|
|
return row[0]
|
|
|
|
# Try normalised path (NFC unicode)
|
|
nfc = unicodedata.normalize("NFC", cleaned)
|
|
row = c.execute(
|
|
"SELECT full_path FROM songs WHERE relative_path = ?", (nfc,)
|
|
).fetchone()
|
|
if row and os.path.isfile(row[0]):
|
|
print(f" resolve: songs table NFC -> {os.path.basename(row[0])}", flush=True)
|
|
return row[0]
|
|
|
|
# Extract context from path: Artist/Album/filename
|
|
path_parts = Path(cleaned).parts # e.g. ('Artist', 'Album', 'file.flac')
|
|
if len(path_parts) >= 3:
|
|
path_artist = path_parts[0]
|
|
path_album = path_parts[1]
|
|
filename = path_parts[-1]
|
|
stem = Path(filename).stem.lower()
|
|
ext = Path(filename).suffix.lower()
|
|
# Strip leading track number from stem for matching
|
|
clean_stem = re.sub(r'^\d{1,2}[-\s\.]+\d{0,2}[-\s\.]*', '', stem).strip()
|
|
clean_stem = re.sub(r'^\d{1,2}[-\s\.]+', '', clean_stem).strip()
|
|
|
|
# Match by title similarity + artist/album folder context
|
|
rows = c.execute(
|
|
"""SELECT full_path, relative_path FROM songs
|
|
WHERE full_path LIKE ? AND sort_title LIKE ?""",
|
|
(f'%{ext}', f'%{clean_stem[:6]}%')
|
|
).fetchall()
|
|
best_fp, best_score = None, 0.0
|
|
for fp, rp in rows:
|
|
rp_parts = Path(rp).parts
|
|
if len(rp_parts) < 2:
|
|
continue
|
|
# Score: title match + artist folder match + album folder match
|
|
score = 0.0
|
|
rp_stem = re.sub(r'^\d{1,2}[-\s\.]+', '', Path(rp).stem.lower()).strip()
|
|
if clean_stem and rp_stem:
|
|
words_q = set(re.split(r'[\s\-_\.]+', clean_stem))
|
|
words_r = set(re.split(r'[\s\-_\.]+', rp_stem))
|
|
if words_q:
|
|
score += len(words_q & words_r) / len(words_q) * 0.6
|
|
# Bonus for matching artist folder
|
|
if path_artist.lower()[:4] in rp_parts[0].lower():
|
|
score += 0.2
|
|
# Bonus for matching album folder
|
|
if path_album.lower()[:4] in (rp_parts[1].lower() if len(rp_parts) > 1 else ''):
|
|
score += 0.2
|
|
if score > best_score and os.path.isfile(fp):
|
|
best_score = score
|
|
best_fp = fp
|
|
if best_fp and best_score >= 0.7:
|
|
print(f" resolve: songs table context ({best_score:.0%}) -> {os.path.basename(best_fp)}", flush=True)
|
|
return best_fp
|
|
|
|
except Exception as e:
|
|
print(f" resolve: songs table error: {e}", flush=True)
|
|
|
|
# 4. Exact filename match on disk
|
|
target = os.path.basename(cleaned)
|
|
if target:
|
|
for root, _, files in os.walk(MUSIC_DIR):
|
|
if target in files:
|
|
found = os.path.join(root, target)
|
|
print(f" resolve: exact filename -> {found}", flush=True)
|
|
return found
|
|
|
|
# 5. Fuzzy title match (lowest confidence -- last resort)
|
|
target_stem = Path(target).stem.lower() if target else ""
|
|
target_ext = Path(target).suffix.lower() if target else ""
|
|
title_part = re.sub(r'^\d+[\s\.\-]+', '', target_stem).strip()
|
|
words = {w for w in re.split(r'[\s\-_\.]+', title_part)
|
|
if len(w) > 1 and not w.isdigit()}
|
|
if words:
|
|
try:
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
rows = c.execute(
|
|
"SELECT basename, full_path, title_words FROM file_index"
|
|
).fetchall()
|
|
best, best_score = None, 0.0
|
|
for basename, full_path, title_words_str in rows:
|
|
if not basename.endswith(target_ext):
|
|
continue
|
|
fw = set(title_words_str.split())
|
|
score = len(words & fw) / len(words) if fw else 0
|
|
if score > best_score:
|
|
best_score = score
|
|
best = full_path
|
|
if best and best_score >= 0.5:
|
|
print(f" resolve: fuzzy ({best_score:.0%}) -> {os.path.basename(best)}", flush=True)
|
|
return best
|
|
except Exception as e:
|
|
print(f" resolve: fuzzy error: {e}", flush=True)
|
|
|
|
print(f" resolve_path: FAILED for '{cleaned}'", flush=True)
|
|
return None
|
|
|
|
|
|
# ── Navidrome helpers ────────────────────────────────────────────────────────
|
|
|
|
async def trigger_scan():
|
|
if not all([SUBSONIC_USER, SUBSONIC_TOKEN, SUBSONIC_SALT]):
|
|
print("Subsonic credentials not set - skipping scan")
|
|
return
|
|
params = {"u": SUBSONIC_USER, "t": SUBSONIC_TOKEN, "s": SUBSONIC_SALT,
|
|
"v": "1.16.1", "c": "CompanionAPI", "f": "json"}
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
r = await client.get(f"{NAVIDROME_URL}/rest/startScan.view", params=params)
|
|
print(f"Navidrome scan: {r.status_code}")
|
|
except Exception as e:
|
|
print(f"Scan failed: {e}")
|
|
|
|
|
|
async def sync_navidrome_ids_task():
|
|
"""
|
|
Fetch all songs from Navidrome and match them into our songs table.
|
|
|
|
Matching strategy (tried in order per song):
|
|
1. title + artist -- primary, both read from same ID3 tags
|
|
2. title + album -- fallback when artist field differs
|
|
3. title only -- fallback for unique titles
|
|
4. duration bucket -- last resort (±2s tolerance, unique per bucket)
|
|
"""
|
|
try:
|
|
if not all([SUBSONIC_USER, SUBSONIC_TOKEN, SUBSONIC_SALT]):
|
|
print("Subsonic credentials not set - cannot sync IDs")
|
|
return
|
|
print(f"Syncing Navidrome IDs... URL={NAVIDROME_URL}", flush=True)
|
|
|
|
# ── Fetch all songs from Navidrome ────────────────────────────────────
|
|
base_params = {
|
|
"u": SUBSONIC_USER, "t": SUBSONIC_TOKEN, "s": SUBSONIC_SALT,
|
|
"v": "1.16.1", "c": "CompanionAPI", "f": "json",
|
|
"albumCount": 0, "artistCount": 0, "songCount": 500, "query": ""
|
|
}
|
|
all_songs = []
|
|
offset = 0
|
|
async with httpx.AsyncClient(timeout=60) as client:
|
|
while True:
|
|
try:
|
|
r = await client.get(
|
|
f"{NAVIDROME_URL}/rest/search3.view",
|
|
params={**base_params, "songOffset": offset}
|
|
)
|
|
resp = r.json().get("subsonic-response", {})
|
|
if resp.get("status") == "failed":
|
|
print(f" Navidrome auth error: {resp.get('error')}", flush=True)
|
|
return
|
|
songs = resp.get("searchResult3", {}).get("song", [])
|
|
print(f" Page offset={offset}: {len(songs)} songs", flush=True)
|
|
if not songs:
|
|
break
|
|
all_songs.extend(songs)
|
|
offset += len(songs)
|
|
if len(songs) < 500:
|
|
break
|
|
except Exception as e:
|
|
print(f" Navidrome fetch error: {e}", flush=True)
|
|
break
|
|
|
|
print(f" Navidrome total: {len(all_songs)} songs", flush=True)
|
|
if not all_songs:
|
|
return
|
|
|
|
# ── Show first 3 Navidrome songs for diagnosis ────────────────────────
|
|
for ns in all_songs[:3]:
|
|
print(f" ND sample: title={repr(ns.get('title',''))} "
|
|
f"artist={repr(ns.get('artist',''))} "
|
|
f"album={repr(ns.get('album',''))} "
|
|
f"duration={ns.get('duration')}", flush=True)
|
|
|
|
# ── Build lookup tables from our DB ───────────────────────────────────
|
|
def norm(s: str) -> str:
|
|
"""Lowercase, strip, NFC-normalize."""
|
|
return unicodedata.normalize("NFC", (s or "").lower().strip())
|
|
|
|
def clean_title(s: str) -> str:
|
|
"""Strip leading track/disc number prefix from filename-derived titles.
|
|
e.g. '09 Careless' -> 'careless', '01-02 Song' -> 'song'
|
|
"""
|
|
s = norm(s)
|
|
# Strip patterns like "09 ", "09 - ", "1-02 ", "01. " etc
|
|
s = re.sub(r'^\d{1,2}[-\s\.]+\d{0,2}[-\s\.]*', '', s).strip()
|
|
s = re.sub(r'^\d{1,2}[-\s\.]+', '', s).strip()
|
|
return s
|
|
|
|
def dur_bucket(seconds) -> Optional[int]:
|
|
"""Round to nearest 2-second bucket for fuzzy duration matching."""
|
|
if seconds is None:
|
|
return None
|
|
return int(round(float(seconds) / 2.0))
|
|
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
cur = c.cursor()
|
|
db_rows = cur.execute(
|
|
"SELECT id, title, artist, album, duration FROM songs"
|
|
).fetchall()
|
|
|
|
total_db = len(db_rows)
|
|
print(f" DB songs total: {total_db}", flush=True)
|
|
|
|
# Show first 3 DB songs for comparison
|
|
for row in db_rows[:3]:
|
|
print(f" DB sample: title={repr(row[1])} "
|
|
f"artist={repr(row[2])} "
|
|
f"album={repr(row[3])} "
|
|
f"duration={row[4]}", flush=True)
|
|
|
|
# Strategy 1: title + artist
|
|
by_title_artist: dict[tuple, str] = {}
|
|
# Strategy 2: title + album
|
|
by_title_album: dict[tuple, str] = {}
|
|
# Strategy 3: title only (only stored if unique)
|
|
by_title: dict[str, Optional[str]] = {}
|
|
# Strategy 4: duration bucket + first 8 chars of title (unique)
|
|
by_dur: dict[tuple, Optional[str]] = {}
|
|
# Strategy 5: clean_title + artist (strips track number prefix)
|
|
by_clean_artist: dict[tuple, Optional[str]] = {}
|
|
# Strategy 6: duration only within ±2s (unique per bucket)
|
|
by_dur_only: dict[int, Optional[str]] = {}
|
|
|
|
for song_id, title, artist, album, duration in db_rows:
|
|
nt = norm(title)
|
|
na = norm(artist)
|
|
nb = norm(album)
|
|
ct = clean_title(title)
|
|
dk = dur_bucket(duration)
|
|
|
|
k1 = (nt, na)
|
|
if k1 not in by_title_artist:
|
|
by_title_artist[k1] = song_id
|
|
|
|
k2 = (nt, nb)
|
|
if k2 not in by_title_album:
|
|
by_title_album[k2] = song_id
|
|
|
|
if nt in by_title:
|
|
by_title[nt] = None
|
|
else:
|
|
by_title[nt] = song_id
|
|
|
|
if dk is not None:
|
|
k4 = (dk, nt[:8])
|
|
if k4 in by_dur:
|
|
by_dur[k4] = None
|
|
else:
|
|
by_dur[k4] = song_id
|
|
|
|
k5 = (ct, na)
|
|
if k5 not in by_clean_artist:
|
|
if k5 in by_clean_artist:
|
|
by_clean_artist[k5] = None
|
|
else:
|
|
by_clean_artist[k5] = song_id
|
|
else:
|
|
by_clean_artist[k5] = None # ambiguous
|
|
|
|
if dk is not None:
|
|
if dk in by_dur_only:
|
|
by_dur_only[dk] = None
|
|
else:
|
|
by_dur_only[dk] = song_id
|
|
|
|
# Strategy 7: clean_title + duration bucket (catches untagged files
|
|
# where artist is missing but filename title + duration uniquely identify the song)
|
|
by_clean_dur: dict[tuple, Optional[str]] = {}
|
|
|
|
for song_id, title, artist, album, duration in db_rows:
|
|
ct = clean_title(title)
|
|
dk = dur_bucket(duration)
|
|
if dk is not None:
|
|
k7 = (ct, dk)
|
|
if k7 in by_clean_dur:
|
|
by_clean_dur[k7] = None # ambiguous
|
|
else:
|
|
by_clean_dur[k7] = song_id
|
|
|
|
print(f" Lookups: title+artist={len(by_title_artist)} "
|
|
f"title+album={len(by_title_album)} "
|
|
f"title_only={sum(1 for v in by_title.values() if v)} "
|
|
f"duration={sum(1 for v in by_dur.values() if v)} "
|
|
f"clean+artist={sum(1 for v in by_clean_artist.values() if v)} "
|
|
f"dur_only={sum(1 for v in by_dur_only.values() if v)} "
|
|
f"clean+dur={sum(1 for v in by_clean_dur.values() if v)}", flush=True)
|
|
|
|
matched_s1 = matched_s2 = matched_s3 = matched_s4 = 0
|
|
matched_s5 = matched_s6 = matched_s7 = unmatched = 0
|
|
unmatched_samples = []
|
|
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
cur = c.cursor()
|
|
for ns in all_songs:
|
|
nd_id = ns.get("id", "")
|
|
if not nd_id:
|
|
continue
|
|
|
|
nt = norm(ns.get("title", ""))
|
|
na = norm(ns.get("artist", ""))
|
|
nb = norm(ns.get("album", ""))
|
|
ct = clean_title(ns.get("title", ""))
|
|
dk = dur_bucket(ns.get("duration"))
|
|
|
|
db_song_id = None
|
|
strategy = 0
|
|
|
|
if not db_song_id:
|
|
hit = by_title_artist.get((nt, na))
|
|
if hit: db_song_id, strategy = hit, 1
|
|
|
|
if not db_song_id:
|
|
hit = by_title_album.get((nt, nb))
|
|
if hit: db_song_id, strategy = hit, 2
|
|
|
|
if not db_song_id:
|
|
hit = by_title.get(nt)
|
|
if hit: db_song_id, strategy = hit, 3
|
|
|
|
if not db_song_id and dk is not None:
|
|
hit = by_dur.get((dk, nt[:8]))
|
|
if hit: db_song_id, strategy = hit, 4
|
|
|
|
if not db_song_id:
|
|
hit = by_clean_artist.get((ct, na))
|
|
if hit: db_song_id, strategy = hit, 5
|
|
|
|
if not db_song_id and dk is not None:
|
|
hit = by_dur_only.get(dk)
|
|
if hit: db_song_id, strategy = hit, 6
|
|
|
|
# S7: clean title + duration bucket -- catches untagged AIFF/files
|
|
# where artist is unknown but filename+duration uniquely identify song
|
|
if not db_song_id and dk is not None:
|
|
hit = by_clean_dur.get((ct, dk))
|
|
if hit: db_song_id, strategy = hit, 7
|
|
|
|
if db_song_id:
|
|
cur.execute("UPDATE songs SET navidrome_id = ? WHERE id = ?",
|
|
(nd_id, db_song_id))
|
|
if strategy == 1: matched_s1 += 1
|
|
elif strategy == 2: matched_s2 += 1
|
|
elif strategy == 3: matched_s3 += 1
|
|
elif strategy == 4: matched_s4 += 1
|
|
elif strategy == 5: matched_s5 += 1
|
|
elif strategy == 6: matched_s6 += 1
|
|
else: matched_s7 += 1
|
|
else:
|
|
unmatched += 1
|
|
if len(unmatched_samples) < 10:
|
|
unmatched_samples.append(
|
|
f"title={repr(ns.get('title',''))} "
|
|
f"artist={repr(ns.get('artist',''))} "
|
|
f"duration={ns.get('duration')}"
|
|
)
|
|
|
|
total_matched = matched_s1+matched_s2+matched_s3+matched_s4+matched_s5+matched_s6+matched_s7
|
|
print(f"Navidrome ID sync complete: {total_matched}/{len(all_songs)} matched", flush=True)
|
|
print(f" Strategy breakdown: "
|
|
f"title+artist={matched_s1} title+album={matched_s2} "
|
|
f"title_only={matched_s3} dur+prefix={matched_s4} "
|
|
f"clean+artist={matched_s5} dur_only={matched_s6} "
|
|
f"clean+dur={matched_s7} unmatched={unmatched}", flush=True)
|
|
if unmatched_samples:
|
|
print(f" Unmatched samples:", flush=True)
|
|
for s in unmatched_samples:
|
|
print(f" {s}", flush=True)
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
print(f"sync_navidrome_ids_task FAILED: {e}", flush=True)
|
|
traceback.print_exc()
|
|
|
|
|
|
# ── Metadata helpers ─────────────────────────────────────────────────────────
|
|
|
|
def sanitize(name: str) -> str:
|
|
"""Strip characters that are unsafe in folder/file names."""
|
|
# Remove: / : * ? " < > | \ and control characters
|
|
cleaned = re.sub(r'[/\\:*?"<>|]', '', name)
|
|
# Collapse multiple spaces, strip leading/trailing whitespace and dots
|
|
cleaned = re.sub(r'\s+', ' ', cleaned).strip().strip('.')
|
|
return cleaned or 'Unknown'
|
|
|
|
|
|
def build_target_path(full_path: str) -> Optional[str]:
|
|
"""
|
|
Compute the canonical target path for a file based on its current ID3 tags.
|
|
|
|
Structure:
|
|
MUSIC_DIR/Album Artist/Album/TT Song Title.ext (single disc)
|
|
MUSIC_DIR/Album Artist/Album/DD-TT Song Title.ext (multi disc)
|
|
|
|
- Album Artist tag is used for the top-level folder.
|
|
Falls back to Artist if Album Artist is blank.
|
|
- Track number zero-padded to 2 digits.
|
|
- Disc prefix only when disc > 1.
|
|
- Unsafe characters stripped from all components.
|
|
"""
|
|
try:
|
|
audio = MutagenFile(full_path, easy=True)
|
|
if audio is None:
|
|
return None
|
|
|
|
def get(key):
|
|
return audio[key][0] if key in audio and audio[key] else ''
|
|
|
|
title = sanitize(get('title') or Path(full_path).stem)
|
|
album_artist = sanitize(get('albumartist') or get('artist') or 'Unknown Artist')
|
|
album = sanitize(get('album') or 'Unknown Album')
|
|
ext = Path(full_path).suffix.lower()
|
|
|
|
# Track number -- strip /total if present (e.g. "3/12" -> 3)
|
|
track_num = 0
|
|
raw_track = get('tracknumber')
|
|
if raw_track:
|
|
m = re.match(r'(\d+)', raw_track)
|
|
if m:
|
|
track_num = int(m.group(1))
|
|
|
|
# Disc number
|
|
disc_num = 0
|
|
raw_disc = get('discnumber')
|
|
if raw_disc:
|
|
m = re.match(r'(\d+)', raw_disc)
|
|
if m:
|
|
disc_num = int(m.group(1))
|
|
|
|
# Build track prefix
|
|
if disc_num > 1:
|
|
prefix = f"{disc_num:02d}-{track_num:02d}"
|
|
elif track_num > 0:
|
|
prefix = f"{track_num:02d}"
|
|
else:
|
|
prefix = ''
|
|
|
|
filename = f"{prefix} {title}{ext}".strip() if prefix else f"{title}{ext}"
|
|
target = os.path.join(MUSIC_DIR, album_artist, album, filename)
|
|
return target
|
|
|
|
except Exception as e:
|
|
print(f" build_target_path error for {os.path.basename(full_path)}: {e}", flush=True)
|
|
return None
|
|
|
|
|
|
def restructure_file(full_path: str) -> Optional[str]:
|
|
"""
|
|
Move a file to its canonical location based on current ID3 tags.
|
|
Updates the songs DB row with the new path.
|
|
Removes empty directories left behind.
|
|
Returns the new full path, or None if no move was needed / failed.
|
|
"""
|
|
target = build_target_path(full_path)
|
|
if not target:
|
|
return None
|
|
|
|
# Already in the right place
|
|
if os.path.normpath(full_path) == os.path.normpath(target):
|
|
return None
|
|
|
|
# Avoid overwriting a different existing file
|
|
if os.path.exists(target) and os.path.abspath(target) != os.path.abspath(full_path):
|
|
print(f" restructure: target exists, skipping: {os.path.basename(target)}", flush=True)
|
|
return None
|
|
|
|
try:
|
|
os.makedirs(os.path.dirname(target), exist_ok=True)
|
|
shutil.move(full_path, target)
|
|
print(f" restructure: {os.path.relpath(full_path, MUSIC_DIR)}"
|
|
f" -> {os.path.relpath(target, MUSIC_DIR)}", flush=True)
|
|
|
|
# Remove empty directories left behind (walk up, stop at MUSIC_DIR)
|
|
old_dir = os.path.dirname(full_path)
|
|
while old_dir != MUSIC_DIR and os.path.isdir(old_dir):
|
|
if not os.listdir(old_dir):
|
|
os.rmdir(old_dir)
|
|
old_dir = os.path.dirname(old_dir)
|
|
else:
|
|
break
|
|
|
|
# Update DB with new path -- re-read tags for accurate sort keys
|
|
new_relative = os.path.relpath(target, MUSIC_DIR)
|
|
song_id = hashlib.md5(full_path.encode()).hexdigest()
|
|
new_id = hashlib.md5(target.encode()).hexdigest()
|
|
tags = read_tags(target)
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
cur = c.cursor()
|
|
cur.execute("""UPDATE songs SET
|
|
id=?, full_path=?, relative_path=?,
|
|
sort_title=?, sort_artist=?, sort_album=?, sort_album_artist=?,
|
|
file_mtime=?, date_modified=?
|
|
WHERE id=?""", (
|
|
new_id, target, new_relative,
|
|
sort_key(tags['title']),
|
|
sort_key(tags['artist']),
|
|
sort_key(tags['album']),
|
|
sort_key(tags['album_artist']),
|
|
os.stat(target).st_mtime,
|
|
datetime.utcnow().isoformat(),
|
|
song_id
|
|
))
|
|
if cur.rowcount == 0:
|
|
# Row used old full_path as key -- try matching by path
|
|
cur.execute("""UPDATE songs SET
|
|
id=?, full_path=?, relative_path=?,
|
|
sort_title=?, sort_artist=?, sort_album=?, sort_album_artist=?,
|
|
file_mtime=?, date_modified=?
|
|
WHERE full_path=?""", (
|
|
new_id, target, new_relative,
|
|
sort_key(tags['title']),
|
|
sort_key(tags['artist']),
|
|
sort_key(tags['album']),
|
|
sort_key(tags['album_artist']),
|
|
os.stat(target).st_mtime,
|
|
datetime.utcnow().isoformat(),
|
|
full_path
|
|
))
|
|
|
|
return target
|
|
|
|
except Exception as e:
|
|
print(f" restructure FAILED for {os.path.basename(full_path)}: {e}", flush=True)
|
|
return None
|
|
|
|
|
|
def restructure_all() -> dict:
|
|
"""Restructure every file in the library to match its tags. Used by bulk-fix."""
|
|
print("Restructuring library...", flush=True)
|
|
moved = 0
|
|
skipped = 0
|
|
failed = 0
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
rows = c.execute("SELECT full_path FROM songs").fetchall()
|
|
for (full_path,) in rows:
|
|
if not os.path.isfile(full_path):
|
|
skipped += 1
|
|
continue
|
|
result = restructure_file(full_path)
|
|
if result:
|
|
moved += 1
|
|
else:
|
|
skipped += 1
|
|
print(f"Restructure complete: {moved} moved, {skipped} skipped, {failed} failed", flush=True)
|
|
return {"moved": moved, "skipped": skipped, "failed": failed}
|
|
audio = MutagenFile(path, easy=True)
|
|
if audio is None:
|
|
raise ValueError(f"Unsupported format: {path}")
|
|
if u.title: audio['title'] = u.title
|
|
if u.artist: audio['artist'] = u.artist
|
|
if u.album: audio['album'] = u.album
|
|
if u.album_artist: audio['albumartist'] = u.album_artist
|
|
if u.genre: audio['genre'] = u.genre
|
|
if u.year: audio['date'] = str(u.year)
|
|
if u.track_number: audio['tracknumber'] = str(u.track_number)
|
|
audio.save()
|
|
|
|
|
|
def apply_tags(path: str, u):
|
|
# For FLAC files, clean up all legacy albumartist variants written by Picard
|
|
# before writing. easy=True only writes lowercase 'albumartist' but Navidrome
|
|
# reads ALBUMARTIST (uppercase) first -- so old values stick if not removed.
|
|
ext = Path(path).suffix.lower()
|
|
if ext == '.flac' and u.album_artist:
|
|
try:
|
|
from mutagen.flac import FLAC
|
|
raw = FLAC(path)
|
|
for variant in ['ALBUM ARTIST', 'ALBUM_ARTIST', 'ALBUMARTIST', 'albumartist']:
|
|
if variant in raw:
|
|
del raw[variant]
|
|
raw['ALBUMARTIST'] = u.album_artist
|
|
raw.save()
|
|
except Exception as e:
|
|
print(f" FLAC albumartist cleanup failed for {os.path.basename(path)}: {e}", flush=True)
|
|
|
|
audio = MutagenFile(path, easy=True)
|
|
if audio is None:
|
|
raise ValueError(f"Unsupported format: {path}")
|
|
if u.title: audio['title'] = u.title
|
|
if u.artist: audio['artist'] = u.artist
|
|
if u.album: audio['album'] = u.album
|
|
if u.album_artist: audio['albumartist'] = u.album_artist
|
|
if u.genre: audio['genre'] = u.genre
|
|
if u.year: audio['date'] = str(u.year)
|
|
if u.track_number: audio['tracknumber'] = str(u.track_number)
|
|
audio.save()
|
|
|
|
|
|
def apply_tags_dict(path: str, tags: dict):
|
|
# For FLAC files, use raw mutagen to nuke all legacy tag variants before
|
|
# writing. Tools like Picard write ALBUM ARTIST, ALBUM_ARTIST, albumartist
|
|
# etc. -- Navidrome reads the uppercase ones and gets the wrong value if we
|
|
# only write the lowercase easy-mode key without removing the others.
|
|
ext = Path(path).suffix.lower()
|
|
if ext == '.flac' and tags.get('album_artist'):
|
|
try:
|
|
from mutagen.flac import FLAC
|
|
raw = FLAC(path)
|
|
for variant in ['ALBUM ARTIST', 'ALBUM_ARTIST', 'ALBUMARTIST', 'albumartist']:
|
|
if variant in raw:
|
|
del raw[variant]
|
|
raw['ALBUMARTIST'] = tags['album_artist']
|
|
raw.save()
|
|
except Exception as e:
|
|
print(f" FLAC albumartist cleanup failed for {os.path.basename(path)}: {e}", flush=True)
|
|
|
|
audio = MutagenFile(path, easy=True)
|
|
if audio is None:
|
|
raise ValueError(f"Unsupported format: {path}")
|
|
mapping = {'title': 'title', 'artist': 'artist', 'album': 'album',
|
|
'album_artist': 'albumartist', 'genre': 'genre',
|
|
'year': 'date', 'track_number': 'tracknumber'}
|
|
for key, tag_name in mapping.items():
|
|
val = tags.get(key)
|
|
if val is not None and val != "":
|
|
audio[tag_name] = str(val)
|
|
audio.save()
|
|
|
|
|
|
# ── Analysis ─────────────────────────────────────────────────────────────────
|
|
|
|
def analyze(full_path: str) -> dict:
|
|
import librosa
|
|
cmd = ["ffmpeg", "-hide_banner", "-i", full_path,
|
|
"-af", "silencedetect=noise=-50dB:d=0.5,ebur128", "-f", "null", "-"]
|
|
out = subprocess.run(cmd, capture_output=True, text=True, timeout=120).stderr
|
|
|
|
all_starts = re.findall(r"silence_start: ([\d\.]+)", out)
|
|
all_ends = re.findall(r"silence_end: ([\d\.]+)", out)
|
|
lu = re.search(r"I:\s+([\-\d\.]+) LUFS", out)
|
|
|
|
sil_start = float(all_starts[-1]) if all_starts else 0.0
|
|
sil_end = float(all_ends[0]) if all_ends else 0.0
|
|
loudness = float(lu.group(1)) if lu else -14.0
|
|
|
|
if sil_end > 10.0:
|
|
sil_end = 0.0
|
|
dur_m = re.search(r"Duration: (\d+):(\d+):(\d+\.\d+)", out)
|
|
if dur_m:
|
|
total = (int(dur_m.group(1)) * 3600 +
|
|
int(dur_m.group(2)) * 60 +
|
|
float(dur_m.group(3)))
|
|
if sil_start < total * 0.5:
|
|
sil_start = total
|
|
|
|
print(f" analyze: trailing={sil_start:.1f}s leading_end={sil_end:.1f}s LUFS={loudness:.1f}",
|
|
flush=True)
|
|
|
|
y, sr = librosa.load(full_path, sr=22050, duration=30)
|
|
tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
|
|
del y
|
|
import gc; gc.collect()
|
|
try:
|
|
bpm = float(tempo)
|
|
except TypeError:
|
|
bpm = float(tempo[0]) if tempo is not None else 0.0
|
|
|
|
profile = {"bpm": round(bpm, 1), "silence_start": round(sil_start, 3),
|
|
"silence_end": round(sil_end, 3), "loudness_lufs": round(loudness, 1)}
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
c.execute("INSERT OR REPLACE INTO dj_profiles VALUES (?,?,?,?,?,CURRENT_TIMESTAMP)",
|
|
(full_path, profile["bpm"], profile["silence_start"],
|
|
profile["silence_end"], profile["loudness_lufs"]))
|
|
return profile
|
|
|
|
|
|
# ── Visualizer (fixed: no eqBoost, uniform bin width) ────────────────────────
|
|
|
|
def gen_vis_frames(path: str, fps: float = 30.0, fft_size: int = 1024, pts: int = 20) -> list:
|
|
import librosa
|
|
y, sr = librosa.load(path, sr=22050, mono=True)
|
|
hop = max(1, int(sr / fps))
|
|
frames = []
|
|
for start in range(0, len(y) - fft_size, hop):
|
|
chunk = y[start:start + fft_size] * np.hanning(fft_size)
|
|
spec = np.sqrt(np.abs(np.fft.rfft(chunk)) / fft_size)
|
|
half = len(spec)
|
|
cutoff = min(half - 1, 90)
|
|
uniform_bw = max(1, cutoff // pts)
|
|
fp = []
|
|
for i in range(pts):
|
|
ni = (i + 1) / pts
|
|
li = np.log10(ni * 9 + 1)
|
|
cb = int(li * cutoff)
|
|
lo = max(1, cb - uniform_bw // 2)
|
|
hi = min(cutoff, cb + uniform_bw // 2)
|
|
avg = float(np.mean(spec[lo:hi + 1])) if lo <= hi < half else 0.0
|
|
fp.append(avg) # no eqBoost
|
|
frames.append(fp)
|
|
del y
|
|
import gc; gc.collect()
|
|
vals = sorted(v for f in frames for v in f if v > 0.001)
|
|
if vals:
|
|
p95 = vals[min(int(len(vals) * 0.95), len(vals) - 1)]
|
|
if p95 > 0.001:
|
|
s = 0.8 / p95
|
|
frames = [[min(1.0, v * s) for v in f] for f in frames]
|
|
return frames
|
|
|
|
|
|
def vis_cache_file(path: str) -> str:
|
|
return os.path.join(VIS_CACHE_DIR, hashlib.md5(path.encode()).hexdigest() + ".json")
|
|
|
|
|
|
def get_vis(path: str):
|
|
cf = vis_cache_file(path)
|
|
if os.path.exists(cf):
|
|
try:
|
|
with open(cf) as f:
|
|
return json.load(f)
|
|
except Exception:
|
|
pass
|
|
if not os.path.isfile(path):
|
|
return None
|
|
try:
|
|
frames = gen_vis_frames(path)
|
|
with open(cf, "w") as f:
|
|
json.dump(frames, f)
|
|
return frames
|
|
except Exception as e:
|
|
print(f"Vis gen failed for {os.path.basename(path)}: {e}")
|
|
return None
|
|
|
|
|
|
# ── Library response helpers ─────────────────────────────────────────────────
|
|
|
|
SONG_COLS = ("id,full_path,relative_path,navidrome_id,"
|
|
"title,artist,album,album_artist,genre,"
|
|
"year,track_number,disc_number,duration,"
|
|
"sort_title,sort_artist,sort_album,sort_album_artist,"
|
|
"cover_art_path,file_size,file_mtime,date_added,date_modified")
|
|
|
|
_ALLOWED_SORT = {
|
|
"title", "sort_title", "artist", "sort_artist", "album", "sort_album",
|
|
"album_artist", "sort_album_artist", "track_number", "disc_number",
|
|
"year", "duration", "genre", "date_added",
|
|
}
|
|
|
|
def song_row_to_dict(row) -> dict:
|
|
(song_id, full_path, relative_path, navidrome_id,
|
|
title, artist, album, album_artist, genre,
|
|
year, track_number, disc_number, duration,
|
|
sort_title, sort_artist, sort_album, sort_album_artist,
|
|
cover_art_path, file_size, file_mtime,
|
|
date_added, date_modified) = row
|
|
return {
|
|
"id": song_id,
|
|
"navidrome_id": navidrome_id,
|
|
"title": title,
|
|
"artist": artist,
|
|
"album": album,
|
|
"album_artist": album_artist,
|
|
"genre": genre,
|
|
"year": year,
|
|
"track_number": track_number,
|
|
"disc_number": disc_number,
|
|
"duration": duration,
|
|
"relative_path": relative_path,
|
|
"cover_art_url": f"/library/cover-art/{song_id}" if cover_art_path else None,
|
|
"sort_title": sort_title,
|
|
"sort_artist": sort_artist,
|
|
"sort_album": sort_album,
|
|
"sort_album_artist": sort_album_artist,
|
|
"date_added": date_added,
|
|
}
|
|
|
|
|
|
# =============================================================================
|
|
# ENDPOINTS - existing (behaviour unchanged)
|
|
# =============================================================================
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
pc = fc = sc = 0
|
|
try:
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
pc = c.execute("SELECT COUNT(*) FROM dj_profiles").fetchone()[0]
|
|
fc = c.execute("SELECT COUNT(*) FROM file_index").fetchone()[0]
|
|
sc = c.execute("SELECT COUNT(*) FROM songs").fetchone()[0]
|
|
except Exception:
|
|
pass
|
|
vc = len(os.listdir(VIS_CACHE_DIR)) if os.path.isdir(VIS_CACHE_DIR) else 0
|
|
return {"status": "healthy", "music_dir": MUSIC_DIR,
|
|
"profiles": pc, "file_index": fc, "songs": sc,
|
|
"vis_cached": vc, "ws_clients": len(push.connections)}
|
|
|
|
|
|
@app.post("/reindex")
|
|
async def reindex():
|
|
build_file_index()
|
|
return {"status": "reindexed"}
|
|
|
|
|
|
@app.patch("/edit-metadata")
|
|
async def edit_metadata(update: MetadataUpdate):
|
|
fp = resolve_path(update.relative_path)
|
|
if not fp:
|
|
raise HTTPException(404, f"File not found. raw='{update.relative_path}' MUSIC_DIR='{MUSIC_DIR}'")
|
|
try:
|
|
apply_tags(fp, update)
|
|
# Do NOT restructure on single-track edits -- restructuring renames the
|
|
# file based on all current tags including disc number, which causes
|
|
# wrong filenames for compilation tracks (e.g. disc 2 gets 02-13 prefix).
|
|
# Restructuring is only done via the explicit bulk-fix endpoint.
|
|
update_song_in_db(fp)
|
|
new_relative = os.path.relpath(fp, MUSIC_DIR)
|
|
await trigger_scan()
|
|
await push.broadcast("metadata_updated", {
|
|
"path": new_relative,
|
|
"title": update.title or "", "artist": update.artist or "",
|
|
"album": update.album or ""
|
|
})
|
|
return {"status": "success", "file": new_relative, "resolved": fp}
|
|
except Exception as e:
|
|
import traceback
|
|
traceback.print_exc()
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.patch("/batch-edit-metadata")
|
|
async def batch_edit_metadata(update: BatchMetadataUpdate):
|
|
results = {"succeeded": [], "failed": []}
|
|
tags = {}
|
|
if update.title: tags["title"] = update.title
|
|
if update.artist: tags["artist"] = update.artist
|
|
if update.album: tags["album"] = update.album
|
|
if update.album_artist: tags["album_artist"] = update.album_artist
|
|
if update.genre: tags["genre"] = update.genre
|
|
if update.year: tags["year"] = str(update.year)
|
|
for rp in update.relative_paths:
|
|
fp = resolve_path(rp)
|
|
if not fp:
|
|
results["failed"].append({"path": rp, "error": "File not found"})
|
|
continue
|
|
try:
|
|
apply_tags_dict(fp, tags)
|
|
new_fp = restructure_file(fp) or fp
|
|
update_song_in_db(new_fp)
|
|
results["succeeded"].append(os.path.relpath(new_fp, MUSIC_DIR))
|
|
except Exception as e:
|
|
results["failed"].append({"path": rp, "error": str(e)})
|
|
await trigger_scan()
|
|
# Wait for Navidrome to finish scanning before the app re-fetches paths.
|
|
await asyncio.sleep(4)
|
|
await push.broadcast("batch_metadata_updated",
|
|
{"count": len(results["succeeded"]),
|
|
"album": update.album or "",
|
|
"paths_changed": "true"})
|
|
# Run conflict check in background after every batch edit
|
|
asyncio.create_task(_run_conflict_check_and_broadcast())
|
|
return results
|
|
|
|
|
|
@app.post("/upload-track")
|
|
async def upload_track(
|
|
file: UploadFile = File(...),
|
|
title: str = Form(...),
|
|
artist: str = Form(...),
|
|
album: str = Form(...)
|
|
):
|
|
dest = os.path.join(MUSIC_DIR, "uploads")
|
|
os.makedirs(dest, exist_ok=True)
|
|
fp = os.path.join(dest, file.filename)
|
|
with open(fp, "wb") as buf:
|
|
shutil.copyfileobj(file.file, buf)
|
|
try:
|
|
u = MetadataUpdate(relative_path=f"uploads/{file.filename}",
|
|
title=title, artist=artist, album=album)
|
|
apply_tags(fp, u)
|
|
update_song_in_db(fp)
|
|
profile = analyze(fp)
|
|
await trigger_scan()
|
|
await push.broadcast("track_uploaded",
|
|
{"filename": file.filename, "profile": json.dumps(profile)})
|
|
return {"status": "uploaded", "path": f"uploads/{file.filename}", "profile": profile}
|
|
except Exception as e:
|
|
if os.path.exists(fp):
|
|
os.remove(fp)
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.post("/upload-tracks")
|
|
async def upload_tracks(
|
|
files: List[UploadFile] = File(...),
|
|
metadata_json: str = Form(...),
|
|
cover_art: Optional[UploadFile] = File(None),
|
|
):
|
|
"""
|
|
Upload multiple audio files with per-track metadata.
|
|
Optional cover_art is saved as folder.jpg in the album directory.
|
|
"""
|
|
try:
|
|
meta_list = json.loads(metadata_json)
|
|
except json.JSONDecodeError:
|
|
raise HTTPException(400, "Invalid metadata_json")
|
|
|
|
meta_by_name = {m["filename"]: m for m in meta_list}
|
|
results = {"uploaded": [], "failed": []}
|
|
album_dir = None
|
|
|
|
for file in files:
|
|
meta = meta_by_name.get(file.filename) or {
|
|
"filename": file.filename,
|
|
"title": Path(file.filename).stem,
|
|
"artist": "Unknown Artist", "album": "Uploads"
|
|
}
|
|
artist_name = meta.get("artist", "Unknown Artist")
|
|
album_name = meta.get("album", "Uploads")
|
|
safe_artist = re.sub(r'[<>:"/\\|?*]', '_', artist_name)
|
|
safe_album = re.sub(r'[<>:"/\\|?*]', '_', album_name)
|
|
dest_dir = os.path.join(MUSIC_DIR, safe_artist, safe_album)
|
|
os.makedirs(dest_dir, exist_ok=True)
|
|
album_dir = dest_dir
|
|
fp = os.path.join(dest_dir, file.filename)
|
|
try:
|
|
with open(fp, "wb") as buf:
|
|
shutil.copyfileobj(file.file, buf)
|
|
tags = {
|
|
"title": meta.get("title", Path(file.filename).stem),
|
|
"artist": artist_name,
|
|
"album": album_name,
|
|
"album_artist": meta.get("album_artist") or artist_name,
|
|
}
|
|
if meta.get("track_number"): tags["track_number"] = str(meta["track_number"])
|
|
if meta.get("genre"): tags["genre"] = meta["genre"]
|
|
if meta.get("year"): tags["year"] = str(meta["year"])
|
|
apply_tags_dict(fp, tags)
|
|
try:
|
|
analyze(fp)
|
|
except Exception as e:
|
|
print(f" Analysis failed for {file.filename}: {e}")
|
|
update_song_in_db(fp)
|
|
results["uploaded"].append({
|
|
"filename": file.filename,
|
|
"path": os.path.relpath(fp, MUSIC_DIR)
|
|
})
|
|
except Exception as e:
|
|
if os.path.exists(fp):
|
|
os.remove(fp)
|
|
results["failed"].append({"filename": file.filename, "error": str(e)})
|
|
|
|
if cover_art and album_dir:
|
|
cover_dest = os.path.join(album_dir, "folder.jpg")
|
|
try:
|
|
with open(cover_dest, "wb") as buf:
|
|
shutil.copyfileobj(cover_art.file, buf)
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
c.execute("UPDATE songs SET cover_art_path = ? WHERE full_path LIKE ?",
|
|
(cover_dest, os.path.join(album_dir, "%")))
|
|
print(f" Cover art saved: {cover_dest}", flush=True)
|
|
except Exception as e:
|
|
print(f" Cover art save failed: {e}", flush=True)
|
|
|
|
await trigger_scan()
|
|
await push.broadcast("tracks_uploaded", {
|
|
"count": len(results["uploaded"]),
|
|
"album": meta_list[0].get("album", "") if meta_list else ""
|
|
})
|
|
return results
|
|
|
|
|
|
@app.get("/smart-dj/profile")
|
|
async def get_profile(relative_path: str):
|
|
fp = resolve_path(relative_path)
|
|
if fp:
|
|
try:
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
row = c.execute(
|
|
"SELECT bpm,silence_start,silence_end,loudness_lufs "
|
|
"FROM dj_profiles WHERE file_path=?", (fp,)
|
|
).fetchone()
|
|
if row:
|
|
return {"bpm": row[0], "silence_start": row[1],
|
|
"silence_end": row[2], "loudness_lufs": row[3]}
|
|
except sqlite3.OperationalError as e:
|
|
print(f"DB error: {e}", flush=True)
|
|
return analyze(fp)
|
|
|
|
decoded = relative_path
|
|
for _ in range(5):
|
|
nd = unquote(decoded)
|
|
if nd == decoded: break
|
|
decoded = nd
|
|
target = os.path.basename(decoded).lower()
|
|
if target:
|
|
try:
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
rows = c.execute(
|
|
"SELECT file_path,bpm,silence_start,silence_end,loudness_lufs "
|
|
"FROM dj_profiles WHERE LOWER(file_path) LIKE ?", (f"%{target}",)
|
|
).fetchall()
|
|
if rows:
|
|
return {"bpm": rows[0][1], "silence_start": rows[0][2],
|
|
"silence_end": rows[0][3], "loudness_lufs": rows[0][4]}
|
|
except Exception as e:
|
|
print(f" DB fallback error: {e}", flush=True)
|
|
raise HTTPException(404, f"Not found. decoded='{decoded}' MUSIC_DIR='{MUSIC_DIR}'")
|
|
|
|
|
|
@app.get("/smart-dj/bulk-profiles")
|
|
async def bulk_profiles(paths: str = Query(...)):
|
|
results = {}
|
|
for rp in (p.strip() for p in paths.split(",") if p.strip()):
|
|
fp = resolve_path(rp)
|
|
if not fp:
|
|
results[rp] = None
|
|
continue
|
|
try:
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
row = c.execute(
|
|
"SELECT bpm,silence_start,silence_end,loudness_lufs "
|
|
"FROM dj_profiles WHERE file_path=?", (fp,)
|
|
).fetchone()
|
|
results[rp] = ({"bpm": row[0], "silence_start": row[1],
|
|
"silence_end": row[2], "loudness_lufs": row[3]}
|
|
if row else analyze(fp))
|
|
except Exception:
|
|
results[rp] = None
|
|
return results
|
|
|
|
|
|
@app.get("/visualizer/frames")
|
|
async def vis_frames(relative_path: str):
|
|
fp = resolve_path(relative_path)
|
|
if not fp:
|
|
raise HTTPException(404, "Not found")
|
|
frames = get_vis(fp)
|
|
if not frames:
|
|
raise HTTPException(500, "Generation failed")
|
|
return {"frame_count": len(frames), "fps": 30.0, "points": 20, "frames": frames}
|
|
|
|
|
|
@app.post("/visualizer/precompute")
|
|
async def precompute(background_tasks: BackgroundTasks, relative_path: str = ""):
|
|
def compute_all():
|
|
n = 0
|
|
for root, _, files in os.walk(MUSIC_DIR):
|
|
for f in files:
|
|
if f.lower().endswith(AUDIO_EXTS):
|
|
fp = os.path.join(root, f)
|
|
if not os.path.exists(vis_cache_file(fp)):
|
|
try:
|
|
get_vis(fp)
|
|
n += 1
|
|
except Exception:
|
|
pass
|
|
print(f"Pre-computed {n} vis caches")
|
|
if relative_path:
|
|
background_tasks.add_task(lambda: get_vis(resolve_path(relative_path) or ""))
|
|
return {"message": f"Computing: {relative_path}"}
|
|
background_tasks.add_task(compute_all)
|
|
return {"message": "Background vis computation started"}
|
|
|
|
|
|
@app.post("/bulk-fix")
|
|
async def bulk_fix(background_tasks: BackgroundTasks):
|
|
async def run():
|
|
result = restructure_all()
|
|
await trigger_scan()
|
|
await push.broadcast("library_restructured", result)
|
|
background_tasks.add_task(run)
|
|
return {"message": "Library restructure started"}
|
|
|
|
|
|
# =============================================================================
|
|
# ENDPOINTS - Phase 1: Library Database
|
|
# =============================================================================
|
|
|
|
@app.post("/library/scan")
|
|
async def library_scan(background_tasks: BackgroundTasks, full: bool = False):
|
|
"""Trigger a library rescan. ?full=true forces re-read of every file."""
|
|
background_tasks.add_task(scan_library, full)
|
|
return {"message": f"Library scan started (full={full})"}
|
|
|
|
|
|
@app.post("/library/sync-navidrome-ids")
|
|
async def sync_navidrome(background_tasks: BackgroundTasks):
|
|
"""Match our songs table to Navidrome IDs so the iOS app can stream."""
|
|
background_tasks.add_task(sync_navidrome_ids_task)
|
|
return {"message": "Navidrome ID sync started"}
|
|
|
|
|
|
@app.get("/library/songs")
|
|
async def library_songs(
|
|
page: int = Query(0, ge=0),
|
|
per_page: int = Query(100, ge=1, le=500),
|
|
sort: str = Query("sort_album,disc_number,track_number"),
|
|
album: Optional[str] = Query(None),
|
|
artist: Optional[str] = Query(None),
|
|
album_artist: Optional[str] = Query(None),
|
|
genre: Optional[str] = Query(None),
|
|
year: Optional[int] = Query(None),
|
|
):
|
|
order_parts = []
|
|
for col in sort.split(","):
|
|
col = col.strip()
|
|
desc = col.startswith("-")
|
|
name = col.lstrip("-")
|
|
if name in _ALLOWED_SORT:
|
|
order_parts.append(f"{name} {'DESC' if desc else 'ASC'}")
|
|
order = ", ".join(order_parts) or "sort_album, disc_number, track_number"
|
|
|
|
wheres, params = [], []
|
|
if album: wheres.append("album = ?"); params.append(album)
|
|
if artist: wheres.append("artist = ?"); params.append(artist)
|
|
if album_artist: wheres.append("album_artist = ?"); params.append(album_artist)
|
|
if genre: wheres.append("genre = ?"); params.append(genre)
|
|
if year: wheres.append("year = ?"); params.append(year)
|
|
where = f"WHERE {' AND '.join(wheres)}" if wheres else ""
|
|
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
total = c.execute(f"SELECT COUNT(*) FROM songs {where}", params).fetchone()[0]
|
|
rows = c.execute(
|
|
f"SELECT {SONG_COLS} FROM songs {where} ORDER BY {order} LIMIT ? OFFSET ?",
|
|
params + [per_page, page * per_page]
|
|
).fetchall()
|
|
|
|
return {"total": total, "page": page, "per_page": per_page,
|
|
"songs": [song_row_to_dict(r) for r in rows]}
|
|
|
|
|
|
@app.get("/library/albums")
|
|
async def library_albums(
|
|
artist: Optional[str] = Query(None),
|
|
album_artist: Optional[str] = Query(None),
|
|
genre: Optional[str] = Query(None),
|
|
):
|
|
wheres, params = [], []
|
|
if artist: wheres.append("artist = ?"); params.append(artist)
|
|
if album_artist: wheres.append("album_artist = ?"); params.append(album_artist)
|
|
if genre: wheres.append("genre = ?"); params.append(genre)
|
|
where = f"WHERE {' AND '.join(wheres)}" if wheres else ""
|
|
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
rows = c.execute(f"""
|
|
SELECT album, album_artist, sort_album, sort_album_artist,
|
|
MIN(year) as year, COUNT(*) as track_count,
|
|
MAX(cover_art_path) as cover_art_path, MIN(id) as rep_id
|
|
FROM songs {where}
|
|
GROUP BY album, album_artist
|
|
ORDER BY sort_album_artist, sort_album
|
|
""", params).fetchall()
|
|
|
|
albums = []
|
|
for album, aa, sort_alb, sort_aa, year, tc, cover_path, rep_id in rows:
|
|
albums.append({
|
|
"album": album,
|
|
"album_artist": aa,
|
|
"sort_album": sort_alb,
|
|
"sort_album_artist": sort_aa,
|
|
"year": year,
|
|
"track_count": tc,
|
|
"cover_art_url": f"/library/cover-art/{rep_id}" if cover_path else None,
|
|
})
|
|
return {"total": len(albums), "albums": albums}
|
|
|
|
|
|
@app.get("/library/artists")
|
|
async def library_artists():
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
rows = c.execute("""
|
|
SELECT artist, sort_artist,
|
|
COUNT(*) as track_count, COUNT(DISTINCT album) as album_count
|
|
FROM songs GROUP BY artist ORDER BY sort_artist
|
|
""").fetchall()
|
|
photos = {r[0]: r[1] for r in
|
|
c.execute("SELECT artist_name, photo_path FROM artist_photos").fetchall()}
|
|
|
|
return {"total": len(rows), "artists": [
|
|
{"artist": artist,
|
|
"sort_artist": sort_art,
|
|
"track_count": tc,
|
|
"album_count": ac,
|
|
"photo_url": f"/library/artist-photo/{artist}" if photos.get(artist) else None}
|
|
for artist, sort_art, tc, ac in rows
|
|
]}
|
|
|
|
|
|
@app.get("/library/search")
|
|
async def library_search(
|
|
q: str = Query(..., min_length=1),
|
|
limit: int = Query(50, ge=1, le=200)
|
|
):
|
|
term = f"%{q}%"
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
rows = c.execute(f"""
|
|
SELECT {SONG_COLS} FROM songs
|
|
WHERE title LIKE ? OR artist LIKE ? OR album LIKE ? OR genre LIKE ?
|
|
ORDER BY sort_artist, sort_album, disc_number, track_number
|
|
LIMIT ?
|
|
""", (term, term, term, term, limit)).fetchall()
|
|
return {"total": len(rows), "songs": [song_row_to_dict(r) for r in rows]}
|
|
|
|
|
|
@app.get("/library/song/{song_id}")
|
|
async def library_song(song_id: str):
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
row = c.execute(f"SELECT {SONG_COLS} FROM songs WHERE id = ?", (song_id,)).fetchone()
|
|
if not row:
|
|
raise HTTPException(404, "Song not found")
|
|
return song_row_to_dict(row)
|
|
|
|
|
|
@app.get("/library/cover-art/{song_id}")
|
|
async def library_cover_art(song_id: str):
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
row = c.execute("SELECT cover_art_path FROM songs WHERE id = ?", (song_id,)).fetchone()
|
|
if not row or not row[0] or not os.path.isfile(row[0]):
|
|
raise HTTPException(404, "No cover art")
|
|
mt = "image/png" if row[0].lower().endswith(".png") else "image/jpeg"
|
|
return FileResponse(row[0], media_type=mt)
|
|
|
|
|
|
@app.post("/library/cover-art/{song_id}")
|
|
async def upload_cover_art(song_id: str, file: UploadFile = File(...)):
|
|
"""Upload cover art -- saves as folder.jpg and updates all songs in that directory."""
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
row = c.execute("SELECT full_path FROM songs WHERE id = ?", (song_id,)).fetchone()
|
|
if not row:
|
|
raise HTTPException(404, "Song not found")
|
|
song_dir = os.path.dirname(row[0])
|
|
cover_dest = os.path.join(song_dir, "folder.jpg")
|
|
try:
|
|
with open(cover_dest, "wb") as buf:
|
|
shutil.copyfileobj(file.file, buf)
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
c.execute("UPDATE songs SET cover_art_path = ? WHERE full_path LIKE ?",
|
|
(cover_dest, os.path.join(song_dir, "%")))
|
|
cached = os.path.join(COVER_ART_DIR, f"{song_id}.jpg")
|
|
if os.path.isfile(cached):
|
|
os.remove(cached)
|
|
await push.broadcast("cover_art_updated", {"song_id": song_id})
|
|
return {"status": "saved", "path": cover_dest}
|
|
except Exception as e:
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.post("/library/artist-photo")
|
|
async def upload_artist_photo(
|
|
artist_name: str = Form(...),
|
|
file: UploadFile = File(...)
|
|
):
|
|
"""Upload an artist photo."""
|
|
safe = re.sub(r'[<>:"/\\|?*\s]', '_', artist_name)
|
|
ext = Path(file.filename).suffix.lower() or ".jpg"
|
|
dest = os.path.join(ARTIST_PHOTO_DIR, f"{safe}{ext}")
|
|
try:
|
|
with open(dest, "wb") as buf:
|
|
shutil.copyfileobj(file.file, buf)
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
c.execute("""INSERT OR REPLACE INTO artist_photos
|
|
(artist_name, photo_path, updated_at) VALUES (?,?,CURRENT_TIMESTAMP)""",
|
|
(artist_name, dest))
|
|
await push.broadcast("artist_photo_updated", {"artist": artist_name})
|
|
return {"status": "saved", "artist": artist_name}
|
|
except Exception as e:
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.get("/library/artist-photo/{artist_name}")
|
|
async def get_artist_photo(artist_name: str):
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
row = c.execute(
|
|
"SELECT photo_path FROM artist_photos WHERE artist_name = ?", (artist_name,)
|
|
).fetchone()
|
|
if not row or not row[0] or not os.path.isfile(row[0]):
|
|
raise HTTPException(404, "No photo")
|
|
mt = "image/png" if row[0].lower().endswith(".png") else "image/jpeg"
|
|
return FileResponse(row[0], media_type=mt)
|
|
|
|
|
|
async def _run_conflict_check_and_broadcast():
|
|
"""Run conflict check in the background and broadcast results."""
|
|
try:
|
|
await asyncio.sleep(6) # Wait for Navidrome scan to complete
|
|
navidrome_db = os.getenv("NAVIDROME_DB_PATH", "/navidrome_data/navidrome.db")
|
|
issues = check_library_conflicts(navidrome_db)
|
|
error_count = sum(1 for i in issues if i["severity"] == "error")
|
|
warning_count = sum(1 for i in issues if i["severity"] == "warning")
|
|
if issues:
|
|
await push.broadcast("conflicts_updated", {
|
|
"total": str(len(issues)),
|
|
"errors": str(error_count),
|
|
"warnings": str(warning_count)
|
|
})
|
|
except Exception as e:
|
|
print(f" Background conflict check failed: {e}", flush=True)
|
|
|
|
|
|
# =============================================================================
|
|
# LIBRARY CONFLICTS
|
|
# =============================================================================
|
|
|
|
def check_library_conflicts(navidrome_db_path: str = os.getenv("NAVIDROME_DB_PATH", "/navidrome_data/navidrome.db")) -> list:
|
|
"""
|
|
Run all conflict checks and return a list of issues.
|
|
Each issue is a dict with: type, severity, title, detail, affected_paths, fix_action
|
|
"""
|
|
issues = []
|
|
|
|
# ── 1. Duplicate albums (same name, multiple album_artist values) ─────────
|
|
try:
|
|
with sqlite3.connect(navidrome_db_path) as nav:
|
|
rows = nav.execute("""
|
|
SELECT name, COUNT(DISTINCT album_artist) as aa_count,
|
|
GROUP_CONCAT(id, '|||') as ids,
|
|
GROUP_CONCAT(album_artist, '|||') as artists,
|
|
GROUP_CONCAT(song_count, '|||') as counts
|
|
FROM album
|
|
GROUP BY name
|
|
HAVING aa_count > 1
|
|
ORDER BY name
|
|
""").fetchall()
|
|
for name, aa_count, ids, artists, counts in rows:
|
|
id_list = ids.split('|||')
|
|
artist_list = artists.split('|||')
|
|
count_list = counts.split('|||')
|
|
issues.append({
|
|
"type": "duplicate_album",
|
|
"severity": "error",
|
|
"title": f"Duplicate album: {name}",
|
|
"detail": f"{aa_count} versions found with different album artists: {', '.join(artist_list)}",
|
|
"affected_paths": [],
|
|
"fix_action": "fix_duplicate_album",
|
|
"fix_data": {
|
|
"album_name": name,
|
|
"album_ids": id_list,
|
|
"album_artists": artist_list,
|
|
"song_counts": count_list
|
|
}
|
|
})
|
|
except Exception as e:
|
|
print(f" conflict check 1 failed: {e}", flush=True)
|
|
|
|
# ── 2. Missing files (Navidrome knows about them but they're gone) ────────
|
|
try:
|
|
with sqlite3.connect(navidrome_db_path) as nav:
|
|
rows = nav.execute(
|
|
"SELECT path FROM media_file WHERE missing = 1 LIMIT 50"
|
|
).fetchall()
|
|
if rows:
|
|
issues.append({
|
|
"type": "missing_files",
|
|
"severity": "warning",
|
|
"title": f"{len(rows)} missing file(s)",
|
|
"detail": "Files registered in Navidrome but not found on disk.",
|
|
"affected_paths": [r[0] for r in rows],
|
|
"fix_action": "fix_missing_files",
|
|
"fix_data": {}
|
|
})
|
|
except Exception as e:
|
|
print(f" conflict check 2 failed: {e}", flush=True)
|
|
|
|
# ── 3. Picard legacy tags (FLAC files with conflicting albumartist tags) ──
|
|
try:
|
|
legacy_files = []
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
rows = c.execute("SELECT full_path FROM songs").fetchall()
|
|
for (full_path,) in rows:
|
|
if not full_path.lower().endswith('.flac'):
|
|
continue
|
|
if not os.path.isfile(full_path):
|
|
continue
|
|
try:
|
|
from mutagen.flac import FLAC
|
|
f = FLAC(full_path)
|
|
keys = [k.upper() for k in f.keys()]
|
|
has_legacy = any(k in keys for k in ['ALBUM ARTIST', 'ALBUM_ARTIST'])
|
|
has_canonical = 'ALBUMARTIST' in keys
|
|
if has_legacy and has_canonical:
|
|
legacy_files.append(os.path.relpath(full_path, MUSIC_DIR))
|
|
except Exception:
|
|
pass
|
|
if legacy_files:
|
|
issues.append({
|
|
"type": "picard_legacy_tags",
|
|
"severity": "error",
|
|
"title": f"{len(legacy_files)} file(s) with conflicting album artist tags",
|
|
"detail": "These FLAC files have both Picard legacy tags (ALBUM ARTIST/ALBUM_ARTIST) and the canonical ALBUMARTIST tag. Navidrome reads the legacy tag and gets the wrong album artist.",
|
|
"affected_paths": legacy_files[:50],
|
|
"fix_action": "fix_picard_tags",
|
|
"fix_data": {}
|
|
})
|
|
except Exception as e:
|
|
print(f" conflict check 3 failed: {e}", flush=True)
|
|
|
|
# ── 4. Orphaned tracks (album_id points to non-existent album) ────────────
|
|
try:
|
|
with sqlite3.connect(navidrome_db_path) as nav:
|
|
rows = nav.execute("""
|
|
SELECT mf.path FROM media_file mf
|
|
LEFT JOIN album a ON mf.album_id = a.id
|
|
WHERE mf.album_id IS NOT NULL AND a.id IS NULL
|
|
LIMIT 50
|
|
""").fetchall()
|
|
if rows:
|
|
issues.append({
|
|
"type": "orphaned_tracks",
|
|
"severity": "warning",
|
|
"title": f"{len(rows)} orphaned track(s)",
|
|
"detail": "Tracks whose album_id points to a non-existent album entry.",
|
|
"affected_paths": [r[0] for r in rows],
|
|
"fix_action": "fix_orphaned_tracks",
|
|
"fix_data": {}
|
|
})
|
|
except Exception as e:
|
|
print(f" conflict check 4 failed: {e}", flush=True)
|
|
|
|
# ── 5. Duplicate tracks (same title+artist+duration appearing >1 time) ───
|
|
try:
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
rows = c.execute("""
|
|
SELECT title, artist, COUNT(*) as cnt,
|
|
GROUP_CONCAT(relative_path, '|||') as paths
|
|
FROM songs
|
|
WHERE title != '' AND artist != ''
|
|
GROUP BY title, artist, CAST(duration AS INT)
|
|
HAVING cnt > 1
|
|
LIMIT 30
|
|
""").fetchall()
|
|
for title, artist, cnt, paths in rows:
|
|
path_list = paths.split('|||') if paths else []
|
|
issues.append({
|
|
"type": "duplicate_track",
|
|
"severity": "warning",
|
|
"title": f"Duplicate: {title} -- {artist}",
|
|
"detail": f"Found {cnt} copies of this track.",
|
|
"affected_paths": path_list,
|
|
"fix_action": None,
|
|
"fix_data": {}
|
|
})
|
|
except Exception as e:
|
|
print(f" conflict check 5 failed: {e}", flush=True)
|
|
|
|
# ── 6. Stale Companion paths (full_path no longer exists on disk) ─────────
|
|
try:
|
|
stale = []
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
rows = c.execute("SELECT relative_path, full_path FROM songs").fetchall()
|
|
for rel, full in rows:
|
|
if not os.path.isfile(full):
|
|
stale.append(rel)
|
|
if stale:
|
|
issues.append({
|
|
"type": "stale_companion_paths",
|
|
"severity": "warning",
|
|
"title": f"{len(stale)} stale path(s) in Companion DB",
|
|
"detail": "The Companion's song database has entries whose files no longer exist at the recorded path. Run a library scan to fix.",
|
|
"affected_paths": stale[:50],
|
|
"fix_action": "fix_stale_paths",
|
|
"fix_data": {}
|
|
})
|
|
except Exception as e:
|
|
print(f" conflict check 6 failed: {e}", flush=True)
|
|
|
|
print(f" Conflict check complete: {len(issues)} issues found", flush=True)
|
|
return issues
|
|
|
|
|
|
@app.get("/library/conflicts")
|
|
async def library_conflicts():
|
|
"""Run all conflict checks and return structured results."""
|
|
navidrome_db = os.getenv("NAVIDROME_DB_PATH", "/navidrome_data/navidrome.db")
|
|
issues = check_library_conflicts(navidrome_db)
|
|
error_count = sum(1 for i in issues if i["severity"] == "error")
|
|
warning_count = sum(1 for i in issues if i["severity"] == "warning")
|
|
return {
|
|
"total": len(issues),
|
|
"errors": error_count,
|
|
"warnings": warning_count,
|
|
"issues": issues
|
|
}
|
|
|
|
|
|
@app.post("/library/fix-conflict")
|
|
async def fix_conflict(request: FixConflictRequest):
|
|
"""
|
|
Fix a specific conflict by action type.
|
|
Request body: {"action": "fix_duplicate_album", "fix_data": {...}}
|
|
"""
|
|
navidrome_db = os.getenv("NAVIDROME_DB_PATH", "/navidrome_data/navidrome.db")
|
|
action = request.action
|
|
fix_data = request.fix_data
|
|
|
|
if action == "fix_duplicate_album":
|
|
# Strategy: rewrite ALBUMARTIST tags on all affected files so Navidrome's
|
|
# next scan groups them under one canonical album entry.
|
|
# We cannot write to Navidrome's DB directly while it is running.
|
|
album_name = fix_data.get("album_name", "")
|
|
album_artists = fix_data.get("album_artists", [])
|
|
song_counts = fix_data.get("song_counts", [])
|
|
|
|
if not album_artists:
|
|
raise HTTPException(400, "No album artists provided")
|
|
|
|
# Canonical = album artist with the most songs
|
|
try:
|
|
counts = [int(c) for c in song_counts]
|
|
canonical_idx = counts.index(max(counts))
|
|
except Exception:
|
|
canonical_idx = 0
|
|
canonical_artist = album_artists[canonical_idx]
|
|
|
|
try:
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
rows = c.execute(
|
|
"SELECT full_path FROM songs WHERE album = ?", (album_name,)
|
|
).fetchall()
|
|
|
|
fixed = 0
|
|
for (full_path,) in rows:
|
|
if not os.path.isfile(full_path):
|
|
continue
|
|
try:
|
|
ext = Path(full_path).suffix.lower()
|
|
if ext == '.flac':
|
|
from mutagen.flac import FLAC
|
|
f = FLAC(full_path)
|
|
for variant in ['ALBUM ARTIST', 'ALBUM_ARTIST', 'ALBUMARTIST', 'albumartist']:
|
|
if variant in f:
|
|
del f[variant]
|
|
f['ALBUMARTIST'] = canonical_artist
|
|
f['ALBUM'] = album_name
|
|
f.save()
|
|
else:
|
|
audio = MutagenFile(full_path, easy=True)
|
|
if audio:
|
|
audio['albumartist'] = canonical_artist
|
|
audio['album'] = album_name
|
|
audio.save()
|
|
update_song_in_db(full_path)
|
|
fixed += 1
|
|
except Exception as e:
|
|
print(f" fix_duplicate_album: {os.path.basename(full_path)}: {e}", flush=True)
|
|
|
|
await trigger_scan()
|
|
await asyncio.sleep(4)
|
|
await push.broadcast("conflicts_updated", {"action": "fix_duplicate_album", "album": album_name})
|
|
return {"status": "fixed", "album": album_name, "fixed": fixed, "canonical_artist": canonical_artist}
|
|
except Exception as e:
|
|
raise HTTPException(500, f"Fix failed: {e}")
|
|
|
|
elif action == "fix_missing_files":
|
|
# Trigger a full Navidrome rescan -- Navidrome will detect and remove
|
|
# missing files automatically during a full scan. We cannot write to
|
|
# Navidrome's DB directly while it is running (mounted read-only).
|
|
try:
|
|
await trigger_scan()
|
|
return {"status": "triggered_scan", "detail": "Navidrome will remove missing entries on next scan"}
|
|
except Exception as e:
|
|
raise HTTPException(500, f"Fix failed: {e}")
|
|
|
|
elif action == "fix_picard_tags":
|
|
fixed = 0
|
|
failed = []
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
rows = c.execute("SELECT full_path FROM songs WHERE full_path LIKE '%.flac'").fetchall()
|
|
for (full_path,) in rows:
|
|
if not os.path.isfile(full_path):
|
|
continue
|
|
try:
|
|
from mutagen.flac import FLAC
|
|
f = FLAC(full_path)
|
|
keys = list(f.keys())
|
|
upper_keys = [k.upper() for k in keys]
|
|
has_legacy = any(k in upper_keys for k in ['ALBUM ARTIST', 'ALBUM_ARTIST'])
|
|
if not has_legacy:
|
|
continue
|
|
# Get canonical value
|
|
canonical = None
|
|
for k in keys:
|
|
if k.upper() == 'ALBUMARTIST':
|
|
canonical = f[k][0] if f[k] else None
|
|
break
|
|
if not canonical:
|
|
for k in keys:
|
|
if k.upper() in ['ALBUM ARTIST', 'ALBUM_ARTIST']:
|
|
canonical = f[k][0] if f[k] else None
|
|
break
|
|
if not canonical:
|
|
continue
|
|
# Remove all variants and write canonical
|
|
for k in list(f.keys()):
|
|
if k.upper() in ['ALBUM ARTIST', 'ALBUM_ARTIST', 'ALBUMARTIST', 'albumartist']:
|
|
del f[k]
|
|
f['ALBUMARTIST'] = canonical
|
|
f.save()
|
|
update_song_in_db(full_path)
|
|
fixed += 1
|
|
except Exception as e:
|
|
failed.append(os.path.relpath(full_path, MUSIC_DIR))
|
|
await trigger_scan()
|
|
await push.broadcast("conflicts_updated", {"action": "fix_picard_tags"})
|
|
return {"status": "fixed", "fixed": fixed, "failed": len(failed)}
|
|
|
|
elif action == "fix_stale_paths":
|
|
count = scan_library(full_rescan=False)
|
|
build_file_index()
|
|
return {"status": "fixed", "rescanned": count}
|
|
|
|
elif action == "fix_orphaned_tracks":
|
|
await trigger_scan()
|
|
return {"status": "triggered_scan"}
|
|
|
|
else:
|
|
raise HTTPException(400, f"Unknown action: {action}")
|
|
|
|
|
|
# =============================================================================
|
|
# WebSocket Push
|
|
# =============================================================================
|
|
|
|
@app.websocket("/ws/push")
|
|
async def ws_push(ws: WebSocket):
|
|
await push.connect(ws)
|
|
try:
|
|
while True:
|
|
data = json.loads(await ws.receive_text())
|
|
act = data.get("action")
|
|
if act == "ping":
|
|
await push.send_to(ws, "pong", {"t": str(time.time())})
|
|
elif act == "get_profile":
|
|
rp = data.get("path", "")
|
|
fp = resolve_path(rp)
|
|
if fp:
|
|
try:
|
|
with sqlite3.connect(DB_PATH) as c:
|
|
row = c.execute(
|
|
"SELECT bpm,silence_start,silence_end,loudness_lufs "
|
|
"FROM dj_profiles WHERE file_path=?", (fp,)
|
|
).fetchone()
|
|
if row:
|
|
await push.send_to(ws, "profile", {
|
|
"path": rp, "bpm": str(row[0]),
|
|
"silence_start": str(row[1]),
|
|
"silence_end": str(row[2]),
|
|
"loudness_lufs": str(row[3])
|
|
})
|
|
else:
|
|
await push.send_to(ws, "profile",
|
|
{"path": rp, "error": "not_analyzed"})
|
|
except Exception as e:
|
|
await push.send_to(ws, "error", {"message": str(e)})
|
|
elif act == "get_vis":
|
|
rp = data.get("path", "")
|
|
fp = resolve_path(rp)
|
|
if fp:
|
|
frames = get_vis(fp)
|
|
if frames:
|
|
await push.send_to(ws, "vis_frames", {
|
|
"path": rp, "count": str(len(frames)),
|
|
"fps": "30", "frames": json.dumps(frames)
|
|
})
|
|
except WebSocketDisconnect:
|
|
push.disconnect(ws)
|
|
except Exception as e:
|
|
print(f"WS error: {e}")
|
|
push.disconnect(ws)
|