From 1c9dd7df9db2e5679457c9cee6acca7171f0bf45 Mon Sep 17 00:00:00 2001 From: Goson Zhang <4675739@qq.com> Date: Mon, 16 Dec 2024 10:06:18 +0800 Subject: [PATCH] [INLONG-11599][SDK] Optimize the configuration related content in the ProxyClientConfig class (#11600) Co-authored-by: gosonzhang --- .../sinks/filecollect/SenderManager.java | 7 +- .../sinks/filecollect/TestSenderManager.java | 1 - .../inlong/sdk/dataproxy/ConfigConstants.java | 7 +- .../sdk/dataproxy/DefaultMessageSender.java | 117 +++---- .../inlong/sdk/dataproxy/MessageSender.java | 3 +- .../sdk/dataproxy/ProxyClientConfig.java | 63 +++- .../dataproxy/config/ProxyConfigEntry.java | 1 + .../dataproxy/config/ProxyConfigManager.java | 301 +++++++++++++----- .../sdk/dataproxy/network/ClientMgr.java | 2 +- .../sdk/dataproxy/network/QueueObject.java | 5 +- .../inlong/sdk/dataproxy/network/Sender.java | 39 +-- .../network/SyncMessageCallable.java | 11 +- .../dataproxy/threads/MetricWorkerThread.java | 3 +- .../dataproxy/threads/TimeoutScanThread.java | 10 +- 14 files changed, 356 insertions(+), 214 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index ec4502a7fb..9ac9083ad8 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -203,6 +203,7 @@ private void createMessageSender() throws Exception { authSecretKey); proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize); proxyClientConfig.setAliveConnections(aliveConnectionNum); + proxyClientConfig.setRequestTimeoutMs(maxSenderTimeout * 1000L); proxyClientConfig.setIoThreadNum(ioThreadNum); proxyClientConfig.setEnableBusyWait(enableBusyWait); @@ -242,7 +243,7 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) { message.getTotalSize(), auditVersion); asyncSendByMessageSender(cb, message.getDataList(), message.getGroupId(), message.getStreamId(), message.getDataTime(), SEQUENTIAL_ID.getNextUuid(), - maxSenderTimeout, TimeUnit.SECONDS, message.getExtraMap(), proxySend); + message.getExtraMap(), proxySend); getMetricItem(message.getGroupId(), message.getStreamId()).pluginSendCount.addAndGet( message.getMsgCnt()); suc = true; @@ -270,11 +271,9 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) { private void asyncSendByMessageSender(SendMessageCallback cb, List bodyList, String groupId, String streamId, long dataTime, String msgUUID, - long timeout, TimeUnit timeUnit, Map extraAttrMap, boolean isProxySend) throws ProxysdkException { sender.asyncSendMessage(cb, bodyList, groupId, - streamId, dataTime, msgUUID, - timeout, timeUnit, extraAttrMap, isProxySend); + streamId, dataTime, msgUUID, extraAttrMap, isProxySend); } /** diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java index 9655e757ef..508e21588f 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java @@ -96,7 +96,6 @@ public void testNormalAck() { return null; }).when(senderManager, "asyncSendByMessageSender", Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyLong(), Mockito.any(), - Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); senderManager.Start(); Long offset = 0L; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java index 7adc5087af..0216b77c2c 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java @@ -32,7 +32,8 @@ public class ConfigConstants { public static final String REMOTE_ENCRYPT_CACHE_FILE_SUFFIX = ".pubKey"; // authorization key public static final String BASIC_AUTH_HEADER = "authorization"; - + // default region name + public static final String VAL_DEF_REGION_NAME = ""; // config info sync interval in minutes public static final int VAL_DEF_CONFIG_SYNC_INTERVAL_MIN = 3; public static final int VAL_MIN_CONFIG_SYNC_INTERVAL_MIN = 1; @@ -43,6 +44,10 @@ public class ConfigConstants { public static final int VAL_MAX_RETRY_IF_CONFIG_SYNC_FAIL = 5; // cache config expired time in ms public static final long VAL_DEF_CACHE_CONFIG_EXPIRED_MS = 20 * 60 * 1000L; + // cache config fail status expired time in ms + public static final long VAL_DEF_CONFIG_FAIL_STATUS_EXPIRED_MS = 1000L; + public static final long VAL_MAX_CONFIG_FAIL_STATUS_EXPIRED_MS = 3 * 60 * 1000L; + // node force choose interval in ms public static final long VAL_DEF_FORCE_CHOOSE_INR_MS = 10 * 60 * 1000L; public static final long VAL_MIN_FORCE_CHOOSE_INR_MS = 30 * 1000L; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java index 61a68e0d9c..02157fe95a 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java @@ -40,15 +40,12 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; public class DefaultMessageSender implements MessageSender { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMessageSender.class); - private static final long DEFAULT_SEND_TIMEOUT = 100; - private static final TimeUnit DEFAULT_SEND_TIMEUNIT = TimeUnit.MILLISECONDS; private static final ConcurrentHashMap CACHE_SENDER = new ConcurrentHashMap<>(); private static final AtomicBoolean MANAGER_FETCHER_THREAD_STARTED = new AtomicBoolean(false); @@ -137,6 +134,20 @@ public static void finallyCleanup() { CACHE_SENDER.clear(); } + @Override + public void close() { + LOGGER.info("ready to close resources, may need five minutes !"); + if (sender.getClusterId() != -1) { + CACHE_SENDER.remove(sender.getClusterId()); + } + sender.close(); + shutdownInternalThreads(); + } + + public ProxyClientConfig getProxyClientConfig() { + return sender.getConfigure(); + } + public boolean isSupportLF() { return isSupportLF; } @@ -433,25 +444,25 @@ public SendResult sendMessage(List bodyList, String groupId, String stre @Override public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, String msgUUID, Map extraAttrMap) throws ProxysdkException { - + asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, extraAttrMap, false); } @Override public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, String msgUUID) throws ProxysdkException { - + asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, false); } @Override public void asyncSendMessage(SendMessageCallback callback, List bodyList, String groupId, String streamId, long dt, String msgUUID) throws ProxysdkException { - + asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, false); } @Override public void asyncSendMessage(SendMessageCallback callback, List bodyList, String groupId, String streamId, long dt, String msgUUID, Map extraAttrMap) throws ProxysdkException { - + asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, extraAttrMap, false); } /** @@ -508,11 +519,6 @@ public SendResult sendMessage(List bodyList, String groupId, String stre return null; } - public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, - String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException { - asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, timeout, timeUnit, false); - } - /** * async send single message * @@ -522,13 +528,11 @@ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String g * @param streamId streamId * @param dt data report timestamp * @param msgUUID msg uuid - * @param timeout - * @param timeUnit * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ * @throws ProxysdkException */ - public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, - String msgUUID, long timeout, TimeUnit timeUnit, boolean isProxySend) throws ProxysdkException { + public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, + String streamId, long dt, String msgUUID, boolean isProxySend) throws ProxysdkException { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) { throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); @@ -549,7 +553,7 @@ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String g isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend); encodeObject.setSupportLF(isSupportLF); - sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit); + sender.asyncSendMessage(encodeObject, callback, msgUUID); } else if (msgtype == 3 || msgtype == 5) { if (isCompressEnd) { if (isProxySend) { @@ -558,23 +562,15 @@ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String g sender.asyncSendMessage(new EncodeObject(Collections.singletonList(body), "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy" + proxySend, idGenerator.getNextId(), this.getMsgtype(), true, groupId), - callback, msgUUID, timeout, timeUnit); + callback, msgUUID); } else { sender.asyncSendMessage( new EncodeObject(Collections.singletonList(body), "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt + proxySend, idGenerator.getNextId(), this.getMsgtype(), false, groupId), - callback, - msgUUID, timeout, timeUnit); + callback, msgUUID); } } - - } - - public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, - String msgUUID, long timeout, TimeUnit timeUnit, Map extraAttrMap) - throws ProxysdkException { - asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false); } /** @@ -586,15 +582,12 @@ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String g * @param streamId streamId * @param dt data report timestamp * @param msgUUID msg uuid - * @param timeout - * @param timeUnit * @param extraAttrMap extra attributes * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ * @throws ProxysdkException */ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, - String msgUUID, long timeout, TimeUnit timeUnit, Map extraAttrMap, boolean isProxySend) - throws ProxysdkException { + String msgUUID, Map extraAttrMap, boolean isProxySend) throws ProxysdkException { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) { throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); @@ -615,28 +608,23 @@ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String g isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, attrs.toString()); encodeObject.setSupportLF(isSupportLF); - sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit); + sender.asyncSendMessage(encodeObject, callback, msgUUID); } else if (msgtype == 3 || msgtype == 5) { attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt); if (isCompressEnd) { attrs.append("&cp=snappy"); sender.asyncSendMessage(new EncodeObject(Collections.singletonList(body), attrs.toString(), idGenerator.getNextId(), this.getMsgtype(), true, groupId), - callback, msgUUID, timeout, timeUnit); + callback, msgUUID); } else { sender.asyncSendMessage( new EncodeObject(Collections.singletonList(body), attrs.toString(), idGenerator.getNextId(), this.getMsgtype(), false, groupId), - callback, msgUUID, timeout, timeUnit); + callback, msgUUID); } } } - public void asyncSendMessage(SendMessageCallback callback, List bodyList, String groupId, String streamId, - long dt, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException { - asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, false); - } - /** * async send a batch of messages * @@ -646,14 +634,11 @@ public void asyncSendMessage(SendMessageCallback callback, List bodyList * @param streamId streamId * @param dt data report time * @param msgUUID msg uuid - * @param timeout - * @param timeUnit * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ * @throws ProxysdkException */ public void asyncSendMessage(SendMessageCallback callback, List bodyList, - String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit, boolean isProxySend) throws ProxysdkException { + String groupId, String streamId, long dt, String msgUUID, boolean isProxySend) throws ProxysdkException { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) { throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); @@ -671,7 +656,7 @@ public void asyncSendMessage(SendMessageCallback callback, List bodyList isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend); encodeObject.setSupportLF(isSupportLF); - sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit); + sender.asyncSendMessage(encodeObject, callback, msgUUID); } else if (msgtype == 3 || msgtype == 5) { if (isProxySend) { proxySend = "&" + proxySend; @@ -682,24 +667,18 @@ public void asyncSendMessage(SendMessageCallback callback, List bodyList + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size() + proxySend, idGenerator.getNextId(), this.getMsgtype(), true, groupId), - callback, msgUUID, timeout, timeUnit); + callback, msgUUID); } else { sender.asyncSendMessage( new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cnt=" + bodyList.size() + proxySend, idGenerator.getNextId(), this.getMsgtype(), false, groupId), - callback, msgUUID, timeout, timeUnit); + callback, msgUUID); } } } - public void asyncSendMessage(SendMessageCallback callback, - List bodyList, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit, Map extraAttrMap) throws ProxysdkException { - asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false); - } - /** * async send a batch of messages * @@ -709,15 +688,12 @@ public void asyncSendMessage(SendMessageCallback callback, * @param streamId streamId * @param dt data report time * @param msgUUID msg uuid - * @param timeout - * @param timeUnit * @param extraAttrMap extra attributes * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ * @throws ProxysdkException */ public void asyncSendMessage(SendMessageCallback callback, List bodyList, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit, Map extraAttrMap, boolean isProxySend) throws ProxysdkException { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid( @@ -739,17 +715,17 @@ public void asyncSendMessage(SendMessageCallback callback, isCompress, isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, attrs.toString()); encodeObject.setSupportLF(isSupportLF); - sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit); + sender.asyncSendMessage(encodeObject, callback, msgUUID); } else if (msgtype == 3 || msgtype == 5) { attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId) .append("&dt=").append(dt).append("&cnt=").append(bodyList.size()); if (isCompress) { attrs.append("&cp=snappy"); sender.asyncSendMessage(new EncodeObject(bodyList, attrs.toString(), idGenerator.getNextId(), - this.getMsgtype(), true, groupId), callback, msgUUID, timeout, timeUnit); + this.getMsgtype(), true, groupId), callback, msgUUID); } else { sender.asyncSendMessage(new EncodeObject(bodyList, attrs.toString(), idGenerator.getNextId(), - this.getMsgtype(), false, groupId), callback, msgUUID, timeout, timeUnit); + this.getMsgtype(), false, groupId), callback, msgUUID); } } @@ -767,8 +743,8 @@ public void asyncSendMessage(SendMessageCallback callback, @Override public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback) throws ProxysdkException { - this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, System.currentTimeMillis(), - idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT); + this.asyncSendMessage(callback, body, inlongGroupId, + inlongStreamId, System.currentTimeMillis(), idGenerator.getNextId()); } /** @@ -783,8 +759,8 @@ public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] */ public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback, boolean isProxySend) throws ProxysdkException { - this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, System.currentTimeMillis(), - idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT, isProxySend); + this.asyncSendMessage(callback, body, inlongGroupId, + inlongStreamId, System.currentTimeMillis(), idGenerator.getNextId(), isProxySend); } /** @@ -799,8 +775,8 @@ public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] @Override public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List bodyList, SendMessageCallback callback) throws ProxysdkException { - this.asyncSendMessage(callback, bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(), - idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT); + this.asyncSendMessage(callback, bodyList, inlongGroupId, + inlongStreamId, System.currentTimeMillis(), idGenerator.getNextId()); } /** @@ -815,8 +791,8 @@ public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List bodyList, SendMessageCallback callback, boolean isProxySend) throws ProxysdkException { - this.asyncSendMessage(callback, bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(), - idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT, isProxySend); + this.asyncSendMessage(callback, bodyList, inlongGroupId, + inlongStreamId, System.currentTimeMillis(), idGenerator.getNextId(), isProxySend); } private void addIndexCnt(String groupId, String streamId, long cnt) { @@ -837,13 +813,4 @@ private void shutdownInternalThreads() { indexCol.shutDown(); MANAGER_FETCHER_THREAD_STARTED.set(false); } - - public void close() { - LOGGER.info("ready to close resources, may need five minutes !"); - if (sender.getClusterId() != -1) { - CACHE_SENDER.remove(sender.getClusterId()); - } - sender.close(); - shutdownInternalThreads(); - } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java index e980e65974..2a4ae6313a 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java @@ -26,6 +26,8 @@ public interface MessageSender { + void close(); + /** * This method provides a synchronized function which you want to send data without packing * @@ -137,5 +139,4 @@ void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, void asyncSendMessage(String inlongGroupId, String inlongStreamId, List bodyList, SendMessageCallback callback) throws ProxysdkException; - void close(); } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java index 7af91dba34..6a80fbf21c 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java @@ -39,19 +39,24 @@ public class ProxyClientConfig { private String configStoreBasePath = System.getProperty("user.dir"); // max expired time for config cache. private long configCacheExpiredMs = ConfigConstants.VAL_DEF_CACHE_CONFIG_EXPIRED_MS; + // max expired time for config query failure status + private long configFailStatusExpiredMs = ConfigConstants.VAL_DEF_CONFIG_FAIL_STATUS_EXPIRED_MS; // nodes force choose interval ms private long forceReChooseInrMs = ConfigConstants.VAL_DEF_FORCE_CHOOSE_INR_MS; private boolean enableAuthentication = false; private String authSecretId = ""; private String authSecretKey = ""; private String inlongGroupId; + private String regionName = ConfigConstants.VAL_DEF_REGION_NAME; private int aliveConnections = ConfigConstants.VAL_DEF_ALIVE_CONNECTIONS; + // data encrypt info + private boolean enableDataEncrypt = false; + private String rsaPubKeyUrl = ""; + private String userName = ""; private int syncThreadPoolSize; private int asyncCallbackSize; - private boolean isNeedDataEncry = false; - private String rsaPubKeyUrl = ""; private String tlsServerCertFilePathAndName; private String tlsServerKey; private String tlsVersion = "TLSv1.2"; @@ -240,6 +245,15 @@ public void setConfigCacheExpiredMs(long configCacheExpiredMs) { this.configCacheExpiredMs = configCacheExpiredMs; } + public long getConfigFailStatusExpiredMs() { + return configFailStatusExpiredMs; + } + + public void setConfigFailStatusExpiredMs(long configFailStatusExpiredMs) { + this.configFailStatusExpiredMs = + Math.min(configFailStatusExpiredMs, ConfigConstants.VAL_MAX_CONFIG_FAIL_STATUS_EXPIRED_MS); + } + public long getForceReChooseInrMs() { return forceReChooseInrMs; } @@ -253,6 +267,16 @@ public String getInlongGroupId() { return inlongGroupId; } + public String getRegionName() { + return regionName; + } + + public void setRegionName(String regionName) { + if (StringUtils.isNotBlank(regionName)) { + this.regionName = regionName.trim(); + } + } + public int getAliveConnections() { return this.aliveConnections; } @@ -262,6 +286,33 @@ public void setAliveConnections(int aliveConnections) { Math.max(ConfigConstants.VAL_MIN_ALIVE_CONNECTIONS, aliveConnections); } + public boolean isEnableDataEncrypt() { + return enableDataEncrypt; + } + + public String getRsaPubKeyUrl() { + return rsaPubKeyUrl; + } + + public String getUserName() { + return userName; + } + + public void enableDataEncrypt(boolean needDataEncrypt, String userName, String rsaPubKeyUrl) { + this.enableDataEncrypt = needDataEncrypt; + if (!this.enableDataEncrypt) { + return; + } + if (StringUtils.isBlank(userName)) { + throw new IllegalArgumentException("userName is Blank!"); + } + if (StringUtils.isBlank(rsaPubKeyUrl)) { + throw new IllegalArgumentException("rsaPubKeyUrl is Blank!"); + } + this.userName = userName.trim(); + this.rsaPubKeyUrl = rsaPubKeyUrl.trim(); + } + public String getTlsServerCertFilePathAndName() { return tlsServerCertFilePathAndName; } @@ -370,14 +421,6 @@ public void setMaxMsgInFlightPerConn(long maxMsgInFlightPerConn) { this.maxMsgInFlightPerConn = maxMsgInFlightPerConn; } - public String getRsaPubKeyUrl() { - return rsaPubKeyUrl; - } - - public boolean isNeedDataEncry() { - return isNeedDataEncry; - } - public void setHttpsInfo(String tlsServerCertFilePathAndName, String tlsServerKey) { if (StringUtils.isBlank(tlsServerCertFilePathAndName)) { throw new IllegalArgumentException("tlsServerCertFilePathAndName is Blank!"); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java index e37b9b2c0a..ebf76464ea 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java @@ -62,6 +62,7 @@ public Map getHostMap() { public void setHostMap(Map hostMap) { this.hostMap = hostMap; } + public boolean isNodesEmpty() { return this.hostMap.isEmpty(); } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java index 033395667b..97952eb1ff 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java @@ -74,9 +74,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -90,17 +91,22 @@ public class ProxyConfigManager extends Thread { private static final Logger logger = LoggerFactory.getLogger(ProxyConfigManager.class); private static final LogCounter exptCounter = new LogCounter(10, 100000, 60 * 1000L); private static final LogCounter parseCounter = new LogCounter(10, 100000, 60 * 1000L); + private static final Map> fetchFailProxyMap = + new ConcurrentHashMap<>(); + private static final Map> fetchFailEncryptMap = + new ConcurrentHashMap<>(); private static final ReentrantReadWriteLock fileRw = new ReentrantReadWriteLock(); private final String callerId; - private ProxyClientConfig clientConfig; private final Gson gson = new Gson(); private final ClientMgr clientManager; private final ThreadLocalRandom random = ThreadLocalRandom.current(); private final AtomicBoolean shutDown = new AtomicBoolean(false); // proxy configure info + private ProxyClientConfig clientConfig = null; private String localProxyConfigStoreFile; private String proxyConfigVisitUrl; + private String proxyQueryFailKey; private String proxyConfigCacheFile; private List proxyInfoList = new ArrayList<>(); private int oldStat = 0; @@ -108,6 +114,7 @@ public class ProxyConfigManager extends Thread { private long lstUpdateTime = 0; // encrypt configure info private String encryptConfigVisitUrl; + private String encryptQueryFailKey; private String encryptConfigCacheFile; private EncryptConfigEntry userEncryptConfigEntry; @@ -118,7 +125,9 @@ public ProxyConfigManager(ProxyClientConfig configure) { public ProxyConfigManager(String callerId, ProxyClientConfig configure, ClientMgr clientManager) { this.callerId = callerId; this.clientManager = clientManager; - this.storeAndBuildMetaConfigure(configure); + if (configure != null) { + this.storeAndBuildMetaConfigure(configure); + } if (this.clientManager != null) { this.setName("ConfigManager-" + this.callerId); logger.info("ConfigManager({}) started, groupId={}", @@ -130,19 +139,20 @@ public ProxyConfigManager(String callerId, ProxyClientConfig configure, ClientMg * Update proxy client configure for query case * * @param configure proxy client configure - * @throws Exception exception + * @return process result */ - public void updProxyClientConfig(ProxyClientConfig configure) throws Exception { + public Tuple2 updProxyClientConfig(ProxyClientConfig configure) { + if (this.shutDown.get()) { + return new Tuple2<>(false, "SDK has shutdown!"); + } if (configure == null) { - throw new Exception("ProxyClientConfig is null"); + return new Tuple2<>(false, "ProxyClientConfig is null"); } if (this.clientManager != null) { - throw new Exception("Not allowed for non meta-query case!"); - } - if (shutDown.get()) { - return; + return new Tuple2<>(false, "Not allowed for non meta-query case!"); } this.storeAndBuildMetaConfigure(configure); + return new Tuple2<>(true, "OK"); } public void shutDown() { @@ -166,6 +176,9 @@ public Tuple2 getGroupIdConfigure(boolean needRetry) t if (shutDown.get()) { return new Tuple2<>(null, "SDK has shutdown!"); } + if (clientConfig == null) { + return new Tuple2<>(null, "Configure not initialized!"); + } if (clientConfig.isOnlyUseLocalProxyConfig()) { return getLocalProxyListFromFile(this.localProxyConfigStoreFile); } else { @@ -183,7 +196,7 @@ public Tuple2 getGroupIdConfigure(boolean needRetry) t break; } // sleep then retry - TimeUnit.MILLISECONDS.sleep(500); + Thread.sleep(500L); } while (++retryCount < clientConfig.getConfigSyncMaxRetryIfFail()); } if (shutDown.get()) { @@ -205,12 +218,15 @@ public Tuple2 getGroupIdConfigure(boolean needRetry) t * @throws Exception ex */ public Tuple2 getEncryptConfigure(boolean needRetry) throws Exception { - if (!clientConfig.isNeedDataEncry()) { + if (!clientConfig.isEnableDataEncrypt()) { return new Tuple2<>(null, "Not need data encrypt!"); } if (shutDown.get()) { return new Tuple2<>(null, "SDK has shutdown!"); } + if (clientConfig == null) { + return new Tuple2<>(null, "Configure not initialized!"); + } EncryptConfigEntry encryptEntry = this.userEncryptConfigEntry; if (encryptEntry != null) { return new Tuple2<>(encryptEntry, "Ok"); @@ -228,7 +244,7 @@ public Tuple2 getEncryptConfigure(boolean needRetry) break; } // sleep then retry - TimeUnit.MILLISECONDS.sleep(500); + Thread.sleep(500L); } while (++retryCount < clientConfig.getConfigSyncMaxRetryIfFail()); } if (shutDown.get()) { @@ -258,7 +274,7 @@ public void run() { } } // update encrypt configure - if (clientConfig.isNeedDataEncry()) { + if (clientConfig.isEnableDataEncrypt()) { try { doEncryptConfigEntryQueryWork(); } catch (Throwable ex) { @@ -288,7 +304,7 @@ public void run() { * @throws Exception */ public void doProxyEntryQueryWork() throws Exception { - if (shutDown.get()) { + if (shutDown.get() || this.clientManager == null) { return; } /* Request the configuration from manager. */ @@ -306,7 +322,7 @@ public void doProxyEntryQueryWork() throws Exception { break; } // sleep then retry. - TimeUnit.SECONDS.sleep(2); + Thread.sleep(2000L); } while (++retryCnt < this.clientConfig.getConfigSyncMaxRetryIfFail() && !shutDown.get()); if (shutDown.get()) { return; @@ -340,7 +356,7 @@ public void doProxyEntryQueryWork() throws Exception { } private void doEncryptConfigEntryQueryWork() throws Exception { - if (shutDown.get()) { + if (shutDown.get() || this.clientManager == null) { return; } int retryCount = 0; @@ -351,7 +367,7 @@ private void doEncryptConfigEntryQueryWork() throws Exception { break; } // sleep then retry - TimeUnit.MILLISECONDS.sleep(500); + Thread.sleep(500L); } while (++retryCount < clientConfig.getConfigSyncMaxRetryIfFail()); if (shutDown.get()) { return; @@ -380,15 +396,21 @@ public Tuple2 getLocalProxyListFromFile(String filePat if (StringUtils.isBlank(strRet)) { return new Tuple2<>(null, "Blank configure local file from " + filePath); } - return getProxyConfigEntry(strRet); + return getProxyConfigEntry(false, strRet); } private Tuple2 requestProxyEntryQuietly() { - List params = buildProxyNodeQueryParams(); + // check cache failure + String qryResult = getManagerQryResultInFailStatus(true); + if (qryResult != null) { + return new Tuple2<>(null, "Query fail(" + qryResult + ") just now, please retry later!"); + } // request meta info from manager + List params = buildProxyNodeQueryParams(); logger.debug("ConfigManager({}) request configure to manager({}), param={}", this.callerId, this.proxyConfigVisitUrl, params); - Tuple2 queryResult = requestConfiguration(this.proxyConfigVisitUrl, params); + Tuple2 queryResult = + requestConfiguration(true, this.proxyConfigVisitUrl, params); if (!queryResult.getF0()) { return new Tuple2<>(null, queryResult.getF1()); } @@ -396,12 +418,20 @@ private Tuple2 requestProxyEntryQuietly() { logger.debug("ConfigManager({}) received configure, from manager({}), groupId={}, result={}", callerId, proxyConfigVisitUrl, clientConfig.getInlongGroupId(), queryResult.getF1()); try { - return getProxyConfigEntry(queryResult.getF1()); + Tuple2 parseResult = + getProxyConfigEntry(true, queryResult.getF1()); + if (parseResult.getF0() == null) { + bookManagerQryFailStatus(true, parseResult.getF1()); + } else { + rmvManagerQryFailStatus(true); + } + return parseResult; } catch (Throwable ex) { if (exptCounter.shouldPrint()) { logger.warn("ConfigManager({}) parse failure, from manager({}), groupId={}, result={}", callerId, proxyConfigVisitUrl, clientConfig.getInlongGroupId(), queryResult.getF1(), ex); } + bookManagerQryFailStatus(true, ex.getMessage()); return new Tuple2<>(null, ex.getMessage()); } } @@ -518,16 +548,23 @@ private Tuple2 tryToReadCacheProxyEntry() { } private Tuple2 requestPubKeyFromManager() { - List params = buildPubKeyQueryParams(); + // check cache failure + String qryResult = getManagerQryResultInFailStatus(false); + if (qryResult != null) { + return new Tuple2<>(null, "Query fail(" + qryResult + ") just now, please retry later!"); + } // request meta info from manager + List params = buildPubKeyQueryParams(); logger.debug("ConfigManager({}) request pubkey to manager({}), param={}", this.callerId, this.encryptConfigVisitUrl, params); - Tuple2 queryResult = requestConfiguration(this.encryptConfigVisitUrl, params); + Tuple2 queryResult = + requestConfiguration(false, this.encryptConfigVisitUrl, params); if (!queryResult.getF0()) { return new Tuple2<>(null, queryResult.getF1()); } logger.debug("ConfigManager({}) received pubkey from manager({}), result={}", this.callerId, this.encryptConfigVisitUrl, queryResult.getF1()); + String errorMsg; JsonObject pubKeyConf; try { pubKeyConf = JsonParser.parseString(queryResult.getF1()).getAsJsonObject(); @@ -536,62 +573,72 @@ private Tuple2 requestPubKeyFromManager() { logger.warn("ConfigManager({}) parse failure, secretId={}, config={}!", this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); } - return new Tuple2<>(null, "parse pubkey failure:" + ex.getMessage()); + errorMsg = "parse pubkey failure:" + ex.getMessage(); + bookManagerQryFailStatus(false, errorMsg); + return new Tuple2<>(null, errorMsg); } if (pubKeyConf == null) { - return new Tuple2<>(null, "No public key information"); + errorMsg = "No public key information"; + bookManagerQryFailStatus(false, errorMsg); + return new Tuple2<>(null, errorMsg); } - if (!pubKeyConf.has("resultCode")) { - if (parseCounter.shouldPrint()) { - logger.warn("ConfigManager({}) config failure: resultCode field not exist, secretId={}, config={}!", - this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); - } - return new Tuple2<>(null, "resultCode field not exist"); - } - int resultCode = pubKeyConf.get("resultCode").getAsInt(); - if (resultCode != 0) { - if (parseCounter.shouldPrint()) { - logger.warn("ConfigManager({}) config failure: resultCode != 0, secretId={}, config={}!", - this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); - } - return new Tuple2<>(null, "resultCode != 0!"); - } - if (!pubKeyConf.has("resultData")) { - if (parseCounter.shouldPrint()) { - logger.warn("ConfigManager({}) config failure: resultData field not exist, secretId={}, config={}!", - this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); - } - return new Tuple2<>(null, "resultData field not exist"); - } - JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject(); - if (resultData != null) { - String publicKey = resultData.get("publicKey").getAsString(); - if (StringUtils.isBlank(publicKey)) { + try { + if (!pubKeyConf.has("resultCode")) { if (parseCounter.shouldPrint()) { - logger.warn("ConfigManager({}) config failure: publicKey is blank, secretId={}, config={}!", + logger.warn("ConfigManager({}) config failure: resultCode field not exist, secretId={}, config={}!", this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); } - return new Tuple2<>(null, "publicKey is blank!"); + throw new Exception("resultCode field not exist"); } - String username = resultData.get("username").getAsString(); - if (StringUtils.isBlank(username)) { + int resultCode = pubKeyConf.get("resultCode").getAsInt(); + if (resultCode != 0) { if (parseCounter.shouldPrint()) { - logger.warn("ConfigManager({}) config failure: username is blank, secretId={}, config={}!", + logger.warn("ConfigManager({}) config failure: resultCode != 0, secretId={}, config={}!", this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); } - return new Tuple2<>(null, "username is blank!"); + throw new Exception("resultCode != 0!"); } - String versionStr = resultData.get("version").getAsString(); - if (StringUtils.isBlank(versionStr)) { + if (!pubKeyConf.has("resultData")) { if (parseCounter.shouldPrint()) { - logger.warn("ConfigManager({}) config failure: version is blank, secretId={}, config={}!", + logger.warn("ConfigManager({}) config failure: resultData field not exist, secretId={}, config={}!", this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); } - return new Tuple2<>(null, "version is blank!"); + throw new Exception("resultData field not exist"); + } + JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject(); + if (resultData != null) { + String publicKey = resultData.get("publicKey").getAsString(); + if (StringUtils.isBlank(publicKey)) { + if (parseCounter.shouldPrint()) { + logger.warn("ConfigManager({}) config failure: publicKey is blank, secretId={}, config={}!", + this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); + } + throw new Exception("publicKey is blank!"); + } + String username = resultData.get("username").getAsString(); + if (StringUtils.isBlank(username)) { + if (parseCounter.shouldPrint()) { + logger.warn("ConfigManager({}) config failure: username is blank, secretId={}, config={}!", + this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); + } + throw new Exception("username is blank!"); + } + String versionStr = resultData.get("version").getAsString(); + if (StringUtils.isBlank(versionStr)) { + if (parseCounter.shouldPrint()) { + logger.warn("ConfigManager({}) config failure: version is blank, secretId={}, config={}!", + this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); + } + throw new Exception("version is blank!"); + } + rmvManagerQryFailStatus(false); + return new Tuple2<>(new EncryptConfigEntry(username, versionStr, publicKey), "Ok"); } - return new Tuple2<>(new EncryptConfigEntry(username, versionStr, publicKey), "Ok"); + throw new Exception("resultData value is null!"); + } catch (Throwable ex) { + bookManagerQryFailStatus(false, ex.getMessage()); + return new Tuple2<>(null, ex.getMessage()); } - return new Tuple2<>(null, "resultData value is null!"); } private void updateEncryptConfigEntry(EncryptConfigEntry newEncryptEntry) { @@ -674,7 +721,8 @@ private void writeCachePubKeyEntryFile(EncryptConfigEntry entry) { } /* Request new configurations from Manager. */ - private Tuple2 requestConfiguration(String url, List params) { + private Tuple2 requestConfiguration( + boolean queryProxyInfo, String url, List params) { HttpParams myParams = new BasicHttpParams(); HttpConnectionParams.setConnectionTimeout(myParams, clientConfig.getManagerConnTimeoutMs()); HttpConnectionParams.setSoTimeout(myParams, clientConfig.getManagerSocketTimeoutMs()); @@ -702,13 +750,20 @@ private Tuple2 requestConfiguration(String url, List(false, response.getStatusLine().getStatusCode() - + ":" + response.getStatusLine().getStatusCode()); + errMsg = response.getStatusLine().getStatusCode() + + ":" + response.getStatusLine().getReasonPhrase(); + if (response.getStatusLine().getStatusCode() >= 500) { + bookManagerQryFailStatus(queryProxyInfo, errMsg); + } + return new Tuple2<>(false, errMsg); } String returnStr = EntityUtils.toString(response.getEntity()); if (StringUtils.isBlank(returnStr)) { - return new Tuple2<>(false, "query result is blank!"); + errMsg = "server return blank entity!"; + bookManagerQryFailStatus(queryProxyInfo, errMsg); + return new Tuple2<>(false, errMsg); } return new Tuple2<>(true, returnStr); } catch (Throwable ex) { @@ -755,6 +810,11 @@ private void storeAndBuildMetaConfigure(ProxyClientConfig config) { .append(ConfigConstants.MANAGER_DATAPROXY_API).append(clientConfig.getInlongGroupId()) .toString(); strBuff.delete(0, strBuff.length()); + this.proxyQueryFailKey = strBuff + .append("proxy:").append(clientConfig.getInlongGroupId()) + .append("#").append(clientConfig.getRegionName()) + .append("#").append(clientConfig.getProtocolType()).toString(); + strBuff.delete(0, strBuff.length()); this.localProxyConfigStoreFile = strBuff .append(clientConfig.getConfigStoreBasePath()) .append(ConfigConstants.META_STORE_SUB_DIR) @@ -770,6 +830,9 @@ private void storeAndBuildMetaConfigure(ProxyClientConfig config) { .toString(); strBuff.delete(0, strBuff.length()); this.encryptConfigVisitUrl = clientConfig.getRsaPubKeyUrl(); + this.encryptQueryFailKey = strBuff + .append("encrypt:").append(clientConfig.getUserName()).toString(); + strBuff.delete(0, strBuff.length()); this.encryptConfigCacheFile = strBuff .append(clientConfig.getConfigStoreBasePath()) .append(ConfigConstants.META_STORE_SUB_DIR) @@ -795,23 +858,85 @@ private List buildProxyNodeQueryParams() { private List buildPubKeyQueryParams() { List params = new ArrayList<>(); params.add(new BasicNameValuePair("operation", "query")); - params.add(new BasicNameValuePair("username", clientConfig.getAuthSecretId())); + params.add(new BasicNameValuePair("username", clientConfig.getUserName())); return params; } - private Tuple2 getProxyConfigEntry(String strRet) { - DataProxyNodeResponse proxyCluster; - try { - proxyCluster = gson.fromJson(strRet, DataProxyNodeResponse.class); - } catch (Throwable ex) { - if (parseCounter.shouldPrint()) { - logger.warn("ConfigManager({}) parse exception, groupId={}, config={}", - this.callerId, clientConfig.getInlongGroupId(), strRet, ex); + private void bookManagerQryFailStatus(boolean proxyQry, String errMsg) { + if (proxyQry) { + fetchFailProxyMap.put(proxyQueryFailKey, + new Tuple2<>(new AtomicLong(System.currentTimeMillis()), errMsg)); + } else { + fetchFailEncryptMap.put(encryptQueryFailKey, + new Tuple2<>(new AtomicLong(System.currentTimeMillis()), errMsg)); + } + } + + private void rmvManagerQryFailStatus(boolean proxyQry) { + if (proxyQry) { + fetchFailProxyMap.remove(proxyQueryFailKey); + } else { + fetchFailEncryptMap.remove(encryptQueryFailKey); + } + } + + private String getManagerQryResultInFailStatus(boolean proxyQry) { + if (clientConfig.getConfigFailStatusExpiredMs() <= 0) { + return null; + } + Tuple2 queryResult; + if (proxyQry) { + queryResult = fetchFailProxyMap.get(proxyQueryFailKey); + } else { + queryResult = fetchFailEncryptMap.get(encryptQueryFailKey); + } + if (queryResult != null + && (System.currentTimeMillis() - queryResult.getF0().get() < clientConfig + .getConfigFailStatusExpiredMs())) { + return queryResult.getF1(); + } + return null; + } + + private Tuple2 getProxyConfigEntry(boolean fromManager, String strRet) { + DataProxyNodeResponse proxyNodeConfig; + if (fromManager) { + ProxyClusterConfig clusterConfig; + try { + clusterConfig = gson.fromJson(strRet, ProxyClusterConfig.class); + } catch (Throwable ex) { + if (parseCounter.shouldPrint()) { + logger.warn("ConfigManager({}) parse exception, groupId={}, config={}", + this.callerId, clientConfig.getInlongGroupId(), strRet, ex); + } + return new Tuple2<>(null, "parse failure:" + ex.getMessage()); + } + if (clusterConfig == null) { + return new Tuple2<>(null, "content parse result is null!"); + } + if (!clusterConfig.isSuccess()) { + return new Tuple2<>(null, clusterConfig.getErrMsg()); + } + if (clusterConfig.getData() == null) { + return new Tuple2<>(null, "return data content is null!"); + } + proxyNodeConfig = clusterConfig.getData(); + } else { + try { + proxyNodeConfig = gson.fromJson(strRet, DataProxyNodeResponse.class); + } catch (Throwable ex) { + if (parseCounter.shouldPrint()) { + logger.warn("ConfigManager({}) parse local file exception, groupId={}, config={}", + this.callerId, clientConfig.getInlongGroupId(), strRet, ex); + } + return new Tuple2<>(null, "parse file failure:" + ex.getMessage()); + } + if (proxyNodeConfig == null) { + return new Tuple2<>(null, "file content parse result is null!"); } - return new Tuple2<>(null, "parse failure:" + ex.getMessage()); } // parse nodeList - List nodeList = proxyCluster.getNodeList(); + List nodeList = proxyNodeConfig.getNodeList(); if (CollectionUtils.isEmpty(nodeList)) { return new Tuple2<>(null, "nodeList is empty!"); } @@ -836,23 +961,23 @@ private Tuple2 getProxyConfigEntry(String strRet) { } // parse clusterId int clusterId = -1; - if (ObjectUtils.isNotEmpty(proxyCluster.getClusterId())) { - clusterId = proxyCluster.getClusterId(); + if (ObjectUtils.isNotEmpty(proxyNodeConfig.getClusterId())) { + clusterId = proxyNodeConfig.getClusterId(); } // parse load int load = ConfigConstants.LOAD_THRESHOLD; - if (ObjectUtils.isNotEmpty(proxyCluster.getLoad())) { - load = proxyCluster.getLoad() > 200 ? 200 : (Math.max(proxyCluster.getLoad(), 0)); + if (ObjectUtils.isNotEmpty(proxyNodeConfig.getLoad())) { + load = proxyNodeConfig.getLoad() > 200 ? 200 : (Math.max(proxyNodeConfig.getLoad(), 0)); } // parse isIntranet boolean isIntranet = true; - if (ObjectUtils.isNotEmpty(proxyCluster.getIsIntranet())) { - isIntranet = proxyCluster.getIsIntranet() == 1; + if (ObjectUtils.isNotEmpty(proxyNodeConfig.getIsIntranet())) { + isIntranet = proxyNodeConfig.getIsIntranet() == 1; } // parse isSwitch int isSwitch = 0; - if (ObjectUtils.isNotEmpty(proxyCluster.getIsSwitch())) { - isSwitch = proxyCluster.getIsSwitch(); + if (ObjectUtils.isNotEmpty(proxyNodeConfig.getIsSwitch())) { + isSwitch = proxyNodeConfig.getIsSwitch(); } // build ProxyConfigEntry ProxyConfigEntry proxyEntry = new ProxyConfigEntry(); @@ -863,7 +988,7 @@ private Tuple2 getProxyConfigEntry(String strRet) { proxyEntry.setSwitchStat(isSwitch); proxyEntry.setLoad(load); proxyEntry.setMaxPacketLength( - proxyCluster.getMaxPacketLength() != null ? proxyCluster.getMaxPacketLength() : -1); + proxyNodeConfig.getMaxPacketLength() != null ? proxyNodeConfig.getMaxPacketLength() : -1); return new Tuple2<>(proxyEntry, "ok"); } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java index 0444edbfd5..ea9fc15a51 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java @@ -345,7 +345,7 @@ public void updateProxyInfoList(boolean nodeChanged, List newNodes) { if (!realHosts.isEmpty()) { break; } - Thread.sleep(1000); + Thread.sleep(1000L); } while (--maxCycleCnt > 0); // update active nodes if (realHosts.isEmpty()) { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java index 3699213176..009d513bdb 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java @@ -19,7 +19,6 @@ import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class QueueObject { @@ -32,11 +31,11 @@ public class QueueObject { private final int size; public QueueObject(NettyClient client, long sendTimeInMillis, - SendMessageCallback callback, int size, long timeout, TimeUnit timeUnit) { + SendMessageCallback callback, int size, long timeoutMs) { this.client = client; this.sendTimeInMillis = sendTimeInMillis; this.callback = callback; - this.timeoutInMillis = TimeUnit.MILLISECONDS.convert(timeout, timeUnit); + this.timeoutInMillis = timeoutMs; this.size = size; } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java index 13920fb7a6..b56781c0ce 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java @@ -97,7 +97,7 @@ public Sender(ProxyClientConfig configure, ThreadFactory selfDefineFactory) thro if (!configure.isEnableAuthentication()) { throw new Exception("In OutNetwork isNeedAuthentication must be true!"); } - if (!configure.isNeedDataEncry()) { + if (!configure.isEnableDataEncrypt()) { throw new Exception("In OutNetwork isNeedDataEncry must be true!"); } } @@ -201,15 +201,13 @@ public SendResult syncSendMessage(EncodeObject encodeObject, String msgUUID) { clientMgr.getStreamIdNum(encodeObject.getStreamId())); } } - if (this.configure.isNeedDataEncry()) { + if (this.configure.isEnableDataEncrypt()) { encodeObject.setEncryptEntry(true, configure.getAuthSecretId(), clientMgr.getEncryptConfigureInfo()); - } else { - encodeObject.setEncryptEntry(false, null, null); } encodeObject.setMsgUUID(msgUUID); - SyncMessageCallable callable = new SyncMessageCallable(clientResult.getF1(), - encodeObject, configure.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); + SyncMessageCallable callable = new SyncMessageCallable( + clientResult.getF1(), encodeObject, configure.getRequestTimeoutMs()); syncCallables.put(encodeObject.getMessageId(), callable); Future future = threadPool.submit(callable); message = future.get(configure.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); @@ -309,8 +307,8 @@ private boolean validAttribute(String attr) { /** * Following methods used by asynchronously message sending. */ - public void asyncSendMessage(EncodeObject encodeObject, SendMessageCallback callback, String msgUUID, - long timeout, TimeUnit timeUnit) throws ProxysdkException { + public void asyncSendMessage(EncodeObject encodeObject, + SendMessageCallback callback, String msgUUID) throws ProxysdkException { if (!started.get()) { if (callback != null) { callback.onMessageAck(SendResult.SENDER_CLOSED); @@ -381,8 +379,8 @@ public void asyncSendMessage(EncodeObject encodeObject, SendMessageCallback call } int size = 1; if (currentBufferSize.incrementAndGet() >= asyncCallbackMaxSize) { - currentBufferSize.decrementAndGet(); clientResult.getF1().decMsgInFlight(); + currentBufferSize.decrementAndGet(); if (callback != null) { callback.onMessageAck(SendResult.ASYNC_CALLBACK_BUFFER_FULL); return; @@ -393,7 +391,8 @@ public void asyncSendMessage(EncodeObject encodeObject, SendMessageCallback call ConcurrentHashMap msgQueueMap = callbacks.computeIfAbsent(clientResult.getF1().getChannel(), (k) -> new ConcurrentHashMap<>()); QueueObject queueObject = msgQueueMap.putIfAbsent(encodeObject.getMessageId(), - new QueueObject(clientResult.getF1(), System.currentTimeMillis(), callback, size, timeout, timeUnit)); + new QueueObject(clientResult.getF1(), System.currentTimeMillis(), callback, + size, configure.getRequestTimeoutMs())); if (queueObject != null) { if (reqChkLoggCount.shouldPrint()) { logger.warn("Sender({}) found message id {} has existed.", @@ -407,11 +406,9 @@ public void asyncSendMessage(EncodeObject encodeObject, SendMessageCallback call clientMgr.getStreamIdNum(encodeObject.getStreamId())); } } - if (this.configure.isNeedDataEncry()) { + if (this.configure.isEnableDataEncrypt()) { encodeObject.setEncryptEntry(true, configure.getAuthSecretId(), clientMgr.getEncryptConfigureInfo()); - } else { - encodeObject.setEncryptEntry(false, null, null); } encodeObject.setMsgUUID(msgUUID); clientResult.getF1().write(encodeObject); @@ -501,7 +498,7 @@ public void notifyConnectionDisconnected(Channel channel) { } } - /* Deal with unexpected exception. only used for asyc send */ + /* Deal with unexpected exception. only used for async send */ public void waitForAckForChannel(Channel channel) { if (channel == null) { return; @@ -513,14 +510,14 @@ public void waitForAckForChannel(Channel channel) { } try { while (!queueObjMap.isEmpty()) { + if (System.currentTimeMillis() - startTime >= configure.getConCloseWaitPeriodMs()) { + break; + } try { - Thread.sleep(100); + Thread.sleep(100L); } catch (InterruptedException ex1) { // } - if (System.currentTimeMillis() - startTime >= configure.getConCloseWaitPeriodMs()) { - break; - } } } catch (Throwable ex) { if (exptCnt.shouldPrint()) { @@ -559,13 +556,17 @@ public ClientMgr getClientMgr() { return clientMgr; } + public ProxyClientConfig getConfigure() { + return configure; + } + private void checkCallbackList() { // max wait for 1 min try { long startTime = System.currentTimeMillis(); while (currentBufferSize.get() > 0 && System.currentTimeMillis() - startTime < configure.getConCloseWaitPeriodMs()) { - TimeUnit.MILLISECONDS.sleep(300); + Thread.sleep(300L); } if (currentBufferSize.get() > 0) { logger.warn("Sender({}) callback size({}) not empty, force quit!", diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java index bce2ad4468..fa3f2bf14e 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java @@ -37,17 +37,14 @@ public class SyncMessageCallable implements Callable { private final NettyClient client; private final CountDownLatch awaitLatch = new CountDownLatch(1); private final EncodeObject encodeObject; - private final long timeout; - private final TimeUnit timeUnit; + private final long timeoutMs; private SendResult message; - public SyncMessageCallable(NettyClient client, EncodeObject encodeObject, - long timeout, TimeUnit timeUnit) { + public SyncMessageCallable(NettyClient client, EncodeObject encodeObject, long timeoutMs) { this.client = client; this.encodeObject = encodeObject; - this.timeout = timeout; - this.timeUnit = timeUnit; + this.timeoutMs = timeoutMs; } public void update(SendResult message) { @@ -61,7 +58,7 @@ public SendResult call() throws Exception { return SendResult.WRITE_OVER_WATERMARK; } ChannelFuture channelFuture = client.write(encodeObject); - awaitLatch.await(timeout, timeUnit); + awaitLatch.await(timeoutMs, TimeUnit.MILLISECONDS); } catch (Throwable ex) { if (exptCnt.shouldPrint()) { logger.warn("SyncMessageCallable write data throw exception", ex); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java index 39f8562b52..4d5459b8e1 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java @@ -193,8 +193,7 @@ private void tryToSendMetricToManager(EncodeObject encodeObject, MetricSendCallB callBack.increaseRetry(); try { if (callBack.getRetryCount() < 4) { - sender.asyncSendMessage(encodeObject, callBack, - String.valueOf(System.currentTimeMillis()), 20, TimeUnit.SECONDS); + sender.asyncSendMessage(encodeObject, callBack, String.valueOf(System.currentTimeMillis())); } else { logger.error("Send metric failure: {}", encodeObject.getBodylist()); } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java index f53af9b7aa..5211ba9907 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java @@ -30,7 +30,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; /** * Daemon threads to check timeout for asynchronous callback. @@ -162,7 +161,6 @@ public void run() { checkMessageIdBasedCallbacks(entry.getKey(), entry.getValue()); } checkTimeoutChannel(); - TimeUnit.SECONDS.sleep(1); } catch (Throwable ex) { if (exptCnt.shouldPrint()) { logger.warn("TimeoutScanThread({}) throw exception", sender.getInstanceId(), ex); @@ -172,6 +170,14 @@ public void run() { logger.info("TimeoutScanThread({}) scan, currentBufferSize={}", sender.getInstanceId(), sender.getCurrentBufferSize().get()); } + if (bShutDown) { + break; + } + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + // + } } logger.info("TimeoutScanThread({}) thread existed !", sender.getInstanceId()); }