1105 lines
45 KiB
Python
1105 lines
45 KiB
Python
from __future__ import annotations
|
|
from librespot import util
|
|
from librespot.audio.decrypt import AesAudioDecrypt
|
|
from librespot.audio.format import SuperAudioFormat
|
|
from librespot.audio.storage import ChannelManager
|
|
from librespot.cache import CacheManager
|
|
from librespot.crypto import Packet
|
|
from librespot.metadata import EpisodeId, PlayableId, TrackId
|
|
from librespot.proto import Metadata_pb2 as Metadata, StorageResolve_pb2 as StorageResolve
|
|
from librespot.structure import GeneralAudioStream, AudioDecrypt, AudioQualityPicker, Closeable, FeederException, GeneralAudioStream, GeneralWritableStream, HaltListener, NoopAudioDecrypt, PacketsReceiver
|
|
from pathlib import Path
|
|
import concurrent.futures
|
|
import io
|
|
import logging
|
|
import math
|
|
import queue
|
|
import random
|
|
import struct
|
|
import threading
|
|
import time
|
|
import typing
|
|
import urllib.parse
|
|
import os
|
|
import json
|
|
import requests
|
|
if typing.TYPE_CHECKING:
|
|
from librespot.core import Session
|
|
|
|
"""
|
|
PATCH : SpotiClub Audio Key Fetching (v0.2.0)
|
|
Fetches the audio decryption key from the SpotiClub Audio Key API instead of Spotify directly.
|
|
This is a workaround for Spotify's tightened restrictions on Audio Key access (they allow only Premium Tier now).
|
|
|
|
If you are using our fork, there is no reason for you to complete this section, as upon first run, Zotify will ask you for the logins and save them for future use.
|
|
But if needed somehow or by using this single patch file, there are 3 importants parameters to provide, and one is already filled in:
|
|
- server_url: The URL of the SpotiClub Audio Key API endpoint. You should not need to change this, except if a dev instructs you to do so.
|
|
- spoticlub_user : Your SpotiClub FTP username. You can get this by using our Padoru Asssistant once.
|
|
- spoticlub_password : Your SpotiClub FTP password, also obtainable via the Padoru Assistant.
|
|
|
|
Using the fork's assistant is the recommended way to get register your credentials, as overwriting this file during the beta phase will need you to put them here over and over again.
|
|
"""
|
|
##### WRITE YOUR LOGINS DOWN HERE #####
|
|
#######################################
|
|
server_url = "http://api.spoticlub.zip:4277/get_audio_key"
|
|
spoticlub_user = "anonymous"
|
|
spoticlub_password = "IfWeFeelLikeEnablingThis"
|
|
########################################
|
|
##### END OF USER INPUT AREA ###########
|
|
|
|
### SPOTICLUB CLIENT SERIAL TRACKING (DO NOT EDIT) ###
|
|
spoticlub_client_serial: typing.Optional[str] = None
|
|
spoticlub_loaded_logged: bool = False
|
|
########################################
|
|
|
|
class LoadedStream(GeneralAudioStream):
|
|
def __init__(self, data: bytes):
|
|
super().__init__()
|
|
self._buffer = io.BytesIO(data)
|
|
|
|
def read(self, n: int = -1) -> bytes:
|
|
return self._buffer.read(n)
|
|
|
|
def close(self) -> None:
|
|
self._buffer.close()
|
|
|
|
class AbsChunkedInputStream(io.BytesIO, HaltListener):
|
|
chunk_exception = None
|
|
closed = False
|
|
max_chunk_tries = 128
|
|
preload_ahead = 3
|
|
preload_chunk_retries = 2
|
|
retries: typing.List[int]
|
|
retry_on_chunk_error: bool
|
|
wait_lock: threading.Condition = threading.Condition()
|
|
wait_for_chunk = -1
|
|
__decoded_length = 0
|
|
__mark = 0
|
|
__pos = 0
|
|
|
|
def __init__(self, retry_on_chunk_error: bool):
|
|
super().__init__()
|
|
self.retries = [0] * self.chunks()
|
|
self.retry_on_chunk_error = retry_on_chunk_error
|
|
|
|
def is_closed(self) -> bool:
|
|
return self.closed
|
|
|
|
def buffer(self) -> typing.List[bytes]:
|
|
raise NotImplementedError()
|
|
|
|
def size(self) -> int:
|
|
raise NotImplementedError()
|
|
|
|
def close(self) -> None:
|
|
self.closed = True
|
|
with self.wait_lock:
|
|
self.wait_lock.notify_all()
|
|
|
|
def available(self):
|
|
return self.size() - self.__pos
|
|
|
|
def mark_supported(self) -> bool:
|
|
return True
|
|
|
|
def mark(self, read_ahead_limit: int) -> None:
|
|
self.__mark = self.__pos
|
|
|
|
def reset(self) -> None:
|
|
self.__pos = self.__mark
|
|
|
|
def pos(self) -> int:
|
|
return self.__pos
|
|
|
|
def seek(self, where: int, **kwargs) -> None:
|
|
if where < 0:
|
|
raise TypeError()
|
|
if self.closed:
|
|
raise IOError("Stream is closed!")
|
|
self.__pos = where
|
|
self.check_availability(int(self.__pos / (128 * 1024)), False, False)
|
|
|
|
def skip(self, n: int) -> int:
|
|
if n < 0:
|
|
raise TypeError()
|
|
if self.closed:
|
|
raise IOError("Stream is closed!")
|
|
k = self.size() - self.__pos
|
|
if n < k:
|
|
k = n
|
|
self.__pos += k
|
|
chunk = int(self.__pos / (128 * 1024))
|
|
self.check_availability(chunk, False, False)
|
|
return k
|
|
|
|
def requested_chunks(self) -> typing.List[bool]:
|
|
raise NotImplementedError()
|
|
|
|
def available_chunks(self) -> typing.List[bool]:
|
|
raise NotImplementedError()
|
|
|
|
def chunks(self) -> int:
|
|
raise NotImplementedError()
|
|
|
|
def request_chunk_from_stream(self, index: int) -> None:
|
|
raise NotImplementedError()
|
|
|
|
def should_retry(self, chunk: int) -> bool:
|
|
if self.retries[chunk] < 1:
|
|
return True
|
|
if self.retries[chunk] > self.max_chunk_tries:
|
|
return False
|
|
return self.retry_on_chunk_error
|
|
|
|
def check_availability(self, chunk: int, wait: bool, halted: bool) -> None:
|
|
if halted and not wait:
|
|
raise TypeError()
|
|
if not self.requested_chunks()[chunk]:
|
|
self.request_chunk_from_stream(chunk)
|
|
self.requested_chunks()[chunk] = True
|
|
for i in range(chunk + 1,
|
|
min(self.chunks() - 1, chunk + self.preload_ahead) + 1):
|
|
if (self.requested_chunks()[i]
|
|
and self.retries[i] < self.preload_chunk_retries):
|
|
self.request_chunk_from_stream(i)
|
|
self.requested_chunks()[chunk] = True
|
|
if wait:
|
|
if self.available_chunks()[chunk]:
|
|
return
|
|
retry = False
|
|
with self.wait_lock:
|
|
if not halted:
|
|
self.stream_read_halted(chunk, int(time.time() * 1000))
|
|
self.chunk_exception = None
|
|
self.wait_for_chunk = chunk
|
|
self.wait_lock.wait_for(lambda: self.available_chunks()[chunk])
|
|
if self.closed:
|
|
return
|
|
if self.chunk_exception is not None:
|
|
if self.should_retry(chunk):
|
|
retry = True
|
|
else:
|
|
raise AbsChunkedInputStream.ChunkException
|
|
if not retry:
|
|
self.stream_read_halted(chunk, int(time.time() * 1000))
|
|
if retry:
|
|
time.sleep(math.log10(self.retries[chunk]))
|
|
self.check_availability(chunk, True, True)
|
|
|
|
def read(self, __size: int = 0) -> bytes:
|
|
if self.closed:
|
|
raise IOError("Stream is closed!")
|
|
if __size <= 0:
|
|
if self.__pos == self.size():
|
|
return b""
|
|
buffer = io.BytesIO()
|
|
total_size = self.size()
|
|
chunk = int(self.__pos / (128 * 1024))
|
|
chunk_off = int(self.__pos % (128 * 1024))
|
|
chunk_total = int(math.ceil(total_size / (128 * 1024)))
|
|
self.check_availability(chunk, True, False)
|
|
buffer.write(self.buffer()[chunk][chunk_off:])
|
|
chunk += 1
|
|
if chunk != chunk_total:
|
|
while chunk <= chunk_total - 1:
|
|
self.check_availability(chunk, True, False)
|
|
buffer.write(self.buffer()[chunk])
|
|
chunk += 1
|
|
buffer.seek(0)
|
|
self.__pos += buffer.getbuffer().nbytes
|
|
return buffer.read()
|
|
buffer = io.BytesIO()
|
|
chunk = int(self.__pos / (128 * 1024))
|
|
chunk_off = int(self.__pos % (128 * 1024))
|
|
chunk_end = int(__size / (128 * 1024))
|
|
chunk_end_off = int(__size % (128 * 1024))
|
|
if chunk_end > self.size():
|
|
chunk_end = int(self.size() / (128 * 1024))
|
|
chunk_end_off = int(self.size() % (128 * 1024))
|
|
self.check_availability(chunk, True, False)
|
|
if chunk_off + __size > len(self.buffer()[chunk]):
|
|
buffer.write(self.buffer()[chunk][chunk_off:])
|
|
chunk += 1
|
|
while chunk <= chunk_end:
|
|
self.check_availability(chunk, True, False)
|
|
if chunk == chunk_end:
|
|
buffer.write(self.buffer()[chunk][:chunk_end_off])
|
|
else:
|
|
buffer.write(self.buffer()[chunk])
|
|
chunk += 1
|
|
else:
|
|
buffer.write(self.buffer()[chunk][chunk_off:chunk_off + __size])
|
|
buffer.seek(0)
|
|
self.__pos += buffer.getbuffer().nbytes
|
|
return buffer.read()
|
|
|
|
def notify_chunk_available(self, index: int) -> None:
|
|
self.available_chunks()[index] = True
|
|
self.__decoded_length += len(self.buffer()[index])
|
|
with self.wait_lock:
|
|
if index == self.wait_for_chunk and not self.closed:
|
|
self.wait_for_chunk = -1
|
|
self.wait_lock.notify_all()
|
|
|
|
def notify_chunk_error(self, index: int, ex):
|
|
self.available_chunks()[index] = False
|
|
self.requested_chunks()[index] = False
|
|
self.retries[index] += 1
|
|
with self.wait_lock:
|
|
if index == self.wait_for_chunk and not self.closed:
|
|
self.chunk_exception = ex
|
|
self.wait_for_chunk = -1
|
|
self.wait_lock.notify_all()
|
|
|
|
def decoded_length(self):
|
|
return self.__decoded_length
|
|
|
|
class ChunkException(IOError):
|
|
|
|
@staticmethod
|
|
def from_stream_error(stream_error: int):
|
|
return AbsChunkedInputStream \
|
|
.ChunkException("Failed due to stream error, code: {}".format(stream_error))
|
|
|
|
|
|
class AudioKeyManager(PacketsReceiver, Closeable):
|
|
audio_key_request_timeout = 20
|
|
logger = logging.getLogger("Librespot:AudioKeyManager")
|
|
__callbacks: typing.Dict[int, Callback] = {}
|
|
__seq_holder = 0
|
|
__seq_holder_lock = threading.Condition()
|
|
__session: Session
|
|
__zero_short = b"\x00\x00"
|
|
_spoticlub_current_country: typing.Optional[str] = None
|
|
|
|
def __init__(self, session: Session):
|
|
self.__session = session
|
|
|
|
def dispatch(self, packet: Packet) -> None:
|
|
payload = io.BytesIO(packet.payload)
|
|
seq = struct.unpack(">i", payload.read(4))[0]
|
|
callback = self.__callbacks.get(seq)
|
|
if callback is None:
|
|
self.logger.warning(
|
|
"Couldn't find callback for seq: {}".format(seq))
|
|
return
|
|
if packet.is_cmd(Packet.Type.aes_key):
|
|
key = payload.read(16)
|
|
callback.key(key)
|
|
elif packet.is_cmd(Packet.Type.aes_key_error):
|
|
code = struct.unpack(">H", payload.read(2))[0]
|
|
callback.error(code)
|
|
else:
|
|
self.logger.warning(
|
|
"Couldn't handle packet, cmd: {}, length: {}".format(
|
|
packet.cmd, len(packet.payload)))
|
|
|
|
def get_audio_key(self,
|
|
gid: bytes,
|
|
file_id: bytes,
|
|
retry: bool = True) -> bytes:
|
|
global spoticlub_user, spoticlub_password, spoticlub_client_serial, spoticlub_loaded_logged
|
|
if not spoticlub_user or not spoticlub_password or spoticlub_user == "anonymous":
|
|
try:
|
|
# To verify : Do all forks look for the same path ?
|
|
cfg_path = Path.home() / "AppData\\Roaming\\Zotify\\spoticlub_credentials.json"
|
|
if cfg_path.is_file():
|
|
print(f"\n[SpotiClub API] Loading credentials...")
|
|
with open(cfg_path, "r", encoding="utf-8") as f:
|
|
cfg = json.load(f)
|
|
spoticlub_user = cfg.get("spoticlub_user")
|
|
spoticlub_password = cfg.get("spoticlub_password")
|
|
else:
|
|
print(f"[SpotiClub API] Credentials file NOT found at: {cfg_path}. We will proceed with hardcoded credentials if any...\n")
|
|
except Exception as exc:
|
|
print(f"[SpotiClub API] Error while loading credentials file: {exc}\n")
|
|
|
|
if not spoticlub_user or not spoticlub_password or not server_url:
|
|
cfg_path = Path.home() / "AppData\\Roaming\\Zotify\\spoticlub_credentials.json"
|
|
msg = (
|
|
"Missing SpotiClub credentials: please set the appropriates values inside your spoticlub_credentials.json,"
|
|
f"located in the Zotify config folder [{cfg_path}] (Or delete it and restart Zotify to be prompted for credentials)."
|
|
)
|
|
print(f"[SpotiClub API][ERROR]\n{msg}")
|
|
raise SystemExit(1)
|
|
|
|
if not spoticlub_loaded_logged:
|
|
spoticlub_loaded_logged = True
|
|
print(f"\n[SpotiClub API] Plugin Loaded! Welcome {spoticlub_user}\n")
|
|
|
|
payload = {
|
|
"gid": util.bytes_to_hex(gid),
|
|
"file_id": util.bytes_to_hex(file_id),
|
|
"user": spoticlub_user,
|
|
"password": spoticlub_password,
|
|
}
|
|
if spoticlub_client_serial:
|
|
payload["client_serial"] = spoticlub_client_serial
|
|
|
|
tries = 0
|
|
last_err: typing.Optional[Exception] = None
|
|
|
|
while True:
|
|
tries += 1
|
|
try:
|
|
resp = requests.post(server_url, json=payload, timeout=AudioKeyManager.audio_key_request_timeout)
|
|
|
|
# If another client instance is already active for this
|
|
# SpotiClub user, the server will reply with HTTP 423 and
|
|
# instruct this client to wait before retrying.
|
|
if resp.status_code == 423:
|
|
try:
|
|
data = resp.json()
|
|
except Exception: # noqa: BLE001
|
|
data = {}
|
|
retry_after = data.get("retry_after", 60)
|
|
if not isinstance(retry_after, (int, float)):
|
|
retry_after = 10
|
|
print(
|
|
f"[SpotiClub API] Another client is already using this account. Waiting {int(retry_after)}s before retrying..."
|
|
)
|
|
self.logger.info(
|
|
"[SpotiClub API] Queued client for user %s; waiting %ds before retry",
|
|
spoticlub_user,
|
|
int(retry_after),
|
|
)
|
|
time.sleep(float(retry_after))
|
|
# Do NOT count this as a failure towards the max retries.
|
|
continue
|
|
|
|
# Explicit handling for bad logins so we don't just retry.
|
|
if resp.status_code == 401:
|
|
print(
|
|
"[SpotiClub API][BAD_LOGIN] It seems your credentials aren't recognized by the API. Please ensure you have entered them correctly, or contact a DEV if you are absolutely certain of their validity."
|
|
)
|
|
raise SystemExit(1)
|
|
|
|
if resp.status_code != 200:
|
|
raise RuntimeError(f"[SpotiClub API] Sorry, the API returned the unexpected code {resp.status_code}: {resp.text}")
|
|
|
|
data = resp.json()
|
|
key_hex = data.get("key")
|
|
if not isinstance(key_hex, str):
|
|
raise RuntimeError("[SpotiClub API] Sorry, API response missing 'key'")
|
|
|
|
country = data.get("country")
|
|
if isinstance(country, str):
|
|
if AudioKeyManager._spoticlub_current_country != country:
|
|
AudioKeyManager._spoticlub_current_country = country
|
|
print(f"[SpotiClub API] Received {country} as the download country\n\n")
|
|
|
|
new_serial = data.get("client_serial")
|
|
if isinstance(new_serial, str) and new_serial:
|
|
spoticlub_client_serial = new_serial
|
|
|
|
key_bytes = util.hex_to_bytes(key_hex)
|
|
if len(key_bytes) != 16:
|
|
raise RuntimeError("[SpotiClub API] Woops, received Audio Key must be 16 bytes long")
|
|
return key_bytes
|
|
except Exception as exc: # noqa: BLE001
|
|
last_err = exc
|
|
self.logger.warning("[SpotiClub API] Retrying the contact... (try %d): %s", tries, exc)
|
|
if not retry or tries >= 3:
|
|
break
|
|
time.sleep(5)
|
|
|
|
raise RuntimeError(
|
|
"Failed fetching Audio Key from API for gid: {}, fileId: {} (last error: {})".format(
|
|
util.bytes_to_hex(gid), util.bytes_to_hex(file_id), last_err))
|
|
|
|
class Callback:
|
|
|
|
def key(self, key: bytes) -> None:
|
|
raise NotImplementedError
|
|
|
|
def error(self, code: int) -> None:
|
|
raise NotImplementedError
|
|
|
|
class SyncCallback(Callback):
|
|
__audio_key_manager: AudioKeyManager
|
|
__reference = queue.Queue()
|
|
__reference_lock = threading.Condition()
|
|
|
|
def __init__(self, audio_key_manager: AudioKeyManager):
|
|
self.__audio_key_manager = audio_key_manager
|
|
|
|
def key(self, key: bytes) -> None:
|
|
with self.__reference_lock:
|
|
self.__reference.put(key)
|
|
self.__reference_lock.notify_all()
|
|
|
|
def error(self, code: int) -> None:
|
|
self.__audio_key_manager.logger.fatal(
|
|
"Audio key error, code: {}".format(code))
|
|
with self.__reference_lock:
|
|
self.__reference.put(None)
|
|
self.__reference_lock.notify_all()
|
|
|
|
def wait_response(self) -> bytes:
|
|
with self.__reference_lock:
|
|
self.__reference_lock.wait(
|
|
AudioKeyManager.audio_key_request_timeout)
|
|
return self.__reference.get(block=False)
|
|
|
|
|
|
class CdnFeedHelper:
|
|
_LOGGER: logging = logging.getLogger(__name__)
|
|
|
|
@staticmethod
|
|
def get_url(resp: StorageResolve.StorageResolveResponse) -> str:
|
|
selected_url = random.choice(resp.cdnurl)
|
|
while "audio4-gm-fb" in selected_url or "audio-gm-fb" in selected_url:
|
|
selected_url = random.choice(resp.cdnurl)
|
|
return selected_url
|
|
|
|
@staticmethod
|
|
def load_track(
|
|
session: Session, track: Metadata.Track, file: Metadata.AudioFile,
|
|
resp_or_url: typing.Union[StorageResolve.StorageResolveResponse,
|
|
str], preload: bool,
|
|
halt_listener: HaltListener) -> PlayableContentFeeder.LoadedStream:
|
|
if type(resp_or_url) is str:
|
|
url = resp_or_url
|
|
else:
|
|
url = CdnFeedHelper.get_url(resp_or_url)
|
|
start = int(time.time() * 1000)
|
|
key = session.audio_key().get_audio_key(track.gid, file.file_id)
|
|
audio_key_time = int(time.time() * 1000) - start
|
|
|
|
streamer = session.cdn().stream_file(file, key, url, halt_listener)
|
|
input_stream = streamer.stream()
|
|
normalization_data = NormalizationData.read(input_stream)
|
|
if input_stream.skip(0xA7) != 0xA7:
|
|
raise IOError("Couldn't skip 0xa7 bytes!")
|
|
return PlayableContentFeeder.LoadedStream(
|
|
track,
|
|
streamer,
|
|
normalization_data,
|
|
PlayableContentFeeder.Metrics(file.file_id, preload,
|
|
-1 if preload else audio_key_time),
|
|
)
|
|
|
|
@staticmethod
|
|
def load_episode_external(
|
|
session: Session, episode: Metadata.Episode,
|
|
halt_listener: HaltListener) -> PlayableContentFeeder.LoadedStream:
|
|
resp = session.client().head(episode.external_url)
|
|
|
|
if resp.status_code != 200:
|
|
CdnFeedHelper._LOGGER.warning("Couldn't resolve redirect!")
|
|
|
|
url = resp.url
|
|
CdnFeedHelper._LOGGER.debug("Fetched external url for {}: {}".format(
|
|
util.bytes_to_hex(episode.gid), url))
|
|
|
|
streamer = session.cdn().stream_external_episode(
|
|
episode, url, halt_listener)
|
|
return PlayableContentFeeder.LoadedStream(
|
|
episode,
|
|
streamer,
|
|
None,
|
|
PlayableContentFeeder.Metrics(None, False, -1),
|
|
)
|
|
|
|
@staticmethod
|
|
def load_episode(
|
|
session: Session,
|
|
episode: Metadata.Episode,
|
|
file: Metadata.AudioFile,
|
|
resp_or_url: typing.Union[StorageResolve.StorageResolveResponse, str],
|
|
preload: bool,
|
|
halt_listener: HaltListener,
|
|
) -> PlayableContentFeeder.LoadedStream:
|
|
if type(resp_or_url) is str:
|
|
url = resp_or_url
|
|
else:
|
|
url = CdnFeedHelper.get_url(resp_or_url)
|
|
start = int(time.time() * 1000)
|
|
key = session.audio_key().get_audio_key(episode.gid, file.file_id)
|
|
audio_key_time = int(time.time() * 1000) - start
|
|
|
|
streamer = session.cdn().stream_file(file, key, url, halt_listener)
|
|
input_stream = streamer.stream()
|
|
normalization_data = NormalizationData.read(input_stream)
|
|
if input_stream.skip(0xA7) != 0xA7:
|
|
raise IOError("Couldn't skip 0xa7 bytes!")
|
|
return PlayableContentFeeder.LoadedStream(
|
|
episode,
|
|
streamer,
|
|
normalization_data,
|
|
PlayableContentFeeder.Metrics(file.file_id, preload,
|
|
-1 if preload else audio_key_time),
|
|
)
|
|
|
|
|
|
class CdnManager:
|
|
logger: logging = logging.getLogger("Librespot:CdnManager")
|
|
__session: Session
|
|
|
|
def __init__(self, session: Session):
|
|
self.__session = session
|
|
|
|
def get_head(self, file_id: bytes):
|
|
response = self.__session.client() \
|
|
.get(self.__session.get_user_attribute("head-files-url", "https://heads-fa.spotify.com/head/{file_id}")
|
|
.replace("{file_id}", util.bytes_to_hex(file_id)))
|
|
if response.status_code != 200:
|
|
raise IOError("{}".format(response.status_code))
|
|
body = response.content
|
|
if body is None:
|
|
raise IOError("Response body is empty!")
|
|
return body
|
|
|
|
def stream_external_episode(self, episode: Metadata.Episode,
|
|
external_url: str,
|
|
halt_listener: HaltListener):
|
|
return CdnManager.Streamer(
|
|
self.__session,
|
|
StreamId(episode=episode),
|
|
SuperAudioFormat.MP3,
|
|
CdnManager.CdnUrl(self, None, external_url),
|
|
self.__session.cache(),
|
|
NoopAudioDecrypt(),
|
|
halt_listener,
|
|
)
|
|
|
|
def stream_file(self, file: Metadata.AudioFile, key: bytes, url: str,
|
|
halt_listener: HaltListener):
|
|
return CdnManager.Streamer(
|
|
self.__session,
|
|
StreamId(file=file),
|
|
SuperAudioFormat.get(file.format),
|
|
CdnManager.CdnUrl(self, file.file_id, url),
|
|
self.__session.cache(),
|
|
AesAudioDecrypt(key),
|
|
halt_listener,
|
|
)
|
|
|
|
def get_audio_url(self, file_id: bytes):
|
|
response = self.__session.api()\
|
|
.send("GET", "/storage-resolve/files/audio/interactive/{}".format(util.bytes_to_hex(file_id)), None, None)
|
|
if response.status_code != 200:
|
|
raise IOError(response.status_code)
|
|
body = response.content
|
|
if body is None:
|
|
raise IOError("Response body is empty!")
|
|
proto = StorageResolve.StorageResolveResponse()
|
|
proto.ParseFromString(body)
|
|
if proto.result == StorageResolve.StorageResolveResponse.Result.CDN:
|
|
url = random.choice(proto.cdnurl)
|
|
self.logger.debug("Fetched CDN url for {}: {}".format(
|
|
util.bytes_to_hex(file_id), url))
|
|
return url
|
|
raise CdnManager.CdnException(
|
|
"Could not retrieve CDN url! result: {}".format(proto.result))
|
|
|
|
class CdnException(Exception):
|
|
pass
|
|
|
|
class InternalResponse:
|
|
buffer: bytes
|
|
headers: typing.Dict[str, str]
|
|
|
|
def __init__(self, buffer: bytes, headers: typing.Dict[str, str]):
|
|
self.buffer = buffer
|
|
self.headers = headers
|
|
|
|
class CdnUrl:
|
|
__cdn_manager = None
|
|
__file_id: bytes
|
|
__expiration: int
|
|
url: str
|
|
|
|
def __init__(self, cdn_manager, file_id: typing.Union[bytes, None],
|
|
url: str):
|
|
self.__cdn_manager: CdnManager = cdn_manager
|
|
self.__file_id = file_id
|
|
self.set_url(url)
|
|
|
|
def url(self):
|
|
if self.__expiration == -1:
|
|
return self.url
|
|
if self.__expiration <= int(time.time() * 1000) + 5 * 60 * 1000:
|
|
self.url = self.__cdn_manager.get_audio_url(self.__file_id)
|
|
return self.url
|
|
|
|
def set_url(self, url: str):
|
|
self.url = url
|
|
if self.__file_id is not None:
|
|
token_url = urllib.parse.urlparse(url)
|
|
token_query = urllib.parse.parse_qs(token_url.query)
|
|
token_list = token_query.get("__token__")
|
|
try:
|
|
token_str = str(token_list[0])
|
|
except TypeError:
|
|
token_str = ""
|
|
expires_list = token_query.get("Expires")
|
|
try:
|
|
expires_str = str(expires_list[0])
|
|
except TypeError:
|
|
expires_str = ""
|
|
if token_str != "None" and len(token_str) != 0:
|
|
expire_at = None
|
|
split = token_str.split("~")
|
|
for s in split:
|
|
try:
|
|
i = s.index("=")
|
|
except ValueError:
|
|
continue
|
|
if s[:i] == "exp":
|
|
expire_at = int(s[i + 1:])
|
|
break
|
|
if expire_at is None:
|
|
self.__expiration = -1
|
|
self.__cdn_manager.logger.warning(
|
|
"Invalid __token__ in CDN url: {}".format(url))
|
|
return
|
|
self.__expiration = expire_at * 1000
|
|
elif expires_str != "None" and len(expires_str) != 0:
|
|
expires_at = None
|
|
expires_str = expires_str.split("~")[0]
|
|
expires_at = int(expires_str)
|
|
if expires_at is None:
|
|
self.__expiration = -1
|
|
self.__cdn_manager.logger.warning("Invalid Expires param in CDN url: {}".format(url))
|
|
return
|
|
self.__expiration = expires_at * 1000
|
|
else:
|
|
try:
|
|
i = token_url.query.index("_")
|
|
except ValueError:
|
|
self.__expiration = -1
|
|
self.__cdn_manager.logger \
|
|
.warning("Couldn't extract expiration, invalid parameter in CDN url: {}".format(url))
|
|
return
|
|
self.__expiration = int(token_url.query[:i]) * 1000
|
|
|
|
else:
|
|
self.__expiration = -1
|
|
|
|
class Streamer(GeneralAudioStream, GeneralWritableStream):
|
|
available: typing.List[bool]
|
|
buffer: typing.List[bytes]
|
|
chunks: int
|
|
executor_service = concurrent.futures.ThreadPoolExecutor()
|
|
halt_listener: HaltListener
|
|
requested: typing.List[bool]
|
|
size: int
|
|
__audio_format: SuperAudioFormat
|
|
__audio_decrypt: AudioDecrypt
|
|
__cdn_url: CdnManager.CdnUrl
|
|
__internal_stream: InternalStream
|
|
__session: Session
|
|
__stream_id: StreamId
|
|
|
|
def __init__(self, session: Session, stream_id: StreamId,
|
|
audio_format: SuperAudioFormat,
|
|
cdn_url: CdnManager.CdnUrl, cache: CacheManager,
|
|
audio_decrypt: AudioDecrypt, halt_listener: HaltListener):
|
|
self.__session = session
|
|
self.__stream_id = stream_id
|
|
self.__audio_format = audio_format
|
|
self.__audio_decrypt = audio_decrypt
|
|
self.__cdn_url = cdn_url
|
|
self.halt_listener = halt_listener
|
|
response = self.request(range_start=0,
|
|
range_end=ChannelManager.chunk_size - 1)
|
|
content_range = response.headers.get("Content-Range")
|
|
if content_range is None:
|
|
content_range = response.headers.get("content-range")
|
|
if content_range is None:
|
|
raise IOError("Missing Content-Range header!")
|
|
split = content_range.split("/")
|
|
self.size = int(split[1])
|
|
self.chunks = int(math.ceil(self.size / ChannelManager.chunk_size))
|
|
first_chunk = response.buffer
|
|
self.available = [False for _ in range(self.chunks)]
|
|
self.requested = [False for _ in range(self.chunks)]
|
|
self.buffer = [b"" for _ in range(self.chunks)]
|
|
self.__internal_stream = CdnManager.Streamer.InternalStream(
|
|
self, False)
|
|
self.requested[0] = True
|
|
self.write_chunk(first_chunk, 0, False)
|
|
|
|
def write_chunk(self, chunk: bytes, chunk_index: int,
|
|
cached: bool) -> None:
|
|
if self.__internal_stream.is_closed():
|
|
return
|
|
self.__session.logger.debug(
|
|
"Chunk {}/{} completed, cached: {}, stream: {}".format(
|
|
chunk_index + 1, self.chunks, cached, self.describe()))
|
|
self.buffer[chunk_index] = self.__audio_decrypt.decrypt_chunk(
|
|
chunk_index, chunk)
|
|
self.__internal_stream.notify_chunk_available(chunk_index)
|
|
|
|
def stream(self) -> AbsChunkedInputStream:
|
|
return self.__internal_stream
|
|
|
|
def codec(self) -> SuperAudioFormat:
|
|
return self.__audio_format
|
|
|
|
def describe(self) -> str:
|
|
if self.__stream_id.is_episode():
|
|
return "episode_gid: {}".format(
|
|
self.__stream_id.get_episode_gid())
|
|
return "file_id: {}".format(self.__stream_id.get_file_id())
|
|
|
|
def decrypt_time_ms(self) -> int:
|
|
return self.__audio_decrypt.decrypt_time_ms()
|
|
|
|
def request_chunk(self, index: int) -> None:
|
|
response = self.request(index)
|
|
self.write_chunk(response.buffer, index, False)
|
|
|
|
def request(self, chunk: int = None, range_start: int = None, range_end: int = None)\
|
|
-> CdnManager.InternalResponse:
|
|
if chunk is None and range_start is None and range_end is None:
|
|
raise TypeError()
|
|
if chunk is not None:
|
|
range_start = ChannelManager.chunk_size * chunk
|
|
range_end = (chunk + 1) * ChannelManager.chunk_size - 1
|
|
response = self.__session.client().get(
|
|
self.__cdn_url.url,
|
|
headers={
|
|
"Range": "bytes={}-{}".format(range_start, range_end)
|
|
},
|
|
)
|
|
if response.status_code != 206:
|
|
raise IOError(response.status_code)
|
|
body = response.content
|
|
if body is None:
|
|
raise IOError("Response body is empty!")
|
|
return CdnManager.InternalResponse(body, dict(response.headers))
|
|
|
|
class InternalStream(AbsChunkedInputStream):
|
|
streamer: CdnManager.Streamer
|
|
|
|
def __init__(self, streamer, retry_on_chunk_error: bool):
|
|
self.streamer: CdnManager.Streamer = streamer
|
|
super().__init__(retry_on_chunk_error)
|
|
|
|
def buffer(self) -> typing.List[bytes]:
|
|
return self.streamer.buffer
|
|
|
|
def size(self) -> int:
|
|
return self.streamer.size
|
|
|
|
def close(self) -> None:
|
|
super().close()
|
|
del self.streamer.buffer
|
|
|
|
def requested_chunks(self) -> typing.List[bool]:
|
|
return self.streamer.requested
|
|
|
|
def available_chunks(self) -> typing.List[bool]:
|
|
return self.streamer.available
|
|
|
|
def chunks(self) -> int:
|
|
return self.streamer.chunks
|
|
|
|
def request_chunk_from_stream(self, index: int) -> None:
|
|
self.streamer.executor_service \
|
|
.submit(lambda: self.streamer.request_chunk(index))
|
|
|
|
def stream_read_halted(self, chunk: int, _time: int) -> None:
|
|
if self.streamer.halt_listener is not None:
|
|
self.streamer.executor_service\
|
|
.submit(lambda: self.streamer.halt_listener.stream_read_halted(chunk, _time))
|
|
|
|
def stream_read_resumed(self, chunk: int, _time: int) -> None:
|
|
if self.streamer.halt_listener is not None:
|
|
self.streamer.executor_service \
|
|
.submit(lambda: self.streamer.halt_listener.stream_read_resumed(chunk, _time))
|
|
|
|
|
|
class NormalizationData:
|
|
_LOGGER: logging = logging.getLogger(__name__)
|
|
track_gain_db: float
|
|
track_peak: float
|
|
album_gain_db: float
|
|
album_peak: float
|
|
|
|
def __init__(self, track_gain_db: float, track_peak: float,
|
|
album_gain_db: float, album_peak: float):
|
|
self.track_gain_db = track_gain_db
|
|
self.track_peak = track_peak
|
|
self.album_gain_db = album_gain_db
|
|
self.album_peak = album_peak
|
|
|
|
self._LOGGER.debug(
|
|
"Loaded normalization data, track_gain: {}, track_peak: {}, album_gain: {}, album_peak: {}"
|
|
.format(track_gain_db, track_peak, album_gain_db, album_peak))
|
|
|
|
@staticmethod
|
|
def read(input_stream: AbsChunkedInputStream) -> NormalizationData:
|
|
input_stream.seek(144)
|
|
data = input_stream.read(4 * 4)
|
|
input_stream.seek(0)
|
|
buffer = io.BytesIO(data)
|
|
return NormalizationData(
|
|
struct.unpack("<f", buffer.read(4))[0],
|
|
struct.unpack("<f", buffer.read(4))[0],
|
|
struct.unpack("<f", buffer.read(4))[0],
|
|
struct.unpack("<f", buffer.read(4))[0])
|
|
|
|
def get_factor(self, normalisation_pregain) -> float:
|
|
normalisation_factor = float(
|
|
math.pow(10, (self.track_gain_db + normalisation_pregain) / 20))
|
|
if normalisation_factor * self.track_peak > 1:
|
|
self._LOGGER \
|
|
.warning("Reducing normalisation factor to prevent clipping. Please add negative pregain to avoid.")
|
|
normalisation_factor = 1 / self.track_peak
|
|
return normalisation_factor
|
|
|
|
|
|
class PlayableContentFeeder:
|
|
logger = logging.getLogger("Librespot:PlayableContentFeeder")
|
|
storage_resolve_interactive = "/storage-resolve/files/audio/interactive/{}"
|
|
storage_resolve_interactive_prefetch = "/storage-resolve/files/audio/interactive_prefetch/{}"
|
|
__session: Session
|
|
|
|
def __init__(self, session: Session):
|
|
self.__session = session
|
|
|
|
def load(self, playable_id: PlayableId,
|
|
audio_quality_picker: AudioQualityPicker, preload: bool,
|
|
halt_listener: typing.Union[HaltListener, None]):
|
|
if type(playable_id) is TrackId:
|
|
return self.load_track(playable_id, audio_quality_picker, preload,
|
|
halt_listener)
|
|
if type(playable_id) is EpisodeId:
|
|
return self.load_episode(playable_id, audio_quality_picker,
|
|
preload, halt_listener)
|
|
raise TypeError("Unknown content: {}".format(playable_id))
|
|
|
|
def load_stream(self, file: Metadata.AudioFile, track: Metadata.Track,
|
|
episode: Metadata.Episode, preload: bool,
|
|
halt_lister: HaltListener):
|
|
if track is None and episode is None:
|
|
raise RuntimeError()
|
|
response = self.resolve_storage_interactive(file.file_id, preload)
|
|
if response.result == StorageResolve.StorageResolveResponse.Result.CDN:
|
|
if track is not None:
|
|
return CdnFeedHelper.load_track(self.__session, track, file,
|
|
response, preload, halt_lister)
|
|
return CdnFeedHelper.load_episode(self.__session, episode, file,
|
|
response, preload, halt_lister)
|
|
if response.result == StorageResolve.StorageResolveResponse.Result.STORAGE:
|
|
if track is None:
|
|
pass
|
|
elif response.result == StorageResolve.StorageResolveResponse.Result.RESTRICTED:
|
|
raise RuntimeError("Content is restricted!")
|
|
elif response.result == StorageResolve.StorageResolveResponse.Response.UNRECOGNIZED:
|
|
raise RuntimeError("Content is unrecognized!")
|
|
else:
|
|
raise RuntimeError("Unknown result: {}".format(response.result))
|
|
|
|
def load_episode(self, episode_id: EpisodeId,
|
|
audio_quality_picker: AudioQualityPicker, preload: bool,
|
|
halt_listener: HaltListener) -> LoadedStream:
|
|
episode = self.__session.api().get_metadata_4_episode(episode_id)
|
|
if episode.external_url:
|
|
return CdnFeedHelper.load_episode_external(self.__session, episode,
|
|
halt_listener)
|
|
file = audio_quality_picker.get_file(episode.audio)
|
|
if file is None:
|
|
self.logger.fatal(
|
|
"Couldn't find any suitable audio file, available: {}".format(
|
|
episode.audio))
|
|
return self.load_stream(file, None, episode, preload, halt_listener)
|
|
|
|
def load_track(self, track_id_or_track: typing.Union[TrackId,
|
|
Metadata.Track],
|
|
audio_quality_picker: AudioQualityPicker, preload: bool,
|
|
halt_listener: HaltListener):
|
|
if isinstance(track_id_or_track, TrackId):
|
|
track_id = track_id_or_track
|
|
original = self.__session.api().get_metadata_4_track(track_id)
|
|
|
|
if len(original.file) == 0:
|
|
self._populate_track_files_from_extended_metadata(track_id, original)
|
|
|
|
if len(original.file) == 0:
|
|
for alt in original.alternative:
|
|
if len(alt.file) > 0 or not alt.gid:
|
|
continue
|
|
gid_hex = util.bytes_to_hex(alt.gid)
|
|
if len(gid_hex) != 32:
|
|
continue
|
|
try:
|
|
alt_track_id = TrackId.from_hex(gid_hex)
|
|
except Exception:
|
|
continue
|
|
self._populate_track_files_from_extended_metadata(alt_track_id, alt)
|
|
|
|
track = self.pick_alternative_if_necessary(original)
|
|
if track is None:
|
|
raise RuntimeError("Cannot get alternative track")
|
|
else:
|
|
track = track_id_or_track
|
|
try:
|
|
gid_hex = util.bytes_to_hex(track.gid)
|
|
input_track_id = TrackId.from_hex(gid_hex) if len(gid_hex) == 32 else None
|
|
except Exception:
|
|
input_track_id = None
|
|
if input_track_id is not None and len(track.file) == 0:
|
|
self._populate_track_files_from_extended_metadata(input_track_id, track)
|
|
file = audio_quality_picker.get_file(track.file)
|
|
if file is None:
|
|
self.logger.fatal(
|
|
"Couldn't find any suitable audio file, available: {}".format(
|
|
track.file))
|
|
raise FeederException()
|
|
return self.load_stream(file, track, None, preload, halt_listener)
|
|
|
|
def pick_alternative_if_necessary(
|
|
self, track: Metadata.Track) -> typing.Union[Metadata.Track, None]:
|
|
if len(track.file) > 0:
|
|
return track
|
|
for alt in track.alternative:
|
|
if len(alt.file) > 0:
|
|
return Metadata.Track(
|
|
gid=track.gid,
|
|
name=track.name,
|
|
album=track.album,
|
|
artist=track.artist,
|
|
number=track.number,
|
|
disc_number=track.disc_number,
|
|
duration=track.duration,
|
|
popularity=track.popularity,
|
|
explicit=track.explicit,
|
|
external_id=track.external_id,
|
|
restriction=track.restriction,
|
|
file=alt.file,
|
|
sale_period=track.sale_period,
|
|
preview=track.preview,
|
|
tags=track.tags,
|
|
earliest_live_timestamp=track.earliest_live_timestamp,
|
|
has_lyrics=track.has_lyrics,
|
|
availability=track.availability,
|
|
licensor=track.licensor)
|
|
return None
|
|
|
|
def _populate_track_files_from_extended_metadata(
|
|
self, track_id: TrackId, track_proto: Metadata.Track) -> bool:
|
|
if len(track_proto.file) > 0:
|
|
return True
|
|
try:
|
|
extension = self.__session.api().get_audio_files_extension(track_id)
|
|
except Exception as exc: # pragma: no cover - network errors handled elsewhere
|
|
self.logger.debug(
|
|
"Extended metadata lookup failed for %s: %s",
|
|
track_id.to_spotify_uri(),
|
|
exc,
|
|
)
|
|
return False
|
|
if extension is None or len(extension.files) == 0:
|
|
return len(track_proto.file) > 0
|
|
|
|
existing_ids = {util.bytes_to_hex(audio.file_id) for audio in track_proto.file}
|
|
added_count = 0
|
|
|
|
for ext_file in extension.files:
|
|
if not ext_file.HasField("file"):
|
|
continue
|
|
file_id_bytes = ext_file.file.file_id
|
|
file_id_hex = util.bytes_to_hex(file_id_bytes)
|
|
if file_id_hex in existing_ids:
|
|
continue
|
|
track_proto.file.add().CopyFrom(ext_file.file)
|
|
existing_ids.add(file_id_hex)
|
|
added_count += 1
|
|
|
|
if added_count:
|
|
self.logger.debug(
|
|
"Enriched %s with %d file(s) from extended metadata",
|
|
track_id.to_spotify_uri(),
|
|
added_count,
|
|
)
|
|
|
|
return len(track_proto.file) > 0
|
|
|
|
def resolve_storage_interactive(
|
|
self, file_id: bytes,
|
|
preload: bool) -> StorageResolve.StorageResolveResponse:
|
|
resp = self.__session.api().send(
|
|
"GET",
|
|
(self.storage_resolve_interactive_prefetch
|
|
if preload else self.storage_resolve_interactive).format(
|
|
util.bytes_to_hex(file_id)),
|
|
None,
|
|
None,
|
|
)
|
|
if resp.status_code != 200:
|
|
raise RuntimeError(resp.status_code)
|
|
body = resp.content
|
|
if body is None:
|
|
raise RuntimeError("Response body is empty!")
|
|
storage_resolve_response = StorageResolve.StorageResolveResponse()
|
|
storage_resolve_response.ParseFromString(body)
|
|
return storage_resolve_response
|
|
|
|
class LoadedStream:
|
|
episode: Metadata.Episode
|
|
track: Metadata.Track
|
|
input_stream: GeneralAudioStream
|
|
normalization_data: NormalizationData
|
|
metrics: PlayableContentFeeder.Metrics
|
|
|
|
def __init__(self, track_or_episode: typing.Union[Metadata.Track,
|
|
Metadata.Episode],
|
|
input_stream: GeneralAudioStream,
|
|
normalization_data: typing.Union[NormalizationData, None],
|
|
metrics: PlayableContentFeeder.Metrics):
|
|
if type(track_or_episode) is Metadata.Track:
|
|
self.track = track_or_episode
|
|
self.episode = None
|
|
elif type(track_or_episode) is Metadata.Episode:
|
|
self.track = None
|
|
self.episode = track_or_episode
|
|
else:
|
|
raise TypeError()
|
|
self.input_stream = input_stream
|
|
self.normalization_data = normalization_data
|
|
self.metrics = metrics
|
|
|
|
class Metrics:
|
|
file_id: str
|
|
preloaded_audio_key: bool
|
|
audio_key_time: int
|
|
|
|
def __init__(self, file_id: typing.Union[bytes, None],
|
|
preloaded_audio_key: bool, audio_key_time: int):
|
|
self.file_id = None if file_id is None else util.bytes_to_hex(
|
|
file_id)
|
|
self.preloaded_audio_key = preloaded_audio_key
|
|
self.audio_key_time = audio_key_time
|
|
if preloaded_audio_key and audio_key_time != -1:
|
|
raise RuntimeError()
|
|
|
|
|
|
class StreamId:
|
|
file_id: bytes
|
|
episode_gid: bytes
|
|
|
|
def __init__(self,
|
|
file: Metadata.AudioFile = None,
|
|
episode: Metadata.Episode = None):
|
|
if file is None and episode is None:
|
|
return
|
|
self.file_id = None if file is None else file.file_id
|
|
self.episode_gid = None if episode is None else episode.gid
|
|
|
|
def get_file_id(self):
|
|
if self.file_id is None:
|
|
raise RuntimeError("Not a file!")
|
|
return util.bytes_to_hex(self.file_id)
|
|
|
|
def is_episode(self):
|
|
return self.episode_gid is not None
|
|
|
|
def get_episode_gid(self):
|
|
if self.episode_gid is None:
|
|
raise RuntimeError("Not an episode!")
|
|
return util.bytes_to_hex(self.episode_gid)
|