Skip to content

Commit

Permalink
[INLONG-11589][SDK] Optimize the implementation of proxy configuratio…
Browse files Browse the repository at this point in the history
…n management (#11590)



Co-authored-by: gosonzhang <[email protected]>
  • Loading branch information
gosonzhang and gosonzhang authored Dec 9, 2024
1 parent ebcd6d3 commit 36a9017
Show file tree
Hide file tree
Showing 14 changed files with 995 additions and 716 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,42 @@ public class ConfigConstants {

public static final String PROXY_SDK_VERSION = "1.2.11";

public static String HTTP = "http://";
public static String HTTPS = "https://";

// dataproxy node config
public static final String MANAGER_DATAPROXY_API = "/inlong/manager/openapi/dataproxy/getIpList/";
public static final String META_STORE_SUB_DIR = "/.inlong/";
public static final String LOCAL_DP_CONFIG_FILE_SUFFIX = ".local";
public static final String REMOTE_DP_CACHE_FILE_SUFFIX = ".proxyip";
public static final String REMOTE_ENCRYPT_CACHE_FILE_SUFFIX = ".pubKey";
// authorization key
public static final String BASIC_AUTH_HEADER = "authorization";

// config info sync interval in minutes
public static final int VAL_DEF_CONFIG_SYNC_INTERVAL_MIN = 3;
public static final int VAL_MIN_CONFIG_SYNC_INTERVAL_MIN = 1;
public static final int VAL_MAX_CONFIG_SYNC_INTERVAL_MIN = 30;
public static final long VAL_UNIT_MIN_TO_MS = 60 * 1000L;
// config info sync max retry if failure
public static final int VAL_DEF_RETRY_IF_CONFIG_SYNC_FAIL = 3;
public static final int VAL_MAX_RETRY_IF_CONFIG_SYNC_FAIL = 5;
// cache config expired time in ms
public static final long VAL_DEF_CACHE_CONFIG_EXPIRED_MS = 20 * 60 * 1000L;
// node force choose interval in ms
public static final long VAL_DEF_FORCE_CHOOSE_INR_MS = 10 * 60 * 1000L;
public static final long VAL_MIN_FORCE_CHOOSE_INR_MS = 30 * 1000L;

// connection timeout in milliseconds
public static final int VAL_DEF_CONNECT_TIMEOUT_MS = 10000;
public static final int VAL_MIN_CONNECT_TIMEOUT_MS = 2000;
public static final int VAL_MAX_CONNECT_TIMEOUT_MS = 60000;
public static final int VAL_DEF_CONNECT_CLOSE_DELAY_MS = 500;
// socket timeout in milliseconds
public static final int VAL_DEF_SOCKET_TIMEOUT_MS = 20000;
public static final int VAL_MIN_SOCKET_TIMEOUT_MS = 2000;
public static final int VAL_MAX_SOCKET_TIMEOUT_MS = 60000;

public static final int ALIVE_CONNECTIONS = 3;
public static final int MAX_TIMEOUT_CNT = 3;
public static final int LOAD_THRESHOLD = 0;
Expand All @@ -43,17 +79,11 @@ public class ConfigConstants {
/* one hour interval */
public static final int PROXY_HTTP_UPDATE_INTERVAL_MINUTES = 60;

public static final int PROXY_UPDATE_MAX_RETRY = 10;

public static final int MAX_LINE_CNT = 30;

// connection timeout in milliseconds
public static final long VAL_DEF_CONNECT_TIMEOUT_MS = 20000L;
public static final long VAL_MIN_CONNECT_TIMEOUT_MS = 1L;
public static final long VAL_DEF_CONNECT_CLOSE_DELAY_MS = 500L;
// request timeout in milliseconds
public static final long VAL_DEF_REQUEST_TIMEOUT_MS = 10000L;
public static final long VAL_MIN_REQUEST_TIMEOUT_MS = 1L;
public static final long VAL_MIN_REQUEST_TIMEOUT_MS = 500L;

public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
Expand All @@ -65,14 +95,10 @@ public class ConfigConstants {
public static final int FLAG_ALLOW_ENCRYPT = 1 << 6;
public static final int FLAG_ALLOW_COMPRESS = 1 << 5;

public static final String MANAGER_DATAPROXY_API = "/inlong/manager/openapi/dataproxy/getIpList/";
public static LoadBalance DEFAULT_LOAD_BALANCE = LoadBalance.ROBIN;
public static int DEFAULT_VIRTUAL_NODE = 1000;
public static int DEFAULT_RANDOM_MAX_RETRY = 1000;

public static String HTTP = "http://";
public static String HTTPS = "https://";

public static int DEFAULT_SENDER_MAX_ATTEMPT = 1;

/* Reserved attribute data size(bytes). */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.apache.inlong.sdk.dataproxy.utils.Tuple2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -107,17 +108,20 @@ public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig c
}
LOGGER.info("Initial tcp sender, configure is {}", configure);
// initial sender object
ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure, null);
proxyConfigManager.setInlongGroupId(configure.getInlongGroupId());
ProxyConfigEntry entry = proxyConfigManager.getGroupIdConfigure();
DefaultMessageSender sender = CACHE_SENDER.get(entry.getClusterId());
ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure);
Tuple2<ProxyConfigEntry, String> result =
proxyConfigManager.getGroupIdConfigure(true);
if (result.getF0() == null) {
throw new Exception(result.getF1());
}
DefaultMessageSender sender = CACHE_SENDER.get(result.getF0().getClusterId());
if (sender != null) {
return sender;
} else {
DefaultMessageSender tmpMessageSender =
new DefaultMessageSender(configure, selfDefineFactory);
tmpMessageSender.setMaxPacketLength(entry.getMaxPacketLength());
CACHE_SENDER.put(entry.getClusterId(), tmpMessageSender);
tmpMessageSender.setMaxPacketLength(result.getF0().getMaxPacketLength());
CACHE_SENDER.put(result.getF0().getClusterId(), tmpMessageSender);
return tmpMessageSender;
}
}
Expand Down
Loading

0 comments on commit 36a9017

Please sign in to comment.