diff --git a/jraft-rheakv/rheakv-pd/src/main/java/com/alipay/sofa/jraft/rhea/PlacementDriverServer.java b/jraft-rheakv/rheakv-pd/src/main/java/com/alipay/sofa/jraft/rhea/PlacementDriverServer.java index d24d8193a..dbc921cc3 100644 --- a/jraft-rheakv/rheakv-pd/src/main/java/com/alipay/sofa/jraft/rhea/PlacementDriverServer.java +++ b/jraft-rheakv/rheakv-pd/src/main/java/com/alipay/sofa/jraft/rhea/PlacementDriverServer.java @@ -16,10 +16,15 @@ */ package com.alipay.sofa.jraft.rhea; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.rhea.options.RegionEngineOptions; +import com.alipay.sofa.jraft.rhea.options.StoreEngineOptions; +import com.alipay.sofa.jraft.rhea.util.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,11 +133,14 @@ public synchronized boolean init(final PlacementDriverServerOptions opts) { final RheaKVStoreOptions rheaOpts = opts.getRheaKVStoreOptions(); Requires.requireNonNull(rheaOpts, "opts.rheaKVStoreOptions"); this.rheaKVStore = new DefaultRheaKVStore(); + this.placementDriverService = new DefaultPlacementDriverService(this.rheaKVStore); + // Set up a listener before becoming a leader + this.rheaKVStore.addLeaderStateListener(getPdReginId(rheaOpts), + ((DefaultPlacementDriverService) this.placementDriverService)); if (!this.rheaKVStore.init(rheaOpts)) { LOG.error("Fail to init [RheaKVStore]."); return false; } - this.placementDriverService = new DefaultPlacementDriverService(this.rheaKVStore); if (!this.placementDriverService.init(opts)) { LOG.error("Fail to init [PlacementDriverService]."); return false; @@ -147,13 +155,42 @@ public synchronized boolean init(final PlacementDriverServerOptions opts) { throw new IllegalArgumentException("Only support single region for [PlacementDriverServer]"); } this.regionEngine = regionEngines.get(0); - this.rheaKVStore.addLeaderStateListener(this.regionEngine.getRegion().getId(), - ((DefaultPlacementDriverService) this.placementDriverService)); addPlacementDriverProcessor(storeEngine.getRpcServer()); LOG.info("[PlacementDriverServer] start successfully, options: {}.", opts); return this.started = true; } + private long getPdReginId(final RheaKVStoreOptions rheaOpts) { + StoreEngineOptions storeEngineOptions = rheaOpts.getStoreEngineOptions(); + Requires.requireNonNull(storeEngineOptions, "storeEngineOptions"); + List rOptsList = storeEngineOptions.getRegionEngineOptionsList(); + if (rOptsList == null || rOptsList.isEmpty()) { + return Constants.DEFAULT_REGION_ID; + } + List filteredOptsList = new ArrayList<>(); + for (RegionEngineOptions rOpts : rOptsList) { + if (inConfiguration(rOpts.getServerAddress().toString(), rOpts.getInitialServerList())) { + filteredOptsList.add(rOpts); + } + } + if (filteredOptsList.size() > 1) { + throw new IllegalArgumentException("Only support single region for [PlacementDriverServer]"); + } + return rOptsList.get(0).getRegionId(); + } + + private boolean inConfiguration(final String curr, final String all) { + final PeerId currPeer = new PeerId(); + if (!currPeer.parse(curr)) { + return false; + } + final Configuration allConf = new Configuration(); + if (!allConf.parse(all)) { + return false; + } + return allConf.contains(currPeer) || allConf.getLearners().contains(currPeer); + } + @Override public synchronized void shutdown() { if (!this.started) {