Change Directory Layout

This commit is contained in:
kokarare1212
2021-03-06 18:16:43 +09:00
parent ee2a864330
commit 1f25b1468c
3 changed files with 254 additions and 741 deletions

View File

@@ -10,14 +10,12 @@ class CipherPair:
receive_nonce = 0 receive_nonce = 0
def __init__(self, send_key: bytes, receive_key: bytes): def __init__(self, send_key: bytes, receive_key: bytes):
# self.send_cipher = Shannon() self.send_cipher = Shannon()
# self.send_cipher.key(send_key) self.send_cipher.key(send_key)
self.send_cipher = Shannon(send_key)
self.send_nonce = 0 self.send_nonce = 0
# self.receive_cipher = Shannon() self.receive_cipher = Shannon()
# self.receive_cipher.key(receive_key) self.receive_cipher.key(receive_key)
self.receive_cipher = Shannon(receive_key)
self.receive_nonce = 0 self.receive_nonce = 0
def send_encoded(self, conn, cmd: bytes, payload: bytes): def send_encoded(self, conn, cmd: bytes, payload: bytes):
@@ -31,7 +29,6 @@ class CipherPair:
buffer = self.send_cipher.encrypt(buffer) buffer = self.send_cipher.encrypt(buffer)
# mac = self.send_cipher.finish(bytes(4))
mac = self.send_cipher.finish(4) mac = self.send_cipher.finish(4)
conn.write(buffer) conn.write(buffer)

View File

