Source code for unicum.datarange

# -*- coding: utf-8 -*-

# unicum
# ------
# Python library for simple object cache and factory.
# 
# Author:   sonntagsgesicht, based on a fork of Deutsche Postbank [pbrisk]
# Version:  0.3, copyright Wednesday, 18 September 2019
# Website:  https://github.com/sonntagsgesicht/unicum
# License:  Apache License 2.0 (see LICENSE file)


LEFT_TOP = None


[docs]class DataRange(object): def __init__(self, iterable=None, value_types=(float, int, str, type(None)), none_alias=(None, ' ', '', 'None'), **kwargs): self._value_types = value_types self._none_alias = none_alias self._col_keys = list() self._row_keys = list() # convert dict into nested list if isinstance(iterable, dict): iterable = DataRange.__dict_to_nested_list(iterable) # convert DataRange into nested list if isinstance(iterable, DataRange): iterable = iterable.to_serializable() # replace None alias by None none_alias = none_alias if isinstance(none_alias, (tuple, list)) else [none_alias] if iterable: f = (lambda x: None if x in none_alias else x) iterable = [[f(c) for c in r] for r in iterable] # slice nested list iterable into (column headers, row headers, data) col_keys, row_keys, item_list = DataRange.__slice_nested_list(iterable) # validate iterable (only admissible value types) for row_value in item_list: for value in row_value: self._validate_value(value) # validate column header entries for ambiguity (no int) # if any(isinstance(col_key, int) for col_key in col_keys): # raise KeyError('Column keys must not be of type int.') # validate row header entries for ambiguity (no int < len(row_headers)) # if any(isinstance(row_key, int) for row_key in row_keys): # raise KeyError('If row keys are of type int, values not must between 0 and %i.' % len(row_keys)) if not len(col_keys) == len(set(col_keys)): raise KeyError('Col keys must be unique.') if not len(row_keys) == len(set(row_keys)): raise KeyError('Row keys must be unique.') self._col_keys = col_keys self._row_keys = row_keys self._item_list = item_list iterable = list() for row_key, row_values in zip(row_keys, item_list): for col_key, value in zip(col_keys, row_values): iterable.append(((row_key, col_key), value)) # super(DataRange, self).__init__(iterable, **kwargs) self._dict = dict(iterable, **kwargs) self._hash = hash(self) @staticmethod def __dict_to_nested_list(iterable): item_list = [list(iterable.keys())] for i in range(max(list(map(len, list(iterable.values()))))): item_list.append([iterable[k][i] for k in item_list[0]]) return item_list @staticmethod def __slice_nested_list(iterable): if not iterable: return list(), list(), list() # extract column headers from first row col_header = iterable.pop(0) if not len(set(col_header)) == len(col_header): raise ValueError('All column header entries must be unique.') # extract row headers if given if col_header.count(None): i = col_header.index(None) col_header.pop(i) row_header = [row.pop(i) for row in iterable] else: row_header = list(range(len(iterable))) if not len(set(row_header)) == len(row_header): raise ValueError('All row header entries must be unique.') return col_header, row_header, iterable def _validate_value(self, value): if not isinstance(value, self._value_types): s = ', '.join([str(t) for t in self._value_types]) t = type(value) msg = 'All properties of item in this DataRange must have type ' \ 'of either one of %s but not %s.' % (s, t) raise TypeError(msg) return True def _validate_key_format(self, key): if not isinstance(key, (list, tuple)) or not len(key) == 2: s = self.__class__.__name__, key.__class__.__name__ raise KeyError('Key of %s must be (row_key, col_key) tuple not %s.' % s) def _validate_key_existence(self, key): self._validate_key_format(key) r, c = key if r not in self._row_keys or c not in self._col_keys: s = self.__class__.__name__, str(self._row_keys), str(self._col_keys) msg = 'Key of %s must be (row_key, col_key) tuple with \n row_key in %s \n and col_key in %s' % s raise KeyError(msg) return True def __hash__(self): return hash(frozenset(list(self._dict.items()))) def __repr__(self): return self.__class__.__name__ + '(%s)' % str(id(self)) def __str__(self): return self.__class__.__name__ + '(%s)' % str(self.to_serializable()) def __len__(self): return len(self._row_keys) def __eq__(self, other): if not isinstance(other, DataRange): raise TypeError('Cannot compare %s with %s.' % (self.__class__.__name__, other.__class__.__name__)) return self.total_list == other.total_list def __contains__(self, key): try: self._validate_key_existence(key) except KeyError: return False return True def __copy__(self): return self.__class__(self.total_list) def __deepcopy__(self, memo): return self.__class__(self.to_serializable()) def __setitem__(self, key, value): self._validate_key_format(key) self._validate_value(value) self._dict[key] = value self._update_cache() def __getitem__(self, key): # redirect slice if isinstance(key, slice): return self.__getslice__(key.start, key.stop) self._validate_key_format(key) self._validate_key_existence(key) return self._dict.get(key, None) def __delitem__(self, key): raise NotImplementedError def __setslice__(self, i, j, sequence): raise NotImplementedError def __getslice__(self, i, j): self._update_cache() if not isinstance(i, (tuple, list)): l = len(self._col_keys) return self[(i, 0):(j, l)] ri, ci = i rj, cj = j ri = self._row_keys.index(ri) if self._row_keys.count(ri) else ri rj = self._row_keys.index(rj) + 1 if self._row_keys.count(rj) else rj ci = self._col_keys.index(ci) if self._col_keys.count(ci) else ci cj = self._col_keys.index(cj) + 1 if self._col_keys.count(cj) else cj row_keys = self._row_keys[ri:rj] col_keys = self._col_keys[ci:cj] ret = list() for r in row_keys: row = [self[r, c] for c in col_keys] ret.append(row) return ret def __delslice__(self, i, j): raise NotImplementedError
[docs] def update(self, other): r = self._dict.update(other) self._update_cache() return r
[docs] def keys(self): return list(self._dict.keys())
[docs] def values(self): return list(self._dict.values())
[docs] def items(self): return list(self._dict.items())
[docs] def get(self, key, default=None): return self._dict.get(key, default)
[docs] def pop(self, key, default=None): r = self._dict.pop(key, default) self._update_cache() return r
[docs] def popitem(self): raise NotImplementedError
[docs] def row_append(self, row_key, value_list): """ append a new row to a DataRange :param row_key: a string :param value_list: a list """ if row_key in self._row_keys: raise KeyError('Key %s already exists in row keys.' % row_key) if not len(value_list) == len(self._col_keys): raise ValueError('Length of data to set does not meet expected row length of %i' % len(self._col_keys)) self.update(list(zip(list(zip([row_key]*len(self._col_keys), self._col_keys)), value_list)))
[docs] def col_append(self, col_key, value_list): """ append a new row to a DataRange :param row_key: a string :param value_list: a list """ if col_key in self._col_keys: raise KeyError('Key %s already exists col keys.' % col_key) if not isinstance(value_list, (tuple, list)): value_list = [value_list] * len(self._row_keys) if not len(value_list) == len(self._row_keys): raise ValueError('Length of data to set does not meet expected col length of %i' % len(self._row_keys)) self.update(list(zip(list(zip(self._row_keys, [col_key]*len(self._row_keys))), value_list)))
def _update_cache(self): if not self._hash == hash(self): row_keys = sorted(list(set([row_key for row_key, col_key in list(self.keys()) if row_key not in self._row_keys]))) self._row_keys += row_keys col_keys = sorted(list(set([col_key for row_key, col_key in list(self.keys()) if col_key not in self._col_keys]))) self._col_keys += col_keys self._item_list = [[self.get((r, c)) for c in self._col_keys] for r in self._row_keys] self._hash = hash(self) # @property
[docs] def row_keys(self): return self._row_keys
# @property
[docs] def col_keys(self): return self._col_keys
[docs] def row(self, item): i = self.row_keys().index(item) return self.item_list[i]
[docs] def col(self, item): j = self.col_keys().index(item) return [row[j] for row in self.item_list]
@property def item_list(self): return self._item_list @property def total_list(self): if not self: return [[]] total = [[LEFT_TOP] + self.col_keys()] for r, row in zip(self.row_keys(), self.item_list): total.append([r] + row) return total
[docs] def to_serializable(self, level=0, all_properties_flag=False, recursive=True): ret = list() for r in self.total_list: l = list() for v in r: if recursive: v = v if not hasattr(v, 'to_serializable') else v.to_serializable(level + 1, all_properties_flag) v = v if isinstance(v, (float, int, type(None))) else str(v) v = self._none_alias[0] if isinstance(v, type(None)) else v l.append(v) ret.append(l) return ret
[docs] def transpose(self): return self.__class__(list(zip(*self.total_list)), value_types=self._value_types, none_alias=self._none_alias)
[docs] def append(self, key, value): self.row_append(key, value)
[docs] def extend(self, other): raise NotImplementedError
[docs] def insert(self, item, key, value=None): raise NotImplementedError
def __reduce__(self): return self.__class__, (self.total_list, self._value_types, self._none_alias)