From 6ae23e3f033e8dcbf49d4d7067062d322a9a3c7f Mon Sep 17 00:00:00 2001
From: Rigby <rigby@onarandombox.com>
Date: Tue, 26 Jul 2011 17:03:52 +0100
Subject: [PATCH] Chunk Compression on seperate thread. Thanks to Zeerix.

---
 .../net/minecraft/server/EntityPlayer.java    |   3 +-
 .../minecraft/server/NetServerHandler.java    |   5 +
 .../minecraft/server/Packet51MapChunk.java    |   7 +-
 .../craftbukkit/ChunkCompressionThread.java   | 135 ++++++++++++++++++
 .../org/bukkit/craftbukkit/CraftServer.java   |   2 +
 5 files changed, 149 insertions(+), 3 deletions(-)
 create mode 100644 src/main/java/org/bukkit/craftbukkit/ChunkCompressionThread.java

diff --git a/src/main/java/net/minecraft/server/EntityPlayer.java b/src/main/java/net/minecraft/server/EntityPlayer.java
index 04628ddb24..6481e0d05b 100644
--- a/src/main/java/net/minecraft/server/EntityPlayer.java
+++ b/src/main/java/net/minecraft/server/EntityPlayer.java
@@ -7,6 +7,7 @@ import java.util.Set;
 
 // CraftBukkit start
 import org.bukkit.Bukkit;
+import org.bukkit.craftbukkit.ChunkCompressionThread;
 import org.bukkit.craftbukkit.CraftWorld;
 import org.bukkit.craftbukkit.inventory.CraftItemStack;
 import org.bukkit.event.entity.EntityDeathEvent;
