Skip to content

Commit

Permalink
Fix/some fix (#465)
Browse files Browse the repository at this point in the history
* [fix] maybe a recycle bug

* [fix] add read-index timeout scanner
  • Loading branch information
fengjiachun authored Jun 15, 2020
1 parent 0d8c96f commit e0f5daf
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 36 deletions.
16 changes: 15 additions & 1 deletion jraft-core/src/main/java/com/alipay/sofa/jraft/JRaftUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.BootstrapOptions;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.JRaftServiceLoader;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.alipay.sofa.jraft.util.timer.RaftTimerFactory;

/**
* Some helper methods for jraft usage.
Expand All @@ -40,7 +42,16 @@
*/
public final class JRaftUtils {

private JRaftUtils() {
private final static RaftTimerFactory TIMER_FACTORY = JRaftServiceLoader.load(RaftTimerFactory.class) //
.first();

/**
* Get raft timer factory.
*
* @return {@link RaftTimerFactory}
*/
public static RaftTimerFactory raftTimerFactory() {
return TIMER_FACTORY;
}

/**
Expand Down Expand Up @@ -135,4 +146,7 @@ public static Endpoint getEndPoint(final String s) {
}
return new Endpoint(tmps[0], Integer.parseInt(tmps[1]));
}

private JRaftUtils() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,71 @@
*/
package com.alipay.sofa.jraft.closure;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.util.SystemPropertyUtil;
import com.alipay.sofa.jraft.util.timer.Timeout;
import com.alipay.sofa.jraft.util.timer.Timer;
import com.alipay.sofa.jraft.util.timer.TimerTask;

/**
* Read index closure
*
* @author dennis
*/
public abstract class ReadIndexClosure implements Closure {
private static final Logger LOG = LoggerFactory.getLogger(ReadIndexClosure.class);

private static final Logger LOG = LoggerFactory
.getLogger(ReadIndexClosure.class);

private static final AtomicIntegerFieldUpdater<ReadIndexClosure> STATE_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(
ReadIndexClosure.class,
"state");

private static final long DEFAULT_TIMEOUT = SystemPropertyUtil.getInt(
"jraft.read-index.timeout",
2 * 1000);

private static final int PENDING = 0;
private static final int COMPLETE = 1;
private static final int TIMEOUT = 2;

/**
* Invalid log index -1.
*/
public static final long INVALID_LOG_INDEX = -1;
private long index = INVALID_LOG_INDEX;
private byte[] requestContext;
public static final long INVALID_LOG_INDEX = -1;

private long index = INVALID_LOG_INDEX;
private byte[] requestContext;

private volatile int state;

public ReadIndexClosure() {
this(DEFAULT_TIMEOUT);
}

/**
* Create a read-index closure with a timeout parameter.
*
* @param timeoutMs timeout millis
*/
public ReadIndexClosure(long timeoutMs) {
this.state = PENDING;
if (timeoutMs >= 0) {
// Lazy to init the timer
TimeoutScanner.TIMER.newTimeout(new TimeoutTask(this), timeoutMs, TimeUnit.MILLISECONDS);
}
}

/**
* Called when ReadIndex can be executed.
Expand Down Expand Up @@ -79,10 +123,45 @@ public byte[] getRequestContext() {

@Override
public void run(final Status status) {
if (!STATE_UPDATER.compareAndSet(this, PENDING, COMPLETE)) {
LOG.warn("A timeout read-index response finally returned: {}.", status);
return;
}

try {
run(status, this.index, this.requestContext);
} catch (Throwable t) {
LOG.error("Fail to run ReadIndexClosure with status: {}.", status, t);
}
}

static class TimeoutTask implements TimerTask {

private final ReadIndexClosure closure;

TimeoutTask(ReadIndexClosure closure) {
this.closure = closure;
}

@Override
public void run(final Timeout timeout) throws Exception {
if (!STATE_UPDATER.compareAndSet(this.closure, PENDING, TIMEOUT)) {
return;
}

final Status status = new Status(RaftError.ETIMEDOUT, "read-index request timeout");
try {
this.closure.run(status, INVALID_LOG_INDEX, null);
} catch (final Throwable t) {
LOG.error("[Timeout] fail to run ReadIndexClosure with status: {}.", status, t);
}
}
}

/**
* Lazy to create a timer
*/
static class TimeoutScanner {
private static final Timer TIMER = JRaftUtils.raftTimerFactory().createTimer("read-index.timeout.scanner");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.FSMCaller;
import com.alipay.sofa.jraft.JRaftServiceFactory;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.jraft.ReadOnlyService;
Expand Down Expand Up @@ -153,10 +154,8 @@ public class NodeImpl implements Node, RaftServerService {
}
}

private final static RaftTimerFactory TIMER_FACTORY = JRaftServiceLoader
.load(
RaftTimerFactory.class) //
.first();
public final static RaftTimerFactory TIMER_FACTORY = JRaftUtils
.raftTimerFactory();

// Max retry times when applying tasks.
private static final int MAX_APPLY_RETRY_TIMES = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ public void run(final Status status) {
} else {
// Not applied, add it to pending-notify cache.
ReadOnlyServiceImpl.this.pendingNotifyStatus
.computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
.add(readIndexStatus);
.computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
.add(readIndexStatus);
}
} finally {
if (doUnlock) {
Expand Down Expand Up @@ -242,21 +242,21 @@ public boolean init(final ReadOnlyServiceOptions opts) {
this.raftOptions = opts.getRaftOptions();

this.scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true));
.newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true));
this.readIndexDisruptor = DisruptorBuilder.<ReadIndexEvent> newInstance() //
.setEventFactory(new ReadIndexEventFactory()) //
.setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
.setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true)) //
.setWaitStrategy(new BlockingWaitStrategy()) //
.setProducerType(ProducerType.MULTI) //
.build();
.setEventFactory(new ReadIndexEventFactory()) //
.setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
.setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true)) //
.setWaitStrategy(new BlockingWaitStrategy()) //
.setProducerType(ProducerType.MULTI) //
.build();
this.readIndexDisruptor.handleEventsWith(new ReadIndexEventHandler());
this.readIndexDisruptor
.setDefaultExceptionHandler(new LogExceptionHandler<Object>(this.getClass().getSimpleName()));
.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
this.readIndexQueue = this.readIndexDisruptor.start();
if (this.nodeMetrics.getMetricRegistry() != null) {
this.nodeMetrics.getMetricRegistry() //
.register("jraft-read-only-service-disruptor", new DisruptorMetricSet(this.readIndexQueue));
.register("jraft-read-only-service-disruptor", new DisruptorMetricSet(this.readIndexQueue));
}
// listen on lastAppliedLogIndex change events.
this.fsmCaller.addLastAppliedLogIndexListener(this);
Expand Down
27 changes: 17 additions & 10 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.ThreadId;
import com.alipay.sofa.jraft.util.Utils;
import com.alipay.sofa.jraft.util.internal.ThrowUtil;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
Expand Down Expand Up @@ -1571,21 +1572,27 @@ private boolean sendEntries(final long nextSendingIndex) {
final int v = this.version;
final long monotonicSendTimeMs = Utils.monotonicMs();
final int seq = getAndIncrementReqSeq();
final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(),
request, -1, new RpcResponseClosureAdapter<AppendEntriesResponse>() {

@Override
public void run(final Status status) {
RecycleUtil.recycle(recyclable);
onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request, getResponse(), seq,
v, monotonicSendTimeMs);
}
Future<Message> rpcFuture = null;
try {
rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(), request, -1,
new RpcResponseClosureAdapter<AppendEntriesResponse>() {

});
@Override
public void run(final Status status) {
RecycleUtil.recycle(recyclable); // TODO: recycle on send success, not response received.
onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request, getResponse(),
seq, v, monotonicSendTimeMs);
}
});
} catch (final Throwable t) {
RecycleUtil.recycle(recyclable);
ThrowUtil.throwException(t);
}
addInflight(RequestType.AppendEntries, nextSendingIndex, request.getEntriesCount(), request.getData().size(),
seq, rpcFuture);
return true;

