Initial SpotiClub version

This commit is contained in:
Scrameupeutchi
2025-11-22 03:20:05 +01:00
committed by GitHub
commit 912dfd504d
22 changed files with 2503 additions and 0 deletions

0
zotify/__init__.py Normal file
View File

68
zotify/__main__.py Normal file
View File

@@ -0,0 +1,68 @@
#! /usr/bin/env python3
"""
Zotify
It's like youtube-dl, but for that other music platform.
"""
import argparse
from zotify.app import client
from zotify.config import CONFIG_VALUES
def main():
parser = argparse.ArgumentParser(prog='zotify',
description='A music and podcast downloader needing only python and ffmpeg.')
parser.add_argument('-ns', '--no-splash',
action='store_true',
help='Suppress the splash screen when loading.')
parser.add_argument('--config-location',
type=str,
help='Specify the zconfig.json location')
parser.add_argument('--username',
type=str,
help='Account username')
parser.add_argument('--password',
type=str,
help='Account password')
group = parser.add_mutually_exclusive_group(required=False)
group.add_argument('urls',
type=str,
# action='extend',
default='',
nargs='*',
help='Downloads the track, album, playlist, podcast episode, or all albums by an artist from a url. Can take multiple urls.')
group.add_argument('-l', '--liked',
dest='liked_songs',
action='store_true',
help='Downloads all the liked songs from your account.')
group.add_argument('-f', '--followed',
dest='followed_artists',
action='store_true',
help='Downloads all the songs from all your followed artists.')
group.add_argument('-p', '--playlist',
action='store_true',
help='Downloads a saved playlist from your account.')
group.add_argument('-s', '--search',
type=str,
nargs='?',
const=' ',
help='Loads search prompt to find then download a specific track, album or playlist')
group.add_argument('-d', '--download',
type=str,
help='Downloads tracks, playlists and albums from the URLs written in the file passed.')
for configkey in CONFIG_VALUES:
parser.add_argument(CONFIG_VALUES[configkey]['arg'],
type=str,
default=None,
help='Specify the value of the ['+configkey+'] config value')
parser.set_defaults(func=client)
args = parser.parse_args()
args.func(args)
if __name__ == '__main__':
main()

58
zotify/album.py Normal file
View File

@@ -0,0 +1,58 @@
from zotify.const import ITEMS, ARTISTS, NAME, ID
from zotify.termoutput import Printer
from zotify.track import download_track
from zotify.utils import fix_filename
from zotify.zotify import Zotify
ALBUM_URL = 'https://api.spotify.com/v1/albums'
ARTIST_URL = 'https://api.spotify.com/v1/artists'
def get_album_tracks(album_id):
""" Returns album tracklist """
songs = []
offset = 0
limit = 50
while True:
resp = Zotify.invoke_url_with_params(f'{ALBUM_URL}/{album_id}/tracks', limit=limit, offset=offset)
offset += limit
songs.extend(resp[ITEMS])
if len(resp[ITEMS]) < limit:
break
return songs
def get_album_name(album_id):
""" Returns album name """
(raw, resp) = Zotify.invoke_url(f'{ALBUM_URL}/{album_id}')
return resp[ARTISTS][0][NAME], fix_filename(resp[NAME])
def get_artist_albums(artist_id):
""" Returns artist's albums """
(raw, resp) = Zotify.invoke_url(f'{ARTIST_URL}/{artist_id}/albums?include_groups=album%2Csingle')
# Return a list each album's id
album_ids = [resp[ITEMS][i][ID] for i in range(len(resp[ITEMS]))]
# Recursive requests to get all albums including singles an EPs
while resp['next'] is not None:
(raw, resp) = Zotify.invoke_url(resp['next'])
album_ids.extend([resp[ITEMS][i][ID] for i in range(len(resp[ITEMS]))])
return album_ids
def download_album(album):
""" Downloads songs from an album """
artist, album_name = get_album_name(album)
tracks = get_album_tracks(album)
for n, track in Printer.progress(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)):
download_track('album', track[ID], extra_keys={'album_num': str(n).zfill(2), 'artist': artist, 'album': album_name, 'album_id': album}, disable_progressbar=True)
def download_artist_albums(artist):
""" Downloads albums of an artist """
albums = get_artist_albums(artist)
for album_id in albums:
download_album(album_id)

310
zotify/app.py Normal file
View File

@@ -0,0 +1,310 @@
from librespot.audio.decoders import AudioQuality
from tabulate import tabulate
from pathlib import Path
from zotify.album import download_album, download_artist_albums
from zotify.const import TRACK, NAME, ID, ARTIST, ARTISTS, ITEMS, TRACKS, EXPLICIT, ALBUM, ALBUMS, \
OWNER, PLAYLIST, PLAYLISTS, DISPLAY_NAME, TYPE
from zotify.loader import Loader
from zotify.playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist, download_playlist
from zotify.podcast import download_episode, get_show_episodes
from zotify.termoutput import Printer, PrintChannel
from zotify.track import download_track, get_saved_tracks, get_followed_artists
from zotify.utils import splash, split_input, regex_input_for_urls
from zotify.zotify import Zotify
SEARCH_URL = 'https://api.spotify.com/v1/search'
def client(args) -> None:
""" Connects to download server to perform query's and get songs to download """
Zotify(args)
Printer.print(PrintChannel.SPLASH, splash())
quality_options = {
'auto': AudioQuality.VERY_HIGH if Zotify.check_premium() else AudioQuality.HIGH,
'normal': AudioQuality.NORMAL,
'high': AudioQuality.HIGH,
'very_high': AudioQuality.VERY_HIGH
}
Zotify.DOWNLOAD_QUALITY = quality_options[Zotify.CONFIG.get_download_quality()]
if args.download:
urls = []
filename = args.download
if Path(filename).exists():
with open(filename, 'r', encoding='utf-8') as file:
urls.extend([line.strip() for line in file.readlines()])
download_from_urls(urls)
else:
Printer.print(PrintChannel.ERRORS, f'File {filename} not found.\n')
return
if args.urls:
if len(args.urls) > 0:
download_from_urls(args.urls)
return
if args.playlist:
download_from_user_playlist()
return
if args.liked_songs:
for song in get_saved_tracks():
if not song[TRACK][NAME] or not song[TRACK][ID]:
Printer.print(PrintChannel.SKIPS, '### SKIPPING: SONG DOES NOT EXIST ANYMORE ###' + "\n")
else:
download_track('liked', song[TRACK][ID])
return
if args.followed_artists:
for artist in get_followed_artists():
download_artist_albums(artist)
return
if args.search:
if args.search == ' ':
search_text = ''
while len(search_text) == 0:
search_text = input('Enter search: ')
search(search_text)
else:
if not download_from_urls([args.search]):
search(args.search)
return
else:
search_text = ''
while len(search_text) == 0:
search_text = input('Enter search: ')
search(search_text)
def download_from_urls(urls: list[str]) -> bool:
""" Downloads from a list of urls """
download = False
for spotify_url in urls:
track_id, album_id, playlist_id, episode_id, show_id, artist_id = regex_input_for_urls(spotify_url)
if track_id is not None:
download = True
download_track('single', track_id)
elif artist_id is not None:
download = True
download_artist_albums(artist_id)
elif album_id is not None:
download = True
download_album(album_id)
elif playlist_id is not None:
download = True
playlist_songs = get_playlist_songs(playlist_id)
name, _ = get_playlist_info(playlist_id)
enum = 1
char_num = len(str(len(playlist_songs)))
for song in playlist_songs:
if not song[TRACK][NAME] or not song[TRACK][ID]:
Printer.print(PrintChannel.SKIPS, '### SKIPPING: SONG DOES NOT EXIST ANYMORE ###' + "\n")
else:
if song[TRACK][TYPE] == "episode": # Playlist item is a podcast episode
download_episode(song[TRACK][ID])
else:
download_track('playlist', song[TRACK][ID], extra_keys=
{
'playlist_song_name': song[TRACK][NAME],
'playlist': name,
'playlist_num': str(enum).zfill(char_num),
'playlist_id': playlist_id,
'playlist_track_id': song[TRACK][ID]
})
enum += 1
elif episode_id is not None:
download = True
download_episode(episode_id)
elif show_id is not None:
download = True
for episode in get_show_episodes(show_id):
download_episode(episode)
return download
def search(search_term):
""" Searches download server's API for relevant data """
params = {'limit': '10',
'offset': '0',
'q': search_term,
'type': 'track,album,artist,playlist'}
# Parse args
splits = search_term.split()
for split in splits:
index = splits.index(split)
if split[0] == '-' and len(split) > 1:
if len(splits)-1 == index:
raise IndexError('No parameters passed after option: {}\n'.
format(split))
if split == '-l' or split == '-limit':
try:
int(splits[index+1])
except ValueError:
raise ValueError('Parameter passed after {} option must be an integer.\n'.
format(split))
if int(splits[index+1]) > 50:
raise ValueError('Invalid limit passed. Max is 50.\n')
params['limit'] = splits[index+1]
if split == '-t' or split == '-type':
allowed_types = ['track', 'playlist', 'album', 'artist']
passed_types = []
for i in range(index+1, len(splits)):
if splits[i][0] == '-':
break
if splits[i] not in allowed_types:
raise ValueError('Parameters passed after {} option must be from this list:\n{}'.
format(split, '\n'.join(allowed_types)))
passed_types.append(splits[i])
params['type'] = ','.join(passed_types)
if len(params['type']) == 0:
params['type'] = 'track,album,artist,playlist'
# Clean search term
search_term_list = []
for split in splits:
if split[0] == "-":
break
search_term_list.append(split)
if not search_term_list:
raise ValueError("Invalid query.")
params["q"] = ' '.join(search_term_list)
resp = Zotify.invoke_url_with_params(SEARCH_URL, **params)
counter = 1
dics = []
total_tracks = 0
if TRACK in params['type'].split(','):
tracks = resp[TRACKS][ITEMS]
if len(tracks) > 0:
print('### TRACKS ###')
track_data = []
for track in tracks:
if track[EXPLICIT]:
explicit = '[E]'
else:
explicit = ''
track_data.append([counter, f'{track[NAME]} {explicit}',
','.join([artist[NAME] for artist in track[ARTISTS]])])
dics.append({
ID: track[ID],
NAME: track[NAME],
'type': TRACK,
})
counter += 1
total_tracks = counter - 1
print(tabulate(track_data, headers=[
'S.NO', 'Name', 'Artists'], tablefmt='pretty'))
print('\n')
del tracks
del track_data
total_albums = 0
if ALBUM in params['type'].split(','):
albums = resp[ALBUMS][ITEMS]
if len(albums) > 0:
print('### ALBUMS ###')
album_data = []
for album in albums:
album_data.append([counter, album[NAME],
','.join([artist[NAME] for artist in album[ARTISTS]])])
dics.append({
ID: album[ID],
NAME: album[NAME],
'type': ALBUM,
})
counter += 1
total_albums = counter - total_tracks - 1
print(tabulate(album_data, headers=[
'S.NO', 'Album', 'Artists'], tablefmt='pretty'))
print('\n')
del albums
del album_data
total_artists = 0
if ARTIST in params['type'].split(','):
artists = resp[ARTISTS][ITEMS]
if len(artists) > 0:
print('### ARTISTS ###')
artist_data = []
for artist in artists:
artist_data.append([counter, artist[NAME]])
dics.append({
ID: artist[ID],
NAME: artist[NAME],
'type': ARTIST,
})
counter += 1
total_artists = counter - total_tracks - total_albums - 1
print(tabulate(artist_data, headers=[
'S.NO', 'Name'], tablefmt='pretty'))
print('\n')
del artists
del artist_data
total_playlists = 0
if PLAYLIST in params['type'].split(','):
playlists = resp[PLAYLISTS][ITEMS]
if len(playlists) > 0:
print('### PLAYLISTS ###')
playlist_data = []
for playlist in playlists:
playlist_data.append(
[counter, playlist[NAME], playlist[OWNER][DISPLAY_NAME]])
dics.append({
ID: playlist[ID],
NAME: playlist[NAME],
'type': PLAYLIST,
})
counter += 1
total_playlists = counter - total_artists - total_tracks - total_albums - 1
print(tabulate(playlist_data, headers=[
'S.NO', 'Name', 'Owner'], tablefmt='pretty'))
print('\n')
del playlists
del playlist_data
if total_tracks + total_albums + total_artists + total_playlists == 0:
print('NO RESULTS FOUND - EXITING...')
else:
selection = ''
print('> SELECT A DOWNLOAD OPTION BY ID')
print('> SELECT A RANGE BY ADDING A DASH BETWEEN BOTH ID\'s')
print('> OR PARTICULAR OPTIONS BY ADDING A COMMA BETWEEN ID\'s\n')
while len(selection) == 0:
selection = str(input('ID(s): '))
inputs = split_input(selection)
for pos in inputs:
position = int(pos)
for dic in dics:
print_pos = dics.index(dic) + 1
if print_pos == position:
if dic['type'] == TRACK:
download_track('single', dic[ID])
elif dic['type'] == ALBUM:
download_album(dic[ID])
elif dic['type'] == ARTIST:
download_artist_albums(dic[ID])
else:
download_playlist(dic)

