feat(music): native music core - shared helpers, album folder template, display cleanup (#125)

* Add native Music core workflow

* feat(music): add shared music core helpers, album folder template, and UX cleanup

Consolidate duplicated music-service logic into core and extend the native music workflow.

- add core/music/extract.py: shared stateless helpers (first_text, first_number, year/duration/name formatting, classify_release_kind, dedupe_track_options, build_music_from_songs) so services stop carrying their own copies
- add core/music/display.py: shared rich rendering (render_track_panel, render_album_header, render_artwork_preview) with TrackRow / MusicHeaderInfo data holders
- export the new helpers from core/music/__init__.py
- add dedicated `albums` folder template kind (output_template.folder.albums) resolving albums -> songs -> "{artist} - {album} ({year})"; whitelist music template variables (album_artist, track_total, disc_total, release_type, genre, explicit, isrc, upc, label)
- fix song filename crash: config.get_output_template(...) did not exist; use config.output_template.get("songs") with a sane default
- strip emojis from music output (renderer, dl.py music branch) to match unshackle UX; remove dead MusicSongPlan import and music_icon logic
- document the albums folder key in unshackle-example.yaml
- add tests for extract, display, and the folder template

---------

Co-authored-by: MrMovies-Dev <MrMovies-Dev@users.noreply.github.com>
This commit is contained in:
sp4rk.y
2026-06-15 13:34:34 -06:00
committed by GitHub
parent 680f5059b5
commit 78a6a97fcf
21 changed files with 3095 additions and 15 deletions

View File

@@ -67,6 +67,8 @@ dependencies = [
"animeapi-py>=0.6.0",
"rnet>=2.4.2",
"bandit>=1.9.4",
"jsonpath-ng>=1.8.0",
"mutagen>=1.47.0,<2",
"defusedxml>=0.7.1",
]

View File

@@ -0,0 +1,193 @@
"""Unit tests for the shared music rendering helpers in unshackle.core.music.display.
Rendering is visual, so these assert structure/type/no-crash rather than exact
ANSI. No network: the artwork soft-fail path is exercised with a session whose
get raises, and (when Pillow is absent) via the missing-dependency guard.
"""
from __future__ import annotations
from typing import Any, Optional
from rich.console import Console, RenderableType
from rich.panel import Panel
from unshackle.core.music.display import (MusicHeaderInfo, TrackRow, format_track_detail_quality, render_album_header,
render_artwork_preview, render_track_panel)
from unshackle.core.titles.music import Song
class DummyService:
"""Stand-in service class; Song only requires a type, never an instance."""
def make_song(*, track: int = 1, name: str = "Song", album: str = "Album") -> Song:
return Song(
id_=f"{album}-{track}",
service=DummyService,
name=name,
artist="Artist",
album=album,
track=track,
disc=1,
year=2020,
)
def render_to_text(renderable: RenderableType) -> str:
"""Render a rich renderable to plain text so we can assert on its content."""
console = Console(width=120, record=True, force_terminal=False)
console.print(renderable)
return console.export_text()
def test_trackrow_constructs() -> None:
song = make_song(name="Track A")
row = TrackRow(
song=song,
quality_label="FLAC 16-bit/44.1kHz",
layout="Stereo",
duration_str="3:21",
hires=True,
cd=True,
note="",
)
assert row.song is song
assert row.quality_label == "FLAC 16-bit/44.1kHz"
assert row.layout == "Stereo"
assert row.hires is True
assert row.cd is True
assert row.note == ""
def test_trackrow_defaults() -> None:
row = TrackRow(song=make_song(), quality_label="MP3 320", layout="Stereo", duration_str="2:00")
assert row.hires is False
assert row.cd is False
assert row.note == ""
def test_musicheaderinfo_constructs() -> None:
info = MusicHeaderInfo(
artist="Some Artist",
album="Some Album",
year="2021",
track_count=12,
quality_label="FLAC 24-bit/96kHz",
duration_str="42:10",
)
assert info.artist == "Some Artist"
assert info.album == "Some Album"
assert info.year == "2021"
assert info.track_count == 12
assert info.quality_label == "FLAC 24-bit/96kHz"
assert info.duration_str == "42:10"
assert info.artist_label == "Artist"
def test_format_track_detail_quality_brackets_codec() -> None:
assert format_track_detail_quality("FLAC 16-bit/44.1kHz") == "[FLAC] | 16-bit/44.1kHz"
assert format_track_detail_quality("ogg 320kbps") == "[OGG] | 320kbps"
def test_format_track_detail_quality_passthrough() -> None:
assert format_track_detail_quality("Lossless") == "Lossless"
assert format_track_detail_quality("") == ""
def test_render_track_panel_returns_panel() -> None:
rows = [
TrackRow(song=make_song(track=1, name="First"), quality_label="FLAC 16-bit/44.1kHz", layout="Stereo", duration_str="3:00", cd=True),
TrackRow(song=make_song(track=2, name="Second"), quality_label="OGG 320", layout="Stereo", duration_str="4:00", hires=True),
]
panel = render_track_panel(rows, total=len(rows))
assert isinstance(panel, Panel)
text = render_to_text(panel)
assert "First" in text
assert "Second" in text
assert "2 Tracks" in text
assert "CD" in text
assert "Hi-Res" in text
def test_render_track_panel_singular_label() -> None:
rows = [TrackRow(song=make_song(name="Only"), quality_label="FLAC 16-bit/44.1kHz", layout="Stereo", duration_str="3:00")]
text = render_to_text(render_track_panel(rows, total=1))
# The count node reads "1 Track" (singular); the panel title is always
# "Available Tracks", so we only check the count node phrasing here.
assert "1 Track" in text
assert "1 Tracks" not in text
def test_render_track_panel_note_marks_unavailable() -> None:
rows = [
TrackRow(
song=make_song(name="Gone"),
quality_label="FLAC",
layout="Stereo",
duration_str="3:00",
cd=True,
hires=True,
note="(unavailable in region)",
)
]
text = render_to_text(render_track_panel(rows, total=1))
assert "unavailable in region" in text
# When a note is present, the badges are suppressed.
assert "CD" not in text
assert "Hi-Res" not in text
def test_render_album_header_without_artwork() -> None:
info = MusicHeaderInfo(
artist="The Artist",
album="Greatest Hits",
year="1999",
track_count=10,
quality_label="FLAC 16-bit/44.1kHz",
duration_str="55:00",
)
header = render_album_header(info)
assert header is not None
text = render_to_text(header)
assert "The Artist" in text
assert "Greatest Hits" in text
assert "1999" in text
assert "10" in text
def test_render_album_header_with_artwork() -> None:
info = MusicHeaderInfo(
artist="Owner Name",
album="A Playlist",
year="2024",
track_count=3,
quality_label="OGG 320",
artist_label="Owner",
)
artwork = render_album_header(MusicHeaderInfo("x", "y", "1", 1, "q")) # any renderable
header = render_album_header(info, artwork=artwork)
assert header is not None
text = render_to_text(header)
assert "Owner Name" in text
assert "A Playlist" in text
assert "Owner" in text
class RaisingSession:
"""Session stub whose get() always raises, to drive the soft-fail path."""
def get(self, *args: Any, **kwargs: Any) -> Any:
raise RuntimeError("network disabled in tests")
def test_render_artwork_preview_empty_url_returns_none() -> None:
assert render_artwork_preview(RaisingSession(), "") is None
def test_render_artwork_preview_soft_fails_without_network() -> None:
# Either Pillow is absent (guard returns None) or the session.get raises
# (caught and returns None). Both paths must yield None, never raise.
result: Optional[RenderableType] = render_artwork_preview(RaisingSession(), "https://example.invalid/cover.jpg")
assert result is None

View File

@@ -0,0 +1,243 @@
"""Unit tests for the shared music helpers in unshackle.core.music.extract."""
from __future__ import annotations
from typing import Any, Optional
import pytest
from unshackle.core.music.extract import (
build_music_from_songs,
classify_release_kind,
dedupe_track_options,
duration_seconds,
first_number,
first_text,
format_duration,
format_names,
year_from_value,
)
from unshackle.core.music.models import MusicTrackOption
from unshackle.core.titles.music import Music, Song
class DummyService:
"""Stand-in service class; Song only requires a type, never an instance."""
def make_song(
*,
track: int = 1,
disc: int = 1,
year: int = 2020,
album: str = "Album",
artist: str = "Artist",
name: str = "Song",
album_artist: Optional[str] = None,
total_tracks: Optional[int] = None,
total_discs: Optional[int] = None,
artwork_url: Optional[str] = None,
data: Optional[Any] = None,
) -> Song:
return Song(
id_=f"{album}-{disc}-{track}",
service=DummyService,
name=name,
artist=artist,
album=album,
track=track,
disc=disc,
year=year,
album_artist=album_artist,
total_tracks=total_tracks,
total_discs=total_discs,
artwork_url=artwork_url,
data=data,
)
@pytest.mark.parametrize(
("values", "default", "expected"),
[
((None, "", " hello "), "", "hello"),
(("", None), "fallback", "fallback"),
(({"name": "Track"},), "", "Track"),
(({"title": "T"},), "", "T"),
(({"description": "D"},), "", "D"),
((["a", "", "b"],), "", "a, b"),
((123,), "", "123"),
((), "def", "def"),
],
)
def test_first_text(values: tuple[Any, ...], default: str, expected: str) -> None:
assert first_text(*values, default=default) == expected
@pytest.mark.parametrize(
("values", "expected"),
[
((None, "", "12"), 12.0),
(("3.5",), 3.5),
(("abc", 7), 7.0),
((None, ""), None),
((), None),
],
)
def test_first_number(values: tuple[Any, ...], expected: Optional[float]) -> None:
assert first_number(*values) == expected
@pytest.mark.parametrize(
("value", "expected"),
[
("2021-05-04", 2021),
("released 1999 remaster", 1999),
(2018, 2018),
("no year here", 1900),
(None, 1900),
("", 1900),
],
)
def test_year_from_value(value: Any, expected: int) -> None:
assert year_from_value(value) == expected
@pytest.mark.parametrize(
("value", "expected"),
[
(250, 250.0),
(250000, 250.0), # treated as milliseconds
("180", 180.0),
(None, None),
("nope", None),
],
)
def test_duration_seconds(value: Any, expected: Optional[float]) -> None:
assert duration_seconds(value) == expected
@pytest.mark.parametrize(
("value", "expected"),
[
(0, "0:00"),
(5, "0:05"),
(65, "1:05"),
(599, "9:59"),
(3600, "1:00:00"),
(3725, "1:02:05"),
(-5, "0:00"),
(None, ""),
("bad", ""),
],
)
def test_format_duration(value: Any, expected: str) -> None:
assert format_duration(value) == expected
@pytest.mark.parametrize(
("value", "sep", "expected"),
[
(["Alice", "Bob"], ", ", "Alice, Bob"),
(["Alice", "Alice"], ", ", "Alice"), # de-dupes
([{"profile": {"name": "DJ"}}], ", ", "DJ"),
({"items": [{"name": "X"}, {"name": "Y"}]}, " & ", "X & Y"),
("Solo", ", ", "Solo"),
],
)
def test_format_names(value: Any, sep: str, expected: str) -> None:
assert format_names(value, sep=sep) == expected
@pytest.mark.parametrize(
("raw_kind", "count", "expected"),
[
("single", 1, "single"),
("single", 3, "ep"), # multi-track "single" -> EP
("EP", None, "ep"),
("Extended Play", None, "ep"),
("ep-single", 1, "single"),
("ep-single", 4, "ep"),
("album", None, "album"),
("compilation", None, "compilation"),
("Live Recording", None, "live"),
("download", None, "download"),
("playlist", None, "playlist"),
("other", None, "other"),
("totally-unknown", None, "album"),
("", None, "album"),
],
)
def test_classify_release_kind(raw_kind: str, count: Optional[float], expected: str) -> None:
assert classify_release_kind(raw_kind, count) == expected
def test_dedupe_track_options() -> None:
a = MusicTrackOption(codec="flac", bit_depth=16, sample_rate=44100, bitrate=None, quality_label="L", explicit=False)
a_dup = MusicTrackOption(
codec="FLAC", bit_depth=16, sample_rate=44100, bitrate=None, quality_label="L", explicit=False
) # codec case-insensitive duplicate of `a`
b = MusicTrackOption(codec="flac", bit_depth=24, sample_rate=96000, bitrate=None, quality_label="H", explicit=False)
c = MusicTrackOption(codec="flac", bit_depth=16, sample_rate=44100, bitrate=None, quality_label="L", explicit=True)
result = dedupe_track_options([a, a_dup, b, c])
assert result == [a, b, c]
def test_build_music_from_songs() -> None:
songs = [
make_song(
track=1,
disc=1,
year=2019,
album="Greatest",
artist="Band",
album_artist="The Band",
total_tracks=2,
total_discs=1,
artwork_url="http://art/1.jpg",
data={"duration": 200},
),
make_song(
track=2,
disc=1,
year=2019,
album="Greatest",
artist="Band",
total_tracks=2,
total_discs=1,
data={"duration": 100},
),
]
music = build_music_from_songs(songs, kind="album")
assert isinstance(music, Music)
assert music.kind == "album"
assert music.title == "Greatest"
assert music.artist == "The Band" # album_artist preferred over artist
assert music.year == 2019
assert music.total_tracks == 2
assert music.total_discs == 1
assert music.total_duration == 300
assert music.artwork_url == "http://art/1.jpg"
def test_build_music_from_songs_overrides() -> None:
songs = [make_song(data={"duration": 60})]
music = build_music_from_songs(
songs,
kind="playlist",
title="My Mix",
artist="Various",
owner="george",
description="best of",
)
assert music.title == "My Mix"
assert music.artist == "Various"
assert music.owner == "george"
assert music.description == "best of"
def test_build_music_from_songs_empty_raises() -> None:
with pytest.raises(ValueError, match="nothing here"):
build_music_from_songs([], kind="album", empty_error="nothing here")

View File

@@ -0,0 +1,93 @@
"""Tests for the dedicated album-folder template (`output_template.folder.albums`)."""
from __future__ import annotations
import warnings
import pytest
from unshackle.core.config import Config, config
from unshackle.core.titles.music import Song
from unshackle.core.utilities import sanitize_filename
class DummyService:
pass
class StubMediaInfo:
"""Minimal MediaInfo stand-in: the template context only reads track lists."""
video_tracks: list = []
audio_tracks: list = []
def make_song(**overrides) -> Song:
kwargs = dict(
id_="track-0001",
service=DummyService,
name="NUEVAYoL",
artist="Bad Bunny",
album="DeBI TiRAR MaS FOToS",
track=1,
disc=1,
year=2025,
album_artist="Bad Bunny",
)
kwargs.update(overrides)
return Song(**kwargs)
@pytest.fixture
def reset_folder_config():
"""Save/restore the global config's template attributes around each test."""
saved = (config.folder_templates, config.folder_template, config.output_template)
config.folder_templates = {}
config.folder_template = ""
config.output_template = {}
yield config
config.folder_templates, config.folder_template, config.output_template = saved
def test_folder_fallback_when_no_templates(reset_folder_config):
song = make_song()
result = song.get_filename(StubMediaInfo(), folder=True)
assert result == sanitize_filename("Bad Bunny - DeBI TiRAR MaS FOToS (2025)", " ")
def test_albums_template_used(reset_folder_config):
reset_folder_config.folder_templates = {"albums": "{album_artist} - {album} ({year})"}
result = make_song().get_filename(StubMediaInfo(), folder=True)
assert result == sanitize_filename("Bad Bunny - DeBI TiRAR MaS FOToS (2025)", " ")
# Album folder must NOT carry per-track info like the song file name does.
assert "01" not in result
def test_albums_preferred_over_songs(reset_folder_config):
reset_folder_config.folder_templates = {"albums": "AA-{album}", "songs": "SS-{album}"}
result = make_song().get_filename(StubMediaInfo(), folder=True)
assert result.startswith("AA-")
def test_songs_folder_used_when_no_albums(reset_folder_config):
# Backward compatibility: the legacy "songs" folder kind still names the album folder.
reset_folder_config.folder_templates = {"songs": "SS-{album}"}
result = make_song().get_filename(StubMediaInfo(), folder=True)
assert result.startswith("SS-")
def test_validation_accepts_music_variables_and_albums_kind():
with warnings.catch_warnings():
warnings.simplefilter("error") # any warning becomes a test failure
Config(
output_template={
"songs": "{track_number}. {title}",
"folder": {"albums": "{album_artist} - {album} ({year?})"},
}
)
def test_validation_warns_on_unknown_folder_kind():
with pytest.warns(UserWarning, match="Unknown folder template kind"):
# A non-folder key is required so output-template validation actually runs.
Config(output_template={"songs": "{track_number}. {title}", "folder": {"bogus": "{album}"}})

View File

@@ -48,11 +48,21 @@ from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY,
from unshackle.core.credential import Credential
from unshackle.core.drm import DRM_T, ClearKeyCENC, MonaLisa, PlayReady, Widevine
from unshackle.core.events import events
from unshackle.core.music import (
MusicAudioIntegrityError,
MusicMetadataResult,
MusicPlanner,
MusicRenderer,
file_md5,
verify_music_audio,
write_music_manifest,
write_music_metadata,
)
from unshackle.core.proxies import Basic, Gluetun, Hola, NordVPN, SurfsharkVPN, WindscribeVPN
from unshackle.core.service import Service
from unshackle.core.services import Services
from unshackle.core.title_cacher import get_account_hash
from unshackle.core.titles import Movie, Movies, Series, Song, Title_T
from unshackle.core.titles import Movie, Movies, Music, Series, Song, Title_T
from unshackle.core.titles.episode import Episode
from unshackle.core.tracks import Audio, Subtitle, Tracks, Video
from unshackle.core.tracks.attachment import Attachment
@@ -720,7 +730,7 @@ class dl:
if not config.output_template:
raise click.ClickException(
"No 'output_template' configured in your unshackle.yaml.\n"
"Please add an 'output_template' section with movies, series, and songs templates.\n"
"Please add an 'output_template' section with movies, series, and songs/music templates.\n"
"See unshackle-example.yaml for examples."
)
@@ -1265,7 +1275,7 @@ class dl:
)
with console.status(
"Authenticating with Remote Service..." if self.is_remote else "Authenticating with Service...",
"Preparing Remote Service Session..." if self.is_remote else "Preparing Service Session...",
spinner="dots",
):
try:
@@ -1378,12 +1388,43 @@ class dl:
if enrich_year and not titles.year:
titles.year = enrich_year
console.print(Padding(Rule(f"[rule.text]{titles.__class__.__name__}: {titles}"), (1, 2)))
music_mode = isinstance(titles, Music)
music_collection_mode = isinstance(titles, list) and bool(titles) and all(
isinstance(title, Music) for title in titles
)
music_titles = list(titles) if music_collection_mode else ([titles] if music_mode else [])
music_plans = {}
if music_titles:
music_renderer = MusicRenderer()
if music_collection_mode:
collection_label_getter = getattr(service, "get_music_collection_label", None)
collection_label = (
collection_label_getter(music_titles)
if callable(collection_label_getter)
else f"Music Collection ({len(music_titles)} Releases)"
)
if collection_label:
console.print(Padding(Rule(f"[rule.text]{collection_label}"), (1, 2)))
if list_:
for music_title in music_titles:
music_kind = MusicRenderer.display_kind(getattr(music_title, "kind", "") or "album")
console.print(Padding(Rule(f"[rule.text]{music_kind}: {music_title}"), (1, 2)))
current_plan = MusicPlanner(service).build(music_title)
music_plans[id(music_title)] = current_plan
music_renderable = music_renderer.render_plan(current_plan, verbose=True)
if music_renderable:
console.print(Padding(music_renderable, (0, 5)))
else:
console.print(Padding(Rule(f"[rule.text]{titles.__class__.__name__}: {titles}"), (1, 2)))
console.print(Padding(titles.tree(verbose=list_titles), (0, 5)))
console.print(Padding(titles.tree(verbose=list_titles), (0, 5)))
if list_titles:
return
if music_titles and list_:
return
# Enables manual selection for Series when --select-titles is set
if select_titles and isinstance(titles, Series):
console.print(Padding(Rule("[rule.text]Select Titles"), (1, 2)))
@@ -1474,6 +1515,431 @@ class dl:
latest_episode_id = f"{latest_ep.season}x{latest_ep.number}"
self.log.info(f"Latest episode mode: Selecting S{latest_ep.season:02}E{latest_ep.number:02}")
music_group_download = (
bool(music_titles)
and getattr(service, "GROUP_AUDIO_DOWNLOADS", False)
and not no_mux
and not video_only
and not subs_only
and not chapters_only
and not no_audio
)
if music_group_download:
def download_music_title(titles: Music, plan: Any) -> bool:
music_items: list[tuple[Song, Audio, Callable[..., None]]] = []
music_song_plans = {
id(song_plan.song): song_plan
for disc in plan.discs
for song_plan in disc.songs
}
music_renderer = MusicRenderer()
music_start_time = time.time()
music_kind = MusicRenderer.display_kind(getattr(titles, "kind", "") or "album")
console.print(Padding(Rule(f"[rule.text]{music_kind}: {titles}"), (1, 2)))
music_header = music_renderer.render_plan_header(plan)
if music_header:
console.print(Padding(music_header, (0, 5)))
def music_track_count(count: int) -> str:
return f"{count} track{'s' if count != 1 else ''}"
def format_elapsed_seconds(elapsed: float) -> str:
elapsed_int = int(elapsed)
minutes, seconds = divmod(elapsed_int, 60)
hours, minutes = divmod(minutes, 60)
value = f"{minutes:d}m{seconds:d}s"
return f"{hours:d}h{value}" if hours else value
def select_music_audio(song: Song) -> None:
if not audio_description:
song.tracks.select_audio(lambda x: not x.descriptive)
if acodec:
song.tracks.select_audio(lambda x: x.codec in acodec)
if not song.tracks.audio:
codec_names = ", ".join(c.name for c in acodec)
self.log.error(f"No audio tracks matching codecs for {song.name}: {codec_names}")
sys.exit(1)
if channels:
song.tracks.select_audio(lambda x: math.ceil(x.channels) == math.ceil(channels))
if not song.tracks.audio:
self.log.error(f"There's no {channels} Audio Track for {song.name}...")
sys.exit(1)
if no_atmos:
song.tracks.audio = [x for x in song.tracks.audio if not x.atmos]
if not song.tracks.audio:
self.log.error(f"No non-Atmos audio tracks available for {song.name}...")
sys.exit(1)
if abitrate:
song.tracks.select_audio(lambda x: x.bitrate and x.bitrate // 1000 == abitrate)
if not song.tracks.audio:
self.log.error(f"There's no {abitrate}kbps Audio Track for {song.name}...")
sys.exit(1)
if abitrate_min is not None and abitrate_max is not None:
song.tracks.select_audio(
lambda x: x.bitrate and abitrate_min <= x.bitrate // 1000 <= abitrate_max
)
if not song.tracks.audio:
self.log.error(
f"No Audio Track in {abitrate_min}-{abitrate_max}kbps range for {song.name}..."
)
sys.exit(1)
audio_languages = a_lang or lang
if audio_languages:
processed_lang = []
for language in audio_languages:
if language == "orig":
if song.language:
orig_lang = str(song.language) if hasattr(song.language, "__str__") else song.language
if orig_lang not in processed_lang:
processed_lang.append(orig_lang)
else:
self.log.warning("Original language not available for music track, skipping 'orig'")
elif language not in processed_lang:
processed_lang.append(language)
if "best" not in processed_lang and "all" not in processed_lang:
song.tracks.audio = song.tracks.by_language(
song.tracks.audio,
processed_lang,
per_language=1,
exact_match=exact_lang,
)
if not song.tracks.audio:
self.log.error(f"There's no {processed_lang} Audio Track for {song.name}...")
sys.exit(1)
self.log.debug("Getting Tracks")
tracks_label = "Getting Remote Tracks..." if self.is_remote else "Getting Tracks..."
with console.status(tracks_label, spinner="dots"):
for song in titles:
events.reset()
events.subscribe(events.Types.SEGMENT_DOWNLOADED, service.on_segment_downloaded)
events.subscribe(events.Types.TRACK_DOWNLOADED, service.on_track_downloaded)
events.subscribe(events.Types.TRACK_DECRYPTED, service.on_track_decrypted)
events.subscribe(events.Types.TRACK_REPACKED, service.on_track_repacked)
events.subscribe(events.Types.TRACK_MULTIPLEX, service.on_track_multiplex)
song.tracks.add(service.get_tracks(song), warn_only=True)
song.tracks.chapters = service.get_chapters(song)
song.tracks.sort_audio(by_language=a_lang or lang)
select_music_audio(song)
if not song.tracks.audio:
self.log.error(f"No audio tracks returned for {song.name}.")
sys.exit(1)
music_tree = Tree(
f"[repr.number]{len(titles)}[/] {'Track' if len(titles) == 1 else 'Tracks'}",
guide_style="bright_black",
)
for song in titles:
track = song.tracks.audio[0]
progress = Progress(
SpinnerColumn(finished_text=""),
BarColumn(),
" | ",
TimeRemainingColumn(compact=True, elapsed_when_finished=True),
" | ",
TextColumn("[progress.data.speed]{task.fields[downloaded]}"),
console=console,
speed_estimate_period=10,
)
task = progress.add_task("", downloaded="-")
state = {"total": 100.0}
def update_track_progress(
task_id: int = task,
_state: dict[str, float] = state,
_progress: Progress = progress,
**kwargs,
) -> None:
if "total" in kwargs and kwargs["total"] is not None:
_state["total"] = kwargs["total"]
downloaded_state = kwargs.get("downloaded")
if downloaded_state in {"Downloaded", "Decrypted", "[yellow]SKIPPED"}:
kwargs["completed"] = _state["total"]
_progress.update(task_id=task_id, **kwargs)
track_table = Table.grid()
track_table.add_row(music_renderer._song_line(song, titles))
song_plan = music_song_plans.get(id(song))
if song_plan and song_plan.selected:
track_table.add_row(music_renderer._option_line(song_plan.selected), style="text2")
else:
track_table.add_row(str(track)[6:], style="text2")
track_table.add_row(progress)
music_tree.add(track_table, guide_style="bright_black")
music_items.append((song, track, update_track_progress))
download_table = Table.grid()
download_table.add_row(music_tree)
try:
with Live(Padding(download_table, (1, 5)), console=console, refresh_per_second=5):
with ThreadPoolExecutor(downloads) as pool:
download_futures = [
pool.submit(
track.download,
session=track.session or service.session,
no_proxy_download=no_proxy_download,
prepare_drm=partial(
partial(self.prepare_drm, table=download_table),
track=track,
title=song,
certificate=partial(
service.get_widevine_service_certificate,
title=song,
track=track,
),
licence=partial(
service.get_playready_license
if is_playready_cdm(self.cdm)
else service.get_widevine_license,
title=song,
track=track,
),
cdm_only=cdm_only,
vaults_only=vaults_only,
export=export_path,
),
cdm=self.cdm,
max_workers=workers,
progress=progress_call,
)
for song, track, progress_call in music_items
]
for download in futures.as_completed(download_futures):
download.result()
except KeyboardInterrupt:
console.print(Padding(":x: Download Cancelled...", (0, 5, 1, 5)))
return
except Exception as e: # noqa
console.print(
Padding(
Group(
":x: Download Failed...",
f" {type(e).__name__}: {e}",
" An unexpected error occurred in one of the download workers.",
),
(1, 5),
)
)
console.print_exception()
return
if skip_dl:
console.log("Skipped downloads as --skip-dl was used...")
else:
dl_time = time_elapsed_since(music_start_time)
console.print(Padding(f"Track downloads finished in [progress.elapsed]{dl_time}[/]", (0, 5)))
integrity_results = {}
media_infos = {}
integrity_start = time.time()
try:
with console.status("Verifying audio integrity...", spinner="dots"):
for song, track, _ in music_items:
if not track.path or not track.path.exists():
continue
if track.needs_repack:
track.repackage()
events.emit(events.Types.TRACK_REPACKED, track=track)
media_info = MediaInfo.parse(track.path)
media_infos[id(track)] = media_info
integrity_results[id(track)] = verify_music_audio(
track.path,
song=song,
track=track,
media_info=media_info,
)
except MusicAudioIntegrityError as error:
console.print(Padding(f"Audio integrity failed: {error}", (0, 5, 1, 5)))
return
integrity_time = format_elapsed_seconds(time.time() - integrity_start)
source_md5 = {}
md5_elapsed = 0.0
md5_start = time.time()
with console.status("Recording MD5 checksums...", spinner="dots"):
for _, track, _ in music_items:
if track.path and track.path.exists():
source_md5[id(track)] = file_md5(track.path)
md5_elapsed += time.time() - md5_start
metadata_results = {}
final_paths = {}
final_md5 = {}
manifest_records = []
metadata_start = time.time()
metadata_warning = ""
used_final_paths: set[Path] = set()
with console.status("Writing music metadata...", spinner="dots"):
for song, track, _ in music_items:
if not track.path or not track.path.exists():
continue
media_info = media_infos.get(id(track)) or MediaInfo.parse(track.path)
final_dir = self.output_dir or config.directories.downloads
final_filename = song.get_filename(media_info, show_service=not no_source)
if not no_folder:
final_dir /= song.get_filename(media_info, show_service=not no_source, folder=True)
final_dir.mkdir(parents=True, exist_ok=True)
final_path = final_dir / f"{final_filename}{track.path.suffix}"
sep = config.get_template_separator("songs")
if final_path in used_final_paths:
index = 2
while final_path in used_final_paths:
final_path = final_dir / f"{final_filename.rstrip()}{sep}{index}{track.path.suffix}"
index += 1
try:
os.replace(track.path, final_path)
except OSError:
if final_path.exists():
final_path.unlink()
shutil.move(track.path, final_path)
used_final_paths.add(final_path)
final_paths[id(track)] = final_path
self.completed_files.append(final_path)
try:
metadata_results[id(track)] = write_music_metadata(
final_path,
song,
session=service.session,
source_md5=source_md5.get(id(track), ""),
)
except Exception as error:
metadata_warning = f"{type(error).__name__}: {error}"
self.log.warning(f"Music metadata failed for {song.name}: {metadata_warning}")
metadata_results[id(track)] = MusicMetadataResult(skipped=True, reason=metadata_warning)
metadata_time = format_elapsed_seconds(time.time() - metadata_start)
final_md5_start = time.time()
with console.status("Recording final MD5 checksums...", spinner="dots"):
for _, track, _ in music_items:
final_path = final_paths.get(id(track))
if final_path and final_path.exists():
final_md5[id(track)] = file_md5(final_path)
md5_elapsed += time.time() - final_md5_start
md5_time = format_elapsed_seconds(md5_elapsed)
for song, track, _ in music_items:
final_path = final_paths.get(id(track))
if not final_path:
continue
integrity_result = integrity_results.get(id(track))
metadata_result = metadata_results.get(id(track), MusicMetadataResult(skipped=True, reason="not processed"))
manifest_records.append(
{
"track_number": song.track,
"disc_number": song.disc,
"title": song.name,
"artist": song.artist,
"album": song.album,
"file": str(final_path),
"source_md5": source_md5.get(id(track), ""),
"final_md5": final_md5.get(id(track), ""),
"integrity": integrity_result.to_dict() if integrity_result else {},
"metadata": metadata_result.to_dict(),
}
)
if final_paths:
manifest_slug = ".".join(
part
for part in re.sub(
r"[^A-Za-z0-9._-]+",
".",
".".join(
str(part or "")
for part in (
self.service,
getattr(titles, "artist", ""),
getattr(titles, "title", ""),
getattr(titles, "year", ""),
int(time.time()),
)
),
)
.strip(".")
.split(".")
if part
)
try:
write_music_manifest(
config.directories.logs / "music" / f"{manifest_slug or 'music'}.json",
release={
"kind": getattr(titles, "kind", ""),
"title": getattr(titles, "title", ""),
"artist": getattr(titles, "artist", ""),
"year": getattr(titles, "year", None),
"service": self.service,
},
tracks=manifest_records,
)
except Exception as error:
self.log.warning(f"Music manifest write failed: {error}")
album_time = time_elapsed_since(music_start_time)
release_label = MusicRenderer.display_kind(getattr(titles, "kind", "") or "music")
integrity_count = len(integrity_results)
md5_count = len(final_md5)
metadata_written_count = sum(1 for result in metadata_results.values() if result.written)
console.print(
Padding(
f"Audio integrity verified for {music_track_count(integrity_count)} in [progress.elapsed]{integrity_time}[/]",
(0, 5),
)
)
console.print(
Padding(
f"MD5 checksum recorded for {music_track_count(md5_count)} in [progress.elapsed]{md5_time}[/]",
(0, 5),
)
)
if metadata_written_count:
console.print(
Padding(
f"Metadata written for {music_track_count(metadata_written_count)} in [progress.elapsed]{metadata_time}[/]",
(0, 5),
)
)
else:
reason = metadata_warning or next(
(result.reason for result in metadata_results.values() if result.reason),
"install mutagen to write music tags",
)
console.print(Padding(f"Metadata skipped: {reason}", (0, 5)))
console.print(Padding(f"{release_label} downloaded in [progress.elapsed]{album_time}[/]!", (0, 5, 1, 5)))
return True
for music_title in music_titles:
current_plan = music_plans.get(id(music_title)) or MusicPlanner(service).build(music_title)
if not download_music_title(music_title, current_plan):
return
if not hasattr(service, "close"):
cookie_file = self.get_cookie_path(self.service, self.profile)
if cookie_file:
self.save_cookies(cookie_file, service.session.cookies)
if hasattr(service, "close"):
service.close()
dl_time = time_elapsed_since(start_time)
console.print(Padding(f"Processed all titles in [progress.elapsed]{dl_time}", (0, 5, 1, 5)))
return
if music_collection_mode:
raise click.ClickException("Music collections require grouped audio downloads.")
for i, title in enumerate(titles):
if isinstance(title, Episode) and latest_episode and latest_episode_id:
# If --latest-episode is set, only process the latest episode
@@ -1482,7 +1948,12 @@ class dl:
elif isinstance(title, Episode) and wanted and f"{title.season}x{title.number}" not in wanted:
continue
console.print(Padding(Rule(f"[rule.text]{title}"), (1, 2)))
title_rule = (
f"Track {title.track:02}: {title.name}"
if music_mode and isinstance(title, Song)
else str(title)
)
console.print(Padding(Rule(f"[rule.text]{title_rule}"), (1, 2)))
temp_font_files = []
if isinstance(title, Episode) and not self.tmdb_searched:
@@ -2903,8 +3374,12 @@ class dl:
tags.tag_file(final_path, title, self.tmdb_id, self.imdb_id)
title_dl_time = time_elapsed_since(dl_start_time)
downloaded_label = "Track" if music_mode and isinstance(title, Song) else "Title"
console.print(
Padding(f":tada: Title downloaded in [progress.elapsed]{title_dl_time}[/]!", (0, 5, 1, 5))
Padding(
f":tada: {downloaded_label} downloaded in [progress.elapsed]{title_dl_time}[/]!",
(0, 5, 1, 5),
)
)
if not hasattr(service, "close"):

View File

@@ -146,8 +146,17 @@ class Config:
"tag",
"track_number",
"artist",
"album_artist",
"album",
"disc",
"track_total",
"disc_total",
"release_type",
"genre",
"explicit",
"isrc",
"upc",
"label",
"audio",
"audio_channels",
"audio_full",
@@ -168,8 +177,8 @@ class Config:
if self.folder_template:
all_templates["folder"] = self.folder_template
for kind, tmpl in self.folder_templates.items():
if kind not in {"movies", "series", "songs"}:
warnings.warn(f"Unknown folder template kind '{kind}' (expected movies/series/songs)")
if kind not in {"movies", "series", "songs", "albums"}:
warnings.warn(f"Unknown folder template kind '{kind}' (expected movies/series/songs/albums)")
continue
all_templates[f"folder.{kind}"] = tmpl
@@ -195,7 +204,7 @@ class Config:
def get_folder_template(self, kind: str) -> str:
"""Resolve the folder template for the given title kind.
kind: one of "movies", "series", "songs".
kind: one of "movies", "series", "songs", "albums".
Falls back to the legacy single-string folder template, then "".
"""
if self.folder_templates:

View File

@@ -0,0 +1,40 @@
from .display import MusicHeaderInfo, TrackRow, render_album_header, render_artwork_preview, render_track_panel
from .extract import (build_music_from_songs, classify_release_kind, dedupe_track_options, duration_seconds,
first_number, first_text, format_duration, format_names, year_from_value)
from .hasher import file_md5
from .integrity import MusicAudioIntegrityError, MusicAudioIntegrityResult, verify_music_audio
from .manifest import write_music_manifest
from .models import MusicDiscPlan, MusicDownloadPlan, MusicSongPlan, MusicTrackOption
from .planner import MusicPlanner
from .renderer import MusicRenderer
from .tagger import MusicMetadataResult, write_music_metadata
__all__ = (
"MusicAudioIntegrityError",
"MusicAudioIntegrityResult",
"MusicDiscPlan",
"MusicDownloadPlan",
"MusicHeaderInfo",
"MusicMetadataResult",
"MusicPlanner",
"MusicRenderer",
"MusicSongPlan",
"MusicTrackOption",
"TrackRow",
"build_music_from_songs",
"classify_release_kind",
"dedupe_track_options",
"duration_seconds",
"file_md5",
"first_number",
"first_text",
"format_duration",
"format_names",
"render_album_header",
"render_artwork_preview",
"render_track_panel",
"verify_music_audio",
"write_music_manifest",
"write_music_metadata",
"year_from_value",
)

View File

@@ -0,0 +1,175 @@
"""Shared rich-based rendering helpers for music releases.
Data-in / renderable-out: services do their own quality and field extraction,
then hand plain data here. The helpers know nothing about how a ``quality_label``
was derived; they only style and lay it out. Quality-schema parsing stays
per-service while panel/header/artwork rendering lives in one place.
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from io import BytesIO
from typing import Any, Optional
# RenderableType is rich's union of "things that can be printed to a console".
from rich.console import Group, RenderableType
from rich.padding import Padding
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from rich.tree import Tree
from unshackle.core.titles.music import Song
@dataclass
class TrackRow:
"""One row in the track tree, already formatted by the calling service.
When ``note`` is set the row is treated as unavailable: the quality label is
styled red, ``note`` is appended muted, and the ``cd``/``hires`` badges are
suppressed. Otherwise the normal detail line is built and ``cd`` drives a
"CD" badge.
"""
song: Song
quality_label: str
layout: str
duration_str: str
hires: bool = False
cd: bool = False
note: str = ""
@dataclass
class MusicHeaderInfo:
"""Plain data holder for the release-header metadata grid."""
artist: str
album: str
year: str
track_count: int
quality_label: str
duration_str: str = ""
artist_label: str = "Artist"
# Quality strings look like "FLAC 16-bit/44.1kHz"; split the codec token off the
# front and bracket it so the detail line reads "[FLAC] | 16-bit/44.1kHz".
QUALITY_DETAIL_RE = re.compile(r"^(FLAC|OGG|AAC|MP3)\s+(.+)$", flags=re.IGNORECASE)
def format_track_detail_quality(quality_label: str) -> str:
"""Bracket the leading codec token of a quality string for the detail line."""
text = str(quality_label or "").strip()
match = QUALITY_DETAIL_RE.match(text)
if match:
return f"[{match.group(1).upper()}] | {match.group(2).strip()}"
return text
def render_artwork_preview(
session: Any,
artwork_url: str,
*,
width: int = 25,
) -> Optional[RenderableType]:
"""Fetch artwork via ``session`` and render it as half-block coloured cells.
Pillow (``PIL``) is an optional dependency; any error (missing PIL, network,
bad image, zero dimensions) soft-fails to ``None`` so callers skip the preview.
"""
if not artwork_url:
return None
try:
from PIL import Image
except Exception:
return None
try:
response = session.get(artwork_url, timeout=20)
response.raise_for_status()
image = Image.open(BytesIO(response.content)).convert("RGB")
if not image.width or not image.height:
return None
height = max(1, int(width * image.height / image.width * 0.5))
# Image.Resampling exists on Pillow >= 9.1; fall back gracefully on older.
resampling = getattr(getattr(Image, "Resampling", Image), "LANCZOS", 1)
image = image.resize((width, height * 2), resampling)
lines = []
for y in range(0, image.height, 2):
line = Text()
for x in range(image.width):
top = image.getpixel((x, y))
bottom = image.getpixel((x, min(y + 1, image.height - 1)))
top_color = f"#{top[0]:02x}{top[1]:02x}{top[2]:02x}"
bottom_color = f"#{bottom[0]:02x}{bottom[1]:02x}{bottom[2]:02x}"
# Lower-half block fg=bottom/bg=top packs two pixel rows per cell.
line.append("", style=f"{bottom_color} on {top_color}")
lines.append(line)
return Group(*lines)
except Exception:
return None
def render_track_panel(rows: list[TrackRow], total: int) -> Panel:
"""Render the per-track tree (one node + detail line each) in a Panel."""
track_label = "Track" if total == 1 else "Tracks"
tree = Tree(f"[repr.number]{total}[/] {track_label}", guide_style="bright_black")
for row in rows:
title_line = Text(f"{row.song.track:02}", style="repr.number")
title_line.append(" ")
title_line.append(row.song.name, style="bold #009900")
node = tree.add(title_line, guide_style="bright_black")
detail = Text()
if row.note:
detail.append(row.quality_label, style="red")
detail.append(" ")
detail.append(row.note, style="bright_black")
else:
detail.append(format_track_detail_quality(row.quality_label))
if row.layout:
detail.append(" | ")
detail.append(row.layout)
if row.duration_str:
detail.append(" | ")
detail.append(row.duration_str)
if row.cd:
detail.append(" | ")
detail.append("CD", style="yellow1")
if row.hires:
detail.append(" | ")
detail.append("Hi-Res", style="gold1")
node.add(detail, guide_style="bright_black")
return Panel(tree, title="Available Tracks")
def render_album_header(
info: MusicHeaderInfo,
artwork: Optional[RenderableType] = None,
) -> Optional[RenderableType]:
"""Render the release metadata grid, optionally placing artwork beside it."""
# Table.grid is a borderless table used purely for column alignment.
grid = Table.grid(padding=(0, 2))
grid.add_column(style="orchid1", no_wrap=True)
grid.add_column()
grid.add_row(info.artist_label, Text(info.artist, style="bold #ff0000"))
grid.add_row("Collection", Text(info.album, style="blue"))
grid.add_row("Year", Text(info.year, style="blue"))
grid.add_row("Tracks", Text(str(info.track_count), style="blue"))
grid.add_row("Quality", Text(info.quality_label, style="blue"))
if info.duration_str:
grid.add_row("Length", Text(info.duration_str, style="blue"))
if not artwork:
return grid
header = Table.grid(expand=True, padding=(0, 2))
header.add_column(no_wrap=True)
header.add_column(ratio=1)
header.add_row(artwork, Padding(grid, (3, 0, 0, 0)))
return header

View File

@@ -0,0 +1,196 @@
"""Shared, stateless helpers for music services.
These functions consolidate the generic data-shaping logic that music services
otherwise each duplicate: first-non-empty getters, duration/year/name
formatting, release-kind classification, track-option dedupe, and Song -> Music
assembly. They take plain data in and return plain data out (no ``self``), so
any music service can reuse them.
"""
from __future__ import annotations
import re
from typing import Any, Optional
from unshackle.core.music.models import MusicTrackOption
from unshackle.core.titles.music import Music, Song
def first_text(*values: Any, default: str = "") -> str:
"""Return the first non-empty stripped string across ``values``.
Dicts are searched by common label keys; lists are joined with ", ".
Falls back to ``default`` when nothing usable is found.
"""
for value in values:
if value in (None, "", [], {}):
continue
if isinstance(value, dict):
for key in ("name", "title", "display", "display_name", "description", "url"):
nested = value.get(key)
text = first_text(nested) if isinstance(nested, (dict, list)) else str(nested or "").strip()
if text:
return text
elif isinstance(value, list):
parts = [first_text(item) for item in value]
text = ", ".join(part for part in parts if part)
if text:
return text
else:
text = str(value).strip()
if text:
return text
return default
def first_number(*values: Any) -> Optional[float]:
"""Return the first value parseable as a float, else ``None``."""
for value in values:
if value in (None, ""):
continue
try:
return float(value)
except (TypeError, ValueError):
continue
return None
def year_from_value(value: Any) -> int:
"""Extract a 4-digit year from ``value``; fall back to 1900 when absent."""
match = re.search(r"(?P<year>\d{4})", str(value or ""))
return int(match.group("year")) if match else 1900
def duration_seconds(value: Any) -> Optional[float]:
"""Coerce a duration to seconds, treating large numbers (>10000) as milliseconds."""
number = first_number(value)
if number is None:
return None
if number > 10_000:
return number / 1000
return number
def format_duration(value: Any) -> str:
"""Format a duration in seconds as ``H:MM:SS`` (or ``M:SS`` under an hour)."""
seconds_value = first_number(value)
if seconds_value is None:
return ""
total = max(0, int(round(seconds_value)))
minutes, remaining = divmod(total, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours}:{minutes:02}:{remaining:02}"
return f"{minutes}:{remaining:02}"
def format_names(value: Any, sep: str = ", ") -> str:
"""Join a list/dict of artist-like entries into a de-duplicated string."""
names: list[str] = []
values: Any = value
if isinstance(values, dict):
values = values.get("items") or values.get("artists") or [values]
if not isinstance(values, list):
values = [values]
for item in values:
name = first_text(
item.get("profile") if isinstance(item, dict) else None,
item.get("artist") if isinstance(item, dict) else None,
item if isinstance(item, dict) else None,
item,
default="",
)
if name and name not in names:
names.append(name)
return sep.join(names)
def classify_release_kind(raw_kind: str, tracks_count: Optional[float]) -> str:
"""Normalise an already-extracted release-kind string to a canonical value.
Returns one of ``single | ep | album | compilation | live | download |
playlist | other``. ``tracks_count`` disambiguates "single" (1 track) from
an EP (more than 1 track) for sources that label EPs as singles; pass the
real track count (or ``None`` only when genuinely unknown) — it is required
so no caller silently mislabels a multi-track "single" as an EP by omission.
"""
key = re.sub(r"[^a-z0-9]+", "", str(raw_kind or "").lower())
if key in {"single"}:
return "single" if tracks_count == 1 else "ep"
if key in {"ep", "extendedplay"}:
return "ep"
if key in {"epsingle", "epsingles"}:
return "single" if tracks_count == 1 else "ep"
if key in {"compilation", "compilations"}:
return "compilation"
if key in {"live", "liverecording"}:
return "live"
if key in {"download", "downloads"}:
return "download"
if key in {"playlist", "playlists"}:
return "playlist"
if key in {"other"}:
return "other"
return "album"
def dedupe_track_options(options: list[MusicTrackOption]) -> list[MusicTrackOption]:
"""Drop duplicate track options keyed on codec/quality identity, preserving order."""
seen: set[tuple[str, Optional[int], Optional[int], Optional[int], str, bool]] = set()
unique: list[MusicTrackOption] = []
for option in options:
key = (
str(option.codec or "").upper(),
option.bit_depth,
option.sample_rate,
option.bitrate,
option.quality_label,
option.explicit,
)
if key in seen:
continue
seen.add(key)
unique.append(option)
return unique
def build_music_from_songs(
songs: list[Song],
*,
kind: str,
title: Optional[str] = None,
artist: Optional[str] = None,
owner: Optional[str] = None,
description: Optional[str] = None,
empty_error: str = "No songs were returned.",
) -> Music:
"""Assemble a :class:`Music` release from a list of :class:`Song` objects.
Aggregates artwork and total duration from each song's ``data`` payload and
derives track/disc totals. Raises ``ValueError(empty_error)`` on an empty list.
"""
if not songs:
raise ValueError(empty_error)
first_song = songs[0]
artwork_url = ""
total_duration = 0
for song in songs:
data = song.data if isinstance(song.data, dict) else {}
artwork_url = artwork_url or first_text(song.artwork_url, data.get("artwork_url"))
total_duration += int(first_number(data.get("duration")) or 0)
return Music(
songs,
kind=kind,
title=title or first_song.album,
artist=artist or first_song.album_artist or first_song.artist,
year=first_song.year,
total_tracks=max((song.total_tracks or song.track for song in songs), default=len(songs)),
total_discs=max((song.total_discs or song.disc for song in songs), default=1),
artwork_url=artwork_url or None,
total_duration=total_duration or None,
owner=owner,
description=description,
)

View File

@@ -0,0 +1,16 @@
from __future__ import annotations
import hashlib
from pathlib import Path
def file_md5(path: Path, chunk_size: int = 1024 * 1024) -> str:
"""Return the MD5 checksum for a local file."""
digest = hashlib.md5()
with path.open("rb") as file:
for chunk in iter(lambda: file.read(chunk_size), b""):
digest.update(chunk)
return digest.hexdigest()
__all__ = ("file_md5",)

View File

@@ -0,0 +1,187 @@
from __future__ import annotations
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Optional
from pymediainfo import MediaInfo
from unshackle.core import binaries
class MusicAudioIntegrityError(Exception):
"""Raised when a downloaded music audio file cannot be trusted."""
@dataclass
class MusicAudioIntegrityResult:
path: Path
size: int
codec: str = ""
duration: Optional[float] = None
bit_depth: Optional[int] = None
sample_rate: Optional[int] = None
channels: Optional[float] = None
flac_tested: bool = False
warnings: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return {
"path": str(self.path),
"size": self.size,
"codec": self.codec,
"duration": self.duration,
"bit_depth": self.bit_depth,
"sample_rate": self.sample_rate,
"channels": self.channels,
"flac_tested": self.flac_tested,
"warnings": self.warnings,
}
def verify_music_audio(path: Path, *, song: Any = None, track: Any = None, media_info: Optional[MediaInfo] = None) -> MusicAudioIntegrityResult:
"""Verify a downloaded music audio file after it is playable/decrypted."""
path = Path(path)
if not path.exists():
raise MusicAudioIntegrityError(f"Audio file is missing: {path}")
size = path.stat().st_size
if size <= 3:
raise MusicAudioIntegrityError(f"Audio file is empty: {path}")
media_info = media_info or MediaInfo.parse(path)
audio_track = next(iter(media_info.audio_tracks or []), None)
if not audio_track:
raise MusicAudioIntegrityError(f"MediaInfo could not find an audio stream in: {path.name}")
duration = _duration_seconds(getattr(audio_track, "duration", None))
expected_duration = _expected_duration(song, track)
warnings: list[str] = []
if expected_duration and duration:
tolerance = max(3.0, expected_duration * 0.02)
if abs(duration - expected_duration) > tolerance:
raise MusicAudioIntegrityError(
f"{path.name} duration mismatch: expected {expected_duration:.0f}s, got {duration:.0f}s"
)
result = MusicAudioIntegrityResult(
path=path,
size=size,
codec=_first_text(
getattr(audio_track, "format", None),
getattr(audio_track, "commercial_name", None),
getattr(audio_track, "codec_id", None),
),
duration=duration,
bit_depth=_first_int(getattr(audio_track, "bit_depth", None)),
sample_rate=_first_int(getattr(audio_track, "sampling_rate", None)),
channels=_first_float(
getattr(audio_track, "channel_s", None),
getattr(audio_track, "channel_s_original", None),
getattr(track, "channels", None),
),
warnings=warnings,
)
expected_size = _expected_size(track)
if expected_size and result.size != expected_size:
result.warnings.append(f"Size differs from service metadata: expected {expected_size}, got {result.size}")
if path.suffix.lower() == ".flac":
flac = binaries.find("flac")
if flac:
completed = subprocess.run(
[str(flac), "-t", "-s", str(path)],
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
errors="replace",
)
if completed.returncode != 0:
detail = (completed.stderr or completed.stdout or "").strip()
raise MusicAudioIntegrityError(f"FLAC stream test failed for {path.name}: {detail}")
result.flac_tested = True
else:
result.warnings.append("flac binary not available; FLAC stream test skipped")
return result
def _expected_duration(song: Any, track: Any) -> Optional[float]:
data = getattr(song, "data", None)
if isinstance(data, dict):
metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else {}
value = _first_float(data.get("duration"), metadata.get("duration"))
if value:
return value
track_data = getattr(track, "data", None)
if isinstance(track_data, dict):
metadata = track_data.get("metadata") if isinstance(track_data.get("metadata"), dict) else {}
value = _first_float(track_data.get("duration"), metadata.get("duration"))
if value:
return value
return None
def _expected_size(track: Any) -> Optional[int]:
data = getattr(track, "data", None)
if not isinstance(data, dict):
return None
sources = [
data,
data.get("file_info") if isinstance(data.get("file_info"), dict) else {},
data.get("audio_info") if isinstance(data.get("audio_info"), dict) else {},
data.get("source_info") if isinstance(data.get("source_info"), dict) else {},
data.get("stream_info") if isinstance(data.get("stream_info"), dict) else {},
data.get("metadata") if isinstance(data.get("metadata"), dict) else {},
]
for source in sources:
value = _first_int(
source.get("content_length"),
source.get("contentLength"),
source.get("file_size"),
source.get("filesize"),
source.get("size"),
)
if value:
return value
return None
def _duration_seconds(value: Any) -> Optional[float]:
number = _first_float(value)
if number is None:
return None
if number > 1000:
return number / 1000
return number
def _first_text(*values: Any) -> str:
for value in values:
text = str(value or "").strip()
if text:
return text
return ""
def _first_float(*values: Any) -> Optional[float]:
for value in values:
if value in (None, "", [], {}):
continue
text = str(value).strip().replace(" ", "")
try:
return float(text)
except (TypeError, ValueError):
continue
return None
def _first_int(*values: Any) -> Optional[int]:
value = _first_float(*values)
return int(value) if value is not None else None
__all__ = ("MusicAudioIntegrityError", "MusicAudioIntegrityResult", "verify_music_audio")

View File

@@ -0,0 +1,23 @@
from __future__ import annotations
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
def write_music_manifest(path: Path, *, release: dict[str, Any], tracks: list[dict[str, Any]]) -> Path:
"""Write a compact JSON manifest for music integrity, checksums, and metadata results."""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
payload = {
"schema": "unshackle.music.manifest.v1",
"generated_at": datetime.now(timezone.utc).isoformat(),
"release": release,
"tracks": tracks,
}
path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")
return path
__all__ = ("write_music_manifest",)

View File

@@ -0,0 +1,58 @@
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from unshackle.core.titles.music import Song
@dataclass
class MusicTrackOption:
codec: str
bit_depth: Optional[int] = None
sample_rate: Optional[int] = None
bitrate: Optional[int] = None
channels: Optional[float] = None
lossless: bool = False
hires: bool = False
atmos: bool = False
explicit: bool = False
duration: Optional[int] = None
quality_label: str = ""
@dataclass
class MusicSongPlan:
song: Song
options: list[MusicTrackOption] = field(default_factory=list)
selected: Optional[MusicTrackOption] = None
output_path: Optional[Path] = None
fallback_used: bool = False
skip_reason: str = ""
@dataclass
class MusicDiscPlan:
disc_number: int
songs: list[MusicSongPlan] = field(default_factory=list)
@dataclass
class MusicDownloadPlan:
kind: str
title: str
artist: str
album_artist: str = ""
year: Optional[int] = None
released: str = ""
genre: str = ""
label: str = ""
artwork_url: Optional[str] = None
total_tracks: Optional[int] = None
total_discs: Optional[int] = None
total_duration: Optional[int] = None
discs: list[MusicDiscPlan] = field(default_factory=list)
quality_requested: str = "best"
fallback_mode: str = "next-best"

View File

@@ -0,0 +1,208 @@
from __future__ import annotations
import re
from collections import defaultdict
from typing import Any
from unshackle.core.music.models import MusicDiscPlan, MusicDownloadPlan, MusicSongPlan, MusicTrackOption
from unshackle.core.titles.music import Music, Song
class MusicPlanner:
"""Build a service-neutral music list/download plan for native Music rendering."""
def __init__(self, service: Any):
self.service = service
def build(self, music: Music) -> MusicDownloadPlan:
first_song = music[0] if music else None
first_data = first_song.data if first_song and isinstance(first_song.data, dict) else {}
first_metadata = first_data.get("metadata") if isinstance(first_data.get("metadata"), dict) else {}
plan = MusicDownloadPlan(
kind=getattr(music, "kind", "music"),
title=getattr(music, "title", None) or (first_song.album if first_song else ""),
artist=getattr(music, "artist", None) or (first_song.album_artist or first_song.artist if first_song else ""),
album_artist=(first_song.album_artist if first_song else "") or "",
year=getattr(music, "year", None) or (first_song.year if first_song else None),
released=self._first_text(
getattr(music, "released", None),
getattr(music, "release_date", None),
first_data.get("release_date"),
first_data.get("released_at"),
first_metadata.get("release_date"),
first_metadata.get("released_at"),
),
genre=(first_song.genre if first_song else "") or "",
label=(first_song.label if first_song else "") or "",
artwork_url=getattr(music, "artwork_url", None),
total_tracks=getattr(music, "total_tracks", None) or len(music),
total_discs=getattr(music, "total_discs", None) or self._max_value(music, "disc"),
total_duration=getattr(music, "total_duration", None) or self._sum_duration(music),
quality_requested=self._quality_requested(music),
)
selected_options: list[MusicTrackOption] = []
discs: dict[int, list[MusicSongPlan]] = defaultdict(list)
for song in music:
options = self._get_options(song)
selected = options[0] if options else None
if selected:
selected_options.append(selected)
discs[song.disc].append(
MusicSongPlan(
song=song,
options=options,
selected=selected,
)
)
for disc_number in sorted(discs):
plan.discs.append(MusicDiscPlan(disc_number=disc_number, songs=discs[disc_number]))
quality = self._quality_summary_from_options(selected_options)
if quality:
plan.quality_requested = quality
return plan
def _get_options(self, song: Song) -> list[MusicTrackOption]:
provider = getattr(self.service, "get_music_track_options", None)
if callable(provider):
options = provider(song)
if options:
return options
option = self._option_from_song(song)
return [option] if option else []
@staticmethod
def _option_from_song(song: Song) -> MusicTrackOption | None:
data = song.data if isinstance(song.data, dict) else {}
metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else {}
quality = MusicPlanner._first_text(data.get("quality"), metadata.get("quality"), metadata.get("quality_label"))
duration = MusicPlanner._first_number(data.get("duration"), metadata.get("duration"))
if not quality and duration is None:
return None
codec = MusicPlanner._codec_from_quality(quality)
bit_depth, sample_rate = MusicPlanner._quality_numbers(quality)
bitrate = MusicPlanner._bitrate_from_quality(quality)
hires = bool((bit_depth and bit_depth > 16) or (sample_rate and sample_rate > 48000))
lossless = codec in {"FLAC", "ALAC", "WAV", "AIFF"} or "lossless" in quality.lower()
atmos = "atmos" in quality.lower() or bool(data.get("atmos") or metadata.get("atmos"))
explicit = bool(getattr(song, "explicit", None) or data.get("explicit") or metadata.get("explicit"))
return MusicTrackOption(
codec=codec,
bit_depth=bit_depth,
sample_rate=sample_rate,
bitrate=bitrate,
channels=MusicPlanner._first_number(data.get("channels"), metadata.get("channels")),
lossless=lossless,
hires=hires,
atmos=atmos,
explicit=explicit,
duration=int(duration) if duration is not None else None,
quality_label=quality,
)
def _quality_requested(self, music: Music) -> str:
service_quality = getattr(self.service, "quality", None)
if service_quality:
quality_map = {
27: "Hi-Res Lossless",
7: "Hi-Res Lossless",
6: "Lossless",
5: "MP3",
}
return quality_map.get(service_quality, str(service_quality))
first_song = music[0] if music else None
if not first_song:
return ""
data = first_song.data if isinstance(first_song.data, dict) else {}
return self._first_text(data.get("quality"))
@staticmethod
def _first_text(*values: Any) -> str:
for value in values:
if value in (None, ""):
continue
text = str(value).strip()
if text:
return text
return ""
@staticmethod
def _first_number(*values: Any) -> float | None:
for value in values:
if value in (None, ""):
continue
try:
return float(value)
except (TypeError, ValueError):
continue
return None
@staticmethod
def _codec_from_quality(value: str) -> str:
lowered = value.lower()
for codec in ("flac", "alac", "aac", "mp3", "wav", "aiff"):
if codec in lowered:
return codec.upper()
return ""
@staticmethod
def _quality_numbers(value: str) -> tuple[int | None, int | None]:
bit_depth = None
sample_rate = None
bit_match = re.search(r"(?P<bits>\d+)\s*[- ]?bit", value.lower())
if bit_match:
bit_depth = int(bit_match.group("bits"))
sample_match = re.search(r"(?P<rate>\d+(?:\.\d+)?)\s*k(?:hz)?", value.lower())
if sample_match:
sample_rate = int(float(sample_match.group("rate")) * 1000)
return bit_depth, sample_rate
@staticmethod
def _bitrate_from_quality(value: str) -> int | None:
match = re.search(r"(?P<rate>\d+)\s*kb/s", value.lower())
if not match:
return None
return int(match.group("rate")) * 1000
@staticmethod
def _quality_summary_from_options(options: list[MusicTrackOption]) -> str:
if not options:
return ""
codecs = {str(option.codec or "").upper() for option in options if option.codec}
labels = [option.quality_label for option in options if option.quality_label]
if any(option.atmos for option in options):
return "Dolby Atmos"
if any(option.hires and (option.lossless or str(option.codec or "").upper() in {"FLAC", "ALAC", "WAV", "AIFF"}) for option in options):
return "Hi-Res Lossless"
if any(option.lossless or str(option.codec or "").upper() in {"FLAC", "ALAC", "WAV", "AIFF"} for option in options):
return "Lossless"
if "AAC" in codecs:
return "AAC"
if "MP3" in codecs:
return "MP3"
return MusicPlanner._first_text(*labels)
@staticmethod
def _sum_duration(music: Music) -> int | None:
total = 0
for song in music:
data = song.data if isinstance(song.data, dict) else {}
value = MusicPlanner._first_number(data.get("duration"))
total += int(value or 0)
return total or None
@staticmethod
def _max_value(music: Music, attr: str) -> int | None:
values = [getattr(song, attr, 0) or 0 for song in music]
return max(values, default=0) or None
__all__ = ("MusicPlanner",)

View File

@@ -0,0 +1,510 @@
from __future__ import annotations
import re
from typing import Any, Optional
from rich.console import Group, RenderableType
from rich.markup import escape
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from rich.tree import Tree
from unshackle.core.music.models import MusicDownloadPlan, MusicTrackOption
from unshackle.core.titles.music import Music, Song
class MusicRenderer:
"""Render native Music title containers for the CLI."""
COMPACT_TRACK_LIMIT = 8
def render(self, music: Music, *, verbose: bool = False) -> RenderableType:
header = self.render_header(music)
tracks = self.render_tracks(music, verbose=verbose)
if header:
return Group(header, Text(""), tracks)
return tracks
def render_plan(self, plan: MusicDownloadPlan, *, verbose: bool = True) -> RenderableType:
header = self.render_plan_header(plan)
tracks = self.render_plan_tracks(plan, verbose=verbose)
if header:
return Group(header, Text(""), tracks)
return tracks
def render_header(self, music: Music) -> Optional[Table]:
if not music:
return None
first_song = music[0]
data = first_song.data if isinstance(first_song.data, dict) else {}
metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else {}
title = music.title or first_song.album
artist = music.artist or first_song.album_artist or first_song.artist
year = getattr(music, "year", None) or first_song.year
kind = self.display_kind(music.kind)
explicit = any(bool(getattr(song, "explicit", None)) for song in music)
total_tracks = getattr(music, "total_tracks", None) or len(music)
total_discs = getattr(music, "total_discs", None) or self._max_value(music, "disc")
released = self._format_release_date(
self._first_value(
getattr(music, "released", None),
getattr(music, "release_date", None),
data.get("release_date"),
data.get("released_at"),
metadata.get("release_date"),
metadata.get("released_at"),
)
)
length = self._format_total_duration(
getattr(music, "total_duration", None) or self._sum_duration(music)
)
quality = self._quality_summary(
self._first_text(
getattr(music, "quality", None),
data.get("quality"),
metadata.get("quality"),
metadata.get("quality_label"),
),
lossless=self._as_bool(self._first_value(data.get("lossless"), metadata.get("lossless"))),
hires=self._as_bool(self._first_value(data.get("hires"), metadata.get("hires"))),
)
grid = self._metadata_grid()
grid.add_row(Text("Title", style="bright_black"), Text(str(title)))
grid.add_row(Text("Artist", style="bright_black"), Text(str(artist)))
grid.add_row(Text("Type", style="bright_black"), self._kind_text(kind, explicit=explicit))
if released:
grid.add_row(Text("Released", style="bright_black"), Text(released))
if year:
grid.add_row(Text("Year", style="bright_black"), Text(str(year)))
grid.add_row(Text("Tracks", style="bright_black"), Text(str(total_tracks)))
if total_discs and total_discs > 1:
grid.add_row(Text("Discs", style="bright_black"), Text(str(total_discs)))
if length:
grid.add_row(Text("Length", style="bright_black"), Text(length))
if quality:
grid.add_row(Text("Quality", style="bright_black"), Text(quality))
if first_song.genre:
grid.add_row(Text("Genre", style="bright_black"), Text(first_song.genre))
if first_song.label:
grid.add_row(Text("Label", style="bright_black"), Text(first_song.label))
return grid
def render_tracks(self, music: Music, *, verbose: bool = False) -> Panel:
total = len(music)
track_label = "Track" if total == 1 else "Tracks"
tree = Tree(f"[repr.number]{total}[/] {track_label}", guide_style="bright_black")
visible_songs = list(music)
if not verbose and len(visible_songs) > self.COMPACT_TRACK_LIMIT:
visible_songs = visible_songs[: self.COMPACT_TRACK_LIMIT]
for song in visible_songs:
node = tree.add(self._song_line(song, music), guide_style="bright_black")
option = self._option_from_song(song)
if option:
node.add(option, guide_style="bright_black")
hidden = total - len(visible_songs)
if hidden > 0:
suffix = "s" if hidden != 1 else ""
tree.add(f"[bright_black]... {hidden} more track{suffix}[/]", guide_style="bright_black")
return Panel(tree, title="Available Tracks")
def render_plan_header(self, plan: MusicDownloadPlan) -> Optional[Table]:
title = plan.title
artist = plan.artist or plan.album_artist
kind = self.display_kind(plan.kind)
explicit = any(
bool(getattr(song_plan.song, "explicit", None) or (song_plan.selected and song_plan.selected.explicit))
for disc in plan.discs
for song_plan in disc.songs
)
released = self._format_release_date(getattr(plan, "released", None) or getattr(plan, "release_date", None))
length = self._format_total_duration(plan.total_duration)
quality = self._quality_summary(plan.quality_requested)
grid = self._metadata_grid()
if title:
grid.add_row(Text("Title", style="bright_black"), Text(str(title)))
if artist:
grid.add_row(Text("Artist", style="bright_black"), Text(str(artist)))
grid.add_row(Text("Type", style="bright_black"), self._kind_text(kind, explicit=explicit))
if released:
grid.add_row(Text("Released", style="bright_black"), Text(released))
if plan.year:
grid.add_row(Text("Year", style="bright_black"), Text(str(plan.year)))
if plan.total_tracks:
grid.add_row(Text("Tracks", style="bright_black"), Text(str(plan.total_tracks)))
if plan.total_discs and plan.total_discs > 1:
grid.add_row(Text("Discs", style="bright_black"), Text(str(plan.total_discs)))
if length:
grid.add_row(Text("Length", style="bright_black"), Text(length))
if quality:
grid.add_row(Text("Quality", style="bright_black"), Text(quality))
if plan.genre:
grid.add_row(Text("Genre", style="bright_black"), Text(str(plan.genre)))
if plan.label:
grid.add_row(Text("Label", style="bright_black"), Text(str(plan.label)))
return grid
def render_plan_tracks(self, plan: MusicDownloadPlan, *, verbose: bool = True) -> Panel:
songs = [song_plan for disc in plan.discs for song_plan in disc.songs]
total = len(songs)
track_label = "Track" if total == 1 else "Tracks"
tree = Tree(f"[repr.number]{total}[/] {track_label}", guide_style="bright_black")
visible_songs = songs
if not verbose and len(visible_songs) > self.COMPACT_TRACK_LIMIT:
visible_songs = visible_songs[: self.COMPACT_TRACK_LIMIT]
for song_plan in visible_songs:
node = tree.add(self._song_line(song_plan.song, plan), guide_style="bright_black")
if song_plan.skip_reason:
node.add(f"[yellow]Skipped:[/] {escape(song_plan.skip_reason)}", guide_style="bright_black")
continue
for option in song_plan.options:
node.add(self._option_line(option), guide_style="bright_black")
hidden = total - len(visible_songs)
if hidden > 0:
suffix = "s" if hidden != 1 else ""
tree.add(f"[bright_black]... {hidden} more track{suffix}[/]", guide_style="bright_black")
return Panel(tree, title="Available Tracks")
@staticmethod
def _metadata_grid() -> Table:
grid = Table.grid(padding=(0, 2))
grid.add_column(style="orchid1", no_wrap=True)
grid.add_column()
return grid
@staticmethod
def display_kind(kind: Any) -> str:
text = str(kind or "music").strip()
key = re.sub(r"[^a-z0-9]+", "", text.lower())
labels = {
"album": "Album",
"single": "Single",
"ep": "EP",
"epsingle": "Single",
"playlist": "Playlist",
"compilation": "Compilation",
"live": "Live",
"download": "Download",
"other": "Other",
"track": "Track",
"music": "Music",
}
if key in labels:
return labels[key]
return text.replace("_", " ").replace("-", " ").title()
def _song_line(self, song: Song, music: Music | MusicDownloadPlan) -> Text:
number = f"{song.disc}.{song.track:02}" if song.disc > 1 else f"{song.track:02}"
line = Text()
line.append(number, style="repr.number")
line.append(" ")
line.append(song.name, style="rule.text")
kind = getattr(music, "kind", "").lower()
release_artist = getattr(music, "artist", None) or getattr(music, "album_artist", None)
if kind == "playlist" and song.artist and song.artist != release_artist:
line.append(f" - {song.artist}", style="bright_black")
return line
def _option_from_song(self, song: Song) -> Text | str:
data = song.data if isinstance(song.data, dict) else {}
metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else {}
quality = self._first_text(data.get("quality"), metadata.get("quality"), metadata.get("quality_label"))
duration = self._format_duration(self._first_value(data.get("duration"), metadata.get("duration")))
reason = self._first_text(
data.get("unavailable_reason"),
data.get("skip_reason"),
metadata.get("unavailable_reason"),
metadata.get("skip_reason"),
)
if reason:
return f"[yellow]Skipped:[/] {escape(reason)}"
badges = []
if song.explicit:
badges.append(("E", "bold bright_red"))
if self._as_bool(self._first_value(data.get("atmos"), metadata.get("atmos"))):
badges.append(("Atmos", "magenta"))
if self._is_hires_quality(quality):
badges.append(("Hi-Res", "gold1"))
details = []
if quality:
details.append(quality)
if duration:
details.append(duration)
if not details and not badges:
return ""
return self._format_option_text(details, badges)
def _option_line(self, option: MusicTrackOption) -> Text:
parts = []
codec = str(option.codec or "").strip()
if codec:
parts.append(f"[{codec}]")
if option.quality_label:
parts.extend(self._split_quality_label(option.quality_label, codec))
elif option.bit_depth and option.sample_rate:
parts.append(f"{option.bit_depth}-bit/{self._format_sample_rate(option.sample_rate)}")
elif option.bitrate:
parts.append(f"{int(option.bitrate / 1000)} kb/s")
if option.channels:
parts.append(self._format_channels(option.channels))
if option.duration:
parts.append(self._format_duration(option.duration))
badges = []
if option.explicit:
badges.append(("E", "bold bright_red"))
if option.atmos:
badges.append(("Atmos", "magenta"))
if self._is_cd_option(option):
badges.append(("CD", "yellow1"))
if option.hires:
badges.append(("Hi-Res", "gold1"))
return self._format_option_text(parts, badges)
@staticmethod
def _kind_text(kind: str, *, explicit: bool = False) -> Text:
text = Text(str(kind))
if explicit:
text.append(" Explicit", style="bold red")
return text
@staticmethod
def _format_option_text(parts: list[str], badges: list[tuple[str, str]]) -> Text:
text = Text(style="text2")
first = True
for part in parts:
if not part:
continue
if not first:
text.append(" | ")
text.append(str(part))
first = False
for badge, style in badges:
if not first:
text.append(" | ")
text.append(badge, style=style)
first = False
return text
@staticmethod
def _format_option_parts(parts: list[str]) -> str:
if not parts:
return ""
return " | ".join(escape(part) for part in parts if part)
@staticmethod
def _split_quality_label(value: str, codec: str = "") -> list[str]:
text = str(value or "").strip()
if not text:
return []
if codec and text.lower().startswith(codec.lower()):
text = text[len(codec) :].strip()
return [text] if text else []
@staticmethod
def _is_cd_option(option: MusicTrackOption) -> bool:
codec = str(option.codec or "").upper()
if codec not in {"FLAC", "ALAC", "WAV", "AIFF"}:
return False
if option.bit_depth == 16 and option.sample_rate in {44100, 44100.0}:
return True
return "16-bit/44.1" in str(option.quality_label or "").lower()
@staticmethod
def _format_sample_rate(value: Any) -> str:
try:
sample_rate = float(value)
except (TypeError, ValueError):
return str(value).strip()
if sample_rate >= 1000:
sample_rate /= 1000
if sample_rate.is_integer():
return f"{int(sample_rate)} kHz"
return f"{sample_rate:g} kHz"
@staticmethod
def _format_channels(value: Any) -> str:
try:
channels = float(value)
except (TypeError, ValueError):
return str(value).strip()
if channels == 1:
return "Mono"
if channels == 2:
return "Stereo"
if channels.is_integer():
return f"{int(channels)}.0"
return f"{channels:g}"
@staticmethod
def _first_text(*values: Any) -> str:
for value in values:
if value is None:
continue
if isinstance(value, dict):
for key in ("label", "name", "title", "display_name", "value"):
nested = value.get(key)
if nested:
return str(nested).strip()
elif isinstance(value, (list, tuple)):
text = MusicRenderer._first_text(*value)
if text:
return text
else:
text = str(value).strip()
if text:
return text
return ""
@staticmethod
def _first_value(*values: Any) -> Any:
for value in values:
if value is not None and value != "":
return value
return None
@staticmethod
def _as_bool(value: Any) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "y"}
return bool(value)
@staticmethod
def _format_duration(value: Any) -> str:
if value in (None, ""):
return ""
try:
seconds = int(float(value))
except (TypeError, ValueError):
return str(value).strip()
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours}:{minutes:02}:{seconds:02}"
return f"{minutes}:{seconds:02}"
@staticmethod
def _format_total_duration(value: Any) -> str:
if value in (None, ""):
return ""
try:
total_seconds = int(float(value))
except (TypeError, ValueError):
return str(value).strip()
if total_seconds <= 0:
return ""
minutes, seconds = divmod(total_seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours}h {minutes:02}m {seconds:02}s"
return f"{minutes}m {seconds:02}s"
@staticmethod
def _format_release_date(value: Any) -> str:
if value in (None, ""):
return ""
text = str(value).strip()
match = re.fullmatch(r"(?P<year>\d{4})(?:-(?P<month>\d{2})-(?P<day>\d{2}))?.*", text)
if not match or not match.group("month"):
return text
try:
year = int(match.group("year"))
month = int(match.group("month"))
day = int(match.group("day"))
except (TypeError, ValueError):
return text
month_names = (
"January",
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
)
if month < 1 or month > 12 or day < 1 or day > 31:
return text
return f"{month_names[month - 1]} {day}, {year}"
@classmethod
def _quality_summary(cls, value: Any, *, lossless: bool = False, hires: bool = False) -> str:
text = str(value or "").strip()
lowered = text.lower()
if not text:
if hires and lossless:
return "Hi-Res Lossless"
if lossless:
return "Lossless"
return ""
if "atmos" in lowered:
return "Dolby Atmos"
if any(codec in lowered for codec in ("flac", "alac", "wav", "aiff")):
return "Hi-Res Lossless" if cls._is_hires_quality(text) else "Lossless"
if "lossless" in lowered:
return "Hi-Res Lossless" if "hi-res" in lowered or hires else "Lossless"
if "aac" in lowered:
return "AAC"
if "mp3" in lowered:
return "MP3"
return text
@staticmethod
def _is_hires_quality(value: str) -> bool:
lowered = value.lower()
bit_depth = None
sample_rate = None
bit_match = re.search(r"(?P<bits>\d+)\s*[- ]?bit", lowered)
if bit_match:
bit_depth = int(bit_match.group("bits"))
sample_match = re.search(r"(?P<rate>\d+(?:\.\d+)?)\s*k(?:hz)?", lowered)
if sample_match:
sample_rate = float(sample_match.group("rate"))
return bool((bit_depth and bit_depth > 16) or (sample_rate and sample_rate > 48))
@staticmethod
def _sum_duration(music: Music) -> int:
total = 0
for song in music:
data = song.data if isinstance(song.data, dict) else {}
metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else {}
value = MusicRenderer._first_value(data.get("duration"), metadata.get("duration"))
try:
total += int(float(value or 0))
except (TypeError, ValueError):
continue
return total
@staticmethod
def _max_value(music: Music, attr: str) -> int:
values = [getattr(song, attr, 0) or 0 for song in music]
return max(values, default=0)
__all__ = ("MusicRenderer",)

View File

@@ -0,0 +1,345 @@
from __future__ import annotations
import logging
from dataclasses import dataclass
from io import BytesIO
from pathlib import Path
from typing import Any, Optional
from unshackle.core.config import config
try:
from PIL import Image
except ImportError: # pragma: no cover - optional artwork enhancement
Image = None
try:
from mutagen.flac import FLAC, Picture
from mutagen.id3 import (
APIC,
COMM,
ID3NoHeaderError,
TALB,
TCOM,
TCON,
TCOP,
TDRC,
TIT2,
TPE1,
TPE2,
TPOS,
TRCK,
TSRC,
TXXX,
)
from mutagen.mp3 import MP3
from mutagen.mp4 import MP4, MP4Cover
except ImportError: # pragma: no cover - optional tagging dependency
FLAC = Picture = None
MP3 = MP4 = MP4Cover = None
ID3NoHeaderError = Exception
APIC = COMM = TALB = TCOM = TCON = TCOP = TDRC = TIT2 = TPE1 = TPE2 = TPOS = TRCK = TSRC = TXXX = None
log = logging.getLogger("MUSIC_TAGGER")
@dataclass
class MusicMetadataResult:
written: bool = False
artwork_embedded: bool = False
skipped: bool = False
reason: str = ""
def to_dict(self) -> dict[str, Any]:
return {
"written": self.written,
"artwork_embedded": self.artwork_embedded,
"skipped": self.skipped,
"reason": self.reason,
}
def write_music_metadata(path: Path, song: Any, *, session: Any = None, source_md5: str = "") -> MusicMetadataResult:
"""Write normalized music metadata for FLAC, MP3, and M4A/MP4 audio files."""
path = Path(path)
extension = path.suffix.lower()
if extension not in {".flac", ".mp3", ".m4a", ".mp4"}:
return MusicMetadataResult(skipped=True, reason=f"Unsupported music metadata container: {extension}")
if extension == ".flac" and FLAC is None:
return MusicMetadataResult(skipped=True, reason="install mutagen to write FLAC tags")
if extension == ".mp3" and MP3 is None:
return MusicMetadataResult(skipped=True, reason="install mutagen to write MP3 tags")
if extension in {".m4a", ".mp4"} and MP4 is None:
return MusicMetadataResult(skipped=True, reason="install mutagen to write MP4/M4A tags")
tags = _build_tag_values(song, source_md5=source_md5)
artwork_url = _first_text(getattr(song, "artwork_url", None), _metadata(song).get("artwork_url"))
cover_data, mime_type = _download_cover(session, artwork_url)
if extension == ".flac":
_write_flac_tags(path, tags, cover_data, mime_type)
elif extension == ".mp3":
_write_mp3_tags(path, tags, cover_data, mime_type)
else:
_write_mp4_tags(path, tags, cover_data, mime_type)
return MusicMetadataResult(written=True, artwork_embedded=bool(cover_data))
def _build_tag_values(song: Any, *, source_md5: str = "") -> dict[str, str]:
metadata = _metadata(song)
track_number = _string_tag(getattr(song, "track", None) or metadata.get("track_number"))
track_total = _string_tag(getattr(song, "total_tracks", None) or metadata.get("total_tracks"))
disc_number = _string_tag(getattr(song, "disc", None) or metadata.get("disc_number"))
disc_total = _string_tag(getattr(song, "total_discs", None) or metadata.get("total_discs"))
explicit = _as_bool(getattr(song, "explicit", None), metadata.get("explicit"), metadata.get("parental_warning"))
tags = {
"TITLE": _first_text(getattr(song, "name", None), metadata.get("title")),
"ARTIST": _first_text(getattr(song, "artist", None), metadata.get("artist"), metadata.get("performer")),
"ALBUM": _first_text(getattr(song, "album", None), metadata.get("album")),
"ALBUMARTIST": _first_text(getattr(song, "album_artist", None), metadata.get("album_artist")),
"TRACKNUMBER": f"{track_number}/{track_total}" if track_number and track_total else track_number,
"TRACKTOTAL": track_total,
"DISCNUMBER": f"{disc_number}/{disc_total}" if disc_number and disc_total else disc_number,
"DISCTOTAL": disc_total,
"DATE": _string_tag(metadata.get("release_date") or metadata.get("year") or getattr(song, "year", None)),
"RELEASEDATE": _string_tag(metadata.get("release_date")),
"GENRE": _first_text(getattr(song, "genre", None), metadata.get("genre")),
"COMPOSER": _first_text(metadata.get("composer")),
"PERFORMER": _first_text(metadata.get("performer")),
"ISRC": _string_tag(getattr(song, "isrc", None) or metadata.get("isrc")),
"BARCODE": _string_tag(getattr(song, "upc", None) or metadata.get("upc") or metadata.get("barcode")),
"UPC": _string_tag(getattr(song, "upc", None) or metadata.get("upc") or metadata.get("barcode")),
"COPYRIGHT": _string_tag(getattr(song, "copyright", None) or metadata.get("copyright")),
"LABEL": _first_text(getattr(song, "label", None), metadata.get("label")),
"COMMENT": _first_text(metadata.get("comment"), metadata.get("quality")),
"SOURCE": _first_text(metadata.get("source"), metadata.get("service")),
"ENCODEDBY": "Unshackle",
"UNSHACKLE_SOURCE_MD5": source_md5,
}
if explicit:
tags["EXPLICIT"] = "1"
tags["ITUNESADVISORY"] = "1"
if config.tag and config.tag_group_name:
tags["GROUP"] = config.tag
service = _first_text(metadata.get("service"), metadata.get("source"))
if service:
prefix = service.upper().replace(" ", "_").replace("-", "_")
if metadata.get("track_id"):
tags[f"{prefix}_TRACK_ID"] = _string_tag(metadata.get("track_id"))
if metadata.get("album_id"):
tags[f"{prefix}_ALBUM_ID"] = _string_tag(metadata.get("album_id"))
if metadata.get("track_url"):
tags[f"{prefix}_TRACK_URL"] = _string_tag(metadata.get("track_url"))
if metadata.get("album_url"):
tags[f"{prefix}_ALBUM_URL"] = _string_tag(metadata.get("album_url"))
return {key: value for key, value in tags.items() if value}
def _write_flac_tags(path: Path, tags: dict[str, str], cover_data: Optional[bytes], mime_type: str) -> None:
audio = FLAC(path)
for key, value in tags.items():
audio[key] = [value]
if cover_data and Picture is not None:
picture = Picture()
picture.type = 3
picture.mime = mime_type or "image/jpeg"
picture.desc = "Cover"
picture.data = cover_data
if Image is not None:
try:
with Image.open(BytesIO(cover_data)) as image:
picture.width, picture.height = image.size
picture.depth = len(image.getbands()) * 8
except Exception:
pass
audio.clear_pictures()
audio.add_picture(picture)
audio.save()
def _write_mp3_tags(path: Path, tags: dict[str, str], cover_data: Optional[bytes], mime_type: str) -> None:
try:
audio = MP3(path)
except ID3NoHeaderError:
audio = MP3(path)
audio.add_tags()
if audio.tags is None:
audio.add_tags()
frame_map = {
"TITLE": ("TIT2", TIT2),
"ARTIST": ("TPE1", TPE1),
"ALBUM": ("TALB", TALB),
"ALBUMARTIST": ("TPE2", TPE2),
"TRACKNUMBER": ("TRCK", TRCK),
"DISCNUMBER": ("TPOS", TPOS),
"DATE": ("TDRC", TDRC),
"GENRE": ("TCON", TCON),
"COMPOSER": ("TCOM", TCOM),
"ISRC": ("TSRC", TSRC),
"COPYRIGHT": ("TCOP", TCOP),
}
for key, (frame_id, frame_class) in frame_map.items():
value = tags.get(key)
if value and frame_class is not None:
audio.tags.setall(frame_id, [frame_class(encoding=3, text=[value])])
if tags.get("COMMENT") and COMM is not None:
audio.tags.setall("COMM", [COMM(encoding=3, lang="eng", desc="", text=[tags["COMMENT"]])])
if cover_data and APIC is not None:
audio.tags.delall("APIC")
audio.tags.add(APIC(encoding=3, mime=mime_type or "image/jpeg", type=3, desc="Cover", data=cover_data))
custom_keys = set(tags) - set(frame_map) - {"COMMENT"}
for key in sorted(custom_keys):
if TXXX is not None and tags[key]:
audio.tags.delall(f"TXXX:{key}")
audio.tags.add(TXXX(encoding=3, desc=key, text=[tags[key]]))
audio.save()
def _write_mp4_tags(path: Path, tags: dict[str, str], cover_data: Optional[bytes], mime_type: str) -> None:
audio = MP4(path)
if tags.get("TITLE"):
audio["\xa9nam"] = [tags["TITLE"]]
if tags.get("ARTIST"):
audio["\xa9ART"] = [tags["ARTIST"]]
if tags.get("ALBUM"):
audio["\xa9alb"] = [tags["ALBUM"]]
if tags.get("ALBUMARTIST"):
audio["aART"] = [tags["ALBUMARTIST"]]
if tags.get("DATE"):
audio["\xa9day"] = [tags["DATE"]]
if tags.get("GENRE"):
audio["\xa9gen"] = [tags["GENRE"]]
if tags.get("COMPOSER"):
audio["\xa9wrt"] = [tags["COMPOSER"]]
if tags.get("COMMENT"):
audio["\xa9cmt"] = [tags["COMMENT"]]
if tags.get("COPYRIGHT"):
audio["cprt"] = [tags["COPYRIGHT"]]
track_number, track_total = _split_number_pair(tags.get("TRACKNUMBER", ""))
if track_number:
audio["trkn"] = [(track_number, track_total or 0)]
disc_number, disc_total = _split_number_pair(tags.get("DISCNUMBER", ""))
if disc_number:
audio["disk"] = [(disc_number, disc_total or 0)]
if cover_data and MP4Cover is not None:
image_format = MP4Cover.FORMAT_PNG if mime_type == "image/png" else MP4Cover.FORMAT_JPEG
audio["covr"] = [MP4Cover(cover_data, imageformat=image_format)]
mapped_keys = {
"TITLE",
"ARTIST",
"ALBUM",
"ALBUMARTIST",
"DATE",
"GENRE",
"COMPOSER",
"COMMENT",
"COPYRIGHT",
"TRACKNUMBER",
"DISCNUMBER",
}
for key in sorted(set(tags) - mapped_keys):
value = tags.get(key)
if value:
audio[f"----:com.apple.iTunes:{key}"] = [str(value).encode("utf-8")]
audio.save()
def _download_cover(session: Any, artwork_url: str) -> tuple[Optional[bytes], str]:
if not session or not artwork_url:
return None, ""
try:
with session.get(artwork_url, timeout=20) as response:
response.raise_for_status()
data = response.content
if not data:
return None, ""
content_type = str(response.headers.get("Content-Type") or "").split(";")[0].strip().lower()
return data, _mime_from_image(data, content_type or "image/jpeg")
except Exception as error:
log.debug("Music cover download failed for %s: %s", artwork_url, error)
return None, ""
def _metadata(song: Any) -> dict[str, Any]:
data = getattr(song, "data", None)
return data if isinstance(data, dict) else {}
def _first_text(*values: Any) -> str:
for value in values:
if isinstance(value, dict):
text = _first_text(value.get("name"), value.get("title"), value.get("display_name"), value.get("value"))
if text:
return text
elif isinstance(value, list):
parts = [_first_text(item) for item in value]
text = ", ".join(part for part in parts if part)
if text:
return text
else:
text = str(value or "").strip()
if text:
return text
return ""
def _string_tag(value: Any) -> str:
if value in (None, "", [], {}):
return ""
if isinstance(value, bool):
return "Explicit" if value else ""
return str(value).strip()
def _as_bool(*values: Any) -> bool:
for value in values:
if value in (None, "", [], {}):
continue
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return value != 0
text = str(value).strip().lower()
if text in {"1", "true", "yes", "y", "explicit", "parental_warning"}:
return True
if text in {"0", "false", "no", "n", "clean"}:
return False
return False
def _mime_from_image(data: bytes, fallback: str = "image/jpeg") -> str:
if data.startswith(b"\x89PNG\r\n\x1a\n"):
return "image/png"
if data.startswith(b"\xff\xd8\xff"):
return "image/jpeg"
if data.startswith(b"RIFF") and b"WEBP" in data[:16]:
return "image/webp"
return fallback
def _split_number_pair(value: str) -> tuple[int, int]:
if not value:
return 0, 0
first, _, second = str(value).partition("/")
try:
number = int(first)
except ValueError:
number = 0
try:
total = int(second) if second else 0
except ValueError:
total = 0
return number, total
__all__ = ("MusicMetadataResult", "write_music_metadata")

View File

@@ -2,10 +2,10 @@ from typing import Union
from .episode import Episode, Series
from .movie import Movie, Movies
from .song import Album, Song
from .music import Album, Music, Song
Title_T = Union[Movie, Episode, Song]
Titles_T = Union[Movies, Series, Album]
Titles_T = Union[Movies, Series, Music, Album]
def remap_titles(titles: Titles_T, title_map: dict) -> Titles_T:
@@ -39,6 +39,7 @@ __all__ = (
"Series",
"Movie",
"Movies",
"Music",
"Album",
"Song",
"Title_T",

View File

@@ -0,0 +1,276 @@
import re
from abc import ABC
from typing import Any, Iterable, Optional, Union
from langcodes import Language
from pymediainfo import MediaInfo
from sortedcontainers import SortedKeyList
from unshackle.core.config import config
from unshackle.core.titles.title import Title
from unshackle.core.utilities import sanitize_filename
from unshackle.core.utils.template_formatter import TemplateFormatter
class Song(Title):
def __init__(
self,
id_: Any,
service: type,
name: str,
artist: str,
album: str,
track: int,
disc: int,
year: int,
language: Optional[Union[str, Language]] = None,
data: Optional[Any] = None,
album_artist: Optional[str] = None,
release_type: str = "album",
total_tracks: Optional[int] = None,
total_discs: Optional[int] = None,
genre: Optional[str] = None,
explicit: Optional[bool] = None,
isrc: Optional[str] = None,
upc: Optional[str] = None,
copyright: Optional[str] = None,
label: Optional[str] = None,
lyrics: Optional[str] = None,
artwork_url: Optional[str] = None,
) -> None:
super().__init__(id_, service, language, data)
if not name:
raise ValueError("Song name must be provided")
if not isinstance(name, str):
raise TypeError(f"Expected name to be a str, not {name!r}")
if not artist:
raise ValueError("Song artist must be provided")
if not isinstance(artist, str):
raise TypeError(f"Expected artist to be a str, not {artist!r}")
if not album:
raise ValueError("Song album must be provided")
if not isinstance(album, str):
raise TypeError(f"Expected album to be a str, not {album!r}")
if not track:
raise ValueError("Song track must be provided")
if not isinstance(track, int):
raise TypeError(f"Expected track to be an int, not {track!r}")
if not disc:
raise ValueError("Song disc must be provided")
if not isinstance(disc, int):
raise TypeError(f"Expected disc to be an int, not {disc!r}")
if not year:
raise ValueError("Song year must be provided")
if not isinstance(year, int):
raise TypeError(f"Expected year to be an int, not {year!r}")
if album_artist is not None and not isinstance(album_artist, str):
raise TypeError(f"Expected album_artist to be a str, not {album_artist!r}")
if not isinstance(release_type, str):
raise TypeError(f"Expected release_type to be a str, not {release_type!r}")
if total_tracks is not None and not isinstance(total_tracks, int):
raise TypeError(f"Expected total_tracks to be an int, not {total_tracks!r}")
if total_discs is not None and not isinstance(total_discs, int):
raise TypeError(f"Expected total_discs to be an int, not {total_discs!r}")
if genre is not None and not isinstance(genre, str):
raise TypeError(f"Expected genre to be a str, not {genre!r}")
if explicit is not None and not isinstance(explicit, bool):
raise TypeError(f"Expected explicit to be a bool, not {explicit!r}")
if isrc is not None and not isinstance(isrc, str):
raise TypeError(f"Expected isrc to be a str, not {isrc!r}")
if upc is not None and not isinstance(upc, str):
raise TypeError(f"Expected upc to be a str, not {upc!r}")
if copyright is not None and not isinstance(copyright, str):
raise TypeError(f"Expected copyright to be a str, not {copyright!r}")
if label is not None and not isinstance(label, str):
raise TypeError(f"Expected label to be a str, not {label!r}")
if lyrics is not None and not isinstance(lyrics, str):
raise TypeError(f"Expected lyrics to be a str, not {lyrics!r}")
if artwork_url is not None and not isinstance(artwork_url, str):
raise TypeError(f"Expected artwork_url to be a str, not {artwork_url!r}")
name = name.strip()
artist = artist.strip()
album = album.strip()
album_artist = album_artist.strip() if album_artist else None
release_type = release_type.strip().lower()
genre = genre.strip() if genre else None
isrc = isrc.strip() if isrc else None
upc = upc.strip() if upc else None
copyright = copyright.strip() if copyright else None
label = label.strip() if label else None
lyrics = lyrics.strip() if lyrics else None
artwork_url = artwork_url.strip() if artwork_url else None
if track <= 0:
raise ValueError(f"Song track cannot be {track}")
if disc <= 0:
raise ValueError(f"Song disc cannot be {disc}")
if year <= 0:
raise ValueError(f"Song year cannot be {year}")
if not release_type:
raise ValueError("Song release_type must be provided")
if total_tracks is not None and total_tracks <= 0:
raise ValueError(f"Song total_tracks cannot be {total_tracks}")
if total_discs is not None and total_discs <= 0:
raise ValueError(f"Song total_discs cannot be {total_discs}")
self.name = name
self.artist = artist
self.album = album
self.track = track
self.disc = disc
self.year = year
self.album_artist = album_artist
self.release_type = release_type
self.total_tracks = total_tracks
self.total_discs = total_discs
self.genre = genre
self.explicit = explicit
self.isrc = isrc
self.upc = upc
self.copyright = copyright
self.label = label
self.lyrics = lyrics
self.artwork_url = artwork_url
def __str__(self) -> str:
return "{artist} - {album} ({year}) / {track:02}. {name}".format(
artist=self.artist, album=self.album, year=self.year, track=self.track, name=self.name
).strip()
def _build_template_context(self, media_info: MediaInfo, show_service: bool = True) -> dict:
"""Build template context dictionary from MediaInfo."""
context = self._build_base_template_context(media_info, show_service)
context["title"] = self.name.replace("$", "S")
context["year"] = self.year or ""
context["track_number"] = f"{self.track:02}"
context["artist"] = self.artist.replace("$", "S")
context["album_artist"] = (self.album_artist or self.artist).replace("$", "S")
context["album"] = self.album.replace("$", "S")
context["disc"] = f"{self.disc:02}" if self.disc > 1 else ""
context["track_total"] = f"{self.total_tracks:02}" if self.total_tracks else ""
context["disc_total"] = f"{self.total_discs:02}" if self.total_discs else ""
context["release_type"] = self.release_type
context["genre"] = self.genre or ""
context["explicit"] = "Explicit" if self.explicit else ""
context["isrc"] = self.isrc or ""
context["upc"] = self.upc or ""
context["label"] = self.label or ""
return context
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
if folder:
# Album folder name: prefer the dedicated "albums" template, fall back to the
# legacy "songs" folder template, then to "{artist} - {album} ({year})".
template = config.get_folder_template("albums") or config.get_folder_template("songs")
if template:
formatter = TemplateFormatter(template)
context = self._build_template_context(media_info, show_service)
folder_name = formatter.format(context)
separators = re.sub(r"\{[^}]*\}", "", template)
spacer = "." if "." in separators and " " not in separators else " "
return sanitize_filename(folder_name, spacer)
name = f"{self.artist} - {self.album}"
if self.year:
name += f" ({self.year})"
return sanitize_filename(name, " ")
template = config.output_template.get("songs") or "{track_number}. {title}"
formatter = TemplateFormatter(template)
context = self._build_template_context(media_info, show_service)
return formatter.format(context)
class Music(SortedKeyList, ABC):
"""A grouped music release, such as an album, EP, single, compilation, or playlist."""
def __new__(cls, *args: Any, **kwargs: Any):
return super().__new__(cls)
def __init__(
self,
iterable: Optional[Iterable] = None,
kind: str = "album",
title: Optional[str] = None,
artist: Optional[str] = None,
year: Optional[int] = None,
total_tracks: Optional[int] = None,
total_discs: Optional[int] = None,
artwork_url: Optional[str] = None,
total_duration: Optional[int] = None,
owner: Optional[str] = None,
description: Optional[str] = None,
):
if not isinstance(kind, str):
raise TypeError(f"Expected kind to be a str, not {kind!r}")
if title is not None and not isinstance(title, str):
raise TypeError(f"Expected title to be a str, not {title!r}")
if artist is not None and not isinstance(artist, str):
raise TypeError(f"Expected artist to be a str, not {artist!r}")
if year is not None and not isinstance(year, int):
raise TypeError(f"Expected year to be an int, not {year!r}")
if total_tracks is not None and not isinstance(total_tracks, int):
raise TypeError(f"Expected total_tracks to be an int, not {total_tracks!r}")
if total_discs is not None and not isinstance(total_discs, int):
raise TypeError(f"Expected total_discs to be an int, not {total_discs!r}")
if artwork_url is not None and not isinstance(artwork_url, str):
raise TypeError(f"Expected artwork_url to be a str, not {artwork_url!r}")
if total_duration is not None and not isinstance(total_duration, int):
raise TypeError(f"Expected total_duration to be an int, not {total_duration!r}")
if owner is not None and not isinstance(owner, str):
raise TypeError(f"Expected owner to be a str, not {owner!r}")
if description is not None and not isinstance(description, str):
raise TypeError(f"Expected description to be a str, not {description!r}")
super().__init__(iterable, key=lambda x: (x.album, x.disc, x.track, x.year or 0))
kind = kind.strip().lower()
if not kind:
raise ValueError("Music kind must be provided")
if year is not None and year <= 0:
raise ValueError(f"Music year cannot be {year}")
if total_tracks is not None and total_tracks <= 0:
raise ValueError(f"Music total_tracks cannot be {total_tracks}")
if total_discs is not None and total_discs <= 0:
raise ValueError(f"Music total_discs cannot be {total_discs}")
if total_duration is not None and total_duration < 0:
raise ValueError(f"Music total_duration cannot be {total_duration}")
self.kind = kind
self.title = title.strip() if title else None
self.artist = artist.strip() if artist else None
self.year = year
self.total_tracks = total_tracks
self.total_discs = total_discs
self.artwork_url = artwork_url.strip() if artwork_url else None
self.total_duration = total_duration
self.owner = owner.strip() if owner else None
self.description = description.strip() if description else None
def __str__(self) -> str:
if not self:
return super().__str__()
first_song = self[0]
artist = self.artist or getattr(first_song, "album_artist", None) or first_song.artist
title = self.title or first_song.album
year = self.year or first_song.year or "?"
return f"{artist} - {title} ({year})"
def tree(self, verbose: bool = False) -> Any:
from unshackle.core.music.renderer import MusicRenderer
return MusicRenderer().render(self, verbose=verbose)
class Album(Music):
"""Backward-compatible collection name for album-style music releases."""
__all__ = ("Song", "Music", "Album")

View File

@@ -154,7 +154,11 @@ class Audio(Track):
@property
def atmos(self) -> bool:
"""Return True if the audio track contains Dolby Atmos."""
return bool(self.joc)
if self.joc:
return True
if isinstance(self.extra, dict):
return bool(self.extra.get("atmos") or self.extra.get("joc"))
return False
def __str__(self) -> str:
return " | ".join(

View File

@@ -70,14 +70,18 @@ output_template:
# folder: '{title} ({year?})'
#
# Per-title-type folder templates (optional). Override folder naming separately for
# movies, series, and songs. Useful when music libraries need artist/album-style folders
# movies, series, and albums. Useful when music libraries need artist/album-style folders
# while movies/series follow a different scheme. Any kind omitted falls back to the
# default for that title type.
#
# For music: "songs" (above) names each track file, while "albums" (below) names the
# album folder the tracks are saved into. Album folder vars: {album_artist}, {album},
# {artist}, {year}, {genre}, {label}, {release_type}, {track_total}, {disc_total}.
#
# folder:
# movies: '{title} ({year})'
# series: '{title} ({year?})'
# songs: '{artist}/{album} ({year?})'
# albums: '{album_artist} - {album} ({year?})'
# Language-based tagging for output filenames
# Automatically adds language identifiers (e.g., DANiSH, NORDiC, DKsubs) based on

22
uv.lock generated
View File

@@ -660,6 +660,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/7f/ed/e3705d6d02b4f7aea715a353c8ce193efd0b5db13e204df895d38734c244/isort-7.0.0-py3-none-any.whl", hash = "sha256:1bcabac8bc3c36c7fb7b98a76c8abb18e0f841a3ba81decac7691008592499c1", size = 94672, upload-time = "2025-10-11T13:30:57.665Z" },
]
[[package]]
name = "jsonpath-ng"
version = "1.8.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/32/58/250751940d75c8019659e15482d548a4aa3b6ce122c515102a4bfdac50e3/jsonpath_ng-1.8.0.tar.gz", hash = "sha256:54252968134b5e549ea5b872f1df1168bd7defe1a52fed5a358c194e1943ddc3", size = 74513, upload-time = "2026-02-24T14:42:06.182Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/03/99/33c7d78a3fb70d545fd5411ac67a651c81602cc09c9cf0df383733f068c5/jsonpath_ng-1.8.0-py3-none-any.whl", hash = "sha256:b8dde192f8af58d646fc031fac9c99fe4d00326afc4148f1f043c601a8cfe138", size = 67844, upload-time = "2026-02-28T00:53:19.637Z" },
]
[[package]]
name = "jsonpickle"
version = "4.1.1"
@@ -934,6 +943,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/81/08/7036c080d7117f28a4af526d794aab6a84463126db031b007717c1a6676e/multidict-6.7.1-py3-none-any.whl", hash = "sha256:55d97cc6dae627efa6a6e548885712d4864b81110ac76fa4e534c03819fa4a56", size = 12319, upload-time = "2026-01-26T02:46:44.004Z" },
]
[[package]]
name = "mutagen"
version = "1.47.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/81/e6/64bc71b74eef4b68e61eb921dcf72dabd9e4ec4af1e11891bbd312ccbb77/mutagen-1.47.0.tar.gz", hash = "sha256:719fadef0a978c31b4cf3c956261b3c58b6948b32023078a2117b1de09f0fc99", size = 1274186, upload-time = "2023-09-03T16:33:33.411Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b0/7a/620f945b96be1f6ee357d211d5bf74ab1b7fe72a9f1525aafbfe3aee6875/mutagen-1.47.0-py3-none-any.whl", hash = "sha256:edd96f50c5907a9539d8e5bba7245f62c9f520aef333d13392a79a4f70aca719", size = 194391, upload-time = "2023-09-03T16:33:29.955Z" },
]
[[package]]
name = "mypy"
version = "1.19.1"
@@ -1778,10 +1796,12 @@ dependencies = [
{ name = "defusedxml" },
{ name = "filelock" },
{ name = "fonttools" },
{ name = "jsonpath-ng" },
{ name = "jsonpickle" },
{ name = "langcodes" },
{ name = "language-data" },
{ name = "lxml" },
{ name = "mutagen" },
{ name = "protobuf" },
{ name = "pycaption" },
{ name = "pycountry" },
@@ -1844,10 +1864,12 @@ requires-dist = [
{ name = "defusedxml", specifier = ">=0.7.1" },
{ name = "filelock", specifier = ">=3.20.3,<4" },
{ name = "fonttools", specifier = ">=4.60.2,<5" },
{ name = "jsonpath-ng", specifier = ">=1.8.0" },
{ name = "jsonpickle", specifier = ">=3.0.4,<5" },
{ name = "langcodes", specifier = ">=3.4.0,<4" },
{ name = "language-data", specifier = ">=1.4.0" },
{ name = "lxml", specifier = ">=5.2.1,<7" },
{ name = "mutagen", specifier = ">=1.47.0,<2" },
{ name = "protobuf", specifier = ">=4.25.3,<7" },
{ name = "pycaption", specifier = ">=2.2.6,<3" },
{ name = "pycountry", specifier = ">=24.6.1" },