From f6de3363073c06ad3237d9b46b9750eef58930aa Mon Sep 17 00:00:00 2001 From: Spigot Date: Sun, 21 Apr 2013 08:39:39 +1000 Subject: [PATCH] Revert "Dramatically reduce the idle network activity by collecting and framing packets for a max of 10 milliseconds. This reduces bandwidth consumption as much as possible whilst not generating network garbage and other nasties." This reverts commit bde545c82a8fb8117bf04d24b2f62ede22d512a4. By: md_5 --- CraftBukkit-Patches/0021-Netty.patch | 202 +++++++++++++++------------ 1 file changed, 116 insertions(+), 86 deletions(-) diff --git a/CraftBukkit-Patches/0021-Netty.patch b/CraftBukkit-Patches/0021-Netty.patch index fa500a95a5..7723ae830f 100644 --- a/CraftBukkit-Patches/0021-Netty.patch +++ b/CraftBukkit-Patches/0021-Netty.patch @@ -1,4 +1,4 @@ -From c9f5ce7c8d93b8ae0346eaad87e2d272b0f2a462 Mon Sep 17 00:00:00 2001 +From efe9a9aecb849b6886372c7d9445cd79dd706687 Mon Sep 17 00:00:00 2001 From: md_5 Date: Fri, 19 Apr 2013 17:44:39 +1000 Subject: [PATCH] Netty @@ -417,10 +417,10 @@ index 0000000..c8ea80a +} diff --git a/src/main/java/org/spigotmc/netty/CipherCodec.java b/src/main/java/org/spigotmc/netty/CipherCodec.java new file mode 100644 -index 0000000..5e3a5f9 +index 0000000..2dbbf6c --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherCodec.java -@@ -0,0 +1,59 @@ +@@ -0,0 +1,67 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; @@ -428,6 +428,7 @@ index 0000000..5e3a5f9 +import io.netty.handler.codec.ByteToByteCodec; +import javax.crypto.Cipher; +import javax.crypto.ShortBufferException; ++import net.minecraft.server.Packet252KeyResponse; + +/** + * This class is a complete solution for encrypting and decoding bytes in a @@ -438,6 +439,7 @@ index 0000000..5e3a5f9 + + private Cipher encrypt; + private Cipher decrypt; ++ private Packet252KeyResponse responsePacket; + private ThreadLocal heapInLocal = new EmptyByteThreadLocal(); + private ThreadLocal heapOutLocal = new EmptyByteThreadLocal(); + @@ -449,9 +451,15 @@ index 0000000..5e3a5f9 + } + } + -+ public CipherCodec(Cipher encrypt, Cipher decrypt) { ++ public CipherCodec(Cipher encrypt, Cipher decrypt, Packet252KeyResponse responsePacket) { + this.encrypt = encrypt; + this.decrypt = decrypt; ++ this.responsePacket = responsePacket; ++ } ++ ++ @Override ++ public void beforeAdd(ChannelHandlerContext ctx) throws Exception { ++ ctx.channel().write(responsePacket); + } + + @Override @@ -482,34 +490,27 @@ index 0000000..5e3a5f9 +} diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java new file mode 100644 -index 0000000..2036b8e +index 0000000..0e1b1fd --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java -@@ -0,0 +1,298 @@ +@@ -0,0 +1,253 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; -+import io.netty.buffer.ByteBuf; -+import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.socket.SocketChannel; -+import io.netty.util.concurrent.ScheduledFuture; -+import java.io.DataOutputStream; -+import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.security.PrivateKey; -+import java.util.ArrayList; ++import java.util.AbstractList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; -+import java.util.concurrent.TimeUnit; -+import java.util.concurrent.locks.ReentrantLock; +import javax.crypto.Cipher; +import javax.crypto.SecretKey; +import net.minecraft.server.Connection; @@ -534,10 +535,22 @@ index 0000000..2036b8e + private static final MultiplexingServerConnection serverConnection = (MultiplexingServerConnection) server.ae(); + /*========================================================================*/ + private final Queue syncPackets = new ConcurrentLinkedQueue(); -+ private final List highPriorityQueue = new ArrayList(); -+ private final ReentrantLock writeLock = new ReentrantLock(); -+ private Runnable packetDispatcher; -+ private ScheduledFuture scheduledTask; ++ private final List highPriorityQueue = new AbstractList() { ++ @Override ++ public void add(int index, Packet element) { ++ // NOP ++ } ++ ++ @Override ++ public Packet get(int index) { ++ throw new UnsupportedOperationException(); ++ } ++ ++ @Override ++ public int size() { ++ return 0; ++ } ++ }; + private volatile boolean connected; + private Channel channel; + private SocketAddress address; @@ -556,7 +569,6 @@ index 0000000..2036b8e + // Check the throttle + if (serverConnection.throttle(((InetSocketAddress) channel.remoteAddress()).getAddress())) { + channel.close(); -+ return; + } + // Then the socket adaptor + socketAdaptor = NettySocketAdaptor.adapt((SocketChannel) channel); @@ -565,55 +577,10 @@ index 0000000..2036b8e + // Finally register the connection + connected = true; + serverConnection.register((PendingConnection) connection); -+ // And register our send dispatcher -+ packetDispatcher = new Runnable() { -+ public void run() { -+ // Ensure exclusive access to the queue -+ if (!writeLock.isHeldByCurrentThread()) { -+ writeLock.lock(); -+ } -+ try { -+ if (highPriorityQueue.size() == 0) { -+ return; -+ } -+ // Try and get a bearing on the size -+ int estimatedSize = 0; -+ for (Packet packet : highPriorityQueue) { -+ estimatedSize += packet.a(); -+ } -+ // Allocate a buffer -+ ByteBuf buf = channel.alloc().directBuffer(estimatedSize); -+ // And an outputstream to this buffer -+ DataOutputStream out = new DataOutputStream(new ByteBufOutputStream(buf)); -+ // Loop through all packets -+ for (Packet packet : highPriorityQueue) { -+ // Write packet ID -+ buf.writeByte(packet.n()); -+ try { -+ // Write actual packet -+ packet.a(out); -+ } catch (IOException ex) { -+ // Catch exception in case it ever happens (should never) -+ a("disconnect.genericReason", new Object[]{"Exception writing packet: " + ex}); -+ } -+ } -+ // Clear existing packets so we can unlock our lock -+ highPriorityQueue.clear(); -+ // Send the whole buffer down -+ writtenBytes += buf.readableBytes(); -+ channel.write(buf); -+ } finally { -+ writeLock.unlock(); -+ } -+ } -+ }; -+ scheduledTask = ctx.executor().scheduleWithFixedDelay(packetDispatcher, 10, 10, TimeUnit.MILLISECONDS); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { -+ // Cleanup the timer task -+ scheduledTask.cancel(false); + a("disconnect.endOfStream", new Object[0]); + } + @@ -675,24 +642,18 @@ index 0000000..2036b8e + packet = PacketListener.callQueued(this, connection, packet); + // If handler indicates packet send + if (packet != null) { -+ // Aquire lock -+ writeLock.lock(); -+ try { -+ highPriorityQueue.add(packet); -+ // If needed, check and prepare encryption phase -+ if (packet instanceof Packet252KeyResponse) { -+ Cipher encrypt = NettyServerConnection.getCipher(Cipher.ENCRYPT_MODE, secret); -+ Cipher decrypt = NettyServerConnection.getCipher(Cipher.DECRYPT_MODE, secret); -+ CipherCodec codec = new CipherCodec(encrypt, decrypt); -+ // Flush send queue -+ packetDispatcher.run(); -+ channel.pipeline().addBefore("decoder", "cipher", codec); -+ } -+ } finally { -+ // If we still have a lock, we need to get ri -+ if (writeLock.isHeldByCurrentThread()) { -+ writeLock.unlock(); -+ } ++ highPriorityQueue.add(packet); ++ ++ // If needed, check and prepare encryption phase ++ // We don't send the packet here as it is sent just before the cipher handler has been added to ensure we can safeguard from any race conditions ++ // Which are caused by the slow first initialization of the cipher SPI ++ if (packet instanceof Packet252KeyResponse) { ++ Cipher encrypt = NettyServerConnection.getCipher(Cipher.ENCRYPT_MODE, secret); ++ Cipher decrypt = NettyServerConnection.getCipher(Cipher.DECRYPT_MODE, secret); ++ CipherCodec codec = new CipherCodec(encrypt, decrypt, (Packet252KeyResponse) packet); ++ channel.pipeline().addBefore("decoder", "cipher", codec); ++ } else { ++ channel.write(packet); + } + } + } @@ -748,8 +709,6 @@ index 0000000..2036b8e + public void d() { + if (connected) { + connected = false; -+ // Send all pending packets -+ packetDispatcher.run(); + channel.close(); + } + } @@ -783,13 +742,17 @@ index 0000000..2036b8e + public long getWrittenBytes() { + return writtenBytes; + } ++ ++ public void addWrittenBytes(int written) { ++ writtenBytes += written; ++ } +} diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java new file mode 100644 -index 0000000..9ad9c52 +index 0000000..e5d24f7 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java -@@ -0,0 +1,79 @@ +@@ -0,0 +1,90 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -805,10 +768,17 @@ index 0000000..9ad9c52 +import java.net.InetAddress; +import java.security.GeneralSecurityException; +import java.security.Key; ++import java.util.ArrayList; ++import java.util.Collections; ++import java.util.HashMap; ++import java.util.List; ++import java.util.logging.Level; +import javax.crypto.Cipher; +import javax.crypto.spec.IvParameterSpec; +import net.minecraft.server.MinecraftServer; ++import net.minecraft.server.PendingConnection; +import net.minecraft.server.ServerConnection; ++import org.bukkit.Bukkit; + +/** + * This is the NettyServerConnection class. It implements @@ -820,6 +790,8 @@ index 0000000..9ad9c52 + + private final ChannelFuture socket; + ++ ++ + public NettyServerConnection(MinecraftServer ms, InetAddress host, int port) { + super(ms); + int threads = Integer.getInteger("org.spigotmc.netty.threads", 3); @@ -836,12 +808,14 @@ index 0000000..9ad9c52 + ch.pipeline() + .addLast("timer", new ReadTimeoutHandler(30)) + .addLast("decoder", new PacketDecoder()) ++ .addLast("encoder", new PacketEncoder(networkManager)) + .addLast("manager", networkManager); + } + }).group(new NioEventLoopGroup(threads, new ThreadFactoryBuilder().setNameFormat("Netty IO Thread - %1$d").build())).localAddress(host, port).bind(); + MinecraftServer.getServer().getLogger().info("Using Netty NIO with " + threads + " threads for network connections."); + } + ++ + /** + * Shutdown. This method is called when the server is shutting down and the + * server socket and all clients should be terminated with no further @@ -1193,6 +1167,62 @@ index 0000000..65074d2 + packet = null; + } +} +diff --git a/src/main/java/org/spigotmc/netty/PacketEncoder.java b/src/main/java/org/spigotmc/netty/PacketEncoder.java +new file mode 100644 +index 0000000..c8832d6 +--- /dev/null ++++ b/src/main/java/org/spigotmc/netty/PacketEncoder.java +@@ -0,0 +1,50 @@ ++package org.spigotmc.netty; ++ ++import io.netty.buffer.ByteBuf; ++import io.netty.buffer.ByteBufOutputStream; ++import io.netty.channel.ChannelHandlerContext; ++import io.netty.handler.codec.MessageToByteEncoder; ++import java.io.DataOutputStream; ++import net.minecraft.server.Packet; ++ ++/** ++ * Netty encoder which takes a packet and encodes it, and adds a byte packet id ++ * header. ++ */ ++public class PacketEncoder extends MessageToByteEncoder { ++ ++ private ByteBuf outBuf; ++ private DataOutputStream dataOut; ++ private final NettyNetworkManager networkManager; ++ ++ public PacketEncoder(NettyNetworkManager networkManager) { ++ this.networkManager = networkManager; ++ } ++ ++ @Override ++ public void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception { ++ if (outBuf == null) { ++ outBuf = ctx.alloc().directBuffer(); ++ } ++ if (dataOut == null) { ++ dataOut = new DataOutputStream(new ByteBufOutputStream(outBuf)); ++ } ++ ++ out.writeByte(msg.n()); ++ msg.a(dataOut); ++ ++ networkManager.addWrittenBytes(outBuf.readableBytes()); ++ out.writeBytes(outBuf); ++ out.discardSomeReadBytes(); ++ } ++ ++ @Override ++ public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception { ++ super.freeOutboundBuffer(ctx); ++ if (outBuf != null) { ++ outBuf.release(); ++ outBuf = null; ++ } ++ dataOut = null; ++ } ++} diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java new file mode 100644 index 0000000..8e3b932