diff --git a/zotify/__pycache__/album.cpython-314.pyc b/zotify/__pycache__/album.cpython-314.pyc index 62df1d0..59d5ada 100644 Binary files a/zotify/__pycache__/album.cpython-314.pyc and b/zotify/__pycache__/album.cpython-314.pyc differ diff --git a/zotify/__pycache__/app.cpython-314.pyc b/zotify/__pycache__/app.cpython-314.pyc index ec9a8a3..eca41c2 100644 Binary files a/zotify/__pycache__/app.cpython-314.pyc and b/zotify/__pycache__/app.cpython-314.pyc differ diff --git a/zotify/__pycache__/playlist.cpython-314.pyc b/zotify/__pycache__/playlist.cpython-314.pyc index fe457d9..3065328 100644 Binary files a/zotify/__pycache__/playlist.cpython-314.pyc and b/zotify/__pycache__/playlist.cpython-314.pyc differ diff --git a/zotify/__pycache__/track.cpython-314.pyc b/zotify/__pycache__/track.cpython-314.pyc index bb2d3ba..31114db 100644 Binary files a/zotify/__pycache__/track.cpython-314.pyc and b/zotify/__pycache__/track.cpython-314.pyc differ diff --git a/zotify/album.py b/zotify/album.py index 9ede74a..16f3f8d 100644 --- a/zotify/album.py +++ b/zotify/album.py @@ -1,5 +1,5 @@ from zotify.const import ITEMS, ARTISTS, NAME, ID -from zotify.termoutput import Printer +from zotify.termoutput import Printer, PrintChannel from zotify.track import download_track from zotify.utils import fix_filename from zotify.zotify import Zotify @@ -71,9 +71,13 @@ def download_album(album): } album_multi_disc = len(disc_numbers) > 1 + downloaded = 0 + skipped = 0 + errored = 0 + for n, track in Printer.progress(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)): # Only pass dynamic numbering and album_id (useful for custom templates using {album_id}). - download_track( + result = download_track( 'album', track[ID], extra_keys={ @@ -85,6 +89,19 @@ def download_album(album): disable_progressbar=True ) + if result == 'downloaded': + downloaded += 1 + elif result == 'skipped': + skipped += 1 + else: + errored += 1 + + total = len(tracks) + Printer.print( + PrintChannel.PROGRESS_INFO, + f'\n#######################################\nFinished! Here is your album summary :\ndownloaded {downloaded}/{total} | skipped {skipped}/{total} | errored {errored}/{total}\n#######################################\n' + ) + def download_artist_albums(artist): """ Downloads albums of an artist """ diff --git a/zotify/app.py b/zotify/app.py index b103829..61c5231 100644 --- a/zotify/app.py +++ b/zotify/app.py @@ -103,8 +103,19 @@ def download_from_urls(urls: list[str]) -> bool: download = True playlist_songs = get_playlist_songs(playlist_id) name, _ = get_playlist_info(playlist_id) - enum = 1 - char_num = len(str(len(playlist_songs))) + track_items = [] + for song in playlist_songs: + track_obj = song.get(TRACK) if isinstance(song, dict) else None + if track_obj and track_obj.get(ID) and track_obj.get(TYPE) != "episode": + track_items.append(song) + + expected_total = len(track_items) + downloaded_count = 0 + skipped_count = 0 + errored_count = 0 + + track_enum = 1 + char_num = len(str(expected_total if expected_total else 1)) for song in playlist_songs: track_obj = song.get(TRACK) if isinstance(song, dict) else None if not track_obj or not track_obj.get(NAME) or not track_obj.get(ID): @@ -113,16 +124,27 @@ def download_from_urls(urls: list[str]) -> bool: if track_obj.get(TYPE) == "episode": # Playlist item is a podcast episode download_episode(track_obj[ID]) else: - download_track('playlist', track_obj[ID], extra_keys= + result = download_track('playlist', track_obj[ID], extra_keys= { 'playlist_song_name': track_obj[NAME], 'playlist': name, - 'playlist_num': str(enum).zfill(char_num), - 'playlist_total': str(len(playlist_songs)), + 'playlist_num': str(track_enum).zfill(char_num), + 'playlist_total': str(expected_total), 'playlist_id': playlist_id, 'playlist_track_id': track_obj[ID] }) - enum += 1 + track_enum += 1 + if result == 'downloaded': + downloaded_count += 1 + elif result == 'skipped': + skipped_count += 1 + else: + errored_count += 1 + + Printer.print( + PrintChannel.PROGRESS_INFO, + f'\n### PLAYLIST SUMMARY: downloaded {downloaded_count}/{expected_total} | skipped {skipped_count}/{expected_total} | errored {errored_count}/{expected_total} ###\n' + ) elif episode_id is not None: download = True download_episode(episode_id) diff --git a/zotify/config.py b/zotify/config.py index e2487d6..ec20213 100644 --- a/zotify/config.py +++ b/zotify/config.py @@ -3,7 +3,7 @@ import sys from pathlib import Path, PurePath from typing import Any -ZOTIFY_VERSION = "0.2.1" +ZOTIFY_VERSION = "0.2.2" ROOT_PATH = 'ROOT_PATH' ROOT_PODCAST_PATH = 'ROOT_PODCAST_PATH' SKIP_EXISTING = 'SKIP_EXISTING' diff --git a/zotify/playlist.py b/zotify/playlist.py index 043aebf..dd456b7 100644 --- a/zotify/playlist.py +++ b/zotify/playlist.py @@ -1,5 +1,5 @@ from zotify.const import ITEMS, ID, TRACK, NAME -from zotify.termoutput import Printer +from zotify.termoutput import Printer, PrintChannel from zotify.track import download_track from zotify.utils import split_input from zotify.zotify import Zotify @@ -60,24 +60,40 @@ def download_playlist(playlist): pl_name, _ = get_playlist_info(playlist[ID]) playlist_songs = [song for song in get_playlist_songs(playlist[ID]) if song[TRACK] is not None and song[TRACK][ID]] - p_bar = Printer.progress(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True) + total = len(playlist_songs) + downloaded = 0 + skipped = 0 + errored = 0 + + p_bar = Printer.progress(playlist_songs, unit='song', total=total, unit_scale=True) enum = 1 for song in p_bar: # Use localized playlist name; track metadata (artist/title) is localized in download_track via locale - download_track( + result = download_track( 'extplaylist', song[TRACK][ID], extra_keys={ 'playlist': pl_name, 'playlist_num': str(enum).zfill(2), - 'playlist_total': str(len(playlist_songs)), + 'playlist_total': str(total), 'playlist_id': playlist[ID], }, disable_progressbar=True ) + if result == 'downloaded': + downloaded += 1 + elif result == 'skipped': + skipped += 1 + else: + errored += 1 p_bar.set_description(song[TRACK][NAME]) enum += 1 + Printer.print( + PrintChannel.PROGRESS_INFO, + f'\n#######################################\nFinished! Here is your playlist summary :\ndownloaded {downloaded}/{total} | skipped {skipped}/{total} | errored {errored}/{total}\n#######################################\n' + ) + def download_from_user_playlist(): """ Select which playlist(s) to download """ diff --git a/zotify/track.py b/zotify/track.py index 08c432f..53e7ba2 100644 --- a/zotify/track.py +++ b/zotify/track.py @@ -1,25 +1,29 @@ from pathlib import Path, PurePath -from typing import Any, Tuple, List, Optional +from typing import Any, Tuple, List, Optional, Literal + +import json +import math +import re +import time +import traceback +import uuid + +import ffmpy from librespot.metadata import TrackId + from zotify.const import TRACKS, ALBUM, GENRES, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \ RELEASE_DATE, ID, TRACKS_URL, FOLLOWED_ARTISTS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS, \ - HREF, ARTISTS, WIDTH + HREF, WIDTH +from zotify.loader import Loader from zotify.termoutput import Printer, PrintChannel from zotify.utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \ get_directory_song_ids, add_to_directory_song_ids, get_previously_downloaded, add_to_archive, fmt_seconds from zotify.zotify import Zotify -import traceback -from zotify.loader import Loader -import math -import re -import time -import uuid -import json -import ffmpy -# Track whether we've already applied the OGG delay for bulk (album/playlist) downloads _ogg_delay_applied_once = False +DownloadTrackResult = Literal['downloaded', 'skipped', 'errored'] + def get_saved_tracks() -> list: songs = [] offset = 0 @@ -79,11 +83,9 @@ def ensure_spoticlub_credentials() -> None: spoticlub_user = data.get('spoticlub_user') or '' spoticlub_password = data.get('spoticlub_password') or '' - # If any credential value is missing, prompt the user if not spoticlub_user or not spoticlub_password: Printer.print(PrintChannel.PROGRESS_INFO, '\nSpotiClub credentials not found. Please enter them now.') spoticlub_user = input('SpotiClub username: ').strip() - # Basic loop to avoid empty submissions while not spoticlub_user: spoticlub_user = input('SpotiClub username (cannot be empty): ').strip() @@ -163,7 +165,6 @@ def get_song_genres(rawartists: List[str], track_name: str) -> List[str]: elif artist_genres: genres.append(artist_genres[0]) - # De-duplicate while preserving order seen = set() genres = [g for g in genres if not (g in seen or seen.add(g))] @@ -238,284 +239,290 @@ def get_song_duration(song_id: str) -> float: return duration -def download_track(mode: str, track_id: str, extra_keys=None, disable_progressbar=False) -> None: +def download_track(mode: str, track_id: str, extra_keys=None, disable_progressbar=False) -> DownloadTrackResult: if extra_keys is None: extra_keys = {} - # Ensure SpotiClub credentials exist before starting any download prompts or loaders ensure_spoticlub_credentials() prepare_download_loader = Loader(PrintChannel.PROGRESS_INFO, "Preparing download...") prepare_download_loader.start() try: - output_template = str(Zotify.CONFIG.get_output(mode)) + try: + output_template = str(Zotify.CONFIG.get_output(mode)) + prepare_download_loader.stop() - prepare_download_loader.stop() + (artists, raw_artists, album_name, name, image_url, release_year, disc_number, + track_number, scraped_song_id, is_playable, duration_ms) = get_song_info(track_id) - (artists, raw_artists, album_name, name, image_url, release_year, disc_number, - track_number, scraped_song_id, is_playable, duration_ms) = get_song_info(track_id) + song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name) - song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name) + for k in extra_keys: + output_template = output_template.replace("{" + k + "}", fix_filename(extra_keys[k])) - for k in extra_keys: - output_template = output_template.replace("{"+k+"}", fix_filename(extra_keys[k])) + ext = EXT_MAP.get(Zotify.CONFIG.get_download_format().lower()) - ext = EXT_MAP.get(Zotify.CONFIG.get_download_format().lower()) + output_template = output_template.replace("{artist}", fix_filename(artists[0])) + output_template = output_template.replace("{album}", fix_filename(album_name)) + output_template = output_template.replace("{song_name}", fix_filename(name)) + output_template = output_template.replace("{release_year}", fix_filename(release_year)) + output_template = output_template.replace("{disc_number}", fix_filename(disc_number)) + output_template = output_template.replace("{track_number}", fix_filename(track_number)) + output_template = output_template.replace("{id}", fix_filename(scraped_song_id)) + output_template = output_template.replace("{track_id}", fix_filename(track_id)) + output_template = output_template.replace("{ext}", ext) + if mode == 'album' and Zotify.CONFIG.get_split_album_discs(): + flag_raw = extra_keys.get('album_multi_disc') + flag = str(flag_raw).strip().lower() in ('1', 'true', 'yes') + try: + disc_number_int = int(disc_number) + except Exception: + disc_number_int = 1 - output_template = output_template.replace("{artist}", fix_filename(artists[0])) - output_template = output_template.replace("{album}", fix_filename(album_name)) - output_template = output_template.replace("{song_name}", fix_filename(name)) - output_template = output_template.replace("{release_year}", fix_filename(release_year)) - output_template = output_template.replace("{disc_number}", fix_filename(disc_number)) - output_template = output_template.replace("{track_number}", fix_filename(track_number)) - output_template = output_template.replace("{id}", fix_filename(scraped_song_id)) - output_template = output_template.replace("{track_id}", fix_filename(track_id)) - output_template = output_template.replace("{ext}", ext) + should_create_disc_dir = flag or disc_number_int > 1 + if should_create_disc_dir: + tpl_path = PurePath(output_template) + disc_dir_name = f"Disc {disc_number_int}" + if disc_dir_name not in tpl_path.parts: + output_template = str(tpl_path.parent / disc_dir_name / tpl_path.name) - # SPLIT_ALBUM_DISCS should only create a Disc folder when the album truly has multiple discs. - # - When downloading via zotify.album.download_album(), we pass extra_keys['album_multi_disc']. - # - As a fallback, any track with disc_number > 1 implies multi-disc. - if mode == 'album' and Zotify.CONFIG.get_split_album_discs(): - flag_raw = extra_keys.get('album_multi_disc') - flag = str(flag_raw).strip().lower() in ('1', 'true', 'yes') - try: - disc_number_int = int(disc_number) - except Exception: - disc_number_int = 1 + filename = PurePath(Zotify.CONFIG.get_root_path()).joinpath(output_template) + filedir = PurePath(filename).parent - should_create_disc_dir = flag or disc_number_int > 1 - if should_create_disc_dir: - tpl_path = PurePath(output_template) - disc_dir_name = f"Disc {disc_number_int}" - if disc_dir_name not in tpl_path.parts: - output_template = str(tpl_path.parent / disc_dir_name / tpl_path.name) + filename_temp = filename + if Zotify.CONFIG.get_temp_download_dir() != '': + filename_temp = PurePath(Zotify.CONFIG.get_temp_download_dir()).joinpath( + f'zotify_{str(uuid.uuid4())}_{track_id}.{ext}' + ) - filename = PurePath(Zotify.CONFIG.get_root_path()).joinpath(output_template) - filedir = PurePath(filename).parent + check_name = Path(filename).is_file() and Path(filename).stat().st_size + check_id = scraped_song_id in get_directory_song_ids(filedir) + check_all_time = scraped_song_id in get_previously_downloaded() - filename_temp = filename - if Zotify.CONFIG.get_temp_download_dir() != '': - filename_temp = PurePath(Zotify.CONFIG.get_temp_download_dir()).joinpath(f'zotify_{str(uuid.uuid4())}_{track_id}.{ext}') + if not check_id and check_name: + stem = PurePath(filename).stem + ext_existing = PurePath(filename).suffix + base_prefix = str(PurePath(filedir).joinpath(stem)) + c = len([ + file + for file in Path(filedir).iterdir() + if str(file).startswith(base_prefix + "_") + ]) + 1 - check_name = Path(filename).is_file() and Path(filename).stat().st_size - check_id = scraped_song_id in get_directory_song_ids(filedir) - check_all_time = scraped_song_id in get_previously_downloaded() + filename = PurePath(filedir).joinpath(f'{stem}_{c}{ext_existing}') - # a song with the same name is installed - if not check_id and check_name: - stem = PurePath(filename).stem - ext = PurePath(filename).suffix - base_prefix = str(PurePath(filedir).joinpath(stem)) - c = len([ - file - for file in Path(filedir).iterdir() - if str(file).startswith(base_prefix + "_") - ]) + 1 + except Exception as e: + prepare_download_loader.stop() + Printer.print(PrintChannel.ERRORS, '### SKIPPING SONG - FAILED TO QUERY METADATA ###') + Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id)) + for k in extra_keys: + Printer.print(PrintChannel.ERRORS, k + ': ' + str(extra_keys[k])) + Printer.print(PrintChannel.ERRORS, "\n") + Printer.print(PrintChannel.ERRORS, str(e) + "\n") + Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n") + Printer.print(PrintChannel.PROGRESS_INFO, "Waiting to query Spotify API again.." + "\n") + time.sleep(10) + return download_track(mode, track_id, extra_keys) - # SpotiClub: Fix phantom files when colliding with existing names (-_.mp3) - filename = PurePath(filedir).joinpath(f'{stem}_{c}{ext}') - - - except Exception as e: - Printer.print(PrintChannel.ERRORS, '### SKIPPING SONG - FAILED TO QUERY METADATA ###') - Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id)) - for k in extra_keys: - Printer.print(PrintChannel.ERRORS, k + ': ' + str(extra_keys[k])) - Printer.print(PrintChannel.ERRORS, "\n") - Printer.print(PrintChannel.ERRORS, str(e) + "\n") - Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n") - Printer.print(PrintChannel.PROGRESS_INFO, "Waiting to query Spotify API again.." + "\n") - time.sleep(10) - return download_track(mode, track_id, extra_keys) - - else: try: if not is_playable: prepare_download_loader.stop() Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG IS UNAVAILABLE) ###' + "\n") - else: - if check_name and Zotify.CONFIG.get_skip_existing(): - prepare_download_loader.stop() - Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY EXISTS) ###' + "\n") - if Zotify.CONFIG.get_always_check_lyrics() and Zotify.CONFIG.get_download_lyrics(): - try: - lyr_dir = Zotify.CONFIG.get_lyrics_location() or PurePath(filename).parent - lyr_name_tpl = Zotify.CONFIG.get_lyrics_filename() - lyr_name = lyr_name_tpl - lyr_name = lyr_name.replace('{artist}', fix_filename(artists[0])) - lyr_name = lyr_name.replace('{song_name}', fix_filename(name)) - lyr_name = lyr_name.replace('{album}', fix_filename(album_name)) - if Zotify.CONFIG.get_unique_lyrics_file(): - lrc_path = PurePath(lyr_dir).joinpath("lyrics.lrc") - else: - lrc_path = PurePath(lyr_dir).joinpath(f"{lyr_name}.lrc") - get_song_lyrics(track_id, lrc_path, title=name, artists=artists, album=album_name, duration_ms=duration_ms) - except ValueError: - Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###") + return 'skipped' - elif check_all_time and Zotify.CONFIG.get_skip_previously_downloaded(): - prepare_download_loader.stop() - Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE) ###' + "\n") - if Zotify.CONFIG.get_always_check_lyrics() and Zotify.CONFIG.get_download_lyrics(): - try: - lyr_dir = Zotify.CONFIG.get_lyrics_location() or PurePath(filename).parent - lyr_name_tpl = Zotify.CONFIG.get_lyrics_filename() - lyr_name = lyr_name_tpl - lyr_name = lyr_name.replace('{artist}', fix_filename(artists[0])) - lyr_name = lyr_name.replace('{song_name}', fix_filename(name)) - lyr_name = lyr_name.replace('{album}', fix_filename(album_name)) - lrc_path = PurePath(lyr_dir).joinpath(f"{lyr_name}.lrc") - get_song_lyrics(track_id, lrc_path, title=name, artists=artists, album=album_name, duration_ms=duration_ms) - except ValueError: - Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###") - - else: - prog_prefix = '' - if mode == 'album': - cur = extra_keys.get('album_num') - total = extra_keys.get('album_total') - - if cur and not total: - # No info about total tracks? Let's query the album from Spotify's API in last resort - try: - album_id = extra_keys.get('album_id') - if album_id: - locale = Zotify.CONFIG.get_locale() - resp = Zotify.invoke_url_with_params( - f'https://api.spotify.com/v1/albums/{album_id}/tracks', - limit=1, - offset=0, - market='from_token', - locale=locale, - ) - total_val = resp.get('total') if isinstance(resp, dict) else None - if total_val is not None: - total = str(total_val) - except Exception: - total = total - - if cur and total: - prog_prefix = f'({cur}/{total}) ' - elif mode in ('playlist', 'extplaylist'): - cur = extra_keys.get('playlist_num') - total = extra_keys.get('playlist_total') - - if cur and not total: - # Same fallback for total tracks in playlist - try: - playlist_id = extra_keys.get('playlist_id') - if playlist_id: - locale = Zotify.CONFIG.get_locale() - resp = Zotify.invoke_url_with_params( - f'https://api.spotify.com/v1/playlists/{playlist_id}/tracks', - limit=1, - offset=0, - market='from_token', - locale=locale, - ) - total_val = resp.get('total') if isinstance(resp, dict) else None - if total_val is not None: - total = str(total_val) - except Exception: - total = total - - if cur and total: - prog_prefix = f'({cur}/{total}) ' - - Printer.print( - PrintChannel.PROGRESS_INFO, - f'\n### {prog_prefix}STARTING "{song_name}" ###\n' - ) - if ext == 'ogg': - # SpotiClub : TEMP? : For albums/playlists, wait 5 seconds between OGG tracks to avoid - # spamming the SpotiClub API for audio keys. - # Skip the very first track in the run and for single-track downloads. - global _ogg_delay_applied_once - if mode in ('album', 'playlist', 'extplaylist'): - if _ogg_delay_applied_once: - # Spammy log - # Printer.print(PrintChannel.PROGRESS_INFO, '\n## OGG File : Waiting 5 seconds before resuming... ##') - time.sleep(5) - else: - _ogg_delay_applied_once = True - if track_id != scraped_song_id: - track_id = scraped_song_id - track = TrackId.from_base62(track_id) - stream = Zotify.get_content_stream(track, Zotify.DOWNLOAD_QUALITY) - create_download_directory(filedir) - total_size = stream.input_stream.size - - prepare_download_loader.stop() - - time_start = time.time() - downloaded = 0 - with open(filename_temp, 'wb') as file, Printer.progress( - desc=song_name, - total=total_size, - unit='B', - unit_scale=True, - unit_divisor=1024, - disable=disable_progressbar - ) as p_bar: - b = 0 - while b < 5: - data = stream.input_stream.stream().read(Zotify.CONFIG.get_chunk_size()) - p_bar.update(file.write(data)) - downloaded += len(data) - b += 1 if data == b'' else 0 - if Zotify.CONFIG.get_download_real_time(): - delta_real = time.time() - time_start - delta_want = (downloaded / total_size) * (duration_ms/1000) - if delta_want > delta_real: - time.sleep(delta_want - delta_real) - - time_downloaded = time.time() - - genres = get_song_genres(raw_artists, name) - - lyrics_lines: Optional[List[str]] = None - if Zotify.CONFIG.get_download_lyrics(): - try: - lyr_dir = Zotify.CONFIG.get_lyrics_location() or PurePath(filename).parent - lyr_name_tpl = Zotify.CONFIG.get_lyrics_filename() - lyr_name = lyr_name_tpl - lyr_name = lyr_name.replace('{artist}', fix_filename(artists[0])) - lyr_name = lyr_name.replace('{song_name}', fix_filename(name)) - lyr_name = lyr_name.replace('{album}', fix_filename(album_name)) - if Zotify.CONFIG.get_unique_lyrics_file(): - lrc_path = PurePath(lyr_dir).joinpath("lyrics.lrc") - else: - lrc_path = PurePath(lyr_dir).joinpath(f"{lyr_name}.lrc") - lyrics_lines = get_song_lyrics( - track_id, lrc_path, title=name, artists=artists, album=album_name, duration_ms=duration_ms, write_file=True - ) - except ValueError: - Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###") - convert_audio_format(filename_temp) + if check_name and Zotify.CONFIG.get_skip_existing(): + prepare_download_loader.stop() + Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY EXISTS) ###' + "\n") + if Zotify.CONFIG.get_always_check_lyrics() and Zotify.CONFIG.get_download_lyrics(): try: - set_audio_tags(filename_temp, artists, genres, name, album_name, release_year, disc_number, track_number, - lyrics_lines) - set_music_thumbnail(filename_temp, image_url) + lyr_dir = Zotify.CONFIG.get_lyrics_location() or PurePath(filename).parent + lyr_name_tpl = Zotify.CONFIG.get_lyrics_filename() + lyr_name = lyr_name_tpl + lyr_name = lyr_name.replace('{artist}', fix_filename(artists[0])) + lyr_name = lyr_name.replace('{song_name}', fix_filename(name)) + lyr_name = lyr_name.replace('{album}', fix_filename(album_name)) + if Zotify.CONFIG.get_unique_lyrics_file(): + lrc_path = PurePath(lyr_dir).joinpath("lyrics.lrc") + else: + lrc_path = PurePath(lyr_dir).joinpath(f"{lyr_name}.lrc") + get_song_lyrics(track_id, lrc_path, title=name, artists=artists, album=album_name, duration_ms=duration_ms) + except ValueError: + Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###") + return 'skipped' + + if check_all_time and Zotify.CONFIG.get_skip_previously_downloaded(): + prepare_download_loader.stop() + Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE) ###' + "\n") + if Zotify.CONFIG.get_always_check_lyrics() and Zotify.CONFIG.get_download_lyrics(): + try: + lyr_dir = Zotify.CONFIG.get_lyrics_location() or PurePath(filename).parent + lyr_name_tpl = Zotify.CONFIG.get_lyrics_filename() + lyr_name = lyr_name_tpl + lyr_name = lyr_name.replace('{artist}', fix_filename(artists[0])) + lyr_name = lyr_name.replace('{song_name}', fix_filename(name)) + lyr_name = lyr_name.replace('{album}', fix_filename(album_name)) + lrc_path = PurePath(lyr_dir).joinpath(f"{lyr_name}.lrc") + get_song_lyrics(track_id, lrc_path, title=name, artists=artists, album=album_name, duration_ms=duration_ms) + except ValueError: + Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###") + return 'skipped' + + prog_prefix = '' + if mode == 'album': + cur = extra_keys.get('album_num') + total = extra_keys.get('album_total') + if cur and not total: + try: + album_id = extra_keys.get('album_id') + if album_id: + locale = Zotify.CONFIG.get_locale() + resp = Zotify.invoke_url_with_params( + f'https://api.spotify.com/v1/albums/{album_id}/tracks', + limit=1, + offset=0, + market='from_token', + locale=locale, + ) + total_val = resp.get('total') if isinstance(resp, dict) else None + if total_val is not None: + total = str(total_val) except Exception: - Printer.print(PrintChannel.ERRORS, "Unable to write metadata, ensure ffmpeg is installed and added to your PATH.") + total = total + if cur and total: + prog_prefix = f'({cur}/{total}) ' + elif mode in ('playlist', 'extplaylist'): + cur = extra_keys.get('playlist_num') + total = extra_keys.get('playlist_total') + if cur and not total: + try: + playlist_id = extra_keys.get('playlist_id') + if playlist_id: + locale = Zotify.CONFIG.get_locale() + resp = Zotify.invoke_url_with_params( + f'https://api.spotify.com/v1/playlists/{playlist_id}/tracks', + limit=1, + offset=0, + market='from_token', + locale=locale, + ) + total_val = resp.get('total') if isinstance(resp, dict) else None + if total_val is not None: + total = str(total_val) + except Exception: + total = total + if cur and total: + prog_prefix = f'({cur}/{total}) ' - if filename_temp != filename: - Path(filename_temp).rename(filename) + Printer.print( + PrintChannel.PROGRESS_INFO, + f'\n### {prog_prefix}STARTING "{song_name}" ###\n' + ) - time_finished = time.time() + if ext == 'ogg': + global _ogg_delay_applied_once + if mode in ('album', 'playlist', 'extplaylist'): + if _ogg_delay_applied_once: + time.sleep(5) + else: + _ogg_delay_applied_once = True - Printer.print(PrintChannel.DOWNLOADS, f'### Downloaded "{song_name}" to "{Path(filename).relative_to(Zotify.CONFIG.get_root_path())}" in {fmt_seconds(time_downloaded - time_start)} (plus {fmt_seconds(time_finished - time_downloaded)} converting) ###' + "\n") + if track_id != scraped_song_id: + track_id = scraped_song_id - if Zotify.CONFIG.get_skip_previously_downloaded(): - add_to_archive(scraped_song_id, PurePath(filename).name, artists[0], name) - if not check_id: - add_to_directory_song_ids(filedir, scraped_song_id, PurePath(filename).name, artists[0], name) + track = TrackId.from_base62(track_id) + stream = Zotify.get_content_stream(track, Zotify.DOWNLOAD_QUALITY) + create_download_directory(filedir) + total_size = stream.input_stream.size + + prepare_download_loader.stop() + + time_start = time.time() + downloaded = 0 + with open(filename_temp, 'wb') as file, Printer.progress( + desc=song_name, + total=total_size, + unit='B', + unit_scale=True, + unit_divisor=1024, + disable=disable_progressbar + ) as p_bar: + b = 0 + while b < 5: + data = stream.input_stream.stream().read(Zotify.CONFIG.get_chunk_size()) + p_bar.update(file.write(data)) + downloaded += len(data) + b += 1 if data == b'' else 0 + if Zotify.CONFIG.get_download_real_time(): + delta_real = time.time() - time_start + delta_want = (downloaded / total_size) * (duration_ms / 1000) + if delta_want > delta_real: + time.sleep(delta_want - delta_real) + + time_downloaded = time.time() + + genres = get_song_genres(raw_artists, name) + + lyrics_lines: Optional[List[str]] = None + if Zotify.CONFIG.get_download_lyrics(): + try: + lyr_dir = Zotify.CONFIG.get_lyrics_location() or PurePath(filename).parent + lyr_name_tpl = Zotify.CONFIG.get_lyrics_filename() + lyr_name = lyr_name_tpl + lyr_name = lyr_name.replace('{artist}', fix_filename(artists[0])) + lyr_name = lyr_name.replace('{song_name}', fix_filename(name)) + lyr_name = lyr_name.replace('{album}', fix_filename(album_name)) + if Zotify.CONFIG.get_unique_lyrics_file(): + lrc_path = PurePath(lyr_dir).joinpath("lyrics.lrc") + else: + lrc_path = PurePath(lyr_dir).joinpath(f"{lyr_name}.lrc") + lyrics_lines = get_song_lyrics( + track_id, lrc_path, title=name, artists=artists, album=album_name, duration_ms=duration_ms, write_file=True + ) + except ValueError: + Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###") + + convert_audio_format(filename_temp) + try: + set_audio_tags( + filename_temp, + artists, + genres, + name, + album_name, + release_year, + disc_number, + track_number, + lyrics_lines + ) + set_music_thumbnail(filename_temp, image_url) + except Exception: + Printer.print(PrintChannel.ERRORS, "Unable to write metadata, ensure ffmpeg is installed and added to your PATH.") + + if filename_temp != filename: + Path(filename_temp).rename(filename) + + time_finished = time.time() + + Printer.print( + PrintChannel.DOWNLOADS, + f'### Downloaded "{song_name}" to "{Path(filename).relative_to(Zotify.CONFIG.get_root_path())}" ' + f'in {fmt_seconds(time_downloaded - time_start)} (plus {fmt_seconds(time_finished - time_downloaded)} converting) ###\n' + ) + + if Zotify.CONFIG.get_skip_previously_downloaded(): + add_to_archive(scraped_song_id, PurePath(filename).name, artists[0], name) + if not check_id: + add_to_directory_song_ids(filedir, scraped_song_id, PurePath(filename).name, artists[0], name) + + if Zotify.CONFIG.get_bulk_wait_time(): + time.sleep(Zotify.CONFIG.get_bulk_wait_time()) + + return 'downloaded' - if Zotify.CONFIG.get_bulk_wait_time(): - time.sleep(Zotify.CONFIG.get_bulk_wait_time()) except Exception as e: + prepare_download_loader.stop() Printer.print(PrintChannel.ERRORS, '### SKIPPING: ' + song_name + ' (GENERAL DOWNLOAD ERROR) ###') Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id)) for k in extra_keys: @@ -523,10 +530,12 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba Printer.print(PrintChannel.ERRORS, "\n") Printer.print(PrintChannel.ERRORS, str(e) + "\n") Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n") - if Path(filename_temp).exists(): + if 'filename_temp' in locals() and Path(filename_temp).exists(): Path(filename_temp).unlink() + return 'errored' - prepare_download_loader.stop() + finally: + prepare_download_loader.stop() def convert_audio_format(filename) -> None: @@ -538,7 +547,7 @@ def convert_audio_format(filename) -> None: if file_codec != 'copy': bitrate = Zotify.CONFIG.get_transcode_bitrate() bitrates = { - #SpotiClub API permit the use of '320k' for free users, so we map 'auto' to that value. + #SpotiClub : API permit the use of '320k' for free users, so we map 'auto' to that value. 'auto': '320k', 'normal': '96k', 'high': '160k',