310
zotify/config.py Normal file
View File

@@ -0,0 +1,310 @@
import json
import sys
from pathlib import Path, PurePath
from typing import Any
ROOT_PATH = 'ROOT_PATH'
ROOT_PODCAST_PATH = 'ROOT_PODCAST_PATH'
SKIP_EXISTING = 'SKIP_EXISTING'
SKIP_PREVIOUSLY_DOWNLOADED = 'SKIP_PREVIOUSLY_DOWNLOADED'
DOWNLOAD_FORMAT = 'DOWNLOAD_FORMAT'
BULK_WAIT_TIME = 'BULK_WAIT_TIME'
OVERRIDE_AUTO_WAIT = 'OVERRIDE_AUTO_WAIT'
CHUNK_SIZE = 'CHUNK_SIZE'
SPLIT_ALBUM_DISCS = 'SPLIT_ALBUM_DISCS'
DOWNLOAD_REAL_TIME = 'DOWNLOAD_REAL_TIME'
LANGUAGE = 'LANGUAGE'
DOWNLOAD_QUALITY = 'DOWNLOAD_QUALITY'
TRANSCODE_BITRATE = 'TRANSCODE_BITRATE'
SONG_ARCHIVE = 'SONG_ARCHIVE'
SAVE_CREDENTIALS = 'SAVE_CREDENTIALS'
CREDENTIALS_LOCATION = 'CREDENTIALS_LOCATION'
OUTPUT = 'OUTPUT'
PRINT_SPLASH = 'PRINT_SPLASH'
PRINT_SKIPS = 'PRINT_SKIPS'
PRINT_DOWNLOAD_PROGRESS = 'PRINT_DOWNLOAD_PROGRESS'
PRINT_ERRORS = 'PRINT_ERRORS'
PRINT_DOWNLOADS = 'PRINT_DOWNLOADS'
PRINT_API_ERRORS = 'PRINT_API_ERRORS'
TEMP_DOWNLOAD_DIR = 'TEMP_DOWNLOAD_DIR'
MD_SAVE_GENRES = 'MD_SAVE_GENRES'
MD_ALLGENRES = 'MD_ALLGENRES'
MD_GENREDELIMITER = 'MD_GENREDELIMITER'
PRINT_PROGRESS_INFO = 'PRINT_PROGRESS_INFO'
PRINT_WARNINGS = 'PRINT_WARNINGS'
RETRY_ATTEMPTS = 'RETRY_ATTEMPTS'
CONFIG_VERSION = 'CONFIG_VERSION'
DOWNLOAD_LYRICS = 'DOWNLOAD_LYRICS'
CONFIG_VALUES = {
SAVE_CREDENTIALS: { 'default': 'True', 'type': bool, 'arg': '--save-credentials' },
CREDENTIALS_LOCATION: { 'default': '', 'type': str, 'arg': '--credentials-location' },
OUTPUT: { 'default': '', 'type': str, 'arg': '--output' },
SONG_ARCHIVE: { 'default': '', 'type': str, 'arg': '--song-archive' },
ROOT_PATH: { 'default': '', 'type': str, 'arg': '--root-path' },
ROOT_PODCAST_PATH: { 'default': '', 'type': str, 'arg': '--root-podcast-path' },
SPLIT_ALBUM_DISCS: { 'default': 'False', 'type': bool, 'arg': '--split-album-discs' },
DOWNLOAD_LYRICS: { 'default': 'True', 'type': bool, 'arg': '--download-lyrics' },
MD_SAVE_GENRES: { 'default': 'False', 'type': bool, 'arg': '--md-save-genres' },
MD_ALLGENRES: { 'default': 'False', 'type': bool, 'arg': '--md-allgenres' },
MD_GENREDELIMITER: { 'default': ',', 'type': str, 'arg': '--md-genredelimiter' },
DOWNLOAD_FORMAT: { 'default': 'ogg', 'type': str, 'arg': '--download-format' },
DOWNLOAD_QUALITY: { 'default': 'auto', 'type': str, 'arg': '--download-quality' },
TRANSCODE_BITRATE: { 'default': 'auto', 'type': str, 'arg': '--transcode-bitrate' },
SKIP_EXISTING: { 'default': 'True', 'type': bool, 'arg': '--skip-existing' },
SKIP_PREVIOUSLY_DOWNLOADED: { 'default': 'False', 'type': bool, 'arg': '--skip-previously-downloaded' },
RETRY_ATTEMPTS: { 'default': '1', 'type': int, 'arg': '--retry-attempts' },
BULK_WAIT_TIME: { 'default': '1', 'type': int, 'arg': '--bulk-wait-time' },
OVERRIDE_AUTO_WAIT: { 'default': 'False', 'type': bool, 'arg': '--override-auto-wait' },
CHUNK_SIZE: { 'default': '20000', 'type': int, 'arg': '--chunk-size' },
DOWNLOAD_REAL_TIME: { 'default': 'False', 'type': bool, 'arg': '--download-real-time' },
LANGUAGE: { 'default': 'en', 'type': str, 'arg': '--language' },
PRINT_SPLASH: { 'default': 'False', 'type': bool, 'arg': '--print-splash' },
PRINT_SKIPS: { 'default': 'True', 'type': bool, 'arg': '--print-skips' },
PRINT_DOWNLOAD_PROGRESS: { 'default': 'True', 'type': bool, 'arg': '--print-download-progress' },
PRINT_ERRORS: { 'default': 'True', 'type': bool, 'arg': '--print-errors' },
PRINT_DOWNLOADS: { 'default': 'False', 'type': bool, 'arg': '--print-downloads' },
PRINT_API_ERRORS: { 'default': 'True', 'type': bool, 'arg': '--print-api-errors' },
PRINT_PROGRESS_INFO: { 'default': 'True', 'type': bool, 'arg': '--print-progress-info' },
PRINT_WARNINGS: { 'default': 'True', 'type': bool, 'arg': '--print-warnings' },
TEMP_DOWNLOAD_DIR: { 'default': '', 'type': str, 'arg': '--temp-download-dir' }
}
OUTPUT_DEFAULT_PLAYLIST = '{playlist}/{artist} - {song_name}.{ext}'
OUTPUT_DEFAULT_PLAYLIST_EXT = '{playlist}/{playlist_num} - {artist} - {song_name}.{ext}'
OUTPUT_DEFAULT_LIKED_SONGS = 'Liked Songs/{artist} - {song_name}.{ext}'
OUTPUT_DEFAULT_SINGLE = '{artist}/{album}/{artist} - {song_name}.{ext}'
OUTPUT_DEFAULT_ALBUM = '{artist}/{album}/{album_num} - {artist} - {song_name}.{ext}'
class Config:
Values = {}
@classmethod
def load(cls, args) -> None:
system_paths = {
'win32': Path.home() / 'AppData/Roaming/Zotify',
'linux': Path.home() / '.config/zotify',
'darwin': Path.home() / 'Library/Application Support/Zotify'
}
if sys.platform not in system_paths:
config_fp = Path.cwd() / '.zotify/config.json'
else:
config_fp = system_paths[sys.platform] / 'config.json'
if args.config_location:
config_fp = args.config_location
true_config_file_path = Path(config_fp).expanduser()
# Load config from zconfig.json
Path(PurePath(true_config_file_path).parent).mkdir(parents=True, exist_ok=True)
if not Path(true_config_file_path).exists():
with open(true_config_file_path, 'w', encoding='utf-8') as config_file:
json.dump(cls.get_default_json(), config_file, indent=4)
with open(true_config_file_path, encoding='utf-8') as config_file:
jsonvalues = json.load(config_file)
cls.Values = {}
for key in CONFIG_VALUES:
if key in jsonvalues:
cls.Values[key] = cls.parse_arg_value(key, jsonvalues[key])
# Add default values for missing keys
for key in CONFIG_VALUES:
if key not in cls.Values:
cls.Values[key] = cls.parse_arg_value(key, CONFIG_VALUES[key]['default'])
# Override config from commandline arguments
for key in CONFIG_VALUES:
if key.lower() in vars(args) and vars(args)[key.lower()] is not None:
cls.Values[key] = cls.parse_arg_value(key, vars(args)[key.lower()])
if args.no_splash:
cls.Values[PRINT_SPLASH] = False
@classmethod
def get_default_json(cls) -> Any:
r = {}
for key in CONFIG_VALUES:
r[key] = CONFIG_VALUES[key]['default']
return r
@classmethod
def parse_arg_value(cls, key: str, value: Any) -> Any:
if type(value) == CONFIG_VALUES[key]['type']:
return value
if CONFIG_VALUES[key]['type'] == str:
return str(value)
if CONFIG_VALUES[key]['type'] == int:
return int(value)
if CONFIG_VALUES[key]['type'] == bool:
if str(value).lower() in ['yes', 'true', '1']:
return True
if str(value).lower() in ['no', 'false', '0']:
return False
raise ValueError("Not a boolean: " + value)
raise ValueError("Unknown Type: " + value)
@classmethod
def get(cls, key: str) -> Any:
return cls.Values.get(key)
@classmethod
def get_root_path(cls) -> str:
if cls.get(ROOT_PATH) == '':
root_path = PurePath(Path.home() / 'Music/Zotify Music/')
else:
root_path = PurePath(Path(cls.get(ROOT_PATH)).expanduser())
Path(root_path).mkdir(parents=True, exist_ok=True)
return root_path
@classmethod
def get_root_podcast_path(cls) -> str:
if cls.get(ROOT_PODCAST_PATH) == '':
root_podcast_path = PurePath(Path.home() / 'Music/Zotify Podcasts/')
else:
root_podcast_path = PurePath(Path(cls.get(ROOT_PODCAST_PATH)).expanduser())
Path(root_podcast_path).mkdir(parents=True, exist_ok=True)
return root_podcast_path
@classmethod
def get_skip_existing(cls) -> bool:
return cls.get(SKIP_EXISTING)
@classmethod
def get_skip_previously_downloaded(cls) -> bool:
return cls.get(SKIP_PREVIOUSLY_DOWNLOADED)
@classmethod
def get_split_album_discs(cls) -> bool:
return cls.get(SPLIT_ALBUM_DISCS)
@classmethod
def get_chunk_size(cls) -> int:
return cls.get(CHUNK_SIZE)
@classmethod
def get_override_auto_wait(cls) -> bool:
return cls.get(OVERRIDE_AUTO_WAIT)
@classmethod
def get_download_format(cls) -> str:
return cls.get(DOWNLOAD_FORMAT)
@classmethod
def get_download_lyrics(cls) -> bool:
return cls.get(DOWNLOAD_LYRICS)
@classmethod
def get_bulk_wait_time(cls) -> int:
return cls.get(BULK_WAIT_TIME)
@classmethod
def get_language(cls) -> str:
return cls.get(LANGUAGE)
@classmethod
def get_download_real_time(cls) -> bool:
return cls.get(DOWNLOAD_REAL_TIME)
@classmethod
def get_download_quality(cls) -> str:
return cls.get(DOWNLOAD_QUALITY)
@classmethod
def get_transcode_bitrate(cls) -> str:
return cls.get(TRANSCODE_BITRATE)
@classmethod
def get_song_archive(cls) -> str:
if cls.get(SONG_ARCHIVE) == '':
system_paths = {
'win32': Path.home() / 'AppData/Roaming/Zotify',
'linux': Path.home() / '.local/share/zotify',
'darwin': Path.home() / 'Library/Application Support/Zotify'
}
if sys.platform not in system_paths:
song_archive = PurePath(Path.cwd() / '.zotify/.song_archive')
else:
song_archive = PurePath(system_paths[sys.platform] / '.song_archive')
else:
song_archive = PurePath(Path(cls.get(SONG_ARCHIVE)).expanduser())
Path(song_archive.parent).mkdir(parents=True, exist_ok=True)
return song_archive
@classmethod
def get_save_credentials(cls) -> bool:
return cls.get(SAVE_CREDENTIALS)
@classmethod
def get_credentials_location(cls) -> str:
if cls.get(CREDENTIALS_LOCATION) == '':
system_paths = {
'win32': Path.home() / 'AppData/Roaming/Zotify',
'linux': Path.home() / '.local/share/zotify',
'darwin': Path.home() / 'Library/Application Support/Zotify'
}
if sys.platform not in system_paths:
credentials_location = PurePath(Path.cwd() / '.zotify/credentials.json')
else:
credentials_location = PurePath(system_paths[sys.platform] / 'credentials.json')
else:
credentials_location = PurePath(Path.cwd()).joinpath(cls.get(CREDENTIALS_LOCATION))
Path(credentials_location.parent).mkdir(parents=True, exist_ok=True)
return credentials_location
@classmethod
def get_temp_download_dir(cls) -> str:
if cls.get(TEMP_DOWNLOAD_DIR) == '':
return ''
return PurePath(cls.get_root_path()).joinpath(cls.get(TEMP_DOWNLOAD_DIR))
@classmethod
def get_save_genres(cls) -> bool:
return cls.get(MD_SAVE_GENRES)
@classmethod
def get_all_genres(cls) -> bool:
return cls.get(MD_ALLGENRES)
@classmethod
def get_all_genres_delimiter(cls) -> bool:
return cls.get(MD_GENREDELIMITER)
@classmethod
def get_output(cls, mode: str) -> str:
v = cls.get(OUTPUT)
if v:
return v
if mode == 'playlist':
if cls.get_split_album_discs():
split = PurePath(OUTPUT_DEFAULT_PLAYLIST).parent
return PurePath(split).joinpath('Disc {disc_number}').joinpath(split)
return OUTPUT_DEFAULT_PLAYLIST
if mode == 'extplaylist':
if cls.get_split_album_discs():
split = PurePath(OUTPUT_DEFAULT_PLAYLIST_EXT).parent
return PurePath(split).joinpath('Disc {disc_number}').joinpath(split)
return OUTPUT_DEFAULT_PLAYLIST_EXT
if mode == 'liked':
if cls.get_split_album_discs():
split = PurePath(OUTPUT_DEFAULT_LIKED_SONGS).parent
return PurePath(split).joinpath('Disc {disc_number}').joinpath(split)
return OUTPUT_DEFAULT_LIKED_SONGS
if mode == 'single':
if cls.get_split_album_discs():
split = PurePath(OUTPUT_DEFAULT_SINGLE).parent
return PurePath(split).joinpath('Disc {disc_number}').joinpath(split)
return OUTPUT_DEFAULT_SINGLE
if mode == 'album':
if cls.get_split_album_discs():
split = PurePath(OUTPUT_DEFAULT_ALBUM).parent
return PurePath(split).joinpath('Disc {disc_number}').joinpath(split)
return OUTPUT_DEFAULT_ALBUM
raise ValueError()
@classmethod
def get_retry_attempts(cls) -> int:
return cls.get(RETRY_ATTEMPTS)