@@ -209,7 +210,7 @@ public class EntityPlayer extends EntityHuman implements ICrafting {
             if (chunkcoordintpair != null) {
                 boolean flag1 = false;
 
-                if (this.netServerHandler.b() < 4) {
+                if (this.netServerHandler.b() + ChunkCompressionThread.getPlayerQueueSize(this) < 4) { // CraftBukkit - Add check against Chunk Packets in the ChunkCompressionThread.
                     flag1 = true;
                 }
 
diff --git a/src/main/java/net/minecraft/server/NetServerHandler.java b/src/main/java/net/minecraft/server/NetServerHandler.java
index 4744063d56..43b4564782 100644
--- a/src/main/java/net/minecraft/server/NetServerHandler.java
+++ b/src/main/java/net/minecraft/server/NetServerHandler.java
@@ -7,6 +7,7 @@ import java.util.logging.Logger;
 
 // CraftBukkit start
 import org.bukkit.ChatColor;
+import org.bukkit.craftbukkit.ChunkCompressionThread;
 import org.bukkit.craftbukkit.command.ColouredConsoleSender;
 import org.bukkit.Location;
 import org.bukkit.command.CommandException;
@@ -629,6 +630,10 @@ public class NetServerHandler extends NetHandler implements ICommandListener {
                 this.networkManager.queue(new Packet3Chat(line));
             }
             packet = null;
+        } else if (packet.k == true) {
+            // Reroute all low-priority packets through to compression thread.
+            ChunkCompressionThread.sendPacket(this.player, packet);
+            packet = null;
         }
         if (packet != null) this.networkManager.queue(packet);
         // CraftBukkit end
diff --git a/src/main/java/net/minecraft/server/Packet51MapChunk.java b/src/main/java/net/minecraft/server/Packet51MapChunk.java
index c2ca83bdd2..3bf163f621 100644
--- a/src/main/java/net/minecraft/server/Packet51MapChunk.java
+++ b/src/main/java/net/minecraft/server/Packet51MapChunk.java
@@ -16,7 +16,8 @@ public class Packet51MapChunk extends Packet {
     public int e;
     public int f;
     public byte[] g;
-    private int h;
+    public int h; // CraftBukkit - private -> public
+    public byte[] rawData; // CraftBukkit
 
     public Packet51MapChunk() {
         this.k = true;
@@ -36,6 +37,7 @@ public class Packet51MapChunk extends Packet {
         this.d = l;
         this.e = i1;
         this.f = j1;
+        /* CraftBukkit - Moved compression into its own method.
         byte[] abyte = data; // CraftBukkit - uses data from above constructor
         Deflater deflater = new Deflater(-1);
 
@@ -46,7 +48,8 @@ public class Packet51MapChunk extends Packet {
             this.h = deflater.deflate(this.g);
         } finally {
             deflater.end();
-        }
+        }*/
+        this.rawData = data; // CraftBukkit
     }
 
     public void a(DataInputStream datainputstream) throws IOException { // CraftBukkit - throws IOEXception
diff --git a/src/main/java/org/bukkit/craftbukkit/ChunkCompressionThread.java b/src/main/java/org/bukkit/craftbukkit/ChunkCompressionThread.java
new file mode 100644
index 0000000000..7490002f6d
--- /dev/null
+++ b/src/main/java/org/bukkit/craftbukkit/ChunkCompressionThread.java
@@ -0,0 +1,135 @@
+package org.bukkit.craftbukkit;
+
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.zip.Deflater;
+
+import net.minecraft.server.EntityPlayer;
+import net.minecraft.server.Packet;
+import net.minecraft.server.Packet51MapChunk;
+
+public final class ChunkCompressionThread implements Runnable {
+
+    private static final ChunkCompressionThread instance = new ChunkCompressionThread();
+    private static boolean isRunning = false;
+
+    private final int QUEUE_CAPACITY = 1024 * 10;
+    private final HashMap<EntityPlayer, Integer> queueSizePerPlayer = new HashMap<EntityPlayer, Integer>();
+    private final BlockingQueue<QueuedPacket> packetQueue = new LinkedBlockingQueue<QueuedPacket>(QUEUE_CAPACITY);
+
+    private final int CHUNK_SIZE = 16 * 128 * 16 * 5 / 2;
+    private final int REDUCED_DEFLATE_THRESHOLD = CHUNK_SIZE / 4;
+    private final int DEFLATE_LEVEL_CHUNKS = 6;
+    private final int DEFLATE_LEVEL_PARTS = 1;
+
+    private final Deflater deflater = new Deflater();
+    private byte[] deflateBuffer = new byte[CHUNK_SIZE + 100];
+
+    public static void startThread() {
+        if (!isRunning) {
+            isRunning = true;
+            new Thread(instance).start();
+        }
+    }
+
+    public void run() {
+        while (true) {
+            try {
+                handleQueuedPacket(packetQueue.take());
+            } catch (InterruptedException ie) {
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void handleQueuedPacket(QueuedPacket queuedPacket) {
+        addToPlayerQueueSize(queuedPacket.player, -1);
+        // Compress the packet if necessary.
+        if (queuedPacket.compress) {
+            handleMapChunk(queuedPacket);
+        }
+        sendToNetworkQueue(queuedPacket);
+    }
+
+    private void handleMapChunk(QueuedPacket queuedPacket) {
+        Packet51MapChunk packet = (Packet51MapChunk) queuedPacket.packet;
+
+        // If 'packet.g' is set then this packet has already been compressed.
+        if (packet.g != null) {
+            return;
+        }
+
+        int dataSize = packet.rawData.length;
+        if (deflateBuffer.length < dataSize + 100) {
+            deflateBuffer = new byte[dataSize + 100];
+        }
+
+        deflater.reset();
+        deflater.setLevel(dataSize < REDUCED_DEFLATE_THRESHOLD ? DEFLATE_LEVEL_PARTS : DEFLATE_LEVEL_CHUNKS);
+        deflater.setInput(packet.rawData);
+        deflater.finish();
+        int size = deflater.deflate(deflateBuffer);
+        if (size == 0) {
+            size = deflater.deflate(deflateBuffer);
+        }
+
+        // copy compressed data to packet
+        packet.g = new byte[size];
+        packet.h = size;
+        System.arraycopy(deflateBuffer, 0, packet.g, 0, size);
+    }
+
+    private void sendToNetworkQueue(QueuedPacket queuedPacket) {
+        queuedPacket.player.netServerHandler.networkManager.queue(queuedPacket.packet);
+    }
+
+    public static void sendPacket(EntityPlayer player, Packet packet) {
+        if (packet instanceof Packet51MapChunk) {
+            // MapChunk Packets need compressing.
+            instance.addQueuedPacket(new QueuedPacket(player, packet, true));
+        } else {
+            // Other Packets don't.
+            instance.addQueuedPacket(new QueuedPacket(player, packet, false));
+        }
+    }
+
+    private void addToPlayerQueueSize(EntityPlayer player, int amount) {
+        synchronized (queueSizePerPlayer) {
+            Integer count = queueSizePerPlayer.get(player);
+            queueSizePerPlayer.put(player, (count == null ? 0 : count) + amount);
+        }
+    }
+
+    public static int getPlayerQueueSize(EntityPlayer player) {
+        synchronized (instance.queueSizePerPlayer) {
+            Integer count = instance.queueSizePerPlayer.get(player);
+            return count == null ? 0 : count;
+        }
+    }
+
+    private void addQueuedPacket(QueuedPacket task) {
+        addToPlayerQueueSize(task.player, +1);
+
+        while (true) {
+            try {
+                packetQueue.put(task);
+                return;
+            } catch (InterruptedException e) {
+            }
+        }
+    }
+
+    private static class QueuedPacket {
+        final EntityPlayer player;
+        final Packet packet;
+        final boolean compress;
+
+        QueuedPacket(EntityPlayer player, Packet packet, boolean compress) {
+            this.player = player;
+            this.packet = packet;
+            this.compress = compress;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/bukkit/craftbukkit/CraftServer.java b/src/main/java/org/bukkit/craftbukkit/CraftServer.java
index 692fdd6d11..c4a87b8079 100644
--- a/src/main/java/org/bukkit/craftbukkit/CraftServer.java
+++ b/src/main/java/org/bukkit/craftbukkit/CraftServer.java
@@ -96,6 +96,8 @@ public final class CraftServer implements Server {
         loadConfig();
         loadPlugins();
         enablePlugins(PluginLoadOrder.STARTUP);
+
+        ChunkCompressionThread.startThread();
     }
 
     private void loadConfig() {