mirror of
https://github.com/GeyserMC/Geyser.git
synced 2025-01-08 19:33:58 +01:00
Update to new MCPL (#4902)
* Update to new MCPL * Add flow control to localsession to resolve race conditions * Update listeners * Update mcpl * Bump mcpl * Remove default listeners override * Update core/src/main/java/org/geysermc/geyser/network/netty/LocalSession.java Co-authored-by: chris <github@onechris.mozmail.com> * Bump MCPL * Update mcpl impl * Bump mcpl * Update mcpl * Inline lambda * update mcpl * back to mcpl snapshots instead of jitpack --------- Co-authored-by: Konicai <71294714+Konicai@users.noreply.github.com> Co-authored-by: chris <github@onechris.mozmail.com>
This commit is contained in:
parent
4820893792
commit
c656e415f3
3 changed files with 71 additions and 78 deletions
|
@ -27,15 +27,29 @@ package org.geysermc.geyser.network.netty;
|
||||||
|
|
||||||
import io.netty.bootstrap.Bootstrap;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.channel.*;
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelInitializer;
|
||||||
|
import io.netty.channel.ChannelOption;
|
||||||
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import io.netty.channel.DefaultEventLoopGroup;
|
||||||
import io.netty.channel.unix.PreferredDirectByteBufAllocator;
|
import io.netty.channel.unix.PreferredDirectByteBufAllocator;
|
||||||
import io.netty.handler.codec.haproxy.*;
|
import io.netty.handler.codec.haproxy.HAProxyCommand;
|
||||||
|
import io.netty.handler.codec.haproxy.HAProxyMessage;
|
||||||
|
import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
|
||||||
|
import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
|
||||||
|
import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
|
||||||
|
import io.netty.handler.timeout.ReadTimeoutHandler;
|
||||||
|
import io.netty.handler.timeout.WriteTimeoutHandler;
|
||||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||||
import org.checkerframework.checker.nullness.qual.NonNull;
|
import org.checkerframework.checker.nullness.qual.NonNull;
|
||||||
import org.geysermc.mcprotocollib.network.BuiltinFlags;
|
import org.geysermc.mcprotocollib.network.BuiltinFlags;
|
||||||
import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper;
|
import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper;
|
||||||
import org.geysermc.mcprotocollib.network.packet.PacketProtocol;
|
import org.geysermc.mcprotocollib.network.packet.PacketProtocol;
|
||||||
|
import org.geysermc.mcprotocollib.network.tcp.FlushHandler;
|
||||||
|
import org.geysermc.mcprotocollib.network.tcp.TcpFlowControlHandler;
|
||||||
import org.geysermc.mcprotocollib.network.tcp.TcpPacketCodec;
|
import org.geysermc.mcprotocollib.network.tcp.TcpPacketCodec;
|
||||||
|
import org.geysermc.mcprotocollib.network.tcp.TcpPacketCompression;
|
||||||
|
import org.geysermc.mcprotocollib.network.tcp.TcpPacketEncryptor;
|
||||||
import org.geysermc.mcprotocollib.network.tcp.TcpPacketSizer;
|
import org.geysermc.mcprotocollib.network.tcp.TcpPacketSizer;
|
||||||
import org.geysermc.mcprotocollib.network.tcp.TcpSession;
|
import org.geysermc.mcprotocollib.network.tcp.TcpSession;
|
||||||
import org.geysermc.mcprotocollib.protocol.codec.MinecraftCodecHelper;
|
import org.geysermc.mcprotocollib.protocol.codec.MinecraftCodecHelper;
|
||||||
|
@ -43,6 +57,7 @@ import org.geysermc.mcprotocollib.protocol.codec.MinecraftCodecHelper;
|
||||||
import java.net.Inet4Address;
|
import java.net.Inet4Address;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -72,44 +87,53 @@ public final class LocalSession extends TcpSession {
|
||||||
if (DEFAULT_EVENT_LOOP_GROUP == null) {
|
if (DEFAULT_EVENT_LOOP_GROUP == null) {
|
||||||
DEFAULT_EVENT_LOOP_GROUP = new DefaultEventLoopGroup(new DefaultThreadFactory(this.getClass(), true));
|
DEFAULT_EVENT_LOOP_GROUP = new DefaultEventLoopGroup(new DefaultThreadFactory(this.getClass(), true));
|
||||||
Runtime.getRuntime().addShutdownHook(new Thread(
|
Runtime.getRuntime().addShutdownHook(new Thread(
|
||||||
() -> DEFAULT_EVENT_LOOP_GROUP.shutdownGracefully(100, 500, TimeUnit.MILLISECONDS)));
|
() -> DEFAULT_EVENT_LOOP_GROUP.shutdownGracefully(100, 500, TimeUnit.MILLISECONDS)));
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
final Bootstrap bootstrap = new Bootstrap();
|
||||||
final Bootstrap bootstrap = new Bootstrap();
|
bootstrap.channel(LocalChannelWithRemoteAddress.class);
|
||||||
bootstrap.channel(LocalChannelWithRemoteAddress.class);
|
bootstrap.handler(new ChannelInitializer<LocalChannelWithRemoteAddress>() {
|
||||||
bootstrap.handler(new ChannelInitializer<LocalChannelWithRemoteAddress>() {
|
@Override
|
||||||
@Override
|
public void initChannel(@NonNull LocalChannelWithRemoteAddress channel) {
|
||||||
public void initChannel(@NonNull LocalChannelWithRemoteAddress channel) {
|
channel.spoofedRemoteAddress(new InetSocketAddress(clientIp, 0));
|
||||||
channel.spoofedRemoteAddress(new InetSocketAddress(clientIp, 0));
|
PacketProtocol protocol = getPacketProtocol();
|
||||||
PacketProtocol protocol = getPacketProtocol();
|
protocol.newClientSession(LocalSession.this, transferring);
|
||||||
protocol.newClientSession(LocalSession.this, transferring);
|
|
||||||
|
|
||||||
refreshReadTimeoutHandler(channel);
|
ChannelPipeline pipeline = channel.pipeline();
|
||||||
refreshWriteTimeoutHandler(channel);
|
|
||||||
|
|
||||||
ChannelPipeline pipeline = channel.pipeline();
|
initializeHAProxySupport(channel);
|
||||||
pipeline.addLast("sizer", new TcpPacketSizer(LocalSession.this, protocol.getPacketHeader().getLengthSize()));
|
|
||||||
pipeline.addLast("codec", new TcpPacketCodec(LocalSession.this, true));
|
|
||||||
pipeline.addLast("manager", LocalSession.this);
|
|
||||||
|
|
||||||
addHAProxySupport(pipeline);
|
pipeline.addLast("read-timeout", new ReadTimeoutHandler(getFlag(BuiltinFlags.READ_TIMEOUT, 30)));
|
||||||
}
|
pipeline.addLast("write-timeout", new WriteTimeoutHandler(getFlag(BuiltinFlags.WRITE_TIMEOUT, 0)));
|
||||||
}).group(DEFAULT_EVENT_LOOP_GROUP).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout() * 1000);
|
|
||||||
|
|
||||||
if (PREFERRED_DIRECT_BYTE_BUF_ALLOCATOR != null) {
|
pipeline.addLast("encryption", new TcpPacketEncryptor());
|
||||||
bootstrap.option(ChannelOption.ALLOCATOR, PREFERRED_DIRECT_BYTE_BUF_ALLOCATOR);
|
pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), getCodecHelper()));
|
||||||
|
pipeline.addLast("compression", new TcpPacketCompression(getCodecHelper()));
|
||||||
|
|
||||||
|
pipeline.addLast("flow-control", new TcpFlowControlHandler());
|
||||||
|
pipeline.addLast("codec", new TcpPacketCodec(LocalSession.this, true));
|
||||||
|
pipeline.addLast("flush-handler", new FlushHandler());
|
||||||
|
pipeline.addLast("manager", LocalSession.this);
|
||||||
|
}
|
||||||
|
}).group(DEFAULT_EVENT_LOOP_GROUP).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getFlag(BuiltinFlags.CLIENT_CONNECT_TIMEOUT, 30) * 1000);
|
||||||
|
|
||||||
|
if (PREFERRED_DIRECT_BYTE_BUF_ALLOCATOR != null) {
|
||||||
|
bootstrap.option(ChannelOption.ALLOCATOR, PREFERRED_DIRECT_BYTE_BUF_ALLOCATOR);
|
||||||
|
}
|
||||||
|
|
||||||
|
bootstrap.remoteAddress(targetAddress);
|
||||||
|
|
||||||
|
CompletableFuture<Void> handleFuture = new CompletableFuture<>();
|
||||||
|
bootstrap.connect().addListener((futureListener) -> {
|
||||||
|
if (!futureListener.isSuccess()) {
|
||||||
|
exceptionCaught(null, futureListener.cause());
|
||||||
}
|
}
|
||||||
|
|
||||||
bootstrap.remoteAddress(targetAddress);
|
handleFuture.complete(null);
|
||||||
|
});
|
||||||
|
|
||||||
bootstrap.connect().addListener((future) -> {
|
if (wait) {
|
||||||
if (!future.isSuccess()) {
|
handleFuture.join();
|
||||||
exceptionCaught(null, future.cause());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} catch (Throwable t) {
|
|
||||||
exceptionCaught(null, t);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,32 +142,20 @@ public final class LocalSession extends TcpSession {
|
||||||
return (MinecraftCodecHelper) this.codecHelper;
|
return (MinecraftCodecHelper) this.codecHelper;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO duplicate code
|
private void initializeHAProxySupport(Channel channel) {
|
||||||
private void addHAProxySupport(ChannelPipeline pipeline) {
|
|
||||||
InetSocketAddress clientAddress = getFlag(BuiltinFlags.CLIENT_PROXIED_ADDRESS);
|
InetSocketAddress clientAddress = getFlag(BuiltinFlags.CLIENT_PROXIED_ADDRESS);
|
||||||
if (getFlag(BuiltinFlags.ENABLE_CLIENT_PROXY_PROTOCOL, false) && clientAddress != null) {
|
if (clientAddress == null) {
|
||||||
pipeline.addFirst("proxy-protocol-packet-sender", new ChannelInboundHandlerAdapter() {
|
return;
|
||||||
@Override
|
|
||||||
public void channelActive(@NonNull ChannelHandlerContext ctx) throws Exception {
|
|
||||||
HAProxyProxiedProtocol proxiedProtocol = clientAddress.getAddress() instanceof Inet4Address ? HAProxyProxiedProtocol.TCP4 : HAProxyProxiedProtocol.TCP6;
|
|
||||||
InetSocketAddress remoteAddress;
|
|
||||||
if (ctx.channel().remoteAddress() instanceof InetSocketAddress) {
|
|
||||||
remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
|
|
||||||
} else {
|
|
||||||
remoteAddress = new InetSocketAddress(host, port);
|
|
||||||
}
|
|
||||||
ctx.channel().writeAndFlush(new HAProxyMessage(
|
|
||||||
HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol,
|
|
||||||
clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(),
|
|
||||||
clientAddress.getPort(), remoteAddress.getPort()
|
|
||||||
));
|
|
||||||
ctx.pipeline().remove(this);
|
|
||||||
ctx.pipeline().remove("proxy-protocol-encoder");
|
|
||||||
super.channelActive(ctx);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
pipeline.addFirst("proxy-protocol-encoder", HAProxyMessageEncoder.INSTANCE);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
channel.pipeline().addLast("proxy-protocol-encoder", HAProxyMessageEncoder.INSTANCE);
|
||||||
|
HAProxyProxiedProtocol proxiedProtocol = clientAddress.getAddress() instanceof Inet4Address ? HAProxyProxiedProtocol.TCP4 : HAProxyProxiedProtocol.TCP6;
|
||||||
|
InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
|
||||||
|
channel.writeAndFlush(new HAProxyMessage(
|
||||||
|
HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol,
|
||||||
|
clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(),
|
||||||
|
clientAddress.getPort(), remoteAddress.getPort()
|
||||||
|
)).addListener(future -> channel.pipeline().remove("proxy-protocol-encoder"));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -963,8 +963,6 @@ public class GeyserSession implements GeyserConnection, GeyserCommandSource {
|
||||||
// Start ticking
|
// Start ticking
|
||||||
tickThread = eventLoop.scheduleAtFixedRate(this::tick, 50, 50, TimeUnit.MILLISECONDS);
|
tickThread = eventLoop.scheduleAtFixedRate(this::tick, 50, 50, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
this.protocol.setUseDefaultListeners(false);
|
|
||||||
|
|
||||||
TcpSession downstream;
|
TcpSession downstream;
|
||||||
if (geyser.getBootstrap().getSocketAddress() != null) {
|
if (geyser.getBootstrap().getSocketAddress() != null) {
|
||||||
// We're going to connect through the JVM and not through TCP
|
// We're going to connect through the JVM and not through TCP
|
||||||
|
@ -990,7 +988,6 @@ public class GeyserSession implements GeyserConnection, GeyserCommandSource {
|
||||||
this.downstream.getSession().setFlag(MinecraftConstants.FOLLOW_TRANSFERS, false);
|
this.downstream.getSession().setFlag(MinecraftConstants.FOLLOW_TRANSFERS, false);
|
||||||
|
|
||||||
if (geyser.getConfig().getRemote().isUseProxyProtocol()) {
|
if (geyser.getConfig().getRemote().isUseProxyProtocol()) {
|
||||||
downstream.setFlag(BuiltinFlags.ENABLE_CLIENT_PROXY_PROTOCOL, true);
|
|
||||||
downstream.setFlag(BuiltinFlags.CLIENT_PROXIED_ADDRESS, upstream.getAddress());
|
downstream.setFlag(BuiltinFlags.CLIENT_PROXIED_ADDRESS, upstream.getAddress());
|
||||||
}
|
}
|
||||||
if (geyser.getConfig().isForwardPlayerPing()) {
|
if (geyser.getConfig().isForwardPlayerPing()) {
|
||||||
|
@ -1000,22 +997,6 @@ public class GeyserSession implements GeyserConnection, GeyserCommandSource {
|
||||||
// We'll handle this since we have the registry data on hand
|
// We'll handle this since we have the registry data on hand
|
||||||
downstream.setFlag(MinecraftConstants.SEND_BLANK_KNOWN_PACKS_RESPONSE, false);
|
downstream.setFlag(MinecraftConstants.SEND_BLANK_KNOWN_PACKS_RESPONSE, false);
|
||||||
|
|
||||||
// This isn't a great solution, but... we want to make sure the finish configuration packet cannot be sent
|
|
||||||
// before the KnownPacks packet.
|
|
||||||
this.downstream.getSession().addListener(new ClientListener(ProtocolState.LOGIN, loginEvent.transferring()) {
|
|
||||||
@Override
|
|
||||||
public void packetReceived(Session session, Packet packet) {
|
|
||||||
if (protocol.getState() == ProtocolState.CONFIGURATION) {
|
|
||||||
if (packet instanceof ClientboundFinishConfigurationPacket) {
|
|
||||||
// Prevent
|
|
||||||
GeyserSession.this.ensureInEventLoop(() -> GeyserSession.this.sendDownstreamPacket(new ServerboundFinishConfigurationPacket()));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
super.packetReceived(session, packet);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
downstream.addListener(new SessionAdapter() {
|
downstream.addListener(new SessionAdapter() {
|
||||||
@Override
|
@Override
|
||||||
public void packetSending(PacketSendingEvent event) {
|
public void packetSending(PacketSendingEvent event) {
|
||||||
|
@ -1788,8 +1769,8 @@ public class GeyserSession implements GeyserConnection, GeyserCommandSource {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (protocol.getState() != intendedState) {
|
if (protocol.getOutboundState() != intendedState) {
|
||||||
geyser.getLogger().debug("Tried to send " + packet.getClass().getSimpleName() + " packet while not in " + intendedState.name() + " state");
|
geyser.getLogger().debug("Tried to send " + packet.getClass().getSimpleName() + " packet while not in " + intendedState.name() + " outbound state");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1823,7 +1804,7 @@ public class GeyserSession implements GeyserConnection, GeyserCommandSource {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendDownstreamPacket0(Packet packet) {
|
private void sendDownstreamPacket0(Packet packet) {
|
||||||
ProtocolState state = protocol.getState();
|
ProtocolState state = protocol.getOutboundState();
|
||||||
if (state == ProtocolState.GAME || state == ProtocolState.CONFIGURATION || packet.getClass() == ServerboundCustomQueryAnswerPacket.class) {
|
if (state == ProtocolState.GAME || state == ProtocolState.CONFIGURATION || packet.getClass() == ServerboundCustomQueryAnswerPacket.class) {
|
||||||
downstream.sendPacket(packet);
|
downstream.sendPacket(packet);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -15,7 +15,7 @@ protocol-common = "3.0.0.Beta5-20240916.181041-6"
|
||||||
protocol-codec = "3.0.0.Beta5-20240916.181041-6"
|
protocol-codec = "3.0.0.Beta5-20240916.181041-6"
|
||||||
raknet = "1.0.0.CR3-20240416.144209-1"
|
raknet = "1.0.0.CR3-20240416.144209-1"
|
||||||
minecraftauth = "4.1.1"
|
minecraftauth = "4.1.1"
|
||||||
mcprotocollib = "1.21-20240725.013034-16"
|
mcprotocollib = "1.21-20241008.134549-23"
|
||||||
adventure = "4.14.0"
|
adventure = "4.14.0"
|
||||||
adventure-platform = "4.3.0"
|
adventure-platform = "4.3.0"
|
||||||
junit = "5.9.2"
|
junit = "5.9.2"
|
||||||
|
|
Loading…
Reference in a new issue