From 48cf8759487754486c207f668ea1f04163f93f26 Mon Sep 17 00:00:00 2001 From: md_5 Date: Thu, 14 Feb 2013 17:32:20 +1100 Subject: [PATCH] Netty Implement an uber efficient network engine based on the Java NIO framework Netty. This is basically a complete rewrite of the Minecraft network engine with many distinct advantages. First and foremost, there will no longer be the horrid, and redundant case of 2, or even at times, 3 threads per connection. Instead low level select/epoll based NIO is used. The number of threads used for network reading and writing will scale automatically to the number of cores for use on your server. In most cases this will be around 8 threads for a 4 core server, much better than the up to 1000 threads that could be in use at one time with the old engine. To facilitate asynchronous packet sending or receiving (currently only chat), a thread pool of 16 threads is kept handy. == Plugin incompatibilities As a side effect of this change, plugins which rely on very specific implementation level details within Minecraft are broken. At this point in time, TagAPI and ProtocolLib are affected. If you are a user of ProtocolLib you are advised to update to the latest build, where full support is enabled. If you are a user of TagAPI, support has not yet been added, so you will need to install the updated ProtocolLib so that TagAPI may use its functions. == Stability The code within this commit has been very lightly tested in production (300 players for approximately 24 hours), however it is not guaranteed to be free from all bugs. If you experience weird connection behaviour, reporting the bug and the steps to reproduce it is advised. You are also free to downgrade to the latest recommended build, which is guaranteed to be stable. 
== Summary This commit provides a reduction in threads, which gives the CPU / operating system more time to allocate to the main server threads, as well as various other side benefits such as chat thread pooling and a slight reduction in latency. This commit is licensed under the Creative Commons Attribution-ShareAlike 3.0 Unported license. --- pom.xml | 5 + .../java/net/minecraft/server/DedicatedServer.java | 8 +- .../java/net/minecraft/server/INetworkManager.java | 24 ++ .../net/minecraft/server/Packet51MapChunk.java | 2 +- .../net/minecraft/server/Packet56MapChunkBulk.java | 2 +- .../net/minecraft/server/PendingConnection.java | 11 +- .../net/minecraft/server/ThreadCommandReader.java | 1 + .../net/minecraft/server/ThreadLoginVerifier.java | 1 + src/main/java/org/bukkit/craftbukkit/Spigot.java | 8 + .../craftbukkit/scheduler/CraftScheduler.java | 2 +- src/main/java/org/spigotmc/netty/CipherCodec.java | 67 ++++++ .../org/spigotmc/netty/NettyNetworkManager.java | 229 +++++++++++++++++++ .../org/spigotmc/netty/NettyServerConnection.java | 109 +++++++++ .../org/spigotmc/netty/NettySocketAdaptor.java | 248 +++++++++++++++++++++ .../java/org/spigotmc/netty/PacketDecoder.java | 63 ++++++ .../java/org/spigotmc/netty/PacketEncoder.java | 43 ++++ .../java/org/spigotmc/netty/PacketListener.java | 100 +++++++++ src/main/java/org/spigotmc/netty/ReadState.java | 16 ++ 18 files changed, 931 insertions(+), 8 deletions(-) create mode 100644 src/main/java/net/minecraft/server/INetworkManager.java create mode 100644 src/main/java/org/spigotmc/netty/CipherCodec.java create mode 100644 src/main/java/org/spigotmc/netty/NettyNetworkManager.java create mode 100644 src/main/java/org/spigotmc/netty/NettyServerConnection.java create mode 100644 src/main/java/org/spigotmc/netty/NettySocketAdaptor.java create mode 100644 src/main/java/org/spigotmc/netty/PacketDecoder.java create mode 100644 src/main/java/org/spigotmc/netty/PacketEncoder.java create mode 100644 
src/main/java/org/spigotmc/netty/PacketListener.java create mode 100644 src/main/java/org/spigotmc/netty/ReadState.java diff --git a/pom.xml b/pom.xml index f17bd19..6b314ec 100644 --- a/pom.xml +++ b/pom.xml @@ -132,6 +132,11 @@ trove4j 3.0.2 + + io.netty + netty-all + 4.0.0.Beta1 + diff --git a/src/main/java/net/minecraft/server/DedicatedServer.java b/src/main/java/net/minecraft/server/DedicatedServer.java index bd0377a..273b60e 100644 --- a/src/main/java/net/minecraft/server/DedicatedServer.java +++ b/src/main/java/net/minecraft/server/DedicatedServer.java @@ -32,7 +32,7 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer public DedicatedServer(joptsimple.OptionSet options) { super(options); // CraftBukkit end - new ThreadSleepForever(this); + // new ThreadSleepForever(this); // Spigot } protected boolean init() throws java.net.UnknownHostException { // CraftBukkit - throws UnknownHostException @@ -93,7 +93,11 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer log.info("Starting Minecraft server on " + (this.getServerIp().length() == 0 ? "*" : this.getServerIp()) + ":" + this.G()); try { - this.r = new DedicatedServerConnection(this, inetaddress, this.G()); + // Spigot start + this.r = (org.bukkit.craftbukkit.Spigot.netty) + ? 
new org.spigotmc.netty.NettyServerConnection(this, inetaddress, this.G()) + : new DedicatedServerConnection(this, inetaddress, this.G()); + // Spigot end } catch (Throwable ioexception) { // CraftBukkit - IOException -> Throwable log.warning("**** FAILED TO BIND TO PORT!"); log.log(Level.WARNING, "The exception was: " + ioexception.toString()); diff --git a/src/main/java/net/minecraft/server/INetworkManager.java b/src/main/java/net/minecraft/server/INetworkManager.java new file mode 100644 index 0000000..ff3daae --- /dev/null +++ b/src/main/java/net/minecraft/server/INetworkManager.java @@ -0,0 +1,24 @@ +package net.minecraft.server; + +import java.net.SocketAddress; + +public interface INetworkManager { + + void a(Connection connection); + + void queue(Packet packet); + + void a(); + + void b(); + + SocketAddress getSocketAddress(); + + void d(); + + int e(); + + void a(String s, Object... aobject); + + java.net.Socket getSocket(); +} diff --git a/src/main/java/net/minecraft/server/Packet51MapChunk.java b/src/main/java/net/minecraft/server/Packet51MapChunk.java index 230dd62..2ba0464 100644 --- a/src/main/java/net/minecraft/server/Packet51MapChunk.java +++ b/src/main/java/net/minecraft/server/Packet51MapChunk.java @@ -42,7 +42,7 @@ public class Packet51MapChunk extends Packet { this.b = chunk.z; this.e = flag; ChunkMap chunkmap = a(chunk, flag, i); - Deflater deflater = new Deflater(-1); + Deflater deflater = new Deflater(4); this.d = chunkmap.c; this.c = chunkmap.b; diff --git a/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java b/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java index 9d5cee7..8486d82 100644 --- a/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java +++ b/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java @@ -24,7 +24,7 @@ public class Packet56MapChunkBulk extends Packet { @Override protected Deflater initialValue() { // Don't use higher compression level, slows things down too much - return new 
Deflater(6); + return new Deflater(4); } }; // CraftBukkit end diff --git a/src/main/java/net/minecraft/server/PendingConnection.java b/src/main/java/net/minecraft/server/PendingConnection.java index 8413a15..cdd456f 100644 --- a/src/main/java/net/minecraft/server/PendingConnection.java +++ b/src/main/java/net/minecraft/server/PendingConnection.java @@ -17,7 +17,7 @@ public class PendingConnection extends Connection { private byte[] d; public static Logger logger = Logger.getLogger("Minecraft"); private static Random random = new Random(); - public NetworkManager networkManager; + public INetworkManager networkManager; public boolean c = false; private MinecraftServer server; private int g = 0; @@ -28,10 +28,15 @@ public class PendingConnection extends Connection { private SecretKey l = null; public String hostname = ""; // CraftBukkit - add field + public PendingConnection(MinecraftServer minecraftserver, org.spigotmc.netty.NettyNetworkManager networkManager) { + this.server = minecraftserver; + this.networkManager = networkManager; + } + public PendingConnection(MinecraftServer minecraftserver, Socket socket, String s) throws java.io.IOException { // CraftBukkit - throws IOException this.server = minecraftserver; this.networkManager = new NetworkManager(socket, s, this, minecraftserver.F().getPrivate()); - this.networkManager.e = 0; + // this.networkManager.e = 0; } // CraftBukkit start @@ -147,7 +152,7 @@ public class PendingConnection extends Connection { // CraftBukkit org.bukkit.event.server.ServerListPingEvent pingEvent = org.bukkit.craftbukkit.event.CraftEventFactory.callServerListPingEvent(this.server.server, getSocket().getInetAddress(), this.server.getMotd(), playerlist.getPlayerCount(), playerlist.getMaxPlayers()); - if (packet254getinfo.a == 1) { + if (true) { // CraftBukkit start - fix decompile issues, don't create a list from an array Object[] list = new Object[] { 1, 51, this.server.getVersion(), pingEvent.getMotd(), playerlist.getPlayerCount(), 
pingEvent.getMaxPlayers() }; diff --git a/src/main/java/net/minecraft/server/ThreadCommandReader.java b/src/main/java/net/minecraft/server/ThreadCommandReader.java index 64eaa4c..fbf6fe6 100644 --- a/src/main/java/net/minecraft/server/ThreadCommandReader.java +++ b/src/main/java/net/minecraft/server/ThreadCommandReader.java @@ -11,6 +11,7 @@ class ThreadCommandReader extends Thread { final DedicatedServer server; ThreadCommandReader(DedicatedServer dedicatedserver) { + super("Command Reader Thread"); // Spigot this.server = dedicatedserver; } diff --git a/src/main/java/net/minecraft/server/ThreadLoginVerifier.java b/src/main/java/net/minecraft/server/ThreadLoginVerifier.java index 58d30eb..e4e5049 100644 --- a/src/main/java/net/minecraft/server/ThreadLoginVerifier.java +++ b/src/main/java/net/minecraft/server/ThreadLoginVerifier.java @@ -21,6 +21,7 @@ class ThreadLoginVerifier extends Thread { CraftServer server; ThreadLoginVerifier(PendingConnection pendingconnection, CraftServer server) { + super("Login Verifier Thread"); this.server = server; // CraftBukkit end this.pendingConnection = pendingconnection; diff --git a/src/main/java/org/bukkit/craftbukkit/Spigot.java b/src/main/java/org/bukkit/craftbukkit/Spigot.java index 15acc97..dbc3e48 100644 --- a/src/main/java/org/bukkit/craftbukkit/Spigot.java +++ b/src/main/java/org/bukkit/craftbukkit/Spigot.java @@ -18,6 +18,8 @@ public class Spigot { public static boolean tabPing = false; private static Metrics metrics; + public static boolean netty = true; + public static int nettyThreads = 3; public static void initialize(CraftServer server, SimpleCommandMap commandMap, YamlConfiguration configuration) { commandMap.register("bukkit", new org.bukkit.craftbukkit.command.TicksPerSecondCommand("tps")); @@ -53,6 +55,12 @@ public class Spigot { tabPing = configuration.getBoolean("settings.tab-ping", tabPing); + netty = configuration.getBoolean("settings.use-netty", netty); + nettyThreads = 
configuration.getInt("settings.netty-threads", nettyThreads); + if (!netty) { + server.getLogger().severe("[Warning] You have opted not to use Netty, in the future this option may be removed!"); + } + if (metrics == null) { try { metrics = new Metrics(); diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java index 0a5c61a..35badf3 100644 --- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java +++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java @@ -70,7 +70,7 @@ public class CraftScheduler implements BukkitScheduler { */ private final ConcurrentHashMap runners = new ConcurrentHashMap(); private volatile int currentTick = -1; - private final Executor executor = Executors.newCachedThreadPool(); + private final Executor executor = Executors.newCachedThreadPool(new com.google.common.util.concurrent.ThreadFactoryBuilder().setNameFormat("Craft Scheduler Thread - %1$d").build()); // Spigot private CraftAsyncDebugger debugHead = new CraftAsyncDebugger(-1, null, null) {@Override StringBuilder debugTo(StringBuilder string) {return string;}}; private CraftAsyncDebugger debugTail = debugHead; private static final int RECENT_TICKS; diff --git a/src/main/java/org/spigotmc/netty/CipherCodec.java b/src/main/java/org/spigotmc/netty/CipherCodec.java new file mode 100644 index 0000000..15e3466 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherCodec.java @@ -0,0 +1,67 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToByteCodec; +import javax.crypto.Cipher; +import javax.crypto.ShortBufferException; +import org.bouncycastle.crypto.BufferedBlockCipher; + +/** + * This class is a complete solution for encrypting and decoding bytes in a + * Netty stream. 
It takes two {@link Cipher} instances, used for + * encryption and decryption respectively. + */ +public class CipherCodec extends ByteToByteCodec { + + private Cipher encrypt; + private Cipher decrypt; + private ByteBuf heapOut; + + public CipherCodec(Cipher encrypt, Cipher decrypt) { + this.encrypt = encrypt; + this.decrypt = decrypt; + } + + @Override + public void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { + if (heapOut == null) { + heapOut = ctx.alloc().heapBuffer(); + } + cipher(encrypt, in, heapOut); + out.writeBytes(heapOut); + heapOut.discardSomeReadBytes(); + } + + @Override + public void decode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { + cipher(decrypt, in, out); + } + + @Override + public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception { + super.freeInboundBuffer(ctx); + decrypt = null; + } + + @Override + public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + super.freeOutboundBuffer(ctx); + if (heapOut != null) { + heapOut.release(); + heapOut = null; + } + encrypt = null; + } + + private void cipher(Cipher cipher, ByteBuf in, ByteBuf out) throws ShortBufferException { + int available = in.readableBytes(); + int outputSize = cipher.getOutputSize(available); + if (out.capacity() < outputSize) { + out.capacity(outputSize); + } + int processed = cipher.update(in.array(), in.arrayOffset() + in.readerIndex(), available, out.array(), out.arrayOffset() + out.writerIndex()); + in.readerIndex(in.readerIndex() + processed); + out.writerIndex(out.writerIndex() + processed); + } +} diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java new file mode 100644 index 0000000..6cb1b98 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java @@ -0,0 +1,229 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; 
+import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.socket.SocketChannel; +import java.net.Socket; +import java.net.SocketAddress; +import java.security.PrivateKey; +import java.util.AbstractList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import javax.crypto.Cipher; +import javax.crypto.SecretKey; +import net.minecraft.server.Connection; +import net.minecraft.server.INetworkManager; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.Packet; +import net.minecraft.server.Packet252KeyResponse; +import net.minecraft.server.PendingConnection; +import net.minecraft.server.PlayerConnection; + +/** + * This class forms the basis of the Netty integration. It implements + * {@link INetworkManager} and handles all events and inbound messages provided + * by the upstream Netty process. 
+ */ +public class NettyNetworkManager extends ChannelInboundMessageHandlerAdapter implements INetworkManager { + + private static final ExecutorService threadPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("Async Packet Handler - %1$d").build()); + private static final MinecraftServer server = MinecraftServer.getServer(); + private static final PrivateKey key = server.F().getPrivate(); + private static final NettyServerConnection serverConnection = (NettyServerConnection) server.ae(); + /*========================================================================*/ + private final Queue syncPackets = new ConcurrentLinkedQueue(); + private final List highPriorityQueue = new AbstractList() { + @Override + public void add(int index, Packet element) { + // NOP + } + + @Override + public Packet get(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int size() { + return 0; + } + }; + private volatile boolean connected; + private Channel channel; + private SocketAddress address; + private Connection connection; + private SecretKey secret; + private String dcReason; + private Object[] dcArgs; + private Socket socketAdaptor; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // Channel and address groundwork first + channel = ctx.channel(); + address = channel.remoteAddress(); + // Then the socket adaptor + socketAdaptor = NettySocketAdaptor.adapt((SocketChannel) channel); + // Followed by their first handler + connection = new PendingConnection(server, this); + // Finally register the connection + connected = true; + serverConnection.pendingConnections.add((PendingConnection) connection); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + a("disconnect.endOfStream", new Object[0]); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + // TODO: Remove this once we are 
more stable + // Bukkit.getServer().getLogger().severe("======================= Start Netty Debug Log ======================="); + // Bukkit.getServer().getLogger().log(Level.SEVERE, "Error caught whilst handling " + channel, cause); + // Bukkit.getServer().getLogger().severe("======================= End Netty Debug Log ======================="); + // Disconnect with generic reason + exception + a("disconnect.genericReason", new Object[]{"Internal exception: " + cause}); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, final Packet msg) throws Exception { + if (msg instanceof Packet252KeyResponse) { + secret = ((Packet252KeyResponse) msg).a(key); + } + + if (msg.a_()) { + threadPool.submit(new Runnable() { + public void run() { + Packet packet = PacketListener.callReceived(NettyNetworkManager.this, connection, msg); + if (packet != null) { + packet.handle(connection); + } + } + }); + } else { + syncPackets.add(msg); + } + } + + public Socket getSocket() { + return socketAdaptor; + } + + /** + * setHandler. Set the {@link Connection} used to process received packets. + * + * @param nh the new {@link Connection} instance + */ + public void a(Connection nh) { + connection = nh; + } + + /** + * queue. Queue a packet for sending, or in this case write it + * straight to the channel. 
+ * + * @param packet the packet to queue + */ + public void queue(Packet packet) { + // Only send if channel is still connected + if (connected) { + // Process packet via handler + packet = PacketListener.callQueued(this, connection, packet); + // If handler indicates packet send + if (packet != null) { + highPriorityQueue.add(packet); + channel.write(packet); + + // If needed, check and prepare encryption phase + if (packet instanceof Packet252KeyResponse) { + Cipher encrypt = NettyServerConnection.getCipher(Cipher.ENCRYPT_MODE, secret); + Cipher decrypt = NettyServerConnection.getCipher(Cipher.DECRYPT_MODE, secret); + CipherCodec codec = new CipherCodec(encrypt, decrypt); + channel.pipeline().addBefore("decoder", "cipher", codec); + } + } + } + } + + /** + * wakeThreads. In Vanilla this method will interrupt the network read and + * write threads, thus waking them. + */ + public void a() { + } + + /** + * processPackets. Remove up to 1000 packets from the queue and process + * them. This method should only be called from the main server thread. + */ + public void b() { + for (int i = 1000; !syncPackets.isEmpty() && i >= 0; i--) { + if (connection instanceof PendingConnection ? ((PendingConnection) connection).c : ((PlayerConnection) connection).disconnected) { + syncPackets.clear(); + break; + } + + Packet packet = PacketListener.callReceived(this, connection, syncPackets.poll()); + if (packet != null) { + packet.handle(connection); + } + } + + // Disconnect via the handler - this performs all plugin related cleanup + logging + if (!connected && (dcReason != null || dcArgs != null)) { + connection.a(dcReason, dcArgs); + } + } + + /** + * getSocketAddress. Return the remote address of the connected user. It is + * important that this method returns a value even after disconnect. + * + * @return the remote address of this connection + */ + public SocketAddress getSocketAddress() { + return address; + } + + /** + * close. 
Close and release all resources associated with this connection. + */ + public void d() { + if (connected) { + connected = false; + channel.close(); + } + } + + /** + * queueSize. Return the number of packets in the low priority queue. In a + * NIO environment this will always be 0. + * + * @return the size of the packet send queue + */ + public int e() { + return 0; + } + + /** + * networkShutdown. Shuts down this connection, storing the reason and + * parameters, used to notify the current {@link Connection}. + * + * @param reason the main disconnect reason + * @param arguments additional disconnect arguments, for example, the + * exception which triggered the disconnect. + */ + public void a(String reason, Object... arguments) { + if (connected) { + dcReason = reason; + dcArgs = arguments; + d(); + } + } +} diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java new file mode 100644 index 0000000..586be63 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java @@ -0,0 +1,109 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.timeout.ReadTimeoutHandler; +import java.net.InetAddress; +import java.security.GeneralSecurityException; +import java.security.Key; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.logging.Level; +import javax.crypto.Cipher; +import javax.crypto.spec.IvParameterSpec; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.PendingConnection; +import 
net.minecraft.server.ServerConnection; +import org.bukkit.Bukkit; +import org.bukkit.craftbukkit.Spigot; + +/** + * This is the NettyServerConnection class. It implements + * {@link ServerConnection} and is the main interface between the Minecraft + * server and this NIO implementation. It handles starting, stopping and + * processing the Netty backend. + */ +public class NettyServerConnection extends ServerConnection { + + private final ChannelFuture socket; + final List pendingConnections = Collections.synchronizedList(new ArrayList()); + + public NettyServerConnection(MinecraftServer ms, InetAddress host, int port) { + super(ms); + socket = new ServerBootstrap().channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() { + @Override + public void initChannel(Channel ch) throws Exception { + try { + ch.config().setOption(ChannelOption.IP_TOS, 0x18); + } catch (ChannelException ex) { + // IP_TOS is not supported (Windows XP / Windows Server 2003) + } + + ch.pipeline() + .addLast("timer", new ReadTimeoutHandler(30)) + .addLast("decoder", new PacketDecoder()) + .addLast("encoder", new PacketEncoder()) + .addLast("manager", new NettyNetworkManager()); + } + }).group(new NioEventLoopGroup(Spigot.nettyThreads, new ThreadFactoryBuilder().setNameFormat("Netty IO Thread - %1$d").build())).localAddress(host, port).bind(); + MinecraftServer.log.info("Using Netty NIO with {0} threads for network connections."); + } + + /** + * Pulse. This method pulses all connections causing them to update. It is + * called from the main server thread a few times a tick. 
+ */ + @Override + public void b() { + super.b(); // pulse PlayerConnections + for (int i = 0; i < pendingConnections.size(); ++i) { + PendingConnection connection = pendingConnections.get(i); + + try { + connection.c(); + } catch (Exception ex) { + connection.disconnect("Internal server error"); + Bukkit.getServer().getLogger().log(Level.WARNING, "Failed to handle packet: " + ex, ex); + } + + if (connection.c) { + pendingConnections.remove(i--); + } + } + } + + /** + * Shutdown. This method is called when the server is shutting down and the + * server socket and all clients should be terminated with no further + * action. + */ + @Override + public void a() { + socket.channel().close().syncUninterruptibly(); + } + + /** + * Return a Minecraft compatible cipher instance from the specified key. + * + * @param opMode the mode to initialize the cipher in + * @param key to use as the initial vector + * @return the initialized cipher + */ + public static Cipher getCipher(int opMode, Key key) { + try { + Cipher cip = Cipher.getInstance("AES/CFB8/NoPadding"); + cip.init(opMode, key, new IvParameterSpec(key.getEncoded())); + return cip; + } catch (GeneralSecurityException ex) { + throw new RuntimeException(ex); + } + } +} diff --git a/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java new file mode 100644 index 0000000..a3b86b8 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java @@ -0,0 +1,248 @@ +package org.spigotmc.netty; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.SocketChannel; + +/** + * This class wraps a Netty {@link Channel} in a {@link Socket}. 
It overrides + * all methods in {@link Socket} to ensure that calls are not mistakenly made + * to the unsupported super socket. All operations that can be sanely applied to + * a {@link Channel} are implemented here. Those which cannot will throw an + * {@link UnsupportedOperationException}. + */ +public class NettySocketAdaptor extends Socket { + + private final io.netty.channel.socket.SocketChannel ch; + + private NettySocketAdaptor(io.netty.channel.socket.SocketChannel ch) { + this.ch = ch; + } + + public static NettySocketAdaptor adapt(io.netty.channel.socket.SocketChannel ch) { + return new NettySocketAdaptor(ch); + } + + @Override + public void bind(SocketAddress bindpoint) throws IOException { + ch.bind(bindpoint).syncUninterruptibly(); + } + + @Override + public synchronized void close() throws IOException { + ch.close().syncUninterruptibly(); + } + + @Override + public void connect(SocketAddress endpoint) throws IOException { + ch.connect(endpoint).syncUninterruptibly(); + } + + @Override + public void connect(SocketAddress endpoint, int timeout) throws IOException { + ch.config().setConnectTimeoutMillis(timeout); + ch.connect(endpoint).syncUninterruptibly(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof NettySocketAdaptor && ch.equals(((NettySocketAdaptor) obj).ch); + } + + @Override + public SocketChannel getChannel() { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public InetAddress getInetAddress() { + return ch.remoteAddress().getAddress(); + } + + @Override + public InputStream getInputStream() throws IOException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public boolean getKeepAlive() throws SocketException { + return ch.config().getOption(ChannelOption.SO_KEEPALIVE); + } + + @Override + public InetAddress getLocalAddress() { + return ch.localAddress().getAddress(); + } + + @Override 
+    public int getLocalPort() {
+        return ch.localAddress().getPort();
+    }
+
+    @Override
+    public SocketAddress getLocalSocketAddress() {
+        return ch.localAddress();
+    }
+
+    @Override
+    public boolean getOOBInline() throws SocketException {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public int getPort() {
+        return ch.remoteAddress().getPort();
+    }
+
+    @Override
+    public synchronized int getReceiveBufferSize() throws SocketException {
+        return ch.config().getOption(ChannelOption.SO_RCVBUF);
+    }
+
+    @Override
+    public SocketAddress getRemoteSocketAddress() {
+        return ch.remoteAddress();
+    }
+
+    @Override
+    public boolean getReuseAddress() throws SocketException {
+        return ch.config().getOption(ChannelOption.SO_REUSEADDR);
+    }
+
+    @Override
+    public synchronized int getSendBufferSize() throws SocketException {
+        return ch.config().getOption(ChannelOption.SO_SNDBUF);
+    }
+
+    @Override
+    public int getSoLinger() throws SocketException {
+        return ch.config().getOption(ChannelOption.SO_LINGER);
+    }
+
+    @Override
+    public synchronized int getSoTimeout() throws SocketException {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public boolean getTcpNoDelay() throws SocketException {
+        return ch.config().getOption(ChannelOption.TCP_NODELAY);
+    }
+
+    @Override
+    public int getTrafficClass() throws SocketException {
+        return ch.config().getOption(ChannelOption.IP_TOS);
+    }
+
+    @Override
+    public int hashCode() {
+        return ch.hashCode();
+    }
+
+    @Override
+    public boolean isBound() {
+        return ch.localAddress() != null; // bound is inferred from the channel reporting a local address
+    }
+
+    @Override
+    public boolean isClosed() {
+        return !ch.isOpen();
+    }
+
+    @Override
+    public boolean isConnected() {
+        return ch.isActive();
+    }
+
+    @Override
+    public boolean isInputShutdown() {
+        return ch.isInputShutdown();
+    }
+
+    @Override
+    public boolean isOutputShutdown() {
+        return ch.isOutputShutdown();
+    }
+
+    @Override
+    public void sendUrgentData(int data) throws IOException {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public void setKeepAlive(boolean on) throws SocketException {
+        ch.config().setOption(ChannelOption.SO_KEEPALIVE, on);
+    }
+
+    @Override
+    public void setOOBInline(boolean on) throws SocketException {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public synchronized void setReceiveBufferSize(int size) throws SocketException {
+        ch.config().setOption(ChannelOption.SO_RCVBUF, size);
+    }
+
+    @Override
+    public void setReuseAddress(boolean on) throws SocketException {
+        ch.config().setOption(ChannelOption.SO_REUSEADDR, on);
+    }
+
+    @Override
+    public synchronized void setSendBufferSize(int size) throws SocketException {
+        ch.config().setOption(ChannelOption.SO_SNDBUF, size);
+    }
+
+    @Override
+    public void setSoLinger(boolean on, int linger) throws SocketException {
+        ch.config().setOption(ChannelOption.SO_LINGER, on ? linger : -1); // honour 'on': a negative value turns lingering off, matching the -1 that Socket#getSoLinger reports when disabled
+    }
+
+    @Override
+    public synchronized void setSoTimeout(int timeout) throws SocketException {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public void setTcpNoDelay(boolean on) throws SocketException {
+        ch.config().setOption(ChannelOption.TCP_NODELAY, on);
+    }
+
+    @Override
+    public void setTrafficClass(int tc) throws SocketException {
+        ch.config().setOption(ChannelOption.IP_TOS, tc);
+    }
+
+    @Override
+    public void shutdownInput() throws IOException {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public void shutdownOutput() throws IOException {
+        ch.shutdownOutput().syncUninterruptibly(); // waits for the shutdown future before returning
+    }
+
+    @Override
+    public String toString() {
+        return ch.toString();
+    }
+}
diff --git a/src/main/java/org/spigotmc/netty/PacketDecoder.java b/src/main/java/org/spigotmc/netty/PacketDecoder.java
new file mode 100644
index 0000000..6ecbca7
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/PacketDecoder.java
@@ -0,0 +1,63 @@
+package org.spigotmc.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import net.minecraft.server.Packet;
+
+/**
+ * Packet decoding class backed by a reusable {@link DataInputStream} which
+ * wraps the input {@link ByteBuf}. Reads an unsigned byte packet header and
+ * then decodes the packet accordingly.
+ */
+public class PacketDecoder extends ReplayingDecoder<ReadState> {
+
+    private DataInputStream input; // created on first decode() around the inbound buffer; cleared in freeInboundBuffer
+    private Packet packet; // packet instance whose body is currently being read; null between packets
+
+    public PacketDecoder() {
+        super(ReadState.HEADER); // first thing expected on the wire is a packet id byte
+    }
+
+    @Override
+    public Packet decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+        if (input == null) {
+            input = new DataInputStream(new ByteBufInputStream(in));
+        }
+
+        switch (state()) {
+            case HEADER:
+                short packetId = in.readUnsignedByte();
+                packet = Packet.d(packetId);
+                if (packet == null) {
+                    throw new IOException("Bad packet id " + packetId);
+                }
+                checkpoint(ReadState.DATA); // id consumed; intentionally falls through to read the body
+            case DATA:
+                try {
+                    packet.a(input);
+                } catch (EOFException ex) {
+                    return null; // stream reported end of data: no packet is produced by this call
+                }
+
+                checkpoint(ReadState.HEADER); // body complete; the next call starts with a fresh id
+                Packet ret = packet;
+                packet = null;
+
+                return ret;
+            default:
+                throw new IllegalStateException();
+        }
+    }
+
+    @Override
+    public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
+        super.freeInboundBuffer(ctx);
+        input = null;
+        packet = null;
+    }
+}
diff --git a/src/main/java/org/spigotmc/netty/PacketEncoder.java b/src/main/java/org/spigotmc/netty/PacketEncoder.java
new file mode 100644
index 0000000..9d0b06c
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/PacketEncoder.java
@@ -0,0 +1,43 @@
+package org.spigotmc.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import java.io.DataOutputStream;
+import net.minecraft.server.Packet;
+
+/**
+ * Netty encoder which takes a packet and encodes it, and adds a byte packet id
+ * header.
+ */
+public class PacketEncoder extends MessageToByteEncoder<Packet> {
+
+    private ByteBuf outBuf; // reused scratch buffer the packet body is serialised into; released in freeOutboundBuffer
+    private DataOutputStream dataOut; // stream view over outBuf handed to Packet#a
+
+    @Override
+    public void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception {
+        if (outBuf == null) {
+            outBuf = ctx.alloc().directBuffer();
+        }
+        if (dataOut == null) {
+            dataOut = new DataOutputStream(new ByteBufOutputStream(outBuf));
+        }
+
+        out.writeByte(msg.k()); // one-byte header written ahead of the body (the packet id, per the class doc)
+        msg.a(dataOut); // body is written into outBuf through dataOut
+        out.writeBytes(outBuf); // copies all readable bytes of outBuf downstream, advancing its reader index
+        outBuf.discardSomeReadBytes(); // reclaim the consumed bytes of the scratch buffer so it does not grow with every packet
+    }
+
+    @Override
+    public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
+        super.freeOutboundBuffer(ctx);
+        if (outBuf != null) {
+            outBuf.release();
+            outBuf = null;
+        }
+        dataOut = null;
+    }
+}
diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java
new file mode 100644
index 0000000..8e3b932
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/PacketListener.java
@@ -0,0 +1,100 @@
+package org.spigotmc.netty;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import net.minecraft.server.Connection;
+import net.minecraft.server.INetworkManager;
+import net.minecraft.server.Packet;
+import org.bukkit.Bukkit;
+import org.bukkit.plugin.Plugin;
+
+/**
+ * This class is used for plugins that wish to register to listen to incoming
+ * and outgoing packets. To use this class, simply create a new instance,
+ * override the methods you wish to use, and call
+ * {@link #register(org.spigotmc.netty.PacketListener, org.bukkit.plugin.Plugin)}.
+ */
+public class PacketListener {
+
+    /**
+     * A mapping of all registered listeners and their owning plugins.
+     */
+    private static final Map<PacketListener, Plugin> listeners = new HashMap<PacketListener, Plugin>();
+    /**
+     * A baked list of all listeners, for efficiency's sake.
+     */
+    private static volatile PacketListener[] baked = new PacketListener[0]; // swapped for a new array by register(); volatile so unsynchronized readers see the replacement
+
+    /**
+     * Used to register a handler for receiving notifications of packet
+     * activity.
+     *
+     * @param listener the listener to register
+     * @param plugin the plugin owning this listener
+     */
+    public static synchronized void register(PacketListener listener, Plugin plugin) {
+        Preconditions.checkNotNull(listener, "listener");
+        Preconditions.checkNotNull(plugin, "plugin");
+        Preconditions.checkState(!listeners.containsKey(listener), "listener already registered");
+
+        int size = listeners.size();
+        Preconditions.checkState(baked.length == size); // map and baked array must stay in step
+        listeners.put(listener, plugin);
+        baked = Arrays.copyOf(baked, size + 1); // copy, then publish; the old array is never mutated
+        baked[size] = listener;
+    }
+
+    static Packet callReceived(INetworkManager networkManager, Connection connection, Packet packet) {
+        for (PacketListener listener : baked) {
+            try {
+                packet = listener.packetReceived(networkManager, connection, packet); // each result, null included, is handed to the next listener
+            } catch (Throwable t) {
+                Bukkit.getServer().getLogger().log(Level.SEVERE, "Error whilst firing receive hook for packet", t); // a failing listener leaves the packet as it was
+            }
+        }
+        return packet;
+    }
+
+    static Packet callQueued(INetworkManager networkManager, Connection connection, Packet packet) {
+        for (PacketListener listener : baked) {
+            try {
+                packet = listener.packetQueued(networkManager, connection, packet); // each result, null included, is handed to the next listener
+            } catch (Throwable t) {
+                Bukkit.getServer().getLogger().log(Level.SEVERE, "Error whilst firing queued hook for packet", t); // a failing listener leaves the packet as it was
+            }
+        }
+        return packet;
+    }
+
+    /**
+     * Called when a packet has been received and is about to be handled by the
+     * current {@link Connection}. The returned packet will be the packet passed
+     * on for handling, or in the case of null being returned, not handled at
+     * all.
+     *
+     * @param networkManager the NetworkManager receiving the packet
+     * @param connection the connection which will handle the packet
+     * @param packet the received packet
+     * @return the packet to be handled, or null to cancel
+     */
+    public Packet packetReceived(INetworkManager networkManager, Connection connection, Packet packet) {
+        return packet;
+    }
+
+    /**
+     * Called when a packet is queued to be sent. The returned packet will be
+     * the packet sent. In the case of null being returned, the packet will not
+     * be sent.
+     *
+     * @param networkManager the NetworkManager which will send the packet
+     * @param connection the connection which queued the packet
+     * @param packet the queued packet
+     * @return the packet to be sent, or null if the packet will not be sent.
+     */
+    public Packet packetQueued(INetworkManager networkManager, Connection connection, Packet packet) {
+        return packet;
+    }
+}
diff --git a/src/main/java/org/spigotmc/netty/ReadState.java b/src/main/java/org/spigotmc/netty/ReadState.java
new file mode 100644
index 0000000..5dc3754
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/ReadState.java
@@ -0,0 +1,16 @@
+package org.spigotmc.netty;
+
+/**
+ * Stores the state of the packet currently being read.
+ */
+public enum ReadState {
+
+    /**
+     * Indicates the byte representing the ID is the next thing to be read.
+     */
+    HEADER,
+    /**
+     * Shows the packet body is being read.
+     */
+    DATA;
+}
-- 
1.8.1-rc2