forked from LeenkxTeam/LNXSDK
		
	
		
			
	
	
		
			182 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			182 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
								 | 
							
								"""
							 | 
						||
| 
								 | 
							
								Port of the Iron LZ4 compression module based on
							 | 
						||
| 
								 | 
							
								https://github.com/gorhill/lz4-wasm. Original license:
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								BSD 2-Clause License
							 | 
						||
| 
								 | 
							
								Copyright (c) 2018, Raymond Hill
							 | 
						||
| 
								 | 
							
								All rights reserved.
							 | 
						||
| 
								 | 
							
								Redistribution and use in source and binary forms, with or without
							 | 
						||
| 
								 | 
							
								modification, are permitted provided that the following conditions are met:
							 | 
						||
| 
								 | 
							
								* Redistributions of source code must retain the above copyright notice, this
							 | 
						||
| 
								 | 
							
								  list of conditions and the following disclaimer.
							 | 
						||
| 
								 | 
							
								* Redistributions in binary form must reproduce the above copyright notice,
							 | 
						||
| 
								 | 
							
								  this list of conditions and the following disclaimer in the documentation
							 | 
						||
| 
								 | 
							
								  and/or other materials provided with the distribution.
							 | 
						||
| 
								 | 
							
								THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
							 | 
						||
| 
								 | 
							
								AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
							 | 
						||
| 
								 | 
							
								IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
							 | 
						||
| 
								 | 
							
								DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
							 | 
						||
| 
								 | 
							
								FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
							 | 
						||
| 
								 | 
							
								DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
							 | 
						||
| 
								 | 
							
								SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
							 | 
						||
| 
								 | 
							
								CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
							 | 
						||
| 
								 | 
							
								OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
							 | 
						||
| 
								 | 
							
								OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
							 | 
						||
| 
								 | 
							
								"""
							 | 
						||
| 
								 | 
							
								import numpy as np
							 | 
						||
| 
								 | 
							
								from numpy import uint8, int32, uint32
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class LZ4RangeException(Exception):
							 | 
						||
| 
								 | 
							
								    pass
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class LZ4:
							 | 
						||
| 
								 | 
							
								    hash_table = None
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    @staticmethod
							 | 
						||
| 
								 | 
							
								    def encode_bound(size: int) -> int:
							 | 
						||
