from __future__ import annotations import base64 import binascii import concurrent.futures import enum import gzip import io import json import logging import os import random import sched import socket import struct import threading import time import typing import urllib.parse from collections.abc import Iterable import defusedxml.ElementTree import requests import websocket from google.protobuf import message as _message from Cryptodome import Random from Cryptodome.Cipher import AES from Cryptodome.Hash import HMAC from Cryptodome.Hash import SHA1 from Cryptodome.Protocol.KDF import PBKDF2 from Cryptodome.PublicKey import RSA from Cryptodome.Signature import PKCS1_v1_5 from librespot import util from librespot import Version from librespot.oauth import OAuth from librespot.audio import AudioKeyManager from librespot.audio import CdnManager from librespot.audio import PlayableContentFeeder from librespot.audio.storage import ChannelManager from librespot.cache import CacheManager from librespot.crypto import CipherPair from librespot.crypto import DiffieHellman from librespot.crypto import Packet from librespot.mercury import MercuryClient from librespot.mercury import MercuryRequests from librespot.mercury import RawMercuryRequest from librespot.metadata import AlbumId from librespot.metadata import ArtistId from librespot.metadata import EpisodeId from librespot.metadata import PlaylistId from librespot.metadata import ShowId from librespot.metadata import TrackId from librespot.proto import Authentication_pb2 as Authentication from librespot.proto import ClientToken_pb2 as ClientToken from librespot.proto import Connect_pb2 as Connect from librespot.proto import Connectivity_pb2 as Connectivity from librespot.proto import Keyexchange_pb2 as Keyexchange from librespot.proto import Metadata_pb2 as Metadata from librespot.proto import Playlist4External_pb2 as Playlist4External from librespot.proto_ext import audio_files_extension_pb2 from librespot.proto_ext import extended_metadata_pb2 from librespot.proto_ext import extension_kind_pb2 try: from librespot.proto.spotify.login5.v3 import Login5_pb2 as Login5 from librespot.proto.spotify.login5.v3 import ClientInfo_pb2 as Login5ClientInfo from librespot.proto.spotify.login5.v3.credentials import Credentials_pb2 as Login5Credentials LOGIN5_AVAILABLE = True except ImportError as e: # Login5 protobuf files not available, will use fallback LOGIN5_AVAILABLE = False Login5 = None Login5ClientInfo = None Login5Credentials = None from librespot.proto.ExplicitContentPubsub_pb2 import UserAttributesUpdate from librespot.structure import Closeable from librespot.structure import MessageListener from librespot.structure import RequestListener from librespot.structure import SubListener class ApiClient(Closeable): """ """ logger = logging.getLogger("Librespot:ApiClient") __base_url: str __client_token_str: str = None __session: Session def __init__(self, session: Session): self.__session = session self.__base_url = "https://{}".format(ApResolver.get_random_spclient()) def build_request( self, method: str, suffix: str, headers: typing.Union[None, typing.Dict[str, str]], body: typing.Union[None, bytes], url: typing.Union[None, str], ) -> requests.PreparedRequest: """ :param method: str: :param suffix: str: :param headers: typing.Union[None: :param typing.Dict[str: :param str]]: :param body: typing.Union[None: :param bytes]: :param url: typing.Union[None: :param str]: """ if self.__client_token_str is None: resp = self.__client_token() self.__client_token_str = resp.granted_token.token self.logger.debug("Updated client token: {}".format( self.__client_token_str)) merged_headers: dict[str, str] = {} if headers is not None: merged_headers.update(headers) if "Authorization" not in merged_headers: merged_headers["Authorization"] = "Bearer {}".format( self.__session.tokens().get("playlist-read")) if "client-token" not in merged_headers: merged_headers["client-token"] = self.__client_token_str full_url = (self.__base_url if url is None else url) + suffix request = requests.Request( method=method, url=full_url, headers=merged_headers, data=body, ) session = self.__session.client() return session.prepare_request(request) def send( self, method: str, suffix: str, headers: typing.Union[None, typing.Dict[str, str]], body: typing.Union[None, bytes], ) -> requests.Response: """ :param method: str: :param suffix: str: :param headers: typing.Union[None: :param typing.Dict[str: :param str]]: :param body: typing.Union[None: :param bytes]: """ response = self.__session.client().send( self.build_request(method, suffix, headers, body, None)) return response def sendToUrl( self, method: str, url: str, suffix: str, headers: typing.Union[None, typing.Dict[str, str]], body: typing.Union[None, bytes], ) -> requests.Response: """ :param method: str: :param url: str: :param suffix: str: :param headers: typing.Union[None: :param typing.Dict[str: :param str]]: :param body: typing.Union[None: :param bytes]: """ response = self.__session.client().send( self.build_request(method, suffix, headers, body, url)) return response def put_connect_state(self, connection_id: str, proto: Connect.PutStateRequest) -> None: """ :param connection_id: str: :param proto: Connect.PutStateRequest: """ response = self.send( "PUT", "/connect-state/v1/devices/{}".format(self.__session.device_id()), { "Content-Type": "application/protobuf", "X-Spotify-Connection-Id": connection_id, }, proto.SerializeToString(), ) if response.status_code == 413: self.logger.warning( "PUT state payload is too large: {} bytes uncompressed.". format(len(proto.SerializeToString()))) elif response.status_code != 200: self.logger.warning("PUT state returned {}. headers: {}".format( response.status_code, response.headers)) def get_metadata_4_track(self, track: TrackId) -> Metadata.Track: """ :param track: TrackId: """ response = self.sendToUrl("GET", "https://spclient.wg.spotify.com", "/metadata/4/track/{}".format(track.hex_id()), None, None) ApiClient.StatusCodeException.check_status(response) body = response.content if body is None: raise RuntimeError() proto = Metadata.Track() proto.ParseFromString(body) return proto def get_metadata_4_episode(self, episode: EpisodeId) -> Metadata.Episode: """ :param episode: EpisodeId: """ response = self.sendToUrl("GET", "https://spclient.wg.spotify.com", "/metadata/4/episode/{}".format(episode.hex_id()), None, None) ApiClient.StatusCodeException.check_status(response) body = response.content if body is None: raise IOError() proto = Metadata.Episode() proto.ParseFromString(body) return proto def get_metadata_4_album(self, album: AlbumId) -> Metadata.Album: """ :param album: AlbumId: """ response = self.sendToUrl("GET", "https://spclient.wg.spotify.com", "/metadata/4/album/{}".format(album.hex_id()), None, None) ApiClient.StatusCodeException.check_status(response) body = response.content if body is None: raise IOError() proto = Metadata.Album() proto.ParseFromString(body) return proto def get_metadata_4_artist(self, artist: ArtistId) -> Metadata.Artist: """ :param artist: ArtistId: """ response = self.sendToUrl("GET", "https://spclient.wg.spotify.com", "/metadata/4/artist/{}".format(artist.hex_id()), None, None) ApiClient.StatusCodeException.check_status(response) body = response.content if body is None: raise IOError() proto = Metadata.Artist() proto.ParseFromString(body) return proto def get_metadata_4_show(self, show: ShowId) -> Metadata.Show: """ :param show: ShowId: """ response = self.send("GET", "/metadata/4/show/{}".format(show.hex_id()), None, None) ApiClient.StatusCodeException.check_status(response) body = response.content if body is None: raise IOError() proto = Metadata.Show() proto.ParseFromString(body) return proto def get_playlist(self, _id: PlaylistId) -> Playlist4External.SelectedListContent: """ :param _id: PlaylistId: """ response = self.send("GET", "/playlist/v2/playlist/{}".format(_id.id()), None, None) ApiClient.StatusCodeException.check_status(response) body = response.content if body is None: raise IOError() proto = Playlist4External.SelectedListContent() proto.ParseFromString(body) return proto def get_audio_files_extension( self, track: TrackId ) -> typing.Optional[audio_files_extension_pb2.AudioFilesExtensionResponse]: """Fetch audio file metadata via extended metadata for a given track.""" spotify_uri = track.to_spotify_uri() request = extended_metadata_pb2.BatchedEntityRequest() header = request.header def _resolve_country_code() -> typing.Optional[str]: code = getattr(self.__session, "_Session__country_code", None) if code: code = str(code).strip().upper() if len(code) == 2 and code.isalpha(): return code return None country_code = _resolve_country_code() if country_code: header.country = country_code try: catalogue = self.__session.ap_welcome().catalogue except AttributeError: catalogue = None except Exception: # pragma: no cover - defensive guard if ap_welcome raises catalogue = None if catalogue: header.catalogue = catalogue entity_request = request.entity_request.add() entity_request.entity_uri = spotify_uri query = entity_request.query.add() query.extension_kind = extension_kind_pb2.ExtensionKind.AUDIO_FILES request_bytes = request.SerializeToString() def _decode_audio_files_extension( payload: typing.Optional[typing.Union[bytes, bytearray, typing.Iterable[bytes]]] ) -> typing.Optional[audio_files_extension_pb2.AudioFilesExtensionResponse]: if not payload: return None if isinstance(payload, (bytes, bytearray)): payload_bytes = bytes(payload) elif isinstance(payload, Iterable): # Mercury responses sometimes return payload parts payload_bytes = b"".join(typing.cast(typing.Iterable[bytes], payload)) else: payload_bytes = bytes(payload) batch_response = extended_metadata_pb2.BatchedExtensionResponse() try: batch_response.ParseFromString(payload_bytes) except _message.DecodeError: self.logger.debug( "Failed to parse extended metadata payload for %s", spotify_uri, exc_info=True, ) return None for extension_array in batch_response.extended_metadata: if extension_array.extension_kind != extension_kind_pb2.ExtensionKind.AUDIO_FILES: continue for entity in extension_array.extension_data: if entity.entity_uri and entity.entity_uri != spotify_uri: continue if not entity.HasField("extension_data"): continue audio_response = audio_files_extension_pb2.AudioFilesExtensionResponse() try: entity.extension_data.Unpack(audio_response) except (ValueError, _message.DecodeError): try: audio_response.ParseFromString(entity.extension_data.value) except _message.DecodeError: self.logger.debug( "Failed to unpack audio files extension for %s", spotify_uri, exc_info=True, ) continue return audio_response return None # Prefer the HTTPS extended metadata endpoint; fall back to Mercury if necessary. login5_token = None try: login5_token = self.__session.get_login5_token() except Exception: # pragma: no cover - defensive guard if session raises unexpectedly login5_token = None bearer_token = login5_token if not bearer_token: try: bearer_token = self.__session.tokens().get("playlist-read") except Exception: bearer_token = None headers = { "Content-Type": "application/x-protobuf", "Accept": "application/x-protobuf", "Content-Length": str(len(request_bytes)), } if bearer_token: headers["Authorization"] = f"Bearer {bearer_token}" preferred_locale = getattr(self.__session, "preferred_locale", None) if callable(preferred_locale): # Session.preferred_locale() is a method try: locale_value = preferred_locale() except Exception: locale_value = None else: locale_value = preferred_locale if isinstance(locale_value, str) and locale_value: headers.setdefault("Accept-Language", locale_value) query_params: dict[str, str] = {"product": "0"} if country_code: query_params["country"] = country_code query_params["salt"] = str(random.randint(0, 0xFFFFFFFF)) suffix = "/extended-metadata/v0/extended-metadata?" + urllib.parse.urlencode(query_params) def _http_post(url_override: typing.Optional[str]) -> typing.Optional[requests.Response]: target_headers = headers.copy() for attempt in range(2): try: if url_override is None: response = self.send("POST", suffix, target_headers, request_bytes) else: response = self.sendToUrl("POST", url_override, suffix, target_headers, request_bytes) except Exception as exc: # pragma: no cover - network errors handled gracefully self.logger.debug( "Extended metadata HTTP request failed for %s via %s: %s", spotify_uri, url_override or "AP host", exc, ) return None if response is not None and response.status_code in (401, 403): self.logger.debug( "Extended metadata HTTP returned %s for %s; refreshing client token", response.status_code, spotify_uri, ) self.__client_token_str = None target_headers.pop("client-token", None) continue return response return None http_response = _http_post(None) if http_response is None or http_response.status_code != 200: http_response = _http_post("https://spclient.wg.spotify.com") if http_response is not None: if http_response.status_code == 200: http_payload: typing.Optional[bytes] if isinstance(http_response.content, (bytes, bytearray)): http_payload = bytes(http_response.content) else: http_payload = None http_extension = _decode_audio_files_extension(http_payload) if http_extension is not None: return http_extension else: self.logger.debug( "Extended metadata HTTP returned status %s for %s", http_response.status_code, spotify_uri, ) mercury_request = ( RawMercuryRequest.new_builder() .set_uri("hm://extendedmetadata/v1/entity") .set_method("POST") .set_content_type("application/x-protobuf") .add_payload_part(request_bytes) .build() ) try: response = self.__session.mercury().send_sync(mercury_request) except Exception as exc: # pragma: no cover - network errors handled gracefully self.logger.debug( "Extended metadata request failed for %s: %s", spotify_uri, exc, ) return None if response.status_code != 200 or not response.payload: self.logger.debug( "Extended metadata returned status %s for %s", response.status_code, spotify_uri, ) return None return _decode_audio_files_extension(response.payload) def set_client_token(self, client_token): """ :param client_token: """ self.__client_token_str = client_token def __client_token(self): proto_req = ClientToken.ClientTokenRequest( request_type=ClientToken.ClientTokenRequestType. REQUEST_CLIENT_DATA_REQUEST, client_data=ClientToken.ClientDataRequest( client_id=MercuryRequests.keymaster_client_id, client_version=Version.version_name, connectivity_sdk_data=Connectivity.ConnectivitySdkData( device_id=self.__session.device_id(), platform_specific_data=Connectivity.PlatformSpecificData( windows=Connectivity.NativeWindowsData( something1=10, something3=21370, something4=2, something6=9, something7=332, something8=33404, something10=True, ), ), ), ), ) resp = requests.post( "https://clienttoken.spotify.com/v1/clienttoken", proto_req.SerializeToString(), headers={ "Accept": "application/x-protobuf", "Content-Encoding": "", }, ) ApiClient.StatusCodeException.check_status(resp) proto_resp = ClientToken.ClientTokenResponse() proto_resp.ParseFromString(resp.content) return proto_resp class StatusCodeException(IOError): """ """ code: int def __init__(self, response: requests.Response): super().__init__(response.status_code) self.code = response.status_code @staticmethod def check_status(response: requests.Response) -> None: """ :param response: requests.Response: """ if response.status_code != 200: raise ApiClient.StatusCodeException(response) class ApResolver: """ """ base_url = "https://apresolve.spotify.com/" @staticmethod def request(service_type: str) -> typing.Any: """Gets the specified ApResolve :param service_type: str: :returns: The resulting object will be returned """ response = requests.get("{}?type={}".format(ApResolver.base_url, service_type)) # If ApResolve responds with a non-200, treat this as a clear, # high-level error instead of bubbling up JSON parsing # exceptions from HTML error pages. if response.status_code != 200: if response.status_code == 502: raise RuntimeError( "Failed to contact Spotify ApResolve (502). " "Servers might be down or unreachable." ) raise RuntimeError( f"Failed to contact Spotify ApResolve (status {response.status_code}). " "This is usually a network, DNS, or firewall issue." ) try: return response.json() except ValueError as exc: # Response wasn't valid JSON; surface a friendly error # instead of a long JSONDecodeError traceback. raise RuntimeError( "Spotify ApResolve returned invalid data. " "This is likely a temporary server or network problem." ) from exc @staticmethod def get_random_of(service_type: str) -> str: """Gets the specified random ApResolve url :param service_type: str: :returns: A random ApResolve url will be returned """ pool = ApResolver.request(service_type) urls = pool.get(service_type) if urls is None or len(urls) == 0: raise RuntimeError("No ApResolve url available") return random.choice(urls) @staticmethod def get_random_dealer() -> str: """Get dealer endpoint url :returns: dealer endpoint url """ return ApResolver.get_random_of("dealer") @staticmethod def get_random_spclient() -> str: """Get spclient endpoint url :returns: spclient endpoint url """ return ApResolver.get_random_of("spclient") @staticmethod def get_random_accesspoint() -> str: """Get accesspoint endpoint url :returns: accesspoint endpoint url """ return ApResolver.get_random_of("accesspoint") class DealerClient(Closeable): """ """ logger = logging.getLogger("Librespot:DealerClient") __connection: typing.Union[ConnectionHolder, None] __last_scheduled_reconnection: typing.Union[sched.Event, None] __message_listeners: typing.Dict[MessageListener, typing.List[str]] = {} __message_listeners_lock = threading.Condition() __request_listeners: typing.Dict[str, RequestListener] = {} __request_listeners_lock = threading.Condition() __scheduler = sched.scheduler() __session: Session __worker = concurrent.futures.ThreadPoolExecutor() def __init__(self, session: Session): self.__session = session def add_message_listener(self, listener: MessageListener, uris: list[str]) -> None: """ :param listener: MessageListener: :param uris: list[str]: """ with self.__message_listeners_lock: if listener in self.__message_listeners: raise TypeError( "A listener for {} has already been added.".format(uris)) self.__message_listeners[listener] = uris self.__message_listeners_lock.notify_all() def add_request_listener(self, listener: RequestListener, uri: str): """ :param listener: RequestListener: :param uri: str: """ with self.__request_listeners_lock: if uri in self.__request_listeners: raise TypeError( "A listener for '{}' has already been added.".format(uri)) self.__request_listeners[uri] = listener self.__request_listeners_lock.notify_all() def close(self) -> None: """ """ self.__worker.shutdown() def connect(self) -> None: """ """ self.__connection = DealerClient.ConnectionHolder( self.__session, self, "wss://{}/?access_token={}".format( ApResolver.get_random_dealer(), self.__session.tokens().get("playlist-read"), ), ) def connection_invalided(self) -> None: """ """ self.__connection = None self.logger.debug("Scheduled reconnection attempt in 10 seconds...") def anonymous(): """ """ self.__last_scheduled_reconnection = None self.connect() self.__last_scheduled_reconnection = self.__scheduler.enter( 10, 1, anonymous) def handle_message(self, obj: typing.Any) -> None: """ :param obj: typing.Any: """ uri = obj.get("uri") headers = self.__get_headers(obj) payloads = obj.get("payloads") decoded_payloads: typing.Any if payloads is not None: if headers.get("Content-Type") == "application/json": decoded_payloads = payloads elif headers.get("Content-Type") == "plain/text": decoded_payloads = payloads else: decoded_payloads = base64.b64decode(payloads) if headers.get("Transfer-Encoding") == "gzip": decoded_payloads = gzip.decompress(decoded_payloads) else: decoded_payloads = b"" interesting = False with self.__message_listeners_lock: for listener in self.__message_listeners: dispatched = False keys = self.__message_listeners.get(listener) for key in keys: if uri.startswith(key) and not dispatched: interesting = True def anonymous(): """ """ listener.on_message(uri, headers, decoded_payloads) self.__worker.submit(anonymous) dispatched = True if not interesting: self.logger.debug("Couldn't dispatch message: {}".format(uri)) def handle_request(self, obj: typing.Any) -> None: """ :param obj: typing.Any: """ mid = obj.get("message_ident") key = obj.get("key") headers = self.__get_headers(obj) payload = obj.get("payload") if headers.get("Transfer-Encoding") == "gzip": gz = base64.b64decode(payload.get("compressed")) payload = json.loads(gzip.decompress(gz)) pid = payload.get("message_id") sender = payload.get("sent_by_device_id") command = payload.get("command") self.logger.debug( "Received request. [mid: {}, key: {}, pid: {}, sender: {}, command: {}]" .format(mid, key, pid, sender, command)) interesting = False with self.__request_listeners_lock: for mid_prefix in self.__request_listeners: if mid.startswith(mid_prefix): listener = self.__request_listeners.get(mid_prefix) interesting = True def anonymous(): """ """ result = listener.on_request(mid, pid, sender, command) if self.__connection is not None: self.__connection.send_reply(key, result) self.logger.warning( "Handled request. [key: {}, result: {}]".format( key, result)) self.__worker.submit(anonymous) if not interesting: self.logger.debug("Couldn't dispatch request: {}".format(mid)) def remove_message_listener(self, listener: MessageListener) -> None: """ :param listener: MessageListener: """ with self.__message_listeners_lock: self.__message_listeners.pop(listener) def remove_request_listener(self, listener: RequestListener) -> None: """ :param listener: RequestListener: """ with self.__request_listeners_lock: request_listeners = {} for key, value in self.__request_listeners.items(): if value != listener: request_listeners[key] = value self.__request_listeners = request_listeners def wait_for_listener(self) -> None: """ """ with self.__message_listeners_lock: if self.__message_listeners == {}: return self.__message_listeners_lock.wait() def __get_headers(self, obj: typing.Any) -> dict[str, str]: headers = obj.get("headers") if headers is None: return {} return headers class ConnectionHolder(Closeable): """ """ __closed = False __dealer_client: DealerClient __last_scheduled_ping: sched.Event __received_pong = False __scheduler = sched.scheduler() __session: Session __url: str __ws: websocket.WebSocketApp def __init__(self, session: Session, dealer_client: DealerClient, url: str): self.__session = session self.__dealer_client = dealer_client self.__url = url self.__ws = websocket.WebSocketApp(url) def close(self): """ """ if not self.__closed: self.__ws.close() self.__closed = True if self.__last_scheduled_ping is not None: self.__scheduler.cancel(self.__last_scheduled_ping) def on_failure(self, ws: websocket.WebSocketApp, error): """ :param ws: websocket.WebSocketApp: :param error: """ if self.__closed: return self.__dealer_client.logger.warning( "An exception occurred. Reconnecting...") self.close() def on_message(self, ws: websocket.WebSocketApp, text: str): """ :param ws: websocket.WebSocketApp: :param text: str: """ obj = json.loads(text) self.__dealer_client.wait_for_listener() typ = MessageType.parse(obj.get("type")) if typ == MessageType.MESSAGE: self.__dealer_client.handle_message(obj) elif typ == MessageType.REQUEST: self.__dealer_client.handle_request(obj) elif typ == MessageType.PONG: self.__received_pong = True elif typ == MessageType.PING: pass else: raise RuntimeError("Unknown message type for {}".format( typ.value)) def on_open(self, ws: websocket.WebSocketApp): """ :param ws: websocket.WebSocketApp: """ if self.__closed: self.__dealer_client.logger.fatal( "I wonder what happened here... Terminating. [closed: {}]". format(self.__closed)) self.__dealer_client.logger.debug( "Dealer connected! [url: {}]".format(self.__url)) def anonymous(): """ """ self.send_ping() self.__received_pong = False def anonymous2(): """ """ if self.__last_scheduled_ping is None: return if not self.__received_pong: self.__dealer_client.logger.warning( "Did not receive ping in 3 seconds. Reconnecting..." ) self.close() return self.__received_pong = False self.__scheduler.enter(3, 1, anonymous2) self.__last_scheduled_ping = self.__scheduler.enter( 30, 1, anonymous) self.__last_scheduled_ping = self.__scheduler.enter( 30, 1, anonymous) def send_ping(self): """ """ self.__ws.send('{"type":"ping"}') def send_reply(self, key: str, result: DealerClient.RequestResult): """ :param key: str: :param result: DealerClient.RequestResult: """ success = ("true" if result == DealerClient.RequestResult.SUCCESS else "false") self.__ws.send( '{"type":"reply","key":"%s","payload":{"success":%s}' % (key, success)) class RequestResult(enum.Enum): """ """ UNKNOWN_SEND_COMMAND_RESULT = 0 SUCCESS = 1 DEVICE_NOT_FOUND = 2 CONTEXT_PLAYER_ERROR = 3 DEVICE_DISAPPEARED = 4 UPSTREAM_ERROR = 5 DEVICE_DOES_NOT_SUPPORT_COMMAND = 6 RATE_LIMITED = 7 class EventService(Closeable): """ """ logger = logging.getLogger("Librespot:EventService") __session: Session __worker = concurrent.futures.ThreadPoolExecutor() def __init__(self, session: Session): self.__session = session def __worker_callback(self, event_builder: EventBuilder): try: body = event_builder.to_array() resp = self.__session.mercury().send_sync( RawMercuryRequest.Builder().set_uri( "hm://event-service/v1/events").set_method("POST"). add_user_field("Accept-Language", "en").add_user_field( "X-ClientTimeStamp", int(time.time() * 1000)).add_payload_part(body).build()) self.logger.debug("Event sent. body: {}, result: {}".format( body, resp.status_code)) except IOError as ex: self.logger.error("Failed sending event: {} {}".format( event_builder, ex)) def send_event(self, event_or_builder: typing.Union[GenericEvent, EventBuilder]): """ :param event_or_builder: typing.Union[GenericEvent: :param EventBuilder]: """ if type(event_or_builder) is EventService.GenericEvent: builder = event_or_builder.build() elif type(event_or_builder) is EventService.EventBuilder: builder = event_or_builder else: raise TypeError() self.__worker.submit(lambda: self.__worker_callback(builder)) def language(self, lang: str): """ :param lang: str: """ event = EventService.EventBuilder(EventService.Type.LANGUAGE) event.append(s=lang) def close(self): """ """ self.__worker.shutdown() class Type(enum.Enum): """ """ LANGUAGE = ("812", 1) FETCHED_FILE_ID = ("274", 3) NEW_SESSION_ID = ("557", 3) NEW_PLAYBACK_ID = ("558", 1) TRACK_PLAYED = ("372", 1) TRACK_TRANSITION = ("12", 37) CDN_REQUEST = ("10", 20) eventId: str unknown: str def __init__(self, event_id: str, unknown: str): self.eventId = event_id self.unknown = unknown class GenericEvent: """ """ def build(self) -> EventService.EventBuilder: """ """ raise NotImplementedError class EventBuilder: """ """ body: io.BytesIO def __init__(self, event_type: EventService.Type): self.body = io.BytesIO() self.append_no_delimiter(event_type.value[0]) self.append(event_type.value[1]) def append_no_delimiter(self, s: str = None) -> None: """ :param s: str: (Default value = None) """ if s is None: s = "" self.body.write(s.encode()) def append(self, c: int = None, s: str = None) -> EventService.EventBuilder: """ :param c: int: (Default value = None) :param s: str: (Default value = None) """ if c is None and s is None or c is not None and s is not None: raise TypeError() if c is not None: self.body.write(b"\x09") self.body.write(bytes([c])) return self if s is not None: self.body.write(b"\x09") self.append_no_delimiter(s) return self def to_array(self) -> bytes: """ """ pos = self.body.tell() self.body.seek(0) data = self.body.read() self.body.seek(pos) return data class MessageType(enum.Enum): """ """ MESSAGE = "message" PING = "ping" PONG = "pong" REQUEST = "request" @staticmethod def parse(_typ: str): """ :param _typ: str: """ if _typ == MessageType.MESSAGE.value: return MessageType.MESSAGE if _typ == MessageType.PING.value: return MessageType.PING if _typ == MessageType.PONG.value: return MessageType.PONG if _typ == MessageType.REQUEST.value: return MessageType.REQUEST raise TypeError("Unknown MessageType: {}".format(_typ)) class Session(Closeable, MessageListener, SubListener): """ """ cipher_pair: typing.Union[CipherPair, None] country_code: str = "EN" connection: typing.Union[ConnectionHolder, None] logger = logging.getLogger("Librespot:Session") scheduled_reconnect: typing.Union[sched.Event, None] = None scheduler = sched.scheduler(time.time) __api: ApiClient __ap_welcome: Authentication.APWelcome __audio_key_manager: typing.Union[AudioKeyManager, None] = None __auth_lock = threading.Condition() __auth_lock_bool = False __cache_manager: typing.Union[CacheManager, None] __cdn_manager: typing.Union[CdnManager, None] __channel_manager: typing.Union[ChannelManager, None] = None __client: typing.Union[requests.Session, None] __closed = False __closing = False __content_feeder: typing.Union[PlayableContentFeeder, None] __dealer_client: typing.Union[DealerClient, None] = None __event_service: typing.Union[EventService, None] = None __keys: DiffieHellman __mercury_client: MercuryClient __receiver: typing.Union[Receiver, None] = None __search: typing.Union[SearchManager, None] __server_key = (b"\xac\xe0F\x0b\xff\xc20\xaf\xf4k\xfe\xc3\xbf\xbf\x86=" b"\xa1\x91\xc6\xcc3l\x93\xa1O\xb3\xb0\x16\x12\xac\xacj" b"\xf1\x80\xe7\xf6\x14\xd9B\x9d\xbe.4fC\xe3b\xd22z\x1a" b"\r\x92;\xae\xdd\x14\x02\xb1\x81U\x05a\x04\xd5,\x96\xa4" b"L\x1e\xcc\x02J\xd4\xb2\x0c\x00\x1f\x17\xed\xc2/\xc45" b"!\xc8\xf0\xcb\xae\xd2\xad\xd7+\x0f\x9d\xb3\xc52\x1a*" b"\xfeY\xf3Z\r\xach\xf1\xfab\x1e\xfb,\x8d\x0c\xb79-\x92" b"G\xe3\xd75\x1am\xbd$\xc2\xae%[\x88\xff\xabs)\x8a\x0b" b"\xcc\xcd\x0cXg1\x89\xe8\xbd4\x80xJ_\xc9k\x89\x9d\x95k" b"\xfc\x86\xd7O3\xa6x\x17\x96\xc9\xc3-\r2\xa5\xab\xcd\x05'" b"\xe2\xf7\x10\xa3\x96\x13\xc4/\x99\xc0'\xbf\xed\x04\x9c" b"<'X\x04\xb6\xb2\x19\xf9\xc1/\x02\xe9Hc\xec\xa1\xb6B\xa0" b"\x9dH%\xf8\xb3\x9d\xd0\xe8j\xf9HM\xa1\xc2\xba\x860B\xea" b"\x9d\xb3\x08l\x19\x0eH\xb3\x9df\xeb\x00\x06\xa2Z\xee\xa1" b"\x1b\x13\x87<\xd7\x19\xe6U\xbd") __stored_str: str = "" __token_provider: typing.Union[TokenProvider, None] __user_attributes = {} __login5_access_token: typing.Union[str, None] = None __login5_token_expiry: typing.Union[int, None] = None def __init__(self, inner: Inner, address: str) -> None: self.__client = Session.create_client(inner.conf) self.connection = Session.ConnectionHolder.create(address, None) self.__inner = inner self.__keys = DiffieHellman() self.logger.info("Created new session! device_id: {}, ap: {}".format( inner.device_id, address)) def api(self) -> ApiClient: """ """ self.__wait_auth_lock() if self.__api is None: raise RuntimeError("Session isn't authenticated!") return self.__api def ap_welcome(self): """ """ self.__wait_auth_lock() if self.__ap_welcome is None: raise RuntimeError("Session isn't authenticated!") return self.__ap_welcome def audio_key(self) -> AudioKeyManager: """ """ self.__wait_auth_lock() if self.__audio_key_manager is None: raise RuntimeError("Session isn't authenticated!") return self.__audio_key_manager def authenticate(self, credential: Authentication.LoginCredentials) -> None: """Log in to Spotify :param credential: Spotify account login information :param credential: Authentication.LoginCredentials: """ self.__authenticate_partial(credential, False) self.__authenticate_login5(credential) with self.__auth_lock: self.__mercury_client = MercuryClient(self) self.__token_provider = TokenProvider(self) self.__audio_key_manager = AudioKeyManager(self) self.__channel_manager = ChannelManager(self) self.__api = ApiClient(self) self.__cdn_manager = CdnManager(self) self.__content_feeder = PlayableContentFeeder(self) self.__cache_manager = CacheManager(self) self.__dealer_client = DealerClient(self) self.__search = SearchManager(self) self.__event_service = EventService(self) self.__auth_lock_bool = False self.__auth_lock.notify_all() self.dealer().connect() self.logger.info("Authenticated as {}!".format( self.__ap_welcome.canonical_username)) self.mercury().interested_in("spotify:user:attributes:update", self) self.dealer().add_message_listener( self, ["hm://connect-state/v1/connect/logout"]) def cache(self) -> CacheManager: """ """ self.__wait_auth_lock() if self.__cache_manager is None: raise RuntimeError("Session isn't authenticated!") return self.__cache_manager def cdn(self) -> CdnManager: """ """ self.__wait_auth_lock() if self.__cdn_manager is None: raise RuntimeError("Session isn't authenticated!") return self.__cdn_manager def channel(self) -> ChannelManager: """ """ self.__wait_auth_lock() if self.__channel_manager is None: raise RuntimeError("Session isn't authenticated!") return self.__channel_manager def client(self) -> requests.Session: """ """ return self.__client def close(self) -> None: """Close instance""" self.logger.info("Closing session. device_id: {}".format( self.__inner.device_id)) self.__closing = True if self.__dealer_client is not None: self.__dealer_client.close() self.__dealer_client = None if self.__audio_key_manager is not None: self.__audio_key_manager = None if self.__channel_manager is not None: self.__channel_manager.close() self.__channel_manager = None if self.__event_service is not None: self.__event_service.close() self.__event_service = None if self.__receiver is not None: self.__receiver.stop() self.__receiver = None if self.__client is not None: self.__client.close() self.__client = None if self.connection is not None: self.connection.close() self.connection = None with self.__auth_lock: self.__ap_welcome = None self.cipher_pair = None self.__closed = True self.logger.info("Closed session. device_id: {}".format( self.__inner.device_id)) def connect(self) -> None: """Connect to the Spotify Server. This will retry the initial handshake a few times instead of crashing immediately on transient socket errors or short reads. """ max_attempts = 3 last_exc: typing.Optional[BaseException] = None for attempt in range(1, max_attempts + 1): try: # Inform the user about each connection attempt so it is # visible in the console (e.g. when called from Zotify). # Only show attempt counters after the first failure; the # initial attempt is shown without numbering. if attempt == 1: connect_msg = "Connecting to Spotify..." else: connect_msg = ( f"Connecting to Spotify (attempt {attempt}/{max_attempts})..." ) self.logger.info(connect_msg) print(connect_msg) acc = Session.Accumulator() # Send ClientHello nonce = Random.get_random_bytes(0x10) client_hello_proto = Keyexchange.ClientHello( build_info=Version.standard_build_info(), client_nonce=nonce, cryptosuites_supported=[ Keyexchange.Cryptosuite.CRYPTO_SUITE_SHANNON ], login_crypto_hello=Keyexchange.LoginCryptoHelloUnion( diffie_hellman=Keyexchange.LoginCryptoDiffieHellmanHello( gc=self.__keys.public_key_bytes(), server_keys_known=1, ), ), padding=b"\x1e", ) client_hello_bytes = client_hello_proto.SerializeToString() self.connection.write(b"\x00\x04") self.connection.write_int(2 + 4 + len(client_hello_bytes)) self.connection.write(client_hello_bytes) self.connection.flush() acc.write(b"\x00\x04") acc.write_int(2 + 4 + len(client_hello_bytes)) acc.write(client_hello_bytes) # Read APResponseMessage ap_response_message_length = self.connection.read_int() acc.write_int(ap_response_message_length) ap_response_message_bytes = self.connection.read( ap_response_message_length - 4 ) acc.write(ap_response_message_bytes) ap_response_message_proto = Keyexchange.APResponseMessage() ap_response_message_proto.ParseFromString( ap_response_message_bytes ) shared_key = util.int_to_bytes( self.__keys.compute_shared_key( ap_response_message_proto.challenge.login_crypto_challenge. diffie_hellman.gs ) ) # Check gs_signature rsa = RSA.construct( (int.from_bytes(self.__server_key, "big"), 65537) ) pkcs1_v1_5 = PKCS1_v1_5.new(rsa) sha1 = SHA1.new() sha1.update( ap_response_message_proto.challenge.login_crypto_challenge. diffie_hellman.gs ) if not pkcs1_v1_5.verify( sha1, ap_response_message_proto.challenge.login_crypto_challenge. diffie_hellman.gs_signature, ): raise RuntimeError("Failed signature check!") # Solve challenge buffer = io.BytesIO() for i in range(1, 6): mac = HMAC.new(shared_key, digestmod=SHA1) mac.update(acc.read()) mac.update(bytes([i])) buffer.write(mac.digest()) buffer.seek(0) mac = HMAC.new(buffer.read(20), digestmod=SHA1) mac.update(acc.read()) challenge = mac.digest() client_response_plaintext_proto = ( Keyexchange.ClientResponsePlaintext( crypto_response=Keyexchange.CryptoResponseUnion(), login_crypto_response=Keyexchange.LoginCryptoResponseUnion( diffie_hellman=Keyexchange.LoginCryptoDiffieHellmanResponse( hmac=challenge ) ), pow_response=Keyexchange.PoWResponseUnion(), ) ) client_response_plaintext_bytes = ( client_response_plaintext_proto.SerializeToString() ) self.connection.write_int( 4 + len(client_response_plaintext_bytes) ) self.connection.write(client_response_plaintext_bytes) self.connection.flush() try: self.connection.set_timeout(1) scrap = self.connection.read(4) if len(scrap) == 4: payload = self.connection.read( struct.unpack(">i", scrap)[0] - 4 ) failed = Keyexchange.APResponseMessage() failed.ParseFromString(payload) raise RuntimeError(failed) except socket.timeout: # Normal path: server did not send an error APResponse. pass finally: self.connection.set_timeout(0) # If we reach here, the handshake succeeded; derive # the Shannon cipher keys and mark the session as # connected. buffer.seek(20) with self.__auth_lock: self.cipher_pair = CipherPair( buffer.read(32), buffer.read(32) ) self.__auth_lock_bool = True self.logger.info("Connection successfully!") return except (ConnectionResetError, OSError, struct.error) as exc: last_exc = exc self.logger.warning( "Handshake attempt %d/%d failed: %s", attempt, max_attempts, exc, ) if attempt == 1: print(f"Connecting to Spotify failed: {exc}") else: print( f"Connecting to Spotify (attempt {attempt}/{max_attempts}) failed: {exc}" ) # Close current connection; a new access point will be # selected on the next attempt. if self.connection is not None: try: self.connection.close() except Exception: pass self.connection = None if attempt < max_attempts: # Pick a new access point and try again after a # short delay. address = ApResolver.get_random_accesspoint() self.logger.info( "Retrying connection, new access point: %s", address ) print( "Retrying connection to Spotify with new access point: " f"{address} (next attempt {attempt + 1}/{max_attempts})" ) self.connection = Session.ConnectionHolder.create( address, None ) time.sleep(1) # All attempts failed: log and raise a clear, user-friendly # error instead of crashing with a low-level struct.error. friendly_message = ( "Failed to connect to Spotify after " f"{max_attempts} attempts. " "OAuth login succeeded, but connecting to the Spotify " "access point timed out or was refused. " "This is usually a network or firewall issue." ) self.logger.error("%s Last error: %s", friendly_message, last_exc) print(friendly_message) raise RuntimeError(friendly_message) from last_exc def content_feeder(self) -> PlayableContentFeeder: """ """ self.__wait_auth_lock() if self.__content_feeder is None: raise RuntimeError("Session isn't authenticated!") return self.__content_feeder @staticmethod def create_client(conf: Configuration) -> requests.Session: """ :param conf: Configuration: """ client = requests.Session() return client def dealer(self) -> DealerClient: """ """ self.__wait_auth_lock() if self.__dealer_client is None: raise RuntimeError("Session isn't authenticated!") return self.__dealer_client def device_id(self) -> str: """ """ return self.__inner.device_id def device_name(self) -> str: """ """ return self.__inner.device_name def device_type(self) -> Connect.DeviceType: """ """ return self.__inner.device_type def event(self, resp: MercuryClient.Response) -> None: """ :param resp: MercuryClient.Response: """ if resp.uri == "spotify:user:attributes:update": attributes_update = UserAttributesUpdate() attributes_update.ParseFromString(resp.payload) for pair in attributes_update.pairs_list: self.__user_attributes[pair.key] = pair.value self.logger.info("Updated user attribute: {} -> {}".format( pair.key, pair.value)) def get_user_attribute(self, key: str, fallback: str = None) -> str: """ :param key: str: :param fallback: str: (Default value = None) """ return (self.__user_attributes.get(key) if self.__user_attributes.get(key) is not None else fallback) def is_valid(self) -> bool: """ """ if self.__closed: return False self.__wait_auth_lock() return self.__ap_welcome is not None and self.connection is not None def mercury(self) -> MercuryClient: """ """ self.__wait_auth_lock() if self.__mercury_client is None: raise RuntimeError("Session isn't authenticated!") return self.__mercury_client def on_message(self, uri: str, headers: typing.Dict[str, str], payload: bytes): """ :param uri: str: :param headers: typing.Dict[str: :param str]: :param payload: bytes: """ if uri == "hm://connect-state/v1/connect/logout": self.close() def parse_product_info(self, data) -> None: """Parse product information :param data: Raw product information """ products = defusedxml.ElementTree.fromstring(data) if products is None: return product = products[0] if product is None: return for i in range(len(product)): self.__user_attributes[product[i].tag] = product[i].text self.logger.debug("Parsed product info: {}".format( self.__user_attributes)) def preferred_locale(self) -> str: """ """ return self.__inner.preferred_locale def reconnect(self) -> None: """Reconnect to the Spotify Server""" if self.connection is not None: self.connection.close() self.__receiver.stop() self.connection = Session.ConnectionHolder.create( ApResolver.get_random_accesspoint(), self.__inner.conf) self.connect() self.__authenticate_partial( Authentication.LoginCredentials( typ=self.__ap_welcome.reusable_auth_credentials_type, username=self.__ap_welcome.canonical_username, auth_data=self.__ap_welcome.reusable_auth_credentials, ), True, ) self.logger.info("Re-authenticated as {}!".format( self.__ap_welcome.canonical_username)) def reconnecting(self) -> bool: """ """ return not self.__closing and not self.__closed and self.connection is None def search(self) -> SearchManager: """ """ self.__wait_auth_lock() if self.__search is None: raise RuntimeError("Session isn't authenticated!") return self.__search def send(self, cmd: bytes, payload: bytes): """Send data to socket using send_unchecked :param cmd: Command :param payload: Payload :param cmd: bytes: :param payload: bytes: """ if self.__closing and self.connection is None: self.logger.debug("Connection was broken while closing.") return if self.__closed: raise RuntimeError("Session is closed!") with self.__auth_lock: if self.cipher_pair is None or self.__auth_lock_bool: self.__auth_lock.wait() self.__send_unchecked(cmd, payload) def tokens(self) -> TokenProvider: """ """ self.__wait_auth_lock() if self.__token_provider is None: raise RuntimeError("Session isn't authenticated!") return self.__token_provider def username(self): """ """ return self.__ap_welcome.canonical_username def stored(self): """ """ return self.__stored_str def __authenticate_partial(self, credential: Authentication.LoginCredentials, remove_lock: bool) -> None: """ Login to Spotify Args: credential: Spotify account login information """ if self.cipher_pair is None: raise RuntimeError("Connection not established!") client_response_encrypted_proto = Authentication.ClientResponseEncrypted( login_credentials=credential, system_info=Authentication.SystemInfo( os=Authentication.Os.OS_UNKNOWN, cpu_family=Authentication.CpuFamily.CPU_UNKNOWN, system_information_string=Version.system_info_string(), device_id=self.__inner.device_id, ), version_string=Version.version_string(), ) self.__send_unchecked( Packet.Type.login, client_response_encrypted_proto.SerializeToString()) packet = self.cipher_pair.receive_encoded(self.connection) if packet.is_cmd(Packet.Type.ap_welcome): self.__ap_welcome = Authentication.APWelcome() self.__ap_welcome.ParseFromString(packet.payload) self.__receiver = Session.Receiver(self) bytes0x0f = Random.get_random_bytes(0x14) self.__send_unchecked(Packet.Type.unknown_0x0f, bytes0x0f) preferred_locale = io.BytesIO() preferred_locale.write(b"\x00\x00\x10\x00\x02preferred-locale" + self.__inner.preferred_locale.encode()) preferred_locale.seek(0) self.__send_unchecked(Packet.Type.preferred_locale, preferred_locale.read()) if remove_lock: with self.__auth_lock: self.__auth_lock_bool = False self.__auth_lock.notify_all() if self.__inner.conf.store_credentials: reusable = self.__ap_welcome.reusable_auth_credentials reusable_type = Authentication.AuthenticationType.Name( self.__ap_welcome.reusable_auth_credentials_type) if self.__inner.conf.stored_credentials_file is None: raise TypeError( "The file path to be saved is not specified") self.__stored_str = base64.b64encode( json.dumps({ "username": self.__ap_welcome.canonical_username, "credentials": base64.b64encode(reusable).decode(), "type": reusable_type, }).encode()).decode() with open(self.__inner.conf.stored_credentials_file, "w") as f: json.dump( { "username": self.__ap_welcome.canonical_username, "credentials": base64.b64encode(reusable).decode(), "type": reusable_type, }, f, ) elif packet.is_cmd(Packet.Type.auth_failure): ap_login_failed = Keyexchange.APLoginFailed() ap_login_failed.ParseFromString(packet.payload) self.close() raise Session.SpotifyAuthenticationException(ap_login_failed) else: raise RuntimeError("Unknown CMD 0x" + packet.cmd.hex()) def __send_unchecked(self, cmd: bytes, payload: bytes) -> None: self.cipher_pair.send_encoded(self.connection, cmd, payload) def __authenticate_login5(self, credential: Authentication.LoginCredentials) -> None: """Authenticate using Login5 to get access token""" if not LOGIN5_AVAILABLE: self.logger.warning("Login5 protobuf files not available, skipping Login5 authentication") return try: # Build Login5 request login5_request = Login5.LoginRequest() # Set client info login5_request.client_info.client_id = "65b708073fc0480ea92a077233ca87bd" login5_request.client_info.device_id = self.__inner.device_id # Set stored credential from APWelcome if hasattr(self, '_Session__ap_welcome') and self.__ap_welcome: stored_cred = Login5Credentials.StoredCredential() stored_cred.username = self.__ap_welcome.canonical_username stored_cred.data = self.__ap_welcome.reusable_auth_credentials login5_request.stored_credential.CopyFrom(stored_cred) # Send Login5 request login5_url = "https://login5.spotify.com/v3/login" headers = { "Content-Type": "application/x-protobuf", "Accept": "application/x-protobuf" } response = requests.post( login5_url, data=login5_request.SerializeToString(), headers=headers ) if response.status_code == 200: login5_response = Login5.LoginResponse() login5_response.ParseFromString(response.content) if login5_response.HasField('ok'): self.__login5_access_token = login5_response.ok.access_token self.__login5_token_expiry = int(time.time()) + login5_response.ok.access_token_expires_in self.logger.info("Login5 authentication successful, got access token") else: self.logger.warning("Login5 authentication failed: {}".format(login5_response.error)) else: # Login5 is best-effort; if it fails (e.g. 403 or # region restrictions), we silently skip it to # avoid confusing the user when the main # connection has already failed. self.logger.debug( "Login5 request failed with status: %s", response.status_code ) except Exception as e: # Also treat unexpected Login5 issues as debug-only noise. self.logger.debug("Failed to authenticate with Login5: %s", e) def get_login5_token(self) -> typing.Union[str, None]: """Get the Login5 access token if available and not expired""" if self.__login5_access_token and self.__login5_token_expiry: if int(time.time()) < self.__login5_token_expiry - 60: # 60 second buffer return self.__login5_access_token else: self.logger.debug("Login5 token expired, need to re-authenticate") return None def __wait_auth_lock(self) -> None: if self.__closing and self.connection is None: self.logger.debug("Connection was broken while closing.") return if self.__closed: raise RuntimeError("Session is closed!") with self.__auth_lock: if self.cipher_pair is None or self.__auth_lock_bool: self.__auth_lock.wait() class AbsBuilder: """ """ conf = None device_id = None device_name = "librespot-python" device_type = Connect.DeviceType.COMPUTER preferred_locale = "en" def __init__(self, conf: Session.Configuration = None): if conf is None: self.conf = Session.Configuration.Builder().build() else: self.conf = conf def set_preferred_locale(self, locale: str) -> Session.AbsBuilder: """ :param locale: str: """ if len(locale) != 2: raise TypeError("Invalid locale: {}".format(locale)) self.preferred_locale = locale return self def set_device_name(self, device_name: str) -> Session.AbsBuilder: """ :param device_name: str: """ self.device_name = device_name return self def set_device_id(self, device_id: str) -> Session.AbsBuilder: """ :param device_id: str: """ if self.device_id is not None and len(device_id) != 40: raise TypeError("Device ID must be 40 chars long.") self.device_id = device_id return self def set_device_type( self, device_type: Connect.DeviceType) -> Session.AbsBuilder: """ :param device_type: Connect.DeviceType: """ self.device_type = device_type return self class Accumulator: """ """ __buffer: io.BytesIO def __init__(self): self.__buffer = io.BytesIO() def read(self) -> bytes: """Read all buffer :returns: All buffer """ pos = self.__buffer.tell() self.__buffer.seek(0) data = self.__buffer.read() self.__buffer.seek(pos) return data def write(self, data: bytes) -> None: """Write data to buffer :param data: Bytes to be written :param data: bytes: """ self.__buffer.write(data) def write_int(self, data: int) -> None: """Write data to buffer :param data: Integer to be written :param data: int: """ self.write(struct.pack(">i", data)) def write_short(self, data: int) -> None: """Write data to buffer :param data: Short integer to be written :param data: int: """ self.write(struct.pack(">h", data)) class Builder(AbsBuilder): """ """ login_credentials: Authentication.LoginCredentials = None def blob(self, username: str, blob: bytes) -> Session.Builder: """ :param username: str: :param blob: bytes: """ if self.device_id is None: raise TypeError("You must specify the device ID first.") self.login_credentials = self.decrypt_blob(self.device_id, username, blob) return self def decrypt_blob( self, device_id: str, username: str, encrypted_blob: bytes) -> Authentication.LoginCredentials: """ :param device_id: str: :param username: str: :param encrypted_blob: bytes: """ encrypted_blob = base64.b64decode(encrypted_blob) sha1 = SHA1.new() sha1.update(device_id.encode()) secret = sha1.digest() base_key = PBKDF2(secret, username.encode(), 20, 0x100, hmac_hash_module=SHA1) sha1 = SHA1.new() sha1.update(base_key) key = sha1.digest() + b"\x00\x00\x00\x14" aes = AES.new(key, AES.MODE_ECB) decrypted_blob = bytearray(aes.decrypt(encrypted_blob)) l = len(decrypted_blob) for i in range(0, l - 0x10): decrypted_blob[l - i - 1] ^= decrypted_blob[l - i - 0x11] blob = io.BytesIO(decrypted_blob) blob.read(1) le = self.read_blob_int(blob) blob.read(le) blob.read(1) type_int = self.read_blob_int(blob) type_ = Authentication.AuthenticationType.Name(type_int) if type_ is None: raise IOError( TypeError( "Unknown AuthenticationType: {}".format(type_int))) blob.read(1) l = self.read_blob_int(blob) auth_data = blob.read(l) return Authentication.LoginCredentials( auth_data=auth_data, typ=type_, username=username, ) def read_blob_int(self, buffer: io.BytesIO) -> int: """ :param buffer: io.BytesIO: """ lo = buffer.read(1) if (int(lo[0]) & 0x80) == 0: return int(lo[0]) hi = buffer.read(1) return int(lo[0]) & 0x7F | int(hi[0]) << 7 def stored(self, stored_credentials_str: str): """Create credential from stored string :param stored_credentials_str: str: :returns: Builder """ try: obj = json.loads(base64.b64decode(stored_credentials_str)) except binascii.Error: pass except json.JSONDecodeError: pass else: try: self.login_credentials = Authentication.LoginCredentials( typ=Authentication.AuthenticationType.Value( obj["type"]), username=obj["username"], auth_data=base64.b64decode(obj["credentials"]), ) except KeyError: pass return self def stored_file(self, stored_credentials: str = None) -> Session.Builder: """Create credential from stored file :param stored_credentials: str: (Default value = None) :returns: Builder """ if stored_credentials is None: stored_credentials = self.conf.stored_credentials_file if os.path.isfile(stored_credentials): try: with open(stored_credentials) as f: obj = json.load(f) except json.JSONDecodeError: pass else: try: self.login_credentials = Authentication.LoginCredentials( typ=Authentication.AuthenticationType.Value( obj["type"]), username=obj["username"], auth_data=base64.b64decode(obj["credentials"]), ) except KeyError: pass return self def user_pass(self, username: str, password: str) -> Session.Builder: """Create credential from username and password :param username: Spotify's account username :param username: str: :param password: str: :returns: Builder """ self.login_credentials = Authentication.LoginCredentials( username=username, typ=Authentication.AuthenticationType.AUTHENTICATION_USER_PASS, auth_data=password.encode(), ) return self def create(self) -> Session: """Create the Session instance :returns: Session instance """ if self.login_credentials is None: raise RuntimeError("You must select an authentication method.") session = Session( Session.Inner( self.device_type, self.device_name, self.preferred_locale, self.conf, self.device_id, ), ApResolver.get_random_accesspoint(), ) session.connect() session.authenticate(self.login_credentials) return session class Configuration: """ """ # Proxy # proxyEnabled: bool # proxyType: Proxy.Type # proxyAddress: str # proxyPort: int # proxyAuth: bool # proxyUsername: str # proxyPassword: str # Cache cache_enabled: bool cache_dir: str do_cache_clean_up: bool # Stored credentials store_credentials: bool stored_credentials_file: str # Fetching retry_on_chunk_error: bool def __init__( self, # proxy_enabled: bool, # proxy_type: Proxy.Type, # proxy_address: str, # proxy_port: int, # proxy_auth: bool, # proxy_username: str, # proxy_password: str, cache_enabled: bool, cache_dir: str, do_cache_clean_up: bool, store_credentials: bool, stored_credentials_file: str, retry_on_chunk_error: bool, ): # self.proxyEnabled = proxy_enabled # self.proxyType = proxy_type # self.proxyAddress = proxy_address # self.proxyPort = proxy_port # self.proxyAuth = proxy_auth # self.proxyUsername = proxy_username # self.proxyPassword = proxy_password self.cache_enabled = cache_enabled self.cache_dir = cache_dir self.do_cache_clean_up = do_cache_clean_up self.store_credentials = store_credentials self.stored_credentials_file = stored_credentials_file self.retry_on_chunk_error = retry_on_chunk_error class Builder: """ """ # Proxy # proxyEnabled: bool = False # proxyType: Proxy.Type = Proxy.Type.DIRECT # proxyAddress: str = None # proxyPort: int = None # proxyAuth: bool = None # proxyUsername: str = None # proxyPassword: str = None # Cache cache_enabled: bool = True cache_dir: str = os.path.join(os.getcwd(), "cache") do_cache_clean_up: bool = True # Stored credentials store_credentials: bool = True stored_credentials_file: str = os.path.join( os.getcwd(), "credentials.json") # Fetching retry_on_chunk_error: bool = True # def set_proxy_enabled( # self, # proxy_enabled: bool) -> Session.Configuration.Builder: # self.proxyEnabled = proxy_enabled # return self # def set_proxy_type( # self, # proxy_type: Proxy.Type) -> Session.Configuration.Builder: # self.proxyType = proxy_type # return self # def set_proxy_address( # self, proxy_address: str) -> Session.Configuration.Builder: # self.proxyAddress = proxy_address # return self # def set_proxy_auth( # self, proxy_auth: bool) -> Session.Configuration.Builder: # self.proxyAuth = proxy_auth # return self # def set_proxy_username( # self, # proxy_username: str) -> Session.Configuration.Builder: # self.proxyUsername = proxy_username # return self # def set_proxy_password( # self, # proxy_password: str) -> Session.Configuration.Builder: # self.proxyPassword = proxy_password # return self def set_cache_enabled( self, cache_enabled: bool) -> Session.Configuration.Builder: """Set cache_enabled :param cache_enabled: bool: :returns: Builder """ self.cache_enabled = cache_enabled return self def set_cache_dir(self, cache_dir: str) -> Session.Configuration.Builder: """Set cache_dir :param cache_dir: str: :returns: Builder """ self.cache_dir = cache_dir return self def set_do_cache_clean_up( self, do_cache_clean_up: bool) -> Session.Configuration.Builder: """Set do_cache_clean_up :param do_cache_clean_up: bool: :returns: Builder """ self.do_cache_clean_up = do_cache_clean_up return self def set_store_credentials( self, store_credentials: bool) -> Session.Configuration.Builder: """Set store_credentials :param store_credentials: bool: :returns: Builder """ self.store_credentials = store_credentials return self def set_stored_credential_file( self, stored_credential_file: str ) -> Session.Configuration.Builder: """Set stored_credential_file :param stored_credential_file: str: :returns: Builder """ self.stored_credentials_file = stored_credential_file return self def set_retry_on_chunk_error( self, retry_on_chunk_error: bool ) -> Session.Configuration.Builder: """Set retry_on_chunk_error :param retry_on_chunk_error: bool: :returns: Builder """ self.retry_on_chunk_error = retry_on_chunk_error return self def build(self) -> Session.Configuration: """Build Configuration instance :returns: Session.Configuration """ return Session.Configuration( # self.proxyEnabled, # self.proxyType, # self.proxyAddress, # self.proxyPort, # self.proxyAuth, # self.proxyUsername, # self.proxyPassword, self.cache_enabled, self.cache_dir, self.do_cache_clean_up, self.store_credentials, self.stored_credentials_file, self.retry_on_chunk_error, ) class ConnectionHolder: """ """ __buffer: io.BytesIO __socket: socket.socket def __init__(self, sock: socket.socket): self.__buffer = io.BytesIO() self.__socket = sock @staticmethod def create(address: str, conf) -> Session.ConnectionHolder: """Create the ConnectionHolder instance :param address: Address to connect :param address: str: :param conf: :returns: ConnectionHolder instance """ ap_address = address.split(":")[0] ap_port = int(address.split(":")[1]) sock = socket.socket() # Retry logic: try up to 3 times with 2 seconds between attempts # for transient connection errors (e.g., ECONNREFUSED / error 111). attempts = 0 last_err: typing.Optional[Exception] = None while attempts < 3: attempts += 1 try: sock.connect((ap_address, ap_port)) break except OSError as exc: last_err = exc # Connection refused / temporary failure if attempts >= 3: raise time.sleep(2) return Session.ConnectionHolder(sock) def close(self) -> None: """Close the connection""" self.__socket.close() def flush(self) -> None: """Flush data to socket""" attempts = 0 while True: try: self.__buffer.seek(0) self.__socket.send(self.__buffer.read()) self.__buffer = io.BytesIO() break except ConnectionResetError as exc: attempts += 1 if attempts >= 3: raise time.sleep(1) except BrokenPipeError: break def read(self, length: int) -> bytes: """Read data from socket :param length: int: :returns: Bytes data from socket """ # Ensure we either read the requested number of bytes # or raise a clear error if the connection is closed. data = b"" while len(data) < length: chunk = self.__socket.recv(length - len(data)) if not chunk: break data += chunk return data def read_int(self) -> int: """Read integer from socket :returns: integer from socket """ data = self.read(4) if len(data) != 4: raise ConnectionResetError( "Unexpected end of stream while reading 4-byte integer" ) return struct.unpack(">i", data)[0] def read_short(self) -> int: """Read short integer from socket :returns: short integer from socket """ data = self.read(2) if len(data) != 2: raise ConnectionResetError( "Unexpected end of stream while reading 2-byte integer" ) return struct.unpack(">h", data)[0] def set_timeout(self, seconds: float) -> None: """Set socket's timeout :param seconds: Number of seconds until timeout :param seconds: float: """ self.__socket.settimeout(None if seconds == 0 else seconds) def write(self, data: bytes) -> None: """Write data to buffer :param data: Bytes to be written :param data: bytes: """ self.__buffer.write(data) def write_int(self, data: int) -> None: """Write data to buffer :param data: Integer to be written :param data: int: """ self.write(struct.pack(">i", data)) def write_short(self, data: int) -> None: """Write data to buffer :param data: Short integer to be written :param data: int: """ self.write(struct.pack(">h", data)) class Inner: """ """ device_type: Connect.DeviceType = None device_name: str device_id: str conf = None preferred_locale: str def __init__( self, device_type: Connect.DeviceType, device_name: str, preferred_locale: str, conf: Session.Configuration, device_id: str = None, ): self.preferred_locale = preferred_locale self.conf = conf self.device_type = device_type self.device_name = device_name self.device_id = (device_id if device_id is not None else util.random_hex_string(40)) class Receiver: """ """ __session: Session __thread: threading.Thread __running: bool = True def __init__(self, session): self.__session = session self.__thread = threading.Thread(target=self.run) self.__thread.daemon = True self.__thread.name = "session-packet-receiver" self.__thread.start() def stop(self) -> None: """Signal the receiver thread to stop and wait for it. This ensures that the background thread exits cleanly before the underlying socket/connection is closed, avoiding "Bad file descriptor" errors from pending recv() calls. """ self.__running = False try: # Joining from within the same thread would deadlock, so # guard against that. if threading.current_thread() is not self.__thread: self.__thread.join(timeout=1) except Exception: # Shutdown should be best-effort; if join fails, we # still proceed with closing the session. self.__session.logger.debug( "Receiver.stop: failed to join receiver thread", exc_info=True ) def run(self) -> None: """Receive Packet thread function""" self.__session.logger.info("Session.Receiver started") # If the session has been explicitly closed elsewhere, do not # start the receive loop at all; this prevents infinite # reconnect cycles when the caller is done. if not self.__running: return # Track how many times in a row we have seen a connection # reset (Errno 104). After a small number of consecutive # occurrences, stop trying to reconnect to avoid an # infinite loop when the remote side keeps closing. consecutive_resets = 0 max_consecutive_resets = 3 while self.__running: packet: Packet cmd: bytes try: packet = self.__session.cipher_pair.receive_encoded( self.__session.connection) cmd = Packet.Type.parse(packet.cmd) if cmd is None: self.__session.logger.info( "Skipping unknown command cmd: 0x{}, payload: {}". format(util.bytes_to_hex(packet.cmd), packet.payload)) continue except (RuntimeError, ConnectionResetError, OSError) as ex: # If we've been asked to stop, just exit quietly without # trying to reconnect. This avoids the situation where the # session keeps reconnecting in a loop after the work is # finished and the caller expects shutdown. if not self.__running: # When the underlying socket is closed as part of a # normal shutdown, recv() may raise "Bad file # descriptor" (errno 9). This is expected and # harmless, so we skip logging it to avoid noisy # messages after a clean Session.close(). if not ( isinstance(ex, OSError) and getattr(ex, "errno", None) == 9 ): self.__session.logger.info( "Receiver stopping after connection error: %s", ex ) break # Detect repeated "connection reset by peer" errors. is_reset = isinstance(ex, ConnectionResetError) or ( isinstance(ex, OSError) and getattr(ex, "errno", None) == 104 ) # If the underlying socket is already closed (e.g. # Bad file descriptor), just stop quietly; this can # happen when Session.close() has torn down the # connection while the receiver was blocked in recv(). if isinstance(ex, OSError) and getattr(ex, "errno", None) == 9: #self.__session.logger.info( # "Receiver stopping after socket close (errno 9)" #) self.__running = False break if is_reset: consecutive_resets += 1 self.__session.logger.fatal( "Failed reading packet (reset #%d)! %s", consecutive_resets, ex, ) if consecutive_resets >= max_consecutive_resets: self.__session.logger.error( "Too many consecutive connection resets (%d). " "Stopping receiver without further reconnect attempts.", consecutive_resets, ) # Mark as not running so the outer loop and # any future reconnect logic will see that the # session should shut down. self.__running = False break else: consecutive_resets = 0 self.__session.logger.fatal( "Failed reading packet! %s", ex ) # For both reset and non-reset errors (unless we've # exceeded the reset threshold), attempt a single # reconnect. if self.__running: self.__session.reconnect() break if not self.__running: break if cmd == Packet.Type.ping: if self.__session.scheduled_reconnect is not None: self.__session.scheduler.cancel( self.__session.scheduled_reconnect) def anonymous(): """ """ self.__session.logger.warning( "Socket timed out. Reconnecting...") self.__session.reconnect() self.__session.scheduled_reconnect = self.__session.scheduler.enter( 2 * 60 + 5, 1, anonymous) self.__session.send(Packet.Type.pong, packet.payload) elif cmd == Packet.Type.pong_ack: continue elif cmd == Packet.Type.country_code: self.__session.__country_code = packet.payload.decode() self.__session.logger.info( "Received country_code: {}".format( self.__session.__country_code)) elif cmd == Packet.Type.license_version: license_version = io.BytesIO(packet.payload) license_id = struct.unpack(">h", license_version.read(2))[0] if license_id != 0: buffer = license_version.read() self.__session.logger.info( "Received license_version: {}, {}".format( license_id, buffer.decode())) else: self.__session.logger.info( "Received license_version: {}".format(license_id)) elif cmd == Packet.Type.unknown_0x10: self.__session.logger.debug("Received 0x10: {}".format( util.bytes_to_hex(packet.payload))) elif cmd in [ Packet.Type.mercury_sub, Packet.Type.mercury_unsub, Packet.Type.mercury_event, Packet.Type.mercury_req, ]: self.__session.mercury().dispatch(packet) elif cmd in [Packet.Type.aes_key, Packet.Type.aes_key_error]: self.__session.audio_key().dispatch(packet) elif cmd in [ Packet.Type.channel_error, Packet.Type.stream_chunk_res ]: self.__session.channel().dispatch(packet) elif cmd == Packet.Type.product_info: self.__session.parse_product_info(packet.payload) else: self.__session.logger.info("Skipping {}".format( util.bytes_to_hex(cmd))) class SpotifyAuthenticationException(Exception): """ """ def __init__(self, login_failed: Keyexchange.APLoginFailed): super().__init__( Keyexchange.ErrorCode.Name(login_failed.error_code)) class SearchManager: """ """ base_url = "hm://searchview/km/v4/search/" __session: Session def __init__(self, session: Session): self.__session = session def request(self, request: SearchRequest) -> typing.Any: """ :param request: SearchRequest: """ if request.get_username() == "": request.set_username(self.__session.username()) if request.get_country() == "": request.set_country(self.__session.country_code) if request.get_locale() == "": request.set_locale(self.__session.preferred_locale()) response = self.__session.mercury().send_sync( RawMercuryRequest.new_builder().set_method("GET").set_uri( request.build_url()).build()) if response.status_code != 200: raise SearchManager.SearchException(response.status_code) return json.loads(response.payload) class SearchException(Exception): """ """ def __init__(self, status_code: int): super().__init__("Search failed with code {}.".format(status_code)) class SearchRequest: """ """ query: typing.Final[str] __catalogue = "" __country = "" __image_size = "" __limit = 10 __locale = "" __username = "" def __init__(self, query: str): self.query = query if query == "": raise TypeError def build_url(self) -> str: """ """ url = SearchManager.base_url + urllib.parse.quote(self.query) url += "?entityVersion=2" url += "&catalogue=" + urllib.parse.quote(self.__catalogue) url += "&country=" + urllib.parse.quote(self.__country) url += "&imageSize=" + urllib.parse.quote(self.__image_size) url += "&limit=" + str(self.__limit) url += "&locale=" + urllib.parse.quote(self.__locale) url += "&username=" + urllib.parse.quote(self.__username) return url def get_catalogue(self) -> str: """ """ return self.__catalogue def get_country(self) -> str: """ """ return self.__country def get_image_size(self) -> str: """ """ return self.__image_size def get_limit(self) -> int: """ """ return self.__limit def get_locale(self) -> str: """ """ return self.__locale def get_username(self) -> str: """ """ return self.__username def set_catalogue(self, catalogue: str) -> SearchManager.SearchRequest: """ :param catalogue: str: """ self.__catalogue = catalogue return self def set_country(self, country: str) -> SearchManager.SearchRequest: """ :param country: str: """ self.__country = country return self def set_image_size(self, image_size: str) -> SearchManager.SearchRequest: """ :param image_size: str: """ self.__image_size = image_size return self def set_limit(self, limit: int) -> SearchManager.SearchRequest: """ :param limit: int: """ self.__limit = limit return self def set_locale(self, locale: str) -> SearchManager.SearchRequest: """ :param locale: str: """ self.__locale = locale return self def set_username(self, username: str) -> SearchManager.SearchRequest: """ :param username: str: """ self.__username = username return self class TokenProvider: """ """ logger = logging.getLogger("Librespot:TokenProvider") token_expire_threshold = 10 __session: Session __tokens: typing.List[StoredToken] = [] def __init__(self, session: Session): self._session = session def find_token_with_all_scopes( self, scopes: typing.List[str]) -> typing.Union[StoredToken, None]: """ :param scopes: typing.List[str]: """ for token in self.__tokens: if token.has_scopes(scopes): return token return None def get(self, scope: str) -> str: """ :param scope: str: """ return self.get_token(scope).access_token def get_token(self, *scopes) -> StoredToken: """ :param *scopes: """ scopes = list(scopes) if len(scopes) == 0: raise RuntimeError("The token doesn't have any scope") login5_token = self._session.get_login5_token() if login5_token: # Create a StoredToken-compatible object using Login5 token login5_stored_token = TokenProvider.Login5StoredToken(login5_token, scopes) self.logger.debug("Using Login5 access token for scopes: {}".format(scopes)) return login5_stored_token token = self.find_token_with_all_scopes(scopes) if token is not None: if token.expired(): self.__tokens.remove(token) else: return token self.logger.debug( "Token expired or not suitable, requesting again. scopes: {}, old_token: {}" .format(scopes, token)) try: response = self._session.mercury().send_sync_json( MercuryRequests.request_token(self._session.device_id(), ",".join(scopes))) token = TokenProvider.StoredToken(response) self.logger.debug( "Updated token successfully! scopes: {}, new_token: {}".format( scopes, token)) self.__tokens.append(token) return token except Exception as e: self.logger.warning("Failed to get token from keymaster endpoint: {}".format(e)) raise RuntimeError("Unable to obtain access token") class Login5StoredToken: """StoredToken-compatible wrapper for Login5 access tokens""" access_token: str scopes: typing.List[str] def __init__(self, access_token: str, scopes: typing.List[str]): self.access_token = access_token self.scopes = scopes def expired(self) -> bool: """Login5 tokens are managed by Session, so delegate expiry check""" return False # Session handles expiry def has_scope(self, scope: str) -> bool: """Login5 tokens are general-purpose, assume they have all scopes""" return True def has_scopes(self, sc: typing.List[str]) -> bool: """Login5 tokens are general-purpose, assume they have all scopes""" return True class StoredToken: """ """ expires_in: int access_token: str scopes: typing.List[str] timestamp: int def __init__(self, obj): self.timestamp = int(time.time_ns() / 1000) self.expires_in = obj["expiresIn"] self.access_token = obj["accessToken"] self.scopes = obj["scope"] def expired(self) -> bool: """ """ return self.timestamp + (self.expires_in - TokenProvider. token_expire_threshold) * 1000 * 1000 < int( time.time_ns() / 1000) def has_scope(self, scope: str) -> bool: """ :param scope: str: """ for s in self.scopes: if s == scope: return True return False def has_scopes(self, sc: typing.List[str]) -> bool: """ :param sc: typing.List[str]: """ for s in sc: if not self.has_scope(s): return False return True