Skip to content

Commit

Permalink
Update to new MCPL (#4902)
Browse files Browse the repository at this point in the history
* Update to new MCPL

* Add flow control to localsession to resolve race conditions

* Update listeners

* Update mcpl

* Bump mcpl

* Remove default listeners override

* Update core/src/main/java/org/geysermc/geyser/network/netty/LocalSession.java

Co-authored-by: chris <[email protected]>

* Bump MCPL

* Update mcpl impl

* Bump mcpl

* Update mcpl

* Inline lambda

* update mcpl

* back to mcpl snapshots instead of jitpack

---------

Co-authored-by: Konicai <[email protected]>
Co-authored-by: chris <[email protected]>
  • Loading branch information
3 people authored Oct 8, 2024
1 parent 4820893 commit c656e41
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 82 deletions.
130 changes: 71 additions & 59 deletions core/src/main/java/org/geysermc/geyser/network/netty/LocalSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,37 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.unix.PreferredDirectByteBufAllocator;
import io.netty.handler.codec.haproxy.*;
import io.netty.handler.codec.haproxy.HAProxyCommand;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.geysermc.mcprotocollib.network.BuiltinFlags;
import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper;
import org.geysermc.mcprotocollib.network.packet.PacketProtocol;
import org.geysermc.mcprotocollib.network.tcp.FlushHandler;
import org.geysermc.mcprotocollib.network.tcp.TcpFlowControlHandler;
import org.geysermc.mcprotocollib.network.tcp.TcpPacketCodec;
import org.geysermc.mcprotocollib.network.tcp.TcpPacketCompression;
import org.geysermc.mcprotocollib.network.tcp.TcpPacketEncryptor;
import org.geysermc.mcprotocollib.network.tcp.TcpPacketSizer;
import org.geysermc.mcprotocollib.network.tcp.TcpSession;
import org.geysermc.mcprotocollib.protocol.codec.MinecraftCodecHelper;

import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -72,44 +87,53 @@ public void connect(boolean wait, boolean transferring) {
if (DEFAULT_EVENT_LOOP_GROUP == null) {
DEFAULT_EVENT_LOOP_GROUP = new DefaultEventLoopGroup(new DefaultThreadFactory(this.getClass(), true));
Runtime.getRuntime().addShutdownHook(new Thread(
() -> DEFAULT_EVENT_LOOP_GROUP.shutdownGracefully(100, 500, TimeUnit.MILLISECONDS)));
() -> DEFAULT_EVENT_LOOP_GROUP.shutdownGracefully(100, 500, TimeUnit.MILLISECONDS)));
}

try {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(LocalChannelWithRemoteAddress.class);
bootstrap.handler(new ChannelInitializer<LocalChannelWithRemoteAddress>() {
@Override
public void initChannel(@NonNull LocalChannelWithRemoteAddress channel) {
channel.spoofedRemoteAddress(new InetSocketAddress(clientIp, 0));
PacketProtocol protocol = getPacketProtocol();
protocol.newClientSession(LocalSession.this, transferring);

refreshReadTimeoutHandler(channel);
refreshWriteTimeoutHandler(channel);

ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("sizer", new TcpPacketSizer(LocalSession.this, protocol.getPacketHeader().getLengthSize()));
pipeline.addLast("codec", new TcpPacketCodec(LocalSession.this, true));
pipeline.addLast("manager", LocalSession.this);

addHAProxySupport(pipeline);
}
}).group(DEFAULT_EVENT_LOOP_GROUP).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout() * 1000);

