Skip to content

Commit

Permalink
Merge branch 'release/0.12.2-RC4'
Browse files Browse the repository at this point in the history
  • Loading branch information
robertroeser committed Jun 11, 2019
2 parents fe3b390 + 7a5f9ff commit ab570b4
Show file tree
Hide file tree
Showing 33 changed files with 2,195 additions and 179 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Learn more at http://rsocket.io

## Build and Binaries

[![Build Status](https://travis-ci.org/rsocket/rsocket-java.svg?branch=1.0.x)](https://travis-ci.org/rsocket/rsocket-java)
[![Build Status](https://travis-ci.org/rsocket/rsocket-java.svg?branch=develop)](https://travis-ci.org/rsocket/rsocket-java)

Releases are available via Maven Central.

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
version=0.12.2-RC3
version=0.12.2-RC4
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package io.rsocket.metadata;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

@BenchmarkMode(Mode.Throughput)
@Fork(value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 10)
@State(Scope.Thread)
public class WellKnownMimeTypePerf {

// this is the old values() looping implementation of fromIdentifier
private WellKnownMimeType fromIdValuesLoop(int id) {
if (id < 0 || id > 127) {
return WellKnownMimeType.UNPARSEABLE_MIME_TYPE;
}
for (WellKnownMimeType value : WellKnownMimeType.values()) {
if (value.getIdentifier() == id) {
return value;
}
}
return WellKnownMimeType.UNKNOWN_RESERVED_MIME_TYPE;
}

// this is the core of the old values() looping implementation of fromString
private WellKnownMimeType fromStringValuesLoop(String mimeType) {
for (WellKnownMimeType value : WellKnownMimeType.values()) {
if (mimeType.equals(value.getString())) {
return value;
}
}
return WellKnownMimeType.UNPARSEABLE_MIME_TYPE;
}

@Benchmark
public void fromIdArrayLookup(final Blackhole bh) {
// negative lookup
bh.consume(WellKnownMimeType.fromIdentifier(-10));
bh.consume(WellKnownMimeType.fromIdentifier(-1));
// too large lookup
bh.consume(WellKnownMimeType.fromIdentifier(129));
// first lookup
bh.consume(WellKnownMimeType.fromIdentifier(0));
// middle lookup
bh.consume(WellKnownMimeType.fromIdentifier(37));
// reserved lookup
bh.consume(WellKnownMimeType.fromIdentifier(63));
// last lookup
bh.consume(WellKnownMimeType.fromIdentifier(127));
}

@Benchmark
public void fromIdValuesLoopLookup(final Blackhole bh) {
// negative lookup
bh.consume(fromIdValuesLoop(-10));
bh.consume(fromIdValuesLoop(-1));
// too large lookup
bh.consume(fromIdValuesLoop(129));
// first lookup
bh.consume(fromIdValuesLoop(0));
// middle lookup
bh.consume(fromIdValuesLoop(37));
// reserved lookup
bh.consume(fromIdValuesLoop(63));
// last lookup
bh.consume(fromIdValuesLoop(127));
}

@Benchmark
public void fromStringMapLookup(final Blackhole bh) {
// unknown lookup
bh.consume(WellKnownMimeType.fromString("foo/bar"));
// first lookup
bh.consume(WellKnownMimeType.fromString(WellKnownMimeType.APPLICATION_AVRO.getString()));
// middle lookup
bh.consume(WellKnownMimeType.fromString(WellKnownMimeType.VIDEO_VP8.getString()));
// last lookup
bh.consume(
WellKnownMimeType.fromString(
WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()));
}

@Benchmark
public void fromStringValuesLoopLookup(final Blackhole bh) {
// unknown lookup
bh.consume(fromStringValuesLoop("foo/bar"));
// first lookup
bh.consume(fromStringValuesLoop(WellKnownMimeType.APPLICATION_AVRO.getString()));
// middle lookup
bh.consume(fromStringValuesLoop(WellKnownMimeType.VIDEO_VP8.getString()));
// last lookup
bh.consume(
fromStringValuesLoop(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()));
}
}
74 changes: 48 additions & 26 deletions rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,25 @@ public ClientRSocketFactory addConnectionPlugin(DuplexConnectionInterceptor inte
plugins.addConnectionPlugin(interceptor);
return this;
}

/** Deprecated. Use {@link #addRequesterPlugin(RSocketInterceptor)} instead */
@Deprecated
public ClientRSocketFactory addClientPlugin(RSocketInterceptor interceptor) {
plugins.addClientPlugin(interceptor);
return addRequesterPlugin(interceptor);
}

public ClientRSocketFactory addRequesterPlugin(RSocketInterceptor interceptor) {
plugins.addRequesterPlugin(interceptor);
return this;
}

/** Deprecated. Use {@link #addResponderPlugin(RSocketInterceptor)} instead */
@Deprecated
public ClientRSocketFactory addServerPlugin(RSocketInterceptor interceptor) {
plugins.addServerPlugin(interceptor);
return addResponderPlugin(interceptor);
}

public ClientRSocketFactory addResponderPlugin(RSocketInterceptor interceptor) {
plugins.addResponderPlugin(interceptor);
return this;
}

Expand Down Expand Up @@ -291,8 +302,8 @@ public Mono<RSocket> start() {
ClientServerInputMultiplexer multiplexer =
new ClientServerInputMultiplexer(wrappedConnection, plugins);

RSocketClient rSocketClient =
new RSocketClient(
RSocketRequester rSocketRequester =
new RSocketRequester(
allocator,
multiplexer.asClientConnection(),
payloadDecoder,
Expand All @@ -314,27 +325,27 @@ public Mono<RSocket> start() {
setupPayload.sliceMetadata(),
setupPayload.sliceData());

RSocket wrappedRSocketClient = plugins.applyClient(rSocketClient);
RSocket wrappedRSocketRequester = plugins.applyRequester(rSocketRequester);

RSocket unwrappedServerSocket;
RSocket rSocketHandler;
if (biAcceptor != null) {
ConnectionSetupPayload setup = ConnectionSetupPayload.create(setupFrame);
unwrappedServerSocket = biAcceptor.apply(setup, wrappedRSocketClient);
rSocketHandler = biAcceptor.apply(setup, wrappedRSocketRequester);
} else {
unwrappedServerSocket = acceptor.get().apply(wrappedRSocketClient);
rSocketHandler = acceptor.get().apply(wrappedRSocketRequester);
}

RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket);
RSocket wrappedRSocketHandler = plugins.applyResponder(rSocketHandler);

RSocketServer rSocketServer =
new RSocketServer(
RSocketResponder rSocketResponder =
new RSocketResponder(
allocator,
multiplexer.asServerConnection(),
wrappedRSocketServer,
wrappedRSocketHandler,
payloadDecoder,
errorConsumer);

return wrappedConnection.sendOne(setupFrame).thenReturn(wrappedRSocketClient);
return wrappedConnection.sendOne(setupFrame).thenReturn(wrappedRSocketRequester);
});
}

Expand Down Expand Up @@ -397,14 +408,25 @@ public ServerRSocketFactory addConnectionPlugin(DuplexConnectionInterceptor inte
plugins.addConnectionPlugin(interceptor);
return this;
}

/** Deprecated. Use {@link #addRequesterPlugin(RSocketInterceptor)} instead */
@Deprecated
public ServerRSocketFactory addClientPlugin(RSocketInterceptor interceptor) {
plugins.addClientPlugin(interceptor);
return addRequesterPlugin(interceptor);
}

public ServerRSocketFactory addRequesterPlugin(RSocketInterceptor interceptor) {
plugins.addRequesterPlugin(interceptor);
return this;
}

/** Deprecated. Use {@link #addResponderPlugin(RSocketInterceptor)} instead */
@Deprecated
public ServerRSocketFactory addServerPlugin(RSocketInterceptor interceptor) {
plugins.addServerPlugin(interceptor);
return addResponderPlugin(interceptor);
}

public ServerRSocketFactory addResponderPlugin(RSocketInterceptor interceptor) {
plugins.addResponderPlugin(interceptor);
return this;
}

Expand Down Expand Up @@ -525,29 +547,29 @@ private Mono<Void> acceptSetup(
(keepAliveHandler, wrappedMultiplexer) -> {
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create(setupFrame);

RSocketClient rSocketClient =
new RSocketClient(
RSocketRequester rSocketRequester =
new RSocketRequester(
allocator,
wrappedMultiplexer.asServerConnection(),
payloadDecoder,
errorConsumer,
StreamIdSupplier.serverSupplier());

RSocket wrappedRSocketClient = plugins.applyClient(rSocketClient);
RSocket wrappedRSocketRequester = plugins.applyRequester(rSocketRequester);

return acceptor
.accept(setupPayload, wrappedRSocketClient)
.accept(setupPayload, wrappedRSocketRequester)
.onErrorResume(
err -> sendError(multiplexer, rejectedSetupError(err)).then(Mono.error(err)))
.doOnNext(
unwrappedServerSocket -> {
RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket);
rSocketHandler -> {
RSocket wrappedRSocketHandler = plugins.applyResponder(rSocketHandler);

RSocketServer rSocketServer =
new RSocketServer(
RSocketResponder rSocketResponder =
new RSocketResponder(
allocator,
wrappedMultiplexer.asClientConnection(),
wrappedRSocketServer,
wrappedRSocketHandler,
payloadDecoder,
errorConsumer,
setupPayload.keepAliveInterval(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@
import org.reactivestreams.Subscriber;
import reactor.core.publisher.*;

/** Client Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketServer} */
class RSocketClient implements RSocket {
/**
* Requester Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketResponder} of peer
*/
class RSocketRequester implements RSocket {

private final DuplexConnection connection;
private final PayloadDecoder payloadDecoder;
Expand All @@ -60,7 +62,7 @@ class RSocketClient implements RSocket {
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;

/*client requester*/
RSocketClient(
RSocketRequester(
ByteBufAllocator allocator,
DuplexConnection connection,
PayloadDecoder payloadDecoder,
Expand Down Expand Up @@ -99,7 +101,7 @@ class RSocketClient implements RSocket {
}

/*server requester*/
RSocketClient(
RSocketRequester(
ByteBufAllocator allocator,
DuplexConnection connection,
PayloadDecoder payloadDecoder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import reactor.core.Exceptions;
import reactor.core.publisher.*;

/** Server side RSocket. Receives {@link ByteBuf}s from a {@link RSocketClient} */
class RSocketServer implements ResponderRSocket {
/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
class RSocketResponder implements ResponderRSocket {

private final DuplexConnection connection;
private final RSocket requestHandler;
Expand All @@ -61,7 +61,7 @@ class RSocketServer implements ResponderRSocket {
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;

/*client responder*/
RSocketServer(
RSocketResponder(
ByteBufAllocator allocator,
DuplexConnection connection,
RSocket requestHandler,
Expand All @@ -71,7 +71,7 @@ class RSocketServer implements ResponderRSocket {
}

/*server responder*/
RSocketServer(
RSocketResponder(
ByteBufAllocator allocator,
DuplexConnection connection,
RSocket requestHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.rsocket.frame.FrameLengthFlyweight;
import io.rsocket.frame.FrameType;
import java.util.Objects;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -57,13 +58,10 @@ public FragmentationDuplexConnection(
String type) {
Objects.requireNonNull(delegate, "delegate must not be null");
Objects.requireNonNull(allocator, "byteBufAllocator must not be null");
if (mtu < MIN_MTU_SIZE) {
throw new IllegalArgumentException("smallest allowed mtu size is " + MIN_MTU_SIZE + " bytes");
}
this.encodeLength = encodeLength;
this.allocator = allocator;
this.delegate = delegate;
this.mtu = mtu;
this.mtu = assertMtu(mtu);
this.frameReassembler = new FrameReassembler(allocator);
this.type = type;

Expand All @@ -74,6 +72,32 @@ private boolean shouldFragment(FrameType frameType, int readableBytes) {
return frameType.isFragmentable() && readableBytes > mtu;
}

/*TODO this is nullable and not returning empty to workaround javac 11.0.3 compiler issue on ubuntu (at least) */
@Nullable
public static <T> Mono<T> checkMtu(int mtu) {
if (isInsufficientMtu(mtu)) {
String msg =
String.format("smallest allowed mtu size is %d bytes, provided: %d", MIN_MTU_SIZE, mtu);
return Mono.error(new IllegalArgumentException(msg));
} else {
return null;
}
}

private static int assertMtu(int mtu) {
if (isInsufficientMtu(mtu)) {
String msg =
String.format("smallest allowed mtu size is %d bytes, provided: %d", MIN_MTU_SIZE, mtu);
throw new IllegalArgumentException(msg);
} else {
return mtu;
}
}

private static boolean isInsufficientMtu(int mtu) {
return mtu > 0 && mtu < MIN_MTU_SIZE || mtu < 0;
}

@Override
public Mono<Void> send(Publisher<ByteBuf> frames) {
return Flux.from(frames).concatMap(this::sendOne).then();
Expand All @@ -89,13 +113,13 @@ public Mono<Void> sendOne(ByteBuf frame) {
Flux.from(fragmentFrame(allocator, mtu, frame, frameType, encodeLength))
.doOnNext(
byteBuf -> {
ByteBuf frame1 = FrameLengthFlyweight.frame(byteBuf);
ByteBuf f = encodeLength ? FrameLengthFlyweight.frame(byteBuf) : byteBuf;
logger.debug(
"{} - stream id {} - frame type {} - \n {}",
type,
FrameHeaderFlyweight.streamId(frame1),
FrameHeaderFlyweight.frameType(frame1),
ByteBufUtil.prettyHexDump(frame1));
FrameHeaderFlyweight.streamId(f),
FrameHeaderFlyweight.frameType(f),
ByteBufUtil.prettyHexDump(f));
}));
} else {
return delegate.send(
Expand All @@ -108,7 +132,7 @@ public Mono<Void> sendOne(ByteBuf frame) {

private ByteBuf encode(ByteBuf frame) {
if (encodeLength) {
return FrameLengthFlyweight.encode(allocator, frame.readableBytes(), frame).retain();
return FrameLengthFlyweight.encode(allocator, frame.readableBytes(), frame);
} else {
return frame;
}
Expand Down
Loading

0 comments on commit ab570b4

Please sign in to comment.