Skip to content

Commit

Permalink
[INLONG-11599][SDK] Optimize the configuration related content in the…
Browse files Browse the repository at this point in the history
… ProxyClientConfig class (#11600)



Co-authored-by: gosonzhang <[email protected]>
  • Loading branch information
gosonzhang and gosonzhang authored Dec 16, 2024
1 parent d3be4d4 commit 1c9dd7d
Show file tree
Hide file tree
Showing 14 changed files with 356 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ private void createMessageSender() throws Exception {
authSecretKey);
proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
proxyClientConfig.setAliveConnections(aliveConnectionNum);
proxyClientConfig.setRequestTimeoutMs(maxSenderTimeout * 1000L);

proxyClientConfig.setIoThreadNum(ioThreadNum);
proxyClientConfig.setEnableBusyWait(enableBusyWait);
Expand Down Expand Up @@ -242,7 +243,7 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) {
message.getTotalSize(), auditVersion);
asyncSendByMessageSender(cb, message.getDataList(), message.getGroupId(),
message.getStreamId(), message.getDataTime(), SEQUENTIAL_ID.getNextUuid(),
maxSenderTimeout, TimeUnit.SECONDS, message.getExtraMap(), proxySend);
message.getExtraMap(), proxySend);
getMetricItem(message.getGroupId(), message.getStreamId()).pluginSendCount.addAndGet(
message.getMsgCnt());
suc = true;
Expand Down Expand Up @@ -270,11 +271,9 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) {

private void asyncSendByMessageSender(SendMessageCallback cb,
List<byte[]> bodyList, String groupId, String streamId, long dataTime, String msgUUID,
long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException {
sender.asyncSendMessage(cb, bodyList, groupId,
streamId, dataTime, msgUUID,
timeout, timeUnit, extraAttrMap, isProxySend);
streamId, dataTime, msgUUID, extraAttrMap, isProxySend);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public void testNormalAck() {
return null;
}).when(senderManager, "asyncSendByMessageSender", Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyLong(), Mockito.any(),
Mockito.anyLong(), Mockito.any(),
Mockito.any(), Mockito.anyBoolean());
senderManager.Start();
Long offset = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public class ConfigConstants {
public static final String REMOTE_ENCRYPT_CACHE_FILE_SUFFIX = ".pubKey";
// authorization key
public static final String BASIC_AUTH_HEADER = "authorization";

// default region name
public static final String VAL_DEF_REGION_NAME = "";
// config info sync interval in minutes
public static final int VAL_DEF_CONFIG_SYNC_INTERVAL_MIN = 3;
public static final int VAL_MIN_CONFIG_SYNC_INTERVAL_MIN = 1;
Expand All @@ -43,6 +44,10 @@ public class ConfigConstants {
public static final int VAL_MAX_RETRY_IF_CONFIG_SYNC_FAIL = 5;
// cache config expired time in ms
public static final long VAL_DEF_CACHE_CONFIG_EXPIRED_MS = 20 * 60 * 1000L;
// cache config fail status expired time in ms
public static final long VAL_DEF_CONFIG_FAIL_STATUS_EXPIRED_MS = 1000L;
public static final long VAL_MAX_CONFIG_FAIL_STATUS_EXPIRED_MS = 3 * 60 * 1000L;

// node force choose interval in ms
public static final long VAL_DEF_FORCE_CHOOSE_INR_MS = 10 * 60 * 1000L;
public static final long VAL_MIN_FORCE_CHOOSE_INR_MS = 30 * 1000L;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

public interface MessageSender {

void close();

/**
* This method provides a synchronized function which you want to send data without packing
*
Expand Down Expand Up @@ -137,5 +139,4 @@ void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body,
void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList,
SendMessageCallback callback) throws ProxysdkException;

void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,24 @@ public class ProxyClientConfig {
private String configStoreBasePath = System.getProperty("user.dir");
// max expired time for config cache.
private long configCacheExpiredMs = ConfigConstants.VAL_DEF_CACHE_CONFIG_EXPIRED_MS;
// max expired time for config query failure status
private long configFailStatusExpiredMs = ConfigConstants.VAL_DEF_CONFIG_FAIL_STATUS_EXPIRED_MS;
// nodes force choose interval ms
private long forceReChooseInrMs = ConfigConstants.VAL_DEF_FORCE_CHOOSE_INR_MS;
private boolean enableAuthentication = false;
private String authSecretId = "";
private String authSecretKey = "";
private String inlongGroupId;
private String regionName = ConfigConstants.VAL_DEF_REGION_NAME;
private int aliveConnections = ConfigConstants.VAL_DEF_ALIVE_CONNECTIONS;
// data encrypt info
private boolean enableDataEncrypt = false;
private String rsaPubKeyUrl = "";
private String userName = "";

private int syncThreadPoolSize;
private int asyncCallbackSize;

private boolean isNeedDataEncry = false;
private String rsaPubKeyUrl = "";
private String tlsServerCertFilePathAndName;
private String tlsServerKey;
private String tlsVersion = "TLSv1.2";
Expand Down Expand Up @@ -240,6 +245,15 @@ public void setConfigCacheExpiredMs(long configCacheExpiredMs) {
this.configCacheExpiredMs = configCacheExpiredMs;
}

public long getConfigFailStatusExpiredMs() {
return configFailStatusExpiredMs;
}

public void setConfigFailStatusExpiredMs(long configFailStatusExpiredMs) {
this.configFailStatusExpiredMs =
Math.min(configFailStatusExpiredMs, ConfigConstants.VAL_MAX_CONFIG_FAIL_STATUS_EXPIRED_MS);
}

public long getForceReChooseInrMs() {
return forceReChooseInrMs;
}
Expand All @@ -253,6 +267,16 @@ public String getInlongGroupId() {
return inlongGroupId;
}

public String getRegionName() {
return regionName;
}

public void setRegionName(String regionName) {
if (StringUtils.isNotBlank(regionName)) {
this.regionName = regionName.trim();
}
}

public int getAliveConnections() {
return this.aliveConnections;
}
Expand All @@ -262,6 +286,33 @@ public void setAliveConnections(int aliveConnections) {
Math.max(ConfigConstants.VAL_MIN_ALIVE_CONNECTIONS, aliveConnections);
}

public boolean isEnableDataEncrypt() {
return enableDataEncrypt;
}

public String getRsaPubKeyUrl() {
return rsaPubKeyUrl;
}

public String getUserName() {
return userName;
}

public void enableDataEncrypt(boolean needDataEncrypt, String userName, String rsaPubKeyUrl) {
this.enableDataEncrypt = needDataEncrypt;
if (!this.enableDataEncrypt) {
return;
}
if (StringUtils.isBlank(userName)) {
throw new IllegalArgumentException("userName is Blank!");
}
if (StringUtils.isBlank(rsaPubKeyUrl)) {
throw new IllegalArgumentException("rsaPubKeyUrl is Blank!");
}
this.userName = userName.trim();
this.rsaPubKeyUrl = rsaPubKeyUrl.trim();
}

public String getTlsServerCertFilePathAndName() {
return tlsServerCertFilePathAndName;
}
Expand Down Expand Up @@ -370,14 +421,6 @@ public void setMaxMsgInFlightPerConn(long maxMsgInFlightPerConn) {
this.maxMsgInFlightPerConn = maxMsgInFlightPerConn;
}

public String getRsaPubKeyUrl() {
return rsaPubKeyUrl;
}

public boolean isNeedDataEncry() {
return isNeedDataEncry;
}

public void setHttpsInfo(String tlsServerCertFilePathAndName, String tlsServerKey) {
if (StringUtils.isBlank(tlsServerCertFilePathAndName)) {
throw new IllegalArgumentException("tlsServerCertFilePathAndName is Blank!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public Map<String, HostInfo> getHostMap() {
public void setHostMap(Map<String, HostInfo> hostMap) {
this.hostMap = hostMap;
}

public boolean isNodesEmpty() {
return this.hostMap.isEmpty();
}
Expand Down
Loading

0 comments on commit 1c9dd7d

Please sign in to comment.