PaperMC/CraftBukkit-Patches/0026-Netty.patch
Spigot 27b63340b1 Don't scare users with debug mode.
By: md_5 <md_5@live.com.au>
2013-02-09 20:42:08 +11:00

948 lines
36 KiB
Diff

From 2fce59cd82dd5f03174884b81b94d8ddecf484a3 Mon Sep 17 00:00:00 2001
From: md_5 <md_5@live.com.au>
Date: Sun, 3 Feb 2013 10:24:33 +1100
Subject: [PATCH] Netty
Implement an uber efficient network engine based on the Java NIO framework Netty. This is basically a complete rewrite of the Minecraft network engine with many distinct advantages. First and foremost, there will no longer be the horrid and redundant case of 2, or at times even 3, threads per connection. Instead, low level select/epoll based NIO is used. The number of threads used for network reading and writing will scale automatically with the number of cores available on your server. In most cases this will be around 8 threads for a 4 core server, much better than the up to 1000 threads that could be in use at one time with the old engine. To facilitate asynchronous packet sending or receiving (currently only chat), a cached thread pool is kept handy. Currently this pool is unbounded; however, at this stage we do not need to worry about servers being resource starved due to excessive spam or chat.
== Plugin incompatibilities
As a side effect of this change, plugins which rely on very specific implementation level details within Minecraft are broken. At this point in time, TagAPI and ProtocolLib are affected. If you are a user of ProtocolLib you are advised to update to the latest build, where full support is enabled. If you are a user of TagAPI, support has not yet been added, so you will need to install the updated ProtocolLib so that TagAPI may use its functions.
== Stability
The code within this commit has been very lightly tested in production (300 players for approximately 24 hours); however, it is not guaranteed to be free from all bugs. If you experience weird connection behaviour, reporting the bug along with steps to reproduce is advised. You are also free to downgrade to the latest recommended build, which is guaranteed to be stable.
== Summary
This commit provides a reduction in threads, which gives the CPU / operating system more time to allocate to the main server threads, as well as various other side benefits such as chat thread pooling and a slight reduction in latency.
This commit is licensed under the Creative Commons Attribution-ShareAlike 3.0 Unported license.
---
pom.xml | 10 +
.../java/net/minecraft/server/DedicatedServer.java | 2 +-
.../net/minecraft/server/PendingConnection.java | 11 +-
src/main/java/org/spigotmc/netty/CipherCodec.java | 65 ++++++
.../org/spigotmc/netty/NettyNetworkManager.java | 192 ++++++++++++++++
.../org/spigotmc/netty/NettyServerConnection.java | 98 ++++++++
.../org/spigotmc/netty/NettySocketAdaptor.java | 248 +++++++++++++++++++++
.../java/org/spigotmc/netty/PacketDecoder.java | 52 +++++
.../java/org/spigotmc/netty/PacketEncoder.java | 43 ++++
.../java/org/spigotmc/netty/PacketListener.java | 100 +++++++++
10 files changed, 817 insertions(+), 4 deletions(-)
create mode 100644 src/main/java/org/spigotmc/netty/CipherCodec.java
create mode 100644 src/main/java/org/spigotmc/netty/NettyNetworkManager.java
create mode 100644 src/main/java/org/spigotmc/netty/NettyServerConnection.java
create mode 100644 src/main/java/org/spigotmc/netty/NettySocketAdaptor.java
create mode 100644 src/main/java/org/spigotmc/netty/PacketDecoder.java
create mode 100644 src/main/java/org/spigotmc/netty/PacketEncoder.java
create mode 100644 src/main/java/org/spigotmc/netty/PacketListener.java
diff --git a/pom.xml b/pom.xml
index f17bd19..fc7bfa0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,6 +29,11 @@
<id>repobo-snap</id>
<url>http://repo.bukkit.org/content/groups/public</url>
</repository>
+ <repository>
+ <id>sonatype-nexus-snapshots</id>
+ <name>Sonatype Nexus Snapshots</name>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ </repository>
</repositories>
<dependencies>
@@ -132,6 +137,11 @@
<artifactId>trove4j</artifactId>
<version>3.0.2</version>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>4.0.0.Beta1-SNAPSHOT</version>
+ </dependency>
</dependencies>
<!-- This builds a completely 'ready to start' jar with all dependencies inside -->
diff --git a/src/main/java/net/minecraft/server/DedicatedServer.java b/src/main/java/net/minecraft/server/DedicatedServer.java
index bd0377a..68feb71 100644
--- a/src/main/java/net/minecraft/server/DedicatedServer.java
+++ b/src/main/java/net/minecraft/server/DedicatedServer.java
@@ -93,7 +93,7 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer
log.info("Starting Minecraft server on " + (this.getServerIp().length() == 0 ? "*" : this.getServerIp()) + ":" + this.G());
try {
- this.r = new DedicatedServerConnection(this, inetaddress, this.G());
+ this.r = new org.spigotmc.netty.NettyServerConnection(this, inetaddress, this.G());
} catch (Throwable ioexception) { // CraftBukkit - IOException -> Throwable
log.warning("**** FAILED TO BIND TO PORT!");
log.log(Level.WARNING, "The exception was: " + ioexception.toString());
diff --git a/src/main/java/net/minecraft/server/PendingConnection.java b/src/main/java/net/minecraft/server/PendingConnection.java
index 8413a15..b586386 100644
--- a/src/main/java/net/minecraft/server/PendingConnection.java
+++ b/src/main/java/net/minecraft/server/PendingConnection.java
@@ -17,7 +17,7 @@ public class PendingConnection extends Connection {
private byte[] d;
public static Logger logger = Logger.getLogger("Minecraft");
private static Random random = new Random();
- public NetworkManager networkManager;
+ public org.spigotmc.netty.NettyNetworkManager networkManager;
public boolean c = false;
private MinecraftServer server;
private int g = 0;
@@ -28,10 +28,15 @@ public class PendingConnection extends Connection {
private SecretKey l = null;
public String hostname = ""; // CraftBukkit - add field
+ public PendingConnection(MinecraftServer minecraftserver, org.spigotmc.netty.NettyNetworkManager networkManager) {
+ this.server = minecraftserver;
+ this.networkManager = networkManager;
+ }
+
public PendingConnection(MinecraftServer minecraftserver, Socket socket, String s) throws java.io.IOException { // CraftBukkit - throws IOException
this.server = minecraftserver;
- this.networkManager = new NetworkManager(socket, s, this, minecraftserver.F().getPrivate());
- this.networkManager.e = 0;
+ // this.networkManager = new NetworkManager(socket, s, this, minecraftserver.F().getPrivate());
+ // this.networkManager.e = 0;
}
// CraftBukkit start
diff --git a/src/main/java/org/spigotmc/netty/CipherCodec.java b/src/main/java/org/spigotmc/netty/CipherCodec.java
new file mode 100644
index 0000000..cfc0535
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/CipherCodec.java
@@ -0,0 +1,65 @@
+package org.spigotmc.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToByteCodec;
+import org.bouncycastle.crypto.BufferedBlockCipher;
+
+/**
+ * Netty codec that encrypts outbound bytes and decrypts inbound bytes of a
+ * stream. It takes two {@link BufferedBlockCipher} instances, used for
+ * encryption and decryption respectively.
+ */
+public class CipherCodec extends ByteToByteCodec {
+
+    private BufferedBlockCipher encrypt;
+    private BufferedBlockCipher decrypt;
+    private ByteBuf heapOut; // heap scratch buffer for ciphertext, allocated on first encode
+
+    public CipherCodec(BufferedBlockCipher encrypt, BufferedBlockCipher decrypt) {
+        this.encrypt = encrypt;
+        this.decrypt = decrypt;
+    }
+
+    @Override
+    public void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
+        if (heapOut == null) {
+            heapOut = ctx.alloc().heapBuffer(); // cipher() writes through array(), so use a heap buffer
+        }
+        cipher(encrypt, in, heapOut);
+        out.writeBytes(heapOut);
+        heapOut.discardSomeReadBytes(); // reclaim the space that was just copied out
+    }
+
+    @Override
+    public void decode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
+        cipher(decrypt, in, out); // plaintext is appended straight to out
+    }
+
+    @Override
+    public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
+        super.freeInboundBuffer(ctx);
+        decrypt = null;
+    }
+
+    @Override
+    public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
+        super.freeOutboundBuffer(ctx);
+        if (heapOut != null) {
+            heapOut.free();
+            heapOut = null;
+        }
+        encrypt = null; // outbound side releases the encrypt cipher (previously cleared decrypt again)
+    }
+
+    private void cipher(BufferedBlockCipher cipher, ByteBuf in, ByteBuf out) { // both buffers are accessed via array()
+        int available = in.readableBytes();
+        int outputSize = cipher.b(available); // getUpdateOutputSize
+        if (out.writableBytes() < outputSize) { // room is needed past writerIndex, not counted from index 0
+            out.capacity(out.writerIndex() + outputSize);
+        }
+        int processed = cipher.a(in.array(), in.arrayOffset() + in.readerIndex(), available, out.array(), out.arrayOffset() + out.writerIndex()); // processBytes
+        in.readerIndex(in.readerIndex() + available); // all 'available' input bytes were handed to the cipher
+        out.writerIndex(out.writerIndex() + processed); // 'processed' is the number of bytes written to out
+    }
+}
diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
new file mode 100644
index 0000000..a281ff9
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
@@ -0,0 +1,192 @@
+package org.spigotmc.netty;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundMessageHandlerAdapter;
+import io.netty.channel.socket.SocketChannel;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import javax.crypto.SecretKey;
+import net.minecraft.server.Connection;
+import net.minecraft.server.INetworkManager;
+import net.minecraft.server.MinecraftServer;
+import net.minecraft.server.Packet;
+import net.minecraft.server.Packet252KeyResponse;
+import net.minecraft.server.PendingConnection;
+import org.bouncycastle.crypto.BufferedBlockCipher;
+import org.bukkit.Bukkit;
+
+/**
+ * This class forms the basis of the Netty integration. It implements
+ * {@link INetworkManager} and handles all events and inbound messages provided
+ * by the upstream Netty process.
+ */
+public class NettyNetworkManager extends ChannelInboundMessageHandlerAdapter<Packet> implements INetworkManager {
+
+    private static final ExecutorService threadPool = Executors.newCachedThreadPool(); // runs packets whose a_() returns true
+    private static final MinecraftServer server = MinecraftServer.getServer();
+    private static final NettyServerConnection serverConnection = (NettyServerConnection) server.ae();
+    /*========================================================================*/
+    private Queue<Packet> syncPackets = new ConcurrentLinkedQueue<Packet>(); // filled by messageReceived, drained by b()
+    private Channel channel; // set in channelActive, null again after channelInactive
+    private SocketAddress address;
+    private Connection handler;
+    private SecretKey secret;
+    private String dcReason;
+    private Object[] dcArgs;
+    private Socket socketAdaptor;
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        // Channel and address groundwork first
+        channel = ctx.channel();
+        address = channel.remoteAddress();
+        // Then the socket adaptor
+        socketAdaptor = NettySocketAdaptor.adapt((SocketChannel) channel);
+        // Followed by their first handler
+        handler = new PendingConnection(server, this);
+        // Finally register the connection
+        serverConnection.pendingConnections.add((PendingConnection) handler);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        // Disconnect via the handler - this performs all plugin related cleanup + logging
+        if (dcReason != null || dcArgs != null) { // NOTE(review): skipped when the peer closed first (no reason stored) - confirm intended
+            handler.a(dcReason, dcArgs);
+        }
+        // Remove channel reference to indicate we are done
+        channel = null;
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        // TODO: Remove this once we are more stable
+        // Bukkit.getServer().getLogger().severe("======================= Start Netty Debug Log =======================");
+        // Bukkit.getServer().getLogger().log(Level.SEVERE, "Error caught whilst handling " + channel, cause);
+        // Bukkit.getServer().getLogger().severe("======================= End Netty Debug Log =======================");
+
+        // Disconnect with generic reason + exception
+        a("disconnect.genericReason", new Object[]{"Internal exception: " + cause});
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, final Packet msg) throws Exception {
+        if (msg instanceof Packet252KeyResponse) {
+            secret = ((Packet252KeyResponse) msg).a(MinecraftServer.getServer().F().getPrivate()); // kept for the cipher set up in queue()
+        }
+        final Packet packet = PacketListener.callReceived(this, handler, msg); // listeners may replace or cancel (null)
+        if (packet != null) {
+            if (packet.a_()) { // handle the listener result, not the raw msg, so replacements take effect
+                threadPool.submit(new Runnable() {
+                    public void run() {
+                        packet.handle(handler);
+                    }
+                });
+            } else {
+                syncPackets.add(packet); // handled later by b()
+            }
+        }
+    }
+
+    public Socket getSocket() {
+        return socketAdaptor;
+    }
+
+    /**
+     * setHandler. Set the {@link Connection} used to process received packets.
+     *
+     * @param nh the new {@link Connection} instance
+     */
+    public void a(Connection nh) {
+        handler = nh;
+    }
+
+    /**
+     * queue. Queue a packet for sending, or in this case write it straight to
+     * the channel. Does nothing once the channel reference has been cleared.
+     *
+     * @param packet the packet to queue
+     */
+    public void queue(Packet packet) {
+        if (channel != null) {
+            packet = PacketListener.callQueued(this, handler, packet);
+            if (packet != null) {
+                channel.write(packet);
+            }
+
+            if (packet instanceof Packet252KeyResponse) { // false for null, so a cancelled packet installs no cipher
+                BufferedBlockCipher encrypt = NettyServerConnection.getCipher(true, secret);
+                BufferedBlockCipher decrypt = NettyServerConnection.getCipher(false, secret);
+                channel.pipeline().addBefore("decoder", "cipher", new CipherCodec(encrypt, decrypt));
+            }
+        }
+    }
+
+    /**
+     * wakeThreads. In Vanilla this method will interrupt the network read and
+     * write threads, thus waking them. Here it is intentionally a no-op.
+     */
+    public void a() {
+    }
+
+    /**
+     * processPackets. Remove up to 1001 packets (counter runs 1000 down to 0) from the
+     * queue and process them. This should only be called from the main server thread.
+     */
+    public void b() {
+        for (int i = 1000; channel != null && syncPackets != null && !syncPackets.isEmpty() && i >= 0; i--) {
+            syncPackets.poll().handle(handler);
+        }
+    }
+
+    /**
+     * getSocketAddress. Return the remote address of the connected user. It is
+     * important that this method returns a value even after disconnect.
+     *
+     * @return the remote address of this connection
+     */
+    public SocketAddress getSocketAddress() {
+        return address;
+    }
+
+    /**
+     * close. Close the channel if it is still referenced; no disconnect reason is stored.
+     */
+    public void d() {
+        if (channel != null) {
+            channel.close();
+        }
+    }
+
+    /**
+     * queueSize. Return the number of packets in the low priority queue. In a
+     * NIO environment this will always be 0.
+     *
+     * @return the size of the packet send queue
+     */
+    public int e() {
+        return 0;
+    }
+
+    /**
+     * networkShutdown. Shuts down this connection, storing the reason and
+     * parameters, used to notify the current {@link Connection}.
+     *
+     * @param reason the main disconnect reason
+     * @param arguments additional disconnect arguments, for example, the
+     * exception which triggered the disconnect.
+     */
+    public void a(String reason, Object... arguments) {
+        if (channel != null) {
+            dcReason = reason; // read back by channelInactive once the close completes
+            dcArgs = arguments;
+            channel.close();
+        }
+    }
+}
diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
new file mode 100644
index 0000000..427662e
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
@@ -0,0 +1,98 @@
+package org.spigotmc.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.socket.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import java.net.InetAddress;
+import java.security.Key;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Level;
+import net.minecraft.server.MinecraftServer;
+import net.minecraft.server.PendingConnection;
+import net.minecraft.server.ServerConnection;
+import org.bouncycastle.crypto.BufferedBlockCipher;
+import org.bouncycastle.crypto.engines.AESFastEngine;
+import org.bouncycastle.crypto.modes.CFBBlockCipher;
+import org.bouncycastle.crypto.params.KeyParameter;
+import org.bouncycastle.crypto.params.ParametersWithIV;
+import org.bukkit.Bukkit;
+
+/**
+ * This is the NettyServerConnection class. It extends
+ * {@link ServerConnection} and is the main interface between the Minecraft
+ * server and this NIO implementation. It binds the listening channel, pulses
+ * pending connections and closes the listening channel on shutdown.
+ */
+public class NettyServerConnection extends ServerConnection {
+
+    private final ChannelFuture socket; // future of the bind() issued in the constructor
+    final List<PendingConnection> pendingConnections = Collections.synchronizedList(new ArrayList<PendingConnection>());
+
+    public NettyServerConnection(MinecraftServer ms, InetAddress host, int port) {
+        super(ms);
+        socket = new ServerBootstrap().channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {
+            @Override
+            public void initChannel(Channel ch) throws Exception {
+                ch.pipeline() // names matter: NettyNetworkManager inserts "cipher" before "decoder"
+                        .addLast("timer", new ReadTimeoutHandler(30))
+                        .addLast("decoder", new PacketDecoder())
+                        .addLast("encoder", new PacketEncoder())
+                        .addLast("manager", new NettyNetworkManager());
+            }
+        }).childOption(ChannelOption.IP_TOS, 0x18).childOption(ChannelOption.TCP_NODELAY, true).group(new NioEventLoopGroup()).localAddress(host, port).bind();
+    }
+
+    /**
+     * Pulse. This method pulses all connections causing them to update. It is
+     * called from the main server thread a few times a tick.
+     */
+    @Override
+    public void b() {
+        super.b(); // pulse PlayerConnections
+        for (int i = 0; i < pendingConnections.size(); ++i) {
+            PendingConnection connection = pendingConnections.get(i);
+
+            try {
+                connection.c();
+            } catch (Exception ex) {
+                connection.disconnect("Internal server error");
+                Bukkit.getServer().getLogger().log(Level.WARNING, "Failed to handle packet: " + ex, ex);
+            }
+
+            if (connection.c) { // flag set on the connection once it no longer needs pulsing
+                pendingConnections.remove(i--); // step back so the element shifted into slot i is not skipped
+            }
+        }
+    }
+
+    /**
+     * Shutdown. Closes the listening channel and waits for the close to complete.
+     * NOTE(review): already accepted client channels and the event loop group are not
+     * shut down here - confirm whether that is handled elsewhere.
+     */
+    @Override
+    public void a() {
+        socket.channel().close().syncUninterruptibly();
+    }
+
+    /**
+     * Return a Minecraft compatible cipher instance from the specified key.
+     *
+     * @param forEncryption whether the returned cipher shall be usable for
+     * encryption or decryption
+     * @param key the AES key; its first 16 encoded bytes also serve as the IV
+     * @return the initialized cipher
+     */
+    public static BufferedBlockCipher getCipher(boolean forEncryption, Key key) {
+        BufferedBlockCipher cip = new BufferedBlockCipher(new CFBBlockCipher(new AESFastEngine(), 8));
+        cip.a(forEncryption, new ParametersWithIV(new KeyParameter(key.getEncoded()), key.getEncoded(), 0, 16));
+        return cip;
+    }
+}
diff --git a/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java
new file mode 100644
index 0000000..a3b86b8
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java
@@ -0,0 +1,248 @@
+package org.spigotmc.netty;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelOption;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.channels.SocketChannel;
+
+/**
+ * This class wraps a Netty {@link Channel} in a {@link Socket}. It overrides
+ * all methods in {@link Socket} to ensure that calls are not mistakenly made
+ * to the unsupported super socket. All operations that can be sanely applied to
+ * a {@link Channel} are implemented here. Those which cannot will throw an
+ * {@link UnsupportedOperationException}.
+ */
+public class NettySocketAdaptor extends Socket {
+
+    private final io.netty.channel.socket.SocketChannel ch; // the wrapped channel every call delegates to
+
+    private NettySocketAdaptor(io.netty.channel.socket.SocketChannel ch) {
+        this.ch = ch;
+    }
+
+    public static NettySocketAdaptor adapt(io.netty.channel.socket.SocketChannel ch) {
+        return new NettySocketAdaptor(ch);
+    }
+
+    @Override
+    public void bind(SocketAddress bindpoint) throws IOException {
+        ch.bind(bindpoint).syncUninterruptibly();
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        ch.close().syncUninterruptibly();
+    }
+
+    @Override
+    public void connect(SocketAddress endpoint) throws IOException {
+        ch.connect(endpoint).syncUninterruptibly();
+    }
+
+    @Override
+    public void connect(SocketAddress endpoint, int timeout) throws IOException {
+        ch.config().setConnectTimeoutMillis(timeout);
+        ch.connect(endpoint).syncUninterruptibly();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof NettySocketAdaptor && ch.equals(((NettySocketAdaptor) obj).ch);
+    }
+
+    @Override
+    public SocketChannel getChannel() {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public InetAddress getInetAddress() {
+        return ch.remoteAddress().getAddress();
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public boolean getKeepAlive() throws SocketException {
+        return ch.config().getOption(ChannelOption.SO_KEEPALIVE);
+    }
+
+    @Override
+    public InetAddress getLocalAddress() {
+        return ch.localAddress().getAddress();
+    }
+
+    @Override
+    public int getLocalPort() {
+        return ch.localAddress().getPort();
+    }
+
+    @Override
+    public SocketAddress getLocalSocketAddress() {
+        return ch.localAddress();
+    }
+
+    @Override
+    public boolean getOOBInline() throws SocketException {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public int getPort() {
+        return ch.remoteAddress().getPort();
+    }
+
+    @Override
+    public synchronized int getReceiveBufferSize() throws SocketException {
+        return ch.config().getOption(ChannelOption.SO_RCVBUF);
+    }
+
+    @Override
+    public SocketAddress getRemoteSocketAddress() {
+        return ch.remoteAddress();
+    }
+
+    @Override
+    public boolean getReuseAddress() throws SocketException {
+        return ch.config().getOption(ChannelOption.SO_REUSEADDR);
+    }
+
+    @Override
+    public synchronized int getSendBufferSize() throws SocketException {
+        return ch.config().getOption(ChannelOption.SO_SNDBUF);
+    }
+
+    @Override
+    public int getSoLinger() throws SocketException {
+        return ch.config().getOption(ChannelOption.SO_LINGER);
+    }
+
+    @Override
+    public synchronized int getSoTimeout() throws SocketException {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public boolean getTcpNoDelay() throws SocketException {
+        return ch.config().getOption(ChannelOption.TCP_NODELAY);
+    }
+
+    @Override
+    public int getTrafficClass() throws SocketException {
+        return ch.config().getOption(ChannelOption.IP_TOS);
+    }
+
+    @Override
+    public int hashCode() {
+        return ch.hashCode();
+    }
+
+    @Override
+    public boolean isBound() {
+        return ch.localAddress() != null;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return !ch.isOpen();
+    }
+
+    @Override
+    public boolean isConnected() {
+        return ch.isActive();
+    }
+
+    @Override
+    public boolean isInputShutdown() {
+        return ch.isInputShutdown();
+    }
+
+    @Override
+    public boolean isOutputShutdown() {
+        return ch.isOutputShutdown();
+    }
+
+    @Override
+    public void sendUrgentData(int data) throws IOException {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public void setKeepAlive(boolean on) throws SocketException {
+        ch.config().setOption(ChannelOption.SO_KEEPALIVE, on);
+    }
+
+    @Override
+    public void setOOBInline(boolean on) throws SocketException {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public synchronized void setReceiveBufferSize(int size) throws SocketException {
+        ch.config().setOption(ChannelOption.SO_RCVBUF, size);
+    }
+
+    @Override
+    public void setReuseAddress(boolean on) throws SocketException {
+        ch.config().setOption(ChannelOption.SO_REUSEADDR, on);
+    }
+
+    @Override
+    public synchronized void setSendBufferSize(int size) throws SocketException {
+        ch.config().setOption(ChannelOption.SO_SNDBUF, size);
+    }
+
+    @Override
+    public void setSoLinger(boolean on, int linger) throws SocketException {
+        ch.config().setOption(ChannelOption.SO_LINGER, on ? linger : -1); // honour 'on': a negative value disables linger
+    }
+
+    @Override
+    public synchronized void setSoTimeout(int timeout) throws SocketException {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public void setTcpNoDelay(boolean on) throws SocketException {
+        ch.config().setOption(ChannelOption.TCP_NODELAY, on);
+    }
+
+    @Override
+    public void setTrafficClass(int tc) throws SocketException {
+        ch.config().setOption(ChannelOption.IP_TOS, tc);
+    }
+
+    @Override
+    public void shutdownInput() throws IOException {
+        throw new UnsupportedOperationException("Operation not supported on Channel wrapper.");
+    }
+
+    @Override
+    public void shutdownOutput() throws IOException {
+        ch.shutdownOutput().syncUninterruptibly();
+    }
+
+    @Override
+    public String toString() {
+        return ch.toString();
+    }
+}
diff --git a/src/main/java/org/spigotmc/netty/PacketDecoder.java b/src/main/java/org/spigotmc/netty/PacketDecoder.java
new file mode 100644
index 0000000..e3fdd95
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/PacketDecoder.java
@@ -0,0 +1,52 @@
+package org.spigotmc.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+import java.io.DataInputStream;
+import java.io.IOException;
+import net.minecraft.server.Packet;
+import net.minecraft.server.Packet254GetInfo;
+
+/**
+ * Decodes inbound bytes into {@link Packet}s. A {@link DataInputStream} wrapping
+ * the input {@link ByteBuf} is created on first use and kept. Each decode reads an
+ * unsigned byte packet id and then lets the matching packet read its own body.
+ */
+public class PacketDecoder extends ReplayingDecoder<Packet> {
+
+ private DataInputStream input; // created on first decode, cleared in freeInboundBuffer
+ private long lastPingRead; // time (ms) of the first 0xFE packet with a == 0; 0 means none seen yet
+
+ @Override
+ public Packet decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+ if (input == null) {
+ input = new DataInputStream(new ByteBufInputStream(in)); // NOTE(review): wraps the buffer of the first call only - confirm 'in' is always the same instance
+ }
+
+ short packetId = in.readUnsignedByte();
+ Packet packet = Packet.d(packetId); // presumably creates the packet registered for this id; null is treated as unknown
+ if (packet == null) {
+ throw new IOException("Bad packet id " + packetId);
+ }
+
+ packet.a(input); // the packet reads its body from the stream over 'in'
+
+ if (packetId == 0xFE && ((Packet254GetInfo) packet).a == 0) {
+ long currentTime = System.currentTimeMillis();
+ if ((lastPingRead == 0 && (lastPingRead = currentTime) == currentTime) || currentTime - lastPingRead < 1500) { // first such packet records the time and is dropped; later ones are dropped only within 1500 ms of it
+ return null;
+ }
+ }
+
+ return packet;
+ }
+
+ @Override
+ public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
+ super.freeInboundBuffer(ctx);
+ input = null; // drop the stream so it is not kept past the buffer's release
+ lastPingRead = 0;
+ }
+}
diff --git a/src/main/java/org/spigotmc/netty/PacketEncoder.java b/src/main/java/org/spigotmc/netty/PacketEncoder.java
new file mode 100644
index 0000000..11aa05f
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/PacketEncoder.java
@@ -0,0 +1,43 @@
+package org.spigotmc.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import java.io.DataOutputStream;
+import net.minecraft.server.Packet;
+
+/**
+ * Netty encoder which takes a packet and encodes it, and adds a byte packet id
+ * header.
+ */
+public class PacketEncoder extends MessageToByteEncoder<Packet> {
+
+    private ByteBuf outBuf; // scratch buffer the packet body is serialized into, allocated on first encode
+    private DataOutputStream dataOut; // stream view over outBuf handed to the packet
+
+    @Override
+    public void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception {
+        if (outBuf == null) {
+            outBuf = ctx.alloc().directBuffer();
+        }
+        if (dataOut == null) {
+            dataOut = new DataOutputStream(new ByteBufOutputStream(outBuf));
+        }
+        out.writeByte(msg.k()); // packet id header
+        msg.a(dataOut); // the packet writes its body into outBuf
+        out.writeBytes(outBuf);
+        outBuf.discardSomeReadBytes(); // compact the scratch buffer; otherwise it grows by every packet ever sent
+        out.discardSomeReadBytes();
+    }
+
+    @Override
+    public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
+        super.freeOutboundBuffer(ctx);
+        if (outBuf != null) {
+            outBuf.free();
+            outBuf = null;
+        }
+        dataOut = null;
+    }
+}
diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java
new file mode 100644
index 0000000..8e3b932
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/PacketListener.java
@@ -0,0 +1,100 @@
+package org.spigotmc.netty;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import net.minecraft.server.Connection;
+import net.minecraft.server.INetworkManager;
+import net.minecraft.server.Packet;
+import org.bukkit.Bukkit;
+import org.bukkit.plugin.Plugin;
+
+/**
+ * This class is used for plugins that wish to register to listen to incoming
+ * and outgoing packets. To use this class, simply create a new instance,
+ * override the methods you wish to use, and call
+ * {@link #register(org.spigotmc.netty.PacketListener, org.bukkit.plugin.Plugin)}.
+ */
+public class PacketListener {
+
+    /** A mapping of all registered listeners and their owning plugins. */
+    private static final Map<PacketListener, Plugin> listeners = new HashMap<PacketListener, Plugin>();
+    /**
+     * A baked array of all listeners, for efficiency's sake. Volatile: replaced under the register() lock, read without it.
+     */
+    private static volatile PacketListener[] baked = new PacketListener[0];
+
+    /**
+     * Used to register a handler for receiving notifications of packet
+     * activity.
+     *
+     * @param listener the listener to register
+     * @param plugin the plugin owning this listener
+     */
+    public static synchronized void register(PacketListener listener, Plugin plugin) {
+        Preconditions.checkNotNull(listener, "listener");
+        Preconditions.checkNotNull(plugin, "plugin");
+        Preconditions.checkState(!listeners.containsKey(listener), "listener already registered");
+
+        int size = listeners.size();
+        Preconditions.checkState(baked.length == size);
+        listeners.put(listener, plugin);
+        PacketListener[] grown = Arrays.copyOf(baked, size + 1);
+        grown[size] = listener;
+        baked = grown; // publish only after the new slot is filled, so readers never see a null entry
+    }
+
+    static Packet callReceived(INetworkManager networkManager, Connection connection, Packet packet) {
+        PacketListener[] snapshot = baked; // one read, so a concurrent register() cannot change the array mid-loop
+        for (int i = 0; packet != null && i < snapshot.length; i++) { // stop once a listener cancels (returns null)
+            try {
+                packet = snapshot[i].packetReceived(networkManager, connection, packet);
+            } catch (Throwable t) {
+                Bukkit.getServer().getLogger().log(Level.SEVERE, "Error whilst firing receive hook for packet", t);
+            }
+        }
+        return packet;
+    }
+
+    static Packet callQueued(INetworkManager networkManager, Connection connection, Packet packet) {
+        PacketListener[] snapshot = baked; // one read, so a concurrent register() cannot change the array mid-loop
+        for (int i = 0; packet != null && i < snapshot.length; i++) { // stop once a listener cancels (returns null)
+            try {
+                packet = snapshot[i].packetQueued(networkManager, connection, packet);
+            } catch (Throwable t) {
+                Bukkit.getServer().getLogger().log(Level.SEVERE, "Error whilst firing queued hook for packet", t);
+            }
+        }
+        return packet;
+    }
+
+    /**
+     * Called when a packet has been received and is about to be handled by the
+     * current {@link Connection}. The returned packet will be the packet passed
+     * on for handling; if null is returned the packet is not handled at all.
+     *
+     * @param networkManager the NetworkManager receiving the packet
+     * @param connection the connection which will handle the packet
+     * @param packet the received packet
+     * @return the packet to be handled, or null to cancel
+     */
+    public Packet packetReceived(INetworkManager networkManager, Connection connection, Packet packet) {
+        return packet;
+    }
+
+    /**
+     * Called when a packet is queued to be sent. The returned packet will be
+     * the packet sent. In the case of null being returned, the packet will not
+     * be sent.
+     *
+     * @param networkManager the NetworkManager which will send the packet
+     * @param connection the connection which queued the packet
+     * @param packet the queued packet
+     * @return the packet to be sent, or null if the packet will not be sent.
+     */
+    public Packet packetQueued(INetworkManager networkManager, Connection connection, Packet packet) {
+        return packet;
+    }
+}
--
1.8.1-rc2