Push V0.2.3

This commit is contained in:
unknown
2025-12-19 01:49:51 +01:00
parent 80e3740d01
commit 9a64fa8b39
9 changed files with 336 additions and 272 deletions

View File

@@ -1,5 +1,5 @@
from zotify.const import ITEMS, ARTISTS, NAME, ID from zotify.const import ITEMS, ARTISTS, NAME, ID
from zotify.termoutput import Printer from zotify.termoutput import Printer, PrintChannel
from zotify.track import download_track from zotify.track import download_track
from zotify.utils import fix_filename from zotify.utils import fix_filename
from zotify.zotify import Zotify from zotify.zotify import Zotify
@@ -71,9 +71,13 @@ def download_album(album):
} }
album_multi_disc = len(disc_numbers) > 1 album_multi_disc = len(disc_numbers) > 1
downloaded = 0
skipped = 0
errored = 0
for n, track in Printer.progress(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)): for n, track in Printer.progress(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)):
# Only pass dynamic numbering and album_id (useful for custom templates using {album_id}). # Only pass dynamic numbering and album_id (useful for custom templates using {album_id}).
download_track( result = download_track(
'album', 'album',
track[ID], track[ID],
extra_keys={ extra_keys={
@@ -85,6 +89,19 @@ def download_album(album):
disable_progressbar=True disable_progressbar=True
) )
if result == 'downloaded':
downloaded += 1
elif result == 'skipped':
skipped += 1
else:
errored += 1
total = len(tracks)
Printer.print(
PrintChannel.PROGRESS_INFO,
f'\n#######################################\nFinished! Here is your album summary :\ndownloaded {downloaded}/{total} | skipped {skipped}/{total} | errored {errored}/{total}\n#######################################\n'
)
def download_artist_albums(artist): def download_artist_albums(artist):
""" Downloads albums of an artist """ """ Downloads albums of an artist """

View File

@@ -103,8 +103,19 @@ def download_from_urls(urls: list[str]) -> bool:
download = True download = True
playlist_songs = get_playlist_songs(playlist_id) playlist_songs = get_playlist_songs(playlist_id)
name, _ = get_playlist_info(playlist_id) name, _ = get_playlist_info(playlist_id)
enum = 1 track_items = []
char_num = len(str(len(playlist_songs))) for song in playlist_songs:
track_obj = song.get(TRACK) if isinstance(song, dict) else None
if track_obj and track_obj.get(ID) and track_obj.get(TYPE) != "episode":
track_items.append(song)
expected_total = len(track_items)
downloaded_count = 0
skipped_count = 0
errored_count = 0
track_enum = 1
char_num = len(str(expected_total if expected_total else 1))
for song in playlist_songs: for song in playlist_songs:
track_obj = song.get(TRACK) if isinstance(song, dict) else None track_obj = song.get(TRACK) if isinstance(song, dict) else None
if not track_obj or not track_obj.get(NAME) or not track_obj.get(ID): if not track_obj or not track_obj.get(NAME) or not track_obj.get(ID):
@@ -113,16 +124,27 @@ def download_from_urls(urls: list[str]) -> bool:
if track_obj.get(TYPE) == "episode": # Playlist item is a podcast episode if track_obj.get(TYPE) == "episode": # Playlist item is a podcast episode
download_episode(track_obj[ID]) download_episode(track_obj[ID])
else: else:
download_track('playlist', track_obj[ID], extra_keys= result = download_track('playlist', track_obj[ID], extra_keys=
{ {
'playlist_song_name': track_obj[NAME], 'playlist_song_name': track_obj[NAME],
'playlist': name, 'playlist': name,
'playlist_num': str(enum).zfill(char_num), 'playlist_num': str(track_enum).zfill(char_num),
'playlist_total': str(len(playlist_songs)), 'playlist_total': str(expected_total),
'playlist_id': playlist_id, 'playlist_id': playlist_id,
'playlist_track_id': track_obj[ID] 'playlist_track_id': track_obj[ID]
}) })
enum += 1 track_enum += 1
if result == 'downloaded':
downloaded_count += 1
elif result == 'skipped':
skipped_count += 1
else:
errored_count += 1
Printer.print(
PrintChannel.PROGRESS_INFO,
f'\n### PLAYLIST SUMMARY: downloaded {downloaded_count}/{expected_total} | skipped {skipped_count}/{expected_total} | errored {errored_count}/{expected_total} ###\n'
)
elif episode_id is not None: elif episode_id is not None:
download = True download = True
download_episode(episode_id) download_episode(episode_id)

View File

@@ -3,7 +3,7 @@ import sys
from pathlib import Path, PurePath from pathlib import Path, PurePath
from typing import Any from typing import Any
ZOTIFY_VERSION = "0.2.1" ZOTIFY_VERSION = "0.2.2"
ROOT_PATH = 'ROOT_PATH' ROOT_PATH = 'ROOT_PATH'
ROOT_PODCAST_PATH = 'ROOT_PODCAST_PATH' ROOT_PODCAST_PATH = 'ROOT_PODCAST_PATH'
SKIP_EXISTING = 'SKIP_EXISTING' SKIP_EXISTING = 'SKIP_EXISTING'

View File

@@ -1,5 +1,5 @@
from zotify.const import ITEMS, ID, TRACK, NAME from zotify.const import ITEMS, ID, TRACK, NAME
from zotify.termoutput import Printer from zotify.termoutput import Printer, PrintChannel
from zotify.track import download_track from zotify.track import download_track
from zotify.utils import split_input from zotify.utils import split_input
from zotify.zotify import Zotify from zotify.zotify import Zotify
@@ -60,24 +60,40 @@ def download_playlist(playlist):
pl_name, _ = get_playlist_info(playlist[ID]) pl_name, _ = get_playlist_info(playlist[ID])
playlist_songs = [song for song in get_playlist_songs(playlist[ID]) if song[TRACK] is not None and song[TRACK][ID]] playlist_songs = [song for song in get_playlist_songs(playlist[ID]) if song[TRACK] is not None and song[TRACK][ID]]
p_bar = Printer.progress(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True) total = len(playlist_songs)
downloaded = 0
skipped = 0
errored = 0
p_bar = Printer.progress(playlist_songs, unit='song', total=total, unit_scale=True)
enum = 1 enum = 1
for song in p_bar: for song in p_bar:
# Use localized playlist name; track metadata (artist/title) is localized in download_track via locale # Use localized playlist name; track metadata (artist/title) is localized in download_track via locale
download_track( result = download_track(
'extplaylist', 'extplaylist',
song[TRACK][ID], song[TRACK][ID],
extra_keys={ extra_keys={
'playlist': pl_name, 'playlist': pl_name,
'playlist_num': str(enum).zfill(2), 'playlist_num': str(enum).zfill(2),
'playlist_total': str(len(playlist_songs)), 'playlist_total': str(total),
'playlist_id': playlist[ID], 'playlist_id': playlist[ID],
}, },
disable_progressbar=True disable_progressbar=True
) )
if result == 'downloaded':
downloaded += 1
elif result == 'skipped':
skipped += 1
else:
errored += 1
p_bar.set_description(song[TRACK][NAME]) p_bar.set_description(song[TRACK][NAME])
enum += 1 enum += 1
Printer.print(
PrintChannel.PROGRESS_INFO,
f'\n#######################################\nFinished! Here is your playlist summary :\ndownloaded {downloaded}/{total} | skipped {skipped}/{total} | errored {errored}/{total}\n#######################################\n'
)
def download_from_user_playlist(): def download_from_user_playlist():
""" Select which playlist(s) to download """ """ Select which playlist(s) to download """

View File

@@ -1,25 +1,29 @@
from pathlib import Path, PurePath from pathlib import Path, PurePath
from typing import Any, Tuple, List, Optional from typing import Any, Tuple, List, Optional, Literal
import json
import math
import re
import time
import traceback
import uuid
import ffmpy
from librespot.metadata import TrackId from librespot.metadata import TrackId
from zotify.const import TRACKS, ALBUM, GENRES, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \ from zotify.const import TRACKS, ALBUM, GENRES, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
RELEASE_DATE, ID, TRACKS_URL, FOLLOWED_ARTISTS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS, \ RELEASE_DATE, ID, TRACKS_URL, FOLLOWED_ARTISTS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS, \
HREF, ARTISTS, WIDTH HREF, WIDTH
from zotify.loader import Loader
from zotify.termoutput import Printer, PrintChannel from zotify.termoutput import Printer, PrintChannel
from zotify.utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \ from zotify.utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \
get_directory_song_ids, add_to_directory_song_ids, get_previously_downloaded, add_to_archive, fmt_seconds get_directory_song_ids, add_to_directory_song_ids, get_previously_downloaded, add_to_archive, fmt_seconds
from zotify.zotify import Zotify from zotify.zotify import Zotify
import traceback
from zotify.loader import Loader
import math
import re
import time
import uuid
import json
import ffmpy
# Track whether we've already applied the OGG delay for bulk (album/playlist) downloads
_ogg_delay_applied_once = False _ogg_delay_applied_once = False
DownloadTrackResult = Literal['downloaded', 'skipped', 'errored']
def get_saved_tracks() -> list: def get_saved_tracks() -> list:
songs = [] songs = []
offset = 0 offset = 0
@@ -79,11 +83,9 @@ def ensure_spoticlub_credentials() -> None:
spoticlub_user = data.get('spoticlub_user') or '' spoticlub_user = data.get('spoticlub_user') or ''
spoticlub_password = data.get('spoticlub_password') or '' spoticlub_password = data.get('spoticlub_password') or ''
# If any credential value is missing, prompt the user
if not spoticlub_user or not spoticlub_password: if not spoticlub_user or not spoticlub_password:
Printer.print(PrintChannel.PROGRESS_INFO, '\nSpotiClub credentials not found. Please enter them now.') Printer.print(PrintChannel.PROGRESS_INFO, '\nSpotiClub credentials not found. Please enter them now.')
spoticlub_user = input('SpotiClub username: ').strip() spoticlub_user = input('SpotiClub username: ').strip()
# Basic loop to avoid empty submissions
while not spoticlub_user: while not spoticlub_user:
spoticlub_user = input('SpotiClub username (cannot be empty): ').strip() spoticlub_user = input('SpotiClub username (cannot be empty): ').strip()
@@ -163,7 +165,6 @@ def get_song_genres(rawartists: List[str], track_name: str) -> List[str]:
elif artist_genres: elif artist_genres:
genres.append(artist_genres[0]) genres.append(artist_genres[0])
# De-duplicate while preserving order
seen = set() seen = set()
genres = [g for g in genres if not (g in seen or seen.add(g))] genres = [g for g in genres if not (g in seen or seen.add(g))]
@@ -238,284 +239,290 @@ def get_song_duration(song_id: str) -> float:
return duration return duration
def download_track(mode: str, track_id: str, extra_keys=None, disable_progressbar=False) -> None: def download_track(mode: str, track_id: str, extra_keys=None, disable_progressbar=False) -> DownloadTrackResult:
if extra_keys is None: if extra_keys is None:
extra_keys = {} extra_keys = {}
# Ensure SpotiClub credentials exist before starting any download prompts or loaders
ensure_spoticlub_credentials() ensure_spoticlub_credentials()
prepare_download_loader = Loader(PrintChannel.PROGRESS_INFO, "Preparing download...") prepare_download_loader = Loader(PrintChannel.PROGRESS_INFO, "Preparing download...")
prepare_download_loader.start() prepare_download_loader.start()
try: try:
output_template = str(Zotify.CONFIG.get_output(mode)) try:
output_template = str(Zotify.CONFIG.get_output(mode))
prepare_download_loader.stop()
prepare_download_loader.stop() (artists, raw_artists, album_name, name, image_url, release_year, disc_number,
track_number, scraped_song_id, is_playable, duration_ms) = get_song_info(track_id)
(artists, raw_artists, album_name, name, image_url, release_year, disc_number, song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name)
track_number, scraped_song_id, is_playable, duration_ms) = get_song_info(track_id)
song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name) for k in extra_keys:
output_template = output_template.replace("{" + k + "}", fix_filename(extra_keys[k]))
for k in extra_keys: ext = EXT_MAP.get(Zotify.CONFIG.get_download_format().lower())
output_template = output_template.replace("{"+k+"}", fix_filename(extra_keys[k]))
ext = EXT_MAP.get(Zotify.CONFIG.get_download_format().lower()) output_template = output_template.replace("{artist}", fix_filename(artists[0]))
output_template = output_template.replace("{album}", fix_filename(album_name))
output_template = output_template.replace("{song_name}", fix_filename(name))
output_template = output_template.replace("{release_year}", fix_filename(release_year))
output_template = output_template.replace("{disc_number}", fix_filename(disc_number))
output_template = output_template.replace("{track_number}", fix_filename(track_number))
output_template = output_template.replace("{id}", fix_filename(scraped_song_id))
output_template = output_template.replace("{track_id}", fix_filename(track_id))
output_template = output_template.replace("{ext}", ext)
if mode == 'album' and Zotify.CONFIG.get_split_album_discs():
flag_raw = extra_keys.get('album_multi_disc')
flag = str(flag_raw).strip().lower() in ('1', 'true', 'yes')
try:
disc_number_int = int(disc_number)
except Exception:
disc_number_int = 1
output_template = output_template.replace("{artist}", fix_filename(artists[0])) should_create_disc_dir = flag or disc_number_int > 1
output_template = output_template.replace("{album}", fix_filename(album_name)) if should_create_disc_dir:
output_template = output_template.replace("{song_name}", fix_filename(name)) tpl_path = PurePath(output_template)
output_template = output_template.replace("{release_year}", fix_filename(release_year)) disc_dir_name = f"Disc {disc_number_int}"
output_template = output_template.replace("{disc_number}", fix_filename(disc_number)) if disc_dir_name not in tpl_path.parts:
output_template = output_template.replace("{track_number}", fix_filename(track_number)) output_template = str(tpl_path.parent / disc_dir_name / tpl_path.name)
output_template = output_template.replace("{id}", fix_filename(scraped_song_id))
output_template = output_template.replace("{track_id}", fix_filename(track_id))
output_template = output_template.replace("{ext}", ext)
# SPLIT_ALBUM_DISCS should only create a Disc folder when the album truly has multiple discs. filename = PurePath(Zotify.CONFIG.get_root_path()).joinpath(output_template)
# - When downloading via zotify.album.download_album(), we pass extra_keys['album_multi_disc']. filedir = PurePath(filename).parent
# - As a fallback, any track with disc_number > 1 implies multi-disc.
if mode == 'album' and Zotify.CONFIG.get_split_album_discs():
flag_raw = extra_keys.get('album_multi_disc')
flag = str(flag_raw).strip().lower() in ('1', 'true', 'yes')
try:
disc_number_int = int(disc_number)
except Exception:
disc_number_int = 1
should_create_disc_dir = flag or disc_number_int > 1 filename_temp = filename
if should_create_disc_dir: if Zotify.CONFIG.get_temp_download_dir() != '':
tpl_path = PurePath(output_template) filename_temp = PurePath(Zotify.CONFIG.get_temp_download_dir()).joinpath(
disc_dir_name = f"Disc {disc_number_int}" f'zotify_{str(uuid.uuid4())}_{track_id}.{ext}'
if disc_dir_name not in tpl_path.parts: )
output_template = str(tpl_path.parent / disc_dir_name / tpl_path.name)
filename = PurePath(Zotify.CONFIG.get_root_path()).joinpath(output_template) check_name = Path(filename).is_file() and Path(filename).stat().st_size
filedir = PurePath(filename).parent check_id = scraped_song_id in get_directory_song_ids(filedir)
check_all_time = scraped_song_id in get_previously_downloaded()
filename_temp = filename if not check_id and check_name:
if Zotify.CONFIG.get_temp_download_dir() != '': stem = PurePath(filename).stem
filename_temp = PurePath(Zotify.CONFIG.get_temp_download_dir()).joinpath(f'zotify_{str(uuid.uuid4())}_{track_id}.{ext}') ext_existing = PurePath(filename).suffix
base_prefix = str(PurePath(filedir).joinpath(stem))
c = len([
file
for file in Path(filedir).iterdir()
if str(file).startswith(base_prefix + "_")
]) + 1
check_name = Path(filename).is_file() and Path(filename).stat().st_size filename = PurePath(filedir).joinpath(f'{stem}_{c}{ext_existing}')
check_id = scraped_song_id in get_directory_song_ids(filedir)
check_all_time = scraped_song_id in get_previously_downloaded()
# a song with the same name is installed except Exception as e:
if not check_id and check_name: prepare_download_loader.stop()
stem = PurePath(filename).stem Printer.print(PrintChannel.ERRORS, '### SKIPPING SONG - FAILED TO QUERY METADATA ###')
ext = PurePath(filename).suffix Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id))
base_prefix = str(PurePath(filedir).joinpath(stem)) for k in extra_keys:
c = len([ Printer.print(PrintChannel.ERRORS, k + ': ' + str(extra_keys[k]))
file Printer.print(PrintChannel.ERRORS, "\n")
for file in Path(filedir).iterdir() Printer.print(PrintChannel.ERRORS, str(e) + "\n")
if str(file).startswith(base_prefix + "_") Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
]) + 1 Printer.print(PrintChannel.PROGRESS_INFO, "Waiting to query Spotify API again.." + "\n")
time.sleep(10)
return download_track(mode, track_id, extra_keys)
# SpotiClub: Fix phantom files when colliding with existing names (-_.mp3)
filename = PurePath(filedir).joinpath(f'{stem}_{c}{ext}')
except Exception as e:
Printer.print(PrintChannel.ERRORS, '### SKIPPING SONG - FAILED TO QUERY METADATA ###')
Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id))
for k in extra_keys:
Printer.print(PrintChannel.ERRORS, k + ': ' + str(extra_keys[k]))
Printer.print(PrintChannel.ERRORS, "\n")
Printer.print(PrintChannel.ERRORS, str(e) + "\n")
Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
Printer.print(PrintChannel.PROGRESS_INFO, "Waiting to query Spotify API again.." + "\n")
time.sleep(10)
return download_track(mode, track_id, extra_keys)
else:
try: try:
if not is_playable: if not is_playable:
prepare_download_loader.stop() prepare_download_loader.stop()
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG IS UNAVAILABLE) ###' + "\n") Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG IS UNAVAILABLE) ###' + "\n")
else: return 'skipped'
if check_name and Zotify.CONFIG.get_skip_existing():
prepare_download_loader.stop()
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY EXISTS) ###' + "\n")
if Zotify.CONFIG.get_always_check_lyrics() and Zotify.CONFIG.get_download_lyrics():
try:
lyr_dir = Zotify.CONFIG.get_lyrics_location() or PurePath(filename).parent
lyr_name_tpl = Zotify.CONFIG.get_lyrics_filename()
lyr_name = lyr_name_tpl
lyr_name = lyr_name.replace('{artist}', fix_filename(artists[0]))
lyr_name = lyr_name.replace('{song_name}', fix_filename(name))
lyr_name = lyr_name.replace('{album}', fix_filename(album_name))
if Zotify.CONFIG.get_unique_lyrics_file():
lrc_path = PurePath(lyr_dir).joinpath("lyrics.lrc")
else:
lrc_path = PurePath(lyr_dir).joinpath(f"{lyr_name}.lrc")
get_song_lyrics(track_id, lrc_path, title=name, artists=artists, album=album_name, duration_ms=duration_ms)
except ValueError:
Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###")
elif check_all_time and Zotify.CONFIG.get_skip_previously_downloaded(): if check_name and Zotify.CONFIG.get_skip_existing():
prepare_download_loader.stop() prepare_download_loader.stop()
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE) ###' + "\n") Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY EXISTS) ###' + "\n")
if Zotify.CONFIG.get_always_check_lyrics() and Zotify.CONFIG.get_download_lyrics(): if Zotify.CONFIG.get_always_check_lyrics() and Zotify.CONFIG.get_download_lyrics():
try:
lyr_dir = Zotify.CONFIG.get_lyrics_location() or PurePath(filename).parent
lyr_name_tpl = Zotify.CONFIG.get_lyrics_filename()
lyr_name = lyr_name_tpl
lyr_name = lyr_name.replace('{artist}', fix_filename(artists[0]))
lyr_name = lyr_name.replace('{song_name}', fix_filename(name))
lyr_name = lyr_name.replace('{album}', fix_filename(album_name))
lrc_path = PurePath(lyr_dir).joinpath(f"{lyr_name}.lrc")
get_song_lyrics(track_id, lrc_path, title=name, artists=artists, album=album_name, duration_ms=duration_ms)
except ValueError:
Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###")
else:
prog_prefix = ''
if mode == 'album':
cur = extra_keys.get('album_num')
total = extra_keys.get('album_total')
if cur and not total:
# No info about total tracks? Let's query the album from Spotify's API in last resort
try:
album_id = extra_keys.get('album_id')
if album_id:
locale = Zotify.CONFIG.get_locale()
resp = Zotify.invoke_url_with_params(
f'https://api.spotify.com/v1/albums/{album_id}/tracks',
limit=1,
offset=0,
market='from_token',
locale=locale,
)
total_val = resp.get('total') if isinstance(resp, dict) else None
if total_val is not None:
total = str(total_val)
except Exception:
total = total
if cur and total:
prog_prefix = f'({cur}/{total}) '
elif mode in ('playlist', 'extplaylist'):
cur = extra_keys.get('playlist_num')
total = extra_keys.get('playlist_total')
if cur and not total:
# Same fallback for total tracks in playlist
try:
playlist_id = extra_keys.get('playlist_id')
if playlist_id:
locale = Zotify.CONFIG.get_locale()
resp = Zotify.invoke_url_with_params(
f'https://api.spotify.com/v1/playlists/{playlist_id}/tracks',
limit=1,
offset=0,
market='from_token',
locale=locale,
)
total_val = resp.get('total') if isinstance(resp, dict) else None
if total_val is not None:
total = str(total_val)
except Exception:
total = total
if cur and total:
prog_prefix = f'({cur}/{total}) '
Printer.print(
PrintChannel.PROGRESS_INFO,
f'\n### {prog_prefix}STARTING "{song_name}" ###\n'
)
if ext == 'ogg':
# SpotiClub : TEMP? : For albums/playlists, wait 5 seconds between OGG tracks to avoid
# spamming the SpotiClub API for audio keys.
# Skip the very first track in the run and for single-track downloads.
global _ogg_delay_applied_once
if mode in ('album', 'playlist', 'extplaylist'):
if _ogg_delay_applied_once:
# Spammy log
# Printer.print(PrintChannel.PROGRESS_INFO, '\n## OGG File : Waiting 5 seconds before resuming... ##')
time.sleep(5)
else:
_ogg_delay_applied_once = True
if track_id != scraped_song_id:
track_id = scraped_song_id
track = TrackId.from_base62(track_id)
stream = Zotify.get_content_stream(track, Zotify.DOWNLOAD_QUALITY)
create_download_directory(filedir)
total_size = stream.input_stream.size
prepare_download_loader.stop()
time_start = time.time()
downloaded = 0
with open(filename_temp, 'wb') as file, Printer.progress(
desc=song_name,
total=total_size,
unit='B',
unit_scale=True,
unit_divisor=1024,
disable=disable_progressbar
) as p_bar:
b = 0
while b < 5:
data = stream.input_stream.stream().read(Zotify.CONFIG.get_chunk_size())
p_bar.update(file.write(data))
downloaded += len(data)
b += 1 if data == b'' else 0
if Zotify.CONFIG.get_download_real_time():
delta_real = time.time() - time_start
delta_want = (downloaded / total_size) * (duration_ms/1000)
if delta_want > delta_real:
time.sleep(delta_want - delta_real)
time_downloaded = time.time()
genres = get_song_genres(raw_artists, name)
lyrics_lines: Optional[List[str]] = None
if Zotify.CONFIG.get_download_lyrics():
try:
lyr_dir = Zotify.CONFIG.get_lyrics_location() or PurePath(filename).parent
lyr_name_tpl = Zotify.CONFIG.get_lyrics_filename()
lyr_name = lyr_name_tpl
lyr_name = lyr_name.replace('{artist}', fix_filename(artists[0]))
lyr_name = lyr_name.replace('{song_name}', fix_filename(name))
lyr_name = lyr_name.replace('{album}', fix_filename(album_name))
if Zotify.CONFIG.get_unique_lyrics_file():
lrc_path = PurePath(lyr_dir).joinpath("lyrics.lrc")
else:
lrc_path = PurePath(lyr_dir).joinpath(f"{lyr_name}.lrc")
lyrics_lines = get_song_lyrics(
track_id, lrc_path, title=name, artists=artists, album=album_name, duration_ms=duration_ms, write_file=True
)
except ValueError:
Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###")
convert_audio_format(filename_temp)
try: try:
set_audio_tags(filename_temp, artists, genres, name, album_name, release_year, disc_number, track_number, lyr_dir = Zotify.CONFIG.get_lyrics_location() or PurePath(filename).parent
lyrics_lines) lyr_name_tpl = Zotify.CONFIG.get_lyrics_filename()
set_music_thumbnail(filename_temp, image_url) lyr_name = lyr_name_tpl
lyr_name = lyr_name.replace('{artist}', fix_filename(artists[0]))
lyr_name = lyr_name.replace('{song_name}', fix_filename(name))
lyr_name = lyr_name.replace('{album}', fix_filename(album_name))
if Zotify.CONFIG.get_unique_lyrics_file():
lrc_path = PurePath(lyr_dir).joinpath("lyrics.lrc")
else:
lrc_path = PurePath(lyr_dir).joinpath(f"{lyr_name}.lrc")
get_song_lyrics(track_id, lrc_path, title=name, artists=artists, album=album_name, duration_ms=duration_ms)
except ValueError:
Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###")
return 'skipped'
if check_all_time and Zotify.CONFIG.get_skip_previously_downloaded():
prepare_download_loader.stop()
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE) ###' + "\n")
if Zotify.CONFIG.get_always_check_lyrics() and Zotify.CONFIG.get_download_lyrics():
try:
lyr_dir = Zotify.CONFIG.get_lyrics_location() or PurePath(filename).parent
lyr_name_tpl = Zotify.CONFIG.get_lyrics_filename()
lyr_name = lyr_name_tpl
lyr_name = lyr_name.replace('{artist}', fix_filename(artists[0]))
lyr_name = lyr_name.replace('{song_name}', fix_filename(name))
lyr_name = lyr_name.replace('{album}', fix_filename(album_name))
lrc_path = PurePath(lyr_dir).joinpath(f"{lyr_name}.lrc")
get_song_lyrics(track_id, lrc_path, title=name, artists=artists, album=album_name, duration_ms=duration_ms)
except ValueError:
Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###")
return 'skipped'
prog_prefix = ''
if mode == 'album':
cur = extra_keys.get('album_num')
total = extra_keys.get('album_total')
if cur and not total:
try:
album_id = extra_keys.get('album_id')
if album_id:
locale = Zotify.CONFIG.get_locale()
resp = Zotify.invoke_url_with_params(
f'https://api.spotify.com/v1/albums/{album_id}/tracks',
limit=1,
offset=0,
market='from_token',
locale=locale,
)
total_val = resp.get('total') if isinstance(resp, dict) else None
if total_val is not None:
total = str(total_val)
except Exception: except Exception:
Printer.print(PrintChannel.ERRORS, "Unable to write metadata, ensure ffmpeg is installed and added to your PATH.") total = total
if cur and total:
prog_prefix = f'({cur}/{total}) '
elif mode in ('playlist', 'extplaylist'):
cur = extra_keys.get('playlist_num')
total = extra_keys.get('playlist_total')
if cur and not total:
try:
playlist_id = extra_keys.get('playlist_id')
if playlist_id:
locale = Zotify.CONFIG.get_locale()
resp = Zotify.invoke_url_with_params(
f'https://api.spotify.com/v1/playlists/{playlist_id}/tracks',
limit=1,
offset=0,
market='from_token',
locale=locale,
)
total_val = resp.get('total') if isinstance(resp, dict) else None
if total_val is not None:
total = str(total_val)
except Exception:
total = total
if cur and total:
prog_prefix = f'({cur}/{total}) '
if filename_temp != filename: Printer.print(
Path(filename_temp).rename(filename) PrintChannel.PROGRESS_INFO,
f'\n### {prog_prefix}STARTING "{song_name}" ###\n'
)
time_finished = time.time() if ext == 'ogg':
global _ogg_delay_applied_once
if mode in ('album', 'playlist', 'extplaylist'):
if _ogg_delay_applied_once:
time.sleep(5)
else:
_ogg_delay_applied_once = True
Printer.print(PrintChannel.DOWNLOADS, f'### Downloaded "{song_name}" to "{Path(filename).relative_to(Zotify.CONFIG.get_root_path())}" in {fmt_seconds(time_downloaded - time_start)} (plus {fmt_seconds(time_finished - time_downloaded)} converting) ###' + "\n") if track_id != scraped_song_id:
track_id = scraped_song_id
if Zotify.CONFIG.get_skip_previously_downloaded(): track = TrackId.from_base62(track_id)
add_to_archive(scraped_song_id, PurePath(filename).name, artists[0], name) stream = Zotify.get_content_stream(track, Zotify.DOWNLOAD_QUALITY)
if not check_id: create_download_directory(filedir)
add_to_directory_song_ids(filedir, scraped_song_id, PurePath(filename).name, artists[0], name) total_size = stream.input_stream.size
prepare_download_loader.stop()
time_start = time.time()
downloaded = 0
with open(filename_temp, 'wb') as file, Printer.progress(
desc=song_name,
total=total_size,
unit='B',
unit_scale=True,
unit_divisor=1024,
disable=disable_progressbar
) as p_bar:
b = 0
while b < 5:
data = stream.input_stream.stream().read(Zotify.CONFIG.get_chunk_size())
p_bar.update(file.write(data))
downloaded += len(data)
b += 1 if data == b'' else 0
if Zotify.CONFIG.get_download_real_time():
delta_real = time.time() - time_start
delta_want = (downloaded / total_size) * (duration_ms / 1000)
if delta_want > delta_real:
time.sleep(delta_want - delta_real)
time_downloaded = time.time()
genres = get_song_genres(raw_artists, name)
lyrics_lines: Optional[List[str]] = None
if Zotify.CONFIG.get_download_lyrics():
try:
lyr_dir = Zotify.CONFIG.get_lyrics_location() or PurePath(filename).parent
lyr_name_tpl = Zotify.CONFIG.get_lyrics_filename()
lyr_name = lyr_name_tpl
lyr_name = lyr_name.replace('{artist}', fix_filename(artists[0]))
lyr_name = lyr_name.replace('{song_name}', fix_filename(name))
lyr_name = lyr_name.replace('{album}', fix_filename(album_name))
if Zotify.CONFIG.get_unique_lyrics_file():
lrc_path = PurePath(lyr_dir).joinpath("lyrics.lrc")
else:
lrc_path = PurePath(lyr_dir).joinpath(f"{lyr_name}.lrc")
lyrics_lines = get_song_lyrics(
track_id, lrc_path, title=name, artists=artists, album=album_name, duration_ms=duration_ms, write_file=True
)
except ValueError:
Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###")
convert_audio_format(filename_temp)
try:
set_audio_tags(
filename_temp,
artists,
genres,
name,
album_name,
release_year,
disc_number,
track_number,
lyrics_lines
)
set_music_thumbnail(filename_temp, image_url)
except Exception:
Printer.print(PrintChannel.ERRORS, "Unable to write metadata, ensure ffmpeg is installed and added to your PATH.")
if filename_temp != filename:
Path(filename_temp).rename(filename)
time_finished = time.time()
Printer.print(
PrintChannel.DOWNLOADS,
f'### Downloaded "{song_name}" to "{Path(filename).relative_to(Zotify.CONFIG.get_root_path())}" '
f'in {fmt_seconds(time_downloaded - time_start)} (plus {fmt_seconds(time_finished - time_downloaded)} converting) ###\n'
)
if Zotify.CONFIG.get_skip_previously_downloaded():
add_to_archive(scraped_song_id, PurePath(filename).name, artists[0], name)
if not check_id:
add_to_directory_song_ids(filedir, scraped_song_id, PurePath(filename).name, artists[0], name)
if Zotify.CONFIG.get_bulk_wait_time():
time.sleep(Zotify.CONFIG.get_bulk_wait_time())
return 'downloaded'
if Zotify.CONFIG.get_bulk_wait_time():
time.sleep(Zotify.CONFIG.get_bulk_wait_time())
except Exception as e: except Exception as e:
prepare_download_loader.stop()
Printer.print(PrintChannel.ERRORS, '### SKIPPING: ' + song_name + ' (GENERAL DOWNLOAD ERROR) ###') Printer.print(PrintChannel.ERRORS, '### SKIPPING: ' + song_name + ' (GENERAL DOWNLOAD ERROR) ###')
Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id)) Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id))
for k in extra_keys: for k in extra_keys:
@@ -523,10 +530,12 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
Printer.print(PrintChannel.ERRORS, "\n") Printer.print(PrintChannel.ERRORS, "\n")
Printer.print(PrintChannel.ERRORS, str(e) + "\n") Printer.print(PrintChannel.ERRORS, str(e) + "\n")
Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n") Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
if Path(filename_temp).exists(): if 'filename_temp' in locals() and Path(filename_temp).exists():
Path(filename_temp).unlink() Path(filename_temp).unlink()
return 'errored'
prepare_download_loader.stop() finally:
prepare_download_loader.stop()
def convert_audio_format(filename) -> None: def convert_audio_format(filename) -> None:
@@ -538,7 +547,7 @@ def convert_audio_format(filename) -> None:
if file_codec != 'copy': if file_codec != 'copy':
bitrate = Zotify.CONFIG.get_transcode_bitrate() bitrate = Zotify.CONFIG.get_transcode_bitrate()
bitrates = { bitrates = {
#SpotiClub API permit the use of '320k' for free users, so we map 'auto' to that value. #SpotiClub : API permit the use of '320k' for free users, so we map 'auto' to that value.
'auto': '320k', 'auto': '320k',
'normal': '96k', 'normal': '96k',
'high': '160k', 'high': '160k',