115
zotify/const.py Normal file
View File

@@ -0,0 +1,115 @@
FOLLOWED_ARTISTS_URL = 'https://api.spotify.com/v1/me/following?type=artist'
SAVED_TRACKS_URL = 'https://api.spotify.com/v1/me/tracks'
TRACKS_URL = 'https://api.spotify.com/v1/tracks'
TRACK_STATS_URL = 'https://api.spotify.com/v1/audio-features/'
TRACKNUMBER = 'tracknumber'
DISCNUMBER = 'discnumber'
YEAR = 'year'
ALBUM = 'album'
TRACKTITLE = 'tracktitle'
ARTIST = 'artist'
ARTISTS = 'artists'
ALBUMARTIST = 'albumartist'
GENRES = 'genres'
GENRE = 'genre'
ARTWORK = 'artwork'
TRACKS = 'tracks'
TRACK = 'track'
ITEMS = 'items'
NAME = 'name'
HREF = 'href'
ID = 'id'
URL = 'url'
RELEASE_DATE = 'release_date'
IMAGES = 'images'
LIMIT = 'limit'
OFFSET = 'offset'
AUTHORIZATION = 'Authorization'
IS_PLAYABLE = 'is_playable'
DURATION_MS = 'duration_ms'
TRACK_NUMBER = 'track_number'
DISC_NUMBER = 'disc_number'
SHOW = 'show'
ERROR = 'error'
EXPLICIT = 'explicit'
PLAYLIST = 'playlist'
PLAYLISTS = 'playlists'
OWNER = 'owner'
DISPLAY_NAME = 'display_name'
ALBUMS = 'albums'
TYPE = 'type'
PREMIUM = 'premium'
WIDTH = 'width'
USER_READ_EMAIL = 'user-read-email'
USER_FOLLOW_READ = 'user-follow-read'
PLAYLIST_READ_PRIVATE = 'playlist-read-private'
USER_LIBRARY_READ = 'user-library-read'
WINDOWS_SYSTEM = 'Windows'
LINUX_SYSTEM = 'Linux'
CODEC_MAP = {
'aac': 'aac',
'fdk_aac': 'libfdk_aac',
'm4a': 'aac',
'mp3': 'libmp3lame',
'ogg': 'copy',
'opus': 'libopus',
'vorbis': 'copy',
}
EXT_MAP = {
'aac': 'm4a',
'fdk_aac': 'm4a',
'm4a': 'm4a',
'mp3': 'mp3',
'ogg': 'ogg',
'opus': 'ogg',
'vorbis': 'ogg',
}

