mirror of
https://github.com/PaperMC/Paper.git
synced 2024-12-28 07:20:24 +01:00
Readd network optimization patch
This commit is contained in:
parent
7a108cda40
commit
663a0d893e
3 changed files with 276 additions and 245 deletions
|
@ -12,8 +12,8 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
||||||
--- a/src/main/java/net/minecraft/network/Connection.java
|
--- a/src/main/java/net/minecraft/network/Connection.java
|
||||||
+++ b/src/main/java/net/minecraft/network/Connection.java
|
+++ b/src/main/java/net/minecraft/network/Connection.java
|
||||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
// Paper end - Optimize network
|
||||||
|
|
||||||
+ private static final int MAX_PER_TICK = io.papermc.paper.configuration.GlobalConfiguration.get().misc.maxJoinsPerTick; // Paper
|
+ private static final int MAX_PER_TICK = io.papermc.paper.configuration.GlobalConfiguration.get().misc.maxJoinsPerTick; // Paper
|
||||||
+ private static int joinAttemptsThisTick; // Paper
|
+ private static int joinAttemptsThisTick; // Paper
|
||||||
|
|
|
@ -513,7 +513,7 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
||||||
+ return new DefaultEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Client IO #%d").setDaemon(true).setUncaughtExceptionHandler(new net.minecraft.DefaultUncaughtExceptionHandlerWithName(LOGGER)).build()); // Paper
|
+ return new DefaultEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Client IO #%d").setDaemon(true).setUncaughtExceptionHandler(new net.minecraft.DefaultUncaughtExceptionHandlerWithName(LOGGER)).build()); // Paper
|
||||||
});
|
});
|
||||||
private final PacketFlow receiving;
|
private final PacketFlow receiving;
|
||||||
private final Queue<Consumer<Connection>> pendingActions = Queues.newConcurrentLinkedQueue();
|
private final Queue<WrappedConsumer> pendingActions = Queues.newConcurrentLinkedQueue();
|
||||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,60 +32,258 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
||||||
--- a/src/main/java/net/minecraft/network/Connection.java
|
--- a/src/main/java/net/minecraft/network/Connection.java
|
||||||
+++ b/src/main/java/net/minecraft/network/Connection.java
|
+++ b/src/main/java/net/minecraft/network/Connection.java
|
||||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||||
public int protocolVersion;
|
return new DefaultEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Client IO #%d").setDaemon(true).build());
|
||||||
|
});
|
||||||
|
private final PacketFlow receiving;
|
||||||
|
- private final Queue<Consumer<Connection>> pendingActions = Queues.newConcurrentLinkedQueue();
|
||||||
|
+ private final Queue<WrappedConsumer> pendingActions = Queues.newConcurrentLinkedQueue();
|
||||||
|
public Channel channel;
|
||||||
|
public SocketAddress address;
|
||||||
|
// Spigot Start
|
||||||
|
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||||
public java.net.InetSocketAddress virtualHost;
|
public java.net.InetSocketAddress virtualHost;
|
||||||
private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush");
|
private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush");
|
||||||
+ // Optimize network
|
|
||||||
+ public boolean isPending = true;
|
|
||||||
+ public boolean queueImmunity = false;
|
|
||||||
+ public ConnectionProtocol protocol;
|
|
||||||
// Paper end
|
// Paper end
|
||||||
|
+ // Paper start - Optimize network
|
||||||
|
+ public boolean isPending = true;
|
||||||
|
+ public boolean queueImmunity;
|
||||||
|
+ // Paper end - Optimize network
|
||||||
|
|
||||||
public Connection(PacketFlow side) {
|
// Paper start - add utility methods
|
||||||
|
public final net.minecraft.server.level.ServerPlayer getPlayer() {
|
||||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setProtocol(ConnectionProtocol state) {
|
public void send(Packet<?> packet, @Nullable PacketSendListener callbacks, boolean flush) {
|
||||||
+ protocol = state; // Paper
|
- if (this.isConnected()) {
|
||||||
this.channel.attr(Connection.ATTRIBUTE_PROTOCOL).set(state);
|
- this.flushQueue();
|
||||||
this.channel.attr(BundlerInfo.BUNDLER_PROVIDER).set(state);
|
+ // Paper start - Optimize network: Handle oversized packets better
|
||||||
this.channel.config().setAutoRead(true);
|
+ final boolean connected = this.isConnected();
|
||||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
+ if (!connected && !this.preparing) {
|
||||||
Validate.notNull(listener, "packetListener", new Object[0]);
|
+ return;
|
||||||
this.packetListener = listener;
|
|
||||||
}
|
|
||||||
+ // Paper start
|
|
||||||
+ public @Nullable net.minecraft.server.level.ServerPlayer getPlayer() {
|
|
||||||
+ if (packetListener instanceof net.minecraft.server.network.ServerGamePacketListenerImpl serverGamePacketListener) {
|
|
||||||
+ return serverGamePacketListener.player;
|
|
||||||
+ } else {
|
|
||||||
+ return null;
|
|
||||||
+ }
|
+ }
|
||||||
|
+
|
||||||
|
+ packet.onPacketDispatch(this.getPlayer());
|
||||||
|
+ if (connected && (InnerUtil.canSendImmediate(this, packet)
|
||||||
|
+ || (io.papermc.paper.util.MCUtil.isMainThread() && packet.isReady() && this.pendingActions.isEmpty()
|
||||||
|
+ && (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())))) {
|
||||||
|
this.sendPacket(packet, callbacks, flush);
|
||||||
|
} else {
|
||||||
|
- this.pendingActions.add((networkmanager) -> {
|
||||||
|
- networkmanager.sendPacket(packet, callbacks, flush);
|
||||||
|
- });
|
||||||
|
- }
|
||||||
|
+ // Write the packets to the queue, then flush - antixray hooks there already
|
||||||
|
+ final java.util.List<Packet<?>> extraPackets = InnerUtil.buildExtraPackets(packet);
|
||||||
|
+ final boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
|
||||||
|
+ if (!hasExtraPackets) {
|
||||||
|
+ this.pendingActions.add(new PacketSendAction(packet, callbacks, flush));
|
||||||
|
+ } else {
|
||||||
|
+ final java.util.List<PacketSendAction> actions = new java.util.ArrayList<>(1 + extraPackets.size());
|
||||||
|
+ actions.add(new PacketSendAction(packet, null, false)); // Delay the future listener until the end of the extra packets
|
||||||
|
|
||||||
|
+ for (int i = 0, len = extraPackets.size(); i < len;) {
|
||||||
|
+ final Packet<?> extraPacket = extraPackets.get(i);
|
||||||
|
+ final boolean end = ++i == len;
|
||||||
|
+ actions.add(new PacketSendAction(extraPacket, end ? callbacks : null, end)); // Append listener to the end
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ this.pendingActions.addAll(actions);
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ this.flushQueue();
|
||||||
|
+ // Paper end - Optimize network
|
||||||
|
+ }
|
||||||
|
}
|
||||||
|
|
||||||
|
public void runOnceConnected(Consumer<Connection> task) {
|
||||||
|
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||||
|
this.flushQueue();
|
||||||
|
task.accept(this);
|
||||||
|
} else {
|
||||||
|
- this.pendingActions.add(task);
|
||||||
|
+ this.pendingActions.add(new WrappedConsumer(task)); // Paper - Optimize network
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doSendPacket(Packet<?> packet, @Nullable PacketSendListener callbacks, boolean flush) {
|
||||||
|
+ // Paper start - Optimize network
|
||||||
|
+ final net.minecraft.server.level.ServerPlayer player = this.getPlayer();
|
||||||
|
+ if (!this.isConnected()) {
|
||||||
|
+ packet.onPacketDispatchFinish(player, null);
|
||||||
|
+ return;
|
||||||
|
+ }
|
||||||
|
+ try {
|
||||||
|
+ // Paper end - Optimize network
|
||||||
|
ChannelFuture channelfuture = flush ? this.channel.writeAndFlush(packet) : this.channel.write(packet);
|
||||||
|
|
||||||
|
if (callbacks != null) {
|
||||||
|
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
+ // Paper start - Optimize network
|
||||||
|
+ if (packet.hasFinishListener()) {
|
||||||
|
+ channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture));
|
||||||
|
+ }
|
||||||
|
channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
|
||||||
|
+ } catch (final Exception e) {
|
||||||
|
+ LOGGER.error("NetworkException: {}", player, e);
|
||||||
|
+ this.disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage()));
|
||||||
|
+ packet.onPacketDispatchFinish(player, null);
|
||||||
|
+ }
|
||||||
|
+ // Paper end - Optimize network
|
||||||
|
}
|
||||||
|
|
||||||
|
public void flushChannel() {
|
||||||
|
if (this.isConnected()) {
|
||||||
|
this.flush();
|
||||||
|
} else {
|
||||||
|
- this.pendingActions.add(Connection::flush);
|
||||||
|
+ this.pendingActions.add(new WrappedConsumer(Connection::flush)); // Paper - Optimize network
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||||
|
return attributekey;
|
||||||
|
}
|
||||||
|
|
||||||
|
- private void flushQueue() {
|
||||||
|
- if (this.channel != null && this.channel.isOpen()) {
|
||||||
|
- Queue queue = this.pendingActions;
|
||||||
|
-
|
||||||
|
+ // Paper start - Optimize network: Rewrite this to be safer if ran off main thread
|
||||||
|
+ private boolean flushQueue() {
|
||||||
|
+ if (!this.isConnected()) {
|
||||||
|
+ return true;
|
||||||
|
+ }
|
||||||
|
+ if (io.papermc.paper.util.MCUtil.isMainThread()) {
|
||||||
|
+ return this.processQueue();
|
||||||
|
+ } else if (this.isPending) {
|
||||||
|
+ // Should only happen during login/status stages
|
||||||
|
synchronized (this.pendingActions) {
|
||||||
|
- Consumer consumer;
|
||||||
|
+ return this.processQueue();
|
||||||
|
+ }
|
||||||
|
+ }
|
||||||
|
+ return false;
|
||||||
+ }
|
+ }
|
||||||
+ private static class InnerUtil { // Attempt to hide these methods from ProtocolLib so it doesn't accidently pick them up.
|
+
|
||||||
+ private static java.util.List<Packet> buildExtraPackets(Packet packet) {
|
+ private boolean processQueue() {
|
||||||
+ java.util.List<Packet> extra = packet.getExtraPackets();
|
+ if (this.pendingActions.isEmpty()) {
|
||||||
|
+ return true;
|
||||||
|
+ }
|
||||||
|
|
||||||
|
- while ((consumer = (Consumer) this.pendingActions.poll()) != null) {
|
||||||
|
- consumer.accept(this);
|
||||||
|
+ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
|
||||||
|
+ // But if we are not on main due to login/status, the parent is synchronized on packetQueue
|
||||||
|
+ final java.util.Iterator<WrappedConsumer> iterator = this.pendingActions.iterator();
|
||||||
|
+ while (iterator.hasNext()) {
|
||||||
|
+ final WrappedConsumer queued = iterator.next(); // poll -> peek
|
||||||
|
+
|
||||||
|
+ // Fix NPE (Spigot bug caused by handleDisconnection())
|
||||||
|
+ if (queued == null) {
|
||||||
|
+ return true;
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ if (queued.isConsumed()) {
|
||||||
|
+ continue;
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ if (queued instanceof PacketSendAction packetSendAction) {
|
||||||
|
+ final Packet<?> packet = packetSendAction.packet;
|
||||||
|
+ if (!packet.isReady()) {
|
||||||
|
+ return false;
|
||||||
|
}
|
||||||
|
+ }
|
||||||
|
|
||||||
|
+ iterator.remove();
|
||||||
|
+ if (queued.tryMarkConsumed()) {
|
||||||
|
+ queued.accept(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
+ return true;
|
||||||
|
}
|
||||||
|
+ // Paper end - Optimize network
|
||||||
|
|
||||||
|
public void tick() {
|
||||||
|
this.flushQueue();
|
||||||
|
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||||
|
public void disconnect(Component disconnectReason) {
|
||||||
|
// Spigot Start
|
||||||
|
this.preparing = false;
|
||||||
|
+ this.clearPacketQueue(); // Paper - Optimize network
|
||||||
|
// Spigot End
|
||||||
|
if (this.channel == null) {
|
||||||
|
this.delayedDisconnect = disconnectReason;
|
||||||
|
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||||
|
public void handleDisconnection() {
|
||||||
|
if (this.channel != null && !this.channel.isOpen()) {
|
||||||
|
if (this.disconnectionHandled) {
|
||||||
|
- Connection.LOGGER.warn("handleDisconnection() called twice");
|
||||||
|
+ // Connection.LOGGER.warn("handleDisconnection() called twice"); // Paper - Don't log useless message
|
||||||
|
} else {
|
||||||
|
this.disconnectionHandled = true;
|
||||||
|
PacketListener packetlistener = this.getPacketListener();
|
||||||
|
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||||
|
|
||||||
|
packetlistener1.onDisconnect(ichatbasecomponent);
|
||||||
|
}
|
||||||
|
- this.pendingActions.clear(); // Free up packet queue.
|
||||||
|
+ this.clearPacketQueue(); // Paper - Optimize network
|
||||||
|
// Paper start - Add PlayerConnectionCloseEvent
|
||||||
|
final PacketListener packetListener = this.getPacketListener();
|
||||||
|
if (packetListener instanceof net.minecraft.server.network.ServerGamePacketListenerImpl) {
|
||||||
|
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||||
|
public void setBandwidthLogger(SampleLogger log) {
|
||||||
|
this.bandwidthDebugMonitor = new BandwidthDebugMonitor(log);
|
||||||
|
}
|
||||||
|
+
|
||||||
|
+ // Paper start - Optimize network
|
||||||
|
+ public void clearPacketQueue() {
|
||||||
|
+ final net.minecraft.server.level.ServerPlayer player = getPlayer();
|
||||||
|
+ for (final Consumer<Connection> queuedAction : this.pendingActions) {
|
||||||
|
+ if (queuedAction instanceof PacketSendAction packetSendAction) {
|
||||||
|
+ final Packet<?> packet = packetSendAction.packet;
|
||||||
|
+ if (packet.hasFinishListener()) {
|
||||||
|
+ packet.onPacketDispatchFinish(player, null);
|
||||||
|
+ }
|
||||||
|
+ }
|
||||||
|
+ }
|
||||||
|
+ this.pendingActions.clear();
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ private static class InnerUtil { // Attempt to hide these methods from ProtocolLib, so it doesn't accidently pick them up.
|
||||||
|
+
|
||||||
|
+ @Nullable
|
||||||
|
+ private static java.util.List<Packet<?>> buildExtraPackets(final Packet<?> packet) {
|
||||||
|
+ final java.util.List<Packet<?>> extra = packet.getExtraPackets();
|
||||||
+ if (extra == null || extra.isEmpty()) {
|
+ if (extra == null || extra.isEmpty()) {
|
||||||
+ return null;
|
+ return null;
|
||||||
+ }
|
+ }
|
||||||
+ java.util.List<Packet> ret = new java.util.ArrayList<>(1 + extra.size());
|
+
|
||||||
|
+ final java.util.List<Packet<?>> ret = new java.util.ArrayList<>(1 + extra.size());
|
||||||
+ buildExtraPackets0(extra, ret);
|
+ buildExtraPackets0(extra, ret);
|
||||||
+ return ret;
|
+ return ret;
|
||||||
+ }
|
+ }
|
||||||
+
|
+
|
||||||
+ private static void buildExtraPackets0(java.util.List<Packet> extraPackets, java.util.List<Packet> into) {
|
+ private static void buildExtraPackets0(final java.util.List<Packet<?>> extraPackets, final java.util.List<Packet<?>> into) {
|
||||||
+ for (Packet extra : extraPackets) {
|
+ for (final Packet<?> extra : extraPackets) {
|
||||||
+ into.add(extra);
|
+ into.add(extra);
|
||||||
+ java.util.List<Packet> extraExtra = extra.getExtraPackets();
|
+ final java.util.List<Packet<?>> extraExtra = extra.getExtraPackets();
|
||||||
+ if (extraExtra != null && !extraExtra.isEmpty()) {
|
+ if (extraExtra != null && !extraExtra.isEmpty()) {
|
||||||
+ buildExtraPackets0(extraExtra, into);
|
+ buildExtraPackets0(extraExtra, into);
|
||||||
+ }
|
+ }
|
||||||
+ }
|
+ }
|
||||||
+ }
|
+ }
|
||||||
+ // Paper start
|
+
|
||||||
+ private static boolean canSendImmediate(Connection networkManager, Packet<?> packet) {
|
+ private static boolean canSendImmediate(final Connection networkManager, final net.minecraft.network.protocol.Packet<?> packet) {
|
||||||
+ return networkManager.isPending || networkManager.protocol != ConnectionProtocol.PLAY ||
|
+ return networkManager.isPending || networkManager.packetListener.protocol() != ConnectionProtocol.PLAY ||
|
||||||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundKeepAlivePacket ||
|
+ packet instanceof net.minecraft.network.protocol.common.ClientboundKeepAlivePacket ||
|
||||||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket ||
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket ||
|
||||||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket ||
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket ||
|
||||||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket ||
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket ||
|
||||||
|
@ -96,218 +294,40 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
||||||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket ||
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket ||
|
||||||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket;
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket;
|
||||||
+ }
|
+ }
|
||||||
+ // Paper end
|
|
||||||
+ }
|
+ }
|
||||||
+ // Paper end
|
+
|
||||||
|
+ private static class WrappedConsumer implements Consumer<Connection> {
|
||||||
public void send(Packet<?> packet) {
|
+ private final Consumer<Connection> delegate;
|
||||||
this.send(packet, (PacketSendListener) null);
|
+ private final java.util.concurrent.atomic.AtomicBoolean consumed = new java.util.concurrent.atomic.AtomicBoolean(false);
|
||||||
}
|
+
|
||||||
|
+ private WrappedConsumer(final Consumer<Connection> delegate) {
|
||||||
public void send(Packet<?> packet, @Nullable PacketSendListener callbacks) {
|
+ this.delegate = delegate;
|
||||||
- if (this.isConnected()) {
|
|
||||||
- this.flushQueue();
|
|
||||||
+ // Paper start - handle oversized packets better
|
|
||||||
+ boolean connected = this.isConnected();
|
|
||||||
+ if (!connected && !preparing) {
|
|
||||||
+ return; // Do nothing
|
|
||||||
+ }
|
|
||||||
+ packet.onPacketDispatch(getPlayer());
|
|
||||||
+ if (connected && (InnerUtil.canSendImmediate(this, packet) || (
|
|
||||||
+ io.papermc.paper.util.MCUtil.isMainThread() && packet.isReady() && this.queue.isEmpty() &&
|
|
||||||
+ (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())
|
|
||||||
+ ))) {
|
|
||||||
this.sendPacket(packet, callbacks);
|
|
||||||
- } else {
|
|
||||||
- this.queue.add(new Connection.PacketHolder(packet, callbacks));
|
|
||||||
+ return;
|
|
||||||
}
|
|
||||||
+ // write the packets to the queue, then flush - antixray hooks there already
|
|
||||||
+ java.util.List<Packet> extraPackets = InnerUtil.buildExtraPackets(packet);
|
|
||||||
+ boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
|
|
||||||
+ if (!hasExtraPackets) {
|
|
||||||
+ this.queue.add(new Connection.PacketHolder(packet, callbacks));
|
|
||||||
+ } else {
|
|
||||||
+ java.util.List<Connection.PacketHolder> packets = new java.util.ArrayList<>(1 + extraPackets.size());
|
|
||||||
+ packets.add(new Connection.PacketHolder(packet, null)); // delay the future listener until the end of the extra packets
|
|
||||||
|
|
||||||
+ for (int i = 0, len = extraPackets.size(); i < len;) {
|
|
||||||
+ Packet extra = extraPackets.get(i);
|
|
||||||
+ boolean end = ++i == len;
|
|
||||||
+ packets.add(new Connection.PacketHolder(extra, end ? callbacks : null)); // append listener to the end
|
|
||||||
+ }
|
|
||||||
+ this.queue.addAll(packets); // atomic
|
|
||||||
+ }
|
|
||||||
+ this.flushQueue();
|
|
||||||
+ // Paper end
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendPacket(Packet<?> packet, @Nullable PacketSendListener callbacks) {
|
|
||||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
||||||
this.setProtocol(packetState);
|
|
||||||
}
|
|
||||||
|
|
||||||
+ // Paper start
|
|
||||||
+ net.minecraft.server.level.ServerPlayer player = getPlayer();
|
|
||||||
+ if (!isConnected()) {
|
|
||||||
+ packet.onPacketDispatchFinish(player, null);
|
|
||||||
+ return;
|
|
||||||
+ }
|
+ }
|
||||||
+
|
+
|
||||||
+ try {
|
+ @Override
|
||||||
+ // Paper end
|
+ public void accept(final Connection connection) {
|
||||||
ChannelFuture channelfuture = this.channel.writeAndFlush(packet);
|
+ this.delegate.accept(connection);
|
||||||
|
|
||||||
if (callbacks != null) {
|
|
||||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
||||||
|
|
||||||
});
|
|
||||||
}
|
|
||||||
+ // Paper start
|
|
||||||
+ if (packet.hasFinishListener()) {
|
|
||||||
+ channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture));
|
|
||||||
+ }
|
+ }
|
||||||
+ // Paper end
|
|
||||||
|
|
||||||
channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
|
|
||||||
+ // Paper start
|
|
||||||
+ } catch (Exception e) {
|
|
||||||
+ LOGGER.error("NetworkException: " + player, e);
|
|
||||||
+ disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage()));
|
|
||||||
+ packet.onPacketDispatchFinish(player, null);
|
|
||||||
+ }
|
|
||||||
+ // Paper end
|
|
||||||
}
|
|
||||||
|
|
||||||
private ConnectionProtocol getCurrentProtocol() {
|
|
||||||
return (ConnectionProtocol) this.channel.attr(Connection.ATTRIBUTE_PROTOCOL).get();
|
|
||||||
}
|
|
||||||
|
|
||||||
- private void flushQueue() {
|
|
||||||
+ // Paper start - rewrite this to be safer if ran off main thread
|
|
||||||
+ private boolean flushQueue() { // void -> boolean
|
|
||||||
+ if (!isConnected()) {
|
|
||||||
+ return true;
|
|
||||||
+ }
|
|
||||||
+ if (io.papermc.paper.util.MCUtil.isMainThread()) {
|
|
||||||
+ return processQueue();
|
|
||||||
+ } else if (isPending) {
|
|
||||||
+ // Should only happen during login/status stages
|
|
||||||
+ synchronized (this.queue) {
|
|
||||||
+ return this.processQueue();
|
|
||||||
+ }
|
|
||||||
+ }
|
|
||||||
+ return false;
|
|
||||||
+ }
|
|
||||||
+ private boolean processQueue() {
|
|
||||||
try { // Paper - add pending task queue
|
|
||||||
- if (this.channel != null && this.channel.isOpen()) {
|
|
||||||
- Queue queue = this.queue;
|
|
||||||
+ if (this.queue.isEmpty()) return true;
|
|
||||||
+ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
|
|
||||||
+ // But if we are not on main due to login/status, the parent is synchronized on packetQueue
|
|
||||||
+ java.util.Iterator<PacketHolder> iterator = this.queue.iterator();
|
|
||||||
+ while (iterator.hasNext()) {
|
|
||||||
+ PacketHolder queued = iterator.next(); // poll -> peek
|
|
||||||
+
|
|
||||||
+ // Fix NPE (Spigot bug caused by handleDisconnection())
|
|
||||||
+ if (queued == null) {
|
|
||||||
+ return true;
|
|
||||||
+ }
|
|
||||||
|
|
||||||
- synchronized (this.queue) {
|
|
||||||
- Connection.PacketHolder networkmanager_queuedpacket;
|
|
||||||
+ // Paper start - checking isConsumed flag and skipping packet sending
|
|
||||||
+ if (queued.isConsumed()) {
|
|
||||||
+ continue;
|
|
||||||
+ }
|
|
||||||
+ // Paper end - checking isConsumed flag and skipping packet sending
|
|
||||||
|
|
||||||
- while ((networkmanager_queuedpacket = (Connection.PacketHolder) this.queue.poll()) != null) {
|
|
||||||
- this.sendPacket(networkmanager_queuedpacket.packet, networkmanager_queuedpacket.listener);
|
|
||||||
+ Packet<?> packet = queued.packet;
|
|
||||||
+ if (!packet.isReady()) {
|
|
||||||
+ return false;
|
|
||||||
+ } else {
|
|
||||||
+ iterator.remove();
|
|
||||||
+ if (queued.tryMarkConsumed()) { // Paper - try to mark isConsumed flag for de-duplicating packet
|
|
||||||
+ this.sendPacket(packet, queued.listener);
|
|
||||||
}
|
|
||||||
-
|
|
||||||
}
|
|
||||||
}
|
|
||||||
+ return true;
|
|
||||||
} finally { // Paper start - add pending task queue
|
|
||||||
Runnable r;
|
|
||||||
while ((r = this.pendingTasks.poll()) != null) {
|
|
||||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
||||||
}
|
|
||||||
} // Paper end - add pending task queue
|
|
||||||
}
|
|
||||||
+ // Paper end
|
|
||||||
|
|
||||||
public void tick() {
|
|
||||||
this.flushQueue();
|
|
||||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
||||||
return this.address;
|
|
||||||
}
|
|
||||||
|
|
||||||
+ // Paper start
|
|
||||||
+ public void clearPacketQueue() {
|
|
||||||
+ net.minecraft.server.level.ServerPlayer player = getPlayer();
|
|
||||||
+ queue.forEach(queuedPacket -> {
|
|
||||||
+ Packet<?> packet = queuedPacket.packet;
|
|
||||||
+ if (packet.hasFinishListener()) {
|
|
||||||
+ packet.onPacketDispatchFinish(player, null);
|
|
||||||
+ }
|
|
||||||
+ });
|
|
||||||
+ queue.clear();
|
|
||||||
+ }
|
|
||||||
+ // Paper end
|
|
||||||
public void disconnect(Component disconnectReason) {
|
|
||||||
// Spigot Start
|
|
||||||
this.preparing = false;
|
|
||||||
+ clearPacketQueue(); // Paper
|
|
||||||
// Spigot End
|
|
||||||
if (this.channel == null) {
|
|
||||||
this.delayedDisconnect = disconnectReason;
|
|
||||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
||||||
public void handleDisconnection() {
|
|
||||||
if (this.channel != null && !this.channel.isOpen()) {
|
|
||||||
if (this.disconnectionHandled) {
|
|
||||||
- Connection.LOGGER.warn("handleDisconnection() called twice");
|
|
||||||
+ //Connection.LOGGER.warn("handleDisconnection() called twice"); // Paper - Do not log useless message
|
|
||||||
} else {
|
|
||||||
this.disconnectionHandled = true;
|
|
||||||
if (this.getDisconnectedReason() != null) {
|
|
||||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
||||||
} else if (this.getPacketListener() != null) {
|
|
||||||
this.getPacketListener().onDisconnect(Component.translatable("multiplayer.disconnect.generic"));
|
|
||||||
}
|
|
||||||
- this.queue.clear(); // Free up packet queue.
|
|
||||||
+ clearPacketQueue(); // Paper
|
|
||||||
// Paper start - Add PlayerConnectionCloseEvent
|
|
||||||
final PacketListener packetListener = this.getPacketListener();
|
|
||||||
if (packetListener instanceof net.minecraft.server.network.ServerGamePacketListenerImpl) {
|
|
||||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
||||||
@Nullable
|
|
||||||
final PacketSendListener listener;
|
|
||||||
|
|
||||||
+ // Paper start - isConsumed flag for the connection
|
|
||||||
+ private java.util.concurrent.atomic.AtomicBoolean isConsumed = new java.util.concurrent.atomic.AtomicBoolean(false);
|
|
||||||
+
|
+
|
||||||
+ public boolean tryMarkConsumed() {
|
+ public boolean tryMarkConsumed() {
|
||||||
+ return isConsumed.compareAndSet(false, true);
|
+ return consumed.compareAndSet(false, true);
|
||||||
+ }
|
+ }
|
||||||
+
|
+
|
||||||
+ public boolean isConsumed() {
|
+ public boolean isConsumed() {
|
||||||
+ return isConsumed.get();
|
+ return consumed.get();
|
||||||
+ }
|
+ }
|
||||||
+ // Paper end - isConsumed flag for the connection
|
+ }
|
||||||
+
|
+
|
||||||
public PacketHolder(Packet<?> packet, @Nullable PacketSendListener callbacks) {
|
+ private static final class PacketSendAction extends WrappedConsumer {
|
||||||
this.packet = packet;
|
+ private final Packet<?> packet;
|
||||||
this.listener = callbacks;
|
+
|
||||||
|
+ private PacketSendAction(final Packet<?> packet, @Nullable final PacketSendListener packetSendListener, final boolean flush) {
|
||||||
|
+ super(connection -> connection.sendPacket(packet, packetSendListener, flush));
|
||||||
|
+ this.packet = packet;
|
||||||
|
+ }
|
||||||
|
+ }
|
||||||
|
+ // Paper end - Optimize network
|
||||||
|
}
|
||||||
diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java
|
diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java
|
||||||
index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 100644
|
index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 100644
|
||||||
--- a/src/main/java/net/minecraft/network/protocol/Packet.java
|
--- a/src/main/java/net/minecraft/network/protocol/Packet.java
|
||||||
|
@ -319,16 +339,27 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
||||||
+ /**
|
+ /**
|
||||||
+ * @param player Null if not at PLAY stage yet
|
+ * @param player Null if not at PLAY stage yet
|
||||||
+ */
|
+ */
|
||||||
+ default void onPacketDispatch(@javax.annotation.Nullable net.minecraft.server.level.ServerPlayer player) {}
|
+ default void onPacketDispatch(@Nullable net.minecraft.server.level.ServerPlayer player) {
|
||||||
|
+ }
|
||||||
+
|
+
|
||||||
+ /**
|
+ /**
|
||||||
+ * @param player Null if not at PLAY stage yet
|
+ * @param player Null if not at PLAY stage yet
|
||||||
+ * @param future Can be null if packet was cancelled
|
+ * @param future Can be null if packet was cancelled
|
||||||
+ */
|
+ */
|
||||||
+ default void onPacketDispatchFinish(@javax.annotation.Nullable net.minecraft.server.level.ServerPlayer player, @javax.annotation.Nullable io.netty.channel.ChannelFuture future) {}
|
+ default void onPacketDispatchFinish(@Nullable net.minecraft.server.level.ServerPlayer player, @Nullable io.netty.channel.ChannelFuture future) {}
|
||||||
+ default boolean hasFinishListener() { return false; }
|
+
|
||||||
+ default boolean isReady() { return true; }
|
+ default boolean hasFinishListener() {
|
||||||
+ default java.util.List<Packet> getExtraPackets() { return null; }
|
+ return false;
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ default boolean isReady() {
|
||||||
|
+ return true;
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ @Nullable
|
||||||
|
+ default java.util.List<Packet<?>> getExtraPackets() {
|
||||||
|
+ return null;
|
||||||
|
+ }
|
||||||
default boolean packetTooLarge(net.minecraft.network.Connection manager) {
|
default boolean packetTooLarge(net.minecraft.network.Connection manager) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -340,12 +371,12 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
||||||
final List<Connection> connections = Collections.synchronizedList(Lists.newArrayList());
|
final List<Connection> connections = Collections.synchronizedList(Lists.newArrayList());
|
||||||
// Paper start - prevent blocking on adding a new network manager while the server is ticking
|
// Paper start - prevent blocking on adding a new network manager while the server is ticking
|
||||||
private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>();
|
private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>();
|
||||||
+ private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper
|
+ private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper - Optimize network
|
||||||
private final void addPending() {
|
private final void addPending() {
|
||||||
Connection manager = null;
|
Connection manager = null;
|
||||||
while ((manager = pending.poll()) != null) {
|
while ((manager = pending.poll()) != null) {
|
||||||
connections.add(manager);
|
connections.add(manager);
|
||||||
+ manager.isPending = false;
|
+ manager.isPending = false; // Paper - Optimize network
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Paper end
|
// Paper end
|
||||||
|
@ -353,7 +384,7 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
||||||
;
|
;
|
||||||
}
|
}
|
||||||
|
|
||||||
+ if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper
|
+ if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper - Optimize network
|
||||||
ChannelPipeline channelpipeline = channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this));
|
ChannelPipeline channelpipeline = channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this.getServer()));
|
||||||
|
|
||||||
Connection.configureSerialization(channelpipeline, PacketFlow.SERVERBOUND);
|
Connection.configureSerialization(channelpipeline, PacketFlow.SERVERBOUND, (BandwidthDebugMonitor) null);
|
Loading…
Reference in a new issue