From abf44d935c6bdfd84509ebc82bf0acb3703ab41c Mon Sep 17 00:00:00 2001 From: vernedeng Date: Thu, 19 Oct 2023 19:33:01 +0800 Subject: [PATCH] [INLONG-9077][Sort] Fix TubeMQ connector fail to subscribe streamId --- .../manager/pojo/sort/node/provider/TubeMqProvider.java | 4 ++-- .../inlong/manager/pojo/source/tubemq/TubeMQSource.java | 4 ++-- .../manager/service/source/tubemq/TubeMQSourceOperator.java | 2 +- .../inlong/sort/protocol/node/extract/TubeMQExtractNode.java | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java index 72399221f88..d2ed8cd0c3d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java @@ -53,8 +53,8 @@ public ExtractNode createExtractNode(StreamNode streamNodeInfo) { source.getMasterRpc(), source.getTopic(), source.getSerializationType(), - source.getGroupId(), + source.getConsumeGroup(), source.getSessionKey(), - source.getTid()); + source.getStreamId()); } } \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java index 6e6720208cb..bcb6ac1a63f 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java @@ -52,7 +52,7 @@ public class TubeMQSource extends StreamSource { private String topic; @ApiModelProperty("Group of the TubeMQ") - private String groupId; + private String consumeGroup; @ApiModelProperty("Session key of the TubeMQ") private String sessionKey; @@ -61,7 +61,7 @@ public class TubeMQSource extends StreamSource { * The TubeMQ consumers use this tid set to filter records reading from server. */ @ApiModelProperty("Tid of the TubeMQ") - private TreeSet tid; + private TreeSet streamId; public TubeMQSource() { this.setSourceType(SourceType.TUBEMQ); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java index 7fc56539d93..df7af84b09a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java @@ -107,7 +107,7 @@ public Map> getSourcesMap(InlongGroupInfo groupInfo, String streamId = streamInfo.getInlongStreamId(); tubeMQSource.setSourceName(streamId); tubeMQSource.setTopic(groupInfo.getMqResource()); - tubeMQSource.setGroupId(streamId); + tubeMQSource.setConsumeGroup(streamId); tubeMQSource.setMasterRpc(masterRpc); tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError()); diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java index 1c7211e9f4d..fa9272f2ee7 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java @@ -85,7 +85,7 @@ public TubeMQExtractNode( @Nonnull @JsonProperty("format") String format, @Nonnull @JsonProperty("consumeGroup") String consumeGroup, @JsonProperty("sessionKey") String sessionKey, - @JsonProperty("tid") TreeSet streamId) { + @JsonProperty("streamId") TreeSet streamId) { super(id, name, fields, waterMarkField, properties); this.masterRpc = Preconditions.checkNotNull(masterRpc, "TubeMQ masterRpc is null"); this.topic = Preconditions.checkNotNull(topic, "TubeMQ topic is null");