Search
 
SCRIPT & CODE EXAMPLE
 

PYTHON

Python: read parquet files with pandas, showing a progress bar

from functools import partial

import pandas as pd
import pyarrow as pa
from tqdm.auto import tqdm
from tqdm.contrib.concurrent import process_map


def _read_parquet(filename, columns=None):
    """
    Read one parquet file into a pandas DataFrame using the PyArrow engine.

    Intended as the worker function handed to a ProcessPoolExecutor. The
    PyArrow engine (v4.0.0) outperforms fastparquet (v0.7.0) here because it
    reads columns in parallel; `use_threads=True` explicitly enables that
    multithreaded column reading.

    Parameters
    ----------

    filename : str
        Path of the parquet file to read.
    columns : list, default=None
        Columns to load from the file; None loads every column.

    Returns
    -------
    pandas DataFrame
    """

    # All keyword arguments are fixed except the column selection.
    read_kwargs = {"columns": columns, "engine": "pyarrow", "use_threads": True}
    return pd.read_parquet(filename, **read_kwargs)


def read_parquet(
    files,
    columns=None,
    parallel=True,
    n_concurrent_files=8,
    n_concurrent_columns=4,
    show_progress=True,
    ignore_index=True,
    chunksize=None,
):
    """
    Read a single parquet file or a list of parquet files and return a pandas DataFrame.

    If `parallel==True`, it's on average 50% faster than `pd.read_parquet(..., engine="fastparquet")`. Limited benchmarks indicate that the default values for `n_concurrent_files` and `n_concurrent_columns` are the fastest combination on a 32 core CPU. `n_concurrent_files` * `n_concurrent_columns` <= the number of available cores.

    Parameters
    ----------

    files : list or str
        String with path or list of strings with paths of the parquet file(s) to be read.
    columns : list, default=None
        List of columns to read from the parquet file(s). If None, reads all columns.
    parallel : bool, default=True
        If True, reads both files and columns in parallel. If False, read the files serially while still reading the columns in parallel.
    n_concurrent_files : int, default=8
        Number of files to read in parallel.
    n_concurrent_columns : int, default=4
        Number of columns to read in parallel.
    show_progress : bool, default=True
        If True, shows a tqdm progress bar with the number of files that have already been read.
    ignore_index : bool, default=True
        If True, do not use the index values along the concatenation axis. The resulting axis will be labeled 0, ..., n-1. This is useful if you are concatenating objects where the concatenation axis does not have meaningful indexing information.
    chunksize : int, default=None
        Number of files to pass as a single task to a single process. Values greater than 1 can improve performance if each task is expected to take a similar amount of time to complete and `len(files) > n_concurrent_files`. If None, chunksize is set to `len(files) / n_concurrent_files` if `len(files) > n_concurrent_files` else it's set to 1.

    Returns
    -------
    pandas DataFrame
    """

    # ensure files is a list when reading a single file
    if isinstance(files, str):
        files = [files]

    # no need for more CPUs than files
    if len(files) < n_concurrent_files:
        n_concurrent_files = len(files)

    # no need for more workers than columns
    if columns and len(columns) < n_concurrent_columns:
        n_concurrent_columns = len(columns)

    # set number of threads used for reading the columns of each parquet file
    pa.set_cpu_count(n_concurrent_columns)

    # try to optimize the chunksize based on
    # https://stackoverflow.com/questions/53751050/python-multiprocessing-understanding-logic-behind-chunksize
    # this assumes each task takes roughly the same amount of time to complete,
    # i.e. each dataset is roughly the same size. If there are only a few files
    # to be read (len(files) <= n_concurrent_files), give each cpu a single file;
    # when there are more files than cpu's, give chunks of multiple files to each
    # cpu to minimize the overhead of assigning files after every completed read.
    # BUG FIX: only compute when the caller did not supply a chunksize — the
    # original overwrote an explicit `chunksize` with 1 whenever
    # len(files) <= n_concurrent_files.
    if chunksize is None:
        if len(files) > n_concurrent_files:
            # ceiling division: spread files as evenly as possible over workers
            chunksize, remainder = divmod(len(files), n_concurrent_files)
            if remainder:
                chunksize += 1
        else:
            chunksize = 1

    if parallel is True:
        _read_parquet_map = partial(_read_parquet, columns=columns)
        dfs = process_map(
            _read_parquet_map,
            files,
            max_workers=n_concurrent_files,
            chunksize=chunksize,
            # BUG FIX: tqdm's keyword is `disable`; `disabled` raises TqdmKeyError
            disable=not show_progress,
        )

    else:
        # BUG FIX: forward `columns` on the serial path too (it was dropped),
        # and use the correct `disable` keyword for tqdm
        dfs = [
            _read_parquet(file, columns=columns)
            for file in tqdm(files, disable=not show_progress)
        ]

    # reduce the list of dataframes to a single dataframe
    df = pd.concat(dfs, ignore_index=ignore_index)

    return df
Comment

PREVIOUS NEXT
Code Example
Python :: codeforces problem 580A 
Python :: Python __ge__ 
Python :: __div__ 
Python :: __le__ 
Python :: django ejemplo de un formulario crud 
Python :: NumPy fliplr Example 
Python :: using Canvas with tkinger 
Python :: print number upto 2 decimal places in f string python 
Python :: colorbar over two axes 
Python :: NumPy packbits Code Packed array along axis 1 
Python :: django view - apiview decorator (retrieve, update or delete - GET, PUT, DELETE) 
Python :: How to use "to_representation" hook for django rest serializers 
Python :: instance variables python 
Python :: penggunaan items di python 
Python :: Python range Incrementing with the range using a positive step 
Python :: How to Preprocess for categorical data 
Python :: finding-the-largest-three-digits-number-within-a-number 
Python :: pydantic model from dataclass 
Python :: map function in pyhton 
Python :: how to access specific index of matrix in python 
Python :: HTML not being displayed properly in Flask, Python 
Python :: jsfakjfkjadjfksajfa 
Python :: np sign no 0 
Python :: ring Delete Item From List 
Python :: ring get a list of functions names written in the Ring language 
Python :: how to start spaCy code 
Python :: python netcdf double 
Python :: capiatlize first letter in list 
Python :: attribute error rest framework 
Python :: bar plot with patterns colors 
ADD CONTENT
Topic
Content
Source link
Name
9+2 =