Skip to content

Commit

Permalink
Timestamp check for columns (#22)
Browse files Browse the repository at this point in the history
* fix: check timestamp only for partition column

* fix: add timestamp check
  • Loading branch information
lavkesh authored Jun 20, 2023
1 parent 273cd8a commit effe0ca
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 35 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ plugins {
}

group 'com.gotocompany'
version '0.4.8'
version '0.4.9'

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class BigQueryProtoStorageClient implements BigQueryStorageClient {

Expand All @@ -36,8 +35,6 @@ public class BigQueryProtoStorageClient implements BigQueryStorageClient {
private final MessageParser parser;
private final String schemaClass;
private final SinkConnectorSchemaMessageMode mode;
private static final long FIVE_YEARS_DAYS = 1825;
private static final long ONE_YEAR_DAYS = 365;

public BigQueryProtoStorageClient(BigQueryWriter writer, BigQuerySinkConfig config, MessageParser parser) {
this.writer = (BigQueryProtoWriter) writer;
Expand Down Expand Up @@ -93,12 +90,12 @@ public AppendRowsResponse appendAndGet(BigQueryPayload payload) throws Execution
private DynamicMessage convert(Message message, Descriptors.Descriptor descriptor) throws IOException {
ParsedMessage parsedMessage = parser.parse(message, mode, schemaClass);
parsedMessage.validate(config);
DynamicMessage.Builder messageBuilder = convert((DynamicMessage) parsedMessage.getRaw(), descriptor);
DynamicMessage.Builder messageBuilder = convert((DynamicMessage) parsedMessage.getRaw(), descriptor, true);
BigQueryProtoUtils.addMetadata(message.getMetadata(), messageBuilder, descriptor, config);
return messageBuilder.build();
}

private DynamicMessage.Builder convert(DynamicMessage inputMessage, Descriptors.Descriptor descriptor) {
private DynamicMessage.Builder convert(DynamicMessage inputMessage, Descriptors.Descriptor descriptor, boolean isTopLevel) {
DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(descriptor);
List<Descriptors.FieldDescriptor> allFields = inputMessage.getDescriptorForType().getFields();
for (Descriptors.FieldDescriptor inputField : allFields) {
Expand All @@ -118,43 +115,20 @@ private DynamicMessage.Builder convert(DynamicMessage inputMessage, Descriptors.
}
if (fieldValue instanceof Instant) {
if (((Instant) fieldValue).getEpochSecond() > 0) {
long timeStampValue = getBQInstant((Instant) fieldValue, outputField);
long timeStampValue = TimeStampUtils.getBQInstant((Instant) fieldValue, outputField, isTopLevel, config);
messageBuilder.setField(outputField, timeStampValue);
}
} else if (protoField.getClass().getName().equals(MessageProtoField.class.getName())
|| protoField.getClass().getName().equals(DurationProtoField.class.getName())) {
Descriptors.Descriptor messageType = outputField.getMessageType();
messageBuilder.setField(outputField, convert((DynamicMessage) fieldValue, messageType).build());
messageBuilder.setField(outputField, convert((DynamicMessage) fieldValue, messageType, false).build());
} else {
messageBuilder.setField(outputField, fieldValue);
}
}
return messageBuilder;
}

private long getBQInstant(Instant instant, Descriptors.FieldDescriptor fieldDescriptor) {
Instant currentInstant = Instant.now();
boolean isPastInstant = currentInstant.isAfter(instant);
boolean isValid;
if (isPastInstant) {
Instant fiveYearPast = currentInstant.minusMillis(TimeUnit.DAYS.toMillis(FIVE_YEARS_DAYS));
isValid = fiveYearPast.isBefore(instant);
} else {
Instant oneYearFuture = currentInstant.plusMillis(TimeUnit.DAYS.toMillis(ONE_YEAR_DAYS));
isValid = oneYearFuture.isAfter(instant);

}
if (!isValid) {
throw new IllegalArgumentException(instant + " for field "
+ fieldDescriptor.getFullName() + " is outside the allowed bounds. "
+ "You can only stream to date range within 1825 days in the past "
+ "and 366 days in the future relative to the current date.");
}
// Timestamp should be in microseconds
return TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
}


private void addRepeatedFields(DynamicMessage.Builder messageBuilder, Descriptors.FieldDescriptor outputField, List<?> fieldValue) {
if (fieldValue.isEmpty()) {
return;
Expand All @@ -163,11 +137,11 @@ private void addRepeatedFields(DynamicMessage.Builder messageBuilder, Descriptor
for (Object f : fieldValue) {
if (f instanceof DynamicMessage) {
Descriptors.Descriptor messageType = outputField.getMessageType();
repeatedNestedFields.add(convert((DynamicMessage) f, messageType).build());
repeatedNestedFields.add(convert((DynamicMessage) f, messageType, false).build());
} else {
if (f instanceof Instant) {
if (((Instant) f).getEpochSecond() > 0) {
repeatedNestedFields.add(getBQInstant((Instant) f, outputField));
repeatedNestedFields.add(TimeStampUtils.getBQInstant((Instant) f, outputField, false, config));
}
} else {
repeatedNestedFields.add(f);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.gotocompany.depot.bigquery.storage.proto;

import com.google.protobuf.Descriptors;
import com.gotocompany.depot.config.BigQuerySinkConfig;

import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class TimeStampUtils {
private static final long FIVE_YEARS_DAYS = 1825;
private static final long ONE_YEAR_DAYS = 365;
private static final Instant MIN_TIMESTAMP = Instant.parse("0001-01-01T00:00:00Z");
private static final Instant MAX_TIMESTAMP = Instant.parse("9999-12-31T23:59:59.999999Z");

public static long getBQInstant(Instant instant, Descriptors.FieldDescriptor fieldDescriptor, boolean isTopLevel, BigQuerySinkConfig config) {
// Timestamp should be in microseconds
long timeStamp = TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
// Partition column is always top level
if (isTopLevel && fieldDescriptor.getName().equals(config.getTablePartitionKey())) {
Instant currentInstant = Instant.now();
boolean isValid;
boolean isPastInstant = currentInstant.isAfter(instant);
if (isPastInstant) {
Instant fiveYearPast = currentInstant.minusMillis(TimeUnit.DAYS.toMillis(FIVE_YEARS_DAYS));
isValid = fiveYearPast.isBefore(instant);
} else {
Instant oneYearFuture = currentInstant.plusMillis(TimeUnit.DAYS.toMillis(ONE_YEAR_DAYS));
isValid = oneYearFuture.isAfter(instant);

}
if (!isValid) {
throw new UnsupportedOperationException(instant + " for field "
+ fieldDescriptor.getFullName() + " is outside the allowed bounds. "
+ "You can only stream to date range within 1825 days in the past "
+ "and 366 days in the future relative to the current date.");
}
return timeStamp;
} else {
// other timestamps should be in the limit specifies by BQ
if (instant.isAfter(MIN_TIMESTAMP) && instant.isBefore(MAX_TIMESTAMP)) {
return timeStamp;
} else {
throw new UnsupportedOperationException(instant
+ " for field "
+ fieldDescriptor.getFullName()
+ " is outside the allowed bounds in BQ.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public void setUp() throws IOException, Descriptors.DescriptorValidationExceptio
System.setProperty("SINK_BIGQUERY_METADATA_NAMESPACE", "");
System.setProperty("SINK_BIGQUERY_METADATA_COLUMNS_TYPES",
"message_offset=integer,message_topic=string,load_time=timestamp,message_timestamp=timestamp,message_partition=integer");
System.setProperty("SINK_BIGQUERY_TABLE_PARTITION_KEY", "created_at");
System.setProperty("SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE", "true");
ClassLoadStencilClient stencilClient = Mockito.mock(ClassLoadStencilClient.class, CALLS_REAL_METHODS);
protoMessageParser = new ProtoMessageParser(stencilClient);
testMessageBQSchema = TableSchema.newBuilder()
Expand Down Expand Up @@ -595,11 +597,34 @@ public void shouldNotConvertFiveYearsOldTimeStamp() throws IOException {
metas.add(r);
}
Assert.assertEquals(1, metas.size());
Assert.assertEquals(ErrorType.DESERIALIZATION_ERROR, metas.get(0).getErrorInfo().getErrorType());
Assert.assertEquals(ErrorType.INVALID_MESSAGE_ERROR, metas.get(0).getErrorInfo().getErrorType());
Assert.assertTrue(metas.get(0).getErrorInfo().getException().getMessage()
.contains("is outside the allowed bounds. You can only stream to date range within 1825 days in the past and 366 days in the future relative to the current date."));
}

@Test
public void shouldConvertAnyTimeStampIfNotPartitionColumn() throws IOException {
Instant moreThanFiveYears = Instant.now().minus(Days.of(18216));
Instant lessThanFiveYears = Instant.now().minus(Days.of(100));
TestMessageBQ m1 = TestMessageBQ.newBuilder()
.setCreatedAt(Timestamp.newBuilder().setSeconds(lessThanFiveYears.getEpochSecond()).build())
.addUpdatedAt(Timestamp.newBuilder().setSeconds(moreThanFiveYears.getEpochSecond()).build())
.addUpdatedAt(Timestamp.newBuilder().setSeconds(moreThanFiveYears.getEpochSecond()).build())
.build();
List<Message> inputList = new ArrayList<Message>() {{
add(new Message(null, m1.toByteArray()));
}};
BigQueryPayload payload = converter.convert(inputList);
ProtoRows protoPayload = (ProtoRows) payload.getPayload();
Assert.assertEquals(1, protoPayload.getSerializedRowsCount());
List<BigQueryRecordMeta> metas = new ArrayList<>();
for (BigQueryRecordMeta r : payload) {
metas.add(r);
}
Assert.assertEquals(1, metas.size());
Assert.assertNull(metas.get(0).getErrorInfo());
}

@Test
public void shouldNotConvertMoreThanOneYearFutureTimeStamp() throws IOException {
Instant moreThanOneYear = Instant.now().plus(Days.of(10000));
Expand All @@ -619,10 +644,34 @@ public void shouldNotConvertMoreThanOneYearFutureTimeStamp() throws IOException
metas.add(r);
}
Assert.assertEquals(1, metas.size());
Assert.assertEquals(ErrorType.DESERIALIZATION_ERROR, metas.get(0).getErrorInfo().getErrorType());
Assert.assertEquals(ErrorType.INVALID_MESSAGE_ERROR, metas.get(0).getErrorInfo().getErrorType());
Assert.assertTrue(metas.get(0).getErrorInfo().getException().getMessage()
.contains("is outside the allowed bounds. You can only stream to date range within 1825 days in the past and 366 days in the future relative to the current date."));
}
@Test
public void shouldNotConvertIfInvalidTimeStamp() throws IOException {
Instant now = Instant.now();
Instant invalid = Instant.ofEpochSecond(1111111111111111L);
TestMessageBQ m1 = TestMessageBQ.newBuilder()
.setCreatedAt(Timestamp.newBuilder().setSeconds(now.getEpochSecond()).build())
.addUpdatedAt(Timestamp.newBuilder().setSeconds(invalid.getEpochSecond()).build())
.addUpdatedAt(Timestamp.newBuilder().setSeconds(now.getEpochSecond()).build())
.build();
List<Message> inputList = new ArrayList<Message>() {{
add(new Message(null, m1.toByteArray()));
}};
BigQueryPayload payload = converter.convert(inputList);
ProtoRows protoPayload = (ProtoRows) payload.getPayload();
Assert.assertEquals(0, protoPayload.getSerializedRowsCount());
List<BigQueryRecordMeta> metas = new ArrayList<>();
for (BigQueryRecordMeta r : payload) {
metas.add(r);
}
Assert.assertEquals(1, metas.size());
Assert.assertEquals(ErrorType.INVALID_MESSAGE_ERROR, metas.get(0).getErrorInfo().getErrorType());
Assert.assertTrue(metas.get(0).getErrorInfo().getException().getMessage()
.contains("is outside the allowed bounds in BQ"));
}

@Test
public void shouldConvertTimeStampCloseToLimits() throws IOException {
Expand Down

0 comments on commit effe0ca

Please sign in to comment.