72
zotify/loader.py Normal file
View File

@@ -0,0 +1,72 @@
# load symbol from:
# https://stackoverflow.com/questions/22029562/python-how-to-make-simple-animated-loading-while-process-is-running
# imports
from itertools import cycle
from shutil import get_terminal_size
from threading import Thread
from time import sleep
from zotify.termoutput import Printer
class Loader:
"""Busy symbol.
Can be called inside a context:
with Loader("This take some Time..."):
# do something
pass
"""
def __init__(self, chan, desc="Loading...", end='', timeout=0.1, mode='prog'):
"""
A loader-like context manager
Args:
desc (str, optional): The loader's description. Defaults to "Loading...".
end (str, optional): Final print. Defaults to "".
timeout (float, optional): Sleep time between prints. Defaults to 0.1.
"""
self.desc = desc
self.end = end
self.timeout = timeout
self.channel = chan
self._thread = Thread(target=self._animate, daemon=True)
if mode == 'std1':
self.steps = ["", "", "", "", "", "", "", ""]
elif mode == 'std2':
self.steps = ["","","",""]
elif mode == 'std3':
self.steps = ["😐 ","😐 ","😮 ","😮 ","😦 ","😦 ","😧 ","😧 ","🤯 ","💥 ","","\u3000 ","\u3000 ","\u3000 "]
elif mode == 'prog':
self.steps = ["[∙∙∙]","[●∙∙]","[∙●∙]","[∙∙●]","[∙∙∙]"]
self.done = False
def start(self):
self._thread.start()
return self
def _animate(self):
for c in cycle(self.steps):
if self.done:
break
Printer.print_loader(self.channel, f"\r\t{c} {self.desc} ")
sleep(self.timeout)
def __enter__(self):
self.start()
def stop(self):
self.done = True
cols = get_terminal_size((80, 20)).columns
Printer.print_loader(self.channel, "\r" + " " * cols)
if self.end != "":
Printer.print_loader(self.channel, f"\r{self.end}")
def __exit__(self, exc_type, exc_value, tb):
# handle exceptions with those variables ^
self.stop()

83
zotify/playlist.py Normal file
View File

@@ -0,0 +1,83 @@
from zotify.const import ITEMS, ID, TRACK, NAME
from zotify.termoutput import Printer
from zotify.track import download_track
from zotify.utils import split_input
from zotify.zotify import Zotify
MY_PLAYLISTS_URL = 'https://api.spotify.com/v1/me/playlists'
PLAYLISTS_URL = 'https://api.spotify.com/v1/playlists'
def get_all_playlists():
""" Returns list of users playlists """
playlists = []
limit = 50
offset = 0
while True:
resp = Zotify.invoke_url_with_params(MY_PLAYLISTS_URL, limit=limit, offset=offset)
offset += limit
playlists.extend(resp[ITEMS])
if len(resp[ITEMS]) < limit:
break
return playlists
def get_playlist_songs(playlist_id):
""" returns list of songs in a playlist """
songs = []
offset = 0
limit = 100
while True:
resp = Zotify.invoke_url_with_params(f'{PLAYLISTS_URL}/{playlist_id}/tracks', limit=limit, offset=offset)
offset += limit
songs.extend(resp[ITEMS])
if len(resp[ITEMS]) < limit:
break
return songs
def get_playlist_info(playlist_id):
""" Returns information scraped from playlist """
(raw, resp) = Zotify.invoke_url(f'{PLAYLISTS_URL}/{playlist_id}?fields=name,owner(display_name)&market=from_token')
return resp['name'].strip(), resp['owner']['display_name'].strip()
def download_playlist(playlist):
"""Downloads all the songs from a playlist"""
playlist_songs = [song for song in get_playlist_songs(playlist[ID]) if song[TRACK] is not None and song[TRACK][ID]]
p_bar = Printer.progress(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True)
enum = 1
for song in p_bar:
download_track('extplaylist', song[TRACK][ID], extra_keys={'playlist': playlist[NAME], 'playlist_num': str(enum).zfill(2)}, disable_progressbar=True)
p_bar.set_description(song[TRACK][NAME])
enum += 1
def download_from_user_playlist():
""" Select which playlist(s) to download """
playlists = get_all_playlists()
count = 1
for playlist in playlists:
print(str(count) + ': ' + playlist[NAME].strip())
count += 1
selection = ''
print('\n> SELECT A PLAYLIST BY ID')
print('> SELECT A RANGE BY ADDING A DASH BETWEEN BOTH ID\'s')
print('> OR PARTICULAR OPTIONS BY ADDING A COMMA BETWEEN ID\'s\n')
while len(selection) == 0:
selection = str(input('ID(s): '))
playlist_choices = map(int, split_input(selection))
for playlist_number in playlist_choices:
playlist = playlists[playlist_number - 1]
print(f'Downloading {playlist[NAME].strip()}')
download_playlist(playlist)
print('\n**All playlists have been downloaded**\n')

138
zotify/podcast.py Normal file
View File

