From bde545c82a8fb8117bf04d24b2f62ede22d512a4 Mon Sep 17 00:00:00 2001 From: Spigot Date: Sat, 20 Apr 2013 19:41:41 +1000 Subject: [PATCH] Dramatically reduce the idle network activity by collecting and framing packets for a max of 10 milliseconds. This reduces bandwidth consumption as much as possible whilst not generating network garbage and other nasties. By: md_5 --- CraftBukkit-Patches/0021-Netty.patch | 202 ++++++++++++--------------- 1 file changed, 86 insertions(+), 116 deletions(-) diff --git a/CraftBukkit-Patches/0021-Netty.patch b/CraftBukkit-Patches/0021-Netty.patch index 7723ae830f..fa500a95a5 100644 --- a/CraftBukkit-Patches/0021-Netty.patch +++ b/CraftBukkit-Patches/0021-Netty.patch @@ -1,4 +1,4 @@ -From efe9a9aecb849b6886372c7d9445cd79dd706687 Mon Sep 17 00:00:00 2001 +From c9f5ce7c8d93b8ae0346eaad87e2d272b0f2a462 Mon Sep 17 00:00:00 2001 From: md_5 Date: Fri, 19 Apr 2013 17:44:39 +1000 Subject: [PATCH] Netty @@ -417,10 +417,10 @@ index 0000000..c8ea80a +} diff --git a/src/main/java/org/spigotmc/netty/CipherCodec.java b/src/main/java/org/spigotmc/netty/CipherCodec.java new file mode 100644 -index 0000000..2dbbf6c +index 0000000..5e3a5f9 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherCodec.java -@@ -0,0 +1,67 @@ +@@ -0,0 +1,59 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; @@ -428,7 +428,6 @@ index 0000000..2dbbf6c +import io.netty.handler.codec.ByteToByteCodec; +import javax.crypto.Cipher; +import javax.crypto.ShortBufferException; -+import net.minecraft.server.Packet252KeyResponse; + +/** + * This class is a complete solution for encrypting and decoding bytes in a @@ -439,7 +438,6 @@ index 0000000..2dbbf6c + + private Cipher encrypt; + private Cipher decrypt; -+ private Packet252KeyResponse responsePacket; + private ThreadLocal heapInLocal = new EmptyByteThreadLocal(); + private ThreadLocal heapOutLocal = new EmptyByteThreadLocal(); + @@ -451,15 +449,9 @@ index 0000000..2dbbf6c + } + } + -+ public CipherCodec(Cipher encrypt, Cipher decrypt, Packet252KeyResponse responsePacket) { ++ public CipherCodec(Cipher encrypt, Cipher decrypt) { + this.encrypt = encrypt; + this.decrypt = decrypt; -+ this.responsePacket = responsePacket; -+ } -+ -+ @Override -+ public void beforeAdd(ChannelHandlerContext ctx) throws Exception { -+ ctx.channel().write(responsePacket); + } + + @Override @@ -490,27 +482,34 @@ index 0000000..2dbbf6c +} diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java new file mode 100644 -index 0000000..0e1b1fd +index 0000000..2036b8e --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java -@@ -0,0 +1,253 @@ +@@ -0,0 +1,298 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; ++import io.netty.buffer.ByteBuf; ++import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.socket.SocketChannel; ++import io.netty.util.concurrent.ScheduledFuture; ++import java.io.DataOutputStream; ++import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.security.PrivateKey; -+import java.util.AbstractList; ++import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; ++import java.util.concurrent.TimeUnit; ++import java.util.concurrent.locks.ReentrantLock; +import javax.crypto.Cipher; +import javax.crypto.SecretKey; +import net.minecraft.server.Connection; @@ -535,22 +534,10 @@ index 0000000..0e1b1fd + private static final MultiplexingServerConnection serverConnection = (MultiplexingServerConnection) server.ae(); + /*========================================================================*/ + private final Queue syncPackets = new ConcurrentLinkedQueue(); -+ private final List highPriorityQueue = new AbstractList() { -+ @Override -+ public void add(int index, Packet element) { -+ // NOP -+ } -+ -+ @Override -+ public Packet get(int index) { -+ throw new UnsupportedOperationException(); -+ } -+ -+ @Override -+ public int size() { -+ return 0; -+ } -+ }; ++ private final List highPriorityQueue = new ArrayList(); ++ private final ReentrantLock writeLock = new ReentrantLock(); ++ private Runnable packetDispatcher; ++ private ScheduledFuture scheduledTask; + private volatile boolean connected; + private Channel channel; + private SocketAddress address; @@ -569,6 +556,7 @@ index 0000000..0e1b1fd + // Check the throttle + if (serverConnection.throttle(((InetSocketAddress) channel.remoteAddress()).getAddress())) { + channel.close(); ++ return; + } + // Then the socket adaptor + socketAdaptor = NettySocketAdaptor.adapt((SocketChannel) channel); @@ -577,10 +565,55 @@ index 0000000..0e1b1fd + // Finally register the connection + connected = true; + serverConnection.register((PendingConnection) connection); ++ // And register our send dispatcher ++ packetDispatcher = new Runnable() { ++ public void run() { ++ // Ensure exclusive access to the queue ++ if (!writeLock.isHeldByCurrentThread()) { ++ writeLock.lock(); ++ } ++ try { ++ if (highPriorityQueue.size() == 0) { ++ return; ++ } ++ // Try and get a bearing on the size ++ int estimatedSize = 0; ++ for (Packet packet : highPriorityQueue) { ++ estimatedSize += packet.a(); ++ } ++ // Allocate a buffer ++ ByteBuf buf = channel.alloc().directBuffer(estimatedSize); ++ // And an outputstream to this buffer ++ DataOutputStream out = new DataOutputStream(new ByteBufOutputStream(buf)); ++ // Loop through all packets ++ for (Packet packet : highPriorityQueue) { ++ // Write packet ID ++ buf.writeByte(packet.n()); ++ try { ++ // Write actual packet ++ packet.a(out); ++ } catch (IOException ex) { ++ // Catch exception in case it ever happens (should never) ++ a("disconnect.genericReason", new Object[]{"Exception writing packet: " + ex}); ++ } ++ } ++ // Clear existing packets so we can unlock our lock ++ highPriorityQueue.clear(); ++ // Send the whole buffer down ++ writtenBytes += buf.readableBytes(); ++ channel.write(buf); ++ } finally { ++ writeLock.unlock(); ++ } ++ } ++ }; ++ scheduledTask = ctx.executor().scheduleWithFixedDelay(packetDispatcher, 10, 10, TimeUnit.MILLISECONDS); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { ++ // Cleanup the timer task ++ scheduledTask.cancel(false); + a("disconnect.endOfStream", new Object[0]); + } + @@ -642,18 +675,24 @@ index 0000000..0e1b1fd + packet = PacketListener.callQueued(this, connection, packet); + // If handler indicates packet send + if (packet != null) { -+ highPriorityQueue.add(packet); -+ -+ // If needed, check and prepare encryption phase -+ // We don't send the packet here as it is sent just before the cipher handler has been added to ensure we can safeguard from any race conditions -+ // Which are caused by the slow first initialization of the cipher SPI -+ if (packet instanceof Packet252KeyResponse) { -+ Cipher encrypt = NettyServerConnection.getCipher(Cipher.ENCRYPT_MODE, secret); -+ Cipher decrypt = NettyServerConnection.getCipher(Cipher.DECRYPT_MODE, secret); -+ CipherCodec codec = new CipherCodec(encrypt, decrypt, (Packet252KeyResponse) packet); -+ channel.pipeline().addBefore("decoder", "cipher", codec); -+ } else { -+ channel.write(packet); ++ // Aquire lock ++ writeLock.lock(); ++ try { ++ highPriorityQueue.add(packet); ++ // If needed, check and prepare encryption phase ++ if (packet instanceof Packet252KeyResponse) { ++ Cipher encrypt = NettyServerConnection.getCipher(Cipher.ENCRYPT_MODE, secret); ++ Cipher decrypt = NettyServerConnection.getCipher(Cipher.DECRYPT_MODE, secret); ++ CipherCodec codec = new CipherCodec(encrypt, decrypt); ++ // Flush send queue ++ packetDispatcher.run(); ++ channel.pipeline().addBefore("decoder", "cipher", codec); ++ } ++ } finally { ++ // If we still have a lock, we need to get ri ++ if (writeLock.isHeldByCurrentThread()) { ++ writeLock.unlock(); ++ } + } + } + } @@ -709,6 +748,8 @@ index 0000000..0e1b1fd + public void d() { + if (connected) { + connected = false; ++ // Send all pending packets ++ packetDispatcher.run(); + channel.close(); + } + } @@ -742,17 +783,13 @@ index 0000000..0e1b1fd + public long getWrittenBytes() { + return writtenBytes; + } -+ -+ public void addWrittenBytes(int written) { -+ writtenBytes += written; -+ } +} diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java new file mode 100644 -index 0000000..e5d24f7 +index 0000000..9ad9c52 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java -@@ -0,0 +1,90 @@ +@@ -0,0 +1,79 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -768,17 +805,10 @@ index 0000000..e5d24f7 +import java.net.InetAddress; +import java.security.GeneralSecurityException; +import java.security.Key; -+import java.util.ArrayList; -+import java.util.Collections; -+import java.util.HashMap; -+import java.util.List; -+import java.util.logging.Level; +import javax.crypto.Cipher; +import javax.crypto.spec.IvParameterSpec; +import net.minecraft.server.MinecraftServer; -+import net.minecraft.server.PendingConnection; +import net.minecraft.server.ServerConnection; -+import org.bukkit.Bukkit; + +/** + * This is the NettyServerConnection class. It implements @@ -790,8 +820,6 @@ index 0000000..e5d24f7 + + private final ChannelFuture socket; + -+ -+ + public NettyServerConnection(MinecraftServer ms, InetAddress host, int port) { + super(ms); + int threads = Integer.getInteger("org.spigotmc.netty.threads", 3); @@ -808,14 +836,12 @@ index 0000000..e5d24f7 + ch.pipeline() + .addLast("timer", new ReadTimeoutHandler(30)) + .addLast("decoder", new PacketDecoder()) -+ .addLast("encoder", new PacketEncoder(networkManager)) + .addLast("manager", networkManager); + } + }).group(new NioEventLoopGroup(threads, new ThreadFactoryBuilder().setNameFormat("Netty IO Thread - %1$d").build())).localAddress(host, port).bind(); + MinecraftServer.getServer().getLogger().info("Using Netty NIO with " + threads + " threads for network connections."); + } + -+ + /** + * Shutdown. This method is called when the server is shutting down and the + * server socket and all clients should be terminated with no further @@ -1167,62 +1193,6 @@ index 0000000..65074d2 + packet = null; + } +} -diff --git a/src/main/java/org/spigotmc/netty/PacketEncoder.java b/src/main/java/org/spigotmc/netty/PacketEncoder.java -new file mode 100644 -index 0000000..c8832d6 ---- /dev/null -+++ b/src/main/java/org/spigotmc/netty/PacketEncoder.java -@@ -0,0 +1,50 @@ -+package org.spigotmc.netty; -+ -+import io.netty.buffer.ByteBuf; -+import io.netty.buffer.ByteBufOutputStream; -+import io.netty.channel.ChannelHandlerContext; -+import io.netty.handler.codec.MessageToByteEncoder; -+import java.io.DataOutputStream; -+import net.minecraft.server.Packet; -+ -+/** -+ * Netty encoder which takes a packet and encodes it, and adds a byte packet id -+ * header. -+ */ -+public class PacketEncoder extends MessageToByteEncoder { -+ -+ private ByteBuf outBuf; -+ private DataOutputStream dataOut; -+ private final NettyNetworkManager networkManager; -+ -+ public PacketEncoder(NettyNetworkManager networkManager) { -+ this.networkManager = networkManager; -+ } -+ -+ @Override -+ public void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception { -+ if (outBuf == null) { -+ outBuf = ctx.alloc().directBuffer(); -+ } -+ if (dataOut == null) { -+ dataOut = new DataOutputStream(new ByteBufOutputStream(outBuf)); -+ } -+ -+ out.writeByte(msg.n()); -+ msg.a(dataOut); -+ -+ networkManager.addWrittenBytes(outBuf.readableBytes()); -+ out.writeBytes(outBuf); -+ out.discardSomeReadBytes(); -+ } -+ -+ @Override -+ public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception { -+ super.freeOutboundBuffer(ctx); -+ if (outBuf != null) { -+ outBuf.release(); -+ outBuf = null; -+ } -+ dataOut = null; -+ } -+} diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java new file mode 100644 index 0000000..8e3b932