Skip to content

Commit

Permalink
fix: bq timestamp filter constraints (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
lavkesh authored Jun 9, 2023
1 parent 6d78e73 commit dc1ad87
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 27 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ plugins {
}

group 'com.gotocompany'
version '0.4.6'
version '0.4.7'

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class BigQueryProtoStorageClient implements BigQueryStorageClient {
private final MessageParser parser;
private final String schemaClass;
private final SinkConnectorSchemaMessageMode mode;
private static final long FIVE_YEARS_DAYS = 1825;
private static final long ONE_YEAR_DAYS = 365;

public BigQueryProtoStorageClient(BigQueryWriter writer, BigQuerySinkConfig config, MessageParser parser) {
this.writer = (BigQueryProtoWriter) writer;
Expand Down Expand Up @@ -115,8 +117,8 @@ private DynamicMessage.Builder convert(DynamicMessage inputMessage, Descriptors.
continue;
}
if (fieldValue instanceof Instant) {
long timeStampValue = getBQInstant((Instant) fieldValue);
if (timeStampValue > 0) {
if (((Instant) fieldValue).getEpochSecond() > 0) {
long timeStampValue = getBQInstant((Instant) fieldValue, outputField);
messageBuilder.setField(outputField, timeStampValue);
}
} else if (protoField.getClass().getName().equals(MessageProtoField.class.getName())
Expand All @@ -130,11 +132,29 @@ private DynamicMessage.Builder convert(DynamicMessage inputMessage, Descriptors.
return messageBuilder;
}

private long getBQInstant(Instant instant) {
private long getBQInstant(Instant instant, Descriptors.FieldDescriptor fieldDescriptor) {
Instant currentInstant = Instant.now();
boolean isPastInstant = currentInstant.isAfter(instant);
boolean isValid;
if (isPastInstant) {
Instant fiveYearPast = currentInstant.minusMillis(TimeUnit.DAYS.toMillis(FIVE_YEARS_DAYS));
isValid = fiveYearPast.isBefore(instant);
} else {
Instant oneYearFuture = currentInstant.plusMillis(TimeUnit.DAYS.toMillis(ONE_YEAR_DAYS));
isValid = oneYearFuture.isAfter(instant);

}
if (!isValid) {
throw new IllegalArgumentException(instant + " for field "
+ fieldDescriptor.getFullName() + " is outside the allowed bounds. "
+ "You can only stream to date range within 1825 days in the past "
+ "and 366 days in the future relative to the current date.");
}
// Timestamp should be in microseconds
return TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
}


private void addRepeatedFields(DynamicMessage.Builder messageBuilder, Descriptors.FieldDescriptor outputField, List<?> fieldValue) {
if (fieldValue.isEmpty()) {
return;
Expand All @@ -146,7 +166,9 @@ private void addRepeatedFields(DynamicMessage.Builder messageBuilder, Descriptor
repeatedNestedFields.add(convert((DynamicMessage) f, messageType).build());
} else {
if (f instanceof Instant) {
repeatedNestedFields.add(getBQInstant((Instant) f));
if (((Instant) f).getEpochSecond() > 0) {
repeatedNestedFields.add(getBQInstant((Instant) f, outputField));
}
} else {
repeatedNestedFields.add(f);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import org.junit.Test;
import org.mockito.Mockito;
import org.skyscreamer.jsonassert.JSONAssert;
import org.threeten.extra.Days;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.mockito.Mockito.CALLS_REAL_METHODS;
Expand Down Expand Up @@ -171,6 +173,7 @@ public void shouldConvertPrimitiveFields() throws Exception {
.setOrderNumber("order-no-112")
.setOrderUrl("order-url-1")
.setDiscount(1200L)
.setCreatedAt(Timestamp.newBuilder().setSeconds(Instant.now().getEpochSecond()))
.setPrice(23)
.setUserToken(ByteString.copyFrom("test-token".getBytes()))
.setCounter(20)
Expand Down Expand Up @@ -200,6 +203,7 @@ public void shouldConvertPrimitiveFields() throws Exception {
@Test
public void shouldReturnDurationField() throws IOException {
TestMessageBQ m1 = TestMessageBQ.newBuilder()
.setCreatedAt(Timestamp.newBuilder().setSeconds(Instant.now().getEpochSecond()))
.setTripDuration(Duration.newBuilder().setSeconds(1234L).setNanos(231).build())
.build();
List<Message> inputList = new ArrayList<Message>() {{
Expand All @@ -219,6 +223,7 @@ public void shouldReturnDurationField() throws IOException {
public void shouldReturnMapField() throws Exception {
TestMessageBQ m1 = TestMessageBQ.newBuilder()
.putCurrentState("k4", "v4")
.setCreatedAt(Timestamp.newBuilder().setSeconds(Instant.now().getEpochSecond()))
.putCurrentState("k3", "v3")
.putCurrentState("k1", "v1")
.putCurrentState("k2", "v2")
Expand Down Expand Up @@ -321,10 +326,11 @@ public void shouldReturnComplexAndNestedType() throws Descriptors.DescriptorVali

@Test
public void shouldConvertTimeStamp() throws IOException {
Instant now = Instant.now();
TestMessageBQ m1 = TestMessageBQ.newBuilder()
.setCreatedAt(Timestamp.newBuilder().setSeconds(1680609402L).build())
.addUpdatedAt(Timestamp.newBuilder().setSeconds(1680609402L).build())
.addUpdatedAt(Timestamp.newBuilder().setSeconds(1680609402L).build())
.setCreatedAt(Timestamp.newBuilder().setSeconds(now.getEpochSecond()).build())
.addUpdatedAt(Timestamp.newBuilder().setSeconds(now.getEpochSecond()).build())
.addUpdatedAt(Timestamp.newBuilder().setSeconds(now.getEpochSecond()).build())
.build();
List<Message> inputList = new ArrayList<Message>() {{
add(new Message(null, m1.toByteArray()));
Expand All @@ -336,10 +342,10 @@ public void shouldConvertTimeStamp() throws IOException {
DynamicMessage convertedMessage = DynamicMessage.parseFrom(testDescriptor, serializedRows);
long createdAt = (long) convertedMessage.getField(testDescriptor.findFieldByName("created_at"));
// Microseconds
Assert.assertEquals(1680609402000000L, createdAt);
Assert.assertEquals(TimeUnit.SECONDS.toMicros(now.getEpochSecond()), createdAt);
List<Object> updatedAt = (List) convertedMessage.getField(testDescriptor.findFieldByName("updated_at"));
Assert.assertEquals(1680609402000000L, updatedAt.get(0));
Assert.assertEquals(1680609402000000L, updatedAt.get(1));
Assert.assertEquals(TimeUnit.SECONDS.toMicros(now.getEpochSecond()), updatedAt.get(0));
Assert.assertEquals(TimeUnit.SECONDS.toMicros(now.getEpochSecond()), updatedAt.get(1));
}

@Test
Expand All @@ -359,6 +365,7 @@ public void shouldConvertStruct() throws IOException {

TestMessageBQ m1 = TestMessageBQ.newBuilder()
.setOrderNumber("order-1")
.setCreatedAt(Timestamp.newBuilder().setSeconds(Instant.now().getEpochSecond()))
.setProperties(value)
.build();
List<Message> inputList = new ArrayList<Message>() {{
Expand All @@ -385,8 +392,9 @@ public void shouldConvertStruct() throws IOException {

@Test
public void shouldHaveMetadataOnPayload() throws InvalidProtocolBufferException {
Instant now = Instant.now();
TestMessageBQ m1 = TestMessageBQ.newBuilder()
.setCreatedAt(Timestamp.newBuilder().setSeconds(1680609402L).build())
.setCreatedAt(Timestamp.newBuilder().setSeconds(now.getEpochSecond()).build())
.build();
List<Message> inputList = new ArrayList<Message>() {{
add(new Message(
Expand All @@ -395,17 +403,17 @@ public void shouldHaveMetadataOnPayload() throws InvalidProtocolBufferException
new Tuple<>("message_partition", 10),
new Tuple<>("message_topic", "test-topic"),
new Tuple<>("message_offset", 143),
new Tuple<>("load_time", 1680609402L),
new Tuple<>("message_timestamp", 1680609402L))
new Tuple<>("load_time", now.toEpochMilli()),
new Tuple<>("message_timestamp", now.toEpochMilli()))
);
add(new Message(
null,
m1.toByteArray(),
new Tuple<>("message_partition", 10),
new Tuple<>("message_topic", "test-topic"),
new Tuple<>("message_offset", 144L),
new Tuple<>("load_time", 1680770429L),
new Tuple<>("message_timestamp", 1680770429L))
new Tuple<>("load_time", now.toEpochMilli()),
new Tuple<>("message_timestamp", now.toEpochMilli()))
);
}};
BigQueryPayload payload = converter.convert(inputList);
Expand All @@ -416,16 +424,16 @@ public void shouldHaveMetadataOnPayload() throws InvalidProtocolBufferException
Assert.assertEquals(10L, convertedMessage.getField(testDescriptor.findFieldByName("message_partition")));
Assert.assertEquals("test-topic", convertedMessage.getField(testDescriptor.findFieldByName("message_topic")));
Assert.assertEquals(143L, convertedMessage.getField(testDescriptor.findFieldByName("message_offset")));
Assert.assertEquals(1680609402000L, convertedMessage.getField(testDescriptor.findFieldByName("load_time")));
Assert.assertEquals(1680609402000L, convertedMessage.getField(testDescriptor.findFieldByName("message_timestamp")));
Assert.assertEquals(TimeUnit.MILLISECONDS.toMicros(now.toEpochMilli()), convertedMessage.getField(testDescriptor.findFieldByName("load_time")));
Assert.assertEquals(TimeUnit.MILLISECONDS.toMicros(now.toEpochMilli()), convertedMessage.getField(testDescriptor.findFieldByName("message_timestamp")));

serializedRows = protoPayload.getSerializedRows(1);
convertedMessage = DynamicMessage.parseFrom(testDescriptor, serializedRows);
Assert.assertEquals(10L, convertedMessage.getField(testDescriptor.findFieldByName("message_partition")));
Assert.assertEquals("test-topic", convertedMessage.getField(testDescriptor.findFieldByName("message_topic")));
Assert.assertEquals(144L, convertedMessage.getField(testDescriptor.findFieldByName("message_offset")));
Assert.assertEquals(1680770429000L, convertedMessage.getField(testDescriptor.findFieldByName("load_time")));
Assert.assertEquals(1680770429000L, convertedMessage.getField(testDescriptor.findFieldByName("message_timestamp")));
Assert.assertEquals(TimeUnit.MILLISECONDS.toMicros(now.toEpochMilli()), convertedMessage.getField(testDescriptor.findFieldByName("load_time")));
Assert.assertEquals(TimeUnit.MILLISECONDS.toMicros(now.toEpochMilli()), convertedMessage.getField(testDescriptor.findFieldByName("message_timestamp")));
}


Expand Down Expand Up @@ -475,8 +483,9 @@ public void shouldHaveMetadataOnPayloadWithNameSpace() throws InvalidProtocolBuf
converter = new BigQueryProtoStorageClient(writer, config, protoMessageParser);
Mockito.when(writer.getDescriptor()).thenReturn(testDescriptor);

Instant now = Instant.now();
TestMessageBQ m1 = TestMessageBQ.newBuilder()
.setCreatedAt(Timestamp.newBuilder().setSeconds(1680609402L).build())
.setCreatedAt(Timestamp.newBuilder().setSeconds(now.getEpochSecond()).build())
.build();

List<Message> inputList = new ArrayList<Message>() {{
Expand All @@ -486,8 +495,8 @@ public void shouldHaveMetadataOnPayloadWithNameSpace() throws InvalidProtocolBuf
new Tuple<>("message_partition", 10),
new Tuple<>("message_topic", "test-topic"),
new Tuple<>("message_offset", 143),
new Tuple<>("load_time", 1680609402L),
new Tuple<>("message_timestamp", 1680609402L))
new Tuple<>("load_time", now.toEpochMilli()),
new Tuple<>("message_timestamp", now.toEpochMilli()))
);
}};
BigQueryPayload payload = converter.convert(inputList);
Expand All @@ -499,14 +508,15 @@ public void shouldHaveMetadataOnPayloadWithNameSpace() throws InvalidProtocolBuf
Assert.assertEquals(10L, metadata.getField(metadata.getDescriptorForType().findFieldByName("message_partition")));
Assert.assertEquals("test-topic", metadata.getField(metadata.getDescriptorForType().findFieldByName("message_topic")));
Assert.assertEquals(143L, metadata.getField(metadata.getDescriptorForType().findFieldByName("message_offset")));
Assert.assertEquals(1680609402000L, metadata.getField(metadata.getDescriptorForType().findFieldByName("load_time")));
Assert.assertEquals(1680609402000L, metadata.getField(metadata.getDescriptorForType().findFieldByName("message_timestamp")));
Assert.assertEquals(TimeUnit.MILLISECONDS.toMicros(now.toEpochMilli()), metadata.getField(metadata.getDescriptorForType().findFieldByName("load_time")));
Assert.assertEquals(TimeUnit.MILLISECONDS.toMicros(now.toEpochMilli()), metadata.getField(metadata.getDescriptorForType().findFieldByName("message_timestamp")));
}

@Test
public void shouldReturnInvalidRecords() throws InvalidProtocolBufferException {
Instant now = Instant.now();
TestMessageBQ m1 = TestMessageBQ.newBuilder()
.setCreatedAt(Timestamp.newBuilder().setSeconds(1680609402L).build())
.setCreatedAt(Timestamp.newBuilder().setSeconds(now.getEpochSecond()).build())
.build();
List<Message> inputList = new ArrayList<Message>() {{
add(new Message(null, m1.toByteArray(), new Tuple<>("message_offset", 11)));
Expand All @@ -526,7 +536,7 @@ public void shouldReturnInvalidRecords() throws InvalidProtocolBufferException {
DynamicMessage convertedMessage = DynamicMessage.parseFrom(testDescriptor, serializedRows);
long createdAt = (long) convertedMessage.getField(testDescriptor.findFieldByName("created_at"));
// Microseconds
Assert.assertEquals(1680609402000000L, createdAt);
Assert.assertEquals(TimeUnit.SECONDS.toMicros(now.getEpochSecond()), createdAt);

List<BigQueryRecordMeta> metas = new ArrayList<>();
for (BigQueryRecordMeta r : payload) {
Expand All @@ -544,4 +554,77 @@ public void shouldReturnInvalidRecords() throws InvalidProtocolBufferException {
Assert.assertEquals("While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.",
invalidRecord.getErrorInfo().getException().getMessage());
}

@Test
public void shouldNotConvertFiveYearsOldTimeStamp() throws IOException {
Instant moreThanFiveYears = Instant.now().minus(Days.of(1826));
TestMessageBQ m1 = TestMessageBQ.newBuilder()
.setCreatedAt(Timestamp.newBuilder().setSeconds(moreThanFiveYears.getEpochSecond()).build())
.addUpdatedAt(Timestamp.newBuilder().setSeconds(moreThanFiveYears.getEpochSecond()).build())
.addUpdatedAt(Timestamp.newBuilder().setSeconds(moreThanFiveYears.getEpochSecond()).build())
.build();
List<Message> inputList = new ArrayList<Message>() {{
add(new Message(null, m1.toByteArray()));
}};
BigQueryPayload payload = converter.convert(inputList);
ProtoRows protoPayload = (ProtoRows) payload.getPayload();
Assert.assertEquals(0, protoPayload.getSerializedRowsCount());
List<BigQueryRecordMeta> metas = new ArrayList<>();
for (BigQueryRecordMeta r : payload) {
metas.add(r);
}
Assert.assertEquals(1, metas.size());
Assert.assertEquals(ErrorType.DESERIALIZATION_ERROR, metas.get(0).getErrorInfo().getErrorType());
Assert.assertTrue(metas.get(0).getErrorInfo().getException().getMessage()
.contains("is outside the allowed bounds. You can only stream to date range within 1825 days in the past and 366 days in the future relative to the current date."));
}

@Test
public void shouldNotConvertMoreThanOneYearFutureTimeStamp() throws IOException {
Instant moreThanOneYear = Instant.now().plus(Days.of(10000));
TestMessageBQ m1 = TestMessageBQ.newBuilder()
.setCreatedAt(Timestamp.newBuilder().setSeconds(moreThanOneYear.getEpochSecond()).build())
.addUpdatedAt(Timestamp.newBuilder().setSeconds(moreThanOneYear.getEpochSecond()).build())
.addUpdatedAt(Timestamp.newBuilder().setSeconds(moreThanOneYear.getEpochSecond()).build())
.build();
List<Message> inputList = new ArrayList<Message>() {{
add(new Message(null, m1.toByteArray()));
}};
BigQueryPayload payload = converter.convert(inputList);
ProtoRows protoPayload = (ProtoRows) payload.getPayload();
Assert.assertEquals(0, protoPayload.getSerializedRowsCount());
List<BigQueryRecordMeta> metas = new ArrayList<>();
for (BigQueryRecordMeta r : payload) {
metas.add(r);
}
Assert.assertEquals(1, metas.size());
Assert.assertEquals(ErrorType.DESERIALIZATION_ERROR, metas.get(0).getErrorInfo().getErrorType());
Assert.assertTrue(metas.get(0).getErrorInfo().getException().getMessage()
.contains("is outside the allowed bounds. You can only stream to date range within 1825 days in the past and 366 days in the future relative to the current date."));
}

@Test
public void shouldConvertTimeStampCloseToLimits() throws IOException {
Instant past = Instant.now().minus(Days.of(1824));
Instant future = Instant.now().plus(Days.of(365));
TestMessageBQ m1 = TestMessageBQ.newBuilder()
.setCreatedAt(Timestamp.newBuilder().setSeconds(past.getEpochSecond()).build())
.addUpdatedAt(Timestamp.newBuilder().setSeconds(future.getEpochSecond()).build())
.addUpdatedAt(Timestamp.newBuilder().setSeconds(past.getEpochSecond()).build())
.build();
List<Message> inputList = new ArrayList<Message>() {{
add(new Message(null, m1.toByteArray()));
}};
BigQueryPayload payload = converter.convert(inputList);
ProtoRows protoPayload = (ProtoRows) payload.getPayload();
Assert.assertEquals(1, protoPayload.getSerializedRowsCount());
ByteString serializedRows = protoPayload.getSerializedRows(0);
DynamicMessage convertedMessage = DynamicMessage.parseFrom(testDescriptor, serializedRows);
long createdAt = (long) convertedMessage.getField(testDescriptor.findFieldByName("created_at"));
// Microseconds
Assert.assertEquals(TimeUnit.SECONDS.toMicros(past.getEpochSecond()), createdAt);
List<Object> updatedAt = (List) convertedMessage.getField(testDescriptor.findFieldByName("updated_at"));
Assert.assertEquals(TimeUnit.SECONDS.toMicros(future.getEpochSecond()), updatedAt.get(0));
Assert.assertEquals(TimeUnit.SECONDS.toMicros(past.getEpochSecond()), updatedAt.get(1));
}
}

0 comments on commit dc1ad87

Please sign in to comment.