Skip to content

Commit

Permalink
[INLONG-9237][Agent] Move addictive fields to package attributes (#9238)
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang authored Nov 8, 2023
1 parent 7344211 commit b1c0beb
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_START_TIME = "task.fileTask.startTime";
public static final String TASK_END_TIME = "task.fileTask.endTime";
public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
public static final String PREDEFINE_FIELDS = "task.predefineFields";
public static final String PREDEFINE_FIELDS = "task.predefinedFields";

// Binlog job
public static final String JOB_DATABASE_USER = "job.binlogJob.user";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public ProxyMessageCache(InstanceProfile instanceProfile, String groupId, String
this.streamId = streamId;
this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO);
extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields()));
}

public void generateExtraMap(String dataKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TimeZone;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -251,6 +252,30 @@ public static Pair<String, Map<String, String>> parseAddAttr(String additionStr)
return Pair.of(mValue, attr);
}

public static Map<String, String> parseAddAttrToMap(String addictiveAttr) {
StringTokenizer token = new StringTokenizer(addictiveAttr, "&");
Map<String, String> attr = new HashMap<String, String>();
while (token.hasMoreTokens()) {
String value = token.nextToken().trim();
if (value.contains("=")) {
String[] pairs = value.split("=");

if (pairs[0].equalsIgnoreCase("m")) {
continue;
}

// when addictiveattr like "m=10&__addcol1__worldid="
if (value.endsWith("=") && pairs.length == 1) {
attr.put(pairs[0], "");
} else {
attr.put(pairs[0], pairs[1]);
}

}
}
return attr;
}

/**
* Get the attrs in pairs can be complicated in online env
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider;
import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;

Expand Down Expand Up @@ -349,7 +348,6 @@ private Message createMessage(SourceData sourceData) {
Map<String, String> header = new HashMap<>();
header.put(PROXY_KEY_DATA, proxyPartitionKey);
header.put(OFFSET, sourceData.offset.toString());
header.putAll(MetaDataUtils.parseAddAttr(profile.getPredefineFields()));
Message finalMsg = new DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
// if the message size is greater than max pack size,should drop it.
if (finalMsg.getBody().length > maxPackSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -137,28 +136,4 @@ public static String getPodName(AbstractConfiguration taskProfile) {
}).filter(Objects::nonNull).collect(Collectors.toList());
return podName.isEmpty() ? null : podName.get(0);
}

public static Map<String, String> parseAddAttr(String addictiveAttr) {
StringTokenizer token = new StringTokenizer(addictiveAttr, "&");
Map<String, String> attr = new HashMap<String, String>();
while (token.hasMoreTokens()) {
String value = token.nextToken().trim();
if (value.contains("=")) {
String[] pairs = value.split("=");

if (pairs[0].equalsIgnoreCase("m")) {
continue;
}

// when addictiveattr like "m=10&__addcol1__worldid="
if (value.endsWith("=") && pairs.length == 1) {
attr.put(pairs[0], "");
} else {
attr.put(pairs[0], pairs[1]);
}

}
}
return attr;
}
}

0 comments on commit b1c0beb

Please sign in to comment.