
Commit

POC
More complete e2e test

wip
snf2ye committed Jul 16, 2024
1 parent 3dc45d0 commit 71cd6a8
Showing 8 changed files with 397 additions and 8 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -66,3 +66,5 @@ gha-creds-*.json
#tools output
tools/setupResourceScripts/*_outputs.json
tools/profileEndpoints/results_*.csv

src/main/java/bio/terra/service/parquet/output/*
23 changes: 15 additions & 8 deletions build.gradle
@@ -241,19 +241,26 @@ dependencies {

// OpenTelemetry @WithSpan annotations:
implementation 'io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:2.2.0'

-  testImplementation 'org.apache.parquet:parquet-common:1.12.0'
-  testImplementation 'org.apache.parquet:parquet-hadoop:1.12.0'
-  testImplementation 'org.apache.parquet:parquet-hadoop-bundle:1.12.0'
-  testImplementation 'org.apache.parquet:parquet-encoding:1.12.0'
-  testImplementation 'org.apache.parquet:parquet-column:1.12.0'
-  testImplementation ('org.apache.hadoop:hadoop-common:3.3.1') {
+  // Needed for HadoopInputFile.fromPath(path(signed uri))
+  // Old version is needed - https://stackoverflow.com/questions/66713254/spark-wasb-and-jetty-11
+  implementation 'org.eclipse.jetty:jetty-util:9.4.55.v20240627'
+
+  // Needed for org.apache.parquet.avro.AvroParquetReader & AvroParquetWriter
+  implementation 'org.apache.parquet:parquet-avro:1.12.0'
+  // Needed for hadoop.fs.Path & hadoop.conf.Configuration
+  implementation ('org.apache.hadoop:hadoop-common:3.3.1') {
    exclude group: 'com.sun.jersey', module: 'jersey-core'
    exclude group: 'com.sun.jersey', module: 'jersey-servlet'
    exclude group: 'com.sun.jersey', module: 'jersey-json'
    exclude group: 'com.sun.jersey', module: 'jersey-server'
  }
-  testImplementation ('org.apache.hadoop:hadoop-azure:3.3.1') {
+
+  implementation 'org.apache.parquet:parquet-common:1.12.0'
+  implementation 'org.apache.parquet:parquet-hadoop:1.12.0'
+  implementation 'org.apache.parquet:parquet-hadoop-bundle:1.12.0'
+  implementation 'org.apache.parquet:parquet-encoding:1.12.0'
+  implementation 'org.apache.parquet:parquet-column:1.12.0'
+  implementation ('org.apache.hadoop:hadoop-azure:3.3.1') {
exclude group: 'com.sun.jersey', module: 'jersey-core'
exclude group: 'com.sun.jersey', module: 'jersey-servlet'
exclude group: 'com.sun.jersey', module: 'jersey-json'
151 changes: 151 additions & 0 deletions src/main/java/bio/terra/service/parquet/ParquetReaderWriterWithAvro.java
@@ -0,0 +1,151 @@
package bio.terra.service.parquet;

import bio.terra.model.TableDataType;
import com.azure.storage.blob.BlobUrlParts;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.OutputFile;

public class ParquetReaderWriterWithAvro {

private ParquetReaderWriterWithAvro() {}

public static List<GenericData.Record> readFromParquet(InputFile inputFile) throws IOException {
List<GenericData.Record> records = new ArrayList<>();
try (ParquetReader<GenericData.Record> reader =
AvroParquetReader.<GenericData.Record>builder(inputFile).build()) {
GenericData.Record record;

while ((record = reader.read()) != null) {
records.add(record);
}
return records;
}
}

public static List<GenericData.Record> readFromParquet(Path filePath) throws IOException {
var config = new Configuration();
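    // Read legacy INT96 timestamps as 12-byte Avro fixed values instead of failing;
    // writeToParquet decodes these fixed values for DATETIME/TIMESTAMP columns.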
config.set("parquet.avro.readInt96AsFixed", "true");
InputFile inputFile = HadoopInputFile.fromPath(filePath, config);
return readFromParquet(inputFile);
}

public static void writeToParquet(
List<GenericData.Record> recordsToWrite,
List<String> columnNames,
Map<String, TableDataType> columnDataTypeMap,
OutputFile fileToWrite,
Schema schema,
Configuration config)
throws IOException {
try (ParquetWriter<GenericData.Record> writer =
AvroParquetWriter.<GenericData.Record>builder(fileToWrite)
.withSchema(schema)
.withConf(config)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()) {

for (GenericData.Record record : recordsToWrite) {
var newRecord = new GenericData.Record(schema);
for (var column : columnNames) {
switch (columnDataTypeMap.get(column)) {
case DATETIME, TIMESTAMP:
// Convert from fixed length binary to a long representing microseconds since epoch
GenericData.Fixed dtFixed = (GenericData.Fixed) record.get(column);
if (dtFixed == null) {
newRecord.put(column, null);
} else {
var bytes = dtFixed.bytes();
ByteBuffer bb = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
long timeOfDayNanos = bb.getLong();
var julianDay = bb.getInt();
// Given timeOfDayNanos and julianDay, convert to microseconds since epoch
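              // 2440588 is the Julian Day Number of the Unix epoch (1970-01-01);
              // 86400 seconds per day, 1000 nanoseconds per microsecond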
Long microSeconds =
(long) (julianDay - 2440588) * 86400 * 1000000 + timeOfDayNanos / 1000;
newRecord.put(column, microSeconds);
}
break;
default:
newRecord.put(column, record.get(column));
}
}
writer.write(newRecord);
}
}
}

public static void simpleWriteToParquet(
List<GenericData.Record> recordsToWrite,
OutputFile fileToWrite,
Schema schema,
Configuration config)
throws IOException {
try (ParquetWriter<GenericData.Record> writer =
AvroParquetWriter.<GenericData.Record>builder(fileToWrite)
.withSchema(schema)
.withConf(config)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()) {

for (GenericData.Record record : recordsToWrite) {
writer.write(record);
}
}
}

// Pulled this code from ParquetUtils.java
// In a real implementation, we would probably want to do something a little more direct in the
// flight rather than producing the signed url and then breaking it back down
public static Configuration getConfigFromSignedUri(String signedUrl) {
BlobUrlParts blobUrlParts = BlobUrlParts.parse(signedUrl);
Configuration config = new Configuration();
config.set("parquet.avro.readInt96AsFixed", "true");
config.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
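    // Register the SAS token under hadoop-azure's per-container/account key so wasb(s) access can authenticate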
config.set(
"fs.azure.sas."
+ blobUrlParts.getBlobContainerName()
+ "."
+ blobUrlParts.getAccountName()
+ ".blob.core.windows.net",
blobUrlParts.getCommonSasQueryParameters().encode());
return config;
}

public static URI getURIFromSignedUrl(String signedUrl) {
BlobUrlParts blobUrlParts = BlobUrlParts.parse(signedUrl);
URI uri;
try {
uri =
new URI(
"wasbs://"
+ blobUrlParts.getBlobContainerName()
+ "@"
+ blobUrlParts.getAccountName()
+ ".blob.core.windows.net/"
+ blobUrlParts.getBlobName());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
return uri;
}
}
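
For context, here is a minimal usage sketch (not part of this commit) showing how the helpers above might be combined: build a Hadoop configuration and wasbs:// path from a SAS-signed URL, read the records, and rewrite them to a local parquet file. The class name, signed URL, and output path are illustrative placeholders, and the sketch assumes the source file contains at least one record.

package bio.terra.service.parquet;

import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.OutputFile;

public class ParquetReaderWriterWithAvroExample {

  public static void main(String[] args) throws IOException {
    // Placeholder SAS-signed URL; in practice this would come from dataset/snapshot metadata.
    String signedUrl = "https://<account>.blob.core.windows.net/<container>/<blob>?<sas-token>";

    // Build a Hadoop config carrying the SAS credentials and a wasbs:// path for the source file.
    Configuration config = ParquetReaderWriterWithAvro.getConfigFromSignedUri(signedUrl);
    Path sourcePath = new Path(ParquetReaderWriterWithAvro.getURIFromSignedUrl(signedUrl));

    // Read all records from the Azure-hosted parquet file.
    List<GenericData.Record> records =
        ParquetReaderWriterWithAvro.readFromParquet(HadoopInputFile.fromPath(sourcePath, config));

    // Reuse the schema of the data that was read (assumes at least one record).
    Schema schema = records.get(0).getSchema();

    // Rewrite the records to a local parquet file.
    OutputFile outputFile =
        HadoopOutputFile.fromPath(new Path("file:///tmp/output.parquet"), new Configuration());
    ParquetReaderWriterWithAvro.simpleWriteToParquet(records, outputFile, schema, new Configuration());
  }
}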
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added src/main/resources/parquet/bq_original.parquet
Binary file not shown.
