"""
This code is taken from the itertree package:
_ _____ _____ _____ _____ _____ _____ _____
| |_ _| __| __ |_ _| __ | __| __|
|-| | | | __| -| | | | -| __| __|
|_| |_| |_____|__|__| |_| |__|__|_____|_____|
https://pypi.org/project/itertree/
GIT Home:
https://github.com/BR1py/itertree
The documentation can be found here:
https://itertree.readthedocs.io/en/latest/index.html
The code is published under MIT license
For more information see: https://en.wikipedia.org/wiki/MIT_License
CONTENT DESCRIPTION:
This part of code contains the standard iTree serializers (JSON and rendering)
"""
from __future__ import absolute_import
import gzip
import hashlib
import os
from collections import OrderedDict,deque
# For serializing we (try) to import some modules:
IS_ORJSON=False
DECODE = False
try:
import rapidjson
except:
rapidjson = None
try:
import orjson as JSON
IS_ORJSON=True
# orjson is much faster than standard json but might not be available
import json
except:
import json as JSON
test=bytes(JSON.dumps({'test':'123','HASH':hashlib.sha1(b'asjhdahsdh').hexdigest()}).encode('utf8', errors='backslashreplace'))
try:
JSON.loads(test)
except TypeError:
DECODE=True
class np():
class nparray():
pass
try:
import numpy as np
# only needed in case of numpy arrays in data
# for serializing the data
np_loaded = True
except ImportError:
np_loaded = False
np=None
from itertree import *
[docs]
def Converter_1_1_1_to_2_0_0(src_path,check_hash=True):
return Converter_1_1_1_to_2_0_0_Cls().convert(src_path,check_hash)
[docs]
class Converter_1_1_1_to_2_0_0_Cls(object):
"""
This is the standard serializer for DataTree which translates the structure
into the JSON format.
Users might implement their own serializers using the interface methods defined
in this serializer
"""
ITREE_SERIALIZE_VERSION = "1.1.1"
TREE = 'iT'
DATA = 'DT'
LINK = 'LK'
TAG = 'TG'
IDX = 'IDX'
DATA_MODELL = 'DM'
DTYPE = 'TP'
DATA_CONTAINER = 'DC'
ITREE_ITEMS_DECODE={'iT','iTRO','iTl','iTPH','iTI'}
[docs]
def convert(self,src_path,check_hash=True):
source_str=None
if type(src_path) is str:
if not os.path.exists(src_path):
raise FileNotFoundError('Error file "%s" not found' % src_path)
with open(src_path, 'rb') as fh:
source_str = fh.read()
else:
# file handle
source_str = src_path.read()
pack = False
try:
source_str = gzip.decompress(source_str)
pack=True
except OSError:
# we might have an already unzipped file!
pass
if source_str is None:
raise SyntaxError('File couldnot be loaded')
header_str, dt_str = source_str.split(b'}***DT***')
if DECODE:
header_dict = JSON.loads((header_str + b'}').decode('utf-8'))
else:
header_dict = JSON.loads(header_str + b'}')
if header_dict['VERSION'] != self.ITREE_SERIALIZE_VERSION:
raise PermissionError('Wrong version of DataTree data please convert first!')
if check_hash and ('HASH' in header_dict):
if hashlib.sha1(dt_str).hexdigest() != header_dict['HASH']:
raise PermissionError('Given DataTree data is corrupted (wrong hash)!')
if DECODE:
raw_o = JSON.loads(dt_str.decode('utf-8'))
else:
raw_o = JSON.loads(dt_str)
new_tree=self.create_itree_from_raw(raw_o)
return new_tree
[docs]
def create_itree_from_raw(self,raw_o):
decode = self.create_itree_from_raw
t = type(raw_o)
if t is dict:
o_type = raw_o.get(self.DTYPE)
if o_type is None:
return {decode(key): decode(v) for key, v in raw_o}
elif o_type in self.ITREE_ITEMS_DECODE:
tag = decode(raw_o.get(self.TAG))
if o_type == 'iTPH':
return iTree(tag,flags=iTree._PLACEHOLDER)
elif o_type == 'iTI':
return TagIdx(tag, raw_o.get(self.IDX))
else:
data = raw_o.get(self.DATA)
if data is not None:
data = decode(data)
tree = raw_o.get(self.TREE)
link = raw_o.get(self.LINK)
if link is not None:
new_item = iTree(tag, value=data,
subtree=[decode(i) for i in tree],
link=iTLink(link[0],decode(link[1]))
)
else:
if o_type == 'iT':
new_item = iTree(tag, value=data, subtree=[decode(i) for i in tree])
else:
new_item = iTree(tag, value=data,
subtree=[decode(i) for i in tree],
flags=iTree._READ_ONLY_TREE|iTree._READ_ONLY_VALUE)
return new_item
elif o_type == 'OD':
return OrderedDict([(decode(i[0]), decode(i[1])) for i in raw_o[self.DATA_CONTAINER]])
elif (o_type == 'D') or (o_type == 'd') or (o_type == 'DR'):
return {decode(i[0]): decode(i[1]) for i in raw_o[self.DATA_CONTAINER]}
elif o_type.startswith('ITER:'):
new = [decode(i) for i in raw_o.get(self.DATA_CONTAINER)]
return eval('%s(new)' % o_type[5:])
elif o_type == 'npa':
# here we will get an exception in case numpy is not available!
if not np_loaded:
raise ImportError(
'Numpy is needed for decoding of this data! It is not found in the python installation')
return np.frombuffer(bytes(bytearray(raw_o.get(self.DATA_CONTAINER))), raw_o.get('np.dtype'))
return raw_o