Skip to content

Commit

Permalink
#106 【search】RoutingCache 的日志不够精准 [vip:platform/pallas#942]
Browse files Browse the repository at this point in the history
  • Loading branch information
owenericsson committed Jul 26, 2019
1 parent c09b2e5 commit fcdde49
Showing 1 changed file with 69 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public class RoutingCache extends AbstractCache<String, Map<String, Object>> {
public static final String INDEX_NODE_LIST = "INDEX_NODE_LIST";
public static final String CLUSTER_NODE_LIST = "CLUSTER_NODE_LIST";
public static final String SHARD_NODE_LIST = "SHARD_NODE_LIST";
public static final String SHARD_NODE_LIST_RAW = "SHARD_NODE_LIST_RAW";
public static final String ALIASE_INDEX_MAP = "ALIASE_INDEX_MAP"; //索引别名
public static final String ALIASE_INDEX_MAP_RAW = "ALIASE_INDEX_MAP_RAW"; //索引别名
public static final String INDEXLEVEL_ROUTING = "INDEXLEVEL_ROUTING";
public static final String CLUSTERLEVEL_ROUTING = "CLUSTERLEVEL_ROUTING";
public static final String INDEX_TARGET_GROUP = "INDEX_TARGET_GROUP";
Expand Down Expand Up @@ -213,16 +215,21 @@ private void cacheNode(Map<String, Object> cacheMap) throws Exception {
if (nodeList != null) {
nodeList.removeAll(elasticSearchService.getExcludeNodeList(cluster.getHttpAddress()));
nodeList.removeAll(getAbnormalNodeList(cluster.getClusterId()));
return Optional.ofNullable(nodeList).orElseGet(Collections::emptyList);
if (nodeList.isEmpty()) {
LogUtils.warn(LOGGER, SearchLogEvent.ROUTING_EVENT, "cluster {} is empty after getExcludeNodeList.", cluster.getClusterId());
}
return nodeList;
}
// if sth. goes wrong, keep the former cache rather than the null value.
LogUtils.info(LOGGER, SearchLogEvent.ROUTING_EVENT, "getNodeList by {} returns null, keep the former values in cache.",
cluster.getHttpAddress());
Map<String, List<String>> clusterNodeListMapInCache = (Map<String, List<String>>) cacheMap
.get(CLUSTER_NODE_LIST);
Map<String, List<String>> clusterNodeListMapInCache = (Map<String, List<String>>) cacheMap.get(CLUSTER_NODE_LIST);
LogUtils.warn(LOGGER, SearchLogEvent.ROUTING_EVENT, "getNodeList by {} returns null, keep the former values in clusterNodeListMapInCache",
cluster.getClusterId(), clusterNodeListMapInCache);
if (clusterNodeListMapInCache != null) {
List<String> nodeListInCache = clusterNodeListMapInCache.get(cluster.getClusterId());
return nodeListInCache == null ? emptyList() : nodeListInCache;
List<String> nodeListInCache = clusterNodeListMapInCache.getOrDefault(cluster.getClusterId(), emptyList());
if (nodeListInCache.isEmpty()) {
LogUtils.warn(LOGGER, SearchLogEvent.ROUTING_EVENT, "cluster {} is empty in clusterNodeListMapInCache.", cluster.getClusterId());
}
return nodeListInCache;
}
return emptyList();
}
Expand All @@ -236,46 +243,70 @@ private void cacheNode(Map<String, Object> cacheMap) throws Exception {
.collect(toList()).stream().flatMap(Collection::stream).collect(toList())
));

//clusterId -> shardNodeListMap
Map<String, Map<String, List<String>>> clusterShardNodeListMapRaw = clusterList.stream().collect(toMap(Cluster::getClusterId, cluster -> {
List<String[]> shardNodeList = elasticSearchService.getIndexAndNodes(cluster.getHttpAddress());
if (shardNodeList != null) {
shardNodeList.removeAll(elasticSearchService.getExcludeNodeList(cluster.getHttpAddress()));
shardNodeList.removeAll(getAbnormalNodeList(cluster.getClusterId()));
return Optional
.ofNullable(shardNodeList.stream()
.collect(groupingBy(t -> t[0], mapping(t -> t[1], toList()))))
.orElseGet(Collections::emptyMap);
}
// if sth. goes wrong, keep the former cache rather than the null value.
Map<String, Map<String, List<String>>> clusterShardNodeListMapRawInCache = (Map<String, Map<String, List<String>>>) cacheMap
.get(SHARD_NODE_LIST_RAW);
LogUtils.warn(LOGGER, SearchLogEvent.ROUTING_EVENT, "getShards by {} returns null, keep the former values in clusterShardNodeListMapRawInCache",
cluster.getClusterId(), clusterShardNodeListMapRawInCache);

if (clusterShardNodeListMapRawInCache != null) {
Map<String, List<String>> shardNodeListInCache = clusterShardNodeListMapRawInCache.getOrDefault(cluster.getClusterId(), emptyMap());
if (shardNodeListInCache.isEmpty()) {
LogUtils.warn(LOGGER, SearchLogEvent.ROUTING_EVENT, "cluster {} is empty in clusterNodeListMapRawInCache.", cluster.getClusterId());
}
return shardNodeListInCache;
}
return emptyMap();
}));