@@ -1,495 +1,322 @@
""" import struct
Shannon: Shannon stream cipher and MAC -- reference implementation, ported from C code written by Greg Rose import typing
https://github.com/sashahilton00/spotify-connect-resources/blob/master/Shannon-1.0/ShannonRef.c
Copyright 2017, Dmitry Borisov
THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED
WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE AND AGAINST
INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
import struct, \
copy
# Constants
N = 16
INITKONST = 0x6996c53a
KEYP = 13 # where to insert key/MAC/counter words
FOLD = N # how many iterations of folding to do
class Shannon: class Shannon:
@staticmethod N = 16
def ROTL(w, x): FOLD = N
return ((w << x) | (w >> (32 - x))) & 0xFFFFFFFF INITKONST = 0x6996c53a
KEYP = 13
@staticmethod R: list
def ROTR(w, x): CRC: list
return ((w >> x) | (w << (32 - x))) & 0xFFFFFFFF initR: list
konst: int
sbuf: int
mbuf: int
nbuf: int
""" Nonlinear transform (sbox) of a word. def __init__(self):
There are two slightly different combinations. """ self.R = [0 for _ in range(self.N)]
self.CRC = [0 for _ in range(self.N)]
self.initR = [0 for _ in range(self.N)]
@staticmethod def rotl(self, i: int, distance: int):
def sbox1(w): return ((i << distance) | (i >> (32 - distance))) & 0xffffffff
w ^= Shannon.ROTL(w, 5) | Shannon.ROTL(w, 7)
w ^= Shannon.ROTL(w, 19) | Shannon.ROTL(w, 22)
return w
""" Nonlinear transform (sbox) of a word. def sbox(self, i: int):
There are two slightly different combinations. """ i ^= self.rotl(i, 5) | self.rotl(i, 7)
i ^= self.rotl(i, 19) | self.rotl(i, 22)
@staticmethod return i
def sbox2(w):
w ^= Shannon.ROTL(w, 7) | Shannon.ROTL(w, 22)
w ^= Shannon.ROTL(w, 5) | Shannon.ROTL(w, 19)
return w
""" initialise to known state """ def sbox2(self, i: int):
i ^= self.rotl(i, 7) | self.rotl(i, 22)
i ^= self.rotl(i, 5) | self.rotl(i, 19)
def _initstate(self): return i
global N, \
INITKONST
# Generate fibonacci numbers up to N def cycle(self):
self._R = [1, 1] t: int
for x in range(1, N - 1):
self._R.append(self._R[x] + self._R[x - 1])
self._konst = INITKONST t = self.R[12] ^ self.R[13] ^ self.konst
t = self.sbox(t) ^ self.rotl(self.R[0], 1)
""" cycle the contents of the register and calculate output word in _sbuf. """ for i in range(1, self.N):
self.R[i - 1] = self.R[i]
def _cycle(self): self.R[self.N - 1] = t
# nonlinear feedback function
t = self._R[12] ^ self._R[13] ^ self._konst
t = Shannon.sbox1(t) ^ Shannon.ROTL(self._R[0], 1)
# Shift to the left t = self.sbox2(self.R[2] ^ self.R[15])
self._R = self._R[1:] + [t] self.R[0] ^= t
t = Shannon.sbox2(self._R[2] ^ self._R[15]) self.sbuf = t ^ self.R[8] ^ self.R[12]
self._R[0] ^= t
self._sbuf = t ^ self._R[8] ^ self._R[12]
""" The Shannon MAC function is modelled after the concepts of Phelix and SHA. def crc_func(self, i: int):
Basically, words to be accumulated in the MAC are incorporated in two t: int
different ways:
1. They are incorporated into the stream cipher register at a place
where they will immediately have a nonlinear effect on the state
2. They are incorporated into bit-parallel CRC-16 registers; the
contents of these registers will be used in MAC finalization. """
""" Accumulate a CRC of input words, later to be fed into MAC.
This is actually 32 parallel CRC-16s, using the IBM CRC-16
polynomial x^16 + x^15 + x^2 + 1. """
def _crcfunc(self, i): t = self.CRC[0] ^ self.CRC[2] ^ self.CRC[15] ^ i
t = self._CRC[0] ^ self._CRC[2] ^ self._CRC[15] ^ i
# Accumulate CRC of input
self._CRC = self._CRC[1:] + [t]
""" Normal MAC word processing: do both stream register and CRC. """ for j in range(1, self.N):
self.CRC[j - 1] = self.CRC[j]
def _macfunc(self, i): self.CRC[self.N - 1] = t
global KEYP
self._crcfunc(i) def mac_func(self, i: int):
self._R[KEYP] ^= i self.crc_func(i)
""" extra nonlinear diffusion of register for key and MAC """ self.R[self.KEYP] ^= i
def _diffuse(self): def init_state(self):
global FOLD self.R[0] = 1
self.R[1] = 1
for i in range(FOLD): for i in range(2, self.N):
self._cycle() self.R[i] = self.R[i - 1] + self.R[i - 2]
""" Common actions for loading key material self.konst = self.INITKONST
Allow non-word-multiple key and nonce material.
Note also initializes the CRC register as a side effect. """
def _loadkey(self, key): def save_state(self):
global KEYP, \ for i in range(self.N):
N self.initR[i] = self.R[i]
def reload_state(self):
for i in range(self.N):
self.R[i] = self.initR[i]
def gen_konst(self):
self.konst = self.R[0]
def add_key(self, k: int):
self.R[self.KEYP] ^= k
def diffuse(self):
for i in range(self.FOLD):
self.cycle()
def load_key(self, key: bytes):
extra = bytearray(4)
i: int
j: int
t: int
# Pad key with 00s to align on 4 bytes and add key_len
padding_size = int((len(key) + 3) / 4) * 4 - len(key) padding_size = int((len(key) + 3) / 4) * 4 - len(key)
key = key + (b'\x00' * padding_size) + struct.pack("<I", len(key)) key = key + (b"\x00" * padding_size) + struct.pack("<I", len(key))
for i in range(0, len(key), 4): for i in range(0, len(key), 4):
self._R[KEYP] = self._R[KEYP] ^ struct.unpack( self.R[self.KEYP] = \
"<I", key[i:i + 4])[0] # Little Endian order self.R[self.KEYP] ^ \
self._cycle() struct.unpack("<I", key[i: i + 4])[0]
# save a copy of the register self.cycle()
self._CRC = copy.copy(self._R)
# now diffuse for i in range(self.N):
self._diffuse() self.CRC[i] = self.R[i]
# now xor the copy back -- makes key loading irreversible */ self.diffuse()
for i in range(N):
self._R[i] ^= self._CRC[i]
""" Constructor """ for i in range(self.N):
self.R[i] ^= self.CRC[i]
def __init__(self, key): def key(self, key: bytes):
self._initstate() self.init_state()
self._loadkey(key)
self._konst = self._R[0] # in case we proceed to stream generation
self._initR = copy.copy(self._R)
self._nbuf = 0
""" Published "IV" interface """ self.load_key(key)
def nonce(self, nonce): self.gen_konst()
global INITKONST
if type(nonce) == int: self.save_state()
# Accept int as well (BigEndian)
self.nbuf = 0
def nonce(self, nonce: typing.Union[bytes, int]):
if type(nonce) is int:
nonce = bytes(struct.pack(">I", nonce)) nonce = bytes(struct.pack(">I", nonce))
self._R = copy.copy(self._initR) self.reload_state()
self._konst = INITKONST
self._loadkey(nonce)
self._konst = self._R[0]
self._nbuf = 0
self._mbuf = 0
""" Encrypt small chunk """ self.konst = self.INITKONST
def _encrypt_chunk(self, chunk): self.load_key(nonce)
result = []
for c in chunk:
self._mbuf ^= c << (32 - self._nbuf)
result.append(c ^ (self._sbuf >> (32 - self._nbuf)) & 0xFF)
self._nbuf -= 8
return result self.gen_konst()
""" Combined MAC and encryption. self.nbuf = 0
Note that plaintext is accumulated for MAC. """
def encrypt(self, buf): def encrypt(self, buffer: bytes, n: int = None):
# handle any previously buffered bytes if n is None:
result = [] return self.encrypt(buffer, len(buffer))
if self._nbuf != 0:
head = buf[:(self._nbuf >> 3)]
buf = buf[(self._nbuf >> 3):]
result = self._encrypt_chunk(head)
if self._nbuf != 0:
return bytes(result)
# LFSR already cycled buffer = bytearray(buffer)
self._macfunc(self._mbuf)
# Handle body
i = 0 i = 0
while len(buf) >= 4: j: int
self._cycle() t: int
t = struct.unpack("<I", buf[i:i + 4])[0]
self._macfunc(t)
t ^= self._sbuf
result += struct.pack("<I", t)
buf = buf[4:]
# handle any trailing bytes if self.nbuf != 0:
if len(buf): while self.nbuf != 0 and n != 0:
self._cycle() self.mbuf ^= (buffer[i] & 0xff) << (32 - self.nbuf)
self._mbuf = 0 buffer[i] ^= (self.sbuf >> (32 - self.nbuf)) & 0xff
self._nbuf = 32
result += self._encrypt_chunk(buf)
return bytes(result) i += 1
""" Decrypt small chunk """ self.nbuf -= 8
def _decrypt_chunk(self, chunk): n -= 1
result = []
for c in chunk:
result.append(c ^ ((self._sbuf >> (32 - self._nbuf)) & 0xFF))
self._mbuf ^= result[-1] << (32 - self._nbuf)
self._nbuf -= 8
return result if self.nbuf != 0:
return
""" Combined MAC and decryption. self.mac_func(self.mbuf)
Note that plaintext is accumulated for MAC. """
def decrypt(self, buf): j = n & ~0x03
# handle any previously buffered bytes
result = []
if self._nbuf != 0:
head = buf[:(self._nbuf >> 3)]
buf = buf[(self._nbuf >> 3):]
result = self._decrypt_chunk(head)
if self._nbuf != 0:
return bytes(result)
# LFSR already cycled while i < j:
self._macfunc(self._mbuf) self.cycle()
t = ((buffer[i + 3] & 0xFF) << 24) | \
((buffer[i + 2] & 0xFF) << 16) | \
((buffer[i + 1] & 0xFF) << 8) | \
(buffer[i] & 0xFF)
self.mac_func(t)
t ^= self.sbuf
buffer[i + 3] = (t >> 24) & 0xFF
buffer[i + 2] = (t >> 16) & 0xFF
buffer[i + 1] = (t >> 8) & 0xFF
buffer[i] = t & 0xFF
i += 4
n &= 0x03
if n != 0:
self.cycle()
self.mbuf = 0
self.nbuf = 32
while self.nbuf != 0 and n != 0:
self.mbuf ^= (buffer[i] & 0xff) << (32 - self.nbuf)
buffer[i] ^= (self.sbuf >> (32 - self.nbuf)) & 0xff
i += 1
self.nbuf -= 8
n -= 1
return bytes(buffer)
def decrypt(self, buffer: bytes, n: int = None):
if n is None:
return self.decrypt(buffer, len(buffer))
buffer = bytearray(buffer)
# Handle whole words
i = 0 i = 0
while len(buf) >= 4: j: int
self._cycle() t: int
t = struct.unpack("<I", buf[i:i + 4])[0] ^ self._sbuf
self._macfunc(t)
result += struct.pack("<I", t)
buf = buf[4:]
# handle any trailing bytes if self.nbuf != 0:
if len(buf): while self.nbuf != 0 and n != 0:
self._cycle() buffer[i] ^= (self.sbuf >> (32 - self.nbuf)) & 0xff
self._mbuf = 0 self.mbuf ^= (buffer[i] & 0xff) << (32 - self.nbuf)
self._nbuf = 32
result += self._decrypt_chunk(buf)
return bytes(result) i += 1
""" Having accumulated a MAC, finish processing and return it. self.nbuf -= 8
Note that any unprocessed bytes are treated as if
they were encrypted zero bytes, so plaintext (zero) is accumulated. """
def finish(self, buf_len): n -= 1
global KEYP, \
INITKONST
# handle any previously buffered bytes if self.nbuf != 0:
if self._nbuf != 0: return
# LFSR already cycled
self._macfunc(self._mbuf)
# perturb the MAC to mark end of input. self.mac_func(self.mbuf)
# Note that only the stream register is updated, not the CRC. This is an
# action that can't be duplicated by passing in plaintext, hence
# defeating any kind of extension attack.
self._cycle()
self._R[KEYP] ^= INITKONST ^ (self._nbuf << 3)
self._nbuf = 0
# now add the CRC to the stream register and diffuse it j = n & ~0x03
for i in range(N):
self._R[i] ^= self._CRC[i]
self._diffuse() while i < j:
self.cycle()
t = ((buffer[i + 3] & 0xFF) << 24) | \
((buffer[i + 2] & 0xFF) << 16) | \
((buffer[i + 1] & 0xFF) << 8) | \
(buffer[i] & 0xFF)
t ^= self.sbuf
self.mac_func(t)
buffer[i + 3] = (t >> 24) & 0xFF
buffer[i + 2] = (t >> 16) & 0xFF
buffer[i + 1] = (t >> 8) & 0xFF
buffer[i] = t & 0xFF
i += 4
n &= 0x03
if n != 0:
self.cycle()
self.mbuf = 0
self.nbuf = 32
while self.nbuf != 0 and n != 0:
buffer[i] ^= (self.sbuf >> (32 - self.nbuf)) & 0xff
self.mbuf ^= (buffer[i] & 0xff) << (32 - self.nbuf)
i += 1
self.nbuf -= 8
n -= 1
return bytes(buffer)
def finish(self, n: int):
buffer = bytearray(4)
result = []
# produce output from the stream buffer
i = 0 i = 0
for i in range(0, buf_len, 4): j: int
self._cycle()
if i + 4 <= buf_len: if self.nbuf != 0:
result += struct.pack("<I", self._sbuf) self.mac_func(self.mbuf)
self.cycle()
self.add_key(self.INITKONST ^ (self.nbuf << 3))
self.nbuf = 0
for j in range(self.N):
self.R[j] ^= self.CRC[j]
self.diffuse()
while n > 0:
self.cycle()
if n >= 4:
buffer[i + 3] = (self.sbuf >> 24) & 0xff
buffer[i + 2] = (self.sbuf >> 16) & 0xff
buffer[i + 1] = (self.sbuf >> 8) & 0xff
buffer[i] = self.sbuf & 0xff
n -= 4
i += 4
else: else:
sbuf = self._sbuf for j in range(n):
for j in range(i, buf_len): buffer[i + j] = (self.sbuf >> (i * 8)) & 0xff
result.append(sbuf & 0xFF) break
sbuf >>= 8 return bytes(buffer)
return bytes(result)
if __name__ == '__main__': if __name__ == "__main__":
TESTSIZE = 23
TEST_KEY = b"test key 128bits" TEST_KEY = b"test key 128bits"
TEST_PHRASE = b'\x00' * 20 TEST_PHRASE = b'\x00' * 20
sh = Shannon()
sh = Shannon( sh.key(TEST_KEY)
bytes([ sh.nonce(0)
133, 199, 15, 101, 207, 100, 229, 237, 15, 249, 248, 155, 76, 170,
62, 189, 239, 251, 147, 213, 22, 186, 157, 47, 218, 198, 235, 14,
171, 50, 11, 121
]))
sh.set_nonce(0)
p1 = sh.decrypt(
bytes([
235,
94,
210,
19,
246,
203,
195,
35,
22,
215,
80,
69,
158,
247,
110,
146,
241,
101,
199,
37,
67,
92,
5,
197,
112,
244,
77,
185,
197,
118,
119,
56,
164,
246,
159,
242,
56,
200,
39,
27,
141,
191,
37,
244,
244,
164,
44,
250,
59,
227,
245,
155,
239,
155,
137,
85,
244,
29,
52,
233,
180,
119,
166,
46,
252,
24,
141,
20,
135,
73,
144,
10,
176,
79,
88,
228,
140,
62,
173,
192,
117,
116,
152,
182,
246,
183,
88,
90,
73,
51,
159,
83,
227,
222,
140,
48,
157,
137,
185,
131,
201,
202,
122,
112,
207,
231,
153,
155,
9,
163,
225,
73,
41,
252,
249,
65,
33,
102,
83,
100,
36,
115,
174,
191,
43,
250,
113,
229,
146,
47,
154,
175,
55,
101,
73,
164,
49,
234,
103,
32,
53,
190,
236,
47,
210,
78,
141,
0,
176,
255,
79,
151,
159,
66,
20,
]))
print([hex(x) for x in p1])
print([hex(x) for x in sh.finish(4)])
sh.set_nonce(1)
print([hex(x) for x in sh.decrypt(bytes([173, 184, 50]))])
sh = Shannon(TEST_KEY)
sh.set_nonce(0)
encr = [sh.encrypt(bytes([x])) for x in TEST_PHRASE]
print('Encrypted 1-by-1 (len %d)' % len(encr), [hex(x[0]) for x in encr])
print(' sbuf %08x' % sh._sbuf)
print(' MAC', [hex(x) for x in sh.finish(4)])
sh.set_nonce(0)
encr = sh.encrypt(TEST_PHRASE) encr = sh.encrypt(TEST_PHRASE)
print('Encrypted whole (len %d)' % len(encr), [hex(x) for x in encr]) print(encr)
print(' sbuf %08x' % sh._sbuf)
print(' MAC', [hex(x) for x in sh.finish(4)])
sh.set_nonce(0)
print('Decrypted whole', [hex(x) for x in sh.decrypt(encr)])
print(' MAC', [hex(x) for x in sh.finish(4)])
sh.set_nonce(0)
decr = [sh.decrypt(bytes([x])) for x in encr]
print('Decrypted 1-by-1', [hex(x[0]) for x in decr])
print(' MAC', [hex(x) for x in sh.finish(4)])

View File

@@ -1,311 +0,0 @@
import struct
class Shannon:
n = 16
fold = n
initkonst = 0x6996c53a
keyp = 13
r: list
crc: list
initr: list
konst: int
sbuf: int
mbuf: int
nbuf: int
def __init__(self):
self.r = [0 for _ in range(self.n)]
self.crc = [0 for _ in range(self.n)]
self.initr = [0 for _ in range(self.n)]
def rotl(self, i: int, distance: int):
return ((i << distance) | (i >> (32 - distance))) & 0xffffffff
def sbox(self, i: int):
i ^= self.rotl(i, 5) | self.rotl(i, 7)
i ^= self.rotl(i, 19) | self.rotl(i, 22)
return i
def sbox2(self, i: int):
i ^= self.rotl(i, 7) | self.rotl(i, 22)
i ^= self.rotl(i, 5) | self.rotl(i, 19)
return i
def cycle(self):
t: int
t = self.r[12] ^ self.r[13] ^ self.konst
t = self.sbox(t) ^ self.rotl(self.r[0], 1)
for i in range(1, self.n):
self.r[i - 1] = self.r[i]
self.r[self.n - 1] = t
t = self.sbox2(self.r[2] ^ self.r[15])
self.r[0] ^= t
self.sbuf = t ^ self.r[8] ^ self.r[12]
def crc_func(self, i: int):
t: int
t = self.crc[0] ^ self.crc[2] ^ self.crc[15] ^ i
for j in range(1, self.n):
self.crc[j - 1] = self.crc[j]
self.crc[self.n - 1] = t
def mac_func(self, i: int):
self.crc_func(i)
self.r[self.keyp] ^= i
def init_state(self):
self.r[0] = 1
self.r[1] = 1
for i in range(2, self.n):
self.r[i] = self.r[i - 1] + self.r[i - 2]
self.konst = self.initkonst
def save_state(self):
for i in range(self.n):
self.initr[i] = self.r[i]
def reload_state(self):
for i in range(self.n):
self.r[i] = self.initr[i]
def gen_konst(self):
self.konst = self.r[0]
def add_key(self, k: int):
self.r[self.keyp] ^= k
def diffuse(self):
for i in range(self.fold):
self.cycle()
def load_key(self, key: bytes):
extra = bytearray(4)
i: int
j: int
t: int
padding_size = int((len(key) + 3) / 4) * 4 - len(key)
key = key + (b"\x00" * padding_size) + struct.pack("<I", len(key))
for i in range(0, len(key), 4):
self.r[self.keyp] = \
self.r[self.keyp] ^ \
struct.unpack("<I", key[i: i + 4])[0]
self.cycle()
for i in range(self.n):
self.crc[i] = self.r[i]
self.diffuse()
for i in range(self.n):
self.r[i] ^= self.crc[i]
def key(self, key: bytes):
self.init_state()
self.load_key(key)
self.gen_konst()
self.save_state()
self.nbuf = 0
def nonce(self, nonce: bytes):
self.reload_state()
self.konst = self.initkonst
self.load_key(nonce)
self.gen_konst()
self.nbuf = 0
def encrypt(self, buffer: bytes, n: int = None):
if n is None:
return self.encrypt(buffer, len(buffer))
buffer = bytearray(buffer)
i = 0
j: int
t: int
if self.nbuf != 0:
while self.nbuf != 0 and n != 0:
self.mbuf ^= (buffer[i] & 0xff) << (32 - self.nbuf)
buffer[i] ^= (self.sbuf >> (32 - self.nbuf)) & 0xff
i += 1
self.nbuf -= 8
n -= 1
if self.nbuf != 0:
return
self.mac_func(self.mbuf)
j = n & ~0x03
while i < j:
self.cycle()
t = ((buffer[i + 3] & 0xFF) << 24) | \
((buffer[i + 2] & 0xFF) << 16) | \
((buffer[i + 1] & 0xFF) << 8) | \
(buffer[i] & 0xFF)
self.mac_func(t)
t ^= self.sbuf
buffer[i + 3] = (t >> 24) & 0xFF
buffer[i + 2] = (t >> 16) & 0xFF
buffer[i + 3] = (t >> 8) & 0xFF
buffer[i] = t & 0xFF
i += 4
n &= 0x03
if n != 0:
self.cycle()
self.mbuf = 0
self.nbuf = 32
while self.nbuf != 0 and n != 0:
self.mbuf ^= (buffer[i] & 0xff) << (32 - self.nbuf)
buffer[i] ^= (self.sbuf >> (32 - self.nbuf)) & 0xff
i += 1
self.nbuf -= 8
n -= 1
return bytes(buffer)
def decrypt(self, buffer: bytes, n: int = None):
if n is None:
return self.decrypt(buffer, len(buffer))
buffer = bytearray(buffer)
i = 0
j: int
t: int
if self.nbuf != 0:
while self.nbuf != 0 and n != 0:
buffer[i] ^= (self.sbuf >> (32 - self.nbuf)) & 0xff
self.mbuf ^= (buffer[i] & 0xff) << (32 - self.nbuf)
i += 1
self.nbuf -= 8
n -= 1
if self.nbuf != 0:
return
self.mac_func(self.mbuf)
j = n & ~0x03
while i < j:
self.cycle()
t = ((buffer[i + 3] & 0xFF) << 24) | \
((buffer[i + 2] & 0xFF) << 16) | \
((buffer[i + 1] & 0xFF) << 8) | \
(buffer[i] & 0xFF)
t ^= self.sbuf
self.mac_func(t)
buffer[i + 3] = (t >> 24) & 0xFF
buffer[i + 2] = (t >> 16) & 0xFF
buffer[i + 1] = (t >> 8) & 0xFF
buffer[i] = t & 0xFF
i += 4
n &= 0x03
if n != 0:
self.cycle()
self.mbuf = 0
self.nbuf = 32
while self.nbuf != 0 and n != 0:
buffer[i] ^= (self.sbuf >> (32 - self.nbuf)) & 0xff
self.mbuf ^= (buffer[i] & 0xff) << (32 - self.nbuf)
i += 1
self.nbuf -= 8
n -= 1
return bytes(buffer)
def finish(self, buffer: bytes, n: int = None):
if n is None:
return self.finish(buffer, len(buffer))
buffer = bytearray(buffer)
i = 0
j: int
if self.nbuf != 0:
self.mac_func(self.mbuf)
self.cycle()
self.add_key(self.initkonst ^ (self.nbuf << 3))
self.nbuf = 0
for j in range(self.n):
self.r[j] ^= self.crc[j]
self.diffuse()
while n > 0:
self.cycle()
if n >= 4:
buffer[i + 3] = (self.sbuf >> 24) & 0xff
buffer[i + 2] = (self.sbuf >> 16) & 0xff
buffer[i + 1] = (self.sbuf >> 8) & 0xff
buffer[i] = self.sbuf & 0xff
n -= 4
i += 4
else:
for j in range(n):
buffer[i + j] = (self.sbuf >> (i * 8)) & 0xff
break
return bytes(buffer)