| 
								 | 
							
								        return 0 if size > 0x7E000000 else size + (size // 255 | 0) + 16
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    @staticmethod
							 | 
						||
| 
								 | 
							
								    def encode(b: bytes) -> bytes:
							 | 
						||
| 
								 | 
							
								        i_buf: np.ndarray = np.frombuffer(b, dtype=uint8)
							 | 
						||
| 
								 | 
							
								        i_len = i_buf.size
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        if i_len >= 0x7E000000:
							 | 
						||
| 
								 | 
							
								            raise LZ4RangeException("Input buffer is too large")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        # "The last match must start at least 12 bytes before end of block"
							 | 
						||
| 
								 | 
							
								        last_match_pos = i_len - 12
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        # "The last 5 bytes are always literals"
							 | 
						||
| 
								 | 
							
								        last_literal_pos = i_len - 5
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        if LZ4.hash_table is None:
							 | 
						||
| 
								 | 
							
								            LZ4.hash_table = np.full(shape=65536, fill_value=-65536, dtype=int32)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        LZ4.hash_table.fill(-65536)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        o_len = LZ4.encode_bound(i_len)
							 | 
						||
| 
								 | 
							
								        o_buf = np.full(shape=o_len, fill_value=0, dtype=uint8)
							 | 
						||
| 
								 | 
							
								        i_pos = 0
							 | 
						||
| 
								 | 
							
								        o_pos = 0
							 | 
						||
| 
								 | 
							
								        anchor_pos = 0
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        # Sequence-finding loop
							 | 
						||
| 
								 | 
							
								        while True:
							 | 
						||
| 
								 | 
							
								            ref_pos = int32(0)
							 | 
						||
| 
								 | 
							
								            m_offset = 0
							 | 
						||
| 
								 | 
							
								            sequence = uint32(
							 | 
						||
| 
								 | 
							
								                i_buf[i_pos] << 8 | i_buf[i_pos + 1] << 16 | i_buf[i_pos + 2] << 24
							 | 
						||
| 
								 | 
							
								            )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            # Match-finding loop
							 | 
						||
| 
								 | 
							
								            while i_pos <= last_match_pos:
							 | 
						||
| 
								 | 
							
								                # Conversion to uint32 is mandatory to ensure correct
							 | 
						||
| 
								 | 
							
								                # unsigned right shift (compare with .hx implementation)
							 | 
						||
| 
								 | 
							
								                sequence = uint32(
							 | 
						||
| 
								 | 
							
								                    uint32(sequence) >> uint32(8) | i_buf[i_pos + 3] << 24
							 | 
						||
| 
								 | 
							
								                )
							 | 
						||
| 
								 | 
							
								                hash_val = (sequence * 0x9E37 & 0xFFFF) + (
							 | 
						||
| 
								 | 
							
								                    uint32(sequence * 0x79B1) >> uint32(16)
							 | 
						||
| 
								 | 
							
								                ) & 0xFFFF
							 | 
						||
| 
								 | 
							
								                ref_pos = LZ4.hash_table[hash_val]
							 | 
						||
| 
								 | 
							
								                LZ4.hash_table[hash_val] = i_pos
							 | 
						||
| 
								 | 
							
								                m_offset = i_pos - ref_pos
							 | 
						||
| 
								 | 
							
								                if (
							 | 
						||
| 
								 | 
							
								                    m_offset < 65536
							 | 
						||
| 
								 | 
							
								                    and i_buf[ref_pos + 0] == (sequence & 0xFF)
							 | 
						||
| 
								 | 
							
								                    and i_buf[ref_pos + 1] == ((sequence >> uint32(8)) & 0xFF)
							 | 
						||
| 
								 | 
							
								                    and i_buf[ref_pos + 2] == ((sequence >> uint32(16)) & 0xFF)
							 | 
						||
| 
								 | 
							
								                    and i_buf[ref_pos + 3] == ((sequence >> uint32(24)) & 0xFF)
							 | 
						||
| 
								 | 
							
								                ):
							 | 
						||
| 
								 | 
							
								                    break
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								                i_pos += 1
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            # No match found
							 | 
						||
| 
								 | 
							
								            if i_pos > last_match_pos:
							 | 
						||
| 
								 | 
							
								                break
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            # Match found
							 | 
						||
| 
								 | 
							
								            l_len = i_pos - anchor_pos
							 | 
						||
| 
								 | 
							
								            m_len = i_pos
							 | 
						||
| 
								 | 
							
								            i_pos += 4
							 | 
						||
| 
								 | 
							
								            ref_pos += 4
							 | 
						||
| 
								 | 
							
								            while i_pos < last_literal_pos and i_buf[i_pos] == i_buf[ref_pos]:
							 | 
						||
| 
								 | 
							
								                i_pos += 1
							 | 
						||
| 
								 | 
							
								                ref_pos += 1
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            m_len = i_pos - m_len
							 | 
						||
| 
								 | 
							
								            token = m_len - 4 if m_len < 19 else 15
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            # Write token, length of literals if needed
							 | 
						||
| 
								 | 
							
								            if l_len >= 15:
							 | 
						||
| 
								 | 
							
								                o_buf[o_pos] = 0xF0 | token
							 | 
						||
| 
								 | 
							
								                o_pos += 1
							 | 
						||
| 
								 | 
							
								                l = l_len - 15
							 | 
						||
| 
								 | 
							
								                while l >= 255:
							 | 
						||
| 
								 | 
							
								                    o_buf[o_pos] = 255
							 | 
						||
| 
								 | 
							
								                    o_pos += 1
							 | 
						||
| 
								 | 
							
								                    l -= 255
							 | 
						||
| 
								 | 
							
								                o_buf[o_pos] = l
							 | 
						||
| 
								 | 
							
								                o_pos += 1
							 | 
						||
| 
								 | 
							
								            else:
							 | 
						||
| 
								 | 
							
								                o_buf[o_pos] = (l_len << 4) | token
							 | 
						||
| 
								 | 
							
								                o_pos += 1
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            # Write literals
							 | 
						||
| 
								 | 
							
								            while l_len > 0:
							 | 
						||
| 
								 | 
							
								                l_len -= 1
							 | 
						||
| 
								 | 
							
								                o_buf[o_pos] = i_buf[anchor_pos]
							 | 
						||
| 
								 | 
							
								                o_pos += 1
							 | 
						||
| 
								 | 
							
								                anchor_pos += 1
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            if m_len == 0:
							 | 
						||
| 
								 | 
							
								                break
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            # Write offset of match
							 | 
						||
| 
								 | 
							
								            o_buf[o_pos + 0] = m_offset
							 | 
						||
| 
								 | 
							
								            o_buf[o_pos + 1] = m_offset >> 8
							 | 
						||
| 
								 | 
							
								            o_pos += 2
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            # Write length of match if needed
							 | 
						||
| 
								 | 
							
								            if m_len >= 19:
							 | 
						||
| 
								 | 
							
								                l = m_len - 19
							 | 
						||
| 
								 | 
							
								                while l >= 255:
							 | 
						||
| 
								 | 
							
								                    o_buf[o_pos] = 255
							 | 
						||
| 
								 | 
							
								                    o_pos += 1
							 | 
						||
| 
								 | 
							
								                    l -= 255
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								                o_buf[o_pos] = l
							 | 
						||
| 
								 | 
							
								                o_pos += 1
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            anchor_pos = i_pos
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        # Last sequence is literals only
							 | 
						||
| 
								 | 
							
								        l_len = i_len - anchor_pos
							 | 
						||
| 
								 | 
							
								        if l_len >= 15:
							 | 
						||
| 
								 | 
							
								            o_buf[o_pos] = 0xF0
							 | 
						||
| 
								 | 
							
								            o_pos += 1
							 | 
						||
| 
								 | 
							
								            l = l_len - 15
							 | 
						||
| 
								 | 
							
								            while l >= 255:
							 | 
						||
| 
								 | 
							
								                o_buf[o_pos] = 255
							 | 
						||
| 
								 | 
							
								                o_pos += 1
							 | 
						||
| 
								 | 
							
								                l -= 255
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            o_buf[o_pos] = l
							 | 
						||
| 
								 | 
							
								            o_pos += 1
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        else:
							 | 
						||
| 
								 | 
							
								            o_buf[o_pos] = l_len << 4
							 | 
						||
| 
								 | 
							
								            o_pos += 1
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        while l_len > 0:
							 | 
						||
| 
								 | 
							
								            l_len -= 1
							 | 
						||
| 
								 | 
							
								            o_buf[o_pos] = i_buf[anchor_pos]
							 | 
						||
| 
								 | 
							
								            o_pos += 1
							 | 
						||
| 
								 | 
							
								            anchor_pos += 1
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        return np.resize(o_buf, o_pos).tobytes()
							 |