Skip to content

Commit

Permalink
Feat/multi group snapshot (#42)
Browse files Browse the repository at this point in the history
* (fix) multi-group snapshot with memoryStore

* (fix) multi-group snapshot with rocksDBStore

* (fix) add multi sst snapshot benchmark

* (fix) add meta check on load snapshot

* (fix) add meta check on load snapshot

* (fix) add timer metric with memory snapshot

* (fix) typo

* (fix) log

* (bugfix) fencing token isolation

* (bugfix) rename local variable

* (bugfix) fencing token isolation by region

* (fix) typo

* (fix) region info visibility

* (feat) add comment

* (fix) delete existing data on rocksdb restart, because raft will play them back

* (fix) delete existing data on rocksdb restart, because raft will play them back

* (fix) add restart test #86

* (fix) add restart test #86

* (fix) update restart test

* (fix) add error log on splitting fail

* (fix) requires

* (fix) remove useless code

* (fix) code refactoring with snapshot

* (fix) fix bad name

* (fix) typo
  • Loading branch information
fengjiachun committed Apr 17, 2019
1 parent 09fb200 commit 11c7d60
Show file tree
Hide file tree
Showing 31 changed files with 1,405 additions and 622 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public interface StateMachine {
* call done.run(status) when snapshot finished.
* Default: Save nothing and returns error.
*
* @param writer snapshot writer
* @param done callback
* @param writer snapshot writer
* @param done callback
*/
void onSnapshotSave(SnapshotWriter writer, Closure done);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public abstract class Snapshot extends Status {
* Snapshot file prefix.
*/
public static final String JRAFT_SNAPSHOT_PREFIX = "snapshot_";
/** Snapshot uri scheme for remote peer*/
/** Snapshot uri scheme for remote peer */
public static final String REMOTE_SNAPSHOT_URI_SCHEME = "remote://";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ public static boolean isWindows() {
}

private static boolean isWindows0() {
boolean windows = SystemPropertyUtil.get("os.name", "").toLowerCase(Locale.US).contains("win");
final boolean windows = SystemPropertyUtil.get("os.name", "") //
.toLowerCase(Locale.US) //
.contains("win");
if (windows) {
LOG.debug("Platform: Windows");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,10 +449,11 @@ public void handleKeyLockRequest(final KeyLockRequest request,
try {
checkRegionEpoch(request);
final byte[] key = requireNonNull(request.getKey(), "lock.key");
final byte[] fencingKey = this.regionEngine.getRegion().getStartKey();
final DistributedLock.Acquirer acquirer = requireNonNull(request.getAcquirer(), "lock.acquirer");
requireNonNull(acquirer.getId(), "lock.id");
requirePositive(acquirer.getLeaseMillis(), "lock.leaseMillis");
this.rawKVStore.tryLockWith(key, request.isKeepLease(), acquirer, new BaseKVStoreClosure() {
this.rawKVStore.tryLockWith(key, fencingKey, request.isKeepLease(), acquirer, new BaseKVStoreClosure() {

@Override
public void run(Status status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public synchronized boolean init(final RegionEngineOptions opts) {
return true;
}
this.regionOpts = Requires.requireNonNull(opts, "opts");
this.fsm = new KVStoreStateMachine(this.region.getId(), this.storeEngine);
this.fsm = new KVStoreStateMachine(this.region, this.storeEngine);

// node options
NodeOptions nodeOpts = opts.getNodeOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,11 +515,7 @@ public void doSplit(final Long regionId, final Long newRegionId, final byte[] sp
rOpts.setRaftDataPath(baseRaftDataPath + "raft_data_region_" + region.getId() + "_"
+ getSelfEndpoint().getPort());
final RegionEngine engine = new RegionEngine(region, this);
if (engine.init(rOpts)) {
final RegionKVService regionKVService = new DefaultRegionKVService(engine);
registerRegionKVService(regionKVService);
this.regionEngineTable.put(region.getId(), engine);
} else {
if (!engine.init(rOpts)) {
LOG.error("Fail to init [RegionEngine: {}].", region);
if (closure != null) {
// null on follower
Expand All @@ -528,12 +524,20 @@ public void doSplit(final Long regionId, final Long newRegionId, final byte[] sp
}
return;
}

// update parent conf
final Region pRegion = parent.getRegion();
final RegionEpoch pEpoch = pRegion.getRegionEpoch();
final long version = pEpoch.getVersion();
pEpoch.setVersion(version + 1); // version + 1
pRegion.setEndKey(splitKey); // update endKey

// The following two statements establish a 'happens-before' relationship for
// reads of 'pRegion', because a write to a ConcurrentMap happens-before every
// subsequent read of that ConcurrentMap.
this.regionEngineTable.put(region.getId(), engine);
registerRegionKVService(new DefaultRegionKVService(engine));

// update local regionRouteTable
this.pdClient.getRegionRouteTable().splitRegion(pRegion.getId(), region);
if (closure != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,7 @@ private void internalTryLockWith(final byte[] key, final boolean keepLease, fina
retryRunner);
if (regionEngine != null) {
if (ensureOnValidEpoch(region, regionEngine, closure)) {
getRawKVStore(regionEngine).tryLockWith(key, keepLease, acquirer, closure);
getRawKVStore(regionEngine).tryLockWith(key, region.getStartKey(), keepLease, acquirer, closure);
}
} else {
final KeyLockRequest request = new KeyLockRequest();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.rhea.storage;

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.zip.ZipOutputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rhea.metadata.Region;
import com.alipay.sofa.jraft.rhea.serialization.Serializer;
import com.alipay.sofa.jraft.rhea.serialization.Serializers;
import com.alipay.sofa.jraft.rhea.util.StackTraceUtil;
import com.alipay.sofa.jraft.rhea.util.ZipUtil;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.google.protobuf.ByteString;

import static com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta;

/**
* @author jiachun.fjc
*/
public abstract class AbstractKVStoreSnapshotFile implements KVStoreSnapshotFile {

private static final Logger LOG = LoggerFactory.getLogger(AbstractKVStoreSnapshotFile.class);

private static final String SNAPSHOT_DIR = "kv";
private static final String SNAPSHOT_ARCHIVE = "kv.zip";

protected final Serializer serializer = Serializers.getDefault();

@Override
public void save(final SnapshotWriter writer, final Closure done, final Region region,
final ExecutorService executor) {
final String writerPath = writer.getPath();
final String snapshotPath = Paths.get(writerPath, SNAPSHOT_DIR).toString();
try {
final LocalFileMeta meta = doSnapshotSave(snapshotPath, region);
executor.execute(() -> compressSnapshot(writer, meta, done));
} catch (final Throwable t) {
LOG.error("Fail to save snapshot, path={}, file list={}, {}.", writerPath, writer.listFiles(),
StackTraceUtil.stackTrace(t));
done.run(new Status(RaftError.EIO, "Fail to save snapshot at %s, error is %s", writerPath,
t.getMessage()));
}
}

@Override
public boolean load(final SnapshotReader reader, final Region region) {
final LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(SNAPSHOT_ARCHIVE);
final String readerPath = reader.getPath();
if (meta == null) {
LOG.error("Can't find kv snapshot file, path={}.", readerPath);
return false;
}
final String snapshotPath = Paths.get(readerPath, SNAPSHOT_DIR).toString();
try {
decompressSnapshot(readerPath);
doSnapshotLoad(snapshotPath, meta, region);
return true;
} catch (final Throwable t) {
LOG.error("Fail to load snapshot, path={}, file list={}, {}.", readerPath, reader.listFiles(),
StackTraceUtil.stackTrace(t));
return false;
}
}

abstract LocalFileMeta doSnapshotSave(final String snapshotPath, final Region region) throws Exception;

abstract void doSnapshotLoad(final String snapshotPath, final LocalFileMeta meta, final Region region)
throws Exception;

protected void compressSnapshot(final SnapshotWriter writer, final LocalFileMeta meta, final Closure done) {
final String writerPath = writer.getPath();
final String outputFile = Paths.get(writerPath, SNAPSHOT_ARCHIVE).toString();
try {
try (final ZipOutputStream out = new ZipOutputStream(new FileOutputStream(outputFile))) {
ZipUtil.compressDirectoryToZipFile(writerPath, SNAPSHOT_DIR, out);
}
if (writer.addFile(SNAPSHOT_ARCHIVE, meta)) {
done.run(Status.OK());
} else {
done.run(new Status(RaftError.EIO, "Fail to add snapshot file: %s", writerPath));
}
} catch (final Throwable t) {
LOG.error("Fail to compress snapshot, path={}, file list={}, {}.", writerPath, writer.listFiles(),
StackTraceUtil.stackTrace(t));
done.run(new Status(RaftError.EIO, "Fail to compress snapshot at %s, error is %s", writerPath, t
.getMessage()));
}
}

protected void decompressSnapshot(final String readerPath) throws IOException {
final String sourceFile = Paths.get(readerPath, SNAPSHOT_ARCHIVE).toString();
ZipUtil.unzipFile(sourceFile, readerPath);
}

protected <T> T readMetadata(final LocalFileMeta meta, final Class<T> cls) {
final ByteString userMeta = meta.getUserMeta();
return this.serializer.readObject(userMeta.toByteArray(), cls);
}

protected <T> LocalFileMeta buildMetadata(final T metadata) {
return metadata == null ? null : LocalFileMeta.newBuilder() //
.setUserMeta(ByteString.copyFrom(this.serializer.writeObject(metadata))) //
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,15 @@
import com.alipay.sofa.jraft.rhea.errors.Errors;
import com.alipay.sofa.jraft.rhea.metrics.KVMetrics;
import com.alipay.sofa.jraft.rhea.util.StackTraceUtil;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.codahale.metrics.Timer;

import static com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta;
import static com.alipay.sofa.jraft.rhea.metrics.KVMetricNames.DB_TIMER;

/**
* @author jiachun.fjc
*/
public abstract class BaseRawKVStore<T> implements RawKVStore, Lifecycle<T> {

protected static final byte[] LOCK_FENCING_KEY = BytesUtil.writeUtf8("LOCK_FENCING_KEY");

@Override
public void get(final byte[] key, final KVStoreClosure closure) {
get(key, true, closure);
Expand Down Expand Up @@ -96,9 +92,13 @@ public void execute(final NodeExecutor nodeExecutor, final boolean isLeader, fin
*/
public abstract byte[] jumpOver(final byte[] startKey, final long distance);

public abstract LocalFileMeta onSnapshotSave(final String snapshotPath) throws Exception;

public abstract void onSnapshotLoad(final String snapshotPath, final LocalFileMeta meta) throws Exception;
/**
* Init the fencing token of new region.
*
* @param parentKey the fencing key of parent region
* @param childKey the fencing key of new region
*/
public abstract void initFencingToken(final byte[] parentKey, final byte[] childKey);

// static methods
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public void batchTryLockWith(final KVStateOutputList kvStates) {
final KVState kvState = kvStates.get(i);
final KVOperation op = kvState.getOp();
final Pair<Boolean, DistributedLock.Acquirer> acquirerPair = op.getAcquirerPair();
tryLockWith(op.getKey(), acquirerPair.getKey(), acquirerPair.getValue(), kvState.getDone());
tryLockWith(op.getKey(), op.getFencingKey(), acquirerPair.getKey(), acquirerPair.getValue(),
kvState.getDone());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ public static KVOperation createNodeExecutor(final NodeExecutor nodeExecutor) {
return new KVOperation(BytesUtil.EMPTY_BYTES, BytesUtil.EMPTY_BYTES, nodeExecutor, NODE_EXECUTE);
}

public static KVOperation createKeyLockRequest(final byte[] key,
public static KVOperation createKeyLockRequest(final byte[] key, final byte[] fencingKey,
final Pair<Boolean, DistributedLock.Acquirer> acquirerPair) {
Requires.requireNonNull(key, "key");
return new KVOperation(key, BytesUtil.EMPTY_BYTES, acquirerPair, KEY_LOCK);
return new KVOperation(key, fencingKey, acquirerPair, KEY_LOCK);
}

public static KVOperation createKeyLockReleaseRequest(final byte[] key, final DistributedLock.Acquirer acquirer) {
Expand Down Expand Up @@ -239,6 +239,10 @@ public byte[] getEndKey() {
return value;
}

/**
 * Returns the fencing key of this operation. The fencing key is not kept in
 * a field of its own: this accessor returns the same {@code value} field
 * that {@code getEndKey()} returns.
 */
public byte[] getFencingKey() {
    return this.value;
}

/**
 * Replaces the {@code value} field of this operation. The array reference is
 * stored as-is (no copy is made). Note that {@code getEndKey()} and
 * {@code getFencingKey()} return this same field.
 *
 * @param value the new value
 */
public void setValue(byte[] value) {
this.value = value;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.rhea.storage;

import java.util.concurrent.ExecutorService;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.rhea.metadata.Region;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;

/**
 * Saves and loads the snapshot of a single region.
 *
 * @author jiachun.fjc
 */
public interface KVStoreSnapshotFile {

    /**
     * Save a snapshot for the specified region.
     *
     * @param writer   snapshot writer
     * @param done     callback
     * @param region   the region to save snapshot
     * @param executor the executor to compress snapshot
     */
    void save(SnapshotWriter writer, Closure done, Region region, ExecutorService executor);

    /**
     * Load snapshot for the specified region.
     *
     * @param reader snapshot reader
     * @param region the region to load snapshot
     * @return true if load succeed
     */
    boolean load(SnapshotReader reader, Region region);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.rhea.storage;

import com.alipay.sofa.jraft.util.Requires;

/**
 * Static factory that picks the {@link KVStoreSnapshotFile} implementation
 * matching the concrete type of a raw KV store.
 *
 * @author jiachun.fjc
 */
public final class KVStoreSnapshotFileFactory {

    private KVStoreSnapshotFileFactory() {
        // static factory only, never instantiated
    }

    /**
     * Creates the snapshot file handler for {@code kvStore}.
     *
     * @param kvStore the raw KV store, must not be null
     * @return a {@link RocksKVStoreSnapshotFile} for a {@link RocksRawKVStore},
     *         a {@link MemoryKVStoreSnapshotFile} for a {@link MemoryRawKVStore}
     * @throws UnsupportedOperationException if the store is of any other type
     */
    public static <T> KVStoreSnapshotFile getKVStoreSnapshotFile(final BaseRawKVStore<T> kvStore) {
        Requires.requireNonNull(kvStore, "kvStore");
        final KVStoreSnapshotFile snapshotFile;
        if (kvStore instanceof RocksRawKVStore) {
            snapshotFile = new RocksKVStoreSnapshotFile((RocksRawKVStore) kvStore);
        } else if (kvStore instanceof MemoryRawKVStore) {
            snapshotFile = new MemoryKVStoreSnapshotFile((MemoryRawKVStore) kvStore);
        } else {
            throw new UnsupportedOperationException("fail to find a KVStoreSnapshotFile with "
                                                    + kvStore.getClass().getName());
        }
        return snapshotFile;
    }
}
Loading

0 comments on commit 11c7d60

Please sign in to comment.