From 3e3ad4efd23388b5b56e76901ff6da1f4350a0a7 Mon Sep 17 00:00:00 2001 From: ArtemMarchukQlikCom Date: Fri, 29 Nov 2024 18:14:50 +0200 Subject: [PATCH] fix(QAPPINT-803): Fixing timestamp attribute for iteratorType=AT_TIMESTAMP for Kinesis2Component --- .../kinesis/Kinesis2ComponentConfigurer.java | 6 +++ .../kinesis/Kinesis2EndpointConfigurer.java | 6 +++ .../kinesis/Kinesis2EndpointUriFactory.java | 3 +- .../aws2/kinesis/Kinesis2Configuration.java | 11 +++++ .../aws2/kinesis/Kinesis2Consumer.java | 44 +++++++++++++++++++ .../KinesisComponentConfigurationTest.java | 16 +++++++ 6 files changed, 85 insertions(+), 1 deletion(-) diff --git a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java index 1a15fdc28b6b5..145d3901e68fb 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java @@ -45,6 +45,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; case "maxresultsperrequest": case "maxResultsPerRequest": getOrCreateConfiguration(target).setMaxResultsPerRequest(property(camelContext, int.class, value)); return true; + case "messagetimestamp": + case "messageTimestamp": getOrCreateConfiguration(target).setMessageTimestamp(property(camelContext, java.lang.String.class, value)); return true; case "overrideendpoint": case "overrideEndpoint": getOrCreateConfiguration(target).setOverrideEndpoint(property(camelContext, boolean.class, value)); return true; case "proxyhost": @@ -97,6 +99,8 @@ public Class getOptionType(String name, boolean ignoreCase) { case "lazyStartProducer": return boolean.class; case "maxresultsperrequest": case "maxResultsPerRequest": return int.class; + case "messagetimestamp": + case "messageTimestamp": return java.lang.String.class; case "overrideendpoint": case "overrideEndpoint": return boolean.class; case "proxyhost": @@ -145,6 +149,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) { case "lazyStartProducer": return target.isLazyStartProducer(); case "maxresultsperrequest": case "maxResultsPerRequest": return getOrCreateConfiguration(target).getMaxResultsPerRequest(); + case "messagetimestamp": + case "messageTimestamp": return getOrCreateConfiguration(target).getMessageTimestamp(); case "overrideendpoint": case "overrideEndpoint": return getOrCreateConfiguration(target).isOverrideEndpoint(); case "proxyhost": diff --git a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java index b165e7113e0bf..3212a3ec9e093 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java @@ -49,6 +49,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; case "maxresultsperrequest": case "maxResultsPerRequest": target.getConfiguration().setMaxResultsPerRequest(property(camelContext, int.class, value)); return true; + case "messagetimestamp": + case "messageTimestamp": target.getConfiguration().setMessageTimestamp(property(camelContext, java.lang.String.class, value)); return true; case "overrideendpoint": case "overrideEndpoint": target.getConfiguration().setOverrideEndpoint(property(camelContext, boolean.class, value)); return true; case "pollstrategy": @@ -131,6 +133,8 @@ public Class getOptionType(String name, boolean ignoreCase) { case "lazyStartProducer": return boolean.class; case "maxresultsperrequest": case "maxResultsPerRequest": return int.class; + case "messagetimestamp": + case "messageTimestamp": return java.lang.String.class; case "overrideendpoint": case "overrideEndpoint": return boolean.class; case "pollstrategy": @@ -209,6 +213,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) { case "lazyStartProducer": return target.isLazyStartProducer(); case "maxresultsperrequest": case "maxResultsPerRequest": return target.getConfiguration().getMaxResultsPerRequest(); + case "messagetimestamp": + case "messageTimestamp": return target.getConfiguration().getMessageTimestamp(); case "overrideendpoint": case "overrideEndpoint": return target.getConfiguration().isOverrideEndpoint(); case "pollstrategy": diff --git a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java index 3875f59fc34ff..3061221626ac0 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java +++ b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java @@ -21,7 +21,7 @@ public class Kinesis2EndpointUriFactory extends org.apache.camel.support.compone private static final Set SECRET_PROPERTY_NAMES; private static final Set MULTI_VALUE_PREFIXES; static { - Set props = new HashSet<>(38); + Set props = new HashSet<>(39); props.add("accessKey"); props.add("amazonKinesisClient"); props.add("backoffErrorThreshold"); @@ -37,6 +37,7 @@ public class Kinesis2EndpointUriFactory extends org.apache.camel.support.compone props.add("iteratorType"); props.add("lazyStartProducer"); props.add("maxResultsPerRequest"); + props.add("messageTimestamp"); props.add("overrideEndpoint"); props.add("pollStrategy"); props.add("proxyHost"); diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java index db57da4206a1c..ba240fbe41291 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java @@ -52,6 +52,9 @@ public class Kinesis2Configuration implements Cloneable { @UriParam(label = "consumer", description = "The sequence number to start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER") private String sequenceNumber = ""; + @UriParam(label = "consumer", + description = "The message timestamp to start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER") + private String messageTimestamp = ""; @UriParam(label = "consumer", defaultValue = "ignore", description = "Define what will be the behavior in case of shard closed. Possible value are ignore, silent and fail." + " In case of ignore a message will be logged and the consumer will restart from the beginning," @@ -131,6 +134,14 @@ public void setSequenceNumber(String sequenceNumber) { this.sequenceNumber = sequenceNumber; } + public String getMessageTimestamp() { + return messageTimestamp; + } + + public void setMessageTimestamp(String messageTimestamp) { + this.messageTimestamp = messageTimestamp; + } + public Kinesis2ShardClosedStrategyEnum getShardClosed() { return shardClosed; } diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index e3eae1edd2d88..dcdff611d9270 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.aws2.kinesis; +import java.math.BigDecimal; +import java.time.Instant; import java.util.ArrayDeque; import java.util.List; import java.util.Queue; @@ -48,6 +50,9 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R private static final Logger LOG = LoggerFactory.getLogger(Kinesis2Consumer.class); + private static final String UNIX_TIMESTAMP_MILLIS_REGEX = "^\\d{1,13}$"; + private static final String UNIX_TIMESTAMP_DOUBLE_REGEX = "^[-+]?\\d+(\\.\\d+)?([eE][-+]?\\d+)?$"; + private String currentShardIterator; private boolean isShardClosed; private ResumeStrategy resumeStrategy; @@ -173,6 +178,11 @@ private String getShardIterator() { req.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber()); } + if (hasMessageTimestamp()) { + String messageTimestamp = getEndpoint().getConfiguration().getMessageTimestamp(); + req.timestamp(parseMessageTimestamp(messageTimestamp)); + } + resume(req); GetShardIteratorResponse result = getClient().getShardIterator(req.build()); @@ -237,6 +247,11 @@ private boolean hasSequenceNumber() { || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)); } + private boolean hasMessageTimestamp() { + return !getEndpoint().getConfiguration().getMessageTimestamp().isEmpty() + && getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_TIMESTAMP); + } + @Override protected void doStart() throws Exception { super.doStart(); @@ -259,4 +274,33 @@ protected void doStart() throws Exception { protected Kinesis2Configuration getConfiguration() { return getEndpoint().getConfiguration(); } + + private Instant parseMessageTimestamp(String messageTimestamp) { + if (messageTimestamp == null) { + throw new IllegalArgumentException("Timestamp can't be null"); + } + // Milliseconds format + if (messageTimestamp.matches(UNIX_TIMESTAMP_MILLIS_REGEX)) { + long epochMilli = Long.parseLong(messageTimestamp); + return Instant.ofEpochMilli(epochMilli); + } + + // Double format of seconds with fractional part. (1732882967.573, 1.732882967573E9 etc.) + if (messageTimestamp.matches(UNIX_TIMESTAMP_DOUBLE_REGEX)) { + // Using BigDecimal to better precision + BigDecimal decimalTime = new BigDecimal(messageTimestamp); + long seconds = decimalTime.longValue(); + BigDecimal fractionalPart = decimalTime.subtract(new BigDecimal(seconds)); + int nanos = fractionalPart.multiply(BigDecimal.valueOf(1_000_000_000)).intValue(); + + return Instant.ofEpochSecond(seconds, nanos); + } + + // ISO 8601 format + try { + return Instant.parse(messageTimestamp); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid timestamp format: " + messageTimestamp); + } + } } diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java index dff6b312e2994..edfa17d0bd21a 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.Test; import software.amazon.awssdk.core.Protocol; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -95,4 +96,19 @@ public void createEndpointWithEndpointOverride() throws Exception { assertTrue(endpoint.getConfiguration().isOverrideEndpoint()); assertEquals("http://localhost:4567", endpoint.getConfiguration().getUriEndpointOverride()); } + + @Test + public void createEndpointWithMessageTimestamp() throws Exception { + Kinesis2Component component = context.getComponent("aws2-kinesis", Kinesis2Component.class); + component.getConfiguration().setAccessKey("XXX"); + component.getConfiguration().setSecretKey("YYY"); + + Kinesis2Endpoint endpoint = (Kinesis2Endpoint) component.createEndpoint("aws2-kinesis://some_stream_name" + + "?iteratorType=AT_TIMESTAMP&messageTimestamp=1732882967.573"); + + assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName()); + assertEquals(ShardIteratorType.AT_TIMESTAMP, endpoint.getConfiguration().getIteratorType()); + assertEquals("1732882967.573", endpoint.getConfiguration().getMessageTimestamp()); + + } }