SpotiClub Patch v0.2.0
Some checks failed
CodeQL / Analyze (python) (push) Has been cancelled

This commit is contained in:
unknown
2025-12-17 21:03:32 +01:00
parent acd633d3eb
commit f2c6a5ec0d
18 changed files with 4786 additions and 515 deletions

View File

@@ -17,10 +17,12 @@ import threading
import time
import typing
import urllib.parse
from collections.abc import Iterable
import defusedxml.ElementTree
import requests
import websocket
from google.protobuf import message as _message
from Cryptodome import Random
from Cryptodome.Cipher import AES
from Cryptodome.Hash import HMAC
@@ -28,7 +30,6 @@ from Cryptodome.Hash import SHA1
from Cryptodome.Protocol.KDF import PBKDF2
from Cryptodome.PublicKey import RSA
from Cryptodome.Signature import PKCS1_v1_5
from requests.structures import CaseInsensitiveDict
from librespot import util
from librespot import Version
@@ -49,7 +50,6 @@ from librespot.metadata import EpisodeId
from librespot.metadata import PlaylistId
from librespot.metadata import ShowId
from librespot.metadata import TrackId
from librespot.oauth import OAuth
from librespot.proto import Authentication_pb2 as Authentication
from librespot.proto import ClientToken_pb2 as ClientToken
from librespot.proto import Connect_pb2 as Connect
@@ -57,11 +57,21 @@ from librespot.proto import Connectivity_pb2 as Connectivity
from librespot.proto import Keyexchange_pb2 as Keyexchange
from librespot.proto import Metadata_pb2 as Metadata
from librespot.proto import Playlist4External_pb2 as Playlist4External
from librespot.proto.ExtendedMetadata_pb2 import EntityRequest, BatchedEntityRequest, ExtensionQuery, BatchedExtensionResponse
from librespot.proto.ExtensionKind_pb2 import ExtensionKind
from librespot.proto_ext import audio_files_extension_pb2
from librespot.proto_ext import extended_metadata_pb2
from librespot.proto_ext import extension_kind_pb2
try:
from librespot.proto.spotify.login5.v3 import Login5_pb2 as Login5
from librespot.proto.spotify.login5.v3 import ClientInfo_pb2 as Login5ClientInfo
from librespot.proto.spotify.login5.v3.credentials import Credentials_pb2 as Login5Credentials
LOGIN5_AVAILABLE = True
except ImportError as e:
# Login5 protobuf files not available, will use fallback
LOGIN5_AVAILABLE = False
Login5 = None
Login5ClientInfo = None
Login5Credentials = None
from librespot.proto.ExplicitContentPubsub_pb2 import UserAttributesUpdate
from librespot.proto.spotify.login5.v3 import Login5_pb2 as Login5
from librespot.proto.spotify.login5.v3.credentials import Credentials_pb2 as Login5Credentials
from librespot.structure import Closeable
from librespot.structure import MessageListener
from librespot.structure import RequestListener
@@ -83,7 +93,7 @@ class ApiClient(Closeable):
self,
method: str,
suffix: str,
headers: typing.Union[None, CaseInsensitiveDict[str, str]],
headers: typing.Union[None, typing.Dict[str, str]],
body: typing.Union[None, bytes],
url: typing.Union[None, str],
) -> requests.PreparedRequest:
@@ -92,7 +102,7 @@ class ApiClient(Closeable):
:param method: str:
:param suffix: str:
:param headers: typing.Union[None:
:param CaseInsensitiveDict[str:
:param typing.Dict[str:
:param str]]:
:param body: typing.Union[None:
:param bytes]:
@@ -106,26 +116,32 @@ class ApiClient(Closeable):
self.logger.debug("Updated client token: {}".format(
self.__client_token_str))
if url is None:
url = self.__base_url + suffix
else:
url = url + suffix
merged_headers: dict[str, str] = {}
if headers is not None:
merged_headers.update(headers)
if headers is None:
headers = CaseInsensitiveDict()
headers["Authorization"] = "Bearer {}".format(
self.__session.tokens().get("playlist-read"))
headers["client-token"] = self.__client_token_str
if "Authorization" not in merged_headers:
merged_headers["Authorization"] = "Bearer {}".format(
self.__session.tokens().get("playlist-read"))
request = requests.Request(method, url, headers=headers, data=body)
if "client-token" not in merged_headers:
merged_headers["client-token"] = self.__client_token_str
return request.prepare()
full_url = (self.__base_url if url is None else url) + suffix
request = requests.Request(
method=method,
url=full_url,
headers=merged_headers,
data=body,
)
session = self.__session.client()
return session.prepare_request(request)
def send(
self,
method: str,
suffix: str,
headers: typing.Union[None, CaseInsensitiveDict[str, str]],
headers: typing.Union[None, typing.Dict[str, str]],
body: typing.Union[None, bytes],
) -> requests.Response:
"""
@@ -133,7 +149,7 @@ class ApiClient(Closeable):
:param method: str:
:param suffix: str:
:param headers: typing.Union[None:
:param CaseInsensitiveDict[str:
:param typing.Dict[str:
:param str]]:
:param body: typing.Union[None:
:param bytes]:
@@ -148,20 +164,18 @@ class ApiClient(Closeable):
method: str,
url: str,
suffix: str,
headers: typing.Union[None, CaseInsensitiveDict[str, str]],
headers: typing.Union[None, typing.Dict[str, str]],
body: typing.Union[None, bytes],
) -> requests.Response:
"""
:param method: str:
:param url: str:
:param suffix: str:
:param headers: typing.Union[None:
:param CaseInsensitiveDict[str:
:param typing.Dict[str:
:param str]]:
:param body: typing.Union[None:
:param bytes]:
"""
response = self.__session.client().send(
self.build_request(method, suffix, headers, body, url))
@@ -192,36 +206,22 @@ class ApiClient(Closeable):
self.logger.warning("PUT state returned {}. headers: {}".format(
response.status_code, response.headers))
def get_ext_metadata(self, extension_kind: ExtensionKind, uri: str):
headers = CaseInsensitiveDict({"content-type": "application/x-protobuf"})
req = EntityRequest(entity_uri=uri, query=[ExtensionQuery(extension_kind=extension_kind),])
response = self.send("POST", "/extended-metadata/v0/extended-metadata",
headers, BatchedEntityRequest(entity_request=[req,]).SerializeToString())
ApiClient.StatusCodeException.check_status(response)
body = response.content
if body is None:
raise ConnectionError("Extended Metadata request failed: No response body")
proto = BatchedExtensionResponse()
proto.ParseFromString(body)
entityextd = proto.extended_metadata.pop().extension_data.pop()
if entityextd.header.status_code != 200:
raise ConnectionError("Extended Metadata request failed: Status code {}".format(entityextd.header.status_code))
mdb: bytes = entityextd.extension_data.value
return mdb
def get_metadata_4_track(self, track: TrackId) -> Metadata.Track:
"""
:param track: TrackId:
"""
mdb = self.get_ext_metadata(ExtensionKind.TRACK_V4, track.to_spotify_uri())
md = Metadata.Track()
md.ParseFromString(mdb)
return md
response = self.sendToUrl("GET", "https://spclient.wg.spotify.com",
"/metadata/4/track/{}".format(track.hex_id()),
None, None)
ApiClient.StatusCodeException.check_status(response)
body = response.content
if body is None:
raise RuntimeError()
proto = Metadata.Track()
proto.ParseFromString(body)
return proto
def get_metadata_4_episode(self, episode: EpisodeId) -> Metadata.Episode:
"""
@@ -229,10 +229,16 @@ class ApiClient(Closeable):
:param episode: EpisodeId:
"""
mdb = self.get_ext_metadata(ExtensionKind.EPISODE_V4, episode.to_spotify_uri())
md = Metadata.Episode()
md.ParseFromString(mdb)
return md
response = self.sendToUrl("GET", "https://spclient.wg.spotify.com",
"/metadata/4/episode/{}".format(episode.hex_id()),
None, None)
ApiClient.StatusCodeException.check_status(response)
body = response.content
if body is None:
raise IOError()
proto = Metadata.Episode()
proto.ParseFromString(body)
return proto
def get_metadata_4_album(self, album: AlbumId) -> Metadata.Album:
"""
@@ -240,10 +246,17 @@ class ApiClient(Closeable):
:param album: AlbumId:
"""
mdb = self.get_ext_metadata(ExtensionKind.ALBUM_V4, album.to_spotify_uri())
md = Metadata.Album()
md.ParseFromString(mdb)
return md
response = self.sendToUrl("GET", "https://spclient.wg.spotify.com",
"/metadata/4/album/{}".format(album.hex_id()),
None, None)
ApiClient.StatusCodeException.check_status(response)
body = response.content
if body is None:
raise IOError()
proto = Metadata.Album()
proto.ParseFromString(body)
return proto
def get_metadata_4_artist(self, artist: ArtistId) -> Metadata.Artist:
"""
@@ -251,10 +264,16 @@ class ApiClient(Closeable):
:param artist: ArtistId:
"""
mdb = self.get_ext_metadata(ExtensionKind.ARTIST_V4, artist.to_spotify_uri())
md = Metadata.Artist()
md.ParseFromString(mdb)
return md
response = self.sendToUrl("GET", "https://spclient.wg.spotify.com",
"/metadata/4/artist/{}".format(artist.hex_id()),
None, None)
ApiClient.StatusCodeException.check_status(response)
body = response.content
if body is None:
raise IOError()
proto = Metadata.Artist()
proto.ParseFromString(body)
return proto
def get_metadata_4_show(self, show: ShowId) -> Metadata.Show:
"""
@@ -262,10 +281,16 @@ class ApiClient(Closeable):
:param show: ShowId:
"""
mdb = self.get_ext_metadata(ExtensionKind.SHOW_V4, show.to_spotify_uri())
md = Metadata.Show()
md.ParseFromString(mdb)
return md
response = self.send("GET",
"/metadata/4/show/{}".format(show.hex_id()), None,
None)
ApiClient.StatusCodeException.check_status(response)
body = response.content
if body is None:
raise IOError()
proto = Metadata.Show()
proto.ParseFromString(body)
return proto
def get_playlist(self,
_id: PlaylistId) -> Playlist4External.SelectedListContent:
@@ -285,6 +310,216 @@ class ApiClient(Closeable):
proto.ParseFromString(body)
return proto
def get_audio_files_extension(
self, track: TrackId
) -> typing.Optional[audio_files_extension_pb2.AudioFilesExtensionResponse]:
"""Fetch audio file metadata via extended metadata for a given track."""
spotify_uri = track.to_spotify_uri()
request = extended_metadata_pb2.BatchedEntityRequest()
header = request.header
def _resolve_country_code() -> typing.Optional[str]:
code = getattr(self.__session, "_Session__country_code", None)
if code:
code = str(code).strip().upper()
if len(code) == 2 and code.isalpha():
return code
return None
country_code = _resolve_country_code()
if country_code:
header.country = country_code
try:
catalogue = self.__session.ap_welcome().catalogue
except AttributeError:
catalogue = None
except Exception: # pragma: no cover - defensive guard if ap_welcome raises
catalogue = None
if catalogue:
header.catalogue = catalogue
entity_request = request.entity_request.add()
entity_request.entity_uri = spotify_uri
query = entity_request.query.add()
query.extension_kind = extension_kind_pb2.ExtensionKind.AUDIO_FILES
request_bytes = request.SerializeToString()
def _decode_audio_files_extension(
payload: typing.Optional[typing.Union[bytes, bytearray, typing.Iterable[bytes]]]
) -> typing.Optional[audio_files_extension_pb2.AudioFilesExtensionResponse]:
if not payload:
return None
if isinstance(payload, (bytes, bytearray)):
payload_bytes = bytes(payload)
elif isinstance(payload, Iterable): # Mercury responses sometimes return payload parts
payload_bytes = b"".join(typing.cast(typing.Iterable[bytes], payload))
else:
payload_bytes = bytes(payload)
batch_response = extended_metadata_pb2.BatchedExtensionResponse()
try:
batch_response.ParseFromString(payload_bytes)
except _message.DecodeError:
self.logger.debug(
"Failed to parse extended metadata payload for %s",
spotify_uri,
exc_info=True,
)
return None
for extension_array in batch_response.extended_metadata:
if extension_array.extension_kind != extension_kind_pb2.ExtensionKind.AUDIO_FILES:
continue
for entity in extension_array.extension_data:
if entity.entity_uri and entity.entity_uri != spotify_uri:
continue
if not entity.HasField("extension_data"):
continue
audio_response = audio_files_extension_pb2.AudioFilesExtensionResponse()
try:
entity.extension_data.Unpack(audio_response)
except (ValueError, _message.DecodeError):
try:
audio_response.ParseFromString(entity.extension_data.value)
except _message.DecodeError:
self.logger.debug(
"Failed to unpack audio files extension for %s",
spotify_uri,
exc_info=True,
)
continue
return audio_response
return None
# Prefer the HTTPS extended metadata endpoint; fall back to Mercury if necessary.
login5_token = None
try:
login5_token = self.__session.get_login5_token()
except Exception: # pragma: no cover - defensive guard if session raises unexpectedly
login5_token = None
bearer_token = login5_token
if not bearer_token:
try:
bearer_token = self.__session.tokens().get("playlist-read")
except Exception:
bearer_token = None
headers = {
"Content-Type": "application/x-protobuf",
"Accept": "application/x-protobuf",
"Content-Length": str(len(request_bytes)),
}
if bearer_token:
headers["Authorization"] = f"Bearer {bearer_token}"
preferred_locale = getattr(self.__session, "preferred_locale", None)
if callable(preferred_locale): # Session.preferred_locale() is a method
try:
locale_value = preferred_locale()
except Exception:
locale_value = None
else:
locale_value = preferred_locale
if isinstance(locale_value, str) and locale_value:
headers.setdefault("Accept-Language", locale_value)
query_params: dict[str, str] = {"product": "0"}
if country_code:
query_params["country"] = country_code
query_params["salt"] = str(random.randint(0, 0xFFFFFFFF))
suffix = "/extended-metadata/v0/extended-metadata?" + urllib.parse.urlencode(query_params)
def _http_post(url_override: typing.Optional[str]) -> typing.Optional[requests.Response]:
target_headers = headers.copy()
for attempt in range(2):
try:
if url_override is None:
response = self.send("POST", suffix, target_headers, request_bytes)
else:
response = self.sendToUrl("POST", url_override, suffix, target_headers, request_bytes)
except Exception as exc: # pragma: no cover - network errors handled gracefully
self.logger.debug(
"Extended metadata HTTP request failed for %s via %s: %s",
spotify_uri,
url_override or "AP host",
exc,
)
return None
if response is not None and response.status_code in (401, 403):
self.logger.debug(
"Extended metadata HTTP returned %s for %s; refreshing client token",
response.status_code,
spotify_uri,
)
self.__client_token_str = None
target_headers.pop("client-token", None)
continue
return response
return None
http_response = _http_post(None)
if http_response is None or http_response.status_code != 200:
http_response = _http_post("https://spclient.wg.spotify.com")
if http_response is not None:
if http_response.status_code == 200:
http_payload: typing.Optional[bytes]
if isinstance(http_response.content, (bytes, bytearray)):
http_payload = bytes(http_response.content)
else:
http_payload = None
http_extension = _decode_audio_files_extension(http_payload)
if http_extension is not None:
return http_extension
else:
self.logger.debug(
"Extended metadata HTTP returned status %s for %s",
http_response.status_code,
spotify_uri,
)
mercury_request = (
RawMercuryRequest.new_builder()
.set_uri("hm://extendedmetadata/v1/entity")
.set_method("POST")
.set_content_type("application/x-protobuf")
.add_payload_part(request_bytes)
.build()
)
try:
response = self.__session.mercury().send_sync(mercury_request)
except Exception as exc: # pragma: no cover - network errors handled gracefully
self.logger.debug(
"Extended metadata request failed for %s: %s",
spotify_uri,
exc,
)
return None
if response.status_code != 200 or not response.payload:
self.logger.debug(
"Extended metadata returned status %s for %s",
response.status_code,
spotify_uri,
)
return None
return _decode_audio_files_extension(response.payload)
def set_client_token(self, client_token):
"""
@@ -319,10 +554,10 @@ class ApiClient(Closeable):
resp = requests.post(
"https://clienttoken.spotify.com/v1/clienttoken",
proto_req.SerializeToString(),
headers=CaseInsensitiveDict({
headers={
"Accept": "application/x-protobuf",
"Content-Encoding": "",
}),
},
)
ApiClient.StatusCodeException.check_status(resp)
@@ -596,10 +831,10 @@ class DealerClient(Closeable):
return
self.__message_listeners_lock.wait()
def __get_headers(self, obj: typing.Any) -> CaseInsensitiveDict[str, str]:
def __get_headers(self, obj: typing.Any) -> dict[str, str]:
headers = obj.get("headers")
if headers is None:
return CaseInsensitiveDict()
return {}
return headers
class ConnectionHolder(Closeable):
@@ -923,6 +1158,8 @@ class Session(Closeable, MessageListener, SubListener):
__stored_str: str = ""
__token_provider: typing.Union[TokenProvider, None]
__user_attributes = {}
__login5_access_token: typing.Union[str, None] = None
__login5_token_expiry: typing.Union[int, None] = None
def __init__(self, inner: Inner, address: str) -> None:
self.__client = Session.create_client(inner.conf)
@@ -962,6 +1199,7 @@ class Session(Closeable, MessageListener, SubListener):
"""
self.__authenticate_partial(credential, False)
self.__authenticate_login5(credential)
with self.__auth_lock:
self.__mercury_client = MercuryClient(self)
self.__token_provider = TokenProvider(self)
@@ -1204,12 +1442,12 @@ class Session(Closeable, MessageListener, SubListener):
raise RuntimeError("Session isn't authenticated!")
return self.__mercury_client
def on_message(self, uri: str, headers: CaseInsensitiveDict[str, str],
def on_message(self, uri: str, headers: typing.Dict[str, str],
payload: bytes):
"""
:param uri: str:
:param headers: CaseInsensitiveDict[str:
:param headers: typing.Dict[str:
:param str]:
:param payload: bytes:
@@ -1379,6 +1617,64 @@ class Session(Closeable, MessageListener, SubListener):
def __send_unchecked(self, cmd: bytes, payload: bytes) -> None:
self.cipher_pair.send_encoded(self.connection, cmd, payload)
def __authenticate_login5(self, credential: Authentication.LoginCredentials) -> None:
"""Authenticate using Login5 to get access token"""
if not LOGIN5_AVAILABLE:
self.logger.warning("Login5 protobuf files not available, skipping Login5 authentication")
return
try:
# Build Login5 request
login5_request = Login5.LoginRequest()
# Set client info
login5_request.client_info.client_id = "65b708073fc0480ea92a077233ca87bd"
login5_request.client_info.device_id = self.__inner.device_id
# Set stored credential from APWelcome
if hasattr(self, '_Session__ap_welcome') and self.__ap_welcome:
stored_cred = Login5Credentials.StoredCredential()
stored_cred.username = self.__ap_welcome.canonical_username
stored_cred.data = self.__ap_welcome.reusable_auth_credentials
login5_request.stored_credential.CopyFrom(stored_cred)
# Send Login5 request
login5_url = "https://login5.spotify.com/v3/login"
headers = {
"Content-Type": "application/x-protobuf",
"Accept": "application/x-protobuf"
}
response = requests.post(
login5_url,
data=login5_request.SerializeToString(),
headers=headers
)
if response.status_code == 200:
login5_response = Login5.LoginResponse()
login5_response.ParseFromString(response.content)
if login5_response.HasField('ok'):
self.__login5_access_token = login5_response.ok.access_token
self.__login5_token_expiry = int(time.time()) + login5_response.ok.access_token_expires_in
self.logger.info("Login5 authentication successful, got access token")
else:
self.logger.warning("Login5 authentication failed: {}".format(login5_response.error))
else:
self.logger.warning("Login5 request failed with status: {}".format(response.status_code))
except Exception as e:
self.logger.warning("Failed to authenticate with Login5: {}".format(e))
def get_login5_token(self) -> typing.Union[str, None]:
"""Get the Login5 access token if available and not expired"""
if self.__login5_access_token and self.__login5_token_expiry:
if int(time.time()) < self.__login5_token_expiry - 60: # 60 second buffer
return self.__login5_access_token
else:
self.logger.debug("Login5 token expired, need to re-authenticate")
return None
def __wait_auth_lock(self) -> None:
if self.__closing and self.connection is None:
self.logger.debug("Connection was broken while closing.")
@@ -1610,7 +1906,6 @@ class Session(Closeable, MessageListener, SubListener):
pass
else:
try:
# Try Python librespot format first
self.login_credentials = Authentication.LoginCredentials(
typ=Authentication.AuthenticationType.Value(
obj["type"]),
@@ -1618,27 +1913,7 @@ class Session(Closeable, MessageListener, SubListener):
auth_data=base64.b64decode(obj["credentials"]),
)
except KeyError:
# Try Rust librespot format (auth_type as int, auth_data instead of credentials)
try:
self.login_credentials = Authentication.LoginCredentials(
typ=obj["auth_type"],
username=obj["username"],
auth_data=base64.b64decode(obj["auth_data"]),
)
except KeyError:
pass
return self
def oauth(self, oauth_url_callback, success_page_content = None) -> Session.Builder:
"""
Login via OAuth
You can supply an oauth_url_callback method that takes a string and returns the OAuth URL.
When oauth_url_callback is None, this will only log the auth url to the console.
"""
if os.path.isfile(self.conf.stored_credentials_file):
return self.stored_file(None)
self.login_credentials = OAuth(MercuryRequests.keymaster_client_id, "http://127.0.0.1:5588/login", oauth_url_callback).set_success_page_content(success_page_content).flow()
pass
return self
def user_pass(self, username: str, password: str) -> Session.Builder:
@@ -1907,7 +2182,23 @@ class Session(Closeable, MessageListener, SubListener):
ap_address = address.split(":")[0]
ap_port = int(address.split(":")[1])
sock = socket.socket()
sock.connect((ap_address, ap_port))
# Retry logic: try up to 3 times with 2 seconds between attempts
# for transient connection errors (e.g., ECONNREFUSED / error 111).
attempts = 0
last_err: typing.Optional[Exception] = None
while attempts < 3:
attempts += 1
try:
sock.connect((ap_address, ap_port))
break
except OSError as exc:
last_err = exc
# Connection refused / temporary failure
if attempts >= 3:
raise
time.sleep(2)
return Session.ConnectionHolder(sock)
def close(self) -> None:
@@ -1916,12 +2207,20 @@ class Session(Closeable, MessageListener, SubListener):
def flush(self) -> None:
"""Flush data to socket"""
try:
self.__buffer.seek(0)
self.__socket.send(self.__buffer.read())
self.__buffer = io.BytesIO()
except BrokenPipeError:
pass
attempts = 0
while True:
try:
self.__buffer.seek(0)
self.__socket.send(self.__buffer.read())
self.__buffer = io.BytesIO()
break
except ConnectionResetError as exc:
attempts += 1
if attempts >= 3:
raise
time.sleep(1)
except BrokenPipeError:
break
def read(self, length: int) -> bytes:
"""Read data from socket
@@ -2023,12 +2322,41 @@ class Session(Closeable, MessageListener, SubListener):
self.__thread.start()
def stop(self) -> None:
""" """
"""Signal the receiver thread to stop and wait for it.
This ensures that the background thread exits cleanly before
the underlying socket/connection is closed, avoiding
"Bad file descriptor" errors from pending recv() calls.
"""
self.__running = False
try:
# Joining from within the same thread would deadlock, so
# guard against that.
if threading.current_thread() is not self.__thread:
self.__thread.join(timeout=1)
except Exception:
# Shutdown should be best-effort; if join fails, we
# still proceed with closing the session.
self.__session.logger.debug(
"Receiver.stop: failed to join receiver thread", exc_info=True
)
def run(self) -> None:
"""Receive Packet thread function"""
self.__session.logger.info("Session.Receiver started")
# If the session has been explicitly closed elsewhere, do not
# start the receive loop at all; this prevents infinite
# reconnect cycles when the caller is done.
if not self.__running:
return
# Track how many times in a row we have seen a connection
# reset (Errno 104). After a small number of consecutive
# occurrences, stop trying to reconnect to avoid an
# infinite loop when the remote side keeps closing.
consecutive_resets = 0
max_consecutive_resets = 3
while self.__running:
packet: Packet
cmd: bytes
@@ -2042,10 +2370,70 @@ class Session(Closeable, MessageListener, SubListener):
format(util.bytes_to_hex(packet.cmd),
packet.payload))
continue
except (RuntimeError, ConnectionResetError) as ex:
if self.__running:
except (RuntimeError, ConnectionResetError, OSError) as ex:
# If we've been asked to stop, just exit quietly without
# trying to reconnect. This avoids the situation where the
# session keeps reconnecting in a loop after the work is
# finished and the caller expects shutdown.
if not self.__running:
# When the underlying socket is closed as part of a
# normal shutdown, recv() may raise "Bad file
# descriptor" (errno 9). This is expected and
# harmless, so we skip logging it to avoid noisy
# messages after a clean Session.close().
if not (
isinstance(ex, OSError)
and getattr(ex, "errno", None) == 9
):
self.__session.logger.info(
"Receiver stopping after connection error: %s", ex
)
break
# Detect repeated "connection reset by peer" errors.
is_reset = isinstance(ex, ConnectionResetError) or (
isinstance(ex, OSError)
and getattr(ex, "errno", None) == 104
)
# If the underlying socket is already closed (e.g.
# Bad file descriptor), just stop quietly; this can
# happen when Session.close() has torn down the
# connection while the receiver was blocked in recv().
if isinstance(ex, OSError) and getattr(ex, "errno", None) == 9:
#self.__session.logger.info(
# "Receiver stopping after socket close (errno 9)"
#)
self.__running = False
break
if is_reset:
consecutive_resets += 1
self.__session.logger.fatal(
"Failed reading packet! {}".format(ex))
"Failed reading packet (reset #%d)! %s",
consecutive_resets,
ex,
)
if consecutive_resets >= max_consecutive_resets:
self.__session.logger.error(
"Too many consecutive connection resets (%d). "
"Stopping receiver without further reconnect attempts.",
consecutive_resets,
)
# Mark as not running so the outer loop and
# any future reconnect logic will see that the
# session should shut down.
self.__running = False
break
else:
consecutive_resets = 0
self.__session.logger.fatal(
"Failed reading packet! %s", ex
)
# For both reset and non-reset errors (unless we've
# exceeded the reset threshold), attempt a single
# reconnect.
if self.__running:
self.__session.reconnect()
break
if not self.__running:
@@ -2261,7 +2649,7 @@ class TokenProvider:
__tokens: typing.List[StoredToken] = []
def __init__(self, session: Session):
self.__session = session
self._session = session
def find_token_with_all_scopes(
self, scopes: typing.List[str]) -> typing.Union[StoredToken, None]:
@@ -2292,60 +2680,58 @@ class TokenProvider:
scopes = list(scopes)
if len(scopes) == 0:
raise RuntimeError("The token doesn't have any scope")
login5_token = self._session.get_login5_token()
if login5_token:
# Create a StoredToken-compatible object using Login5 token
login5_stored_token = TokenProvider.Login5StoredToken(login5_token, scopes)
self.logger.debug("Using Login5 access token for scopes: {}".format(scopes))
return login5_stored_token
token = self.find_token_with_all_scopes(scopes)
if token is not None:
if token.expired():
self.__tokens.remove(token)
self.logger.debug("Login5 token expired, need to re-authenticate")
else:
return token
self.logger.debug(
"Token expired or not suitable, requesting again. scopes: {}, old_token: {}"
.format(scopes, token))
token = self.login5(scopes)
if token is not None:
try:
response = self._session.mercury().send_sync_json(
MercuryRequests.request_token(self._session.device_id(),
",".join(scopes)))
token = TokenProvider.StoredToken(response)
self.logger.debug(
"Updated token successfully! scopes: {}, new_token: {}".format(
scopes, token))
self.__tokens.append(token)
self.logger.debug("Using Login5 access token for scopes: {}".format(scopes))
return token
return token
except Exception as e:
self.logger.warning("Failed to get token from keymaster endpoint: {}".format(e))
raise RuntimeError("Unable to obtain access token")
def login5(self, scopes: typing.List[str]) -> typing.Union[StoredToken, None]:
"""Submit Login5 request for a fresh access token"""
class Login5StoredToken:
"""StoredToken-compatible wrapper for Login5 access tokens"""
access_token: str
scopes: typing.List[str]
if self.__session.ap_welcome():
login5_request = Login5.LoginRequest()
login5_request.client_info.client_id = MercuryRequests.keymaster_client_id
login5_request.client_info.device_id = self.__session.device_id()
stored_cred = Login5Credentials.StoredCredential()
stored_cred.username = self.__session.username()
stored_cred.data = self.__session.ap_welcome().reusable_auth_credentials
login5_request.stored_credential.CopyFrom(stored_cred)
response = requests.post(
"https://login5.spotify.com/v3/login",
data=login5_request.SerializeToString(),
headers=CaseInsensitiveDict({
"Content-Type": "application/x-protobuf",
"Accept": "application/x-protobuf"
}))
if response.status_code == 200:
login5_response = Login5.LoginResponse()
login5_response.ParseFromString(response.content)
if login5_response.HasField('ok'):
self.logger.info("Login5 authentication successful, got access token".format(login5_response.ok.access_token))
token = TokenProvider.StoredToken({
"expiresIn": login5_response.ok.access_token_expires_in, # approximately one hour
"accessToken": login5_response.ok.access_token,
"scope": scopes
})
return token
else:
self.logger.warning("Login5 authentication failed: {}".format(login5_response.error))
else:
self.logger.warning("Login5 request failed with status: {}".format(response.status_code))
else:
self.logger.error("Login5 authentication failed: No APWelcome found")
def __init__(self, access_token: str, scopes: typing.List[str]):
self.access_token = access_token
self.scopes = scopes
def expired(self) -> bool:
"""Login5 tokens are managed by Session, so delegate expiry check"""
return False # Session handles expiry
def has_scope(self, scope: str) -> bool:
"""Login5 tokens are general-purpose, assume they have all scopes"""
return True
def has_scopes(self, sc: typing.List[str]) -> bool:
"""Login5 tokens are general-purpose, assume they have all scopes"""
return True
class StoredToken:
""" """