Merge pull request #311 from Googolplexed0/login5-auth

Use Login5 Authentication Instead of Keymaster
This commit is contained in:
碧舞すみほ
2025-08-18 08:57:55 +09:00
committed by GitHub
2 changed files with 72 additions and 28 deletions

View File

@@ -29,7 +29,7 @@ from Cryptodome.Protocol.KDF import PBKDF2
from Cryptodome.PublicKey import RSA from Cryptodome.PublicKey import RSA
from Cryptodome.Signature import PKCS1_v1_5 from Cryptodome.Signature import PKCS1_v1_5
from librespot import util, oauth from librespot import util
from librespot import Version from librespot import Version
from librespot.audio import AudioKeyManager from librespot.audio import AudioKeyManager
from librespot.audio import CdnManager from librespot.audio import CdnManager
@@ -57,6 +57,8 @@ from librespot.proto import Keyexchange_pb2 as Keyexchange
from librespot.proto import Metadata_pb2 as Metadata from librespot.proto import Metadata_pb2 as Metadata
from librespot.proto import Playlist4External_pb2 as Playlist4External from librespot.proto import Playlist4External_pb2 as Playlist4External
from librespot.proto.ExplicitContentPubsub_pb2 import UserAttributesUpdate from librespot.proto.ExplicitContentPubsub_pb2 import UserAttributesUpdate
from librespot.proto.spotify.login5.v3 import Login5_pb2 as Login5
from librespot.proto.spotify.login5.v3.credentials import Credentials_pb2 as Login5Credentials
from librespot.structure import Closeable from librespot.structure import Closeable
from librespot.structure import MessageListener from librespot.structure import MessageListener
from librespot.structure import RequestListener from librespot.structure import RequestListener
@@ -908,6 +910,8 @@ class Session(Closeable, MessageListener, SubListener):
__dealer_client: typing.Union[DealerClient, None] = None __dealer_client: typing.Union[DealerClient, None] = None
__event_service: typing.Union[EventService, None] = None __event_service: typing.Union[EventService, None] = None
__keys: DiffieHellman __keys: DiffieHellman
__login5_access_token: typing.Union[str, None] = None
__login5_token_expiry: typing.Union[int, None] = None
__mercury_client: MercuryClient __mercury_client: MercuryClient
__receiver: typing.Union[Receiver, None] = None __receiver: typing.Union[Receiver, None] = None
__search: typing.Union[SearchManager, None] __search: typing.Union[SearchManager, None]
@@ -968,6 +972,7 @@ class Session(Closeable, MessageListener, SubListener):
""" """
self.__authenticate_partial(credential, False) self.__authenticate_partial(credential, False)
self.__authenticate_login5()
with self.__auth_lock: with self.__auth_lock:
self.__mercury_client = MercuryClient(self) self.__mercury_client = MercuryClient(self)
self.__token_provider = TokenProvider(self) self.__token_provider = TokenProvider(self)
@@ -1203,6 +1208,13 @@ class Session(Closeable, MessageListener, SubListener):
self.__wait_auth_lock() self.__wait_auth_lock()
return self.__ap_welcome is not None and self.connection is not None return self.__ap_welcome is not None and self.connection is not None
def login5(self) -> tuple[str, int]:
""" """
self.__wait_auth_lock()
if self.__login5_access_token is None or self.__login5_token_expiry is None:
raise RuntimeError("Session isn't authenticated!")
return self.__login5_access_token, self.__login5_token_expiry
def mercury(self) -> MercuryClient: def mercury(self) -> MercuryClient:
""" """ """ """
self.__wait_auth_lock() self.__wait_auth_lock()
@@ -1382,6 +1394,43 @@ class Session(Closeable, MessageListener, SubListener):
else: else:
raise RuntimeError("Unknown CMD 0x" + packet.cmd.hex()) raise RuntimeError("Unknown CMD 0x" + packet.cmd.hex())
def __authenticate_login5(self) -> None:
"""Authenticate using Login5 to get access token"""
login5_request = Login5.LoginRequest()
login5_request.client_info.client_id = MercuryRequests.keymaster_client_id
login5_request.client_info.device_id = self.__inner.device_id
# Set stored credential from APWelcome
if hasattr(self, '_Session__ap_welcome') and self.__ap_welcome:
stored_cred = Login5Credentials.StoredCredential()
stored_cred.username = self.__ap_welcome.canonical_username
stored_cred.data = self.__ap_welcome.reusable_auth_credentials
login5_request.stored_credential.CopyFrom(stored_cred)
response = requests.post(
"https://login5.spotify.com/v3/login",
data=login5_request.SerializeToString(),
headers={
"Content-Type": "application/x-protobuf",
"Accept": "application/x-protobuf"
}
)
if response.status_code == 200:
login5_response = Login5.LoginResponse()
login5_response.ParseFromString(response.content)
if login5_response.HasField('ok'):
self.__login5_access_token = login5_response.ok.access_token
self.__login5_token_expiry = int(time.time()) + login5_response.ok.access_token_expires_in
self.logger.info("Login5 authentication successful, got access token")
else:
self.logger.warning("Login5 authentication failed: {}".format(login5_response.error))
else:
self.logger.warning("Login5 request failed with status: {}".format(response.status_code))
else:
self.logger.error("Login5 authentication failed: No APWelcome found")
def __send_unchecked(self, cmd: bytes, payload: bytes) -> None: def __send_unchecked(self, cmd: bytes, payload: bytes) -> None:
self.cipher_pair.send_encoded(self.connection, cmd, payload) self.cipher_pair.send_encoded(self.connection, cmd, payload)
@@ -2258,7 +2307,7 @@ class TokenProvider:
__tokens: typing.List[StoredToken] = [] __tokens: typing.List[StoredToken] = []
def __init__(self, session: Session): def __init__(self, session: Session):
self._session = session self.__session = session
def find_token_with_all_scopes( def find_token_with_all_scopes(
self, scopes: typing.List[str]) -> typing.Union[StoredToken, None]: self, scopes: typing.List[str]) -> typing.Union[StoredToken, None]:
@@ -2289,24 +2338,27 @@ class TokenProvider:
scopes = list(scopes) scopes = list(scopes)
if len(scopes) == 0: if len(scopes) == 0:
raise RuntimeError("The token doesn't have any scope") raise RuntimeError("The token doesn't have any scope")
token = self.find_token_with_all_scopes(scopes) token = self.find_token_with_all_scopes(scopes)
if token is not None: if token is not None:
if token.expired(): if token.expired():
self.__tokens.remove(token) self.__tokens.remove(token)
else: else:
return token return token
self.logger.debug(
"Token expired or not suitable, requesting again. scopes: {}, old_token: {}" login5_token = None
.format(scopes, token)) login5_access_token, login5_token_expiry = self.__session.login5()
response = self._session.mercury().send_sync_json( if int(time.time()) < login5_token_expiry - 60: # 60 second buffer
MercuryRequests.request_token(self._session.device_id(), login5_token = TokenProvider.StoredToken({
",".join(scopes))) "expiresIn": login5_token_expiry - int(time.time()),
token = TokenProvider.StoredToken(response) "accessToken": login5_access_token,
self.logger.debug( "scope": scopes
"Updated token successfully! scopes: {}, new_token: {}".format( })
scopes, token)) self.__tokens.append(login5_token)
self.__tokens.append(token) self.logger.debug("Using Login5 access token for scopes: {}".format(scopes))
return token else:
self.logger.debug("Login5 token expired, need to re-authenticate")
return login5_token
class StoredToken: class StoredToken:
""" """ """ """