@@ -0,0 +1,138 @@
# import os
from pathlib import PurePath, Path
import time
from typing import Optional, Tuple
from librespot.metadata import EpisodeId
from zotify.const import ERROR, ID, ITEMS, NAME, SHOW, DURATION_MS
from zotify.termoutput import PrintChannel, Printer
from zotify.utils import create_download_directory, fix_filename
from zotify.zotify import Zotify
from zotify.loader import Loader
EPISODE_INFO_URL = 'https://api.spotify.com/v1/episodes'
SHOWS_URL = 'https://api.spotify.com/v1/shows'
def get_episode_info(episode_id_str) -> Tuple[Optional[str], Optional[str]]:
with Loader(PrintChannel.PROGRESS_INFO, "Fetching episode information..."):
(raw, info) = Zotify.invoke_url(f'{EPISODE_INFO_URL}/{episode_id_str}')
if not info:
Printer.print(PrintChannel.ERRORS, "### INVALID EPISODE ID ###")
duration_ms = info[DURATION_MS]
if ERROR in info:
return None, None
return fix_filename(info[SHOW][NAME]), duration_ms, fix_filename(info[NAME])
def get_show_episodes(show_id_str) -> list:
episodes = []
offset = 0
limit = 50
with Loader(PrintChannel.PROGRESS_INFO, "Fetching episodes..."):
while True:
resp = Zotify.invoke_url_with_params(
f'{SHOWS_URL}/{show_id_str}/episodes', limit=limit, offset=offset)
offset += limit
for episode in resp[ITEMS]:
episodes.append(episode[ID])
if len(resp[ITEMS]) < limit:
break
return episodes
def download_podcast_directly(url, filename):
import functools
import shutil
import requests
from tqdm.auto import tqdm
r = requests.get(url, stream=True, allow_redirects=True)
if r.status_code != 200:
r.raise_for_status() # Will only raise for 4xx codes, so...
raise RuntimeError(
f"Request to {url} returned status code {r.status_code}")
file_size = int(r.headers.get('Content-Length', 0))
path = Path(filename).expanduser().resolve()
path.parent.mkdir(parents=True, exist_ok=True)
desc = "(Unknown total file size)" if file_size == 0 else ""
r.raw.read = functools.partial(
r.raw.read, decode_content=True) # Decompress if needed
with tqdm.wrapattr(r.raw, "read", total=file_size, desc=desc) as r_raw:
with path.open("wb") as f:
shutil.copyfileobj(r_raw, f)
return path
def download_episode(episode_id) -> None:
podcast_name, duration_ms, episode_name = get_episode_info(episode_id)
extra_paths = podcast_name + '/'
prepare_download_loader = Loader(PrintChannel.PROGRESS_INFO, "Preparing download...")
prepare_download_loader.start()
if podcast_name is None:
Printer.print(PrintChannel.SKIPS, '### SKIPPING: (EPISODE NOT FOUND) ###')
prepare_download_loader.stop()
else:
filename = podcast_name + ' - ' + episode_name
resp = Zotify.invoke_url(
'https://api-partner.spotify.com/pathfinder/v1/query?operationName=getEpisode&variables={"uri":"spotify:episode:' + episode_id + '"}&extensions={"persistedQuery":{"version":1,"sha256Hash":"224ba0fd89fcfdfb3a15fa2d82a6112d3f4e2ac88fba5c6713de04d1b72cf482"}}')[1]["data"]["episode"]
direct_download_url = resp["audio"]["items"][-1]["url"]
download_directory = PurePath(Zotify.CONFIG.get_root_podcast_path()).joinpath(extra_paths)
# download_directory = os.path.realpath(download_directory)
create_download_directory(download_directory)
if "anon-podcast.scdn.co" in direct_download_url or "audio_preview_url" not in resp:
episode_id = EpisodeId.from_base62(episode_id)
stream = Zotify.get_content_stream(
episode_id, Zotify.DOWNLOAD_QUALITY)
total_size = stream.input_stream.size
filepath = PurePath(download_directory).joinpath(f"{filename}.ogg")
if (
Path(filepath).is_file()
and Path(filepath).stat().st_size == total_size
and Zotify.CONFIG.get_skip_existing()
):
Printer.print(PrintChannel.SKIPS, "\n### SKIPPING: " + podcast_name + " - " + episode_name + " (EPISODE ALREADY EXISTS) ###")
prepare_download_loader.stop()
return
prepare_download_loader.stop()
time_start = time.time()
downloaded = 0
with open(filepath, 'wb') as file, Printer.progress(
desc=filename,
total=total_size,
unit='B',
unit_scale=True,
unit_divisor=1024
) as p_bar:
prepare_download_loader.stop()
while True:
#for _ in range(int(total_size / Zotify.CONFIG.get_chunk_size()) + 2):
data = stream.input_stream.stream().read(Zotify.CONFIG.get_chunk_size())
p_bar.update(file.write(data))
downloaded += len(data)
if data == b'':
break
if Zotify.CONFIG.get_download_real_time():
delta_real = time.time() - time_start
delta_want = (downloaded / total_size) * (duration_ms/1000)
if delta_want > delta_real:
time.sleep(delta_want - delta_real)
else:
filepath = PurePath(download_directory).joinpath(f"{filename}.mp3")
download_podcast_directly(direct_download_url, filepath)
prepare_download_loader.stop()

41
zotify/termoutput.py Normal file
View File

@@ -0,0 +1,41 @@
import sys
from enum import Enum
from tqdm import tqdm
from zotify.config import *
from zotify.zotify import Zotify
class PrintChannel(Enum):
SPLASH = PRINT_SPLASH
SKIPS = PRINT_SKIPS
DOWNLOAD_PROGRESS = PRINT_DOWNLOAD_PROGRESS
ERRORS = PRINT_ERRORS
WARNINGS = PRINT_WARNINGS
DOWNLOADS = PRINT_DOWNLOADS
API_ERRORS = PRINT_API_ERRORS
PROGRESS_INFO = PRINT_PROGRESS_INFO
ERROR_CHANNEL = [PrintChannel.ERRORS, PrintChannel.API_ERRORS]
class Printer:
@staticmethod
def print(channel: PrintChannel, msg: str) -> None:
if Zotify.CONFIG.get(channel.value):
if channel in ERROR_CHANNEL:
print(msg, file=sys.stderr)
else:
print(msg)
@staticmethod
def print_loader(channel: PrintChannel, msg: str) -> None:
if Zotify.CONFIG.get(channel.value):
print(msg, flush=True, end="")
@staticmethod
def progress(iterable=None, desc=None, total=None, unit='it', disable=False, unit_scale=False, unit_divisor=1000):
if not Zotify.CONFIG.get(PrintChannel.DOWNLOAD_PROGRESS.value):
disable = True
return tqdm(iterable=iterable, desc=desc, total=total, disable=disable, unit=unit, unit_scale=unit_scale, unit_divisor=unit_divisor)

436
zotify/track.py Normal file
View File

