Commit

Have petastorm-generate-metadata preserve old indexes
Robbie Gruener authored and rgruener committed Aug 24, 2018
1 parent 3e76dfa commit 062101f
Showing 2 changed files with 29 additions and 12 deletions.
28 changes: 22 additions & 6 deletions petastorm/etl/petastorm_generate_metadata.py
@@ -21,8 +21,10 @@
 from pyarrow import parquet as pq
 from pyspark.sql import SparkSession

-from petastorm.etl.dataset_metadata import materialize_dataset, get_schema
+from petastorm.etl.dataset_metadata import materialize_dataset, get_schema, ROW_GROUPS_PER_FILE_KEY
+from petastorm.etl.rowgroup_indexing import ROWGROUPS_INDEX_KEY
 from petastorm.fs_utils import FilesystemResolver
+from petastorm.utils import add_to_dataset_metadata

 example_text = '''This is meant to be run as a spark job. Example (some replacement required):
@@ -50,21 +52,26 @@ def generate_petastorm_metadata(spark, dataset_url, unischema_class=None):
     """
     sc = spark.sparkContext

+    resolver = FilesystemResolver(dataset_url, sc._jsc.hadoopConfiguration())
+    dataset = pq.ParquetDataset(
+        resolver.parsed_dataset_url().path,
+        filesystem=resolver.filesystem(),
+        validate_schema=False)
+
     if unischema_class:
         schema = locate(unischema_class)
     else:
-        resolver = FilesystemResolver(dataset_url, sc._jsc.hadoopConfiguration())
-        dataset = pq.ParquetDataset(
-            resolver.parsed_dataset_url().path,
-            filesystem=resolver.filesystem(),
-            validate_schema=False)

         try:
             schema = get_schema(dataset)
         except ValueError:
             raise ValueError('Unischema class could not be located in existing dataset,'
                              ' please specify it')

+    # In order to be backwards compatible, we retrieve the common metadata from the dataset before
+    # overwriting the metadata to keep row group indexes and the old row group per file index
+    arrow_metadata = dataset.common_metadata or None
+
     with materialize_dataset(spark, dataset_url, schema):
         # Inside the materialize dataset context we just need to write the metadata file as the schema will
         # be written by the context manager.
@@ -75,6 +82,15 @@
         parquet_output_committer = sc._gateway.jvm.org.apache.parquet.hadoop.ParquetOutputCommitter
         parquet_output_committer.writeMetaDataFile(hadoop_config, Path(dataset_url))

+    if arrow_metadata:
+        # If there was the old row groups per file key or the row groups index key, add them to the new dataset metadata
+        base_schema = arrow_metadata.schema.to_arrow_schema()
+        metadata_dict = base_schema.metadata
+        if ROW_GROUPS_PER_FILE_KEY in metadata_dict:
+            add_to_dataset_metadata(dataset, ROW_GROUPS_PER_FILE_KEY, metadata_dict[ROW_GROUPS_PER_FILE_KEY])
+        if ROWGROUPS_INDEX_KEY in metadata_dict:
+            add_to_dataset_metadata(dataset, ROWGROUPS_INDEX_KEY, metadata_dict[ROWGROUPS_INDEX_KEY])
+


 def _main(args):
     parser = argparse.ArgumentParser(prog='petastorm_generate_metadata',
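
For context, here is a minimal standalone sketch of the backwards-compatibility step this commit adds: snapshot the existing common metadata, then re-attach the old row-group index entries after the metadata file has been regenerated. This is an illustration under stated assumptions, not code from the commit — the dataset path '/tmp/my_dataset' is a placeholder, and the regeneration step itself is elided.

# Sketch only (assumed local dataset path); mirrors the keys and helpers used in the diff above.
from pyarrow import parquet as pq

from petastorm.etl.dataset_metadata import ROW_GROUPS_PER_FILE_KEY
from petastorm.etl.rowgroup_indexing import ROWGROUPS_INDEX_KEY
from petastorm.utils import add_to_dataset_metadata

dataset = pq.ParquetDataset('/tmp/my_dataset', validate_schema=False)

# Snapshot the old key/value metadata before _common_metadata is overwritten.
old_common_metadata = dataset.common_metadata

# ... regenerate the metadata here (materialize_dataset + ParquetOutputCommitter) ...

# Re-attach the old row-group indexes, if any were present, to the freshly written metadata.
if old_common_metadata is not None:
    old_keys = old_common_metadata.schema.to_arrow_schema().metadata or {}
    for key in (ROW_GROUPS_PER_FILE_KEY, ROWGROUPS_INDEX_KEY):
        if key in old_keys:
            add_to_dataset_metadata(dataset, key, old_keys[key])
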
13 changes: 7 additions & 6 deletions petastorm/tests/test_generate_metadata.py
@@ -7,7 +7,8 @@
 import pyarrow.parquet as pq

 from petastorm.reader import Reader
-from petastorm.tests.test_common import create_test_dataset
+from petastorm.selectors import SingleIndexSelector
+from petastorm.tests.test_common import create_test_dataset, TestSchema
 from petastorm.workers_pool.dummy_pool import DummyPool
 from petastorm.etl import petastorm_generate_metadata

@@ -24,9 +25,9 @@ def synthetic_dataset(tmpdir_factory):
     return SyntheticDataset(url=url, path=path, data=data)


-def _check_reader(path):
+def _check_reader(path, rowgroup_selector=None):
     # Just check that you can open and read from a reader successfully
-    with Reader('file://{}'.format(path), reader_pool=DummyPool()) as reader:
+    with Reader('file://{}'.format(path), reader_pool=DummyPool(), rowgroup_selector=rowgroup_selector) as reader:
         [next(reader) for _ in range(10)]


@@ -48,8 +49,8 @@ def test_regenerate_row_group_metadata(synthetic_dataset, tmpdir):
     # Regenerate the metadata (taking the schema information from the common_metadata which exists)
     petastorm_generate_metadata._main(['--dataset_url', 'file://{}'.format(a_moved_path)])

-    # Reader should now work again
-    _check_reader(a_moved_path)
+    # Reader should now work again with rowgroup selector since it was in original metadata
+    _check_reader(a_moved_path, SingleIndexSelector(TestSchema.id.name, [2, 18]))


 def test_regenerate_all_metadata(synthetic_dataset, tmpdir):
@@ -74,7 +75,7 @@ def test_regenerate_all_metadata(synthetic_dataset, tmpdir):
         '--unischema_class', 'petastorm.tests.test_common.TestSchema',
     ])

-    # Reader should now work again
+    # Reader should now work again (row group selector will not since we removed all metadata)
     _check_reader(a_moved_path)


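
As a usage sketch of what the updated tests exercise: regenerate the petastorm metadata for an existing dataset, then read it back with a row-group selector, which keeps working because the old indexes are preserved. The dataset URL and the selected field/values are placeholders modeled on the test above, not part of this commit.

# Sketch only; 'file:///tmp/my_dataset' and the 'id' field values are assumed placeholders.
from petastorm.etl import petastorm_generate_metadata
from petastorm.reader import Reader
from petastorm.selectors import SingleIndexSelector
from petastorm.workers_pool.dummy_pool import DummyPool

dataset_url = 'file:///tmp/my_dataset'

# Rebuild the _common_metadata file; pre-existing row-group indexes are carried over.
petastorm_generate_metadata._main(['--dataset_url', dataset_url])

# A selector over a previously indexed field should still narrow the row groups that are read.
with Reader(dataset_url, reader_pool=DummyPool(),
            rowgroup_selector=SingleIndexSelector('id', [2, 18])) as reader:
    sample = next(reader)
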
