use login5 authentication instead of keymaster

fixes #306
This commit is contained in:
Googolplexed0
2025-08-16 22:19:36 -05:00
parent 3a6ce32d0d
commit afc993f047
2 changed files with 72 additions and 28 deletions

View File

@@ -29,7 +29,7 @@ from Cryptodome.Protocol.KDF import PBKDF2
from Cryptodome.PublicKey import RSA
from Cryptodome.Signature import PKCS1_v1_5
from librespot import util, oauth
from librespot import util
from librespot import Version
from librespot.audio import AudioKeyManager
from librespot.audio import CdnManager
@@ -57,6 +57,8 @@ from librespot.proto import Keyexchange_pb2 as Keyexchange
from librespot.proto import Metadata_pb2 as Metadata
from librespot.proto import Playlist4External_pb2 as Playlist4External
from librespot.proto.ExplicitContentPubsub_pb2 import UserAttributesUpdate
from librespot.proto.spotify.login5.v3 import Login5_pb2 as Login5
from librespot.proto.spotify.login5.v3.credentials import Credentials_pb2 as Login5Credentials
from librespot.structure import Closeable
from librespot.structure import MessageListener
from librespot.structure import RequestListener
@@ -908,6 +910,8 @@ class Session(Closeable, MessageListener, SubListener):
__dealer_client: typing.Union[DealerClient, None] = None
__event_service: typing.Union[EventService, None] = None
__keys: DiffieHellman
__login5_access_token: typing.Union[str, None] = None
__login5_token_expiry: typing.Union[int, None] = None
__mercury_client: MercuryClient
__receiver: typing.Union[Receiver, None] = None
__search: typing.Union[SearchManager, None]
@@ -968,6 +972,7 @@ class Session(Closeable, MessageListener, SubListener):
"""
self.__authenticate_partial(credential, False)
self.__authenticate_login5()
with self.__auth_lock:
self.__mercury_client = MercuryClient(self)
self.__token_provider = TokenProvider(self)
@@ -1203,6 +1208,13 @@ class Session(Closeable, MessageListener, SubListener):
self.__wait_auth_lock()
return self.__ap_welcome is not None and self.connection is not None
def login5(self) -> tuple[str, int]:
""" """
self.__wait_auth_lock()
if self.__login5_access_token is None or self.__login5_token_expiry is None:
raise RuntimeError("Session isn't authenticated!")
return self.__login5_access_token, self.__login5_token_expiry
def mercury(self) -> MercuryClient:
""" """
self.__wait_auth_lock()
@@ -1382,6 +1394,43 @@ class Session(Closeable, MessageListener, SubListener):
else:
raise RuntimeError("Unknown CMD 0x" + packet.cmd.hex())
def __authenticate_login5(self) -> None:
"""Authenticate using Login5 to get access token"""
login5_request = Login5.LoginRequest()
login5_request.client_info.client_id = MercuryRequests.keymaster_client_id
login5_request.client_info.device_id = self.__inner.device_id
# Set stored credential from APWelcome
if hasattr(self, '_Session__ap_welcome') and self.__ap_welcome:
stored_cred = Login5Credentials.StoredCredential()
stored_cred.username = self.__ap_welcome.canonical_username
stored_cred.data = self.__ap_welcome.reusable_auth_credentials
login5_request.stored_credential.CopyFrom(stored_cred)
response = requests.post(
"https://login5.spotify.com/v3/login",
data=login5_request.SerializeToString(),
headers={
"Content-Type": "application/x-protobuf",
"Accept": "application/x-protobuf"
}
)
if response.status_code == 200:
login5_response = Login5.LoginResponse()
login5_response.ParseFromString(response.content)
if login5_response.HasField('ok'):
self.__login5_access_token = login5_response.ok.access_token
self.__login5_token_expiry = int(time.time()) + login5_response.ok.access_token_expires_in
self.logger.info("Login5 authentication successful, got access token")
else:
self.logger.warning("Login5 authentication failed: {}".format(login5_response.error))
else:
self.logger.warning("Login5 request failed with status: {}".format(response.status_code))
else:
self.logger.error("Login5 authentication failed: No APWelcome found")
def __send_unchecked(self, cmd: bytes, payload: bytes) -> None:
self.cipher_pair.send_encoded(self.connection, cmd, payload)
@@ -2258,7 +2307,7 @@ class TokenProvider:
__tokens: typing.List[StoredToken] = []
def __init__(self, session: Session):
self._session = session
self.__session = session
def find_token_with_all_scopes(
self, scopes: typing.List[str]) -> typing.Union[StoredToken, None]:
@@ -2289,24 +2338,27 @@ class TokenProvider:
scopes = list(scopes)
if len(scopes) == 0:
raise RuntimeError("The token doesn't have any scope")
token = self.find_token_with_all_scopes(scopes)
if token is not None:
if token.expired():
self.__tokens.remove(token)
else:
return token
self.logger.debug(
"Token expired or not suitable, requesting again. scopes: {}, old_token: {}"
.format(scopes, token))
response = self._session.mercury().send_sync_json(
MercuryRequests.request_token(self._session.device_id(),
",".join(scopes)))
token = TokenProvider.StoredToken(response)
self.logger.debug(
"Updated token successfully! scopes: {}, new_token: {}".format(
scopes, token))
self.__tokens.append(token)
return token
login5_token = None
login5_access_token, login5_token_expiry = self.__session.login5()
if int(time.time()) < login5_token_expiry - 60: # 60 second buffer
login5_token = TokenProvider.StoredToken({
"expiresIn": login5_token_expiry - int(time.time()),
"accessToken": login5_access_token,
"scope": scopes
})
self.__tokens.append(login5_token)
self.logger.debug("Using Login5 access token for scopes: {}".format(scopes))
else:
self.logger.debug("Login5 token expired, need to re-authenticate")
return login5_token
class StoredToken:
""" """

View File

@@ -7,20 +7,12 @@ from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import enum_type_wrapper
from spotify.login5.v3 import \
client_info_pb2 as spotify_dot_login5_dot_v3_dot_client__info__pb2
from spotify.login5.v3 import \
user_info_pb2 as spotify_dot_login5_dot_v3_dot_user__info__pb2
from spotify.login5.v3.challenges import \
code_pb2 as spotify_dot_login5_dot_v3_dot_challenges_dot_code__pb2
from spotify.login5.v3.challenges import \
hashcash_pb2 as spotify_dot_login5_dot_v3_dot_challenges_dot_hashcash__pb2
from spotify.login5.v3.credentials import \
credentials_pb2 as \
spotify_dot_login5_dot_v3_dot_credentials_dot_credentials__pb2
from spotify.login5.v3.identifiers import \
identifiers_pb2 as \
spotify_dot_login5_dot_v3_dot_identifiers_dot_identifiers__pb2
from . import ClientInfo_pb2 as spotify_dot_login5_dot_v3_dot_client__info__pb2
from . import UserInfo_pb2 as spotify_dot_login5_dot_v3_dot_user__info__pb2
from ..v3.challenges import Code_pb2 as spotify_dot_login5_dot_v3_dot_challenges_dot_code__pb2
from ..v3.challenges import Hashcash_pb2 as spotify_dot_login5_dot_v3_dot_challenges_dot_hashcash__pb2
from ..v3.credentials import Credentials_pb2 as spotify_dot_login5_dot_v3_dot_credentials_dot_credentials__pb2
from ..v3.identifiers import Identifiers as spotify_dot_login5_dot_v3_dot_identifiers_dot_identifiers__pb2
# @@protoc_insertion_point(imports)