Skip to content

Commit

Permalink
Merge pull request #1876 from ClickHouse/v2_fix_aggregate_function_serde
Browse files Browse the repository at this point in the history
[client-v2] Fix Bitmap SerDe
  • Loading branch information
chernser authored Oct 29, 2024
2 parents 65bc28c + a0abab9 commit ec9f249
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ public static ClickHouseBitmap deserialize(DataInputStream in, ClickHouseDataTyp
byte[] bytes = new byte[2 + byteLen * cardinality];
bytes[0] = (byte) flag;
bytes[1] = cardinality;
in.read(bytes, 2, bytes.length - 2);
in.readFully(bytes, 2, bytes.length - 2);

rb = ClickHouseBitmap.deserialize(bytes, innerType);
} else {
Expand Down
21 changes: 10 additions & 11 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.data_formats.internal.MapBackedRecord;
import com.clickhouse.client.api.data_formats.internal.ProcessParser;
import com.clickhouse.client.api.data_formats.internal.SerializerUtils;
import com.clickhouse.client.api.enums.Protocol;
import com.clickhouse.client.api.enums.ProxyType;
import com.clickhouse.client.api.insert.DataSerializationException;
Expand All @@ -26,7 +27,6 @@
import com.clickhouse.client.api.internal.ClientV1AdaptorHelper;
import com.clickhouse.client.api.internal.HttpAPIClientHelper;
import com.clickhouse.client.api.internal.MapUtils;
import com.clickhouse.client.api.internal.SerializerUtils;
import com.clickhouse.client.api.internal.SettingsConverter;
import com.clickhouse.client.api.internal.TableSchemaParser;
import com.clickhouse.client.api.internal.ValidationUtils;
Expand All @@ -47,7 +47,6 @@
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataType;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.format.BinaryStreamUtils;
import org.apache.hc.client5.http.ConnectTimeoutException;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.http.ClassicHttpResponse;
Expand Down Expand Up @@ -1094,35 +1093,35 @@ public synchronized void register(Class<?> clazz, TableSchema schema) {

if (defaultsSupport) {
if (value != null) {//Because we now support defaults, we have to send nonNull
BinaryStreamUtils.writeNonNull(stream);//Write 0 for no default
SerializerUtils.writeNonNull(stream);//Write 0 for no default

if (column.isNullable()) {//If the column is nullable
BinaryStreamUtils.writeNonNull(stream);//Write 0 for not null
SerializerUtils.writeNonNull(stream);//Write 0 for not null
}
} else {//So if the object is null
if (column.hasDefault()) {
BinaryStreamUtils.writeNull(stream);//Send 1 for default
SerializerUtils.writeNull(stream);//Send 1 for default
return;
} else if (column.isNullable()) {//And the column is nullable
BinaryStreamUtils.writeNonNull(stream);
BinaryStreamUtils.writeNull(stream);//Then we send null, write 1
SerializerUtils.writeNonNull(stream);
SerializerUtils.writeNull(stream);//Then we send null, write 1
return;//And we're done
} else if (column.getDataType() == ClickHouseDataType.Array) {//If the column is an array
BinaryStreamUtils.writeNonNull(stream);//Then we send nonNull
SerializerUtils.writeNonNull(stream);//Then we send nonNull
} else {
throw new IllegalArgumentException(String.format("An attempt to write null into not nullable column '%s'", column.getColumnName()));
}
}
} else {
if (column.isNullable()) {
if (value == null) {
BinaryStreamUtils.writeNull(stream);
SerializerUtils.writeNull(stream);
return;
}
BinaryStreamUtils.writeNonNull(stream);
SerializerUtils.writeNonNull(stream);
} else if (value == null) {
if (column.getDataType() == ClickHouseDataType.Array) {
BinaryStreamUtils.writeNonNull(stream);
SerializerUtils.writeNonNull(stream);
} else {
throw new IllegalArgumentException(String.format("An attempt to write null into not nullable column '%s'", column.getColumnName()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.clickhouse.client.api.data_formats;

import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.data.value.ClickHouseBitmap;
import com.clickhouse.data.value.ClickHouseGeoMultiPolygonValue;
import com.clickhouse.data.value.ClickHouseGeoPointValue;
import com.clickhouse.data.value.ClickHouseGeoPolygonValue;
Expand Down Expand Up @@ -519,4 +520,8 @@ public interface ClickHouseBinaryFormatReader extends AutoCloseable {
LocalDateTime getLocalDateTime(int index);

/**
 * Returns the {@link TableSchema} associated with this reader.
 * NOTE(review): presumably this is the schema of the data being read — confirm against implementations.
 *
 * @return the table schema
 */
TableSchema getSchema();

/**
 * Returns the value of the named column typed as a {@link ClickHouseBitmap}.
 * NOTE(review): behaviour for columns that do not hold a bitmap is implementation-defined — confirm.
 *
 * @param colName name of the column to read
 * @return the column value as a {@code ClickHouseBitmap}
 */
ClickHouseBitmap getClickHouseBitmap(String colName);

/**
 * Returns the value of the column at the given position typed as a {@link ClickHouseBitmap}.
 * NOTE(review): confirm whether {@code index} is 1-based like the other index accessors of this interface.
 *
 * @param index position of the column to read
 * @return the column value as a {@code ClickHouseBitmap}
 */
ClickHouseBitmap getClickHouseBitmap(int index);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

import com.clickhouse.client.api.ClientException;
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.internal.SerializerUtils;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.query.NullValueException;
import com.clickhouse.client.api.query.POJOSetter;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.value.ClickHouseBitmap;
import com.clickhouse.data.value.ClickHouseGeoMultiPolygonValue;
import com.clickhouse.data.value.ClickHouseGeoPointValue;
import com.clickhouse.data.value.ClickHouseGeoPolygonValue;
Expand Down Expand Up @@ -707,6 +707,16 @@ public LocalDateTime getLocalDateTime(int index) {
return (LocalDateTime) value;
}

/**
 * Reads the named column of the current record and hands it back as a bitmap.
 * The lookup itself is delegated to {@code readValue(String)}.
 */
@Override
public ClickHouseBitmap getClickHouseBitmap(String colName) {
    final ClickHouseBitmap bitmap = readValue(colName);
    return bitmap;
}

/**
 * Reads the column at the given position of the current record and hands it back as a bitmap.
 * The lookup itself is delegated to {@code readValue(int)}.
 */
@Override
public ClickHouseBitmap getClickHouseBitmap(int index) {
    final ClickHouseBitmap bitmap = readValue(index);
    return bitmap;
}

@Override
public void close() throws Exception {
input.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.query.GenericRecord;
import com.clickhouse.data.value.ClickHouseBitmap;
import com.clickhouse.data.value.ClickHouseGeoMultiPolygonValue;
import com.clickhouse.data.value.ClickHouseGeoPointValue;
import com.clickhouse.data.value.ClickHouseGeoPolygonValue;
Expand Down Expand Up @@ -356,4 +357,14 @@ public Object getObject(String colName) {
public Object getObject(int index) {
return reader.readValue(index);
}

/**
 * Fetches the named column from the wrapped reader and returns it as a bitmap.
 */
@Override
public ClickHouseBitmap getClickHouseBitmap(String colName) {
    final ClickHouseBitmap bitmap = reader.readValue(colName);
    return bitmap;
}

/**
 * Fetches the column at the given position from the wrapped reader and returns it as a bitmap.
 */
@Override
public ClickHouseBitmap getClickHouseBitmap(int index) {
    final ClickHouseBitmap bitmap = reader.readValue(index);
    return bitmap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public <T> T readValue(ClickHouseColumn column, Class<?> typeHint) throws IOExce
return null;
// case SimpleAggregateFunction:
case AggregateFunction:
return (T) ClickHouseBitmap.deserialize(input, column.getNestedColumns().get(0).getDataType());
return (T) readBitmap( column);
default:
throw new IllegalArgumentException("Unsupported data type: " + column.getDataType());
}
Expand Down Expand Up @@ -887,6 +887,11 @@ public static boolean isReadToPrimitive(ClickHouseDataType dataType) {
return false;
}
}

/**
 * Deserializes a bitmap from the underlying input stream.
 * The element type of the bitmap is taken from the first nested column of {@code column}.
 *
 * @param column column descriptor whose first nested column supplies the bitmap's inner data type
 * @return the bitmap read from the stream
 * @throws IOException if reading from the stream fails
 */
private ClickHouseBitmap readBitmap(ClickHouseColumn column) throws IOException {
    final ClickHouseColumn innerColumn = column.getNestedColumns().get(0);
    return ClickHouseBitmap.deserialize(input, innerColumn.getDataType());
}

/**
* Byte allocator that caches preallocated byte arrays for small sizes.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.clickhouse.client.api.query.NullValueException;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.value.ClickHouseArrayValue;
import com.clickhouse.data.value.ClickHouseBitmap;
import com.clickhouse.data.value.ClickHouseGeoMultiPolygonValue;
import com.clickhouse.data.value.ClickHouseGeoPointValue;
import com.clickhouse.data.value.ClickHouseGeoPolygonValue;
Expand Down Expand Up @@ -486,6 +487,16 @@ public LocalDateTime getLocalDateTime(int index) {
return (LocalDateTime) value;
}

/**
 * Looks up the named column in this record and returns it as a bitmap.
 * The lookup itself is delegated to {@code readValue(String)}.
 */
@Override
public ClickHouseBitmap getClickHouseBitmap(String colName) {
    final ClickHouseBitmap bitmap = readValue(colName);
    return bitmap;
}

/**
 * Looks up the column at the given position in this record and returns it as a bitmap.
 * The lookup itself is delegated to {@code readValue(int)}.
 */
@Override
public ClickHouseBitmap getClickHouseBitmap(int index) {
    final ClickHouseBitmap bitmap = readValue(index);
    return bitmap;
}

@Override
public Object getObject(String colName) {
return readValue(colName);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.clickhouse.client.api.internal;
package com.clickhouse.client.api.data_formats.internal;

import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.ClientException;
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.query.POJOSetter;
import com.clickhouse.data.ClickHouseAggregateFunction;
import com.clickhouse.data.ClickHouseColumn;
Expand Down Expand Up @@ -72,14 +71,14 @@ private static void serializeArrayData(OutputStream stream, Object value, ClickH
//Serialize the array to the stream
//The array is a list of values
List<?> values = (List<?>) value;
BinaryStreamUtils.writeVarInt(stream, values.size());
writeVarInt(stream, values.size());
for (Object val : values) {
if (column.getArrayBaseColumn().isNullable()) {
if (val == null) {
BinaryStreamUtils.writeNull(stream);
writeNull(stream);
continue;
}
BinaryStreamUtils.writeNonNull(stream);
writeNonNull(stream);
}
serializeData(stream, val, column.getArrayBaseColumn());
}
Expand Down Expand Up @@ -107,7 +106,7 @@ private static void serializeMapData(OutputStream stream, Object value, ClickHou
//Serialize the map to the stream
//The map is a list of key-value pairs
Map<?, ?> map = (Map<?, ?>) value;
BinaryStreamUtils.writeVarInt(stream, map.size());
writeVarInt(stream, map.size());
map.forEach((key, val) -> {
try {
serializePrimitiveData(stream, key, Objects.requireNonNull(column.getKeyInfo()));
Expand Down Expand Up @@ -213,7 +212,13 @@ private static void serializePrimitiveData(OutputStream stream, Object value, Cl

private static void serializeAggregateFunction(OutputStream stream, Object value, ClickHouseColumn column) throws IOException {
if (column.getAggregateFunction() == ClickHouseAggregateFunction.groupBitmap) {
BinaryStreamUtils.writeBitmap(stream, (ClickHouseBitmap) value);
if (value == null) {
throw new IllegalArgumentException("Cannot serialize null value for aggregate function: " + column.getAggregateFunction());
} else if (value instanceof ClickHouseBitmap) {
stream.write(((ClickHouseBitmap)value).toBytes()); // TODO: review toBytes() implementation - it can be simplified
} else {
throw new IllegalArgumentException("Cannot serialize value of type " + value.getClass() + " for aggregate function: " + column.getAggregateFunction());
}
} else {
throw new UnsupportedOperationException("Unsupported aggregate function: " + column.getAggregateFunction());
}
Expand Down Expand Up @@ -548,6 +553,36 @@ public Class<?> defineClass(String name, byte[] code) throws ClassNotFoundExcept
}
}

/**
 * Writes {@code value} as a variable-length integer: seven payload bits per byte,
 * least-significant group first, with the high bit of a byte set when more bytes follow.
 * At most nine bytes are written, mirroring the referenced ClickHouse implementation.
 *
 * <p>The value is treated as an unsigned 64-bit quantity, as in the reference code, so
 * negative {@code long} inputs are encoded by their bit pattern instead of producing
 * bytes without continuation bits.
 *
 * @param output destination stream
 * @param value  value to encode, interpreted as unsigned
 * @throws IOException if writing to the stream fails
 */
public static void writeVarInt(OutputStream output, long value) throws IOException {
    // reference code https://github.com/ClickHouse/ClickHouse/blob/abe314feecd1647d7c2b952a25da7abf5c19f352/src/IO/VarInt.h#L187
    for (int i = 0; i < 9; i++) {
        byte b = (byte) (value & 0x7F);

        // Unsigned equivalent of "value > 0x7F": some bit above the low seven is set.
        if ((value & ~0x7FL) != 0) {
            b |= 0x80;
        }

        output.write(b);
        // Logical shift so that the sign bit does not get replicated and the loop can terminate on 0.
        value >>>= 7;

        if (value == 0) {
            return;
        }
    }
}

/**
 * Emits the marker for a NULL value by writing the boolean {@code true}.
 *
 * @param output destination stream
 * @throws IOException if writing to the stream fails
 */
public static void writeNull(OutputStream output) throws IOException {
    final boolean nullMarker = true;
    writeBoolean(output, nullMarker);
}

/**
 * Emits the marker for a non-NULL value by writing the boolean {@code false}.
 *
 * @param output destination stream
 * @throws IOException if writing to the stream fails
 */
public static void writeNonNull(OutputStream output) throws IOException {
    final boolean nullMarker = false;
    writeBoolean(output, nullMarker);
}

/**
 * Writes a boolean as a single byte: {@code 1} for {@code true}, {@code 0} for {@code false}.
 *
 * @param output destination stream
 * @param value  boolean to encode
 * @throws IOException if writing to the stream fails
 */
public static void writeBoolean(OutputStream output, boolean value) throws IOException {
    final int encoded;
    if (value) {
        encoded = 1;
    } else {
        encoded = 0;
    }
    output.write(encoded);
}

public static class NumberConverter {

public static byte toByte(Number value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.clickhouse.client.api.ConnectionInitiationException;
import com.clickhouse.client.api.ConnectionReuseStrategy;
import com.clickhouse.client.api.ServerException;
import com.clickhouse.client.api.data_formats.internal.SerializerUtils;
import com.clickhouse.client.api.enums.ProxyType;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseDefaults;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.clickhouse.client.api.query;

import com.clickhouse.data.value.ClickHouseBitmap;
import com.clickhouse.data.value.ClickHouseGeoMultiPolygonValue;
import com.clickhouse.data.value.ClickHouseGeoPointValue;
import com.clickhouse.data.value.ClickHouseGeoPolygonValue;
Expand Down Expand Up @@ -489,4 +490,8 @@ public interface GenericRecord {
Object getObject(String colName);

/**
 * Returns the value of the column at the given position as an untyped {@link Object}.
 * NOTE(review): confirm whether {@code index} is 1-based like the other index accessors of this interface.
 *
 * @param index position of the column to read
 * @return the column value
 */
Object getObject(int index);

/**
 * Returns the value of the named column typed as a {@link ClickHouseBitmap}.
 * NOTE(review): behaviour for columns that do not hold a bitmap is implementation-defined — confirm.
 *
 * @param colName name of the column to read
 * @return the column value as a {@code ClickHouseBitmap}
 */
ClickHouseBitmap getClickHouseBitmap(String colName);

/**
 * Returns the value of the column at the given position typed as a {@link ClickHouseBitmap}.
 * NOTE(review): confirm whether {@code index} is 1-based like the other index accessors of this interface.
 *
 * @param index position of the column to read
 * @return the column value as a {@code ClickHouseBitmap}
 */
ClickHouseBitmap getClickHouseBitmap(int index);
}
Original file line number Diff line number Diff line change
Expand Up @@ -680,10 +680,10 @@ public static String generateTableCreateSQL(String tableName) {
"array Array(String), " +
"tuple Tuple(UInt64, Int32, String), " +
"map Map(String, Int32), " +
"nested Nested (innerInt Int32, innerString String, innerNullableInt Nullable(Int32))" +
// "groupBitmapUint32 AggregateFunction(groupBitmap, UInt32)," +
// TODO: fix this
// "groupBitmapUint64 AggregateFunction(groupBitmap, UInt64)" +
"nested Nested (innerInt Int32, innerString String, " +
"innerNullableInt Nullable(Int32)), " +
"groupBitmapUint32 AggregateFunction(groupBitmap, UInt32), " +
"groupBitmapUint64 AggregateFunction(groupBitmap, UInt64) " +
") ENGINE = Memory";
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.clickhouse.client.internal;

import com.clickhouse.client.api.internal.SerializerUtils;
import com.clickhouse.client.api.data_formats.internal.SerializerUtils;
import org.testng.annotations.Test;

import static org.testng.AssertJUnit.assertEquals;
import static org.junit.Assert.assertEquals;

public class SerializerUtilsTests {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.clickhouse.client.query;

import com.clickhouse.data.value.ClickHouseBitmap;

import java.util.Objects;
import java.util.Random;

/**
 * Test DTO holding two {@code groupBitmap} aggregate-function states, one built from
 * {@code int} values and one from {@code long} values. A fresh instance is filled with
 * five random values per bitmap, each drawn from the 100 values just below the type's maximum.
 */
public class AggregateFuncDTO {

    private ClickHouseBitmap groupBitmapUint32;
    private ClickHouseBitmap groupBitmapUint64;

    /** Creates a DTO whose two bitmaps are populated with random values. */
    public AggregateFuncDTO() {
        final Random rnd = new Random();

        final int[] intValues = rnd.ints(5, Integer.MAX_VALUE - 100, Integer.MAX_VALUE).toArray();
        this.groupBitmapUint32 = ClickHouseBitmap.wrap(intValues);

        final long[] longValues = rnd.longs(5, Long.MAX_VALUE - 100, Long.MAX_VALUE).toArray();
        this.groupBitmapUint64 = ClickHouseBitmap.wrap(longValues);
    }

    public ClickHouseBitmap getGroupBitmapUint32() {
        return this.groupBitmapUint32;
    }

    public void setGroupBitmapUint32(ClickHouseBitmap groupBitmapUint32) {
        this.groupBitmapUint32 = groupBitmapUint32;
    }

    public ClickHouseBitmap getGroupBitmapUint64() {
        return this.groupBitmapUint64;
    }

    public void setGroupBitmapUint64(ClickHouseBitmap groupBitmapUint64) {
        this.groupBitmapUint64 = groupBitmapUint64;
    }

    /** Two DTOs are equal when they are of the same class and both bitmaps are equal. */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final AggregateFuncDTO other = (AggregateFuncDTO) o;
        if (!Objects.equals(groupBitmapUint32, other.groupBitmapUint32)) {
            return false;
        }
        return Objects.equals(groupBitmapUint64, other.groupBitmapUint64);
    }

    @Override
    public int hashCode() {
        return Objects.hash(groupBitmapUint32, groupBitmapUint64);
    }

    /**
     * Builds the {@code CREATE TABLE} statement for a table matching this DTO.
     *
     * @param tableName name of the table to create
     * @return the DDL statement
     */
    public static String generateTableCreateSQL(String tableName) {
        final String columns =
                "groupBitmapUint32 AggregateFunction(groupBitmap, UInt32), " +
                "groupBitmapUint64 AggregateFunction(groupBitmap, UInt64) ";
        return "CREATE TABLE " + tableName + " (" + columns + ") ENGINE = MergeTree() ORDER BY tuple()";
    }
}
Loading

0 comments on commit ec9f249

Please sign in to comment.