importing the compiled pb2 modules with alias

This commit is contained in:
Pawan Paudel
2021-05-26 15:44:04 +05:45
parent 81cbd5a588
commit 3fcf78eb23
14 changed files with 133 additions and 133 deletions

View File

@@ -35,9 +35,9 @@ from librespot.dealer import ApiClient
from librespot.dealer import DealerClient
from librespot.mercury import MercuryClient
from librespot.mercury import SubListener
from librespot.proto import Authentication_pb2
from librespot.proto import Connect_pb2
from librespot.proto import Keyexchange_pb2
from librespot.proto import Authentication_pb2 as Authentication
from librespot.proto import Connect_pb2 as Connect
from librespot.proto import Keyexchange_pb2 as Keyexchange
from librespot.proto.ExplicitContentPubsub_pb2 import UserAttributesUpdate
from librespot.standard import BytesInputStream
from librespot.standard import Closeable
@@ -319,7 +319,7 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
_conn: Session.ConnectionHolder = None
_cipherPair: CipherPair = None
_receiver: Session.Receiver = None
_apWelcome: Authentication_pb2.APWelcome = None
_apWelcome: Authentication.APWelcome = None
_mercuryClient: MercuryClient = None
_audioKeyManager: AudioKeyManager = None
_channelManager: ChannelManager = None
@@ -380,13 +380,13 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
nonce = os.urandom(0x10)
client_hello = Keyexchange_pb2.ClientHello(
client_hello = Keyexchange.ClientHello(
build_info=Version.standard_build_info(),
cryptosuites_supported=[
Keyexchange_pb2.Cryptosuite.CRYPTO_SUITE_SHANNON
Keyexchange.Cryptosuite.CRYPTO_SUITE_SHANNON
],
login_crypto_hello=Keyexchange_pb2.LoginCryptoHelloUnion(
diffie_hellman=Keyexchange_pb2.LoginCryptoDiffieHellmanHello(
login_crypto_hello=Keyexchange.LoginCryptoHelloUnion(
diffie_hellman=Keyexchange.LoginCryptoDiffieHellmanHello(
gc=self._keys.public_key_array(), server_keys_known=1), ),
client_nonce=nonce,
padding=bytes([0x1E]),
@@ -412,7 +412,7 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
buffer = self._conn.read(length - 4)
acc.write(buffer)
ap_response_message = Keyexchange_pb2.APResponseMessage()
ap_response_message = Keyexchange.APResponseMessage()
ap_response_message.ParseFromString(buffer)
shared_key = Utils.to_byte_array(
self._keys.compute_shared_key(
@@ -450,12 +450,12 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
mac.update(acc.array())
challenge = mac.digest()
client_response_plaintext = Keyexchange_pb2.ClientResponsePlaintext(
login_crypto_response=Keyexchange_pb2.LoginCryptoResponseUnion(
diffie_hellman=Keyexchange_pb2.LoginCryptoDiffieHellmanResponse(
client_response_plaintext = Keyexchange.ClientResponsePlaintext(
login_crypto_response=Keyexchange.LoginCryptoResponseUnion(
diffie_hellman=Keyexchange.LoginCryptoDiffieHellmanResponse(
hmac=challenge)),
pow_response=Keyexchange_pb2.PoWResponseUnion(),
crypto_response=Keyexchange_pb2.CryptoResponseUnion(),
pow_response=Keyexchange.PoWResponseUnion(),
crypto_response=Keyexchange.CryptoResponseUnion(),
)
client_response_plaintext_bytes = client_response_plaintext.SerializeToString(
@@ -474,7 +474,7 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
| (scrap[2] << 8)
| (scrap[3] & 0xFF))
payload = self._conn.read(length - 4)
failed = Keyexchange_pb2.APResponseMessage()
failed = Keyexchange.APResponseMessage()
failed.ParseFromString(payload)
raise RuntimeError(failed)
except socket.timeout:
@@ -490,7 +490,7 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
self._LOGGER.info("Connection successfully!")
def _authenticate(self,
credentials: Authentication_pb2.LoginCredentials) -> None:
credentials: Authentication.LoginCredentials) -> None:
self._authenticate_partial(credentials, False)
with self._authLock:
@@ -521,16 +521,16 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
self, "hm://connect-state/v1/connect/logout")
def _authenticate_partial(self,
credentials: Authentication_pb2.LoginCredentials,
credentials: Authentication.LoginCredentials,
remove_lock: bool) -> None:
if self._cipherPair is None:
raise RuntimeError("Connection not established!")
client_response_encrypted = Authentication_pb2.ClientResponseEncrypted(
client_response_encrypted = Authentication.ClientResponseEncrypted(
login_credentials=credentials,
system_info=Authentication_pb2.SystemInfo(
os=Authentication_pb2.Os.OS_UNKNOWN,
cpu_family=Authentication_pb2.CpuFamily.CPU_UNKNOWN,
system_info=Authentication.SystemInfo(
os=Authentication.Os.OS_UNKNOWN,
cpu_family=Authentication.CpuFamily.CPU_UNKNOWN,
system_information_string=Version.system_info_string(),
device_id=self._inner.device_id,
),
@@ -542,7 +542,7 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
packet = self._cipherPair.receive_encoded(self._conn)
if packet.is_cmd(Packet.Type.ap_welcome):
self._apWelcome = Authentication_pb2.APWelcome()
self._apWelcome = Authentication.APWelcome()
self._apWelcome.ParseFromString(packet.payload)
self._receiver = Session.Receiver(self)
@@ -564,7 +564,7 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
if self._inner.conf.store_credentials:
reusable = self._apWelcome.reusable_auth_credentials
reusable_type = Authentication_pb2.AuthenticationType.Name(
reusable_type = Authentication.AuthenticationType.Name(
self._apWelcome.reusable_auth_credentials_type)
if self._inner.conf.stored_credentials_file is None:
@@ -581,7 +581,7 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
)
elif packet.is_cmd(Packet.Type.auth_failure):
ap_login_failed = Keyexchange_pb2.APLoginFailed()
ap_login_failed = Keyexchange.APLoginFailed()
ap_login_failed.ParseFromString(packet.payload)
raise Session.SpotifyAuthenticationException(ap_login_failed)
else:
@@ -745,7 +745,7 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
def username(self) -> str:
return self.ap_welcome().canonical_username
def ap_welcome(self) -> Authentication_pb2.APWelcome:
def ap_welcome(self) -> Authentication.APWelcome:
self._wait_auth_lock()
if self._apWelcome is None:
raise RuntimeError("Session isn't authenticated!")
@@ -770,7 +770,7 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
def preferred_locale(self) -> str:
return self._inner.preferred_locale
def device_type(self) -> Connect_pb2.DeviceType:
def device_type(self) -> Connect.DeviceType:
return self._inner.device_type
def device_name(self) -> str:
@@ -792,7 +792,7 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
ApResolver.get_random_accesspoint(), self._inner.conf)
self._connect()
self._authenticate_partial(
Authentication_pb2.LoginCredentials(
Authentication.LoginCredentials(
typ=self._apWelcome.reusable_auth_credentials_type,
username=self._apWelcome.canonical_username,
auth_data=self._apWelcome.reusable_auth_credentials,
@@ -868,7 +868,7 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
pass
class Inner:
device_type: Connect_pb2.DeviceType = None
device_type: Connect.DeviceType = None
device_name: str = None
device_id: str = None
conf = None
@@ -876,7 +876,7 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
def __init__(
self,
device_type: Connect_pb2.DeviceType,
device_type: Connect.DeviceType,
device_name: str,
preferred_locale: str,
conf: Session.Configuration,
@@ -893,7 +893,7 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
conf = None
device_id = None
device_name = "librespot-python"
device_type = Connect_pb2.DeviceType.COMPUTER
device_type = Connect.DeviceType.COMPUTER
preferred_locale = "en"
def __init__(self, conf: Session.Configuration = None):
@@ -921,12 +921,12 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
return self
def set_device_type(
self, device_type: Connect_pb2.DeviceType) -> Session.AbsBuilder:
self, device_type: Connect.DeviceType) -> Session.AbsBuilder:
self.device_type = device_type
return self
class Builder(AbsBuilder):
login_credentials: Authentication_pb2.LoginCredentials = None
login_credentials: Authentication.LoginCredentials = None
def stored(self):
pass
@@ -943,8 +943,8 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
pass
else:
try:
self.login_credentials = Authentication_pb2.LoginCredentials(
typ=Authentication_pb2.AuthenticationType.Value(
self.login_credentials = Authentication.LoginCredentials(
typ=Authentication.AuthenticationType.Value(
obj["type"]),
username=obj["username"],
auth_data=base64.b64decode(obj["credentials"]),
@@ -955,9 +955,9 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
return self
def user_pass(self, username: str, password: str) -> Session.Builder:
self.login_credentials = Authentication_pb2.LoginCredentials(
self.login_credentials = Authentication.LoginCredentials(
username=username,
typ=Authentication_pb2.AuthenticationType.AUTHENTICATION_USER_PASS,
typ=Authentication.AuthenticationType.AUTHENTICATION_USER_PASS,
auth_data=password.encode(),
)
return self
@@ -1142,9 +1142,9 @@ class Session(Closeable, SubListener, DealerClient.MessageListener):
)
class SpotifyAuthenticationException(Exception):
def __init__(self, login_failed: Keyexchange_pb2.APLoginFailed):
def __init__(self, login_failed: Keyexchange.APLoginFailed):
super().__init__(
Keyexchange_pb2.ErrorCode.Name(login_failed.error_code))
Keyexchange.ErrorCode.Name(login_failed.error_code))
class Accumulator:
buffer: bytes = bytes()