diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/StateMachine.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/StateMachine.java index ccb6bebd5..e2af0ed8e 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/StateMachine.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/StateMachine.java @@ -64,8 +64,8 @@ public interface StateMachine { * call done.run(status) when snapshot finished. * Default: Save nothing and returns error. * - * @param writer snapshot writer - * @param done callback + * @param writer snapshot writer + * @param done callback */ void onSnapshotSave(SnapshotWriter writer, Closure done); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/Snapshot.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/Snapshot.java index bd870253f..a7485ad31 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/Snapshot.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/Snapshot.java @@ -38,7 +38,7 @@ public abstract class Snapshot extends Status { * Snapshot file prefix. */ public static final String JRAFT_SNAPSHOT_PREFIX = "snapshot_"; - /** Snapshot uri scheme for remote peer*/ + /** Snapshot uri scheme for remote peer */ public static final String REMOTE_SNAPSHOT_URI_SCHEME = "remote://"; /** diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Platform.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Platform.java index 1bf600763..ed0acbb9b 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Platform.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Platform.java @@ -39,7 +39,9 @@ public static boolean isWindows() { } private static boolean isWindows0() { - boolean windows = SystemPropertyUtil.get("os.name", "").toLowerCase(Locale.US).contains("win"); + final boolean windows = SystemPropertyUtil.get("os.name", "") // + .toLowerCase(Locale.US) // + .contains("win"); if (windows) { LOG.debug("Platform: Windows"); } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java index d4d1e76cb..10b185be9 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java @@ -449,10 +449,11 @@ public void handleKeyLockRequest(final KeyLockRequest request, try { checkRegionEpoch(request); final byte[] key = requireNonNull(request.getKey(), "lock.key"); + final byte[] fencingKey = this.regionEngine.getRegion().getStartKey(); final DistributedLock.Acquirer acquirer = requireNonNull(request.getAcquirer(), "lock.acquirer"); requireNonNull(acquirer.getId(), "lock.id"); requirePositive(acquirer.getLeaseMillis(), "lock.leaseMillis"); - this.rawKVStore.tryLockWith(key, request.isKeepLease(), acquirer, new BaseKVStoreClosure() { + this.rawKVStore.tryLockWith(key, fencingKey, request.isKeepLease(), acquirer, new BaseKVStoreClosure() { @Override public void run(Status status) { diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/RegionEngine.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/RegionEngine.java index ef4458ebc..d6ceb003b 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/RegionEngine.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/RegionEngine.java @@ -85,7 +85,7 @@ public synchronized boolean init(final RegionEngineOptions opts) { return true; } this.regionOpts = Requires.requireNonNull(opts, "opts"); - this.fsm = new KVStoreStateMachine(this.region.getId(), this.storeEngine); + this.fsm = new KVStoreStateMachine(this.region, this.storeEngine); // node options NodeOptions nodeOpts = opts.getNodeOptions(); diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java index 56e5fb1ec..f95f0ebd0 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java @@ -515,11 +515,7 @@ public void doSplit(final Long regionId, final Long newRegionId, final byte[] sp rOpts.setRaftDataPath(baseRaftDataPath + "raft_data_region_" + region.getId() + "_" + getSelfEndpoint().getPort()); final RegionEngine engine = new RegionEngine(region, this); - if (engine.init(rOpts)) { - final RegionKVService regionKVService = new DefaultRegionKVService(engine); - registerRegionKVService(regionKVService); - this.regionEngineTable.put(region.getId(), engine); - } else { + if (!engine.init(rOpts)) { LOG.error("Fail to init [RegionEngine: {}].", region); if (closure != null) { // null on follower @@ -528,12 +524,20 @@ public void doSplit(final Long regionId, final Long newRegionId, final byte[] sp } return; } + // update parent conf final Region pRegion = parent.getRegion(); final RegionEpoch pEpoch = pRegion.getRegionEpoch(); final long version = pEpoch.getVersion(); pEpoch.setVersion(version + 1); // version + 1 pRegion.setEndKey(splitKey); // update endKey + + // the following two lines of code can make a relation of 'happens-before' for + // read 'pRegion', because that a write to a ConcurrentMap happens-before every + // subsequent read of that ConcurrentMap. + this.regionEngineTable.put(region.getId(), engine); + registerRegionKVService(new DefaultRegionKVService(engine)); + // update local regionRouteTable this.pdClient.getRegionRouteTable().splitRegion(pRegion.getId(), region); if (closure != null) { diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVStore.java index cd0991a69..d13ff85b5 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVStore.java @@ -1165,7 +1165,7 @@ private void internalTryLockWith(final byte[] key, final boolean keepLease, fina retryRunner); if (regionEngine != null) { if (ensureOnValidEpoch(region, regionEngine, closure)) { - getRawKVStore(regionEngine).tryLockWith(key, keepLease, acquirer, closure); + getRawKVStore(regionEngine).tryLockWith(key, region.getStartKey(), keepLease, acquirer, closure); } } else { final KeyLockRequest request = new KeyLockRequest(); diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/AbstractKVStoreSnapshotFile.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/AbstractKVStoreSnapshotFile.java new file mode 100644 index 000000000..e83549004 --- /dev/null +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/AbstractKVStoreSnapshotFile.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rhea.storage; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.concurrent.ExecutorService; +import java.util.zip.ZipOutputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.rhea.metadata.Region; +import com.alipay.sofa.jraft.rhea.serialization.Serializer; +import com.alipay.sofa.jraft.rhea.serialization.Serializers; +import com.alipay.sofa.jraft.rhea.util.StackTraceUtil; +import com.alipay.sofa.jraft.rhea.util.ZipUtil; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; +import com.google.protobuf.ByteString; + +import static com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta; + +/** + * @author jiachun.fjc + */ +public abstract class AbstractKVStoreSnapshotFile implements KVStoreSnapshotFile { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractKVStoreSnapshotFile.class); + + private static final String SNAPSHOT_DIR = "kv"; + private static final String SNAPSHOT_ARCHIVE = "kv.zip"; + + protected final Serializer serializer = Serializers.getDefault(); + + @Override + public void save(final SnapshotWriter writer, final Closure done, final Region region, + final ExecutorService executor) { + final String writerPath = writer.getPath(); + final String snapshotPath = Paths.get(writerPath, SNAPSHOT_DIR).toString(); + try { + final LocalFileMeta meta = doSnapshotSave(snapshotPath, region); + executor.execute(() -> compressSnapshot(writer, meta, done)); + } catch (final Throwable t) { + LOG.error("Fail to save snapshot, path={}, file list={}, {}.", writerPath, writer.listFiles(), + StackTraceUtil.stackTrace(t)); + done.run(new Status(RaftError.EIO, "Fail to save snapshot at %s, error is %s", writerPath, + t.getMessage())); + } + } + + @Override + public boolean load(final SnapshotReader reader, final Region region) { + final LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(SNAPSHOT_ARCHIVE); + final String readerPath = reader.getPath(); + if (meta == null) { + LOG.error("Can't find kv snapshot file, path={}.", readerPath); + return false; + } + final String snapshotPath = Paths.get(readerPath, SNAPSHOT_DIR).toString(); + try { + decompressSnapshot(readerPath); + doSnapshotLoad(snapshotPath, meta, region); + return true; + } catch (final Throwable t) { + LOG.error("Fail to load snapshot, path={}, file list={}, {}.", readerPath, reader.listFiles(), + StackTraceUtil.stackTrace(t)); + return false; + } + } + + abstract LocalFileMeta doSnapshotSave(final String snapshotPath, final Region region) throws Exception; + + abstract void doSnapshotLoad(final String snapshotPath, final LocalFileMeta meta, final Region region) + throws Exception; + + protected void compressSnapshot(final SnapshotWriter writer, final LocalFileMeta meta, final Closure done) { + final String writerPath = writer.getPath(); + final String outputFile = Paths.get(writerPath, SNAPSHOT_ARCHIVE).toString(); + try { + try (final ZipOutputStream out = new ZipOutputStream(new FileOutputStream(outputFile))) { + ZipUtil.compressDirectoryToZipFile(writerPath, SNAPSHOT_DIR, out); + } + if (writer.addFile(SNAPSHOT_ARCHIVE, meta)) { + done.run(Status.OK()); + } else { + done.run(new Status(RaftError.EIO, "Fail to add snapshot file: %s", writerPath)); + } + } catch (final Throwable t) { + LOG.error("Fail to compress snapshot, path={}, file list={}, {}.", writerPath, writer.listFiles(), + StackTraceUtil.stackTrace(t)); + done.run(new Status(RaftError.EIO, "Fail to compress snapshot at %s, error is %s", writerPath, t + .getMessage())); + } + } + + protected void decompressSnapshot(final String readerPath) throws IOException { + final String sourceFile = Paths.get(readerPath, SNAPSHOT_ARCHIVE).toString(); + ZipUtil.unzipFile(sourceFile, readerPath); + } + + protected T readMetadata(final LocalFileMeta meta, final Class cls) { + final ByteString userMeta = meta.getUserMeta(); + return this.serializer.readObject(userMeta.toByteArray(), cls); + } + + protected LocalFileMeta buildMetadata(final T metadata) { + return metadata == null ? null : LocalFileMeta.newBuilder() // + .setUserMeta(ByteString.copyFrom(this.serializer.writeObject(metadata))) // + .build(); + } +} diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java index d35e9f3a5..6a77843ef 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java @@ -27,10 +27,8 @@ import com.alipay.sofa.jraft.rhea.errors.Errors; import com.alipay.sofa.jraft.rhea.metrics.KVMetrics; import com.alipay.sofa.jraft.rhea.util.StackTraceUtil; -import com.alipay.sofa.jraft.util.BytesUtil; import com.codahale.metrics.Timer; -import static com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta; import static com.alipay.sofa.jraft.rhea.metrics.KVMetricNames.DB_TIMER; /** @@ -38,8 +36,6 @@ */ public abstract class BaseRawKVStore implements RawKVStore, Lifecycle { - protected static final byte[] LOCK_FENCING_KEY = BytesUtil.writeUtf8("LOCK_FENCING_KEY"); - @Override public void get(final byte[] key, final KVStoreClosure closure) { get(key, true, closure); @@ -96,9 +92,13 @@ public void execute(final NodeExecutor nodeExecutor, final boolean isLeader, fin */ public abstract byte[] jumpOver(final byte[] startKey, final long distance); - public abstract LocalFileMeta onSnapshotSave(final String snapshotPath) throws Exception; - - public abstract void onSnapshotLoad(final String snapshotPath, final LocalFileMeta meta) throws Exception; + /** + * Init the fencing token of new region. + * + * @param parentKey the fencing key of parent region + * @param childKey the fencing key of new region + */ + public abstract void initFencingToken(final byte[] parentKey, final byte[] childKey); // static methods // diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawKVStore.java index 86d8181b7..e3d1bb9f2 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawKVStore.java @@ -85,7 +85,8 @@ public void batchTryLockWith(final KVStateOutputList kvStates) { final KVState kvState = kvStates.get(i); final KVOperation op = kvState.getOp(); final Pair acquirerPair = op.getAcquirerPair(); - tryLockWith(op.getKey(), acquirerPair.getKey(), acquirerPair.getValue(), kvState.getDone()); + tryLockWith(op.getKey(), op.getFencingKey(), acquirerPair.getKey(), acquirerPair.getValue(), + kvState.getDone()); } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVOperation.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVOperation.java index 50098fb2d..a3fe348d6 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVOperation.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVOperation.java @@ -155,10 +155,10 @@ public static KVOperation createNodeExecutor(final NodeExecutor nodeExecutor) { return new KVOperation(BytesUtil.EMPTY_BYTES, BytesUtil.EMPTY_BYTES, nodeExecutor, NODE_EXECUTE); } - public static KVOperation createKeyLockRequest(final byte[] key, + public static KVOperation createKeyLockRequest(final byte[] key, final byte[] fencingKey, final Pair acquirerPair) { Requires.requireNonNull(key, "key"); - return new KVOperation(key, BytesUtil.EMPTY_BYTES, acquirerPair, KEY_LOCK); + return new KVOperation(key, fencingKey, acquirerPair, KEY_LOCK); } public static KVOperation createKeyLockReleaseRequest(final byte[] key, final DistributedLock.Acquirer acquirer) { @@ -239,6 +239,10 @@ public byte[] getEndKey() { return value; } + public byte[] getFencingKey() { + return value; + } + public void setValue(byte[] value) { this.value = value; } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreSnapshotFile.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreSnapshotFile.java new file mode 100644 index 000000000..91bc0d6ae --- /dev/null +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreSnapshotFile.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rhea.storage; + +import java.util.concurrent.ExecutorService; + +import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.rhea.metadata.Region; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; + +/** + * + * @author jiachun.fjc + */ +public interface KVStoreSnapshotFile { + + /** + * Save a snapshot for the specified region. + * + * @param writer snapshot writer + * @param done callback + * @param region the region to save snapshot + * @param executor the executor to compress snapshot + */ + void save(final SnapshotWriter writer, final Closure done, final Region region, final ExecutorService executor); + + /** + * Load snapshot for the specified region. + * + * @param reader snapshot reader + * @param region the region to load snapshot + * @return true if load succeed + */ + boolean load(final SnapshotReader reader, final Region region); +} diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreSnapshotFileFactory.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreSnapshotFileFactory.java new file mode 100644 index 000000000..e54b8084d --- /dev/null +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreSnapshotFileFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rhea.storage; + +import com.alipay.sofa.jraft.util.Requires; + +/** + * + * @author jiachun.fjc + */ +public final class KVStoreSnapshotFileFactory { + + public static KVStoreSnapshotFile getKVStoreSnapshotFile(final BaseRawKVStore kvStore) { + Requires.requireNonNull(kvStore, "kvStore"); + if (kvStore instanceof RocksRawKVStore) { + return new RocksKVStoreSnapshotFile((RocksRawKVStore) kvStore); + } + if (kvStore instanceof MemoryRawKVStore) { + return new MemoryKVStoreSnapshotFile((MemoryRawKVStore) kvStore); + } + throw reject("fail to find a KVStoreSnapshotFile with " + kvStore.getClass().getName()); + } + + private static UnsupportedOperationException reject(final String message) { + return new UnsupportedOperationException(message); + } + + private KVStoreSnapshotFileFactory() { + } +} diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java index e9d8a03df..3d9435804 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java @@ -16,12 +16,10 @@ */ package com.alipay.sofa.jraft.rhea.storage; -import java.io.File; -import java.io.FileOutputStream; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; -import java.util.zip.ZipOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,21 +31,20 @@ import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.rhea.LeaderStateListener; import com.alipay.sofa.jraft.rhea.StoreEngine; +import com.alipay.sofa.jraft.rhea.errors.Errors; import com.alipay.sofa.jraft.rhea.errors.IllegalKVOperationException; import com.alipay.sofa.jraft.rhea.errors.StoreCodecException; +import com.alipay.sofa.jraft.rhea.metadata.Region; import com.alipay.sofa.jraft.rhea.metrics.KVMetrics; import com.alipay.sofa.jraft.rhea.serialization.Serializer; import com.alipay.sofa.jraft.rhea.serialization.Serializers; import com.alipay.sofa.jraft.rhea.util.Pair; import com.alipay.sofa.jraft.rhea.util.RecycleUtil; -import com.alipay.sofa.jraft.rhea.util.StackTraceUtil; -import com.alipay.sofa.jraft.rhea.util.ZipUtil; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; -import static com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta; import static com.alipay.sofa.jraft.rhea.metrics.KVMetricNames.STATE_MACHINE_APPLY_QPS; import static com.alipay.sofa.jraft.rhea.metrics.KVMetricNames.STATE_MACHINE_BATCH_WRITE; @@ -58,25 +55,24 @@ */ public class KVStoreStateMachine extends StateMachineAdapter { - private static final Logger LOG = LoggerFactory.getLogger(KVStoreStateMachine.class); + private static final Logger LOG = LoggerFactory.getLogger(KVStoreStateMachine.class); - private static final String SNAPSHOT_DIR = "kv"; - private static final String SNAPSHOT_ARCHIVE = "kv.zip"; - - private final List listeners = new CopyOnWriteArrayList<>(); - private final AtomicLong leaderTerm = new AtomicLong(-1L); - private final Serializer serializer = Serializers.getDefault(); - private final long regionId; + private final List listeners = new CopyOnWriteArrayList<>(); + private final AtomicLong leaderTerm = new AtomicLong(-1L); + private final Serializer serializer = Serializers.getDefault(); + private final Region region; private final StoreEngine storeEngine; private final BatchRawKVStore rawKVStore; + private final KVStoreSnapshotFile storeSnapshotFile; private final Meter applyMeter; private final Histogram batchWriteHistogram; - public KVStoreStateMachine(long regionId, StoreEngine storeEngine) { - this.regionId = regionId; + public KVStoreStateMachine(Region region, StoreEngine storeEngine) { + this.region = region; this.storeEngine = storeEngine; this.rawKVStore = storeEngine.getRawKVStore(); - final String regionStr = String.valueOf(this.regionId); + this.storeSnapshotFile = KVStoreSnapshotFileFactory.getKVStoreSnapshotFile(this.rawKVStore); + final String regionStr = String.valueOf(this.region.getId()); this.applyMeter = KVMetrics.meter(STATE_MACHINE_APPLY_QPS, regionStr); this.batchWriteHistogram = KVMetrics.histogram(STATE_MACHINE_BATCH_WRITE, regionStr); } @@ -127,7 +123,7 @@ private void batchApplyAndRecycle(final byte opByte, final KVStateOutputList kvS } // metrics: op qps - final Meter opApplyMeter = KVMetrics.meter(STATE_MACHINE_APPLY_QPS, String.valueOf(this.regionId), + final Meter opApplyMeter = KVMetrics.meter(STATE_MACHINE_APPLY_QPS, String.valueOf(this.region.getId()), KVOperation.opName(opByte)); final int size = kvStates.size(); opApplyMeter.mark(size); @@ -196,24 +192,30 @@ private void batchApply(final byte opType, final KVStateOutputList kvStates) { } private void doSplit(final KVStateOutputList kvStates) { + final byte[] parentKey = this.region.getStartKey(); for (final KVState kvState : kvStates) { final KVOperation op = kvState.getOp(); final Pair regionIds = op.getRegionIds(); - this.storeEngine.doSplit(regionIds.getKey(), regionIds.getValue(), op.getKey(), kvState.getDone()); + final byte[] splitKey = op.getKey(); + final KVStoreClosure closure = kvState.getDone(); + try { + this.rawKVStore.initFencingToken(parentKey, splitKey); + this.storeEngine.doSplit(regionIds.getKey(), regionIds.getValue(), splitKey, closure); + } catch (final Exception e) { + LOG.error("Fail to split, regionId={}, newRegionId={}, splitKey={}.", regionIds.getKey(), + regionIds.getValue(), Arrays.toString(splitKey)); + if (closure != null) { + // closure is null on follower node + closure.setError(Errors.STORAGE_ERROR); + closure.run(new Status(RaftError.EIO, e.getMessage())); + } + } } } @Override public void onSnapshotSave(final SnapshotWriter writer, final Closure done) { - final String snapshotPath = writer.getPath() + File.separator + SNAPSHOT_DIR; - try { - final LocalFileMeta meta = this.rawKVStore.onSnapshotSave(snapshotPath); - this.storeEngine.getSnapshotExecutor().execute(() -> doCompressSnapshot(writer, meta, done)); - } catch (final Throwable t) { - LOG.error("Fail to save snapshot at {}, {}.", snapshotPath, StackTraceUtil.stackTrace(t)); - done.run(new Status(RaftError.EIO, "Fail to save snapshot at %s, error is %s", snapshotPath, - t.getMessage())); - } + this.storeSnapshotFile.save(writer, done, this.region.copy(), this.storeEngine.getSnapshotExecutor()); } @Override @@ -222,37 +224,7 @@ public boolean onSnapshotLoad(final SnapshotReader reader) { LOG.warn("Leader is not supposed to load snapshot."); return false; } - final LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(SNAPSHOT_ARCHIVE); - if (meta == null) { - LOG.error("Can't find kv snapshot file at {}.", reader.getPath()); - return false; - } - try { - ZipUtil.unzipFile(reader.getPath() + File.separator + SNAPSHOT_ARCHIVE, reader.getPath()); - this.rawKVStore.onSnapshotLoad(reader.getPath() + File.separator + SNAPSHOT_DIR, meta); - return true; - } catch (final Throwable t) { - LOG.error("Fail to load snapshot: {}.", StackTraceUtil.stackTrace(t)); - return false; - } - } - - private void doCompressSnapshot(final SnapshotWriter writer, final LocalFileMeta meta, final Closure done) { - final String backupPath = writer.getPath() + File.separator + SNAPSHOT_DIR; - try { - try (final ZipOutputStream out = new ZipOutputStream(new FileOutputStream(writer.getPath() + File.separator - + SNAPSHOT_ARCHIVE))) { - ZipUtil.compressDirectoryToZipFile(writer.getPath(), SNAPSHOT_DIR, out); - } - if (writer.addFile(SNAPSHOT_ARCHIVE, meta)) { - done.run(Status.OK()); - } else { - done.run(new Status(RaftError.EIO, "Fail to add snapshot file: %s", backupPath)); - } - } catch (final Throwable t) { - LOG.error("Fail to save snapshot at {}, {}.", backupPath, StackTraceUtil.stackTrace(t)); - done.run(new Status(RaftError.EIO, "Fail to save snapshot at %s, error is %s", backupPath, t.getMessage())); - } + return this.storeSnapshotFile.load(reader, this.region.copy()); } @Override @@ -293,6 +265,6 @@ public void addLeaderStateListener(final LeaderStateListener listener) { } public long getRegionId() { - return regionId; + return this.region.getId(); } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryKVIterator.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryKVIterator.java index a053197dc..546b9318e 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryKVIterator.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryKVIterator.java @@ -18,147 +18,67 @@ import java.util.Map; import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.locks.Lock; - -import com.alipay.sofa.jraft.rhea.errors.InvalidIteratorVersion; /** * @author jiachun.fjc */ public class MemoryKVIterator implements KVIterator { - private final MemoryRawKVStore memoryRawKVStore; private final ConcurrentNavigableMap db; - private final Lock dbReadLock; - private final long dbVersion; private Map.Entry cursorEntry; - public MemoryKVIterator(MemoryRawKVStore memoryRawKVStore, ConcurrentNavigableMap db, - Lock dbReadLock, long dbVersion) { - this.memoryRawKVStore = memoryRawKVStore; + public MemoryKVIterator(ConcurrentNavigableMap db) { this.db = db; - this.dbReadLock = dbReadLock; - this.dbVersion = dbVersion; } @Override public boolean isValid() { - final Lock readLock = this.dbReadLock; - readLock.lock(); - try { - ensureSafety(); - return this.cursorEntry != null; - } finally { - readLock.unlock(); - } + return this.cursorEntry != null; } @Override public void seekToFirst() { - final Lock readLock = this.dbReadLock; - readLock.lock(); - try { - ensureSafety(); - this.cursorEntry = this.db.firstEntry(); - } finally { - readLock.unlock(); - } + this.cursorEntry = this.db.firstEntry(); } @Override public void seekToLast() { - final Lock readLock = this.dbReadLock; - readLock.lock(); - try { - ensureSafety(); - this.cursorEntry = this.db.lastEntry(); - } finally { - readLock.unlock(); - } + this.cursorEntry = this.db.lastEntry(); } @Override public void seek(final byte[] target) { - final Lock readLock = this.dbReadLock; - readLock.lock(); - try { - ensureSafety(); - this.cursorEntry = this.db.ceilingEntry(target); - } finally { - readLock.unlock(); - } + this.cursorEntry = this.db.ceilingEntry(target); } @Override public void seekForPrev(final byte[] target) { - final Lock readLock = this.dbReadLock; - readLock.lock(); - try { - ensureSafety(); - this.cursorEntry = this.db.lowerEntry(target); - } finally { - readLock.unlock(); - } + this.cursorEntry = this.db.lowerEntry(target); } @Override public void next() { - final Lock readLock = this.dbReadLock; - readLock.lock(); - try { - ensureSafety(); - this.cursorEntry = this.db.higherEntry(this.cursorEntry.getKey()); - } finally { - readLock.unlock(); - } + this.cursorEntry = this.db.higherEntry(this.cursorEntry.getKey()); } @Override public void prev() { - final Lock readLock = this.dbReadLock; - readLock.lock(); - try { - ensureSafety(); - this.cursorEntry = this.db.lowerEntry(this.cursorEntry.getKey()); - } finally { - readLock.unlock(); - } + this.cursorEntry = this.db.lowerEntry(this.cursorEntry.getKey()); } @Override public byte[] key() { - final Lock readLock = this.dbReadLock; - readLock.lock(); - try { - ensureSafety(); - return this.cursorEntry.getKey(); - } finally { - readLock.unlock(); - } + return this.cursorEntry.getKey(); } @Override public byte[] value() { - final Lock readLock = this.dbReadLock; - readLock.lock(); - try { - ensureSafety(); - return this.cursorEntry.getValue(); - } finally { - readLock.unlock(); - } + return this.cursorEntry.getValue(); } @Override public void close() throws Exception { // no-op } - - private void ensureSafety() { - if (this.dbVersion != this.memoryRawKVStore.getDatabaseVersion()) { - throw new InvalidIteratorVersion("current iterator is belong to the older version of db: " + this.dbVersion - + ", the newest db version: " + this.memoryRawKVStore.getDatabaseVersion()); - } - } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryKVStoreSnapshotFile.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryKVStoreSnapshotFile.java new file mode 100644 index 000000000..fff5d0c44 --- /dev/null +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryKVStoreSnapshotFile.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rhea.storage; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; + +import org.apache.commons.io.FileUtils; + +import com.alipay.sofa.jraft.rhea.errors.StorageException; +import com.alipay.sofa.jraft.rhea.metadata.Region; +import com.alipay.sofa.jraft.rhea.util.ByteArray; +import com.alipay.sofa.jraft.rhea.util.Pair; +import com.alipay.sofa.jraft.rhea.util.RegionHelper; +import com.alipay.sofa.jraft.rhea.util.concurrent.DistributedLock; +import com.alipay.sofa.jraft.util.Bits; + +import static com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta; + +/** + * + * @author jiachun.fjc + */ +public class MemoryKVStoreSnapshotFile extends AbstractKVStoreSnapshotFile { + + private final MemoryRawKVStore kvStore; + + MemoryKVStoreSnapshotFile(MemoryRawKVStore kvStore) { + this.kvStore = kvStore; + } + + @Override + LocalFileMeta doSnapshotSave(final String snapshotPath, final Region region) throws Exception { + final File file = new File(snapshotPath); + FileUtils.deleteDirectory(file); + FileUtils.forceMkdir(file); + this.kvStore.doSnapshotSave(this, snapshotPath, region); + return buildMetadata(region); + } + + @Override + void doSnapshotLoad(final String snapshotPath, final LocalFileMeta meta, final Region region) throws Exception { + final File file = new File(snapshotPath); + if (!file.exists()) { + throw new StorageException("Snapshot file [" + snapshotPath + "] not exists"); + } + final Region snapshotRegion = readMetadata(meta, Region.class); + if (!RegionHelper.isSameRange(region, snapshotRegion)) { + throw new StorageException("Invalid snapshot region: " + snapshotRegion + ", current region is: " + region); + } + this.kvStore.doSnapshotLoad(this, snapshotPath); + } + + void writeToFile(final String rootPath, final String fileName, final Persistence persist) throws Exception { + final Path path = Paths.get(rootPath, fileName); + try (final FileOutputStream out = new FileOutputStream(path.toFile()); + final BufferedOutputStream bufOutput = new BufferedOutputStream(out)) { + final byte[] bytes = this.serializer.writeObject(persist); + final byte[] lenBytes = new byte[4]; + Bits.putInt(lenBytes, 0, bytes.length); + bufOutput.write(lenBytes); + bufOutput.write(bytes); + bufOutput.flush(); + out.getFD().sync(); + } + } + + T readFromFile(final String rootPath, final String fileName, final Class clazz) throws Exception { + final Path path = Paths.get(rootPath, fileName); + final File file = path.toFile(); + if (!file.exists()) { + throw new NoSuchFieldException(path.toString()); + } + try (final FileInputStream in = new FileInputStream(file); + final BufferedInputStream bufInput = new BufferedInputStream(in)) { + final byte[] lenBytes = new byte[4]; + int read = bufInput.read(lenBytes); + if (read != lenBytes.length) { + throw new IOException("fail to read snapshot file length, expects " + lenBytes.length + + " bytes, but read " + read); + } + final int len = Bits.getInt(lenBytes, 0); + final byte[] bytes = new byte[len]; + read = bufInput.read(bytes); + if (read != bytes.length) { + throw new IOException("fail to read snapshot file, expects " + bytes.length + " bytes, but read " + + read); + } + return this.serializer.readObject(bytes, clazz); + } + } + + static class Persistence { + + private final T data; + + public Persistence(T data) { + this.data = data; + } + + public T data() { + return data; + } + } + + /** + * The data of sequences + */ + static class SequenceDB extends Persistence> { + + public SequenceDB(Map data) { + super(data); + } + } + + /** + * The data of fencing token keys + */ + static class FencingKeyDB extends Persistence> { + + public FencingKeyDB(Map data) { + super(data); + } + } + + /** + * The data of lock info + */ + static class LockerDB extends Persistence> { + + public LockerDB(Map data) { + super(data); + } + } + + /** + * The data will be cut into many small portions, each called a segment + */ + static class Segment extends Persistence>> { + + public Segment(List> data) { + super(data); + } + } + + /** + * The 'tailIndex' records the largest segment number (the segment number starts from 0) + */ + static class TailIndex extends Persistence { + + public TailIndex(Integer data) { + super(data); + } + } +} diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java index 620fbb423..ba23b4177 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java @@ -16,104 +16,72 @@ */ package com.alipay.sofa.jraft.rhea.storage; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Arrays; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.alipay.sofa.jraft.rhea.metadata.Region; import com.alipay.sofa.jraft.rhea.options.MemoryDBOptions; -import com.alipay.sofa.jraft.rhea.serialization.Serializer; -import com.alipay.sofa.jraft.rhea.serialization.Serializers; +import com.alipay.sofa.jraft.rhea.storage.MemoryKVStoreSnapshotFile.SequenceDB; import com.alipay.sofa.jraft.rhea.util.ByteArray; import com.alipay.sofa.jraft.rhea.util.Lists; import com.alipay.sofa.jraft.rhea.util.Maps; import com.alipay.sofa.jraft.rhea.util.Pair; +import com.alipay.sofa.jraft.rhea.util.RegionHelper; import com.alipay.sofa.jraft.rhea.util.StackTraceUtil; import com.alipay.sofa.jraft.rhea.util.concurrent.DistributedLock; -import com.alipay.sofa.jraft.util.Bits; import com.alipay.sofa.jraft.util.BytesUtil; import com.codahale.metrics.Timer; -import static com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta; +import static com.alipay.sofa.jraft.rhea.storage.MemoryKVStoreSnapshotFile.FencingKeyDB; +import static com.alipay.sofa.jraft.rhea.storage.MemoryKVStoreSnapshotFile.LockerDB; +import static com.alipay.sofa.jraft.rhea.storage.MemoryKVStoreSnapshotFile.Segment; +import static com.alipay.sofa.jraft.rhea.storage.MemoryKVStoreSnapshotFile.TailIndex; /** * @author jiachun.fjc */ public class MemoryRawKVStore extends BatchRawKVStore { - private static final Logger LOG = LoggerFactory.getLogger(MemoryRawKVStore.class); + private static final Logger LOG = LoggerFactory.getLogger(MemoryRawKVStore.class); - private static final byte DELIMITER = (byte) ','; - private static final Comparator COMPARATOR = BytesUtil.getDefaultByteArrayComparator(); + private static final byte DELIMITER = (byte) ','; + private static final Comparator COMPARATOR = BytesUtil.getDefaultByteArrayComparator(); - // this rw_lock is for memory_kv_iterator - private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final ConcurrentNavigableMap defaultDB = new ConcurrentSkipListMap<>(COMPARATOR); + private final Map sequenceDB = new ConcurrentHashMap<>(); + private final Map fencingKeyDB = new ConcurrentHashMap<>(); + private final Map lockerDB = new ConcurrentHashMap<>(); - private final AtomicLong databaseVersion = new AtomicLong(0); - private final Serializer serializer = Serializers.getDefault(); - - private ConcurrentNavigableMap defaultDB; - private Map sequenceDB; - private Map fencingKeyDB; - private Map lockerDB; - - private MemoryDBOptions opts; + private volatile MemoryDBOptions opts; @Override public boolean init(final MemoryDBOptions opts) { - final Lock writeLock = this.readWriteLock.writeLock(); - writeLock.lock(); - try { - final ConcurrentNavigableMap defaultDB = new ConcurrentSkipListMap<>(COMPARATOR); - final Map sequenceDB = Maps.newHashMap(); - final Map fencingKeyDB = Maps.newHashMap(); - final Map lockerDB = Maps.newHashMap(); - openMemoryDB(opts, defaultDB, sequenceDB, fencingKeyDB, lockerDB); - LOG.info("[MemoryRawKVStore] start successfully, options: {}.", opts); - return true; - } finally { - writeLock.unlock(); - } + this.opts = opts; + LOG.info("[MemoryRawKVStore] start successfully, options: {}.", opts); + return true; } @Override public void shutdown() { - final Lock writeLock = this.readWriteLock.writeLock(); - writeLock.lock(); - try { - closeMemoryDB(); - } finally { - writeLock.unlock(); - } + this.defaultDB.clear(); + this.sequenceDB.clear(); + this.fencingKeyDB.clear(); + this.lockerDB.clear(); } @Override public KVIterator localIterator() { - final Lock readLock = this.readWriteLock.readLock(); - readLock.lock(); - try { - return new MemoryKVIterator(this, this.defaultDB, readLock, this.databaseVersion.get()); - } finally { - readLock.unlock(); - } + return new MemoryKVIterator(this.defaultDB); } @Override @@ -309,8 +277,8 @@ public void putIfAbsent(final byte[] key, final byte[] value, final KVStoreClosu } @Override - public void tryLockWith(final byte[] key, final boolean keepLease, final DistributedLock.Acquirer acquirer, - final KVStoreClosure closure) { + public void tryLockWith(final byte[] key, final byte[] fencingKey, final boolean keepLease, + final DistributedLock.Acquirer acquirer, final KVStoreClosure closure) { final Timer.Context timeCtx = getTimeContext("TRY_LOCK"); try { // The algorithm relies on the assumption that while there is no @@ -349,7 +317,7 @@ public void tryLockWith(final byte[] key, final boolean keepLease, final Distrib // first time to acquire and success .remainingMillis(DistributedLock.OwnerBuilder.FIRST_TIME_SUCCESS) // create a new fencing token - .fencingToken(getNextFencingToken(LOCK_FENCING_KEY)) + .fencingToken(getNextFencingToken(fencingKey)) // init acquires .acquires(1) // set acquirer ctx @@ -388,7 +356,7 @@ public void tryLockWith(final byte[] key, final boolean keepLease, final Distrib // success as a new acquirer .remainingMillis(DistributedLock.OwnerBuilder.NEW_ACQUIRE_SUCCESS) // create a new fencing token - .fencingToken(getNextFencingToken(LOCK_FENCING_KEY)) + .fencingToken(getNextFencingToken(fencingKey)) // init acquires .acquires(1) // set acquirer ctx @@ -539,14 +507,14 @@ public void releaseLockWith(final byte[] key, final DistributedLock.Acquirer acq } } - @SuppressWarnings("SameParameterValue") private long getNextFencingToken(final byte[] fencingKey) { final Timer.Context timeCtx = getTimeContext("FENCING_TOKEN"); try { // Don't worry about the token number overflow. // It takes about 290,000 years for the 1 million TPS system // to use the numbers in the range [0 ~ Long.MAX_VALUE]. - return this.fencingKeyDB.compute(ByteArray.wrap(fencingKey), (key, prevVal) -> { + final byte[] realKey = BytesUtil.nullToEmpty(fencingKey); + return this.fencingKeyDB.compute(ByteArray.wrap(realKey), (key, prevVal) -> { if (prevVal == null) { return 1L; } @@ -635,205 +603,95 @@ public byte[] jumpOver(final byte[] startKey, final long distance) { } @Override - public LocalFileMeta onSnapshotSave(final String snapshotPath) throws Exception { - final File file = new File(snapshotPath); - FileUtils.deleteDirectory(file); - FileUtils.forceMkdir(file); - writeToFile(snapshotPath, "sequenceDB", new SequenceDB(this.sequenceDB)); - writeToFile(snapshotPath, "fencingKeyDB", new FencingKeyDB(this.fencingKeyDB)); - writeToFile(snapshotPath, "lockerDB", new LockerDB(this.lockerDB)); - final int size = this.opts.getKeysPerSegment(); - final List> segment = Lists.newArrayListWithCapacity(size); - int index = 0; - for (final Map.Entry entry : this.defaultDB.entrySet()) { - segment.add(Pair.of(entry.getKey(), entry.getValue())); - if (segment.size() >= size) { - writeToFile(snapshotPath, "segment" + index++, new Segment(segment)); - segment.clear(); - } - } - if (!segment.isEmpty()) { - writeToFile(snapshotPath, "segment" + index++, new Segment(segment)); - } - writeToFile(snapshotPath, "tailIndex", new TailIndex(--index)); - return null; - } - - @Override - public void onSnapshotLoad(final String snapshotPath, final LocalFileMeta meta) throws Exception { - final File file = new File(snapshotPath); - if (!file.exists()) { - LOG.error("Snapshot file [{}] not exists.", snapshotPath); - return; - } - final SequenceDB sequenceDB = readFromFile(snapshotPath, "sequenceDB", SequenceDB.class); - final FencingKeyDB fencingKeyDB = readFromFile(snapshotPath, "fencingKeyDB", FencingKeyDB.class); - final LockerDB lockerDB = readFromFile(snapshotPath, "lockerDB", LockerDB.class); - final TailIndex tailIndex = readFromFile(snapshotPath, "tailIndex", TailIndex.class); - final int tail = tailIndex.data(); - final List segments = Lists.newArrayListWithCapacity(tail + 1); - for (int i = 0; i <= tail; i++) { - final Segment segment = readFromFile(snapshotPath, "segment" + i, Segment.class); - segments.add(segment); - } - final ConcurrentNavigableMap defaultDB = new ConcurrentSkipListMap<>(COMPARATOR); - for (final Segment segment : segments) { - for (final Pair p : segment.data()) { - defaultDB.put(p.getKey(), p.getValue()); - } - } - final Lock writeLock = this.readWriteLock.writeLock(); - writeLock.lock(); + public void initFencingToken(final byte[] parentKey, final byte[] childKey) { + final Timer.Context timeCtx = getTimeContext("INIT_FENCING_TOKEN"); try { - closeMemoryDB(); - openMemoryDB(this.opts, defaultDB, sequenceDB.data(), fencingKeyDB.data(), lockerDB.data()); + final byte[] realKey = BytesUtil.nullToEmpty(parentKey); + final Long parentVal = this.fencingKeyDB.get(ByteArray.wrap(realKey)); + if (parentVal == null) { + return; + } + this.fencingKeyDB.put(ByteArray.wrap(childKey), parentVal); } finally { - writeLock.unlock(); - } - } - - public long getDatabaseVersion() { - return this.databaseVersion.get(); - } - - private void writeToFile(final String rootPath, final String fileName, final Persistence persist) - throws Exception { - final Path path = Paths.get(rootPath, fileName); - try (final FileOutputStream out = new FileOutputStream(path.toFile()); - final BufferedOutputStream bufOutput = new BufferedOutputStream(out)) { - final byte[] bytes = this.serializer.writeObject(persist); - final byte[] lenBytes = new byte[4]; - Bits.putInt(lenBytes, 0, bytes.length); - bufOutput.write(lenBytes); - bufOutput.write(bytes); - bufOutput.flush(); - out.getFD().sync(); + timeCtx.stop(); } } - private T readFromFile(final String rootPath, final String fileName, final Class clazz) throws Exception { - final Path path = Paths.get(rootPath, fileName); - final File file = path.toFile(); - if (!file.exists()) { - throw new NoSuchFieldException(path.toString()); - } - try (final FileInputStream in = new FileInputStream(file); - final BufferedInputStream bufInput = new BufferedInputStream(in)) { - final byte[] lenBytes = new byte[4]; - int read = bufInput.read(lenBytes); - if (read != lenBytes.length) { - throw new IOException("fail to read snapshot file length, expects " + lenBytes.length - + " bytes, but read " + read); + void doSnapshotSave(final MemoryKVStoreSnapshotFile snapshotFile, final String snapshotPath, final Region region) + throws Exception { + final Timer.Context timeCtx = getTimeContext("SNAPSHOT_SAVE"); + try { + snapshotFile.writeToFile(snapshotPath, "sequenceDB", new SequenceDB(subRangeMap(this.sequenceDB, region))); + snapshotFile.writeToFile(snapshotPath, "fencingKeyDB", + new FencingKeyDB(subRangeMap(this.fencingKeyDB, region))); + snapshotFile.writeToFile(snapshotPath, "lockerDB", new LockerDB(subRangeMap(this.lockerDB, region))); + final int size = this.opts.getKeysPerSegment(); + final List> segment = Lists.newArrayListWithCapacity(size); + int index = 0; + final byte[] realStartKey = BytesUtil.nullToEmpty(region.getStartKey()); + final byte[] endKey = region.getEndKey(); + final NavigableMap subMap; + if (endKey == null) { + subMap = this.defaultDB.tailMap(realStartKey); + } else { + subMap = this.defaultDB.subMap(realStartKey, endKey); + } + for (final Map.Entry entry : subMap.entrySet()) { + segment.add(Pair.of(entry.getKey(), entry.getValue())); + if (segment.size() >= size) { + snapshotFile.writeToFile(snapshotPath, "segment" + index++, new Segment(segment)); + segment.clear(); + } } - final int len = Bits.getInt(lenBytes, 0); - final byte[] bytes = new byte[len]; - read = bufInput.read(bytes); - if (read != bytes.length) { - throw new IOException("fail to read snapshot file, expects " + bytes.length + " bytes, but read " - + read); + if (!segment.isEmpty()) { + snapshotFile.writeToFile(snapshotPath, "segment" + index++, new Segment(segment)); } - return this.serializer.readObject(bytes, clazz); - } - } - - private void openMemoryDB(final MemoryDBOptions opts, final ConcurrentNavigableMap defaultDB, - final Map sequenceDB, final Map fencingKeyDB, - final Map lockerDB) { - final Lock writeLock = this.readWriteLock.writeLock(); - writeLock.lock(); - try { - this.databaseVersion.incrementAndGet(); - this.opts = opts; - this.defaultDB = defaultDB; - this.sequenceDB = sequenceDB; - this.fencingKeyDB = fencingKeyDB; - this.lockerDB = lockerDB; + snapshotFile.writeToFile(snapshotPath, "tailIndex", new TailIndex(--index)); } finally { - writeLock.unlock(); + timeCtx.stop(); } } - private void closeMemoryDB() { - final Lock writeLock = this.readWriteLock.writeLock(); - writeLock.lock(); + void doSnapshotLoad(final MemoryKVStoreSnapshotFile snapshotFile, final String snapshotPath) throws Exception { + final Timer.Context timeCtx = getTimeContext("SNAPSHOT_LOAD"); try { - if (this.defaultDB != null) { - this.defaultDB.clear(); - } - if (this.sequenceDB != null) { - this.sequenceDB.clear(); - } - if (this.fencingKeyDB != null) { - this.fencingKeyDB.clear(); + final SequenceDB sequenceDB = snapshotFile.readFromFile(snapshotPath, "sequenceDB", SequenceDB.class); + final FencingKeyDB fencingKeyDB = snapshotFile.readFromFile(snapshotPath, "fencingKeyDB", + FencingKeyDB.class); + final LockerDB lockerDB = snapshotFile.readFromFile(snapshotPath, "lockerDB", LockerDB.class); + + this.sequenceDB.putAll(sequenceDB.data()); + this.fencingKeyDB.putAll(fencingKeyDB.data()); + this.lockerDB.putAll(lockerDB.data()); + + final TailIndex tailIndex = snapshotFile.readFromFile(snapshotPath, "tailIndex", TailIndex.class); + final int tail = tailIndex.data(); + final List segments = Lists.newArrayListWithCapacity(tail + 1); + for (int i = 0; i <= tail; i++) { + final Segment segment = snapshotFile.readFromFile(snapshotPath, "segment" + i, Segment.class); + segments.add(segment); } - if (this.lockerDB != null) { - this.lockerDB.clear(); + for (final Segment segment : segments) { + for (final Pair p : segment.data()) { + this.defaultDB.put(p.getKey(), p.getValue()); + } } } finally { - writeLock.unlock(); - } - } - - static class Persistence { - - private final T data; - - public Persistence(T data) { - this.data = data; - } - - public T data() { - return data; - } - } - - /** - * The data of sequences - */ - static class SequenceDB extends Persistence> { - - public SequenceDB(Map data) { - super(data); - } - } - - /** - * The data of fencing token keys - */ - static class FencingKeyDB extends Persistence> { - - public FencingKeyDB(Map data) { - super(data); - } - } - - /** - * The data of lock info - */ - static class LockerDB extends Persistence> { - - public LockerDB(Map data) { - super(data); + timeCtx.stop(); } } - /** - * The data will be cut into many small portions, each called a segment - */ - static class Segment extends Persistence>> { - - public Segment(List> data) { - super(data); + static Map subRangeMap(final Map input, final Region region) { + if (RegionHelper.isSingleGroup(region)) { + return input; } - } - - /** - * The 'tailIndex' records the largest segment number (the segment number starts from 0) - */ - static class TailIndex extends Persistence { - - public TailIndex(Integer data) { - super(data); + final Map output = new HashMap<>(); + for (final Map.Entry entry : input.entrySet()) { + final ByteArray key = entry.getKey(); + if (RegionHelper.isKeyInRegion(key.getBytes(), region)) { + output.put(key, entry.getValue()); + } } + return output; } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MetricsRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MetricsRawKVStore.java index c5a67f155..7ee88ee98 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MetricsRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MetricsRawKVStore.java @@ -153,11 +153,11 @@ public void putIfAbsent(final byte[] key, final byte[] value, final KVStoreClosu } @Override - public void tryLockWith(final byte[] key, final boolean keepLease, final DistributedLock.Acquirer acquirer, - final KVStoreClosure closure) { + public void tryLockWith(final byte[] key, final byte[] fencingKey, final boolean keepLease, + final DistributedLock.Acquirer acquirer, final KVStoreClosure closure) { // 'keysCount' and 'bytesWritten' can't be provided with exact numbers, but I endured final KVStoreClosure c = metricsAdapter(closure, KEY_LOCK, 2, 0); - this.rawKVStore.tryLockWith(key, keepLease, acquirer, c); + this.rawKVStore.tryLockWith(key, fencingKey, keepLease, acquirer, c); } @Override diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RaftRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RaftRawKVStore.java index 768c13a25..34f0cd371 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RaftRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RaftRawKVStore.java @@ -209,14 +209,14 @@ public void putIfAbsent(final byte[] key, final byte[] value, final KVStoreClosu } @Override - public void tryLockWith(final byte[] key, final boolean keepLease, final DistributedLock.Acquirer acquirer, - final KVStoreClosure closure) { + public void tryLockWith(final byte[] key, final byte[] fencingKey, final boolean keepLease, + final DistributedLock.Acquirer acquirer, final KVStoreClosure closure) { // The algorithm relies on the assumption that while there is no // synchronized clock across the processes, still the local time in // every process flows approximately at the same rate, with an error // which is small compared to the auto-release time of the lock. acquirer.setLockingTimestamp(Clock.defaultClock().getTime()); - applyOperation(KVOperation.createKeyLockRequest(key, Pair.of(keepLease, acquirer)), closure); + applyOperation(KVOperation.createKeyLockRequest(key, fencingKey, Pair.of(keepLease, acquirer)), closure); } @Override diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RawKVStore.java index d86f8d6fa..facef0b86 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RawKVStore.java @@ -154,8 +154,8 @@ void scan(final byte[] startKey, final byte[] endKey, final int limit, final boo /** * Tries to lock the specified key, must contain a timeout */ - void tryLockWith(final byte[] key, final boolean keepLease, final DistributedLock.Acquirer acquirer, - final KVStoreClosure closure); + void tryLockWith(final byte[] key, final byte[] fencingKey, final boolean keepLease, + final DistributedLock.Acquirer acquirer, final KVStoreClosure closure); /** * Unlock the specified key with lock. diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksKVStoreSnapshotFile.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksKVStoreSnapshotFile.java new file mode 100644 index 000000000..873bfe01f --- /dev/null +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksKVStoreSnapshotFile.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rhea.storage; + +import com.alipay.sofa.jraft.rhea.errors.StorageException; +import com.alipay.sofa.jraft.rhea.metadata.Region; +import com.alipay.sofa.jraft.rhea.util.RegionHelper; + +import static com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta; + +/** + * + * @author jiachun.fjc + */ +public class RocksKVStoreSnapshotFile extends AbstractKVStoreSnapshotFile { + + private final RocksRawKVStore kvStore; + + RocksKVStoreSnapshotFile(RocksRawKVStore kvStore) { + this.kvStore = kvStore; + } + + @Override + LocalFileMeta doSnapshotSave(final String snapshotPath, final Region region) throws Exception { + if (RegionHelper.isMultiGroup(region)) { + this.kvStore.writeSstSnapshot(snapshotPath, region); + return buildMetadata(region); + } + if (this.kvStore.isFastSnapshot()) { + this.kvStore.writeSnapshot(snapshotPath); + return null; + } + final RocksDBBackupInfo backupInfo = this.kvStore.backupDB(snapshotPath); + return buildMetadata(backupInfo); + } + + @Override + void doSnapshotLoad(final String snapshotPath, final LocalFileMeta meta, final Region region) throws Exception { + if (RegionHelper.isMultiGroup(region)) { + final Region snapshotRegion = readMetadata(meta, Region.class); + if (!RegionHelper.isSameRange(region, snapshotRegion)) { + throw new StorageException("Invalid snapshot region: " + snapshotRegion + " current region is: " + + region); + } + this.kvStore.readSstSnapshot(snapshotPath); + return; + } + if (this.kvStore.isFastSnapshot()) { + this.kvStore.readSnapshot(snapshotPath); + return; + } + final RocksDBBackupInfo rocksBackupInfo = readMetadata(meta, RocksDBBackupInfo.class); + this.kvStore.restoreBackup(snapshotPath, rocksBackupInfo); + } +} diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java index 07ce7c745..075887015 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java @@ -17,6 +17,8 @@ package com.alipay.sofa.jraft.rhea.storage; import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; @@ -63,6 +65,7 @@ import org.slf4j.LoggerFactory; import com.alipay.sofa.jraft.rhea.errors.StorageException; +import com.alipay.sofa.jraft.rhea.metadata.Region; import com.alipay.sofa.jraft.rhea.options.RocksDBOptions; import com.alipay.sofa.jraft.rhea.rocks.support.RocksStatisticsCollector; import com.alipay.sofa.jraft.rhea.serialization.Serializer; @@ -75,12 +78,10 @@ import com.alipay.sofa.jraft.rhea.util.concurrent.DistributedLock; import com.alipay.sofa.jraft.util.Bits; import com.alipay.sofa.jraft.util.BytesUtil; +import com.alipay.sofa.jraft.util.Requires; import com.alipay.sofa.jraft.util.StorageOptionsFactory; import com.alipay.sofa.jraft.util.SystemPropertyUtil; import com.codahale.metrics.Timer; -import com.google.protobuf.ByteString; - -import static com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta; /** * Local KV store based on RocksDB @@ -154,10 +155,13 @@ public boolean init(final RocksDBOptions opts) { this.writeOptions = new WriteOptions(); this.writeOptions.setSync(opts.isSync()); this.writeOptions.setDisableWAL(false); + // Delete existing data, relying on raft's snapshot and log playback + // to reply to the data is the correct behavior. + FileUtils.deleteDirectory(new File(opts.getDbPath())); openRocksDB(opts); LOG.info("[RocksRawKVStore] start successfully, options: {}.", opts); return true; - } catch (final RocksDBException e) { + } catch (final Exception e) { LOG.error("Fail to open rocksDB at path {}, {}.", opts.getDbPath(), StackTraceUtil.stackTrace(e)); } finally { writeLock.unlock(); @@ -604,8 +608,8 @@ public void putIfAbsent(final byte[] key, final byte[] value, final KVStoreClosu } @Override - public void tryLockWith(final byte[] key, final boolean keepLease, final DistributedLock.Acquirer acquirer, - final KVStoreClosure closure) { + public void tryLockWith(final byte[] key, final byte[] fencingKey, final boolean keepLease, + final DistributedLock.Acquirer acquirer, final KVStoreClosure closure) { final Timer.Context timeCtx = getTimeContext("TRY_LOCK"); final Lock readLock = this.readWriteLock.readLock(); readLock.lock(); @@ -645,7 +649,7 @@ public void tryLockWith(final byte[] key, final boolean keepLease, final Distrib // first time to acquire and success .remainingMillis(DistributedLock.OwnerBuilder.FIRST_TIME_SUCCESS) // create a new fencing token - .fencingToken(getNextFencingToken(LOCK_FENCING_KEY)) + .fencingToken(getNextFencingToken(fencingKey)) // init acquires .acquires(1) // set acquirer ctx @@ -686,7 +690,7 @@ public void tryLockWith(final byte[] key, final boolean keepLease, final Distrib // success as a new acquirer .remainingMillis(DistributedLock.OwnerBuilder.NEW_ACQUIRE_SUCCESS) // create a new fencing token - .fencingToken(getNextFencingToken(LOCK_FENCING_KEY)) + .fencingToken(getNextFencingToken(fencingKey)) // init acquires .acquires(1) // set acquirer ctx @@ -843,13 +847,13 @@ public void releaseLockWith(final byte[] key, final DistributedLock.Acquirer acq } } - @SuppressWarnings("SameParameterValue") private long getNextFencingToken(final byte[] fencingKey) throws RocksDBException { final Timer.Context timeCtx = getTimeContext("FENCING_TOKEN"); final Lock readLock = this.readWriteLock.readLock(); readLock.lock(); try { - final byte[] prevBytesVal = this.db.get(this.fencingHandle, fencingKey); + final byte[] realKey = BytesUtil.nullToEmpty(fencingKey); + final byte[] prevBytesVal = this.db.get(this.fencingHandle, realKey); final long prevVal; if (prevBytesVal == null) { prevVal = 0; // init @@ -862,7 +866,7 @@ private long getNextFencingToken(final byte[] fencingKey) throws RocksDBExceptio final long newVal = prevVal + 1; final byte[] newBytesVal = new byte[8]; Bits.putLong(newBytesVal, 0, newVal); - this.db.put(this.fencingHandle, this.writeOptions, fencingKey, newBytesVal); + this.db.put(this.fencingHandle, this.writeOptions, realKey, newBytesVal); return newVal; } finally { readLock.unlock(); @@ -1023,23 +1027,22 @@ public byte[] jumpOver(final byte[] startKey, final long distance) { } @Override - public LocalFileMeta onSnapshotSave(final String snapshotPath) throws Exception { - if (this.opts.isFastSnapshot()) { - FileUtils.deleteDirectory(new File(snapshotPath)); - writeSnapshot(snapshotPath); - return null; - } else { - FileUtils.forceMkdir(new File(snapshotPath)); - return backupDB(snapshotPath); - } - } - - @Override - public void onSnapshotLoad(final String snapshotPath, final LocalFileMeta meta) throws Exception { - if (this.opts.isFastSnapshot()) { - readSnapshot(snapshotPath); - } else { - restoreBackup(snapshotPath, meta); + public void initFencingToken(final byte[] parentKey, final byte[] childKey) { + final Timer.Context timeCtx = getTimeContext("INIT_FENCING_TOKEN"); + final Lock readLock = this.readWriteLock.readLock(); + readLock.lock(); + try { + final byte[] realKey = BytesUtil.nullToEmpty(parentKey); + final byte[] parentBytesVal = this.db.get(this.fencingHandle, realKey); + if (parentBytesVal == null) { + return; + } + this.db.put(this.fencingHandle, this.writeOptions, childKey, parentBytesVal); + } catch (final RocksDBException e) { + throw new StorageException("Fail to init fencing token.", e); + } finally { + readLock.unlock(); + timeCtx.stop(); } } @@ -1047,8 +1050,18 @@ public long getDatabaseVersion() { return this.databaseVersion.get(); } - public void createSstFiles(final EnumMap sstFileTable, final byte[] startKey, - final byte[] endKey) { + public void addStatisticsCollectorCallback(final StatisticsCollectorCallback callback) { + final RocksStatisticsCollector collector = Requires.requireNonNull(this.statisticsCollector, + "statisticsCollector"); + final Statistics statistics = Requires.requireNonNull(this.statistics, "statistics"); + collector.addStatsCollectorInput(new StatsCollectorInput(statistics, callback)); + } + + boolean isFastSnapshot() { + return Requires.requireNonNull(this.opts, "opts").isFastSnapshot(); + } + + void createSstFiles(final EnumMap sstFileTable, final byte[] startKey, final byte[] endKey) { final Timer.Context timeCtx = getTimeContext("CREATE_SST_FILE"); final Lock readLock = this.readWriteLock.readLock(); readLock.lock(); @@ -1069,6 +1082,7 @@ public void createSstFiles(final EnumMap sstFileTable, fi it.seek(startKey); } sstFileWriter.open(sstFile.getAbsolutePath()); + long count = 0; for (;;) { if (!it.isValid()) { break; @@ -1078,9 +1092,15 @@ public void createSstFiles(final EnumMap sstFileTable, fi break; } sstFileWriter.put(key, it.value()); + ++count; it.next(); } - sstFileWriter.finish(); + if (count == 0) { + sstFileWriter.close(); + } else { + sstFileWriter.finish(); + } + LOG.info("Finish sst file {} with {} keys.", sstFile, count); } catch (final RocksDBException e) { throw new StorageException("Fail to create sst file at path: " + sstFile, e); } @@ -1095,7 +1115,7 @@ public void createSstFiles(final EnumMap sstFileTable, fi } } - public void ingestSstFiles(final EnumMap sstFileTable) { + void ingestSstFiles(final EnumMap sstFileTable) { final Timer.Context timeCtx = getTimeContext("INGEST_SST_FILE"); final Lock readLock = this.readWriteLock.readLock(); readLock.lock(); @@ -1105,8 +1125,12 @@ public void ingestSstFiles(final EnumMap sstFileTable) { final File sstFile = entry.getValue(); final ColumnFamilyHandle columnFamilyHandle = findColumnFamilyHandle(sstColumnFamily); try (final IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) { - final List filePathList = Collections.singletonList(sstFile.getAbsolutePath()); - this.db.ingestExternalFile(columnFamilyHandle, filePathList, ingestOptions); + if (FileUtils.sizeOf(sstFile) == 0L) { + return; + } + final String filePath = sstFile.getAbsolutePath(); + LOG.info("Start ingest sst file {}.", filePath); + this.db.ingestExternalFile(columnFamilyHandle, Collections.singletonList(filePath), ingestOptions); } catch (final RocksDBException e) { throw new StorageException("Fail to ingest sst file at path: " + sstFile, e); } @@ -1117,99 +1141,92 @@ public void ingestSstFiles(final EnumMap sstFileTable) { } } - public void addStatisticsCollectorCallback(final StatisticsCollectorCallback callback) { - if (this.statisticsCollector == null || this.statistics == null) { - throw new IllegalStateException("statistics collector is not running"); - } - this.statisticsCollector.addStatsCollectorInput(new StatsCollectorInput(this.statistics, callback)); - } - - private LocalFileMeta backupDB(final String backupDBPath) { + RocksDBBackupInfo backupDB(final String backupDBPath) throws IOException { final Timer.Context timeCtx = getTimeContext("BACKUP_DB"); + FileUtils.forceMkdir(new File(backupDBPath)); final Lock writeLock = this.readWriteLock.writeLock(); writeLock.lock(); - try (final BackupableDBOptions backupOptions = createBackupDBOptions(backupDBPath); - final BackupEngine backupEngine = BackupEngine.open(this.options.getEnv(), backupOptions)) { + try (final BackupableDBOptions backupOpts = createBackupDBOptions(backupDBPath); + final BackupEngine backupEngine = BackupEngine.open(this.options.getEnv(), backupOpts)) { backupEngine.createNewBackup(this.db, true); final List backupInfoList = backupEngine.getBackupInfo(); if (backupInfoList.isEmpty()) { - LOG.warn("Fail to do backup at {}, empty backup info.", backupDBPath); + LOG.warn("Fail to backup at {}, empty backup info.", backupDBPath); return null; } // chose the backupInfo who has max backupId final BackupInfo backupInfo = Collections.max(backupInfoList, Comparator.comparingInt(BackupInfo::backupId)); final RocksDBBackupInfo rocksBackupInfo = new RocksDBBackupInfo(backupInfo); - final LocalFileMeta.Builder fb = LocalFileMeta.newBuilder(); - fb.setUserMeta(ByteString.copyFrom(this.serializer.writeObject(rocksBackupInfo))); LOG.info("Backup rocksDB into {} with backupInfo {}.", backupDBPath, rocksBackupInfo); - return fb.build(); + return rocksBackupInfo; } catch (final RocksDBException e) { - throw new StorageException("Fail to do backup at path: " + backupDBPath, e); + throw new StorageException("Fail to backup at path: " + backupDBPath, e); } finally { writeLock.unlock(); timeCtx.stop(); } } - private void restoreBackup(final String backupDBPath, final LocalFileMeta meta) { + void restoreBackup(final String backupDBPath, final RocksDBBackupInfo rocksBackupInfo) { final Timer.Context timeCtx = getTimeContext("RESTORE_BACKUP"); final Lock writeLock = this.readWriteLock.writeLock(); writeLock.lock(); closeRocksDB(); - try (final BackupableDBOptions options = createBackupDBOptions(backupDBPath); - final RestoreOptions restoreOptions = new RestoreOptions(false); - final BackupEngine backupEngine = BackupEngine.open(this.options.getEnv(), options)) { - final ByteString userMeta = meta.getUserMeta(); - final RocksDBBackupInfo rocksBackupInfo = this.serializer.readObject(userMeta.toByteArray(), - RocksDBBackupInfo.class); + try (final BackupableDBOptions backupOpts = createBackupDBOptions(backupDBPath); + final BackupEngine backupEngine = BackupEngine.open(this.options.getEnv(), backupOpts); + final RestoreOptions restoreOpts = new RestoreOptions(false)) { final String dbPath = this.opts.getDbPath(); - backupEngine.restoreDbFromBackup(rocksBackupInfo.getBackupId(), dbPath, dbPath, restoreOptions); + backupEngine.restoreDbFromBackup(rocksBackupInfo.getBackupId(), dbPath, dbPath, restoreOpts); LOG.info("Restored rocksDB from {} with {}.", backupDBPath, rocksBackupInfo); // reopen the db openRocksDB(this.opts); } catch (final RocksDBException e) { - throw new StorageException("Fail to do restore from path: " + backupDBPath, e); + throw new StorageException("Fail to restore from path: " + backupDBPath, e); } finally { writeLock.unlock(); timeCtx.stop(); } } - private void writeSnapshot(final String snapshotPath) { + void writeSnapshot(final String snapshotPath) { final Timer.Context timeCtx = getTimeContext("WRITE_SNAPSHOT"); final Lock writeLock = this.readWriteLock.writeLock(); writeLock.lock(); try (final Checkpoint checkpoint = Checkpoint.create(this.db)) { - final File tempFile = new File(snapshotPath); - if (tempFile.exists()) { - FileUtils.deleteDirectory(tempFile); + final String tempPath = snapshotPath + "_temp"; + final File tempFile = new File(tempPath); + FileUtils.deleteDirectory(tempFile); + checkpoint.createCheckpoint(tempPath); + final File snapshotFile = new File(snapshotPath); + FileUtils.deleteDirectory(snapshotFile); + if (!tempFile.renameTo(snapshotFile)) { + throw new StorageException("Fail to rename [" + tempPath + "] to [" + snapshotPath + "]."); } - checkpoint.createCheckpoint(snapshotPath); + } catch (final StorageException e) { + throw e; } catch (final Exception e) { - throw new StorageException("Fail to do write snapshot at path: " + snapshotPath, e); + throw new StorageException("Fail to write snapshot at path: " + snapshotPath, e); } finally { writeLock.unlock(); timeCtx.stop(); } } - private void readSnapshot(final String snapshotPath) { + void readSnapshot(final String snapshotPath) { final Timer.Context timeCtx = getTimeContext("READ_SNAPSHOT"); final Lock writeLock = this.readWriteLock.writeLock(); writeLock.lock(); try { - final File file = new File(snapshotPath); - if (!file.exists()) { + final File snapshotFile = new File(snapshotPath); + if (!snapshotFile.exists()) { LOG.error("Snapshot file [{}] not exists.", snapshotPath); return; } closeRocksDB(); final String dbPath = this.opts.getDbPath(); final File dbFile = new File(dbPath); - if (dbFile.exists()) { - FileUtils.deleteDirectory(dbFile); - } - if (!file.renameTo(new File(dbPath))) { + FileUtils.deleteDirectory(dbFile); + if (!snapshotFile.renameTo(dbFile)) { throw new StorageException("Fail to rename [" + snapshotPath + "] to [" + dbPath + "]."); } // reopen the db @@ -1222,6 +1239,55 @@ private void readSnapshot(final String snapshotPath) { } } + void writeSstSnapshot(final String snapshotPath, final Region region) { + final Timer.Context timeCtx = getTimeContext("WRITE_SST_SNAPSHOT"); + final Lock readLock = this.readWriteLock.readLock(); + readLock.lock(); + try { + final String tempPath = snapshotPath + "_temp"; + final File tempFile = new File(tempPath); + FileUtils.deleteDirectory(tempFile); + FileUtils.forceMkdir(tempFile); + + final EnumMap sstFileTable = getSstFileTable(tempPath); + createSstFiles(sstFileTable, region.getStartKey(), region.getEndKey()); + final File snapshotFile = new File(snapshotPath); + FileUtils.deleteDirectory(snapshotFile); + if (!tempFile.renameTo(snapshotFile)) { + throw new StorageException("Fail to rename [" + tempPath + "] to [" + snapshotPath + "]."); + } + } catch (final Exception e) { + throw new StorageException("Fail to do read sst snapshot at path: " + snapshotPath, e); + } finally { + readLock.unlock(); + timeCtx.stop(); + } + } + + void readSstSnapshot(final String snapshotPath) { + final Timer.Context timeCtx = getTimeContext("READ_SST_SNAPSHOT"); + final Lock readLock = this.readWriteLock.readLock(); + readLock.lock(); + try { + final EnumMap sstFileTable = getSstFileTable(snapshotPath); + ingestSstFiles(sstFileTable); + } catch (final Exception e) { + throw new StorageException("Fail to write sst snapshot at path: " + snapshotPath, e); + } finally { + readLock.unlock(); + timeCtx.stop(); + } + } + + private EnumMap getSstFileTable(final String path) { + final EnumMap sstFileTable = new EnumMap<>(SstColumnFamily.class); + sstFileTable.put(SstColumnFamily.DEFAULT, Paths.get(path, "default.sst").toFile()); + sstFileTable.put(SstColumnFamily.SEQUENCE, Paths.get(path, "sequence.sst").toFile()); + sstFileTable.put(SstColumnFamily.LOCKING, Paths.get(path, "locking.sst").toFile()); + sstFileTable.put(SstColumnFamily.FENCING, Paths.get(path, "fencing.sst").toFile()); + return sstFileTable; + } + private ColumnFamilyHandle findColumnFamilyHandle(final SstColumnFamily sstColumnFamily) { switch (sstColumnFamily) { case DEFAULT: diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/RegionHelper.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/RegionHelper.java new file mode 100644 index 000000000..6aade9c2a --- /dev/null +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/RegionHelper.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rhea.util; + +import com.alipay.sofa.jraft.rhea.metadata.Region; +import com.alipay.sofa.jraft.util.BytesUtil; + +/** + * + * @author jiachun.fjc + */ +public final class RegionHelper { + + public static boolean isSingleGroup(final Region region) { + return BytesUtil.nullToEmpty(region.getStartKey()).length == 0 + && BytesUtil.nullToEmpty(region.getEndKey()).length == 0; + } + + public static boolean isMultiGroup(final Region region) { + return !isSingleGroup(region); + } + + public static boolean isSameRange(final Region r1, final Region r2) { + if (BytesUtil.compare(BytesUtil.nullToEmpty(r1.getStartKey()), BytesUtil.nullToEmpty(r2.getStartKey())) != 0) { + return false; + } + return BytesUtil.compare(BytesUtil.nullToEmpty(r1.getEndKey()), BytesUtil.nullToEmpty(r2.getEndKey())) == 0; + } + + public static boolean isKeyInRegion(final byte[] key, final Region region) { + final byte[] startKey = BytesUtil.nullToEmpty(region.getStartKey()); + if (BytesUtil.compare(key, startKey) < 0) { + return false; + } + final byte[] endKey = region.getEndKey(); + return endKey == null || BytesUtil.compare(key, endKey) < 0; + } + + private RegionHelper() { + } +} diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/benchmark/raw/BaseRawStoreBenchmark.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/benchmark/raw/BaseRawStoreBenchmark.java index d87f48d54..96a98d762 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/benchmark/raw/BaseRawStoreBenchmark.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/benchmark/raw/BaseRawStoreBenchmark.java @@ -20,10 +20,6 @@ import java.io.IOException; import org.apache.commons.io.FileUtils; -import org.rocksdb.HistogramData; -import org.rocksdb.HistogramType; -import org.rocksdb.StatisticsCollectorCallback; -import org.rocksdb.TickerType; import com.alipay.sofa.jraft.rhea.options.RocksDBOptions; import com.alipay.sofa.jraft.rhea.storage.RocksRawKVStore; @@ -42,23 +38,7 @@ protected void setup() throws Exception { this.dbOptions = new RocksDBOptions(); this.dbOptions.setDbPath(this.tempPath); this.dbOptions.setSync(false); - this.dbOptions.setStatisticsCallbackIntervalSeconds(10); this.kvStore.init(this.dbOptions); - StatisticsCollectorCallback callback = new StatisticsCollectorCallback() { - - @Override - public void tickerCallback(TickerType c, long tickerCount) { - System.out.print(c + " "); - System.out.println(tickerCount); - } - - @Override - public void histogramCallback(HistogramType histType, HistogramData histData) { - System.out.print(histType + " "); - System.out.println(histData.getAverage()); - } - }; - this.kvStore.addStatisticsCollectorCallback(callback); } protected File getTempDir() throws IOException { diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/benchmark/raw/SnapshotBenchmark.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/benchmark/raw/SnapshotBenchmark.java index f5241c59a..719ff672c 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/benchmark/raw/SnapshotBenchmark.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/benchmark/raw/SnapshotBenchmark.java @@ -19,15 +19,21 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.file.Paths; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.zip.ZipOutputStream; import org.apache.commons.io.FileUtils; import com.alipay.sofa.jraft.entity.LocalFileMetaOutter; -import com.alipay.sofa.jraft.rhea.options.RocksDBOptions; +import com.alipay.sofa.jraft.rhea.metadata.Region; import com.alipay.sofa.jraft.rhea.storage.KVEntry; +import com.alipay.sofa.jraft.rhea.storage.KVStoreAccessHelper; import com.alipay.sofa.jraft.rhea.storage.RocksRawKVStore; import com.alipay.sofa.jraft.rhea.util.Lists; import com.alipay.sofa.jraft.rhea.util.ZipUtil; @@ -67,8 +73,8 @@ public void tearDown() { public void put() { final List batch = Lists.newArrayListWithCapacity(100); - for (int i = 0; i < KEY_COUNT * 100; i++) { - byte[] key = BytesUtil.writeUtf8("benchmark_" + i); + for (int i = 0; i < KEY_COUNT * 10; i++) { + final byte[] key = BytesUtil.writeUtf8("benchmark_" + i); batch.add(new KVEntry(key, VALUE_BYTES)); if (batch.size() >= 100) { this.kvStore.put(batch, null); @@ -77,92 +83,148 @@ public void put() { } } - /* - 100 million keys, the time unit is milliseconds - - slow save snapshot time cost: 8265 - slow compress time cost: 46517 - slow load snapshot time cost: 21907 - - slow save snapshot time cost: 7424 - slow compress time cost: 45040 - slow load snapshot time cost: 19257 - - slow save snapshot time cost: 7025 - slow compress time cost: 44410 - slow load snapshot time cost: 20087 - - ----------------------------------------------------- - - fast save snapshot time cost: 742 - fast compress time cost: 37548 - fast load snapshot time cost: 13100 - - fast save snapshot time cost: 743 - fast compress time cost: 43864 - fast load snapshot time cost: 14176 - - fast save snapshot time cost: 755 - fast compress time cost: 45789 - fast load snapshot time cost: 14308 + /** + ----------------------------------------------- + db size = 10000000 + slow save snapshot time cost: 2552 + slow compressed file size: 41915298 + slow compress time cost: 9173 + slow load snapshot time cost: 5119 + ----------------------------------------------- + db size = 10000000 + fast save snapshot time cost: 524 + fast compressed file size: 41920248 + fast compress time cost: 8807 + fast load snapshot time cost: 3090 + ----------------------------------------------- + db size = 10000000 + sst save snapshot time cost: 4296 + sst compressed file size: 10741032 + sst compress time cost: 2005 + sst load snapshot time cost: 593 + ----------------------------------------------- + db size = 10000000 + slow save snapshot time cost: 2248 + slow compressed file size: 41918551 + slow compress time cost: 8705 + slow load snapshot time cost: 4485 + ----------------------------------------------- + db size = 10000000 + fast save snapshot time cost: 508 + fast compressed file size: 41914702 + fast compress time cost: 8736 + fast load snapshot time cost: 3047 + ----------------------------------------------- + db size = 10000000 + sst save snapshot time cost: 4206 + sst compressed file size: 10741032 + sst compress time cost: 1950 + sst load snapshot time cost: 599 + ----------------------------------------------- + db size = 10000000 + slow save snapshot time cost: 2327 + slow compressed file size: 41916640 + slow compress time cost: 8643 + slow load snapshot time cost: 4590 + ----------------------------------------------- + db size = 10000000 + fast save snapshot time cost: 511 + fast compressed file size: 41914533 + fast compress time cost: 8704 + fast load snapshot time cost: 3013 + ----------------------------------------------- + db size = 10000000 + sst save snapshot time cost: 4253 + sst compressed file size: 10741032 + sst compress time cost: 1947 + sst load snapshot time cost: 590 + ----------------------------------------------- */ public static void main(String[] args) throws IOException { for (int i = 0; i < 3; i++) { SnapshotBenchmark snapshot = new SnapshotBenchmark(); snapshot.setup(); - snapshot.snapshot(false); + snapshot.snapshot(false, false); snapshot.tearDown(); - } - for (int i = 0; i < 3; i++) { - SnapshotBenchmark snapshot = new SnapshotBenchmark(); + snapshot = new SnapshotBenchmark(); snapshot.setup(); - snapshot.snapshot(true); + snapshot.snapshot(false, true); + snapshot.tearDown(); + + snapshot = new SnapshotBenchmark(); + snapshot.setup(); + snapshot.snapshot(true, true); snapshot.tearDown(); } } - public void snapshot(boolean isFastSnapshot) throws IOException { + public void snapshot(final boolean isSstSnapshot, final boolean isFastSnapshot) throws IOException { final File backupDir = new File("backup"); if (backupDir.exists()) { FileUtils.deleteDirectory(backupDir); } FileUtils.forceMkdir(backupDir); - final LocalFileMetaOutter.LocalFileMeta meta = doSnapshotSave(backupDir.getAbsolutePath(), isFastSnapshot); + final LocalFileMetaOutter.LocalFileMeta meta = doSnapshotSave(backupDir.getAbsolutePath(), isSstSnapshot, + isFastSnapshot); this.kvStore.shutdown(); FileUtils.deleteDirectory(new File(this.tempPath)); FileUtils.forceMkdir(new File(this.tempPath)); this.kvStore = new RocksRawKVStore(); - final RocksDBOptions dbOpts = new RocksDBOptions(); - dbOpts.setDbPath(this.tempPath); - this.kvStore.init(dbOpts); + this.kvStore.init(this.dbOptions); final long loadStart = System.nanoTime(); doSnapshotLoad(backupDir.getAbsolutePath(), meta, isFastSnapshot); - System.out.println((isFastSnapshot ? "fast" : "slow") + " load snapshot time cost: " + final String name; + if (isSstSnapshot) { + name = "sst"; + } else { + if (isFastSnapshot) { + name = "fast"; + } else { + name = "slow"; + } + } + System.out.println(name + " load snapshot time cost: " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - loadStart)); FileUtils.deleteDirectory(backupDir); } - private LocalFileMetaOutter.LocalFileMeta doSnapshotSave(final String path, final boolean isFastSnapshot) { - final String snapshotPath = path + File.separator + SNAPSHOT_DIR; + private LocalFileMetaOutter.LocalFileMeta doSnapshotSave(final String path, final boolean isSstSnapshot, + final boolean isFastSnapshot) { + final String snapshotPath = Paths.get(path, SNAPSHOT_DIR).toString(); try { final long saveStart = System.nanoTime(); - final LocalFileMetaOutter.LocalFileMeta meta; - if (isFastSnapshot) { - doFastSnapshotSave(snapshotPath); - meta = null; + LocalFileMetaOutter.LocalFileMeta meta = null; + if (isSstSnapshot) { + doSstSnapshotSave(snapshotPath); } else { - meta = doSlowSnapshotSave(snapshotPath); + if (isFastSnapshot) { + doFastSnapshotSave(snapshotPath); + } else { + meta = doSlowSnapshotSave(snapshotPath); + } } - System.out.println((isFastSnapshot ? "fast" : "slow") + " save snapshot time cost: " + final String name; + if (isSstSnapshot) { + name = "sst"; + } else { + if (isFastSnapshot) { + name = "fast"; + } else { + name = "slow"; + } + } + System.out.println(name + " save snapshot time cost: " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - saveStart)); final long compressStart = System.nanoTime(); doCompressSnapshot(path); - System.out.println((isFastSnapshot ? "fast" : "slow") + " compress time cost: " + System.out.println(name + " compressed file size: " + + FileUtils.sizeOf(Paths.get(path, SNAPSHOT_ARCHIVE).toFile())); + System.out.println(name + " compress time cost: " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - compressStart)); return meta; } catch (final Throwable t) { @@ -173,12 +235,14 @@ private LocalFileMetaOutter.LocalFileMeta doSnapshotSave(final String path, fina public boolean doSnapshotLoad(final String path, final LocalFileMetaOutter.LocalFileMeta meta, final boolean isFastSnapshot) { + final String sourceFile = Paths.get(path, SNAPSHOT_ARCHIVE).toString(); + final String snapshotPath = Paths.get(path, SNAPSHOT_DIR).toString(); try { - ZipUtil.unzipFile(path + File.separator + SNAPSHOT_ARCHIVE, path); + ZipUtil.unzipFile(sourceFile, path); if (isFastSnapshot) { - doFastSnapshotLoad(path + File.separator + SNAPSHOT_DIR); + doFastSnapshotLoad(snapshotPath); } else { - doSlowSnapshotLoad(path + File.separator + SNAPSHOT_DIR, meta); + doSlowSnapshotLoad(snapshotPath, meta); } return true; } catch (final Throwable t) { @@ -189,18 +253,54 @@ public boolean doSnapshotLoad(final String path, final LocalFileMetaOutter.Local private void doFastSnapshotSave(final String snapshotPath) throws Exception { this.dbOptions.setFastSnapshot(true); - this.kvStore.onSnapshotSave(snapshotPath); + final Region region = new Region(); + KVStoreAccessHelper.saveSnapshot(this.kvStore, snapshotPath, region); } private LocalFileMetaOutter.LocalFileMeta doSlowSnapshotSave(final String snapshotPath) throws Exception { this.dbOptions.setFastSnapshot(false); - return this.kvStore.onSnapshotSave(snapshotPath); + final Region region = new Region(); + return KVStoreAccessHelper.saveSnapshot(this.kvStore, snapshotPath, region); + } + + private void doSstSnapshotSave(final String snapshotPath) throws Exception { + FileUtils.forceMkdir(new File(snapshotPath)); + final List regions = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + final Region r = new Region(); + r.setId(i); + r.setStartKey(BytesUtil.writeUtf8("benchmark_" + i)); + if (i < 9) { + r.setEndKey(BytesUtil.writeUtf8("benchmark_" + (i + 1))); + } + regions.add(r); + } + final ExecutorService executor = Executors.newFixedThreadPool(10); + final List> futures = Lists.newArrayList(); + for (final Region r : regions) { + final Future f = executor.submit(() -> { + try { + KVStoreAccessHelper.saveSnapshot(this.kvStore, Paths.get(snapshotPath, String.valueOf(r.getId())).toString(), r); + } catch (final Exception e) { + e.printStackTrace(); + } + }); + futures.add(f); + } + for (final Future f : futures) { + try { + f.get(); + } catch (final InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + } + executor.shutdownNow(); } private void doCompressSnapshot(final String path) { + final String outputFile = Paths.get(path, SNAPSHOT_ARCHIVE).toString(); try { - try (final ZipOutputStream out = new ZipOutputStream(new FileOutputStream(path + File.separator - + SNAPSHOT_ARCHIVE))) { + try (final ZipOutputStream out = new ZipOutputStream(new FileOutputStream(outputFile))) { ZipUtil.compressDirectoryToZipFile(path, SNAPSHOT_DIR, out); } } catch (final Throwable t) { @@ -211,8 +311,9 @@ private void doCompressSnapshot(final String path) { private void doFastSnapshotLoad(final String snapshotPath) { try { this.dbOptions.setFastSnapshot(true); - this.kvStore.onSnapshotLoad(snapshotPath, null); - } catch (Exception e) { + final Region region = new Region(); + KVStoreAccessHelper.loadSnapshot(this.kvStore, snapshotPath, null, region); + } catch (final Exception e) { e.printStackTrace(); } } @@ -220,9 +321,43 @@ private void doFastSnapshotLoad(final String snapshotPath) { private void doSlowSnapshotLoad(final String snapshotPath, final LocalFileMetaOutter.LocalFileMeta meta) { try { this.dbOptions.setFastSnapshot(false); - this.kvStore.onSnapshotLoad(snapshotPath, meta); - } catch (Exception e) { + final Region region = new Region(); + KVStoreAccessHelper.loadSnapshot(this.kvStore, snapshotPath, meta, region); + } catch (final Exception e) { e.printStackTrace(); } } + + private void doSstSnapshotLoad(final String snapshotPath) { + final List regions = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + final Region r = new Region(); + r.setId(i); + r.setStartKey(BytesUtil.writeUtf8("benchmark_" + i)); + if (i < 9) { + r.setEndKey(BytesUtil.writeUtf8("benchmark_" + (i + 1))); + } + regions.add(r); + } + final ExecutorService executor = Executors.newFixedThreadPool(10); + final List> futures = Lists.newArrayList(); + for (final Region r : regions) { + final Future f = executor.submit(() -> { + try { + KVStoreAccessHelper.loadSnapshot(kvStore, Paths.get(snapshotPath, String.valueOf(r.getId())).toString(), null, r); + } catch (final Exception e) { + e.printStackTrace(); + } + }); + futures.add(f); + } + for (final Future f : futures) { + try { + f.get(); + } catch (final InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + } + executor.shutdownNow(); + } } diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStoreAccessHelper.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStoreAccessHelper.java new file mode 100644 index 000000000..a0e72b3b2 --- /dev/null +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStoreAccessHelper.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rhea.storage; + +import java.io.File; +import java.util.EnumMap; + +import com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta; +import com.alipay.sofa.jraft.rhea.metadata.Region; + +/** + * KV store test helper + * + * @author jiachun.fjc + */ +public final class KVStoreAccessHelper { + + public static void createSstFiles(final RocksRawKVStore store, final EnumMap sstFileTable, + final byte[] startKey, final byte[] endKey) { + store.createSstFiles(sstFileTable, startKey, endKey); + } + + public static void ingestSstFiles(final RocksRawKVStore store, final EnumMap sstFileTable) { + store.ingestSstFiles(sstFileTable); + } + + public static LocalFileMeta saveSnapshot(final BaseRawKVStore store, final String snapshotPath, + final Region region) throws Exception { + final KVStoreSnapshotFile snapshotFile = KVStoreSnapshotFileFactory.getKVStoreSnapshotFile(store); + return ((AbstractKVStoreSnapshotFile) snapshotFile).doSnapshotSave(snapshotPath, region); + } + + public static void loadSnapshot(final BaseRawKVStore store, final String snapshotPath, final LocalFileMeta meta, + final Region region) throws Exception { + final KVStoreSnapshotFile snapshotFile = KVStoreSnapshotFileFactory.getKVStoreSnapshotFile(store); + ((AbstractKVStoreSnapshotFile) snapshotFile).doSnapshotLoad(snapshotPath, meta, region); + } +} diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/LocalLock.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/LocalLock.java index ad9cdece7..00cf12c1d 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/LocalLock.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/LocalLock.java @@ -47,7 +47,7 @@ protected Owner internalTryLock(final byte[] ctx) { final Acquirer acquirer = getAcquirer(); acquirer.setContext(ctx); final KVStoreClosure closure = new TestClosure(); - this.rawKVStore.tryLockWith(internalKey, false, acquirer, closure); + this.rawKVStore.tryLockWith(internalKey, internalKey, false, acquirer, closure); final Owner owner = (Owner) closure.getData(); updateOwnerAndAcquirer(owner); return owner; diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/memorydb/MemoryKVStoreTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/memorydb/MemoryKVStoreTest.java index b667b8c88..5a45e6f3f 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/memorydb/MemoryKVStoreTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/memorydb/MemoryKVStoreTest.java @@ -18,6 +18,9 @@ import java.io.File; import java.io.FileOutputStream; +import java.lang.reflect.Method; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -29,9 +32,11 @@ import org.junit.Before; import org.junit.Test; +import com.alipay.sofa.jraft.rhea.metadata.Region; import com.alipay.sofa.jraft.rhea.options.MemoryDBOptions; import com.alipay.sofa.jraft.rhea.storage.KVEntry; import com.alipay.sofa.jraft.rhea.storage.KVIterator; +import com.alipay.sofa.jraft.rhea.storage.KVStoreAccessHelper; import com.alipay.sofa.jraft.rhea.storage.KVStoreClosure; import com.alipay.sofa.jraft.rhea.storage.LocalLock; import com.alipay.sofa.jraft.rhea.storage.MemoryRawKVStore; @@ -104,7 +109,6 @@ public void execute(RawKVStore kvStore, KVStoreClosure closure) { /** * Test method: {@link MemoryRawKVStore#multiGet(List, KVStoreClosure)} */ - @SuppressWarnings("unchecked") @Test public void multiGetTest() { final List keyList = Lists.newArrayList(); @@ -234,7 +238,6 @@ public void getLocalIteratorTest() { /** * Test method: {@link MemoryRawKVStore#scan(byte[], byte[], KVStoreClosure)} */ - @SuppressWarnings("unchecked") @Test public void scanTest() { final List keyList = Lists.newArrayList(); @@ -356,7 +359,6 @@ public void execute(RawKVStore kvStore, KVStoreClosure closure) { /** * Test method: {@link MemoryRawKVStore#put(List, KVStoreClosure)} */ - @SuppressWarnings("unchecked") @Test public void putListTest() { final List entries = Lists.newArrayList(); @@ -392,7 +394,7 @@ public void putIfAbsent() { } /** - * Test method: {@link MemoryRawKVStore#tryLockWith(byte[], boolean, DistributedLock.Acquirer, KVStoreClosure)} + * Test method: {@link MemoryRawKVStore#tryLockWith(byte[], byte[], boolean, DistributedLock.Acquirer, KVStoreClosure)} */ @Test public void tryLockWith() throws InterruptedException { @@ -488,8 +490,11 @@ public void snapshotTest() throws Exception { final String v = String.valueOf(i); this.kvStore.put(makeKey(v), makeValue(v), null); } - - final LocalFileMeta meta = doSnapshotSave(backupDir.getAbsolutePath()); + for (int i = 0; i < 10000; i++) { + this.kvStore.getSequence(makeKey((i % 100) + "seq_test"), 10, null); + } + final Region region = new Region(); + final LocalFileMeta meta = doSnapshotSave(backupDir.getAbsolutePath(), region); assertNotNull(get(makeKey("1"))); @@ -503,7 +508,7 @@ public void snapshotTest() throws Exception { assertNull(get(makeKey("1"))); - doSnapshotLoad(backupDir.getAbsolutePath(), meta); + doSnapshotLoad(backupDir.getAbsolutePath(), meta, region); for (int i = 0; i < 100000; i++) { final String v = String.valueOf(i); @@ -516,10 +521,75 @@ public void snapshotTest() throws Exception { FileUtils.deleteDirectory(backupDir); } - private LocalFileMeta doSnapshotSave(final String path) { - final String snapshotPath = path + File.separator + SNAPSHOT_DIR; + @Test + public void multiGroupSnapshotTest() throws Exception { + final File backupDir = new File("multi-backup"); + if (backupDir.exists()) { + FileUtils.deleteDirectory(backupDir); + } + + final List regions = Lists.newArrayList(); + final List metas = Lists.newArrayList(); + regions.add(new Region(1, makeKey("0"), makeKey("1"), null, null)); + regions.add(new Region(2, makeKey("1"), makeKey("2"), null, null)); + regions.add(new Region(3, makeKey("2"), makeKey("3"), null, null)); + regions.add(new Region(4, makeKey("3"), makeKey("4"), null, null)); + regions.add(new Region(5, makeKey("4"), makeKey("5"), null, null)); + + for (int i = 0; i < 5; i++) { + final String v = String.valueOf(i); + this.kvStore.put(makeKey(v), makeValue(v), null); + } + for (int i = 0; i < 5; i++) { + this.kvStore.getSequence(makeKey(i + "_seq_test"), 10, null); + } + + for (int i = 0; i < 4; i++) { + final Path p = Paths.get(backupDir.getAbsolutePath(), String.valueOf(i)); + final LocalFileMeta meta = doSnapshotSave(p.toString(), regions.get(i)); + metas.add(meta); + } + + this.kvStore.shutdown(); + this.kvStore = new MemoryRawKVStore(); + final MemoryDBOptions dbOpts = new MemoryDBOptions(); + this.kvStore.init(dbOpts); + + for (int i = 0; i < 4; i++) { + final Path p = Paths.get(backupDir.getAbsolutePath(), String.valueOf(i)); + doSnapshotLoad(p.toString(), metas.get(i), regions.get(i)); + } + + for (int i = 0; i < 4; i++) { + final String v = String.valueOf(i); + final byte[] seqKey = makeKey(i + "_seq_test"); + assertArrayEquals(makeValue(v), get(makeKey(v))); + final Sequence sequence = new SyncKVStore() { + @Override + public void execute(RawKVStore kvStore, KVStoreClosure closure) { + kvStore.getSequence(seqKey, 10, closure); + } + }.apply(this.kvStore); + assertEquals(10L, sequence.getStartValue()); + assertEquals(20L, sequence.getEndValue()); + } + + assertNull(get(makeKey("5"))); + final Sequence sequence = new SyncKVStore() { + @Override + public void execute(RawKVStore kvStore, KVStoreClosure closure) { + kvStore.getSequence(makeKey("4_seq_test"), 10, closure); + } + }.apply(this.kvStore); + assertEquals(0L, sequence.getStartValue()); + + FileUtils.deleteDirectory(backupDir); + } + + private LocalFileMeta doSnapshotSave(final String path, final Region region) { + final String snapshotPath = Paths.get(path, SNAPSHOT_DIR).toString(); try { - final LocalFileMeta meta = this.kvStore.onSnapshotSave(snapshotPath); + final LocalFileMeta meta = KVStoreAccessHelper.saveSnapshot(this.kvStore, snapshotPath, region); doCompressSnapshot(path); return meta; } catch (final Throwable t) { @@ -528,10 +598,12 @@ private LocalFileMeta doSnapshotSave(final String path) { return null; } - public boolean doSnapshotLoad(final String path, final LocalFileMeta meta) { + public boolean doSnapshotLoad(final String path, final LocalFileMeta meta, final Region region) { + final String sourceFile = Paths.get(path, SNAPSHOT_ARCHIVE).toString(); + final String snapshotPath = Paths.get(path, SNAPSHOT_DIR).toString(); try { - ZipUtil.unzipFile(path + File.separator + SNAPSHOT_ARCHIVE, path); - this.kvStore.onSnapshotLoad(path + File.separator + SNAPSHOT_DIR, meta); + ZipUtil.unzipFile(sourceFile, path); + KVStoreAccessHelper.loadSnapshot(this.kvStore, snapshotPath, meta, region); return true; } catch (final Throwable t) { t.printStackTrace(); @@ -540,9 +612,9 @@ public boolean doSnapshotLoad(final String path, final LocalFileMeta meta) { } private void doCompressSnapshot(final String path) { + final String outputFile = Paths.get(path, SNAPSHOT_ARCHIVE).toString(); try { - try (final ZipOutputStream out = new ZipOutputStream(new FileOutputStream(path + File.separator - + SNAPSHOT_ARCHIVE))) { + try (final ZipOutputStream out = new ZipOutputStream(new FileOutputStream(outputFile))) { ZipUtil.compressDirectoryToZipFile(path, SNAPSHOT_DIR, out); } } catch (final Throwable t) { @@ -578,4 +650,23 @@ public void jumpOverTest() { final long approximateKeys = this.kvStore.getApproximateKeysInRange(makeKey("approximate_test0000"), endKey); assertEquals(1000, approximateKeys, 10); } + + @Test + public void initFencingTokenTest() throws Exception { + final Method getNextFencingMethod = MemoryRawKVStore.class.getDeclaredMethod("getNextFencingToken", + byte[].class); + getNextFencingMethod.setAccessible(true); + final List parentKeys = Lists.newArrayList(); + parentKeys.add(null); // startKey == null + parentKeys.add(BytesUtil.writeUtf8("parent")); + for (int i = 0; i < 2; i++) { + final byte[] parentKey = parentKeys.get(i); + final byte[] childKey = BytesUtil.writeUtf8("child"); + assertEquals(1L, getNextFencingMethod.invoke(this.kvStore, (Object) parentKey)); + assertEquals(2L, getNextFencingMethod.invoke(this.kvStore, (Object) parentKey)); + this.kvStore.initFencingToken(parentKey, childKey); + assertEquals(3L, getNextFencingMethod.invoke(this.kvStore, (Object) childKey)); + assertEquals(4L, getNextFencingMethod.invoke(this.kvStore, (Object) childKey)); + } + } } diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/AbstractRheaKVStoreTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/AbstractRheaKVStoreTest.java index ac36c01d6..e2643e13f 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/AbstractRheaKVStoreTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/AbstractRheaKVStoreTest.java @@ -716,4 +716,58 @@ public void rangeSplitTest() { newStore.bPut("f_first_key", BytesUtil.writeUtf8("split_ok")); assertArrayEquals(BytesUtil.writeUtf8("split_ok"), newStore.bGet("f_first_key")); } + + @Test + public void restartAllWithLeaderTest() throws Exception { + RheaKVStore store = getRandomLeaderStore(); + // regions: 1 -> [null, g), 2 -> [g, null) + store.bPut("a_get_test", makeValue("a_get_test_value")); + store.bPut("h_get_test", makeValue("h_get_test_value")); + store.bPut("z_get_test", makeValue("z_get_test_value")); + + store.bGetSequence("a_seqTest", 10); + store.bGetSequence("h_seqTest", 11); + store.bGetSequence("z_seqTest", 12); + + shutdown(false); + + start(getStorageType(), false); + + store = getRandomLeaderStore(); + + assertArrayEquals(makeValue("a_get_test_value"), store.bGet("a_get_test")); + assertArrayEquals(makeValue("h_get_test_value"), store.bGet("h_get_test")); + assertArrayEquals(makeValue("z_get_test_value"), store.bGet("z_get_test")); + + assertEquals(10, store.bGetSequence("a_seqTest", 1).getStartValue()); + assertEquals(11, store.bGetSequence("h_seqTest", 1).getStartValue()); + assertEquals(12, store.bGetSequence("z_seqTest", 1).getStartValue()); + } + + @Test + public void restartAllWithFollowerTest() throws Exception { + RheaKVStore store = getRandomFollowerStore(); + // regions: 1 -> [null, g), 2 -> [g, null) + store.bPut("a_get_test", makeValue("a_get_test_value")); + store.bPut("h_get_test", makeValue("h_get_test_value")); + store.bPut("z_get_test", makeValue("z_get_test_value")); + + store.bGetSequence("a_seqTest", 10); + store.bGetSequence("h_seqTest", 11); + store.bGetSequence("z_seqTest", 12); + + shutdown(false); + + start(getStorageType(), false); + + store = getRandomFollowerStore(); + + assertArrayEquals(makeValue("a_get_test_value"), store.bGet("a_get_test")); + assertArrayEquals(makeValue("h_get_test_value"), store.bGet("h_get_test")); + assertArrayEquals(makeValue("z_get_test_value"), store.bGet("z_get_test")); + + assertEquals(10, store.bGetSequence("a_seqTest", 1).getStartValue()); + assertEquals(11, store.bGetSequence("h_seqTest", 1).getStartValue()); + assertEquals(12, store.bGetSequence("z_seqTest", 1).getStartValue()); + } } diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/RheaKVTestCluster.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/RheaKVTestCluster.java index d889dc535..34104223e 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/RheaKVTestCluster.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/RheaKVTestCluster.java @@ -52,11 +52,15 @@ public class RheaKVTestCluster { private List stores = new CopyOnWriteArrayList<>(); protected void start(final StorageType storageType) throws Exception { - deleteFiles(); + start(storageType, true); + } + + protected void start(final StorageType storageType, final boolean deleteFiles) throws Exception { + if (deleteFiles) { + deleteFiles(); + } for (final String c : CONF) { - final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - final InputStream in = RheaKVTestCluster.class.getResourceAsStream(c); - final RheaKVStoreOptions opts = mapper.readValue(in, RheaKVStoreOptions.class); + final RheaKVStoreOptions opts = readOpts(c); opts.getStoreEngineOptions().setStorageType(storageType); final RheaKVStore rheaKVStore = new DefaultRheaKVStore(); if (rheaKVStore.init(opts)) { @@ -71,13 +75,27 @@ protected void start(final StorageType storageType) throws Exception { } protected void shutdown() throws Exception { + shutdown(true); + } + + protected void shutdown(final boolean deleteFiles) throws Exception { for (final RheaKVStore store : stores) { store.shutdown(); } - deleteFiles(); + stores.clear(); + if (deleteFiles) { + deleteFiles(); + } LOG.info("RheaKVTestCluster shutdown complete"); } + private RheaKVStoreOptions readOpts(final String conf) throws IOException { + final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + try (final InputStream in = RheaKVTestCluster.class.getResourceAsStream(conf)) { + return mapper.readValue(in, RheaKVStoreOptions.class); + } + } + protected RheaKVStore getRandomLeaderStore() { return getLeaderStore(getRandomRegionId()); } diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rocksdb/RocksKVStoreTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rocksdb/RocksKVStoreTest.java index 514106ec6..cb3badce6 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rocksdb/RocksKVStoreTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rocksdb/RocksKVStoreTest.java @@ -20,6 +20,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.lang.reflect.Method; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.EnumMap; import java.util.List; import java.util.Map; @@ -33,6 +35,7 @@ import org.junit.Test; import com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta; +import com.alipay.sofa.jraft.rhea.metadata.Region; import com.alipay.sofa.jraft.rhea.options.RocksDBOptions; import com.alipay.sofa.jraft.rhea.rocks.support.RocksStatistics; import com.alipay.sofa.jraft.rhea.storage.KVEntry; @@ -41,6 +44,7 @@ import com.alipay.sofa.jraft.rhea.storage.LocalLock; import com.alipay.sofa.jraft.rhea.storage.RawKVStore; import com.alipay.sofa.jraft.rhea.storage.RocksRawKVStore; +import com.alipay.sofa.jraft.rhea.storage.KVStoreAccessHelper; import com.alipay.sofa.jraft.rhea.storage.Sequence; import com.alipay.sofa.jraft.rhea.storage.SstColumnFamily; import com.alipay.sofa.jraft.rhea.storage.SyncKVStore; @@ -110,7 +114,6 @@ public void execute(RawKVStore kvStore, KVStoreClosure closure) { /** * Test method: {@link RocksRawKVStore#multiGet(List, KVStoreClosure)} */ - @SuppressWarnings("unchecked") @Test public void multiGetTest() { final List keyList = Lists.newArrayList(); @@ -150,19 +153,14 @@ public void getLocalIteratorTest() { } final List entries = Lists.newArrayList(); - final KVIterator it = this.kvStore.localIterator(); - try { + try (final KVIterator it = this.kvStore.localIterator()) { it.seekToFirst(); while (it.isValid()) { entries.add(new KVEntry(it.key(), it.value())); it.next(); } - } finally { - try { - it.close(); - } catch (Exception ignored) { - // ignored - } + } catch (final Exception e) { + e.printStackTrace(); } assertEquals(entries.size(), keyList.size()); @@ -176,7 +174,6 @@ public void getLocalIteratorTest() { /** * Test method: {@link RocksRawKVStore#scan(byte[], byte[], KVStoreClosure)} */ - @SuppressWarnings("unchecked") @Test public void scanTest() { final List keyList = Lists.newArrayList(); @@ -298,7 +295,6 @@ public void execute(RawKVStore kvStore, KVStoreClosure closure) { /** * Test method: {@link RocksRawKVStore#put(List, KVStoreClosure)} */ - @SuppressWarnings("unchecked") @Test public void putListTest() { final List entries = Lists.newArrayList(); @@ -334,7 +330,7 @@ public void putIfAbsent() { } /** - * Test method: {@link RocksRawKVStore#tryLockWith(byte[], boolean, DistributedLock.Acquirer, KVStoreClosure)} + * Test method: {@link RocksRawKVStore#tryLockWith(byte[], byte[], boolean, DistributedLock.Acquirer, KVStoreClosure)} */ @Test public void tryLockWith() throws InterruptedException { @@ -433,7 +429,8 @@ public void snapshotTest() throws Exception { this.kvStore.put(makeKey(v), makeValue(v), null); } - final LocalFileMeta meta = doSnapshotSave(backupDir.getAbsolutePath()); + final Region region = new Region(); + final LocalFileMeta meta = doSnapshotSave(backupDir.getAbsolutePath(), region); assertNotNull(get(makeKey("1"))); @@ -448,7 +445,7 @@ public void snapshotTest() throws Exception { assertNull(get(makeKey("1"))); - doSnapshotLoad(backupDir.getAbsolutePath(), meta); + doSnapshotLoad(backupDir.getAbsolutePath(), meta, region); for (int i = 0; i < 100000; i++) { final String v = String.valueOf(i); @@ -470,10 +467,76 @@ public void execute(RawKVStore kvStore, KVStoreClosure closure) { }.apply(this.kvStore); } - private LocalFileMeta doSnapshotSave(final String path) { - final String snapshotPath = path + File.separator + SNAPSHOT_DIR; + @Test + public void multiGroupSnapshotTest() throws Exception { + final File backupDir = new File("multi-backup"); + if (backupDir.exists()) { + FileUtils.deleteDirectory(backupDir); + } + + final List regions = Lists.newArrayList(); + final List metas = Lists.newArrayList(); + regions.add(new Region(1, makeKey("0"), makeKey("1"), null, null)); + regions.add(new Region(2, makeKey("1"), makeKey("2"), null, null)); + regions.add(new Region(3, makeKey("2"), makeKey("3"), null, null)); + regions.add(new Region(4, makeKey("3"), makeKey("4"), null, null)); + regions.add(new Region(5, makeKey("4"), makeKey("5"), null, null)); + + for (int i = 0; i < 5; i++) { + final String v = String.valueOf(i); + this.kvStore.put(makeKey(v), makeValue(v), null); + } + for (int i = 0; i < 5; i++) { + this.kvStore.getSequence(makeKey(i + "_seq_test"), 10, null); + } + + for (int i = 0; i < 4; i++) { + final Path p = Paths.get(backupDir.getAbsolutePath(), String.valueOf(i)); + final LocalFileMeta meta = doSnapshotSave(p.toString(), regions.get(i)); + metas.add(meta); + } + + this.kvStore.shutdown(); + FileUtils.deleteDirectory(new File(this.tempPath)); + FileUtils.forceMkdir(new File(this.tempPath)); + this.kvStore = new RocksRawKVStore(); + this.kvStore.init(this.dbOptions); + + for (int i = 0; i < 4; i++) { + final Path p = Paths.get(backupDir.getAbsolutePath(), String.valueOf(i)); + doSnapshotLoad(p.toString(), metas.get(i), regions.get(i)); + } + + for (int i = 0; i < 4; i++) { + final String v = String.valueOf(i); + final byte[] seqKey = makeKey(i + "_seq_test"); + assertArrayEquals(makeValue(v), get(makeKey(v))); + final Sequence sequence = new SyncKVStore() { + @Override + public void execute(RawKVStore kvStore, KVStoreClosure closure) { + kvStore.getSequence(seqKey, 10, closure); + } + }.apply(this.kvStore); + assertEquals(10L, sequence.getStartValue()); + assertEquals(20L, sequence.getEndValue()); + } + + assertNull(get(makeKey("5"))); + final Sequence sequence = new SyncKVStore() { + @Override + public void execute(RawKVStore kvStore, KVStoreClosure closure) { + kvStore.getSequence(makeKey("4_seq_test"), 10, closure); + } + }.apply(this.kvStore); + assertEquals(0L, sequence.getStartValue()); + + FileUtils.deleteDirectory(backupDir); + } + + private LocalFileMeta doSnapshotSave(final String path, final Region region) { + final String snapshotPath = Paths.get(path, SNAPSHOT_DIR).toString(); try { - final LocalFileMeta meta = this.kvStore.onSnapshotSave(snapshotPath); + final LocalFileMeta meta = KVStoreAccessHelper.saveSnapshot(this.kvStore, snapshotPath, region); doCompressSnapshot(path); return meta; } catch (final Throwable t) { @@ -482,10 +545,12 @@ private LocalFileMeta doSnapshotSave(final String path) { return null; } - public boolean doSnapshotLoad(final String path, final LocalFileMeta meta) { + public boolean doSnapshotLoad(final String path, final LocalFileMeta meta, final Region region) { + final String sourceFile = Paths.get(path, SNAPSHOT_ARCHIVE).toString(); + final String snapshotPath = Paths.get(path, SNAPSHOT_DIR).toString(); try { - ZipUtil.unzipFile(path + File.separator + SNAPSHOT_ARCHIVE, path); - this.kvStore.onSnapshotLoad(path + File.separator + SNAPSHOT_DIR, meta); + ZipUtil.unzipFile(sourceFile, path); + KVStoreAccessHelper.loadSnapshot(this.kvStore, snapshotPath, meta, region); return true; } catch (final Throwable t) { t.printStackTrace(); @@ -494,9 +559,9 @@ public boolean doSnapshotLoad(final String path, final LocalFileMeta meta) { } private void doCompressSnapshot(final String path) { + final String outputFile = Paths.get(path, SNAPSHOT_ARCHIVE).toString(); try { - try (final ZipOutputStream out = new ZipOutputStream(new FileOutputStream(path + File.separator - + SNAPSHOT_ARCHIVE))) { + try (final ZipOutputStream out = new ZipOutputStream(new FileOutputStream(outputFile))) { ZipUtil.compressDirectoryToZipFile(path, SNAPSHOT_DIR, out); } } catch (final Throwable t) { @@ -553,6 +618,25 @@ public void jumpOverTest() { assertEquals(1000, approximateKeys, 100); } + @Test + public void initFencingTokenTest() throws Exception { + final Method getNextFencingMethod = RocksRawKVStore.class + .getDeclaredMethod("getNextFencingToken", byte[].class); + getNextFencingMethod.setAccessible(true); + final List parentKeys = Lists.newArrayList(); + parentKeys.add(null); // startKey == null + parentKeys.add(BytesUtil.writeUtf8("parent")); + for (int i = 0; i < 2; i++) { + final byte[] parentKey = parentKeys.get(i); + final byte[] childKey = BytesUtil.writeUtf8("child"); + assertEquals(1L, getNextFencingMethod.invoke(this.kvStore, (Object) parentKey)); + assertEquals(2L, getNextFencingMethod.invoke(this.kvStore, (Object) parentKey)); + this.kvStore.initFencingToken(parentKey, childKey); + assertEquals(3L, getNextFencingMethod.invoke(this.kvStore, (Object) childKey)); + assertEquals(4L, getNextFencingMethod.invoke(this.kvStore, (Object) childKey)); + } + } + @Test public void sstFilesTest() throws IOException { for (int i = 0; i < 10000; i++) { @@ -562,16 +646,26 @@ public void sstFilesTest() throws IOException { this.kvStore.getSequence(BytesUtil.writeUtf8("seq"), 100, null); final File defaultSstFile = new File("default.sst"); final File seqSstFile = new File("seq.sst"); + final File lockingFile = new File("locking.sst"); + final File fencingFile = new File("fencing.sst"); if (defaultSstFile.exists()) { FileUtils.forceDelete(defaultSstFile); } if (seqSstFile.exists()) { FileUtils.forceDelete(seqSstFile); } + if (lockingFile.exists()) { + FileUtils.forceDelete(lockingFile); + } + if (fencingFile.exists()) { + FileUtils.forceDelete(fencingFile); + } final EnumMap sstFileTable = new EnumMap<>(SstColumnFamily.class); sstFileTable.put(SstColumnFamily.DEFAULT, defaultSstFile); sstFileTable.put(SstColumnFamily.SEQUENCE, seqSstFile); - this.kvStore.createSstFiles(sstFileTable, null, null); + sstFileTable.put(SstColumnFamily.LOCKING, lockingFile); + sstFileTable.put(SstColumnFamily.FENCING, fencingFile); + KVStoreAccessHelper.createSstFiles(this.kvStore, sstFileTable, null, null); // remove keys for (int i = 0; i < 10000; i++) { byte[] bytes = BytesUtil.writeUtf8(String.valueOf(i)); @@ -585,13 +679,19 @@ public void sstFilesTest() throws IOException { this.kvStore.get(bytes, closure); assertNull(closure.getData()); } - this.kvStore.ingestSstFiles(sstFileTable); + KVStoreAccessHelper.ingestSstFiles(this.kvStore, sstFileTable); if (defaultSstFile.exists()) { FileUtils.forceDelete(defaultSstFile); } if (seqSstFile.exists()) { FileUtils.forceDelete(seqSstFile); } + if (lockingFile.exists()) { + FileUtils.forceDelete(lockingFile); + } + if (fencingFile.exists()) { + FileUtils.forceDelete(fencingFile); + } for (int i = 0; i < 10000; i++) { byte[] bytes = BytesUtil.writeUtf8(String.valueOf(i)); TestClosure closure = new TestClosure();