Restyled by yapf

This commit is contained in:
Restyled.io
2021-05-22 01:27:30 +00:00
parent 9c47be20a5
commit 0fbe39722b
48 changed files with 1213 additions and 1198 deletions

View File

@@ -45,12 +45,10 @@ class MercuryClient(PacketsReceiver.PacketsReceiver, Closeable):
sub = Pubsub.Subscription()
sub.ParseFromString(payload)
self._subscriptions.append(
MercuryClient.InternalSubListener(sub.uri, listener, True)
)
MercuryClient.InternalSubListener(sub.uri, listener, True))
else:
self._subscriptions.append(
MercuryClient.InternalSubListener(uri, listener, True)
)
MercuryClient.InternalSubListener(uri, listener, True))
self._LOGGER.debug("Subscribed successfully to {}!".format(uri))
@@ -73,10 +71,8 @@ class MercuryClient(PacketsReceiver.PacketsReceiver, Closeable):
resp = callback.wait_response()
if resp is None:
raise IOError(
"Request timeout out, {} passed, yet no response. seq: {}".format(
self._MERCURY_REQUEST_TIMEOUT, seq
)
)
"Request timeout out, {} passed, yet no response. seq: {}".
format(self._MERCURY_REQUEST_TIMEOUT, seq))
return resp
except queue.Empty as e:
raise IOError(e)
@@ -97,9 +93,7 @@ class MercuryClient(PacketsReceiver.PacketsReceiver, Closeable):
self._LOGGER.debug(
"Send Mercury request, seq: {}, uri: {}, method: {}".format(
seq, request.header.uri, request.header.method
)
)
seq, request.header.uri, request.header.method))
buffer.write_short(4)
buffer.write_int(seq)
@@ -143,9 +137,7 @@ class MercuryClient(PacketsReceiver.PacketsReceiver, Closeable):
self._LOGGER.debug(
"Handling packet, cmd: 0x{}, seq: {}, flags: {}, parts: {}".format(
Utils.bytes_to_hex(packet.cmd), seq, flags, parts
)
)
Utils.bytes_to_hex(packet.cmd), seq, flags, parts))
for i in range(parts):
size = payload.read_short()
@@ -173,39 +165,30 @@ class MercuryClient(PacketsReceiver.PacketsReceiver, Closeable):
if not dispatched:
self._LOGGER.debug(
"Couldn't dispatch Mercury event seq: {}, uri: {}, code: {}, payload: {}".format(
seq, header.uri, header.status_code, resp.payload
)
)
elif (
packet.is_cmd(Packet.Type.mercury_req)
or packet.is_cmd(Packet.Type.mercury_sub)
or packet.is_cmd(Packet.Type.mercury_sub)
):
"Couldn't dispatch Mercury event seq: {}, uri: {}, code: {}, payload: {}"
.format(seq, header.uri, header.status_code, resp.payload))
elif (packet.is_cmd(Packet.Type.mercury_req)
or packet.is_cmd(Packet.Type.mercury_sub)
or packet.is_cmd(Packet.Type.mercury_sub)):
callback = self._callbacks.get(seq)
self._callbacks.pop(seq)
if callback is not None:
callback.response(resp)
else:
self._LOGGER.warning(
"Skipped Mercury response, seq: {}, uri: {}, code: {}".format(
seq, resp.uri, resp.status_code
)
)
"Skipped Mercury response, seq: {}, uri: {}, code: {}".
format(seq, resp.uri, resp.status_code))
with self._removeCallbackLock:
self._removeCallbackLock.notify_all()
else:
self._LOGGER.warning(
"Couldn't handle packet, seq: {}, uri: {}, code: {}".format(
seq, header.uri, header.status_code
)
)
seq, header.uri, header.status_code))
def interested_in(self, uri: str, listener: SubListener) -> None:
self._subscriptions.append(
MercuryClient.InternalSubListener(uri, listener, False)
)
MercuryClient.InternalSubListener(uri, listener, False))
def not_interested_in(self, listener: SubListener) -> None:
try:
@@ -240,7 +223,8 @@ class MercuryClient(PacketsReceiver.PacketsReceiver, Closeable):
self._reference.task_done()
def wait_response(self) -> typing.Any:
return self._reference.get(timeout=MercuryClient._MERCURY_REQUEST_TIMEOUT)
return self._reference.get(
timeout=MercuryClient._MERCURY_REQUEST_TIMEOUT)
# class PubSubException(MercuryClient.MercuryException):
# pass
@@ -273,7 +257,8 @@ class MercuryClient(PacketsReceiver.PacketsReceiver, Closeable):
payload: typing.List[bytes]
status_code: int
def __init__(self, header: Mercury.Header, payload: typing.List[bytes]):
def __init__(self, header: Mercury.Header,
payload: typing.List[bytes]):
self.uri = header.uri
self.status_code = header.status_code
self.payload = payload[1:]

View File

@@ -13,35 +13,28 @@ class RawMercuryRequest:
@staticmethod
def sub(uri: str):
return RawMercuryRequest.new_builder().set_uri(uri).set_method("SUB").build()
return RawMercuryRequest.new_builder().set_uri(uri).set_method(
"SUB").build()
@staticmethod
def unsub(uri: str):
return RawMercuryRequest.new_builder().set_uri(uri).set_method("UNSUB").build()
return RawMercuryRequest.new_builder().set_uri(uri).set_method(
"UNSUB").build()
@staticmethod
def get(uri: str):
return RawMercuryRequest.new_builder().set_uri(uri).set_method("GET").build()
return RawMercuryRequest.new_builder().set_uri(uri).set_method(
"GET").build()
@staticmethod
def send(uri: str, part: bytes):
return (
RawMercuryRequest.new_builder()
.set_uri(uri)
.add_payload_part(part)
.set_method("SEND")
.build()
)
return (RawMercuryRequest.new_builder().set_uri(uri).add_payload_part(
part).set_method("SEND").build())
@staticmethod
def post(uri: str, part: bytes):
return (
RawMercuryRequest.new_builder()
.set_uri(uri)
.set_method("POST")
.add_payload_part(part)
.build()
)
return (RawMercuryRequest.new_builder().set_uri(uri).set_method(
"POST").add_payload_part(part).build())
@staticmethod
def new_builder():
@@ -67,9 +60,10 @@ class RawMercuryRequest:
self.header_dict["method"] = method
return self
def add_user_field(
self, field: Mercury.UserField = None, key: str = None, value: str = None
):
def add_user_field(self,
field: Mercury.UserField = None,
key: str = None,
value: str = None):
if field is None and (key is None or value is None):
return self
try:
@@ -80,8 +74,7 @@ class RawMercuryRequest:
self.header_dict["user_fields"].append(field)
if key is not None and value is not None:
self.header_dict["user_fields"].append(
Mercury.UserField(key=key, value=value.encode())
)
Mercury.UserField(key=key, value=value.encode()))
return self
def add_payload_part(self, part: bytes):
@@ -92,4 +85,5 @@ class RawMercuryRequest:
return self.add_payload_part(msg)
def build(self):
return RawMercuryRequest(Mercury.Header(**self.header_dict), self.payload)
return RawMercuryRequest(Mercury.Header(**self.header_dict),
self.payload)