Skip to content

Commit

Permalink
[INLONG-9077][Sort] Fix TubeMQ connector fail to subscribe streamId
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Oct 19, 2023
1 parent f0908ee commit abf44d9
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
source.getMasterRpc(),
source.getTopic(),
source.getSerializationType(),
source.getGroupId(),
source.getConsumeGroup(),
source.getSessionKey(),
source.getTid());
source.getStreamId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class TubeMQSource extends StreamSource {
private String topic;

@ApiModelProperty("Group of the TubeMQ")
private String groupId;
private String consumeGroup;

@ApiModelProperty("Session key of the TubeMQ")
private String sessionKey;
Expand All @@ -61,7 +61,7 @@ public class TubeMQSource extends StreamSource {
* The TubeMQ consumers use this tid set to filter records reading from server.
*/
@ApiModelProperty("Tid of the TubeMQ")
private TreeSet<String> tid;
private TreeSet<String> streamId;

public TubeMQSource() {
this.setSourceType(SourceType.TUBEMQ);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
String streamId = streamInfo.getInlongStreamId();
tubeMQSource.setSourceName(streamId);
tubeMQSource.setTopic(groupInfo.getMqResource());
tubeMQSource.setGroupId(streamId);
tubeMQSource.setConsumeGroup(streamId);
tubeMQSource.setMasterRpc(masterRpc);
tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public TubeMQExtractNode(
@Nonnull @JsonProperty("format") String format,
@Nonnull @JsonProperty("consumeGroup") String consumeGroup,
@JsonProperty("sessionKey") String sessionKey,
@JsonProperty("tid") TreeSet<String> streamId) {
@JsonProperty("streamId") TreeSet<String> streamId) {
super(id, name, fields, waterMarkField, properties);
this.masterRpc = Preconditions.checkNotNull(masterRpc, "TubeMQ masterRpc is null");
this.topic = Preconditions.checkNotNull(topic, "TubeMQ topic is null");
Expand Down

0 comments on commit abf44d9

Please sign in to comment.