import numpy as np
import pandas as pd
from functools import partial
from gtable.column import Column
from gtable.lib import records, stack_table_inplace, add_column, \
merge_table, sort_table, filter_table, dropnan_table, first_record, \
last_record, fillna_column, from_chunks, required_columns, required_column
def _check_length(i, k, this_length, length_last):
if i == 0:
length_last = this_length
else:
if this_length != length_last:
raise ValueError("Column {} length mismatch".format(k))
else:
length_last = this_length
return length_last
def get_reductor(out_check_sorted):
    """
    Build the ``ReductorByKey`` class used for reductions by key.

    :param out_check_sorted: Default value for the ``check_sorted``
        argument of the returned class' constructor.
    :return: The ``ReductorByKey`` class (not an instance).
    """
    # Imported here rather than at module level, as in the original code.
    from gtable.reductions import reduce_funcs, reduce_by_key

    class ReductorByKey:
        """
        Exposes every name in ``reduce_funcs`` as an instance attribute
        holding ``reduce_by_key`` with the table, the column, the
        reduction name and ``check_sorted`` already bound.
        """

        @staticmethod
        def __dir__():
            # Advertise only the available reductions.
            return list(reduce_funcs)

        def __init__(self, table, column, check_sorted=out_check_sorted):
            bound = vars(self)
            for name in reduce_funcs:
                bound[name] = partial(
                    reduce_by_key, table, column, name, check_sorted)

    return ReductorByKey
