From 7acc085a95961261de6b3e68d46ec46e974a2c3b Mon Sep 17 00:00:00 2001 From: md_5 Date: Tue, 28 Jan 2014 20:32:07 +1100 Subject: [PATCH] Implement Threaded Bulk Chunk Compression and Caching diff --git a/src/main/java/net/minecraft/server/PacketPlayOutMapChunkBulk.java b/src/main/java/net/minecraft/server/PacketPlayOutMapChunkBulk.java index 30bf8a7..e05c870 100644 --- a/src/main/java/net/minecraft/server/PacketPlayOutMapChunkBulk.java +++ b/src/main/java/net/minecraft/server/PacketPlayOutMapChunkBulk.java @@ -8,13 +8,13 @@ import java.util.zip.Inflater; public class PacketPlayOutMapChunkBulk extends Packet { - private int[] a; - private int[] b; + public int[] a; // Spigot - public, read by org.spigotmc.ChunkCompressor + public int[] b; // Spigot - public, read by org.spigotmc.ChunkCompressor private int[] c; private int[] d; - private byte[] buffer; + public byte[] buffer; // Spigot - public, assigned by org.spigotmc.ChunkCompressor private byte[][] inflatedBuffers; - private int size; + public int size; // Spigot - public, assigned by org.spigotmc.ChunkCompressor private boolean h; private byte[] buildBuffer = new byte[0]; // CraftBukkit - remove static // CraftBukkit start @@ -174,7 +174,7 @@ public class PacketPlayOutMapChunkBulk extends Packet { } public void b(PacketDataSerializer packetdataserializer) throws IOException { // CraftBukkit - throws IOException - compress(); // CraftBukkit + // compress(); // CraftBukkit // Spigot - removed, buffer and size are now set by org.spigotmc.ChunkCompressor packetdataserializer.writeShort(this.a.length); packetdataserializer.writeInt(this.size); packetdataserializer.writeBoolean(this.h); diff --git a/src/main/java/net/minecraft/server/ServerConnectionChannel.java b/src/main/java/net/minecraft/server/ServerConnectionChannel.java index fb95be4..2875c94 100644 --- a/src/main/java/net/minecraft/server/ServerConnectionChannel.java +++ b/src/main/java/net/minecraft/server/ServerConnectionChannel.java @@ -1,15 +1,26 @@ package net.minecraft.server; +import com.google.common.util.concurrent.ThreadFactoryBuilder; // Spigot import net.minecraft.util.io.netty.channel.Channel; import net.minecraft.util.io.netty.channel.ChannelException; import 
net.minecraft.util.io.netty.channel.ChannelInitializer; import net.minecraft.util.io.netty.channel.ChannelOption; import net.minecraft.util.io.netty.handler.timeout.ReadTimeoutHandler; +// Spigot Start +import net.minecraft.util.io.netty.util.concurrent.DefaultEventExecutorGroup; +import net.minecraft.util.io.netty.util.concurrent.EventExecutorGroup; +import org.spigotmc.ChunkCompressor; +import org.spigotmc.SpigotConfig; +// Spigot End class ServerConnectionChannel extends ChannelInitializer { final ServerConnection a; - + // Spigot Start + private static final EventExecutorGroup threadPool = new DefaultEventExecutorGroup( SpigotConfig.compressionThreads, new ThreadFactoryBuilder().setNameFormat( "Chunk Compressor #%d" ).setDaemon( true ).build() ); // Spigot - daemon threads, sized by SpigotConfig.compressionThreads; passed to addLast() with the "compressor" handler + private static final ChunkCompressor chunkCompressor = new ChunkCompressor(); // Spigot - single static instance added to every channel pipeline (the class is @ChannelHandler.Sharable) + // Spigot End + ServerConnectionChannel(ServerConnection serverconnection) { this.a = serverconnection; } @@ -27,7 +38,8 @@ class ServerConnectionChannel extends ChannelInitializer { ; } - channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyPingHandler(this.a)).addLast("splitter", new PacketSplitter()).addLast("decoder", new PacketDecoder()).addLast("prepender", new PacketPrepender()).addLast("encoder", new PacketEncoder()); + channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyPingHandler(this.a)).addLast("splitter", new PacketSplitter()).addLast("decoder", new PacketDecoder()).addLast("prepender", new PacketPrepender()).addLast("encoder", new PacketEncoder()) + .addLast( threadPool, "compressor", chunkCompressor ); // Spigot NetworkManager networkmanager = new NetworkManager(false); ServerConnection.a(this.a).add(networkmanager); diff --git a/src/main/java/org/spigotmc/ChunkCompressor.java b/src/main/java/org/spigotmc/ChunkCompressor.java new file mode 100644 index 0000000..023900f --- /dev/null +++ 
b/src/main/java/org/spigotmc/ChunkCompressor.java
@@ -0,0 +1,132 @@
+package org.spigotmc;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import net.minecraft.server.PacketPlayOutMapChunkBulk;
+import net.minecraft.util.io.netty.channel.ChannelHandler;
+import net.minecraft.util.io.netty.channel.ChannelHandlerContext;
+import net.minecraft.util.io.netty.channel.ChannelOutboundHandlerAdapter;
+import net.minecraft.util.io.netty.channel.ChannelPromise;
+
+/**
+ * Outbound handler that deflates bulk chunk packets and caches the result.
+ * <p>
+ * Deflated buffers are kept in an access ordered map that is trimmed, least
+ * recently used entry first, to stay within SpigotConfig.chunkCacheBytes.
+ * Entries are keyed by the exact coordinate lists of the packet.
+ * <p>
+ * NOTE(review): the key holds coordinates only. Nothing here evicts an entry
+ * when the block data behind those coordinates changes, and nothing tells
+ * two worlds apart - confirm how callers are expected to handle that.
+ */
+@ChannelHandler.Sharable
+public class ChunkCompressor extends ChannelOutboundHandlerAdapter
+{
+
+    // Access ordered, so iteration starts at the least recently used entry
+    private final LinkedHashMap<CacheKey, byte[]> cache = new LinkedHashMap<CacheKey, byte[]>( 16, 0.75f, true );
+    private int cacheSize; // Total length of all cached buffers, guarded by cache
+
+    /**
+     * Fills in the deflated buffer of bulk chunk packets, from the cache when
+     * possible, then passes the message on. Other messages pass untouched.
+     */
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
+    {
+        if ( msg instanceof PacketPlayOutMapChunkBulk )
+        {
+            PacketPlayOutMapChunkBulk chunk = (PacketPlayOutMapChunkBulk) msg;
+            CacheKey key = new CacheKey( chunk.a, chunk.b );
+
+            byte[] deflated;
+            synchronized ( cache )
+            {
+                deflated = cache.get( key );
+            }
+
+            if ( deflated != null )
+            {
+                chunk.buffer = deflated;
+                chunk.size = deflated.length;
+            } else
+            {
+                // Compress outside of the lock so concurrent writers do not queue behind each other
+                chunk.compress();
+                store( key, Arrays.copyOf( chunk.buffer, chunk.size ) ); // Trim to the deflated size
+            }
+        }
+
+        super.write( ctx, msg, promise );
+    }
+
+    /**
+     * Adds a buffer to the cache, evicting least recently used entries until it fits.
+     */
+    private void store(CacheKey key, byte[] buffer)
+    {
+        int limit = SpigotConfig.chunkCacheBytes;
+        if ( buffer.length > limit )
+        {
+            return; // Can never fit, so do not flush the whole cache for it
+        }
+
+        synchronized ( cache )
+        {
+            byte[] replaced = cache.remove( key ); // Another thread may have stored this key meanwhile
+            if ( replaced != null )
+            {
+                cacheSize -= replaced.length;
+            }
+
+            Iterator<Map.Entry<CacheKey, byte[]>> iter = cache.entrySet().iterator();
+            // Written as a subtraction so that the comparison cannot overflow
+            while ( cacheSize > limit - buffer.length && iter.hasNext() )
+            {
+                cacheSize -= iter.next().getValue().length; // Update size table
+                iter.remove(); // Remove it altogether
+            }
+
+            cache.put( key, buffer );
+            cacheSize += buffer.length;
+        }
+    }
+
+    /**
+     * Cache key that compares the full coordinate lists, so packets whose
+     * coordinates merely hash alike never share an entry.
+     */
+    private static final class CacheKey
+    {
+
+        private final int[] x;
+        private final int[] z;
+        private final int hash;
+
+        CacheKey(int[] x, int[] z)
+        {
+            this.x = x.clone(); // Copied, a key must not change while it is in the map
+            this.z = z.clone();
+            this.hash = 31 * Arrays.hashCode( this.x ) + Arrays.hashCode( this.z );
+        }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if ( !( obj instanceof CacheKey ) )
+            {
+                return false;
+            }
+            CacheKey other = (CacheKey) obj;
+            return Arrays.equals( x, other.x ) && Arrays.equals( z, other.z );
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return hash;
+        }
+    }
+}
diff --git a/src/main/java/org/spigotmc/SpigotConfig.java b/src/main/java/org/spigotmc/SpigotConfig.java
index 552266b..e6d414a 100755
--- a/src/main/java/org/spigotmc/SpigotConfig.java
+++ b/src/main/java/org/spigotmc/SpigotConfig.java
@@ -266,4 +266,19 @@ public class SpigotConfig
     {
         playerShuffle = getInt( "settings.player-shuffle", 0 );
     }
+
+    public static int compressionThreads;
+    public static int chunkCacheBytes;
+    private static void chunkStuff()
+    {
+        // At least one thread, the executor group built from this value rejects an empty pool
+        compressionThreads = Math.max( 1, getInt( "settings.compression-threads", 4 ) );
+        Bukkit.getLogger().log( Level.INFO, "Using {0} threads for async chunk compression", compressionThreads );
+
+        // Configured in MiB; clamped so that the shift to bytes cannot overflow an int
+        int cacheMegabytes = Math.max( 0, Math.min( getInt( "settings.compressed-chunk-cache", 64 ), Integer.MAX_VALUE >> 20 ) );
+        chunkCacheBytes = cacheMegabytes << 20;
+        Bukkit.getLogger().log( Level.INFO, "Reserving {0} bytes for compressed chunk cache", chunkCacheBytes );
+
+    }
 }
-- 
1.8.3.2