Prepare V0.2

This commit is contained in:
unknown
2025-12-19 00:06:38 +01:00
parent 0073331919
commit 757b0e0e98
10 changed files with 137 additions and 3 deletions

View File

@@ -322,10 +322,83 @@ class AudioKeyManager(PacketsReceiver, Closeable):
"Couldn't handle packet, cmd: {}, length: {}".format(
packet.cmd, len(packet.payload)))
def _is_premium_user(self) -> bool:
"""Best-effort check whether the current Spotify account is Premium.
We rely on Session user attributes populated from the AP product_info packet.
Historically, the attribute named "type" contains values like "premium", "free",
"open", etc.
"""
try:
raw = (
self.__session.get_user_attribute("type")
or self.__session.get_user_attribute("product")
or self.__session.get_user_attribute("product_type")
or self.__session.get_user_attribute("subscription")
or self.__session.get_user_attribute("account_type")
or ""
)
product = str(raw).strip().lower()
if not product:
return False
if "premium" in product:
return True
# Legacy/alt plan names occasionally seen.
return product in {"unlimited"}
except Exception:
return False
def _get_spotify_audio_key(self, gid: bytes, file_id: bytes, retry: bool = True) -> bytes:
"""Request the audio key directly from Spotify (Premium accounts)."""
tries = 0
last_err: typing.Optional[Exception] = None
while True:
tries += 1
callback = AudioKeyManager.SyncCallback(self)
with self.__seq_holder_lock:
AudioKeyManager.__seq_holder += 1
seq = AudioKeyManager.__seq_holder
AudioKeyManager.__callbacks[seq] = callback
try:
payload = struct.pack(">i", seq) + gid + file_id
self.__session.send(Packet.Type.request_key, payload)
key = callback.wait_response()
if key is None:
raise RuntimeError("Audio key request failed")
return key
except Exception as exc: # noqa: BLE001
last_err = exc
self.logger.warning("Spotify audio key request failed (try %d): %s", tries, exc)
if not retry or tries >= 3:
break
time.sleep(1)
finally:
try:
AudioKeyManager.__callbacks.pop(seq, None)
except Exception:
pass
raise RuntimeError(
"Failed fetching Audio Key from Spotify for gid: {}, fileId: {} (last error: {})".format(
util.bytes_to_hex(gid), util.bytes_to_hex(file_id), last_err
)
)
def get_audio_key(self,
gid: bytes,
file_id: bytes,
retry: bool = True) -> bytes:
# If the user is Premium, Spotify will return audio keys directly.
# In that case, do not use the SpotiClub API.
if self._is_premium_user():
return self._get_spotify_audio_key(gid, file_id, retry=retry)
global spoticlub_user, spoticlub_password, spoticlub_client_serial, spoticlub_loaded_logged
if not spoticlub_user or not spoticlub_password or spoticlub_user == "anonymous":
try:

Binary file not shown.

Binary file not shown.

View File

@@ -78,8 +78,8 @@ def download_album(album):
track[ID],
extra_keys={
'album_num': str(n).zfill(2),
'album_total': str(len(tracks)),
'album_id': album,
# Used by download_track() to decide whether to insert a Disc folder.
'album_multi_disc': '1' if album_multi_disc else '0',
},
disable_progressbar=True

View File

@@ -118,6 +118,7 @@ def download_from_urls(urls: list[str]) -> bool:
'playlist_song_name': track_obj[NAME],
'playlist': name,
'playlist_num': str(enum).zfill(char_num),
'playlist_total': str(len(playlist_songs)),
'playlist_id': playlist_id,
'playlist_track_id': track_obj[ID]
})

View File

@@ -67,7 +67,12 @@ def download_playlist(playlist):
download_track(
'extplaylist',
song[TRACK][ID],
extra_keys={'playlist': pl_name, 'playlist_num': str(enum).zfill(2)},
extra_keys={
'playlist': pl_name,
'playlist_num': str(enum).zfill(2),
'playlist_total': str(len(playlist_songs)),
'playlist_id': playlist[ID],
},
disable_progressbar=True
)
p_bar.set_description(song[TRACK][NAME])

View File

@@ -372,7 +372,62 @@ def download_track(mode: str, track_id: str, extra_keys=None, disable_progressba
Printer.print(PrintChannel.SKIPS, f"### LYRICS_UNAVAILABLE: Lyrics for {song_name} not available or API returned empty/errored response ###")
else:
Printer.print(PrintChannel.PROGRESS_INFO, '\n### STARTING "' + song_name + '" ###' + "\n")
prog_prefix = ''
if mode == 'album':
cur = extra_keys.get('album_num')
total = extra_keys.get('album_total')
if cur and not total:
# No info about total tracks? Let's query the album from Spotify's API in last resort
try:
album_id = extra_keys.get('album_id')
if album_id:
locale = Zotify.CONFIG.get_locale()
resp = Zotify.invoke_url_with_params(
f'https://api.spotify.com/v1/albums/{album_id}/tracks',
limit=1,
offset=0,
market='from_token',
locale=locale,
)
total_val = resp.get('total') if isinstance(resp, dict) else None
if total_val is not None:
total = str(total_val)
except Exception:
total = total
if cur and total:
prog_prefix = f'({cur}/{total}) '
elif mode in ('playlist', 'extplaylist'):
cur = extra_keys.get('playlist_num')
total = extra_keys.get('playlist_total')
if cur and not total:
# Same fallback for total tracks in playlist
try:
playlist_id = extra_keys.get('playlist_id')
if playlist_id:
locale = Zotify.CONFIG.get_locale()
resp = Zotify.invoke_url_with_params(
f'https://api.spotify.com/v1/playlists/{playlist_id}/tracks',
limit=1,
offset=0,
market='from_token',
locale=locale,
)
total_val = resp.get('total') if isinstance(resp, dict) else None
if total_val is not None:
total = str(total_val)
except Exception:
total = total
if cur and total:
prog_prefix = f'({cur}/{total}) '
Printer.print(
PrintChannel.PROGRESS_INFO,
f'\n### {prog_prefix}STARTING "{song_name}" ###\n'
)
if ext == 'ogg':
# SpotiClub : TEMP? : For albums/playlists, wait 5 seconds between OGG tracks to avoid
# spamming the SpotiClub API for audio keys.