class Table:
    """
    Table is a class for fast columnar storage using a bitmap index for
    sparse storage.

    A table keeps three storage attributes:

    * ``keys``: list with the column names.
    * ``data``: list with one array per column, in the same order as
      ``keys``.
    * ``index``: 2D array with one row per column. The constructor
      creates it as ``uint8`` ones; nonzero entries presumably mark the
      rows where the column holds a value (confirm against gtable.lib).

    Columns are also reachable as attributes (``table.name``), which
    returns a :class:`Column`.
    """

    def __init__(self, data=None):
        """
        Create a table from a mapping ``{name: values}``.

        :param data: Optional mapping. Values must be a ``list``, a 1D
            ``numpy.ndarray`` or a ``pandas.DatetimeIndex``, all of the
            same length. ``None`` (default) creates an empty table.
        :raises ValueError: If a column has an unsupported type, is not
            one-dimensional, or its length differs from the previous one.
        """
        # ``None`` instead of a shared mutable ``{}`` default.
        if data is None:
            data = {}

        # This list stores the keys
        self.keys = []

        # This list stores the columns
        self.data = []

        # This is the index bitmap
        self.index = None

        length_last = 0

        # Creating the table only supports assigning a single index
        for i, (k, v) in enumerate(data.items()):
            # Exact type checks are kept on purpose: subclasses are
            # rejected exactly as before.
            if type(v) is list:
                # If the column is a list, cast it to a numpy array
                # TODO: Remove ASAP
                # You may get a list of Timestamps. Specific to NFQ
                if len(v) > 0 and type(v[0]) is pd.Timestamp:
                    column = pd.DatetimeIndex(v).values
                else:
                    column = np.array(v)
                length = len(v)

            elif type(v) is np.ndarray:
                if len(v.shape) != 1:
                    raise ValueError("Only 1D arrays supported")
                column = v
                length = v.shape[0]

            elif type(v) is pd.DatetimeIndex:
                # Pandas DatetimeIndex is supported for convenience.
                column = np.array(v)
                length = v.shape[0]

            else:
                raise ValueError("Column type not supported")

            self.data.append(column)
            self.keys.append(k)
            length_last = _check_length(i, k, length, length_last)

        # Create the index and the ordered arrays
        self.index = np.ones((len(data), length_last), dtype=np.uint8)

    def _repr_html_(self):
        return "<i>xxx</i>"

    def _index_column(self, key):
        """Return the row of the index bitmap that belongs to ``key``."""
        return self.index[self.keys.index(key), :]

    def copy(self):
        """
        Returns a copy of the table

        Column arrays and the index are copied; the list of keys is a
        new list.
        """
        t = Table()
        t.data = [d.copy() for d in self.data]
        t.keys = self.keys[:]
        t.index = self.index.copy()
        return t

    def add_column(self, k, v, dtype=None, index=None, align='top'):
        """
        Column concatenation.

        Delegates to ``gtable.lib.add_column``, which modifies this
        table.

        :param k: Name of the new column.
        :param v: Values of the new column.
        :param dtype: Forwarded to ``gtable.lib.add_column``.
        :param index: Forwarded to ``gtable.lib.add_column``.
        :param align: Forwarded to ``gtable.lib.add_column``.
        """
        add_column(self, k, v, dtype, index, align=align)

    def del_column(self, k):
        """
        Column deletion

        :param k: Name of the column to remove.
        :raises ValueError: If ``k`` is not a column of the table.
        """
        # Look the position up once and drop the column from the three
        # parallel structures.
        idx = self.keys.index(k)
        del self.data[idx]
        self.keys.pop(idx)
        self.index = np.delete(self.index, idx, axis=0)

    def stack(self, table):
        """Vertical (Table) concatenation."""
        stack_table_inplace(self, table)

    def merge(self, table, column):
        """Merge two tables using two dense and sorted columns"""
        self.data, self.keys, self.index = merge_table(table, self, column)

    def records(self, fill=False):
        """Generator that returns a dictionary for each row of the table"""
        yield from records(self, fill)

    def sort_by(self, column):
        """Sorts by values of a column"""
        sort_table(self, column)

    def filter(self, predicate):
        """
        Filter table using a column specification or predicate

        :return: A new table; this one is not reassigned.
        """
        t = Table()
        t.data, t.keys, t.index = filter_table(self, predicate)
        return t

    def sieve(self, idx):
        """
        Filter table using a one-dimensional array of boolean values

        :return: A new table; this one is not reassigned.
        """
        t = Table()
        # This could be improved, but added as syntactic sugar ATM.
        predicate = Column(idx.astype(np.int8), np.ones_like(idx))
        t.data, t.keys, t.index = filter_table(self, predicate)
        return t

    def crop(self, key):
        """
        Purge the records where the column key is empty

        :return: A new table; this one is not reassigned.
        """
        t = Table()
        col = self.get(key)
        # Comparing the column with itself builds the predicate that
        # filter_table consumes.
        predicate = (col == col)
        t.data, t.keys, t.index = filter_table(self, predicate)
        return t

    def first_record(self, fill=False):
        """Returns the first record of the table"""
        return first_record(self, fill)

    def last_record(self, fill=False):
        """Returns the last record of the table"""
        return last_record(self, fill)

    def to_pandas(self, fill=False):
        """Translate the table to a pandas dataframe"""
        return pd.DataFrame.from_records(self.records(fill))

    def to_dict(self):
        """Translate the table to a dict {key -> array_of_values}"""
        return dict(zip(self.keys, self.data))

    def dropnan(self, clip=False):
        """
        Drop the NaNs and leave missing values instead

        :param clip: Currently unused; kept for interface compatibility.
        """
        dropnan_table(self)

    def get(self, key, copy=False):
        """
        Gets a column or a table with columns

        :param key: A string returns a :class:`Column`; a list or tuple
            of names returns a new :class:`Table` with those columns.
        :param copy: Accepted for compatibility. The returned table holds
            copies of the selected data and index either way.
        :return: Column, Table, or ``None`` for any other type of key.
        """
        if type(key) is str:
            return Column(self[key], self._index_column(key))

        if type(key) is list or type(key) is tuple:
            t = Table()
            indices = [self.keys.index(k) for k in key]
            # NOTE(review): the original code copied the data in both the
            # copy=True and copy=False branches, and indexing with a list
            # already yields a new index array, so both branches were
            # equivalent. That behavior is preserved here.
            t.data = [self.data[idx].copy() for idx in indices]
            t.index = self.index[indices, :]
            # Always store a list: other methods pop from and assign
            # into ``keys``, which a tuple does not support.
            t.keys = list(key)
            return t

        return None

    def fillna_column(self, key, reverse=False, fillvalue=None):
        """
        Fillna on a column inplace

        :param key: string or list
        :param reverse: Forwarded to ``gtable.lib.fillna_column``.
        :param fillvalue: Forwarded to ``gtable.lib.fillna_column``.
        :return:
        """
        names = key if type(key) in (list, tuple) else [key]
        for k in names:
            row = self.keys.index(k)
            self[k], self.index[row, :] = fillna_column(
                self[k], self._index_column(k), reverse, fillvalue)

    def fill_column(self, key, fillvalue):
        """
        Fill N/A elements in the given columns with fillvalue

        :param key: String, list or tuple with the column names to be filled.
        :param fillvalue: Scalar to fill the N/A elements
        :return:
        """
        names = key if type(key) in (list, tuple) else [key]
        for k in names:
            # Attribute access yields a Column; assigning it back stores
            # its values and index in the table.
            col = getattr(self, k)
            col.fill(fillvalue)
            setattr(self, k, col)

    def reduce_by_key(self, column, check_sorted=False):
        """
        Reduce by key

        :param column: Forwarded to the reductor.
        :param check_sorted: Default ``check_sorted`` of the reductor.
        :return: An object with one attribute per available reduction.
        """
        return get_reductor(check_sorted)(self, column)

    def required_column(self, key, dtype):
        """
        Enforce the required column with a dtype

        :param key:
        :param dtype:
        :return:
        """
        required_column(self, key, dtype)

    def required_columns(self, *args):
        """
        Enforce the required columns. Create empty columns if necessary.

        :param args:
        :return:
        """
        required_columns(self, *args)

    def rename_column(self, old_name, new_name):
        """
        Rename a column of the table

        :param old_name: Current name of the column.
        :param new_name: New name; must not be in use already.
        :raises ValueError: If ``old_name`` is not a column or if
            ``new_name`` is already a column.
        :return:
        """
        idx = self.keys.index(old_name)
        if new_name in self.keys:
            raise ValueError('Column names must be unique')
        self.keys[idx] = new_name

    @classmethod
    def from_pandas(cls, dataframe):
        """
        Create a table from a pandas dataframe

        The dataframe index is stored as the column ``idx``. Missing
        values (NaN, or NaT for ``datetime64[ns]`` columns) are left out
        of the data and flagged through the index passed to add_column.

        :raises ValueError: If the dataframe index has non-finite values.
        """
        table = cls()

        if not np.all(np.isfinite(dataframe.index.values)):
            raise ValueError('Dataframe index must not contain NaNs')
        table.add_column('idx', dataframe.index.values)

        for k in dataframe:
            values = dataframe[k].values

            if values.dtype == np.dtype('O'):
                table.add_column(k, np.array(list(values)))
                continue

            if values.dtype == np.dtype('datetime64[ns]'):
                # NaT never compares equal to anything, itself included,
                # so it has to be detected with isnat instead of ``==``.
                missing = np.isnat(values)
            else:
                missing = np.isnan(values)

            table.add_column(
                k, values[~missing], dtype=values.dtype, index=~missing)

        return table

    @staticmethod
    def from_chunks(chunks):
        """
        Create a table from table chunks

        :param chunks:
        :return:
        """
        return from_chunks(chunks)

    def __repr__(self):
        column_info = []
        for k, v in zip(self.keys, self.data):
            if type(v) is np.ndarray:
                column_type = v.dtype
            else:
                column_type = 'object'
            # Number of nonzero entries of the column's index row.
            count = np.count_nonzero(self._index_column(k))
            column_info.append('{}[{}] <{}>'.format(k, count, column_type))

        return "<Table[ {} ] object at {}>".format(', '.join(column_info),
                                                   hex(id(self)))

    def __contains__(self, item):
        return item in self.keys

    @staticmethod
    def __dir__():
        return []

    def __getattr__(self, key):
        """
        Return the column ``key`` as a :class:`Column`.

        Only called when regular attribute lookup fails.

        :raises AttributeError: For the storage attributes and for dunder
            names, so that protocol lookups (``copy.deepcopy``,
            ``hasattr``...) behave and a half-built instance does not
            recurse forever.
        :raises ValueError: If ``key`` is not a column (unchanged, kept
            for backward compatibility).
        """
        if key in ('data', 'keys', 'index') or (
                key.startswith('__') and key.endswith('__')):
            raise AttributeError(key)
        return Column(self.data[self.keys.index(key)], self._index_column(key))

    def __getitem__(self, key):
        return self.data[self.keys.index(key)]

    def __setitem__(self, key, value):
        if not isinstance(value, np.ndarray):
            raise ValueError('Direct assignment only valid with Numpy arrays')
        self.data[self.keys.index(key)] = value

    def __delitem__(self, key):
        # Removes only the data array; keys and index are left untouched.
        del self.data[self.keys.index(key)]

    def __setattr__(self, key, value):
        """
        Store a storage attribute, or set/add a column by attribute name.

        Values that are not a Column, ``numpy.ndarray`` or
        ``pandas.DatetimeIndex`` are silently ignored, as before.
        """
        if key in ['data', 'keys', 'index']:
            self.__dict__[key] = value
            return

        exists = key in self.keys

        if type(value) is Column:
            if exists:
                pos = self.keys.index(key)
                self.data[pos] = value.values
                self.index[pos, :] = value.index
            else:
                # The index must go by keyword: the third positional
                # parameter of add_column is ``dtype``.
                self.add_column(key, value.values, index=value.index)

        elif type(value) is np.ndarray:
            if exists:
                self.data[self.keys.index(key)] = value
            else:
                self.add_column(key, value)

        elif type(value) is pd.DatetimeIndex:
            if exists:
                self.data[self.keys.index(key)] = value.values
            else:
                self.add_column(key, value)

    def __getstate__(self):
        # State is a tuple of copies: (index, data, keys).
        index = self.index.copy()
        data = [d.copy() for d in self.data]
        keys = self.keys[:]
        return index, data, keys

    def __setstate__(self, state):
        index, data, keys = state
        self.index = index
        self.data = data
        self.keys = keys

    def __len__(self):
        # Number of rows, i.e. the second dimension of the index.
        return self.index.shape[1]