176 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			176 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
								 | 
							
								"""Msgpack parser with typed arrays"""
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# Based on u-msgpack-python v2.4.1 - v at sergeev.io
							 | 
						||
| 
								 | 
							
								# https://github.com/vsergeev/u-msgpack-python
							 | 
						||
| 
								 | 
							
								#
							 | 
						||
| 
								 | 
							
								# Permission is hereby granted, free of charge, to any person obtaining a copy
							 | 
						||
| 
								 | 
							
								# of this software and associated documentation files (the "Software"), to deal
							 | 
						||
| 
								 | 
							
								# in the Software without restriction, including without limitation the rights
							 | 
						||
| 
								 | 
							
								# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
							 | 
						||
| 
								 | 
							
								# copies of the Software, and to permit persons to whom the Software is
							 | 
						||
| 
								 | 
							
								# furnished to do so, subject to the following conditions:
							 | 
						||
| 
								 | 
							
								#
							 | 
						||
| 
								 | 
							
								# The above copyright notice and this permission notice shall be included in
							 | 
						||
| 
								 | 
							
								# all copies or substantial portions of the Software.
							 | 
						||
| 
								 | 
							
								#
							 | 
						||
| 
								 | 
							
								# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
							 | 
						||
| 
								 | 
							
								# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
							 | 
						||
| 
								 | 
							
								# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
							 | 
						||
| 
								 | 
							
								# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
							 | 
						||
| 
								 | 
							
								# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
							 | 
						||
| 
								 | 
							
								# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
							 | 
						||
| 
								 | 
							
								# THE SOFTWARE.
							 | 
						||
| 
								 | 
							
								#
							 | 
						||
| 
								 | 
							
								import io
							 | 
						||
| 
								 | 
							
								import struct
							 | 
						||
