Skip to content

Commit

Permalink
fix(QAPPINT-803): Fixing timestamp attribute for iteratorType=AT_TIME…
Browse files Browse the repository at this point in the history
…STAMP for Kinesis2Component
  • Loading branch information
ArtemMarchukQlikCom committed Nov 29, 2024
1 parent 05d6b35 commit 3e3ad4e
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
case "maxresultsperrequest":
case "maxResultsPerRequest": getOrCreateConfiguration(target).setMaxResultsPerRequest(property(camelContext, int.class, value)); return true;
case "messagetimestamp":
case "messageTimestamp": getOrCreateConfiguration(target).setMessageTimestamp(property(camelContext, java.lang.String.class, value)); return true;
case "overrideendpoint":
case "overrideEndpoint": getOrCreateConfiguration(target).setOverrideEndpoint(property(camelContext, boolean.class, value)); return true;
case "proxyhost":
Expand Down Expand Up @@ -97,6 +99,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "lazyStartProducer": return boolean.class;
case "maxresultsperrequest":
case "maxResultsPerRequest": return int.class;
case "messagetimestamp":
case "messageTimestamp": return java.lang.String.class;
case "overrideendpoint":
case "overrideEndpoint": return boolean.class;
case "proxyhost":
Expand Down Expand Up @@ -145,6 +149,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "lazyStartProducer": return target.isLazyStartProducer();
case "maxresultsperrequest":
case "maxResultsPerRequest": return getOrCreateConfiguration(target).getMaxResultsPerRequest();
case "messagetimestamp":
case "messageTimestamp": return getOrCreateConfiguration(target).getMessageTimestamp();
case "overrideendpoint":
case "overrideEndpoint": return getOrCreateConfiguration(target).isOverrideEndpoint();
case "proxyhost":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
case "maxresultsperrequest":
case "maxResultsPerRequest": target.getConfiguration().setMaxResultsPerRequest(property(camelContext, int.class, value)); return true;
case "messagetimestamp":
case "messageTimestamp": target.getConfiguration().setMessageTimestamp(property(camelContext, java.lang.String.class, value)); return true;
case "overrideendpoint":
case "overrideEndpoint": target.getConfiguration().setOverrideEndpoint(property(camelContext, boolean.class, value)); return true;
case "pollstrategy":
Expand Down Expand Up @@ -131,6 +133,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "lazyStartProducer": return boolean.class;
case "maxresultsperrequest":
case "maxResultsPerRequest": return int.class;
case "messagetimestamp":
case "messageTimestamp": return java.lang.String.class;
case "overrideendpoint":
case "overrideEndpoint": return boolean.class;
case "pollstrategy":
Expand Down Expand Up @@ -209,6 +213,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "lazyStartProducer": return target.isLazyStartProducer();
case "maxresultsperrequest":
case "maxResultsPerRequest": return target.getConfiguration().getMaxResultsPerRequest();
case "messagetimestamp":
case "messageTimestamp": return target.getConfiguration().getMessageTimestamp();
case "overrideendpoint":
case "overrideEndpoint": return target.getConfiguration().isOverrideEndpoint();
case "pollstrategy":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class Kinesis2EndpointUriFactory extends org.apache.camel.support.compone
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
Set<String> props = new HashSet<>(38);
Set<String> props = new HashSet<>(39);
props.add("accessKey");
props.add("amazonKinesisClient");
props.add("backoffErrorThreshold");
Expand All @@ -37,6 +37,7 @@ public class Kinesis2EndpointUriFactory extends org.apache.camel.support.compone
props.add("iteratorType");
props.add("lazyStartProducer");
props.add("maxResultsPerRequest");
props.add("messageTimestamp");
props.add("overrideEndpoint");
props.add("pollStrategy");
props.add("proxyHost");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public class Kinesis2Configuration implements Cloneable {
@UriParam(label = "consumer",
description = "The sequence number to start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER")
private String sequenceNumber = "";
@UriParam(label = "consumer",
description = "The message timestamp to start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER")
private String messageTimestamp = "";
@UriParam(label = "consumer", defaultValue = "ignore",
description = "Define what will be the behavior in case of shard closed. Possible value are ignore, silent and fail."
+ " In case of ignore a message will be logged and the consumer will restart from the beginning,"
Expand Down Expand Up @@ -131,6 +134,14 @@ public void setSequenceNumber(String sequenceNumber) {
this.sequenceNumber = sequenceNumber;
}

public String getMessageTimestamp() {
return messageTimestamp;
}

public void setMessageTimestamp(String messageTimestamp) {
this.messageTimestamp = messageTimestamp;
}

public Kinesis2ShardClosedStrategyEnum getShardClosed() {
return shardClosed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.camel.component.aws2.kinesis;

import java.math.BigDecimal;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
Expand Down Expand Up @@ -48,6 +50,9 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R

private static final Logger LOG = LoggerFactory.getLogger(Kinesis2Consumer.class);

private static final String UNIX_TIMESTAMP_MILLIS_REGEX = "^\\d{1,13}$";
private static final String UNIX_TIMESTAMP_DOUBLE_REGEX = "^[-+]?\\d+(\\.\\d+)?([eE][-+]?\\d+)?$";

private String currentShardIterator;
private boolean isShardClosed;
private ResumeStrategy resumeStrategy;
Expand Down Expand Up @@ -173,6 +178,11 @@ private String getShardIterator() {
req.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber());
}

if (hasMessageTimestamp()) {
String messageTimestamp = getEndpoint().getConfiguration().getMessageTimestamp();
req.timestamp(parseMessageTimestamp(messageTimestamp));
}

resume(req);

GetShardIteratorResponse result = getClient().getShardIterator(req.build());
Expand Down Expand Up @@ -237,6 +247,11 @@ private boolean hasSequenceNumber() {
|| getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
}

private boolean hasMessageTimestamp() {
return !getEndpoint().getConfiguration().getMessageTimestamp().isEmpty()
&& getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_TIMESTAMP);
}

@Override
protected void doStart() throws Exception {
super.doStart();
Expand All @@ -259,4 +274,33 @@ protected void doStart() throws Exception {
protected Kinesis2Configuration getConfiguration() {
return getEndpoint().getConfiguration();
}

private Instant parseMessageTimestamp(String messageTimestamp) {
if (messageTimestamp == null) {
throw new IllegalArgumentException("Timestamp can't be null");
}
// Milliseconds format
if (messageTimestamp.matches(UNIX_TIMESTAMP_MILLIS_REGEX)) {
long epochMilli = Long.parseLong(messageTimestamp);
return Instant.ofEpochMilli(epochMilli);
}

// Double format of seconds with fractional part. (1732882967.573, 1.732882967573E9 etc.)
if (messageTimestamp.matches(UNIX_TIMESTAMP_DOUBLE_REGEX)) {
// Using BigDecimal to better precision
BigDecimal decimalTime = new BigDecimal(messageTimestamp);
long seconds = decimalTime.longValue();
BigDecimal fractionalPart = decimalTime.subtract(new BigDecimal(seconds));
int nanos = fractionalPart.multiply(BigDecimal.valueOf(1_000_000_000)).intValue();

return Instant.ofEpochSecond(seconds, nanos);
}

// ISO 8601 format
try {
return Instant.parse(messageTimestamp);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid timestamp format: " + messageTimestamp);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.Protocol;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -95,4 +96,19 @@ public void createEndpointWithEndpointOverride() throws Exception {
assertTrue(endpoint.getConfiguration().isOverrideEndpoint());
assertEquals("http://localhost:4567", endpoint.getConfiguration().getUriEndpointOverride());
}

@Test
public void createEndpointWithMessageTimestamp() throws Exception {
Kinesis2Component component = context.getComponent("aws2-kinesis", Kinesis2Component.class);
component.getConfiguration().setAccessKey("XXX");
component.getConfiguration().setSecretKey("YYY");

Kinesis2Endpoint endpoint = (Kinesis2Endpoint) component.createEndpoint("aws2-kinesis://some_stream_name" +
"?iteratorType=AT_TIMESTAMP&messageTimestamp=1732882967.573");

assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
assertEquals(ShardIteratorType.AT_TIMESTAMP, endpoint.getConfiguration().getIteratorType());
assertEquals("1732882967.573", endpoint.getConfiguration().getMessageTimestamp());

}
}

0 comments on commit 3e3ad4e

Please sign in to comment.