Optimizing Access to Parquet Data with fsspec

This post details how the filesystem specification’s new parquet module provides a format-aware byte-caching optimization.

As datasets continue to grow in size, cloud-storage platforms like Amazon S3 and Google Cloud Storage (GCS) are becoming more popular. Although node-local storage is likely to result in better I/O performance, this approach can become impractical once the dataset exceeds the single-terabyte scale.

For cases where remote storage is the only practical solution, much of the PyData ecosystem has already adopted Filesystem Spec (fsspec) as a universal file-system API. While fsspec has provided users with adequate remote storage access since the introduction of s3fs and gcsfs, the internal byte-transfer and caching algorithms have not yet leveraged format-specific optimizations for high-performance file formats like Parquet.

Key learnings

In this post, we introduce the fsspec.parquet module, which provides a format-aware, byte-caching optimization for remote Parquet files. This module is both experimental and limited in scope to a single public API: open_parquet_file.

This API provides faster remote-file access. When compared with the default read_parquet behavior in the pandas and cuDF DataFrame libraries, there is a consistent performance boost in overall throughput for partial I/O (column-chunk and row-group selection) from large Parquet files.

We also discuss the optimizations used within this new module and present the preliminary performance results.

What is fsspec?

Filesystem Spec (fsspec) is an open-source project providing a unified Python interface to a variety of backend storage systems. The name refers both to the fsspec Python library itself and to a larger GitHub organization containing many system-specific repositories (for example, s3fs and gcsfs).

The advantage of using fsspec within a Python-based library or application is that the same POSIX-like file API can write and read from remote objects and local files alike. When a cloud-based object is opened by an fsspec-compatible file system, the underlying application is given an AbstractBufferedFile object, or some other file-like object designed to duck-type with native Python file objects. Now, these file-like objects can be treated the same way as local Python-file handles.

One obvious difference is that the AbstractBufferedFile object must use file system-specific commands internally to put and get bytes to and from the remote storage system. As these internal data-transfer operations typically have higher latency than local disk accesses, fsspec offers several caching strategies to prefetch data while avoiding many small requests.

Later in this post, we explain why the default caching strategy is often suboptimal for large Parquet files, and how a new KnownPartsOfAFile (“parts”) option can dramatically reduce read latency.

What is Parquet?

Parquet is a binary column-oriented data-storage format, designed with performance, compression, and partial I/O in mind. Due to its dramatic performance advantages over text-formatted files like CSV, the open-source format has experienced rapid growth in popularity since its initial release in 2013.

To understand the optimizations used within the fsspec.parquet module, it is useful to have a high-level understanding of the Parquet specification. Figure 1 shows that all Parquet files consist of a collection of contiguously stored row-groups, and each of these row-groups includes a collection of contiguously stored column chunks.

A Parquet file is divided into row-groups, and row-groups are divided into column chunks. Every Parquet file also includes a single block of footer metadata.
Figure 1. High-level visual representation of the Parquet file format

Altogether, the majority of all bytes in a Parquet file correspond to these column chunks. The remaining bytes are used to store file metadata within a thrift-formatted footer. This footer metadata includes byte offsets and optional statistics (min, max, valid count, null count) for every column-chunk in the file.

Because this vital information is consolidated in the footer, it is only necessary to sample the end of a Parquet file to determine the specific location of each column-chunk in the file.
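
As a minimal sketch of why sampling the end of the file is enough: the Parquet format ends every file with a 4-byte little-endian footer length followed by the 4-byte magic string PAR1. The helper name footer_byte_range and the synthetic in-memory file below are illustrative assumptions, not part of fsspec. Because the function only calls seek, tell, and read, it should work with any file-like object, local or remote.

```python
import io
import struct

MAGIC = b"PAR1"


def footer_byte_range(f):
    """Return (start, stop) byte offsets of the footer metadata.

    Only the final 8 bytes are read: a 4-byte little-endian footer
    length followed by the 4-byte magic string.
    """
    f.seek(0, io.SEEK_END)
    file_size = f.tell()
    f.seek(file_size - 8)
    tail = f.read(8)
    if tail[4:] != MAGIC:
        raise ValueError("not a Parquet file: missing trailing magic bytes")
    (footer_length,) = struct.unpack("<I", tail[:4])
    stop = file_size - 8
    return stop - footer_length, stop


# Synthetic stand-in for a Parquet file. The "footer" here is dummy
# bytes, not real Thrift-encoded metadata.
column_chunks = b"\x00" * 1000
footer = b"\x01" * 120
fake_file = io.BytesIO(
    MAGIC + column_chunks + footer + struct.pack("<I", len(footer)) + MAGIC
)

start, stop = footer_byte_range(fake_file)
print(start, stop)
```

Once the footer range is known, a Parquet engine can parse it to obtain the byte offsets of every column chunk without touching the rest of the file.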

What is the purpose of fsspec’s new Parquet module?

fsspec is already the most common means of loading Parquet files under Python. The primary purpose of the new fsspec.parquet module is to provide an optimized utility for exactly this task.

Internally, this new utility (open_parquet_file) essentially wraps a conventional open call in Parquet-specific logic that is designed to begin the transfer of all relevant data from the remote file into local memory before the user has even initiated an explicit read operation.

Although reads of any size may benefit from this new utility, the most significant improvements are seen when the read operation targets only a subset of all columns and row-groups. For example, when the remote read is using column projection, the same list of columns can be passed directly to open_parquet_file:

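from fsspec.parquet import open_parquet_file
import pandas as pd

# The storage protocol that precedes "://" is elided in this example
path = "://my-bucket/my-data"
columns = ["col1", "col2"]
options = {"necessary": "credentials"}

# Pass the same column selection to both calls so that only the
# required column chunks are pre-cached
with open_parquet_file(
    path,
    columns=columns,
    storage_options=options,
) as f:
    # Read from the pre-cached file object, not from the path
    df = pd.read_parquet(f, columns=columns)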

As we mentioned earlier, the primary purpose of fsspec’s new open_parquet_file function is to improve read performance from large Parquet files by using a format-aware caching strategy within AbstractBufferedFile.open.

To understand why the specific optimizations used by the Parquet module are beneficial, it is helpful to start with an understanding of the simple cache-free approach, as well as the default read-ahead strategy.

Understanding cache-free and read-ahead approaches

Figure 2 shows how cache-free file access is likely to map onto remote get calls within a read_parquet operation (after the remote file is already opened) for both an ideal and naive sequence of read calls.

The overall number of remote-get operations can be dramatically reduced by coalescing multiple requests for adjacent data blocks.
Figure 2. Comparison of ideal and naive data access patterns from a remote Parquet file

In this particular example, assume the Parquet file is large enough (~400MB+) to comprise four distinct row-groups, and the read_parquet call is targeting two out of the four available columns. This means the AbstractBufferedFile object must ultimately transfer eight distinct column-chunk ranges, along with the footer metadata, into local memory.

In the case that caching is disabled, it is possible for a well-tuned Parquet library to move only the necessary data into local memory using five distinct requests. However, as fsspec’s file-like interface contains state, these five read calls are always serial and incur one whole latency time for each call.

This ideal-access scenario is incapable of leveraging concurrency to minimize latency, even though the strategy is likely to minimize the number of file-system requests and produce high overall throughput. For Parquet libraries that take a naive-access approach and do not explicitly minimize the number of read calls, the read_parquet operation is likely to suffer from high latency and low overall throughput!

At this point, we’ve established that the I/O library is unable to take advantage of file-system concurrency after the file is already opened for reading. What is even more important to consider is that the default read-ahead caching strategy can degrade the observed performance even further when partial I/O is involved. This is because the inherent assumption of read-ahead caching is that sequential file accesses are likely to be contiguous.

Figure 3 shows that read-ahead caching is likely to transfer about 20 MB of unnecessary data, 5 MB of read-ahead for each row-group.

Using read-ahead caching to load a subset of columns from a Parquet file will result in unnecessary data transfer.
Figure 3. Comparison of the read-ahead and cache-free file-access strategies in fsspec
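
The 20 MB figure can be reproduced with a simplified model of read-ahead caching, in which every cache miss fetches the requested range plus one extra block. The layout below is a hypothetical one chosen for illustration (four 100 MB row-groups, four equally sized column chunks each, the first two columns selected, footer ignored), and simulate_read_ahead is a sketch rather than fsspec's actual cache class.

```python
MB = 1024 * 1024


def simulate_read_ahead(reads, file_size, blocksize=5 * MB):
    """Return the total bytes fetched by a simplified read-ahead cache."""
    cache_start = cache_end = 0
    transferred = 0
    for start, end in reads:
        if cache_start <= start and end <= cache_end:
            continue  # full cache hit: nothing to transfer
        if cache_start <= start < cache_end:
            start = cache_end  # partial hit: only the tail is missing
        # On a miss, fetch the request plus one read-ahead block
        fetch_end = min(file_size, end + blocksize)
        transferred += fetch_end - start
        cache_start, cache_end = start, fetch_end
    return transferred


# Hypothetical layout: each row-group is 100 MB, and the two selected
# column chunks occupy the first 50 MB of every row-group.
row_group_size = 100 * MB
selected_size = 50 * MB
reads = [
    (rg * row_group_size, rg * row_group_size + selected_size)
    for rg in range(4)
]

needed = sum(end - start for start, end in reads)
transferred = simulate_read_ahead(reads, file_size=4 * row_group_size)
wasted_mb = (transferred - needed) // MB
print(wasted_mb)
```

Because each selected range is followed by columns that are never requested, the read-ahead block is discarded on every row-group, and the waste grows linearly with the number of row-groups.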

As you see in the following performance results, the benefit of disabled caching over read-ahead caching is dependent on both the engine used within read_parquet and the specific storage system.

If the engine already includes format-aware optimizations to move the necessary byte ranges into local memory (that is, pyarrow and fastparquet), then the no-caching option is likely the better choice. When the engine assumes local file access is fast (cuDF), some form of fsspec-level caching may be critical.

In either case, the engine is unable to begin the transfer of the required byte ranges until after the fsspec file object is created. It is limited by the additive latency of sequential data transfer.

Now that we’ve established a high-level understanding of both default and cache-free AbstractBufferedFile behavior within a typical read_parquet call, we can explain the two general optimizations used by open_parquet_file to improve overall read throughput.

Optimization 1: Use the “parts” caching strategy

Instead of the format-agnostic caching strategies used within AbstractBufferedFile by default, use the new KnownPartsOfAFile (“parts”) option to cache exactly what you need before the file is even opened.

In other words, begin by using an external Parquet engine (either fastparquet or pyarrow) to parse the footer metadata up-front. Then, transfer the necessary byte-ranges into local memory, and use the “parts” cache to ensure that downstream applications never have to wait for remote data after an open fsspec file object is acquired.

Figure 4 shows that there is no longer any relationship between the logic used to access data within read_parquet and the number or size of get calls used for remote-data access. From the perspective of the Parquet engine, any read operation is near-instantaneous, because all necessary data is already cached in memory.

When the “parts” caching strategy is used, read operations will not result in new remote-file accesses.
Figure 4. fsspec’s “parts” caching strategy. All necessary data must be cached in local memory before the file-like object is even opened.
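
The idea behind a “parts” cache can be sketched in a few lines. The class below is a hypothetical stand-in written for illustration, not fsspec's KnownPartsOfAFile implementation: it holds byte ranges that were fetched ahead of time and answers any read that falls inside one of them without contacting the remote store. The fetcher fallback and the remote_requests counter are assumptions added to make the behavior observable.

```python
class KnownPartsSketch:
    """Serve reads from byte ranges that were fetched ahead of time."""

    def __init__(self, parts, fetcher):
        # parts maps (start, stop) -> bytes already held in local memory
        self.parts = dict(parts)
        # fetcher(start, stop) stands in for a remote get call
        self.fetcher = fetcher
        self.remote_requests = 0

    def fetch(self, start, stop):
        for (part_start, part_stop), data in self.parts.items():
            if part_start <= start and stop <= part_stop:
                offset = start - part_start
                return data[offset : offset + (stop - start)]
        # The range was not pre-cached, so fall back to the remote store
        self.remote_requests += 1
        return self.fetcher(start, stop)


# Stand-in for a remote file and two pre-fetched parts
remote = bytes(range(256)) * 4
parts = {
    (100, 200): remote[100:200],
    (900, 1024): remote[900:1024],
}
cache = KnownPartsSketch(parts, fetcher=lambda start, stop: remote[start:stop])

inside = cache.fetch(120, 150)   # served from memory
requests_after_hit = cache.remote_requests
outside = cache.fetch(300, 310)  # needs a remote request
requests_after_miss = cache.remote_requests
```

As long as the pre-fetched parts cover every column chunk the engine will ask for, the number and order of read calls made by the engine no longer affects the number of remote requests.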

Optimization 2: Transfer “parts” asynchronously and in parallel

Although the previous optimization enables you to avoid many small get operations and unnecessary data transfer within read_parquet, you must still populate the KnownPartsOfAFile cache before AbstractBufferedFile can be initialized.

To do this as efficiently as possible, use cat_ranges to fetch all the required column-chunks at one time, both asynchronously and in parallel using asyncio. Because the total number of transferred column chunks can be large for files containing many fields or multiple row-groups, you also aggregate adjacent byte-range requests as long as the size of the aggregated request stays below an upper limit.

Figure 5 shows that this approach ultimately leads to the concurrent transfer of multiple byte ranges of optimal size.

Diagram shows that all necessary remote data is transferred into local-memory in parallel. Adjacent data transfers are coalesced to reduce the overall number of remote-file accesses.
Figure 5. Data-transfer optimizations used in fsspec.parquet
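
The two steps described in this section, coalescing adjacent byte ranges up to a size limit and then transferring the resulting ranges concurrently, can be sketched with the standard library alone. The function names, the 250-byte limit, and the simulated latency below are assumptions made for illustration; fsspec itself performs the transfer through cat_ranges.

```python
import asyncio


def coalesce_ranges(ranges, max_gap=0, max_block=None):
    """Merge byte ranges that are adjacent (within max_gap bytes),
    as long as the merged range stays within max_block bytes."""
    merged = []
    for start, stop in sorted(ranges):
        if merged:
            prev_start, prev_stop = merged[-1]
            new_stop = max(prev_stop, stop)
            close_enough = start - prev_stop <= max_gap
            small_enough = max_block is None or new_stop - prev_start <= max_block
            if close_enough and small_enough:
                merged[-1] = (prev_start, new_stop)
                continue
        merged.append((start, stop))
    return merged


async def fetch_range(remote, start, stop, latency=0.01):
    """Stand-in for one remote get call; the sleep simulates latency."""
    await asyncio.sleep(latency)
    return remote[start:stop]


async def fetch_all(remote, ranges):
    """Issue every request at once and wait for all of them together."""
    tasks = [fetch_range(remote, start, stop) for start, stop in ranges]
    return await asyncio.gather(*tasks)


remote = bytes(1000)  # stand-in for a remote file
wanted = [(0, 100), (100, 200), (400, 500), (500, 600), (600, 700)]

# Five column-chunk ranges collapse into three requests
requests = coalesce_ranges(wanted, max_gap=0, max_block=250)
blocks = asyncio.run(fetch_all(remote, requests))
print(requests)
```

Because the requests are awaited together, the total wait is close to the latency of a single request rather than the sum of all of them, which is the additive cost paid by the serial read calls described earlier.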

Preliminary fsspec.parquet benchmark results

To compare the performance of open_parquet_file with other fsspec- and pyarrow-based file-handling methods, we used the available Python script with the following software versions:

  • pyarrow-6.0.1
  • fastparquet-0.8.0
  • cudf-22.04
  • fsspec/s3fs/gcsfs-2022.2.0

Using this script, we read a single column from a 789 MB Parquet file containing a total of 10 row-groups and 30 columns with snappy compression, which required the transfer of roughly 27 MB of data. The results for both S3 and GCS, summarized in Figures 6 and 7, show a significant performance boost of 85% or more when moving from the default caching strategy to open_parquet_file.

In fact, the new function effectively matches the performance of PyArrow’s native S3FileSystem implementation, which is specifically designed for optimal performance with its Parquet engine. In this post, we compare it only with a native PyArrow filesystem for this single Amazon S3 benchmark, because PyArrow only offers publicly supported implementations for Amazon S3 and Hadoop at the time of publication.

Using fsspec’s Parquet module reduces the read latency for both cuDF and fastparquet, and roughly matches optimal performance in pyarrow.
Figure 6. General benchmark results for S3 storage

Using fsspec’s Parquet module reduces the read latency for cudf, fastparquet, and pyarrow. The latency is consistently reduced by 50% or more.
Figure 7. General benchmark results for GCS storage

To illustrate that the benefit of using open_parquet_file scales with the overall size of the file, we also read a single column from a 12 GB Parquet file containing the first day of the Criteo dataset from public GCS storage with compression disabled. Figure 8 shows that the new fsspec function can offer a 10x speedup or more over default caching.

Using fsspec’s Parquet module significantly reduces the read latency for cuDF, fastparquet, and PyArrow. The measured speedup is a factor of ten or more.
Figure 8. Criteo-dataset benchmark results for GCS storage

Test it out yourself

In this post, we introduced the fsspec.parquet module and its single public API, open_parquet_file, which provides a format-aware, byte-caching optimization for opening remote Parquet files. Benchmarking suggests that the new optimization can offer significant performance improvements over default file-opening behavior in fsspec, and may even approach the performance of optimized C++ file-system implementations in PyArrow for partial I/O.

Since being officially released in the 2021.11.0 version of fsspec, the open_parquet_file utility has already been adopted by both the RAPIDS cuDF library and Dask-Dataframe.

Due to the dramatic and consistent improvement available for cuDF-based workflows, this new feature has already been adopted as the default file-opening approach within cudf.read_parquet and dask_cudf.read_parquet.

For Dask users without GPU resources, the optimized caching approach can now be opted into by passing an open_file_options argument to read_parquet. For example, the following code example instructs Dask to open all Parquet data files with the parquet pre-caching method:

import dask.dataframe as dd

ddf = dd.read_parquet(
    path, open_file_options={"precache_options": {"method": "parquet"}}
)

Given this early success, we are hoping to expand and simplify the available pre-caching options in fsspec, and establish a clear precache_options API within all file-opening functions.

Community feedback here is critical. Please engage on GitHub or comment below, and let us know what you think!
