fixed library management

This commit is contained in:
Dallas Groot 2026-04-10 16:22:49 -07:00
parent 5b319ad643
commit fde3df0d26

View file

@ -175,47 +175,74 @@ def find_cover_art(song_path: str) -> Optional[str]:
# ── Tag reader ──────────────────────────────────────────────────────────────
def read_tags(full_path: str) -> dict:
"""Read all display tags from an audio file using mutagen."""
audio_easy = None
try:
audio = MutagenFile(full_path, easy=True)
audio_easy = MutagenFile(full_path, easy=True)
except Exception:
audio = None
pass
def get(key):
if audio and key in audio and audio[key]:
return audio[key][0]
# AIFF files don't support easy=True — fall back to raw ID3 tags
audio_raw = None
ext = Path(full_path).suffix.lower()
if ext in ('.aiff', '.aif') and (audio_easy is None or not audio_easy):
try:
from mutagen.aiff import AIFF
audio_raw = AIFF(full_path)
except Exception:
pass
def get_easy(key):
if audio_easy and key in audio_easy and audio_easy[key]:
return audio_easy[key][0]
return ''
title = get('title') or Path(full_path).stem
artist = get('artist') or 'Unknown Artist'
album = get('album') or 'Unknown Album'
album_artist = get('albumartist') or artist
genre = get('genre') or ''
def get_raw(frame_id):
"""Read a raw ID3 frame value from AIFF."""
if audio_raw and audio_raw.tags:
frame = audio_raw.tags.get(frame_id)
if frame:
return str(frame.text[0]) if hasattr(frame, 'text') and frame.text else str(frame)
return ''
def get(easy_key, raw_id=None):
val = get_easy(easy_key)
if not val and raw_id:
val = get_raw(raw_id)
return val
title = get('title', 'TIT2') or Path(full_path).stem
artist = get('artist', 'TPE1') or 'Unknown Artist'
album = get('album', 'TALB') or 'Unknown Album'
album_artist = get('albumartist', 'TPE2') or artist
genre = get('genre', 'TCON') or ''
year = None
raw = get('date')
if raw:
m = re.search(r'\d{4}', raw)
raw_date = get('date', 'TDRC') or get('date', 'TYER')
if raw_date:
m = re.search(r'\d{4}', str(raw_date))
if m:
year = int(m.group())
track_number = None
raw = get('tracknumber')
if raw:
m = re.match(r'(\d+)', raw)
raw_track = get('tracknumber', 'TRCK')
if raw_track:
m = re.match(r'(\d+)', str(raw_track))
if m:
track_number = int(m.group(1))
disc_number = None
raw = get('discnumber')
if raw:
m = re.match(r'(\d+)', raw)
raw_disc = get('discnumber', 'TPOS')
if raw_disc:
m = re.match(r'(\d+)', str(raw_disc))
if m:
disc_number = int(m.group(1))
duration = None
if audio and hasattr(audio, 'info') and audio.info:
audio_for_info = audio_easy or audio_raw
if audio_for_info and hasattr(audio_for_info, 'info') and audio_for_info.info:
try:
duration = float(audio.info.length)
duration = float(audio_for_info.info.length)
except Exception:
pass
@ -310,7 +337,8 @@ def update_song_in_db(full_path: str):
mtime = fsize = None
with sqlite3.connect(DB_PATH) as c:
c.execute("""UPDATE songs SET
cur = c.cursor()
cur.execute("""UPDATE songs SET
title=?, artist=?, album=?, album_artist=?, genre=?,
year=?, track_number=?, disc_number=?, duration=?,
sort_title=?, sort_artist=?, sort_album=?, sort_album_artist=?,
@ -323,8 +351,8 @@ def update_song_in_db(full_path: str):
sort_key(tags['album']), sort_key(tags['album_artist']),
cover, fsize, mtime, datetime.utcnow().isoformat(), song_id
))
if c.rowcount == 0:
c.execute("""INSERT OR REPLACE INTO songs (
if cur.rowcount == 0:
cur.execute("""INSERT OR REPLACE INTO songs (
id, full_path, relative_path,
title, artist, album, album_artist, genre,
year, track_number, disc_number, duration,
@ -506,15 +534,27 @@ async def trigger_scan():
async def sync_navidrome_ids_task():
"""Fetch all songs from Navidrome and write navidrome_id into our songs table."""
"""
Fetch all songs from Navidrome and match them into our songs table.
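Matching strategies (tried in order per song; the first hit wins):
1. title + artist — primary, both read from same ID3 tags (first DB row kept on duplicates)
2. title + album — fallback when artist field differs (first DB row kept on duplicates)
3. title only — fallback, used only when the title is unique in the DB
4. duration bucket + first 8 chars of title — used only when unique
5. cleaned title + artist — leading track-number prefix stripped, used only when unique
6. duration bucket only — used only when a single DB song falls in that bucket
7. cleaned title + duration bucket — last resort for untagged files, used only when unique
Duration buckets are 2 seconds wide (see dur_bucket).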
"""
try:
if not all([SUBSONIC_USER, SUBSONIC_TOKEN, SUBSONIC_SALT]):
print("Subsonic credentials not set - cannot sync IDs")
return
print(f"Syncing Navidrome IDs... URL={NAVIDROME_URL}", flush=True)
base_params = {"u": SUBSONIC_USER, "t": SUBSONIC_TOKEN, "s": SUBSONIC_SALT,
"v": "1.16.1", "c": "CompanionAPI", "f": "json",
"albumCount": 0, "artistCount": 0, "songCount": 500, "query": ""}
# ── Fetch all songs from Navidrome ────────────────────────────────────
base_params = {
"u": SUBSONIC_USER, "t": SUBSONIC_TOKEN, "s": SUBSONIC_SALT,
"v": "1.16.1", "c": "CompanionAPI", "f": "json",
"albumCount": 0, "artistCount": 0, "songCount": 500, "query": ""
}
all_songs = []
offset = 0
async with httpx.AsyncClient(timeout=60) as client:
@ -524,13 +564,9 @@ async def sync_navidrome_ids_task():
f"{NAVIDROME_URL}/rest/search3.view",
params={**base_params, "songOffset": offset}
)
print(f" Navidrome response: HTTP {r.status_code}", flush=True)
body = r.json()
# Check for auth errors
resp = body.get("subsonic-response", {})
resp = r.json().get("subsonic-response", {})
if resp.get("status") == "failed":
err = resp.get("error", {})
print(f" Navidrome auth error: {err}", flush=True)
print(f" Navidrome auth error: {resp.get('error')}", flush=True)
return
songs = resp.get("searchResult3", {}).get("song", [])
print(f" Page offset={offset}: {len(songs)} songs", flush=True)
@ -544,42 +580,212 @@ async def sync_navidrome_ids_task():
print(f" Navidrome fetch error: {e}", flush=True)
break
matched = 0
print(f" Navidrome total: {len(all_songs)} songs", flush=True)
if not all_songs:
return
# ── Show first 3 Navidrome songs for diagnosis ────────────────────────
for ns in all_songs[:3]:
print(f" ND sample: title={repr(ns.get('title',''))} "
f"artist={repr(ns.get('artist',''))} "
f"album={repr(ns.get('album',''))} "
f"duration={ns.get('duration')}", flush=True)
# ── Build lookup tables from our DB ───────────────────────────────────
def norm(s: str) -> str:
"""Lowercase, strip, NFC-normalize."""
return unicodedata.normalize("NFC", (s or "").lower().strip())
def clean_title(s: str) -> str:
"""Strip leading track/disc number prefix from filename-derived titles.
e.g. '09 Careless' -> 'careless', '01-02 Song' -> 'song'
"""
s = norm(s)
# Strip patterns like "09 ", "09 - ", "1-02 ", "01. " etc
s = re.sub(r'^\d{1,2}[-\s\.]+\d{0,2}[-\s\.]*', '', s).strip()
s = re.sub(r'^\d{1,2}[-\s\.]+', '', s).strip()
return s
def dur_bucket(seconds) -> Optional[int]:
"""Round to nearest 2-second bucket for fuzzy duration matching."""
if seconds is None:
return None
return int(round(float(seconds) / 2.0))
with sqlite3.connect(DB_PATH) as c:
cur = c.cursor()
total_db = cur.execute("SELECT COUNT(*) FROM songs").fetchone()[0]
print(f" DB songs total: {total_db}", flush=True)
# Match by (title, artist) from ID3 tags — both Navidrome and Companion
# read the same tags so this is always consistent regardless of
# folder structure or filename format differences.
db_rows = cur.execute(
"SELECT id, LOWER(TRIM(title)), LOWER(TRIM(artist)) FROM songs"
"SELECT id, title, artist, album, duration FROM songs"
).fetchall()
db_lookup = {}
for song_id, title, artist in db_rows:
key = (unicodedata.normalize("NFC", title),
unicodedata.normalize("NFC", artist))
# If duplicate title+artist, keep first (edge case)
if key not in db_lookup:
db_lookup[key] = song_id
print(f" DB lookup built: {len(db_lookup)} entries", flush=True)
total_db = len(db_rows)
print(f" DB songs total: {total_db}", flush=True)
# Show first 3 DB songs for comparison
for row in db_rows[:3]:
print(f" DB sample: title={repr(row[1])} "
f"artist={repr(row[2])} "
f"album={repr(row[3])} "
f"duration={row[4]}", flush=True)
# Strategy 1: title + artist
by_title_artist: dict[tuple, str] = {}
# Strategy 2: title + album
by_title_album: dict[tuple, str] = {}
# Strategy 3: title only (only stored if unique)
by_title: dict[str, Optional[str]] = {}
# Strategy 4: duration bucket + first 8 chars of title (unique)
by_dur: dict[tuple, Optional[str]] = {}
# Strategy 5: clean_title + artist (strips track number prefix)
by_clean_artist: dict[tuple, Optional[str]] = {}
# Strategy 6: duration bucket only (2-second buckets; stored only if unique per bucket)
by_dur_only: dict[int, Optional[str]] = {}
for song_id, title, artist, album, duration in db_rows:
nt = norm(title)
na = norm(artist)
nb = norm(album)
ct = clean_title(title)
dk = dur_bucket(duration)
k1 = (nt, na)
if k1 not in by_title_artist:
by_title_artist[k1] = song_id
k2 = (nt, nb)
if k2 not in by_title_album:
by_title_album[k2] = song_id
if nt in by_title:
by_title[nt] = None
else:
by_title[nt] = song_id
if dk is not None:
k4 = (dk, nt[:8])
if k4 in by_dur:
by_dur[k4] = None
else:
by_dur[k4] = song_id
k5 = (ct, na)
if k5 not in by_clean_artist:
if k5 in by_clean_artist:
by_clean_artist[k5] = None
else:
by_clean_artist[k5] = song_id
else:
by_clean_artist[k5] = None # ambiguous
if dk is not None:
if dk in by_dur_only:
by_dur_only[dk] = None
else:
by_dur_only[dk] = song_id
# Strategy 7: clean_title + duration bucket (catches untagged files
# where artist is missing but filename title + duration uniquely identify the song)
by_clean_dur: dict[tuple, Optional[str]] = {}
for song_id, title, artist, album, duration in db_rows:
ct = clean_title(title)
dk = dur_bucket(duration)
if dk is not None:
k7 = (ct, dk)
if k7 in by_clean_dur:
by_clean_dur[k7] = None # ambiguous
else:
by_clean_dur[k7] = song_id
print(f" Lookups: title+artist={len(by_title_artist)} "
f"title+album={len(by_title_album)} "
f"title_only={sum(1 for v in by_title.values() if v)} "
f"duration={sum(1 for v in by_dur.values() if v)} "
f"clean+artist={sum(1 for v in by_clean_artist.values() if v)} "
f"dur_only={sum(1 for v in by_dur_only.values() if v)} "
f"clean+dur={sum(1 for v in by_clean_dur.values() if v)}", flush=True)
matched_s1 = matched_s2 = matched_s3 = matched_s4 = 0
matched_s5 = matched_s6 = matched_s7 = unmatched = 0
unmatched_samples = []
with sqlite3.connect(DB_PATH) as c:
cur = c.cursor()
for ns in all_songs:
nd_id = ns.get("id", "")
nd_title = unicodedata.normalize("NFC",
(ns.get("title") or "").lower().strip())
nd_artist = unicodedata.normalize("NFC",
(ns.get("artist") or "").lower().strip())
if not nd_id or not nd_title:
nd_id = ns.get("id", "")
if not nd_id:
continue
key = (nd_title, nd_artist)
if key in db_lookup:
cur.execute("UPDATE songs SET navidrome_id = ? WHERE id = ?",
(nd_id, db_lookup[key]))
matched += 1
print(f"Navidrome ID sync: {matched}/{len(all_songs)} matched", flush=True)
nt = norm(ns.get("title", ""))
na = norm(ns.get("artist", ""))
nb = norm(ns.get("album", ""))
ct = clean_title(ns.get("title", ""))
dk = dur_bucket(ns.get("duration"))
db_song_id = None
strategy = 0
if not db_song_id:
hit = by_title_artist.get((nt, na))
if hit: db_song_id, strategy = hit, 1
if not db_song_id:
hit = by_title_album.get((nt, nb))
if hit: db_song_id, strategy = hit, 2
if not db_song_id:
hit = by_title.get(nt)
if hit: db_song_id, strategy = hit, 3
if not db_song_id and dk is not None:
hit = by_dur.get((dk, nt[:8]))
if hit: db_song_id, strategy = hit, 4
if not db_song_id:
hit = by_clean_artist.get((ct, na))
if hit: db_song_id, strategy = hit, 5
if not db_song_id and dk is not None:
hit = by_dur_only.get(dk)
if hit: db_song_id, strategy = hit, 6
# S7: clean title + duration bucket — catches untagged AIFF/files
# where artist is unknown but filename+duration uniquely identify song
if not db_song_id and dk is not None:
hit = by_clean_dur.get((ct, dk))
if hit: db_song_id, strategy = hit, 7
if db_song_id:
cur.execute("UPDATE songs SET navidrome_id = ? WHERE id = ?",
(nd_id, db_song_id))
if strategy == 1: matched_s1 += 1
elif strategy == 2: matched_s2 += 1
elif strategy == 3: matched_s3 += 1
elif strategy == 4: matched_s4 += 1
elif strategy == 5: matched_s5 += 1
elif strategy == 6: matched_s6 += 1
else: matched_s7 += 1
else:
unmatched += 1
if len(unmatched_samples) < 10:
unmatched_samples.append(
f"title={repr(ns.get('title',''))} "
f"artist={repr(ns.get('artist',''))} "
f"duration={ns.get('duration')}"
)
total_matched = matched_s1+matched_s2+matched_s3+matched_s4+matched_s5+matched_s6+matched_s7
print(f"Navidrome ID sync complete: {total_matched}/{len(all_songs)} matched", flush=True)
print(f" Strategy breakdown: "
f"title+artist={matched_s1} title+album={matched_s2} "
f"title_only={matched_s3} dur+prefix={matched_s4} "
f"clean+artist={matched_s5} dur_only={matched_s6} "
f"clean+dur={matched_s7} unmatched={unmatched}", flush=True)
if unmatched_samples:
print(f" Unmatched samples:", flush=True)
for s in unmatched_samples:
print(f" {s}", flush=True)
except Exception as e:
import traceback
print(f"sync_navidrome_ids_task FAILED: {e}", flush=True)
@ -692,32 +898,39 @@ def restructure_file(full_path: str) -> Optional[str]:
else:
break
# Update DB with new path
# Update DB with new path — re-read tags for accurate sort keys
new_relative = os.path.relpath(target, MUSIC_DIR)
song_id = hashlib.md5(full_path.encode()).hexdigest()
new_id = hashlib.md5(target.encode()).hexdigest()
tags = read_tags(target)
with sqlite3.connect(DB_PATH) as c:
# Update the existing row to reflect new path and new id
c.execute("""UPDATE songs SET
cur = c.cursor()
cur.execute("""UPDATE songs SET
id=?, full_path=?, relative_path=?,
sort_title=?, sort_artist=?, sort_album=?, sort_album_artist=?,
file_mtime=?, date_modified=?
WHERE id=?""", (
new_id, target, new_relative,
sort_key(Path(target).stem),
sort_key(os.path.dirname(new_relative).split(os.sep)[0] if os.sep in new_relative else ''),
sort_key(os.path.dirname(new_relative).split(os.sep)[1] if new_relative.count(os.sep) > 0 else ''),
sort_key(os.path.dirname(new_relative).split(os.sep)[0] if os.sep in new_relative else ''),
sort_key(tags['title']),
sort_key(tags['artist']),
sort_key(tags['album']),
sort_key(tags['album_artist']),
os.stat(target).st_mtime,
datetime.utcnow().isoformat(),
song_id
))
if c.rowcount == 0:
# Row used old id — try by full_path
c.execute("""UPDATE songs SET
id=?, full_path=?, relative_path=?, file_mtime=?, date_modified=?
if cur.rowcount == 0:
# Row used old full_path as key — try matching by path
cur.execute("""UPDATE songs SET
id=?, full_path=?, relative_path=?,
sort_title=?, sort_artist=?, sort_album=?, sort_album_artist=?,
file_mtime=?, date_modified=?
WHERE full_path=?""", (
new_id, target, new_relative,
sort_key(tags['title']),
sort_key(tags['artist']),
sort_key(tags['album']),
sort_key(tags['album_artist']),
os.stat(target).st_mtime,
datetime.utcnow().isoformat(),
full_path