From 05e889f3468b07b2897d64ee1df3e26e763408b1 Mon Sep 17 00:00:00 2001 From: Wesley Wolfe Date: Sun, 7 Oct 2012 15:08:21 -0500 Subject: [PATCH] Queue tasks from secondary threads. Fixes BUKKIT-2546 and BUKKIT-2600 This change affects the old chat compatibility layer from an implementation only standpoint. It does not queue the 'event' to fire, but rather queues a runnable that allows the calling thread to wait for execution to finish. The other effect of this change is that rcon connects now have their commands queued to be run on next server tick using the same implementation. The internal implementation is in org.bukkit.craftbukkit.util.Waitable. It is very similar to a Future task, but only contains minimal implementation with object.wait() and object.notify() calls under the hood of waitable.get() and waitable.run(). PlayerPreLoginEvent now properly implements thread-safe event execution by queuing the events similar to chat and rcon. This is still a poor way albeit proper way to implement thread-safety; PlayerPreLoginEvent will stay deprecated. --- .../net/minecraft/server/MinecraftServer.java | 61 +++++++++---------- .../minecraft/server/NetServerHandler.java | 41 ++++++++++++- .../minecraft/server/ThreadLoginVerifier.java | 30 ++++++--- .../org/bukkit/craftbukkit/util/Waitable.java | 46 ++++++++++++++ 4 files changed, 136 insertions(+), 42 deletions(-) create mode 100644 src/main/java/org/bukkit/craftbukkit/util/Waitable.java diff --git a/src/main/java/net/minecraft/server/MinecraftServer.java b/src/main/java/net/minecraft/server/MinecraftServer.java index 477de015b1..3fb1215c5a 100644 --- a/src/main/java/net/minecraft/server/MinecraftServer.java +++ b/src/main/java/net/minecraft/server/MinecraftServer.java @@ -13,13 +13,14 @@ import java.util.logging.Level; import java.util.logging.Logger; // CraftBukkit start +import java.util.concurrent.ExecutionException; import jline.console.ConsoleReader; import joptsimple.OptionSet; import org.bukkit.World.Environment; +import org.bukkit.craftbukkit.util.Waitable; import org.bukkit.event.server.RemoteServerCommandEvent; import org.bukkit.event.world.WorldSaveEvent; -import org.bukkit.event.player.PlayerChatEvent; // CraftBukkit end public abstract class MinecraftServer implements Runnable, IMojangStatistics, ICommandListener { @@ -79,7 +80,7 @@ public abstract class MinecraftServer implements Runnable, IMojangStatistics, IC public ConsoleReader reader; public static int currentTick; public final Thread primaryThread; - public java.util.Queue chatQueue = new java.util.concurrent.ConcurrentLinkedQueue(); + public java.util.Queue processQueue = new java.util.concurrent.ConcurrentLinkedQueue(); public int autosavePeriod; // CraftBukkit end @@ -508,26 +509,9 @@ public abstract class MinecraftServer implements Runnable, IMojangStatistics, IC // CraftBukkit start - only send timeupdates to the people in that world this.server.getScheduler().mainThreadHeartbeat(this.ticks); - // Fix for old plugins still using deprecated event - while (!chatQueue.isEmpty()) { - PlayerChatEvent event = chatQueue.remove(); - org.bukkit.Bukkit.getPluginManager().callEvent(event); - - if (event.isCancelled()) { - continue; - } - - String message = String.format(event.getFormat(), event.getPlayer().getDisplayName(), event.getMessage()); - console.sendMessage(message); - if (((org.bukkit.craftbukkit.util.LazyPlayerSet) event.getRecipients()).isLazy()) { - for (Object player : getServerConfigurationManager().players) { - ((EntityPlayer) player).sendMessage(message); - } - } else { - for (org.bukkit.entity.Player player : event.getRecipients()) { - player.sendMessage(message); - } - } + // Run tasks that are waiting on processing + while (!processQueue.isEmpty()) { + processQueue.remove().run(); } // Send timeupdates to everyone, it will get the right time from the world the player is in. @@ -782,16 +766,31 @@ public abstract class MinecraftServer implements Runnable, IMojangStatistics, IC // CraftBukkit end } - public String i(String s) { - RemoteControlCommandListener.instance.b(); - // CraftBukkit start - RemoteServerCommandEvent event = new RemoteServerCommandEvent(this.remoteConsole, s); - this.server.getPluginManager().callEvent(event); - ServerCommand servercommand = new ServerCommand(event.getCommand(), RemoteControlCommandListener.instance); - // this.q.a(RemoteControlCommandListener.instance, s); - this.server.dispatchServerCommand(this.remoteConsole, servercommand); // CraftBukkit + // CraftBukkit start + public String i(final String s) { // CraftBukkit - final parameter + Waitable waitable = new Waitable() { + @Override + protected String evaluate() { + RemoteControlCommandListener.instance.b(); + // Event changes start + RemoteServerCommandEvent event = new RemoteServerCommandEvent(MinecraftServer.this.remoteConsole, s); + MinecraftServer.this.server.getPluginManager().callEvent(event); + // Event changes end + ServerCommand servercommand = new ServerCommand(event.getCommand(), RemoteControlCommandListener.instance); + // this.q.a(RemoteControlCommandListener.instance, s); + MinecraftServer.this.server.dispatchServerCommand(MinecraftServer.this.remoteConsole, servercommand); // CraftBukkit + return RemoteControlCommandListener.instance.c(); + }}; + processQueue.add(waitable); + try { + return waitable.get(); + } catch (ExecutionException e) { + throw new RuntimeException("Exception processing rcon command " + s, e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Maintain interrupted state + throw new RuntimeException("Interrupted processing rcon command " + s, e); + } // CraftBukkit end - return RemoteControlCommandListener.instance.c(); } public boolean isDebugging() { diff --git a/src/main/java/net/minecraft/server/NetServerHandler.java b/src/main/java/net/minecraft/server/NetServerHandler.java index b27cc152ad..d41da15f18 100644 --- a/src/main/java/net/minecraft/server/NetServerHandler.java +++ b/src/main/java/net/minecraft/server/NetServerHandler.java @@ -13,11 +13,13 @@ import java.io.UnsupportedEncodingException; import java.util.logging.Level; import java.util.HashSet; +import org.bukkit.Bukkit; import org.bukkit.Location; import org.bukkit.craftbukkit.CraftWorld; import org.bukkit.craftbukkit.inventory.CraftInventoryView; import org.bukkit.craftbukkit.inventory.CraftItemStack; import org.bukkit.craftbukkit.util.LazyPlayerSet; +import org.bukkit.craftbukkit.util.Waitable; import org.bukkit.craftbukkit.entity.CraftPlayer; import org.bukkit.craftbukkit.event.CraftEventFactory; import org.bukkit.entity.Player; @@ -832,9 +834,42 @@ public class NetServerHandler extends NetHandler { if (PlayerChatEvent.getHandlerList().getRegisteredListeners().length != 0) { // Evil plugins still listening to deprecated event - PlayerChatEvent queueEvent = new PlayerChatEvent(player, event.getMessage(), event.getFormat(), event.getRecipients()); + final PlayerChatEvent queueEvent = new PlayerChatEvent(player, event.getMessage(), event.getFormat(), event.getRecipients()); queueEvent.setCancelled(event.isCancelled()); - minecraftServer.chatQueue.add(queueEvent); + Waitable waitable = new Waitable() { + @Override + protected Object evaluate() { + Bukkit.getPluginManager().callEvent(queueEvent); + + if (queueEvent.isCancelled()) { + return null; + } + + String message = String.format(queueEvent.getFormat(), queueEvent.getPlayer().getDisplayName(), queueEvent.getMessage()); + NetServerHandler.this.minecraftServer.console.sendMessage(message); + if (((LazyPlayerSet) queueEvent.getRecipients()).isLazy()) { + for (Object player : NetServerHandler.this.minecraftServer.getServerConfigurationManager().players) { + ((EntityPlayer) player).sendMessage(message); + } + } else { + for (Player player : queueEvent.getRecipients()) { + player.sendMessage(message); + } + } + return null; + }}; + if (async) { + minecraftServer.processQueue.add(waitable); + } else { + waitable.run(); + } + try { + waitable.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // This is proper habit for java. If we aren't handling it, pass it on! + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Exception processing chat event", e.getCause()); + } } else { if (event.isCancelled()) { return; @@ -1047,7 +1082,7 @@ public class NetServerHandler extends NetHandler { PlayerPortalEvent event = new PlayerPortalEvent(this.player.getBukkitEntity(), this.player.getBukkitEntity().getLocation(), toLocation, pta, PlayerPortalEvent.TeleportCause.END_PORTAL); event.useTravelAgent(false); - org.bukkit.Bukkit.getServer().getPluginManager().callEvent(event); + Bukkit.getServer().getPluginManager().callEvent(event); this.player = this.minecraftServer.getServerConfigurationManager().moveToWorld(this.player, 0, true, event.getTo()); // CraftBukkit end } else if (this.player.q().getWorldData().isHardcore()) { diff --git a/src/main/java/net/minecraft/server/ThreadLoginVerifier.java b/src/main/java/net/minecraft/server/ThreadLoginVerifier.java index dba7d42a54..08a2703df3 100644 --- a/src/main/java/net/minecraft/server/ThreadLoginVerifier.java +++ b/src/main/java/net/minecraft/server/ThreadLoginVerifier.java @@ -8,6 +8,7 @@ import java.net.URLEncoder; // CraftBukkit start import org.bukkit.craftbukkit.CraftServer; +import org.bukkit.craftbukkit.util.Waitable; import org.bukkit.event.player.AsyncPlayerPreLoginEvent; import org.bukkit.event.player.PlayerPreLoginEvent; // CraftBukkit end @@ -46,15 +47,28 @@ class ThreadLoginVerifier extends Thread { AsyncPlayerPreLoginEvent asyncEvent = new AsyncPlayerPreLoginEvent(NetLoginHandler.d(this.netLoginHandler), this.netLoginHandler.getSocket().getInetAddress()); this.server.getPluginManager().callEvent(asyncEvent); - PlayerPreLoginEvent event = new PlayerPreLoginEvent(NetLoginHandler.d(this.netLoginHandler), this.netLoginHandler.getSocket().getInetAddress()); - if (asyncEvent.getResult() != PlayerPreLoginEvent.Result.ALLOWED) { - event.disallow(asyncEvent.getResult(), asyncEvent.getKickMessage()); - } - this.server.getPluginManager().callEvent(event); + if (PlayerPreLoginEvent.getHandlerList().getRegisteredListeners().length != 0) { + final PlayerPreLoginEvent event = new PlayerPreLoginEvent(NetLoginHandler.d(this.netLoginHandler), this.netLoginHandler.getSocket().getInetAddress()); + if (asyncEvent.getResult() != PlayerPreLoginEvent.Result.ALLOWED) { + event.disallow(asyncEvent.getResult(), asyncEvent.getKickMessage()); + } + Waitable waitable = new Waitable() { + @Override + protected PlayerPreLoginEvent.Result evaluate() { + ThreadLoginVerifier.this.server.getPluginManager().callEvent(event); + return event.getResult(); + }}; - if (event.getResult() != PlayerPreLoginEvent.Result.ALLOWED) { - this.netLoginHandler.disconnect(event.getKickMessage()); - return; + NetLoginHandler.b(this.netLoginHandler).processQueue.add(waitable); + if (waitable.get() != PlayerPreLoginEvent.Result.ALLOWED) { + this.netLoginHandler.disconnect(event.getKickMessage()); + return; + } + } else { + if (asyncEvent.getLoginResult() != AsyncPlayerPreLoginEvent.Result.ALLOWED) { + this.netLoginHandler.disconnect(asyncEvent.getKickMessage()); + return; + } } // CraftBukkit end diff --git a/src/main/java/org/bukkit/craftbukkit/util/Waitable.java b/src/main/java/org/bukkit/craftbukkit/util/Waitable.java new file mode 100644 index 0000000000..5cd1154348 --- /dev/null +++ b/src/main/java/org/bukkit/craftbukkit/util/Waitable.java @@ -0,0 +1,46 @@ +package org.bukkit.craftbukkit.util; + +import java.util.concurrent.ExecutionException; + + +public abstract class Waitable implements Runnable { + private enum Status { + WAITING, + RUNNING, + FINISHED, + } + Throwable t = null; + T value = null; + Status status = Status.WAITING; + + public final void run() { + synchronized (this) { + if (status != Status.WAITING) { + throw new IllegalStateException("Invalid state " + status); + } + status = Status.RUNNING; + } + try { + value = evaluate(); + } catch (Throwable t) { + this.t = t; + } finally { + synchronized (this) { + status = Status.FINISHED; + this.notifyAll(); + } + } + } + + protected abstract T evaluate(); + + public synchronized T get() throws InterruptedException, ExecutionException { + while (status != Status.FINISHED) { + this.wait(); + } + if (t != null) { + throw new ExecutionException(t); + } + return value; + } +}