Restyled by black

This commit is contained in:
Restyled.io
2021-05-21 03:40:40 +00:00
parent 1e0e47cced
commit 5132c9e023
5 changed files with 648 additions and 297 deletions

View File

@@ -21,9 +21,7 @@ class AbsChunkedInputStream(InputStream, HaltListener):
_decoded_length: int = 0
def __init__(self, retry_on_chunk_error: bool):
self.retries: typing.Final[typing.List[int]] = [
0 for _ in range(self.chunks())
]
self.retries: typing.Final[typing.List[int]] = [0 for _ in range(self.chunks())]
self.retry_on_chunk_error = retry_on_chunk_error
def is_closed(self) -> bool:
@@ -108,10 +106,13 @@ class AbsChunkedInputStream(InputStream, HaltListener):
self.request_chunk_from_stream(chunk)
self.requested_chunks()[chunk] = True
for i in range(chunk + 1,
min(self.chunks() - 1, chunk + self.preload_ahead) + 1):
if self.requested_chunks(
)[i] and self.retries[i] < self.preload_chunk_retries:
for i in range(
chunk + 1, min(self.chunks() - 1, chunk + self.preload_ahead) + 1
):
if (
self.requested_chunks()[i]
and self.retries[i] < self.preload_chunk_retries
):
self.request_chunk_from_stream(i)
self.requested_chunks()[chunk] = True
@@ -145,10 +146,7 @@ class AbsChunkedInputStream(InputStream, HaltListener):
self.check_availability(chunk, True, True)
def read(self,
b: bytearray = None,
offset: int = None,
length: int = None) -> int:
def read(self, b: bytearray = None, offset: int = None, length: int = None) -> int:
if b is None and offset is None and length is None:
return self.internal_read()
if not (b is not None and offset is not None and length is not None):
@@ -158,8 +156,9 @@ class AbsChunkedInputStream(InputStream, HaltListener):
raise IOError("Stream is closed!")
if offset < 0 or length < 0 or length > len(b) - offset:
raise IndexError("offset: {}, length: {}, buffer: {}".format(
offset, length, len(b)))
raise IndexError(
"offset: {}, length: {}, buffer: {}".format(offset, length, len(b))
)
elif length == 0:
return 0
@@ -174,8 +173,7 @@ class AbsChunkedInputStream(InputStream, HaltListener):
self.check_availability(chunk, True, False)
copy = min(len(self.buffer()[chunk]) - chunk_off, length - i)
b[offset + 0:copy] = self.buffer()[chunk][chunk_off:chunk_off +
copy]
b[offset + 0 : copy] = self.buffer()[chunk][chunk_off : chunk_off + copy]
i += copy
self._pos += copy
@@ -223,4 +221,5 @@ class AbsChunkedInputStream(InputStream, HaltListener):
@staticmethod
def from_stream_error(stream_error: int):
return AbsChunkedInputStream.ChunkException(
"Failed due to stream error, code: {}".format(stream_error))
"Failed due to stream error, code: {}".format(stream_error)
)

View File

