mirror of
https://github.com/PaperMC/Paper.git
synced 2024-12-27 23:10:16 +01:00
Update Optimize-Network-Manager
This commit is contained in:
parent
a148433e1e
commit
797abd11dd
1 changed files with 85 additions and 89 deletions
|
@ -27,34 +27,34 @@ and then catch exceptions and close if they fire.
|
|||
|
||||
Part of this commit was authored by: Spottedleaf, sandtechnology
|
||||
|
||||
diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
|
||||
index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 100644
|
||||
--- a/src/main/java/net/minecraft/network/Connection.java
|
||||
+++ b/src/main/java/net/minecraft/network/Connection.java
|
||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
diff --git a/net/minecraft/network/Connection.java b/net/minecraft/network/Connection.java
|
||||
index b624f001ba9d98c4dc68fcd66c0bc2de0a12308c..c4bb28857ee11dccc9924666634488044c666fd1 100644
|
||||
--- a/net/minecraft/network/Connection.java
|
||||
+++ b/net/minecraft/network/Connection.java
|
||||
@@ -85,7 +85,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
private static final ProtocolInfo<ServerHandshakePacketListener> INITIAL_PROTOCOL = HandshakeProtocols.SERVERBOUND;
|
||||
private final PacketFlow receiving;
|
||||
private volatile boolean sendLoginDisconnect = true;
|
||||
- private final Queue<Consumer<Connection>> pendingActions = Queues.newConcurrentLinkedQueue();
|
||||
+ private final Queue<WrappedConsumer> pendingActions = Queues.newConcurrentLinkedQueue(); // Paper
|
||||
+ private final Queue<WrappedConsumer> pendingActions = Queues.newConcurrentLinkedQueue(); // Paper - Optimize network
|
||||
public Channel channel;
|
||||
public SocketAddress address;
|
||||
// Spigot Start
|
||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
public java.net.InetSocketAddress virtualHost;
|
||||
private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush"); // Paper - Disable explicit network manager flushing
|
||||
// Paper end
|
||||
@@ -145,6 +145,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
}
|
||||
// Paper end - packet limiter
|
||||
@Nullable public SocketAddress haProxyAddress; // Paper - Add API to get player's proxy address
|
||||
+ // Paper start - Optimize network
|
||||
+ public boolean isPending = true;
|
||||
+ public boolean queueImmunity;
|
||||
+ // Paper end - Optimize network
|
||||
|
||||
// Paper start - add utility methods
|
||||
public final net.minecraft.server.level.ServerPlayer getPlayer() {
|
||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
public Connection(PacketFlow receiving) {
|
||||
this.receiving = receiving;
|
||||
@@ -425,11 +429,38 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
}
|
||||
|
||||
public void send(Packet<?> packet, @Nullable PacketSendListener callbacks, boolean flush) {
|
||||
public void send(Packet<?> packet, @Nullable PacketSendListener listener, boolean flush) {
|
||||
- if (this.isConnected()) {
|
||||
- this.flushQueue();
|
||||
+ // Paper start - Optimize network: Handle oversized packets better
|
||||
|
@ -67,17 +67,14 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ if (connected && (InnerUtil.canSendImmediate(this, packet)
|
||||
+ || (io.papermc.paper.util.MCUtil.isMainThread() && packet.isReady() && this.pendingActions.isEmpty()
|
||||
+ && (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())))) {
|
||||
this.sendPacket(packet, callbacks, flush);
|
||||
this.sendPacket(packet, listener, flush);
|
||||
} else {
|
||||
- this.pendingActions.add((networkmanager) -> {
|
||||
- networkmanager.sendPacket(packet, callbacks, flush);
|
||||
- });
|
||||
- }
|
||||
- this.pendingActions.add(connection -> connection.sendPacket(packet, listener, flush));
|
||||
+ // Write the packets to the queue, then flush - antixray hooks there already
|
||||
+ final java.util.List<Packet<?>> extraPackets = InnerUtil.buildExtraPackets(packet);
|
||||
+ final boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
|
||||
+ if (!hasExtraPackets) {
|
||||
+ this.pendingActions.add(new PacketSendAction(packet, callbacks, flush));
|
||||
+ this.pendingActions.add(new PacketSendAction(packet, listener, flush));
|
||||
+ } else {
|
||||
+ final java.util.List<PacketSendAction> actions = new java.util.ArrayList<>(1 + extraPackets.size());
|
||||
+ actions.add(new PacketSendAction(packet, null, false)); // Delay the future listener until the end of the extra packets
|
||||
|
@ -85,31 +82,30 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ for (int i = 0, len = extraPackets.size(); i < len;) {
|
||||
+ final Packet<?> extraPacket = extraPackets.get(i);
|
||||
+ final boolean end = ++i == len;
|
||||
+ actions.add(new PacketSendAction(extraPacket, end ? callbacks : null, end)); // Append listener to the end
|
||||
+ actions.add(new PacketSendAction(extraPacket, end ? listener : null, end)); // Append listener to the end
|
||||
+ }
|
||||
+
|
||||
+ this.pendingActions.addAll(actions);
|
||||
+ }
|
||||
|
||||
+
|
||||
+ this.flushQueue();
|
||||
+ // Paper end - Optimize network
|
||||
+ }
|
||||
}
|
||||
|
||||
public void runOnceConnected(Consumer<Connection> task) {
|
||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
this.flushQueue();
|
||||
task.accept(this);
|
||||
} else {
|
||||
- this.pendingActions.add(task);
|
||||
+ this.pendingActions.add(new WrappedConsumer(task)); // Paper - Optimize network
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
}
|
||||
|
||||
private void doSendPacket(Packet<?> packet, @Nullable PacketSendListener callbacks, boolean flush) {
|
||||
@@ -438,7 +469,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
this.flushQueue();
|
||||
action.accept(this);
|
||||
} else {
|
||||
- this.pendingActions.add(action);
|
||||
+ this.pendingActions.add(new WrappedConsumer(action)); // Paper - Optimize network
|
||||
}
|
||||
}
|
||||
|
||||
@@ -452,6 +483,14 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
}
|
||||
|
||||
private void doSendPacket(Packet<?> packet, @Nullable PacketSendListener sendListener, boolean flush) {
|
||||
+ // Paper start - Optimize network
|
||||
+ final net.minecraft.server.level.ServerPlayer player = this.getPlayer();
|
||||
+ if (!this.isConnected()) {
|
||||
|
@ -118,18 +114,18 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ }
|
||||
+ try {
|
||||
+ // Paper end - Optimize network
|
||||
ChannelFuture channelfuture = flush ? this.channel.writeAndFlush(packet) : this.channel.write(packet);
|
||||
|
||||
if (callbacks != null) {
|
||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
ChannelFuture channelFuture = flush ? this.channel.writeAndFlush(packet) : this.channel.write(packet);
|
||||
if (sendListener != null) {
|
||||
channelFuture.addListener(future -> {
|
||||
@@ -467,14 +506,24 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
});
|
||||
}
|
||||
|
||||
+ // Paper start - Optimize network
|
||||
+ if (packet.hasFinishListener()) {
|
||||
+ channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture));
|
||||
+ channelFuture.addListener((ChannelFutureListener) future -> packet.onPacketDispatchFinish(player, future));
|
||||
+ }
|
||||
channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
|
||||
channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
|
||||
+ } catch (final Exception e) {
|
||||
+ LOGGER.error("NetworkException: {}", player, e);
|
||||
+ this.disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage()));
|
||||
|
@ -145,16 +141,14 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
- this.pendingActions.add(Connection::flush);
|
||||
+ this.pendingActions.add(new WrappedConsumer(Connection::flush)); // Paper - Optimize network
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
|
||||
@@ -486,16 +535,57 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
}
|
||||
}
|
||||
|
||||
- private void flushQueue() {
|
||||
- if (this.channel != null && this.channel.isOpen()) {
|
||||
- Queue queue = this.pendingActions;
|
||||
-
|
||||
+ // Paper start - Optimize network: Rewrite this to be safer if ran off main thread
|
||||
+ private boolean flushQueue() {
|
||||
+ if (!this.isConnected()) {
|
||||
|
@ -165,7 +159,9 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ } else if (this.isPending) {
|
||||
+ // Should only happen during login/status stages
|
||||
synchronized (this.pendingActions) {
|
||||
- Consumer consumer;
|
||||
- Consumer<Connection> consumer;
|
||||
- while ((consumer = this.pendingActions.poll()) != null) {
|
||||
- consumer.accept(this);
|
||||
+ return this.processQueue();
|
||||
+ }
|
||||
+ }
|
||||
|
@ -176,9 +172,7 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ if (this.pendingActions.isEmpty()) {
|
||||
+ return true;
|
||||
+ }
|
||||
|
||||
- while ((consumer = (Consumer) this.pendingActions.poll()) != null) {
|
||||
- consumer.accept(this);
|
||||
+
|
||||
+ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
|
||||
+ // But if we are not on main due to login/status, the parent is synchronized on packetQueue
|
||||
+ final java.util.Iterator<WrappedConsumer> iterator = this.pendingActions.iterator();
|
||||
|
@ -199,12 +193,12 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ if (!packet.isReady()) {
|
||||
+ return false;
|
||||
}
|
||||
+ }
|
||||
|
||||
}
|
||||
+
|
||||
+ iterator.remove();
|
||||
+ if (queued.tryMarkConsumed()) {
|
||||
+ queued.accept(this);
|
||||
}
|
||||
+ }
|
||||
}
|
||||
+ return true;
|
||||
}
|
||||
|
@ -212,35 +206,35 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
|
||||
private static final int MAX_PER_TICK = io.papermc.paper.configuration.GlobalConfiguration.get().misc.maxJoinsPerTick; // Paper - Buffer joins to world
|
||||
private static int joinAttemptsThisTick; // Paper - Buffer joins to world
|
||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
public void disconnect(DisconnectionDetails disconnectionInfo) {
|
||||
@@ -561,6 +651,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
// Spigot Start
|
||||
this.preparing = false;
|
||||
+ this.clearPacketQueue(); // Paper - Optimize network
|
||||
// Spigot End
|
||||
+ this.clearPacketQueue(); // Paper - Optimize network
|
||||
if (this.channel == null) {
|
||||
this.delayedDisconnect = disconnectionInfo;
|
||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
this.delayedDisconnect = disconnectionDetails;
|
||||
}
|
||||
@@ -749,7 +840,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
public void handleDisconnection() {
|
||||
if (this.channel != null && !this.channel.isOpen()) {
|
||||
if (this.disconnectionHandled) {
|
||||
- Connection.LOGGER.warn("handleDisconnection() called twice");
|
||||
+ // Connection.LOGGER.warn("handleDisconnection() called twice"); // Paper - Don't log useless message
|
||||
- LOGGER.warn("handleDisconnection() called twice");
|
||||
+ // LOGGER.warn("handleDisconnection() called twice"); // Paper - Don't log useless message
|
||||
} else {
|
||||
this.disconnectionHandled = true;
|
||||
PacketListener packetlistener = this.getPacketListener();
|
||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
|
||||
packetlistener1.onDisconnect(disconnectiondetails);
|
||||
PacketListener packetListener = this.getPacketListener();
|
||||
@@ -760,7 +851,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
);
|
||||
packetListener1.onDisconnect(disconnectionDetails);
|
||||
}
|
||||
- this.pendingActions.clear(); // Free up packet queue.
|
||||
+ this.clearPacketQueue(); // Paper - Optimize network
|
||||
// Paper start - Add PlayerConnectionCloseEvent
|
||||
final PacketListener packetListener = this.getPacketListener();
|
||||
if (packetListener instanceof net.minecraft.server.network.ServerCommonPacketListenerImpl commonPacketListener) {
|
||||
@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
public void setBandwidthLogger(LocalSampleLogger log) {
|
||||
this.bandwidthDebugMonitor = new BandwidthDebugMonitor(log);
|
||||
/* Player was logged in, either game listener or configuration listener */
|
||||
@@ -795,4 +886,93 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
public void setBandwidthLogger(LocalSampleLogger bandwithLogger) {
|
||||
this.bandwidthDebugMonitor = new BandwidthDebugMonitor(bandwithLogger);
|
||||
}
|
||||
+
|
||||
+ // Paper start - Optimize network
|
||||
|
@ -332,11 +326,11 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ }
|
||||
+ // Paper end - Optimize network
|
||||
}
|
||||
diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java
|
||||
index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 100644
|
||||
--- a/src/main/java/net/minecraft/network/protocol/Packet.java
|
||||
+++ b/src/main/java/net/minecraft/network/protocol/Packet.java
|
||||
@@ -0,0 +0,0 @@ public interface Packet<T extends PacketListener> {
|
||||
diff --git a/net/minecraft/network/protocol/Packet.java b/net/minecraft/network/protocol/Packet.java
|
||||
index 65ff8b9112ec76eeac48c679044fc02ae7d4ffeb..e4789584cbe43959681a8522c66eab58369bebd0 100644
|
||||
--- a/net/minecraft/network/protocol/Packet.java
|
||||
+++ b/net/minecraft/network/protocol/Packet.java
|
||||
@@ -35,4 +35,32 @@ public interface Packet<T extends PacketListener> {
|
||||
static <B extends ByteBuf, T extends Packet<?>> StreamCodec<B, T> codec(StreamMemberEncoder<B, T> encoder, StreamDecoder<B, T> decoder) {
|
||||
return StreamCodec.ofMember(encoder, decoder);
|
||||
}
|
||||
|
@ -352,7 +346,8 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ * @param player Null if not at PLAY stage yet
|
||||
+ * @param future Can be null if packet was cancelled
|
||||
+ */
|
||||
+ default void onPacketDispatchFinish(@org.jetbrains.annotations.Nullable net.minecraft.server.level.ServerPlayer player, @org.jetbrains.annotations.Nullable io.netty.channel.ChannelFuture future) {}
|
||||
+ default void onPacketDispatchFinish(@org.jetbrains.annotations.Nullable net.minecraft.server.level.ServerPlayer player, @org.jetbrains.annotations.Nullable io.netty.channel.ChannelFuture future) {
|
||||
+ }
|
||||
+
|
||||
+ default boolean hasFinishListener() {
|
||||
+ return false;
|
||||
|
@ -368,28 +363,29 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ }
|
||||
+ // Paper end
|
||||
}
|
||||
diff --git a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
|
||||
index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 100644
|
||||
--- a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
|
||||
+++ b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
|
||||
@@ -0,0 +0,0 @@ public class ServerConnectionListener {
|
||||
final List<Connection> connections = Collections.synchronizedList(Lists.newArrayList());
|
||||
diff --git a/net/minecraft/server/network/ServerConnectionListener.java b/net/minecraft/server/network/ServerConnectionListener.java
|
||||
index 18fa53903cd6500ae65d993a6fe7f49d6b069339..b68adf37af7172671163d4a8074d2bfa97724b4b 100644
|
||||
--- a/net/minecraft/server/network/ServerConnectionListener.java
|
||||
+++ b/net/minecraft/server/network/ServerConnectionListener.java
|
||||
@@ -66,11 +66,13 @@ public class ServerConnectionListener {
|
||||
|
||||
// Paper start - prevent blocking on adding a new connection while the server is ticking
|
||||
private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>();
|
||||
+ private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper - Optimize network
|
||||
|
||||
private final void addPending() {
|
||||
Connection connection;
|
||||
while ((connection = pending.poll()) != null) {
|
||||
connections.add(connection);
|
||||
while ((connection = this.pending.poll()) != null) {
|
||||
this.connections.add(connection);
|
||||
+ connection.isPending = false; // Paper - Optimize network
|
||||
}
|
||||
}
|
||||
// Paper end - prevent blocking on adding a new connection while the server is ticking
|
||||
@@ -0,0 +0,0 @@ public class ServerConnectionListener {
|
||||
;
|
||||
}
|
||||
@@ -120,6 +122,7 @@ public class ServerConnectionListener {
|
||||
} catch (ChannelException var5) {
|
||||
}
|
||||
|
||||
+ if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper - Optimize network
|
||||
ChannelPipeline channelpipeline = channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30));
|
||||
|
||||
if (ServerConnectionListener.this.server.repliesToStatus()) {
|
||||
+ if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper - Optimize network
|
||||
ChannelPipeline channelPipeline = channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30));
|
||||
if (ServerConnectionListener.this.server.repliesToStatus()) {
|
||||
channelPipeline.addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this.getServer()));
|
||||
|
|
Loading…
Reference in a new issue