diff --git a/.travis.yml b/.travis.yml index 8e0789f83..2a4ab71da 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,9 +16,13 @@ --- language: java -jdk: -- oraclejdk8 -# - oraclejdk9 +matrix: + include: + - jdk: oraclejdk8 + - jdk: openjdk11 + env: SKIP_RELEASE=true + - jdk: openjdk12 + env: SKIP_RELEASE=true env: global: @@ -37,4 +41,3 @@ cache: directories: - $HOME/.gradle/caches/ - $HOME/.gradle/wrapper/ - diff --git a/README.md b/README.md index 8a9f85e11..5eacd94f0 100644 --- a/README.md +++ b/README.md @@ -23,10 +23,10 @@ Example: ```groovy dependencies { - implementation 'io.rsocket:rsocket-core:0.11.14' - implementation 'io.rsocket:rsocket-transport-netty:0.11.14' -// implementation 'io.rsocket:rsocket-core:0.11.15.BUILD-SNAPSHOT' -// implementation 'io.rsocket:rsocket-transport-netty:0.11.15.BUILD-SNAPSHOT' + implementation 'io.rsocket:rsocket-core:0.12.2-RC2' + implementation 'io.rsocket:rsocket-transport-netty:0.12.2-RC2' +// implementation 'io.rsocket:rsocket-core:0.12.2-RC3-SNAPSHOT' +// implementation 'io.rsocket:rsocket-transport-netty:0.12.2-RC3-SNAPSHOT' } ``` @@ -91,7 +91,7 @@ or you will get a memory leak. Used correctly this will reduce latency and incre ```java RSocketFactory.receive() // Enable Zero Copy - .payloadDecoder(Frame::retain) + .frameDecoder(PayloadDecoder.ZERO_COPY) .acceptor(new PingHandler()) .transport(TcpServerTransport.create(7878)) .start() @@ -105,7 +105,7 @@ RSocketFactory.receive() Mono client = RSocketFactory.connect() // Enable Zero Copy - .payloadDecoder(Frame::retain) + .frameDecoder(PayloadDecoder.ZERO_COPY) .transport(TcpClientTransport.create(7878)) .start(); ``` diff --git a/artifactory.gradle b/artifactory.gradle index 6e622a610..5626fc61e 100644 --- a/artifactory.gradle +++ b/artifactory.gradle @@ -35,6 +35,9 @@ if (project.hasProperty('bintrayUser') && project.hasProperty('bintrayKey')) { } } } + tasks.named("artifactoryPublish").configure { + onlyIf { System.getenv('SKIP_RELEASE') != "true" } + } } } } diff --git a/bintray.gradle b/bintray.gradle index 6fe0db84b..5015f94e4 100644 --- a/bintray.gradle +++ b/bintray.gradle @@ -55,6 +55,9 @@ if (project.hasProperty('bintrayUser') && project.hasProperty('bintrayKey') && } } } + tasks.named("bintrayUpload").configure { + onlyIf { System.getenv('SKIP_RELEASE') != "true" } + } } } } diff --git a/build.gradle b/build.gradle index bf9901506..7a6cfcebb 100644 --- a/build.gradle +++ b/build.gradle @@ -29,11 +29,11 @@ subprojects { apply plugin: 'io.spring.dependency-management' apply plugin: 'com.github.sherter.google-java-format' - ext['reactor-bom.version'] = 'Californium-SR5' + ext['reactor-bom.version'] = 'Californium-SR8' ext['logback.version'] = '1.2.3' ext['findbugs.version'] = '3.0.2' - ext['netty.version'] = '4.1.31.Final' - ext['netty-boringssl.version'] = '2.0.18.Final' + ext['netty.version'] = '4.1.36.Final' + ext['netty-boringssl.version'] = '2.0.25.Final' ext['hdrhistogram.version'] = '2.1.10' ext['mockito.version'] = '2.25.1' ext['slf4j.version'] = '1.7.25' @@ -116,6 +116,10 @@ subprojects { systemProperty "io.netty.leakDetection.level", "ADVANCED" } + + tasks.named("javadoc").configure { + onlyIf { System.getenv('SKIP_RELEASE') != "true" } + } } plugins.withType(JavaLibraryPlugin) { diff --git a/gradle.properties b/gradle.properties index f5b674ddf..044cb3c77 100644 --- a/gradle.properties +++ b/gradle.properties @@ -11,4 +11,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -version=0.12.2-RC2 +version=0.12.2-RC3 diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index 9218135de..539e104b4 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -16,16 +16,23 @@ package io.rsocket; +import static io.rsocket.keepalive.KeepAliveSupport.ClientKeepAliveSupport; +import static io.rsocket.keepalive.KeepAliveSupport.KeepAlive; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.util.ReferenceCountUtil; import io.netty.util.collection.IntObjectHashMap; +import io.rsocket.exceptions.ConnectionErrorException; import io.rsocket.exceptions.Exceptions; import io.rsocket.frame.*; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.internal.LimitableRequestPublisher; import io.rsocket.internal.UnboundedProcessor; import io.rsocket.internal.UnicastMonoProcessor; +import io.rsocket.keepalive.KeepAliveFramesAcceptor; +import io.rsocket.keepalive.KeepAliveHandler; +import io.rsocket.keepalive.KeepAliveSupport; import java.nio.channels.ClosedChannelException; import java.util.Collections; import java.util.Map; @@ -36,11 +43,7 @@ import org.reactivestreams.Processor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; -import reactor.core.publisher.BaseSubscriber; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.SignalType; -import reactor.core.publisher.UnicastProcessor; +import reactor.core.publisher.*; /** Client Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketServer} */ class RSocketClient implements RSocket { @@ -54,6 +57,7 @@ class RSocketClient implements RSocket { private final UnboundedProcessor sendProcessor; private final Lifecycle lifecycle = new Lifecycle(); private final ByteBufAllocator allocator; + private final KeepAliveFramesAcceptor keepAliveFramesAcceptor; /*client requester*/ RSocketClient( @@ -61,7 +65,10 @@ class RSocketClient implements RSocket { DuplexConnection connection, PayloadDecoder payloadDecoder, Consumer errorConsumer, - StreamIdSupplier streamIdSupplier) { + StreamIdSupplier streamIdSupplier, + int keepAliveTickPeriod, + int keepAliveAckTimeout, + KeepAliveHandler keepAliveHandler) { this.allocator = allocator; this.connection = connection; this.payloadDecoder = payloadDecoder; @@ -74,19 +81,40 @@ class RSocketClient implements RSocket { this.sendProcessor = new UnboundedProcessor<>(); connection.onClose().doFinally(signalType -> terminate()).subscribe(null, errorConsumer); - - sendProcessor - .doOnRequest( - r -> { - for (LimitableRequestPublisher lrp : senders.values()) { - lrp.increaseInternalLimit(r); - } - }) - .transform(connection::send) + connection + .send(sendProcessor) .doFinally(this::handleSendProcessorCancel) .subscribe(null, this::handleSendProcessorError); connection.receive().subscribe(this::handleIncomingFrames, errorConsumer); + + if (keepAliveTickPeriod != 0 && keepAliveHandler != null) { + KeepAliveSupport keepAliveSupport = + new ClientKeepAliveSupport(allocator, keepAliveTickPeriod, keepAliveAckTimeout); + this.keepAliveFramesAcceptor = + keepAliveHandler.start(keepAliveSupport, sendProcessor::onNext, this::terminate); + } else { + keepAliveFramesAcceptor = null; + } + } + + /*server requester*/ + RSocketClient( + ByteBufAllocator allocator, + DuplexConnection connection, + PayloadDecoder payloadDecoder, + Consumer errorConsumer, + StreamIdSupplier streamIdSupplier) { + this(allocator, connection, payloadDecoder, errorConsumer, streamIdSupplier, 0, 0, null); + } + + private void terminate(KeepAlive keepAlive) { + String message = + String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis()); + ConnectionErrorException err = new ConnectionErrorException(message); + lifecycle.setTerminationError(err); + errorConsumer.accept(err); + connection.dispose(); } private void handleSendProcessorError(Throwable t) { @@ -294,7 +322,7 @@ public void accept(long n) { .transform( f -> { LimitableRequestPublisher wrapped = - LimitableRequestPublisher.wrap(f, sendProcessor.available()); + LimitableRequestPublisher.wrap(f); // Need to set this to one for first the frame wrapped.request(1); senders.put(streamId, wrapped); @@ -452,8 +480,9 @@ private void handleStreamZero(FrameType type, ByteBuf frame) { case LEASE: break; case KEEPALIVE: - // KeepAlive is handled by corresponding connection interceptor, - // just release its frame here + if (keepAliveFramesAcceptor != null) { + keepAliveFramesAcceptor.receive(frame); + } break; default: // Ignore unknown frames. Throwing an error will close the socket. diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java index 2560ac01a..34c113481 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java @@ -16,6 +16,9 @@ package io.rsocket; +import static io.rsocket.internal.ClientSetup.DefaultClientSetup; +import static io.rsocket.internal.ClientSetup.ResumableClientSetup; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.exceptions.InvalidSetupException; @@ -26,9 +29,8 @@ import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.internal.ClientServerInputMultiplexer; import io.rsocket.internal.ClientSetup; -import io.rsocket.internal.KeepAliveData; import io.rsocket.internal.ServerSetup; -import io.rsocket.keepalive.KeepAliveConnection; +import io.rsocket.keepalive.KeepAliveHandler; import io.rsocket.plugins.DuplexConnectionInterceptor; import io.rsocket.plugins.PluginRegistry; import io.rsocket.plugins.Plugins; @@ -40,6 +42,7 @@ import io.rsocket.util.EmptyPayload; import java.time.Duration; import java.util.Objects; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -92,6 +95,8 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor { private Supplier> acceptor = () -> rSocket -> new AbstractRSocket() {}; + private BiFunction biAcceptor; + private Consumer errorConsumer = Throwable::printStackTrace; private int mtu = 0; private PluginRegistry plugins = new PluginRegistry(Plugins.defaultPlugins()); @@ -240,6 +245,12 @@ public ClientTransportAcceptor acceptor(Supplier> acc return StartClient::new; } + public ClientTransportAcceptor acceptor( + BiFunction biAcceptor) { + this.biAcceptor = biAcceptor; + return StartClient::new; + } + public ClientRSocketFactory fragment(int mtu) { this.mtu = mtu; return this; @@ -272,9 +283,10 @@ public Mono start() { return newConnection() .flatMap( connection -> { - ClientSetup clientSetup = clientSetup(); - DuplexConnection wrappedConnection = clientSetup.wrappedConnection(connection); + ClientSetup clientSetup = clientSetup(connection); ByteBuf resumeToken = clientSetup.resumeToken(); + KeepAliveHandler keepAliveHandler = clientSetup.keepAliveHandler(); + DuplexConnection wrappedConnection = clientSetup.connection(); ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer(wrappedConnection, plugins); @@ -285,11 +297,32 @@ public Mono start() { multiplexer.asClientConnection(), payloadDecoder, errorConsumer, - StreamIdSupplier.clientSupplier()); + StreamIdSupplier.clientSupplier(), + keepAliveTickPeriod(), + keepAliveTimeout(), + keepAliveHandler); + + ByteBuf setupFrame = + SetupFrameFlyweight.encode( + allocator, + false, + keepAliveTickPeriod(), + keepAliveTimeout(), + resumeToken, + metadataMimeType, + dataMimeType, + setupPayload.sliceMetadata(), + setupPayload.sliceData()); RSocket wrappedRSocketClient = plugins.applyClient(rSocketClient); - RSocket unwrappedServerSocket = acceptor.get().apply(wrappedRSocketClient); + RSocket unwrappedServerSocket; + if (biAcceptor != null) { + ConnectionSetupPayload setup = ConnectionSetupPayload.create(setupFrame); + unwrappedServerSocket = biAcceptor.apply(setup, wrappedRSocketClient); + } else { + unwrappedServerSocket = acceptor.get().apply(wrappedRSocketClient); + } RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket); @@ -301,35 +334,24 @@ public Mono start() { payloadDecoder, errorConsumer); - ByteBuf setupFrame = - SetupFrameFlyweight.encode( - allocator, - false, - (int) keepAliveTickPeriod(), - (int) keepAliveTimeout(), - resumeToken, - metadataMimeType, - dataMimeType, - setupPayload.sliceMetadata(), - setupPayload.sliceData()); - return wrappedConnection.sendOne(setupFrame).thenReturn(wrappedRSocketClient); }); } - private long keepAliveTickPeriod() { - return tickPeriod.toMillis(); + private int keepAliveTickPeriod() { + return (int) tickPeriod.toMillis(); } - private long keepAliveTimeout() { - return ackTimeout.toMillis() + tickPeriod.toMillis() * missedAcks; + private int keepAliveTimeout() { + return (int) (ackTimeout.toMillis() + tickPeriod.toMillis() * missedAcks); } - private ClientSetup clientSetup() { + private ClientSetup clientSetup(DuplexConnection startConnection) { if (resumeEnabled) { ByteBuf resumeToken = resumeTokenSupplier.get(); - return new ClientSetup.ResumableClientSetup( + return new ResumableClientSetup( allocator, + startConnection, newConnection(), resumeToken, resumeStoreFactory.apply(resumeToken), @@ -338,21 +360,12 @@ private ClientSetup clientSetup() { resumeStrategySupplier, resumeCleanupStoreOnKeepAlive); } else { - return new ClientSetup.DefaultClientSetup(); + return new DefaultClientSetup(startConnection); } } - private Mono newConnection() { - return transportClient - .get() - .connect(mtu) - .map( - connection -> - KeepAliveConnection.ofClient( - allocator, - connection, - notUsed -> new KeepAliveData(keepAliveTickPeriod(), keepAliveTimeout()), - errorConsumer)); + private Mono newConnection() { + return transportClient.get().connect(mtu); } } } @@ -464,9 +477,6 @@ public Start transport(Supplier> tra } private Mono acceptor(ServerSetup serverSetup, DuplexConnection connection) { - connection = - KeepAliveConnection.ofServer( - allocator, connection, serverSetup::keepAliveData, errorConsumer); ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer(connection, plugins); @@ -512,7 +522,7 @@ private Mono acceptSetup( return serverSetup.acceptRSocketSetup( setupFrame, multiplexer, - wrappedMultiplexer -> { + (keepAliveHandler, wrappedMultiplexer) -> { ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create(setupFrame); RSocketClient rSocketClient = @@ -539,7 +549,10 @@ private Mono acceptSetup( wrappedMultiplexer.asClientConnection(), wrappedRSocketServer, payloadDecoder, - errorConsumer); + errorConsumer, + setupPayload.keepAliveInterval(), + setupPayload.keepAliveMaxLifetime(), + keepAliveHandler); }) .doFinally(signalType -> setupPayload.release()) .then(); diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index 5aabb0ffa..6ce5ef88d 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -16,15 +16,22 @@ package io.rsocket; +import static io.rsocket.keepalive.KeepAliveSupport.KeepAlive; +import static io.rsocket.keepalive.KeepAliveSupport.ServerKeepAliveSupport; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.util.ReferenceCountUtil; import io.netty.util.collection.IntObjectHashMap; import io.rsocket.exceptions.ApplicationErrorException; +import io.rsocket.exceptions.ConnectionErrorException; import io.rsocket.frame.*; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.internal.LimitableRequestPublisher; import io.rsocket.internal.UnboundedProcessor; +import io.rsocket.keepalive.KeepAliveFramesAcceptor; +import io.rsocket.keepalive.KeepAliveHandler; +import io.rsocket.keepalive.KeepAliveSupport; import java.util.Collections; import java.util.Map; import java.util.function.Consumer; @@ -34,11 +41,7 @@ import org.reactivestreams.Subscription; import reactor.core.Disposable; import reactor.core.Exceptions; -import reactor.core.publisher.BaseSubscriber; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.SignalType; -import reactor.core.publisher.UnicastProcessor; +import reactor.core.publisher.*; /** Server side RSocket. Receives {@link ByteBuf}s from a {@link RSocketClient} */ class RSocketServer implements ResponderRSocket { @@ -55,14 +58,28 @@ class RSocketServer implements ResponderRSocket { private final UnboundedProcessor sendProcessor; private final ByteBufAllocator allocator; + private final KeepAliveFramesAcceptor keepAliveFramesAcceptor; - /*server responder*/ + /*client responder*/ RSocketServer( ByteBufAllocator allocator, DuplexConnection connection, RSocket requestHandler, PayloadDecoder payloadDecoder, Consumer errorConsumer) { + this(allocator, connection, requestHandler, payloadDecoder, errorConsumer, 0, 0, null); + } + + /*server responder*/ + RSocketServer( + ByteBufAllocator allocator, + DuplexConnection connection, + RSocket requestHandler, + PayloadDecoder payloadDecoder, + Consumer errorConsumer, + int keepAliveTickPeriod, + int keepAliveAckTimeout, + KeepAliveHandler keepAliveHandler) { this.allocator = allocator; this.connection = connection; @@ -80,14 +97,8 @@ class RSocketServer implements ResponderRSocket { // connections this.sendProcessor = new UnboundedProcessor<>(); - sendProcessor - .doOnRequest( - r -> { - for (LimitableRequestPublisher lrp : sendingLimitableSubscriptions.values()) { - lrp.increaseInternalLimit(r); - } - }) - .transform(connection::send) + connection + .send(sendProcessor) .doFinally(this::handleSendProcessorCancel) .subscribe(null, this::handleSendProcessorError); @@ -101,6 +112,22 @@ class RSocketServer implements ResponderRSocket { receiveDisposable.dispose(); }) .subscribe(null, errorConsumer); + + if (keepAliveTickPeriod != 0 && keepAliveHandler != null) { + KeepAliveSupport keepAliveSupport = + new ServerKeepAliveSupport(allocator, keepAliveTickPeriod, keepAliveAckTimeout); + keepAliveFramesAcceptor = + keepAliveHandler.start(keepAliveSupport, sendProcessor::onNext, this::terminate); + } else { + keepAliveFramesAcceptor = null; + } + } + + private void terminate(KeepAlive keepAlive) { + String message = + String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis()); + errorConsumer.accept(new ConnectionErrorException(message)); + connection.dispose(); } private void handleSendProcessorError(Throwable t) { @@ -283,23 +310,20 @@ private void handleFrame(ByteBuf frame) { handleCancelFrame(streamId); break; case KEEPALIVE: - // KeepAlive is handled by corresponding connection interceptor, - // just release its frame here + handleKeepAliveFrame(frame); break; case REQUEST_N: handleRequestN(streamId, frame); break; case REQUEST_STREAM: - handleStream( - streamId, - requestStream(payloadDecoder.apply(frame)), - RequestStreamFrameFlyweight.initialRequestN(frame)); + int streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame); + Payload streamPayload = payloadDecoder.apply(frame); + handleStream(streamId, requestStream(streamPayload), streamInitialRequestN); break; case REQUEST_CHANNEL: - handleChannel( - streamId, - payloadDecoder.apply(frame), - RequestChannelFrameFlyweight.initialRequestN(frame)); + int channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame); + Payload channelPayload = payloadDecoder.apply(frame); + handleChannel(streamId, channelPayload, channelInitialRequestN); break; case METADATA_PUSH: metadataPush(payloadDecoder.apply(frame)); @@ -427,7 +451,7 @@ private void handleStream(int streamId, Flux response, int initialReque .transform( frameFlux -> { LimitableRequestPublisher payloads = - LimitableRequestPublisher.wrap(frameFlux, sendProcessor.available()); + LimitableRequestPublisher.wrap(frameFlux); sendingLimitableSubscriptions.put(streamId, payloads); payloads.request( initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN); @@ -493,11 +517,17 @@ private void handleChannel(int streamId, Payload payload, int initialRequestN) { } } + private void handleKeepAliveFrame(ByteBuf frame) { + if (keepAliveFramesAcceptor != null) { + keepAliveFramesAcceptor.receive(frame); + } + } + private void handleCancelFrame(int streamId) { Subscription subscription = sendingSubscriptions.remove(streamId); if (subscription == null) { - subscription = sendingLimitableSubscriptions.get(streamId); + subscription = sendingLimitableSubscriptions.remove(streamId); } if (subscription != null) { diff --git a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerConnection.java b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerConnection.java deleted file mode 100644 index 1b89a6edd..000000000 --- a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerConnection.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2015-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.internal; - -import io.rsocket.DuplexConnection; -import io.rsocket.resume.ResumePositionsConnection; -import io.rsocket.resume.ResumeStateHolder; -import io.rsocket.util.DuplexConnectionProxy; - -class ClientServerConnection extends DuplexConnectionProxy implements ResumePositionsConnection { - - private final DuplexConnection resumeAware; - - public ClientServerConnection(DuplexConnection delegate, DuplexConnection resumeAware) { - super(delegate); - this.resumeAware = resumeAware; - } - - @Override - public void acceptResumeState(ResumeStateHolder resumeStateHolder) { - if (resumeAware instanceof ResumePositionsConnection) { - ((ResumePositionsConnection) resumeAware).acceptResumeState(resumeStateHolder); - } - } -} diff --git a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java index d1a83cec5..e69c6631c 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java @@ -23,7 +23,6 @@ import io.rsocket.frame.FrameUtil; import io.rsocket.plugins.DuplexConnectionInterceptor.Type; import io.rsocket.plugins.PluginRegistry; -import io.rsocket.resume.ResumePositionsConnection; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +51,7 @@ public class ClientServerInputMultiplexer implements Closeable { private final DuplexConnection serverConnection; private final DuplexConnection clientConnection; private final DuplexConnection source; - private final ResumePositionsConnection clientServerConnection; + private final DuplexConnection clientServerConnection; public ClientServerInputMultiplexer(DuplexConnection source) { this(source, emptyPluginRegistry); @@ -71,8 +70,7 @@ public ClientServerInputMultiplexer(DuplexConnection source, PluginRegistry plug plugins.applyConnection(Type.SERVER, new InternalDuplexConnection(source, server)); clientConnection = plugins.applyConnection(Type.CLIENT, new InternalDuplexConnection(source, client)); - clientServerConnection = - new ClientServerConnection(new InternalDuplexConnection(source, client, server), source); + clientServerConnection = new InternalDuplexConnection(source, client, server); source .receive() @@ -115,7 +113,7 @@ public ClientServerInputMultiplexer(DuplexConnection source, PluginRegistry plug }); } - public ResumePositionsConnection asClientServerConnection() { + public DuplexConnection asClientServerConnection() { return clientServerConnection; } diff --git a/rsocket-core/src/main/java/io/rsocket/internal/ClientSetup.java b/rsocket-core/src/main/java/io/rsocket/internal/ClientSetup.java index 5f92488b1..38217bdc2 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/ClientSetup.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/ClientSetup.java @@ -16,12 +16,15 @@ package io.rsocket.internal; +import static io.rsocket.keepalive.KeepAliveHandler.*; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.rsocket.DuplexConnection; -import io.rsocket.keepalive.KeepAliveConnection; +import io.rsocket.keepalive.KeepAliveHandler; import io.rsocket.resume.ClientRSocketSession; +import io.rsocket.resume.ResumableDuplexConnection; import io.rsocket.resume.ResumableFramesStore; import io.rsocket.resume.ResumeStrategy; import java.time.Duration; @@ -29,19 +32,30 @@ import reactor.core.publisher.Mono; public interface ClientSetup { - /*Provide different connections for SETUP / RESUME cases*/ - DuplexConnection wrappedConnection(KeepAliveConnection duplexConnection); - /*Provide different resume tokens for SETUP / RESUME cases*/ + DuplexConnection connection(); + + KeepAliveHandler keepAliveHandler(); + ByteBuf resumeToken(); class DefaultClientSetup implements ClientSetup { + private final DuplexConnection connection; + + public DefaultClientSetup(DuplexConnection connection) { + this.connection = connection; + } @Override - public DuplexConnection wrappedConnection(KeepAliveConnection connection) { + public DuplexConnection connection() { return connection; } + @Override + public KeepAliveHandler keepAliveHandler() { + return new DefaultKeepAliveHandler(connection); + } + @Override public ByteBuf resumeToken() { return Unpooled.EMPTY_BUFFER; @@ -50,35 +64,20 @@ public ByteBuf resumeToken() { class ResumableClientSetup implements ClientSetup { private final ByteBuf resumeToken; - private final ByteBufAllocator allocator; - private final Mono newConnectionFactory; - private final Duration resumeSessionDuration; - private final Supplier resumeStrategySupplier; - private final ResumableFramesStore resumableFramesStore; - private final Duration resumeStreamTimeout; - private final boolean cleanupStoreOnKeepAlive; + private final ResumableDuplexConnection duplexConnection; + private final ResumableKeepAliveHandler keepAliveHandler; public ResumableClientSetup( ByteBufAllocator allocator, - Mono newConnectionFactory, + DuplexConnection connection, + Mono newConnectionFactory, ByteBuf resumeToken, ResumableFramesStore resumableFramesStore, Duration resumeSessionDuration, Duration resumeStreamTimeout, Supplier resumeStrategySupplier, boolean cleanupStoreOnKeepAlive) { - this.allocator = allocator; - this.newConnectionFactory = newConnectionFactory; - this.resumeToken = resumeToken; - this.resumeSessionDuration = resumeSessionDuration; - this.resumeStrategySupplier = resumeStrategySupplier; - this.resumableFramesStore = resumableFramesStore; - this.resumeStreamTimeout = resumeStreamTimeout; - this.cleanupStoreOnKeepAlive = cleanupStoreOnKeepAlive; - } - @Override - public DuplexConnection wrappedConnection(KeepAliveConnection connection) { ClientRSocketSession rSocketSession = new ClientRSocketSession( connection, @@ -90,8 +89,19 @@ public DuplexConnection wrappedConnection(KeepAliveConnection connection) { cleanupStoreOnKeepAlive) .continueWith(newConnectionFactory) .resumeToken(resumeToken); + this.duplexConnection = rSocketSession.resumableConnection(); + this.keepAliveHandler = new ResumableKeepAliveHandler(duplexConnection); + this.resumeToken = resumeToken; + } - return rSocketSession.resumableConnection(); + @Override + public DuplexConnection connection() { + return duplexConnection; + } + + @Override + public KeepAliveHandler keepAliveHandler() { + return keepAliveHandler; } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/internal/KeepAliveData.java b/rsocket-core/src/main/java/io/rsocket/internal/KeepAliveData.java deleted file mode 100644 index 25338df31..000000000 --- a/rsocket-core/src/main/java/io/rsocket/internal/KeepAliveData.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2015-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.internal; - -import java.time.Duration; - -public class KeepAliveData { - private final Duration tickPeriod; - private final Duration timeout; - - public KeepAliveData(long tickPeriodMillis, long timeoutMillis) { - this(Duration.ofMillis(tickPeriodMillis), Duration.ofMillis(timeoutMillis)); - } - - public KeepAliveData(Duration tickPeriod, Duration timeout) { - this.tickPeriod = tickPeriod; - this.timeout = timeout; - } - - public Duration getTickPeriod() { - return tickPeriod; - } - - public Duration getTimeout() { - return timeout; - } -} diff --git a/rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java b/rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java index 2eafd3d61..8adb7542a 100755 --- a/rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java @@ -56,6 +56,10 @@ public static LimitableRequestPublisher wrap(Publisher source, long pr return new LimitableRequestPublisher<>(source, prefetch); } + public static LimitableRequestPublisher wrap(Publisher source) { + return wrap(source, Long.MAX_VALUE); + } + @Override public void subscribe(CoreSubscriber destination) { synchronized (this) { diff --git a/rsocket-core/src/main/java/io/rsocket/internal/ServerSetup.java b/rsocket-core/src/main/java/io/rsocket/internal/ServerSetup.java index 17e9e53cb..dbd8bc173 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/ServerSetup.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/ServerSetup.java @@ -16,36 +16,31 @@ package io.rsocket.internal; +import static io.rsocket.keepalive.KeepAliveHandler.*; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.rsocket.DuplexConnection; import io.rsocket.exceptions.RejectedResumeException; import io.rsocket.exceptions.UnsupportedSetupException; -import io.rsocket.frame.FrameHeaderFlyweight; -import io.rsocket.frame.FrameType; import io.rsocket.frame.ResumeFrameFlyweight; import io.rsocket.frame.SetupFrameFlyweight; +import io.rsocket.keepalive.KeepAliveHandler; import io.rsocket.resume.*; import io.rsocket.util.ConnectionUtils; import java.time.Duration; +import java.util.function.BiFunction; import java.util.function.Function; -import javax.annotation.Nullable; import reactor.core.publisher.Mono; public interface ServerSetup { - /*accept connection as SETUP*/ + Mono acceptRSocketSetup( ByteBuf frame, ClientServerInputMultiplexer multiplexer, - Function> then); + BiFunction> then); - /*accept connection as RESUME*/ Mono acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexer multiplexer); - /*get KEEP-ALIVE timings based on start frame: SETUP (directly) /RESUME (lookup by resume token)*/ - @Nullable - KeepAliveData keepAliveData(ByteBuf frame); - default void dispose() {} class DefaultServerSetup implements ServerSetup { @@ -59,7 +54,7 @@ public DefaultServerSetup(ByteBufAllocator allocator) { public Mono acceptRSocketSetup( ByteBuf frame, ClientServerInputMultiplexer multiplexer, - Function> then) { + BiFunction> then) { if (SetupFrameFlyweight.resumeEnabled(frame)) { return sendError(multiplexer, new UnsupportedSetupException("resume not supported")) @@ -69,7 +64,7 @@ public Mono acceptRSocketSetup( multiplexer.dispose(); }); } else { - return then.apply(multiplexer); + return then.apply(new DefaultKeepAliveHandler(multiplexer), multiplexer); } } @@ -84,17 +79,6 @@ public Mono acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexe }); } - @Override - public KeepAliveData keepAliveData(ByteBuf frame) { - if (FrameHeaderFlyweight.frameType(frame) == FrameType.SETUP) { - return new KeepAliveData( - SetupFrameFlyweight.keepAliveInterval(frame), - SetupFrameFlyweight.keepAliveMaxLifetime(frame)); - } else { - return null; - } - } - private Mono sendError(ClientServerInputMultiplexer multiplexer, Exception exception) { return ConnectionUtils.sendError(allocator, multiplexer, exception); } @@ -127,17 +111,12 @@ public ResumableServerSetup( public Mono acceptRSocketSetup( ByteBuf frame, ClientServerInputMultiplexer multiplexer, - Function> then) { + BiFunction> then) { if (SetupFrameFlyweight.resumeEnabled(frame)) { ByteBuf resumeToken = SetupFrameFlyweight.resumeToken(frame); - KeepAliveData keepAliveData = - new KeepAliveData( - SetupFrameFlyweight.keepAliveInterval(frame), - SetupFrameFlyweight.keepAliveMaxLifetime(frame)); - - DuplexConnection resumableConnection = + ResumableDuplexConnection connection = sessionManager .save( new ServerRSocketSession( @@ -147,12 +126,13 @@ public Mono acceptRSocketSetup( resumeStreamTimeout, resumeStoreFactory, resumeToken, - keepAliveData, cleanupStoreOnKeepAlive)) .resumableConnection(); - return then.apply(new ClientServerInputMultiplexer(resumableConnection)); + return then.apply( + new ResumableKeepAliveHandler(connection), + new ClientServerInputMultiplexer(connection)); } else { - return then.apply(multiplexer); + return then.apply(new DefaultKeepAliveHandler(multiplexer), multiplexer); } } @@ -175,22 +155,6 @@ public Mono acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexe } } - @Override - public KeepAliveData keepAliveData(ByteBuf frame) { - if (FrameHeaderFlyweight.frameType(frame) == FrameType.SETUP) { - return new KeepAliveData( - SetupFrameFlyweight.keepAliveInterval(frame), - SetupFrameFlyweight.keepAliveMaxLifetime(frame)); - } else { - ServerRSocketSession session = sessionManager.get(ResumeFrameFlyweight.token(frame)); - if (session != null) { - return session.keepAliveData(); - } else { - return null; - } - } - } - private Mono sendError(ClientServerInputMultiplexer multiplexer, Exception exception) { return ConnectionUtils.sendError(allocator, multiplexer, exception); } diff --git a/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveConnection.java b/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveConnection.java deleted file mode 100644 index 738b13b02..000000000 --- a/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveConnection.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright 2015-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.keepalive; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.rsocket.DuplexConnection; -import io.rsocket.exceptions.ConnectionErrorException; -import io.rsocket.frame.FrameHeaderFlyweight; -import io.rsocket.frame.FrameType; -import io.rsocket.internal.KeepAliveData; -import io.rsocket.resume.ResumePositionsConnection; -import io.rsocket.resume.ResumeStateHolder; -import io.rsocket.util.DuplexConnectionProxy; -import io.rsocket.util.Function3; -import java.time.Duration; -import java.util.function.Consumer; -import java.util.function.Function; -import javax.annotation.Nullable; -import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; - -public class KeepAliveConnection extends DuplexConnectionProxy - implements ResumePositionsConnection { - - private final MonoProcessor keepAliveHandlerReady = MonoProcessor.create(); - private final ByteBufAllocator allocator; - private final Function keepAliveData; - private final Function3 - keepAliveHandlerFactory; - private final Consumer errorConsumer; - private volatile KeepAliveHandler keepAliveHandler; - private volatile ResumeStateHolder resumeStateHolder; - private volatile boolean keepAliveHandlerStarted; - - public static KeepAliveConnection ofClient( - ByteBufAllocator allocator, - DuplexConnection duplexConnection, - Function keepAliveData, - Consumer errorConsumer) { - - return new KeepAliveConnection( - allocator, duplexConnection, keepAliveData, KeepAliveHandler::ofClient, errorConsumer); - } - - public static KeepAliveConnection ofServer( - ByteBufAllocator allocator, - DuplexConnection duplexConnection, - Function keepAliveData, - Consumer errorConsumer) { - - return new KeepAliveConnection( - allocator, duplexConnection, keepAliveData, KeepAliveHandler::ofServer, errorConsumer); - } - - private KeepAliveConnection( - ByteBufAllocator allocator, - DuplexConnection duplexConnection, - Function keepAliveData, - Function3 keepAliveHandlerFactory, - Consumer errorConsumer) { - super(duplexConnection); - this.allocator = allocator; - this.keepAliveData = keepAliveData; - this.keepAliveHandlerFactory = keepAliveHandlerFactory; - this.errorConsumer = errorConsumer; - keepAliveHandlerReady.subscribe(this::startKeepAlives); - } - - private void startKeepAlives(KeepAliveHandler keepAliveHandler) { - this.keepAliveHandler = keepAliveHandler; - - send(keepAliveHandler.send()).subscribe(null, err -> keepAliveHandler.dispose()); - - keepAliveHandler - .timeout() - .subscribe( - keepAlive -> { - String message = - String.format("No keep-alive acks for %d ms", keepAlive.getTimeoutMillis()); - ConnectionErrorException err = new ConnectionErrorException(message); - errorConsumer.accept(err); - dispose(); - }); - keepAliveHandler.start(); - } - - @Override - public Mono send(Publisher frames) { - return super.send(Flux.from(frames).doOnNext(this::startKeepAliveHandlerOnce)); - } - - @Override - public Flux receive() { - return super.receive() - .doOnNext( - f -> { - if (isKeepAliveFrame(f)) { - long receivedPos = keepAliveHandler.receive(f); - if (receivedPos > 0) { - ResumeStateHolder h = this.resumeStateHolder; - if (h != null) { - h.onImpliedPosition(receivedPos); - } - } - } else { - startKeepAliveHandlerOnce(f); - } - }); - } - - @Override - public Mono onClose() { - return super.onClose() - .doFinally( - s -> { - KeepAliveHandler keepAliveHandler = keepAliveHandlerReady.peek(); - if (keepAliveHandler != null) { - keepAliveHandler.dispose(); - } - }); - } - - @Override - public void acceptResumeState(ResumeStateHolder resumeStateHolder) { - this.resumeStateHolder = resumeStateHolder; - keepAliveHandlerReady.subscribe(h -> h.resumeState(resumeStateHolder)); - } - - private void startKeepAliveHandlerOnce(ByteBuf f) { - if (!keepAliveHandlerStarted && isStartFrame(f)) { - keepAliveHandlerStarted = true; - startKeepAliveHandler(keepAliveData.apply(f)); - } - } - - private static boolean isStartFrame(ByteBuf frame) { - FrameType frameType = FrameHeaderFlyweight.frameType(frame); - return frameType == FrameType.SETUP || frameType == FrameType.RESUME; - } - - private static boolean isKeepAliveFrame(ByteBuf frame) { - return FrameHeaderFlyweight.frameType(frame) == FrameType.KEEPALIVE; - } - - private void startKeepAliveHandler(@Nullable KeepAliveData kad) { - if (kad != null) { - KeepAliveHandler handler = - keepAliveHandlerFactory.apply(allocator, kad.getTickPeriod(), kad.getTimeout()); - keepAliveHandlerReady.onNext(handler); - } - } -} diff --git a/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveFramesAcceptor.java b/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveFramesAcceptor.java new file mode 100644 index 000000000..6fc96d6d2 --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveFramesAcceptor.java @@ -0,0 +1,8 @@ +package io.rsocket.keepalive; + +import io.netty.buffer.ByteBuf; + +public interface KeepAliveFramesAcceptor { + + void receive(ByteBuf keepAliveFrame); +} diff --git a/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveHandler.java b/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveHandler.java index 7a07d2ea5..2535c342b 100644 --- a/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveHandler.java +++ b/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveHandler.java @@ -1,164 +1,57 @@ -/* - * Copyright 2015-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package io.rsocket.keepalive; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.Unpooled; -import io.rsocket.frame.KeepAliveFrameFlyweight; -import io.rsocket.resume.ResumeStateHolder; -import java.time.Duration; -import java.util.concurrent.atomic.AtomicReference; -import reactor.core.Disposable; -import reactor.core.Disposables; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; -import reactor.core.publisher.UnicastProcessor; - -abstract class KeepAliveHandler implements Disposable { - final ByteBufAllocator allocator; - private final Duration keepAlivePeriod; - private final long keepAliveTimeout; - private volatile ResumeStateHolder resumeStateHolder; - private final UnicastProcessor sent = UnicastProcessor.create(); - private final MonoProcessor timeout = MonoProcessor.create(); - private final AtomicReference intervalDisposable = new AtomicReference<>(); - private volatile long lastReceivedMillis; - - static KeepAliveHandler ofServer( - ByteBufAllocator allocator, Duration keepAlivePeriod, Duration keepAliveTimeout) { - return new KeepAliveHandler.Server(allocator, keepAlivePeriod, keepAliveTimeout); - } - - static KeepAliveHandler ofClient( - ByteBufAllocator allocator, Duration keepAlivePeriod, Duration keepAliveTimeout) { - return new KeepAliveHandler.Client(allocator, keepAlivePeriod, keepAliveTimeout); - } - - private KeepAliveHandler( - ByteBufAllocator allocator, Duration keepAlivePeriod, Duration keepAliveTimeout) { - this.allocator = allocator; - this.keepAlivePeriod = keepAlivePeriod; - this.keepAliveTimeout = keepAliveTimeout.toMillis(); - } - - public void start() { - this.lastReceivedMillis = System.currentTimeMillis(); - intervalDisposable.compareAndSet( - null, Flux.interval(keepAlivePeriod).subscribe(v -> onIntervalTick())); - } - - @Override - public void dispose() { - Disposable d = intervalDisposable.getAndSet(Disposables.disposed()); - if (d != null) { - d.dispose(); - } - sent.onComplete(); - timeout.onComplete(); - } - - public long receive(ByteBuf keepAliveFrame) { - this.lastReceivedMillis = System.currentTimeMillis(); - long remoteLastReceivedPos = KeepAliveFrameFlyweight.lastPosition(keepAliveFrame); - if (KeepAliveFrameFlyweight.respondFlag(keepAliveFrame)) { - long localLastReceivedPos = obtainLastReceivedPos(); - doSend( - KeepAliveFrameFlyweight.encode( - allocator, - false, - localLastReceivedPos, - KeepAliveFrameFlyweight.data(keepAliveFrame).retain())); - } - return remoteLastReceivedPos; - } - - public void resumeState(ResumeStateHolder resumeStateHolder) { - this.resumeStateHolder = resumeStateHolder; - } +import io.rsocket.Closeable; +import io.rsocket.keepalive.KeepAliveSupport.KeepAlive; +import io.rsocket.resume.ResumableDuplexConnection; +import java.util.function.Consumer; - public Flux send() { - return sent; - } - - public Mono timeout() { - return timeout; - } - - abstract void onIntervalTick(); - - void doSend(ByteBuf frame) { - sent.onNext(frame); - } - - void doCheckTimeout() { - long now = System.currentTimeMillis(); - if (now - lastReceivedMillis >= keepAliveTimeout) { - timeout.onNext(new KeepAlive(keepAlivePeriod.toMillis(), keepAliveTimeout)); - } - } +public interface KeepAliveHandler { - long obtainLastReceivedPos() { - return resumeStateHolder != null ? resumeStateHolder.impliedPosition() : 0; - } + KeepAliveFramesAcceptor start( + KeepAliveSupport keepAliveSupport, + Consumer onFrameSent, + Consumer onTimeout); - private static class Server extends KeepAliveHandler { + class DefaultKeepAliveHandler implements KeepAliveHandler { + private final Closeable duplexConnection; - Server(ByteBufAllocator allocator, Duration keepAlivePeriod, Duration keepAliveTimeout) { - super(allocator, keepAlivePeriod, keepAliveTimeout); + public DefaultKeepAliveHandler(Closeable duplexConnection) { + this.duplexConnection = duplexConnection; } @Override - void onIntervalTick() { - doCheckTimeout(); + public KeepAliveFramesAcceptor start( + KeepAliveSupport keepAliveSupport, + Consumer onSendKeepAliveFrame, + Consumer onTimeout) { + duplexConnection.onClose().doFinally(s -> keepAliveSupport.stop()).subscribe(); + return keepAliveSupport + .onSendKeepAliveFrame(onSendKeepAliveFrame) + .onTimeout(onTimeout) + .start(); } } - private static final class Client extends KeepAliveHandler { + class ResumableKeepAliveHandler implements KeepAliveHandler { + private final ResumableDuplexConnection resumableDuplexConnection; - Client(ByteBufAllocator allocator, Duration keepAlivePeriod, Duration keepAliveTimeout) { - super(allocator, keepAlivePeriod, keepAliveTimeout); + public ResumableKeepAliveHandler(ResumableDuplexConnection resumableDuplexConnection) { + this.resumableDuplexConnection = resumableDuplexConnection; } @Override - void onIntervalTick() { - doCheckTimeout(); - doSend( - KeepAliveFrameFlyweight.encode( - allocator, true, obtainLastReceivedPos(), Unpooled.EMPTY_BUFFER)); - } - } - - public static final class KeepAlive { - private final long tickPeriod; - private final long timeoutMillis; - - public KeepAlive(long tickPeriod, long timeoutMillis) { - this.tickPeriod = tickPeriod; - this.timeoutMillis = timeoutMillis; - } - - public long getTickPeriod() { - return tickPeriod; - } - - public long getTimeoutMillis() { - return timeoutMillis; + public KeepAliveFramesAcceptor start( + KeepAliveSupport keepAliveSupport, + Consumer onSendKeepAliveFrame, + Consumer onTimeout) { + resumableDuplexConnection.onResume(keepAliveSupport::start); + resumableDuplexConnection.onDisconnect(keepAliveSupport::stop); + return keepAliveSupport + .resumeState(resumableDuplexConnection) + .onSendKeepAliveFrame(onSendKeepAliveFrame) + .onTimeout(keepAlive -> resumableDuplexConnection.disconnect()) + .start(); } } } diff --git a/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveSupport.java b/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveSupport.java new file mode 100644 index 000000000..ea8a0de22 --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveSupport.java @@ -0,0 +1,170 @@ +/* + * Copyright 2015-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.keepalive; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.rsocket.frame.KeepAliveFrameFlyweight; +import io.rsocket.resume.ResumeStateHolder; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; + +public abstract class KeepAliveSupport implements KeepAliveFramesAcceptor { + final ByteBufAllocator allocator; + private final Duration keepAliveInterval; + private final Duration keepAliveTimeout; + private final long keepAliveTimeoutMillis; + private volatile Consumer onTimeout; + private volatile Consumer onFrameSent; + private volatile Disposable ticksDisposable; + private final AtomicBoolean started = new AtomicBoolean(); + + private volatile ResumeStateHolder resumeStateHolder; + private volatile long lastReceivedMillis; + + private KeepAliveSupport( + ByteBufAllocator allocator, int keepAliveInterval, int keepAliveTimeout) { + this.allocator = allocator; + this.keepAliveInterval = Duration.ofMillis(keepAliveInterval); + this.keepAliveTimeout = Duration.ofMillis(keepAliveTimeout); + this.keepAliveTimeoutMillis = keepAliveTimeout; + } + + public KeepAliveSupport start() { + this.lastReceivedMillis = System.currentTimeMillis(); + if (started.compareAndSet(false, true)) { + ticksDisposable = Flux.interval(keepAliveInterval).subscribe(v -> onIntervalTick()); + } + return this; + } + + public void stop() { + if (started.compareAndSet(true, false)) { + ticksDisposable.dispose(); + } + } + + @Override + public void receive(ByteBuf keepAliveFrame) { + this.lastReceivedMillis = System.currentTimeMillis(); + if (resumeStateHolder != null) { + long remoteLastReceivedPos = remoteLastReceivedPosition(keepAliveFrame); + resumeStateHolder.onImpliedPosition(remoteLastReceivedPos); + } + if (KeepAliveFrameFlyweight.respondFlag(keepAliveFrame)) { + long localLastReceivedPos = localLastReceivedPosition(); + send( + KeepAliveFrameFlyweight.encode( + allocator, + false, + localLastReceivedPos, + KeepAliveFrameFlyweight.data(keepAliveFrame).retain())); + } + } + + public KeepAliveSupport resumeState(ResumeStateHolder resumeStateHolder) { + this.resumeStateHolder = resumeStateHolder; + return this; + } + + public KeepAliveSupport onSendKeepAliveFrame(Consumer onFrameSent) { + this.onFrameSent = onFrameSent; + return this; + } + + public KeepAliveSupport onTimeout(Consumer onTimeout) { + this.onTimeout = onTimeout; + return this; + } + + abstract void onIntervalTick(); + + void send(ByteBuf frame) { + if (onFrameSent != null) { + onFrameSent.accept(frame); + } + } + + void tryTimeout() { + long now = System.currentTimeMillis(); + if (now - lastReceivedMillis >= keepAliveTimeoutMillis) { + if (onTimeout != null) { + onTimeout.accept(new KeepAlive(keepAliveInterval, keepAliveTimeout)); + } + stop(); + } + } + + long localLastReceivedPosition() { + return resumeStateHolder != null ? resumeStateHolder.impliedPosition() : 0; + } + + long remoteLastReceivedPosition(ByteBuf keepAliveFrame) { + return KeepAliveFrameFlyweight.lastPosition(keepAliveFrame); + } + + public static final class ServerKeepAliveSupport extends KeepAliveSupport { + + public ServerKeepAliveSupport( + ByteBufAllocator allocator, int keepAlivePeriod, int keepAliveTimeout) { + super(allocator, keepAlivePeriod, keepAliveTimeout); + } + + @Override + void onIntervalTick() { + tryTimeout(); + } + } + + public static final class ClientKeepAliveSupport extends KeepAliveSupport { + + public ClientKeepAliveSupport( + ByteBufAllocator allocator, int keepAliveInterval, int keepAliveTimeout) { + super(allocator, keepAliveInterval, keepAliveTimeout); + } + + @Override + void onIntervalTick() { + tryTimeout(); + send( + KeepAliveFrameFlyweight.encode( + allocator, true, localLastReceivedPosition(), Unpooled.EMPTY_BUFFER)); + } + } + + public static final class KeepAlive { + private final Duration tickPeriod; + private final Duration timeoutMillis; + + public KeepAlive(Duration tickPeriod, Duration timeoutMillis) { + this.tickPeriod = tickPeriod; + this.timeoutMillis = timeoutMillis; + } + + public Duration getTickPeriod() { + return tickPeriod; + } + + public Duration getTimeout() { + return timeoutMillis; + } + } +} diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java index afe1d4a76..b347642e3 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java @@ -31,17 +31,16 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; -public class ClientRSocketSession - implements RSocketSession> { +public class ClientRSocketSession implements RSocketSession> { private static final Logger logger = LoggerFactory.getLogger(ClientRSocketSession.class); private final ResumableDuplexConnection resumableConnection; - private volatile Mono newConnection; + private volatile Mono newConnection; private volatile ByteBuf resumeToken; private final ByteBufAllocator allocator; public ClientRSocketSession( - ResumePositionsConnection duplexConnection, + DuplexConnection duplexConnection, ByteBufAllocator allocator, Duration resumeSessionDuration, Supplier resumeStrategy, @@ -78,7 +77,7 @@ public ClientRSocketSession( errors .doOnNext( retryErr -> - logger.debug("Resumption reconnection error: {}", retryErr)) + logger.debug("Resumption reconnection error", retryErr)) .flatMap( retryErr -> Mono.from(reconnectOnError.apply(clientResume, retryErr)) @@ -114,9 +113,8 @@ public ClientRSocketSession( } @Override - public ClientRSocketSession continueWith( - Mono newConnection) { - this.newConnection = newConnection; + public ClientRSocketSession continueWith(Mono connectionFactory) { + this.newConnection = connectionFactory; return this; } @@ -151,12 +149,12 @@ public ClientRSocketSession resumeToken(ByteBuf resumeToken) { } @Override - public void reconnect(ResumePositionsConnection connection) { + public void reconnect(DuplexConnection connection) { resumableConnection.reconnect(connection); } @Override - public DuplexConnection resumableConnection() { + public ResumableDuplexConnection resumableConnection() { return resumableConnection; } diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ClientResume.java b/rsocket-core/src/main/java/io/rsocket/resume/ClientResume.java index a8f9eba05..415a77f92 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ClientResume.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ClientResume.java @@ -19,7 +19,7 @@ import io.netty.buffer.ByteBuf; import java.time.Duration; -class ClientResume { +public class ClientResume { private final Duration sessionDuration; private final ByteBuf resumeToken; diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ClientResumeConfiguration.java b/rsocket-core/src/main/java/io/rsocket/resume/ClientResumeConfiguration.java deleted file mode 100644 index 0eb8b20d3..000000000 --- a/rsocket-core/src/main/java/io/rsocket/resume/ClientResumeConfiguration.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2015-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.resume; - -import java.time.Duration; -import java.util.function.Supplier; - -public class ClientResumeConfiguration { - private final Duration sessionDuration; - private final Supplier resumeStrategy; - private final ResumableFramesStore resumableFramesStore; - private final Duration resumeStreamTimeout; - - public ClientResumeConfiguration( - Duration sessionDuration, - Supplier resumeStrategy, - ResumableFramesStore resumableFramesStore, - Duration resumeStreamTimeout) { - this.sessionDuration = sessionDuration; - this.resumeStrategy = resumeStrategy; - this.resumableFramesStore = resumableFramesStore; - this.resumeStreamTimeout = resumeStreamTimeout; - } - - public Duration sessionDuration() { - return sessionDuration; - } - - public Supplier resumptionStrategy() { - return resumeStrategy; - } - - public ResumableFramesStore resumeStore() { - return resumableFramesStore; - } - - public Duration resumeStreamTimeout() { - return resumeStreamTimeout; - } -} diff --git a/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java b/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java index 8a8692cab..1875b7eac 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java @@ -84,24 +84,22 @@ public void releaseFrames(long remoteImpliedPos) { @Override public Flux resumeStream() { - return Flux.create( - s -> { - int size = cachedFrames.size(); - int refCnt = upstreamFrameRefCnt; - logger.debug("{} Resuming stream size: {}", tag, size); - /*spsc queue has no iterator - iterating by consuming*/ - for (int i = 0; i < size; i++) { + return Flux.generate( + () -> new ResumeStreamState(cachedFrames.size(), upstreamFrameRefCnt), + (state, sink) -> { + if (state.next()) { + /*spsc queue has no iterator - iterating by consuming*/ ByteBuf frame = cachedFrames.poll(); - /*in the event of connection termination some frames - * are not released on DuplexConnection*/ - if (frame.refCnt() == refCnt) { + if (state.shouldRetain(frame)) { frame.retain(); } cachedFrames.offer(frame); - s.next(frame); + sink.next(frame); + } else { + sink.complete(); + logger.debug("{} Resuming stream completed", tag); } - s.complete(); - logger.debug("{} Resuming stream completed", tag); + return state; }); } @@ -177,11 +175,35 @@ void saveFrame(ByteBuf frame) { } } - private static Queue cachedFramesQueue(int size) { + static class ResumeStreamState { + private final int cacheSize; + private final int expectedRefCnt; + private int cacheCounter; + + public ResumeStreamState(int cacheSize, int expectedRefCnt) { + this.cacheSize = cacheSize; + this.expectedRefCnt = expectedRefCnt; + } + + public boolean next() { + if (cacheCounter < cacheSize) { + cacheCounter++; + return true; + } else { + return false; + } + } + + public boolean shouldRetain(ByteBuf frame) { + return frame.refCnt() == expectedRefCnt; + } + } + + static Queue cachedFramesQueue(int size) { return Queues.get(size).get(); } - private class FramesSubscriber implements Subscriber { + class FramesSubscriber implements Subscriber { private final long firstRequestSize; private final long refillSize; private int received; diff --git a/rsocket-core/src/main/java/io/rsocket/resume/RSocketSession.java b/rsocket-core/src/main/java/io/rsocket/resume/RSocketSession.java index b2a1829a9..7ec0abaee 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/RSocketSession.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/RSocketSession.java @@ -25,13 +25,13 @@ public interface RSocketSession extends Closeable { ByteBuf token(); - DuplexConnection resumableConnection(); + ResumableDuplexConnection resumableConnection(); - RSocketSession continueWith(T ResumeAwareConnectionFactory); + RSocketSession continueWith(T ConnectionFactory); RSocketSession resumeWith(ByteBuf resumeFrame); - void reconnect(ResumePositionsConnection connection); + void reconnect(DuplexConnection connection); @Override default Mono onClose() { diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java index e46a1927f..49401d560 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java @@ -34,8 +34,9 @@ import reactor.core.publisher.*; import reactor.util.concurrent.Queues; -class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder { +public class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder { private static final Logger logger = LoggerFactory.getLogger(ResumableDuplexConnection.class); + private static final Throwable closedChannelException = new ClosedChannelException(); private final String tag; private final ResumableFramesStore resumableFramesStore; @@ -44,13 +45,14 @@ class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder { private final ReplayProcessor connections = ReplayProcessor.create(1); private final EmitterProcessor connectionErrors = EmitterProcessor.create(); - private volatile ResumePositionsConnection curConnection; + private volatile DuplexConnection curConnection; /*used instead of EmitterProcessor because its autocancel=false capability had no expected effect*/ private final FluxProcessor downStreamFrames = ReplayProcessor.create(0); private final FluxProcessor resumeSaveFrames = EmitterProcessor.create(); private final MonoProcessor resumeSaveCompleted = MonoProcessor.create(); private final Queue actions = Queues.unboundedMultiproducer().get(); private final AtomicInteger actionsWip = new AtomicInteger(); + private final AtomicBoolean disposed = new AtomicBoolean(); private final Mono framesSent; private final RequestListener downStreamRequestListener = new RequestListener(); @@ -63,13 +65,14 @@ class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder { resumeSaveStreamRequestListener.requests(), this::dispatch); + private volatile Runnable onResume; + private volatile Runnable onDisconnect; private volatile int state; private volatile Disposable resumedStreamDisposable = Disposables.disposed(); - private final AtomicBoolean disposed = new AtomicBoolean(); - ResumableDuplexConnection( + public ResumableDuplexConnection( String tag, - ResumePositionsConnection duplexConnection, + DuplexConnection duplexConnection, ResumableFramesStore resumableFramesStore, Duration resumeStreamTimeout, boolean cleanupOnKeepAlive) { @@ -102,14 +105,28 @@ class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder { reconnect(duplexConnection); } + public void disconnect() { + DuplexConnection c = this.curConnection; + if (c != null) { + disconnect(c); + } + } + + public void onDisconnect(Runnable onDisconnectAction) { + this.onDisconnect = onDisconnectAction; + } + + public void onResume(Runnable onResumeAction) { + this.onResume = onResumeAction; + } + /*reconnected by session after error. After this downstream can receive frames, * but sending in suppressed until resume() is called*/ - public void reconnect(ResumePositionsConnection connection) { + public void reconnect(DuplexConnection connection) { if (curConnection == null) { logger.debug("{} Resumable duplex connection started with connection: {}", tag, connection); state = State.CONNECTED; onNewConnection(connection); - acceptRemoteResumePositions(); } else { logger.debug( "{} Resumable duplex connection reconnected with connection: {}", tag, connection); @@ -199,10 +216,6 @@ public boolean isDisposed() { return disposed.get(); } - private void acceptRemoteResumePositions() { - curConnection.acceptResumeState(this); - } - private void sendFrame(ByteBuf f) { /*resuming from store so no need to save again*/ if (state != State.RESUME && isResumableFrame(f)) { @@ -233,7 +246,7 @@ private void dispatch(Object action) { } } - private void doResumeStart(ResumePositionsConnection connection) { + private void doResumeStart(DuplexConnection connection) { state = State.RESUME_STARTED; resumedStreamDisposable.dispose(); upstreamSubscriber.resumeStart(); @@ -275,6 +288,13 @@ private void doResume( sendResumeFrame .apply(impliedPositionOrError) + .doOnSuccess( + v -> { + Runnable r = this.onResume; + if (r != null) { + r.run(); + } + }) .then( streamResumedFrames( resumableFramesStore @@ -299,7 +319,6 @@ private void doResumeComplete() { logger.debug("Completing resumption"); state = State.RESUME_COMPLETED; upstreamSubscriber.resumeComplete(); - acceptRemoteResumePositions(); } private Mono streamResumedFrames(Flux frames) { @@ -314,7 +333,7 @@ private Mono streamResumedFrames(Flux frames) { }); } - private void onNewConnection(ResumePositionsConnection connection) { + private void onNewConnection(DuplexConnection connection) { curConnection = connection; connection.onClose().doFinally(v -> disconnect(connection)).subscribe(); connections.onNext(connection); @@ -323,10 +342,17 @@ private void onNewConnection(ResumePositionsConnection connection) { private void disconnect(DuplexConnection connection) { /*do not report late disconnects on old connection if new one is available*/ if (curConnection == connection && state != State.DISCONNECTED) { - Throwable err = new ClosedChannelException(); + connection.dispose(); state = State.DISCONNECTED; - logger.debug("{} Inner connection disconnected: {}", tag, err.getClass().getSimpleName()); - connectionErrors.onNext(err); + logger.debug( + "{} Inner connection disconnected: {}", + tag, + closedChannelException.getClass().getSimpleName()); + connectionErrors.onNext(closedChannelException); + Runnable r = this.onDisconnect; + if (r != null) { + r.run(); + } } } @@ -361,9 +387,9 @@ static class State { } class ResumeStart implements Runnable { - private ResumePositionsConnection connection; + private final DuplexConnection connection; - public ResumeStart(ResumePositionsConnection connection) { + public ResumeStart(DuplexConnection connection) { this.connection = connection; } diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumePositionsConnection.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumePositionsConnection.java deleted file mode 100644 index 07f865131..000000000 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumePositionsConnection.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2015-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.resume; - -import io.rsocket.DuplexConnection; - -public interface ResumePositionsConnection extends DuplexConnection { - - void acceptResumeState(ResumeStateHolder resumeStateHolder); -} diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumptionState.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumptionState.java deleted file mode 100644 index 4ca1bcc1f..000000000 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumptionState.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2015-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.resume; - -import java.util.Objects; - -class ResumptionState { - private final long pos; - private final long impliedPos; - - ResumptionState(long pos, long impliedPos) { - this.pos = pos; - this.impliedPos = impliedPos; - } - - public static ResumptionState fromServer(long impliedPos) { - return new ResumptionState(-1, impliedPos); - } - - public static ResumptionState fromClient(long pos, long impliedPos) { - return new ResumptionState(pos, impliedPos); - } - - public boolean isServer() { - return pos < 0; - } - - public long position() { - return pos; - } - - public long impliedPosition() { - return impliedPos; - } - - @Override - public String toString() { - return "ResumptionState{" + "pos=" + pos + ", impliedPos=" + impliedPos + '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ResumptionState that = (ResumptionState) o; - return pos == that.pos && impliedPos == that.impliedPos; - } - - @Override - public int hashCode() { - return Objects.hash(pos, impliedPos); - } -} diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java b/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java index d2a42e99b..1a0605497 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java @@ -23,7 +23,6 @@ import io.rsocket.frame.ErrorFrameFlyweight; import io.rsocket.frame.ResumeFrameFlyweight; import io.rsocket.frame.ResumeOkFrameFlyweight; -import io.rsocket.internal.KeepAliveData; import java.time.Duration; import java.util.function.Function; import org.slf4j.Logger; @@ -32,28 +31,25 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; -public class ServerRSocketSession implements RSocketSession { +public class ServerRSocketSession implements RSocketSession { private static final Logger logger = LoggerFactory.getLogger(ServerRSocketSession.class); private final ResumableDuplexConnection resumableConnection; /*used instead of EmitterProcessor because its autocancel=false capability had no expected effect*/ - private final FluxProcessor newConnections = + private final FluxProcessor newConnections = ReplayProcessor.create(0); private final ByteBufAllocator allocator; - private final KeepAliveData keepAliveData; private final ByteBuf resumeToken; public ServerRSocketSession( - ResumePositionsConnection duplexConnection, + DuplexConnection duplexConnection, ByteBufAllocator allocator, Duration resumeSessionDuration, Duration resumeStreamTimeout, Function resumeStoreFactory, ByteBuf resumeToken, - KeepAliveData keepAliveData, boolean cleanupStoreOnKeepAlive) { this.allocator = allocator; - this.keepAliveData = keepAliveData; this.resumeToken = resumeToken; this.resumableConnection = new ResumableDuplexConnection( @@ -63,19 +59,19 @@ public ServerRSocketSession( resumeStreamTimeout, cleanupStoreOnKeepAlive); - Mono timeout = + Mono timeout = resumableConnection .connectionErrors() .flatMap( err -> { - logger.debug("Starting session timeout due to error: {}", err); + logger.debug("Starting session timeout due to error", err); return newConnections .next() .doOnNext(c -> logger.debug("Connection after error: {}", c)) .timeout(resumeSessionDuration); }) .then() - .cast(ResumePositionsConnection.class); + .cast(DuplexConnection.class); newConnections .mergeWith(timeout) @@ -91,9 +87,9 @@ public ServerRSocketSession( } @Override - public ServerRSocketSession continueWith(ResumePositionsConnection newConnection) { - logger.debug("Server continued with connection: {}", newConnection); - newConnections.onNext(newConnection); + public ServerRSocketSession continueWith(DuplexConnection connectionFactory) { + logger.debug("Server continued with connection: {}", connectionFactory); + newConnections.onNext(connectionFactory); return this; } @@ -121,12 +117,12 @@ public ServerRSocketSession resumeWith(ByteBuf resumeFrame) { } @Override - public void reconnect(ResumePositionsConnection connection) { + public void reconnect(DuplexConnection connection) { resumableConnection.reconnect(connection); } @Override - public DuplexConnection resumableConnection() { + public ResumableDuplexConnection resumableConnection() { return resumableConnection; } @@ -135,10 +131,6 @@ public ByteBuf token() { return resumeToken; } - public KeepAliveData keepAliveData() { - return keepAliveData; - } - private Mono sendFrame(ByteBuf frame) { logger.debug("Sending Resume frame: {}", frame); return resumableConnection.sendOne(frame).onErrorResume(e -> Mono.empty()); diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ServerResumeConfiguration.java b/rsocket-core/src/main/java/io/rsocket/resume/ServerResumeConfiguration.java deleted file mode 100644 index 067db2bb8..000000000 --- a/rsocket-core/src/main/java/io/rsocket/resume/ServerResumeConfiguration.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2015-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.resume; - -import io.netty.buffer.ByteBuf; -import java.time.Duration; -import java.util.function.Function; - -public class ServerResumeConfiguration { - private final Duration sessionDuration; - private final Duration resumeStreamTimeout; - private final Function resumeStoreFactory; - - public ServerResumeConfiguration( - Duration sessionDuration, - Duration resumeStreamTimeout, - Function resumeStoreFactory) { - this.sessionDuration = sessionDuration; - this.resumeStreamTimeout = resumeStreamTimeout; - this.resumeStoreFactory = resumeStoreFactory; - } - - public Duration sessionDuration() { - return sessionDuration; - } - - public Duration resumeStreamTimeout() { - return resumeStreamTimeout; - } - - public Function resumeStoreFactory() { - return resumeStoreFactory; - } -} diff --git a/rsocket-core/src/test/java/io/rsocket/KeepAliveTest.java b/rsocket-core/src/test/java/io/rsocket/KeepAliveTest.java index f8708e602..aededb804 100644 --- a/rsocket-core/src/test/java/io/rsocket/KeepAliveTest.java +++ b/rsocket-core/src/test/java/io/rsocket/KeepAliveTest.java @@ -16,221 +16,355 @@ package io.rsocket; +import static io.rsocket.keepalive.KeepAliveHandler.DefaultKeepAliveHandler; +import static io.rsocket.keepalive.KeepAliveHandler.ResumableKeepAliveHandler; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; -import io.netty.util.ReferenceCountUtil; import io.rsocket.exceptions.ConnectionErrorException; import io.rsocket.frame.FrameHeaderFlyweight; import io.rsocket.frame.FrameType; import io.rsocket.frame.KeepAliveFrameFlyweight; -import io.rsocket.frame.SetupFrameFlyweight; -import io.rsocket.internal.KeepAliveData; -import io.rsocket.keepalive.KeepAliveConnection; -import io.rsocket.resume.ResumeStateHolder; +import io.rsocket.resume.InMemoryResumableFramesStore; +import io.rsocket.resume.ResumableDuplexConnection; import io.rsocket.test.util.TestDuplexConnection; -import java.nio.charset.StandardCharsets; +import io.rsocket.util.DefaultPayload; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.function.Consumer; -import java.util.function.Function; -import java.util.stream.Collectors; +import java.util.function.Supplier; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; public class KeepAliveTest { - private static final int TICK_PERIOD = 100; - private static final int TIMEOUT = 700; + private static final int KEEP_ALIVE_INTERVAL = 100; + private static final int KEEP_ALIVE_TIMEOUT = 1000; + private static final int RESUMABLE_KEEP_ALIVE_TIMEOUT = 200; + + static Stream> rSocketStates() { + return Stream.of( + requester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT), + responder(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT)); + } - private TestDuplexConnection testConnection; - private Function timingsProvider; - private List errors; - private Consumer errorConsumer; - private KeepAliveConnection clientConnection; - private KeepAliveConnection serverConnection; - private ByteBufAllocator allocator; + static Stream> resumableRSocketStates() { + return Stream.of( + resumableRequester(KEEP_ALIVE_INTERVAL, RESUMABLE_KEEP_ALIVE_TIMEOUT), + resumableResponder(KEEP_ALIVE_INTERVAL, RESUMABLE_KEEP_ALIVE_TIMEOUT)); + } - @BeforeEach - void setUp() { - allocator = ByteBufAllocator.DEFAULT; - testConnection = new TestDuplexConnection(); + static Supplier requester(int tickPeriod, int timeout) { + return () -> { + TestDuplexConnection connection = new TestDuplexConnection(); + Errors errors = new Errors(); + RSocketClient rSocket = + new RSocketClient( + ByteBufAllocator.DEFAULT, + connection, + DefaultPayload::create, + errors, + StreamIdSupplier.clientSupplier(), + tickPeriod, + timeout, + new DefaultKeepAliveHandler(connection)); + return new RSocketState(rSocket, errors, connection); + }; + } - timingsProvider = f -> new KeepAliveData(TICK_PERIOD, TIMEOUT); - errors = new ArrayList<>(); - errorConsumer = errors::add; + static Supplier responder(int tickPeriod, int timeout) { + return () -> { + TestDuplexConnection connection = new TestDuplexConnection(); + AbstractRSocket handler = new AbstractRSocket() {}; + Errors errors = new Errors(); + RSocketServer rSocket = + new RSocketServer( + ByteBufAllocator.DEFAULT, + connection, + handler, + DefaultPayload::create, + errors, + tickPeriod, + timeout, + new DefaultKeepAliveHandler(connection)); + return new RSocketState(rSocket, errors, connection); + }; + } - clientConnection = - KeepAliveConnection.ofClient(allocator, testConnection, timingsProvider, errorConsumer); - serverConnection = - KeepAliveConnection.ofServer(allocator, testConnection, timingsProvider, errorConsumer); + static Supplier resumableRequester(int tickPeriod, int timeout) { + return () -> { + TestDuplexConnection connection = new TestDuplexConnection(); + ResumableDuplexConnection resumableConnection = + new ResumableDuplexConnection( + "test", + connection, + new InMemoryResumableFramesStore("test", 10_000), + Duration.ofSeconds(10), + false); + + Errors errors = new Errors(); + RSocketClient rSocket = + new RSocketClient( + ByteBufAllocator.DEFAULT, + resumableConnection, + DefaultPayload::create, + errors, + StreamIdSupplier.clientSupplier(), + tickPeriod, + timeout, + new ResumableKeepAliveHandler(resumableConnection)); + return new ResumableRSocketState(rSocket, errors, connection, resumableConnection); + }; } - @Test - void clientNoFramesBeforeSetup() { + static Supplier resumableResponder(int tickPeriod, int timeout) { + return () -> { + AbstractRSocket handler = new AbstractRSocket() {}; + TestDuplexConnection connection = new TestDuplexConnection(); + ResumableDuplexConnection resumableConnection = + new ResumableDuplexConnection( + "test", + connection, + new InMemoryResumableFramesStore("test", 10_000), + Duration.ofSeconds(10), + false); + Errors errors = new Errors(); + RSocketServer rSocket = + new RSocketServer( + ByteBufAllocator.DEFAULT, + resumableConnection, + handler, + DefaultPayload::create, + errors, + tickPeriod, + timeout, + new ResumableKeepAliveHandler(resumableConnection)); + return new ResumableRSocketState(rSocket, errors, connection, resumableConnection); + }; + } - Mono.delay(Duration.ofSeconds(1)).block(); + @ParameterizedTest + @MethodSource("rSocketStates") + void rSocketNotDisposedOnPresentKeepAlives(Supplier testDataSupplier) { + RSocketState RSocketState = testDataSupplier.get(); + TestDuplexConnection connection = RSocketState.connection(); - Assertions.assertThat(errors).isEmpty(); - Assertions.assertThat(clientConnection.isDisposed()).isFalse(); - Assertions.assertThat(clientConnection.availability()).isEqualTo(testConnection.availability()); - Assertions.assertThat(testConnection.getSent()).isEmpty(); - } + Flux.interval(Duration.ofMillis(100)) + .subscribe( + n -> + connection.addToReceivedBuffer( + KeepAliveFrameFlyweight.encode( + ByteBufAllocator.DEFAULT, true, 0, Unpooled.EMPTY_BUFFER))); - @Test - void clientFramesAfterSetup() { - clientConnection.sendOne(setupFrame()).subscribe(); + Mono.delay(Duration.ofMillis(1500)).block(); - Mono.delay(Duration.ofMillis(500)).block(); + RSocket rSocket = RSocketState.rSocket(); + List errors = RSocketState.errors().errors(); + Assertions.assertThat(rSocket.isDisposed()).isFalse(); Assertions.assertThat(errors).isEmpty(); - Assertions.assertThat(clientConnection.isDisposed()).isFalse(); - Collection sent = testConnection.getSent(); - Collection sentAfterSetup = - sent.stream().filter(f -> frameType(f) != FrameType.SETUP).collect(Collectors.toList()); - - Assertions.assertThat(sentAfterSetup).isNotEmpty(); - sentAfterSetup.forEach( - f -> { - Assertions.assertThat(frameType(f)).isEqualTo(FrameType.KEEPALIVE); - Assertions.assertThat(KeepAliveFrameFlyweight.respondFlag(f)).isEqualTo(true); - Assertions.assertThat(KeepAliveFrameFlyweight.lastPosition(f)).isEqualTo(0); - }); + } - sent.forEach(ReferenceCountUtil::safeRelease); + @ParameterizedTest + @MethodSource("rSocketStates") + void noKeepAlivesSentAfterRSocketDispose(Supplier testDataSupplier) { + RSocketState RSocketState = testDataSupplier.get(); + RSocketState.rSocket().dispose(); + StepVerifier.create( + Flux.from(RSocketState.connection().getSentAsPublisher()).take(Duration.ofMillis(500))) + .expectComplete() + .verify(Duration.ofSeconds(1)); } - @Test - void clientCloseOnMissingKeepalives() { - clientConnection.sendOne(setupFrame()).subscribe(); + @ParameterizedTest + @MethodSource("rSocketStates") + void rSocketDisposedOnMissingKeepAlives(Supplier testDataSupplier) { + RSocketState rSocketState = testDataSupplier.get(); + RSocket rSocket = rSocketState.rSocket(); - Mono.delay(Duration.ofSeconds(1)).block(); + Mono.delay(Duration.ofMillis(1500)).block(); + List errors = rSocketState.errors().errors(); + Assertions.assertThat(rSocket.isDisposed()).isTrue(); Assertions.assertThat(errors).hasSize(1); - Throwable err = errors.get(0); - Assertions.assertThat(err).isExactlyInstanceOf(ConnectionErrorException.class); - Assertions.assertThat(err.getMessage()).isEqualTo("No keep-alive acks for 700 ms"); - Assertions.assertThat(clientConnection.isDisposed()).isTrue(); - - testConnection.getSent().forEach(ReferenceCountUtil::safeRelease); + Throwable throwable = errors.get(0); + Assertions.assertThat(throwable).isInstanceOf(ConnectionErrorException.class); } @Test - void clientResumptionState() { - TestResumeStateHolder resumeStateHolder = new TestResumeStateHolder(); - clientConnection.acceptResumeState(resumeStateHolder); - - clientConnection.sendOne(setupFrame()).subscribe(); - clientConnection.receive().subscribe(); - - testConnection.addToReceivedBuffer(keepAliveFrame(false, 1)); - testConnection.addToReceivedBuffer(keepAliveFrame(false, 2)); - testConnection.addToReceivedBuffer(keepAliveFrame(false, 3)); - - Mono.delay(Duration.ofMillis(500)).block(); - - List receivedPositions = resumeStateHolder.receivedImpliedPositions(); - - Collection sent = testConnection.getSent(); - List sentPositions = - sent.stream() - .filter(f -> frameType(f) != FrameType.SETUP) - .map(KeepAliveFrameFlyweight::lastPosition) - .limit(4) - .collect(Collectors.toList()); - - Assertions.assertThat(sentPositions).isEqualTo(Arrays.asList(1L, 5L, 6L, 8L)); - Assertions.assertThat(receivedPositions).isEqualTo(Arrays.asList(1L, 2L, 3L)); - - sent.forEach(ReferenceCountUtil::safeRelease); + void clientRequesterSendsKeepAlives() { + RSocketState RSocketState = requester(100, 1000).get(); + TestDuplexConnection connection = RSocketState.connection(); + + StepVerifier.create(Flux.from(connection.getSentAsPublisher()).take(3)) + .expectNextMatches(this::keepAliveFrameWithRespondFlag) + .expectNextMatches(this::keepAliveFrameWithRespondFlag) + .expectNextMatches(this::keepAliveFrameWithRespondFlag) + .expectComplete() + .verify(Duration.ofSeconds(5)); } @Test - void serverFrames() { - serverConnection.sendOne(setupFrame()).subscribe(); - serverConnection.receive().subscribe(); - testConnection.addToReceivedBuffer(keepAliveFrame(true, 0)); + void serverResponderSendsKeepAlives() { + RSocketState RSocketState = responder(100, 1000).get(); + TestDuplexConnection connection = RSocketState.connection(); + Mono.delay(Duration.ofMillis(100)) + .subscribe( + l -> + connection.addToReceivedBuffer( + KeepAliveFrameFlyweight.encode( + ByteBufAllocator.DEFAULT, true, 0, Unpooled.EMPTY_BUFFER))); + + StepVerifier.create(Flux.from(connection.getSentAsPublisher()).take(1)) + .expectNextMatches(this::keepAliveFrameWithoutRespondFlag) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } - Assertions.assertThat(errors).isEmpty(); - Assertions.assertThat(clientConnection.isDisposed()).isFalse(); + @Test + void resumableRequesterNoKeepAlivesAfterDisconnect() { + ResumableRSocketState rSocketState = + resumableRequester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT).get(); + TestDuplexConnection testConnection = rSocketState.connection(); + ResumableDuplexConnection resumableDuplexConnection = rSocketState.resumableDuplexConnection(); - Collection sent = testConnection.getSent(); - Collection sentAfterSetup = - sent.stream().filter(f -> frameType(f) != FrameType.SETUP).collect(Collectors.toList()); - Assertions.assertThat(sentAfterSetup).isNotEmpty(); + resumableDuplexConnection.disconnect(); - sentAfterSetup.forEach( - f -> { - Assertions.assertThat(frameType(f)).isEqualTo(FrameType.KEEPALIVE); - Assertions.assertThat(KeepAliveFrameFlyweight.respondFlag(f)).isEqualTo(false); - Assertions.assertThat(KeepAliveFrameFlyweight.lastPosition(f)).isEqualTo(0); - }); + StepVerifier.create(Flux.from(testConnection.getSentAsPublisher()).take(Duration.ofMillis(500))) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } - sent.forEach(ReferenceCountUtil::safeRelease); + @Test + void resumableRequesterKeepAlivesAfterReconnect() { + ResumableRSocketState rSocketState = + resumableRequester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT).get(); + ResumableDuplexConnection resumableDuplexConnection = rSocketState.resumableDuplexConnection(); + resumableDuplexConnection.disconnect(); + TestDuplexConnection newTestConnection = new TestDuplexConnection(); + resumableDuplexConnection.reconnect(newTestConnection); + resumableDuplexConnection.resume(0, 0, ignored -> Mono.empty()); + + StepVerifier.create(Flux.from(newTestConnection.getSentAsPublisher()).take(1)) + .expectNextMatches(this::keepAliveFrame) + .expectComplete() + .verify(Duration.ofSeconds(5)); } @Test - void serverCloseOnMissingKeepalives() { - serverConnection.sendOne(setupFrame()).subscribe(); + void resumableRequesterNoKeepAlivesAfterDispose() { + ResumableRSocketState rSocketState = + resumableRequester(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIMEOUT).get(); + rSocketState.rSocket().dispose(); + StepVerifier.create( + Flux.from(rSocketState.connection().getSentAsPublisher()).take(Duration.ofMillis(500))) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } - Mono.delay(Duration.ofSeconds(1)).block(); + @ParameterizedTest + @MethodSource("resumableRSocketStates") + void resumableRSocketsNotDisposedOnMissingKeepAlives( + Supplier testDataSupplier) { + ResumableRSocketState rSocketState = testDataSupplier.get(); + RSocket rSocket = rSocketState.rSocket(); + List errors = rSocketState.errors().errors(); + TestDuplexConnection connection = rSocketState.connection(); - Assertions.assertThat(errors).hasSize(1); - Throwable err = errors.get(0); - Assertions.assertThat(err).isExactlyInstanceOf(ConnectionErrorException.class); - Assertions.assertThat(err.getMessage()).isEqualTo("No keep-alive acks for 700 ms"); - Assertions.assertThat(clientConnection.isDisposed()).isTrue(); + Mono.delay(Duration.ofMillis(500)).block(); - testConnection.getSent().forEach(ReferenceCountUtil::safeRelease); + Assertions.assertThat(rSocket.isDisposed()).isFalse(); + Assertions.assertThat(errors).hasSize(0); + Assertions.assertThat(connection.isDisposed()).isTrue(); } - private ByteBuf keepAliveFrame(boolean respond, int pos) { - return KeepAliveFrameFlyweight.encode(allocator, respond, pos, Unpooled.EMPTY_BUFFER); + private boolean keepAliveFrame(ByteBuf frame) { + return FrameHeaderFlyweight.frameType(frame) == FrameType.KEEPALIVE; } - private ByteBuf setupFrame() { - return SetupFrameFlyweight.encode( - allocator, - false, - TICK_PERIOD, - TIMEOUT, - "metadataType", - "dataType", - byteBuf("metadata"), - byteBuf("data")); + private boolean keepAliveFrameWithRespondFlag(ByteBuf frame) { + return keepAliveFrame(frame) && KeepAliveFrameFlyweight.respondFlag(frame); } - private ByteBuf byteBuf(String msg) { - return Unpooled.wrappedBuffer(msg.getBytes(StandardCharsets.UTF_8)); + private boolean keepAliveFrameWithoutRespondFlag(ByteBuf frame) { + return keepAliveFrame(frame) && !KeepAliveFrameFlyweight.respondFlag(frame); } - private static FrameType frameType(ByteBuf frame) { - return FrameHeaderFlyweight.frameType(frame); + static class RSocketState { + private final RSocket rSocket; + private final Errors errors; + private final TestDuplexConnection connection; + + public RSocketState(RSocket rSocket, Errors errors, TestDuplexConnection connection) { + this.rSocket = rSocket; + this.errors = errors; + this.connection = connection; + } + + public TestDuplexConnection connection() { + return connection; + } + + public RSocket rSocket() { + return rSocket; + } + + public Errors errors() { + return errors; + } } - private static class TestResumeStateHolder implements ResumeStateHolder { - private final List sentPositions = Arrays.asList(1L, 5L, 6L, 8L); - private final List receivedPositions = new ArrayList<>(); - private int counter = 0; + static class ResumableRSocketState { + private final RSocket rSocket; + private final Errors errors; + private final TestDuplexConnection connection; + private final ResumableDuplexConnection resumableDuplexConnection; + + public ResumableRSocketState( + RSocket rSocket, + Errors errors, + TestDuplexConnection connection, + ResumableDuplexConnection resumableDuplexConnection) { + this.rSocket = rSocket; + this.errors = errors; + this.connection = connection; + this.resumableDuplexConnection = resumableDuplexConnection; + } + + public TestDuplexConnection connection() { + return connection; + } - @Override - public long impliedPosition() { - long res = sentPositions.get(counter); - counter = Math.min(counter + 1, sentPositions.size() - 1); - return res; + public ResumableDuplexConnection resumableDuplexConnection() { + return resumableDuplexConnection; } + public RSocket rSocket() { + return rSocket; + } + + public Errors errors() { + return errors; + } + } + + static class Errors implements Consumer { + private final List errors = new ArrayList<>(); + @Override - public void onImpliedPosition(long remoteImpliedPos) { - receivedPositions.add(remoteImpliedPos); + public void accept(Throwable throwable) { + errors.add(throwable); } - public List receivedImpliedPositions() { - return receivedPositions; + public List errors() { + return new ArrayList<>(errors); } } } diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java index 2494cbbca..c60dba312 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java @@ -28,15 +28,12 @@ import io.rsocket.exceptions.ApplicationErrorException; import io.rsocket.exceptions.RejectedSetupException; import io.rsocket.frame.*; -import io.rsocket.test.util.TestDuplexConnection; import io.rsocket.test.util.TestSubscriber; import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; import org.assertj.core.api.Assertions; import org.junit.Rule; @@ -214,32 +211,6 @@ public void testChannelRequestServerSideCancellation() { Assertions.assertThat(request.isDisposed()).isTrue(); } - @Test(timeout = 2_000) - @SuppressWarnings("unchecked") - public void - testClientSideRequestChannelShouldNotHangInfinitelySendingElementsAndShouldProduceDataValuingConnectionBackpressure() { - final Queue requests = new ConcurrentLinkedQueue<>(); - rule.connection.dispose(); - rule.connection = new TestDuplexConnection(); - rule.connection.setInitialSendRequestN(256); - rule.init(); - - rule.socket - .requestChannel( - Flux.generate(s -> s.next(EmptyPayload.INSTANCE)).doOnRequest(requests::add)) - .subscribe(); - - int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL); - - assertThat("Unexpected error.", rule.errors, is(empty())); - - rule.connection.addToReceivedBuffer( - RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, 2)); - rule.connection.addToReceivedBuffer( - RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, Integer.MAX_VALUE)); - Assertions.assertThat(requests).containsOnly(1L, 2L, 253L); - } - public int sendRequestResponse(Publisher response) { Subscriber sub = TestSubscriber.create(); response.subscribe(sub); diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java index 9f2541975..32c0406b9 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java @@ -29,17 +29,12 @@ import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; import java.util.Collection; -import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; -import org.assertj.core.api.Assertions; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; -import org.mockito.Mockito; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class RSocketServerTest { @@ -111,89 +106,6 @@ public Mono requestResponse(Payload payload) { assertThat("Subscription not cancelled.", cancelled.get(), is(true)); } - @Test(timeout = 2_000) - @SuppressWarnings("unchecked") - public void - testServerSideRequestStreamShouldNotHangInfinitelySendingElementsAndShouldProduceDataValuingConnectionBackpressure() { - final int streamId = 5; - final Queue received = new ConcurrentLinkedQueue<>(); - final Queue requests = new ConcurrentLinkedQueue<>(); - - rule.setAcceptingSocket( - new AbstractRSocket() { - @Override - public Flux requestStream(Payload payload) { - return Flux.generate(s -> s.next(payload.retain())).doOnRequest(requests::add); - } - }, - 256); - - rule.sendRequest(streamId, FrameType.REQUEST_STREAM); - - assertThat("Unexpected error.", rule.errors, is(empty())); - - Subscriber next = rule.connection.getSendSubscribers().iterator().next(); - - Mockito.doAnswer( - invocation -> { - received.add(invocation.getArgument(0)); - - if (received.size() == 256) { - throw new RuntimeException(); - } - - return null; - }) - .when(next) - .onNext(Mockito.any()); - - rule.connection.addToReceivedBuffer( - RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, Integer.MAX_VALUE)); - Assertions.assertThat(requests).containsOnly(1L, 2L, 253L); - } - - @Test(timeout = 2_000) - @SuppressWarnings("unchecked") - public void - testServerSideRequestChannelShouldNotHangInfinitelySendingElementsAndShouldProduceDataValuingConnectionBackpressure() { - final int streamId = 5; - final Queue received = new ConcurrentLinkedQueue<>(); - final Queue requests = new ConcurrentLinkedQueue<>(); - - rule.setAcceptingSocket( - new AbstractRSocket() { - @Override - public Flux requestChannel(Publisher payload) { - return Flux.generate(s -> s.next(EmptyPayload.INSTANCE)) - .doOnRequest(requests::add); - } - }, - 256); - - rule.sendRequest(streamId, FrameType.REQUEST_CHANNEL); - - assertThat("Unexpected error.", rule.errors, is(empty())); - - Subscriber next = rule.connection.getSendSubscribers().iterator().next(); - - Mockito.doAnswer( - invocation -> { - received.add(invocation.getArgument(0)); - - if (received.size() == 256) { - throw new RuntimeException(); - } - - return null; - }) - .when(next) - .onNext(Mockito.any()); - - rule.connection.addToReceivedBuffer( - RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, Integer.MAX_VALUE)); - Assertions.assertThat(requests).containsOnly(1L, 2L, 253L); - } - public static class ServerSocketRule extends AbstractSocketRule { private RSocket acceptingSocket; diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/Files.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/Files.java new file mode 100644 index 000000000..e6867f8b5 --- /dev/null +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/Files.java @@ -0,0 +1,134 @@ +package io.rsocket.examples.transport.tcp.resume; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.rsocket.Payload; +import java.io.*; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.publisher.Flux; +import reactor.core.publisher.SynchronousSink; + +class Files { + + public static Flux fileSource(String fileName, int chunkSizeBytes) { + return Flux.generate( + () -> new FileState(fileName, chunkSizeBytes), FileState::consumeNext, FileState::dispose); + } + + public static Subscriber fileSink(String fileName, int windowSize) { + return new Subscriber() { + Subscription s; + int requests = windowSize; + OutputStream outputStream; + int receivedBytes; + int receivedCount; + + @Override + public void onSubscribe(Subscription s) { + this.s = s; + this.s.request(requests); + } + + @Override + public void onNext(Payload payload) { + ByteBuf data = payload.data(); + receivedBytes += data.readableBytes(); + receivedCount += 1; + System.out.println( + "Received file chunk: " + receivedCount + ". Total size: " + receivedBytes); + if (outputStream == null) { + outputStream = open(fileName); + } + write(outputStream, data); + payload.release(); + + requests--; + if (requests == windowSize / 2) { + requests += windowSize; + s.request(windowSize); + } + } + + private void write(OutputStream outputStream, ByteBuf byteBuf) { + try { + byteBuf.readBytes(outputStream, byteBuf.readableBytes()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onError(Throwable t) { + close(outputStream); + } + + @Override + public void onComplete() { + close(outputStream); + } + + private OutputStream open(String filename) { + try { + /*do not buffer for demo purposes*/ + return new FileOutputStream(filename); + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } + } + + private void close(OutputStream stream) { + if (stream != null) { + try { + stream.close(); + } catch (IOException e) { + } + } + } + }; + } + + private static class FileState { + private final String fileName; + private final int chunkSizeBytes; + private BufferedInputStream inputStream; + private byte[] chunkBytes; + + public FileState(String fileName, int chunkSizeBytes) { + this.fileName = fileName; + this.chunkSizeBytes = chunkSizeBytes; + } + + public FileState consumeNext(SynchronousSink sink) { + if (inputStream == null) { + InputStream in = getClass().getClassLoader().getResourceAsStream(fileName); + if (in == null) { + sink.error(new FileNotFoundException(fileName)); + return this; + } + this.inputStream = new BufferedInputStream(in); + this.chunkBytes = new byte[chunkSizeBytes]; + } + try { + int consumedBytes = inputStream.read(chunkBytes); + if (consumedBytes == -1) { + sink.complete(); + } else { + sink.next(Unpooled.copiedBuffer(chunkBytes, 0, consumedBytes)); + } + } catch (IOException e) { + sink.error(e); + } + return this; + } + + public void dispose() { + if (inputStream != null) { + try { + inputStream.close(); + } catch (IOException e) { + } + } + } + } +} diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/ResumeFileTransfer.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/ResumeFileTransfer.java new file mode 100644 index 000000000..df8e801a6 --- /dev/null +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/ResumeFileTransfer.java @@ -0,0 +1,120 @@ +package io.rsocket.examples.transport.tcp.resume; + +import io.rsocket.AbstractRSocket; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.resume.ClientResume; +import io.rsocket.resume.PeriodicResumeStrategy; +import io.rsocket.resume.ResumeStrategy; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import java.time.Duration; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class ResumeFileTransfer { + + public static void main(String[] args) { + RequestCodec requestCodec = new RequestCodec(); + + CloseableChannel server = + RSocketFactory.receive() + .resume() + .resumeSessionDuration(Duration.ofMinutes(5)) + .acceptor((setup, rSocket) -> Mono.just(new FileServer(requestCodec))) + .transport(TcpServerTransport.create("localhost", 8000)) + .start() + .block(); + + RSocket client = + RSocketFactory.connect() + .resume() + .resumeStrategy( + () -> new VerboseResumeStrategy(new PeriodicResumeStrategy(Duration.ofSeconds(1)))) + .resumeSessionDuration(Duration.ofMinutes(5)) + .transport(TcpClientTransport.create("localhost", 8001)) + .start() + .block(); + + client + .requestStream(requestCodec.encode(new Request(16, "lorem.txt"))) + .doFinally(s -> server.dispose()) + .subscribe(Files.fileSink("rsocket-examples/out/lorem_output.txt", 256)); + + server.onClose().block(); + } + + private static class FileServer extends AbstractRSocket { + private final RequestCodec requestCodec; + + public FileServer(RequestCodec requestCodec) { + this.requestCodec = requestCodec; + } + + @Override + public Flux requestStream(Payload payload) { + Request request = requestCodec.decode(payload); + payload.release(); + String fileName = request.getFileName(); + int chunkSize = request.getChunkSize(); + + Flux ticks = Flux.interval(Duration.ofMillis(500)).onBackpressureDrop(); + + return Files.fileSource(fileName, chunkSize) + .map(DefaultPayload::create) + .zipWith(ticks, (p, tick) -> p); + } + } + + private static class VerboseResumeStrategy implements ResumeStrategy { + private final ResumeStrategy resumeStrategy; + + public VerboseResumeStrategy(ResumeStrategy resumeStrategy) { + this.resumeStrategy = resumeStrategy; + } + + @Override + public Publisher apply(ClientResume clientResume, Throwable throwable) { + return Flux.from(resumeStrategy.apply(clientResume, throwable)) + .doOnNext(v -> System.out.println("Disconnected. Trying to resume connection...")); + } + } + + private static class RequestCodec { + + public Payload encode(Request request) { + String encoded = request.getChunkSize() + ":" + request.getFileName(); + return DefaultPayload.create(encoded); + } + + public Request decode(Payload payload) { + String encoded = payload.getDataUtf8(); + String[] chunkSizeAndFileName = encoded.split(":"); + int chunkSize = Integer.parseInt(chunkSizeAndFileName[0]); + String fileName = chunkSizeAndFileName[1]; + return new Request(chunkSize, fileName); + } + } + + private static class Request { + private final int chunkSize; + private final String fileName; + + public Request(int chunkSize, String fileName) { + this.chunkSize = chunkSize; + this.fileName = fileName; + } + + public int getChunkSize() { + return chunkSize; + } + + public String getFileName() { + return fileName; + } + } +} diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/readme.md b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/readme.md new file mode 100644 index 000000000..55e761fe8 --- /dev/null +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/readme.md @@ -0,0 +1,29 @@ +1. Start socat. It is used for emulation of transport disconnects + +`socat -d TCP-LISTEN:8001,fork,reuseaddr TCP:localhost:8000` + +2. start `ResumeFileTransfer.main` + +3. terminate/start socat periodically for session resumption + +`ResumeFileTransfer` output is as follows + +``` +Received file chunk: 7. Total size: 112 +Received file chunk: 8. Total size: 128 +Received file chunk: 9. Total size: 144 +Received file chunk: 10. Total size: 160 +Disconnected. Trying to resume connection... +Disconnected. Trying to resume connection... +Disconnected. Trying to resume connection... +Disconnected. Trying to resume connection... +Disconnected. Trying to resume connection... +Received file chunk: 11. Total size: 176 +Received file chunk: 12. Total size: 192 +Received file chunk: 13. Total size: 208 +Received file chunk: 14. Total size: 224 +Received file chunk: 15. Total size: 240 +Received file chunk: 16. Total size: 256 +``` + +It transfers file from `resources/lorem.txt` to `build/out/lorem_output.txt` in chunks of 16 bytes every 500 millis diff --git a/rsocket-examples/src/main/resources/lorem.txt b/rsocket-examples/src/main/resources/lorem.txt new file mode 100644 index 000000000..e035ea86d --- /dev/null +++ b/rsocket-examples/src/main/resources/lorem.txt @@ -0,0 +1,32 @@ +Alteration literature to or an sympathize mr imprudence. Of is ferrars subject as enjoyed or tedious cottage. +Procuring as in resembled by in agreeable. Next long no gave mr eyes. Admiration advantages no he celebrated so pianoforte unreserved. +Not its herself forming charmed amiable. Him why feebly expect future now. + +Situation admitting promotion at or to perceived be. Mr acuteness we as estimable enjoyment up. +An held late as felt know. Learn do allow solid to grave. Middleton suspicion age her attention. +Chiefly several bed its wishing. Is so moments on chamber pressed to. Doubtful yet way properly answered humanity its desirous. + Minuter believe service arrived civilly add all. Acuteness allowance an at eagerness favourite in extensive exquisite ye. + + Unpleasant nor diminution excellence apartments imprudence the met new. Draw part them he an to he roof only. + Music leave say doors him. Tore bred form if sigh case as do. Staying he no looking if do opinion. + Sentiments way understood end partiality and his. + + Ladyship it daughter securing procured or am moreover mr. Put sir she exercise vicinity cheerful wondered. + Continual say suspicion provision you neglected sir curiosity unwilling. Simplicity end themselves increasing led day sympathize yet. + General windows effects not are drawing man garrets. Common indeed garden you his ladies out yet. Preference imprudence contrasted to remarkably in on. + Taken now you him trees tears any. Her object giving end sister except oppose. + + No comfort do written conduct at prevent manners on. Celebrated contrasted discretion him sympathize her collecting occasional. + Do answered bachelor occasion in of offended no concerns. Supply worthy warmth branch of no ye. Voice tried known to as my to. + Though wished merits or be. Alone visit use these smart rooms ham. No waiting in on enjoyed placing it inquiry. + + So insisted received is occasion advanced honoured. Among ready to which up. Attacks smiling and may out assured moments man nothing outward. + Thrown any behind afford either the set depend one temper. Instrument melancholy in acceptance collecting frequently be if. + Zealously now pronounce existence add you instantly say offending. Merry their far had widen was. Concerns no in expenses raillery formerly. + + As am hastily invited settled at limited civilly fortune me. Really spring in extent an by. Judge but built gay party world. + Of so am he remember although required. Bachelor unpacked be advanced at. Confined in declared marianne is vicinity. + + In alteration insipidity impression by travelling reasonable up motionless. Of regard warmth by unable sudden garden ladies. + No kept hung am size spot no. Likewise led and dissuade rejoiced welcomed husbands boy. Do listening on he suspected resembled. + Water would still if to. Position boy required law moderate was may. \ No newline at end of file diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java index 7bea75318..ba799f763 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java @@ -374,10 +374,11 @@ public synchronized double availability() { } private synchronized RSocket select() { + refreshSockets(); + if (activeSockets.isEmpty()) { return FAILING_REACTIVE_SOCKET; } - refreshSockets(); int size = activeSockets.size(); if (size == 1) { diff --git a/rsocket-load-balancer/src/test/java/io/rsocket/client/LoadBalancedRSocketMonoTest.java b/rsocket-load-balancer/src/test/java/io/rsocket/client/LoadBalancedRSocketMonoTest.java index b529e426c..4baa106c5 100644 --- a/rsocket-load-balancer/src/test/java/io/rsocket/client/LoadBalancedRSocketMonoTest.java +++ b/rsocket-load-balancer/src/test/java/io/rsocket/client/LoadBalancedRSocketMonoTest.java @@ -19,19 +19,15 @@ import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.client.filter.RSocketSupplier; -import io.rsocket.util.EmptyPayload; -import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.Arrays; +import java.util.Collections; import java.util.List; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -39,11 +35,8 @@ public class LoadBalancedRSocketMonoTest { @Test(timeout = 10_000L) public void testNeverSelectFailingFactories() throws InterruptedException { - InetSocketAddress local0 = InetSocketAddress.createUnresolved("localhost", 7000); - InetSocketAddress local1 = InetSocketAddress.createUnresolved("localhost", 7001); - TestingRSocket socket = new TestingRSocket(Function.identity()); - RSocketSupplier failing = failingClient(local0); + RSocketSupplier failing = failingClient(); RSocketSupplier succeeding = succeedingFactory(socket); List factories = Arrays.asList(failing, succeeding); @@ -52,9 +45,6 @@ public void testNeverSelectFailingFactories() throws InterruptedException { @Test(timeout = 10_000L) public void testNeverSelectFailingSocket() throws InterruptedException { - InetSocketAddress local0 = InetSocketAddress.createUnresolved("localhost", 7000); - InetSocketAddress local1 = InetSocketAddress.createUnresolved("localhost", 7001); - TestingRSocket socket = new TestingRSocket(Function.identity()); TestingRSocket failingSocket = new TestingRSocket(Function.identity()) { @@ -76,6 +66,33 @@ public double availability() { testBalancer(clients); } + @Test(timeout = 10_000L) + public void testRefreshesSocketsOnSelectBeforeReturningFailedAfterNewFactoriesDelivered() { + TestingRSocket socket = new TestingRSocket(Function.identity()); + + CompletableFuture laterSupplier = new CompletableFuture<>(); + Flux> factories = + Flux.create( + s -> { + s.next(Collections.emptyList()); + + laterSupplier.handle( + (RSocketSupplier result, Throwable t) -> { + s.next(Collections.singletonList(result)); + return null; + }); + }); + + LoadBalancedRSocketMono balancer = LoadBalancedRSocketMono.create(factories); + + Assert.assertEquals(0.0, balancer.availability(), 0); + + laterSupplier.complete(succeedingFactory(socket)); + balancer.rSocketMono.block(); + + Assert.assertEquals(1.0, balancer.availability(), 0); + } + private void testBalancer(List factories) throws InterruptedException { Publisher> src = s -> { @@ -92,39 +109,6 @@ private void testBalancer(List factories) throws InterruptedExc Flux.range(0, 100).flatMap(i -> balancer).blockLast(); } - private void makeAcall(RSocket balancer) throws InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - - balancer - .requestResponse(EmptyPayload.INSTANCE) - .subscribe( - new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - s.request(1L); - } - - @Override - public void onNext(Payload payload) { - System.out.println("Successfully receiving a response"); - } - - @Override - public void onError(Throwable t) { - t.printStackTrace(); - Assert.assertTrue(false); - latch.countDown(); - } - - @Override - public void onComplete() { - latch.countDown(); - } - }); - - latch.await(); - } - private static RSocketSupplier succeedingFactory(RSocket socket) { RSocketSupplier mock = Mockito.mock(RSocketSupplier.class); @@ -135,7 +119,7 @@ private static RSocketSupplier succeedingFactory(RSocket socket) { return mock; } - private static RSocketSupplier failingClient(SocketAddress sa) { + private static RSocketSupplier failingClient() { RSocketSupplier mock = Mockito.mock(RSocketSupplier.class); Mockito.when(mock.availability()).thenReturn(0.0); diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java deleted file mode 100644 index a697524b3..000000000 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java +++ /dev/null @@ -1,312 +0,0 @@ -package io.rsocket.transport.netty; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.EventLoop; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.ReferenceCounted; -import java.nio.channels.ClosedChannelException; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.function.Function; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import reactor.core.CoreSubscriber; -import reactor.core.Fuseable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Operators; -import reactor.netty.FutureMono; -import reactor.util.concurrent.Queues; - -class SendPublisher extends Flux { - - private static final AtomicIntegerFieldUpdater WIP = - AtomicIntegerFieldUpdater.newUpdater(SendPublisher.class, "wip"); - - private static final int MAX_SIZE = Queues.SMALL_BUFFER_SIZE; - private static final int REFILL_SIZE = MAX_SIZE / 2; - private static final AtomicReferenceFieldUpdater INNER_SUBSCRIBER = - AtomicReferenceFieldUpdater.newUpdater(SendPublisher.class, Object.class, "innerSubscriber"); - private static final AtomicIntegerFieldUpdater TERMINATED = - AtomicIntegerFieldUpdater.newUpdater(SendPublisher.class, "terminated"); - private final Publisher source; - private final Channel channel; - private final EventLoop eventLoop; - - private final Queue queue; - private final AtomicBoolean completed = new AtomicBoolean(); - private final Function transformer; - private final SizeOf sizeOf; - - @SuppressWarnings("unused") - private volatile int terminated; - - private int pending; - - @SuppressWarnings("unused") - private volatile int wip; - - @SuppressWarnings("unused") - private volatile Object innerSubscriber; - - private long requested; - - private long requestedUpstream = MAX_SIZE; - - private boolean fuse; - - @SuppressWarnings("unchecked") - SendPublisher( - Publisher source, - Channel channel, - Function transformer, - SizeOf sizeOf) { - this(Queues.small().get(), source, channel, transformer, sizeOf); - } - - @SuppressWarnings("unchecked") - SendPublisher( - Queue queue, - Publisher source, - Channel channel, - Function transformer, - SizeOf sizeOf) { - this.source = source; - this.channel = channel; - this.queue = queue; - this.eventLoop = channel.eventLoop(); - this.transformer = transformer; - this.sizeOf = sizeOf; - - fuse = queue instanceof Fuseable.QueueSubscription; - } - - @SuppressWarnings("unchecked") - private void writeCleanup(V poll) { - if (requested != Long.MAX_VALUE) { - requested--; - } - requestedUpstream--; - pending--; - - InnerSubscriber is = (InnerSubscriber) INNER_SUBSCRIBER.get(SendPublisher.this); - if (is != null) { - is.tryRequestMoreUpstream(); - tryComplete(is); - } - if (poll.refCnt() > 0) { - ReferenceCountUtil.safeRelease(poll); - } - } - - private void tryComplete(InnerSubscriber is) { - if (pending == 0 - && completed.get() - && queue.isEmpty() - && terminated == 0 - && !is.pendingFlush.get()) { - TERMINATED.set(SendPublisher.this, 1); - is.destination.onComplete(); - } - } - - @Override - public void subscribe(CoreSubscriber destination) { - InnerSubscriber innerSubscriber = new InnerSubscriber(destination); - if (!INNER_SUBSCRIBER.compareAndSet(this, null, innerSubscriber)) { - Operators.error( - destination, new IllegalStateException("SendPublisher only allows one subscription")); - } else { - InnerSubscription innerSubscription = new InnerSubscription(innerSubscriber); - destination.onSubscribe(innerSubscription); - source.subscribe(innerSubscriber); - } - } - - @FunctionalInterface - interface SizeOf { - int size(V v); - } - - private class InnerSubscriber implements Subscriber { - final CoreSubscriber destination; - volatile Subscription s; - private AtomicBoolean pendingFlush = new AtomicBoolean(); - - private InnerSubscriber(CoreSubscriber destination) { - this.destination = destination; - FutureMono.from(channel.closeFuture()) - .doFinally(s -> onError(new ClosedChannelException())) - .subscribe(); - } - - @Override - public void onSubscribe(Subscription s) { - this.s = s; - s.request(MAX_SIZE); - tryDrain(); - } - - @Override - public void onNext(ByteBuf t) { - if (terminated == 0) { - if (!fuse && !queue.offer(t)) { - throw new IllegalStateException("missing back pressure"); - } - tryDrain(); - } - } - - @Override - public void onError(Throwable t) { - if (TERMINATED.compareAndSet(SendPublisher.this, 0, 1)) { - try { - s.cancel(); - destination.onError(t); - } finally { - ByteBuf byteBuf = queue.poll(); - while (byteBuf != null) { - ReferenceCountUtil.safeRelease(byteBuf); - byteBuf = queue.poll(); - } - } - } - } - - @Override - public void onComplete() { - if (completed.compareAndSet(false, true)) { - tryDrain(); - } - } - - private void tryRequestMoreUpstream() { - if (requestedUpstream <= REFILL_SIZE && s != null) { - long u = MAX_SIZE - requestedUpstream; - requestedUpstream = Operators.addCap(requestedUpstream, u); - s.request(u); - } - } - - private void flush() { - try { - channel.flush(); - pendingFlush.set(false); - tryComplete(this); - } catch (Throwable t) { - onError(t); - } - } - - private void tryDrain() { - if (terminated == 0 && WIP.getAndIncrement(SendPublisher.this) == 0) { - try { - if (eventLoop.inEventLoop()) { - drain(); - } else { - eventLoop.execute(this::drain); - } - } catch (Throwable t) { - onError(t); - } - } - } - - private void drain() { - try { - boolean scheduleFlush; - int missed = 1; - for (; ; ) { - scheduleFlush = false; - - long r = Math.min(requested, requestedUpstream); - while (r-- > 0) { - ByteBuf ByteBuf = queue.poll(); - if (ByteBuf != null && terminated == 0) { - V poll = transformer.apply(ByteBuf); - int readableBytes = sizeOf.size(poll); - pending++; - if (channel.isWritable() && readableBytes <= channel.bytesBeforeUnwritable()) { - channel - .write(poll) - .addListener( - future -> { - if (future.cause() != null) { - onError(future.cause()); - } else { - writeCleanup(poll); - } - }); - scheduleFlush = true; - } else { - scheduleFlush = false; - channel - .writeAndFlush(poll) - .addListener( - future -> { - if (future.cause() != null) { - onError(future.cause()); - } else { - writeCleanup(poll); - } - }); - } - - tryRequestMoreUpstream(); - } else { - break; - } - } - - if (scheduleFlush) { - pendingFlush.set(true); - eventLoop.execute(this::flush); - } - - if (terminated == 1) { - break; - } - - missed = WIP.addAndGet(SendPublisher.this, -missed); - if (missed == 0) { - break; - } - } - } catch (Throwable t) { - onError(t); - } - } - } - - private class InnerSubscription implements Subscription { - private final InnerSubscriber innerSubscriber; - - private InnerSubscription(InnerSubscriber innerSubscriber) { - this.innerSubscriber = innerSubscriber; - } - - @Override - public void request(long n) { - if (eventLoop.inEventLoop()) { - requested = Operators.addCap(n, requested); - innerSubscriber.tryDrain(); - } else { - eventLoop.execute(() -> request(n)); - } - } - - @Override - public void cancel() { - TERMINATED.set(SendPublisher.this, 1); - while (!queue.isEmpty()) { - ByteBuf poll = queue.poll(); - if (poll != null) { - ReferenceCountUtil.safeRelease(poll); - } - } - } - } -} diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java index ffa5503c8..c9c29f0a9 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java @@ -23,7 +23,6 @@ import io.rsocket.internal.BaseDuplexConnection; import java.util.Objects; import org.reactivestreams.Publisher; -import reactor.core.Fuseable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.Connection; @@ -77,30 +76,15 @@ public Flux receive() { @Override public Mono send(Publisher frames) { - return Flux.from(frames) - .transform( - frameFlux -> { - if (frameFlux instanceof Fuseable.QueueSubscription) { - Fuseable.QueueSubscription queueSubscription = - (Fuseable.QueueSubscription) frameFlux; - queueSubscription.requestFusion(Fuseable.ASYNC); - return new SendPublisher<>( - queueSubscription, - frameFlux, - connection.channel(), - this::encode, - ByteBuf::readableBytes); - } else { - return new SendPublisher<>( - frameFlux, connection.channel(), this::encode, ByteBuf::readableBytes); - } - }) - .then(); + if (frames instanceof Mono) { + return connection.outbound().sendObject(((Mono) frames).map(this::encode)).then(); + } + return connection.outbound().send(Flux.from(frames).map(this::encode)).then(); } private ByteBuf encode(ByteBuf frame) { if (encodeLength) { - return FrameLengthFlyweight.encode(allocator, frame.readableBytes(), frame).retain(); + return FrameLengthFlyweight.encode(allocator, frame.readableBytes(), frame); } else { return frame; } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java index f83725e47..ead297928 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java @@ -21,11 +21,9 @@ import io.rsocket.internal.BaseDuplexConnection; import java.util.Objects; import org.reactivestreams.Publisher; -import reactor.core.Fuseable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.Connection; -import reactor.util.concurrent.Queues; /** * An implementation of {@link DuplexConnection} that connects via a Websocket. @@ -69,32 +67,15 @@ public Flux receive() { @Override public Mono send(Publisher frames) { - return Flux.from(frames) - .transform( - frameFlux -> { - if (frameFlux instanceof Fuseable.QueueSubscription) { - Fuseable.QueueSubscription queueSubscription = - (Fuseable.QueueSubscription) frameFlux; - queueSubscription.requestFusion(Fuseable.ASYNC); - return new SendPublisher<>( - queueSubscription, - frameFlux, - connection.channel(), - this::toBinaryWebSocketFrame, - binaryWebSocketFrame -> binaryWebSocketFrame.content().readableBytes()); - } else { - return new SendPublisher<>( - Queues.small().get(), - frameFlux, - connection.channel(), - this::toBinaryWebSocketFrame, - binaryWebSocketFrame -> binaryWebSocketFrame.content().readableBytes()); - } - }) + if (frames instanceof Mono) { + return connection + .outbound() + .sendObject(((Mono) frames).map(BinaryWebSocketFrame::new)) + .then(); + } + return connection + .outbound() + .sendObject(Flux.from(frames).map(BinaryWebSocketFrame::new)) .then(); } - - private BinaryWebSocketFrame toBinaryWebSocketFrame(ByteBuf frame) { - return new BinaryWebSocketFrame(frame.retain()); - } }