// index -> clusterId -> shardNodeListMap
Map<String, Map<String, List<String>>> clusterShardNodeListMap = reverseMapKey(
clusterList.stream().collect(toMap(Cluster::getClusterId, cluster -> {
List<String[]> shardNodeList = elasticSearchService.getIndexAndNodes(cluster.getHttpAddress());
if (shardNodeList != null) {
shardNodeList.removeAll(elasticSearchService.getExcludeNodeList(cluster.getHttpAddress()));
shardNodeList.removeAll(getAbnormalNodeList(cluster.getClusterId()));
return Optional
.ofNullable(shardNodeList.stream()
.collect(groupingBy(t -> t[0], mapping(t -> t[1], toList()))))
.orElseGet(Collections::emptyMap);
}
// if sth. goes wrong, keep the former cache rather than the null value.
LogUtils.info(LOGGER, SearchLogEvent.ROUTING_EVENT, "getShards by {} returns null, keep the former values in cache.",
cluster.getHttpAddress());
Map<String, Map<String, List<String>>> clusterShardNodeListMapInCache = (Map<String, Map<String, List<String>>>) cacheMap
.get(SHARD_NODE_LIST);
if (clusterShardNodeListMapInCache != null) {
Map<String, List<String>> shardNodeListInCache = clusterShardNodeListMapInCache
.get(cluster.getClusterId());
return shardNodeListInCache == null ? emptyMap() : shardNodeListInCache;
}
return emptyMap();
})));
Map<String, Map<String, List<String>>> clusterShardNodeListMap = reverseMapKey(clusterShardNodeListMapRaw);


//aliaseIndex -> clusterId -> indexList
Map<String, Map<String, List<String>>> clusterAliaseIndexMap = reverseMapKey(clusterList.stream().collect(toMap(
cluster -> cluster.getClusterId(),
cluster -> {
List<String[]> aliasesList = elasticSearchService.getActualIndexs(cluster.getHttpAddress());
return aliasesList != null ? Optional.ofNullable(aliasesList.stream().
collect(groupingBy(i -> i[0], mapping(i -> i[1], toList())))).orElseGet(Collections::emptyMap) : emptyMap();
}
)));
Map<String, Map<String, List<String>>> clusterAliaseIndexMapRaw = clusterList.stream().collect(toMap(
Cluster::getClusterId,
cluster -> {
List<String[]> aliasesList = elasticSearchService.getActualIndexs(cluster.getHttpAddress());
if (aliasesList == null || aliasesList.isEmpty()) {
LogUtils.warn(LOGGER, SearchLogEvent.ROUTING_EVENT, "aliasesList by {} returns null, keep the former values in clusterAliaseIndexMapCache",
cluster.getClusterId());
Map<String, Map<String, List<String>>> clusterAliaseIndexMapCache = (Map<String, Map<String, List<String>>>) cacheMap
.get(ALIASE_INDEX_MAP_RAW);

Map<String, List<String>> cache = clusterAliaseIndexMapCache.getOrDefault(cluster.getClusterId(), emptyMap());
if (cache.isEmpty()) {
LogUtils.warn(LOGGER, SearchLogEvent.ROUTING_EVENT, "cluster {} is empty in clusterAliaseIndexMapCache.", cluster.getClusterId());
return cache;
}
}
return aliasesList != null ? Optional.ofNullable(aliasesList.stream().
collect(groupingBy(i -> i[0], mapping(i -> i[1], toList())))).orElseGet(Collections::emptyMap) : emptyMap();
}
));

//aliaseIndex -> clusterId -> indexList
Map<String, Map<String, List<String>>> clusterAliaseIndexMap = reverseMapKey(clusterAliaseIndexMapRaw);

cacheMap.put(INDEX_NODE_LIST, indexNodeListMap);
cacheMap.put(CLUSTER_NODE_LIST, clusterNodeListMap);
cacheMap.put(SHARD_NODE_LIST_RAW, clusterShardNodeListMapRaw);
cacheMap.put(SHARD_NODE_LIST, clusterShardNodeListMap);
cacheMap.put(INDEX_CLUSTER_PORT, indexClusterPortMap);
cacheMap.put(CLUSTER_PORT, clusterPortMap);
cacheMap.put(ALIASE_INDEX_MAP_RAW, clusterAliaseIndexMapRaw);
cacheMap.put(ALIASE_INDEX_MAP, clusterAliaseIndexMap);
cacheMap.put(INDEX_CLUSTER_MAP, indexClusterMap);
}
Expand Down

0 comments on commit fcdde49

Please sign in to comment.