Commit 3579e68
Support passing multiple url files to make_reader function. (#731)

Resolves #728.

selitvin authored Jan 10, 2022
1 parent 54e6bc2 commit 3579e68

Showing 4 changed files with 21 additions and 12 deletions.
1 change: 1 addition & 0 deletions docs/release-notes.rst
@@ -7,6 +7,7 @@ Release notes
Release 0.11.4 (unreleased)
===========================
- `PR 730 <https://github.com/uber/petastorm/pull/730>`_ (resolves `PR 724 <https://github.com/uber/petastorm/issues/724>`_): Hadoop 3 support: allow more than 2 nodenames in hadoop configuration file.
- `PR 731 <https://github.com/uber/petastorm/pull/731>`_ (resolves `PR 728 <https://github.com/uber/petastorm/issues/728>`_): Support passing multiple parquet dataset urls to make_reader.
- `PR 732 <https://github.com/uber/petastorm/pull/732>`_ (resolves `PR 585 <https://github.com/uber/petastorm/issues/585>`_): Restructure process_pool implementation code in a way that resolves ``RuntimeWarning: 'petastorm.workers_pool.exec_in_new_process' found in sys.modules after
import of package 'petastorm.workers_pool', but prior to execution of 'petastorm.workers_pool.exec_in_new_process'; this may result in unpredictable behaviour when using process pool`` warning.

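A minimal usage sketch of the feature added in PR 731; the file paths below are hypothetical and only illustrate the new call pattern.

```python
# Sketch: passing a list of parquet file urls to make_reader (paths are hypothetical).
from petastorm import make_reader

# Before this change, make_reader accepted only a single dataset directory url.
# With PR 731 it can also take a list of parquet file urls sharing one scheme.
urls = [
    'file:///tmp/mydataset/00000.parquet',
    'file:///tmp/mydataset/00001.parquet',
]

with make_reader(urls) as reader:
    for row in reader:
        # Each row is a namedtuple built from the dataset's Unischema.
        print(row)
```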
6 changes: 5 additions & 1 deletion petastorm/py_dict_reader_worker.py
@@ -15,6 +15,7 @@

import hashlib
import threading
from collections.abc import Iterable

import numpy as np
from pyarrow import parquet as pq
@@ -157,7 +158,10 @@ def process(self, piece_index, worker_predicate, shuffle_row_drop_partition):
# 2. Dataset path is hashed, to make sure we don't create too long keys, which may be incompatible with
# some cache implementations
# 3. Still leave relative path and the piece_index in plain text to make it easier to debug
cache_key = '{}:{}:{}'.format(hashlib.md5(self._dataset_path.encode('utf-8')).hexdigest(),
# self._dataset_path could be a list of urls or a string.
_dataset_path_for_hash = "_".join(self._dataset_path) if isinstance(self._dataset_path,
Iterable) else self._dataset_path
cache_key = '{}:{}:{}'.format(hashlib.md5(_dataset_path_for_hash.encode('utf-8')).hexdigest(),
piece.path, piece_index)
all_cols = self._local_cache.get(cache_key,
lambda: self._load_rows(parquet_file, piece, shuffle_row_drop_partition))
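A self-contained sketch of the cache-key construction shown in the hunk above. The helper name is local to this example; unlike the diff, it branches on `str` first, since a plain string is itself iterable and joining it character by character is presumably not the intent.

```python
# Stand-alone sketch of the cache-key logic; not the library code itself.
import hashlib


def make_cache_key(dataset_path_or_paths, piece_path, piece_index):
    # dataset_path_or_paths may be a single url string or a list of urls.
    if isinstance(dataset_path_or_paths, str):
        path_for_hash = dataset_path_or_paths
    else:
        path_for_hash = "_".join(dataset_path_or_paths)
    # Hash the (possibly long) dataset path so the key stays short; keep the
    # piece path and piece index in plain text to make debugging easier.
    digest = hashlib.md5(path_for_hash.encode('utf-8')).hexdigest()
    return '{}:{}:{}'.format(digest, piece_path, piece_index)
```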
11 changes: 6 additions & 5 deletions petastorm/reader.py
@@ -82,9 +82,10 @@ def make_reader(dataset_url,
See :func:`~petastorm.make_batch_reader` to read from a Parquet store that was not generated using
:func:`~petastorm.etl.dataset_metadata.materialize_dataset`.
:param dataset_url: an filepath or a url to a parquet directory,
:param dataset_url: a url to a parquet directory or a url list (with the same scheme) to parquet files.
e.g. ``'hdfs://some_hdfs_cluster/user/yevgeni/parquet8'``, or ``'file:///tmp/mydataset'``,
or ``'s3://bucket/mydataset'``, or ``'gs://bucket/mydataset'``.
or ``'s3://bucket/mydataset'``, or ``'gs://bucket/mydataset'``,
or ``[file:///tmp/mydataset/00000.parquet, file:///tmp/mydataset/00001.parquet]``.
:param schema_fields: Can be: a list of unischema fields and/or regex pattern strings; ``None`` to read all fields;
an NGram object, then it will return an NGram of the specified fields.
:param reader_pool_type: A string denoting the reader pool type. Should be one of ['thread', 'process', 'dummy']
@@ -132,10 +133,10 @@ def make_reader(dataset_url,
other filesystem configs if it's provided.
:return: A :class:`Reader` object
"""
dataset_url = normalize_dir_url(dataset_url)
dataset_url_or_urls = normalize_dataset_url_or_urls(dataset_url)

filesystem, dataset_path = get_filesystem_and_path_or_paths(
dataset_url,
dataset_url_or_urls,
hdfs_driver,
storage_options=storage_options,
filesystem=filesystem
@@ -149,7 +150,7 @@
raise ValueError('Unknown cache_type: {}'.format(cache_type))

try:
dataset_metadata.get_schema_from_dataset_url(dataset_url, hdfs_driver=hdfs_driver,
dataset_metadata.get_schema_from_dataset_url(dataset_url_or_urls, hdfs_driver=hdfs_driver,
storage_options=storage_options, filesystem=filesystem)
except PetastormMetadataError:
warnings.warn('Currently make_reader supports reading only Petastorm datasets. '
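The docstring above says a url list must use a single scheme. The sketch below is illustrative only: it is not the petastorm implementation of `normalize_dataset_url_or_urls`, just one way such a normalizer could enforce that contract.

```python
# Hypothetical same-scheme validation for a url or url list; NOT the library code.
from urllib.parse import urlparse


def normalize_url_or_urls_sketch(url_or_urls):
    urls = [url_or_urls] if isinstance(url_or_urls, str) else list(url_or_urls)
    if not urls:
        raise ValueError('dataset_url_or_urls must not be empty')
    schemes = {urlparse(url).scheme for url in urls}
    if len(schemes) > 1:
        raise ValueError('All dataset urls must share the same scheme, got: {}'.format(schemes))
    # Strip trailing slashes so equivalent urls compare and hash consistently.
    normalized = [url.rstrip('/') for url in urls]
    return normalized[0] if isinstance(url_or_urls, str) else normalized
```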
15 changes: 9 additions & 6 deletions petastorm/tests/test_end_to_end.py
@@ -12,20 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import glob
import tempfile
import operator
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor
from shutil import rmtree, copytree
from six.moves.urllib.parse import urlparse
from unittest import mock

import numpy as np
import pyarrow.hdfs
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, ShortType, StringType

from unittest import mock
from six.moves.urllib.parse import urlparse

from petastorm import make_reader, make_batch_reader, TransformSpec
from petastorm.codecs import ScalarCodec, CompressedImageCodec
@@ -118,8 +117,12 @@ def test_simple_read_from_parquet_file(synthetic_dataset, reader_factory):
path = synthetic_dataset.url[len('file://'):]
one_parquet_file = glob.glob(f'{path}/**/*.parquet')[0]
with reader_factory(f"file://{one_parquet_file}") as reader:
all_data = list(reader)
assert len(all_data) > 0
all_data_from_file = list(reader)
assert len(all_data_from_file) > 0

with reader_factory([f"file://{one_parquet_file}", f"file://{one_parquet_file}"]) as reader:
all_data_from_file_twice = list(reader)
assert len(all_data_from_file) < len(all_data_from_file_twice)


@pytest.mark.parametrize('reader_factory', [
