Read and write in the same loop instead of materializing the records to a list
snf2ye committed Jul 17, 2024
1 parent 71cd6a8 commit 23703b5
Showing 2 changed files with 46 additions and 39 deletions.
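The idea behind the change, as a minimal standalone sketch (the class and method names are illustrative, not part of the commit; the builders are the standard parquet-avro ones the diff below uses): instead of loading every record into a List and then writing, the reader and writer are held open together and each record is written as soon as it is read, so memory stays flat regardless of file size.

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.OutputFile;

public class StreamingCopy {
  // Copies records one at a time; only the current record is resident in memory.
  static void copyRecords(InputFile in, OutputFile out, Schema schema) throws IOException {
    try (ParquetWriter<GenericData.Record> writer =
            AvroParquetWriter.<GenericData.Record>builder(out).withSchema(schema).build();
        ParquetReader<GenericData.Record> reader =
            AvroParquetReader.<GenericData.Record>builder(in).build()) {
      GenericData.Record record;
      while ((record = reader.read()) != null) {
        writer.write(record); // per-record transforms (like the datetime fix below) go here
      }
    }
  }
}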
@@ -48,8 +48,8 @@ public static List<GenericData.Record> readFromParquet(Path filePath) throws IOE
     return readFromParquet(inputFile);
   }
 
-  public static void writeToParquet(
-      List<GenericData.Record> recordsToWrite,
+  public static void readWriteToParquet(
+      InputFile inputFile,
       List<String> columnNames,
       Map<String, TableDataType> columnDataTypeMap,
       OutputFile fileToWrite,
@@ -63,32 +63,37 @@ public static void writeToParquet(
         .withCompressionCodec(CompressionCodecName.SNAPPY)
         .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
         .build()) {
+      try (ParquetReader<GenericData.Record> reader =
+          AvroParquetReader.<GenericData.Record>builder(inputFile).build()) {
+        GenericData.Record record;
 
-      for (GenericData.Record record : recordsToWrite) {
-        var newRecord = new GenericData.Record(schema);
-        for (var column : columnNames) {
-          switch (columnDataTypeMap.get(column)) {
-            case DATETIME, TIMESTAMP:
-              // Convert from fixed length binary to a long representing microseconds since epoch
-              GenericData.Fixed dtFixed = (GenericData.Fixed) record.get(column);
-              if (dtFixed == null) {
-                newRecord.put(column, null);
-              } else {
-                var bytes = dtFixed.bytes();
-                ByteBuffer bb = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
-                long timeOfDayNanos = bb.getLong();
-                var julianDay = bb.getInt();
-                // Given timeOfDayNanos and julianDay, convert to microseconds since epoch
-                Long microSeconds =
-                    (long) (julianDay - 2440588) * 86400 * 1000000 + timeOfDayNanos / 1000;
-                newRecord.put(column, microSeconds);
-              }
-              break;
-            default:
-              newRecord.put(column, record.get(column));
+        while ((record = reader.read()) != null) {
+
+          var newRecord = new GenericData.Record(schema);
+          for (var column : columnNames) {
+            switch (columnDataTypeMap.get(column)) {
+              case DATETIME, TIMESTAMP:
+                // Convert from fixed length binary to a long representing microseconds since epoch
+                GenericData.Fixed dtFixed = (GenericData.Fixed) record.get(column);
+                if (dtFixed == null) {
+                  newRecord.put(column, null);
+                } else {
+                  var bytes = dtFixed.bytes();
+                  ByteBuffer bb = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
+                  long timeOfDayNanos = bb.getLong();
+                  var julianDay = bb.getInt();
+                  // Given timeOfDayNanos and julianDay, convert to microseconds since epoch
+                  Long microSeconds =
+                      (long) (julianDay - 2440588) * 86400 * 1000000 + timeOfDayNanos / 1000;
+                  newRecord.put(column, microSeconds);
+                }
+                break;
+              default:
+                newRecord.put(column, record.get(column));
+            }
           }
+          writer.write(newRecord);
         }
-        writer.write(newRecord);
       }
     }
   }
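For context on the DATETIME/TIMESTAMP branch above: with parquet.avro.readInt96AsFixed enabled (see the tests below), an INT96 timestamp surfaces as a 12-byte fixed value, eight little-endian bytes of nanoseconds within the day followed by four little-endian bytes of Julian day number, and 2440588 is the Julian day of 1970-01-01, the Unix epoch. A worked example of the same arithmetic, with sample values chosen for illustration:

import java.time.Instant;
import java.time.temporal.ChronoUnit;

class Int96ToMicrosExample {
  public static void main(String[] args) {
    long timeOfDayNanos = 0L; // midnight
    int julianDay = 2460509; // 2024-07-17
    // Same expression as in the diff; the (long) cast keeps the whole product in
    // 64-bit arithmetic (86400 * 1000000 on its own would overflow int).
    long microSeconds = (long) (julianDay - 2440588) * 86400 * 1000000 + timeOfDayNanos / 1000;
    // 19921 days * 86400 s * 1000000 = 1_721_174_400_000_000 microseconds since epoch
    System.out.println(Instant.EPOCH.plus(microSeconds, ChronoUnit.MICROS));
    // prints 2024-07-17T00:00:00Z
  }
}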
Expand Down
@@ -67,8 +67,10 @@ void readWriteToAzureStorageBlob() throws IOException {
 
   @Test
   void formatDateTimeField() throws IOException {
-    Path sourceFile = new Path(FOLDER_PATH + "azure_original.parquet");
-    List<GenericData.Record> records = ParquetReaderWriterWithAvro.readFromParquet(sourceFile);
+    Path azureSourceFile = new Path(FOLDER_PATH + "azure_original.parquet");
+    var localConfig = new Configuration();
+    localConfig.set("parquet.avro.readInt96AsFixed", "true");
+    InputFile azureInputFile = HadoopInputFile.fromPath(azureSourceFile, localConfig);
 
     Schema nullType = Schema.create(Schema.Type.NULL);
     Schema timestampMicroType =
@@ -93,7 +95,8 @@ void formatDateTimeField() throws IOException {
     var output = randomizedFilePath("datetime");
     var config = new Configuration();
     OutputFile outputFile = HadoopOutputFile.fromPath(output, config);
-    writeToParquet(records, columnNames, columnDataTypeMap, outputFile, newSchema, config);
+    readWriteToParquet(
+        azureInputFile, columnNames, columnDataTypeMap, outputFile, newSchema, config);
 
     List<GenericData.Record> formattedRecords = ParquetReaderWriterWithAvro.readFromParquet(output);
     assertThat(
@@ -110,13 +113,12 @@ void formatDateTimeField() throws IOException {
   void e2eTestFormatDatetime() throws IOException {
     // == local read ==
     Path azureSourceFile = new Path(FOLDER_PATH + "azure_original.parquet");
-    List<GenericData.Record> recordsToWrite =
-        ParquetReaderWriterWithAvro.readFromParquet(azureSourceFile);
+    var localConfig = new Configuration();
+    localConfig.set("parquet.avro.readInt96AsFixed", "true");
+    InputFile azureInputFile = HadoopInputFile.fromPath(azureSourceFile, localConfig);
     // === generate signed url from snapshotExport ===
-    // var signedUrlFromSnapshotExport = "<redacted>";
+    // var signedUrlFromSnapshotExport = null;
     // InputFile azureInputFile = buildInputFileFromSignedUrl(signedUrlFromSnapshotExport);
-    // List<GenericData.Record> recordsToWrite =
-    //     ParquetReaderWriterWithAvro.readFromParquet(azureInputFile);
 
     Schema nullType = Schema.create(Schema.Type.NULL);
     Schema timestampMicroType =
@@ -144,7 +146,8 @@ void e2eTestFormatDatetime() throws IOException {
     var config = ParquetReaderWriterWithAvro.getConfigFromSignedUri(signedUrl);
     var path = new Path(writeToUri);
     OutputFile outputFile = HadoopOutputFile.fromPath(path, config);
-    writeToParquet(recordsToWrite, columnNames, columnDataTypeMap, outputFile, newSchema, config);
+    readWriteToParquet(
+        azureInputFile, columnNames, columnDataTypeMap, outputFile, newSchema, config);
 
     // Read from azure storage blob
     InputFile inputFile = HadoopInputFile.fromPath(path, config);
@@ -210,11 +213,10 @@ private String buildSignedUrl(String fileName) {
     var storageAccount = "shelbytestaccount";
     var filePath = "testfilesystem/oysters/";
     // Url signed from "testfilesystem" container
-    var defaultSasToken = "<redacted>";
-    // TODO: override this with your SAS token
-    var sasToken = defaultSasToken;
-    if (sasToken.equals(defaultSasToken)) {
-      throw new IllegalArgumentException("Please set a valid SAS token");
+    // TODO - set your own SAS token here
+    String sasToken = null;
+    if (sasToken == null) {
+      throw new IllegalArgumentException("SAS token must be set");
     }
     return "https://%s.blob.core.windows.net/%s%s?%s"
         .formatted(storageAccount, filePath, fileName, sasToken);
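Wiring the pieces together the way the tests do — a sketch rather than the project's code: TableDataType and ParquetReaderWriterWithAvro come from this repository, the readWriteToParquet signature is as called in the tests above, and the paths here are placeholders. The reader config flag makes Parquet hand INT96 timestamps to Avro as fixed bytes so the DATETIME/TIMESTAMP branch can decode them.

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.OutputFile;

class FormatDatetimesExample {
  static void formatDatetimes(
      String src,
      String dst,
      List<String> columnNames,
      Map<String, TableDataType> columnDataTypeMap,
      Schema newSchema)
      throws IOException {
    // Surface INT96 timestamps as GenericData.Fixed so they can be converted.
    Configuration readConfig = new Configuration();
    readConfig.set("parquet.avro.readInt96AsFixed", "true");
    InputFile in = HadoopInputFile.fromPath(new Path(src), readConfig);

    Configuration writeConfig = new Configuration();
    OutputFile out = HadoopOutputFile.fromPath(new Path(dst), writeConfig);

    // Streams records from `in` to `out`, rewriting DATETIME/TIMESTAMP columns.
    ParquetReaderWriterWithAvro.readWriteToParquet(
        in, columnNames, columnDataTypeMap, out, newSchema, writeConfig);
  }
}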