View File

@@ -7,20 +7,12 @@ from google.protobuf import message as _message
from google.protobuf import reflection as _reflection from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import enum_type_wrapper from google.protobuf.internal import enum_type_wrapper
from spotify.login5.v3 import \ from . import ClientInfo_pb2 as spotify_dot_login5_dot_v3_dot_client__info__pb2
client_info_pb2 as spotify_dot_login5_dot_v3_dot_client__info__pb2 from . import UserInfo_pb2 as spotify_dot_login5_dot_v3_dot_user__info__pb2
from spotify.login5.v3 import \ from ..v3.challenges import Code_pb2 as spotify_dot_login5_dot_v3_dot_challenges_dot_code__pb2
user_info_pb2 as spotify_dot_login5_dot_v3_dot_user__info__pb2 from ..v3.challenges import Hashcash_pb2 as spotify_dot_login5_dot_v3_dot_challenges_dot_hashcash__pb2
from spotify.login5.v3.challenges import \ from ..v3.credentials import Credentials_pb2 as spotify_dot_login5_dot_v3_dot_credentials_dot_credentials__pb2
code_pb2 as spotify_dot_login5_dot_v3_dot_challenges_dot_code__pb2 from ..v3.identifiers import Identifiers as spotify_dot_login5_dot_v3_dot_identifiers_dot_identifiers__pb2
from spotify.login5.v3.challenges import \
hashcash_pb2 as spotify_dot_login5_dot_v3_dot_challenges_dot_hashcash__pb2
from spotify.login5.v3.credentials import \
credentials_pb2 as \
spotify_dot_login5_dot_v3_dot_credentials_dot_credentials__pb2
from spotify.login5.v3.identifiers import \
identifiers_pb2 as \
spotify_dot_login5_dot_v3_dot_identifiers_dot_identifiers__pb2
# @@protoc_insertion_point(imports) # @@protoc_insertion_point(imports)