From 39d798a9d47ed3f5d26049b10f49b1e53950a413 Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Thu, 5 Dec 2024 20:17:13 +0800 Subject: [PATCH] [INLONG-11564][SDK] DataProxy SDK Implementation Optimization --- .../sdk/dataproxy/DefaultMessageSender.java | 2 - .../sdk/dataproxy/ProxyClientConfig.java | 24 -- .../sdk/dataproxy/codec/ProtocolDecoder.java | 53 ++-- .../sdk/dataproxy/codec/ProtocolEncoder.java | 32 +- .../sdk/dataproxy/common/SendResult.java | 2 +- .../dataproxy/config/EncryptConfigEntry.java | 5 +- .../inlong/sdk/dataproxy/config/HostInfo.java | 4 +- .../dataproxy/config/ProxyConfigManager.java | 270 ++++++----------- .../network/SyncMessageCallable.java | 13 +- .../sdk/dataproxy/network/TimeScanObject.java | 4 +- .../threads/ManagerFetcherThread.java | 68 ----- .../sdk/dataproxy/utils/EventLoopUtil.java | 3 +- .../utils/ServiceDiscoveryUtils.java | 279 ------------------ 13 files changed, 153 insertions(+), 606 deletions(-) delete mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/ManagerFetcherThread.java delete mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ServiceDiscoveryUtils.java diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java index 6fa6f6ff9a8..dd22e63cced 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java @@ -29,7 +29,6 @@ import org.apache.inlong.sdk.dataproxy.network.Sender; import org.apache.inlong.sdk.dataproxy.network.SequentialID; import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread; -import org.apache.inlong.sdk.dataproxy.threads.ManagerFetcherThread; import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; import org.slf4j.Logger; @@ -52,7 +51,6 @@ public class DefaultMessageSender implements MessageSender { private static final ConcurrentHashMap CACHE_SENDER = new ConcurrentHashMap<>(); private static final AtomicBoolean MANAGER_FETCHER_THREAD_STARTED = new AtomicBoolean(false); - private static ManagerFetcherThread managerFetcherThread; private static final SequentialID idGenerator = new SequentialID(); private final Sender sender; private final IndexCollectThread indexCol; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java index e7a0f6a3e65..ce8a3b3b390 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java @@ -34,7 +34,6 @@ public class ProxyClientConfig { private String managerIP = ""; private String managerAddress; - private String managerIpLocalPath = System.getProperty("user.dir") + "/.inlong/.managerIps"; private String managerUrl = ""; private int proxyUpdateIntervalMinutes; private int proxyUpdateMaxRetry; @@ -54,7 +53,6 @@ public class ProxyClientConfig { private String authSecretKey; private String protocolType; - private boolean enableSaveManagerVIps = false; // metric configure private MetricConfig metricConfig = new MetricConfig(); @@ -211,28 +209,6 @@ public String getManagerIP() { return managerIP; } - public String getManagerIpLocalPath() { - return managerIpLocalPath; - } - - public void setManagerIpLocalPath(String managerIpLocalPath) throws ProxysdkException { - if (StringUtils.isEmpty(managerIpLocalPath)) { - throw new ProxysdkException("managerIpLocalPath is empty."); - } - if (managerIpLocalPath.charAt(managerIpLocalPath.length() - 1) == '/') { - managerIpLocalPath = managerIpLocalPath.substring(0, managerIpLocalPath.length() - 1); - } - this.managerIpLocalPath = managerIpLocalPath + "/.managerIps"; - } - - public boolean isEnableSaveManagerVIps() { - return enableSaveManagerVIps; - } - - public void setEnableSaveManagerVIps(boolean enable) { - this.enableSaveManagerVIps = enable; - } - public String getConfStoreBasePath() { return confStoreBasePath; } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java index 2038a8b8d68..b7c31cba79e 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java @@ -17,6 +17,8 @@ package org.apache.inlong.sdk.dataproxy.codec; +import org.apache.inlong.sdk.dataproxy.utils.LogCounter; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; @@ -29,18 +31,19 @@ public class ProtocolDecoder extends MessageToMessageDecoder { - private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoder.class); - + private static final Logger logger = LoggerFactory.getLogger(ProtocolDecoder.class); + private static final LogCounter decExptCounter = new LogCounter(10, 200000, 60 * 1000L); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { buffer.markReaderIndex(); // totallen int totalLen = buffer.readInt(); - LOGGER.debug("decode totalLen : {}", totalLen); if (totalLen != buffer.readableBytes()) { - LOGGER.error("totalLen is not equal readableBytes.total:" + totalLen - + ";readableBytes:" + buffer.readableBytes()); + if (decExptCounter.shouldPrint()) { + logger.error("Length not equal, totalLen={},readableBytes={},from={}", + totalLen, buffer.readableBytes(), ctx.channel()); + } buffer.resetReaderIndex(); throw new Exception("totalLen is not equal readableBytes.total"); } @@ -48,14 +51,17 @@ protected void decode(ChannelHandlerContext ctx, int msgType = buffer.readByte() & 0x1f; if (msgType == 4) { - LOGGER.info("debug decode"); - } - if (msgType == 3 | msgType == 5) { + if (logger.isDebugEnabled()) { + logger.debug("debug decode"); + } + } else if (msgType == 3 | msgType == 5) { // bodylen int bodyLength = buffer.readInt(); if (bodyLength >= totalLen) { - LOGGER.error("bodyLen is greater than totalLen.totalLen:" + totalLen - + ";bodyLen:" + bodyLength); + if (decExptCounter.shouldPrint()) { + logger.error("bodyLen greater than totalLen, totalLen={},bodyLen={},from={}", + totalLen, bodyLength, ctx.channel()); + } buffer.resetReaderIndex(); throw new Exception("bodyLen is greater than totalLen.totalLen"); } @@ -64,20 +70,19 @@ protected void decode(ChannelHandlerContext ctx, bodyBytes = new byte[bodyLength]; buffer.readBytes(bodyBytes); } - // attrlen + String attrInfo = ""; int attrLength = buffer.readInt(); - byte[] attrBytes = null; if (attrLength > 0) { - attrBytes = new byte[attrLength]; + byte[] attrBytes = new byte[attrLength]; buffer.readBytes(attrBytes); + attrInfo = new String(attrBytes, StandardCharsets.UTF_8); } EncodeObject object; if (bodyBytes == null) { - object = new EncodeObject(new String(attrBytes, StandardCharsets.UTF_8)); + object = new EncodeObject(attrInfo); } else { - object = new EncodeObject(Collections.singletonList(bodyBytes), - new String(attrBytes, StandardCharsets.UTF_8)); + object = new EncodeObject(Collections.singletonList(bodyBytes), attrInfo); } object.setMsgtype(5); out.add(object); @@ -85,12 +90,13 @@ protected void decode(ChannelHandlerContext ctx, int seqId = buffer.readInt(); int attrLen = buffer.readShort(); - byte[] attrBytes = null; + String attrInfo = ""; if (attrLen > 0) { - attrBytes = new byte[attrLen]; + byte[] attrBytes = new byte[attrLen]; buffer.readBytes(attrBytes); + attrInfo = new String(attrBytes, StandardCharsets.UTF_8); } - EncodeObject object = new EncodeObject(new String(attrBytes, StandardCharsets.UTF_8)); + EncodeObject object = new EncodeObject(attrInfo); object.setMessageId(String.valueOf(seqId)); buffer.readShort(); @@ -103,15 +109,14 @@ protected void decode(ChannelHandlerContext ctx, buffer.skipBytes(4 + 1 + 4); // skip datatime, body_ver and body_len final short load = buffer.readShort(); // read from body int attrLen = buffer.readShort(); - byte[] attrBytes = null; + String attrInfo = ""; if (attrLen > 0) { - attrBytes = new byte[attrLen]; + byte[] attrBytes = new byte[attrLen]; buffer.readBytes(attrBytes); + attrInfo = new String(attrBytes, StandardCharsets.UTF_8); } buffer.skipBytes(2); // skip magic - - String attrs = (attrBytes == null ? "" : new String(attrBytes, StandardCharsets.UTF_8)); - EncodeObject object = new EncodeObject(attrs); + EncodeObject object = new EncodeObject(attrInfo); object.setMsgtype(8); object.setLoad(load); out.add(object); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java index c3d463987ba..1a20766e55e 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java @@ -99,16 +99,17 @@ private ByteBuf writeToBuf8(EncodeObject object) { if (object.isAuth()) { msgType |= FLAG_ALLOW_AUTH; } - int totalLength = 1 + 4 + 1 + 4 + 2 + endAttr.getBytes("utf8").length + 2; + byte[] attrData = endAttr.getBytes(StandardCharsets.UTF_8); + int totalLength = 1 + 4 + 1 + 4 + 2 + attrData.length + 2; buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength); buf.writeInt(totalLength); buf.writeByte(msgType); buf.writeInt((int) object.getDt()); buf.writeByte(1); buf.writeInt(0); - buf.writeShort(endAttr.getBytes("utf8").length); - if (endAttr.getBytes("utf8").length > 0) { - buf.writeBytes(endAttr.getBytes("utf8")); + buf.writeShort(attrData.length); + if (attrData.length > 0) { + buf.writeBytes(attrData); } buf.writeShort(0xee01); } catch (Throwable ex) { @@ -160,7 +161,8 @@ private ByteBuf constructBody(byte[] body, EncodeObject object, if (object.isCompress()) { msgType |= FLAG_ALLOW_COMPRESS; } - totalLength = totalLength + body.length + endAttr.getBytes("utf8").length; + byte[] attrData = endAttr.getBytes(StandardCharsets.UTF_8); + totalLength = totalLength + body.length + attrData.length; buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength); buf.writeInt(totalLength); buf.writeByte(msgType); @@ -181,8 +183,8 @@ private ByteBuf constructBody(byte[] body, EncodeObject object, buf.writeInt(body.length); buf.writeBytes(body); - buf.writeShort(endAttr.getBytes("utf8").length); - buf.writeBytes(endAttr.getBytes("utf8")); + buf.writeShort(attrData.length); + buf.writeBytes(attrData); buf.writeShort(0xee01); } return buf; @@ -207,7 +209,7 @@ private ByteBuf writeToBuf7(EncodeObject object) { ByteArrayOutputStream data = new ByteArrayOutputStream(); for (byte[] entry : object.getBodylist()) { if (totalCnt++ > 0) { - data.write("\n".getBytes("utf8")); + data.write(AttributeConstants.LINE_FEED_SEP.getBytes(StandardCharsets.UTF_8)); } data.write(entry); } @@ -280,14 +282,15 @@ private ByteBuf writeToBuf5(EncodeObject object) { if (object.isEncrypt()) { msgType |= FLAG_ALLOW_ENCRYPT; } - totalLength = totalLength + body.length + msgAttrs.getBytes("utf8").length; + byte[] attrData = msgAttrs.getBytes(StandardCharsets.UTF_8); + totalLength = totalLength + body.length + attrData.length; buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength); buf.writeInt(totalLength); buf.writeByte(msgType); buf.writeInt(body.length); buf.writeBytes(body); - buf.writeInt(msgAttrs.getBytes("utf8").length); - buf.writeBytes(msgAttrs.getBytes("utf8")); + buf.writeInt(attrData.length); + buf.writeBytes(attrData); } } catch (Throwable ex) { if (exptCounter.shouldPrint()) { @@ -344,14 +347,15 @@ private ByteBuf writeToBuf3(EncodeObject object) { if (object.isEncrypt()) { msgType |= FLAG_ALLOW_ENCRYPT; } - totalLength = totalLength + body.length + msgAttrs.getBytes("utf8").length; + byte[] attrData = msgAttrs.getBytes(StandardCharsets.UTF_8); + totalLength = totalLength + body.length + attrData.length; buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength); buf.writeInt(totalLength); buf.writeByte(msgType); buf.writeInt(body.length); buf.writeBytes(body); - buf.writeInt(msgAttrs.getBytes("utf8").length); - buf.writeBytes(msgAttrs.getBytes("utf8")); + buf.writeInt(attrData.length); + buf.writeBytes(attrData); } } catch (Throwable ex) { if (exptCounter.shouldPrint()) { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java index adab601e0ac..f336702ee42 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java @@ -18,8 +18,8 @@ package org.apache.inlong.sdk.dataproxy.common; public enum SendResult { - INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112) OK, + INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112) TIMEOUT, CONNECTION_BREAK, THREAD_INTERRUPT, diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java index 6acfe09d8ab..47f6cd1ed73 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java @@ -25,6 +25,7 @@ import java.net.URLEncoder; import java.security.interfaces.RSAPublicKey; +import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; public class EncryptConfigEntry implements java.io.Serializable { @@ -122,7 +123,7 @@ public EncryptInfo getRsaEncryptInfo() { @Override public boolean equals(Object other) { - if (other == null || !(other instanceof EncryptConfigEntry)) { + if (!(other instanceof EncryptConfigEntry)) { return false; } if (other == this) { @@ -131,7 +132,7 @@ public boolean equals(Object other) { EncryptConfigEntry info = (EncryptConfigEntry) other; return (this.userName.equals(info.getUserName())) && (this.version.equals(info.getVersion())) - && (this.pubKey == info.getPubKey()); + && (Objects.equals(this.pubKey, info.getPubKey())); } public String toString() { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/HostInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/HostInfo.java index c93c872ea55..071cf4b6eb9 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/HostInfo.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/HostInfo.java @@ -25,10 +25,10 @@ public class HostInfo implements Comparable, java.io.Serializable { private final String hostName; private final int portNumber; - public HostInfo(String referenceName, String hostName, int portNumber) { - this.referenceName = referenceName; + public HostInfo(String hostName, int portNumber) { this.hostName = hostName; this.portNumber = portNumber; + this.referenceName = hostName + ":" + portNumber; } public String getReferenceName() { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java index b10e5330376..d259cdff083 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java @@ -35,7 +35,6 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.Header; @@ -91,11 +90,10 @@ public class ProxyConfigManager extends Thread { public static final String APPLICATION_JSON = "application/json"; - private static final Logger LOGGER = LoggerFactory.getLogger(ProxyConfigManager.class); + private static final Logger logger = LoggerFactory.getLogger(ProxyConfigManager.class); private final ProxyClientConfig clientConfig; private final ClientMgr clientManager; private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock(); - private final JsonParser jsonParser = new JsonParser(); private final Gson gson = new Gson(); private final HashRing hashRing = HashRing.getInstance(); private List proxyInfoList = new ArrayList(); @@ -104,8 +102,8 @@ public class ProxyConfigManager extends Thread { private String inlongGroupId; private String localMd5; private boolean bShutDown = false; - private long doworkTime = 0; - private EncryptConfigEntry userEncryConfigEntry; + private long lstUpdatedTime = 0; + private EncryptConfigEntry userEncryptConfigEntry; public ProxyConfigManager(final ProxyClientConfig configure, final ClientMgr clientManager) { this.clientConfig = configure; @@ -122,7 +120,7 @@ public void setInlongGroupId(String inlongGroupId) { } public void shutDown() { - LOGGER.info("Begin to shut down ProxyConfigManager!"); + logger.info("Begin to shut down ProxyConfigManager!"); bShutDown = true; } @@ -132,9 +130,9 @@ public void run() { try { doProxyEntryQueryWork(); updateEncryptConfigEntry(); - LOGGER.info("ProxyConf update!"); + logger.info("ProxyConf update!"); } catch (Throwable e) { - LOGGER.error("Refresh proxy ip list runs into exception {}, {}", e.toString(), e.getStackTrace()); + logger.error("Refresh proxy ip list runs into exception {}, {}", e.toString(), e.getStackTrace()); e.printStackTrace(); } @@ -147,13 +145,13 @@ public void run() { if (proxyUpdateIntervalSec > 5) { sleepTimeSec = proxyUpdateIntervalSec + random.nextInt() % (proxyUpdateIntervalSec / 5); } - LOGGER.info("sleep time {}", sleepTimeSec); + logger.info("sleep time {}", sleepTimeSec); Thread.sleep(sleepTimeSec * 1000); } catch (Throwable e2) { // } } - LOGGER.info("ProxyConfigManager worker existed!"); + logger.info("ProxyConfigManager worker existed!"); } /** @@ -170,11 +168,11 @@ private ProxyConfigEntry tryToReadCacheProxyEntry(String configCachePath) { if (diffTime < clientConfig.getMaxProxyCacheTimeInMs()) { JsonReader reader = new JsonReader(new FileReader(configCachePath)); ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader, ProxyConfigEntry.class); - LOGGER.info("{} has a backup! {}", inlongGroupId, proxyConfigEntry); + logger.info("{} has a backup! {}", inlongGroupId, proxyConfigEntry); return proxyConfigEntry; } } catch (Exception ex) { - LOGGER.warn("try to read local cache, caught {}", ex.getMessage()); + logger.warn("try to read local cache, caught {}", ex.getMessage()); } finally { rw.readLock().unlock(); } @@ -189,13 +187,13 @@ private void tryToWriteCacheProxyEntry(ProxyConfigEntry entry, String configCach // try to create parent file.getParentFile().mkdirs(); } - LOGGER.info("try to write {}} to local cache {}", entry, configCachePath); + logger.info("try to write {}} to local cache {}", entry, configCachePath); FileWriter fileWriter = new FileWriter(configCachePath); gson.toJson(entry, fileWriter); fileWriter.flush(); fileWriter.close(); } catch (Exception ex) { - LOGGER.warn("try to write local cache, caught {}", ex.getMessage()); + logger.warn("try to write local cache, caught {}", ex.getMessage()); } finally { rw.writeLock().unlock(); } @@ -205,7 +203,7 @@ private ProxyConfigEntry requestProxyEntryQuietly() { try { return requestProxyList(this.clientConfig.getManagerUrl()); } catch (Exception e) { - LOGGER.warn("try to request proxy list by http, caught {}", e.getMessage()); + logger.warn("try to request proxy list by http, caught {}", e.getMessage()); } return null; } @@ -280,7 +278,7 @@ public void doProxyEntryQueryWork() throws Exception { } /* We should exit if no local IP list and can't request it from manager. */ if (localMd5 == null && proxyEntry == null) { - LOGGER.error("Can't connect manager at the start of proxy API {}", + logger.error("Can't connect manager at the start of proxy API {}", this.clientConfig.getManagerUrl()); proxyEntry = tryToReadCacheProxyEntry(configAddr); } @@ -290,7 +288,7 @@ public void doProxyEntryQueryWork() throws Exception { s.append(tmp.getHostName()).append(";").append(tmp.getPortNumber()) .append(","); } - LOGGER.warn("Backup proxyEntry [{}]", s); + logger.warn("Backup proxyEntry [{}]", s); } } if (localMd5 == null && proxyEntry == null && proxyInfoList == null) { @@ -312,7 +310,7 @@ public void doProxyEntryQueryWork() throws Exception { */ private void compareProxyList(ProxyConfigEntry proxyEntry) { if (proxyEntry != null) { - LOGGER.info("{}", proxyEntry.toString()); + logger.info("{}", proxyEntry.toString()); if (proxyEntry.getSize() != 0) { /* Initialize the current proxy information list first. */ clientManager.setLoadThreshold(proxyEntry.getLoad()); @@ -326,31 +324,31 @@ private void compareProxyList(ProxyConfigEntry proxyEntry) { String oldMd5 = calcHostInfoMd5(proxyInfoList); if (newMd5 != null && !newMd5.equals(oldMd5)) { /* Choose random alive connections to send messages. */ - LOGGER.info("old md5 {} new md5 {}", oldMd5, newMd5); + logger.info("old md5 {} new md5 {}", oldMd5, newMd5); proxyInfoList.clear(); proxyInfoList = newProxyInfoList; clientManager.setProxyInfoList(proxyInfoList); - doworkTime = System.currentTimeMillis(); + lstUpdatedTime = System.currentTimeMillis(); } else if (proxyEntry.getSwitchStat() != oldStat) { /* judge cluster's switch state */ oldStat = proxyEntry.getSwitchStat(); - if ((System.currentTimeMillis() - doworkTime) > 3 * 60 * 1000) { - LOGGER.info("switch the cluster!"); + if ((System.currentTimeMillis() - lstUpdatedTime) > 3 * 60 * 1000) { + logger.info("switch the cluster!"); proxyInfoList.clear(); proxyInfoList = newProxyInfoList; clientManager.setProxyInfoList(proxyInfoList); } else { - LOGGER.info("only change oldStat "); + logger.info("only change oldStat "); } } else { newProxyInfoList.clear(); - LOGGER.info("proxy IP list doesn't change, load {}", proxyEntry.getLoad()); + logger.info("proxy IP list doesn't change, load {}", proxyEntry.getLoad()); } if (clientConfig.getLoadBalance() == LoadBalance.CONSISTENCY_HASH) { updateHashRing(proxyInfoList); } } else { - LOGGER.error("proxyEntry's size is zero"); + logger.error("proxyEntry's size is zero"); } } } @@ -359,7 +357,7 @@ public EncryptConfigEntry getEncryptConfigEntry(final String userName) { if (StringUtils.isBlank(userName)) { return null; } - EncryptConfigEntry encryptEntry = this.userEncryConfigEntry; + EncryptConfigEntry encryptEntry = this.userEncryptConfigEntry; if (encryptEntry == null) { int retryCount = 0; encryptEntry = requestPubKey(this.clientConfig.getRsaPubKeyUrl(), userName, false); @@ -372,21 +370,21 @@ public EncryptConfigEntry getEncryptConfigEntry(final String userName) { if (encryptEntry != null) { encryptEntry.getRsaEncryptedKey(); synchronized (this) { - if (this.userEncryConfigEntry == null) { - this.userEncryConfigEntry = encryptEntry; + if (this.userEncryptConfigEntry == null) { + this.userEncryptConfigEntry = encryptEntry; } else { - encryptEntry = this.userEncryConfigEntry; + encryptEntry = this.userEncryptConfigEntry; } } } } else { synchronized (this) { - if (this.userEncryConfigEntry == null || this.userEncryConfigEntry != encryptEntry) { + if (this.userEncryptConfigEntry == null || this.userEncryptConfigEntry != encryptEntry) { storePubKeyEntry(encryptEntry); encryptEntry.getRsaEncryptedKey(); - this.userEncryConfigEntry = encryptEntry; + this.userEncryptConfigEntry = encryptEntry; } else { - encryptEntry = this.userEncryConfigEntry; + encryptEntry = this.userEncryptConfigEntry; } } } @@ -410,10 +408,10 @@ private void updateEncryptConfigEntry() { return; } synchronized (this) { - if (this.userEncryConfigEntry == null || this.userEncryConfigEntry != encryptConfigEntry) { + if (this.userEncryptConfigEntry == null || this.userEncryptConfigEntry != encryptConfigEntry) { storePubKeyEntry(encryptConfigEntry); encryptConfigEntry.getRsaEncryptedKey(); - this.userEncryConfigEntry = encryptConfigEntry; + this.userEncryptConfigEntry = encryptConfigEntry; } } return; @@ -421,7 +419,7 @@ private void updateEncryptConfigEntry() { private EncryptConfigEntry getStoredPubKeyEntry(String userName) { if (StringUtils.isBlank(userName)) { - LOGGER.warn(" userName(" + userName + ") is not available"); + logger.warn(" userName(" + userName + ") is not available"); return null; } EncryptConfigEntry entry; @@ -441,7 +439,7 @@ private EncryptConfigEntry getStoredPubKeyEntry(String userName) { return null; } } catch (Throwable e1) { - LOGGER.error("Read " + userName + " stored PubKeyEntry error ", e1); + logger.error("Read " + userName + " stored PubKeyEntry error ", e1); return null; } finally { if (fis != null) { @@ -473,7 +471,7 @@ private void storePubKeyEntry(EncryptConfigEntry entry) { p.flush(); // p.close(); } catch (Throwable e) { - LOGGER.error("store EncryptConfigEntry " + entry.toString() + " exception ", e); + logger.error("store EncryptConfigEntry " + entry.toString() + " exception ", e); e.printStackTrace(); } finally { if (fos != null) { @@ -508,7 +506,7 @@ private String calcHostInfoMd5(List hostInfoList) { private EncryptConfigEntry requestPubKey(String pubKeyUrl, String userName, boolean needGet) { if (StringUtils.isBlank(userName)) { - LOGGER.error("Queried userName is null!"); + logger.error("Queried userName is null!"); return null; } List params = new ArrayList(); @@ -516,26 +514,26 @@ private EncryptConfigEntry requestPubKey(String pubKeyUrl, String userName, bool params.add(new BasicNameValuePair("username", userName)); String returnStr = requestConfiguration(pubKeyUrl, params); if (StringUtils.isBlank(returnStr)) { - LOGGER.info("No public key information returned from manager"); + logger.info("No public key information returned from manager"); return null; } - JsonObject pubKeyConf = jsonParser.parse(returnStr).getAsJsonObject(); + JsonObject pubKeyConf = JsonParser.parseString(returnStr).getAsJsonObject(); if (pubKeyConf == null) { - LOGGER.info("No public key information returned from manager"); + logger.info("No public key information returned from manager"); return null; } if (!pubKeyConf.has("resultCode")) { - LOGGER.info("Parse pubKeyConf failure: No resultCode key information returned from manager"); + logger.info("Parse pubKeyConf failure: No resultCode key information returned from manager"); return null; } int resultCode = pubKeyConf.get("resultCode").getAsInt(); if (resultCode != 0) { - LOGGER.info("query pubKeyConf failure, error code is " + resultCode + ", errInfo is " + logger.info("query pubKeyConf failure, error code is " + resultCode + ", errInfo is " + pubKeyConf.get("message").getAsString()); return null; } if (!pubKeyConf.has("resultData")) { - LOGGER.info("Parse pubKeyConf failure: No resultData key information returned from manager"); + logger.info("Parse pubKeyConf failure: No resultData key information returned from manager"); return null; } JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject(); @@ -566,7 +564,7 @@ public ProxyConfigEntry getLocalProxyListFromFile(String filePath) throws Except throw new Exception("Read local proxyList File failure by " + filePath + ", reason is " + e.getCause()); } if (ObjectUtils.isEmpty(proxyCluster)) { - LOGGER.warn("no proxyCluster configure from local file"); + logger.warn("no proxyCluster configure from local file"); return null; } @@ -591,7 +589,7 @@ public ProxyConfigEntry requestProxyList(String url) { ArrayList params = new ArrayList(); params.add(new BasicNameValuePair("ip", IpUtils.getLocalIp())); params.add(new BasicNameValuePair("protocolType", clientConfig.getProtocolType())); - LOGGER.info("Begin to get configure from manager {}, param is {}", url, params); + logger.info("Begin to get configure from manager {}, param is {}", url, params); String resultStr = requestConfiguration(url, params); ProxyClusterConfig clusterConfig = gson.fromJson(resultStr, ProxyClusterConfig.class); @@ -606,7 +604,7 @@ public ProxyConfigEntry requestProxyList(String url) { private ProxyConfigEntry getProxyConfigEntry(DataProxyNodeResponse proxyCluster) { List nodeList = proxyCluster.getNodeList(); if (CollectionUtils.isEmpty(nodeList)) { - LOGGER.error("dataproxy nodeList is empty in DataProxyNodeResponse!"); + logger.error("dataproxy nodeList is empty in DataProxyNodeResponse!"); return null; } Map hostMap = formatHostInfoMap(nodeList); @@ -644,133 +642,73 @@ private ProxyConfigEntry getProxyConfigEntry(DataProxyNodeResponse proxyCluster) } private Map formatHostInfoMap(List nodeList) { + HostInfo tmpHostInfo; Map hostMap = new HashMap<>(); for (DataProxyNodeInfo proxy : nodeList) { if (ObjectUtils.isEmpty(proxy.getId()) || StringUtils.isEmpty(proxy.getIp()) || ObjectUtils .isEmpty(proxy.getPort()) || proxy.getPort() < 0) { - LOGGER.error("invalid proxy node, id:{}, ip:{}, port:{}", proxy.getId(), proxy.getIp(), + logger.error("invalid proxy node, id:{}, ip:{}, port:{}", proxy.getId(), proxy.getIp(), proxy.getPort()); continue; } - String refId = proxy.getIp() + ":" + proxy.getPort(); - hostMap.put(refId, new HostInfo(refId, proxy.getIp(), proxy.getPort())); + tmpHostInfo = new HostInfo(proxy.getIp(), proxy.getPort()); + hostMap.put(tmpHostInfo.getReferenceName(), tmpHostInfo); } if (hostMap.isEmpty()) { - LOGGER.error("Parse proxyList failure: address is empty for response from manager!"); + logger.error("Parse proxyList failure: address is empty for response from manager!"); return null; } return hostMap; } - private String updateUrl(String url, int tryIdx, String localManagerIpList) { - if (tryIdx == 0) { - return url; - } - - int headerIdx = url.indexOf("://"); - if (headerIdx == -1) { - return null; - } - String header = ""; - header = url.substring(0, headerIdx + 3); - String tmpUrl = url.substring(headerIdx + 3); - int tailerIdx = tmpUrl.indexOf("/"); - if (tailerIdx == -1) { - return null; - } - String tailer = ""; - tailer = tmpUrl.substring(tailerIdx); - String[] managerIps = localManagerIpList.split(","); - String currentManagerIp = ""; - int idx = 1; - for (String managerIp : managerIps) { - if (idx++ == tryIdx) { - currentManagerIp = managerIp; - break; - } - } - if (!currentManagerIp.equals("")) { - return header + currentManagerIp + ":" + clientConfig.getManagerPort() + tailer; - } - return null; - } - /* Request new configurations from Manager. */ private String requestConfiguration(String url, List params) { if (StringUtils.isBlank(url)) { - LOGGER.error("request url is null"); + logger.error("request url is null"); return null; } - // get local managerIpList - String localManagerIps = ""; - int tryIdx = 0; - while (true) { - HttpPost httpPost = null; - String returnStr = null; - HttpParams myParams = new BasicHttpParams(); - HttpConnectionParams.setConnectionTimeout(myParams, 10000); - HttpConnectionParams.setSoTimeout(myParams, clientConfig.getManagerSocketTimeout()); - CloseableHttpClient httpClient = null; - if (this.clientConfig.isRequestByHttp()) { - httpClient = new DefaultHttpClient(myParams); - } else { - try { - httpClient = getCloseableHttpClient(params); - } catch (Throwable eHttps) { - LOGGER.error("Create Https cliet failure, error 1 is ", eHttps); - eHttps.printStackTrace(); - return null; - } - } - - if (!clientConfig.isEnableSaveManagerVIps() && tryIdx > 0) { + HttpPost httpPost = null; + HttpParams myParams = new BasicHttpParams(); + HttpConnectionParams.setConnectionTimeout(myParams, 10000); + HttpConnectionParams.setSoTimeout(myParams, clientConfig.getManagerSocketTimeout()); + CloseableHttpClient httpClient; + if (this.clientConfig.isRequestByHttp()) { + httpClient = new DefaultHttpClient(myParams); + } else { + try { + httpClient = getCloseableHttpClient(params); + } catch (Throwable eHttps) { + logger.error("Create Https cliet failure, error 1 is ", eHttps); + eHttps.printStackTrace(); return null; } - // change url's manager host port when occur error - url = updateUrl(url, tryIdx, localManagerIps); - if (url == null) { - return null; + } + logger.info("Request url : {}, params : {}", url, params); + try { + httpPost = new HttpPost(url); + httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER, + BasicAuth.genBasicAuthCredential(clientConfig.getAuthSecretId(), + clientConfig.getAuthSecretKey())); + UrlEncodedFormEntity urlEncodedFormEntity = new UrlEncodedFormEntity(params, "UTF-8"); + httpPost.setEntity(urlEncodedFormEntity); + HttpResponse response = httpClient.execute(httpPost); + String returnStr = EntityUtils.toString(response.getEntity()); + if (StringUtils.isNotBlank(returnStr) + && response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + logger.info("Get configure from manager is {}", returnStr); + return returnStr; } - tryIdx++; - - LOGGER.info("Request url : " + url + ", localManagerIps : " + localManagerIps); - try { - httpPost = new HttpPost(url); - httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER, - BasicAuth.genBasicAuthCredential(clientConfig.getAuthSecretId(), - clientConfig.getAuthSecretKey())); - UrlEncodedFormEntity urlEncodedFormEntity = new UrlEncodedFormEntity(params, "UTF-8"); - httpPost.setEntity(urlEncodedFormEntity); - HttpResponse response = httpClient.execute(httpPost); - returnStr = EntityUtils.toString(response.getEntity()); - if (StringUtils.isNotBlank(returnStr) - && response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { - LOGGER.info("Get configure from manager is " + returnStr); - return returnStr; - } - - if (!clientConfig.isRequestByHttp()) { - return null; - } - } catch (Throwable e) { - LOGGER.error("Connect Manager error, message: {}, url is {}", e.getMessage(), url); - - if (!clientConfig.isRequestByHttp()) { - return null; - } - // get localManagerIps - localManagerIps = getLocalManagerIps(); - if (localManagerIps == null) { - return null; - } - } finally { - if (httpPost != null) { - httpPost.releaseConnection(); - } - if (httpClient != null) { - httpClient.getConnectionManager().shutdown(); - } + return null; + } catch (Throwable e) { + logger.error("Connect Manager error, message: {}, url is {}", e.getMessage(), url); + return null; + } finally { + if (httpPost != null) { + httpPost.releaseConnection(); + } + if (httpClient != null) { + httpClient.getConnectionManager().shutdown(); } } } @@ -803,36 +741,8 @@ private CloseableHttpClient getCloseableHttpClient(List para return httpClient; } - private String getLocalManagerIps() { - String localManagerIps; - try { - File localManagerIpsFile = new File(clientConfig.getManagerIpLocalPath()); - if (localManagerIpsFile.exists()) { - byte[] serialized; - serialized = FileUtils.readFileToByteArray(localManagerIpsFile); - if (serialized == null) { - LOGGER.error("Local managerIp file is empty, file path : " - + clientConfig.getManagerIpLocalPath()); - return null; - } - localManagerIps = new String(serialized, "UTF-8"); - } else { - if (!localManagerIpsFile.getParentFile().exists()) { - localManagerIpsFile.getParentFile().mkdirs(); - } - localManagerIps = ""; - LOGGER.error("Get local managerIpList not exist, file path : " - + clientConfig.getManagerIpLocalPath()); - } - } catch (Throwable t) { - localManagerIps = ""; - LOGGER.error("Get local managerIpList occur exception,", t); - } - return localManagerIps; - } - public void updateHashRing(List newHosts) { this.hashRing.updateNode(newHosts); - LOGGER.debug("update hash ring {}", hashRing.getVirtualNode2RealNode()); + logger.debug("update hash ring {}", hashRing.getVirtualNode2RealNode()); } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java index 8e00ff8c3ed..2f75226925a 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java @@ -19,6 +19,7 @@ import org.apache.inlong.sdk.dataproxy.codec.EncodeObject; import org.apache.inlong.sdk.dataproxy.common.SendResult; +import org.apache.inlong.sdk.dataproxy.utils.LogCounter; import io.netty.channel.ChannelFuture; import org.slf4j.Logger; @@ -30,8 +31,8 @@ public class SyncMessageCallable implements Callable { - private static final Logger logger = LoggerFactory - .getLogger(SyncMessageCallable.class); + private static final Logger logger = LoggerFactory.getLogger(SyncMessageCallable.class); + private static final LogCounter exptCnt = new LogCounter(10, 100000, 60 * 1000L); private final NettyClient client; private final CountDownLatch awaitLatch = new CountDownLatch(1); @@ -55,13 +56,13 @@ public void update(SendResult message) { } public SendResult call() throws Exception { - // TODO Auto-generated method stub try { ChannelFuture channelFuture = client.write(encodeObject); awaitLatch.await(timeout, timeUnit); - } catch (Exception e) { - logger.error("SendResult call", e); - e.printStackTrace(); + } catch (Throwable ex) { + if (exptCnt.shouldPrint()) { + logger.warn("SyncMessageCallable write data throw exception", ex); + } return SendResult.UNKOWN_ERROR; } return message; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/TimeScanObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/TimeScanObject.java index 01d36c23fbb..19291e4336d 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/TimeScanObject.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/TimeScanObject.java @@ -22,8 +22,8 @@ public class TimeScanObject { - private AtomicInteger count = new AtomicInteger(0); - private AtomicLong time = new AtomicLong(0); + private final AtomicInteger count = new AtomicInteger(0); + private final AtomicLong time = new AtomicLong(0); public TimeScanObject() { this.count.set(0); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/ManagerFetcherThread.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/ManagerFetcherThread.java deleted file mode 100644 index 1424c3cb8f4..00000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/ManagerFetcherThread.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.threads; - -import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; -import org.apache.inlong.sdk.dataproxy.utils.ServiceDiscoveryUtils; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - -/** - * manager fetch thread - */ -public class ManagerFetcherThread extends Thread { - - private final Logger logger = LoggerFactory.getLogger(ManagerFetcherThread.class); - private volatile boolean isShutdown; - private final ProxyClientConfig proxyClientConfig; - - public ManagerFetcherThread(ProxyClientConfig proxyClientConfig) { - isShutdown = false; - this.proxyClientConfig = proxyClientConfig; - this.setDaemon(true); - this.setName("ManagerFetcherThread"); - } - - public void shutdown() { - logger.info("Begin to shutdown ManagerFetcherThread."); - isShutdown = true; - } - - @Override - public void run() { - logger.info("ManagerFetcherThread Thread=" + Thread.currentThread().getId() + " started !"); - while (!isShutdown) { - try { - String managerIpList = ServiceDiscoveryUtils.getManagerIpList(proxyClientConfig); - if (StringUtils.isBlank(managerIpList)) { - logger.error("ManagerFetcher get managerIpList is blank."); - } else { - ServiceDiscoveryUtils.updateManagerInfo2Local(managerIpList, - proxyClientConfig.getManagerIpLocalPath()); - } - TimeUnit.MILLISECONDS.sleep((long) proxyClientConfig.getProxyUpdateIntervalMinutes() * 60 * 1000); - } catch (Throwable e) { - logger.error("ManagerFetcher get or save managerIpList occur error,", e); - } - } - } -} \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EventLoopUtil.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EventLoopUtil.java index bf26d3fc598..eb99c43b9eb 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EventLoopUtil.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EventLoopUtil.java @@ -53,12 +53,11 @@ public static EventLoopGroup newEventLoopGroup(int nThreads, boolean enableBusyW } else if (!enableBusyWait) { return new EpollEventLoopGroup(nThreads, threadFactory); } else { - EpollEventLoopGroup eventLoopGroup = new EpollEventLoopGroup(nThreads, threadFactory, () -> { + return new EpollEventLoopGroup(nThreads, threadFactory, () -> { return (selectSupplier, hasTasks) -> { return -3; }; }); - return eventLoopGroup; } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ServiceDiscoveryUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ServiceDiscoveryUtils.java deleted file mode 100644 index 6352e253088..00000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ServiceDiscoveryUtils.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.utils; - -import org.apache.inlong.common.util.BasicAuth; -import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; -import org.apache.inlong.sdk.dataproxy.network.IpUtils; - -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.http.Header; -import org.apache.http.HttpResponse; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.entity.UrlEncodedFormEntity; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.conn.ssl.SSLConnectionSocketFactory; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.DefaultHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.message.BasicHeader; -import org.apache.http.message.BasicNameValuePair; -import org.apache.http.params.BasicHttpParams; -import org.apache.http.params.HttpConnectionParams; -import org.apache.http.params.HttpParams; -import org.apache.http.ssl.SSLContexts; -import org.apache.http.util.EntityUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.net.ssl.SSLContext; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.nio.charset.StandardCharsets; -import java.security.SecureRandom; -import java.util.ArrayList; -import java.util.List; - -/** - * Utils for service discovery - */ -public class ServiceDiscoveryUtils { - - private static final Logger log = LoggerFactory.getLogger(ServiceDiscoveryUtils.class); - - private static final String GET_MANAGER_IP_LIST_API = "/inlong/manager/openapi/agent/getManagerIpList"; - private static String latestManagerIPList = ""; - private static String arraySed = ","; - - /** - * Get Inlong-Manager IP list from the given proxy client config - */ - public static String getManagerIpList(ProxyClientConfig clientConfig) { - String managerAddress = clientConfig.getManagerAddress(); - if (StringUtils.isBlank(managerAddress)) { - log.error("ServiceDiscovery get managerIpList but managerAddress is blank, just return"); - return null; - } - - String managerIpList = getManagerIpListByHttp(managerAddress, clientConfig); - if (StringUtils.isNotBlank(managerIpList)) { - latestManagerIPList = managerIpList; - return managerIpList; - } - - log.error("ServiceDiscovery get managerIpList from {} occur error, try to get from latestManagerIPList", - managerAddress); - - String[] managerIps = latestManagerIPList.split(arraySed); - if (managerIps.length > 0) { - for (String managerIp : managerIps) { - if (StringUtils.isBlank(managerIp)) { - log.error("ServiceDiscovery managerIp is null, latestManagerIPList is {}", latestManagerIPList); - continue; - } - - String currentAddress = managerIp + ":" + clientConfig.getManagerPort(); - managerIpList = getManagerIpListByHttp(currentAddress, clientConfig); - if (StringUtils.isBlank(managerIpList)) { - log.error("ServiceDiscovery get latestManagerIPList from {} but got nothing, will try next ip", - managerIp); - continue; - } - latestManagerIPList = managerIpList; - return managerIpList; - } - } else { - log.error("ServiceDiscovery latestManagerIpList {} format error, or not contain ip", latestManagerIPList); - } - - String existedIpList = getLocalManagerIpList(clientConfig.getManagerIpLocalPath()); - if (StringUtils.isNotBlank(existedIpList)) { - String[] existedIps = existedIpList.split(arraySed); - if (existedIps.length > 0) { - for (String existedIp : existedIps) { - if (StringUtils.isBlank(existedIp)) { - log.error("ServiceDiscovery get illegal format ipList from local file, " - + "exist ip is empty, managerIpList is {}, local file is {}", - existedIpList, clientConfig.getManagerIpLocalPath()); - continue; - } - - String currentAddress = existedIp + ":" + clientConfig.getManagerPort(); - managerIpList = getManagerIpListByHttp(currentAddress, clientConfig); - if (StringUtils.isBlank(managerIpList)) { - log.error("ServiceDiscovery get {} from local file {} but got nothing, will try next ip", - existedIp, clientConfig.getManagerIpLocalPath()); - continue; - } - latestManagerIPList = managerIpList; - return managerIpList; - } - } else { - log.error("ServiceDiscovery get illegal format ipList from local file, " - + "exist ip is empty, managerIpList is {}, local file is {}", - existedIpList, clientConfig.getManagerIpLocalPath()); - } - } else { - log.error("ServiceDiscovery get empty ipList from local file {}", clientConfig.getManagerIpLocalPath()); - } - - return managerIpList; - } - - /** - * Get Inlong-Manager IP list from the given managerIp and proxy client config - */ - public static String getManagerIpListByHttp(String managerIp, ProxyClientConfig proxyClientConfig) { - String url = managerIp + GET_MANAGER_IP_LIST_API; - ArrayList params = new ArrayList(); - params.add(new BasicNameValuePair("operation", "query")); - params.add(new BasicNameValuePair("username", proxyClientConfig.getUserName())); - - log.info("Begin to get configure from manager {}, param is {}", url, params); - CloseableHttpClient httpClient; - HttpParams myParams = new BasicHttpParams(); - HttpConnectionParams.setConnectionTimeout(myParams, proxyClientConfig.getManagerConnectionTimeout()); - HttpConnectionParams.setSoTimeout(myParams, proxyClientConfig.getManagerSocketTimeout()); - if (proxyClientConfig.isRequestByHttp()) { - httpClient = new DefaultHttpClient(myParams); - } else { - try { - ArrayList
headers = new ArrayList<>(); - for (BasicNameValuePair paramItem : params) { - headers.add(new BasicHeader(paramItem.getName(), paramItem.getValue())); - } - RequestConfig requestConfig = RequestConfig.custom() - .setConnectTimeout(10000).setSocketTimeout(30000).build(); - SSLContext sslContext = SSLContexts.custom().build(); - SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, - new String[]{"TLSv1"}, null, - SSLConnectionSocketFactory.getDefaultHostnameVerifier()); - httpClient = HttpClients.custom().setDefaultHeaders(headers) - .setDefaultRequestConfig(requestConfig).setSSLSocketFactory(sslsf).build(); - } catch (Throwable t) { - log.error("Create Https client failed: ", t); - return null; - } - } - - HttpPost httpPost = null; - try { - httpPost = new HttpPost(url); - if (proxyClientConfig.isNeedAuthentication()) { - long timestamp = System.currentTimeMillis(); - int nonce = new SecureRandom(String.valueOf(timestamp).getBytes()).nextInt(Integer.MAX_VALUE); - httpPost.setHeader(BasicAuth.BASIC_AUTH_HEADER, - IpUtils.getAuthorizenInfo(proxyClientConfig.getUserName(), - proxyClientConfig.getSecretKey(), timestamp, nonce)); - } - httpPost.setEntity(new UrlEncodedFormEntity(params)); - HttpResponse response = httpClient.execute(httpPost); - String returnStr = EntityUtils.toString(response.getEntity()); - if (StringUtils.isNotBlank(returnStr) && response.getStatusLine().getStatusCode() == 200) { - log.info("Get configure from manager is " + returnStr); - JsonParser jsonParser = new JsonParser(); - JsonObject jb = jsonParser.parse(returnStr).getAsJsonObject(); - if (jb == null) { - log.warn("ServiceDiscovery updated manager ip failed, returnStr = {} jb is " - + "null ", returnStr, jb); - return null; - } - JsonArray retData = jb.get("data").getAsJsonArray(); - List managerIpList = new ArrayList<>(); - for (JsonElement datum : retData) { - JsonObject record = datum.getAsJsonObject(); - managerIpList.add(record.get("ip").getAsString()); - } - if (managerIpList.isEmpty()) { - return null; - } - String strIPs = String.join(",", managerIpList); - log.info("ServiceDiscovery updated manager ip success, ip : " + strIPs + ", retStr : " + returnStr); - return strIPs; - } - return null; - } catch (Throwable t) { - log.error("Connect Manager error: ", t); - return null; - } finally { - if (httpPost != null) { - httpPost.releaseConnection(); - } - if (httpClient != null) { - httpClient.getConnectionManager().shutdown(); - } - } - } - - /** - * Get Inlong-Manager IP list from local path - */ - public static String getLocalManagerIpList(String localPath) { - log.info("ServiceDiscovery start loading config from file {} ...", localPath); - String newestIp = null; - try { - File managerIpListFile = new File(localPath); - if (!managerIpListFile.exists()) { - log.info("ServiceDiscovery not found local groupIdInfo file from {}", localPath); - return null; - } - byte[] serialized = FileUtils.readFileToByteArray(managerIpListFile); - if (serialized == null) { - return null; - } - newestIp = new String(serialized, StandardCharsets.UTF_8); - log.info("ServiceDiscovery get manager ip list from local success, result is: {}", newestIp); - } catch (IOException e) { - log.error("ServiceDiscovery load manager config error: ", e); - } - - return newestIp; - } - - /** - * Update Inlong-Manager info to local file - */ - public static void updateManagerInfo2Local(String storeString, String path) { - if (StringUtils.isBlank(storeString)) { - log.warn("ServiceDiscovery updateTdmInfo2Local error, configMap is empty or managerIpList is blank"); - return; - } - File localPath = new File(path); - if (!localPath.getParentFile().exists()) { - localPath.getParentFile().mkdirs(); - } - - try (BufferedWriter writer = new BufferedWriter( - new OutputStreamWriter(new FileOutputStream(localPath), StandardCharsets.UTF_8))) { - writer.write(storeString); - writer.flush(); - } catch (IOException e) { - log.error("ServiceDiscovery save manager config error: ", e); - } - } - -}