Skip to content

Commit

Permalink
More BugFixes
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Dec 5, 2024
1 parent c5000e7 commit c7ec56d
Show file tree
Hide file tree
Showing 14 changed files with 73 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,17 @@ public int available() throws IOException {
}

/**
* There are two cases we don't send a validity buffer: - the simplest case is following the arrow flight spec,
* which says that if there are no nulls present, the buffer is optional. - Our implementation of nullCount()
* for primitive types will return zero if the useDeephavenNulls flag is set, so the buffer will also be omitted
* in that case. The client's marshaller does not need to be aware of deephaven nulls but in this mode we assume
* the consumer understands which value is the assigned NULL.
* @formatter:off
* There are two cases we don't send a validity buffer:
* - the simplest case is following the arrow flight spec, which says that if there are no nulls present, the
* buffer is optional.
* - Our implementation of nullCount() for primitive types will return zero if the useDeephavenNulls flag is
* set, so the buffer will also be omitted in that case. The client's marshaller does not need to be aware of
* deephaven nulls but in this mode we assume the consumer understands which value is the assigned NULL.
* @formatter:on
*/
protected boolean sendValidityBuffer() {
return !fieldNullable || nullCount() != 0;
return fieldNullable && nullCount() != 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
//
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.WritableByteChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.util.BooleanUtils;
import io.deephaven.util.datastructures.LongSizedDataStructure;
Expand All @@ -16,6 +18,7 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.PrimitiveIterator;
import java.util.function.Function;

import static io.deephaven.extensions.barrage.chunk.BaseChunkWriter.getNumLongsForBitPackOfSize;

Expand All @@ -39,6 +42,33 @@ public BooleanChunkReader(ByteConversion conversion) {
this.conversion = conversion;
}

public <T> ChunkReader<WritableObjectChunk<T, Values>> transform(Function<Byte, T> transform) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> {
try (final WritableByteChunk<Values> inner = BooleanChunkReader.this.readChunk(
fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

final WritableObjectChunk<T, Values> chunk = castOrCreateChunk(
outChunk,
Math.max(totalRows, inner.size()),
WritableObjectChunk::makeWritableChunk,
WritableChunk::asWritableObjectChunk);

if (outChunk == null) {
// if we're not given an output chunk then we better be writing at the front of the new one
Assert.eqZero(outOffset, "outOffset");
}

for (int ii = 0; ii < inner.size(); ++ii) {
byte value = inner.get(ii);
chunk.set(outOffset + ii, transform.apply(value));
}
chunk.setSize(outOffset + inner.size());

return chunk;
}
};
}

@Override
public WritableByteChunk<Values> readChunk(
@NotNull final Iterator<ChunkWriter.FieldNodeInfo> fieldNodeIter,
Expand Down Expand Up @@ -99,7 +129,6 @@ public WritableByteChunk<Values> readChunk(
return chunk;
}


private static void useValidityBuffer(
final ByteConversion conversion,
final DataInput is,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
public class ByteChunkWriter<SOURCE_CHUNK_TYPE extends Chunk<Values>> extends BaseChunkWriter<SOURCE_CHUNK_TYPE> {
private static final String DEBUG_NAME = "ByteChunkWriter";
private static final ByteChunkWriter<ByteChunk<Values>> NULLABLE_IDENTITY_INSTANCE = new ByteChunkWriter<>(
ByteChunk::isNull, ByteChunk::getEmptyChunk, ByteChunk::get, false);
private static final ByteChunkWriter<ByteChunk<Values>> NON_NULLABLE_IDENTITY_INSTANCE = new ByteChunkWriter<>(
ByteChunk::isNull, ByteChunk::getEmptyChunk, ByteChunk::get, true);
private static final ByteChunkWriter<ByteChunk<Values>> NON_NULLABLE_IDENTITY_INSTANCE = new ByteChunkWriter<>(
ByteChunk::isNull, ByteChunk::getEmptyChunk, ByteChunk::get, false);


public static ByteChunkWriter<ByteChunk<Values>> getIdentity(boolean isNullable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
public class CharChunkWriter<SOURCE_CHUNK_TYPE extends Chunk<Values>> extends BaseChunkWriter<SOURCE_CHUNK_TYPE> {
private static final String DEBUG_NAME = "CharChunkWriter";
private static final CharChunkWriter<CharChunk<Values>> NULLABLE_IDENTITY_INSTANCE = new CharChunkWriter<>(
CharChunk::isNull, CharChunk::getEmptyChunk, CharChunk::get, false);
private static final CharChunkWriter<CharChunk<Values>> NON_NULLABLE_IDENTITY_INSTANCE = new CharChunkWriter<>(
CharChunk::isNull, CharChunk::getEmptyChunk, CharChunk::get, true);
private static final CharChunkWriter<CharChunk<Values>> NON_NULLABLE_IDENTITY_INSTANCE = new CharChunkWriter<>(
CharChunk::isNull, CharChunk::getEmptyChunk, CharChunk::get, false);


public static CharChunkWriter<CharChunk<Values>> getIdentity(boolean isNullable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.BooleanUtils;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.TypeUtils;
import io.deephaven.vector.Vector;
Expand Down Expand Up @@ -275,7 +276,17 @@ public <T extends WritableChunk<Values>> ChunkReader<T> newReaderPojo(

if (typeId == ArrowType.ArrowTypeID.Union) {
final ArrowType.Union unionType = (ArrowType.Union) field.getType();
final List<ChunkReader<WritableChunk<Values>>> innerReaders = new ArrayList<>();
final List<ChunkReader<? extends WritableChunk<Values>>> innerReaders = new ArrayList<>();

for (int ii = 0; ii < field.getChildren().size(); ++ii) {
final Field childField = field.getChildren().get(ii);
final BarrageTypeInfo<Field> childTypeInfo = BarrageUtil.getDefaultType(childField);
ChunkReader<? extends WritableChunk<Values>> childReader = newReaderPojo(childTypeInfo, options, false);
if (childField.getType().getTypeID() == ArrowType.ArrowTypeID.Bool) {
childReader = ((BooleanChunkReader) childReader).transform(BooleanUtils::byteAsBoolean);
}
innerReaders.add(childReader);
}

// noinspection unchecked
return (ChunkReader<T>) new UnionChunkReader<T>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
public class DoubleChunkWriter<SOURCE_CHUNK_TYPE extends Chunk<Values>> extends BaseChunkWriter<SOURCE_CHUNK_TYPE> {
private static final String DEBUG_NAME = "DoubleChunkWriter";
private static final DoubleChunkWriter<DoubleChunk<Values>> NULLABLE_IDENTITY_INSTANCE = new DoubleChunkWriter<>(
DoubleChunk::isNull, DoubleChunk::getEmptyChunk, DoubleChunk::get, false);
private static final DoubleChunkWriter<DoubleChunk<Values>> NON_NULLABLE_IDENTITY_INSTANCE = new DoubleChunkWriter<>(
DoubleChunk::isNull, DoubleChunk::getEmptyChunk, DoubleChunk::get, true);
private static final DoubleChunkWriter<DoubleChunk<Values>> NON_NULLABLE_IDENTITY_INSTANCE = new DoubleChunkWriter<>(
DoubleChunk::isNull, DoubleChunk::getEmptyChunk, DoubleChunk::get, false);


public static DoubleChunkWriter<DoubleChunk<Values>> getIdentity(boolean isNullable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
public class FloatChunkWriter<SOURCE_CHUNK_TYPE extends Chunk<Values>> extends BaseChunkWriter<SOURCE_CHUNK_TYPE> {
private static final String DEBUG_NAME = "FloatChunkWriter";
private static final FloatChunkWriter<FloatChunk<Values>> NULLABLE_IDENTITY_INSTANCE = new FloatChunkWriter<>(
FloatChunk::isNull, FloatChunk::getEmptyChunk, FloatChunk::get, false);
private static final FloatChunkWriter<FloatChunk<Values>> NON_NULLABLE_IDENTITY_INSTANCE = new FloatChunkWriter<>(
FloatChunk::isNull, FloatChunk::getEmptyChunk, FloatChunk::get, true);
private static final FloatChunkWriter<FloatChunk<Values>> NON_NULLABLE_IDENTITY_INSTANCE = new FloatChunkWriter<>(
FloatChunk::isNull, FloatChunk::getEmptyChunk, FloatChunk::get, false);


public static FloatChunkWriter<FloatChunk<Values>> getIdentity(boolean isNullable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
public class IntChunkWriter<SOURCE_CHUNK_TYPE extends Chunk<Values>> extends BaseChunkWriter<SOURCE_CHUNK_TYPE> {
private static final String DEBUG_NAME = "IntChunkWriter";
private static final IntChunkWriter<IntChunk<Values>> NULLABLE_IDENTITY_INSTANCE = new IntChunkWriter<>(
IntChunk::isNull, IntChunk::getEmptyChunk, IntChunk::get, false);
private static final IntChunkWriter<IntChunk<Values>> NON_NULLABLE_IDENTITY_INSTANCE = new IntChunkWriter<>(
IntChunk::isNull, IntChunk::getEmptyChunk, IntChunk::get, true);
private static final IntChunkWriter<IntChunk<Values>> NON_NULLABLE_IDENTITY_INSTANCE = new IntChunkWriter<>(
IntChunk::isNull, IntChunk::getEmptyChunk, IntChunk::get, false);


public static IntChunkWriter<IntChunk<Values>> getIdentity(boolean isNullable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
public class LongChunkWriter<SOURCE_CHUNK_TYPE extends Chunk<Values>> extends BaseChunkWriter<SOURCE_CHUNK_TYPE> {
private static final String DEBUG_NAME = "LongChunkWriter";
private static final LongChunkWriter<LongChunk<Values>> NULLABLE_IDENTITY_INSTANCE = new LongChunkWriter<>(
LongChunk::isNull, LongChunk::getEmptyChunk, LongChunk::get, false);
private static final LongChunkWriter<LongChunk<Values>> NON_NULLABLE_IDENTITY_INSTANCE = new LongChunkWriter<>(
LongChunk::isNull, LongChunk::getEmptyChunk, LongChunk::get, true);
private static final LongChunkWriter<LongChunk<Values>> NON_NULLABLE_IDENTITY_INSTANCE = new LongChunkWriter<>(
LongChunk::isNull, LongChunk::getEmptyChunk, LongChunk::get, false);


public static LongChunkWriter<LongChunk<Values>> getIdentity(boolean isNullable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
public class ShortChunkWriter<SOURCE_CHUNK_TYPE extends Chunk<Values>> extends BaseChunkWriter<SOURCE_CHUNK_TYPE> {
private static final String DEBUG_NAME = "ShortChunkWriter";
private static final ShortChunkWriter<ShortChunk<Values>> NULLABLE_IDENTITY_INSTANCE = new ShortChunkWriter<>(
ShortChunk::isNull, ShortChunk::getEmptyChunk, ShortChunk::get, false);
private static final ShortChunkWriter<ShortChunk<Values>> NON_NULLABLE_IDENTITY_INSTANCE = new ShortChunkWriter<>(
ShortChunk::isNull, ShortChunk::getEmptyChunk, ShortChunk::get, true);
private static final ShortChunkWriter<ShortChunk<Values>> NON_NULLABLE_IDENTITY_INSTANCE = new ShortChunkWriter<>(
ShortChunk::isNull, ShortChunk::getEmptyChunk, ShortChunk::get, false);


public static ShortChunkWriter<ShortChunk<Values>> getIdentity(boolean isNullable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public OutputChunkType readChunk(
for (int ii = 0; ii < wireValues.size(); ++ii) {
transformFunction.apply(wireValues, chunk, ii, outOffset + ii);
}
chunk.setSize(outOffset + wireValues.size());
return chunk;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ public static Mode mode(UnionMode mode) {
private static final String DEBUG_NAME = "UnionChunkReader";

private final Mode mode;
private final List<ChunkReader<WritableChunk<Values>>> readers;
private final List<ChunkReader<? extends WritableChunk<Values>>> readers;

public UnionChunkReader(
final Mode mode,
final List<ChunkReader<WritableChunk<Values>>> readers) {
final List<ChunkReader<? extends WritableChunk<Values>>> readers) {
this.mode = mode;
this.readers = readers;
// the specification doesn't allow the union column to have more than signed byte number of types
Expand All @@ -65,7 +65,7 @@ public WritableObjectChunk<T, Values> readChunk(
int numRows = nodeInfo.numElements;
if (numRows == 0) {
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, coiBufferLength + offsetsBufferLength));
for (final ChunkReader<WritableChunk<Values>> reader : readers) {
for (final ChunkReader<? extends WritableChunk<Values>> reader : readers) {
// noinspection EmptyTryBlock
try (final SafeCloseable ignored = reader.readChunk(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
// do nothing; we need each reader to consume fieldNodeIter and bufferInfoIter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ public static BarrageTable make(
return value instanceof Boolean && (Boolean) value;
};

schema.attributes.put(Table.BARRAGE_SCHEMA_ATTRIBUTE, schema.arrowSchema);
if (getAttribute.test(Table.BLINK_TABLE_ATTRIBUTE)) {
final LinkedHashMap<String, ColumnSource<?>> finalColumns = makeColumns(schema, writableSources);
table = new BarrageBlinkTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,12 @@ public BarrageMessage safelyParseFrom(final BarrageOptions options,
}

// fill the chunk with data and assign back into the array
chunk = readers.get(ci).readChunk(fieldNodeIter, bufferInfoIter, ois, chunk, chunk.size(),
final int origSize = chunk.size();
chunk = readers.get(ci).readChunk(fieldNodeIter, bufferInfoIter, ois, chunk, origSize,
(int) batch.length());
acd.data.set(lastChunkIndex, chunk);
if (!options.columnsAsList()) {
chunk.setSize(chunk.size() + (int) batch.length());
chunk.setSize(origSize + (int) batch.length());
}
}

Expand Down

0 comments on commit c7ec56d

Please sign in to comment.