This post details how Filesystem Spec’s new Parquet module provides a format-aware byte-caching optimization.
As datasets continue to grow in size, cloud-storage platforms like Amazon S3 and Google Cloud Storage (GCS) are becoming increasingly popular. Although node-local storage is likely to deliver better I/O performance, this approach can become impractical once a dataset exceeds the single-terabyte scale.
For cases where remote storage is the only practical solution, much of the PyData ecosystem has already adopted Filesystem Spec (fsspec) as a universal file-system API. While fsspec has provided users with adequate remote-storage access since the introduction of s3fs and gcsfs, its internal byte-transfer and caching algorithms have not yet leveraged format-specific optimizations for high-performance file formats like Parquet.
Key learnings
In this post, we introduce the fsspec.parquet module, which provides a format-aware, byte-caching optimization for remote Parquet files. This module is both experimental and limited in scope to a single public API: open_parquet_file.
This API provides faster remote-file access. Compared with the default read_parquet behavior in the pandas and cuDF DataFrame libraries, it delivers a consistent boost in overall throughput for partial I/O (column-chunk and row-group selection) from large Parquet files.
We also discuss the optimizations used within this new module and present the preliminary performance results.
What is fsspec?
Filesystem Spec (fsspec) is an open-source project providing a unified Python interface to a variety of backend storage systems. The name fsspec refers both to a specific Python library and to a larger GitHub organization containing many system-specific repositories (for example, s3fs and gcsfs).
The advantage of using fsspec within a Python-based library or application is that the same POSIX-like file API can write to and read from remote objects and local files alike. When a cloud-based object is opened by an fsspec-compatible file system, the underlying application is given an AbstractBufferedFile object, or some other file-like object designed to duck-type with native Python file objects. These file-like objects can then be treated the same way as local Python file handles.
One obvious difference is that the AbstractBufferedFile object must use file-system-specific commands internally to put and get bytes to and from the remote storage system. Because these internal data-transfer operations typically have higher latency than local disk accesses, fsspec offers several caching strategies to prefetch data while avoiding many small requests.
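For reference, the caching strategy for an individual remote file can be selected through the cache_type argument at open time. The following is a minimal sketch of this existing fsspec behavior; the bucket path is hypothetical:

import fsspec

# "readahead" is the default strategy for files opened in read mode;
# other registered options include "none", "bytes", and "parts".
with fsspec.open(
    "s3://my-bucket/my-data.parquet",  # hypothetical path
    mode="rb",
    cache_type="readahead",
) as f:
    header = f.read(4)  # transfers a whole read-ahead block, not just 4 bytes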
Later in this post, we explain why the default caching strategy is often suboptimal for large Parquet files, and how the new KnownPartsOfAFile (“parts”) option can dramatically reduce read latency.
What is Parquet?
Parquet is a binary column-oriented data-storage format, designed with performance, compression, and partial I/O in mind. Due to its dramatic performance advantages over text-formatted files like CSV, the open-source format has experienced rapid growth in popularity since its initial release in 2013.
To understand the optimizations used within the fsspec.parquet module, it is useful to have a high-level understanding of the Parquet specification. Figure 1 shows that all Parquet files consist of a collection of contiguously stored row-groups, and each of these row-groups includes a collection of contiguously stored column chunks.
Altogether, the majority of all bytes in a Parquet file correspond to these column chunks. The remaining bytes are used to store file metadata within a Thrift-formatted footer. This footer metadata includes byte offsets and optional statistics (min, max, valid count, null count) for every column chunk in the file.
Because this vital information is consolidated in the footer, it is only necessary to sample the end of a Parquet file to determine the specific location of each column-chunk in the file.
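For example, a Parquet engine like pyarrow can parse this footer directly. The following sketch (using a local file and made-up data, purely for illustration) writes a small multi-row-group file and prints the location of every column chunk:

import pandas as pd
import pyarrow.parquet as pq

# Write a small Parquet file with four row-groups of 250 rows each.
df = pd.DataFrame({"col1": range(1000), "col2": range(1000)})
df.to_parquet("example.parquet", row_group_size=250)

# Parse the Thrift-formatted footer and locate each column chunk.
metadata = pq.ParquetFile("example.parquet").metadata
for rg in range(metadata.num_row_groups):
    for col in range(metadata.num_columns):
        chunk = metadata.row_group(rg).column(col)
        print(chunk.path_in_schema, chunk.file_offset, chunk.total_compressed_size)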
What is the purpose of fsspec’s new Parquet module?
fsspec is already the most common means of loading Parquet files in Python. The primary purpose of the new fsspec.parquet module is to provide an optimized utility for exactly this task.
Internally, this new utility (open_parquet_file) essentially wraps a conventional open call in Parquet-specific logic that is designed to begin transferring all relevant data from the remote file into local memory before the user has even initiated an explicit read operation.
Although reads of any size may benefit from this new utility, the most significant improvements are seen when the read operation targets only a subset of all columns and row-groups. For example, when the remote read uses column projection, the same list of columns can be passed directly to open_parquet_file:
from fsspec.parquet import open_parquet_file
import pandas as pd

path = "s3://my-bucket/my-data"
columns = ["col1", "col2"]
options = {"necessary": "credentials"}

with open_parquet_file(
    path,
    columns=columns,
    storage_options=options,
) as f:
    df = pd.read_parquet(f, columns=columns)
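Note that the open file object f, rather than the original path, is passed to read_parquet, so the engine reads from the pre-cached data instead of opening the remote file a second time.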
As we mentioned earlier, the primary purpose of fsspec’s new open_parquet_file function is to improve read performance from large Parquet files by using a format-aware caching strategy within AbstractBufferedFile.open.
To understand why the specific optimizations used by the Parquet module are beneficial, it is helpful to first consider the simple cache-free approach, as well as the default read-ahead strategy.
Understanding cache-free and read-ahead approaches
Figure 2 shows how cache-free file access is likely to map onto remote get calls within a read_parquet operation (after the remote file is already opened) for both an ideal and a naive sequence of read calls.

In this particular example, assume the Parquet file is large enough (~400 MB+) to comprise four distinct row-groups, and that the read_parquet call targets two of the four available columns. This means the AbstractBufferedFile object must ultimately transfer eight distinct column-chunk ranges, along with the footer metadata, into local memory.
When caching is disabled, it is possible for a well-tuned Parquet library to move only the necessary data into local memory using five distinct requests. However, because fsspec’s file-like interface contains state, these five read calls are always serial, and each one incurs a full round trip of latency. For example, at 100 ms of round-trip latency, five serial requests add at least half a second before any data can be decoded.
This ideal-access scenario is incapable of leveraging concurrency to minimize latency, even though the strategy is likely to minimize the number of file-system requests and produce high overall throughput. For Parquet libraries that take a naive-access approach and do not explicitly minimize the number of read calls, the read_parquet operation is likely to suffer from high latency and low overall throughput!
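For illustration, the following sketch approximates how that ideal five-request sequence might look against a cache-free fsspec file. The footer-sample size and column-chunk ranges are hypothetical stand-ins for values parsed from real metadata:

import fsspec

fs = fsspec.filesystem("s3")  # hypothetical remote filesystem
with fs.open("my-bucket/my-data.parquet", mode="rb", cache_type="none") as f:
    f.seek(-65_536, 2)        # request 1: sample the end of the file
    footer_sample = f.read()  # contains the Thrift-formatted footer
    # ... parse footer_sample to find the needed column-chunk ranges ...
    for offset, length in [   # hypothetical aggregated ranges, one per row-group
        (4, 40_000_000),
        (100_000_000, 40_000_000),
        (200_000_000, 40_000_000),
        (300_000_000, 40_000_000),
    ]:
        f.seek(offset)
        chunk = f.read(length)  # requests 2-5, each one serial get call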
At this point, we’ve established that the I/O library is unable to take advantage of file-system concurrency after the file is already opened for reading. What is even more important to consider is that the default read-ahead caching strategy can degrade the observed performance even further when partial I/O is involved. This is because the inherent assumption of read-ahead caching is that sequential file accesses are likely to be contiguous.
Figure 3 shows that read-ahead caching is likely to transfer about 20 MB of unnecessary data (5 MB of read-ahead for each row-group).
As you see in the following performance results, the benefit of disabling caching over read-ahead caching depends on both the engine used within read_parquet and the specific storage system.

If the engine already includes format-aware optimizations to move only the necessary byte ranges into local memory (that is, pyarrow and fastparquet), then the no-caching option is likely the better choice. When the engine assumes local file access is fast (cuDF), some form of fsspec-level caching may be critical.
In either case, the engine is unable to begin transferring the required byte ranges until after the fsspec file object is created, and it is limited by the additive latency of sequential data transfers.
Now that we have a high-level understanding of both the default and cache-free AbstractBufferedFile behavior within a typical read_parquet call, we can explain the two general optimizations used by open_parquet_file to improve overall read throughput.
Optimization 1: Use the “parts” caching strategy
Instead of the format-agnostic caching strategies used within AbstractBufferedFile by default, use the new KnownPartsOfAFile (“parts”) option to cache exactly what you need before the file is even opened.
In other words, begin by using an external Parquet engine (either fastparquet or pyarrow) to parse the footer metadata up front. Then, transfer the necessary byte ranges into local memory, and use the “parts” cache to ensure that downstream applications never have to wait for remote data after an open fsspec file object is acquired.
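A minimal sketch of this pattern, assuming the byte ranges have already been fetched, with hypothetical offsets and placeholder bytes:

import fsspec

fs = fsspec.filesystem("s3")  # hypothetical remote filesystem

# Byte ranges (start, stop) mapped to data already transferred up-front.
known_parts = {
    (4, 10_000_004): b"...column-chunk bytes...",       # placeholder
    (790_000_000, 790_065_536): b"...footer bytes...",  # placeholder
}

f = fs.open(
    "my-bucket/my-data.parquet",
    mode="rb",
    cache_type="parts",
    cache_options={"data": known_parts},
)
# Any read falling inside known_parts is served from local memory.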
Figure 4 shows that there is no longer any relationship between the logic used to access data within read_parquet and the number or size of get calls used for remote-data access. From the perspective of the Parquet engine, any read operation is near-instantaneous, because all necessary data is already cached in memory.
Optimization 2: Transfer “parts” asynchronously and in parallel
Although the previous optimization enables you to avoid many small get operations and unnecessary data transfer within read_parquet, you must still populate the KnownPartsOfAFile cache before AbstractBufferedFile can be initialized.
To do this as efficiently as possible, use cat_ranges to fetch all the required column chunks at one time, asynchronously and in parallel using asyncio. Because the total number of transferred column chunks can be large for files containing many fields or multiple row-groups, adjacent byte-range requests are also aggregated, as long as the size of the aggregated request stays below an upper limit.
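The following sketch illustrates the general idea using fsspec’s public utilities: merge_offset_ranges aggregates adjacent or nearby requests, and cat_ranges transfers all of them at one time. The paths, offsets, and size limits are hypothetical:

import fsspec
from fsspec.utils import merge_offset_ranges

fs = fsspec.filesystem("gcs")  # hypothetical remote filesystem

# Hypothetical column-chunk byte ranges within a single file.
paths = ["my-bucket/my-data.parquet"] * 3
starts = [4, 1_000_000, 1_050_000]
ends = [900_000, 1_040_000, 2_000_000]

# Aggregate nearby ranges, keeping each request below an upper limit.
paths, starts, ends = merge_offset_ranges(
    paths, starts, ends, max_gap=64_000, max_block=256_000_000
)

# Fetch all ranges concurrently (asynchronously under the hood).
chunks = fs.cat_ranges(paths, starts, ends)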
Figure 5 shows that this approach ultimately leads to the concurrent transfer of multiple byte ranges of optimal size.
Preliminary fsspec.parquet benchmark results
To compare the performance of open_parquet_file with other fsspec- and pyarrow-based file-handling methods, we used the available Python script with the following package versions:
pyarrow-6.0.1
fastparquet-0.8.0
cudf-22.04
fsspec/s3fs/gcsfs-2022.2.0
Using this script, we read a single column from a 789 MB Parquet file containing a total of 10 row-groups and 30 columns with snappy compression, requiring the transfer of roughly 27 MB of data. The results for both S3 and GCS, summarized in Figures 6-7, clearly show a significant performance boost of 85% or more when moving from the default caching strategy to open_parquet_file.
In fact, the new function effectively matches the performance of PyArrow’s native S3FileSystem implementation, which is specifically designed for optimal performance with its own Parquet engine. We compare against a native PyArrow file system only for this single Amazon S3 benchmark because, at the time of publication, PyArrow offers publicly supported implementations only for Amazon S3 and Hadoop.
To illustrate how the benefit of using open_parquet_file scales with the overall size of the file, we also read a single column from a 12 GB Parquet file containing the first day of the Criteo dataset from public GCS storage, with compression disabled. Figure 8 shows that the new fsspec function can offer a 10x speedup or more over default caching.
Test it out yourself
In this post, we introduced the fsspec.parquet module, which provides a format-aware, byte-caching optimization for opening remote Parquet files: open_parquet_file. Benchmarking clearly suggests that the new optimization can offer significant performance improvements over the default file-opening behavior in fsspec, and may even approach the performance of the optimized C++ file-system implementations in PyArrow for partial I/O.
Since being officially released in version 2021.11.0 of fsspec, the open_parquet_file utility has already been adopted by both the RAPIDS cuDF library and Dask DataFrame.
Due to the dramatic and consistent improvements available for cuDF-based workflows, this new feature has already been adopted as the default file-opening approach within cudf.read_parquet and dask_cudf.read_parquet.
For Dask users without GPU resources, the optimized caching approach can now be opted into by passing an open_file_options argument to read_parquet. For example, the following code instructs Dask to open all Parquet data files with the parquet pre-caching method:
import dask.dataframe as dd

ddf = dd.read_parquet(
    path, open_file_options={"precache_options": {"method": "parquet"}}
)
Given this early success, we hope to expand and simplify the available pre-caching options in fsspec and to establish a clear precache_options API within all file-opening functions.
Community feedback here is critical. Please engage on GitHub or comment below, and let us know what you think!