@@ -0,0 +1,436 @@
from pathlib import Path, PurePath
import math
import re
import time
import uuid
from typing import Any, Tuple, List, Optional
from librespot.metadata import TrackId
import ffmpy
from zotify.const import TRACKS, ALBUM, GENRES, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
RELEASE_DATE, ID, TRACKS_URL, FOLLOWED_ARTISTS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS, \
HREF, ARTISTS, WIDTH
from zotify.termoutput import Printer, PrintChannel
from zotify.utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \
get_directory_song_ids, add_to_directory_song_ids, get_previously_downloaded, add_to_archive, fmt_seconds
from zotify.zotify import Zotify
import traceback
from zotify.loader import Loader
def get_saved_tracks() -> list:
""" Returns user's saved tracks """
songs = []
offset = 0
limit = 50
while True:
resp = Zotify.invoke_url_with_params(
SAVED_TRACKS_URL, limit=limit, offset=offset)
offset += limit
songs.extend(resp[ITEMS])
if len(resp[ITEMS]) < limit:
break
return songs
def get_followed_artists() -> list:
""" Returns user's followed artists """
artists = []
resp = Zotify.invoke_url(FOLLOWED_ARTISTS_URL)[1]
for artist in resp[ARTISTS][ITEMS]:
artists.append(artist[ID])
return artists
def get_song_info(song_id) -> Tuple[List[str], List[Any], str, str, Any, Any, Any, Any, Any, Any, int]:
""" Retrieves metadata for downloaded songs """
locale = Zotify.CONFIG.get_locale()
with Loader(PrintChannel.PROGRESS_INFO, "Fetching track information..."):
(raw, info) = Zotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token&locale={locale}')
if not TRACKS in info:
raise ValueError(f'Invalid response from TRACKS_URL:\n{raw}')
try:
artists = []
for data in info[TRACKS][0][ARTISTS]:
artists.append(data[NAME])
album_name = info[TRACKS][0][ALBUM][NAME]
name = info[TRACKS][0][NAME]
release_year = info[TRACKS][0][ALBUM][RELEASE_DATE].split('-')[0]
disc_number = info[TRACKS][0][DISC_NUMBER]
track_number = info[TRACKS][0][TRACK_NUMBER]
scraped_song_id = info[TRACKS][0][ID]
is_playable = info[TRACKS][0][IS_PLAYABLE]
duration_ms = info[TRACKS][0][DURATION_MS]
image = info[TRACKS][0][ALBUM][IMAGES][0]
for i in info[TRACKS][0][ALBUM][IMAGES]:
if i[WIDTH] > image[WIDTH]:
image = i
image_url = image[URL]
return artists, info[TRACKS][0][ARTISTS], album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable, duration_ms
except Exception as e:
raise ValueError(f'Failed to parse TRACKS_URL response: {str(e)}\n{raw}')
def get_song_genres(rawartists: List[str], track_name: str) -> List[str]:
if Zotify.CONFIG.get_save_genres():
try:
genres = []
for data in rawartists:
# query artist genres via href, which will be the api url
with Loader(PrintChannel.PROGRESS_INFO, "Fetching artist information..."):
(raw, artistInfo) = Zotify.invoke_url(f'{data[HREF]}')
if Zotify.CONFIG.get_all_genres() and len(artistInfo[GENRES]) > 0:
for genre in artistInfo[GENRES]:
genres.append(genre)
elif len(artistInfo[GENRES]) > 0:
genres.append(artistInfo[GENRES][0])
if len(genres) == 0:
Printer.print(PrintChannel.WARNINGS, '### No Genres found for song ' + track_name)
genres.append('')
return genres
except Exception as e:
raise ValueError(f'Failed to parse GENRES response: {str(e)}\n{raw}')
else:
return ['']
def get_song_lyrics(song_id: str, file_save: Optional[PurePath], title: Optional[str] = None, artists: Optional[List[str]] = None,
album: Optional[str] = None, duration_ms: Optional[int] = None, write_file: bool = True) -> List[str]:
"""Fetches lyrics from Spotify's color-lyrics API, writes an .lrc file, and returns the lyric lines.
Raises ValueError if lyrics are not available.
"""
# For lyrics, failures are expected for some tracks. Prefer expectFail=True to avoid noisy retries/logging,
# but fall back gracefully if the runtime Zotify.invoke_url doesn't support it (older versions).
url = f'https://spclient.wg.spotify.com/color-lyrics/v2/track/{song_id}'
try:
raw, lyrics = Zotify.invoke_url(url, expectFail=True)
except TypeError:
# Older environment without expectFail support
raw, lyrics = Zotify.invoke_url(url)
# Printer.print(PrintChannel.SKIPS, raw)
# Printer.print(PrintChannel.WARNINGS, lyrics)
if not lyrics or (isinstance(lyrics, dict) and 'error' in lyrics):
# Treat empty or errored JSON as lyrics not available
raise ValueError(f'Failed to fetch lyrics: {song_id}')
try:
formatted_lyrics = lyrics['lyrics']['lines']
except KeyError:
raise ValueError(f'Failed to fetch lyrics: {song_id}')
lines: List[str] = []
sync_type = lyrics['lyrics'].get('syncType')
# Optional LRC header
if Zotify.CONFIG.get_lyrics_md_header():
header = []
if title:
header.append(f"[ti: {title}]\n")
if artists and len(artists):
header.append(f"[ar: {artists[0]}]\n")
if album:
header.append(f"[al: {album}]\n")
if duration_ms is not None:
m = duration_ms // 60000
s = (duration_ms % 60000) // 1000
header.append(f"[length: {m}:{str(s).zfill(2)}]\n")
header.append("[by: The_Padoru_Companion]\n")
header.append("\n")
lines.extend(header)
if sync_type == "UNSYNCED":
for line in formatted_lyrics:
lines.append(line['words'] + '\n')
elif sync_type == "LINE_SYNCED":
for line in formatted_lyrics:
timestamp = int(line['startTimeMs'])
ts_minutes = str(math.floor(timestamp / 60000)).zfill(2)
ts_seconds = str(math.floor((timestamp % 60000) / 1000)).zfill(2)
ts_millis = str(math.floor(timestamp % 1000))[:2].zfill(2)
lines.append(f'[{ts_minutes}:{ts_seconds}.{ts_millis}]' + line['words'] + '\n')
else:
raise ValueError(f'Failed to fetch lyrics: {song_id}')
if write_file and file_save is not None:
Path(file_save).parent.mkdir(parents=True, exist_ok=True)
with open(file_save, 'w+', encoding='utf-8') as file:
file.writelines(lines)
return lines
def get_song_duration(song_id: str) -> float:
""" Retrieves duration of song in second as is on spotify """
(raw, resp) = Zotify.invoke_url(f'{TRACK_STATS_URL}{song_id}')
# get duration in miliseconds
ms_duration = resp['duration_ms']
# convert to seconds
duration = float(ms_duration)/1000
return duration
def download_track(mode: str, track_id: str, extra_keys=None, disable_progressbar=False) -> None:
""" Downloads raw song audio from Spotify """
if extra_keys is None:
extra_keys = {}
prepare_download_loader = Loader(PrintChannel.PROGRESS_INFO, "Preparing download...")
prepare_download_loader.start()
try:
output_template = Zotify.CONFIG.get_output(mode)
(artists, raw_artists, album_name, name, image_url, release_year, disc_number,
track_number, scraped_song_id, is_playable, duration_ms) = get_song_info(track_id)
song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name)
for k in extra_keys:
output_template = output_template.replace("{"+k+"}", fix_filename(extra_keys[k]))
ext = EXT_MAP.get(Zotify.CONFIG.get_download_format().lower())
output_template = output_template.replace("{artist}", fix_filename(artists[0]))
output_template = output_template.replace("{album}", fix_filename(album_name))
output_template = output_template.replace("{song_name}", fix_filename(name))
output_template = output_template.replace("{release_year}", fix_filename(release_year))
output_template = output_template.replace("{disc_number}", fix_filename(disc_number))
output_template = output_template.replace("{track_number}", fix_filename(track_number))
output_template = output_template.replace("{id}", fix_filename(scraped_song_id))
output_template = output_template.replace("{track_id}", fix_filename(track_id))
output_template = output_template.replace("{ext}", ext)
filename = PurePath(Zotify.CONFIG.get_root_path()).joinpath(output_template)
filedir = PurePath(filename).parent
filename_temp = filename
if Zotify.CONFIG.get_temp_download_dir() != '':
filename_temp = PurePath(Zotify.CONFIG.get_temp_download_dir()).joinpath(f'zotify_{str(uuid.uuid4())}_{track_id}.{ext}')
check_name = Path(filename).is_file() and Path(filename).stat().st_size
check_id = scraped_song_id in get_directory_song_ids(filedir)
check_all_time = scraped_song_id in get_previously_downloaded()
# a song with the same name is installed
if not check_id and check_name:
c = len([file for file in Path(filedir).iterdir() if re.search(f'^{filename}_', str(file))]) + 1
stem = PurePath(filename).stem # correct base name without extension
ext = PurePath(filename).suffix
filename = PurePath(filedir).joinpath(f'{stem}_{c}{ext}')
except Exception as e:
Printer.print(PrintChannel.ERRORS, '### SKIPPING SONG - FAILED TO QUERY METADATA ###')
Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id))
for k in extra_keys:
Printer.print(PrintChannel.ERRORS, k + ': ' + str(extra_keys[k]))
Printer.print(PrintChannel.ERRORS, "\n")
Printer.print(PrintChannel.ERRORS, str(e) + "\n")
Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
Printer.print(PrintChannel.PROGRESS_INFO, "Waiting to query Spotify API again.." + "\n")
time.sleep(10)
return download_track(mode, track_id, extra_keys)
else:
try:
if not is_playable:
prepare_download_loader.stop()
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG IS UNAVAILABLE) ###' + "\n")
else:
if check_name and Zotify.CONFIG.get_skip_existing():
prepare_download_loader.stop()
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY EXISTS) ###' + "\n")
if Zotify.CONFIG.get_always_check_lyrics() and Zotify.CONFIG.get_download_lyrics():
try:
lyr_dir = Zotify.CONFIG.get_lyrics_location() or PurePath(filename).parent
lyr_name_tpl = Zotify.CONFIG.get_lyrics_filename()
lyr_name = lyr_name_tpl
lyr_name = lyr_name.replace('{artist}', fix_filename(artists[0]))
lyr_name = lyr_name.replace('{song_name}', fix_filename(name))
lyr_name = lyr_name.replace('{album}', fix_filename(album_name))
if Zotify.CONFIG.get_unique_lyrics_file():
lrc_path = PurePath(lyr_dir).joinpath("lyrics.lrc")
else:
lrc_path = PurePath(lyr_dir).joinpath(f"{lyr_name}.lrc")
get_song_lyrics(track_id, lrc_path, title=name, artists=artists, album=album_name, duration_ms=duration_ms)
except ValueError:
Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###")
elif check_all_time and Zotify.CONFIG.get_skip_previously_downloaded():
prepare_download_loader.stop()
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE) ###' + "\n")
if Zotify.CONFIG.get_always_check_lyrics() and Zotify.CONFIG.get_download_lyrics():
try:
lyr_dir = Zotify.CONFIG.get_lyrics_location() or PurePath(filename).parent
lyr_name_tpl = Zotify.CONFIG.get_lyrics_filename()
lyr_name = lyr_name_tpl
lyr_name = lyr_name.replace('{artist}', fix_filename(artists[0]))
lyr_name = lyr_name.replace('{song_name}', fix_filename(name))
lyr_name = lyr_name.replace('{album}', fix_filename(album_name))
if Zotify.CONFIG.get_unique_lyrics_file():
lrc_path = PurePath(lyr_dir).joinpath("lyrics.lrc")
else:
lrc_path = PurePath(lyr_dir).joinpath(f"{lyr_name}.lrc")
get_song_lyrics(track_id, lrc_path, title=name, artists=artists, album=album_name, duration_ms=duration_ms)
except ValueError:
Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###")
else:
Printer.print(PrintChannel.PROGRESS_INFO, '\n### STARTING "' + song_name + '" ###' + "\n")
if ext == 'ogg':
Printer.print(PrintChannel.PROGRESS_INFO, '\n## Attente de 5 secondes avant reprise... ##')
time.sleep(5);
if track_id != scraped_song_id:
track_id = scraped_song_id
track = TrackId.from_base62(track_id)
stream = Zotify.get_content_stream(track, Zotify.DOWNLOAD_QUALITY)
create_download_directory(filedir)
total_size = stream.input_stream.size
prepare_download_loader.stop()
time_start = time.time()
downloaded = 0
with open(filename_temp, 'wb') as file, Printer.progress(
desc=song_name,
total=total_size,
unit='B',
unit_scale=True,
unit_divisor=1024,
disable=disable_progressbar
) as p_bar:
b = 0
while b < 5:
#for _ in range(int(total_size / Zotify.CONFIG.get_chunk_size()) + 2):
data = stream.input_stream.stream().read(Zotify.CONFIG.get_chunk_size())
p_bar.update(file.write(data))
downloaded += len(data)
b += 1 if data == b'' else 0
if Zotify.CONFIG.get_download_real_time():
delta_real = time.time() - time_start
delta_want = (downloaded / total_size) * (duration_ms/1000)
if delta_want > delta_real:
time.sleep(delta_want - delta_real)
time_downloaded = time.time()
genres = get_song_genres(raw_artists, name)
lyrics_lines: Optional[List[str]] = None
try:
if Zotify.CONFIG.get_download_lyrics():
# Build LRC path based on config
lyr_dir = Zotify.CONFIG.get_lyrics_location() or PurePath(filename).parent
lyr_name_tpl = Zotify.CONFIG.get_lyrics_filename()
lyr_name = lyr_name_tpl
lyr_name = lyr_name.replace('{artist}', fix_filename(artists[0]))
lyr_name = lyr_name.replace('{song_name}', fix_filename(name))
lyr_name = lyr_name.replace('{album}', fix_filename(album_name))
if Zotify.CONFIG.get_unique_lyrics_file():
lrc_path = PurePath(lyr_dir).joinpath("lyrics.lrc")
else:
lrc_path = PurePath(lyr_dir).joinpath(f"{lyr_name}.lrc")
lyrics_lines = get_song_lyrics(
track_id, lrc_path, title=name, artists=artists, album=album_name, duration_ms=duration_ms, write_file=True
)
else:
# Fetch lyrics for embedding only; do not write an .lrc file
lyrics_lines = get_song_lyrics(
track_id, None, title=name, artists=artists, album=album_name, duration_ms=duration_ms, write_file=False
)
except ValueError:
Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###")
convert_audio_format(filename_temp)
try:
set_audio_tags(filename_temp, artists, genres, name, album_name, release_year, disc_number, track_number,
lyrics_lines)
set_music_thumbnail(filename_temp, image_url)
except Exception:
Printer.print(PrintChannel.ERRORS, "Unable to write metadata, ensure ffmpeg is installed and added to your PATH.")
if filename_temp != filename:
Path(filename_temp).rename(filename)
time_finished = time.time()
Printer.print(PrintChannel.DOWNLOADS, f'### Downloaded "{song_name}" to "{Path(filename).relative_to(Zotify.CONFIG.get_root_path())}" in {fmt_seconds(time_downloaded - time_start)} (plus {fmt_seconds(time_finished - time_downloaded)} converting) ###' + "\n")
# add song id to archive file
if Zotify.CONFIG.get_skip_previously_downloaded():
add_to_archive(scraped_song_id, PurePath(filename).name, artists[0], name)
# add song id to download directory's .song_ids file
if not check_id:
add_to_directory_song_ids(filedir, scraped_song_id, PurePath(filename).name, artists[0], name)
if Zotify.CONFIG.get_bulk_wait_time():
time.sleep(Zotify.CONFIG.get_bulk_wait_time())
except Exception as e:
Printer.print(PrintChannel.ERRORS, '### SKIPPING: ' + song_name + ' (GENERAL DOWNLOAD ERROR) ###')
Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id))
for k in extra_keys:
Printer.print(PrintChannel.ERRORS, k + ': ' + str(extra_keys[k]))
Printer.print(PrintChannel.ERRORS, "\n")
Printer.print(PrintChannel.ERRORS, str(e) + "\n")
Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
if Path(filename_temp).exists():
Path(filename_temp).unlink()
prepare_download_loader.stop()
def convert_audio_format(filename) -> None:
""" Converts raw audio into playable file """
temp_filename = f'{PurePath(filename).parent}.tmp'
Path(filename).replace(temp_filename)
download_format = Zotify.CONFIG.get_download_format().lower()
file_codec = CODEC_MAP.get(download_format, 'copy')
if file_codec != 'copy':
bitrate = Zotify.CONFIG.get_transcode_bitrate()
bitrates = {
'auto': '320k' if Zotify.check_premium() else '160k',
'normal': '96k',
'high': '160k',
'very_high': '320k'
}
bitrate = bitrates[Zotify.CONFIG.get_download_quality()]
else:
bitrate = None
output_params = ['-c:a', file_codec]
if bitrate:
output_params += ['-b:a', bitrate]
try:
ff_m = ffmpy.FFmpeg(
global_options=['-y', '-hide_banner', '-loglevel error'],
inputs={temp_filename: None},
outputs={filename: output_params}
)
with Loader(PrintChannel.PROGRESS_INFO, "Converting file..."):
ff_m.run()
if Path(temp_filename).exists():
Path(temp_filename).unlink()
except ffmpy.FFExecutableNotFoundError:
Printer.print(PrintChannel.WARNINGS, f'### SKIPPING {file_codec.upper()} CONVERSION - FFMPEG NOT FOUND ###')

