diff --git a/pallas-search/src/main/java/com/vip/pallas/search/cache/RoutingCache.java b/pallas-search/src/main/java/com/vip/pallas/search/cache/RoutingCache.java index 65aaf2dc..a749d022 100644 --- a/pallas-search/src/main/java/com/vip/pallas/search/cache/RoutingCache.java +++ b/pallas-search/src/main/java/com/vip/pallas/search/cache/RoutingCache.java @@ -50,7 +50,9 @@ public class RoutingCache extends AbstractCache> { public static final String INDEX_NODE_LIST = "INDEX_NODE_LIST"; public static final String CLUSTER_NODE_LIST = "CLUSTER_NODE_LIST"; public static final String SHARD_NODE_LIST = "SHARD_NODE_LIST"; + public static final String SHARD_NODE_LIST_RAW = "SHARD_NODE_LIST_RAW"; public static final String ALIASE_INDEX_MAP = "ALIASE_INDEX_MAP"; //索引别名 + public static final String ALIASE_INDEX_MAP_RAW = "ALIASE_INDEX_MAP_RAW"; //索引别名 public static final String INDEXLEVEL_ROUTING = "INDEXLEVEL_ROUTING"; public static final String CLUSTERLEVEL_ROUTING = "CLUSTERLEVEL_ROUTING"; public static final String INDEX_TARGET_GROUP = "INDEX_TARGET_GROUP"; @@ -213,16 +215,21 @@ private void cacheNode(Map cacheMap) throws Exception { if (nodeList != null) { nodeList.removeAll(elasticSearchService.getExcludeNodeList(cluster.getHttpAddress())); nodeList.removeAll(getAbnormalNodeList(cluster.getClusterId())); - return Optional.ofNullable(nodeList).orElseGet(Collections::emptyList); + if (nodeList.isEmpty()) { + LogUtils.warn(LOGGER, SearchLogEvent.ROUTING_EVENT, "cluster {} is empty after getExcludeNodeList.", cluster.getClusterId()); + } + return nodeList; } // if sth. goes wrong, keep the former cache rather than the null value. - LogUtils.info(LOGGER, SearchLogEvent.ROUTING_EVENT, "getNodeList by {} returns null, keep the former values in cache.", - cluster.getHttpAddress()); - Map> clusterNodeListMapInCache = (Map>) cacheMap - .get(CLUSTER_NODE_LIST); + Map> clusterNodeListMapInCache = (Map>) cacheMap.get(CLUSTER_NODE_LIST); + LogUtils.warn(LOGGER, SearchLogEvent.ROUTING_EVENT, "getNodeList by {} returns null, keep the former values in clusterNodeListMapInCache", + cluster.getClusterId(), clusterNodeListMapInCache); if (clusterNodeListMapInCache != null) { - List nodeListInCache = clusterNodeListMapInCache.get(cluster.getClusterId()); - return nodeListInCache == null ? emptyList() : nodeListInCache; + List nodeListInCache = clusterNodeListMapInCache.getOrDefault(cluster.getClusterId(), emptyList()); + if (nodeListInCache.isEmpty()) { + LogUtils.warn(LOGGER, SearchLogEvent.ROUTING_EVENT, "cluster {} is empty in clusterNodeListMapInCache.", cluster.getClusterId()); + } + return nodeListInCache; } return emptyList(); } @@ -236,46 +243,70 @@ private void cacheNode(Map cacheMap) throws Exception { .collect(toList()).stream().flatMap(Collection::stream).collect(toList()) )); + //clusterId -> shardNodeListMap + Map>> clusterShardNodeListMapRaw = clusterList.stream().collect(toMap(Cluster::getClusterId, cluster -> { + List shardNodeList = elasticSearchService.getIndexAndNodes(cluster.getHttpAddress()); + if (shardNodeList != null) { + shardNodeList.removeAll(elasticSearchService.getExcludeNodeList(cluster.getHttpAddress())); + shardNodeList.removeAll(getAbnormalNodeList(cluster.getClusterId())); + return Optional + .ofNullable(shardNodeList.stream() + .collect(groupingBy(t -> t[0], mapping(t -> t[1], toList())))) + .orElseGet(Collections::emptyMap); + } + // if sth. goes wrong, keep the former cache rather than the null value. + Map>> clusterShardNodeListMapRawInCache = (Map>>) cacheMap + .get(SHARD_NODE_LIST_RAW); + LogUtils.warn(LOGGER, SearchLogEvent.ROUTING_EVENT, "getShards by {} returns null, keep the former values in clusterShardNodeListMapRawInCache", + cluster.getClusterId(), clusterShardNodeListMapRawInCache); + + if (clusterShardNodeListMapRawInCache != null) { + Map> shardNodeListInCache = clusterShardNodeListMapRawInCache.getOrDefault(cluster.getClusterId(), emptyMap()); + if (shardNodeListInCache.isEmpty()) { + LogUtils.warn(LOGGER, SearchLogEvent.ROUTING_EVENT, "cluster {} is empty in clusterNodeListMapRawInCache.", cluster.getClusterId()); + } + return shardNodeListInCache; + } + return emptyMap(); + })); + + // index -> clusterId -> shardNodeListMap - Map>> clusterShardNodeListMap = reverseMapKey( - clusterList.stream().collect(toMap(Cluster::getClusterId, cluster -> { - List shardNodeList = elasticSearchService.getIndexAndNodes(cluster.getHttpAddress()); - if (shardNodeList != null) { - shardNodeList.removeAll(elasticSearchService.getExcludeNodeList(cluster.getHttpAddress())); - shardNodeList.removeAll(getAbnormalNodeList(cluster.getClusterId())); - return Optional - .ofNullable(shardNodeList.stream() - .collect(groupingBy(t -> t[0], mapping(t -> t[1], toList())))) - .orElseGet(Collections::emptyMap); - } - // if sth. goes wrong, keep the former cache rather than the null value. - LogUtils.info(LOGGER, SearchLogEvent.ROUTING_EVENT, "getShards by {} returns null, keep the former values in cache.", - cluster.getHttpAddress()); - Map>> clusterShardNodeListMapInCache = (Map>>) cacheMap - .get(SHARD_NODE_LIST); - if (clusterShardNodeListMapInCache != null) { - Map> shardNodeListInCache = clusterShardNodeListMapInCache - .get(cluster.getClusterId()); - return shardNodeListInCache == null ? emptyMap() : shardNodeListInCache; - } - return emptyMap(); - }))); + Map>> clusterShardNodeListMap = reverseMapKey(clusterShardNodeListMapRaw); + //aliaseIndex -> clusterId -> indexList - Map>> clusterAliaseIndexMap = reverseMapKey(clusterList.stream().collect(toMap( - cluster -> cluster.getClusterId(), - cluster -> { - List aliasesList = elasticSearchService.getActualIndexs(cluster.getHttpAddress()); - return aliasesList != null ? Optional.ofNullable(aliasesList.stream(). - collect(groupingBy(i -> i[0], mapping(i -> i[1], toList())))).orElseGet(Collections::emptyMap) : emptyMap(); - } - ))); + Map>> clusterAliaseIndexMapRaw = clusterList.stream().collect(toMap( + Cluster::getClusterId, + cluster -> { + List aliasesList = elasticSearchService.getActualIndexs(cluster.getHttpAddress()); + if (aliasesList == null || aliasesList.isEmpty()) { + LogUtils.warn(LOGGER, SearchLogEvent.ROUTING_EVENT, "aliasesList by {} returns null, keep the former values in clusterAliaseIndexMapCache", + cluster.getClusterId()); + Map>> clusterAliaseIndexMapCache = (Map>>) cacheMap + .get(ALIASE_INDEX_MAP_RAW); + + Map> cache = clusterAliaseIndexMapCache.getOrDefault(cluster.getClusterId(), emptyMap()); + if (cache.isEmpty()) { + LogUtils.warn(LOGGER, SearchLogEvent.ROUTING_EVENT, "cluster {} is empty in clusterAliaseIndexMapCache.", cluster.getClusterId()); + return cache; + } + } + return aliasesList != null ? Optional.ofNullable(aliasesList.stream(). + collect(groupingBy(i -> i[0], mapping(i -> i[1], toList())))).orElseGet(Collections::emptyMap) : emptyMap(); + } + )); + + //aliaseIndex -> clusterId -> indexList + Map>> clusterAliaseIndexMap = reverseMapKey(clusterAliaseIndexMapRaw); cacheMap.put(INDEX_NODE_LIST, indexNodeListMap); cacheMap.put(CLUSTER_NODE_LIST, clusterNodeListMap); + cacheMap.put(SHARD_NODE_LIST_RAW, clusterShardNodeListMapRaw); cacheMap.put(SHARD_NODE_LIST, clusterShardNodeListMap); cacheMap.put(INDEX_CLUSTER_PORT, indexClusterPortMap); cacheMap.put(CLUSTER_PORT, clusterPortMap); + cacheMap.put(ALIASE_INDEX_MAP_RAW, clusterAliaseIndexMapRaw); cacheMap.put(ALIASE_INDEX_MAP, clusterAliaseIndexMap); cacheMap.put(INDEX_CLUSTER_MAP, indexClusterMap); }