[EXPERIMENTAL] Take #3 at an even better / faster buffering system, use at own peril!

This commit is contained in:
md_5 2013-04-27 22:13:33 +10:00
parent fb51e3066b
commit 8a035561c6

View file

@ -1,4 +1,4 @@
From 73f2cb6cf3b18d0cca0ee1057d4563ebc46fbded Mon Sep 17 00:00:00 2001 From f6adddd68afa44beb63b7da41c1ee732bb24d482 Mon Sep 17 00:00:00 2001
From: md_5 <md_5@live.com.au> From: md_5 <md_5@live.com.au>
Date: Tue, 23 Apr 2013 11:47:32 +1000 Date: Tue, 23 Apr 2013 11:47:32 +1000
Subject: [PATCH] Netty Subject: [PATCH] Netty
@ -449,22 +449,29 @@ index 0000000..2dbbf6c
+} +}
diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
new file mode 100644 new file mode 100644
index 0000000..0e1b1fd index 0000000..f581384
--- /dev/null --- /dev/null
+++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java +++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
@@ -0,0 +1,253 @@ @@ -0,0 +1,293 @@
+package org.spigotmc.netty; +package org.spigotmc.netty;
+ +
+import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.Channel; +import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.ChannelInboundMessageHandlerAdapter;
+import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.SocketChannel;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress; +import java.net.InetSocketAddress;
+import java.net.Socket; +import java.net.Socket;
+import java.net.SocketAddress; +import java.net.SocketAddress;
+import java.security.PrivateKey; +import java.security.PrivateKey;
+import java.util.AbstractList; +import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.List; +import java.util.List;
+import java.util.Queue; +import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentLinkedQueue;
@ -510,6 +517,7 @@ index 0000000..0e1b1fd
+ return 0; + return 0;
+ } + }
+ }; + };
+ private final Queue<Packet> realQueue = new ConcurrentLinkedQueue<Packet>();
+ private volatile boolean connected; + private volatile boolean connected;
+ private Channel channel; + private Channel channel;
+ private SocketAddress address; + private SocketAddress address;
@ -519,6 +527,7 @@ index 0000000..0e1b1fd
+ private Object[] dcArgs; + private Object[] dcArgs;
+ private Socket socketAdaptor; + private Socket socketAdaptor;
+ private long writtenBytes; + private long writtenBytes;
+ private long lastFlush;
+ +
+ @Override + @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception { + public void channelActive(ChannelHandlerContext ctx) throws Exception {
@ -594,7 +603,7 @@ index 0000000..0e1b1fd
+ * + *
+ * @param packet the packet to queue + * @param packet the packet to queue
+ */ + */
+ public void queue(Packet packet) { + public synchronized void queue(Packet packet) {
+ // Only send if channel is still connected + // Only send if channel is still connected
+ if (connected) { + if (connected) {
+ // Process packet via handler + // Process packet via handler
@ -602,6 +611,7 @@ index 0000000..0e1b1fd
+ // If handler indicates packet send + // If handler indicates packet send
+ if (packet != null) { + if (packet != null) {
+ highPriorityQueue.add(packet); + highPriorityQueue.add(packet);
+ realQueue.add(packet);
+ +
+ // If needed, check and prepare encryption phase + // If needed, check and prepare encryption phase
+ // We don't send the packet here as it is sent just before the cipher handler has been added to ensure we can safeguard from any race conditions + // We don't send the packet here as it is sent just before the cipher handler has been added to ensure we can safeguard from any race conditions
@ -612,7 +622,37 @@ index 0000000..0e1b1fd
+ CipherCodec codec = new CipherCodec(encrypt, decrypt, (Packet252KeyResponse) packet); + CipherCodec codec = new CipherCodec(encrypt, decrypt, (Packet252KeyResponse) packet);
+ channel.pipeline().addBefore("decoder", "cipher", codec); + channel.pipeline().addBefore("decoder", "cipher", codec);
+ } else { + } else {
+ channel.write(packet); + if (System.currentTimeMillis() - lastFlush > 10) {
+ lastFlush = System.currentTimeMillis();
+
+ int estimatedCapacity = 0;
+ for (Packet p : realQueue) {
+ estimatedCapacity += p.a();
+ }
+ final ByteBuf buf = channel.alloc().buffer(estimatedCapacity);
+ DataOutputStream out = new DataOutputStream(new ByteBufOutputStream(buf));
+ for (Packet p : realQueue) {
+ buf.writeByte(p.n());
+ try {
+ p.a(out);
+ } catch (IOException ex) {
+ throw new RuntimeException("Writing packet", ex);
+ }
+ }
+
+ channel.write(buf).addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture future) throws Exception {
+ // TODO: Check on new netty release, can't take chances!
+ if (buf.refCnt() != 0) {
+ buf.release();
+ }
+ if (buf.refCnt() != 0) {
+ throw new AssertionError("refCnt");
+ }
+ }
+ });
+ realQueue.clear();
+ }
+ } + }
+ } + }
+ } + }
@ -708,10 +748,10 @@ index 0000000..0e1b1fd
+} +}
diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
new file mode 100644 new file mode 100644
index 0000000..cb58bd2 index 0000000..2a9aa0a
--- /dev/null --- /dev/null
+++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java +++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
@@ -0,0 +1,81 @@ @@ -0,0 +1,80 @@
+package org.spigotmc.netty; +package org.spigotmc.netty;
+ +
+import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -756,7 +796,6 @@ index 0000000..cb58bd2
+ +
+ NettyNetworkManager networkManager = new NettyNetworkManager(); + NettyNetworkManager networkManager = new NettyNetworkManager();
+ ch.pipeline() + ch.pipeline()
+ .addLast("flusher", new OutboundManager())
+ .addLast("timer", new ReadTimeoutHandler(30)) + .addLast("timer", new ReadTimeoutHandler(30))
+ .addLast("decoder", new PacketDecoder()) + .addLast("decoder", new PacketDecoder())
+ .addLast("encoder", new PacketEncoder(networkManager)) + .addLast("encoder", new PacketEncoder(networkManager))
@ -1047,30 +1086,6 @@ index 0000000..a3b86b8
+ return ch.toString(); + return ch.toString();
+ } + }
+} +}
diff --git a/src/main/java/org/spigotmc/netty/OutboundManager.java b/src/main/java/org/spigotmc/netty/OutboundManager.java
new file mode 100644
index 0000000..80205ed
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/OutboundManager.java
@@ -0,0 +1,18 @@
+package org.spigotmc.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOperationHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+
+public class OutboundManager extends ChannelOperationHandlerAdapter {
+
+ private static final int FLUSH_TIME = 10;
+ private long lastFlush;
+
+ public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+ if (System.currentTimeMillis() - lastFlush > FLUSH_TIME) {
+ lastFlush = System.currentTimeMillis();
+ ctx.flush(promise);
+ }
+ }
+}
diff --git a/src/main/java/org/spigotmc/netty/PacketDecoder.java b/src/main/java/org/spigotmc/netty/PacketDecoder.java diff --git a/src/main/java/org/spigotmc/netty/PacketDecoder.java b/src/main/java/org/spigotmc/netty/PacketDecoder.java
new file mode 100644 new file mode 100644
index 0000000..65074d2 index 0000000..65074d2