Skip to content

Commit

Permalink
[INLONG-11565][SDK] Optimize the implementation of the Utils.java cla…
Browse files Browse the repository at this point in the history
…ss (#11566)
  • Loading branch information
gosonzhang authored Dec 1, 2024
1 parent d43f288 commit 853419d
Show file tree
Hide file tree
Showing 19 changed files with 81 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public class DataProxyNodeResponse {
/**
* DataProxy cluster id
*/
@Deprecated
private Integer clusterId;

private String reportSourceType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
import org.apache.inlong.sdk.dataproxy.network.Utils;
import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
import org.apache.inlong.sdk.dataproxy.threads.ManagerFetcherThread;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
Expand All @@ -54,7 +53,7 @@ public class DefaultMessageSender implements MessageSender {
new ConcurrentHashMap<>();
private static final AtomicBoolean MANAGER_FETCHER_THREAD_STARTED = new AtomicBoolean(false);
private static ManagerFetcherThread managerFetcherThread;
private static final SequentialID idGenerator = new SequentialID(Utils.getLocalIp());
private static final SequentialID idGenerator = new SequentialID();
private final Sender sender;
private final IndexCollectThread indexCol;
/* Store index <groupId_streamId,cnt> */
Expand All @@ -65,7 +64,7 @@ public class DefaultMessageSender implements MessageSender {
private boolean isGroupIdTransfer = false;
private boolean isReport = false;
private boolean isSupportLF = false;
private int maxPacketLength;
private int maxPacketLength = -1;
private int cpsSize = ConfigConstants.COMPRESS_SIZE;
private final int senderMaxAttempt;

Expand Down Expand Up @@ -110,8 +109,7 @@ public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig c
}
LOGGER.info("Initial tcp sender, configure is {}", configure);
// initial sender object
ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure,
Utils.getLocalIp(), null);
ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure, null);
proxyConfigManager.setInlongGroupId(configure.getInlongGroupId());
ProxyConfigEntry entry = proxyConfigManager.getGroupIdConfigure();
DefaultMessageSender sender = CACHE_SENDER.get(entry.getClusterId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.inlong.sdk.dataproxy;

import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
import org.apache.inlong.sdk.dataproxy.network.IpUtils;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import org.apache.inlong.sdk.dataproxy.network.Utils;

import lombok.Data;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -98,13 +98,13 @@ public class ProxyClientConfig {
public ProxyClientConfig(String localHost, boolean requestByHttp, String managerIp,
int managerPort, String inlongGroupId, String authSecretId, String authSecretKey,
LoadBalance loadBalance, int virtualNode, int maxRetry) throws ProxysdkException {
if (Utils.isBlank(localHost)) {
if (StringUtils.isBlank(localHost)) {
throw new ProxysdkException("localHost is blank!");
}
if (Utils.isBlank(managerIp)) {
if (StringUtils.isBlank(managerIp)) {
throw new IllegalArgumentException("managerIp is Blank!");
}
if (Utils.isBlank(inlongGroupId)) {
if (StringUtils.isBlank(inlongGroupId)) {
throw new ProxysdkException("groupId is blank!");
}
this.inlongGroupId = inlongGroupId;
Expand All @@ -114,7 +114,7 @@ public ProxyClientConfig(String localHost, boolean requestByHttp, String manager
this.managerAddress = getManagerAddress(managerIp, managerPort, requestByHttp);
this.managerUrl =
getManagerUrl(managerAddress, inlongGroupId);
Utils.validLocalIp(localHost);
IpUtils.validLocalIp(localHost);
this.aliveConnections = ConfigConstants.ALIVE_CONNECTIONS;
this.syncThreadPoolSize = ConfigConstants.SYNC_THREAD_POOL_SIZE;
this.asyncCallbackSize = ConfigConstants.ASYNC_CALLBACK_SIZE;
Expand All @@ -131,11 +131,11 @@ public ProxyClientConfig(String localHost, boolean requestByHttp, String manager
/* pay attention to the last url parameter ip */
public ProxyClientConfig(String managerAddress, String inlongGroupId, String authSecretId, String authSecretKey,
LoadBalance loadBalance, int virtualNode, int maxRetry) throws ProxysdkException {
if (Utils.isBlank(managerAddress) || (!managerAddress.startsWith(ConfigConstants.HTTP)
if (StringUtils.isBlank(managerAddress) || (!managerAddress.startsWith(ConfigConstants.HTTP)
&& !managerAddress.startsWith(ConfigConstants.HTTPS))) {
throw new ProxysdkException("managerAddress is blank or missing http/https protocol ");
}
if (Utils.isBlank(inlongGroupId)) {
if (StringUtils.isBlank(inlongGroupId)) {
throw new ProxysdkException("groupId is blank!");
}
if (managerAddress.startsWith(ConfigConstants.HTTPS)) {
Expand Down Expand Up @@ -348,10 +348,10 @@ public void setAuthenticationInfo(boolean needAuthentication, boolean needDataEn
this.needAuthentication = needAuthentication;
this.isNeedDataEncry = needDataEncry;
if (this.needAuthentication || this.isNeedDataEncry) {
if (Utils.isBlank(userName)) {
if (StringUtils.isBlank(userName)) {
throw new IllegalArgumentException("userName is Blank!");
}
if (Utils.isBlank(secretKey)) {
if (StringUtils.isBlank(secretKey)) {
throw new IllegalArgumentException("secretKey is Blank!");
}
}
Expand All @@ -360,10 +360,10 @@ public void setAuthenticationInfo(boolean needAuthentication, boolean needDataEn
}

public void setHttpsInfo(String tlsServerCertFilePathAndName, String tlsServerKey) {
if (Utils.isBlank(tlsServerCertFilePathAndName)) {
if (StringUtils.isBlank(tlsServerCertFilePathAndName)) {
throw new IllegalArgumentException("tlsServerCertFilePathAndName is Blank!");
}
if (Utils.isBlank(tlsServerKey)) {
if (StringUtils.isBlank(tlsServerKey)) {
throw new IllegalArgumentException("tlsServerKey is Blank!");
}
this.tlsServerKey = tlsServerKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.EncryptInfo;
import org.apache.inlong.sdk.dataproxy.network.Utils;
import org.apache.inlong.sdk.dataproxy.network.IpUtils;
import org.apache.inlong.sdk.dataproxy.utils.EncryptUtil;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
Expand Down Expand Up @@ -78,18 +79,18 @@ private ByteBuf writeToBuf8(EncodeObject object) {
try {
String endAttr = object.getCommonattr();
if (object.isAuth()) {
if (Utils.isNotBlank(endAttr)) {
if (StringUtils.isNotBlank(endAttr)) {
endAttr = endAttr + "&";
}
long timestamp = System.currentTimeMillis();
int nonce = new SecureRandom(String.valueOf(timestamp).getBytes()).nextInt(Integer.MAX_VALUE);
endAttr = endAttr + "_userName=" + object.getUserName() + "&_clientIP=" + Utils.getLocalIp()
+ "&_signature=" + Utils.generateSignature(object.getUserName(),
endAttr = endAttr + "_userName=" + object.getUserName() + "&_clientIP=" + IpUtils.getLocalIp()
+ "&_signature=" + IpUtils.generateSignature(object.getUserName(),
timestamp, nonce, object.getSecretKey())
+ "&_timeStamp=" + timestamp + "&_nonce=" + nonce;
}
if (Utils.isNotBlank(object.getMsgUUID())) {
if (Utils.isNotBlank(endAttr)) {
if (StringUtils.isNotBlank(object.getMsgUUID())) {
if (StringUtils.isNotBlank(endAttr)) {
endAttr = endAttr + "&";
}
endAttr = endAttr + "msgUUID=" + object.getMsgUUID();
Expand Down Expand Up @@ -129,7 +130,7 @@ private ByteBuf constructBody(byte[] body, EncodeObject object,
if (object.isEncrypt()) {
EncryptConfigEntry encryptEntry = object.getEncryptEntry();
if (encryptEntry != null) {
if (Utils.isNotBlank(endAttr)) {
if (StringUtils.isNotBlank(endAttr)) {
endAttr = endAttr + "&";
}
EncryptInfo encryptInfo = encryptEntry.getRsaEncryptInfo();
Expand All @@ -140,13 +141,13 @@ private ByteBuf constructBody(byte[] body, EncodeObject object,
}
}
if (!object.isGroupIdTransfer()) {
if (Utils.isNotBlank(endAttr)) {
if (StringUtils.isNotBlank(endAttr)) {
endAttr = endAttr + "&";
}
endAttr = (endAttr + "groupId=" + object.getGroupId() + "&streamId=" + object.getStreamId());
}
if (Utils.isNotBlank(object.getMsgUUID())) {
if (Utils.isNotBlank(endAttr)) {
if (StringUtils.isNotBlank(object.getMsgUUID())) {
if (StringUtils.isNotBlank(endAttr)) {
endAttr = endAttr + "&";
}
endAttr = endAttr + "msgUUID=" + object.getMsgUUID();
Expand Down Expand Up @@ -258,7 +259,7 @@ private ByteBuf writeToBuf5(EncodeObject object) {
if (object.isEncrypt()) {
EncryptConfigEntry encryptEntry = object.getEncryptEntry();
if (encryptEntry != null) {
if (Utils.isNotBlank(msgAttrs)) {
if (StringUtils.isNotBlank(msgAttrs)) {
msgAttrs = msgAttrs + "&";
}
EncryptInfo encryptInfo = encryptEntry.getRsaEncryptInfo();
Expand All @@ -268,8 +269,8 @@ private ByteBuf writeToBuf5(EncodeObject object) {
body = EncryptUtil.aesEncrypt(body, encryptInfo.getAesKey());
}
}
if (Utils.isNotBlank(object.getMsgUUID())) {
if (Utils.isNotBlank(msgAttrs)) {
if (StringUtils.isNotBlank(object.getMsgUUID())) {
if (StringUtils.isNotBlank(msgAttrs)) {
msgAttrs = msgAttrs + "&";
}
msgAttrs = msgAttrs + "msgUUID=" + object.getMsgUUID();
Expand Down Expand Up @@ -322,7 +323,7 @@ private ByteBuf writeToBuf3(EncodeObject object) {
if (object.isEncrypt()) {
EncryptConfigEntry encryptEntry = object.getEncryptEntry();
if (encryptEntry != null) {
if (Utils.isNotBlank(msgAttrs)) {
if (StringUtils.isNotBlank(msgAttrs)) {
msgAttrs = msgAttrs + "&";
}
EncryptInfo encryptInfo = encryptEntry.getRsaEncryptInfo();
Expand All @@ -332,8 +333,8 @@ private ByteBuf writeToBuf3(EncodeObject object) {
body = EncryptUtil.aesEncrypt(body, encryptInfo.getAesKey());
}
}
if (Utils.isNotBlank(object.getMsgUUID())) {
if (Utils.isNotBlank(msgAttrs)) {
if (StringUtils.isNotBlank(object.getMsgUUID())) {
if (StringUtils.isNotBlank(msgAttrs)) {
msgAttrs = msgAttrs + "&";
}
msgAttrs = msgAttrs + "msgUUID=" + object.getMsgUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
import org.apache.inlong.sdk.dataproxy.network.HashRing;
import org.apache.inlong.sdk.dataproxy.network.Utils;
import org.apache.inlong.sdk.dataproxy.network.IpUtils;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
Expand Down Expand Up @@ -93,7 +93,6 @@ public class ProxyConfigManager extends Thread {
public static final String APPLICATION_JSON = "application/json";
private static final Logger LOGGER = LoggerFactory.getLogger(ProxyConfigManager.class);
private final ProxyClientConfig clientConfig;
private final String localIP;
private final ClientMgr clientManager;
private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private final JsonParser jsonParser = new JsonParser();
Expand All @@ -108,9 +107,8 @@ public class ProxyConfigManager extends Thread {
private long doworkTime = 0;
private EncryptConfigEntry userEncryConfigEntry;

public ProxyConfigManager(final ProxyClientConfig configure, final String localIP, final ClientMgr clientManager) {
public ProxyConfigManager(final ProxyClientConfig configure, final ClientMgr clientManager) {
this.clientConfig = configure;
this.localIP = localIP;
this.clientManager = clientManager;
this.hashRing.setVirtualNode(configure.getVirtualNode());
}
Expand Down Expand Up @@ -358,7 +356,7 @@ private void compareProxyList(ProxyConfigEntry proxyEntry) {
}

public EncryptConfigEntry getEncryptConfigEntry(final String userName) {
if (Utils.isBlank(userName)) {
if (StringUtils.isBlank(userName)) {
return null;
}
EncryptConfigEntry encryptEntry = this.userEncryConfigEntry;
Expand Down Expand Up @@ -397,7 +395,7 @@ public EncryptConfigEntry getEncryptConfigEntry(final String userName) {
}

private void updateEncryptConfigEntry() {
if (Utils.isBlank(this.clientConfig.getUserName())) {
if (StringUtils.isBlank(this.clientConfig.getUserName())) {
return;
}
int retryCount = 0;
Expand All @@ -422,7 +420,7 @@ private void updateEncryptConfigEntry() {
}

private EncryptConfigEntry getStoredPubKeyEntry(String userName) {
if (Utils.isBlank(userName)) {
if (StringUtils.isBlank(userName)) {
LOGGER.warn(" userName(" + userName + ") is not available");
return null;
}
Expand Down Expand Up @@ -509,15 +507,15 @@ private String calcHostInfoMd5(List<HostInfo> hostInfoList) {
}

private EncryptConfigEntry requestPubKey(String pubKeyUrl, String userName, boolean needGet) {
if (Utils.isBlank(userName)) {
if (StringUtils.isBlank(userName)) {
LOGGER.error("Queried userName is null!");
return null;
}
List<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
params.add(new BasicNameValuePair("operation", "query"));
params.add(new BasicNameValuePair("username", userName));
String returnStr = requestConfiguration(pubKeyUrl, params);
if (Utils.isBlank(returnStr)) {
if (StringUtils.isBlank(returnStr)) {
LOGGER.info("No public key information returned from manager");
return null;
}
Expand All @@ -543,15 +541,15 @@ private EncryptConfigEntry requestPubKey(String pubKeyUrl, String userName, bool
JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject();
if (resultData != null) {
String publicKey = resultData.get("publicKey").getAsString();
if (Utils.isBlank(publicKey)) {
if (StringUtils.isBlank(publicKey)) {
return null;
}
String username = resultData.get("username").getAsString();
if (Utils.isBlank(username)) {
if (StringUtils.isBlank(username)) {
return null;
}
String versionStr = resultData.get("version").getAsString();
if (Utils.isBlank(versionStr)) {
if (StringUtils.isBlank(versionStr)) {
return null;
}
return new EncryptConfigEntry(username, versionStr, publicKey);
Expand Down Expand Up @@ -591,7 +589,7 @@ private Map<String, Integer> getStreamIdMap(JsonObject localProxyAddrJson) {

public ProxyConfigEntry requestProxyList(String url) {
ArrayList<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
params.add(new BasicNameValuePair("ip", this.localIP));
params.add(new BasicNameValuePair("ip", IpUtils.getLocalIp()));
params.add(new BasicNameValuePair("protocolType", clientConfig.getProtocolType()));
LOGGER.info("Begin to get configure from manager {}, param is {}", url, params);

Expand Down Expand Up @@ -700,7 +698,7 @@ private String updateUrl(String url, int tryIdx, String localManagerIpList) {

/* Request new configurations from Manager. */
private String requestConfiguration(String url, List<BasicNameValuePair> params) {
if (Utils.isBlank(url)) {
if (StringUtils.isBlank(url)) {
LOGGER.error("request url is null");
return null;
}
Expand Down Expand Up @@ -746,7 +744,7 @@ private String requestConfiguration(String url, List<BasicNameValuePair> params)
httpPost.setEntity(urlEncodedFormEntity);
HttpResponse response = httpClient.execute(httpPost);
returnStr = EntityUtils.toString(response.getEntity());
if (Utils.isNotBlank(returnStr)
if (StringUtils.isNotBlank(returnStr)
&& response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
LOGGER.info("Get configure from manager is " + returnStr);
return returnStr;
Expand Down
Loading

0 comments on commit 853419d

Please sign in to comment.