Skip to content

Commit

Permalink
Transfer Kernel
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Sep 11, 2023
1 parent 58783c0 commit 8dab591
Show file tree
Hide file tree
Showing 18 changed files with 931 additions and 362 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
import io.deephaven.util.annotations.FinalDefault;
import org.apache.parquet.column.statistics.Statistics;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
class MappedSchema {

static MappedSchema create(
final Map<String, Map<ParquetTableWriter.CacheTags, Object>> computedCache,
final Map<String, Map<ParquetCacheTags, Object>> computedCache,
final TableDefinition definition,
final RowSet rowSet,
final Map<String, ? extends ColumnSource<?>> columnSourceMap,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.parquet.table;

public enum ParquetCacheTags {
DECIMAL_ARGS
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* Contains the necessary information to convert a Deephaven table into a Parquet table. Both the schema translation,
* and the data translation.
*/
class TypeInfos {
public class TypeInfos {
private static final TypeInfo[] TYPE_INFOS = new TypeInfo[] {
IntType.INSTANCE,
LongType.INSTANCE,
Expand Down Expand Up @@ -105,14 +105,14 @@ static Pair<String, String> getCodecAndArgs(
return new ImmutablePair<>(SerializableCodec.class.getName(), null);
}

static PrecisionAndScale getPrecisionAndScale(
@NotNull final Map<String, Map<ParquetTableWriter.CacheTags, Object>> computedCache,
public static PrecisionAndScale getPrecisionAndScale(
@NotNull final Map<String, Map<ParquetCacheTags, Object>> computedCache,
@NotNull final String columnName,
@NotNull final RowSet rowSet,
@NotNull Supplier<ColumnSource<BigDecimal>> columnSourceSupplier) {
return (PrecisionAndScale) computedCache
.computeIfAbsent(columnName, unusedColumnName -> new HashMap<>())
.computeIfAbsent(ParquetTableWriter.CacheTags.DECIMAL_ARGS,
.computeIfAbsent(ParquetCacheTags.DECIMAL_ARGS,
uct -> parquetCompatible(computePrecisionAndScale(rowSet, columnSourceSupplier.get())));
}

Expand All @@ -130,7 +130,7 @@ private static PrecisionAndScale parquetCompatible(PrecisionAndScale pas) {
}

static TypeInfo bigDecimalTypeInfo(
final Map<String, Map<ParquetTableWriter.CacheTags, Object>> computedCache,
final Map<String, Map<ParquetCacheTags, Object>> computedCache,
@NotNull final ColumnDefinition<?> column,
final RowSet rowSet,
final Map<String, ? extends ColumnSource<?>> columnSourceMap) {
Expand All @@ -157,7 +157,7 @@ public PrimitiveBuilder<PrimitiveType> getBuilder(boolean required, boolean repe
}

static TypeInfo getTypeInfo(
final Map<String, Map<ParquetTableWriter.CacheTags, Object>> computedCache,
final Map<String, Map<ParquetCacheTags, Object>> computedCache,
@NotNull final ColumnDefinition<?> column,
final RowSet rowSet,
final Map<String, ? extends ColumnSource<?>> columnSourceMap,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.parquet.table.transfer;

import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import org.apache.parquet.column.statistics.Statistics;
import org.jetbrains.annotations.NotNull;

import java.nio.ByteBuffer;

public class BooleanTransfer implements TransferObject<ByteBuffer> {

private final ColumnSource<?> columnSource;
private final ChunkSource.GetContext context;
private ByteChunk<? extends Values> chunk;
private ByteBuffer buffer;

public BooleanTransfer(@NotNull final ColumnSource<?> columnSource, final int targetSize) {
this.columnSource = columnSource;
this.buffer = ByteBuffer.allocate(targetSize);
this.context = columnSource.makeGetContext(targetSize);
}

@Override
public ByteBuffer getBuffer() {
return buffer;
}

@Override
public int rowCount() {
return chunk.size();
}

@Override
public void fetchData(@NotNull final RowSequence rs) {
chunk = columnSource.getChunk(context, rs).asByteChunk();
if(buffer.capacity() < chunk.size()) {
buffer = ByteBuffer.allocate(chunk.size());
}

buffer.clear();
for (int ii = 0; ii < chunk.size(); ii++) {
final byte val = chunk.get(ii);
buffer.put(val);
}
buffer.flip();
}

@Override
public void close() {
context.close();
}

@Override
public <T extends Comparable<T>> void updateStatistics(@NotNull final Statistics<T> stats) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
/*
* ---------------------------------------------------------------------------------------------------------------------
* AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit IntTransfer and regenerate
* ---------------------------------------------------------------------------------------------------------------------
*/
package io.deephaven.parquet.table.transfer;

import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.util.QueryConstants;
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.jetbrains.annotations.NotNull;

import java.nio.IntBuffer;

public class ByteTransfer implements TransferObject<IntBuffer> {

private final ColumnSource<?> columnSource;
private final ChunkSource.GetContext context;
private ByteChunk<? extends Values> chunk;
private IntBuffer buffer;
private byte minValue = QueryConstants.NULL_BYTE;
private byte maxValue = QueryConstants.NULL_BYTE;

public ByteTransfer(@NotNull final ColumnSource<?> columnSource, final int targetSize) {
this.columnSource = columnSource;
this.buffer = IntBuffer.allocate(targetSize);
this.context = columnSource.makeGetContext(targetSize);
}

@Override
public IntBuffer getBuffer() {
return buffer;
}

@Override
public int rowCount() {
return chunk.size();
}

@Override
public void fetchData(@NotNull final RowSequence rs) {
chunk = columnSource.getChunk(context, rs).asByteChunk();
if(buffer.capacity() < chunk.size()) {
buffer = IntBuffer.allocate(chunk.size());
}

buffer.clear();
for (int ii = 0; ii < chunk.size(); ii++) {
final byte val = chunk.get(ii);
if(val != QueryConstants.NULL_BYTE) {
if (minValue == QueryConstants.NULL_BYTE) {
minValue = maxValue = val;
} else if (val < minValue) {
minValue = val;
} else if (val > maxValue) {
maxValue = val;
}
}

buffer.put(val);
}
buffer.flip();
}

@Override
public void close() {
context.close();
}

@Override
public <T extends Comparable<T>> void updateStatistics(@NotNull final Statistics<T> stats) {
if(minValue != QueryConstants.NULL_BYTE) {
((IntStatistics) stats).setMinMax(minValue, maxValue);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
/*
* ---------------------------------------------------------------------------------------------------------------------
* AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit IntTransfer and regenerate
* ---------------------------------------------------------------------------------------------------------------------
*/
package io.deephaven.parquet.table.transfer;

import io.deephaven.chunk.CharChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.util.QueryConstants;
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.jetbrains.annotations.NotNull;

import java.nio.IntBuffer;

public class CharTransfer implements TransferObject<IntBuffer> {

private final ColumnSource<?> columnSource;
private final ChunkSource.GetContext context;
private CharChunk<? extends Values> chunk;
private IntBuffer buffer;
private char minValue = QueryConstants.NULL_CHAR;
private char maxValue = QueryConstants.NULL_CHAR;

public CharTransfer(@NotNull final ColumnSource<?> columnSource, final int targetSize) {
this.columnSource = columnSource;
this.buffer = IntBuffer.allocate(targetSize);
this.context = columnSource.makeGetContext(targetSize);
}

@Override
public IntBuffer getBuffer() {
return buffer;
}

@Override
public int rowCount() {
return chunk.size();
}

@Override
public void fetchData(@NotNull final RowSequence rs) {
chunk = columnSource.getChunk(context, rs).asCharChunk();
if(buffer.capacity() < chunk.size()) {
buffer = IntBuffer.allocate(chunk.size());
}

buffer.clear();
for (int ii = 0; ii < chunk.size(); ii++) {
final char val = chunk.get(ii);
if(val != QueryConstants.NULL_CHAR) {
if (minValue == QueryConstants.NULL_CHAR) {
minValue = maxValue = val;
} else if (val < minValue) {
minValue = val;
} else if (val > maxValue) {
maxValue = val;
}
}

buffer.put(val);
}
buffer.flip();
}

@Override
public void close() {
context.close();
}

@Override
public <T extends Comparable<T>> void updateStatistics(@NotNull final Statistics<T> stats) {
if(minValue != QueryConstants.NULL_CHAR) {
((IntStatistics) stats).setMinMax(minValue, maxValue);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.parquet.table.transfer;

import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.util.codec.ObjectCodec;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.io.api.Binary;
import org.jetbrains.annotations.NotNull;

public class CodecTransfer<T> implements TransferObject<Binary[]> {

private final ChunkSource.GetContext context;
private final ObjectCodec<? super T> codec;
private ObjectChunk<T, Values> chunk;
private final Binary[] buffer;
private final ColumnSource<T> columnSource;


public CodecTransfer(
@NotNull final ColumnSource<T> columnSource,
@NotNull final ObjectCodec<? super T> codec,
final int targetSize) {
this.columnSource = columnSource;
this.buffer = new Binary[targetSize];
context = this.columnSource.makeGetContext(targetSize);
this.codec = codec;
}

@Override
public Binary[] getBuffer() {
return buffer;
}

@Override
public int rowCount() {
return chunk.size();
}

@Override
public void fetchData(@NotNull final RowSequence rs) {
// noinspection unchecked
chunk = (ObjectChunk<T, Values>) columnSource.getChunk(context, rs);
for (int i = 0; i < chunk.size(); i++) {
T value = chunk.get(i);
buffer[i] = value == null ? null : Binary.fromConstantByteArray(codec.encode(value));
}
}

@Override
public void close() {
context.close();
}

@Override
public <T extends Comparable<T>> void updateStatistics(@NotNull final Statistics<T> stats) {

}
}
Loading

0 comments on commit 8dab591

Please sign in to comment.