if (PREFERRED_DIRECT_BYTE_BUF_ALLOCATOR != null) {
bootstrap.option(ChannelOption.ALLOCATOR, PREFERRED_DIRECT_BYTE_BUF_ALLOCATOR);
final Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(LocalChannelWithRemoteAddress.class);
bootstrap.handler(new ChannelInitializer<LocalChannelWithRemoteAddress>() {
@Override
public void initChannel(@NonNull LocalChannelWithRemoteAddress channel) {
channel.spoofedRemoteAddress(new InetSocketAddress(clientIp, 0));
PacketProtocol protocol = getPacketProtocol();
protocol.newClientSession(LocalSession.this, transferring);

ChannelPipeline pipeline = channel.pipeline();

initializeHAProxySupport(channel);

pipeline.addLast("read-timeout", new ReadTimeoutHandler(getFlag(BuiltinFlags.READ_TIMEOUT, 30)));
pipeline.addLast("write-timeout", new WriteTimeoutHandler(getFlag(BuiltinFlags.WRITE_TIMEOUT, 0)));

pipeline.addLast("encryption", new TcpPacketEncryptor());
pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), getCodecHelper()));
pipeline.addLast("compression", new TcpPacketCompression(getCodecHelper()));

pipeline.addLast("flow-control", new TcpFlowControlHandler());
pipeline.addLast("codec", new TcpPacketCodec(LocalSession.this, true));
pipeline.addLast("flush-handler", new FlushHandler());
pipeline.addLast("manager", LocalSession.this);
}
}).group(DEFAULT_EVENT_LOOP_GROUP).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getFlag(BuiltinFlags.CLIENT_CONNECT_TIMEOUT, 30) * 1000);

bootstrap.remoteAddress(targetAddress);
if (PREFERRED_DIRECT_BYTE_BUF_ALLOCATOR != null) {
bootstrap.option(ChannelOption.ALLOCATOR, PREFERRED_DIRECT_BYTE_BUF_ALLOCATOR);
}

bootstrap.connect().addListener((future) -> {
if (!future.isSuccess()) {
exceptionCaught(null, future.cause());
}
});
} catch (Throwable t) {
exceptionCaught(null, t);
bootstrap.remoteAddress(targetAddress);

CompletableFuture<Void> handleFuture = new CompletableFuture<>();
bootstrap.connect().addListener((futureListener) -> {
if (!futureListener.isSuccess()) {
exceptionCaught(null, futureListener.cause());
}

handleFuture.complete(null);
});

if (wait) {
handleFuture.join();
}
}

Expand All @@ -118,32 +142,20 @@ public MinecraftCodecHelper getCodecHelper() {
return (MinecraftCodecHelper) this.codecHelper;
}

