Skip to content

Commit

Permalink
[INLONG-8819][DataProxy] Optimize ConfigHolder related subclass loadi…
Browse files Browse the repository at this point in the history
…ng processing (apache#8820)
  • Loading branch information
gosonzhang authored Aug 30, 2023
1 parent 49cbf96 commit a45ecb1
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,14 @@ public abstract class ConfigHolder {
private final AtomicBoolean fileChanged = new AtomicBoolean(false);
// list of callbacks for this holder
private final List<ConfigUpdateCallback> callbackList = new ArrayList<>();
private long lastModifyTime;
private long lastModifyTime = 0;
private String filePath;
private File configFile;

public ConfigHolder(String fileName) {
this.fileName = fileName;
setFilePath(fileName);
CONFIG_HOLDER_LIST.add(this);
if (configFile != null) {
this.lastModifyTime = configFile.lastModified();
}
}

/**
Expand All @@ -77,7 +74,7 @@ public void executeCallbacks() {
/**
* load from file to holder
*
* @return - true if configure updated
* @return - true if the configure file read, otherwise it will be false.
*/
protected abstract boolean loadFromFileToHolder();

Expand All @@ -89,11 +86,22 @@ public void executeCallbacks() {
public boolean checkAndUpdateHolder() {
if (fileChanged.compareAndSet(true, false)
|| (configFile != null && configFile.lastModified() != this.lastModifyTime)) {
if (configFile != null) {
this.lastModifyTime = configFile.lastModified();
long startTime = System.currentTimeMillis();
if (loadFromFileToHolder()) {
boolean initialized = (this.lastModifyTime != 0L);
if (configFile != null) {
this.lastModifyTime = configFile.lastModified();
}
if (initialized) {
LOG.info("File {} has changed, reload from local file, wast {} ms",
this.fileName, (System.currentTimeMillis() - startTime));
} else {
LOG.info("File {} has imported, reload from local file, wast {} ms",
this.fileName, (System.currentTimeMillis() - startTime));
}
return true;
}
LOG.info("File {} has changed, reload from local file", this.fileName);
return loadFromFileToHolder();
LOG.warn("File {} has changed, but reload content failure", this.fileName);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public boolean updateConfigMap(String inDataMd5, String inDataJsonStr) {
getFileName(), System.currentTimeMillis() - this.lastSyncVersion.get());
return false;
} else {
if (inDataMd5.equalsIgnoreCase(dataMd5)) {
if (inDataMd5.equals(dataMd5)) {
return false;
}
}
Expand Down Expand Up @@ -223,36 +223,42 @@ public Set<String> getAllTopicName() {

@Override
protected boolean loadFromFileToHolder() {
// check meta update setting
if (!CommonConfigHolder.getInstance().isEnableStartupUsingLocalMetaFile()
&& !ConfigManager.handshakeManagerOk.get()) {
LOG.warn("Failed to load json config from {}, don't obtain metadata from the Manager,"
+ " and the startup via the cache file is false", getFileName());
return false;
}
String jsonString = "";
readWriteLock.readLock().lock();
try {
jsonString = loadConfigFromFile();
if (StringUtils.isBlank(jsonString)) {
LOG.info("Load changed json {}, but no records configured", getFileName());
return false;
LOG.warn("Load changed json {}, but no records configured", getFileName());
return true;
}
DataProxyConfigResponse metaConfig =
GSON.fromJson(jsonString, DataProxyConfigResponse.class);
if (!metaConfig.isResult() || metaConfig.getErrCode() != DataProxyConfigResponse.SUCC) {
LOG.warn("Load failed json config from {}, error code is {}",
getFileName(), metaConfig.getErrCode());
return false;
return true;
}
DataProxyCluster clusterObj = metaConfig.getData();
if (clusterObj == null) {
LOG.warn("Load failed json config from {}, malformed content, data is null", getFileName());
return false;
}
if (!CommonConfigHolder.getInstance().isEnableStartupUsingLocalMetaFile()
&& !ConfigManager.handshakeManagerOk.get()) {
LOG.info("Failed to load json config from {}, don't obtain metadata from the Manager,"
+ " and the startup via the cache file is false", getFileName());
return false;
return true;
}
// update cache data
if (updateCacheData(clusterObj)) {
// update cache string
synchronized (this.lastUpdVersion) {
boolean updated;
synchronized (this.lastUpdVersion) {
if (metaConfig.getMd5().equals(this.dataMd5)) {
LOG.warn("Load json config from {}, configure md5 not changed!", getFileName());
return true;
}
updated = updateCacheData(clusterObj);
if (updated) {
if (this.lastSyncVersion.get() == 0) {
this.lastUpdVersion.set(System.currentTimeMillis());
this.lastSyncVersion.compareAndSet(0, this.lastUpdVersion.get());
Expand All @@ -262,14 +268,12 @@ protected boolean loadFromFileToHolder() {
this.dataMd5 = metaConfig.getMd5();
this.dataStr = jsonString;
}
LOG.info(
"Load changed {}, loaded dataMd5={}, data={}, id2TopicSrcMap={}, mqClusterMap={}, id2TopicSinkMap={}",
getFileName(), dataMd5, dataStr, id2TopicSrcMap, mqClusterMap, id2TopicSinkMap);
return true;
}
return false;
if (updated) {
LOG.info("Load changed {} file success!", getFileName());
}
return true;
} catch (Throwable e) {
//
LOG.warn("Process json {} changed data {} failure", getFileName(), jsonString, e);
return false;
} finally {
Expand Down Expand Up @@ -324,9 +328,6 @@ private boolean updateCacheData(DataProxyCluster metaConfigObj) {
// get topic config info
Map<String, IdTopicConfig> tmpTopicConfigMap = buildCacheTopicConfig(mqType, inLongIds);
replaceCacheConfig(mqType, tmpClusterConfigMap, tmpTopicConfigMap);
if (mqType.equals(CacheType.TUBE)) {
executeCallbacks();
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ protected boolean loadFromFileToHolder() {
try {
Map<String, String> loadMap = loadConfigFromFile();
if (loadMap == null || loadMap.isEmpty()) {
LOG.debug("Load changed properties {}, but no records configured", getFileName());
return false;
LOG.warn("Load changed properties {}, but no records configured", getFileName());
return true;
}
// filter blank items
Map<String, String> filteredMap = filterInValidRecords(loadMap);
if (filteredMap.isEmpty()) {
LOG.info("Load changed properties {}, but the records are all illegal {}",
LOG.warn("Load changed properties {}, but the records are all illegal {}",
getFileName(), loadMap);
return false;
return true;
}
// remove records
Set<String> rmvKeys = new HashSet<>();
Expand All @@ -116,13 +116,14 @@ protected boolean loadFromFileToHolder() {
}
}
if (rmvKeys.isEmpty() && repKeys.isEmpty()) {
return false;
LOG.warn("Load changed properties {}, but no add or delete records", getFileName());
return true;
}
// update cache data
boolean result = updateCacheData();
// output update result
LOG.info("Load changed properties {}, loaded config {}, updated holder {}, updated cache {}",
getFileName(), loadMap, confHolder, result);
LOG.info("Load changed properties {}, deleted_record = {}, updated_record = {}, updated_cache = {}",
getFileName(), rmvKeys.isEmpty(), repKeys.isEmpty(), result);
return true;
} finally {
readWriteLock.readLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected boolean loadFromFileToHolder() {
try {
Map<String, Long> tmpHolder = loadFile();
if (tmpHolder == null) {
return false;
return true;
}
// clear removed records
boolean added = false;
Expand Down Expand Up @@ -126,14 +126,23 @@ protected boolean loadFromFileToHolder() {
added = true;
}
}
if (!tmpKeys.isEmpty()) {
if (this.isBlackList) {
// output load result
if (this.isBlackList) {
if (!tmpKeys.isEmpty()) {
LOG.warn("Load BlackList data error, found error data items: " + tmpKeys);
} else {
}
if (added) {
LOG.info("Load BlackList data, new data items are added!");
}
} else {
if (!tmpKeys.isEmpty()) {
LOG.warn("Load WhiteList data error, found error data items: " + tmpKeys);
}
if (removed) {
LOG.info("Load WhiteList data, cached data items are deleted!");
}
}
return (isBlackList && added) || (!isBlackList && removed);
return true;
} finally {
readWriteLock.writeLock().unlock();
}
Expand Down

0 comments on commit a45ecb1

Please sign in to comment.