diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/JRaftUtils.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/JRaftUtils.java index 48aa930da..020137fe2 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/JRaftUtils.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/JRaftUtils.java @@ -28,8 +28,10 @@ import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.option.BootstrapOptions; import com.alipay.sofa.jraft.util.Endpoint; +import com.alipay.sofa.jraft.util.JRaftServiceLoader; import com.alipay.sofa.jraft.util.NamedThreadFactory; import com.alipay.sofa.jraft.util.ThreadPoolUtil; +import com.alipay.sofa.jraft.util.timer.RaftTimerFactory; /** * Some helper methods for jraft usage. @@ -40,7 +42,16 @@ */ public final class JRaftUtils { - private JRaftUtils() { + private final static RaftTimerFactory TIMER_FACTORY = JRaftServiceLoader.load(RaftTimerFactory.class) // + .first(); + + /** + * Get raft timer factory. + * + * @return {@link RaftTimerFactory} + */ + public static RaftTimerFactory raftTimerFactory() { + return TIMER_FACTORY; } /** @@ -135,4 +146,7 @@ public static Endpoint getEndPoint(final String s) { } return new Endpoint(tmps[0], Integer.parseInt(tmps[1])); } + + private JRaftUtils() { + } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/closure/ReadIndexClosure.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/closure/ReadIndexClosure.java index cd715ef69..549db8e88 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/closure/ReadIndexClosure.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/closure/ReadIndexClosure.java @@ -16,12 +16,21 @@ */ package com.alipay.sofa.jraft.closure; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.Node; import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.util.SystemPropertyUtil; +import com.alipay.sofa.jraft.util.timer.Timeout; +import com.alipay.sofa.jraft.util.timer.Timer; +import com.alipay.sofa.jraft.util.timer.TimerTask; /** * Read index closure @@ -29,14 +38,49 @@ * @author dennis */ public abstract class ReadIndexClosure implements Closure { - private static final Logger LOG = LoggerFactory.getLogger(ReadIndexClosure.class); + + private static final Logger LOG = LoggerFactory + .getLogger(ReadIndexClosure.class); + + private static final AtomicIntegerFieldUpdater STATE_UPDATER = AtomicIntegerFieldUpdater + .newUpdater( + ReadIndexClosure.class, + "state"); + + private static final long DEFAULT_TIMEOUT = SystemPropertyUtil.getInt( + "jraft.read-index.timeout", + 2 * 1000); + + private static final int PENDING = 0; + private static final int COMPLETE = 1; + private static final int TIMEOUT = 2; /** * Invalid log index -1. */ - public static final long INVALID_LOG_INDEX = -1; - private long index = INVALID_LOG_INDEX; - private byte[] requestContext; + public static final long INVALID_LOG_INDEX = -1; + + private long index = INVALID_LOG_INDEX; + private byte[] requestContext; + + private volatile int state; + + public ReadIndexClosure() { + this(DEFAULT_TIMEOUT); + } + + /** + * Create a read-index closure with a timeout parameter. + * + * @param timeoutMs timeout millis + */ + public ReadIndexClosure(long timeoutMs) { + this.state = PENDING; + if (timeoutMs >= 0) { + // Lazy to init the timer + TimeoutScanner.TIMER.newTimeout(new TimeoutTask(this), timeoutMs, TimeUnit.MILLISECONDS); + } + } /** * Called when ReadIndex can be executed. @@ -79,10 +123,45 @@ public byte[] getRequestContext() { @Override public void run(final Status status) { + if (!STATE_UPDATER.compareAndSet(this, PENDING, COMPLETE)) { + LOG.warn("A timeout read-index response finally returned: {}.", status); + return; + } + try { run(status, this.index, this.requestContext); } catch (Throwable t) { LOG.error("Fail to run ReadIndexClosure with status: {}.", status, t); } } + + static class TimeoutTask implements TimerTask { + + private final ReadIndexClosure closure; + + TimeoutTask(ReadIndexClosure closure) { + this.closure = closure; + } + + @Override + public void run(final Timeout timeout) throws Exception { + if (!STATE_UPDATER.compareAndSet(this.closure, PENDING, TIMEOUT)) { + return; + } + + final Status status = new Status(RaftError.ETIMEDOUT, "read-index request timeout"); + try { + this.closure.run(status, INVALID_LOG_INDEX, null); + } catch (final Throwable t) { + LOG.error("[Timeout] fail to run ReadIndexClosure with status: {}.", status, t); + } + } + } + + /** + * Lazy to create a timer + */ + static class TimeoutScanner { + private static final Timer TIMER = JRaftUtils.raftTimerFactory().createTimer("read-index.timeout.scanner"); + } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index 36314e55d..6bd7a5f83 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -40,6 +40,7 @@ import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.FSMCaller; import com.alipay.sofa.jraft.JRaftServiceFactory; +import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.Node; import com.alipay.sofa.jraft.NodeManager; import com.alipay.sofa.jraft.ReadOnlyService; @@ -153,10 +154,8 @@ public class NodeImpl implements Node, RaftServerService { } } - private final static RaftTimerFactory TIMER_FACTORY = JRaftServiceLoader - .load( - RaftTimerFactory.class) // - .first(); + public final static RaftTimerFactory TIMER_FACTORY = JRaftUtils + .raftTimerFactory(); // Max retry times when applying tasks. private static final int MAX_APPLY_RETRY_TIMES = 3; diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java index df5afd61b..0df91f6ce 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java @@ -178,8 +178,8 @@ public void run(final Status status) { } else { // Not applied, add it to pending-notify cache. ReadOnlyServiceImpl.this.pendingNotifyStatus - .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) // - .add(readIndexStatus); + .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) // + .add(readIndexStatus); } } finally { if (doUnlock) { @@ -242,21 +242,21 @@ public boolean init(final ReadOnlyServiceOptions opts) { this.raftOptions = opts.getRaftOptions(); this.scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true)); + .newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true)); this.readIndexDisruptor = DisruptorBuilder. newInstance() // - .setEventFactory(new ReadIndexEventFactory()) // - .setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) // - .setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true)) // - .setWaitStrategy(new BlockingWaitStrategy()) // - .setProducerType(ProducerType.MULTI) // - .build(); + .setEventFactory(new ReadIndexEventFactory()) // + .setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) // + .setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true)) // + .setWaitStrategy(new BlockingWaitStrategy()) // + .setProducerType(ProducerType.MULTI) // + .build(); this.readIndexDisruptor.handleEventsWith(new ReadIndexEventHandler()); this.readIndexDisruptor - .setDefaultExceptionHandler(new LogExceptionHandler(this.getClass().getSimpleName())); + .setDefaultExceptionHandler(new LogExceptionHandler(getClass().getSimpleName())); this.readIndexQueue = this.readIndexDisruptor.start(); if (this.nodeMetrics.getMetricRegistry() != null) { this.nodeMetrics.getMetricRegistry() // - .register("jraft-read-only-service-disruptor", new DisruptorMetricSet(this.readIndexQueue)); + .register("jraft-read-only-service-disruptor", new DisruptorMetricSet(this.readIndexQueue)); } // listen on lastAppliedLogIndex change events. this.fsmCaller.addLastAppliedLogIndexListener(this); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java index 70388d698..197f5148c 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java @@ -61,6 +61,7 @@ import com.alipay.sofa.jraft.util.Requires; import com.alipay.sofa.jraft.util.ThreadId; import com.alipay.sofa.jraft.util.Utils; +import com.alipay.sofa.jraft.util.internal.ThrowUtil; import com.codahale.metrics.Gauge; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricFilter; @@ -1571,21 +1572,27 @@ private boolean sendEntries(final long nextSendingIndex) { final int v = this.version; final long monotonicSendTimeMs = Utils.monotonicMs(); final int seq = getAndIncrementReqSeq(); - final Future rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(), - request, -1, new RpcResponseClosureAdapter() { - @Override - public void run(final Status status) { - RecycleUtil.recycle(recyclable); - onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request, getResponse(), seq, - v, monotonicSendTimeMs); - } + Future rpcFuture = null; + try { + rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(), request, -1, + new RpcResponseClosureAdapter() { - }); + @Override + public void run(final Status status) { + RecycleUtil.recycle(recyclable); // TODO: recycle on send success, not response received. + onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request, getResponse(), + seq, v, monotonicSendTimeMs); + } + }); + } catch (final Throwable t) { + RecycleUtil.recycle(recyclable); + ThrowUtil.throwException(t); + } addInflight(RequestType.AppendEntries, nextSendingIndex, request.getEntriesCount(), request.getData().size(), seq, rpcFuture); - return true; + return true; } public static void sendHeartbeat(final ThreadId id, final RpcResponseClosure closure) { diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/SegmentList.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/SegmentList.java index a7139974d..1828b20a2 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/SegmentList.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/SegmentList.java @@ -63,7 +63,7 @@ public SegmentList(final boolean recycleSegment) { } /** - * A recyclable segment. + * A recyclable segment. * @author boyan(boyan@antfin.com) * * @param @@ -117,6 +117,7 @@ int cap() { return SEGMENT_SIZE - this.pos; } + @SuppressWarnings("SuspiciousSystemArraycopy") private void addAll(final Object[] src, final int srcPos, final int len) { System.arraycopy(src, srcPos, this.elements, this.pos, len); this.pos += len; @@ -261,7 +262,8 @@ public boolean isEmpty() { /** * Remove elements from first until predicate returns false. - * @param predicate + * + * @param predicate predicate functional interface */ public void removeFromFirstWhen(final Predicate predicate) { Segment firstSeg = getFirst(); @@ -293,7 +295,8 @@ public void clear() { /** * Remove elements from last until predicate returns false. - * @param predicate + * + * @param predicate predicate functional interface */ public void removeFromLastWhen(final Predicate predicate) { Segment lastSeg = getLast(); @@ -371,7 +374,7 @@ public void addAll(final Collection coll) { } private Object[] coll2Array(final Collection coll) { - Object[] src = null; + Object[] src; if (coll instanceof ArrayList && UnsafeUtil.hasUnsafe()) { src = LIST_ARRAY_GETTER.get((ArrayList) coll); } else { diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultRaftTimerFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultRaftTimerFactory.java index a4143744c..68ec4feff 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultRaftTimerFactory.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultRaftTimerFactory.java @@ -95,11 +95,13 @@ public Scheduler getRaftScheduler(final boolean shared, final int workerNum, fin return shared ? SCHEDULER_REF.getRef() : createScheduler(workerNum, name); } - private static Timer createTimer(final String name) { + @Override + public Timer createTimer(final String name) { return new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048); } - private static Scheduler createScheduler(final int workerNum, final String name) { + @Override + public Scheduler createScheduler(final int workerNum, final String name) { return new TimerManager(workerNum, name); } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/RaftTimerFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/RaftTimerFactory.java index dcb60171f..fba500d13 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/RaftTimerFactory.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/RaftTimerFactory.java @@ -32,4 +32,8 @@ public interface RaftTimerFactory { Timer getSnapshotTimer(final boolean shared, final String name); Scheduler getRaftScheduler(final boolean shared, final int workerNum, final String name); + + Timer createTimer(final String name); + + Scheduler createScheduler(final int workerNum, final String name); } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java index be35a7208..7d957ca08 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java @@ -1368,6 +1368,55 @@ public void run(final Status status, final long index, final byte[] reqCtx) { cluster.stopAll(); } + @Test + public void testReadIndexTimeout() throws Exception { + final List peers = TestUtils.generatePeers(3); + + final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers); + for (final PeerId peer : peers) { + assertTrue(cluster.start(peer.getEndpoint(), false, 300, true)); + } + + // elect leader + cluster.waitLeader(); + + // get leader + final Node leader = cluster.getLeader(); + assertNotNull(leader); + assertEquals(3, leader.listPeers().size()); + // apply tasks to leader + sendTestTaskAndWait(leader); + + assertReadIndex(leader, 11); + + // read from follower + for (final Node follower : cluster.getFollowers()) { + assertNotNull(follower); + assertReadIndex(follower, 11); + } + + // read with null request context + final CountDownLatch latch = new CountDownLatch(1); + final long start = System.currentTimeMillis(); + leader.readIndex(null, new ReadIndexClosure(0) { + + @Override + public void run(final Status status, final long index, final byte[] reqCtx) { + assertNull(reqCtx); + if (status.isOk()) { + System.err.println("Read-index so fast: " + (System.currentTimeMillis() - start) + "ms"); + } else { + assertEquals(status, new Status(RaftError.ETIMEDOUT, "read-index request timeout")); + assertEquals(index, -1); + } + latch.countDown(); + } + }); + latch.await(); + + cluster.stopAll(); + } + @Test public void testReadIndexFromLearner() throws Exception { final List peers = TestUtils.generatePeers(3);