Skip to content

Commit

Permalink
[server][common] Allow store config repo to fetch configs only when n…
Browse files Browse the repository at this point in the history
…eeded (#1204)

* [common] Allow store config repo to fetch configs only when needed
  • Loading branch information
adamxchen authored Oct 8, 2024
1 parent e53e684 commit 47d987e
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,7 @@ private void initServerStoreAndSchemaRepository() {
// Load existing store config and setup watches
storeRepo.refresh();

storeConfigRepo = new HelixReadOnlyStoreConfigRepository(
zkClient,
adapter,
clusterConfig.getRefreshAttemptsForZkReconnect(),
clusterConfig.getRefreshIntervalForZkReconnectInMs());
storeConfigRepo = new HelixReadOnlyStoreConfigRepository(zkClient, adapter);
storeConfigRepo.refresh();

readOnlyZKSharedSchemaRepository = new HelixReadOnlyZKSharedSchemaRepository(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
package com.linkedin.venice.helix;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.venice.VeniceResource;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.meta.ReadOnlyStoreConfigRepository;
import com.linkedin.venice.meta.StoreConfig;
import java.util.ArrayList;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
Expand All @@ -33,73 +32,58 @@
public class HelixReadOnlyStoreConfigRepository implements ReadOnlyStoreConfigRepository, VeniceResource {
private static final Logger LOGGER = LogManager.getLogger(HelixReadOnlyStoreConfigRepository.class);

private final AtomicReference<Map<String, StoreConfig>> storeConfigMap;
private final Map<String, StoreConfig> loadedStoreConfigMap;
private final AtomicReference<Set<String>> availableStoreSet;
private final ZkStoreConfigAccessor accessor;
private final StoreConfigChangedListener storeConfigChangedListener;
private final StoreConfigAddedOrDeletedChangedListener storeConfigAddedOrDeletedListener;
private final ZkClient zkClient;
private final CachedResourceZkStateListener zkStateListener;
private final int refreshAttemptsForZkReconnect;
private final long refreshIntervalForZkReconnectInMs;

public HelixReadOnlyStoreConfigRepository(
ZkClient zkClient,
HelixAdapterSerializer adapterSerializer,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
this(
zkClient,
new ZkStoreConfigAccessor(zkClient, adapterSerializer, Optional.empty()),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);

public HelixReadOnlyStoreConfigRepository(ZkClient zkClient, HelixAdapterSerializer adapterSerializer) {
this(zkClient, new ZkStoreConfigAccessor(zkClient, adapterSerializer, Optional.empty()));
}

public HelixReadOnlyStoreConfigRepository(
ZkClient zkClient,
ZkStoreConfigAccessor accessor,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
public HelixReadOnlyStoreConfigRepository(ZkClient zkClient, ZkStoreConfigAccessor accessor) {
this.zkClient = zkClient;
this.accessor = accessor;
this.storeConfigMap = new AtomicReference<>(new HashMap<>());
this.loadedStoreConfigMap = new VeniceConcurrentHashMap<>();
this.availableStoreSet = new AtomicReference<>(new HashSet<>());
storeConfigChangedListener = new StoreConfigChangedListener();
storeConfigAddedOrDeletedListener = new StoreConfigAddedOrDeletedChangedListener();
this.refreshAttemptsForZkReconnect = refreshAttemptsForZkReconnect;
this.refreshIntervalForZkReconnectInMs = refreshIntervalForZkReconnectInMs;
// This repository already retry on getChildren, so do not need extra retry in listener.
zkStateListener = new CachedResourceZkStateListener(this);
}

/**
* Obtain all available stores and load them into cache, but it doesn't fetch the store configs and attach ZK watch yet
*/
@Override
public void refresh() {
LOGGER.info("Loading all store configs from zk.");
LOGGER.info("Loading all store names from zk.");
accessor.subscribeStoreConfigAddedOrDeletedListener(storeConfigAddedOrDeletedListener);
List<StoreConfig> configList =
accessor.getAllStoreConfigs(refreshAttemptsForZkReconnect, refreshIntervalForZkReconnectInMs);
LOGGER.info("Found {} store configs.", configList.size());
Map<String, StoreConfig> configMap = new HashMap<>();
for (StoreConfig config: configList) {
configMap.put(config.getStoreName(), config);
accessor.subscribeStoreConfigDataChangedListener(config.getStoreName(), storeConfigChangedListener);
}
storeConfigMap.set(configMap);
availableStoreSet.set(new HashSet<>(accessor.getAllStores()));
LOGGER.info("Found {} stores.", availableStoreSet.get().size());
zkClient.subscribeStateChanges(zkStateListener);
LOGGER.info("All store configs are loaded.");
LOGGER.info("All store names are loaded.");
}

@Override
public void clear() {
LOGGER.info("Clearing all store configs in local");
accessor.unsubscribeStoreConfigAddedOrDeletedListener(storeConfigAddedOrDeletedListener);
for (String storeName: storeConfigMap.get().keySet()) {
for (String storeName: loadedStoreConfigMap.keySet()) {
accessor.unsubscribeStoreConfigDataChangedListener(storeName, storeConfigChangedListener);
}
this.storeConfigMap.set(Collections.emptyMap());
this.loadedStoreConfigMap.clear();
this.availableStoreSet.set(Collections.emptySet());
zkClient.unsubscribeStateChanges(zkStateListener);
LOGGER.info("Cleared all store configs in local");
}

/**
* Get the store config by store name. It would fetch the store config from ZK if it's not in cache yet and attach ZK
* watch.
* The corresponding Venice store config is returned for metadata system store's store config. This is the most
* natural way to handle cluster discovery for metadata system stores and store migration.
*/
Expand All @@ -114,12 +98,28 @@ public Optional<StoreConfig> getStoreConfig(String storeName) {
if (systemStoreType != null && systemStoreType.equals(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE)) {
veniceStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.extractRegularStoreName(storeName);
}
StoreConfig config = storeConfigMap.get().get(veniceStoreName);
if (config != null) {
return Optional.of(config.cloneStoreConfig());
} else {
return Optional.empty();

if (availableStoreSet.get().contains(veniceStoreName)) {
StoreConfig config = loadedStoreConfigMap.get(veniceStoreName);
if (config == null) {
// lazy fetch from ZK and attach watch
config = accessor.getStoreConfig(veniceStoreName);
if (config == null) {
LOGGER.warn("Store config is not found for store: {}", veniceStoreName);
} else {
loadedStoreConfigMap.put(config.getStoreName(), config);
accessor.subscribeStoreConfigDataChangedListener(veniceStoreName, storeConfigChangedListener);
StoreConfig configToVerify = accessor.getStoreConfig(veniceStoreName);
if (!configToVerify.equals(config)) {
LOGGER.debug("Store config is changed during fetching. Reload the store config for {}", configToVerify);
config = configToVerify;
loadedStoreConfigMap.put(config.getStoreName(), config);
}
}
}
return Optional.ofNullable(config != null ? config.cloneStoreConfig() : null);
}
return Optional.empty();
}

@Override
Expand All @@ -131,63 +131,61 @@ public StoreConfig getStoreConfigOrThrow(String storeName) {
return storeConfig.get();
}

@Override
public List<StoreConfig> getAllStoreConfigs() {
return new ArrayList<>(storeConfigMap.get().values());
}

protected StoreConfigAddedOrDeletedChangedListener getStoreConfigAddedOrDeletedListener() {
@VisibleForTesting
StoreConfigAddedOrDeletedChangedListener getStoreConfigAddedOrDeletedListener() {
return storeConfigAddedOrDeletedListener;
}

protected StoreConfigChangedListener getStoreConfigChangedListener() {
@VisibleForTesting
StoreConfigChangedListener getStoreConfigChangedListener() {
return storeConfigChangedListener;
}

@VisibleForTesting
Set<String> getAvailableStoreSet() {
return availableStoreSet.get();
}

@VisibleForTesting
Map<String, StoreConfig> getLoadedStoreConfigMap() {
return loadedStoreConfigMap;
}

protected class StoreConfigAddedOrDeletedChangedListener implements IZkChildListener {
@Override
public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
synchronized (storeConfigMap) {
Map<String, StoreConfig> map = new HashMap<>(storeConfigMap.get());
List<String> newStores =
currentChildren.stream().filter(newStore -> !map.containsKey(newStore)).collect(Collectors.toList());

Set<String> deletedStores = new HashSet<>(map.keySet());
currentChildren.forEach(deletedStores::remove);
LOGGER.info(
public void handleChildChange(String parentPath, List<String> currentChildren) {
synchronized (availableStoreSet) {
Set<String> storeSetSnapshot = new HashSet<>(availableStoreSet.get());
long newStoresCount = currentChildren.stream().filter(newStore -> !storeSetSnapshot.contains(newStore)).count();

// obtain the stores that are removed
currentChildren.forEach(storeSetSnapshot::remove);
LOGGER.debug(
"Store configs list is changed. {} new configs. And will delete {} configs.",
newStores.size(),
deletedStores.size());
// New added store configs
List<StoreConfig> newConfigs = accessor.getStoreConfigs(newStores);
for (StoreConfig config: newConfigs) {
map.put(config.getStoreName(), config);
accessor.subscribeStoreConfigDataChangedListener(config.getStoreName(), storeConfigChangedListener);
}
newStoresCount,
storeSetSnapshot.size());

// update the available store set
availableStoreSet.set(new HashSet<>(currentChildren));

// Deleted store configs
for (String deletedStore: deletedStores) {
map.remove(deletedStore);
for (String deletedStore: storeSetSnapshot) {
loadedStoreConfigMap.remove(deletedStore);
accessor.unsubscribeStoreConfigDataChangedListener(deletedStore, storeConfigChangedListener);
}
storeConfigMap.set(map);
}
}
}

protected class StoreConfigChangedListener implements IZkDataListener {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
public void handleDataChange(String dataPath, Object data) {
if (!(data instanceof StoreConfig)) {
throw new VeniceException(
"Invalid data from zk notification. Required: StoreConfig, but get: " + data.getClass().getName());
}
StoreConfig config = (StoreConfig) data;
synchronized (storeConfigMap) {
Map<String, StoreConfig> map = new HashMap<>(storeConfigMap.get());
map.put(config.getStoreName(), config);
storeConfigMap.set(map);
}
loadedStoreConfigMap.put(config.getStoreName(), config);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,7 @@ public ZkStoreConfigAccessor(
}

public List<String> getAllStores() {
return dataAccessor.getChildNames(ROOT_PATH, AccessOption.PERSISTENT);
}

public List<StoreConfig> getAllStoreConfigs(
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
// Only return not null configs.
List<StoreConfig> configs = HelixUtils
.getChildren(dataAccessor, ROOT_PATH, refreshAttemptsForZkReconnect, refreshIntervalForZkReconnectInMs)
.stream()
.filter(storeConfig -> storeConfig != null)
.collect(Collectors.toList());

LOGGER.info("Read {} store configs from path: {}.", configs.size(), ROOT_PATH);
return configs;
return HelixUtils.listPathContents(dataAccessor, ROOT_PATH);
}

public synchronized boolean containsConfig(String store) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.linkedin.venice.meta;

import java.util.List;
import java.util.Optional;


Expand All @@ -11,6 +10,4 @@ public interface ReadOnlyStoreConfigRepository {
Optional<StoreConfig> getStoreConfig(String storeName);

StoreConfig getStoreConfigOrThrow(String storeName);

List<StoreConfig> getAllStoreConfigs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.systemstore.schemas.StoreClusterConfig;
import com.linkedin.venice.utils.AvroRecordUtils;
import java.util.Objects;


/**
Expand Down Expand Up @@ -88,4 +89,32 @@ public StoreConfig cloneStoreConfig() {
clonedStoreConfig.setMigrationDestCluster(getMigrationDestCluster());
return clonedStoreConfig;
}

@Override
public int hashCode() {
return Objects.hash(
storeClusterConfig.storeName,
storeClusterConfig.cluster,
storeClusterConfig.deleting,
storeClusterConfig.migrationSrcCluster,
storeClusterConfig.migrationDestCluster);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

StoreConfig that = (StoreConfig) o;

return storeClusterConfig.deleting == that.storeClusterConfig.deleting
&& storeClusterConfig.storeName.equals(that.storeClusterConfig.storeName)
&& storeClusterConfig.cluster.equals(that.storeClusterConfig.cluster)
&& (Objects.equals(storeClusterConfig.migrationSrcCluster, that.storeClusterConfig.migrationSrcCluster))
&& (Objects.equals(storeClusterConfig.migrationDestCluster, that.storeClusterConfig.migrationDestCluster));
}
}
Loading

0 comments on commit 47d987e

Please sign in to comment.