Skip to content

Commit

Permalink
[INLONG-11179][Agent] Delete useless code (apache#11180)
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang authored and PeterZh6 committed Sep 23, 2024
1 parent f68efc2 commit 34f0a7a
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.util.Map;

import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;

Expand All @@ -37,30 +36,20 @@ public class ProxyMessage implements Message {
private final Map<String, String> header;
private final String inlongGroupId;
private final String inlongStreamId;
// determine the group key when making batch
private final String batchKey;
private final String dataKey;
OffsetAckInfo ackInfo;

public ProxyMessage(byte[] body, Map<String, String> header) {
this.body = body;
this.header = header;
this.inlongGroupId = header.get(PROXY_KEY_GROUP_ID);
this.inlongStreamId = header.getOrDefault(PROXY_KEY_STREAM_ID, DEFAULT_INLONG_STREAM_ID);
this.dataKey = header.getOrDefault(PROXY_KEY_DATA, "");
// use the batch key of user and inlongStreamId to determine one batch
this.batchKey = dataKey + inlongStreamId;
ackInfo = new OffsetAckInfo(header.get(TaskConstants.OFFSET), body.length, false);
}

public ProxyMessage(Message message) {
this(message.getBody(), message.getHeader());
}

public String getDataKey() {
return dataKey;
}

/**
* Get first line of body list
*
Expand Down Expand Up @@ -92,8 +81,4 @@ public String getInlongGroupId() {
public String getInlongStreamId() {
return inlongStreamId;
}

public String getBatchKey() {
return batchKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,6 @@ public ProxyMessageCache(InstanceProfile instanceProfile, String groupId, String
}
}

public void generateExtraMap(String dataKey) {
this.extraMap.put(AttributeConstants.MESSAGE_PARTITION_KEY, dataKey);
}

/**
* Check whether queue is nearly full
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ private boolean putInCache(Message message) {
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_WRITER_PERMIT, "proxy sink");
return false;
}
cache.generateExtraMap(proxyMessage.getDataKey());
// add message to package proxy
boolean suc = cache.add(proxyMessage);
if (suc) {
Expand Down

0 comments on commit 34f0a7a

Please sign in to comment.