Maarten Breddels
6 min read
A Hybrid Apache Arrow/Numpy DataFrame with Vaex version 4.0

The Vaex DataFrame has always been very fast. Built from the ground up to be out of core (the size of your disk is the limit), it pushes the limits of what single machines can do in the context of big data analysis. Starting from version 2, we added better support for string data, giving an almost 1000x speedup compared to Pandas at the time. To support this seemingly trivial datatype, we had to choose a disk and memory format, and we did not want to reinvent the wheel. Apache Arrow was an obvious choice, but it did not meet our requirements at the time. We still added string support to Vaex, but in a future-compatible way, so that when the time arrived (now!) we could adopt Apache Arrow without rendering data from the past obsolete or requiring data conversions. For compatibility with Apache Arrow, we developed the vaex-arrow package, which made interoperability with Vaex smooth, at the cost of a possible memory copy here and there.

Apache Arrow

With Apache Arrow version 3.0 the time has come to integrate Arrow support into the core of Vaex (the Python package vaex-core), deprecating the vaex-arrow package. While all versions of Vaex support the same string data on disk (either in HDF5 or Apache Arrow format), what is different in version 4.0 is that we now pass these around as Arrow arrays.

import vaex
df = vaex.from_arrays(library=['Vaex', 'NumPy', 'Apache Arrow'])
print(df)
##  #  library
##  0  'Vaex'
##  1  'NumPy'
##  2  'Apache Arrow'
print(repr(df['library'].values))
##<pyarrow.lib.StringArray object at 0x7f631c1a1360>
##[
##  "Vaex",
##  "NumPy",
##  "Apache Arrow"
##]

You can mix NumPy arrays and Arrow arrays in the same DataFrame:

import vaex
import numpy as np
import pyarrow as pa

x = np.arange(4)
y = pa.array([42, 12, 144, 1024])

df = vaex.from_arrays(x=x, y=y)
df['y'] = df.x * df.y
print(repr(df.y))
##Expression = y
##Length: 4 dtype: object (column)
##--------------------------------
##0     0
##1    12
##2   288
##3  3072

And while Arrow, by default, takes a Pandas-like approach of converting missing values to NaN when mixing with NumPy, in Vaex we ensure your missing values stay missing values, and your arrays do not get upcast to floats because of it.

import vaex
import numpy as np
import pyarrow as pa

x = np.ma.array(np.arange(4), mask=[0, 1, 0, 0], dtype='i4')
y = pa.array([42, 12, 144, None], type=pa.int32())

# Mixing directly: Arrow's missing value becomes NaN, and the result is upcast to float
print(x*y)
##[0.0 -- 288.0 nan]

df = vaex.from_arrays(x=x, y=y)
df['y'] = df.x * df.y
# vaex propagates missing values
print(repr(df.y))
##Expression = y
##Length: 4 dtype: int64 (column)
##-------------------------------
##0    0
##1   --
##2  288
##3   --

Not only have we adopted Apache Arrow in the core library of Vaex, but we are also moving Vaex's fast string algorithms into arrow.compute. As a result, we get not only interoperability of data, but also the same semantics for operations on string data. This means that Vaex, Pandas, and Dask DataFrames can all operate on the same data, in the same way. This work is sponsored by the Chan Zuckerberg Initiative, and we thank Tom Augspurger for organizing this.
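
As a minimal sketch of what that shared compute layer looks like (here using utf8_upper, one of Arrow's string kernels), any Arrow-based library can call the same kernel on the same data, with the same missing-value semantics:

import pyarrow as pa
import pyarrow.compute as pc

arr = pa.array(['vaex', 'pandas', None])
# the same kernel, and the same null handling, for every Arrow-based library
print(pc.utf8_upper(arr))
##[
##  "VAEX",
##  "PANDAS",
##  null
##]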

List support

While support for numerical and string data gets most of the work done, Apache Arrow has support for much more interesting data structures. For instance, splitting a string results in a list-of-strings. This data structure has an ideal memory layout for high performance (one contiguous buffer versus many small ones) and still allows us to operate on it like a regular string array. This means we can split/tokenize strings and apply operations like strip on the result, without having to go through join and split each time:

import vaex
df = vaex.from_arrays(text=['So, can you split this?', 'And this.', None])
df.text.str.split(" ")
##Expression = str_split(text, ' ')
##Length: 3 dtype: list<item: string> (expression)
##------------------------------------------------
##0  ['So,', 'can', 'you', 'split', 'this?']
##1                         ['And', 'this.']
##2                                       --
# apply string operations to each string in the list
df.text.str.split(" ").str.strip(' ,?.')
##Expression = str_strip(str_split(text, ' '), ' ,?.')
##Length: 3 dtype: list<item: string> (expression)
##------------------------------------------------
##0  ['So', 'can', 'you', 'split', 'this']
##1                        ['And', 'this']
##2                                     --

String splitting can even be done multiple times, creating a nested list, without any performance loss.

df.text.str.split(" ").str.strip(' ,?.').str.split('a')
##Expression = str_split(str_strip(str_split(text, ' '), ' ,?.'), 'a')
##Length: 3 dtype: list<item: list<item: string>> (expression)
##------------------------------------------------------------
##0  [['So'], ['c', 'n'], ['you'], ['split'], ['this']]
##1                                 [['And'], ['this']]
##2                                                  --

Why hybrid?

Apache Arrow will bring a lot of good to the whole data analytics world, and not only to Python. It is growing fast, moving fast, and we believe it will be the future of analytics in Python, supplementing or substituting NumPy in many areas. However, adoption will take time, and most people are probably more comfortable seeing NumPy arrays. Therefore, in Vaex version 4, a DataFrame can hold both NumPy arrays and Apache Arrow arrays to make the transition period easier. If the data comes from an HDF5 file or from external NumPy arrays, we keep it as NumPy arrays, except for strings. If the data comes from an Arrow file or external Arrow arrays, we keep them as Arrow arrays.
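
A minimal sketch of this hybrid behaviour (in-memory data; the column names are just for illustration): numerical data passed in as NumPy stays NumPy, while strings become Arrow arrays:

import vaex
import numpy as np

df = vaex.from_arrays(x=np.arange(3), label=['a', 'b', 'c'])
print(type(df['x'].values))
##<class 'numpy.ndarray'>
print(type(df['label'].values))
##<class 'pyarrow.lib.StringArray'>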

If you happen to have an Arrow file, but prefer to work with NumPy arrays (because of their computational semantics), you can lazily convert all the data, with the exception of string data.
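
A minimal sketch of such a conversion, assuming a hypothetical Arrow file and using the as_numpy method (which performs the cast lazily):

import vaex

df = vaex.open('./mydata.arrow')  # hypothetical file name
# assumption: as_numpy lazily casts columns to NumPy; string columns stay Arrow
df = df.as_numpy()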

Parquet support

With Apache Arrow comes out-of-core Parquet support, a much-requested feature. Although previous versions of vaex-arrow provided some Parquet support, we now support lazy loading, allowing you to scan through Parquet datasets larger than memory. Although Parquet comes at the cost of some CPU time (for decompression and decoding), its size reduction makes it an attractive format for cloud storage, where network bandwidth is often a limiting factor.
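
For example (with a hypothetical file name), opening a Parquet file does not read it into memory; computations stream through the data in chunks:

import vaex

# hypothetical file: it can be larger than your RAM
df = vaex.open('./nyc_taxi_2015.parquet')
# nothing has been loaded into memory yet
print(f'{len(df):,} rows')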

An additional benefit of using Apache Arrow is the support for reading partitioned parquet files. This is quite handy since files generated from the Spark/Hadoop world often come in this format, making ingestion from these platforms easier.

Writing partitioned files

We also support creating partitioned files. It may not be the most performant part of Vaex, but we use it for testing and thought it would be good to share.

import vaex
countries = ['US', 'US', 'NL', 'FR', 'NL', 'NL']
years = [2020, 2021, 2020, 2020, 2019, 2020]
values = [1, 2, 3, 4, 5, 6]
df = vaex.from_dict({
    'country': countries,
    'year': years,
    'value': values,
})
df.export_partitioned('./partitioned', by=['country', 'year'])
##
## $ ls -R ./partitioned/
##./partitioned/:
##'country=FR'  'country=NL'  'country=US'
##
##'./partitioned/country=FR':
##'year=2020'
##
##'./partitioned/country=FR/year=2020':
##db5f59db-f207-4dca-8999-e405436c3a88.parquet
##
##'./partitioned/country=NL':
##'year=2019'  'year=2020'
##
##'./partitioned/country=NL/year=2019':
##ee1b5558-e1c7-4d49-a899-a0b6c29ace00.parquet
##
##'./partitioned/country=NL/year=2020':
##3908d92f-d280-4c83-9e51-bead619d470b.parquet
##
##'./partitioned/country=US':
##'year=2020'  'year=2021'
##
##'./partitioned/country=US/year=2020':
##8be82725-c46d-40ad-b2e4-bd3962469394.parquet
##
##'./partitioned/country=US/year=2021':
##f4743b01-9783-4ef1-8bff-e892c050a50a.parquet

Reading partitioned files

If the partitioning is in Hive format, you are good to go; just open the directory:

import vaex
df = vaex.open('./partitioned/')
print(df)
##  #    value  country      year
##  0        4  'FR'         2020
##  1        5  'NL'         2019
##  2        3  'NL'         2020
##  3        6  'NL'         2020
##  4        1  'US'         2020
##  5        2  'US'         2021

For the so-called 'directory' format, you have to give the partition names manually:

import vaex
df.export_partitioned('./partitioned_directory/', by=['country', 'year'], directory_format='{value}')
df = vaex.open('./partitioned_directory/', partitioning=['country', 'year'])
print(df)
##  #    value  country      year
##  0        4  'FR'         2020
##  1        5  'NL'         2019
##  2        3  'NL'         2020
##  3        6  'NL'         2020
##  4        1  'US'         2020
##  5        2  'US'         2021

Pickle support

In version 4.0, Vaex supports pickling of DataFrames, which makes integration with Dask, Ray, or Python's multiprocessing library significantly easier.

While you may be used to pickling being an inefficient way of storing data, in many cases Vaex does not pickle the data itself. When we do:

import vaex
import pickle

df = vaex.open('s3://vaex/taxi/nyc_taxi_2015_mini.parquet')
df['tip_percentage'] = df.tip_amount / df.total_amount
N = len(df)
print(f'{N:,} rows')
##300,000 rows

with open('mydataframe.pickle', 'wb') as f:
    pickle.dump(df, f)
## $ ls -alh ./mydataframe.pickle
## -rw-rw-r-- 1 maartenbreddels maartenbreddels 907 dec  1 16:24 ./mydataframe.pickle

You might be surprised that this is only 907 bytes! If you inspect the file, you will notice that we pickle the path and the internal state (such as the virtual column df.tip_amount / df.total_amount).

This can be very useful when you want to store your whole DataFrame to disk, or send it over the wire. Instead of copying 1 terabyte of data, we simply pickle the original file location and all the operations you have performed on it (transformations and filtering). Note that this works particularly well with cloud storage, since the file location will be the same for all computers.
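
Loading it back is just as easy (a quick sketch, reusing the file written above); the data is re-read lazily from the original location, and the virtual column comes along for free:

import pickle

with open('mydataframe.pickle', 'rb') as f:
    df = pickle.load(f)
# the virtual column is restored; no data was copied into the pickle
print(df.tip_percentage.mean())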

Although with Vaex we primarily focus on getting the maximum performance out of a single powerful machine, this paves the way for optimized distributed computations, for those situations in which you really need a cluster.

Parallel apply with multiprocessing

Recognizing that df.apply is an escape hatch that is sometimes necessary to get stuff done, we also had to recognize that it was painfully slow compared to the rest of Vaex, dropping to 100% CPU usage (i.e. using just a single core). The bottleneck here is the Global Interpreter Lock (GIL), not Vaex itself. Nonetheless, we felt we should try to find a way to improve the performance of the apply method.

Now, by default, apply uses the multiprocessing module, making your apply fly!

import vaex

def is_prime(x):
    return x > 1 and all((x % i) != 0 for i in range(2, x))

df = vaex.from_arrays(x=vaex.vrange(0, 100_000, dtype='i4'))
# you need to explicitly specify which arguments you need
df['is_prime'] = df.apply(is_prime, arguments=[df.x])
df.head(10)
##  #    x  is_prime
##  0    0  False
##  1    1  False
##  2    2  True
##  3    3  True
##  4    4  False
##  5    5  True
##  6    6  False
##  7    7  True
##  8    8  False
##  9    9  False

Cloud storage

While S3 support for HDF5 was present in earlier Vaex versions, version 4.0 improves on this further, thanks to both the Apache Arrow and FSSpec projects. Vaex can read and write most file formats to many cloud storage systems, with S3 being the most performant. For S3 and Google Cloud Storage, we natively support their file paths, e.g. "s3://vaex/testing/xys.hdf5" or "gc://vaex/testing/xys.hdf5". In all other cases, one can specify a FileSystem object via either Apache Arrow or FSSpec.

import vaex
df = vaex.open('s3://vaex/testing/xys.hdf5')
df = vaex.open('gc://vaex/testing/xys.hdf5')
# Using Apache Arrow to connect to a Hadoop File System
import pyarrow.fs as fs
host, port = 'localhost', 8020  # fill in your Hadoop namenode
hdfs = fs.HadoopFileSystem(host, port)
df = vaex.open('myfile.parquet', fs=hdfs)
# Or FSSpec to use HTTP
import fsspec
https = fsspec.filesystem('https')
df = vaex.open('https://cdn.gea.esac.esa.int/Gaia/gedr3/gaia_source/GaiaSource_000000-003111.csv.gz', fs=https)

Conclusions

We hope you are as excited as we are with the 4.0 release of Vaex. Unifying Apache Arrow and NumPy in a single DataFrame library gives us rich data structures (e.g. list of strings), compatibility with NumPy, interoperability (zero-copy data sharing), and an overall bright future. The out-of-core Parquet support together with much-improved cloud storage support makes working with and on the cloud much more transparent. Improved pickle support and default parallel apply allow for better performance when you need to resort to pure Python code.

Installation

# Want to install?
$ pip install "vaex==4.*"
# Just want the core functionality?
$ pip install "vaex-core==4.*"
# Using conda or mamba?
$ conda install -c conda-forge vaex=4

Contributing

Join us on GitHub at https://github.com/vaexio/vaex if you want to contribute or find issues, or https://github.com/vaexio/vaex/discussions for more general questions.

Do you need help accelerating your data processing or data analytics? Reach out to us to see if we can help you.