320
zotify/utils.py Normal file
View File

@@ -0,0 +1,320 @@
import datetime
import math
import os
import platform
import re
import subprocess
from enum import Enum
from pathlib import Path, PurePath
from typing import List, Tuple, Optional
from mutagen.id3 import ID3, TYER
import music_tag
import requests
from zotify.const import ARTIST, GENRE, TRACKTITLE, ALBUM, YEAR, DISCNUMBER, TRACKNUMBER, ARTWORK, \
WINDOWS_SYSTEM, ALBUMARTIST
from zotify.zotify import Zotify
class MusicFormat(str, Enum):
MP3 = 'mp3',
OGG = 'ogg',
def create_download_directory(download_path: str) -> None:
""" Create directory and add a hidden file with song ids """
Path(download_path).mkdir(parents=True, exist_ok=True)
# add hidden file with song ids
hidden_file_path = PurePath(download_path).joinpath('.song_ids')
if not Path(hidden_file_path).is_file():
with open(hidden_file_path, 'w', encoding='utf-8') as f:
pass
def get_previously_downloaded() -> List[str]:
""" Returns list of all time downloaded songs """
ids = []
archive_path = Zotify.CONFIG.get_song_archive()
if Path(archive_path).exists():
with open(archive_path, 'r', encoding='utf-8') as f:
ids = [line.strip().split('\t')[0] for line in f.readlines()]
return ids
def add_to_archive(song_id: str, filename: str, author_name: str, song_name: str) -> None:
""" Adds song id to all time installed songs archive """
archive_path = Zotify.CONFIG.get_song_archive()
if Path(archive_path).exists():
with open(archive_path, 'a', encoding='utf-8') as file:
file.write(f'{song_id}\t{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\t{author_name}\t{song_name}\t{filename}\n')
else:
with open(archive_path, 'w', encoding='utf-8') as file:
file.write(f'{song_id}\t{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\t{author_name}\t{song_name}\t{filename}\n')
def get_directory_song_ids(download_path: str) -> List[str]:
""" Gets song ids of songs in directory """
song_ids = []
hidden_file_path = PurePath(download_path).joinpath('.song_ids')
if Path(hidden_file_path).is_file():
with open(hidden_file_path, 'r', encoding='utf-8') as file:
song_ids.extend([line.strip().split('\t')[0] for line in file.readlines()])
return song_ids
def add_to_directory_song_ids(download_path: str, song_id: str, filename: str, author_name: str, song_name: str) -> None:
""" Appends song_id to .song_ids file in directory """
hidden_file_path = PurePath(download_path).joinpath('.song_ids')
# not checking if file exists because we need an exception
# to be raised if something is wrong
with open(hidden_file_path, 'a', encoding='utf-8') as file:
file.write(f'{song_id}\t{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\t{author_name}\t{song_name}\t{filename}\n')
def get_downloaded_song_duration(filename: str) -> float:
""" Returns the downloaded file's duration in seconds """
command = ['ffprobe', '-show_entries', 'format=duration', '-i', f'{filename}']
output = subprocess.run(command, capture_output=True)
duration = re.search(r'[\D]=([\d\.]*)', str(output.stdout)).groups()[0]
duration = float(duration)
return duration
def split_input(selection) -> List[str]:
""" Returns a list of inputted strings """
inputs = []
if '-' in selection:
for number in range(int(selection.split('-')[0]), int(selection.split('-')[1]) + 1):
inputs.append(number)
else:
selections = selection.split(',')
for i in selections:
inputs.append(i.strip())
return inputs
def splash() -> str:
""" Displays splash screen """
return """
███████╗ ██████╗ ████████╗██╗███████╗██╗ ██╗
╚══███╔╝██╔═══██╗╚══██╔══╝██║██╔════╝╚██╗ ██╔╝
███╔╝ ██║ ██║ ██║ ██║█████╗ ╚████╔╝
███╔╝ ██║ ██║ ██║ ██║██╔══╝ ╚██╔╝
███████╗╚██████╔╝ ██║ ██║██║ ██║
╚══════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝
"""
def clear() -> None:
""" Clear the console window """
if platform.system() == WINDOWS_SYSTEM:
os.system('cls')
else:
os.system('clear')
def set_audio_tags(filename, artists, genres, name, album_name, release_year, disc_number, track_number, lyrics: Optional[List[str]] = None) -> None:
""" sets music_tag metadata """
tags = music_tag.load_file(filename)
tags[ALBUMARTIST] = artists[0]
tags[ARTIST] = conv_artist_format(artists)
tags[GENRE] = genres[0] if not Zotify.CONFIG.get_all_genres() else Zotify.CONFIG.get_all_genres_delimiter().join(genres)
tags[TRACKTITLE] = name
tags[ALBUM] = album_name
tags[YEAR] = release_year
# Also set 'date' for broader player compatibility (maps to TDRC in ID3v2.4)
try:
tags['date'] = release_year
except Exception:
pass
tags[DISCNUMBER] = disc_number
tags[TRACKNUMBER] = track_number
# Always save lyrics into tags when provided (file creation is controlled separately)
try:
if lyrics:
tags['lyrics'] = ''.join(lyrics)
except Exception:
# Non-fatal: some formats may not support lyrics tag via music_tag
pass
tags.save()
# Ensure MP3 files end with ID3v2.3 and explicit TYER set so YEAR doesn't disappear
try:
if str(filename).lower().endswith('.mp3'):
audio = ID3(filename)
if release_year:
try:
audio.add(TYER(encoding=3, text=release_year))
except Exception:
pass
audio.save(v2_version=3)
except Exception:
# Non-fatal: ignore if not applicable
pass
def conv_artist_format(artists) -> str:
""" Returns converted artist format """
return ', '.join(artists)
def set_music_thumbnail(filename, image_url) -> None:
""" Downloads cover artwork """
img = requests.get(image_url).content
tags = music_tag.load_file(filename)
tags[ARTWORK] = img
tags.save()
# Ensure final MP3 tags are saved as ID3v2.3 after artwork write
try:
if str(filename).lower().endswith('.mp3'):
audio = ID3(filename)
audio.save(v2_version=3)
except Exception:
# Non-fatal: ignore if not applicable to this format
pass
def regex_input_for_urls(search_input) -> Tuple[str, str, str, str, str, str]:
""" Since many kinds of search may be passed at the command line, process them all here. """
track_uri_search = re.search(
r'^spotify:track:(?P<TrackID>[0-9a-zA-Z]{22})$', search_input)
track_url_search = re.search(
r'^(https?://)?open\.spotify\.com/track/(?P<TrackID>[0-9a-zA-Z]{22})(\?si=.+?)?$',
search_input,
)
album_uri_search = re.search(
r'^spotify:album:(?P<AlbumID>[0-9a-zA-Z]{22})$', search_input)
album_url_search = re.search(
r'^(https?://)?open\.spotify\.com/album/(?P<AlbumID>[0-9a-zA-Z]{22})(\?si=.+?)?$',
search_input,
)
playlist_uri_search = re.search(
r'^spotify:playlist:(?P<PlaylistID>[0-9a-zA-Z]{22})$', search_input)
playlist_url_search = re.search(
r'^(https?://)?open\.spotify\.com/playlist/(?P<PlaylistID>[0-9a-zA-Z]{22})(\?si=.+?)?$',
search_input,
)
episode_uri_search = re.search(
r'^spotify:episode:(?P<EpisodeID>[0-9a-zA-Z]{22})$', search_input)
episode_url_search = re.search(
r'^(https?://)?open\.spotify\.com/episode/(?P<EpisodeID>[0-9a-zA-Z]{22})(\?si=.+?)?$',
search_input,
)
show_uri_search = re.search(
r'^spotify:show:(?P<ShowID>[0-9a-zA-Z]{22})$', search_input)
show_url_search = re.search(
r'^(https?://)?open\.spotify\.com/show/(?P<ShowID>[0-9a-zA-Z]{22})(\?si=.+?)?$',
search_input,
)
artist_uri_search = re.search(
r'^spotify:artist:(?P<ArtistID>[0-9a-zA-Z]{22})$', search_input)
artist_url_search = re.search(
r'^(https?://)?open\.spotify\.com/artist/(?P<ArtistID>[0-9a-zA-Z]{22})(\?si=.+?)?$',
search_input,
)
if track_uri_search is not None or track_url_search is not None:
track_id_str = (track_uri_search
if track_uri_search is not None else
track_url_search).group('TrackID')
else:
track_id_str = None
if album_uri_search is not None or album_url_search is not None:
album_id_str = (album_uri_search
if album_uri_search is not None else
album_url_search).group('AlbumID')
else:
album_id_str = None
if playlist_uri_search is not None or playlist_url_search is not None:
playlist_id_str = (playlist_uri_search
if playlist_uri_search is not None else
playlist_url_search).group('PlaylistID')
else:
playlist_id_str = None
if episode_uri_search is not None or episode_url_search is not None:
episode_id_str = (episode_uri_search
if episode_uri_search is not None else
episode_url_search).group('EpisodeID')
else:
episode_id_str = None
if show_uri_search is not None or show_url_search is not None:
show_id_str = (show_uri_search
if show_uri_search is not None else
show_url_search).group('ShowID')
else:
show_id_str = None
if artist_uri_search is not None or artist_url_search is not None:
artist_id_str = (artist_uri_search
if artist_uri_search is not None else
artist_url_search).group('ArtistID')
else:
artist_id_str = None
return track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str
def fix_filename(name):
"""
Replace invalid characters on Linux/Windows/MacOS with underscores.
List from https://stackoverflow.com/a/31976060/819417
Trailing spaces & periods are ignored on Windows.
>>> fix_filename(" COM1 ")
'_ COM1 _'
>>> fix_filename("COM10")
'COM10'
>>> fix_filename("COM1,")
'COM1,'
>>> fix_filename("COM1.txt")
'_.txt'
>>> all('_' == fix_filename(chr(i)) for i in list(range(32)))
True
"""
return re.sub(r'[/\\:|<>"?*\0-\x1f]|^(AUX|COM[1-9]|CON|LPT[1-9]|NUL|PRN)(?![^.])|^\s|[\s.]$', "_", str(name), flags=re.IGNORECASE)
def fmt_seconds(secs: float) -> str:
val = math.floor(secs)
s = math.floor(val % 60)
val -= s
val /= 60
m = math.floor(val % 60)
val -= m
val /= 60
h = math.floor(val)
if h == 0 and m == 0 and s == 0:
return "0s"
elif h == 0 and m == 0:
return f'{s}s'.zfill(2)
elif h == 0:
return f'{m}'.zfill(2) + ':' + f'{s}'.zfill(2)
else:
return f'{h}'.zfill(2) + ':' + f'{m}'.zfill(2) + ':' + f'{s}'.zfill(2)