// TODO duplicate code
private void addHAProxySupport(ChannelPipeline pipeline) {
private void initializeHAProxySupport(Channel channel) {
InetSocketAddress clientAddress = getFlag(BuiltinFlags.CLIENT_PROXIED_ADDRESS);
if (getFlag(BuiltinFlags.ENABLE_CLIENT_PROXY_PROTOCOL, false) && clientAddress != null) {
pipeline.addFirst("proxy-protocol-packet-sender", new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(@NonNull ChannelHandlerContext ctx) throws Exception {
HAProxyProxiedProtocol proxiedProtocol = clientAddress.getAddress() instanceof Inet4Address ? HAProxyProxiedProtocol.TCP4 : HAProxyProxiedProtocol.TCP6;
InetSocketAddress remoteAddress;
if (ctx.channel().remoteAddress() instanceof InetSocketAddress) {
remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
} else {
remoteAddress = new InetSocketAddress(host, port);
}
ctx.channel().writeAndFlush(new HAProxyMessage(
HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol,
clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(),
clientAddress.getPort(), remoteAddress.getPort()
));
ctx.pipeline().remove(this);
ctx.pipeline().remove("proxy-protocol-encoder");
super.channelActive(ctx);
}
});
pipeline.addFirst("proxy-protocol-encoder", HAProxyMessageEncoder.INSTANCE);
if (clientAddress == null) {
return;
}

channel.pipeline().addLast("proxy-protocol-encoder", HAProxyMessageEncoder.INSTANCE);
HAProxyProxiedProtocol proxiedProtocol = clientAddress.getAddress() instanceof Inet4Address ? HAProxyProxiedProtocol.TCP4 : HAProxyProxiedProtocol.TCP6;
InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
channel.writeAndFlush(new HAProxyMessage(
HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol,
clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(),
clientAddress.getPort(), remoteAddress.getPort()
)).addListener(future -> channel.pipeline().remove("proxy-protocol-encoder"));
}

/**
Expand Down
25 changes: 3 additions & 22 deletions core/src/main/java/org/geysermc/geyser/session/GeyserSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -963,8 +963,6 @@ private void connectDownstream() {
// Start ticking
tickThread = eventLoop.scheduleAtFixedRate(this::tick, 50, 50, TimeUnit.MILLISECONDS);

this.protocol.setUseDefaultListeners(false);

TcpSession downstream;
if (geyser.getBootstrap().getSocketAddress() != null) {
// We're going to connect through the JVM and not through TCP
Expand All @@ -990,7 +988,6 @@ private void connectDownstream() {
this.downstream.getSession().setFlag(MinecraftConstants.FOLLOW_TRANSFERS, false);

if (geyser.getConfig().getRemote().isUseProxyProtocol()) {
downstream.setFlag(BuiltinFlags.ENABLE_CLIENT_PROXY_PROTOCOL, true);
downstream.setFlag(BuiltinFlags.CLIENT_PROXIED_ADDRESS, upstream.getAddress());
}
if (geyser.getConfig().isForwardPlayerPing()) {
Expand All @@ -1000,22 +997,6 @@ private void connectDownstream() {
// We'll handle this since we have the registry data on hand
downstream.setFlag(MinecraftConstants.SEND_BLANK_KNOWN_PACKS_RESPONSE, false);

// This isn't a great solution, but... we want to make sure the finish configuration packet cannot be sent
// before the KnownPacks packet.
this.downstream.getSession().addListener(new ClientListener(ProtocolState.LOGIN, loginEvent.transferring()) {
@Override
public void packetReceived(Session session, Packet packet) {
if (protocol.getState() == ProtocolState.CONFIGURATION) {
if (packet instanceof ClientboundFinishConfigurationPacket) {
// Prevent
GeyserSession.this.ensureInEventLoop(() -> GeyserSession.this.sendDownstreamPacket(new ServerboundFinishConfigurationPacket()));
return;
}
}
super.packetReceived(session, packet);
}
});

downstream.addListener(new SessionAdapter() {
@Override
public void packetSending(PacketSendingEvent event) {
Expand Down Expand Up @@ -1788,8 +1769,8 @@ public void sendDownstreamPacket(Packet packet, ProtocolState intendedState) {
return;
}

if (protocol.getState() != intendedState) {
geyser.getLogger().debug("Tried to send " + packet.getClass().getSimpleName() + " packet while not in " + intendedState.name() + " state");
if (protocol.getOutboundState() != intendedState) {
geyser.getLogger().debug("Tried to send " + packet.getClass().getSimpleName() + " packet while not in " + intendedState.name() + " outbound state");
return;
}

Expand Down Expand Up @@ -1823,7 +1804,7 @@ public void sendDownstreamPacket(Packet packet) {
}

private void sendDownstreamPacket0(Packet packet) {
ProtocolState state = protocol.getState();
ProtocolState state = protocol.getOutboundState();
if (state == ProtocolState.GAME || state == ProtocolState.CONFIGURATION || packet.getClass() == ServerboundCustomQueryAnswerPacket.class) {
downstream.sendPacket(packet);
} else {
Expand Down
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ protocol-common = "3.0.0.Beta5-20240916.181041-6"
protocol-codec = "3.0.0.Beta5-20240916.181041-6"
raknet = "1.0.0.CR3-20240416.144209-1"
minecraftauth = "4.1.1"
mcprotocollib = "1.21-20240725.013034-16"
mcprotocollib = "1.21-20241008.134549-23"
adventure = "4.14.0"
adventure-platform = "4.3.0"
junit = "5.9.2"
Expand Down

0 comments on commit c656e41

Please sign in to comment.