mirror of
https://github.com/PaperMC/Paper.git
synced 2025-01-01 08:56:23 +01:00
148 lines
7.4 KiB
Diff
148 lines
7.4 KiB
Diff
|
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
|
||
|
From: Spottedleaf <spottedleaf@spottedleaf.dev>
|
||
|
Date: Sat, 4 Apr 2020 15:27:44 -0700
|
||
|
Subject: [PATCH] Allow controlled flushing for network manager
|
||
|
|
||
|
Only make one flush call when emptying the packet queue too
|
||
|
|
||
|
This patch will be used to optimise out flush calls in later
|
||
|
patches.
|
||
|
|
||
|
diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
|
||
|
index 9d09ec3b127e3440bef6b248578dec109407f9ff..103657ad936f1a75ffbb92ca5eafb816e977e363 100644
|
||
|
--- a/src/main/java/net/minecraft/network/Connection.java
|
||
|
+++ b/src/main/java/net/minecraft/network/Connection.java
|
||
|
@@ -94,6 +94,39 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||
|
public ConnectionProtocol protocol;
|
||
|
// Paper end
|
||
|
|
||
|
+ // Paper start - allow controlled flushing
|
||
|
+ volatile boolean canFlush = true;
|
||
|
+ private final java.util.concurrent.atomic.AtomicInteger packetWrites = new java.util.concurrent.atomic.AtomicInteger();
|
||
|
+ private int flushPacketsStart;
|
||
|
+ private final Object flushLock = new Object();
|
||
|
+
|
||
|
+ public void disableAutomaticFlush() {
|
||
|
+ synchronized (this.flushLock) {
|
||
|
+ this.flushPacketsStart = this.packetWrites.get(); // must be volatile and before canFlush = false
|
||
|
+ this.canFlush = false;
|
||
|
+ }
|
||
|
+ }
|
||
|
+
|
||
|
+ public void enableAutomaticFlush() {
|
||
|
+ synchronized (this.flushLock) {
|
||
|
+ this.canFlush = true;
|
||
|
+ if (this.packetWrites.get() != this.flushPacketsStart) { // must be after canFlush = true
|
||
|
+ this.flush(); // only make the flush call if we need to
|
||
|
+ }
|
||
|
+ }
|
||
|
+ }
|
||
|
+
|
||
|
+ private final void flush() {
|
||
|
+ if (this.channel.eventLoop().inEventLoop()) {
|
||
|
+ this.channel.flush();
|
||
|
+ } else {
|
||
|
+ this.channel.eventLoop().execute(() -> {
|
||
|
+ this.channel.flush();
|
||
|
+ });
|
||
|
+ }
|
||
|
+ }
|
||
|
+ // Paper end - allow controlled flushing
|
||
|
+
|
||
|
public Connection(PacketFlow side) {
|
||
|
this.receiving = side;
|
||
|
}
|
||
|
@@ -255,7 +288,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||
|
net.minecraft.server.MCUtil.isMainThread() && packet.isReady() && this.queue.isEmpty() &&
|
||
|
(packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())
|
||
|
))) {
|
||
|
- this.sendPacket(packet, callback);
|
||
|
+ this.writePacket(packet, callback, null); // Paper
|
||
|
return;
|
||
|
}
|
||
|
// write the packets to the queue, then flush - antixray hooks there already
|
||
|
@@ -279,6 +312,14 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||
|
}
|
||
|
|
||
|
private void sendPacket(Packet<?> packet, @Nullable GenericFutureListener<? extends Future<? super Void>> callback) {
|
||
|
+ // Paper start - add flush parameter
|
||
|
+ this.writePacket(packet, callback, Boolean.TRUE);
|
||
|
+ }
|
||
|
+ private void writePacket(Packet<?> packet, @Nullable GenericFutureListener<? extends Future<? super Void>> callback, Boolean flushConditional) {
|
||
|
+ this.packetWrites.getAndIncrement(); // must be befeore using canFlush
|
||
|
+ boolean effectiveFlush = flushConditional == null ? this.canFlush : flushConditional.booleanValue();
|
||
|
+ final boolean flush = effectiveFlush || packet instanceof net.minecraft.network.protocol.game.ClientboundKeepAlivePacket || packet instanceof ClientboundDisconnectPacket; // no delay for certain packets
|
||
|
+ // Paper end - add flush parameter
|
||
|
ConnectionProtocol enumprotocol = ConnectionProtocol.getProtocolForPacket(packet);
|
||
|
ConnectionProtocol enumprotocol1 = this.getCurrentProtocol();
|
||
|
|
||
|
@@ -289,16 +330,21 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||
|
}
|
||
|
|
||
|
if (this.channel.eventLoop().inEventLoop()) {
|
||
|
- this.a(packet, callback, enumprotocol, enumprotocol1);
|
||
|
+ this.a(packet, callback, enumprotocol, enumprotocol1, flush); // Paper - add flush parameter
|
||
|
} else {
|
||
|
this.channel.eventLoop().execute(() -> {
|
||
|
- this.a(packet, callback, enumprotocol, enumprotocol1);
|
||
|
+ this.a(packet, callback, enumprotocol, enumprotocol1, flush); // Paper - add flush parameter
|
||
|
});
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
private void a(Packet<?> packet, @Nullable GenericFutureListener<? extends Future<? super Void>> genericfuturelistener, ConnectionProtocol enumprotocol, ConnectionProtocol enumprotocol1) {
|
||
|
+ // Paper start - add flush parameter
|
||
|
+ this.a(packet, genericfuturelistener, enumprotocol, enumprotocol1, true);
|
||
|
+ }
|
||
|
+ private void a(Packet<?> packet, @Nullable GenericFutureListener<? extends Future<? super Void>> genericfuturelistener, ConnectionProtocol enumprotocol, ConnectionProtocol enumprotocol1, boolean flush) {
|
||
|
+ // Paper end - add flush parameter
|
||
|
if (enumprotocol != enumprotocol1) {
|
||
|
this.setProtocol(enumprotocol);
|
||
|
}
|
||
|
@@ -312,7 +358,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||
|
|
||
|
try {
|
||
|
// Paper end
|
||
|
- ChannelFuture channelfuture = this.channel.writeAndFlush(packet);
|
||
|
+ ChannelFuture channelfuture = flush ? this.channel.writeAndFlush(packet) : this.channel.write(packet); // Paper - add flush parameter
|
||
|
|
||
|
if (genericfuturelistener != null) {
|
||
|
channelfuture.addListener(genericfuturelistener);
|
||
|
@@ -354,6 +400,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||
|
}
|
||
|
private boolean processQueue() {
|
||
|
if (this.queue.isEmpty()) return true;
|
||
|
+ // Paper start - make only one flush call per sendPacketQueue() call
|
||
|
+ final boolean needsFlush = this.canFlush;
|
||
|
+ boolean hasWrotePacket = false;
|
||
|
+ // Paper end - make only one flush call per sendPacketQueue() call
|
||
|
// If we are on main, we are safe here in that nothing else should be processing queue off main anymore
|
||
|
// But if we are not on main due to login/status, the parent is synchronized on packetQueue
|
||
|
java.util.Iterator<PacketHolder> iterator = this.queue.iterator();
|
||
|
@@ -361,16 +411,22 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||
|
PacketHolder queued = iterator.next(); // poll -> peek
|
||
|
|
||
|
// Fix NPE (Spigot bug caused by handleDisconnection())
|
||
|
- if (queued == null) {
|
||
|
+ if (false && queued == null) { // Paper - diff on change, this logic is redundant: iterator guarantees ret of an element - on change, hook the flush logic here
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
Packet<?> packet = queued.packet;
|
||
|
if (!packet.isReady()) {
|
||
|
+ // Paper start - make only one flush call per sendPacketQueue() call
|
||
|
+ if (hasWrotePacket && (needsFlush || this.canFlush)) {
|
||
|
+ this.flush();
|
||
|
+ }
|
||
|
+ // Paper end - make only one flush call per sendPacketQueue() call
|
||
|
return false;
|
||
|
} else {
|
||
|
iterator.remove();
|
||
|
- this.sendPacket(packet, queued.listener);
|
||
|
+ this.writePacket(packet, queued.listener, (!iterator.hasNext() && (needsFlush || this.canFlush)) ? Boolean.TRUE : Boolean.FALSE); // Paper - make only one flush call per sendPacketQueue() call
|
||
|
+ hasWrotePacket = true; // Paper - make only one flush call per sendPacketQueue() call
|
||
|
}
|
||
|
}
|
||
|
return true;
|