113
zotify/zotify.py Normal file
View File

@@ -0,0 +1,113 @@
import json
from pathlib import Path
from pwinput import pwinput
import time
import requests
from librespot.audio.decoders import VorbisOnlyAudioQuality
from librespot.core import Session
from zotify.const import TYPE, \
PREMIUM, USER_READ_EMAIL, OFFSET, LIMIT, \
PLAYLIST_READ_PRIVATE, USER_LIBRARY_READ, USER_FOLLOW_READ
from zotify.config import Config
class Zotify:
SESSION: Session = None
DOWNLOAD_QUALITY = None
CONFIG: Config = Config()
def __init__(self, args):
Zotify.CONFIG.load(args)
Zotify.login(args)
@classmethod
def login(cls, args):
""" Authenticates with Spotify and saves credentials to a file """
cred_location = Config.get_credentials_location()
if Path(cred_location).is_file():
try:
conf = Session.Configuration.Builder().set_store_credentials(False).build()
cls.SESSION = Session.Builder(conf).stored_file(cred_location).create()
return
except RuntimeError:
pass
while True:
user_name = args.username if args.username else ''
while len(user_name) == 0:
user_name = input('Username: ')
password = args.password if args.password else pwinput(prompt='Password: ', mask='*')
try:
if Config.get_save_credentials():
conf = Session.Configuration.Builder().set_stored_credential_file(cred_location).build()
else:
conf = Session.Configuration.Builder().set_store_credentials(False).build()
cls.SESSION = Session.Builder(conf).user_pass(user_name, password).create()
return
except RuntimeError:
pass
@classmethod
def get_content_stream(cls, content_id, quality):
return cls.SESSION.content_feeder().load(content_id, VorbisOnlyAudioQuality(quality), False, None)
@classmethod
def __get_auth_token(cls):
return cls.SESSION.tokens().get_token(
USER_READ_EMAIL, PLAYLIST_READ_PRIVATE, USER_LIBRARY_READ, USER_FOLLOW_READ
).access_token
@classmethod
def get_auth_header(cls):
return {
'Authorization': f'Bearer {cls.__get_auth_token()}',
'Accept-Language': f'{cls.CONFIG.get_language()}',
'Accept': 'application/json',
'app-platform': 'WebPlayer',
# Some Spotify spclient endpoints (e.g., color-lyrics) require a browser-like UA
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv=136.0) Gecko/20100101 Firefox/136.0',
}
@classmethod
def get_auth_header_and_params(cls, limit, offset):
return {
'Authorization': f'Bearer {cls.__get_auth_token()}',
'Accept-Language': f'{cls.CONFIG.get_language()}',
'Accept': 'application/json',
'app-platform': 'WebPlayer',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv=136.0) Gecko/20100101 Firefox/136.0',
}, {LIMIT: limit, OFFSET: offset}
@classmethod
def invoke_url_with_params(cls, url, limit, offset, **kwargs):
headers, params = cls.get_auth_header_and_params(limit=limit, offset=offset)
params.update(kwargs)
return requests.get(url, headers=headers, params=params).json()
@classmethod
def invoke_url(cls, url, tryCount=0):
# we need to import that here, otherwise we will get circular imports!
from zotify.termoutput import Printer, PrintChannel
headers = cls.get_auth_header()
response = requests.get(url, headers=headers)
responsetext = response.text
try:
responsejson = response.json()
except json.decoder.JSONDecodeError:
responsejson = {"error": {"status": "unknown", "message": "received an empty response"}}
if not responsejson or 'error' in responsejson:
if tryCount < (cls.CONFIG.get_retry_attempts() - 1):
Printer.print(PrintChannel.WARNINGS, f"Spotify API Error (try {tryCount + 1}) ({responsejson['error']['status']}): {responsejson['error']['message']}")
time.sleep(5)
return cls.invoke_url(url, tryCount + 1)
Printer.print(PrintChannel.API_ERRORS, f"Spotify API Error ({responsejson['error']['status']}): {responsejson['error']['message']}")
return responsetext, responsejson
@classmethod
def check_premium(cls) -> bool:
""" If user has spotify premium return true """
return (cls.SESSION.get_user_attribute(TYPE) == PREMIUM)