diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java b/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java index 10b118f59..4cc14e179 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java @@ -29,12 +29,23 @@ public String getTopic(String fieldName) { return (String) metadata.getField(metadataDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TOPIC_FIELD_NAME)); } - public Instant getTimestampFromMessage(String fieldName) { - return getTimeStampFromDescriptor(fieldName, message); + public Instant getMessageTimeStamp(String metadataColumnName) { + Descriptors.Descriptor metadataDescriptor = metadata.getDescriptorForType(); + com.google.protobuf.Timestamp timestamp; + if (!metadataColumnName.isEmpty()) { + DynamicMessage nestedMetadataMessage = (DynamicMessage) metadata.getField(metadataDescriptor.findFieldByName(metadataColumnName)); + Descriptors.Descriptor nestedMetadataMessageDescriptor = nestedMetadataMessage.getDescriptorForType(); + timestamp = (com.google.protobuf.Timestamp) nestedMetadataMessage.getField(nestedMetadataMessageDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME)); + } else { + timestamp = (com.google.protobuf.Timestamp) metadata.getField(metadataDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME)); + } + long seconds = (long) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("seconds")); + int nanos = (int) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("nanos")); + return Instant.ofEpochSecond(seconds, nanos); } - public Instant getTimestampFromMetadata(String fieldName) { - return getTimeStampFromDescriptor(fieldName, metadata); + public Instant getTimestampFromMessage(String fieldName) { + return getTimeStampFromDescriptor(fieldName, message); } public Instant getTimeStampFromDescriptor(String fieldName, DynamicMessage m) { @@ -50,7 +61,7 @@ public LocalDateTime getLocalDateTime(BlobSinkConfig config) { switch (config.getFilePartitionTimeType()) { case MESSAGE_TIMESTAMP: return LocalDateTime.ofInstant( - getTimestampFromMetadata(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME), + getMessageTimeStamp(config.getOutputKafkaMetadataColumnName()), ZoneId.of(config.getFilePartitionProtoTimestampTimezone())); case PROCESSING_TIMESTAMP: return LocalDateTime.now(); diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java index d2bd958a1..e0d449f91 100644 --- a/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java @@ -15,6 +15,7 @@ public class RecordTest { private final Instant defaultTimestamp = Instant.parse("2020-01-01T10:00:00.000Z"); + private final Instant messageTimeStamp = Instant.parse("2020-01-02T10:00:00.000Z"); private final int defaultOrderNumber = 100; private final long defaultOffset = 1L; private final int defaultPartition = 1; @@ -64,6 +65,7 @@ public void shouldGetDateTimeLocally() throws InterruptedException { @Test public void shouldGetDateTimeFromMessage() throws InterruptedException { BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class); + Mockito.when(config.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP); Mockito.when(config.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time"); Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn("UTC"); DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber); @@ -72,4 +74,17 @@ public void shouldGetDateTimeFromMessage() throws InterruptedException { LocalDateTime localDateTime = record.getLocalDateTime(config); Assert.assertEquals(LocalDateTime.ofInstant(defaultTimestamp, ZoneId.of("UTC")), localDateTime); } + @Test + public void shouldGetDateTimeFromKafkaMessage() throws InterruptedException { + BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class); + Mockito.when(config.getFilePartitionTimeType()).thenReturn(TimePartitionType.MESSAGE_TIMESTAMP); + Mockito.when(config.getOutputKafkaMetadataColumnName()).thenReturn("nested_field"); + Mockito.when(config.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time"); + Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn("UTC"); + DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber); + DynamicMessage metadata = TestUtils.createMetadata("nested_field", messageTimeStamp, defaultOffset, defaultPartition, defaultTopic); + Record record = new Record(message, metadata); + LocalDateTime localDateTime = record.getLocalDateTime(config); + Assert.assertEquals(LocalDateTime.ofInstant(messageTimeStamp, ZoneId.of("UTC")), localDateTime); + } } diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/TimePartitionedPathUtilsTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/TimePartitionedPathUtilsTest.java index b67359b4c..8771123e3 100644 --- a/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/TimePartitionedPathUtilsTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/TimePartitionedPathUtilsTest.java @@ -2,6 +2,7 @@ import com.google.protobuf.DynamicMessage; import com.gotocompany.firehose.config.BlobSinkConfig; +import com.gotocompany.firehose.config.enums.TimePartitionType; import com.gotocompany.firehose.sink.blob.Constants; import com.gotocompany.firehose.sink.blob.TestProtoMessage; import com.gotocompany.firehose.sink.blob.TestUtils; @@ -41,6 +42,7 @@ public void shouldCreateDayPartitioningPath() { Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone); Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName); Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.DAY); + Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP); Mockito.when(sinkConfig.getOutputKafkaMetadataColumnName()).thenReturn(kafkaMetadataFieldName); Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn("date="); Mockito.when(sinkConfig.getFilePartitionTimeHourPrefix()).thenReturn(""); @@ -58,6 +60,7 @@ public void shouldCreateHourPartitioningPath() { Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone); Mockito.when(sinkConfig.getOutputKafkaMetadataColumnName()).thenReturn(kafkaMetadataFieldName); Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName); + Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP); Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.HOUR); Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn(datePrefix); Mockito.when(sinkConfig.getFilePartitionTimeHourPrefix()).thenReturn(hourPrefix); @@ -91,6 +94,7 @@ public void shouldCreatePartitionPathWhenKafkaMetadataIsNotNested() { BlobSinkConfig sinkConfig = Mockito.mock(BlobSinkConfig.class); Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone); Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.DAY); + Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP); Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName); Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn(datePrefix); Mockito.when(sinkConfig.getFilePartitionTimeHourPrefix()).thenReturn(hourPrefix); @@ -107,6 +111,7 @@ public void shouldCreatePartitioningPathForNestedKafkaMetadata() { Record record = new Record(message, metadata); BlobSinkConfig sinkConfig = Mockito.mock(BlobSinkConfig.class); Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone); + Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP); Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.DAY); Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName); Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn(datePrefix);