From 78a6a97fcfa0b378cc285bcddd3409ddb6b3736f Mon Sep 17 00:00:00 2001 From: "sp4rk.y" Date: Mon, 15 Jun 2026 13:34:34 -0600 Subject: [PATCH] feat(music): native music core - shared helpers, album folder template, display cleanup (#125) * Add native Music core workflow * feat(music): add shared music core helpers, album folder template, and UX cleanup Consolidate duplicated music-service logic into core and extend the native music workflow. - add core/music/extract.py: shared stateless helpers (first_text, first_number, year/duration/name formatting, classify_release_kind, dedupe_track_options, build_music_from_songs) so services stop carrying their own copies - add core/music/display.py: shared rich rendering (render_track_panel, render_album_header, render_artwork_preview) with TrackRow / MusicHeaderInfo data holders - export the new helpers from core/music/__init__.py - add dedicated `albums` folder template kind (output_template.folder.albums) resolving albums -> songs -> "{artist} - {album} ({year})"; whitelist music template variables (album_artist, track_total, disc_total, release_type, genre, explicit, isrc, upc, label) - fix song filename crash: config.get_output_template(...) did not exist; use config.output_template.get("songs") with a sane default - strip emojis from music output (renderer, dl.py music branch) to match unshackle UX; remove dead MusicSongPlan import and music_icon logic - document the albums folder key in unshackle-example.yaml - add tests for extract, display, and the folder template --------- Co-authored-by: MrMovies-Dev --- pyproject.toml | 2 + tests/core/test_music_display.py | 193 +++++++++ tests/core/test_music_extract.py | 243 +++++++++++ tests/core/test_music_folder_template.py | 93 +++++ unshackle/commands/dl.py | 489 +++++++++++++++++++++- unshackle/core/config.py | 15 +- unshackle/core/music/__init__.py | 40 ++ unshackle/core/music/display.py | 175 ++++++++ unshackle/core/music/extract.py | 196 +++++++++ unshackle/core/music/hasher.py | 16 + unshackle/core/music/integrity.py | 187 +++++++++ unshackle/core/music/manifest.py | 23 + unshackle/core/music/models.py | 58 +++ unshackle/core/music/planner.py | 208 +++++++++ unshackle/core/music/renderer.py | 510 +++++++++++++++++++++++ unshackle/core/music/tagger.py | 345 +++++++++++++++ unshackle/core/titles/__init__.py | 5 +- unshackle/core/titles/music.py | 276 ++++++++++++ unshackle/core/tracks/audio.py | 6 +- unshackle/unshackle-example.yaml | 8 +- uv.lock | 22 + 21 files changed, 3095 insertions(+), 15 deletions(-) create mode 100644 tests/core/test_music_display.py create mode 100644 tests/core/test_music_extract.py create mode 100644 tests/core/test_music_folder_template.py create mode 100644 unshackle/core/music/__init__.py create mode 100644 unshackle/core/music/display.py create mode 100644 unshackle/core/music/extract.py create mode 100644 unshackle/core/music/hasher.py create mode 100644 unshackle/core/music/integrity.py create mode 100644 unshackle/core/music/manifest.py create mode 100644 unshackle/core/music/models.py create mode 100644 unshackle/core/music/planner.py create mode 100644 unshackle/core/music/renderer.py create mode 100644 unshackle/core/music/tagger.py create mode 100644 unshackle/core/titles/music.py diff --git a/pyproject.toml b/pyproject.toml index 50cc70e..c81bf4a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,6 +67,8 @@ dependencies = [ "animeapi-py>=0.6.0", "rnet>=2.4.2", "bandit>=1.9.4", + "jsonpath-ng>=1.8.0", + "mutagen>=1.47.0,<2", "defusedxml>=0.7.1", ] diff --git a/tests/core/test_music_display.py b/tests/core/test_music_display.py new file mode 100644 index 0000000..ba5c98b --- /dev/null +++ b/tests/core/test_music_display.py @@ -0,0 +1,193 @@ +"""Unit tests for the shared music rendering helpers in unshackle.core.music.display. + +Rendering is visual, so these assert structure/type/no-crash rather than exact +ANSI. No network: the artwork soft-fail path is exercised with a session whose +get raises, and (when Pillow is absent) via the missing-dependency guard. +""" + +from __future__ import annotations + +from typing import Any, Optional + +from rich.console import Console, RenderableType +from rich.panel import Panel + +from unshackle.core.music.display import (MusicHeaderInfo, TrackRow, format_track_detail_quality, render_album_header, + render_artwork_preview, render_track_panel) +from unshackle.core.titles.music import Song + + +class DummyService: + """Stand-in service class; Song only requires a type, never an instance.""" + + +def make_song(*, track: int = 1, name: str = "Song", album: str = "Album") -> Song: + return Song( + id_=f"{album}-{track}", + service=DummyService, + name=name, + artist="Artist", + album=album, + track=track, + disc=1, + year=2020, + ) + + +def render_to_text(renderable: RenderableType) -> str: + """Render a rich renderable to plain text so we can assert on its content.""" + console = Console(width=120, record=True, force_terminal=False) + console.print(renderable) + return console.export_text() + + +def test_trackrow_constructs() -> None: + song = make_song(name="Track A") + row = TrackRow( + song=song, + quality_label="FLAC 16-bit/44.1kHz", + layout="Stereo", + duration_str="3:21", + hires=True, + cd=True, + note="", + ) + assert row.song is song + assert row.quality_label == "FLAC 16-bit/44.1kHz" + assert row.layout == "Stereo" + assert row.hires is True + assert row.cd is True + assert row.note == "" + + +def test_trackrow_defaults() -> None: + row = TrackRow(song=make_song(), quality_label="MP3 320", layout="Stereo", duration_str="2:00") + assert row.hires is False + assert row.cd is False + assert row.note == "" + + +def test_musicheaderinfo_constructs() -> None: + info = MusicHeaderInfo( + artist="Some Artist", + album="Some Album", + year="2021", + track_count=12, + quality_label="FLAC 24-bit/96kHz", + duration_str="42:10", + ) + assert info.artist == "Some Artist" + assert info.album == "Some Album" + assert info.year == "2021" + assert info.track_count == 12 + assert info.quality_label == "FLAC 24-bit/96kHz" + assert info.duration_str == "42:10" + assert info.artist_label == "Artist" + + +def test_format_track_detail_quality_brackets_codec() -> None: + assert format_track_detail_quality("FLAC 16-bit/44.1kHz") == "[FLAC] | 16-bit/44.1kHz" + assert format_track_detail_quality("ogg 320kbps") == "[OGG] | 320kbps" + + +def test_format_track_detail_quality_passthrough() -> None: + assert format_track_detail_quality("Lossless") == "Lossless" + assert format_track_detail_quality("") == "" + + +def test_render_track_panel_returns_panel() -> None: + rows = [ + TrackRow(song=make_song(track=1, name="First"), quality_label="FLAC 16-bit/44.1kHz", layout="Stereo", duration_str="3:00", cd=True), + TrackRow(song=make_song(track=2, name="Second"), quality_label="OGG 320", layout="Stereo", duration_str="4:00", hires=True), + ] + panel = render_track_panel(rows, total=len(rows)) + assert isinstance(panel, Panel) + + text = render_to_text(panel) + assert "First" in text + assert "Second" in text + assert "2 Tracks" in text + assert "CD" in text + assert "Hi-Res" in text + + +def test_render_track_panel_singular_label() -> None: + rows = [TrackRow(song=make_song(name="Only"), quality_label="FLAC 16-bit/44.1kHz", layout="Stereo", duration_str="3:00")] + text = render_to_text(render_track_panel(rows, total=1)) + # The count node reads "1 Track" (singular); the panel title is always + # "Available Tracks", so we only check the count node phrasing here. + assert "1 Track" in text + assert "1 Tracks" not in text + + +def test_render_track_panel_note_marks_unavailable() -> None: + rows = [ + TrackRow( + song=make_song(name="Gone"), + quality_label="FLAC", + layout="Stereo", + duration_str="3:00", + cd=True, + hires=True, + note="(unavailable in region)", + ) + ] + text = render_to_text(render_track_panel(rows, total=1)) + assert "unavailable in region" in text + # When a note is present, the badges are suppressed. + assert "CD" not in text + assert "Hi-Res" not in text + + +def test_render_album_header_without_artwork() -> None: + info = MusicHeaderInfo( + artist="The Artist", + album="Greatest Hits", + year="1999", + track_count=10, + quality_label="FLAC 16-bit/44.1kHz", + duration_str="55:00", + ) + header = render_album_header(info) + assert header is not None + text = render_to_text(header) + assert "The Artist" in text + assert "Greatest Hits" in text + assert "1999" in text + assert "10" in text + + +def test_render_album_header_with_artwork() -> None: + info = MusicHeaderInfo( + artist="Owner Name", + album="A Playlist", + year="2024", + track_count=3, + quality_label="OGG 320", + artist_label="Owner", + ) + artwork = render_album_header(MusicHeaderInfo("x", "y", "1", 1, "q")) # any renderable + header = render_album_header(info, artwork=artwork) + assert header is not None + text = render_to_text(header) + assert "Owner Name" in text + assert "A Playlist" in text + assert "Owner" in text + + +class RaisingSession: + """Session stub whose get() always raises, to drive the soft-fail path.""" + + def get(self, *args: Any, **kwargs: Any) -> Any: + raise RuntimeError("network disabled in tests") + + +def test_render_artwork_preview_empty_url_returns_none() -> None: + assert render_artwork_preview(RaisingSession(), "") is None + + +def test_render_artwork_preview_soft_fails_without_network() -> None: + # Either Pillow is absent (guard returns None) or the session.get raises + # (caught and returns None). Both paths must yield None, never raise. + result: Optional[RenderableType] = render_artwork_preview(RaisingSession(), "https://example.invalid/cover.jpg") + assert result is None diff --git a/tests/core/test_music_extract.py b/tests/core/test_music_extract.py new file mode 100644 index 0000000..3b7ca08 --- /dev/null +++ b/tests/core/test_music_extract.py @@ -0,0 +1,243 @@ +"""Unit tests for the shared music helpers in unshackle.core.music.extract.""" + +from __future__ import annotations + +from typing import Any, Optional + +import pytest + +from unshackle.core.music.extract import ( + build_music_from_songs, + classify_release_kind, + dedupe_track_options, + duration_seconds, + first_number, + first_text, + format_duration, + format_names, + year_from_value, +) +from unshackle.core.music.models import MusicTrackOption +from unshackle.core.titles.music import Music, Song + + +class DummyService: + """Stand-in service class; Song only requires a type, never an instance.""" + + +def make_song( + *, + track: int = 1, + disc: int = 1, + year: int = 2020, + album: str = "Album", + artist: str = "Artist", + name: str = "Song", + album_artist: Optional[str] = None, + total_tracks: Optional[int] = None, + total_discs: Optional[int] = None, + artwork_url: Optional[str] = None, + data: Optional[Any] = None, +) -> Song: + return Song( + id_=f"{album}-{disc}-{track}", + service=DummyService, + name=name, + artist=artist, + album=album, + track=track, + disc=disc, + year=year, + album_artist=album_artist, + total_tracks=total_tracks, + total_discs=total_discs, + artwork_url=artwork_url, + data=data, + ) + + +@pytest.mark.parametrize( + ("values", "default", "expected"), + [ + ((None, "", " hello "), "", "hello"), + (("", None), "fallback", "fallback"), + (({"name": "Track"},), "", "Track"), + (({"title": "T"},), "", "T"), + (({"description": "D"},), "", "D"), + ((["a", "", "b"],), "", "a, b"), + ((123,), "", "123"), + ((), "def", "def"), + ], +) +def test_first_text(values: tuple[Any, ...], default: str, expected: str) -> None: + assert first_text(*values, default=default) == expected + + +@pytest.mark.parametrize( + ("values", "expected"), + [ + ((None, "", "12"), 12.0), + (("3.5",), 3.5), + (("abc", 7), 7.0), + ((None, ""), None), + ((), None), + ], +) +def test_first_number(values: tuple[Any, ...], expected: Optional[float]) -> None: + assert first_number(*values) == expected + + +@pytest.mark.parametrize( + ("value", "expected"), + [ + ("2021-05-04", 2021), + ("released 1999 remaster", 1999), + (2018, 2018), + ("no year here", 1900), + (None, 1900), + ("", 1900), + ], +) +def test_year_from_value(value: Any, expected: int) -> None: + assert year_from_value(value) == expected + + +@pytest.mark.parametrize( + ("value", "expected"), + [ + (250, 250.0), + (250000, 250.0), # treated as milliseconds + ("180", 180.0), + (None, None), + ("nope", None), + ], +) +def test_duration_seconds(value: Any, expected: Optional[float]) -> None: + assert duration_seconds(value) == expected + + +@pytest.mark.parametrize( + ("value", "expected"), + [ + (0, "0:00"), + (5, "0:05"), + (65, "1:05"), + (599, "9:59"), + (3600, "1:00:00"), + (3725, "1:02:05"), + (-5, "0:00"), + (None, ""), + ("bad", ""), + ], +) +def test_format_duration(value: Any, expected: str) -> None: + assert format_duration(value) == expected + + +@pytest.mark.parametrize( + ("value", "sep", "expected"), + [ + (["Alice", "Bob"], ", ", "Alice, Bob"), + (["Alice", "Alice"], ", ", "Alice"), # de-dupes + ([{"profile": {"name": "DJ"}}], ", ", "DJ"), + ({"items": [{"name": "X"}, {"name": "Y"}]}, " & ", "X & Y"), + ("Solo", ", ", "Solo"), + ], +) +def test_format_names(value: Any, sep: str, expected: str) -> None: + assert format_names(value, sep=sep) == expected + + +@pytest.mark.parametrize( + ("raw_kind", "count", "expected"), + [ + ("single", 1, "single"), + ("single", 3, "ep"), # multi-track "single" -> EP + ("EP", None, "ep"), + ("Extended Play", None, "ep"), + ("ep-single", 1, "single"), + ("ep-single", 4, "ep"), + ("album", None, "album"), + ("compilation", None, "compilation"), + ("Live Recording", None, "live"), + ("download", None, "download"), + ("playlist", None, "playlist"), + ("other", None, "other"), + ("totally-unknown", None, "album"), + ("", None, "album"), + ], +) +def test_classify_release_kind(raw_kind: str, count: Optional[float], expected: str) -> None: + assert classify_release_kind(raw_kind, count) == expected + + +def test_dedupe_track_options() -> None: + a = MusicTrackOption(codec="flac", bit_depth=16, sample_rate=44100, bitrate=None, quality_label="L", explicit=False) + a_dup = MusicTrackOption( + codec="FLAC", bit_depth=16, sample_rate=44100, bitrate=None, quality_label="L", explicit=False + ) # codec case-insensitive duplicate of `a` + b = MusicTrackOption(codec="flac", bit_depth=24, sample_rate=96000, bitrate=None, quality_label="H", explicit=False) + c = MusicTrackOption(codec="flac", bit_depth=16, sample_rate=44100, bitrate=None, quality_label="L", explicit=True) + + result = dedupe_track_options([a, a_dup, b, c]) + assert result == [a, b, c] + + +def test_build_music_from_songs() -> None: + songs = [ + make_song( + track=1, + disc=1, + year=2019, + album="Greatest", + artist="Band", + album_artist="The Band", + total_tracks=2, + total_discs=1, + artwork_url="http://art/1.jpg", + data={"duration": 200}, + ), + make_song( + track=2, + disc=1, + year=2019, + album="Greatest", + artist="Band", + total_tracks=2, + total_discs=1, + data={"duration": 100}, + ), + ] + + music = build_music_from_songs(songs, kind="album") + + assert isinstance(music, Music) + assert music.kind == "album" + assert music.title == "Greatest" + assert music.artist == "The Band" # album_artist preferred over artist + assert music.year == 2019 + assert music.total_tracks == 2 + assert music.total_discs == 1 + assert music.total_duration == 300 + assert music.artwork_url == "http://art/1.jpg" + + +def test_build_music_from_songs_overrides() -> None: + songs = [make_song(data={"duration": 60})] + music = build_music_from_songs( + songs, + kind="playlist", + title="My Mix", + artist="Various", + owner="george", + description="best of", + ) + assert music.title == "My Mix" + assert music.artist == "Various" + assert music.owner == "george" + assert music.description == "best of" + + +def test_build_music_from_songs_empty_raises() -> None: + with pytest.raises(ValueError, match="nothing here"): + build_music_from_songs([], kind="album", empty_error="nothing here") diff --git a/tests/core/test_music_folder_template.py b/tests/core/test_music_folder_template.py new file mode 100644 index 0000000..1f7c790 --- /dev/null +++ b/tests/core/test_music_folder_template.py @@ -0,0 +1,93 @@ +"""Tests for the dedicated album-folder template (`output_template.folder.albums`).""" + +from __future__ import annotations + +import warnings + +import pytest + +from unshackle.core.config import Config, config +from unshackle.core.titles.music import Song +from unshackle.core.utilities import sanitize_filename + + +class DummyService: + pass + + +class StubMediaInfo: + """Minimal MediaInfo stand-in: the template context only reads track lists.""" + + video_tracks: list = [] + audio_tracks: list = [] + + +def make_song(**overrides) -> Song: + kwargs = dict( + id_="track-0001", + service=DummyService, + name="NUEVAYoL", + artist="Bad Bunny", + album="DeBI TiRAR MaS FOToS", + track=1, + disc=1, + year=2025, + album_artist="Bad Bunny", + ) + kwargs.update(overrides) + return Song(**kwargs) + + +@pytest.fixture +def reset_folder_config(): + """Save/restore the global config's template attributes around each test.""" + saved = (config.folder_templates, config.folder_template, config.output_template) + config.folder_templates = {} + config.folder_template = "" + config.output_template = {} + yield config + config.folder_templates, config.folder_template, config.output_template = saved + + +def test_folder_fallback_when_no_templates(reset_folder_config): + song = make_song() + result = song.get_filename(StubMediaInfo(), folder=True) + assert result == sanitize_filename("Bad Bunny - DeBI TiRAR MaS FOToS (2025)", " ") + + +def test_albums_template_used(reset_folder_config): + reset_folder_config.folder_templates = {"albums": "{album_artist} - {album} ({year})"} + result = make_song().get_filename(StubMediaInfo(), folder=True) + assert result == sanitize_filename("Bad Bunny - DeBI TiRAR MaS FOToS (2025)", " ") + # Album folder must NOT carry per-track info like the song file name does. + assert "01" not in result + + +def test_albums_preferred_over_songs(reset_folder_config): + reset_folder_config.folder_templates = {"albums": "AA-{album}", "songs": "SS-{album}"} + result = make_song().get_filename(StubMediaInfo(), folder=True) + assert result.startswith("AA-") + + +def test_songs_folder_used_when_no_albums(reset_folder_config): + # Backward compatibility: the legacy "songs" folder kind still names the album folder. + reset_folder_config.folder_templates = {"songs": "SS-{album}"} + result = make_song().get_filename(StubMediaInfo(), folder=True) + assert result.startswith("SS-") + + +def test_validation_accepts_music_variables_and_albums_kind(): + with warnings.catch_warnings(): + warnings.simplefilter("error") # any warning becomes a test failure + Config( + output_template={ + "songs": "{track_number}. {title}", + "folder": {"albums": "{album_artist} - {album} ({year?})"}, + } + ) + + +def test_validation_warns_on_unknown_folder_kind(): + with pytest.warns(UserWarning, match="Unknown folder template kind"): + # A non-folder key is required so output-template validation actually runs. + Config(output_template={"songs": "{track_number}. {title}", "folder": {"bogus": "{album}"}}) diff --git a/unshackle/commands/dl.py b/unshackle/commands/dl.py index 4b14dc6..8cc0dc6 100644 --- a/unshackle/commands/dl.py +++ b/unshackle/commands/dl.py @@ -48,11 +48,21 @@ from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, from unshackle.core.credential import Credential from unshackle.core.drm import DRM_T, ClearKeyCENC, MonaLisa, PlayReady, Widevine from unshackle.core.events import events +from unshackle.core.music import ( + MusicAudioIntegrityError, + MusicMetadataResult, + MusicPlanner, + MusicRenderer, + file_md5, + verify_music_audio, + write_music_manifest, + write_music_metadata, +) from unshackle.core.proxies import Basic, Gluetun, Hola, NordVPN, SurfsharkVPN, WindscribeVPN from unshackle.core.service import Service from unshackle.core.services import Services from unshackle.core.title_cacher import get_account_hash -from unshackle.core.titles import Movie, Movies, Series, Song, Title_T +from unshackle.core.titles import Movie, Movies, Music, Series, Song, Title_T from unshackle.core.titles.episode import Episode from unshackle.core.tracks import Audio, Subtitle, Tracks, Video from unshackle.core.tracks.attachment import Attachment @@ -720,7 +730,7 @@ class dl: if not config.output_template: raise click.ClickException( "No 'output_template' configured in your unshackle.yaml.\n" - "Please add an 'output_template' section with movies, series, and songs templates.\n" + "Please add an 'output_template' section with movies, series, and songs/music templates.\n" "See unshackle-example.yaml for examples." ) @@ -1265,7 +1275,7 @@ class dl: ) with console.status( - "Authenticating with Remote Service..." if self.is_remote else "Authenticating with Service...", + "Preparing Remote Service Session..." if self.is_remote else "Preparing Service Session...", spinner="dots", ): try: @@ -1378,12 +1388,43 @@ class dl: if enrich_year and not titles.year: titles.year = enrich_year - console.print(Padding(Rule(f"[rule.text]{titles.__class__.__name__}: {titles}"), (1, 2))) + music_mode = isinstance(titles, Music) + music_collection_mode = isinstance(titles, list) and bool(titles) and all( + isinstance(title, Music) for title in titles + ) + music_titles = list(titles) if music_collection_mode else ([titles] if music_mode else []) + music_plans = {} + + if music_titles: + music_renderer = MusicRenderer() + if music_collection_mode: + collection_label_getter = getattr(service, "get_music_collection_label", None) + collection_label = ( + collection_label_getter(music_titles) + if callable(collection_label_getter) + else f"Music Collection ({len(music_titles)} Releases)" + ) + if collection_label: + console.print(Padding(Rule(f"[rule.text]{collection_label}"), (1, 2))) + if list_: + for music_title in music_titles: + music_kind = MusicRenderer.display_kind(getattr(music_title, "kind", "") or "album") + console.print(Padding(Rule(f"[rule.text]{music_kind}: {music_title}"), (1, 2))) + current_plan = MusicPlanner(service).build(music_title) + music_plans[id(music_title)] = current_plan + music_renderable = music_renderer.render_plan(current_plan, verbose=True) + if music_renderable: + console.print(Padding(music_renderable, (0, 5))) + else: + console.print(Padding(Rule(f"[rule.text]{titles.__class__.__name__}: {titles}"), (1, 2))) + console.print(Padding(titles.tree(verbose=list_titles), (0, 5))) - console.print(Padding(titles.tree(verbose=list_titles), (0, 5))) if list_titles: return + if music_titles and list_: + return + # Enables manual selection for Series when --select-titles is set if select_titles and isinstance(titles, Series): console.print(Padding(Rule("[rule.text]Select Titles"), (1, 2))) @@ -1474,6 +1515,431 @@ class dl: latest_episode_id = f"{latest_ep.season}x{latest_ep.number}" self.log.info(f"Latest episode mode: Selecting S{latest_ep.season:02}E{latest_ep.number:02}") + music_group_download = ( + bool(music_titles) + and getattr(service, "GROUP_AUDIO_DOWNLOADS", False) + and not no_mux + and not video_only + and not subs_only + and not chapters_only + and not no_audio + ) + if music_group_download: + def download_music_title(titles: Music, plan: Any) -> bool: + music_items: list[tuple[Song, Audio, Callable[..., None]]] = [] + music_song_plans = { + id(song_plan.song): song_plan + for disc in plan.discs + for song_plan in disc.songs + } + music_renderer = MusicRenderer() + music_start_time = time.time() + + music_kind = MusicRenderer.display_kind(getattr(titles, "kind", "") or "album") + console.print(Padding(Rule(f"[rule.text]{music_kind}: {titles}"), (1, 2))) + music_header = music_renderer.render_plan_header(plan) + if music_header: + console.print(Padding(music_header, (0, 5))) + + def music_track_count(count: int) -> str: + return f"{count} track{'s' if count != 1 else ''}" + + def format_elapsed_seconds(elapsed: float) -> str: + elapsed_int = int(elapsed) + minutes, seconds = divmod(elapsed_int, 60) + hours, minutes = divmod(minutes, 60) + value = f"{minutes:d}m{seconds:d}s" + return f"{hours:d}h{value}" if hours else value + + def select_music_audio(song: Song) -> None: + if not audio_description: + song.tracks.select_audio(lambda x: not x.descriptive) + if acodec: + song.tracks.select_audio(lambda x: x.codec in acodec) + if not song.tracks.audio: + codec_names = ", ".join(c.name for c in acodec) + self.log.error(f"No audio tracks matching codecs for {song.name}: {codec_names}") + sys.exit(1) + if channels: + song.tracks.select_audio(lambda x: math.ceil(x.channels) == math.ceil(channels)) + if not song.tracks.audio: + self.log.error(f"There's no {channels} Audio Track for {song.name}...") + sys.exit(1) + if no_atmos: + song.tracks.audio = [x for x in song.tracks.audio if not x.atmos] + if not song.tracks.audio: + self.log.error(f"No non-Atmos audio tracks available for {song.name}...") + sys.exit(1) + if abitrate: + song.tracks.select_audio(lambda x: x.bitrate and x.bitrate // 1000 == abitrate) + if not song.tracks.audio: + self.log.error(f"There's no {abitrate}kbps Audio Track for {song.name}...") + sys.exit(1) + if abitrate_min is not None and abitrate_max is not None: + song.tracks.select_audio( + lambda x: x.bitrate and abitrate_min <= x.bitrate // 1000 <= abitrate_max + ) + if not song.tracks.audio: + self.log.error( + f"No Audio Track in {abitrate_min}-{abitrate_max}kbps range for {song.name}..." + ) + sys.exit(1) + + audio_languages = a_lang or lang + if audio_languages: + processed_lang = [] + for language in audio_languages: + if language == "orig": + if song.language: + orig_lang = str(song.language) if hasattr(song.language, "__str__") else song.language + if orig_lang not in processed_lang: + processed_lang.append(orig_lang) + else: + self.log.warning("Original language not available for music track, skipping 'orig'") + elif language not in processed_lang: + processed_lang.append(language) + + if "best" not in processed_lang and "all" not in processed_lang: + song.tracks.audio = song.tracks.by_language( + song.tracks.audio, + processed_lang, + per_language=1, + exact_match=exact_lang, + ) + if not song.tracks.audio: + self.log.error(f"There's no {processed_lang} Audio Track for {song.name}...") + sys.exit(1) + + self.log.debug("Getting Tracks") + tracks_label = "Getting Remote Tracks..." if self.is_remote else "Getting Tracks..." + with console.status(tracks_label, spinner="dots"): + for song in titles: + events.reset() + events.subscribe(events.Types.SEGMENT_DOWNLOADED, service.on_segment_downloaded) + events.subscribe(events.Types.TRACK_DOWNLOADED, service.on_track_downloaded) + events.subscribe(events.Types.TRACK_DECRYPTED, service.on_track_decrypted) + events.subscribe(events.Types.TRACK_REPACKED, service.on_track_repacked) + events.subscribe(events.Types.TRACK_MULTIPLEX, service.on_track_multiplex) + + song.tracks.add(service.get_tracks(song), warn_only=True) + song.tracks.chapters = service.get_chapters(song) + song.tracks.sort_audio(by_language=a_lang or lang) + select_music_audio(song) + if not song.tracks.audio: + self.log.error(f"No audio tracks returned for {song.name}.") + sys.exit(1) + + music_tree = Tree( + f"[repr.number]{len(titles)}[/] {'Track' if len(titles) == 1 else 'Tracks'}", + guide_style="bright_black", + ) + for song in titles: + track = song.tracks.audio[0] + progress = Progress( + SpinnerColumn(finished_text=""), + BarColumn(), + " | ", + TimeRemainingColumn(compact=True, elapsed_when_finished=True), + " | ", + TextColumn("[progress.data.speed]{task.fields[downloaded]}"), + console=console, + speed_estimate_period=10, + ) + task = progress.add_task("", downloaded="-") + state = {"total": 100.0} + + def update_track_progress( + task_id: int = task, + _state: dict[str, float] = state, + _progress: Progress = progress, + **kwargs, + ) -> None: + if "total" in kwargs and kwargs["total"] is not None: + _state["total"] = kwargs["total"] + + downloaded_state = kwargs.get("downloaded") + if downloaded_state in {"Downloaded", "Decrypted", "[yellow]SKIPPED"}: + kwargs["completed"] = _state["total"] + _progress.update(task_id=task_id, **kwargs) + + track_table = Table.grid() + track_table.add_row(music_renderer._song_line(song, titles)) + song_plan = music_song_plans.get(id(song)) + if song_plan and song_plan.selected: + track_table.add_row(music_renderer._option_line(song_plan.selected), style="text2") + else: + track_table.add_row(str(track)[6:], style="text2") + track_table.add_row(progress) + music_tree.add(track_table, guide_style="bright_black") + music_items.append((song, track, update_track_progress)) + + download_table = Table.grid() + download_table.add_row(music_tree) + + try: + with Live(Padding(download_table, (1, 5)), console=console, refresh_per_second=5): + with ThreadPoolExecutor(downloads) as pool: + download_futures = [ + pool.submit( + track.download, + session=track.session or service.session, + no_proxy_download=no_proxy_download, + prepare_drm=partial( + partial(self.prepare_drm, table=download_table), + track=track, + title=song, + certificate=partial( + service.get_widevine_service_certificate, + title=song, + track=track, + ), + licence=partial( + service.get_playready_license + if is_playready_cdm(self.cdm) + else service.get_widevine_license, + title=song, + track=track, + ), + cdm_only=cdm_only, + vaults_only=vaults_only, + export=export_path, + ), + cdm=self.cdm, + max_workers=workers, + progress=progress_call, + ) + for song, track, progress_call in music_items + ] + for download in futures.as_completed(download_futures): + download.result() + except KeyboardInterrupt: + console.print(Padding(":x: Download Cancelled...", (0, 5, 1, 5))) + return + except Exception as e: # noqa + console.print( + Padding( + Group( + ":x: Download Failed...", + f" {type(e).__name__}: {e}", + " An unexpected error occurred in one of the download workers.", + ), + (1, 5), + ) + ) + console.print_exception() + return + + if skip_dl: + console.log("Skipped downloads as --skip-dl was used...") + else: + dl_time = time_elapsed_since(music_start_time) + console.print(Padding(f"Track downloads finished in [progress.elapsed]{dl_time}[/]", (0, 5))) + + integrity_results = {} + media_infos = {} + integrity_start = time.time() + try: + with console.status("Verifying audio integrity...", spinner="dots"): + for song, track, _ in music_items: + if not track.path or not track.path.exists(): + continue + if track.needs_repack: + track.repackage() + events.emit(events.Types.TRACK_REPACKED, track=track) + + media_info = MediaInfo.parse(track.path) + media_infos[id(track)] = media_info + integrity_results[id(track)] = verify_music_audio( + track.path, + song=song, + track=track, + media_info=media_info, + ) + except MusicAudioIntegrityError as error: + console.print(Padding(f"Audio integrity failed: {error}", (0, 5, 1, 5))) + return + integrity_time = format_elapsed_seconds(time.time() - integrity_start) + + source_md5 = {} + md5_elapsed = 0.0 + md5_start = time.time() + with console.status("Recording MD5 checksums...", spinner="dots"): + for _, track, _ in music_items: + if track.path and track.path.exists(): + source_md5[id(track)] = file_md5(track.path) + md5_elapsed += time.time() - md5_start + + metadata_results = {} + final_paths = {} + final_md5 = {} + manifest_records = [] + metadata_start = time.time() + metadata_warning = "" + used_final_paths: set[Path] = set() + with console.status("Writing music metadata...", spinner="dots"): + for song, track, _ in music_items: + if not track.path or not track.path.exists(): + continue + + media_info = media_infos.get(id(track)) or MediaInfo.parse(track.path) + final_dir = self.output_dir or config.directories.downloads + final_filename = song.get_filename(media_info, show_service=not no_source) + if not no_folder: + final_dir /= song.get_filename(media_info, show_service=not no_source, folder=True) + + final_dir.mkdir(parents=True, exist_ok=True) + final_path = final_dir / f"{final_filename}{track.path.suffix}" + sep = config.get_template_separator("songs") + if final_path in used_final_paths: + index = 2 + while final_path in used_final_paths: + final_path = final_dir / f"{final_filename.rstrip()}{sep}{index}{track.path.suffix}" + index += 1 + + try: + os.replace(track.path, final_path) + except OSError: + if final_path.exists(): + final_path.unlink() + shutil.move(track.path, final_path) + used_final_paths.add(final_path) + final_paths[id(track)] = final_path + self.completed_files.append(final_path) + + try: + metadata_results[id(track)] = write_music_metadata( + final_path, + song, + session=service.session, + source_md5=source_md5.get(id(track), ""), + ) + except Exception as error: + metadata_warning = f"{type(error).__name__}: {error}" + self.log.warning(f"Music metadata failed for {song.name}: {metadata_warning}") + metadata_results[id(track)] = MusicMetadataResult(skipped=True, reason=metadata_warning) + metadata_time = format_elapsed_seconds(time.time() - metadata_start) + + final_md5_start = time.time() + with console.status("Recording final MD5 checksums...", spinner="dots"): + for _, track, _ in music_items: + final_path = final_paths.get(id(track)) + if final_path and final_path.exists(): + final_md5[id(track)] = file_md5(final_path) + md5_elapsed += time.time() - final_md5_start + md5_time = format_elapsed_seconds(md5_elapsed) + + for song, track, _ in music_items: + final_path = final_paths.get(id(track)) + if not final_path: + continue + integrity_result = integrity_results.get(id(track)) + metadata_result = metadata_results.get(id(track), MusicMetadataResult(skipped=True, reason="not processed")) + manifest_records.append( + { + "track_number": song.track, + "disc_number": song.disc, + "title": song.name, + "artist": song.artist, + "album": song.album, + "file": str(final_path), + "source_md5": source_md5.get(id(track), ""), + "final_md5": final_md5.get(id(track), ""), + "integrity": integrity_result.to_dict() if integrity_result else {}, + "metadata": metadata_result.to_dict(), + } + ) + + if final_paths: + manifest_slug = ".".join( + part + for part in re.sub( + r"[^A-Za-z0-9._-]+", + ".", + ".".join( + str(part or "") + for part in ( + self.service, + getattr(titles, "artist", ""), + getattr(titles, "title", ""), + getattr(titles, "year", ""), + int(time.time()), + ) + ), + ) + .strip(".") + .split(".") + if part + ) + try: + write_music_manifest( + config.directories.logs / "music" / f"{manifest_slug or 'music'}.json", + release={ + "kind": getattr(titles, "kind", ""), + "title": getattr(titles, "title", ""), + "artist": getattr(titles, "artist", ""), + "year": getattr(titles, "year", None), + "service": self.service, + }, + tracks=manifest_records, + ) + except Exception as error: + self.log.warning(f"Music manifest write failed: {error}") + + album_time = time_elapsed_since(music_start_time) + release_label = MusicRenderer.display_kind(getattr(titles, "kind", "") or "music") + + integrity_count = len(integrity_results) + md5_count = len(final_md5) + metadata_written_count = sum(1 for result in metadata_results.values() if result.written) + console.print( + Padding( + f"Audio integrity verified for {music_track_count(integrity_count)} in [progress.elapsed]{integrity_time}[/]", + (0, 5), + ) + ) + console.print( + Padding( + f"MD5 checksum recorded for {music_track_count(md5_count)} in [progress.elapsed]{md5_time}[/]", + (0, 5), + ) + ) + if metadata_written_count: + console.print( + Padding( + f"Metadata written for {music_track_count(metadata_written_count)} in [progress.elapsed]{metadata_time}[/]", + (0, 5), + ) + ) + else: + reason = metadata_warning or next( + (result.reason for result in metadata_results.values() if result.reason), + "install mutagen to write music tags", + ) + console.print(Padding(f"Metadata skipped: {reason}", (0, 5))) + console.print(Padding(f"{release_label} downloaded in [progress.elapsed]{album_time}[/]!", (0, 5, 1, 5))) + + return True + + for music_title in music_titles: + current_plan = music_plans.get(id(music_title)) or MusicPlanner(service).build(music_title) + if not download_music_title(music_title, current_plan): + return + + if not hasattr(service, "close"): + cookie_file = self.get_cookie_path(self.service, self.profile) + if cookie_file: + self.save_cookies(cookie_file, service.session.cookies) + + if hasattr(service, "close"): + service.close() + + dl_time = time_elapsed_since(start_time) + console.print(Padding(f"Processed all titles in [progress.elapsed]{dl_time}", (0, 5, 1, 5))) + return + + if music_collection_mode: + raise click.ClickException("Music collections require grouped audio downloads.") + for i, title in enumerate(titles): if isinstance(title, Episode) and latest_episode and latest_episode_id: # If --latest-episode is set, only process the latest episode @@ -1482,7 +1948,12 @@ class dl: elif isinstance(title, Episode) and wanted and f"{title.season}x{title.number}" not in wanted: continue - console.print(Padding(Rule(f"[rule.text]{title}"), (1, 2))) + title_rule = ( + f"Track {title.track:02}: {title.name}" + if music_mode and isinstance(title, Song) + else str(title) + ) + console.print(Padding(Rule(f"[rule.text]{title_rule}"), (1, 2))) temp_font_files = [] if isinstance(title, Episode) and not self.tmdb_searched: @@ -2903,8 +3374,12 @@ class dl: tags.tag_file(final_path, title, self.tmdb_id, self.imdb_id) title_dl_time = time_elapsed_since(dl_start_time) + downloaded_label = "Track" if music_mode and isinstance(title, Song) else "Title" console.print( - Padding(f":tada: Title downloaded in [progress.elapsed]{title_dl_time}[/]!", (0, 5, 1, 5)) + Padding( + f":tada: {downloaded_label} downloaded in [progress.elapsed]{title_dl_time}[/]!", + (0, 5, 1, 5), + ) ) if not hasattr(service, "close"): diff --git a/unshackle/core/config.py b/unshackle/core/config.py index 4e6d1bb..eaab476 100644 --- a/unshackle/core/config.py +++ b/unshackle/core/config.py @@ -146,8 +146,17 @@ class Config: "tag", "track_number", "artist", + "album_artist", "album", "disc", + "track_total", + "disc_total", + "release_type", + "genre", + "explicit", + "isrc", + "upc", + "label", "audio", "audio_channels", "audio_full", @@ -168,8 +177,8 @@ class Config: if self.folder_template: all_templates["folder"] = self.folder_template for kind, tmpl in self.folder_templates.items(): - if kind not in {"movies", "series", "songs"}: - warnings.warn(f"Unknown folder template kind '{kind}' (expected movies/series/songs)") + if kind not in {"movies", "series", "songs", "albums"}: + warnings.warn(f"Unknown folder template kind '{kind}' (expected movies/series/songs/albums)") continue all_templates[f"folder.{kind}"] = tmpl @@ -195,7 +204,7 @@ class Config: def get_folder_template(self, kind: str) -> str: """Resolve the folder template for the given title kind. - kind: one of "movies", "series", "songs". + kind: one of "movies", "series", "songs", "albums". Falls back to the legacy single-string folder template, then "". """ if self.folder_templates: diff --git a/unshackle/core/music/__init__.py b/unshackle/core/music/__init__.py new file mode 100644 index 0000000..df6779e --- /dev/null +++ b/unshackle/core/music/__init__.py @@ -0,0 +1,40 @@ +from .display import MusicHeaderInfo, TrackRow, render_album_header, render_artwork_preview, render_track_panel +from .extract import (build_music_from_songs, classify_release_kind, dedupe_track_options, duration_seconds, + first_number, first_text, format_duration, format_names, year_from_value) +from .hasher import file_md5 +from .integrity import MusicAudioIntegrityError, MusicAudioIntegrityResult, verify_music_audio +from .manifest import write_music_manifest +from .models import MusicDiscPlan, MusicDownloadPlan, MusicSongPlan, MusicTrackOption +from .planner import MusicPlanner +from .renderer import MusicRenderer +from .tagger import MusicMetadataResult, write_music_metadata + +__all__ = ( + "MusicAudioIntegrityError", + "MusicAudioIntegrityResult", + "MusicDiscPlan", + "MusicDownloadPlan", + "MusicHeaderInfo", + "MusicMetadataResult", + "MusicPlanner", + "MusicRenderer", + "MusicSongPlan", + "MusicTrackOption", + "TrackRow", + "build_music_from_songs", + "classify_release_kind", + "dedupe_track_options", + "duration_seconds", + "file_md5", + "first_number", + "first_text", + "format_duration", + "format_names", + "render_album_header", + "render_artwork_preview", + "render_track_panel", + "verify_music_audio", + "write_music_manifest", + "write_music_metadata", + "year_from_value", +) diff --git a/unshackle/core/music/display.py b/unshackle/core/music/display.py new file mode 100644 index 0000000..08a8e74 --- /dev/null +++ b/unshackle/core/music/display.py @@ -0,0 +1,175 @@ +"""Shared rich-based rendering helpers for music releases. + +Data-in / renderable-out: services do their own quality and field extraction, +then hand plain data here. The helpers know nothing about how a ``quality_label`` +was derived; they only style and lay it out. Quality-schema parsing stays +per-service while panel/header/artwork rendering lives in one place. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass +from io import BytesIO +from typing import Any, Optional + +# RenderableType is rich's union of "things that can be printed to a console". +from rich.console import Group, RenderableType +from rich.padding import Padding +from rich.panel import Panel +from rich.table import Table +from rich.text import Text +from rich.tree import Tree + +from unshackle.core.titles.music import Song + + +@dataclass +class TrackRow: + """One row in the track tree, already formatted by the calling service. + + When ``note`` is set the row is treated as unavailable: the quality label is + styled red, ``note`` is appended muted, and the ``cd``/``hires`` badges are + suppressed. Otherwise the normal detail line is built and ``cd`` drives a + "CD" badge. + """ + + song: Song + quality_label: str + layout: str + duration_str: str + hires: bool = False + cd: bool = False + note: str = "" + + +@dataclass +class MusicHeaderInfo: + """Plain data holder for the release-header metadata grid.""" + + artist: str + album: str + year: str + track_count: int + quality_label: str + duration_str: str = "" + artist_label: str = "Artist" + + +# Quality strings look like "FLAC 16-bit/44.1kHz"; split the codec token off the +# front and bracket it so the detail line reads "[FLAC] | 16-bit/44.1kHz". +QUALITY_DETAIL_RE = re.compile(r"^(FLAC|OGG|AAC|MP3)\s+(.+)$", flags=re.IGNORECASE) + + +def format_track_detail_quality(quality_label: str) -> str: + """Bracket the leading codec token of a quality string for the detail line.""" + text = str(quality_label or "").strip() + match = QUALITY_DETAIL_RE.match(text) + if match: + return f"[{match.group(1).upper()}] | {match.group(2).strip()}" + return text + + +def render_artwork_preview( + session: Any, + artwork_url: str, + *, + width: int = 25, +) -> Optional[RenderableType]: + """Fetch artwork via ``session`` and render it as half-block coloured cells. + + Pillow (``PIL``) is an optional dependency; any error (missing PIL, network, + bad image, zero dimensions) soft-fails to ``None`` so callers skip the preview. + """ + if not artwork_url: + return None + try: + from PIL import Image + except Exception: + return None + try: + response = session.get(artwork_url, timeout=20) + response.raise_for_status() + image = Image.open(BytesIO(response.content)).convert("RGB") + if not image.width or not image.height: + return None + + height = max(1, int(width * image.height / image.width * 0.5)) + # Image.Resampling exists on Pillow >= 9.1; fall back gracefully on older. + resampling = getattr(getattr(Image, "Resampling", Image), "LANCZOS", 1) + image = image.resize((width, height * 2), resampling) + + lines = [] + for y in range(0, image.height, 2): + line = Text() + for x in range(image.width): + top = image.getpixel((x, y)) + bottom = image.getpixel((x, min(y + 1, image.height - 1))) + top_color = f"#{top[0]:02x}{top[1]:02x}{top[2]:02x}" + bottom_color = f"#{bottom[0]:02x}{bottom[1]:02x}{bottom[2]:02x}" + # Lower-half block fg=bottom/bg=top packs two pixel rows per cell. + line.append("▄", style=f"{bottom_color} on {top_color}") + lines.append(line) + return Group(*lines) + except Exception: + return None + + +def render_track_panel(rows: list[TrackRow], total: int) -> Panel: + """Render the per-track tree (one node + detail line each) in a Panel.""" + track_label = "Track" if total == 1 else "Tracks" + tree = Tree(f"[repr.number]{total}[/] {track_label}", guide_style="bright_black") + for row in rows: + title_line = Text(f"{row.song.track:02}", style="repr.number") + title_line.append(" ") + title_line.append(row.song.name, style="bold #009900") + node = tree.add(title_line, guide_style="bright_black") + + detail = Text() + if row.note: + detail.append(row.quality_label, style="red") + detail.append(" ") + detail.append(row.note, style="bright_black") + else: + detail.append(format_track_detail_quality(row.quality_label)) + if row.layout: + detail.append(" | ") + detail.append(row.layout) + if row.duration_str: + detail.append(" | ") + detail.append(row.duration_str) + if row.cd: + detail.append(" | ") + detail.append("CD", style="yellow1") + if row.hires: + detail.append(" | ") + detail.append("Hi-Res", style="gold1") + node.add(detail, guide_style="bright_black") + return Panel(tree, title="Available Tracks") + + +def render_album_header( + info: MusicHeaderInfo, + artwork: Optional[RenderableType] = None, +) -> Optional[RenderableType]: + """Render the release metadata grid, optionally placing artwork beside it.""" + # Table.grid is a borderless table used purely for column alignment. + grid = Table.grid(padding=(0, 2)) + grid.add_column(style="orchid1", no_wrap=True) + grid.add_column() + grid.add_row(info.artist_label, Text(info.artist, style="bold #ff0000")) + grid.add_row("Collection", Text(info.album, style="blue")) + grid.add_row("Year", Text(info.year, style="blue")) + grid.add_row("Tracks", Text(str(info.track_count), style="blue")) + grid.add_row("Quality", Text(info.quality_label, style="blue")) + if info.duration_str: + grid.add_row("Length", Text(info.duration_str, style="blue")) + + if not artwork: + return grid + + header = Table.grid(expand=True, padding=(0, 2)) + header.add_column(no_wrap=True) + header.add_column(ratio=1) + header.add_row(artwork, Padding(grid, (3, 0, 0, 0))) + return header diff --git a/unshackle/core/music/extract.py b/unshackle/core/music/extract.py new file mode 100644 index 0000000..f8bcaad --- /dev/null +++ b/unshackle/core/music/extract.py @@ -0,0 +1,196 @@ +"""Shared, stateless helpers for music services. + +These functions consolidate the generic data-shaping logic that music services +otherwise each duplicate: first-non-empty getters, duration/year/name +formatting, release-kind classification, track-option dedupe, and Song -> Music +assembly. They take plain data in and return plain data out (no ``self``), so +any music service can reuse them. +""" + +from __future__ import annotations + +import re +from typing import Any, Optional + +from unshackle.core.music.models import MusicTrackOption +from unshackle.core.titles.music import Music, Song + + +def first_text(*values: Any, default: str = "") -> str: + """Return the first non-empty stripped string across ``values``. + + Dicts are searched by common label keys; lists are joined with ", ". + Falls back to ``default`` when nothing usable is found. + """ + for value in values: + if value in (None, "", [], {}): + continue + if isinstance(value, dict): + for key in ("name", "title", "display", "display_name", "description", "url"): + nested = value.get(key) + text = first_text(nested) if isinstance(nested, (dict, list)) else str(nested or "").strip() + if text: + return text + elif isinstance(value, list): + parts = [first_text(item) for item in value] + text = ", ".join(part for part in parts if part) + if text: + return text + else: + text = str(value).strip() + if text: + return text + return default + + +def first_number(*values: Any) -> Optional[float]: + """Return the first value parseable as a float, else ``None``.""" + for value in values: + if value in (None, ""): + continue + try: + return float(value) + except (TypeError, ValueError): + continue + return None + + +def year_from_value(value: Any) -> int: + """Extract a 4-digit year from ``value``; fall back to 1900 when absent.""" + match = re.search(r"(?P\d{4})", str(value or "")) + return int(match.group("year")) if match else 1900 + + +def duration_seconds(value: Any) -> Optional[float]: + """Coerce a duration to seconds, treating large numbers (>10000) as milliseconds.""" + number = first_number(value) + if number is None: + return None + if number > 10_000: + return number / 1000 + return number + + +def format_duration(value: Any) -> str: + """Format a duration in seconds as ``H:MM:SS`` (or ``M:SS`` under an hour).""" + seconds_value = first_number(value) + if seconds_value is None: + return "" + total = max(0, int(round(seconds_value))) + minutes, remaining = divmod(total, 60) + hours, minutes = divmod(minutes, 60) + if hours: + return f"{hours}:{minutes:02}:{remaining:02}" + return f"{minutes}:{remaining:02}" + + +def format_names(value: Any, sep: str = ", ") -> str: + """Join a list/dict of artist-like entries into a de-duplicated string.""" + names: list[str] = [] + values: Any = value + if isinstance(values, dict): + values = values.get("items") or values.get("artists") or [values] + if not isinstance(values, list): + values = [values] + for item in values: + name = first_text( + item.get("profile") if isinstance(item, dict) else None, + item.get("artist") if isinstance(item, dict) else None, + item if isinstance(item, dict) else None, + item, + default="", + ) + if name and name not in names: + names.append(name) + return sep.join(names) + + +def classify_release_kind(raw_kind: str, tracks_count: Optional[float]) -> str: + """Normalise an already-extracted release-kind string to a canonical value. + + Returns one of ``single | ep | album | compilation | live | download | + playlist | other``. ``tracks_count`` disambiguates "single" (1 track) from + an EP (more than 1 track) for sources that label EPs as singles; pass the + real track count (or ``None`` only when genuinely unknown) — it is required + so no caller silently mislabels a multi-track "single" as an EP by omission. + """ + key = re.sub(r"[^a-z0-9]+", "", str(raw_kind or "").lower()) + + if key in {"single"}: + return "single" if tracks_count == 1 else "ep" + if key in {"ep", "extendedplay"}: + return "ep" + if key in {"epsingle", "epsingles"}: + return "single" if tracks_count == 1 else "ep" + if key in {"compilation", "compilations"}: + return "compilation" + if key in {"live", "liverecording"}: + return "live" + if key in {"download", "downloads"}: + return "download" + if key in {"playlist", "playlists"}: + return "playlist" + if key in {"other"}: + return "other" + return "album" + + +def dedupe_track_options(options: list[MusicTrackOption]) -> list[MusicTrackOption]: + """Drop duplicate track options keyed on codec/quality identity, preserving order.""" + seen: set[tuple[str, Optional[int], Optional[int], Optional[int], str, bool]] = set() + unique: list[MusicTrackOption] = [] + for option in options: + key = ( + str(option.codec or "").upper(), + option.bit_depth, + option.sample_rate, + option.bitrate, + option.quality_label, + option.explicit, + ) + if key in seen: + continue + seen.add(key) + unique.append(option) + return unique + + +def build_music_from_songs( + songs: list[Song], + *, + kind: str, + title: Optional[str] = None, + artist: Optional[str] = None, + owner: Optional[str] = None, + description: Optional[str] = None, + empty_error: str = "No songs were returned.", +) -> Music: + """Assemble a :class:`Music` release from a list of :class:`Song` objects. + + Aggregates artwork and total duration from each song's ``data`` payload and + derives track/disc totals. Raises ``ValueError(empty_error)`` on an empty list. + """ + if not songs: + raise ValueError(empty_error) + + first_song = songs[0] + artwork_url = "" + total_duration = 0 + for song in songs: + data = song.data if isinstance(song.data, dict) else {} + artwork_url = artwork_url or first_text(song.artwork_url, data.get("artwork_url")) + total_duration += int(first_number(data.get("duration")) or 0) + + return Music( + songs, + kind=kind, + title=title or first_song.album, + artist=artist or first_song.album_artist or first_song.artist, + year=first_song.year, + total_tracks=max((song.total_tracks or song.track for song in songs), default=len(songs)), + total_discs=max((song.total_discs or song.disc for song in songs), default=1), + artwork_url=artwork_url or None, + total_duration=total_duration or None, + owner=owner, + description=description, + ) diff --git a/unshackle/core/music/hasher.py b/unshackle/core/music/hasher.py new file mode 100644 index 0000000..548f04b --- /dev/null +++ b/unshackle/core/music/hasher.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +import hashlib +from pathlib import Path + + +def file_md5(path: Path, chunk_size: int = 1024 * 1024) -> str: + """Return the MD5 checksum for a local file.""" + digest = hashlib.md5() + with path.open("rb") as file: + for chunk in iter(lambda: file.read(chunk_size), b""): + digest.update(chunk) + return digest.hexdigest() + + +__all__ = ("file_md5",) diff --git a/unshackle/core/music/integrity.py b/unshackle/core/music/integrity.py new file mode 100644 index 0000000..8ade62f --- /dev/null +++ b/unshackle/core/music/integrity.py @@ -0,0 +1,187 @@ +from __future__ import annotations + +import subprocess +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Optional + +from pymediainfo import MediaInfo + +from unshackle.core import binaries + + +class MusicAudioIntegrityError(Exception): + """Raised when a downloaded music audio file cannot be trusted.""" + + +@dataclass +class MusicAudioIntegrityResult: + path: Path + size: int + codec: str = "" + duration: Optional[float] = None + bit_depth: Optional[int] = None + sample_rate: Optional[int] = None + channels: Optional[float] = None + flac_tested: bool = False + warnings: list[str] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + return { + "path": str(self.path), + "size": self.size, + "codec": self.codec, + "duration": self.duration, + "bit_depth": self.bit_depth, + "sample_rate": self.sample_rate, + "channels": self.channels, + "flac_tested": self.flac_tested, + "warnings": self.warnings, + } + + +def verify_music_audio(path: Path, *, song: Any = None, track: Any = None, media_info: Optional[MediaInfo] = None) -> MusicAudioIntegrityResult: + """Verify a downloaded music audio file after it is playable/decrypted.""" + path = Path(path) + if not path.exists(): + raise MusicAudioIntegrityError(f"Audio file is missing: {path}") + size = path.stat().st_size + if size <= 3: + raise MusicAudioIntegrityError(f"Audio file is empty: {path}") + + media_info = media_info or MediaInfo.parse(path) + audio_track = next(iter(media_info.audio_tracks or []), None) + if not audio_track: + raise MusicAudioIntegrityError(f"MediaInfo could not find an audio stream in: {path.name}") + + duration = _duration_seconds(getattr(audio_track, "duration", None)) + expected_duration = _expected_duration(song, track) + warnings: list[str] = [] + if expected_duration and duration: + tolerance = max(3.0, expected_duration * 0.02) + if abs(duration - expected_duration) > tolerance: + raise MusicAudioIntegrityError( + f"{path.name} duration mismatch: expected {expected_duration:.0f}s, got {duration:.0f}s" + ) + + result = MusicAudioIntegrityResult( + path=path, + size=size, + codec=_first_text( + getattr(audio_track, "format", None), + getattr(audio_track, "commercial_name", None), + getattr(audio_track, "codec_id", None), + ), + duration=duration, + bit_depth=_first_int(getattr(audio_track, "bit_depth", None)), + sample_rate=_first_int(getattr(audio_track, "sampling_rate", None)), + channels=_first_float( + getattr(audio_track, "channel_s", None), + getattr(audio_track, "channel_s_original", None), + getattr(track, "channels", None), + ), + warnings=warnings, + ) + + expected_size = _expected_size(track) + if expected_size and result.size != expected_size: + result.warnings.append(f"Size differs from service metadata: expected {expected_size}, got {result.size}") + + if path.suffix.lower() == ".flac": + flac = binaries.find("flac") + if flac: + completed = subprocess.run( + [str(flac), "-t", "-s", str(path)], + check=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + encoding="utf-8", + errors="replace", + ) + if completed.returncode != 0: + detail = (completed.stderr or completed.stdout or "").strip() + raise MusicAudioIntegrityError(f"FLAC stream test failed for {path.name}: {detail}") + result.flac_tested = True + else: + result.warnings.append("flac binary not available; FLAC stream test skipped") + + return result + + +def _expected_duration(song: Any, track: Any) -> Optional[float]: + data = getattr(song, "data", None) + if isinstance(data, dict): + metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} + value = _first_float(data.get("duration"), metadata.get("duration")) + if value: + return value + track_data = getattr(track, "data", None) + if isinstance(track_data, dict): + metadata = track_data.get("metadata") if isinstance(track_data.get("metadata"), dict) else {} + value = _first_float(track_data.get("duration"), metadata.get("duration")) + if value: + return value + return None + + +def _expected_size(track: Any) -> Optional[int]: + data = getattr(track, "data", None) + if not isinstance(data, dict): + return None + sources = [ + data, + data.get("file_info") if isinstance(data.get("file_info"), dict) else {}, + data.get("audio_info") if isinstance(data.get("audio_info"), dict) else {}, + data.get("source_info") if isinstance(data.get("source_info"), dict) else {}, + data.get("stream_info") if isinstance(data.get("stream_info"), dict) else {}, + data.get("metadata") if isinstance(data.get("metadata"), dict) else {}, + ] + for source in sources: + value = _first_int( + source.get("content_length"), + source.get("contentLength"), + source.get("file_size"), + source.get("filesize"), + source.get("size"), + ) + if value: + return value + return None + + +def _duration_seconds(value: Any) -> Optional[float]: + number = _first_float(value) + if number is None: + return None + if number > 1000: + return number / 1000 + return number + + +def _first_text(*values: Any) -> str: + for value in values: + text = str(value or "").strip() + if text: + return text + return "" + + +def _first_float(*values: Any) -> Optional[float]: + for value in values: + if value in (None, "", [], {}): + continue + text = str(value).strip().replace(" ", "") + try: + return float(text) + except (TypeError, ValueError): + continue + return None + + +def _first_int(*values: Any) -> Optional[int]: + value = _first_float(*values) + return int(value) if value is not None else None + + +__all__ = ("MusicAudioIntegrityError", "MusicAudioIntegrityResult", "verify_music_audio") diff --git a/unshackle/core/music/manifest.py b/unshackle/core/music/manifest.py new file mode 100644 index 0000000..fee4d30 --- /dev/null +++ b/unshackle/core/music/manifest.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + + +def write_music_manifest(path: Path, *, release: dict[str, Any], tracks: list[dict[str, Any]]) -> Path: + """Write a compact JSON manifest for music integrity, checksums, and metadata results.""" + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + payload = { + "schema": "unshackle.music.manifest.v1", + "generated_at": datetime.now(timezone.utc).isoformat(), + "release": release, + "tracks": tracks, + } + path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8") + return path + + +__all__ = ("write_music_manifest",) diff --git a/unshackle/core/music/models.py b/unshackle/core/music/models.py new file mode 100644 index 0000000..69682dc --- /dev/null +++ b/unshackle/core/music/models.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from unshackle.core.titles.music import Song + + +@dataclass +class MusicTrackOption: + codec: str + bit_depth: Optional[int] = None + sample_rate: Optional[int] = None + bitrate: Optional[int] = None + channels: Optional[float] = None + lossless: bool = False + hires: bool = False + atmos: bool = False + explicit: bool = False + duration: Optional[int] = None + quality_label: str = "" + + +@dataclass +class MusicSongPlan: + song: Song + options: list[MusicTrackOption] = field(default_factory=list) + selected: Optional[MusicTrackOption] = None + output_path: Optional[Path] = None + fallback_used: bool = False + skip_reason: str = "" + + +@dataclass +class MusicDiscPlan: + disc_number: int + songs: list[MusicSongPlan] = field(default_factory=list) + + +@dataclass +class MusicDownloadPlan: + kind: str + title: str + artist: str + album_artist: str = "" + year: Optional[int] = None + released: str = "" + genre: str = "" + label: str = "" + artwork_url: Optional[str] = None + total_tracks: Optional[int] = None + total_discs: Optional[int] = None + total_duration: Optional[int] = None + discs: list[MusicDiscPlan] = field(default_factory=list) + quality_requested: str = "best" + fallback_mode: str = "next-best" diff --git a/unshackle/core/music/planner.py b/unshackle/core/music/planner.py new file mode 100644 index 0000000..9f6797f --- /dev/null +++ b/unshackle/core/music/planner.py @@ -0,0 +1,208 @@ +from __future__ import annotations + +import re +from collections import defaultdict +from typing import Any + +from unshackle.core.music.models import MusicDiscPlan, MusicDownloadPlan, MusicSongPlan, MusicTrackOption +from unshackle.core.titles.music import Music, Song + + +class MusicPlanner: + """Build a service-neutral music list/download plan for native Music rendering.""" + + def __init__(self, service: Any): + self.service = service + + def build(self, music: Music) -> MusicDownloadPlan: + first_song = music[0] if music else None + first_data = first_song.data if first_song and isinstance(first_song.data, dict) else {} + first_metadata = first_data.get("metadata") if isinstance(first_data.get("metadata"), dict) else {} + plan = MusicDownloadPlan( + kind=getattr(music, "kind", "music"), + title=getattr(music, "title", None) or (first_song.album if first_song else ""), + artist=getattr(music, "artist", None) or (first_song.album_artist or first_song.artist if first_song else ""), + album_artist=(first_song.album_artist if first_song else "") or "", + year=getattr(music, "year", None) or (first_song.year if first_song else None), + released=self._first_text( + getattr(music, "released", None), + getattr(music, "release_date", None), + first_data.get("release_date"), + first_data.get("released_at"), + first_metadata.get("release_date"), + first_metadata.get("released_at"), + ), + genre=(first_song.genre if first_song else "") or "", + label=(first_song.label if first_song else "") or "", + artwork_url=getattr(music, "artwork_url", None), + total_tracks=getattr(music, "total_tracks", None) or len(music), + total_discs=getattr(music, "total_discs", None) or self._max_value(music, "disc"), + total_duration=getattr(music, "total_duration", None) or self._sum_duration(music), + quality_requested=self._quality_requested(music), + ) + + selected_options: list[MusicTrackOption] = [] + discs: dict[int, list[MusicSongPlan]] = defaultdict(list) + for song in music: + options = self._get_options(song) + selected = options[0] if options else None + if selected: + selected_options.append(selected) + discs[song.disc].append( + MusicSongPlan( + song=song, + options=options, + selected=selected, + ) + ) + + for disc_number in sorted(discs): + plan.discs.append(MusicDiscPlan(disc_number=disc_number, songs=discs[disc_number])) + + quality = self._quality_summary_from_options(selected_options) + if quality: + plan.quality_requested = quality + + return plan + + def _get_options(self, song: Song) -> list[MusicTrackOption]: + provider = getattr(self.service, "get_music_track_options", None) + if callable(provider): + options = provider(song) + if options: + return options + option = self._option_from_song(song) + return [option] if option else [] + + @staticmethod + def _option_from_song(song: Song) -> MusicTrackOption | None: + data = song.data if isinstance(song.data, dict) else {} + metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} + quality = MusicPlanner._first_text(data.get("quality"), metadata.get("quality"), metadata.get("quality_label")) + duration = MusicPlanner._first_number(data.get("duration"), metadata.get("duration")) + if not quality and duration is None: + return None + + codec = MusicPlanner._codec_from_quality(quality) + bit_depth, sample_rate = MusicPlanner._quality_numbers(quality) + bitrate = MusicPlanner._bitrate_from_quality(quality) + hires = bool((bit_depth and bit_depth > 16) or (sample_rate and sample_rate > 48000)) + lossless = codec in {"FLAC", "ALAC", "WAV", "AIFF"} or "lossless" in quality.lower() + atmos = "atmos" in quality.lower() or bool(data.get("atmos") or metadata.get("atmos")) + explicit = bool(getattr(song, "explicit", None) or data.get("explicit") or metadata.get("explicit")) + + return MusicTrackOption( + codec=codec, + bit_depth=bit_depth, + sample_rate=sample_rate, + bitrate=bitrate, + channels=MusicPlanner._first_number(data.get("channels"), metadata.get("channels")), + lossless=lossless, + hires=hires, + atmos=atmos, + explicit=explicit, + duration=int(duration) if duration is not None else None, + quality_label=quality, + ) + + def _quality_requested(self, music: Music) -> str: + service_quality = getattr(self.service, "quality", None) + if service_quality: + quality_map = { + 27: "Hi-Res Lossless", + 7: "Hi-Res Lossless", + 6: "Lossless", + 5: "MP3", + } + return quality_map.get(service_quality, str(service_quality)) + + first_song = music[0] if music else None + if not first_song: + return "" + data = first_song.data if isinstance(first_song.data, dict) else {} + return self._first_text(data.get("quality")) + + @staticmethod + def _first_text(*values: Any) -> str: + for value in values: + if value in (None, ""): + continue + text = str(value).strip() + if text: + return text + return "" + + @staticmethod + def _first_number(*values: Any) -> float | None: + for value in values: + if value in (None, ""): + continue + try: + return float(value) + except (TypeError, ValueError): + continue + return None + + @staticmethod + def _codec_from_quality(value: str) -> str: + lowered = value.lower() + for codec in ("flac", "alac", "aac", "mp3", "wav", "aiff"): + if codec in lowered: + return codec.upper() + return "" + + @staticmethod + def _quality_numbers(value: str) -> tuple[int | None, int | None]: + bit_depth = None + sample_rate = None + bit_match = re.search(r"(?P\d+)\s*[- ]?bit", value.lower()) + if bit_match: + bit_depth = int(bit_match.group("bits")) + sample_match = re.search(r"(?P\d+(?:\.\d+)?)\s*k(?:hz)?", value.lower()) + if sample_match: + sample_rate = int(float(sample_match.group("rate")) * 1000) + return bit_depth, sample_rate + + @staticmethod + def _bitrate_from_quality(value: str) -> int | None: + match = re.search(r"(?P\d+)\s*kb/s", value.lower()) + if not match: + return None + return int(match.group("rate")) * 1000 + + @staticmethod + def _quality_summary_from_options(options: list[MusicTrackOption]) -> str: + if not options: + return "" + + codecs = {str(option.codec or "").upper() for option in options if option.codec} + labels = [option.quality_label for option in options if option.quality_label] + + if any(option.atmos for option in options): + return "Dolby Atmos" + if any(option.hires and (option.lossless or str(option.codec or "").upper() in {"FLAC", "ALAC", "WAV", "AIFF"}) for option in options): + return "Hi-Res Lossless" + if any(option.lossless or str(option.codec or "").upper() in {"FLAC", "ALAC", "WAV", "AIFF"} for option in options): + return "Lossless" + if "AAC" in codecs: + return "AAC" + if "MP3" in codecs: + return "MP3" + return MusicPlanner._first_text(*labels) + + @staticmethod + def _sum_duration(music: Music) -> int | None: + total = 0 + for song in music: + data = song.data if isinstance(song.data, dict) else {} + value = MusicPlanner._first_number(data.get("duration")) + total += int(value or 0) + return total or None + + @staticmethod + def _max_value(music: Music, attr: str) -> int | None: + values = [getattr(song, attr, 0) or 0 for song in music] + return max(values, default=0) or None + + +__all__ = ("MusicPlanner",) diff --git a/unshackle/core/music/renderer.py b/unshackle/core/music/renderer.py new file mode 100644 index 0000000..6bfaf44 --- /dev/null +++ b/unshackle/core/music/renderer.py @@ -0,0 +1,510 @@ +from __future__ import annotations + +import re +from typing import Any, Optional + +from rich.console import Group, RenderableType +from rich.markup import escape +from rich.panel import Panel +from rich.table import Table +from rich.text import Text +from rich.tree import Tree + +from unshackle.core.music.models import MusicDownloadPlan, MusicTrackOption +from unshackle.core.titles.music import Music, Song + + +class MusicRenderer: + """Render native Music title containers for the CLI.""" + + COMPACT_TRACK_LIMIT = 8 + + def render(self, music: Music, *, verbose: bool = False) -> RenderableType: + header = self.render_header(music) + tracks = self.render_tracks(music, verbose=verbose) + if header: + return Group(header, Text(""), tracks) + return tracks + + def render_plan(self, plan: MusicDownloadPlan, *, verbose: bool = True) -> RenderableType: + header = self.render_plan_header(plan) + tracks = self.render_plan_tracks(plan, verbose=verbose) + if header: + return Group(header, Text(""), tracks) + return tracks + + def render_header(self, music: Music) -> Optional[Table]: + if not music: + return None + + first_song = music[0] + data = first_song.data if isinstance(first_song.data, dict) else {} + metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} + title = music.title or first_song.album + artist = music.artist or first_song.album_artist or first_song.artist + year = getattr(music, "year", None) or first_song.year + kind = self.display_kind(music.kind) + explicit = any(bool(getattr(song, "explicit", None)) for song in music) + total_tracks = getattr(music, "total_tracks", None) or len(music) + total_discs = getattr(music, "total_discs", None) or self._max_value(music, "disc") + released = self._format_release_date( + self._first_value( + getattr(music, "released", None), + getattr(music, "release_date", None), + data.get("release_date"), + data.get("released_at"), + metadata.get("release_date"), + metadata.get("released_at"), + ) + ) + length = self._format_total_duration( + getattr(music, "total_duration", None) or self._sum_duration(music) + ) + quality = self._quality_summary( + self._first_text( + getattr(music, "quality", None), + data.get("quality"), + metadata.get("quality"), + metadata.get("quality_label"), + ), + lossless=self._as_bool(self._first_value(data.get("lossless"), metadata.get("lossless"))), + hires=self._as_bool(self._first_value(data.get("hires"), metadata.get("hires"))), + ) + + grid = self._metadata_grid() + + grid.add_row(Text("Title", style="bright_black"), Text(str(title))) + grid.add_row(Text("Artist", style="bright_black"), Text(str(artist))) + grid.add_row(Text("Type", style="bright_black"), self._kind_text(kind, explicit=explicit)) + if released: + grid.add_row(Text("Released", style="bright_black"), Text(released)) + if year: + grid.add_row(Text("Year", style="bright_black"), Text(str(year))) + grid.add_row(Text("Tracks", style="bright_black"), Text(str(total_tracks))) + if total_discs and total_discs > 1: + grid.add_row(Text("Discs", style="bright_black"), Text(str(total_discs))) + if length: + grid.add_row(Text("Length", style="bright_black"), Text(length)) + if quality: + grid.add_row(Text("Quality", style="bright_black"), Text(quality)) + if first_song.genre: + grid.add_row(Text("Genre", style="bright_black"), Text(first_song.genre)) + if first_song.label: + grid.add_row(Text("Label", style="bright_black"), Text(first_song.label)) + + return grid + + def render_tracks(self, music: Music, *, verbose: bool = False) -> Panel: + total = len(music) + track_label = "Track" if total == 1 else "Tracks" + tree = Tree(f"[repr.number]{total}[/] {track_label}", guide_style="bright_black") + + visible_songs = list(music) + if not verbose and len(visible_songs) > self.COMPACT_TRACK_LIMIT: + visible_songs = visible_songs[: self.COMPACT_TRACK_LIMIT] + + for song in visible_songs: + node = tree.add(self._song_line(song, music), guide_style="bright_black") + option = self._option_from_song(song) + if option: + node.add(option, guide_style="bright_black") + + hidden = total - len(visible_songs) + if hidden > 0: + suffix = "s" if hidden != 1 else "" + tree.add(f"[bright_black]... {hidden} more track{suffix}[/]", guide_style="bright_black") + + return Panel(tree, title="Available Tracks") + + def render_plan_header(self, plan: MusicDownloadPlan) -> Optional[Table]: + title = plan.title + artist = plan.artist or plan.album_artist + kind = self.display_kind(plan.kind) + explicit = any( + bool(getattr(song_plan.song, "explicit", None) or (song_plan.selected and song_plan.selected.explicit)) + for disc in plan.discs + for song_plan in disc.songs + ) + released = self._format_release_date(getattr(plan, "released", None) or getattr(plan, "release_date", None)) + length = self._format_total_duration(plan.total_duration) + quality = self._quality_summary(plan.quality_requested) + + grid = self._metadata_grid() + if title: + grid.add_row(Text("Title", style="bright_black"), Text(str(title))) + if artist: + grid.add_row(Text("Artist", style="bright_black"), Text(str(artist))) + grid.add_row(Text("Type", style="bright_black"), self._kind_text(kind, explicit=explicit)) + if released: + grid.add_row(Text("Released", style="bright_black"), Text(released)) + if plan.year: + grid.add_row(Text("Year", style="bright_black"), Text(str(plan.year))) + if plan.total_tracks: + grid.add_row(Text("Tracks", style="bright_black"), Text(str(plan.total_tracks))) + if plan.total_discs and plan.total_discs > 1: + grid.add_row(Text("Discs", style="bright_black"), Text(str(plan.total_discs))) + if length: + grid.add_row(Text("Length", style="bright_black"), Text(length)) + if quality: + grid.add_row(Text("Quality", style="bright_black"), Text(quality)) + if plan.genre: + grid.add_row(Text("Genre", style="bright_black"), Text(str(plan.genre))) + if plan.label: + grid.add_row(Text("Label", style="bright_black"), Text(str(plan.label))) + return grid + + def render_plan_tracks(self, plan: MusicDownloadPlan, *, verbose: bool = True) -> Panel: + songs = [song_plan for disc in plan.discs for song_plan in disc.songs] + total = len(songs) + track_label = "Track" if total == 1 else "Tracks" + tree = Tree(f"[repr.number]{total}[/] {track_label}", guide_style="bright_black") + + visible_songs = songs + if not verbose and len(visible_songs) > self.COMPACT_TRACK_LIMIT: + visible_songs = visible_songs[: self.COMPACT_TRACK_LIMIT] + + for song_plan in visible_songs: + node = tree.add(self._song_line(song_plan.song, plan), guide_style="bright_black") + if song_plan.skip_reason: + node.add(f"[yellow]Skipped:[/] {escape(song_plan.skip_reason)}", guide_style="bright_black") + continue + for option in song_plan.options: + node.add(self._option_line(option), guide_style="bright_black") + + hidden = total - len(visible_songs) + if hidden > 0: + suffix = "s" if hidden != 1 else "" + tree.add(f"[bright_black]... {hidden} more track{suffix}[/]", guide_style="bright_black") + + return Panel(tree, title="Available Tracks") + + @staticmethod + def _metadata_grid() -> Table: + grid = Table.grid(padding=(0, 2)) + grid.add_column(style="orchid1", no_wrap=True) + grid.add_column() + return grid + + @staticmethod + def display_kind(kind: Any) -> str: + text = str(kind or "music").strip() + key = re.sub(r"[^a-z0-9]+", "", text.lower()) + labels = { + "album": "Album", + "single": "Single", + "ep": "EP", + "epsingle": "Single", + "playlist": "Playlist", + "compilation": "Compilation", + "live": "Live", + "download": "Download", + "other": "Other", + "track": "Track", + "music": "Music", + } + if key in labels: + return labels[key] + return text.replace("_", " ").replace("-", " ").title() + + def _song_line(self, song: Song, music: Music | MusicDownloadPlan) -> Text: + number = f"{song.disc}.{song.track:02}" if song.disc > 1 else f"{song.track:02}" + line = Text() + line.append(number, style="repr.number") + line.append(" ") + line.append(song.name, style="rule.text") + kind = getattr(music, "kind", "").lower() + release_artist = getattr(music, "artist", None) or getattr(music, "album_artist", None) + if kind == "playlist" and song.artist and song.artist != release_artist: + line.append(f" - {song.artist}", style="bright_black") + return line + + def _option_from_song(self, song: Song) -> Text | str: + data = song.data if isinstance(song.data, dict) else {} + metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} + + quality = self._first_text(data.get("quality"), metadata.get("quality"), metadata.get("quality_label")) + duration = self._format_duration(self._first_value(data.get("duration"), metadata.get("duration"))) + reason = self._first_text( + data.get("unavailable_reason"), + data.get("skip_reason"), + metadata.get("unavailable_reason"), + metadata.get("skip_reason"), + ) + if reason: + return f"[yellow]Skipped:[/] {escape(reason)}" + + badges = [] + if song.explicit: + badges.append(("E", "bold bright_red")) + if self._as_bool(self._first_value(data.get("atmos"), metadata.get("atmos"))): + badges.append(("Atmos", "magenta")) + if self._is_hires_quality(quality): + badges.append(("Hi-Res", "gold1")) + + details = [] + if quality: + details.append(quality) + if duration: + details.append(duration) + if not details and not badges: + return "" + return self._format_option_text(details, badges) + + def _option_line(self, option: MusicTrackOption) -> Text: + parts = [] + codec = str(option.codec or "").strip() + if codec: + parts.append(f"[{codec}]") + if option.quality_label: + parts.extend(self._split_quality_label(option.quality_label, codec)) + elif option.bit_depth and option.sample_rate: + parts.append(f"{option.bit_depth}-bit/{self._format_sample_rate(option.sample_rate)}") + elif option.bitrate: + parts.append(f"{int(option.bitrate / 1000)} kb/s") + if option.channels: + parts.append(self._format_channels(option.channels)) + if option.duration: + parts.append(self._format_duration(option.duration)) + badges = [] + if option.explicit: + badges.append(("E", "bold bright_red")) + if option.atmos: + badges.append(("Atmos", "magenta")) + if self._is_cd_option(option): + badges.append(("CD", "yellow1")) + if option.hires: + badges.append(("Hi-Res", "gold1")) + return self._format_option_text(parts, badges) + + @staticmethod + def _kind_text(kind: str, *, explicit: bool = False) -> Text: + text = Text(str(kind)) + if explicit: + text.append(" Explicit", style="bold red") + return text + + @staticmethod + def _format_option_text(parts: list[str], badges: list[tuple[str, str]]) -> Text: + text = Text(style="text2") + first = True + for part in parts: + if not part: + continue + if not first: + text.append(" | ") + text.append(str(part)) + first = False + for badge, style in badges: + if not first: + text.append(" | ") + text.append(badge, style=style) + first = False + return text + + @staticmethod + def _format_option_parts(parts: list[str]) -> str: + if not parts: + return "" + return " | ".join(escape(part) for part in parts if part) + + @staticmethod + def _split_quality_label(value: str, codec: str = "") -> list[str]: + text = str(value or "").strip() + if not text: + return [] + if codec and text.lower().startswith(codec.lower()): + text = text[len(codec) :].strip() + return [text] if text else [] + + @staticmethod + def _is_cd_option(option: MusicTrackOption) -> bool: + codec = str(option.codec or "").upper() + if codec not in {"FLAC", "ALAC", "WAV", "AIFF"}: + return False + if option.bit_depth == 16 and option.sample_rate in {44100, 44100.0}: + return True + return "16-bit/44.1" in str(option.quality_label or "").lower() + + @staticmethod + def _format_sample_rate(value: Any) -> str: + try: + sample_rate = float(value) + except (TypeError, ValueError): + return str(value).strip() + if sample_rate >= 1000: + sample_rate /= 1000 + if sample_rate.is_integer(): + return f"{int(sample_rate)} kHz" + return f"{sample_rate:g} kHz" + + @staticmethod + def _format_channels(value: Any) -> str: + try: + channels = float(value) + except (TypeError, ValueError): + return str(value).strip() + if channels == 1: + return "Mono" + if channels == 2: + return "Stereo" + if channels.is_integer(): + return f"{int(channels)}.0" + return f"{channels:g}" + + @staticmethod + def _first_text(*values: Any) -> str: + for value in values: + if value is None: + continue + if isinstance(value, dict): + for key in ("label", "name", "title", "display_name", "value"): + nested = value.get(key) + if nested: + return str(nested).strip() + elif isinstance(value, (list, tuple)): + text = MusicRenderer._first_text(*value) + if text: + return text + else: + text = str(value).strip() + if text: + return text + return "" + + @staticmethod + def _first_value(*values: Any) -> Any: + for value in values: + if value is not None and value != "": + return value + return None + + @staticmethod + def _as_bool(value: Any) -> bool: + if isinstance(value, bool): + return value + if isinstance(value, str): + return value.strip().lower() in {"1", "true", "yes", "y"} + return bool(value) + + @staticmethod + def _format_duration(value: Any) -> str: + if value in (None, ""): + return "" + try: + seconds = int(float(value)) + except (TypeError, ValueError): + return str(value).strip() + minutes, seconds = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + if hours: + return f"{hours}:{minutes:02}:{seconds:02}" + return f"{minutes}:{seconds:02}" + + @staticmethod + def _format_total_duration(value: Any) -> str: + if value in (None, ""): + return "" + try: + total_seconds = int(float(value)) + except (TypeError, ValueError): + return str(value).strip() + if total_seconds <= 0: + return "" + minutes, seconds = divmod(total_seconds, 60) + hours, minutes = divmod(minutes, 60) + if hours: + return f"{hours}h {minutes:02}m {seconds:02}s" + return f"{minutes}m {seconds:02}s" + + @staticmethod + def _format_release_date(value: Any) -> str: + if value in (None, ""): + return "" + text = str(value).strip() + match = re.fullmatch(r"(?P\d{4})(?:-(?P\d{2})-(?P\d{2}))?.*", text) + if not match or not match.group("month"): + return text + + try: + year = int(match.group("year")) + month = int(match.group("month")) + day = int(match.group("day")) + except (TypeError, ValueError): + return text + + month_names = ( + "January", + "February", + "March", + "April", + "May", + "June", + "July", + "August", + "September", + "October", + "November", + "December", + ) + if month < 1 or month > 12 or day < 1 or day > 31: + return text + return f"{month_names[month - 1]} {day}, {year}" + + @classmethod + def _quality_summary(cls, value: Any, *, lossless: bool = False, hires: bool = False) -> str: + text = str(value or "").strip() + lowered = text.lower() + if not text: + if hires and lossless: + return "Hi-Res Lossless" + if lossless: + return "Lossless" + return "" + if "atmos" in lowered: + return "Dolby Atmos" + if any(codec in lowered for codec in ("flac", "alac", "wav", "aiff")): + return "Hi-Res Lossless" if cls._is_hires_quality(text) else "Lossless" + if "lossless" in lowered: + return "Hi-Res Lossless" if "hi-res" in lowered or hires else "Lossless" + if "aac" in lowered: + return "AAC" + if "mp3" in lowered: + return "MP3" + return text + + @staticmethod + def _is_hires_quality(value: str) -> bool: + lowered = value.lower() + bit_depth = None + sample_rate = None + + bit_match = re.search(r"(?P\d+)\s*[- ]?bit", lowered) + if bit_match: + bit_depth = int(bit_match.group("bits")) + + sample_match = re.search(r"(?P\d+(?:\.\d+)?)\s*k(?:hz)?", lowered) + if sample_match: + sample_rate = float(sample_match.group("rate")) + + return bool((bit_depth and bit_depth > 16) or (sample_rate and sample_rate > 48)) + + @staticmethod + def _sum_duration(music: Music) -> int: + total = 0 + for song in music: + data = song.data if isinstance(song.data, dict) else {} + metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} + value = MusicRenderer._first_value(data.get("duration"), metadata.get("duration")) + try: + total += int(float(value or 0)) + except (TypeError, ValueError): + continue + return total + + @staticmethod + def _max_value(music: Music, attr: str) -> int: + values = [getattr(song, attr, 0) or 0 for song in music] + return max(values, default=0) + + +__all__ = ("MusicRenderer",) diff --git a/unshackle/core/music/tagger.py b/unshackle/core/music/tagger.py new file mode 100644 index 0000000..61edd06 --- /dev/null +++ b/unshackle/core/music/tagger.py @@ -0,0 +1,345 @@ +from __future__ import annotations + +import logging +from dataclasses import dataclass +from io import BytesIO +from pathlib import Path +from typing import Any, Optional + +from unshackle.core.config import config + +try: + from PIL import Image +except ImportError: # pragma: no cover - optional artwork enhancement + Image = None + +try: + from mutagen.flac import FLAC, Picture + from mutagen.id3 import ( + APIC, + COMM, + ID3NoHeaderError, + TALB, + TCOM, + TCON, + TCOP, + TDRC, + TIT2, + TPE1, + TPE2, + TPOS, + TRCK, + TSRC, + TXXX, + ) + from mutagen.mp3 import MP3 + from mutagen.mp4 import MP4, MP4Cover +except ImportError: # pragma: no cover - optional tagging dependency + FLAC = Picture = None + MP3 = MP4 = MP4Cover = None + ID3NoHeaderError = Exception + APIC = COMM = TALB = TCOM = TCON = TCOP = TDRC = TIT2 = TPE1 = TPE2 = TPOS = TRCK = TSRC = TXXX = None + + +log = logging.getLogger("MUSIC_TAGGER") + + +@dataclass +class MusicMetadataResult: + written: bool = False + artwork_embedded: bool = False + skipped: bool = False + reason: str = "" + + def to_dict(self) -> dict[str, Any]: + return { + "written": self.written, + "artwork_embedded": self.artwork_embedded, + "skipped": self.skipped, + "reason": self.reason, + } + + +def write_music_metadata(path: Path, song: Any, *, session: Any = None, source_md5: str = "") -> MusicMetadataResult: + """Write normalized music metadata for FLAC, MP3, and M4A/MP4 audio files.""" + path = Path(path) + extension = path.suffix.lower() + if extension not in {".flac", ".mp3", ".m4a", ".mp4"}: + return MusicMetadataResult(skipped=True, reason=f"Unsupported music metadata container: {extension}") + + if extension == ".flac" and FLAC is None: + return MusicMetadataResult(skipped=True, reason="install mutagen to write FLAC tags") + if extension == ".mp3" and MP3 is None: + return MusicMetadataResult(skipped=True, reason="install mutagen to write MP3 tags") + if extension in {".m4a", ".mp4"} and MP4 is None: + return MusicMetadataResult(skipped=True, reason="install mutagen to write MP4/M4A tags") + + tags = _build_tag_values(song, source_md5=source_md5) + artwork_url = _first_text(getattr(song, "artwork_url", None), _metadata(song).get("artwork_url")) + cover_data, mime_type = _download_cover(session, artwork_url) + + if extension == ".flac": + _write_flac_tags(path, tags, cover_data, mime_type) + elif extension == ".mp3": + _write_mp3_tags(path, tags, cover_data, mime_type) + else: + _write_mp4_tags(path, tags, cover_data, mime_type) + + return MusicMetadataResult(written=True, artwork_embedded=bool(cover_data)) + + +def _build_tag_values(song: Any, *, source_md5: str = "") -> dict[str, str]: + metadata = _metadata(song) + track_number = _string_tag(getattr(song, "track", None) or metadata.get("track_number")) + track_total = _string_tag(getattr(song, "total_tracks", None) or metadata.get("total_tracks")) + disc_number = _string_tag(getattr(song, "disc", None) or metadata.get("disc_number")) + disc_total = _string_tag(getattr(song, "total_discs", None) or metadata.get("total_discs")) + explicit = _as_bool(getattr(song, "explicit", None), metadata.get("explicit"), metadata.get("parental_warning")) + + tags = { + "TITLE": _first_text(getattr(song, "name", None), metadata.get("title")), + "ARTIST": _first_text(getattr(song, "artist", None), metadata.get("artist"), metadata.get("performer")), + "ALBUM": _first_text(getattr(song, "album", None), metadata.get("album")), + "ALBUMARTIST": _first_text(getattr(song, "album_artist", None), metadata.get("album_artist")), + "TRACKNUMBER": f"{track_number}/{track_total}" if track_number and track_total else track_number, + "TRACKTOTAL": track_total, + "DISCNUMBER": f"{disc_number}/{disc_total}" if disc_number and disc_total else disc_number, + "DISCTOTAL": disc_total, + "DATE": _string_tag(metadata.get("release_date") or metadata.get("year") or getattr(song, "year", None)), + "RELEASEDATE": _string_tag(metadata.get("release_date")), + "GENRE": _first_text(getattr(song, "genre", None), metadata.get("genre")), + "COMPOSER": _first_text(metadata.get("composer")), + "PERFORMER": _first_text(metadata.get("performer")), + "ISRC": _string_tag(getattr(song, "isrc", None) or metadata.get("isrc")), + "BARCODE": _string_tag(getattr(song, "upc", None) or metadata.get("upc") or metadata.get("barcode")), + "UPC": _string_tag(getattr(song, "upc", None) or metadata.get("upc") or metadata.get("barcode")), + "COPYRIGHT": _string_tag(getattr(song, "copyright", None) or metadata.get("copyright")), + "LABEL": _first_text(getattr(song, "label", None), metadata.get("label")), + "COMMENT": _first_text(metadata.get("comment"), metadata.get("quality")), + "SOURCE": _first_text(metadata.get("source"), metadata.get("service")), + "ENCODEDBY": "Unshackle", + "UNSHACKLE_SOURCE_MD5": source_md5, + } + if explicit: + tags["EXPLICIT"] = "1" + tags["ITUNESADVISORY"] = "1" + if config.tag and config.tag_group_name: + tags["GROUP"] = config.tag + + service = _first_text(metadata.get("service"), metadata.get("source")) + if service: + prefix = service.upper().replace(" ", "_").replace("-", "_") + if metadata.get("track_id"): + tags[f"{prefix}_TRACK_ID"] = _string_tag(metadata.get("track_id")) + if metadata.get("album_id"): + tags[f"{prefix}_ALBUM_ID"] = _string_tag(metadata.get("album_id")) + if metadata.get("track_url"): + tags[f"{prefix}_TRACK_URL"] = _string_tag(metadata.get("track_url")) + if metadata.get("album_url"): + tags[f"{prefix}_ALBUM_URL"] = _string_tag(metadata.get("album_url")) + + return {key: value for key, value in tags.items() if value} + + +def _write_flac_tags(path: Path, tags: dict[str, str], cover_data: Optional[bytes], mime_type: str) -> None: + audio = FLAC(path) + for key, value in tags.items(): + audio[key] = [value] + if cover_data and Picture is not None: + picture = Picture() + picture.type = 3 + picture.mime = mime_type or "image/jpeg" + picture.desc = "Cover" + picture.data = cover_data + if Image is not None: + try: + with Image.open(BytesIO(cover_data)) as image: + picture.width, picture.height = image.size + picture.depth = len(image.getbands()) * 8 + except Exception: + pass + audio.clear_pictures() + audio.add_picture(picture) + audio.save() + + +def _write_mp3_tags(path: Path, tags: dict[str, str], cover_data: Optional[bytes], mime_type: str) -> None: + try: + audio = MP3(path) + except ID3NoHeaderError: + audio = MP3(path) + audio.add_tags() + if audio.tags is None: + audio.add_tags() + + frame_map = { + "TITLE": ("TIT2", TIT2), + "ARTIST": ("TPE1", TPE1), + "ALBUM": ("TALB", TALB), + "ALBUMARTIST": ("TPE2", TPE2), + "TRACKNUMBER": ("TRCK", TRCK), + "DISCNUMBER": ("TPOS", TPOS), + "DATE": ("TDRC", TDRC), + "GENRE": ("TCON", TCON), + "COMPOSER": ("TCOM", TCOM), + "ISRC": ("TSRC", TSRC), + "COPYRIGHT": ("TCOP", TCOP), + } + for key, (frame_id, frame_class) in frame_map.items(): + value = tags.get(key) + if value and frame_class is not None: + audio.tags.setall(frame_id, [frame_class(encoding=3, text=[value])]) + if tags.get("COMMENT") and COMM is not None: + audio.tags.setall("COMM", [COMM(encoding=3, lang="eng", desc="", text=[tags["COMMENT"]])]) + if cover_data and APIC is not None: + audio.tags.delall("APIC") + audio.tags.add(APIC(encoding=3, mime=mime_type or "image/jpeg", type=3, desc="Cover", data=cover_data)) + + custom_keys = set(tags) - set(frame_map) - {"COMMENT"} + for key in sorted(custom_keys): + if TXXX is not None and tags[key]: + audio.tags.delall(f"TXXX:{key}") + audio.tags.add(TXXX(encoding=3, desc=key, text=[tags[key]])) + audio.save() + + +def _write_mp4_tags(path: Path, tags: dict[str, str], cover_data: Optional[bytes], mime_type: str) -> None: + audio = MP4(path) + if tags.get("TITLE"): + audio["\xa9nam"] = [tags["TITLE"]] + if tags.get("ARTIST"): + audio["\xa9ART"] = [tags["ARTIST"]] + if tags.get("ALBUM"): + audio["\xa9alb"] = [tags["ALBUM"]] + if tags.get("ALBUMARTIST"): + audio["aART"] = [tags["ALBUMARTIST"]] + if tags.get("DATE"): + audio["\xa9day"] = [tags["DATE"]] + if tags.get("GENRE"): + audio["\xa9gen"] = [tags["GENRE"]] + if tags.get("COMPOSER"): + audio["\xa9wrt"] = [tags["COMPOSER"]] + if tags.get("COMMENT"): + audio["\xa9cmt"] = [tags["COMMENT"]] + if tags.get("COPYRIGHT"): + audio["cprt"] = [tags["COPYRIGHT"]] + + track_number, track_total = _split_number_pair(tags.get("TRACKNUMBER", "")) + if track_number: + audio["trkn"] = [(track_number, track_total or 0)] + disc_number, disc_total = _split_number_pair(tags.get("DISCNUMBER", "")) + if disc_number: + audio["disk"] = [(disc_number, disc_total or 0)] + + if cover_data and MP4Cover is not None: + image_format = MP4Cover.FORMAT_PNG if mime_type == "image/png" else MP4Cover.FORMAT_JPEG + audio["covr"] = [MP4Cover(cover_data, imageformat=image_format)] + mapped_keys = { + "TITLE", + "ARTIST", + "ALBUM", + "ALBUMARTIST", + "DATE", + "GENRE", + "COMPOSER", + "COMMENT", + "COPYRIGHT", + "TRACKNUMBER", + "DISCNUMBER", + } + for key in sorted(set(tags) - mapped_keys): + value = tags.get(key) + if value: + audio[f"----:com.apple.iTunes:{key}"] = [str(value).encode("utf-8")] + audio.save() + + +def _download_cover(session: Any, artwork_url: str) -> tuple[Optional[bytes], str]: + if not session or not artwork_url: + return None, "" + try: + with session.get(artwork_url, timeout=20) as response: + response.raise_for_status() + data = response.content + if not data: + return None, "" + content_type = str(response.headers.get("Content-Type") or "").split(";")[0].strip().lower() + return data, _mime_from_image(data, content_type or "image/jpeg") + except Exception as error: + log.debug("Music cover download failed for %s: %s", artwork_url, error) + return None, "" + + +def _metadata(song: Any) -> dict[str, Any]: + data = getattr(song, "data", None) + return data if isinstance(data, dict) else {} + + +def _first_text(*values: Any) -> str: + for value in values: + if isinstance(value, dict): + text = _first_text(value.get("name"), value.get("title"), value.get("display_name"), value.get("value")) + if text: + return text + elif isinstance(value, list): + parts = [_first_text(item) for item in value] + text = ", ".join(part for part in parts if part) + if text: + return text + else: + text = str(value or "").strip() + if text: + return text + return "" + + +def _string_tag(value: Any) -> str: + if value in (None, "", [], {}): + return "" + if isinstance(value, bool): + return "Explicit" if value else "" + return str(value).strip() + + +def _as_bool(*values: Any) -> bool: + for value in values: + if value in (None, "", [], {}): + continue + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return value != 0 + text = str(value).strip().lower() + if text in {"1", "true", "yes", "y", "explicit", "parental_warning"}: + return True + if text in {"0", "false", "no", "n", "clean"}: + return False + return False + + +def _mime_from_image(data: bytes, fallback: str = "image/jpeg") -> str: + if data.startswith(b"\x89PNG\r\n\x1a\n"): + return "image/png" + if data.startswith(b"\xff\xd8\xff"): + return "image/jpeg" + if data.startswith(b"RIFF") and b"WEBP" in data[:16]: + return "image/webp" + return fallback + + +def _split_number_pair(value: str) -> tuple[int, int]: + if not value: + return 0, 0 + first, _, second = str(value).partition("/") + try: + number = int(first) + except ValueError: + number = 0 + try: + total = int(second) if second else 0 + except ValueError: + total = 0 + return number, total + + +__all__ = ("MusicMetadataResult", "write_music_metadata") diff --git a/unshackle/core/titles/__init__.py b/unshackle/core/titles/__init__.py index 9c95645..58304aa 100644 --- a/unshackle/core/titles/__init__.py +++ b/unshackle/core/titles/__init__.py @@ -2,10 +2,10 @@ from typing import Union from .episode import Episode, Series from .movie import Movie, Movies -from .song import Album, Song +from .music import Album, Music, Song Title_T = Union[Movie, Episode, Song] -Titles_T = Union[Movies, Series, Album] +Titles_T = Union[Movies, Series, Music, Album] def remap_titles(titles: Titles_T, title_map: dict) -> Titles_T: @@ -39,6 +39,7 @@ __all__ = ( "Series", "Movie", "Movies", + "Music", "Album", "Song", "Title_T", diff --git a/unshackle/core/titles/music.py b/unshackle/core/titles/music.py new file mode 100644 index 0000000..70c3abb --- /dev/null +++ b/unshackle/core/titles/music.py @@ -0,0 +1,276 @@ +import re +from abc import ABC +from typing import Any, Iterable, Optional, Union + +from langcodes import Language +from pymediainfo import MediaInfo +from sortedcontainers import SortedKeyList + +from unshackle.core.config import config +from unshackle.core.titles.title import Title +from unshackle.core.utilities import sanitize_filename +from unshackle.core.utils.template_formatter import TemplateFormatter + + +class Song(Title): + def __init__( + self, + id_: Any, + service: type, + name: str, + artist: str, + album: str, + track: int, + disc: int, + year: int, + language: Optional[Union[str, Language]] = None, + data: Optional[Any] = None, + album_artist: Optional[str] = None, + release_type: str = "album", + total_tracks: Optional[int] = None, + total_discs: Optional[int] = None, + genre: Optional[str] = None, + explicit: Optional[bool] = None, + isrc: Optional[str] = None, + upc: Optional[str] = None, + copyright: Optional[str] = None, + label: Optional[str] = None, + lyrics: Optional[str] = None, + artwork_url: Optional[str] = None, + ) -> None: + super().__init__(id_, service, language, data) + + if not name: + raise ValueError("Song name must be provided") + if not isinstance(name, str): + raise TypeError(f"Expected name to be a str, not {name!r}") + + if not artist: + raise ValueError("Song artist must be provided") + if not isinstance(artist, str): + raise TypeError(f"Expected artist to be a str, not {artist!r}") + + if not album: + raise ValueError("Song album must be provided") + if not isinstance(album, str): + raise TypeError(f"Expected album to be a str, not {album!r}") + + if not track: + raise ValueError("Song track must be provided") + if not isinstance(track, int): + raise TypeError(f"Expected track to be an int, not {track!r}") + + if not disc: + raise ValueError("Song disc must be provided") + if not isinstance(disc, int): + raise TypeError(f"Expected disc to be an int, not {disc!r}") + + if not year: + raise ValueError("Song year must be provided") + if not isinstance(year, int): + raise TypeError(f"Expected year to be an int, not {year!r}") + if album_artist is not None and not isinstance(album_artist, str): + raise TypeError(f"Expected album_artist to be a str, not {album_artist!r}") + if not isinstance(release_type, str): + raise TypeError(f"Expected release_type to be a str, not {release_type!r}") + if total_tracks is not None and not isinstance(total_tracks, int): + raise TypeError(f"Expected total_tracks to be an int, not {total_tracks!r}") + if total_discs is not None and not isinstance(total_discs, int): + raise TypeError(f"Expected total_discs to be an int, not {total_discs!r}") + if genre is not None and not isinstance(genre, str): + raise TypeError(f"Expected genre to be a str, not {genre!r}") + if explicit is not None and not isinstance(explicit, bool): + raise TypeError(f"Expected explicit to be a bool, not {explicit!r}") + if isrc is not None and not isinstance(isrc, str): + raise TypeError(f"Expected isrc to be a str, not {isrc!r}") + if upc is not None and not isinstance(upc, str): + raise TypeError(f"Expected upc to be a str, not {upc!r}") + if copyright is not None and not isinstance(copyright, str): + raise TypeError(f"Expected copyright to be a str, not {copyright!r}") + if label is not None and not isinstance(label, str): + raise TypeError(f"Expected label to be a str, not {label!r}") + if lyrics is not None and not isinstance(lyrics, str): + raise TypeError(f"Expected lyrics to be a str, not {lyrics!r}") + if artwork_url is not None and not isinstance(artwork_url, str): + raise TypeError(f"Expected artwork_url to be a str, not {artwork_url!r}") + + name = name.strip() + artist = artist.strip() + album = album.strip() + album_artist = album_artist.strip() if album_artist else None + release_type = release_type.strip().lower() + genre = genre.strip() if genre else None + isrc = isrc.strip() if isrc else None + upc = upc.strip() if upc else None + copyright = copyright.strip() if copyright else None + label = label.strip() if label else None + lyrics = lyrics.strip() if lyrics else None + artwork_url = artwork_url.strip() if artwork_url else None + + if track <= 0: + raise ValueError(f"Song track cannot be {track}") + if disc <= 0: + raise ValueError(f"Song disc cannot be {disc}") + if year <= 0: + raise ValueError(f"Song year cannot be {year}") + if not release_type: + raise ValueError("Song release_type must be provided") + if total_tracks is not None and total_tracks <= 0: + raise ValueError(f"Song total_tracks cannot be {total_tracks}") + if total_discs is not None and total_discs <= 0: + raise ValueError(f"Song total_discs cannot be {total_discs}") + + self.name = name + self.artist = artist + self.album = album + self.track = track + self.disc = disc + self.year = year + self.album_artist = album_artist + self.release_type = release_type + self.total_tracks = total_tracks + self.total_discs = total_discs + self.genre = genre + self.explicit = explicit + self.isrc = isrc + self.upc = upc + self.copyright = copyright + self.label = label + self.lyrics = lyrics + self.artwork_url = artwork_url + + def __str__(self) -> str: + return "{artist} - {album} ({year}) / {track:02}. {name}".format( + artist=self.artist, album=self.album, year=self.year, track=self.track, name=self.name + ).strip() + + def _build_template_context(self, media_info: MediaInfo, show_service: bool = True) -> dict: + """Build template context dictionary from MediaInfo.""" + context = self._build_base_template_context(media_info, show_service) + context["title"] = self.name.replace("$", "S") + context["year"] = self.year or "" + context["track_number"] = f"{self.track:02}" + context["artist"] = self.artist.replace("$", "S") + context["album_artist"] = (self.album_artist or self.artist).replace("$", "S") + context["album"] = self.album.replace("$", "S") + context["disc"] = f"{self.disc:02}" if self.disc > 1 else "" + context["track_total"] = f"{self.total_tracks:02}" if self.total_tracks else "" + context["disc_total"] = f"{self.total_discs:02}" if self.total_discs else "" + context["release_type"] = self.release_type + context["genre"] = self.genre or "" + context["explicit"] = "Explicit" if self.explicit else "" + context["isrc"] = self.isrc or "" + context["upc"] = self.upc or "" + context["label"] = self.label or "" + return context + + def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str: + if folder: + # Album folder name: prefer the dedicated "albums" template, fall back to the + # legacy "songs" folder template, then to "{artist} - {album} ({year})". + template = config.get_folder_template("albums") or config.get_folder_template("songs") + if template: + formatter = TemplateFormatter(template) + context = self._build_template_context(media_info, show_service) + folder_name = formatter.format(context) + + separators = re.sub(r"\{[^}]*\}", "", template) + spacer = "." if "." in separators and " " not in separators else " " + return sanitize_filename(folder_name, spacer) + name = f"{self.artist} - {self.album}" + if self.year: + name += f" ({self.year})" + return sanitize_filename(name, " ") + + template = config.output_template.get("songs") or "{track_number}. {title}" + formatter = TemplateFormatter(template) + context = self._build_template_context(media_info, show_service) + return formatter.format(context) + + +class Music(SortedKeyList, ABC): + """A grouped music release, such as an album, EP, single, compilation, or playlist.""" + + def __new__(cls, *args: Any, **kwargs: Any): + return super().__new__(cls) + + def __init__( + self, + iterable: Optional[Iterable] = None, + kind: str = "album", + title: Optional[str] = None, + artist: Optional[str] = None, + year: Optional[int] = None, + total_tracks: Optional[int] = None, + total_discs: Optional[int] = None, + artwork_url: Optional[str] = None, + total_duration: Optional[int] = None, + owner: Optional[str] = None, + description: Optional[str] = None, + ): + if not isinstance(kind, str): + raise TypeError(f"Expected kind to be a str, not {kind!r}") + if title is not None and not isinstance(title, str): + raise TypeError(f"Expected title to be a str, not {title!r}") + if artist is not None and not isinstance(artist, str): + raise TypeError(f"Expected artist to be a str, not {artist!r}") + if year is not None and not isinstance(year, int): + raise TypeError(f"Expected year to be an int, not {year!r}") + if total_tracks is not None and not isinstance(total_tracks, int): + raise TypeError(f"Expected total_tracks to be an int, not {total_tracks!r}") + if total_discs is not None and not isinstance(total_discs, int): + raise TypeError(f"Expected total_discs to be an int, not {total_discs!r}") + if artwork_url is not None and not isinstance(artwork_url, str): + raise TypeError(f"Expected artwork_url to be a str, not {artwork_url!r}") + if total_duration is not None and not isinstance(total_duration, int): + raise TypeError(f"Expected total_duration to be an int, not {total_duration!r}") + if owner is not None and not isinstance(owner, str): + raise TypeError(f"Expected owner to be a str, not {owner!r}") + if description is not None and not isinstance(description, str): + raise TypeError(f"Expected description to be a str, not {description!r}") + + super().__init__(iterable, key=lambda x: (x.album, x.disc, x.track, x.year or 0)) + + kind = kind.strip().lower() + if not kind: + raise ValueError("Music kind must be provided") + if year is not None and year <= 0: + raise ValueError(f"Music year cannot be {year}") + if total_tracks is not None and total_tracks <= 0: + raise ValueError(f"Music total_tracks cannot be {total_tracks}") + if total_discs is not None and total_discs <= 0: + raise ValueError(f"Music total_discs cannot be {total_discs}") + if total_duration is not None and total_duration < 0: + raise ValueError(f"Music total_duration cannot be {total_duration}") + + self.kind = kind + self.title = title.strip() if title else None + self.artist = artist.strip() if artist else None + self.year = year + self.total_tracks = total_tracks + self.total_discs = total_discs + self.artwork_url = artwork_url.strip() if artwork_url else None + self.total_duration = total_duration + self.owner = owner.strip() if owner else None + self.description = description.strip() if description else None + + def __str__(self) -> str: + if not self: + return super().__str__() + first_song = self[0] + artist = self.artist or getattr(first_song, "album_artist", None) or first_song.artist + title = self.title or first_song.album + year = self.year or first_song.year or "?" + return f"{artist} - {title} ({year})" + + def tree(self, verbose: bool = False) -> Any: + from unshackle.core.music.renderer import MusicRenderer + + return MusicRenderer().render(self, verbose=verbose) + + +class Album(Music): + """Backward-compatible collection name for album-style music releases.""" + + +__all__ = ("Song", "Music", "Album") diff --git a/unshackle/core/tracks/audio.py b/unshackle/core/tracks/audio.py index 988136b..115de57 100644 --- a/unshackle/core/tracks/audio.py +++ b/unshackle/core/tracks/audio.py @@ -154,7 +154,11 @@ class Audio(Track): @property def atmos(self) -> bool: """Return True if the audio track contains Dolby Atmos.""" - return bool(self.joc) + if self.joc: + return True + if isinstance(self.extra, dict): + return bool(self.extra.get("atmos") or self.extra.get("joc")) + return False def __str__(self) -> str: return " | ".join( diff --git a/unshackle/unshackle-example.yaml b/unshackle/unshackle-example.yaml index 61b942c..49484de 100644 --- a/unshackle/unshackle-example.yaml +++ b/unshackle/unshackle-example.yaml @@ -70,14 +70,18 @@ output_template: # folder: '{title} ({year?})' # # Per-title-type folder templates (optional). Override folder naming separately for - # movies, series, and songs. Useful when music libraries need artist/album-style folders + # movies, series, and albums. Useful when music libraries need artist/album-style folders # while movies/series follow a different scheme. Any kind omitted falls back to the # default for that title type. # + # For music: "songs" (above) names each track file, while "albums" (below) names the + # album folder the tracks are saved into. Album folder vars: {album_artist}, {album}, + # {artist}, {year}, {genre}, {label}, {release_type}, {track_total}, {disc_total}. + # # folder: # movies: '{title} ({year})' # series: '{title} ({year?})' - # songs: '{artist}/{album} ({year?})' + # albums: '{album_artist} - {album} ({year?})' # Language-based tagging for output filenames # Automatically adds language identifiers (e.g., DANiSH, NORDiC, DKsubs) based on diff --git a/uv.lock b/uv.lock index ff9f7bb..7971ec4 100644 --- a/uv.lock +++ b/uv.lock @@ -660,6 +660,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7f/ed/e3705d6d02b4f7aea715a353c8ce193efd0b5db13e204df895d38734c244/isort-7.0.0-py3-none-any.whl", hash = "sha256:1bcabac8bc3c36c7fb7b98a76c8abb18e0f841a3ba81decac7691008592499c1", size = 94672, upload-time = "2025-10-11T13:30:57.665Z" }, ] +[[package]] +name = "jsonpath-ng" +version = "1.8.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/32/58/250751940d75c8019659e15482d548a4aa3b6ce122c515102a4bfdac50e3/jsonpath_ng-1.8.0.tar.gz", hash = "sha256:54252968134b5e549ea5b872f1df1168bd7defe1a52fed5a358c194e1943ddc3", size = 74513, upload-time = "2026-02-24T14:42:06.182Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/03/99/33c7d78a3fb70d545fd5411ac67a651c81602cc09c9cf0df383733f068c5/jsonpath_ng-1.8.0-py3-none-any.whl", hash = "sha256:b8dde192f8af58d646fc031fac9c99fe4d00326afc4148f1f043c601a8cfe138", size = 67844, upload-time = "2026-02-28T00:53:19.637Z" }, +] + [[package]] name = "jsonpickle" version = "4.1.1" @@ -934,6 +943,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/81/08/7036c080d7117f28a4af526d794aab6a84463126db031b007717c1a6676e/multidict-6.7.1-py3-none-any.whl", hash = "sha256:55d97cc6dae627efa6a6e548885712d4864b81110ac76fa4e534c03819fa4a56", size = 12319, upload-time = "2026-01-26T02:46:44.004Z" }, ] +[[package]] +name = "mutagen" +version = "1.47.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/81/e6/64bc71b74eef4b68e61eb921dcf72dabd9e4ec4af1e11891bbd312ccbb77/mutagen-1.47.0.tar.gz", hash = "sha256:719fadef0a978c31b4cf3c956261b3c58b6948b32023078a2117b1de09f0fc99", size = 1274186, upload-time = "2023-09-03T16:33:33.411Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b0/7a/620f945b96be1f6ee357d211d5bf74ab1b7fe72a9f1525aafbfe3aee6875/mutagen-1.47.0-py3-none-any.whl", hash = "sha256:edd96f50c5907a9539d8e5bba7245f62c9f520aef333d13392a79a4f70aca719", size = 194391, upload-time = "2023-09-03T16:33:29.955Z" }, +] + [[package]] name = "mypy" version = "1.19.1" @@ -1778,10 +1796,12 @@ dependencies = [ { name = "defusedxml" }, { name = "filelock" }, { name = "fonttools" }, + { name = "jsonpath-ng" }, { name = "jsonpickle" }, { name = "langcodes" }, { name = "language-data" }, { name = "lxml" }, + { name = "mutagen" }, { name = "protobuf" }, { name = "pycaption" }, { name = "pycountry" }, @@ -1844,10 +1864,12 @@ requires-dist = [ { name = "defusedxml", specifier = ">=0.7.1" }, { name = "filelock", specifier = ">=3.20.3,<4" }, { name = "fonttools", specifier = ">=4.60.2,<5" }, + { name = "jsonpath-ng", specifier = ">=1.8.0" }, { name = "jsonpickle", specifier = ">=3.0.4,<5" }, { name = "langcodes", specifier = ">=3.4.0,<4" }, { name = "language-data", specifier = ">=1.4.0" }, { name = "lxml", specifier = ">=5.2.1,<7" }, + { name = "mutagen", specifier = ">=1.47.0,<2" }, { name = "protobuf", specifier = ">=4.25.3,<7" }, { name = "pycaption", specifier = ">=2.2.6,<3" }, { name = "pycountry", specifier = ">=24.6.1" },