return true;
}

public static void sendHeartbeat(final ThreadId id, final RpcResponseClosure<AppendEntriesResponse> closure) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public SegmentList(final boolean recycleSegment) {
}

/**
* A recyclable segment.
* A recyclable segment.
* @author boyan([email protected])
*
* @param <T>
Expand Down Expand Up @@ -117,6 +117,7 @@ int cap() {
return SEGMENT_SIZE - this.pos;
}

@SuppressWarnings("SuspiciousSystemArraycopy")
private void addAll(final Object[] src, final int srcPos, final int len) {
System.arraycopy(src, srcPos, this.elements, this.pos, len);
this.pos += len;
Expand Down Expand Up @@ -261,7 +262,8 @@ public boolean isEmpty() {

/**
* Remove elements from first until predicate returns false.
* @param predicate
*
* @param predicate predicate functional interface
*/
public void removeFromFirstWhen(final Predicate<T> predicate) {
Segment<T> firstSeg = getFirst();
Expand Down Expand Up @@ -293,7 +295,8 @@ public void clear() {

/**
* Remove elements from last until predicate returns false.
* @param predicate
*
* @param predicate predicate functional interface
*/
public void removeFromLastWhen(final Predicate<T> predicate) {
Segment<T> lastSeg = getLast();
Expand Down Expand Up @@ -371,7 +374,7 @@ public void addAll(final Collection<T> coll) {
}

private Object[] coll2Array(final Collection<T> coll) {
Object[] src = null;
Object[] src;
if (coll instanceof ArrayList && UnsafeUtil.hasUnsafe()) {
src = LIST_ARRAY_GETTER.get((ArrayList<T>) coll);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,13 @@ public Scheduler getRaftScheduler(final boolean shared, final int workerNum, fin
return shared ? SCHEDULER_REF.getRef() : createScheduler(workerNum, name);
}

private static Timer createTimer(final String name) {
@Override
public Timer createTimer(final String name) {
return new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048);
}

private static Scheduler createScheduler(final int workerNum, final String name) {
@Override
public Scheduler createScheduler(final int workerNum, final String name) {
return new TimerManager(workerNum, name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,8 @@ public interface RaftTimerFactory {
Timer getSnapshotTimer(final boolean shared, final String name);

Scheduler getRaftScheduler(final boolean shared, final int workerNum, final String name);

Timer createTimer(final String name);

Scheduler createScheduler(final int workerNum, final String name);
}
49 changes: 49 additions & 0 deletions jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1368,6 +1368,55 @@ public void run(final Status status, final long index, final byte[] reqCtx) {
cluster.stopAll();
}

@Test
public void testReadIndexTimeout() throws Exception {
final List<PeerId> peers = TestUtils.generatePeers(3);

final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers);
for (final PeerId peer : peers) {
assertTrue(cluster.start(peer.getEndpoint(), false, 300, true));
}

// elect leader
cluster.waitLeader();

// get leader
final Node leader = cluster.getLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
sendTestTaskAndWait(leader);

assertReadIndex(leader, 11);

// read from follower
for (final Node follower : cluster.getFollowers()) {
assertNotNull(follower);
assertReadIndex(follower, 11);
}

// read with null request context
final CountDownLatch latch = new CountDownLatch(1);
final long start = System.currentTimeMillis();
leader.readIndex(null, new ReadIndexClosure(0) {

@Override
public void run(final Status status, final long index, final byte[] reqCtx) {
assertNull(reqCtx);
if (status.isOk()) {
System.err.println("Read-index so fast: " + (System.currentTimeMillis() - start) + "ms");
} else {
assertEquals(status, new Status(RaftError.ETIMEDOUT, "read-index request timeout"));
assertEquals(index, -1);
}
latch.countDown();
}
});
latch.await();

cluster.stopAll();
}

@Test
public void testReadIndexFromLearner() throws Exception {
final List<PeerId> peers = TestUtils.generatePeers(3);
Expand Down

0 comments on commit e0f5daf

Please sign in to comment.