Skip to content

Commit

Permalink
test: add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lavkesh committed Oct 16, 2023
1 parent 10ac7d4 commit 1631230
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,23 @@ public String getTopic(String fieldName) {
return (String) metadata.getField(metadataDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TOPIC_FIELD_NAME));
}

public Instant getTimestampFromMessage(String fieldName) {
return getTimeStampFromDescriptor(fieldName, message);
public Instant getMessageTimeStamp(String metadataColumnName) {
Descriptors.Descriptor metadataDescriptor = metadata.getDescriptorForType();
com.google.protobuf.Timestamp timestamp;
if (!metadataColumnName.isEmpty()) {
DynamicMessage nestedMetadataMessage = (DynamicMessage) metadata.getField(metadataDescriptor.findFieldByName(metadataColumnName));
Descriptors.Descriptor nestedMetadataMessageDescriptor = nestedMetadataMessage.getDescriptorForType();
timestamp = (com.google.protobuf.Timestamp) nestedMetadataMessage.getField(nestedMetadataMessageDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME));
} else {
timestamp = (com.google.protobuf.Timestamp) metadata.getField(metadataDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME));
}
long seconds = (long) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("seconds"));
int nanos = (int) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("nanos"));
return Instant.ofEpochSecond(seconds, nanos);
}

public Instant getTimestampFromMetadata(String fieldName) {
return getTimeStampFromDescriptor(fieldName, metadata);
public Instant getTimestampFromMessage(String fieldName) {
return getTimeStampFromDescriptor(fieldName, message);
}

public Instant getTimeStampFromDescriptor(String fieldName, DynamicMessage m) {
Expand All @@ -50,8 +61,8 @@ public LocalDateTime getLocalDateTime(BlobSinkConfig config) {
switch (config.getFilePartitionTimeType()) {
case MESSAGE_TIMESTAMP:
return LocalDateTime.ofInstant(
getTimestampFromMetadata(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME),
ZoneId.of(config.getFilePartitionProtoTimestampTimezone()));
getMessageTimeStamp(config.getOutputKafkaMetadataColumnName()),
ZoneId.of("UTC"));
case PROCESSING_TIMESTAMP:
return LocalDateTime.now();
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
public class RecordTest {

private final Instant defaultTimestamp = Instant.parse("2020-01-01T10:00:00.000Z");
private final Instant messageTimeStamp = Instant.parse("2020-01-02T10:00:00.000Z");
private final int defaultOrderNumber = 100;
private final long defaultOffset = 1L;
private final int defaultPartition = 1;
Expand Down Expand Up @@ -64,6 +65,7 @@ public void shouldGetDateTimeLocally() throws InterruptedException {
@Test
public void shouldGetDateTimeFromMessage() throws InterruptedException {
BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class);
Mockito.when(config.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP);
Mockito.when(config.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time");
Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn("UTC");
DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber);
Expand All @@ -72,4 +74,17 @@ public void shouldGetDateTimeFromMessage() throws InterruptedException {
LocalDateTime localDateTime = record.getLocalDateTime(config);
Assert.assertEquals(LocalDateTime.ofInstant(defaultTimestamp, ZoneId.of("UTC")), localDateTime);
}
@Test
public void shouldGetDateTimeFromKafkaMessage() throws InterruptedException {
BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class);
Mockito.when(config.getFilePartitionTimeType()).thenReturn(TimePartitionType.MESSAGE_TIMESTAMP);
Mockito.when(config.getOutputKafkaMetadataColumnName()).thenReturn("nested_field");
Mockito.when(config.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time");
Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn("UTC");
DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber);
DynamicMessage metadata = TestUtils.createMetadata("nested_field", messageTimeStamp, defaultOffset, defaultPartition, defaultTopic);
Record record = new Record(message, metadata);
LocalDateTime localDateTime = record.getLocalDateTime(config);
Assert.assertEquals(LocalDateTime.ofInstant(messageTimeStamp, ZoneId.of("UTC")), localDateTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.protobuf.DynamicMessage;
import com.gotocompany.firehose.config.BlobSinkConfig;
import com.gotocompany.firehose.config.enums.TimePartitionType;
import com.gotocompany.firehose.sink.blob.Constants;
import com.gotocompany.firehose.sink.blob.TestProtoMessage;
import com.gotocompany.firehose.sink.blob.TestUtils;
Expand Down Expand Up @@ -41,6 +42,7 @@ public void shouldCreateDayPartitioningPath() {
Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone);
Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName);
Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.DAY);
Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP);
Mockito.when(sinkConfig.getOutputKafkaMetadataColumnName()).thenReturn(kafkaMetadataFieldName);
Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn("date=");
Mockito.when(sinkConfig.getFilePartitionTimeHourPrefix()).thenReturn("");
Expand All @@ -58,6 +60,7 @@ public void shouldCreateHourPartitioningPath() {
Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone);
Mockito.when(sinkConfig.getOutputKafkaMetadataColumnName()).thenReturn(kafkaMetadataFieldName);
Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName);
Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP);
Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.HOUR);
Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn(datePrefix);
Mockito.when(sinkConfig.getFilePartitionTimeHourPrefix()).thenReturn(hourPrefix);
Expand Down Expand Up @@ -91,6 +94,7 @@ public void shouldCreatePartitionPathWhenKafkaMetadataIsNotNested() {
BlobSinkConfig sinkConfig = Mockito.mock(BlobSinkConfig.class);
Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone);
Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.DAY);
Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP);
Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName);
Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn(datePrefix);
Mockito.when(sinkConfig.getFilePartitionTimeHourPrefix()).thenReturn(hourPrefix);
Expand All @@ -107,6 +111,7 @@ public void shouldCreatePartitioningPathForNestedKafkaMetadata() {
Record record = new Record(message, metadata);
BlobSinkConfig sinkConfig = Mockito.mock(BlobSinkConfig.class);
Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone);
Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP);
Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.DAY);
Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName);
Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn(datePrefix);
Expand Down

0 comments on commit 1631230

Please sign in to comment.