@@ -17,10 +17,14 @@ class CdnFeedHelper:
return random.choice(resp.cdnurl)
@staticmethod
def load_track(session: Session, track: Metadata.Track, file: Metadata.AudioFile,
resp_or_url: typing.Union[StorageResolve.StorageResolveResponse, str],
preload: bool, halt_listener: HaltListener)\
-> PlayableContentFeeder.PlayableContentFeeder.LoadedStream:
def load_track(
session: Session,
track: Metadata.Track,
file: Metadata.AudioFile,
resp_or_url: typing.Union[StorageResolve.StorageResolveResponse, str],
preload: bool,
halt_listener: HaltListener,
) -> PlayableContentFeeder.PlayableContentFeeder.LoadedStream:
if type(resp_or_url) is str:
url = resp_or_url
else:
@@ -31,19 +35,21 @@ class CdnFeedHelper:
streamer = session.cdn().stream_file(file, key, url, halt_listener)
input_stream = streamer.stream()
normalization_data = NormalizationData.NormalizationData.read(
input_stream)
if input_stream.skip(0xa7) != 0xa7:
normalization_data = NormalizationData.NormalizationData.read(input_stream)
if input_stream.skip(0xA7) != 0xA7:
raise IOError("Couldn't skip 0xa7 bytes!")
return PlayableContentFeeder.PlayableContentFeeder.LoadedStream(
track, streamer, normalization_data,
track,
streamer,
normalization_data,
PlayableContentFeeder.PlayableContentFeeder.Metrics(
file.file_id, preload, -1 if preload else audio_key_time))
file.file_id, preload, -1 if preload else audio_key_time
),
)
@staticmethod
def load_episode_external(
session: Session, episode: Metadata.Episode,
halt_listener: HaltListener
session: Session, episode: Metadata.Episode, halt_listener: HaltListener
) -> PlayableContentFeeder.PlayableContentFeeder.LoadedStream:
resp = session.client().head(episode.external_url)
@@ -51,21 +57,27 @@ class CdnFeedHelper:
CdnFeedHelper._LOGGER.warning("Couldn't resolve redirect!")
url = resp.url
CdnFeedHelper._LOGGER.debug("Fetched external url for {}: {}".format(
Utils.Utils.bytes_to_hex(episode.gid), url))
CdnFeedHelper._LOGGER.debug(
"Fetched external url for {}: {}".format(
Utils.Utils.bytes_to_hex(episode.gid), url
)
)
streamer = session.cdn().stream_external_episode(
episode, url, halt_listener)
streamer = session.cdn().stream_external_episode(episode, url, halt_listener)
return PlayableContentFeeder.PlayableContentFeeder.LoadedStream(
episode, streamer, None,
PlayableContentFeeder.PlayableContentFeeder.Metrics(
None, False, -1))
episode,
streamer,
None,
PlayableContentFeeder.PlayableContentFeeder.Metrics(None, False, -1),
)
@staticmethod
def load_episode(
session: Session, episode: Metadata.Episode, file: Metadata.AudioFile,
resp_or_url: typing.Union[StorageResolve.StorageResolveResponse,
str], halt_listener: HaltListener
session: Session,
episode: Metadata.Episode,
file: Metadata.AudioFile,
resp_or_url: typing.Union[StorageResolve.StorageResolveResponse, str],
halt_listener: HaltListener,
) -> PlayableContentFeeder.PlayableContentFeeder.LoadedStream:
if type(resp_or_url) is str:
url = resp_or_url
@@ -78,9 +90,13 @@ class CdnFeedHelper:
streamer = session.cdn().stream_file(file, key, url, halt_listener)
input_stream = streamer.stream()
normalization_data = NormalizationData.read(input_stream)
if input_stream.skip(0xa7) != 0xa7:
if input_stream.skip(0xA7) != 0xA7:
raise IOError("Couldn't skip 0xa7 bytes!")
return PlayableContentFeeder.PlayableContentFeeder.LoadedStream(
episode, streamer, normalization_data,
episode,
streamer,
normalization_data,
PlayableContentFeeder.PlayableContentFeeder.Metrics(
file.file_id, False, audio_key_time))
file.file_id, False, audio_key_time
),
)

View File