| 
								 | 
							
								import numpy as np
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _pack_integer(obj, fp):
								    """Write the integer ``obj`` to ``fp`` in the narrowest encoding that fits.

								    Values in ``-32..127`` are written as a single byte. Anything wider is
								    written as a one-byte type tag followed by a little-endian payload of
								    8, 16, 32 or 64 bits (signed tags 0xd0-0xd3, unsigned tags 0xcc-0xcf).

								    Args:
								        obj: Integer to encode.
								        fp: Writable binary stream; receives exactly one ``write`` call.

								    Raises:
								        Exception: If ``obj`` does not fit in 64 bits.
								    """
								    if obj < 0:
								        # Negative "fixint": the signed byte itself is the encoding.
								        if obj >= -32:
								            fp.write(struct.pack("b", obj))
								            return
								        signed_formats = (
								            (8, b"\xd0", "b"),
								            (16, b"\xd1", "<h"),
								            (32, b"\xd2", "<i"),
								            (64, b"\xd3", "<q"),
								        )
								        for width, tag, fmt in signed_formats:
								            if obj >= -(2 ** (width - 1)):
								                fp.write(tag + struct.pack(fmt, obj))
								                return
								        raise Exception("huge signed int")

								    # Positive "fixint": values up to 127 need no tag byte.
								    if obj <= 127:
								        fp.write(struct.pack("B", obj))
								        return
								    unsigned_formats = (
								        (8, b"\xcc", "B"),
								        (16, b"\xcd", "<H"),
								        (32, b"\xce", "<I"),
								        (64, b"\xcf", "<Q"),
								    )
								    for width, tag, fmt in unsigned_formats:
								        if obj <= 2**width - 1:
								            fp.write(tag + struct.pack(fmt, obj))
								            return
								    raise Exception("huge unsigned int")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _pack_nil(obj, fp):
								    """Write the single-byte nil marker (0xc0) to ``fp``.

								    ``obj`` is accepted only so the signature matches the other packers;
								    its value is never read.
								    """
								    nil_tag = b"\xc0"
								    fp.write(nil_tag)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _pack_boolean(obj, fp):
								    """Write a one-byte boolean to ``fp``: 0xc3 if ``obj`` is truthy, else 0xc2."""
								    if obj:
								        tag = b"\xc3"
								    else:
								        tag = b"\xc2"
								    fp.write(tag)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _pack_float(obj, fp):
								    """Write ``obj`` to ``fp`` as tag 0xca plus a little-endian 32-bit float.

								    Args:
								        obj: Number to encode; it is narrowed to single precision.
								        fp: Writable binary stream; receives exactly one ``write`` call.
								    """
								    # NOTE: forced 32-bit floats for Leenkx. The double-precision form
								    # would be tag 0xcb followed by struct.pack("<d", obj).
								    single_precision = struct.pack("<f", obj)
								    fp.write(b"\xca" + single_precision)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _pack_string(obj, fp):
								    """Write the text ``obj`` to ``fp`` as UTF-8 with a length-prefixed header.

								    The header grows with the encoded byte length: a single byte
								    (``0xA0 | length``) up to 31 bytes, then tag 0xd9 / 0xda / 0xdb with an
								    8 / 16 / 32-bit little-endian length.

								    Args:
								        obj: ``str`` to encode.
								        fp: Writable binary stream; receives exactly one ``write`` call.

								    Raises:
								        Exception: If the encoded text is longer than ``2**32 - 1`` bytes.
								    """
								    encoded = obj.encode("utf-8")
								    size = len(encoded)

								    if size <= 31:
								        header = struct.pack("B", 0xA0 | size)
								    elif size <= 2**8 - 1:
								        header = b"\xd9" + struct.pack("B", size)
								    elif size <= 2**16 - 1:
								        header = b"\xda" + struct.pack("<H", size)
								    elif size <= 2**32 - 1:
								        header = b"\xdb" + struct.pack("<I", size)
								    else:
								        raise Exception("huge string")

								    fp.write(header + encoded)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _pack_binary(obj, fp):
								    """Write the byte string ``obj`` to ``fp`` behind a length-prefixed header.

								    The header is tag 0xc4 / 0xc5 / 0xc6 followed by an 8 / 16 / 32-bit
								    little-endian length, whichever is the first that fits.

								    Args:
								        obj: ``bytes`` payload.
								        fp: Writable binary stream; receives exactly one ``write`` call.

								    Raises:
								        Exception: If the payload is longer than ``2**32 - 1`` bytes.
								    """
								    size = len(obj)

								    if size <= 2**8 - 1:
								        header = b"\xc4" + struct.pack("B", size)
								    elif size <= 2**16 - 1:
								        header = b"\xc5" + struct.pack("<H", size)
								    elif size <= 2**32 - 1:
								        header = b"\xc6" + struct.pack("<I", size)
								    else:
								        raise Exception("huge binary string")

								    fp.write(header + obj)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _pack_array(obj, fp):
								    """Write the sequence ``obj`` to ``fp``, using a typed layout when possible.

								    An array header (element count) is written first. The type of the
								    FIRST element then selects the body layout:

								    * ``float`` / ``np.float32``: one 0xca tag, then raw little-endian
								      32-bit floats with no per-element tag.
								    * ``int`` / ``np.int32``: one 0xd2 tag, then raw little-endian
								      32-bit signed integers.
								    * ``np.int16``: one 0xd1 tag, then raw little-endian 16-bit integers.
								    * ``bool`` and everything else: each element is written through
								      ``pack`` with its own tag.

								    Only the first element is inspected, so typed layouts assume the
								    sequence is homogeneous.

								    Args:
								        obj: ``list``, ``tuple`` or ``np.ndarray`` to encode.
								        fp: Writable binary stream.

								    Raises:
								        Exception: If ``obj`` has more than ``2**32 - 1`` elements.
								        struct.error: If an element of a ``float``/``int`` typed array
								            cannot be packed as a 32-bit value.
								    """
								    count = len(obj)
								    if count <= 15:
								        fp.write(struct.pack("B", 0x90 | count))
								    elif count <= 2**16 - 1:
								        fp.write(b"\xdc" + struct.pack("<H", count))
								    elif count <= 2**32 - 1:
								        fp.write(b"\xdd" + struct.pack("<I", count))
								    else:
								        raise Exception("huge array")

								    if count == 0:
								        # Nothing follows the header for an empty array.
								        return

								    first = obj[0]
								    if isinstance(first, float):
								        fp.write(b"\xca")
								        for e in obj:
								            fp.write(struct.pack("<f", e))
								    elif isinstance(first, bool):
								        # bool is checked before int because bool is a subclass of int;
								        # booleans keep their per-element tags.
								        for e in obj:
								            pack(e, fp)
								    elif isinstance(first, int):
								        fp.write(b"\xd2")
								        for e in obj:
								            fp.write(struct.pack("<i", e))
								    # The numpy branches go through np.asarray with an explicit
								    # little-endian dtype: this accepts plain lists/tuples of numpy
								    # scalars (which have no .tobytes()) and keeps the byte order
								    # consistent with the "<" struct formats used everywhere else.
								    # Float32
								    elif isinstance(first, np.float32):
								        fp.write(b"\xca")
								        fp.write(np.asarray(obj, dtype="<f4").tobytes())
								    # Int32
								    elif isinstance(first, np.int32):
								        fp.write(b"\xd2")
								        fp.write(np.asarray(obj, dtype="<i4").tobytes())
								    # Int16
								    elif isinstance(first, np.int16):
								        fp.write(b"\xd1")
								        fp.write(np.asarray(obj, dtype="<i2").tobytes())
								    # Regular
								    else:
								        for e in obj:
								            pack(e, fp)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _pack_map(obj, fp):
								    """Write the mapping ``obj`` to ``fp`` as a header plus key/value pairs.

								    The header carries the number of entries: a single byte
								    (``0x80 | count``) up to 15 entries, then tag 0xde / 0xdf with a
								    16 / 32-bit little-endian count. Each key and then its value is
								    written through ``pack``, in the mapping's iteration order.

								    Args:
								        obj: ``dict`` to encode.
								        fp: Writable binary stream.

								    Raises:
								        Exception: If ``obj`` has more than ``2**32 - 1`` entries.
								    """
								    count = len(obj)
								    if count <= 15:
								        fp.write(struct.pack("B", 0x80 | count))
								    elif count <= 2**16 - 1:
								        fp.write(b"\xde" + struct.pack("<H", count))
								    elif count <= 2**32 - 1:
								        fp.write(b"\xdf" + struct.pack("<I", count))
								    else:
								        # Previously reported "huge array", which pointed at the wrong
								        # container type when diagnosing an oversized map.
								        raise Exception("huge map")

								    for k, v in obj.items():
								        pack(k, fp)
								        pack(v, fp)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def pack(obj, fp):
								    """Serialize ``obj`` to the binary stream ``fp``, dispatching on its type.

								    Supported inputs are ``None``, ``bool``, ``int``, ``float``, ``str``,
								    ``bytes``, sequences (``list``, ``tuple``, ``np.ndarray``) and ``dict``.
								    Containers are written recursively.

								    Args:
								        obj: Value to encode.
								        fp: Writable binary stream.

								    Raises:
								        Exception: If ``obj`` is of a type that is not listed above.
								    """
								    if obj is None:
								        _pack_nil(obj, fp)
								        return
								    # bool must be tested before int: bool is a subclass of int.
								    if isinstance(obj, bool):
								        _pack_boolean(obj, fp)
								        return
								    if isinstance(obj, int):
								        _pack_integer(obj, fp)
								        return
								    if isinstance(obj, float):
								        _pack_float(obj, fp)
								        return
								    if isinstance(obj, str):
								        _pack_string(obj, fp)
								        return
								    if isinstance(obj, bytes):
								        _pack_binary(obj, fp)
								        return
								    if isinstance(obj, (list, np.ndarray, tuple)):
								        _pack_array(obj, fp)
								        return
								    if isinstance(obj, dict):
								        _pack_map(obj, fp)
								        return

								    type_name = str(type(obj))
								    raise Exception(f"unsupported type: {type_name}")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def packb(obj):
								    """Serialize ``obj`` in memory and return the encoded ``bytes``.

								    Raises:
								        Exception: Propagated from ``pack`` for unsupported or oversized input.
								    """
								    with io.BytesIO() as buffer:
								        pack(obj, buffer)
								        return buffer.getvalue()
							 |