diff --git a/librespot/core.py b/librespot/core.py index a6fd623..336ec53 100644 --- a/librespot/core.py +++ b/librespot/core.py @@ -233,17 +233,20 @@ class DealerClient(Closeable): def __init__(self, session: Session): self.__session = session - def add_message_listener(self, listener: MessageListener, uris: list[str]) -> None: + def add_message_listener(self, listener: MessageListener, + uris: list[str]) -> None: with self.__message_listeners_lock: if listener in self.__message_listeners.keys(): - raise TypeError("A listener for {} has already been added.".format(uris)) + raise TypeError( + "A listener for {} has already been added.".format(uris)) self.__message_listeners[listener] = uris self.__message_listeners_lock.notify_all() def add_request_listener(self, listener: RequestListener, uri: str): with self.__request_listeners_lock: if uri in self.__request_listeners.keys(): - raise TypeError("A listener for '{}' has already been added.".format(uri)) + raise TypeError( + "A listener for '{}' has already been added.".format(uri)) self.__request_listeners[uri] = listener self.__request_listeners_lock.notify_all() @@ -251,8 +254,10 @@ class DealerClient(Closeable): self.__worker.shutdown() def connect(self) -> None: - self.__connection = DealerClient.ConnectionHolder(self.__session, self, "wss://{}/?access_token={}" - .format(ApResolver.get_random_dealer(), self.__session.tokens().get("playlist-read"))) + self.__connection = DealerClient.ConnectionHolder( + self.__session, self, "wss://{}/?access_token={}".format( + ApResolver.get_random_dealer(), + self.__session.tokens().get("playlist-read"))) def connection_invalided(self) -> None: self.__connection = None @@ -261,7 +266,9 @@ class DealerClient(Closeable): def anonymous(): self.__last_scheduled_reconnection = None self.connect() - self.__last_scheduled_reconnection = self.__scheduler.enter(10, 1, anonymous) + + self.__last_scheduled_reconnection = self.__scheduler.enter( + 10, 1, anonymous) def handle_message(self, obj: typing.Any) -> None: uri = obj.get("uri") @@ -290,6 +297,7 @@ class DealerClient(Closeable): def anonymous(): listener.on_message(uri, headers, decoded_payloads) + self.__worker.submit(anonymous) dispatched = True if not interesting: @@ -306,7 +314,9 @@ class DealerClient(Closeable): pid = payload.get("message_id") sender = payload.get("sent_by_device_id") command = payload.get("command") - self.logger.debug("Received request. [mid: {}, key: {}, pid: {}, sender: {}, command: {}]".format(mid, key, pid, sender, command)) + self.logger.debug( + "Received request. [mid: {}, key: {}, pid: {}, sender: {}, command: {}]" + .format(mid, key, pid, sender, command)) interesting = False with self.__request_listeners_lock: for mid_prefix in self.__request_listeners.keys(): @@ -318,7 +328,10 @@ class DealerClient(Closeable): result = listener.on_request(mid, pid, sender, command) if self.__connection is not None: self.__connection.send_reply(key, result) - self.logger.warning("Handled request. [key: {}, result: {}]".format(key, result)) + self.logger.warning( + "Handled request. [key: {}, result: {}]".format( + key, result)) + self.__worker.submit(anonymous) if not interesting: self.logger.debug("Couldn't dispatch request: {}".format(mid)) @@ -356,7 +369,8 @@ class DealerClient(Closeable): __url: str __ws: websocket.WebSocketApp - def __init__(self, session: Session, dealer_client: DealerClient, url: str): + def __init__(self, session: Session, dealer_client: DealerClient, + url: str): self.__session = session self.__dealer_client = dealer_client self.__url = url @@ -372,7 +386,8 @@ class DealerClient(Closeable): def on_failure(self, ws: websocket.WebSocketApp, error): if self.__closed: return - self.__dealer_client.logger.warning("An exception occurred. Reconnecting...") + self.__dealer_client.logger.warning( + "An exception occurred. Reconnecting...") self.close() def on_message(self, ws: websocket.WebSocketApp, text: str): @@ -388,12 +403,16 @@ class DealerClient(Closeable): elif typ == MessageType.PING: pass else: - raise RuntimeError("Unknown message type for {}".format(typ.value)) + raise RuntimeError("Unknown message type for {}".format( + typ.value)) def on_open(self, ws: websocket.WebSocketApp): if self.__closed: - self.__dealer_client.logger.fatal("I wonder what happened here... Terminating. [closed: {}]".format(self.__closed)) - self.__dealer_client.logger.debug("Dealer connected! [url: {}]".format(self.__url)) + self.__dealer_client.logger.fatal( + "I wonder what happened here... Terminating. [closed: {}]". + format(self.__closed)) + self.__dealer_client.logger.debug( + "Dealer connected! [url: {}]".format(self.__url)) def anonymous(): self.send_ping() @@ -403,20 +422,28 @@ class DealerClient(Closeable): if self.__last_scheduled_ping is None: return if not self.__received_pong: - self.__dealer_client.logger.warning("Did not receive ping in 3 seconds. Reconnecting...") + self.__dealer_client.logger.warning( + "Did not receive ping in 3 seconds. Reconnecting..." + ) self.close() return self.__received_pong = False + self.__scheduler.enter(3, 1, anonymous2) - self.__last_scheduled_ping = self.__scheduler.enter(30, 1, anonymous) - self.__last_scheduled_ping = self.__scheduler.enter(30, 1, anonymous) + self.__last_scheduled_ping = self.__scheduler.enter( + 30, 1, anonymous) + + self.__last_scheduled_ping = self.__scheduler.enter( + 30, 1, anonymous) def send_ping(self): self.__ws.send("{\"type\":\"ping\"}") def send_reply(self, key: str, result: DealerClient.RequestResult): success = "true" if result == DealerClient.RequestResult.SUCCESS else "false" - self.__ws.send("{\"type\":\"reply\",\"key\":\"%s\",\"payload\":{\"success\":%s}" % (key, success)) + self.__ws.send( + "{\"type\":\"reply\",\"key\":\"%s\",\"payload\":{\"success\":%s}" + % (key, success)) class RequestResult(enum.Enum): UNKNOWN_SEND_COMMAND_RESULT = 0 @@ -638,7 +665,8 @@ class Session(Closeable, MessageListener, SubListener): self.logger.info("Authenticated as {}!".format( self.__ap_welcome.canonical_username)) self.mercury().interested_in("spotify:user:attributes:update", self) - self.dealer().add_message_listener(self, ["hm://connect-state/v1/connect/logout"]) + self.dealer().add_message_listener( + self, ["hm://connect-state/v1/connect/logout"]) def cache(self) -> CacheManager: self.__wait_auth_lock() @@ -811,7 +839,8 @@ class Session(Closeable, MessageListener, SubListener): attributes_update.ParseFromString(resp.payload) for pair in attributes_update.pairs_list: self.__user_attributes[pair.key] = pair.value - self.logger.info("Updated user attribute: {} -> {}".format(pair.key, pair.value)) + self.logger.info("Updated user attribute: {} -> {}".format( + pair.key, pair.value)) def get_user_attribute(self, key: str, fallback: str = None) -> str: return self.__user_attributes.get(key) if self.__user_attributes.get( @@ -829,7 +858,8 @@ class Session(Closeable, MessageListener, SubListener): raise RuntimeError("Session isn't authenticated!") return self.__mercury_client - def on_message(self, uri: str, headers: typing.Dict[str, str], payload: bytes): + def on_message(self, uri: str, headers: typing.Dict[str, str], + payload: bytes): if uri == "hm://connect-state/v1/connect/logout": self.close() diff --git a/librespot/structure.py b/librespot/structure.py index bbec172..a6b2afb 100644 --- a/librespot/structure.py +++ b/librespot/structure.py @@ -57,7 +57,8 @@ class HaltListener: class MessageListener: - def on_message(self, uri: str, headers: typing.Dict[str, str], payload: bytes): + def on_message(self, uri: str, headers: typing.Dict[str, str], + payload: bytes): raise NotImplementedError @@ -75,7 +76,8 @@ class PacketsReceiver: class RequestListener: - def on_request(self, mid: str, pid: int, sender: str, command: typing.Any) -> DealerClient.RequestResult: + def on_request(self, mid: str, pid: int, sender: str, + command: typing.Any) -> DealerClient.RequestResult: raise NotImplementedError