@@ -30,9 +30,11 @@ class CdnManager:
self._session = session
def get_head(self, file_id: bytes):
resp = self._session.client() \
.get(self._session.get_user_attribute("head-files-url", "https://heads-fa.spotify.com/head/{file_id}")
.replace("{file_id}", Utils.bytes_to_hex(file_id)))
resp = self._session.client().get(
self._session.get_user_attribute(
"head-files-url", "https://heads-fa.spotify.com/head/{file_id}"
).replace("{file_id}", Utils.bytes_to_hex(file_id))
)
if resp.status_code != 200:
raise IOError("{}".format(resp.status_code))
@@ -43,27 +45,45 @@ class CdnManager:
return body
def stream_external_episode(self, episode: Metadata.Episode,
external_url: str,
halt_listener: HaltListener):
return CdnManager.Streamer(self._session, StreamId(episode),
SuperAudioFormat.MP3,
CdnManager.CdnUrl(self, None, external_url),
self._session.cache(), NoopAudioDecrypt(),
halt_listener)
def stream_external_episode(
self, episode: Metadata.Episode, external_url: str, halt_listener: HaltListener
):
return CdnManager.Streamer(
self._session,
StreamId(episode),
SuperAudioFormat.MP3,
CdnManager.CdnUrl(self, None, external_url),
self._session.cache(),
NoopAudioDecrypt(),
halt_listener,
)
def stream_file(self, file: Metadata.AudioFile, key: bytes, url: str,
halt_listener: HaltListener):
return CdnManager.Streamer(self._session, StreamId.StreamId(file),
SuperAudioFormat.get(file.format),
CdnManager.CdnUrl(self, file.file_id, url),
self._session.cache(), AesAudioDecrypt(key),
halt_listener)
def stream_file(
self,
file: Metadata.AudioFile,
key: bytes,
url: str,
halt_listener: HaltListener,
):
return CdnManager.Streamer(
self._session,
StreamId.StreamId(file),
SuperAudioFormat.get(file.format),
CdnManager.CdnUrl(self, file.file_id, url),
self._session.cache(),
AesAudioDecrypt(key),
halt_listener,
)
def get_audio_url(self, file_id: bytes):
resp = self._session.api().send(
"GET", "/storage-resolve/files/audio/interactive/{}".format(
Utils.bytes_to_hex(file_id)), None, None)
"GET",
"/storage-resolve/files/audio/interactive/{}".format(
Utils.bytes_to_hex(file_id)
),
None,
None,
)
if resp.status_code != 200:
raise IOError(resp.status_code)
@@ -76,11 +96,13 @@ class CdnManager:
proto.ParseFromString(body)
if proto.result == StorageResolve.StorageResolveResponse.Result.CDN:
url = random.choice(proto.cdnurl)
self._LOGGER.debug("Fetched CDN url for {}: {}".format(
Utils.bytes_to_hex(file_id), url))
self._LOGGER.debug(
"Fetched CDN url for {}: {}".format(Utils.bytes_to_hex(file_id), url)
)
return url
raise CdnManager.CdnException(
"Could not retrieve CDN url! result: {}".format(proto.result))
"Could not retrieve CDN url! result: {}".format(proto.result)
)
class CdnException(Exception):
pass
@@ -140,7 +162,8 @@ class CdnManager:
if expire_at is None:
self._expiration = -1
self._cdnManager._LOGGER.warning(
"Invalid __token__ in CDN url: {}".format(url))
"Invalid __token__ in CDN url: {}".format(url)
)
return
self._expiration = expire_at * 1000
@@ -150,8 +173,10 @@ class CdnManager:
except ValueError:
self._expiration = -1
self._cdnManager._LOGGER.warning(
"Couldn't extract expiration, invalid parameter in CDN url: "
.format(url))
"Couldn't extract expiration, invalid parameter in CDN url: ".format(
url
)
)
return
self._expiration = int(token_url.query[:i]) * 1000
@@ -159,8 +184,10 @@ class CdnManager:
else:
self._expiration = -1
class Streamer(GeneralAudioStream.GeneralAudioStream,
GeneralWritableStream.GeneralWritableStream):
class Streamer(
GeneralAudioStream.GeneralAudioStream,
GeneralWritableStream.GeneralWritableStream,
):
_session: Session = None
_streamId: StreamId = None
_executorService = concurrent.futures.ThreadPoolExecutor()
@@ -175,10 +202,16 @@ class CdnManager:
_internalStream: CdnManager.Streamer.InternalStream = None
_haltListener: HaltListener = None
def __init__(self, session: Session, stream_id: StreamId,
audio_format: SuperAudioFormat, cdn_url,
cache: CacheManager, audio_decrypt: AudioDecrypt,
halt_listener: HaltListener):
def __init__(
self,
session: Session,
stream_id: StreamId,
audio_format: SuperAudioFormat,
cdn_url,
cache: CacheManager,
audio_decrypt: AudioDecrypt,
halt_listener: HaltListener,
):
self._session = session
self._streamId = stream_id
self._audioFormat = audio_format
@@ -186,39 +219,38 @@ class CdnManager:
self._cdnUrl = cdn_url
self._haltListener = halt_listener
resp = self.request(range_start=0,
range_end=ChannelManager.CHUNK_SIZE - 1)
resp = self.request(range_start=0, range_end=ChannelManager.CHUNK_SIZE - 1)
content_range = resp._headers.get("Content-Range")
if content_range is None:
raise IOError("Missing Content-Range header!")
split = Utils.split(content_range, "/")
self._size = int(split[1])
self._chunks = int(
math.ceil(self._size / ChannelManager.CHUNK_SIZE))
self._chunks = int(math.ceil(self._size / ChannelManager.CHUNK_SIZE))
first_chunk = resp._buffer
self._available = [False for _ in range(self._chunks)]
self._requested = [False for _ in range(self._chunks)]
self._buffer = [bytearray() for _ in range(self._chunks)]
self._internalStream = CdnManager.Streamer.InternalStream(
self, False)
self._internalStream = CdnManager.Streamer.InternalStream(self, False)
self._requested[0] = True
self.write_chunk(first_chunk, 0, False)
def write_chunk(self, chunk: bytes, chunk_index: int,
cached: bool) -> None:
def write_chunk(self, chunk: bytes, chunk_index: int, cached: bool) -> None:
if self._internalStream.is_closed():
return
self._session._LOGGER.debug(
"Chunk {}/{} completed, cached: {}, stream: {}".format(
chunk_index + 1, self._chunks, cached, self.describe()))
chunk_index + 1, self._chunks, cached, self.describe()
)
)
self._buffer[chunk_index] = self._audioDecrypt.decrypt_chunk(
chunk_index, chunk)
chunk_index, chunk
)
self._internalStream.notify_chunk_available(chunk_index)
def stream(self) -> AbsChunkedInputStream:
@@ -229,8 +261,7 @@ class CdnManager:
def describe(self) -> str:
if self._streamId.is_episode():
return "episode_gid: {}".format(
self._streamId.get_episode_gid())
return "episode_gid: {}".format(self._streamId.get_episode_gid())
return "file_id: {}".format(self._streamId.get_file_id())
def decrypt_time_ms(self) -> int:
@@ -240,10 +271,9 @@ class CdnManager:
resp = self.request(index)
self.write_chunk(resp._buffer, index, False)
def request(self,
chunk: int = None,
range_start: int = None,
range_end: int = None) -> CdnManager.InternalResponse:
def request(
self, chunk: int = None, range_start: int = None, range_end: int = None
) -> CdnManager.InternalResponse:
if chunk is None and range_start is None and range_end is None:
raise TypeError()
@@ -251,12 +281,10 @@ class CdnManager:
range_start = ChannelManager.CHUNK_SIZE * chunk
range_end = (chunk + 1) * ChannelManager.CHUNK_SIZE - 1
resp = self._session.client().get(self._cdnUrl._url,
headers={
"Range":
"bytes={}-{}".format(
range_start, range_end)
})
resp = self._session.client().get(
self._cdnUrl._url,
headers={"Range": "bytes={}-{}".format(range_start, range_end)},
)
if resp.status_code != 206:
raise IOError(resp.status_code)
@@ -291,16 +319,21 @@ class CdnManager:
def request_chunk_from_stream(self, index: int) -> None:
self.streamer._executorService.submit(
lambda: self.streamer.request_chunk(index))
lambda: self.streamer.request_chunk(index)
)
def stream_read_halted(self, chunk: int, _time: int) -> None:
if self.streamer._haltListener is not None:
self.streamer._executorService.submit(
lambda: self.streamer._haltListener.stream_read_halted(
chunk, _time))
chunk, _time
)
)
def stream_read_resumed(self, chunk: int, _time: int) -> None:
if self.streamer._haltListener is not None:
self.streamer._executorService.submit(
lambda: self.streamer._haltListener.
stream_read_resumed(chunk, _time))
lambda: self.streamer._haltListener.stream_read_resumed(
chunk, _time
)
)