# -*- coding: utf-8 -*-
from __future__ import division, print_function
import io
import difflib
import base64
from typing import Iterable
import os
import math
import time
import itertools
import functools
import collections
import sys
import platform
import warnings
import re
from functools import reduce
import threading
import six
import vaex.utils
# import vaex.image
import numpy as np
import concurrent.futures
import numbers
import pyarrow as pa
from vaex.utils import Timer
import vaex.events
# import vaex.ui.undo
import vaex.grids
import vaex.hash
import vaex.multithreading
import vaex.promise
import vaex.execution
import vaex.encoding
import vaex.cache
import vaex.expresso
import vaex.tasks
import logging
import vaex.kld
from . import selections, tasks, scopes
from .expression import expression_namespace
from .delayed import delayed, delayed_args, delayed_list
from .column import Column, ColumnIndexed, ColumnSparse, ColumnString, ColumnConcatenatedLazy, supported_column_types
from . import array_types
import vaex.events
from .datatype import DataType
from .docstrings import docsubst
astropy = vaex.utils.optional_import("astropy.units")
xarray = vaex.utils.optional_import("xarray")
# py2/py3 compatibility
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
_DEBUG = os.environ.get('VAEX_DEBUG', False)  # extra sanity checks that might hurt performance
_REPORT_EXECUTION_TRACES = vaex.utils.get_env_type(int, 'VAEX_EXECUTE_TRACE', 0)
DEFAULT_REPR_FORMAT = 'plain'
FILTER_SELECTION_NAME = '__filter__'
sys_is_le = sys.byteorder == 'little'
logger = logging.getLogger("vaex")
lock = threading.Lock()
default_shape = 128
default_chunk_size = 1024**2
# executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
# executor = vaex.execution.default_executor
def _len(o):
return o.__len__()
def _requires(name):
def wrap(*args, **kwargs):
raise RuntimeError('this function is wrapped by a placeholder, you probably want to install vaex-' + name)
return wrap
from .utils import (_ensure_strings_from_expressions,
_ensure_string_from_expression,
_ensure_list,
_is_limit,
_isnumber,
_issequence,
_is_string, _normalize_selection,
_parse_reduction,
_parse_n,
_normalize_selection_name,
_normalize,
_parse_f,
_expand,
_expand_shape,
_expand_limits,
as_flat_float,
as_flat_array,
_split_and_combine_mask)
main_executor = None # vaex.execution.Executor(vaex.multithreading.pool)
from vaex.execution import Executor
def get_main_executor():
global main_executor
if main_executor is None:
main_executor = vaex.execution.ExecutorLocal(vaex.multithreading.get_main_pool())
return main_executor
# we import after function_mapping is defined
from .expression import Expression
_functions_statistics_1d = []
def stat_1d(f):
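    '''Decorator that collects a function in the list of 1d statistics (``_functions_statistics_1d``).'''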
_functions_statistics_1d.append(f)
return f
def _hidden(meth):
"""Mark a method as hidden"""
meth.__hidden__ = True
return meth
@vaex.encoding.register("dataframe")
class _DataFrameEncoder:
@staticmethod
def encode(encoding, df):
state = df.state_get(skip=[df.dataset])
return {
'state': encoding.encode('dataframe-state', state),
'dataset': encoding.encode('dataset', df.dataset)
}
@staticmethod
def decode(encoding, spec):
dataset = encoding.decode('dataset', spec['dataset'])
state = encoding.decode('dataframe-state', spec['state'])
df = vaex.from_dataset(dataset)._future()
df.state_set(state)
return df
class DataFrame(object):
"""All local or remote datasets are encapsulated in this class, which provides a pandas
like API to your dataset.
Each DataFrame (df) has a number of columns, and a number of rows, the length of the DataFrame.
    All DataFrames can have multiple 'selections', and all calculations are done on the whole DataFrame (default)
or for the selection. The following example shows how to use the selection.
>>> df.select("x < 0")
>>> df.sum(df.y, selection=True)
>>> df.sum(df.y, selection=[df.x < 0, df.x > 0])
:type signal_selection_changed: events.Signal
:type executor: Executor
"""
def __init__(self, name=None, executor=None):
self.executor = executor or get_main_executor()
self.name = name
self._init()
def _init(self):
self.column_names = []
self.signal_pick = vaex.events.Signal("pick")
self.signal_sequence_index_change = vaex.events.Signal("sequence index change")
self.signal_selection_changed = vaex.events.Signal("selection changed")
self.signal_active_fraction_changed = vaex.events.Signal("active fraction changed")
self.signal_column_changed = vaex.events.Signal("a column changed") # (df, column_name, change_type=["add", "remove", "change"])
self.signal_variable_changed = vaex.events.Signal("a variable changed")
self.variables = {}
self.virtual_columns = {}
# we also store the virtual columns as expressions, for performance reasons
# the expression object can cache the ast, making renaming/rewriting faster
self._virtual_expressions = {}
self.functions = {}
self._length_original = None
self._length_unfiltered = None
self._cached_filtered_length = None
self._filter_filled = False
self._active_fraction = 1
self._current_row = None
self._index_start = 0
self._index_end = None
self.description = None
self.ucds = {}
self.units = {}
self.descriptions = {}
self.favorite_selections = {}
# this is to be backward compatible with v4 for now
self._future_behaviour = False
self.mask = None # a bitmask for the selection does not work for server side
        # maps from name to list of Selection objects
self.selection_histories = collections.defaultdict(list)
# after an undo, the last one in the history list is not the active one, -1 means no selection
self.selection_history_indices = collections.defaultdict(lambda: -1)
assert self.filtered is False
self._auto_fraction = False
self._sparse_matrices = {} # record which sparse columns belong to which sparse matrix
self._categories = {}
self._selection_mask_caches = collections.defaultdict(dict)
self._selection_masks = {} # maps to vaex.superutils.Mask object
self._renamed_columns = []
# weak refs of expression that we keep to rewrite expressions
self._expressions = []
self.local = threading.local()
# a check to avoid nested aggregator calls, which make stack traces very difficult
# like the ExecutorLocal.local.executing, this needs to be thread local
self.local._aggregator_nest_count = 0
# a per dataframe lock for thread safe changing the state
self._state_lock = threading.Lock()
def fingerprint(self, dependencies=None, treeshake=False):
        '''ID that uniquely identifies a dataframe (across runtimes).
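
        Example (an illustrative sketch; the hash part differs per dataset and state):

        >>> import vaex
        >>> df = vaex.example()
        >>> df.fingerprint()  # doctest: +SKIP
        'dataframe-...'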
:param set[str] dependencies: set of column, virtual column, function or selection names to be used.
:param bool treeshake: Get rid of unused variables before calculating the fingerprint.
'''
df = self.copy(treeshake=True) if treeshake else self
selections = {name: self.get_selection(name) for name, history in self.selection_histories.items() if self.has_selection(name)}
if dependencies is not None:
dependencies = set(dependencies) # copy
# these are implicit dependencies that we need to add
for selection in selections.values():
dependencies.update(selection.dependencies(self))
# we only use the state parts that affect data (no metadata)
encoding = vaex.encoding.Encoding()
def dep_filter(d : dict):
if dependencies is None:
return d
return {k: v for k, v in d.items() if k in dependencies}
state = dict(
column_names=[k for k in list(self.column_names) if dependencies is None or k in dependencies],
virtual_columns=dep_filter(self.virtual_columns),
# variables go unencoded
variables=dep_filter(self.variables),
# for functions it should be fast enough (not large amounts of data)
functions={name: encoding.encode("function", value) for name, value in dep_filter(self.functions).items()},
active_range=[self._index_start, self._index_end]
)
# selections can affect the filter, so put them all in
state['selections'] = {name: selection.to_dict() if selection is not None else None for name, selection in selections.items()}
fp = vaex.cache.fingerprint(state, df.dataset.fingerprint)
return f'dataframe-{fp}'
def __dataframe__(self, nan_as_null : bool = False, allow_copy : bool = True):
"""
"""
import vaex.dataframe_protocol
return vaex.dataframe_protocol._VaexDataFrame(self, nan_as_null=nan_as_null, allow_copy=allow_copy)
def _future(self, version=5, inplace=False):
'''Act like a Vaex dataframe version 5.
meaning:
* A dataframe with automatically encoded categorical data
        * state version 5 (which stores the dataset)
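
        Example (a sketch; ``df`` is any existing dataframe):

        >>> df5 = df._future()  # a copy of df with the version 5 behaviour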
'''
df = self if inplace else self.copy()
df._future_behaviour = 5
return df
_auto_encode = _hidden(vaex.utils.deprecated('use _future')(_future))
def __getattr__(self, name):
# will support the hidden methods
if name in self.__hidden__:
return self.__hidden__[name].__get__(self)
else:
return object.__getattribute__(self, name)
def _ipython_key_completions_(self):
return self.get_column_names()
@property
def func(self):
class Functions(object):
pass
functions = Functions()
for name, value in expression_namespace.items():
# f = vaex.expression.FunctionBuiltin(self, name)
def closure(name=name, value=value):
local_name = name
def wrap(*args, **kwargs):
def myrepr(k):
if isinstance(k, Expression):
return str(k)
elif isinstance(k, np.ndarray) and k.ndim == 0:
# to support numpy scalars
return myrepr(k.item())
elif isinstance(k, np.ndarray):
# to support numpy arrays
var = self.add_variable('arg_numpy_array', k, unique=True)
return var
elif isinstance(k, list):
                            # to support lists
return '[' + ', '.join(myrepr(i) for i in k) + ']'
else:
return repr(k)
arg_string = ", ".join([myrepr(k) for k in args] + ['{}={}'.format(name, myrepr(value)) for name, value in kwargs.items()])
expression = "{}({})".format(local_name, arg_string)
return vaex.expression.Expression(self, expression)
return wrap
f = closure()
try:
f = functools.wraps(value)(f)
except AttributeError:
                pass  # python2 quirks?
setattr(functions, name, f)
for name, value in self.functions.items():
setattr(functions, name, value)
return functions
@_hidden
@vaex.utils.deprecated('use is_category')
def iscategory(self, column):
return self.is_category(column)
def is_datetime(self, expression):
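        '''Return True if the expression is of a numpy datetime type.'''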
dtype = self.data_type(expression)
return isinstance(dtype, np.dtype) and dtype.kind == 'M'
def is_string(self, expression):
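        '''Return True if the expression is of a string type.'''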
return vaex.array_types.is_string_type(self.data_type(expression))
def is_category(self, column):
"""Returns true if column is a category."""
column = _ensure_string_from_expression(column)
# TODO: we don't support DictionaryType for remote dataframes
if self.is_local() and column in self.columns:
# TODO: we don't support categories as expressions
dtype = vaex.dtype_of(self.columns[column])
if dtype.is_encoded:
return True
return column in self._categories
def _category_dictionary(self, column):
'''Return the dictionary for a column if it is an arrow dict type'''
if column in self.columns:
x = self.columns[column]
dtype = vaex.dtype_of(x)
if dtype.is_encoded:
x = x[:1] # could be a proxy
            # we are interested in the dictionary values, not the indices
if isinstance(x, pa.ChunkedArray):
                # take the first dictionary
x = x.chunks[0]
dictionary = x.dictionary
return dictionary
def category_labels(self, column, aslist=True):
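        '''Return the labels of a categorical column, either from the category metadata or from the arrow dictionary.'''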
column = _ensure_string_from_expression(column)
if column in self._categories:
return self._categories[column]['labels']
dictionary = self._category_dictionary(column)
if dictionary is not None:
if aslist:
dictionary = dictionary.to_pylist()
return dictionary
else:
raise ValueError(f'Column {column} is not a categorical')
def category_values(self, column):
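        '''Return the values the codes of a categorical column map to.'''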
column = _ensure_string_from_expression(column)
return self._categories[column]['values']
def category_count(self, column):
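        '''Return the number of categories of a categorical column.'''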
column = _ensure_string_from_expression(column)
if column in self._categories:
return self._categories[column]['N']
dictionary = self._category_dictionary(column)
if dictionary is not None:
return len(dictionary)
else:
raise ValueError(f'Column {column} is not a categorical')
def category_offset(self, column):
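        '''Return the offset (minimum code value) of a categorical column; 0 for arrow dictionary encoded columns.'''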
column = _ensure_string_from_expression(column)
if column in self._categories:
return self._categories[column]['min_value']
dictionary = self._category_dictionary(column)
if dictionary is not None:
return 0
else:
raise ValueError(f'Column {column} is not a categorical')
def execute(self):
'''Execute all delayed jobs.'''
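        # Example (a sketch of typical usage; any delayed result works the same way):
        #   sum_x = df.sum('x', delay=True)
        #   df.execute()
        #   result = sum_x.get()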
# make sure we only add the tasks at the last moment, after all operations are added (for cache keys)
if not self.executor.tasks:
            logger.info('no tasks to execute')
return
if _REPORT_EXECUTION_TRACES:
import traceback
trace = ''.join(traceback.format_stack(limit=_REPORT_EXECUTION_TRACES))
            print('Execution triggered from:\n', trace)
print("Tasks:")
for task in self.executor.tasks:
print(repr(task))
if self.executor.tasks:
self.executor.execute()
async def execute_async(self):
'''Async version of execute'''
await self.executor.execute_async()
@property
def filtered(self):
return self.has_selection(FILTER_SELECTION_NAME)
def map_reduce(self, map, reduce, arguments, progress=False, delay=False, info=False, to_numpy=True, ignore_filter=False, pre_filter=False, name='map reduce (custom)', selection=None):
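        '''Schedule a custom map/reduce task over the argument expressions; returns the reduced result, or the task itself when delay is True (internal API).'''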
# def map_wrapper(*blocks):
pre_filter = pre_filter and self.filtered
task = tasks.TaskMapReduce(self, arguments, map, reduce, info=info, to_numpy=to_numpy, ignore_filter=ignore_filter, selection=selection, pre_filter=pre_filter)
progressbar = vaex.utils.progressbars(progress)
progressbar.add_task(task, f'map reduce: {name}')
task = self.executor.schedule(task)
return self._delay(delay, task)
def apply(self, f, arguments=None, vectorize=False, multiprocessing=True):
"""Apply a function on a per row basis across the entire DataFrame.
Example:
>>> import vaex
>>> df = vaex.example()
>>> def func(x, y):
... return (x+y)/(x-y)
...
>>> df.apply(func, arguments=[df.x, df.y])
Expression = lambda_function(x, y)
Length: 330,000 dtype: float64 (expression)
-------------------------------------------
0 -0.460789
1 3.90038
2 -0.642851
3 0.685768
4 -0.543357
:param f: The function to be applied
:param arguments: List of arguments to be passed on to the function f.
        :param vectorize: Call f with arrays instead of scalars (for better performance).
:param bool multiprocessing: Use multiple processes to avoid the GIL (Global interpreter lock).
:return: A function that is lazily evaluated.
"""
assert arguments is not None, 'for now, you need to supply arguments'
import types
if isinstance(f, types.LambdaType):
name = 'lambda_function'
else:
name = f.__name__
if not vectorize:
f = vaex.expression.FunctionToScalar(f, multiprocessing)
else:
f = vaex.expression.FunctionSerializablePickle(f, multiprocessing)
lazy_function = self.add_function(name, f, unique=True)
arguments = _ensure_strings_from_expressions(arguments)
return lazy_function(*arguments)
@docsubst
def nop(self, expression=None, progress=False, delay=False):
"""Evaluates expression or a list of expressions, and drops the result. Usefull for benchmarking, since vaex is usually lazy.
:param expression: {expression}
:param progress: {progress}
:param delay: {delay}
:returns: None
"""
if expression is None:
expressions = self.get_column_names()
else:
expressions = _ensure_list(_ensure_strings_from_expressions(expression))
def map(*ar):
pass
def reduce(a, b):
pass
return self.map_reduce(map, reduce, expressions, delay=delay, progress=progress, name='nop', to_numpy=False)
def _hash_map_unique(self, expression, progress=False, selection=None, flatten=True, delay=False, limit=None, limit_raise=True, return_inverse=False):
# TODO: selections in aggregation take the same code path as these two statements, maybe refactor this in the future to follow a common code path
selection = vaex.utils._normalize_selection(selection)
selection = str(selection) if isinstance(selection, Expression) else selection
expression = _ensure_string_from_expression(expression)
task = vaex.tasks.TaskHashmapUniqueCreate(self, expression, flatten, limit=limit, selection=selection, return_inverse=return_inverse, limit_raise=limit_raise)
task = self.executor.schedule(task)
progressbar = vaex.utils.progressbars(progress)
progressbar.add_task(task, f"set for {str(expression)}")
return self._delay(delay, task)
# kept for compatibility
_set = _hash_map_unique
def _index(self, expression, progress=False, delay=False, prime_growth=False, cardinality=None):
column = _ensure_string_from_expression(expression)
# TODO: this does not seem needed
# column = vaex.utils.valid_expression(self.dataset, column)
columns = [column]
from .hash import index_type_from_dtype
from vaex.column import _to_string_sequence
transient = self[column].transient or self.filtered or self.is_masked(column)
if self.is_string(expression) and not transient:
# string is a special case, only ColumnString are not transient
ar = self.columns[str(self[column].expand())]
if not isinstance(ar, ColumnString):
transient = True
dtype = self.data_type(column)
index_type = index_type_from_dtype(dtype, transient, prime_growth=prime_growth)
import queue
if cardinality is not None:
N_index = min(self.executor.thread_pool.nthreads, max(1, len(self)//cardinality))
capacity_initial = len(self) // N_index
else:
N_index = self.executor.thread_pool.nthreads
capacity_initial = 10
indices = queue.Queue()
# we put None to lazily create them
for i in range(N_index):
indices.put(None)
def map(thread_index, i1, i2, selection_masks, blocks):
ar = blocks[0]
index = indices.get()
if index is None:
index = index_type(1)
if hasattr(index, 'reserve'):
index.reserve(capacity_initial)
if vaex.array_types.is_string_type(dtype):
previous_ar = ar
ar = _to_string_sequence(ar)
if not transient:
assert ar is previous_ar.string_sequence
if np.ma.isMaskedArray(ar):
mask = np.ma.getmaskarray(ar)
index.update(ar, mask, i1)
else:
index.update(ar, i1)
indices.put(index)
# cardinality_estimated = sum()
def reduce(a, b):
pass
self.map_reduce(map, reduce, columns, delay=delay, name='index', info=True, to_numpy=False)
index_list = [] #[k for k in index_list if k is not None]
while not indices.empty():
index = indices.get(timeout=10)
if index is not None:
index_list.append(index)
index0 = index_list[0]
for other in index_list[1:]:
index0.merge(other)
return index0
@docsubst
def unique(self, expression, return_inverse=False, dropna=False, dropnan=False, dropmissing=False, progress=False, selection=None, axis=None, delay=False, limit=None, limit_raise=True, array_type='python'):
"""Returns all unique values.
:param expression: {expression}
:param return_inverse: Return the inverse mapping from unique values to original values.
:param dropna: {dropna}
:param dropnan: {dropnan}
:param dropmissing: {dropmissing}
:param progress: {progress}
:param selection: {selection}
:param int axis: Axis over which to determine the unique elements (None will flatten arrays or lists)
:param delay: {delay}
:param int limit: {limit}
:param bool limit_raise: {limit_raise}
:param str array_type: {array_type}
"""
if dropna:
dropnan = True
dropmissing = True
if axis is not None:
raise ValueError('only axis=None is supported')
expression = _ensure_string_from_expression(expression)
if self._future_behaviour and self.is_category(expression):
if self.filtered:
keys = pa.array(self.category_labels(expression))
@delayed
def encode(codes):
used_keys = keys.take(codes)
return vaex.array_types.convert(used_keys, array_type)
codes = self[expression].index_values().unique(delay=True)
return self._delay(delay, encode(codes))
else:
keys = pa.array(self.category_labels(expression))
keys = vaex.array_types.convert(keys, array_type)
return self._delay(delay, vaex.promise.Promise.fulfilled(keys))
else:
@delayed
def process(hash_map_unique):
transient = True
data_type_item = self.data_type(expression, axis=-1)
if return_inverse:
# inverse type can be smaller, depending on length of set
inverse = np.zeros(self._length_unfiltered, dtype=np.int64)
dtype = self.data_type(expression)
from vaex.column import _to_string_sequence
def map(thread_index, i1, i2, selection_mask, blocks):
ar = blocks[0]
if vaex.array_types.is_string_type(dtype):
previous_ar = ar
ar = _to_string_sequence(ar)
if not transient:
assert ar is previous_ar.string_sequence
# TODO: what about masked values?
inverse[i1:i2] = hash_map_unique.map(ar)
def reduce(a, b):
pass
self.map_reduce(map, reduce, [expression], delay=delay, name='unique_return_inverse', progress=progress_inverse, info=True, to_numpy=False, selection=selection)
# ordered_set.seal()
# if array_type == 'python':
if data_type_item.is_object:
key_values = hash_map_unique._internal.extract()
keys = list(key_values.keys())
counts = list(key_values.values())
if hash_map_unique.has_nan and not dropnan:
keys = [np.nan] + keys
counts = [hash_map_unique.nan_count] + counts
if hash_map_unique.has_null and not dropmissing:
keys = [None] + keys
counts = [hash_map_unique.null_count] + counts
if dropmissing and None in keys:
# we still can have a None in the values
index = keys.index(None)
keys.pop(index)
counts.pop(index)
counts = np.array(counts)
keys = np.array(keys)
else:
keys = hash_map_unique.keys()
# TODO: we might want to put the dropmissing in .keys(..)
deletes = []
if dropmissing and hash_map_unique.has_null:
deletes.append(hash_map_unique.null_index)
if dropnan and hash_map_unique.has_nan:
deletes.append(hash_map_unique.nan_index)
if isinstance(keys, (vaex.strings.StringList32, vaex.strings.StringList64)):
keys = vaex.strings.to_arrow(keys)
indices = np.delete(np.arange(len(keys)), deletes)
keys = keys.take(indices)
else:
keys = np.delete(keys, deletes)
if not dropmissing and hash_map_unique.has_null:
mask = np.zeros(len(keys), dtype=np.uint8)
mask[hash_map_unique.null_index] = 1
keys = np.ma.array(keys, mask=mask)
if data_type_item == str and isinstance(keys, np.ndarray):
# the np.delete will cast to dtype object
keys = pa.array(keys)
keys = vaex.array_types.convert(keys, array_type)
if return_inverse:
return keys, inverse
else:
return keys
progressbar = vaex.utils.progressbars(progress, title="unique")
hash_map_result = self._hash_map_unique(expression, progress=progressbar, selection=selection, flatten=axis is None, delay=True, limit=limit, limit_raise=limit_raise)
if return_inverse:
progress_inverse = progressbar.add("find inverse")
return self._delay(delay, progressbar.exit_on(process(hash_map_result)))
def bin_edges(self, expression, limits, shape=default_shape):
return self.bins(expression, limits, shape=shape, edges=True)
def bin_centers(self, expression, limits, shape=default_shape):
return self.bins(expression, limits, shape=shape, edges=False)
def bins(self, expression, limits, shape=default_shape, edges=True):
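        '''Return the bin edges (shape + 1 points) when edges=True, otherwise the bin centers (shape points), equally spaced between limits.'''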
vmin, vmax = limits
if edges:
bins = np.ogrid[limits[0]:limits[1]:(shape + 1) * 1j]
return bins
else:
dx = (limits[1] - limits[0]) / shape
bins = np.ogrid[limits[0]:limits[1] - dx:(shape) * 1j]
return bins + dx / 2
def nearest_bin(self, value, limits, shape):
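        '''Return the index of the bin (given limits and shape) whose center is nearest to value.'''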
bins = self.bins('', limits=limits, edges=False, shape=shape)
index = np.argmin(np.abs(bins - value))
return index
def _compute_agg(self, name, expression, binby=[], limits=None, shape=default_shape, selection=False, delay=False, edges=False, progress=None, extra_expressions=None, array_type=None):
logger.debug("aggregate %s(%r, binby=%r, limits=%r)", name, expression, binby, limits)
expression = _ensure_strings_from_expressions(expression)
if extra_expressions:
extra_expressions = _ensure_strings_from_expressions(extra_expressions)
expression_waslist, [expressions, ] = vaex.utils.listify(expression)
        # TODO: doesn't seem needed anymore?
# expressions = [self._column_aliases.get(k, k) for k in expressions]
import traceback
trace = ''.join(traceback.format_stack())
for expression in expressions:
if expression and expression != "*":
self.validate_expression(expression)
if not hasattr(self.local, '_aggregator_nest_count'):
self.local._aggregator_nest_count = 0
if self.local._aggregator_nest_count != 0:
raise RuntimeError("nested aggregator call: \nlast trace:\n%s\ncurrent trace:\n%s" % (self.local.last_trace, trace))
else:
self.local.last_trace = trace
# Instead of 'expression is not None', we would like to have 'not virtual'
# but in agg.py we do some casting, which results in calling .dtype(..) with a non-column
# expression even though all expressions passed here are column references
# virtual = [k for k in expressions if k and k not in self.columns]
if self._future_behaviour != 5 and (self.filtered and expression not in [None, '*']):
# When our dataframe is filtered, and we have expressions, we may end up calling
# df.dtype(..) which in turn may call df.evaluate(..) which in turn needs to have
# the filter cache filled in order to compute the first non-missing row. This last
# item could call df.count() again, leading to nested aggregators, which we do not
# support. df.dtype() needs to call evaluate with filtering enabled since we consider
        # it invalid to evaluate expressions on unfiltered data. Sklearn for instance may
# give errors when evaluated with NaN's present.
# TODO: GET RID OF THIS
# TODO: temporary disabled
# len(self) # fill caches and masks
pass
progressbar = vaex.utils.progressbars(progress, title=name)
if not isinstance(binby, (list, tuple)) or len(binby) > 0:
progressbar_limits = progressbar.add("binners")
binners = self._create_binners(binby, limits, shape, selection=selection, delay=True, progress=progressbar_limits)
else:
binners = ()
progressbar_agg = progressbar
@delayed
def compute(expression, binners, selection, edges):
binners = tuple(binners)
if not hasattr(self.local, '_aggregator_nest_count'):
self.local._aggregator_nest_count = 0
self.local._aggregator_nest_count += 1
try:
if expression in ["*", None]:
agg = vaex.agg.aggregates[name](selection=selection, edges=edges)
else:
if extra_expressions:
agg = vaex.agg.aggregates[name](expression, *extra_expressions, selection=selection, edges=edges)
else:
agg = vaex.agg.aggregates[name](expression, selection=selection, edges=edges)
tasks, result = agg.add_tasks(self, binners, progress=progressbar)
@delayed
def finish(counts):
return np.asanyarray(counts)
return finish(result)
finally:
self.local._aggregator_nest_count -= 1
@delayed
def finish(binners, *counts):
if array_type == 'xarray':
dims = [binner.expression for binner in binners]
if expression_waslist:
dims = ['expression'] + dims
def to_coord(binner):
if isinstance(binner, BinnerOrdinal):
return self.category_labels(binner.expression)
elif isinstance(binner, BinnerScalar):
return self.bin_centers(binner.expression, [binner.minimum, binner.maximum], binner.count)
coords = [to_coord(binner) for binner in binners]
if expression_waslist:
coords = [expressions] + coords
counts = np.asanyarray(counts)
else:
counts = counts[0]
return xarray.DataArray(counts, dims=dims, coords=coords)
elif array_type == 'list':
return vaex.utils.unlistify(expression_waslist, counts).tolist()
elif array_type in [None, 'numpy']:
def possibly_masked_array(ar):
if isinstance(ar, (list, tuple)):
has_mask = any(np.ma.isMaskedArray(k) for k in ar)
else:
has_mask = np.ma.isMaskedArray(ar)
if has_mask:
return np.ma.array(ar)
else:
return np.asanyarray(ar)
return possibly_masked_array(vaex.utils.unlistify(expression_waslist, counts))
else:
                raise RuntimeError(f'Unknown array_type {array_type!r}')
stats = [compute(expression, binners, selection=selection, edges=edges) for expression in expressions]
var = finish(binners, *stats)
return self._delay(delay, progressbar.exit_on(var))
@docsubst
def count(self, expression=None, binby=[], limits=None, shape=default_shape, selection=False, delay=False, edges=False, progress=None, array_type=None):
"""Count the number of non-NaN values (or all, if expression is None or "*").
Example:
>>> df.count()
330000
>>> df.count("*")
330000.0
>>> df.count("*", binby=["x"], shape=4)
array([ 10925., 155427., 152007., 10748.])
:param expression: Expression or column for which to count non-missing values, or None or '*' for counting the rows
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:param edges: {edges}
:param array_type: {array_type}
:return: {return_stat_scalar}
"""
return self._compute_agg('count', expression, binby, limits, shape, selection, delay, edges, progress, array_type=array_type)
@delayed
def _first_calculation(self, expression, order_expression, binby, limits, shape, selection, edges, progressbar):
if shape:
limits, shapes = limits
else:
limits, shapes = limits, shape
task = tasks.TaskStatistic(self, binby, shapes, limits, weights=[expression, order_expression], op=tasks.OP_FIRST, selection=selection, edges=edges)
task = self.executor.schedule(task)
progressbar.add_task(task, "count for %s" % expression)
@delayed
def finish(counts):
counts = np.array(counts)
return counts
return finish(task)
@docsubst
def first(self, expression, order_expression=None, binby=[], limits=None, shape=default_shape, selection=False, delay=False, edges=False, progress=None, array_type=None):
"""Return the first element of a binned `expression`, where the values each bin are sorted by `order_expression`.
Example:
>>> import vaex
>>> df = vaex.example()
>>> df.first(df.x, df.y, shape=8)
        >>> df.first(df.x, df.y, shape=8, binby=[df.y])
array([-4.81883764, 11.65378 , 9.70084476, -7.3025589 , 4.84954977,
8.47446537, -5.73602629, 10.18783 ])
:param expression: {expression}
:param order_expression: Order the values in the bins by this expression.
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:param edges: {edges}
:param array_type: {array_type}
:return: Ndarray containing the first elements.
:rtype: numpy.array
"""
return self._compute_agg('first', expression, binby, limits, shape, selection, delay, edges, progress, extra_expressions=[order_expression], array_type=array_type)
logger.debug("count(%r, binby=%r, limits=%r)", expression, binby, limits)
logger.debug("count(%r, binby=%r, limits=%r)", expression, binby, limits)
expression = _ensure_strings_from_expressions(expression)
order_expression = _ensure_string_from_expression(order_expression)
binby = _ensure_strings_from_expressions(binby)
waslist, [expressions,] = vaex.utils.listify(expression)
@delayed
def finish(*counts):
counts = np.asarray(counts)
return vaex.utils.unlistify(waslist, counts)
progressbar = vaex.utils.progressbars(progress)
limits = self.limits(binby, limits, delay=True, shape=shape)
stats = [self._first_calculation(expression, order_expression, binby=binby, limits=limits, shape=shape, selection=selection, edges=edges, progressbar=progressbar) for expression in expressions]
var = finish(*stats)
return self._delay(delay, var)
@docsubst
def last(self, expression, order_expression=None, binby=[], limits=None, shape=default_shape, selection=False, delay=False, edges=False, progress=None, array_type=None):
"""Return the last element of a binned `expression`, where the values each bin are sorted by `order_expression`.
:param expression: The value to be placed in the bin.
:param order_expression: Order the values in the bins by this expression.
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:param edges: {edges}
:param array_type: {array_type}
        :return: Ndarray containing the last elements.
:rtype: numpy.array
"""
return self._compute_agg('last', expression, binby, limits, shape, selection, delay, edges, progress, extra_expressions=[order_expression], array_type=array_type)
@docsubst
@stat_1d
def mean(self, expression, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None, edges=False, array_type=None):
"""Calculate the mean for expression, possibly on a grid defined by binby.
Example:
>>> df.mean("x")
-0.067131491264005971
>>> df.mean("(x**2+y**2)**0.5", binby="E", shape=4)
array([ 2.43483742, 4.41840721, 8.26742458, 15.53846476])
:param expression: {expression}
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:param array_type: {array_type}
:return: {return_stat_scalar}
"""
return self._compute_agg('mean', expression, binby, limits, shape, selection, delay, edges, progress, array_type=array_type)
logger.debug("mean of %r, with binby=%r, limits=%r, shape=%r, selection=%r, delay=%r", expression, binby, limits, shape, selection, delay)
expression = _ensure_strings_from_expressions(expression)
selection = _ensure_strings_from_expressions(selection)
binby = _ensure_strings_from_expressions(binby)
@delayed
def calculate(expression, limits):
task = tasks.TaskStatistic(self, binby, shape, limits, weight=expression, op=tasks.OP_ADD_WEIGHT_MOMENTS_01, selection=selection)
task = self.executor.schedule(task)
progressbar.add_task(task, "mean for %s" % expression)
return task
@delayed
def finish(*stats_args):
stats = np.array(stats_args)
counts = stats[..., 0]
with np.errstate(divide='ignore', invalid='ignore'):
mean = stats[..., 1] / counts
return vaex.utils.unlistify(waslist, mean)
waslist, [expressions, ] = vaex.utils.listify(expression)
progressbar = vaex.utils.progressbars(progress)
limits = self.limits(binby, limits, delay=True)
stats = [calculate(expression, limits) for expression in expressions]
var = finish(*stats)
return self._delay(delay, var)
@delayed
def _sum_calculation(self, expression, binby, limits, shape, selection, progressbar):
task = tasks.TaskStatistic(self, binby, shape, limits, weight=expression, op=tasks.OP_ADD_WEIGHT_MOMENTS_01, selection=selection)
task = self.executor.schedule(task)
progressbar.add_task(task, "sum for %s" % expression)
@delayed
def finish(sum_grid):
stats = np.array(sum_grid)
return stats[...,1]
return finish(task)
@docsubst
@stat_1d
def sum(self, expression, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None, edges=False, array_type=None):
"""Calculate the sum for the given expression, possible on a grid defined by binby
Example:
>>> df.sum("L")
304054882.49378014
>>> df.sum("L", binby="E", shape=4)
array([ 8.83517994e+06, 5.92217598e+07, 9.55218726e+07,
1.40008776e+08])
:param expression: {expression}
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:param array_type: {array_type}
:return: {return_stat_scalar}
"""
return self._compute_agg('sum', expression, binby, limits, shape, selection, delay, edges, progress, array_type=array_type)
@delayed
def finish(*sums):
return vaex.utils.unlistify(waslist, sums)
expression = _ensure_strings_from_expressions(expression)
binby = _ensure_strings_from_expressions(binby)
waslist, [expressions, ] = vaex.utils.listify(expression)
progressbar = vaex.utils.progressbars(progress)
limits = self.limits(binby, limits, delay=True)
# stats = [calculate(expression, limits) for expression in expressions]
sums = [self._sum_calculation(expression, binby=binby, limits=limits, shape=shape, selection=selection, progressbar=progressbar) for expression in expressions]
s = finish(*sums)
return self._delay(delay, s)
@docsubst
@stat_1d
def std(self, expression, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None, array_type=None):
"""Calculate the standard deviation for the given expression, possible on a grid defined by binby
>>> df.std("vz")
110.31773397535071
>>> df.std("vz", binby=["(x**2+y**2)**0.5"], shape=4)
array([ 123.57954851, 85.35190177, 61.14345748, 38.0740619 ])
:param expression: {expression}
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:param array_type: {array_type}
:return: {return_stat_scalar}
"""
@delayed
def finish(var):
return var**0.5
return self._delay(delay, finish(self.var(expression, binby=binby, limits=limits, shape=shape, selection=selection, delay=True, progress=progress)))
@docsubst
@stat_1d
def var(self, expression, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None, array_type=None):
"""Calculate the sample variance for the given expression, possible on a grid defined by binby
Example:
>>> df.var("vz")
12170.002429456246
>>> df.var("vz", binby=["(x**2+y**2)**0.5"], shape=4)
array([ 15271.90481083, 7284.94713504, 3738.52239232, 1449.63418988])
>>> df.var("vz", binby=["(x**2+y**2)**0.5"], shape=4)**0.5
array([ 123.57954851, 85.35190177, 61.14345748, 38.0740619 ])
:param expression: {expression}
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:param array_type: {array_type}
:return: {return_stat_scalar}
"""
edges = False
return self._compute_agg('var', expression, binby, limits, shape, selection, delay, edges, progress, array_type=array_type)
@docsubst
def skew(self, expression, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None, edges=False, array_type=None):
'''
        Calculate the skew for the given expression, possibly on a grid defined by binby.
Example:
>>> df.skew("vz")
0.02116528
>>> df.skew("vz", binby=["E"], shape=4)
array([-0.069976 , -0.01003445, 0.05624177, -2.2444322 ])
:param expression: {expression}
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:param array_type: {array_type}
:return: {return_stat_scalar}
'''
        edges = False
return self._compute_agg('skew', expression, binby, limits, shape, selection, delay, edges, progress, array_type=array_type)
@docsubst
def kurtosis(self, expression, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None, edges=False, array_type=None):
'''
        Calculate the kurtosis for the given expression, possibly on a grid defined by binby.
Example:
>>> df.kurtosis('vz')
0.33414303
>>> df.kurtosis("vz", binby=["E"], shape=4)
array([0.35286113, 0.14455428, 0.52955107, 5.06716345])
:param expression: {expression}
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:param array_type: {array_type}
:return: {return_stat_scalar}
'''
        edges = False
return self._compute_agg('kurtosis', expression, binby, limits, shape, selection, delay, edges, progress, array_type=array_type)
@docsubst
def covar(self, x, y, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None):
"""Calculate the covariance cov[x,y] between x and y, possibly on a grid defined by binby.
Example:
>>> df.covar("x**2+y**2+z**2", "-log(-E+1)")
array(52.69461456005138)
>>> df.covar("x**2+y**2+z**2", "-log(-E+1)")/(df.std("x**2+y**2+z**2") * df.std("-log(-E+1)"))
0.63666373822156686
>>> df.covar("x**2+y**2+z**2", "-log(-E+1)", binby="Lz", shape=4)
array([ 10.17387143, 51.94954078, 51.24902796, 20.2163929 ])
:param x: {expression}
:param y: {expression}
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:return: {return_stat_scalar}
"""
@delayed
def cov(mean_x, mean_y, mean_xy):
return mean_xy - mean_x * mean_y
waslist, [xlist, ylist] = vaex.utils.listify(x, y)
# print("limits", limits)
limits = self.limits(binby, limits, selection=selection, delay=True)
# print("limits", limits)
@delayed
def calculate(limits):
results = []
for x, y in zip(xlist, ylist):
mx = self.mean(x, binby=binby, limits=limits, shape=shape, selection=selection, delay=True, progress=progressbar)
my = self.mean(y, binby=binby, limits=limits, shape=shape, selection=selection, delay=True, progress=progressbar)
cxy = self.mean("(%s)*(%s)" % (x, y), binby=binby, limits=limits, shape=shape, selection=selection,
delay=True, progress=progressbar)
results.append(cov(mx, my, cxy))
return results
progressbar = vaex.utils.progressbars(progress, title="covar")
covars = calculate(limits)
@delayed
def finish(covars):
value = np.array(vaex.utils.unlistify(waslist, covars))
return value
return self._delay(delay, finish(delayed_list(covars)))
@docsubst
def correlation(self, x, y=None, binby=[], limits=None, shape=default_shape, sort=False, sort_key=np.abs, selection=False, delay=False, progress=None, array_type=None):
"""Calculate the correlation coefficient cov[x,y]/(std[x]*std[y]) between x and y, possibly on a grid defined by binby.
        The `x` and `y` arguments can be single expressions or lists of expressions.
        - If `x` and `y` are single expressions, it computes the correlation between `x` and `y`;
- If `x` is a list of expressions and `y` is a single expression, it computes the correlation between each expression in `x` and the expression in `y`;
- If `x` is a list of expressions and `y` is None, it computes the correlation matrix amongst all expressions in `x`;
- If `x` is a list of tuples of length 2, it computes the correlation for the specified dimension pairs;
- If `x` and `y` are lists of expressions, it computes the correlation matrix defined by the two expression lists.
Example:
>>> import vaex
>>> df = vaex.example()
>>> df.correlation("x**2+y**2+z**2", "-log(-E+1)")
array(0.6366637382215669)
>>> df.correlation("x**2+y**2+z**2", "-log(-E+1)", binby="Lz", shape=4)
array([ 0.40594394, 0.69868851, 0.61394099, 0.65266318])
>>> df.correlation(x=['x', 'y', 'z'])
array([[ 1. , -0.06668907, -0.02709719],
[-0.06668907, 1. , 0.03450365],
[-0.02709719, 0.03450365, 1. ]])
>>> df.correlation(x=['x', 'y', 'z'], y=['E', 'Lz'])
array([[-0.01116315, -0.00369268],
[-0.0059848 , 0.02472491],
[ 0.01428211, -0.05900035]])
:param x: {expression}
:param y: {expression}
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:return: {return_stat_scalar}
"""
selection = _normalize_selection(selection)
progressbar = vaex.utils.progressbars(progress, title="correlation")
if y is None:
if not _issequence(x):
raise ValueError("if y not given, x is expected to be a list or tuple, not %r" % x)
if all([_issequence(k) and len(k) == 2 for k in x]):
values = []
pairs = x
x = []
y = []
for col1, col2 in pairs:
x.append(col1)
y.append(col2)
values.append(self.correlation(col1, col2, delay=True, progress=progressbar))
@vaex.delayed
def finish(values):
return vaex.from_arrays(x=x, y=y, correlation=values)
result = finish(values)
else:
result = self._correlation_matrix(x, binby=binby, limits=limits, shape=shape, selection=selection, delay=True, progress=progressbar, array_type=array_type)
elif _issequence(x) and _issequence(y):
result = delayed(np.array)([[self.correlation(x_, y_, binby=binby, limits=limits, shape=shape, selection=selection, delay=True, progress=progressbar) for y_ in y] for x_ in x])
elif _issequence(x):
combinations = [(k, y) for k in x]
            result = delayed(np.array)([self.correlation(x_, y, binby=binby, limits=limits, shape=shape, selection=selection, delay=True, progress=progressbar) for x_ in x])
elif _issequence(y):
combinations = [(x, k) for k in y]
result = self.correlation(combinations, binby=binby, limits=limits, shape=shape, selection=selection, delay=True, progress=progressbar)
else:
@vaex.delayed
def finish(matrix):
return matrix[...,0,1]
matrix = self._correlation_matrix([x, y], binby=binby, limits=limits, shape=shape, selection=selection, delay=True, progress=progressbar)
result = finish(matrix)
return self._delay(delay, result)
@docsubst
def _correlation_matrix(self, column_names=None, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None, array_type=None):
if column_names is None:
column_names = self.get_column_names()
@delayed
def normalize(cov_matrix):
norm = cov_matrix[:]
diag = np.diagonal(cov_matrix, axis1=-2, axis2=-1)
# generalized outer product
norm = (diag[...,np.newaxis,:] * diag[...,np.newaxis]) ** 0.5
# norm = np.outer(diag, diag)**0.5
return cov_matrix/norm
result = normalize(self.cov(column_names, binby=binby, limits=limits, shape=shape, selection=selection, delay=True, progress=progress))
@vaex.delayed
def finish(array):
if array_type == 'xarray':
dims = binby + ['x', 'y']
coords = [column_names, column_names]
return xarray.DataArray(array, dims=dims, coords=coords)
else:
return vaex.array_types.convert(array, array_type)
return self._delay(delay, finish(result))
@docsubst
def cov(self, x, y=None, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None):
"""Calculate the covariance matrix for x and y or more expressions, possibly on a grid defined by binby.
Either x and y are expressions, e.g.:
>>> df.cov("x", "y")
Or only the x argument is given with a list of expressions, e.g.:
>>> df.cov(["x, "y, "z"])
Example:
>>> df.cov("x", "y")
array([[ 53.54521742, -3.8123135 ],
[ -3.8123135 , 60.62257881]])
>>> df.cov(["x", "y", "z"])
array([[ 53.54521742, -3.8123135 , -0.98260511],
[ -3.8123135 , 60.62257881, 1.21381057],
[ -0.98260511, 1.21381057, 25.55517638]])
>>> df.cov("x", "y", binby="E", shape=2)
array([[[ 9.74852878e+00, -3.02004780e-02],
[ -3.02004780e-02, 9.99288215e+00]],
[[ 8.43996546e+01, -6.51984181e+00],
[ -6.51984181e+00, 9.68938284e+01]]])
:param x: {expression}
:param y: {expression_single}
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
        :return: {return_stat_scalar}, the last dimensions are of shape (N,N) for N expressions
"""
selection = _ensure_strings_from_expressions(selection)
selection = _normalize_selection(selection)
if y is None:
if not _issequence(x):
raise ValueError("if y argument is not given, x is expected to be sequence, not %r", x)
expressions = x
else:
expressions = [x, y]
expressions = _ensure_strings_from_expressions(expressions)
N = len(expressions)
binby = _ensure_list(binby)
shape = _expand_shape(shape, len(binby))
limits = self.limits(binby, limits, selection=selection, delay=True)
@delayed
def calculate(expressions, limits):
# print('limits', limits)
task = tasks.TaskStatistic(self, binby, shape, limits, weights=expressions, op=tasks.OP_COV, selection=selection)
task = self.executor.schedule(task)
progressbar.add_task(task, "covariance values for %r" % expressions)
return task
@delayed
def finish(values):
N = len(expressions)
counts = values[..., :N]
sums = values[..., N:2 * N]
with np.errstate(divide='ignore', invalid='ignore'):
means = sums / counts
# matrix of means * means.T
meansxy = means[..., None] * means[..., None, :]
counts = values[..., 2 * N:2 * N + N**2]
sums = values[..., 2 * N + N**2:]
shape = counts.shape[:-1] + (N, N)
counts = counts.reshape(shape)
sums = sums.reshape(shape)
with np.errstate(divide='ignore', invalid='ignore'):
moments2 = sums / counts
cov_matrix = moments2 - meansxy
return cov_matrix
progressbar = vaex.utils.progressbars(progress, title="cov")
values = calculate(expressions, limits)
cov_matrix = finish(values)
return self._delay(delay, cov_matrix)
@docsubst
@stat_1d
def minmax(self, expression, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None):
"""Calculate the minimum and maximum for expressions, possibly on a grid defined by binby.
Example:
>>> df.minmax("x")
array([-128.293991, 271.365997])
>>> df.minmax(["x", "y"])
array([[-128.293991 , 271.365997 ],
[ -71.5523682, 146.465836 ]])
>>> df.minmax("x", binby="x", shape=5, limits=[-10, 10])
array([[-9.99919128, -6.00010443],
[-5.99972439, -2.00002384],
[-1.99991322, 1.99998057],
[ 2.0000093 , 5.99983597],
[ 6.0004878 , 9.99984646]])
:param expression: {expression}
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:return: {return_stat_scalar}, the last dimension is of shape (2)
"""
# vmin = self._compute_agg('min', expression, binby, limits, shape, selection, delay, edges, progress)
# vmax = self._compute_agg('max', expression, binby, limits, shape, selection, delay, edges, progress)
selection = _ensure_strings_from_expressions(selection)
selection = _normalize_selection(selection)
@delayed
def calculate(expression, limits):
task = tasks.TaskStatistic(self, binby, shape, limits, weight=expression, op=tasks.OP_MIN_MAX, selection=selection)
task = self.executor.schedule(task)
progressbar.add_task(task, "minmax for %s" % expression)
return task
@delayed
def finish(*minmax_list):
value = vaex.utils.unlistify(waslist, np.array(minmax_list))
value = vaex.array_types.to_numpy(value)
value = value.astype(data_type0.numpy)
return value
expression = _ensure_strings_from_expressions(expression)
binby = _ensure_strings_from_expressions(binby)
waslist, [expressions, ] = vaex.utils.listify(expression)
column_names = self.get_column_names(hidden=True)
expressions = [vaex.utils.valid_expression(column_names, k) for k in expressions]
data_types = [self.data_type(expr) for expr in expressions]
data_type0 = data_types[0]
# special case that we supported mixed endianness for ndarrays
all_same_kind = all(isinstance(data_type.internal, np.dtype) for data_type in data_types) and all([k.kind == data_type0.kind for k in data_types])
if not (all_same_kind or all([k == data_type0 for k in data_types])):
raise TypeError("cannot mix different dtypes in 1 minmax call")
progressbar = vaex.utils.progressbars(progress, title="minmaxes")
limits = self.limits(binby, limits, selection=selection, delay=True)
all_tasks = [calculate(expression, limits) for expression in expressions]
result = finish(*all_tasks)
return self._delay(delay, result)
@docsubst
@stat_1d
def min(self, expression, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None, edges=False, array_type=None):
"""Calculate the minimum for given expressions, possibly on a grid defined by binby.
Example:
>>> df.min("x")
array(-128.293991)
>>> df.min(["x", "y"])
array([-128.293991 , -71.5523682])
>>> df.min("x", binby="x", shape=5, limits=[-10, 10])
array([-9.99919128, -5.99972439, -1.99991322, 2.0000093 , 6.0004878 ])
:param expression: {expression}
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:param array_type: {array_type}
        :return: {return_stat_scalar}
"""
return self._compute_agg('min', expression, binby, limits, shape, selection, delay, edges, progress, array_type=array_type)
@delayed
def finish(result):
return result[..., 0]
return self._delay(delay, finish(self.minmax(expression, binby=binby, limits=limits, shape=shape, selection=selection, delay=delay, progress=progress)))
@docsubst
@stat_1d
def max(self, expression, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None, edges=False, array_type=None):
"""Calculate the maximum for given expressions, possibly on a grid defined by binby.
Example:
>>> df.max("x")
array(271.365997)
>>> df.max(["x", "y"])
array([ 271.365997, 146.465836])
>>> df.max("x", binby="x", shape=5, limits=[-10, 10])
array([-6.00010443, -2.00002384, 1.99998057, 5.99983597, 9.99984646])
:param expression: {expression}
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:param array_type: {array_type}
        :return: {return_stat_scalar}
"""
return self._compute_agg('max', expression, binby, limits, shape, selection, delay, edges, progress, array_type=array_type)
@delayed
def finish(result):
return result[..., 1]
return self._delay(delay, finish(self.minmax(expression, binby=binby, limits=limits, shape=shape, selection=selection, delay=delay, progress=progress)))
@docsubst
def percentile_approx(self, expression, percentage=50., binby=[], limits=None, shape=default_shape, percentile_shape=1024, percentile_limits="minmax", selection=False, delay=False, progress=None):
"""Calculate the percentile given by percentage, possibly on a grid defined by binby.
NOTE: this value is approximated by calculating the cumulative distribution on a grid defined by
percentile_shape and percentile_limits.
Example:
>>> df.percentile_approx("x", 10), df.percentile_approx("x", 90)
(array([-8.3220355]), array([ 7.92080358]))
>>> df.percentile_approx("x", 50, binby="x", shape=5, limits=[-10, 10])
array([[-7.56462982],
[-3.61036641],
[-0.01296306],
[ 3.56697863],
[ 7.45838367]])
:param expression: {expression}
:param binby: {binby}
:param limits: {limits}
:param shape: {shape}
:param percentile_limits: {percentile_limits}
:param percentile_shape: {percentile_shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:return: {return_stat_scalar}
"""
waslist, [expressions, ] = vaex.utils.listify(expression)
        if not isinstance(binby, (tuple, list)):
            binby = [binby]
@delayed
def calculate(expression, shape, limits):
# task = TaskStatistic(self, [expression] + binby, shape, limits, op=OP_ADD1, selection=selection)
# self.executor.schedule(task)
# return task
return self.count(binby=list(binby) + [expression], shape=shape, limits=limits, selection=selection, delay=True, edges=True, progress=progress)
@delayed
def finish(percentile_limits, counts_list):
results = []
for i, counts in enumerate(counts_list):
counts = counts.astype(np.float64)
# remove the nan and boundary edges from the first dimension,
nonnans = list([slice(2, -1, None) for k in range(len(counts.shape) - 1)])
                nonnans.append(slice(1, None, None))  # we only drop the nan's, and keep the overflow edges
nonnans = tuple(nonnans)
cumulative_grid = np.cumsum(counts.__getitem__(nonnans), -1) # convert to cumulative grid
totalcounts = np.sum(counts.__getitem__(nonnans), -1)
empty = totalcounts == 0
original_shape = counts.shape
shape = cumulative_grid.shape # + (original_shape[-1] - 1,) #
counts = np.sum(counts, -1)
edges_floor = np.zeros(shape[:-1] + (2,), dtype=np.int64)
edges_ceil = np.zeros(shape[:-1] + (2,), dtype=np.int64)
                # if we have an odd # of elements, say, N=3, the center is at i=1=(N-1)/2
# if we have an even # of elements, say, N=4, the center is between i=1=(N-2)/2 and i=2=(N/2)
# index = (shape[-1] -1-3) * percentage/100. # the -3 is for the edges
waslist_percentage, [percentages, ] = vaex.utils.listify(percentage)
percentiles = []
for p in percentages:
if p == 0:
percentiles.append(percentile_limits[i][0])
continue
if p == 100:
percentiles.append(percentile_limits[i][1])
continue
values = np.array((totalcounts + 1) * p / 100.) # make sure it's an ndarray
values[empty] = 0
floor_values = np.array(np.floor(values))
ceil_values = np.array(np.ceil(values))
vaex.vaexfast.grid_find_edges(cumulative_grid, floor_values, edges_floor)
vaex.vaexfast.grid_find_edges(cumulative_grid, ceil_values, edges_ceil)
def index_choose(a, indices):
                        # alternative to np.choose, which doesn't like the last dim to be >= 32
# print(a, indices)
out = np.zeros(a.shape[:-1])
# print(out.shape)
for i in np.ndindex(out.shape):
# print(i, indices[i])
out[i] = a[i + (indices[i],)]
return out
def calculate_x(edges, values):
left, right = edges[..., 0], edges[..., 1]
left_value = index_choose(cumulative_grid, left)
right_value = index_choose(cumulative_grid, right)
with np.errstate(divide='ignore', invalid='ignore'):
u = np.array((values - left_value) / (right_value - left_value))
# TODO: should it really be -3? not -2
xleft, xright = percentile_limits[i][0] + (left - 0.5) * (percentile_limits[i][1] - percentile_limits[i][0]) / (shape[-1] - 3),\
percentile_limits[i][0] + (right - 0.5) * (percentile_limits[i][1] - percentile_limits[i][0]) / (shape[-1] - 3)
x = xleft + (xright - xleft) * u # /2
return x
x1 = calculate_x(edges_floor, floor_values)
x2 = calculate_x(edges_ceil, ceil_values)
u = values - floor_values
x = x1 + (x2 - x1) * u
percentiles.append(x)
percentile = vaex.utils.unlistify(waslist_percentage, np.array(percentiles))
results.append(percentile)
return results
shape = _expand_shape(shape, len(binby))
percentile_shapes = _expand_shape(percentile_shape, len(expressions))
if percentile_limits:
percentile_limits = _expand_limits(percentile_limits, len(expressions))
limits = self.limits(binby, limits, selection=selection, delay=True)
percentile_limits = self.limits(expressions, percentile_limits, selection=selection, delay=True)
@delayed
def calculation(limits, percentile_limits):
# print(">>>", expressions, percentile_limits)
# print(percentile_limits[0], list(percentile_limits[0]))
# print(list(np.array(limits).tolist()) + list(percentile_limits[0]))
# print("limits", limits, expressions, percentile_limits, ">>", list(limits) + [list(percentile_limits[0]))
tasks = [calculate(expression, tuple(shape) + (percentile_shape, ), list(limits) + [list(percentile_limit)])
for percentile_shape, percentile_limit, expression
in zip(percentile_shapes, percentile_limits, expressions)]
return finish(percentile_limits, delayed_args(*tasks))
# return tasks
result = calculation(limits, percentile_limits)
@delayed
def finish2(grid):
value = vaex.utils.unlistify(waslist, np.array(grid))
return value
return self._delay(delay, finish2(result))
def _use_delay(self, delay):
return delay == True
def _delay(self, delay, task, progressbar=False):
if task.isRejected:
task.get()
if delay:
return task
else:
self.execute()
return task.get()
@docsubst
def limits_percentage(self, expression, percentage=99.73, square=False, selection=False, progress=None, delay=False):
"""Calculate the [min, max] range for expression, containing approximately a percentage of the data as defined
by percentage.
The range is symmetric around the median, i.e., for a percentage of 90, this gives the same results as computing the 5th and 95th percentiles:
Example:
>>> df.limits_percentage("x", 90)
array([-12.35081376, 12.14858052])
>>> df.percentile_approx("x", 5), df.percentile_approx("x", 95)
(array([-12.36813152]), array([ 12.13275818]))
NOTE: this value is approximated by calculating the cumulative distribution on a grid.
NOTE 2: The values above are not exactly the same, since percentile and limits_percentage do not share the same code
:param expression: {expression_limits}
:param float percentage: Value between 0 and 100
:param selection: {selection}
:param progress: {progress}
:param delay: {delay}
:return: {return_limits}
"""
logger.info("limits_percentage for %r, with percentage=%r", expression, percentage)
progressbar = vaex.utils.progressbars(progress, title="limits_percentage")
waslist, [expressions, ] = vaex.utils.listify(expression)
limits = []
for expr in expressions:
@delayed
def compute(limits_minmax, expr=expr):
@delayed
def compute_limits(counts):
cumcounts = np.concatenate([[0], np.cumsum(counts)])
cumcounts = cumcounts / cumcounts.max()
# TODO: this is crude.. see the details!
f = (1 - percentage / 100.) / 2
x = np.linspace(vmin, vmax, size + 1)
l = np.interp([f, 1 - f], cumcounts, x)
return l
vmin, vmax = limits_minmax
size = 1024 * 16
counts = self.count(binby=expr, shape=size, limits=limits_minmax, selection=selection, progress=progressbar, delay=delay)
return compute_limits(counts)
# limits.append(l)
limits_minmax = self.minmax(expr, selection=selection, delay=delay)
limits1 = compute(limits_minmax=limits_minmax)
limits.append(limits1)
return self._delay(delay, progressbar.exit_on(delayed(vaex.utils.unlistify)(waslist, limits)))
@docsubst
def limits(self, expression, value=None, square=False, selection=None, delay=False, progress=None, shape=None):
"""Calculate the [min, max] range for expression, as described by value, which is 'minmax' by default.
If value is a list of the form [minvalue, maxvalue], it is simply returned, this is for convenience when using mixed
forms.
Example:
>>> import vaex
>>> df = vaex.example()
>>> df.limits("x")
array([-128.293991, 271.365997])
>>> df.limits("x", "99.7%")
array([-28.86381927, 28.9261226 ])
>>> df.limits(["x", "y"])
(array([-128.293991, 271.365997]), array([ -71.5523682, 146.465836 ]))
>>> df.limits(["x", "y"], "99.7%")
(array([-28.86381927, 28.9261226 ]), array([-28.60476934, 28.96535249]))
>>> df.limits(["x", "y"], ["minmax", "90%"])
(array([-128.293991, 271.365997]), array([-13.37438402, 13.4224423 ]))
>>> df.limits(["x", "y"], ["minmax", [0, 10]])
(array([-128.293991, 271.365997]), [0, 10])
:param expression: {expression_limits}
:param value: {limits}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:param shape: {shape}
:return: {return_limits}
"""
if expression == []:
return [] if shape is None else ([], [])
waslist, [expressions, ] = vaex.utils.listify(expression)
expressions = _ensure_strings_from_expressions(expressions)
selection = _ensure_strings_from_expressions(selection)
if value is None:
value = "minmax"
if _is_limit(value) or not _issequence(value):
values = (value,) * len(expressions)
else:
values = value
# we cannot hash arrow arrays
values = [vaex.array_types.to_numpy(k) if isinstance(k, vaex.array_types.supported_arrow_array_types) else k for k in values]
progressbar = vaex.utils.progressbars(progress, title="limits")
initial_expressions, initial_values = expressions, values
expression_values = dict()
expression_shapes = dict()
for i, (expression, value) in enumerate(zip(expressions, values)):
if _issequence(expression):
expressions = expression
nested = True
else:
expressions = [expression]
nested = False
if _is_limit(value) or not _issequence(value):
values = (value,) * len(expressions)
else:
values = value
for j, (expression, value) in enumerate(zip(expressions, values)):
if shape is not None:
if _issequence(shape):
shapes = shape
else:
shapes = (shape, ) * (len(expressions) if nested else len(initial_expressions))
shape_index = j if nested else i
if not _is_limit(value):
expression_values[(expression, value)] = None
if self.is_category(expression):
N = self._categories[_ensure_string_from_expression(expression)]['N']
expression_shapes[expression] = min(N, shapes[shape_index] if shape is not None else default_shape)
else:
expression_shapes[expression] = shapes[shape_index] if shape is not None else default_shape
limits_list = []
for expression, value in expression_values.keys():
if self.is_category(expression):
N = self._categories[_ensure_string_from_expression(expression)]['N']
limits = [-0.5, N-0.5]
else:
if isinstance(value, six.string_types):
if value == "minmax":
limits = self.minmax(expression, selection=selection, progress=progressbar, delay=True)
else:
match = re.match(r"([\d.]*)(\D*)", value)
if match is None:
raise ValueError("do not understand limit specifier %r, examples are 90%, 3sigma")
else:
number, type = match.groups()
import ast
number = ast.literal_eval(number)
type = type.strip()
if type in ["s", "sigma"]:
limits = self.limits_sigma(number)
elif type in ["ss", "sigmasquare"]:
limits = self.limits_sigma(number, square=True)
elif type in ["%", "percent"]:
limits = self.limits_percentage(expression, number, selection=selection, delay=True, progress=progressbar)
elif type in ["%s", "%square", "percentsquare"]:
limits = self.limits_percentage(expression, number, selection=selection, square=True, delay=True)
elif value is None:
limits = self.minmax(expression, selection=selection, delay=True)
else:
limits = value
limits_list.append(limits)
if limits is None:
raise ValueError("limit %r not understood" % value)
expression_values[(expression, value)] = limits
limits_list = delayed_args(*limits_list)
@delayed
def finish(limits_list):
# print("##### 2)", expression_values.keys())
limits_outer = []
shapes_list = []
for expression, value in zip(initial_expressions, initial_values):
if _issequence(expression):
expressions = expression
waslist2 = True
else:
expressions = [expression]
waslist2 = False
if _is_limit(value) or not _issequence(value):
values = (value,) * len(expressions)
else:
values = value
# print("expressions 3)", expressions)
# print("values 3)", values)
limits = []
shapes = []
for expression, value in zip(expressions, values):
if not _is_limit(value):
value = expression_values[(expression, value)]
if not _is_limit(value):
# print(">>> value", value)
value = value.get()
limits.append(value)
shapes.append(expression_shapes[expression])
# if not _is_limit(value): # if a
# #value = tuple(value) # list is not hashable
# expression_values[(expression, value)] = expression_values[(expression, value)].get()
# else:
# #value = tuple(value) # list is not hashable
# expression_values[(expression, value)] = ()
if waslist2:
limits_outer.append(limits)
shapes_list.append(shapes)
else:
limits_outer.append(limits[0])
shapes_list.append(shapes[0])
# logger.debug(">>>>>>>> complete list of limits: %r %r", limits_list, np.array(limits_list).shape)
# print("limits", limits_outer)
if shape:
return vaex.utils.unlistify(waslist, limits_outer), vaex.utils.unlistify(waslist, shapes_list)
else:
return vaex.utils.unlistify(waslist, limits_outer)
return self._delay(delay, progressbar.exit_on(finish(limits_list)))
def mode(self, expression, binby=[], limits=None, shape=256, mode_shape=64, mode_limits=None, progressbar=False, selection=None):
"""Calculate/estimate the mode."""
if len(binby) == 0:
raise ValueError("only supported with binby argument given")
else:
# todo, fix progressbar into two...
try:
    len(shape)
    shape = tuple(shape)
except TypeError:
    shape = len(binby) * (shape,)
shape = (mode_shape,) + shape
subspace = self(*(list(binby) + [expression]))
if selection:
subspace = subspace.selected()
limits = self.limits(list(binby), limits)
mode_limits = self.limits([expression], mode_limits)
limits = list(limits) + list(mode_limits)
counts = subspace.histogram(limits=limits, size=shape, progressbar=progressbar)
indices = np.argmax(counts, axis=0)
pmin, pmax = limits[-1]
centers = np.linspace(pmin, pmax, mode_shape + 1)[:-1] # ignore last bin
centers += (centers[1] - centers[0]) / 2 # and move half a bin to the right
modes = centers[indices]
ok = counts.sum(axis=0) > 0
modes[~ok] = np.nan
return modes
@vaex.utils.deprecated('use plot_widget')
def plot_bq(self, x, y, grid=None, shape=256, limits=None, what="count(*)", figsize=None,
f="identity", figure_key=None, fig=None, axes=None, xlabel=None, ylabel=None, title=None,
show=True, selection=[None, True], colormap="afmhot", grid_limits=None, normalize="normalize",
grid_before=None,
what_kwargs={}, type="default",
scales=None, tool_select=False, bq_cleanup=True,
**kwargs):
import vaex.ext.bqplot
cls = vaex.ext.bqplot.get_class(type)
plot2d = cls(df=self, x=x, y=y, grid=grid, shape=shape, limits=limits, what=what,
f=f, figure_key=figure_key, fig=fig,
selection=selection, grid_before=grid_before,
grid_limits=grid_limits, normalize=normalize, colormap=colormap, what_kwargs=what_kwargs, **kwargs)
if show:
plot2d.show()
return plot2d
# @_hidden
def healpix_count(self, expression=None, healpix_expression=None, healpix_max_level=12, healpix_level=8, binby=None, limits=None, shape=default_shape, delay=False, progress=None, selection=None):
"""Count non missing value for expression on an array which represents healpix data.
:param expression: Expression or column for which to count non-missing values, or None or '*' for counting the rows
:param healpix_expression: {healpix_expression}
:param healpix_max_level: {healpix_max_level}
:param healpix_level: {healpix_level}
:param binby: {binby}, these dimensions follow the first healpix dimension.
:param limits: {limits}
:param shape: {shape}
:param selection: {selection}
:param delay: {delay}
:param progress: {progress}
:return:
"""
# if binby is None:
import healpy as hp
if healpix_expression is None:
if self.ucds.get("source_id", None) == 'meta.id;meta.main': # we now assume we have gaia data
healpix_expression = "source_id/34359738368"
if healpix_expression is None:
raise ValueError("no healpix_expression given, and was unable to guess")
reduce_level = healpix_max_level - healpix_level
NSIDE = 2**healpix_level
nmax = hp.nside2npix(NSIDE)
scaling = 4**reduce_level
expr = "%s/%s" % (healpix_expression, scaling)
binby = [expr] + ([] if binby is None else _ensure_list(binby))
shape = (nmax,) + _expand_shape(shape, len(binby) - 1)
epsilon = 1. / scaling / 2
limits = [[-epsilon, nmax - epsilon]] + ([] if limits is None else limits)
return self.count(expression, binby=binby, limits=limits, shape=shape, delay=delay, progress=progress, selection=selection)
@docsubst
@stat_1d
def _stat(self, what="count(*)", what_kwargs={}, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None):
waslist_what, [whats, ] = vaex.utils.listify(what)
limits = self.limits(binby, limits, delay=True)
waslist_selection, [selections] = vaex.utils.listify(selection)
binby = _ensure_list(binby)
what_labels = []
shape = _expand_shape(shape, len(binby))
total_grid = np.zeros((len(whats), len(selections)) + shape, dtype=float)
@delayed
def copy_grids(grids):
    for index, grid in enumerate(grids):
        total_grid[index] = grid
@delayed
def get_whats(limits):
grids = []
for j, what in enumerate(whats):
what = what.strip()
index = what.index("(")
groups = re.match(r"(.*)\((.*)\)", what).groups()
if groups and len(groups) == 2:
function = groups[0]
arguments = groups[1].strip()
if "," in arguments:
arguments = arguments.split(",")
functions = ["mean", "sum", "std", "var", "correlation", "covar", "min", "max"]
unit_expression = None
if function in ["mean", "sum", "std", "min", "max"]:
unit_expression = arguments
if function in ["var"]:
unit_expression = "(%s) * (%s)" % (arguments, arguments)
if function in ["covar"]:
unit_expression = "(%s) * (%s)" % arguments
if unit_expression:
unit = self.unit(unit_expression)
if unit:
what_units = unit.to_string('latex_inline')
if function in functions:
grid = getattr(self, function)(arguments, binby=binby, limits=limits, shape=shape,
selection=selections, progress=progress, delay=delay)
elif function == "count":
grid = self.count(arguments, binby, shape=shape, limits=limits, selection=selections,
progress=progress, delay=delay)
else:
raise ValueError("Could not understand method: %s, expected one of %r'" % (function, functions))
# what_labels.append(what_label)
grids.append(grid)
# else:
# raise ValueError("Could not understand 'what' argument %r, expected something in form: 'count(*)', 'mean(x)'" % what)
return grids
grids = get_whats(limits)
# print grids
# grids = delayed_args(*grids)
@delayed
def finish(grids):
for i, grid in enumerate(grids):
total_grid[i] = grid
return total_grid[slice(None, None, None) if waslist_what else 0, slice(None, None, None) if waslist_selection else 0]
s = finish(delayed_list(grids))
return self._delay(delay, s)
plot = _requires('viz')
plot1d = _requires('viz')
scatter = _requires('viz')
def plot3d(self, x, y, z, vx=None, vy=None, vz=None, vwhat=None, limits=None, grid=None, what="count(*)", shape=128, selection=[None, True], f=None,
vcount_limits=None,
smooth_pre=None, smooth_post=None, grid_limits=None, normalize="normalize", colormap="afmhot",
figure_key=None, fig=None,
lighting=True, level=[0.1, 0.5, 0.9], opacity=[0.01, 0.05, 0.1], level_width=0.1,
show=True, **kwargs):
"""Use at own risk, requires ipyvolume"""
import vaex.ext.ipyvolume
# vaex.ext.ipyvolume.
cls = vaex.ext.ipyvolume.PlotDefault
plot3d = cls(df=self, x=x, y=y, z=z, vx=vx, vy=vy, vz=vz,
grid=grid, shape=shape, limits=limits, what=what,
f=f, figure_key=figure_key, fig=fig,
selection=selection, smooth_pre=smooth_pre, smooth_post=smooth_post,
grid_limits=grid_limits, vcount_limits=vcount_limits, normalize=normalize, colormap=colormap, **kwargs)
if show:
plot3d.show()
return plot3d
@property
def col(self):
"""Gives direct access to the columns only (useful for tab completion).
Convenient when working with ipython in combination with small DataFrames, since this gives tab-completion.
Columns can be accessed by their names, which are attributes. The attributes are currently expressions, so you can
do computations with them.
Example
>>> df = vaex.example()
>>> df.plot(df.col.x, df.col.y)
"""
class ColumnList(object):
pass
data = ColumnList()
for name in self.get_column_names():
if name != 'col': # avoid recursion
expression = getattr(self, name, None)
if not isinstance(expression, Expression):
expression = Expression(self, name)
else:
expression = Expression(self, name)
setattr(data, name, expression)
return data
def close(self):
"""Close any possible open file handles or other resources, the DataFrame will not be in a usable state afterwards."""
self.dataset.close()
def byte_size(self, selection=False, virtual=False):
"""Return the size in bytes the whole DataFrame requires (or the selection), respecting the active_fraction."""
bytes_per_row = 0
N = self.count(selection=selection)
extra = 0
for column in list(self.get_column_names(virtual=virtual)):
dtype = self.data_type(column)
#if dtype in [str_type, str] and dtype_internal.kind == 'O':
if dtype == str:
# TODO: document or fix this
# is it too expensive to calculate this exactly?
extra += self.columns[column].nbytes
else:
bytes_per_row += dtype.numpy.itemsize
if np.ma.isMaskedArray(self.columns[column]):
bytes_per_row += 1
return bytes_per_row * N + extra
@property
def nbytes(self):
"""Alias for `df.byte_size()`, see :meth:`DataFrame.byte_size`."""
return self.byte_size()
def _shape_of(self, expression, filtered=True):
# TODO: we don't seem to need it anymore, would expect a valid_expression() call
# if check_alias:
# if str(expression) in self._column_aliases:
# expression = self._column_aliases[str(expression)] # translate the alias name into the real name
sample = self.evaluate(expression, 0, 1, filtered=False, array_type="numpy-arrow", parallel=False)
dtype = vaex.dtype_of(sample)
rows = len(self) if filtered else self.length_unfiltered()
if dtype.is_arrow: # for arrow, we don't have nd arrays yet
return (rows,)
else:
return (rows,) + sample.shape[1:]
# TODO: remove array_type and internal arguments?
def data_type(self, expression, array_type=None, internal=False, axis=0):
"""Return the datatype for the given expression, if not a column, the first row will be evaluated to get the data type.
Example:
>>> df = vaex.from_scalars(x=1, s='Hi')
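>>> df.data_type('x')  # a numeric column; returns an integer data type (indicative, not verbatim output)
>>> df.data_type('s')  # a string column; returns a string data type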
:param expression: expression or column name
:param str array_type: 'numpy', 'arrow' or None, to indicate if the data type should be converted
:param int axis: If a nested type (like list), it will return the value_type of the nested type, axis levels deep.
"""
if isinstance(expression, vaex.expression.Expression):
expression = expression._label
expression = _ensure_string_from_expression(expression)
data_type = None
if expression in self.variables:
data_type = np.float64(1).dtype
elif self.is_local() and expression in self.columns.keys():
column = self.columns[expression]
if hasattr(column, 'dtype'):
# TODO: this probably would use data_type
# to support Columns that wrap arrow arrays
data_type = column.dtype
data_type = self._auto_encode_type(expression, data_type)
if isinstance(data_type, vaex.datatype.DataType):
data_type = data_type.internal
else:
data = column[0:1]
data = self._auto_encode_data(expression, data)
else:
expression = vaex.utils.valid_expression(self.get_column_names(hidden=True), expression)
try:
data = self.evaluate(expression, 0, 1, filtered=False, array_type=array_type, parallel=False)
except:
data = self.evaluate(expression, 0, 1, filtered=True, array_type=array_type, parallel=False)
if data_type is None:
# means we have to determine it from the data
if isinstance(data, np.ndarray):
data_type = data.dtype
elif isinstance(data, Column):
data = data.to_arrow()
data_type = data.type
else:
import pandas
# when we eval constants, let arrow find it out
if isinstance(data, numbers.Number):
data_type = pa.array([data]).type
elif isinstance(data, pandas.core.arrays.base.ExtensionArray):
data_type = data.dtype.type
else:
data_type = data.type # assuming arrow
if array_type == "arrow":
data_type = array_types.to_arrow_type(data_type)
elif array_type == "numpy":
data_type = array_types.to_numpy_type(data_type)
elif array_type == "numpy-arrow":
data_type = array_types.to_numpy_type(data_type, strict=False)
elif array_type is None:
    pass  # no conversion requested, keep the data type as-is
else:
raise ValueError(f'Unknown array_type {array_type}')
data_type = DataType(data_type)
# ugly, but fixes df.x.apply(lambda x: str(x))
if not internal:
if isinstance(data_type.internal, np.dtype) and data_type.kind in 'US':
return DataType(pa.string())
if axis != 0:
axis_data_type = [data_type]
while data_type.is_list:
data_type = data_type.value_type
axis_data_type.append(data_type)
data_type = axis_data_type[axis]
return data_type
@property
def dtypes(self):
"""Gives a Pandas series object containing all numpy dtypes of all columns (except hidden)."""
from pandas import Series
return Series({column_name:self.data_type(column_name) for column_name in self.get_column_names()})
def schema(self):
'''Similar to df.dtypes, but returns a dict'''
return {column_name:self.data_type(column_name) for column_name in self.get_column_names()}
@docsubst
def schema_arrow(self, reduce_large=False):
'''Similar to :meth:`~vaex.dataframe.DataFrame.schema`, but returns an arrow schema
:param bool reduce_large: change large_string to normal string
'''
def reduce(type):
if reduce_large and type == pa.large_string():
type = pa.string()
return type
return pa.schema({name: reduce(dtype.arrow) for name, dtype in self.schema().items()})
def is_masked(self, column):
'''Return if a column is a masked (numpy.ma) column.
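Example (an illustrative sketch; 'x' is a hypothetical column name):

>>> df.is_masked('x')  # True only when the backing array is a numpy masked array
'''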
column = _ensure_string_from_expression(column)
if column in self.dataset:
return self.dataset.is_masked(column)
else:
ar = self.evaluate(column, i1=0, i2=1, parallel=False)
if isinstance(ar, np.ndarray) and np.ma.isMaskedArray(ar):
return True
return False
def label(self, expression, unit=None, output_unit=None, format="latex_inline"):
label = expression
unit = unit or self.unit(expression)
try: # if we can convert the unit, use that for the labeling
if output_unit and unit:  # avoid unnecessary error messages
output_unit.to(unit)
unit = output_unit
except:
logger.exception("unit error")
if unit is not None:
label = "%s (%s)" % (label, unit.to_string('latex_inline'))
return label
def unit(self, expression, default=None):
"""Returns the unit (an astropy.unit.Units object) for the expression.
Example
>>> import vaex
>>> df = vaex.example()
>>> df.unit("x")
Unit("kpc")
>>> df.unit("x*L")
Unit("km kpc2 / s")
:param expression: Expression, which can be a column name
:param default: if no unit is known, it will return this
:return: The resulting unit of the expression
:rtype: astropy.units.Unit
"""
expression = _ensure_string_from_expression(expression)
try:
# if an expression like pi * <some_expr> it will evaluate to a quantity instead of a unit
unit_or_quantity = eval(expression, expression_namespace, scopes.UnitScope(self))
unit = unit_or_quantity.unit if hasattr(unit_or_quantity, "unit") else unit_or_quantity
unit_types = (astropy.units.core.UnitBase, )
return unit if isinstance(unit, unit_types) else None
except:
# logger.exception("error evaluating unit expression: %s", expression)
# astropy doesn't add units, so we try again with a quantity
try:
return eval(expression, expression_namespace, scopes.UnitScope(self, 1.)).unit
except:
# logger.exception("error evaluating unit expression: %s", expression)
return default
def ucd_find(self, ucds, exclude=[]):
"""Find a set of columns (names) which have the ucd, or part of the ucd.
Prefixed with a ^, it will only match the first part of the ucd.
Example
>>> df.ucd_find(['pos.eq.ra', 'pos.eq.dec'])
['RA', 'DEC']
>>> df.ucd_find(['pos.eq.ra', 'doesnotexist'])
>>> df.ucds[df.ucd_find('pos.eq.ra')]
'pos.eq.ra;meta.main'
>>> df.ucd_find('meta.main')
'dec'
>>> df.ucd_find('^meta.main')
"""
if isinstance(ucds, six.string_types):
ucds = [ucds]
if len(ucds) == 1:
ucd = ucds[0]
if ucd[0] == "^": # we want it to start with
ucd = ucd[1:]
columns = [name for name in self.get_column_names() if self.ucds.get(name, "").startswith(ucd) and name not in exclude]
else:
columns = [name for name in self.get_column_names() if ucd in self.ucds.get(name, "") and name not in exclude]
return None if len(columns) == 0 else columns[0]
else:
columns = [self.ucd_find([ucd], exclude=exclude) for ucd in ucds]
return None if None in columns else columns
@vaex.utils.deprecated('Will most likely disappear or move')
@_hidden
def selection_favorite_add(self, name, selection_name="default"):
selection = self.get_selection(name=selection_name)
if selection:
self.favorite_selections[name] = selection
self.selections_favorite_store()
else:
raise ValueError("no selection exists")
@vaex.utils.deprecated('Will most likely disappear or move')
@_hidden
def selection_favorite_remove(self, name):
del self.favorite_selections[name]
self.selections_favorite_store()
@vaex.utils.deprecated('Will most likely disappear or move')
@_hidden
def selection_favorite_apply(self, name, selection_name="default", executor=None):
self.set_selection(self.favorite_selections[name], name=selection_name, executor=executor)
@vaex.utils.deprecated('Will most likely disappear or move')
@_hidden
def selections_favorite_store(self):
path = os.path.join(self.get_private_dir(create=True), "favorite_selection.yaml")
selections = collections.OrderedDict([(key, value.to_dict()) for key, value in self.favorite_selections.items()])
vaex.utils.write_json_or_yaml(path, selections)
@vaex.utils.deprecated('Will most likely disappear or move')
@_hidden
def selections_favorite_load(self):
try:
path = os.path.join(self.get_private_dir(create=True), "favorite_selection.yaml")
if os.path.exists(path):
selections_dict = vaex.utils.read_json_or_yaml(path)
for key, value in selections_dict.items():
self.favorite_selections[key] = selections.selection_from_dict(self, value)
except:
logger.exception("non fatal error")
def get_private_dir(self, create=False):
"""Each DataFrame has a directory where files are stored for metadata etc.
Example
>>> import vaex
>>> df = vaex.example()
>>> df.get_private_dir()
'/Users/users/breddels/.vaex/dfs/_Users_users_breddels_vaex-testing_data_helmi-dezeeuw-2000-10p.hdf5'
:param bool create: if True, it will create the directory if it does not exist
"""
if self.is_local():
name = os.path.abspath(self.path).replace(os.path.sep, "_")[:250] # should not be too long for most os'es
name = name.replace(":", "_") # for windows drive names
else:
server = self.server
name = "%s_%s_%s_%s" % (server.hostname, server.port, server.base_path.replace("/", "_"), self.name)
dir = os.path.join(vaex.utils.get_private_dir(), "dfs", name)
if create and not os.path.exists(dir):
os.makedirs(dir)
return dir
def state_get(self, skip=None):
if self._future_behaviour == 5:
return self._state_get_vaex_5(skip=skip)
else:
if not ((skip is None) or (len(skip) == 1 and skip[0] is self.dataset)):
raise ValueError("skip should be None or contain only the DataFrame's own dataset")
return self._state_get_pre_vaex_5()
def state_set(self, state, use_active_range=False, keep_columns=None, set_filter=True, trusted=True, warn=True, delete_unused_columns=True):
if self._future_behaviour == 5:
return self._state_set_vaex_5(state, use_active_range=use_active_range, keep_columns=keep_columns, set_filter=set_filter, trusted=trusted, warn=warn)
else:
return self._state_set_pre_vaex_5(state, use_active_range=use_active_range, keep_columns=keep_columns, set_filter=set_filter, trusted=trusted, warn=warn, delete_unused_columns=delete_unused_columns)
def _state_get_vaex_5(self, skip=None):
"""Return the internal state of the DataFrame in a dictionary
Example:
>>> import vaex
>>> df = vaex.from_scalars(x=1, y=2)
>>> df['r'] = (df.x**2 + df.y**2)**0.5
>>> df.state_get()
{'active_range': [0, 1],
'column_names': ['x', 'y', 'r'],
'description': None,
'descriptions': {},
'functions': {},
'renamed_columns': [],
'selections': {'__filter__': None},
'ucds': {},
'units': {},
'variables': {},
'virtual_columns': {'r': '(((x ** 2) + (y ** 2)) ** 0.5)'}}
"""
virtual_names = list(self.virtual_columns.keys()) + list(self.variables.keys())
units = {key: str(value) for key, value in self.units.items()}
ucds = {key: value for key, value in self.ucds.items() if key in virtual_names}
descriptions = {key: value for key, value in self.descriptions.items()}
selections = {name: self.get_selection(name) for name, history in self.selection_histories.items() if self.has_selection(name)}
encoding = vaex.encoding.Encoding()
state = dict(virtual_columns=dict(self.virtual_columns),
column_names=list(self.column_names),
variables={name: encoding.encode("variable", value) for name, value in self.variables.items()},
functions={name: encoding.encode("function", value) for name, value in self.functions.items()},
selections={name: encoding.encode("selection", value) for name, value in selections.items()},
description=self.description,
ucds=ucds,
units=units,
descriptions=descriptions,
active_range=[self._index_start, self._index_end]
)
datasets = self.dataset.leafs() if skip is None else skip
for dataset in datasets:
# mark leafs to not encode
encoding._object_specs[dataset.id] = None
assert encoding.has_object_spec(dataset.id)
if len(datasets) != 1:
raise ValueError('Multiple datasets present, please pass skip= argument so we know which dataset not to include in the state.')
dataset_main = datasets[0]
if dataset_main is not self.dataset:
# encode without the leafs
data = encoding.encode('dataset', self.dataset)
# remove the dummy leaf data
for dataset in datasets:
assert encoding._object_specs[dataset.id] is None
del encoding._object_specs[dataset.id]
if data is not None:
state['dataset'] = data
state['dataset_missing'] = {'main': dataset_main.id}
state['blobs'] = {key: base64.b64encode(value).decode('ascii') for key, value in encoding.blobs.items()}
if encoding._object_specs:
state['objects'] = encoding._object_specs
return state
def _state_set_vaex_5(self, state, use_active_range=False, keep_columns=None, set_filter=True, trusted=True, warn=True):
"""Sets the internal state of the df
Example:
>>> import vaex
>>> df = vaex.from_scalars(x=1, y=2)
>>> df
# x y r
0 1 2 2.23607
>>> df['r'] = (df.x**2 + df.y**2)**0.5
>>> state = df.state_get()
>>> state
{'active_range': [0, 1],
'column_names': ['x', 'y', 'r'],
'description': None,
'descriptions': {},
'functions': {},
'renamed_columns': [],
'selections': {'__filter__': None},
'ucds': {},
'units': {},
'variables': {},
'virtual_columns': {'r': '(((x ** 2) + (y ** 2)) ** 0.5)'}}
>>> df2 = vaex.from_scalars(x=3, y=4)
>>> df2.state_set(state) # now the virtual functions are 'copied'
>>> df2
# x y r
0 3 4 5
:param state: dict as returned by :meth:`DataFrame.state_get`.
:param bool use_active_range: Whether to use the active range or not.
:param list keep_columns: List of columns that should be kept if the state to be set contains less columns.
:param bool set_filter: Set the filter from the state (default), or leave the filter as it is.
:param bool warn: Give warning when issues are found in the state transfer that are recoverable.
"""
self.description = state['description']
if use_active_range:
self._index_start, self._index_end = state['active_range']
self._length_unfiltered = self._index_end - self._index_start
if keep_columns:
all_columns = self.get_column_names()
for column_name in keep_columns:
if column_name not in all_columns:
raise KeyError(f'Column name {column_name} does not exist')
encoding = vaex.encoding.Encoding()
if 'blobs' in state:
encoding.blobs = {key: base64.b64decode(value.encode('ascii')) for key, value in state['blobs'].items()}
if 'objects' in state:
encoding._object_specs = state['objects']
if 'dataset' in state:
encoding.set_object(state['dataset_missing']['main'], self.dataset)
self.dataset = encoding.decode('dataset', state['dataset'])
for name, value in state['functions'].items():
self.add_function(name, encoding.decode("function", value, trusted=trusted))
# we clear all columns, and add them later on, since otherwise self[name] = ... will try
# to rename the columns (which is unsupported for remote dfs)
self.column_names = []
self.virtual_columns = {}
self.column_names = list(set(self.dataset) & set(state['column_names'])) # initial values not to have virtual column trigger missing column values
if 'variables' in state:
self.variables = {name: encoding.decode("variable", value) for name, value in state['variables'].items()}
for name, value in state['virtual_columns'].items():
self[name] = self._expr(value)
# self._save_assign_expression(name)
self.column_names = list(state['column_names'])
if keep_columns:
self.column_names += list(keep_columns)
for name in self.column_names:
self._save_assign_expression(name)
if "units" in state:
units = {key: astropy.units.Unit(value) for key, value in state["units"].items()}
self.units.update(units)
if 'selections' in state:
for name, selection_dict in state['selections'].items():
selection = encoding.decode('selection', selection_dict)
if name == FILTER_SELECTION_NAME and not set_filter:
continue
self.set_selection(selection, name=name)
if self.is_local():
for name in self.dataset:
if name not in self.column_names:
del self.columns[name]
def _state_get_pre_vaex_5(self):
"""Return the internal state of the DataFrame in a dictionary
Example:
>>> import vaex
>>> df = vaex.from_scalars(x=1, y=2)
>>> df['r'] = (df.x**2 + df.y**2)**0.5
>>> df.state_get()
{'active_range': [0, 1],
'column_names': ['x', 'y', 'r'],
'description': None,
'descriptions': {},
'functions': {},
'renamed_columns': [],
'selections': {'__filter__': None},
'ucds': {},
'units': {},
'variables': {},
'virtual_columns': {'r': '(((x ** 2) + (y ** 2)) ** 0.5)'}}
"""
virtual_names = list(self.virtual_columns.keys()) + list(self.variables.keys())
units = {key: str(value) for key, value in self.units.items()}
ucds = {key: value for key, value in self.ucds.items() if key in virtual_names}
descriptions = {key: value for key, value in self.descriptions.items()}
import vaex.serialize
def check(key, value):
if not vaex.serialize.can_serialize(value.f):
warnings.warn('Cannot serialize function for virtual column {} (use vaex.serialize.register)'.format(key))
return False
return True
def clean(value):
return vaex.serialize.to_dict(value.f)
functions = {key: clean(value) for key, value in self.functions.items() if check(key, value)}
virtual_columns = {key: value for key, value in self.virtual_columns.items()}
selections = {name: self.get_selection(name) for name, history in self.selection_histories.items()}
selections = {name: selection.to_dict() if selection is not None else None for name, selection in selections.items()}
state = dict(virtual_columns=virtual_columns,
column_names=self.column_names,
renamed_columns=self._renamed_columns,
variables=self.variables,
functions=functions,
selections=selections,
ucds=ucds,
units=units,
descriptions=descriptions,
description=self.description,
active_range=[self._index_start, self._index_end])
return state
def _state_set_pre_vaex_5(self, state, use_active_range=False, keep_columns=None, set_filter=True, trusted=True, warn=True, delete_unused_columns=True):
"""Sets the internal state of the df
Example:
>>> import vaex
>>> df = vaex.from_scalars(x=1, y=2)
>>> df
# x y r
0 1 2 2.23607
>>> df['r'] = (df.x**2 + df.y**2)**0.5
>>> state = df.state_get()
>>> state
{'active_range': [0, 1],
'column_names': ['x', 'y', 'r'],
'description': None,
'descriptions': {},
'functions': {},
'renamed_columns': [],
'selections': {'__filter__': None},
'ucds': {},
'units': {},
'variables': {},
'virtual_columns': {'r': '(((x ** 2) + (y ** 2)) ** 0.5)'}}
>>> df2 = vaex.from_scalars(x=3, y=4)
>>> df2.state_set(state) # now the virtual functions are 'copied'
>>> df2
# x y r
0 3 4 5
:param state: dict as returned by :meth:`DataFrame.state_get`.
:param bool use_active_range: Whether to use the active range or not.
:param list keep_columns: List of columns that should be kept if the state to be set contains less columns.
:param bool set_filter: Set the filter from the state (default), or leave the filter as it is.
:param bool warn: Give warning when issues are found in the state transfer that are recoverable.
:param bool delete_unused_columns: Whether to delete columns from the DataFrame that are not in the column_names. Useful to set to False during prediction time.
"""
if 'description' in state:
self.description = state['description']
if use_active_range:
if 'active_range' in state:
self._index_start, self._index_end = state['active_range']
self._length_unfiltered = self._index_end - self._index_start
if keep_columns:
all_columns = self.get_column_names()
for column_name in keep_columns:
if column_name not in all_columns:
raise KeyError(f'Column name {column_name} does not exist')
if 'renamed_columns' in state:
for old, new in state['renamed_columns']:
if old in self:
self._rename(old, new)
elif warn:
warnings.warn(f'The state wants to rename {old} to {new}, but {new} was not found, ignoring the rename')
if 'functions' in state:
for name, value in state['functions'].items():
self.add_function(name, vaex.serialize.from_dict(value, trusted=trusted))
if 'variables' in state:
self.variables = state['variables']
if 'column_names' in state:
# we clear all columns, and add them later on, since otherwise self[name] = ... will try
# to rename the columns (which is unsupported for remote dfs)
self.column_names = []
self.virtual_columns = {}
self.column_names = list(set(self.dataset) & set(state['column_names'])) # initial values not to have virtual column trigger missing column values
if 'virtual_columns' in state:
for name, value in state['virtual_columns'].items():
self[name] = self._expr(value)
self.column_names = list(state['column_names'])
if keep_columns:
self.column_names += list(keep_columns)
for name in self.column_names:
self._save_assign_expression(name)
else:
# old behaviour
self.virtual_columns = {}
for name, value in state['virtual_columns'].items():
self[name] = self._expr(value)
if 'units' in state:
units = {key: astropy.units.Unit(value) for key, value in state["units"].items()}
self.units.update(units)
if 'selections' in state:
for name, selection_dict in state['selections'].items():
if name == FILTER_SELECTION_NAME and not set_filter:
continue
# TODO: make selection use the vaex.serialize framework
if selection_dict is None:
selection = None
else:
selection = selections.selection_from_dict(selection_dict)
self.set_selection(selection, name=name)
if self.is_local() and delete_unused_columns:
for name in self.dataset:
if name not in self.column_names:
del self.columns[name]
def state_write(self, file, fs_options=None, fs=None):
"""Write the internal state to a json or yaml file (see :meth:`DataFrame.state_get`)
Example
>>> import vaex
>>> df = vaex.from_scalars(x=1, y=2)
>>> df['r'] = (df.x**2 + df.y**2)**0.5
>>> df.state_write('state.json')
>>> print(open('state.json').read())
{
"virtual_columns": {
"r": "(((x ** 2) + (y ** 2)) ** 0.5)"
},
"column_names": [
"x",
"y",
"r"
],
"renamed_columns": [],
"variables": {
"pi": 3.141592653589793,
"e": 2.718281828459045,
"km_in_au": 149597870.7,
"seconds_per_year": 31557600
},
"functions": {},
"selections": {
"__filter__": null
},
"ucds": {},
"units": {},
"descriptions": {},
"description": null,
"active_range": [
0,
1
]
}
>>> df.state_write('state.yaml')
>>> print(open('state.yaml').read())
active_range:
- 0
- 1
column_names:
- x
- y
- r
description: null
descriptions: {}
functions: {}
renamed_columns: []
selections:
__filter__: null
ucds: {}
units: {}
variables:
pi: 3.141592653589793
e: 2.718281828459045
km_in_au: 149597870.7
seconds_per_year: 31557600
virtual_columns:
r: (((x ** 2) + (y ** 2)) ** 0.5)
:param str file: filename (ending in .json or .yaml)
:param dict fs_options: arguments to pass to the file system handler (s3fs or gcsfs)
:param fs: Pass a file system object directly, see :func:`vaex.open`
"""
fs_options = fs_options or {}
vaex.utils.write_json_or_yaml(file, self.state_get(), fs_options=fs_options, fs=fs, old_style=not self._future_behaviour)
def state_load(self, file, use_active_range=False, keep_columns=None, set_filter=True, trusted=True, fs_options=None, fs=None):
"""Load a state previously stored by :meth:`DataFrame.state_write`, see also :meth:`DataFrame.state_set`.
:param str file: filename (ending in .json or .yaml)
:param bool use_active_range: Whether to use the active range or not.
:param list keep_columns: List of columns that should be kept if the state to be set contains less columns.
:param bool set_filter: Set the filter from the state (default), or leave the filter as it is.
:param dict fs_options: arguments to pass to the file system handler (s3fs or gcsfs)
:param fs: Pass a file system object directly, see :func:`vaex.open`
"""
state = vaex.utils.read_json_or_yaml(file, fs_options=fs_options, fs=fs, old_style=not self._future_behaviour)
self.state_set(state, use_active_range=use_active_range, keep_columns=keep_columns, set_filter=set_filter, trusted=trusted)
# def remove_meta(self):
# path = os.path.join(self.get_private_dir(create=True), "meta.yaml")
# os.remove(path)
@_hidden
def write_virtual_meta(self):
"""Writes virtual columns, variables and their ucd,description and units.
The default implementation is to write this to a file called virtual_meta.yaml in the directory defined by
:func:`DataFrame.get_private_dir`. Other implementation may store this in the DataFrame file itself.
This method is called after virtual columns or variables are added. Upon opening a file, :func:`DataFrame.update_virtual_meta`
is called, so that the information is not lost between sessions.
Note: opening a DataFrame twice may result in corruption of this file.
"""
path = os.path.join(self.get_private_dir(create=True), "virtual_meta.yaml")
virtual_names = list(self.virtual_columns.keys()) + list(self.variables.keys())
units = {key: str(value) for key, value in self.units.items() if key in virtual_names}
ucds = {key: value for key, value in self.ucds.items() if key in virtual_names}
descriptions = {key: value for key, value in self.descriptions.items() if key in virtual_names}
meta_info = dict(virtual_columns=self.virtual_columns,
variables=self.variables,
ucds=ucds, units=units, descriptions=descriptions)
vaex.utils.write_json_or_yaml(path, meta_info)
@_hidden
def update_virtual_meta(self):
"""Will read back the virtual column etc, written by :func:`DataFrame.write_virtual_meta`. This will be done when opening a DataFrame."""
try:
path = os.path.join(self.get_private_dir(create=False), "virtual_meta.yaml")
if os.path.exists(path):
meta_info = vaex.utils.read_json_or_yaml(path)
if 'virtual_columns' not in meta_info:
return
self.virtual_columns.update(meta_info["virtual_columns"])
self.variables.update(meta_info["variables"])
self.ucds.update(meta_info["ucds"])
self.descriptions.update(meta_info["descriptions"])
units = {key: astropy.units.Unit(value) for key, value in meta_info["units"].items()}
self.units.update(units)
except:
logger.exception("non fatal error")
@_hidden
def write_meta(self):
"""Writes all meta data, ucd,description and units
The default implementation is to write this to a file called meta.yaml in the directory defined by
:func:`DataFrame.get_private_dir`. Other implementation may store this in the DataFrame file itself.
(For instance the vaex hdf5 implementation does this)
This method is called after virtual columns or variables are added. Upon opening a file, :func:`DataFrame.update_meta`
is called, so that the information is not lost between sessions.
Note: opening a DataFrame twice may result in corruption of this file.
"""
# raise NotImplementedError
path = os.path.join(self.get_private_dir(create=True), "meta.yaml")
units = {key: str(value) for key, value in self.units.items()}
meta_info = dict(description=self.description,
ucds=self.ucds, units=units, descriptions=self.descriptions,
)
vaex.utils.write_json_or_yaml(path, meta_info)
@_hidden
def update_meta(self):
"""Will read back the ucd, descriptions, units etc, written by :func:`DataFrame.write_meta`. This will be done when opening a DataFrame."""
try:
path = os.path.join(self.get_private_dir(create=False), "meta.yaml")
if os.path.exists(path):
meta_info = vaex.utils.read_json_or_yaml(path)
self.description = meta_info["description"]
self.ucds.update(meta_info["ucds"])
self.descriptions.update(meta_info["descriptions"])
# self.virtual_columns.update(meta_info["virtual_columns"])
# self.variables.update(meta_info["variables"])
units = {key: astropy.units.Unit(value) for key, value in meta_info["units"].items()}
self.units.update(units)
except:
logger.exception("non fatal error, but could read/understand %s", path)
def is_local(self):
"""Returns True if the DataFrame is local, False when a DataFrame is remote."""
raise NotImplementedError
def get_auto_fraction(self):
return self._auto_fraction
def set_auto_fraction(self, enabled):
self._auto_fraction = enabled
@classmethod
def can_open(cls, path, *args, **kwargs):
# """Tests if this class can open the file given by path"""
return False
@classmethod
def get_options(cls, path):
return []
@classmethod
def option_to_args(cls, option):
return []
def combinations(self, expressions_list=None, dimension=2, exclude=None, **kwargs):
"""Generate a list of combinations for the possible expressions for the given dimension.
:param expressions_list: list of list of expressions, where the inner list defines the subspace
:param dimension: if given, generates a subspace with all possible combinations for that dimension
:param exclude: a string, a sequence of strings (or of sequences of strings), or a callable; any combination matching it is excluded
"""
if dimension is not None:
expressions_list = list(itertools.combinations(self.get_column_names(), dimension))
if exclude is not None:
def excluded(expressions):
if callable(exclude):
return exclude(expressions)
elif isinstance(exclude, six.string_types):
return exclude in expressions
elif isinstance(exclude, (list, tuple)):
for e in exclude:
if isinstance(e, six.string_types):
if e in expressions:
return True
elif isinstance(e, (list, tuple)):
if set(e).issubset(expressions):
return True
else:
raise ValueError("elements of exclude should contain a string or a sequence of strings")
else:
raise ValueError("exclude should contain a string, a sequence of strings, or should be a callable")
return False
# test if any of the elements of exclude are a subset of the expression
expressions_list = [expr for expr in expressions_list if not excluded(expr)]
logger.debug("expression list generated: %r", expressions_list)
return expressions_list
def set_variable(self, name, expression_or_value, write=True):
"""Set the variable to an expression or value defined by expression_or_value.
Example
>>> df.set_variable("a", 2.)
>>> df.set_variable("b", "a**2")
>>> df.get_variable("b")
'a**2'
>>> df.evaluate_variable("b")
4.0
:param name: Name of the variable
:param expression_or_value: value or expression
:param write: write variable to meta file
"""
self.variables[name] = expression_or_value
# if write:
# self.write_virtual_meta()
def get_variable(self, name):
"""Returns the variable given by name, it will not evaluate it.
For evaluation, see :func:`DataFrame.evaluate_variable`, see also :func:`DataFrame.set_variable`
"""
return self.variables[name]
def evaluate_variable(self, name):
"""Evaluates the variable given by name."""
if isinstance(self.variables[name], six.string_types):
# TODO: this does not allow more than one level deep variable, like a depends on b, b on c, c is a const
value = eval(self.variables[name], expression_namespace, self.variables)
return value
else:
return self.variables[name]
@docsubst
def evaluate(self, expression, i1=None, i2=None, out=None, selection=None, filtered=True, array_type=None, parallel=True, chunk_size=None, progress=None):
"""Evaluate an expression, and return a numpy array with the results for the full column or a part of it.
Note that this is not how vaex should be used, since it means a copy of the data needs to fit in memory.
To get partial results, use i1 and i2
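Example (an illustrative sketch; assumes the vaex example dataset):

>>> import vaex
>>> df = vaex.example()
>>> df.evaluate(df.x, 0, 5)  # the first 5 values of column x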
:param str expression: Name/expression to evaluate
:param int i1: Start row index, default is the start (0)
:param int i2: End row index, default is the length of the DataFrame
:param ndarray out: Output array, to which the result may be written (may be used to reuse an array, or write to
a memory mapped array)
:param progress: {{progress}}
:param selection: selection to apply
:return:
"""
if chunk_size is not None:
return self.evaluate_iterator(expression, s1=i1, s2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size, progress=progress)
else:
return self._evaluate_implementation(expression, i1=i1, i2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size, progress=progress)
@docsubst
def evaluate_iterator(self, expression, s1=None, s2=None, out=None, selection=None, filtered=True, array_type=None, parallel=True, chunk_size=None, prefetch=True, progress=None):
"""Generator to efficiently evaluate expressions in chunks (number of rows).
See :func:`DataFrame.evaluate` for other arguments.
Example:
>>> import vaex
>>> df = vaex.example()
>>> for i1, i2, chunk in df.evaluate_iterator(df.x, chunk_size=100_000):
... print(f"Total of {{i1}} to {{i2}} = {{chunk.sum()}}")
...
Total of 0 to 100000 = -7460.610158279056
Total of 100000 to 200000 = -4964.85827154921
Total of 200000 to 300000 = -7303.271340043915
Total of 300000 to 330000 = -2424.65234724951
:param progress: {{progress}}
:param prefetch: Prefetch/compute the next chunk in parallel while the current value is yielded/returned.
"""
progressbar = vaex.utils.progressbars(progress, title="evaluate iterator")
self._fill_filter_mask()
progressbar(0)
if not prefetch:
# this is the simple implementation
for l1, l2, i1, i2 in self._unfiltered_chunk_slices(chunk_size):
yield l1, l2, self._evaluate_implementation(expression, i1=i1, i2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, raw=True)
progressbar(l2/len(self))
# The implementation below is faster when the consumer is single threaded: the next chunk is computed in a worker thread while the current chunk is processed
else:
with concurrent.futures.ThreadPoolExecutor(1) as executor:
chunk_slices = self._unfiltered_chunk_slices(chunk_size)
def f(i1, i2):
return self._evaluate_implementation(expression, i1=i1, i2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, raw=True)
try:
previous_l1, previous_l2, previous_i1, previous_i2 = next(chunk_slices)
except StopIteration:
# empty dataframe/filter
return
# we submit the 1st job
previous = executor.submit(f, previous_i1, previous_i2)
for l1, l2, i1, i2 in chunk_slices:
# and we submit the next job before returning the previous, so they run in parallel
# but make sure the previous is done
previous_chunk = previous.result()
current = executor.submit(f, i1, i2)
yield previous_l1, previous_l2, previous_chunk
progressbar(previous_l2/len(self))
previous = current
previous_l1, previous_l2 = l1, l2
previous_chunk = previous.result()
yield previous_l1, previous_l2, previous_chunk
progressbar(previous_l2/len(self))
@docsubst
def to_records(self, index=None, selection=None, column_names=None, strings=True, virtual=True, parallel=True,
chunk_size=None, array_type='python'):
"""Return a list of [{{column_name: value}}, ...)] "records" where each dict is an evaluated row.
:param index: an index to use to get the record of a specific row when provided
:param column_names: list of column names, to export, when None DataFrame.get_column_names(strings=strings, virtual=virtual) is used
:param selection: {selection}
:param strings: argument passed to DataFrame.get_column_names when column_names is None
:param virtual: argument passed to DataFrame.get_column_names when column_names is None
:param parallel: {evaluate_parallel}
:param chunk_size: {chunk_size}
:param array_type: {array_type}
:return: list of [{{column_name:value}}, ...] records
"""
if isinstance(index, int):
return {key: value[0] for key, value in
self[index:index + 1].to_dict(selection=selection, column_names=column_names, strings=strings,
virtual=virtual, parallel=parallel, array_type=array_type).items()}
if index is not None:
raise RuntimeError(f"index can be None or an int - {type(index)} provided")
if chunk_size is not None:
def iterator():
for i1, i2, chunk in self.to_dict(selection=selection, column_names=column_names, strings=strings,
virtual=virtual, parallel=parallel, chunk_size=chunk_size,
array_type=array_type):
keys = list(chunk.keys())
yield i1, i2, [{key: value for key, value in zip(keys, values)} for values in zip(*chunk.values())]
return iterator()
chunk = self.to_dict(selection=selection, column_names=column_names, strings=strings,
virtual=virtual, parallel=parallel, chunk_size=chunk_size,
array_type=array_type)
keys = list(chunk.keys())
return [{key: value for key, value in zip(keys, values)} for values in zip(*chunk.values())]
@docsubst
def to_items(self, column_names=None, selection=None, strings=True, virtual=True, parallel=True, chunk_size=None, array_type=None):
"""Return a list of [(column_name, ndarray), ...)] pairs where the ndarray corresponds to the evaluated data
:param column_names: list of column names, to export, when None DataFrame.get_column_names(strings=strings, virtual=virtual) is used
:param selection: {selection}
:param strings: argument passed to DataFrame.get_column_names when column_names is None
:param virtual: argument passed to DataFrame.get_column_names when column_names is None
:param parallel: {evaluate_parallel}
:param chunk_size: {chunk_size}
:param array_type: {array_type}
:return: list of (name, ndarray) pairs, or an iterator of (i1, i2, pairs) chunks when chunk_size is given
"""
column_names = column_names or self.get_column_names(strings=strings, virtual=virtual)
column_names = _ensure_strings_from_expressions(column_names)
if chunk_size is not None:
def iterator():
for i1, i2, chunks in self.evaluate_iterator(column_names, selection=selection, parallel=parallel, chunk_size=chunk_size):
yield i1, i2, list(zip(column_names, [array_types.convert(chunk, array_type) for chunk in chunks]))
return iterator()
else:
return list(zip(column_names, [array_types.convert(chunk, array_type) for chunk in self.evaluate(column_names, selection=selection, parallel=parallel)]))
@docsubst
def to_arrays(self, column_names=None, selection=None, strings=True, virtual=True, parallel=True, chunk_size=None, array_type=None):
"""Return a list of ndarrays
:param column_names: list of column names, to export, when None DataFrame.get_column_names(strings=strings, virtual=virtual) is used
:param selection: {selection}
:param strings: argument passed to DataFrame.get_column_names when column_names is None
:param virtual: argument passed to DataFrame.get_column_names when column_names is None
:param parallel: {evaluate_parallel}
:param chunk_size: {chunk_size}
:param array_type: {array_type}
:return: list of arrays
"""
column_names = column_names or self.get_column_names(strings=strings, virtual=virtual)
column_names = _ensure_strings_from_expressions(column_names)
if chunk_size is not None:
def iterator():
for i1, i2, chunks in self.evaluate_iterator(column_names, selection=selection, parallel=parallel, chunk_size=chunk_size):
yield i1, i2, [array_types.convert(chunk, array_type) for chunk in chunks]
return iterator()
return [array_types.convert(chunk, array_type) for chunk in self.evaluate(column_names, selection=selection, parallel=parallel)]
@docsubst
def to_dict(self, column_names=None, selection=None, strings=True, virtual=True, parallel=True, chunk_size=None, array_type=None):
"""Return a dict containing the ndarray corresponding to the evaluated data
:param column_names: list of column names, to export, when None DataFrame.get_column_names(strings=strings, virtual=virtual) is used
:param selection: {selection}
:param strings: argument passed to DataFrame.get_column_names when column_names is None
:param virtual: argument passed to DataFrame.get_column_names when column_names is None
:param parallel: {evaluate_parallel}
:param chunk_size: {chunk_size}
:param array_type: {array_type}
:return: dict
"""
column_names = column_names or self.get_column_names(strings=strings, virtual=virtual)
column_names = _ensure_strings_from_expressions(column_names)
if chunk_size is not None:
def iterator():
for i1, i2, chunks in self.evaluate_iterator(column_names, selection=selection, parallel=parallel, chunk_size=chunk_size):
yield i1, i2, dict(list(zip(column_names, [array_types.convert(chunk, array_type) for chunk in chunks])))
return iterator()
return dict(list(zip(column_names, [array_types.convert(chunk, array_type) for chunk in self.evaluate(column_names, selection=selection, parallel=parallel)])))
@_hidden
@docsubst
@vaex.utils.deprecated('`.to_copy()` is deprecated and it will be removed in version 5.x. Please use `.copy()` instead.')
def to_copy(self, column_names=None, selection=None, strings=True, virtual=True, selections=True):
"""Return a copy of the DataFrame, if selection is None, it does not copy the data, it just has a reference
:param column_names: list of column names, to copy, when None DataFrame.get_column_names(strings=strings, virtual=virtual) is used
:param selection: {selection}
:param strings: argument passed to DataFrame.get_column_names when column_names is None
:param virtual: argument passed to DataFrame.get_column_names when column_names is None
:param selections: copy selections to a new DataFrame
:return: DataFrame
"""
if column_names:
column_names = _ensure_strings_from_expressions(column_names)
df = vaex.from_items(*self.to_items(column_names=column_names, selection=selection, strings=strings, virtual=False))
if virtual:
for name, value in self.virtual_columns.items():
df.add_virtual_column(name, value)
if selections:
# the filter selection does not need copying
for key, value in self.selection_histories.items():
if key != FILTER_SELECTION_NAME:
df.selection_histories[key] = list(value)
for key, value in self.selection_history_indices.items():
if key != FILTER_SELECTION_NAME:
df.selection_history_indices[key] = value
df.functions.update(self.functions)
df.copy_metadata(self)
return df
def copy_metadata(self, other):
for name in self.get_column_names(strings=True):
if name in other.units:
self.units[name] = other.units[name]
if name in other.descriptions:
self.descriptions[name] = other.descriptions[name]
if name in other.ucds:
self.ucds[name] = other.ucds[name]
self.description = other.description
@docsubst
def to_pandas_df(self, column_names=None, selection=None, strings=True, virtual=True, index_name=None, parallel=True, chunk_size=None, array_type=None):
"""Return a pandas DataFrame containing the ndarray corresponding to the evaluated data
If index_name is given, that column is used for the index of the dataframe.
Example
>>> df_pandas = df.to_pandas_df(["x", "y", "z"])
>>> df_copy = vaex.from_pandas(df_pandas)
:param column_names: list of column names, to export, when None DataFrame.get_column_names(strings=strings, virtual=virtual) is used
:param selection: {selection}
:param strings: argument passed to DataFrame.get_column_names when column_names is None
:param virtual: argument passed to DataFrame.get_column_names when column_names is None
:param index_name: if this column is given it is used for the index of the DataFrame
:param parallel: {evaluate_parallel}
:param chunk_size: {chunk_size}
:param array_type: {array_type}
:return: pandas.DataFrame object, or an iterator of them when chunk_size is given
"""
import pandas as pd
column_names = column_names or self.get_column_names(strings=strings, virtual=virtual)
column_names = _ensure_strings_from_expressions(column_names)
if index_name not in column_names and index_name is not None:
column_names = column_names + [index_name]
def create_pdf(data):
if index_name is not None:
index = data.pop(index_name)
else:
index = None
df = pd.DataFrame(data=data, index=index)
if index is not None:
df.index.name = index_name
return df
if chunk_size is not None:
def iterator():
for i1, i2, chunks in self.evaluate_iterator(column_names, selection=selection, parallel=parallel, chunk_size=chunk_size, array_type=array_type):
yield i1, i2, create_pdf(dict(zip(column_names, chunks)))
return iterator()
else:
return create_pdf(self.to_dict(column_names=column_names, selection=selection, parallel=parallel, array_type=array_type))
@docsubst
def to_arrow_table(self, column_names=None, selection=None, strings=True, virtual=True, parallel=True, chunk_size=None, reduce_large=False):
"""Returns an arrow Table object containing the arrays corresponding to the evaluated data
:param column_names: list of column names, to export, when None DataFrame.get_column_names(strings=strings, virtual=virtual) is used
:param selection: {selection}
:param strings: argument passed to DataFrame.get_column_names when column_names is None
:param virtual: argument passed to DataFrame.get_column_names when column_names is None
:param parallel: {evaluate_parallel}
:param chunk_size: {chunk_size}
:param bool reduce_large: If possible, cast large_string to normal string
:return: pyarrow.Table object, or an iterator of them when chunk_size is given
"""
import pyarrow as pa
column_names = column_names or self.get_column_names(strings=strings, virtual=virtual)
column_names = _ensure_strings_from_expressions(column_names)
if chunk_size is not None:
def iterator():
for i1, i2, chunks in self.evaluate_iterator(column_names, selection=selection, parallel=parallel, chunk_size=chunk_size):
chunks = list(map(vaex.array_types.to_arrow, chunks))
if reduce_large:
chunks = list(map(vaex.array_types.arrow_reduce_large, chunks))
yield i1, i2, pa.Table.from_arrays(chunks, column_names)
return iterator()
else:
chunks = self.evaluate(column_names, selection=selection, parallel=parallel)
chunks = list(map(vaex.array_types.to_arrow, chunks))
if reduce_large:
chunks = list(map(vaex.array_types.arrow_reduce_large, chunks))
return pa.Table.from_arrays(chunks, column_names)
@docsubst
def to_astropy_table(self, column_names=None, selection=None, strings=True, virtual=True, index=None, parallel=True):
"""Returns a astropy table object containing the ndarrays corresponding to the evaluated data
:param column_names: list of column names, to export, when None DataFrame.get_column_names(strings=strings, virtual=virtual) is used
:param selection: {selection}
:param strings: argument passed to DataFrame.get_column_names when column_names is None
:param virtual: argument passed to DataFrame.get_column_names when column_names is None
:param index: if this column is given it is used for the index of the DataFrame
:return: astropy.table.Table object
"""
from astropy.table import Table, Column, MaskedColumn
meta = dict()
meta["description"] = self.description
table = Table(meta=meta)
for name, data in self.to_items(column_names=column_names, selection=selection, strings=strings, virtual=virtual, parallel=parallel):
if self.is_string(name): # for astropy we convert it to unicode, it seems to ignore object type
data = np.array(data).astype('U')
meta = dict()
if name in self.ucds:
meta["ucd"] = self.ucds[name]
if np.ma.isMaskedArray(data):
cls = MaskedColumn
else:
cls = Column
table[name] = cls(data, unit=self.unit(name), description=self.descriptions.get(name), meta=meta)
return table
def to_dask_array(self, chunks="auto"):
"""Lazily expose the DataFrame as a dask.array
Example
>>> df = vaex.example()
>>> A = df[['x', 'y', 'z']].to_dask_array()
>>> A
dask.array<vaex-df-1f048b40-10ec-11ea-9553, shape=(330000, 3), dtype=float64, chunksize=(330000, 3), chunktype=numpy.ndarray>
>>> A+1
dask.array<add, shape=(330000, 3), dtype=float64, chunksize=(330000, 3), chunktype=numpy.ndarray>
:param chunks: How to chunk the array, similar to :func:`dask.array.from_array`.
:return: :class:`dask.array.Array` object.
"""
import dask.array as da
import uuid
dtype = self._dtype
chunks = da.core.normalize_chunks(chunks, shape=self.shape, dtype=dtype.numpy)
name = 'vaex-df-%s' % str(uuid.uuid1())
def getitem(df, item):
return np.array(df.__getitem__(item).to_arrays(parallel=False)).T
# broken since https://github.com/dask/dask/pull/7417
if hasattr(da.core, "getem"):
dsk = da.core.getem(name, chunks, getitem=getitem, shape=self.shape, dtype=dtype.numpy)
dsk[name] = self
return da.Array(dsk, name, chunks, dtype=dtype.numpy)
else:
dsk = da.core.graph_from_arraylike(self, name=name, chunks=chunks, getitem=getitem, shape=self.shape, dtype=dtype.numpy)
return da.Array(dsk, name, chunks, dtype=dtype.numpy)
def validate_expression(self, expression):
"""Validate an expression (may throw Exceptions)"""
# return self.evaluate(expression, 0, 2)
if str(expression) in self.virtual_columns:
return
if self.is_local() and str(expression) in self.columns:
return
vars = set(self.get_names(hidden=True)) | {'df'}
funcs = set(expression_namespace.keys()) | set(self.functions.keys())
try:
return vaex.expresso.validate_expression(expression, vars, funcs)
except NameError as e:
raise NameError(str(e)) from None
def _block_scope(self, i1, i2):
variables = {key: self.evaluate_variable(key) for key in self.variables.keys()}
return scopes._BlockScope(self, i1, i2, **variables)
def select(self, boolean_expression, mode="replace", name="default"):
"""Select rows based on the boolean_expression, if there was a previous selection, the mode is taken into account.
if boolean_expression is None, remove the selection, has_selection() will returns false
Note that per DataFrame, multiple selections are possible, and one filter (see :func:`DataFrame.select`).
:param str boolean_expression: boolean expression, such as 'x < 0', '(x < 0) || (y > -10)' or None to remove the selection
:param str mode: boolean operation to perform with the previous selection, "replace", "and", "or", "xor", "subtract"
:return: None
"""
raise NotImplementedError
def add_column(self, name, f_or_array, dtype=None):
"""Add an in memory array as a column."""
column_position = len(self.column_names)
if name in self.get_column_names():
column_position = self.column_names.index(name)
renamed = '__' + vaex.utils.find_valid_name(name, used=self.get_column_names())
self._rename(name, renamed)
if isinstance(f_or_array, supported_column_types):
data = ar = f_or_array
# it can be None when we have an 'empty' DataFrameArrays
if self._length_original is None:
self._length_unfiltered = _len(data)
self._length_original = _len(data)
self._index_end = self._length_unfiltered
if _len(ar) != self.length_original():
if self.filtered:
# give a better warning to avoid confusion
if len(self) == len(ar):
raise ValueError("Array is of length %s, while the length of the DataFrame is %s due to the filtering, the (unfiltered) length is %s." % (len(ar), len(self), self.length_unfiltered()))
raise ValueError("array is of length %s, while the length of the DataFrame is %s" % (len(ar), self.length_original()))
valid_name = vaex.utils.find_valid_name(name, used=self.get_column_names(hidden=True))
self.columns[valid_name] = ar
if valid_name not in self.column_names:
self.column_names.insert(column_position, valid_name)
else:
raise ValueError("functions not yet implemented")
# self._save_assign_expression(valid_name, Expression(self, valid_name))
self._initialize_column(valid_name)
def _initialize_column(self, name):
self._save_assign_expression(name)
def _sparse_matrix(self, column):
column = _ensure_string_from_expression(column)
return self._sparse_matrices.get(column)
def add_columns(self, names, columns):
from scipy.sparse import csc_matrix, csr_matrix
if isinstance(columns, csr_matrix):
if len(names) != columns.shape[1]:
raise ValueError('number of columns ({}) does not match number of column names ({})'.format(columns.shape[1], len(names)))
for i, name in enumerate(names):
valid_name = vaex.utils.find_valid_name(name, used=self.get_column_names(hidden=True))
self.columns[valid_name] = ColumnSparse(columns, i)
self.column_names.append(valid_name)
self._sparse_matrices[valid_name] = columns
self._save_assign_expression(valid_name)
else:
raise ValueError('only scipy.sparse.csr_matrix is supported')
def _save_assign_expression(self, name, expression=None):
# it's ok to set it if it does not exist, or we overwrite an older expression
if not hasattr(self, name) or isinstance(getattr(self, name), Expression):
if expression is None:
expression = name
if isinstance(expression, str):
expression = vaex.utils.valid_expression(self.get_column_names(hidden=True), expression)
expression = Expression(self, expression)
setattr(self, name, expression)
@_hidden
def add_column_healpix(self, name="healpix", longitude="ra", latitude="dec", degrees=True, healpix_order=12, nest=True):
"""Add a healpix (in memory) column based on a longitude and latitude
:param name: Name of column
:param longitude: longitude expression
:param latitude: latitude expression (astronomical convention, latitude=90 is the north pole)
:param degrees: If lon/lat are in degrees (default) or radians.
:param healpix_order: healpix order, >= 0
:param nest: Nested healpix (default) or ring.
"""
import healpy as hp
if degrees:
scale = "*pi/180"
else:
scale = ""
# TODO: multithread this
phi = self.evaluate("(%s)%s" % (longitude, scale))
theta = self.evaluate("pi/2-(%s)%s" % (latitude, scale))
hp_index = hp.ang2pix(hp.order2nside(healpix_order), theta, phi, nest=nest)
self.add_column("healpix", hp_index)
@_hidden
def add_virtual_columns_matrix3d(self, x, y, z, xnew, ynew, znew, matrix, matrix_name='deprecated', matrix_is_expression=False, translation=[0, 0, 0], propagate_uncertainties=False):
"""
:param str x: name of x column
:param str y:
:param str z:
:param str xnew: name of transformed x column
:param str ynew:
:param str znew:
:param list[list] matrix: 2d array or list, with [row,column] order
:param str matrix_name:
:return:
"""
m = matrix
x, y, z = self._expr(x, y, z)
self[xnew] = m[0][0] * x + m[0][1] * y + m[0][2] * z + translation[0]
self[ynew] = m[1][0] * x + m[1][1] * y + m[1][2] * z + translation[1]
self[znew] = m[2][0] * x + m[2][1] * y + m[2][2] * z + translation[2]
if propagate_uncertainties:
self.propagate_uncertainties([self[xnew], self[ynew], self[znew]], [x, y, z])
# wrap these with an informative msg
# add_virtual_columns_eq2ecl = _requires('astro')
# add_virtual_columns_eq2gal = _requires('astro')
# add_virtual_columns_distance_from_parallax = _requires('astro')
# add_virtual_columns_cartesian_velocities_to_pmvr = _requires('astro')
# add_virtual_columns_proper_motion_eq2gal = _requires('astro')
# add_virtual_columns_lbrvr_proper_motion2vcartesian = _requires('astro')
# add_virtual_columns_equatorial_to_galactic_cartesian = _requires('astro')
# add_virtual_columns_celestial = _requires('astro')
# add_virtual_columns_proper_motion2vperpendicular = _requires('astro')
def _covariance_matrix_guess(self, columns, full=False, as_expression=False):
all_column_names = self.get_column_names()
columns = _ensure_strings_from_expressions(columns)
def _guess(x, y):
if x == y:
postfixes = ["_error", "_uncertainty", "e", "_e"]
prefixes = ["e", "e_"]
for postfix in postfixes:
if x + postfix in all_column_names:
return x + postfix
for prefix in prefixes:
if prefix + x in all_column_names:
return prefix + x
if full:
raise ValueError("No uncertainty found for %r" % x)
else:
postfixes = ["_cov", "_covariance"]
for postfix in postfixes:
if x + "_" + y + postfix in all_column_names:
return x + "_" + y + postfix
if y + "_" + x + postfix in all_column_names:
return y + "_" + x + postfix
postfixes = ["_correlation", "_corr"]
for postfix in postfixes:
if x + "_" + y + postfix in all_column_names:
return x + "_" + y + postfix + " * " + _guess(x, x) + " * " + _guess(y, y)
if y + "_" + x + postfix in all_column_names:
return y + "_" + x + postfix + " * " + _guess(y, y) + " * " + _guess(x, x)
if full:
raise ValueError("No covariance or correlation found for %r and %r" % (x, y))
return "0"
N = len(columns)
cov_matrix = [[""] * N for i in range(N)]
for i in range(N):
for j in range(N):
cov = _guess(columns[i], columns[j])
if i == j and cov:
cov += "**2" # square the diagnal
cov_matrix[i][j] = cov
if as_expression:
return [[self[k] for k in row] for row in cov_matrix]
else:
return cov_matrix
def _jacobian(self, expressions, variables):
expressions = _ensure_strings_from_expressions(expressions)
return [[self[expression].expand(stop=[var]).derivative(var) for var in variables] for expression in expressions]
def propagate_uncertainties(self, columns, depending_variables=None, cov_matrix='auto',
covariance_format="{}_{}_covariance",
uncertainty_format="{}_uncertainty"):
"""Propagates uncertainties (full covariance matrix) for a set of virtual columns.
Covariance matrix of the depending variables is guessed by finding columns prefixed by "e"
or `"e_"` or postfixed by "_error", "_uncertainty", "e" and `"_e"`.
Off diagonals (covariance or correlation) by postfixes with "_correlation" or "_corr" for
correlation or "_covariance" or "_cov" for covariances.
(Note that x_y_cov = x_e * y_e * x_y_correlation.)
Example
>>> df = vaex.from_scalars(x=1, y=2, e_x=0.1, e_y=0.2)
>>> df["u"] = df.x + df.y
>>> df["v"] = np.log10(df.x)
>>> df.propagate_uncertainties([df.u, df.v])
>>> df.u_uncertainty, df.v_uncertainty
:param columns: list of columns for which to calculate the covariance matrix.
:param depending_variables: If not given, it is found out automatically, otherwise a list of columns which have uncertainties.
:param cov_matrix: List of list with expressions giving the covariance matrix, in the same order as depending_variables. If 'full' or 'auto',
the covariance matrix for the depending_variables will be guessed, where 'full' gives an error if an entry was not found.
"""
names = _ensure_strings_from_expressions(columns)
virtual_columns = self._expr(*columns, always_list=True)
if depending_variables is None:
depending_variables = set()
for expression in virtual_columns:
depending_variables |= expression.expand().variables()
depending_variables = list(sorted(list(depending_variables)))
fs = [self[self.virtual_columns[name]] for name in names]
jacobian = self._jacobian(fs, depending_variables)
m = len(fs)
n = len(depending_variables)
# n x n matrix
cov_matrix = self._covariance_matrix_guess(depending_variables, full=cov_matrix == "full", as_expression=True)
# empty m x m matrix
cov_matrix_out = [[self['0'] for __ in range(m)] for __ in range(m)]
for i in range(m):
for j in range(m):
for k in range(n):
for l in range(n):
if jacobian[i][k].expression == '0' or jacobian[j][l].expression == '0' or cov_matrix[k][l].expression == '0':
pass
else:
cov_matrix_out[i][j] = cov_matrix_out[i][j] + jacobian[i][k] * cov_matrix[k][l] * jacobian[j][l]
for i in range(m):
for j in range(i + 1):
sigma = cov_matrix_out[i][j]
sigma = self._expr(vaex.expresso.simplify(_ensure_string_from_expression(sigma)))
if i != j:
self.add_virtual_column(covariance_format.format(names[i], names[j]), sigma)
else:
self.add_virtual_column(uncertainty_format.format(names[i]), np.sqrt(sigma))
@_hidden
def add_virtual_columns_cartesian_to_polar(self, x="x", y="y", radius_out="r_polar", azimuth_out="phi_polar",
propagate_uncertainties=False,
radians=False):
kwargs = dict(**locals())
del kwargs['self']
return self.geo.cartesian_to_polar(inplace=True, **kwargs)
@_hidden
def add_virtual_columns_cartesian_velocities_to_spherical(self, x="x", y="y", z="z", vx="vx", vy="vy", vz="vz", vr="vr", vlong="vlong", vlat="vlat", distance=None):
kwargs = dict(**locals())
del kwargs['self']
return self.geo.velocity_cartesian2spherical(inplace=True, **kwargs)
def _expr(self, *expressions, **kwargs):
always_list = kwargs.pop('always_list', False)
return self[str(expressions[0])] if len(expressions) == 1 and not always_list else [self[str(k)] for k in expressions]
def _selection_expression(self, expression):
return vaex.expression.Expression(self, str(expression), _selection=True)
@_hidden
def add_virtual_columns_cartesian_velocities_to_polar(self, x="x", y="y", vx="vx", radius_polar=None, vy="vy", vr_out="vr_polar", vazimuth_out="vphi_polar",
propagate_uncertainties=False,):
kwargs = dict(**locals())
del kwargs['self']
return self.geo.velocity_cartesian2polar(inplace=True, **kwargs)
@_hidden
def add_virtual_columns_polar_velocities_to_cartesian(self, x='x', y='y', azimuth=None, vr='vr_polar', vazimuth='vphi_polar', vx_out='vx', vy_out='vy', propagate_uncertainties=False):
kwargs = dict(**locals())
del kwargs['self']
return self.geo.velocity_polar2cartesian(inplace=True, **kwargs)
@_hidden
def add_virtual_columns_rotation(self, x, y, xnew, ynew, angle_degrees, propagate_uncertainties=False):
kwargs = dict(**locals())
del kwargs['self']
return self.geo.rotation_2d(inplace=True, **kwargs)
@docsubst
@_hidden
def add_virtual_columns_spherical_to_cartesian(self, alpha, delta, distance, xname="x", yname="y", zname="z",
propagate_uncertainties=False,
center=[0, 0, 0], radians=False):
kwargs = dict(**locals())
del kwargs['self']
return self.geo.spherical2cartesian(inplace=True, **kwargs)
@_hidden
def add_virtual_columns_cartesian_to_spherical(self, x="x", y="y", z="z", alpha="l", delta="b", distance="distance", radians=False, center=None, center_name="solar_position"):
kwargs = dict(**locals())
del kwargs['self']
return self.geo.cartesian2spherical(inplace=True, **kwargs)
@_hidden
def add_virtual_columns_aitoff(self, alpha, delta, x, y, radians=True):
kwargs = dict(**locals())
del kwargs['self']
return self.geo.project_aitoff(inplace=True, **kwargs)
@_hidden
def add_virtual_columns_projection_gnomic(self, alpha, delta, alpha0=0, delta0=0, x="x", y="y", radians=False, postfix=""):
kwargs = dict(**locals())
del kwargs['self']
return self.geo.project_gnomic(inplace=True, **kwargs)
def add_function(self, name, f, unique=False):
name = vaex.utils.find_valid_name(name, used=[] if not unique else self.functions.keys())
function = vaex.expression.Function(self, name, f)
self.functions[name] = function
return function
def add_virtual_column(self, name, expression, unique=False):
"""Add a virtual column to the DataFrame.
Example:
>>> df.add_virtual_column("r", "sqrt(x**2 + y**2 + z**2)")
>>> df.select("r < 10")
:param str name: name of the virtual column
:param expression: expression for the column
:param bool unique: if name is already used, make it unique by adding a postfix, e.g. _1 or _2
"""
if isinstance(expression, Expression):
if expression.df is not self:
expression = expression.copy(self)
column_position = len(self.column_names)
# if the current name is an existing column name....
if name in self.get_column_names(hidden=True):
column_position = self.column_names.index(name)
renamed = vaex.utils.find_valid_name('__' + name, used=self.get_column_names(hidden=True))
# we rewrite all existing expressions (including the passed down expression argument)
self._rename(name, renamed)
expression = _ensure_string_from_expression(expression)
if vaex.utils.find_valid_name(name) != name:
# if we have to rewrite the name, we need to make it unique
unique = True
valid_name = vaex.utils.find_valid_name(name, used=None if not unique else self.get_column_names(hidden=True))
self.virtual_columns[valid_name] = expression
self._virtual_expressions[valid_name] = Expression(self, expression)
if name not in self.column_names:
self.column_names.insert(column_position, valid_name)
self._save_assign_expression(valid_name)
self.signal_column_changed.emit(self, valid_name, "add")
def rename(self, name, new_name, unique=False):
"""Renames a column or variable, and rewrite expressions such that they refer to the new name"""
if name == new_name:
return
new_name = vaex.utils.find_valid_name(new_name, used=None if not unique else self.get_column_names(hidden=True))
self._rename(name, new_name, rename_meta_data=True)
return new_name
def _rename(self, old, new, rename_meta_data=False):
is_variable = False
is_function = False
if old in self.variables:
self.variables[new] = self.variables.pop(old)
is_variable = True
if old in self.functions:
self.functions[new] = self.functions.pop(old)
is_function = True
elif old in self.virtual_columns:
# renaming a column should not change the internal order, otherwise virtual
# columns do not resolve (it will reference an unknown column)
self.virtual_columns = vaex.utils.dict_replace_key(self.virtual_columns, old, new)
self._virtual_expressions = vaex.utils.dict_replace_key(self._virtual_expressions, old, new)
elif self.is_local() and old in self.columns:
# we only have to do this locally
# if we don't do this locally, we still store this info
# in self._renamed_columns, so it will happen at the server
self.dataset = self.dataset.renamed({old: new})
if rename_meta_data:
for d in [self.ucds, self.units, self.descriptions]:
if old in d:
d[new] = d[old]
del d[old]
for key, value in self.selection_histories.items():
self.selection_histories[key] = list([k if k is None else k._rename(self, old, new) for k in value])
if not (is_variable or is_function):
if new not in self.virtual_columns:
self._renamed_columns.append((old, new))
self.column_names[self.column_names.index(old)] = new
if hasattr(self, old):
if isinstance(getattr(self, old), Expression):
try:
delattr(self, old)
except:
pass
self._save_assign_expression(new)
existing_expressions = [k() for k in self._expressions]
existing_expressions = [k for k in existing_expressions if k is not None]
for expression in existing_expressions:
expression._rename(old, new, inplace=True)
self.virtual_columns = {k:self._virtual_expressions[k].expression for k, v in self.virtual_columns.items()}
def delete_virtual_column(self, name):
"""Deletes a virtual column from a DataFrame."""
self.drop(name, inplace=True)
self.signal_column_changed.emit(self, name, "delete")
def add_variable(self, name, expression, overwrite=True, unique=True):
"""Add a variable to a DataFrame.
A variable may refer to other variables, and virtual columns and expression may refer to variables.
Example
>>> df.add_variable('center', 0)
>>> df.add_virtual_column('x_prime', 'x-center')
>>> df.select('x_prime < 0')
:param str name: name of the variable
:param expression: expression for the variable
"""
if unique or overwrite or name not in self.variables:
existing_names = self.get_column_names(virtual=False) + list(self.variables.keys())
name = vaex.utils.find_valid_name(name, used=[] if not unique else existing_names)
self.variables[name] = expression
self.signal_variable_changed.emit(self, name, "add")
if unique:
return name
def delete_variable(self, name):
"""Deletes a variable from a DataFrame."""
del self.variables[name]
self.signal_variable_changed.emit(self, name, "delete")
def info(self, description=True):
from IPython import display
self._output_css()
display.display(display.HTML(self._info(description=description)))
def _info(self, description=True):
parts = ["""<div><h2>{}</h2> <b>rows</b>: {:,}</div>""".format(self.name, len(self))]
if hasattr(self, 'path'):
parts += ["""<div><b>path</b>: <i>%s</i></div>""" % (self.path)]
if self.description:
parts += ["""<div><b>Description</b>: {}</div>""".format(self.description)]
parts += ["<h2>Columns:</h2>"]
parts += ["<table class='table-striped'>"]
parts += ["<thead><tr>"]
for header in "column type unit description expression".split():
if description or header != "description":
parts += ["<th>%s</th>" % header]
parts += ["</tr></thead>"]
for name in self.get_column_names():
parts += ["<tr>"]
parts += ["<td>%s</td>" % name]
virtual = name in self.virtual_columns
if not virtual:
dtype = str(self.data_type(name)) if self.data_type(name) != str else 'str'
else:
dtype = "</i>virtual column</i>"
parts += ["<td>%s</td>" % dtype]
units = self.unit(name)
units = units.to_string("latex_inline") if units else ""
parts += ["<td>%s</td>" % units]
if description:
parts += ["<td ><pre>%s</pre></td>" % self.descriptions.get(name, "")]
if virtual:
parts += ["<td><code>%s</code></td>" % self.virtual_columns[name]]
else:
parts += ["<td></td>"]
parts += ["</tr>"]
parts += "</table>"
ignore_list = 'pi e km_in_au seconds_per_year'.split()
variable_names = [name for name in self.variables.keys() if name not in ignore_list]
if variable_names:
parts += ["<h2>Variables:</h2>"]
parts += ["<table class='table-striped'>"]
parts += ["<thead><tr>"]
for header in "variable type unit description expression".split():
if description or header != "description":
parts += ["<th>%s</th>" % header]
parts += ["</tr></thead>"]
for name in variable_names:
parts += ["<tr>"]
parts += ["<td>%s</td>" % name]
parts += ["<td>%r</td>" % type]
units = self.unit(name)
units = units.to_string("latex_inline") if units else ""
parts += ["<td>%s</td>" % units]
if description:
parts += ["<td ><pre>%s</pre></td>" % self.descriptions.get(name, "")]
parts += ["<td><code>%s</code></td>" % (self.variables[name], )]
parts += ["</tr>"]
parts += "</table>"
return "".join(parts) + "<h2>Data:</h2>" + self._head_and_tail_table()
def head(self, n=10):
"""Return a shallow copy a DataFrame with the first n rows."""
return self[:min(n, len(self))]
def tail(self, n=10):
"""Return a shallow copy a DataFrame with the last n rows."""
N = len(self)
# self.cat(i1=max(0, N-n), i2=min(len(self), N))
return self[max(0, N - n):min(len(self), N)]
def _head_and_tail_table(self, n=None, format='html'):
n = n or vaex.settings.display.max_rows
N = _len(self)
if N <= n:
return self._as_table(0, N, format=format)
else:
return self._as_table(0, math.ceil(n / 2), N - math.floor(n / 2), N, format=format)
def head_and_tail_print(self, n=5):
"""Display the first and last n elements of a DataFrame."""
from IPython import display
display.display(display.HTML(self._head_and_tail_table(n)))
def describe(self, strings=True, virtual=True, selection=None):
"""Give a description of the DataFrame.
>>> import vaex
>>> df = vaex.example()[['x', 'y', 'z']]
>>> df.describe()
x y z
dtype float64 float64 float64
count 330000 330000 330000
missing 0 0 0
mean -0.0671315 -0.0535899 0.0169582
std 7.31746 7.78605 5.05521
min -128.294 -71.5524 -44.3342
max 271.366 146.466 50.7185
>>> df.describe(selection=df.x > 0)
x y z
dtype float64 float64 float64
count 164060 164060 164060
missing 165940 165940 165940
mean 5.13572 -0.486786 -0.0868073
std 5.18701 7.61621 5.02831
min 1.51635e-05 -71.5524 -44.3342
max 271.366 78.0724 40.2191
:param bool strings: Describe string columns or not
:param bool virtual: Describe virtual columns or not
:param selection: Optional selection to use.
:return: Pandas dataframe
"""
import pandas as pd
N = len(self)
columns = {}
for feature in self.get_column_names(strings=strings, virtual=virtual)[:]:
data_type = self.data_type(feature)
if data_type == str:
count = self.count(feature, selection=selection, delay=True)
self.execute()
count = count.get()
columns[feature] = ((data_type, count, N-count, '--', '--', '--', '--'))
elif data_type.kind in 'SU':
# TODO: this block is the same as the string block above, can we avoid SU types?
count = self.count(feature, selection=selection, delay=True)
self.execute()
count = count.get()
columns[feature] = ((data_type, count, N-count, '--', '--', '--', '--'))
elif data_type.kind in 'O':
# this will also properly count NaN-like objects like NaT
count_na = self[feature].isna().astype('int').sum(delay=True)
self.execute()
count_na = count_na.get()
columns[feature] = ((data_type, N-count_na, count_na, '--', '--', '--', '--'))
elif data_type.is_primitive or data_type.is_temporal:
mean = self.mean(feature, selection=selection, delay=True)
std = self.std(feature, selection=selection, delay=True)
minmax = self.minmax(feature, selection=selection, delay=True)
if data_type.is_datetime:  # this path tests using isna, which tests for NaT
count_na = self[feature].isna().astype('int').sum(delay=True)
else:
count = self.count(feature, selection=selection, delay=True)
self.execute()
if data_type.is_datetime:
count_na, mean, std, minmax = count_na.get(), mean.get(), std.get(), minmax.get()
count = N - int(count_na)
else:
count, mean, std, minmax = count.get(), mean.get(), std.get(), minmax.get()
count = int(count)
columns[feature] = ((data_type, count, N-count, mean, std, minmax[0], minmax[1]))
else:
raise NotImplementedError(f'Did not implement describe for data type {data_type}')
return pd.DataFrame(data=columns, index=['data_type', 'count', 'NA', 'mean', 'std', 'min', 'max'])
def cat(self, i1, i2, format='html'):
"""Display the DataFrame from row i1 till i2
For format, see https://pypi.org/project/tabulate/
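Example (illustrative sketch):
>>> df.cat(i1=0, i2=5, format='plain')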
:param int i1: Start row
:param int i2: End row.
:param str format: Format to use, e.g. 'html', 'plain', 'latex'
"""
from IPython import display
if format == 'html':
output = self._as_html_table(i1, i2)
display.display(display.HTML(output))
else:
output = self._as_table(i1, i2, format=format)
print(output)
def _as_table(self, i1, i2, j1=None, j2=None, format='html', ellipsis="..."):
from .formatting import _format_value
parts = [] # """<div>%s (length=%d)</div>""" % (self.name, len(self))]
parts += ["<table class='table-striped'>"]
# we need to get the underlying names since we use df.evaluate
column_names = self.get_column_names()
max_columns = vaex.settings.display.max_columns
if (max_columns is not None) and (max_columns > 0):
if max_columns < len(column_names):
columns_sliced = math.ceil(max_columns/2)
column_names = column_names[:columns_sliced] + column_names[-math.floor(max_columns/2):]
else:
columns_sliced = None
values_list = []
values_list.append(['#', []])
# parts += ["<thead><tr>"]
for i, name in enumerate(column_names):
if columns_sliced == i:
values_list.append([ellipsis, []])
values_list.append([name, []])
# parts += ["<th>%s</th>" % name]
# parts += ["</tr></thead>"]
def table_part(k1, k2, parts):
N = k2 - k1
# slicing will invoke .extract which will make the evaluation
# much quicker
df = self[k1:k2]
try:
values = dict(zip(column_names, df.evaluate(column_names)))
except:
values = {}
for i, name in enumerate(column_names):
try:
values[name] = df.evaluate(name)
except:
values[name] = ["error"] * (N)
logger.exception('error evaluating: %s at rows %i-%i' % (name, k1, k2))
for i in range(k2 - k1):
# parts += ["<tr>"]
# parts += ["<td><i style='opacity: 0.6'>{:,}</i></td>".format(i + k1)]
if format == 'html':
value = "<i style='opacity: 0.6'>{:,}</i>".format(i + k1)
else:
value = "{:,}".format(i + k1)
values_list[0][1].append(value)
for j, name in enumerate(column_names):
column_index = j
if columns_sliced == j:
values_list[column_index+1][1].append(ellipsis)
if columns_sliced is not None and j >= columns_sliced:
column_index += 1 # skip over the slice/ellipsis
value = values[name][i]
value = _format_value(value)
values_list[column_index+1][1].append(value)
# parts += ["</tr>"]
# return values_list
if i2 - i1 > 0:
parts = table_part(i1, i2, parts)
if j1 is not None and j2 is not None:
values_list[0][1].append(ellipsis)
for i in range(len(column_names)):
# parts += ["<td>...</td>"]
values_list[i+1][1].append(ellipsis)
# parts = table_part(j1, j2, parts)
table_part(j1, j2, parts)
else:
for header, values in values_list:
values.append(None)
# parts += "</table>"
# html = "".join(parts)
# return html
values_list = dict(values_list)
# print(values_list)
import tabulate
table_text = str(tabulate.tabulate(values_list, headers="keys", tablefmt=format))
# Tabulate 0.8.7+ escapes html :(
table_text = table_text.replace('&lt;i style=&#x27;opacity: 0.6&#x27;&gt;', "<i style='opacity: 0.6'>")
table_text = table_text.replace('&lt;/i&gt;', "</i>")
if i2 - i1 == 0:
if self._length_unfiltered != len(self):
footer_text = 'No rows to display (because of filtering).'
else:
footer_text = 'No rows to display.'
if format == 'html':
table_text += f'<i>{footer_text}</i>'
if format == 'plain':
table_text += f'\n{footer_text}'
return table_text
def _as_html_table(self, i1, i2, j1=None, j2=None):
# TODO: this method can be replaced by _as_table
from .formatting import _format_value
parts = [] # """<div>%s (length=%d)</div>""" % (self.name, len(self))]
parts += ["<table class='table-striped'>"]
column_names = self.get_column_names()
parts += ["<thead><tr>"]
for name in ["#"] + column_names:
parts += ["<th>%s</th>" % name]
parts += ["</tr></thead>"]
def table_part(k1, k2, parts):
data_parts = {}
N = k2 - k1
for name in column_names:
try:
data_parts[name] = self.evaluate(name, i1=k1, i2=k2)
except:
data_parts[name] = ["error"] * (N)
logger.exception('error evaluating: %s at rows %i-%i' % (name, k1, k2))
for i in range(k2 - k1):
parts += ["<tr>"]
parts += ["<td><i style='opacity: 0.6'>{:,}</i></td>".format(i + k1)]
for name in column_names:
value = data_parts[name][i]
value = _format_value(value)
parts += ["<td>%r</td>" % value]
parts += ["</tr>"]
return parts
parts = table_part(i1, i2, parts)
if j1 is not None and j2 is not None:
for i in range(len(column_names) + 1):
parts += ["<td>...</td>"]
parts = table_part(j1, j2, parts)
parts += "</table>"
html = "".join(parts)
return html
def _output_css(self):
css = """.vaex-description pre {
max-width : 450px;
white-space : nowrap;
overflow : hidden;
text-overflow: ellipsis;
}
.vaex-description pre:hover {
max-width : initial;
white-space: pre;
}"""
from IPython import display
style = "<style>%s</style>" % css
display.display(display.HTML(style))
def _repr_mimebundle_(self, include=None, exclude=None, **kwargs):
# TODO: optimize, since we use the same data in both versions
# TODO: include latex version
return {'text/html':self._head_and_tail_table(format='html'), 'text/plain': self._head_and_tail_table(format='plain')}
def _repr_html_(self):
"""Representation for Jupyter."""
self._output_css()
return self._head_and_tail_table()
def __str__(self):
return self._head_and_tail_table(format='plain')
if not _DEBUG:
def __repr__(self):
return self._head_and_tail_table(format='plain')
def __current_sequence_index(self):
"""TODO"""
return 0
def has_current_row(self):
"""Returns True/False if there currently is a picked row."""
return self._current_row is not None
def get_current_row(self):
"""Individual rows can be 'picked', this is the index (integer) of the current row, or None there is nothing picked."""
return self._current_row
def set_current_row(self, value):
"""Set the current row, and emit the signal signal_pick."""
if (value is not None) and ((value < 0) or (value >= len(self))):
raise IndexError("index %d out of range [0,%d]" % (value, len(self)))
self._current_row = value
self.signal_pick.emit(self, value)
def __has_snapshots(self):
# currently disabled
return False
def column_count(self, hidden=False):
"""Returns the number of columns (including virtual columns).
:param bool hidden: If True, include hidden columns in the tally
:returns: Number of columns in the DataFrame
"""
return len(self.get_column_names(hidden=hidden))
def get_names(self, hidden=False):
"""Return a list of column names and variable names."""
names = self.get_column_names(hidden=hidden)
return names +\
[k for k in self.variables.keys() if not hidden or not k.startswith('__')] +\
[k for k in self.functions.keys() if not hidden or not k.startswith('__')]
def get_column_names(self, virtual=True, strings=True, hidden=False, regex=None, dtype=None):
"""Return a list of column names
Example:
>>> import vaex
>>> df = vaex.from_scalars(x=1, x2=2, y=3, s='string')
>>> df['r'] = (df.x**2 + df.y**2)**2
>>> df.get_column_names()
['x', 'x2', 'y', 's', 'r']
>>> df.get_column_names(virtual=False)
['x', 'x2', 'y', 's']
>>> df.get_column_names(regex='x.*')
['x', 'x2']
>>> df.get_column_names(dtype='string')
['s']
:param virtual: If False, skip virtual columns
:param hidden: If False, skip hidden columns
:param strings: If False, skip string columns
:param regex: Only return column names matching the (optional) regular expression
:param dtype: Only return column names with the given dtype. Can be a single or a list of dtypes.
:rtype: list of str
"""
if dtype is None:
dtype = []
else:
dtype = vaex.utils._ensure_list(dtype)
def column_filter(name):
'''Return True if column with specified name should be returned'''
if regex and not re.match(regex, name):
return False
if not virtual and name in self.virtual_columns:
return False
if not strings and self.is_string(name):
return False
if not hidden and name.startswith('__'):
return False
if dtype and (vaex.expression.Expression(self, name).dtype not in dtype):
return False
return True
if hidden and virtual and regex is None and len(dtype) == 0 and strings is True:
return list(self.column_names) # quick path
if not hidden and virtual and regex is None and len(dtype) == 0 and strings is True:
return [k for k in self.column_names if not k.startswith('__')] # also a quick path
return [name for name in self.column_names if column_filter(name)]
def __bool__(self):
return True # we are always true :) otherwise Python might call __len__, which can be expensive
def __len__(self):
"""Returns the number of rows in the DataFrame (filtering applied)."""
if not self.filtered:
return self._length_unfiltered
else:
if self._cached_filtered_length is None:
self._cached_filtered_length = int(self.count())
return self._cached_filtered_length
def selected_length(self):
"""Returns the number of rows that are selected."""
raise NotImplementedError
def length_original(self):
"""the full length of the DataFrame, independent what active_fraction is, or filtering. This is the real length of the underlying ndarrays."""
return self._length_original
def length_unfiltered(self):
"""The length of the arrays that should be considered (respecting active range), but without filtering."""
return self._length_unfiltered
def active_length(self):
return self._length_unfiltered
def get_active_fraction(self):
"""Value in the range (0, 1], to work only with a subset of rows.
"""
return self._active_fraction
def set_active_fraction(self, value):
"""Sets the active_fraction, set picked row to None, and remove selection.
TODO: we may be able to keep the selection, if we keep the expression, and also the picked row
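Example (illustrative sketch):
>>> df.set_active_fraction(0.5)  # only (roughly) the first half of the rows is considered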
"""
if value != self._active_fraction:
self._active_fraction = value
# self._fraction_length = int(self._length * self._active_fraction)
self.select(None)
self.set_current_row(None)
self._length_unfiltered = int(round(self._length_original * self._active_fraction))
self._cached_filtered_length = None
self._filter_filled = False
self._index_start = 0
self._index_end = self._length_unfiltered
self.signal_active_fraction_changed.emit(self, value)
def get_active_range(self):
return self._index_start, self._index_end
def set_active_range(self, i1, i2):
"""Sets the active_fraction, set picked row to None, and remove selection.
TODO: we may be able to keep the selection, if we keep the expression, and also the picked row
"""
# logger.debug("set active range to: %r", (i1, i2))
self._active_fraction = (i2 - i1) / float(self.length_original())
# self._fraction_length = int(self._length * self._active_fraction)
self._index_start = i1
self._index_end = i2
self.select(None)
self.set_current_row(None)
self._length_unfiltered = i2 - i1
if self.filtered:
mask = self._selection_masks[FILTER_SELECTION_NAME]
if not mask.view(i1, i2).is_dirty():
self._cached_filtered_length = mask.view(i1, i2).count()
else:
self._cached_filtered_length = None
self._filter_filled = False
self.signal_active_fraction_changed.emit(self, self._active_fraction)
@docsubst
def trim(self, inplace=False):
'''Return a DataFrame, where all columns are 'trimmed' by the active range.
For the returned DataFrame, df.get_active_range() returns (0, df.length_original()).
{note_copy}
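Example (illustrative sketch):
>>> df.set_active_range(2, 8)
>>> df_trimmed = df.trim()  # df_trimmed.get_active_range() == (0, 6)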
:param inplace: {inplace}
:rtype: DataFrame
'''
df = self if inplace else self.copy()
if self._index_start == 0 and self._index_end == self._length_original:
return df
df.dataset = self.dataset[self._index_start:self._index_end]
if df.filtered:
# we're gonna copy the mask from our parent
parent_mask = self._selection_masks[FILTER_SELECTION_NAME].view(self._index_start, self._index_end)
mask = df._selection_masks[FILTER_SELECTION_NAME]
np.copyto(np.asarray(mask)[df._index_start : df._index_end], np.asarray(parent_mask))
selection = df.get_selection(FILTER_SELECTION_NAME)
if not mask.is_dirty():
df._cached_filtered_length = mask.count()
cache = df._selection_mask_caches[FILTER_SELECTION_NAME]
assert not cache
chunk_size = self.executor.chunk_size_for(mask.length)
for i in range(vaex.utils.div_ceil(mask.length, chunk_size)):
i1 = i * chunk_size
i2 = min(mask.length, (i + 1) * chunk_size)
key = (i1, i2)
sub_mask = mask.view(i1, i2)
sub_mask_array = np.asarray(sub_mask)
cache[key] = selection, sub_mask_array
else:
df._cached_filtered_length = None
df._filter_filled = False
return df
@docsubst
def take(self, indices, filtered=True, dropfilter=True):
'''Returns a DataFrame containing only rows indexed by indices
{note_copy}
Example:
>>> import vaex, numpy as np
>>> df = vaex.from_arrays(s=np.array(['a', 'b', 'c', 'd']), x=np.arange(1,5))
>>> df.take([0,2])
# s x
0 a 1
1 c 3
:param indices: sequence (list or numpy array) with row numbers
:param filtered: (for internal use) The indices refer to the filtered data.
:param dropfilter: (for internal use) Drop the filter, set to False when
indices refer to unfiltered, but may contain rows that still need to be filtered out.
:return: DataFrame which is a shallow copy of the original data.
:rtype: DataFrame
'''
df_trimmed = self.trim()
df = df_trimmed.copy()
indices = np.asarray(indices)
if df.filtered and filtered:
# we translate the indices that refer to filters row indices to
# indices of the unfiltered row indices
df._fill_filter_mask()
max_index = indices.max()
mask = df._selection_masks[FILTER_SELECTION_NAME]
filtered_indices = mask.first(max_index+1)
indices = filtered_indices[indices]
df.dataset = df.dataset.take(indices)
if dropfilter:
# if the indices refer to the filtered rows, we can discard the
# filter in the final dataframe
df.set_selection(None, name=FILTER_SELECTION_NAME)
return df
def _push_down_filter(self):
'''Push the filter down to the dataset layer'''
self._fill_filter_mask() # make sure the mask is filled
mask = self._selection_masks[FILTER_SELECTION_NAME]
mask = np.asarray(mask)
# indices = mask.first(len(self))
# assert len(indices) == len(self)
selection = self.get_selection(FILTER_SELECTION_NAME)
from .dataset import DatasetFiltered
self.set_selection(None, name=FILTER_SELECTION_NAME)
self.dataset = DatasetFiltered(self.dataset, mask, state=self.state_get(skip=[self.dataset]), selection=selection)
@docsubst
def shuffle(self, random_state=None):
'''Shuffle order of rows (equivalent to df.sample(frac=1))
{note_copy}
Example:
>>> import vaex, numpy as np
>>> df = vaex.from_arrays(s=np.array(['a', 'b', 'c']), x=np.arange(1,4))
>>> df
# s x
0 a 1
1 b 2
2 c 3
>>> df.shuffle(random_state=42)
# s x
0 a 1
1 b 2
2 c 3
:param int or RandomState random_state: {random_state}
:return: {return_shallow_copy}
:rtype: DataFrame
'''
return self.sample(frac=1, random_state=random_state)
@docsubst
def sample(self, n=None, frac=None, replace=False, weights=None, random_state=None):
'''Returns a DataFrame with a random set of rows
{note_copy}
Provide either n or frac.
Example:
>>> import vaex, numpy as np
>>> df = vaex.from_arrays(s=np.array(['a', 'b', 'c', 'd']), x=np.arange(1,5))
>>> df
# s x
0 a 1
1 b 2
2 c 3
3 d 4
>>> df.sample(n=2, random_state=42) # 2 random rows, fixed seed
# s x
0 b 2
1 d 4
>>> df.sample(frac=1, random_state=42) # 'shuffling'
# s x
0 c 3
1 a 1
2 d 4
3 b 2
>>> df.sample(frac=1, replace=True, random_state=42) # useful for bootstrap (may contain repeated samples)
# s x
0 d 4
1 a 1
2 a 1
3 d 4
:param int n: number of samples to take (default 1 if frac is None)
:param float frac: fraction of rows to take
:param bool replace: If true, a row may be drawn multiple times
:param str or expression weights: (unnormalized) probability that a row can be drawn
:param int or RandomState random_state: {random_state}
:return: {return_shallow_copy}
:rtype: DataFrame
'''
self = self.extract()
if type(random_state) == int or random_state is None:
random_state = np.random.RandomState(seed=random_state)
if n is None and frac is None:
n = 1
elif frac is not None:
n = int(round(frac * len(self)))
weights_values = None
if weights is not None:
weights_values = self.evaluate(weights)
weights_values = weights_values / self.sum(weights)
indices = random_state.choice(len(self), n, replace=replace, p=weights_values)
return self.take(indices)
@docsubst
@vaex.utils.gen_to_list
def split_random(self, into, random_state=None):
'''Returns a list containing random portions of the DataFrame.
{note_copy}
Example:
>>> import vaex, numpy as np
>>> np.random.seed(111)
>>> df = vaex.from_arrays(x = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
>>> for dfs in df.split_random(into=0.3, random_state=42):
... print(dfs.x.values)
...
[8 1 5]
[0 7 2 9 4 3 6]
>>> for split in df.split_random(into=[0.2, 0.3, 0.5], random_state=42):
... print(split.x.values)
[8 1]
[5 0 7]
[2 9 4 3 6]
:param int/float/list into: If float will split the DataFrame in two, the first of which will have a relative length as specified by this parameter.
When a list, will split into as many portions as elements in the list, where each element defines the relative length of that portion. Note that such a list of fractions will always be re-normalized to 1.
When an int, split DataFrame into n dataframes of equal length (last one may deviate), if len(df) < n, it will return len(df) DataFrames.
:param int or RandomState random_state: {random_state}
:return: A list of DataFrames.
:rtype: list
'''
self = self.extract()
if type(random_state) == int or random_state is None:
random_state = np.random.RandomState(seed=random_state)
indices = random_state.choice(len(self), len(self), replace=False)
return self.take(indices).split(into)
@docsubst
@vaex.utils.gen_to_list
def split(self, into=None):
'''Returns a list containing ordered subsets of the DataFrame.
{note_copy}
Example:
>>> import vaex
>>> df = vaex.from_arrays(x = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
>>> for dfs in df.split(into=0.3):
... print(dfs.x.values)
...
[0 1 2]
[3 4 5 6 7 8 9]
>>> for split in df.split(into=[0.2, 0.3, 0.5]):
... print(split.x.values)
[0 1]
[2 3 4]
[5 6 7 8 9]
:param int/float/list into: If float will split the DataFrame in two, the first of which will have a relative length as specified by this parameter.
When a list, will split into as many portions as elements in the list, where each element defines the relative length of that portion. Note that such a list of fractions will always be re-normalized to 1.
When an int, split DataFrame into n dataframes of equal length (last one may deviate), if len(df) < n, it will return len(df) DataFrames.
'''
self = self.extract()
if isinstance(into, numbers.Integral):
step = max(1, vaex.utils.div_ceil(len(self), into))
i1 = 0
i2 = step
while i1 < len(self):
i2 = min(len(self), i2)
yield self[i1:i2]
i1, i2 = i2, i2 + step
return
if _issequence(into):
# make sure it is normalized
total = sum(into)
into = [k / total for k in into]
else:
assert into <= 1, "when float, `into` should be <= 1"
assert into > 0, "`into` must be > 0."
into = [into, 1 - into]
offsets = np.round(np.cumsum(into) * len(self)).astype(np.int64)
start = 0
for offset in offsets:
yield self[start:offset]
start = offset
@docsubst
def sort(self, by, ascending=True):
'''Return a sorted DataFrame, sorted by the expression 'by'.
Both 'by' and 'ascending' arguments can be lists.
Note that missing/nan/NA values will always be pushed to the end, no matter the sorting order.
{note_copy}
{note_filter}
Example:
>>> import vaex, numpy as np
>>> df = vaex.from_arrays(s=np.array(['a', 'b', 'c', 'd']), x=np.arange(1,5))
>>> df['y'] = (df.x-1.8)**2
>>> df
# s x y
0 a 1 0.64
1 b 2 0.04
2 c 3 1.44
3 d 4 4.84
>>> df.sort('y', ascending=False) # Note: passing '(x-1.8)**2' gives the same result
# s x y
0 d 4 4.84
1 c 3 1.44
2 a 1 0.64
3 b 2 0.04
:param str or expression or list of str/expressions by: expression to sort by.
:param bool or list of bools ascending: ascending (default, True) or descending (False).
'''
if len(self) == 0:
return self.copy()
self = self.trim()
# Ensure "by" is in the proper format
by = vaex.utils._ensure_list(by)
by = vaex.utils._ensure_strings_from_expressions(by)
# Ensure "ascending is in the proper format"
if isinstance(ascending, list):
assert len(ascending) == len(by), 'If "ascending" is a list, it must have the same number of elements as "by".'
else:
ascending = vaex.utils._ensure_list(ascending) * len(by)
sort_keys = [(key, 'ascending') if order is True else (key, 'descending') for key, order in zip(by, ascending)]
pa_table = self[by].to_arrow_table()
indices = pa.compute.sort_indices(pa_table, sort_keys=sort_keys)
# if we don't cast to int64, we get uint64 scalars, which when adding numbers to will auto case to float (numpy)
indices = vaex.array_types.to_numpy(indices).astype('int64')
return self.take(indices)
@docsubst
def diff(self, periods=1, column=None, fill_value=None, trim=False, inplace=False, reverse=False):
"""Calculate the difference between the current row and the row offset by periods
:param int periods: Which row to take the difference with
:param str or list[str] column: Column or list of columns to use (default is all).
:param fill_value: Value to use instead of missing values.
:param bool trim: Do not include rows that would otherwise have missing values
:param bool reverse: When true, calculate `row[periods] - row[current]`
:param inplace: {inplace}
"""
df = self.trim(inplace=inplace)
if column is None:
columns = self.get_column_names()
else:
if isinstance(column, (list, tuple)):
columns = column
else:
columns = [column]
originals = {}
for column in columns:
new_name = df._find_valid_name(f'__{column}_original')
df[new_name] = df[column]
originals[column] = new_name
df = df.shift(periods, columns, fill_value=fill_value, trim=trim, inplace=inplace)
for column in columns:
if reverse:
df[column] = df[column] - df[originals[column]]
else:
df[column] = df[originals[column]] - df[column]
return df
@docsubst
def shift(self, periods, column=None, fill_value=None, trim=False, inplace=False):
"""Shift a column or multiple columns by `periods` amounts of rows.
:param int periods: Shift column forward (when positive) or backwards (when negative)
:param str or list[str] column: Column or list of columns to shift (default is all).
:param fill_value: Value to use instead of missing values.
:param bool trim: Do not include rows that would otherwise have missing values
:param inplace: {inplace}
"""
df = self.trim(inplace=inplace)
if df.filtered:
df._push_down_filter()
from .shift import DatasetShifted
# we want to show these shifted
if column is not None:
columns = set(column) if _issequence(column) else {column}
else:
columns = set(df.get_column_names())
columns_all = set(df.get_column_names(hidden=True))
# these columns we do NOT want to shift, because we didn't ask it
# or because we depend on them (virtual column)
columns_keep = columns_all - columns
columns_keep |= df._depending_columns(columns_keep, check_filter=False) # TODO: remove filter check
columns_shift = columns.copy()
columns_shift |= df._depending_columns(columns)
virtual_columns = df.virtual_columns.copy()
# these are the columns we want to shift, but *also* want to keep the original
columns_conflict = columns_keep & columns_shift
column_shift_mapping = {}
# we use this dataframe for tracking virtual columns when renaming
df_shifted = df.copy()
shifted_names = {}
unshifted_names = {}
for name in columns_shift:
if name in columns_conflict:
# we want to have two columns, an unshifted and shifted
# rename the current to unshifted
unshifted_name = df.rename(name, f'__{name}_unshifted', unique=True)
unshifted_names[name] = unshifted_name
# now make a shifted one
shifted_name = f'__{name}_shifted'
shifted_name = vaex.utils.find_valid_name(shifted_name, used=df.get_column_names(hidden=True))
shifted_names[name] = shifted_name
if name not in virtual_columns:
# if not virtual, we let the dataset layer handle it
column_shift_mapping[unshifted_name] = shifted_name
df.column_names.append(shifted_name)
# otherwise we can later on copy the virtual columns from this df
df_shifted.rename(name, shifted_name)
else:
if name not in virtual_columns:
# easy case, just shift
column_shift_mapping[name] = name
# now that we renamed columns into _shifted/_unshifted we
# restore the dataframe with the real column names
for name in columns_shift:
if name in columns_conflict:
if name in virtual_columns:
if name in columns:
df.add_virtual_column(name, df_shifted.virtual_columns[shifted_names[name]])
else:
df.add_virtual_column(name, unshifted_names[name])
else:
if name in columns:
df.add_virtual_column(name, shifted_names[name])
else:
df.add_virtual_column(name, unshifted_names[name])
else:
if name in virtual_columns:
df.virtual_columns[name] = df_shifted.virtual_columns[name]
df._virtual_expressions[name] = Expression(df, df.virtual_columns[name])
if _issequence(periods):
if len(periods) != 2:
raise ValueError(f'periods should be a int or a tuple of ints, not {periods}')
start, end = periods
else:
start = end = periods
dataset = DatasetShifted(original=df.dataset, start=start, end=end, column_mapping=column_shift_mapping, fill_value=fill_value)
if trim:
# assert start == end
slice_start = 0
slice_end = dataset.row_count
if start > 0:
slice_start = start
elif start < 0:
slice_end = dataset.row_count + start
if end != start:
if end > start:
slice_end -= end - 1
dataset = dataset.slice(slice_start, slice_end)
df.dataset = dataset
for name in df.dataset:
assert name in df.column_names, f"oops, {name} in dataset, but not in column_names"
for name in df.column_names:
if name not in df.dataset:
assert name in df.virtual_columns
return df
@docsubst
def fillna(self, value, column_names=None, prefix='__original_', inplace=False):
'''Return a DataFrame, where missing values/NaN are filled with 'value'.
The original columns will be renamed, and by default they will be hidden columns. No data is lost.
{note_copy}
{note_filter}
Example:
>>> import vaex
>>> import numpy as np
>>> x = np.array([3, 1, np.nan, 10, np.nan])
>>> df = vaex.from_arrays(x=x)
>>> df_filled = df.fillna(value=-1, column_names=['x'])
>>> df_filled
# x
0 3
1 1
2 -1
3 10
4 -1
:param float value: The value to use for filling nan or masked values.
:param list column_names: List of column names in which to fill missing values.
:param str prefix: The prefix to give the original columns.
:param inplace: {inplace}
'''
df = self.trim(inplace=inplace)
column_names = column_names or list(self)
for name in column_names:
df[name] = df.func.fillna(df[name], value)
return df
@docsubst
def materialize(self, column=None, inplace=False, virtual_column=None):
'''Turn columns into native CPU format for optimal performance, at the cost of memory.
.. warning:: This may use a lot of memory; be mindful.
Virtual columns will be evaluated immediately, and all real columns will be
cached in memory when used for the first time.
Example for virtual column:
>>> x = np.arange(1,4)
>>> y = np.arange(2,5)
>>> df = vaex.from_arrays(x=x, y=y)
>>> df['r'] = (df.x**2 + df.y**2)**0.5 # 'r' is a virtual column (computed on the fly)
>>> df = df.materialize('r') # now 'r' is a 'real' column (i.e. a numpy array)
Example with a parquet file:
>>> df = vaex.open('somewhatslow.parquet')
>>> df.x.sum() # slow
>>> df = df.materialize()
>>> df.x.sum() # slow, but will fill the cache
>>> df.x.sum() # as fast as possible, will use memory
:param column: string or list of strings with column names to materialize, all columns when None
:param inplace: {inplace}
:param virtual_column: for backward compatibility
'''
if virtual_column is not None:
warnings.warn("virtual_column argument is deprecated, please use column")
column = virtual_column
df = self.trim(inplace=inplace)
if column is None:
columns = df.get_column_names(hidden=True)
else:
columns = _ensure_strings_from_expressions(column)
columns = _ensure_list(columns)
virtual = []
cache = []
for column in columns:
if column in self.dataset:
cache.append(column)
elif column in self.virtual_columns:
virtual.append(column)
else:
raise NameError(f'{column} is not a column or virtual column')
dataset = df._dataset
if cache:
dataset = vaex.dataset.DatasetCached(dataset, cache)
if virtual:
arrays = df.evaluate(virtual, filtered=False)
materialized = vaex.dataset.DatasetArrays(dict(zip(virtual, arrays)))
dataset = dataset.merged(materialized)
df.dataset = dataset
for name in virtual:
del df.virtual_columns[name]
else:
# in this case we don't need to invalidate caches,
# also the fingerprint will be the same
df._dataset = dataset
return df
def _lazy_materialize(self, *virtual_columns):
'''Return a new DataFrame where the virtual column is turned into a lazily evaluated column.'''
df = self.trim()
virtual_columns = _ensure_strings_from_expressions(virtual_columns)
for name in virtual_columns:
if name not in df.virtual_columns:
raise KeyError('Virtual column not found: %r' % name)
column = ColumnConcatenatedLazy([self[name]])
del df[name]
df.add_column(name, column)
return df
def get_selection(self, name="default"):
"""Get the current selection object (mostly for internal use atm)."""
name = _normalize_selection_name(name)
selection_history = self.selection_histories[name]
index = self.selection_history_indices[name]
if index == -1:
return None
else:
return selection_history[index]
def selection_undo(self, name="default", executor=None):
"""Undo selection, for the name."""
logger.debug("undo")
executor = executor or self.executor
assert self.selection_can_undo(name=name)
selection_history = self.selection_histories[name]
self.selection_history_indices[name] -= 1
self.signal_selection_changed.emit(self, name)
logger.debug("undo: selection history is %r, index is %r", selection_history, self.selection_history_indices[name])
def selection_redo(self, name="default", executor=None):
"""Redo selection, for the name."""
logger.debug("redo")
executor = executor or self.executor
assert self.selection_can_redo(name=name)
selection_history = self.selection_histories[name]
index = self.selection_history_indices[name]
self.selection_history_indices[name] += 1
self.signal_selection_changed.emit(self, name)
logger.debug("redo: selection history is %r, index is %r", selection_history, index)
def selection_can_undo(self, name="default"):
"""Can selection name be undone?"""
return self.selection_history_indices[name] > -1
def selection_can_redo(self, name="default"):
"""Can selection name be redone?"""
return (self.selection_history_indices[name] + 1) < len(self.selection_histories[name])
def select(self, boolean_expression, mode="replace", name="default", executor=None):
"""Perform a selection, defined by the boolean expression, and combined with the previous selection using the given mode.
Selections are recorded in a history tree, per name, undo/redo can be done for them separately.
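Example (illustrative, assuming a numeric column 'x'):
>>> df.select(df.x > 0)
>>> df.select(df.x < 10, mode='and') # narrow the previous selection
>>> df.count(selection=True) # use the selection in an aggregation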
:param str boolean_expression: Any valid column expression, with comparison operators
:param str mode: Possible boolean operator: replace/and/or/xor/subtract
:param str name: history tree or selection 'slot' to use
:param executor:
:return:
"""
boolean_expression = _ensure_string_from_expression(boolean_expression)
if boolean_expression is None and not self.has_selection(name=name):
pass # we don't want to pollute the history with many None selections
self.signal_selection_changed.emit(self, name) # TODO: unittest want to know, does this make sense?
else:
def create(current):
return selections.SelectionExpression(boolean_expression, current, mode) if boolean_expression else None
self._selection(create, name)
def select_non_missing(self, drop_nan=True, drop_masked=True, column_names=None, mode="replace", name="default"):
"""Create a selection that selects rows having non missing values for all columns in column_names.
The name reflects the Pandas API; no rows are actually dropped, a mask keeps track of the selection instead.
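Example (illustrative, assuming columns 'x' and 'y'):
>>> df.select_non_missing(column_names=['x', 'y'])
>>> df.count(selection=True) # rows where both x and y are present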
:param drop_nan: drop rows when there is a NaN in any of the columns (will only affect float values)
:param drop_masked: drop rows when there is a masked value in any of the columns
:param column_names: The columns to consider, default: all (real, non-virtual) columns
:param str mode: Possible boolean operator: replace/and/or/xor/subtract
:param str name: history tree or selection 'slot' to use
:return:
"""
column_names = column_names or self.get_column_names(virtual=False)
def create(current):
return selections.SelectionDropNa(drop_nan, drop_masked, [str(self[k]) for k in column_names], current, mode)
self._selection(create, name)
def dropmissing(self, column_names=None, how="any"):
"""Create a shallow copy of a DataFrame, with filtering set using ismissing.
:param column_names: The columns to consider, default: all (real, non-virtual) columns
:param str how: One of ("any", "all").
If "any", then drop rows where any of the columns are missing.
If "all", then drop rows where all of the columns are missing.
:rtype: DataFrame
"""
return self._filter_all(self.func.ismissing, column_names, how=how)
def dropnan(self, column_names=None, how="any"):
"""Create a shallow copy of a DataFrame, with filtering set using isnan.
:param column_names: The columns to consider, default: all (real, non-virtual) columns
:param str how: One of ("any", "all").
If "any", then drop rows where any of the columns are nan.
If "all", then drop rows where all of the columns are nan.
:rtype: DataFrame
"""
return self._filter_all(self.func.isnan, column_names, how=how)
def dropna(self, column_names=None, how="any"):
"""Create a shallow copy of a DataFrame, with filtering set using isna.
:param column_names: The columns to consider, default: all (real, non-virtual) columns
:param str how: One of ("any", "all").
If "any", then drop rows where any of the columns are na.
If "all", then drop rows where all of the columns are na.
:rtype: DataFrame
"""
return self._filter_all(self.func.isna, column_names, how=how)
def dropinf(self, column_names=None, how="any"):
""" Create a shallow copy of a DataFrame, with filtering set using isinf.
:param column_names: The columns to consider, default: all (real, non-virtual) columns
:param str how: One of ("any", "all").
If "any", then drop rows where any of the columns are inf.
If "all", then drop rows where all of the columns are inf.
:rtype: DataFrame
"""
return self._filter_all(self.func.isinf, column_names, how=how)
def _filter_all(self, f, column_names=None, how="any"):
if column_names is None:
column_names = self.get_column_names(virtual=False)
if how not in ("any", "all"):
raise ValueError("`how` must be either 'any' or 'all'")
expression = f(self[column_names[0]])
for column in column_names[1:]:
if how == "any":
expression = expression | f(self[column])
else:
expression = expression & f(self[column])
return self.filter(~expression, mode='and')
def select_nothing(self, name="default"):
"""Select nothing."""
logger.debug("selecting nothing")
self.select(None, name=name)
self.signal_selection_changed.emit(self, name)
def select_rectangle(self, x, y, limits, mode="replace", name="default"):
"""Select a 2d rectangular box in the space given by x and y, bounded by limits.
Example:
>>> df.select_rectangle('x', 'y', [(0, 10), (0, 1)])
:param x: expression for the x space
:param y: expression for the y space
:param limits: sequence of shape [(x1, x2), (y1, y2)]
:param mode:
"""
self.select_box([x, y], limits, mode=mode, name=name)
def select_box(self, spaces, limits, mode="replace", name="default"):
"""Select a n-dimensional rectangular box bounded by limits.
The following examples are equivalent:
>>> df.select_box(['x', 'y'], [(0, 10), (0, 1)])
>>> df.select_rectangle('x', 'y', [(0, 10), (0, 1)])
:param spaces: list of expressions
:param limits: sequence of shape [(x1, x2), (y1, y2)]
:param mode:
:param name:
:return:
"""
sorted_limits = [(min(l), max(l)) for l in limits]
expressions = ["((%s) >= %f) & ((%s) <= %f)" % (expression, lmin, expression, lmax) for
(expression, (lmin, lmax)) in zip(spaces, sorted_limits)]
self.select("&".join(expressions), mode=mode, name=name)
def select_circle(self, x, y, xc, yc, r, mode="replace", name="default", inclusive=True):
"""
Select a circular region centred on xc, yc, with a radius of r.
Example:
>>> df.select_circle('x','y',2,3,1)
:param x: expression for the x space
:param y: expression for the y space
:param xc: location of the centre of the circle in x
:param yc: location of the centre of the circle in y
:param r: the radius of the circle
:param name: name of the selection
:param mode:
:return:
"""
# expr = "({x}-{xc})**2 + ({y}-{yc})**2 <={r}**2".format(**locals())
if inclusive:
expr = (self[x] - xc)**2 + (self[y] - yc)**2 <= r**2
else:
expr = (self[x] - xc)**2 + (self[y] - yc)**2 < r**2
self.select(boolean_expression=expr, mode=mode, name=name)
def select_ellipse(self, x, y, xc, yc, width, height, angle=0, mode="replace", name="default", radians=False, inclusive=True):
"""
Select an elliptical region centred on xc, yc, with a certain width, height
and angle.
Example:
>>> df.select_ellipse('x','y', 2, -1, 5,1, 30, name='my_ellipse')
:param x: expression for the x space
:param y: expression for the y space
:param xc: location of the centre of the ellipse in x
:param yc: location of the centre of the ellipse in y
:param width: the width of the ellipse (diameter)
:param height: the height of the ellipse (diameter)
:param angle: orientation of the ellipse, counter-clockwise
measured from the y axis (degrees, unless radians=True)
:param name: name of the selection
:param mode:
:return:
"""
# Computing the properties of the ellipse prior to selection
if radians:
alpha = angle
else:
alpha = np.deg2rad(angle)
xr = width / 2
yr = height / 2
r = max(xr, yr)
a = xr / r
b = yr / r
expr = "(({x}-{xc})*cos({alpha})+({y}-{yc})*sin({alpha}))**2/{a}**2 + (({x}-{xc})*sin({alpha})-({y}-{yc})*cos({alpha}))**2/{b}**2 <= {r}**2".format(**locals())
if inclusive:
expr = ((self[x] - xc) * np.cos(alpha) + (self[y] - yc) * np.sin(alpha))**2 / a**2 + ((self[x] - xc) * np.sin(alpha) - (self[y] - yc) * np.cos(alpha))**2 / b**2 <= r**2
else:
expr = ((self[x] - xc) * np.cos(alpha) + (self[y] - yc) * np.sin(alpha))**2 / a**2 + ((self[x] - xc) * np.sin(alpha) - (self[y] - yc) * np.cos(alpha))**2 / b**2 < r**2
self.select(boolean_expression=expr, mode=mode, name=name)
def select_lasso(self, expression_x, expression_y, xsequence, ysequence, mode="replace", name="default", executor=None):
"""For performance reasons, a lasso selection is handled differently.
:param str expression_x: Name/expression for the x coordinate
:param str expression_y: Name/expression for the y coordinate
:param xsequence: list of x numbers defining the lasso, together with y
:param ysequence:
:param str mode: Possible boolean operator: replace/and/or/xor/subtract
:param str name:
:param executor:
:return:
"""
def create(current):
return selections.SelectionLasso(expression_x, expression_y, xsequence, ysequence, current, mode)
self._selection(create, name, executor=executor)
def select_inverse(self, name="default", executor=None):
"""Invert the selection, i.e. what is selected will not be, and vice versa
:param str name:
:param executor:
:return:
"""
def create(current):
return selections.SelectionInvert(current)
self._selection(create, name, executor=executor)
def set_selection(self, selection, name="default", executor=None):
"""Sets the selection object
:param selection: Selection object
:param name: selection 'slot'
:param executor:
:return:
"""
def create(current):
return selection
self._selection(create, name, executor=executor, execute_fully=True)
def _selection(self, create_selection, name, executor=None, execute_fully=False):
"""select_lasso and select almost share the same code"""
selection_history = self.selection_histories[name]
previous_index = self.selection_history_indices[name]
current = selection_history[previous_index] if selection_history else None
selection = create_selection(current)
executor = executor or self.executor
selection_history.append(selection)
self.selection_history_indices[name] += 1
# clip any redo history
del selection_history[self.selection_history_indices[name]:-1]
self.signal_selection_changed.emit(self, name)
result = vaex.promise.Promise.fulfilled(None)
# logger.debug("select selection history is %r, index is %r", selection_history, self.selection_history_indices[name])
return result
def has_selection(self, name="default"):
"""Returns True if there is a selection with the given name."""
return self.get_selection(name) is not None
def __setitem__(self, name, value):
'''Convenient way to add a virtual column / expression to this DataFrame.
Example:
>>> import vaex, numpy as np
>>> df = vaex.example()
>>> df['r'] = np.sqrt(df.x**2 + df.y**2 + df.z**2)
>>> df.r
<vaex.expression.Expression(expressions='r')> instance at 0x121687e80 values=[2.9655450396553587, 5.77829281049018, 6.99079603950256, 9.431842752707537, 0.8825613121347967 ... (total 330000 values) ... 7.453831761514681, 15.398412491068198, 8.864250273925633, 17.601047186042507, 14.540181524970293]
'''
if isinstance(name, six.string_types):
if isinstance(value, supported_column_types):
self.add_column(name, value)
else:
self.add_virtual_column(name, value)
else:
raise TypeError('__setitem__ only takes strings as arguments, not {}'.format(type(name)))
def drop_filter(self, inplace=False):
"""Removes all filters from the DataFrame"""
df = self if inplace else self.copy()
df.select_nothing(name=FILTER_SELECTION_NAME)
df._invalidate_caches()
return df
def filter(self, expression, mode="and"):
"""General version of df[<boolean expression>] to modify the filter applied to the DataFrame.
See :func:`DataFrame.select` for usage of selection.
Note that using `df = df[<boolean expression>]`, one can only narrow the filter (i.e. fewer rows
can be selected). Using the filter method with a different boolean mode (e.g. "or") one can actually
cause more rows to be selected. This differs greatly from numpy and pandas, which can only
narrow the filter.
Example:
>>> import vaex
>>> import numpy as np
>>> x = np.arange(10)
>>> df = vaex.from_arrays(x=x, y=x**2)
>>> df
# x y
0 0 0
1 1 1
2 2 4
3 3 9
4 4 16
5 5 25
6 6 36
7 7 49
8 8 64
9 9 81
>>> dff = df[df.x<=2]
>>> dff
# x y
0 0 0
1 1 1
2 2 4
>>> dff = dff.filter(dff.x >=7, mode="or")
>>> dff
# x y
0 0 0
1 1 1
2 2 4
3 7 49
4 8 64
5 9 81
"""
df = self.copy()
df.select(expression, name=FILTER_SELECTION_NAME, mode=mode)
df._cached_filtered_length = None # invalidate cached length
df._filter_filled = False
# WARNING: this is a special case where we create a new filter
# the cache mask chunks still hold references to views on the old
# mask, and this new mask will be filled when required
df._selection_masks[FILTER_SELECTION_NAME] = vaex.superutils.Mask(int(df._length_unfiltered))
return df
def __getitem__(self, item):
"""Convenient way to get expressions, (shallow) copies of a few columns, or to apply filtering.
Example:
>>> df['Lz'] # the expression 'Lz'
>>> df['Lz/2'] # the expression 'Lz/2'
>>> df[["Lz", "E"]] # a shallow copy with just two columns
>>> df[df.Lz < 0] # a shallow copy with the filter Lz < 0 applied
"""
if isinstance(item, int):
names = self.get_column_names()
item = item % len(self)
return [self.evaluate(name, item, item+1, array_type='python')[0] for name in names]
elif isinstance(item, six.string_types):
if hasattr(self, item) and isinstance(getattr(self, item), Expression):
return getattr(self, item)
# if item in self.virtual_columns:
# return Expression(self, self.virtual_columns[item])
# if item in self._virtual_expressions:
# return self._virtual_expressions[item]
if item not in self.column_names:
self.validate_expression(item)
item = vaex.utils.valid_expression(self.get_column_names(), item)
return Expression(self, item) # TODO we'd like to return the same expression if possible
elif isinstance(item, Expression):
expression = item.expression
return self.filter(expression)
elif isinstance(item, (tuple, list)):
df = self
if isinstance(item[0], slice):
df = df[item[0]]
if len(item) > 1:
if isinstance(item[1], int):
name = self.get_column_names()[item[1]]
return df[name]
elif isinstance(item[1], slice):
names = self.get_column_names().__getitem__(item[1])
return df[names]
for expression in item:
if expression not in self.column_names:
self.validate_expression(expression)
df = self.copy(column_names=item)
return df
elif isinstance(item, slice):
start, stop, step = item.start, item.stop, item.step
start = start or 0
stop = stop or len(self)
if start < 0:
start = len(self)+start
if stop < 0:
stop = len(self)+stop
stop = min(stop, len(self))
if start >= stop: # empty slice
df = self.trim()
df.set_active_range(start, max(start, stop))
return df.trim()
assert step in [None, 1]
if self.filtered:
self._fill_filter_mask()
mask = self._selection_masks[FILTER_SELECTION_NAME]
startf, stopf = mask.indices(start, stop-1) # -1 since it is inclusive
assert startf != -1
assert stopf != -1
stopf = stopf+1 # +1 to make it inclusive
start, stop = startf, stopf
df = self.trim()
df.set_active_range(start, stop)
return df.trim()
def __delitem__(self, item):
'''Alias of df.drop(item, inplace=True)'''
if item in self.columns:
name = item
if name in self._depending_columns(columns_exclude=[name]):
raise ValueError(f'Oops, you are trying to remove column {name} while other columns depend on it (use .drop instead)')
self.drop([item], inplace=True)
def _real_drop(self, item):
'''Removes a (virtual) column from the DataFrame.
Note: this does not check if the column is used in a virtual expression or in the filter,
and may lead to issues. It is safer to use :meth:`drop`.
'''
if isinstance(item, Expression):
name = item.expression
else:
name = item
if name in self.columns:
del self.columns[name]
self.column_names.remove(name)
elif name in self.virtual_columns:
del self.virtual_columns[name]
del self._virtual_expressions[name]
self.column_names.remove(name)
else:
matches = difflib.get_close_matches(name, self.get_column_names(hidden=True))
msg = "Column or variable %r does not exist." % name
if matches:
msg += ' Did you mean: ' + " or ".join(map(repr, matches))
raise KeyError(msg)
self.signal_column_changed.emit(self, name, "delete")
if hasattr(self, name):
try:
if isinstance(getattr(self, name), Expression):
delattr(self, name)
except:
pass
@docsubst
def drop(self, columns, inplace=False, check=True):
"""Drop columns (or a single column).
:param columns: List of columns or a single column name
:param inplace: {inplace}
:param check: When True, check whether the column is used by virtual columns or the filter, and hide it instead of dropping it.
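Example (illustrative):
>>> df = vaex.from_arrays(x=np.array([1, 2]), y=np.array([3, 4]))
>>> df2 = df.drop(['y']) # df2 has only column 'x'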
"""
columns = _ensure_list(columns)
columns = _ensure_strings_from_expressions(columns)
df = self if inplace else self.copy()
depending_columns = df._depending_columns(columns_exclude=columns)
for column in columns:
if check and column in depending_columns:
df._hide_column(column)
else:
df._real_drop(column)
return df
def _hide_column(self, column):
'''Hides a column by prefixing the name with \'__\''''
column = _ensure_string_from_expression(column)
new_name = self._find_valid_name('__' + column)
self._rename(column, new_name)
return new_name
def _find_valid_name(self, initial_name):
'''Finds a non-colliding name by optional postfixing'''
return vaex.utils.find_valid_name(initial_name, used=self.get_column_names(hidden=True))
def _depending_columns(self, columns=None, columns_exclude=None, check_filter=True):
'''Find all depending columns for a set of columns (default: all), minus the excluded ones'''
columns = set(columns or self.get_column_names(hidden=True))
if columns_exclude:
columns -= set(columns_exclude)
depending_columns = set()
for column in columns:
expression = self[str(column)]
depending_columns |= expression.variables()
depending_columns -= set(columns)
if check_filter:
if self.filtered:
selection = self.get_selection(FILTER_SELECTION_NAME)
depending_columns |= selection._depending_columns(self)
return depending_columns
def iterrows(self):
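'''Iterate over the rows as (index, dict) pairs. Note: row-wise access is slow in vaex; prefer columnar operations where possible. Example (illustrative):
>>> for i, row in df.iterrows():
...     print(i, row)
'''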
columns = self.get_column_names()
for i in range(len(self)):
yield i, {key: self.evaluate(key, i, i+1, array_type='python')[0] for key in columns}
#return self[i]
def __iter__(self):
"""Iterator over the column names."""
return iter(list(self.get_column_names()))
def _root_nodes(self):
"""Returns a list of string which are the virtual columns that are not used in any other virtual column."""
# these lists (~used as ordered set) keep track of leafes and root nodes
# root nodes
root_nodes = []
leafes = []
def walk(node):
# this function recursively walks the expression graph
if isinstance(node, six.string_types):
# we end up at a leaf
leaves.append(node)
if node in root_nodes: # so it cannot be a root node
root_nodes.remove(node)
else:
node_repr, fname, fobj, deps = node
if node_repr in self.virtual_columns:
# we encountered a virtual column, similar behaviour as leaf
leaves.append(node_repr)
if node_repr in root_nodes:
root_nodes.remove(node_repr)
# recursive part
for dep in deps:
walk(dep)
for column in self.virtual_columns.keys():
if column not in leaves:
root_nodes.append(column)
node = self[column]._graph()
# we don't walk the virtual column itself, just its dependencies
node_repr, fname, fobj, deps = node
for dep in deps:
walk(dep)
return root_nodes
def _graphviz(self, dot=None):
"""Return a graphviz.Digraph object with a graph of all virtual columns"""
from graphviz import Digraph
dot = dot or Digraph(comment='whole dataframe')
root_nodes = self._root_nodes()
for column in root_nodes:
self[column]._graphviz(dot=dot)
return dot
@docsubst
@stat_1d
def _agg(self, aggregator, binners=tuple(), delay=False, progress=None):
"""
:param delay: {delay}
:return: {return_stat_scalar}
"""
tasks, result = aggregator.add_tasks(self, binners, progress=progress)
return self._delay(delay, result)
def _binner(self, expression, limits=None, shape=None, selection=None, progress=None, delay=False):
expression = str(expression)
if limits is not None and not isinstance(limits, (tuple, str)):
limits = tuple(limits)
if expression in self._categories:
N = self._categories[expression]['N']
min_value = self._categories[expression]['min_value']
binner = self._binner_ordinal(expression, N, min_value)
binner = vaex.promise.Promise.fulfilled(binner)
else:
@delayed
def create_binner(limits):
return self._binner_scalar(expression, limits, shape)
binner = create_binner(self.limits(expression, limits, selection=selection, progress=progress, delay=True))
return self._delay(delay, binner)
def _binner_scalar(self, expression, limits, shape):
dtype = self.data_type(expression)
return BinnerScalar(expression, limits[0], limits[1], shape, dtype)
def _binner_ordinal(self, expression, ordinal_count, min_value=0, invert=False):
dtype = self.data_type(expression)
return BinnerOrdinal(expression, min_value, ordinal_count, invert, dtype)
def _binner_hash(self, expression, hash_map_unique):
dtype = self.data_type(expression)
return BinnerHash(expression, hash_map_unique, dtype)
def _create_binners(self, binby, limits, shape, selection=None, progress=None, delay=False):
if isinstance(binby, (list, tuple)):
binbys = binby
else:
binbys = [binby]
binbys = _ensure_strings_from_expressions(binbys)
for expression in binbys:
if expression:
self.validate_expression(expression)
binners = []
if len(binbys):
limits = _expand_limits(limits, len(binbys))
else:
limits = []
shapes = _expand_shape(shape, len(binbys))
for binby, limits1, shape in zip(binbys, limits, shapes):
binners.append(self._binner(binby, limits1, shape, selection, progress=progress, delay=True))
@delayed
def finish(*binners):
return binners
return self._delay(delay, finish(*binners))
@docsubst
def rolling(self, window, trim=False, column=None, fill_value=None, edge="right"):
'''Create a :py:data:`vaex.rolling.Rolling` rolling window object
:param int window: Size of the rolling window.
:param bool trim: {trim}
:param str or list[str] column: Column name or column names of columns affected (None for all)
:param any fill_value: Scalar value to use for data outside of existing rows.
:param str edge: Where the edge of the rolling window is for the current row.
'''
columns = self.get_column_names() if column is None else (column if _issequence(column) else [column])
from .rolling import Rolling
return Rolling(self, window, trim=trim