Queue tasks from secondary threads. Fixes BUKKIT-2546 and BUKKIT-2600

This change affects the old chat compatibility layer from an
implementation only standpoint. It does not queue the 'event' to fire,
but rather queues a runnable that allows the calling thread to wait for
execution to finish.

The other effect of this change is that rcon connects now have their
commands queued to be run on next server tick using the same
implementation.

The internal implementation is in org.bukkit.craftbukkit.util.Waitable.
It is very similar to a Future<T> task, but only contains minimal
implementation with object.wait() and object.notify() calls
under the hood of waitable.get() and waitable.run().

PlayerPreLoginEvent now properly implements thread-safe event execution
by queuing the events similar to chat and rcon. This is still a poor way
albeit proper way to implement thread-safety; PlayerPreLoginEvent will
stay deprecated.
This commit is contained in:
Wesley Wolfe 2012-10-07 15:08:21 -05:00
parent 93a79cd0e6
commit 05e889f346
4 changed files with 136 additions and 42 deletions

View file

@ -13,13 +13,14 @@ import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
// CraftBukkit start // CraftBukkit start
import java.util.concurrent.ExecutionException;
import jline.console.ConsoleReader; import jline.console.ConsoleReader;
import joptsimple.OptionSet; import joptsimple.OptionSet;
import org.bukkit.World.Environment; import org.bukkit.World.Environment;
import org.bukkit.craftbukkit.util.Waitable;
import org.bukkit.event.server.RemoteServerCommandEvent; import org.bukkit.event.server.RemoteServerCommandEvent;
import org.bukkit.event.world.WorldSaveEvent; import org.bukkit.event.world.WorldSaveEvent;
import org.bukkit.event.player.PlayerChatEvent;
// CraftBukkit end // CraftBukkit end
public abstract class MinecraftServer implements Runnable, IMojangStatistics, ICommandListener { public abstract class MinecraftServer implements Runnable, IMojangStatistics, ICommandListener {
@ -79,7 +80,7 @@ public abstract class MinecraftServer implements Runnable, IMojangStatistics, IC
public ConsoleReader reader; public ConsoleReader reader;
public static int currentTick; public static int currentTick;
public final Thread primaryThread; public final Thread primaryThread;
public java.util.Queue<PlayerChatEvent> chatQueue = new java.util.concurrent.ConcurrentLinkedQueue<PlayerChatEvent>(); public java.util.Queue<Runnable> processQueue = new java.util.concurrent.ConcurrentLinkedQueue<Runnable>();
public int autosavePeriod; public int autosavePeriod;
// CraftBukkit end // CraftBukkit end
@ -508,26 +509,9 @@ public abstract class MinecraftServer implements Runnable, IMojangStatistics, IC
// CraftBukkit start - only send timeupdates to the people in that world // CraftBukkit start - only send timeupdates to the people in that world
this.server.getScheduler().mainThreadHeartbeat(this.ticks); this.server.getScheduler().mainThreadHeartbeat(this.ticks);
// Fix for old plugins still using deprecated event // Run tasks that are waiting on processing
while (!chatQueue.isEmpty()) { while (!processQueue.isEmpty()) {
PlayerChatEvent event = chatQueue.remove(); processQueue.remove().run();
org.bukkit.Bukkit.getPluginManager().callEvent(event);
if (event.isCancelled()) {
continue;
}
String message = String.format(event.getFormat(), event.getPlayer().getDisplayName(), event.getMessage());
console.sendMessage(message);
if (((org.bukkit.craftbukkit.util.LazyPlayerSet) event.getRecipients()).isLazy()) {
for (Object player : getServerConfigurationManager().players) {
((EntityPlayer) player).sendMessage(message);
}
} else {
for (org.bukkit.entity.Player player : event.getRecipients()) {
player.sendMessage(message);
}
}
} }
// Send timeupdates to everyone, it will get the right time from the world the player is in. // Send timeupdates to everyone, it will get the right time from the world the player is in.
@ -782,16 +766,31 @@ public abstract class MinecraftServer implements Runnable, IMojangStatistics, IC
// CraftBukkit end // CraftBukkit end
} }
public String i(String s) { // CraftBukkit start
RemoteControlCommandListener.instance.b(); public String i(final String s) { // CraftBukkit - final parameter
// CraftBukkit start Waitable<String> waitable = new Waitable<String>() {
RemoteServerCommandEvent event = new RemoteServerCommandEvent(this.remoteConsole, s); @Override
this.server.getPluginManager().callEvent(event); protected String evaluate() {
ServerCommand servercommand = new ServerCommand(event.getCommand(), RemoteControlCommandListener.instance); RemoteControlCommandListener.instance.b();
// this.q.a(RemoteControlCommandListener.instance, s); // Event changes start
this.server.dispatchServerCommand(this.remoteConsole, servercommand); // CraftBukkit RemoteServerCommandEvent event = new RemoteServerCommandEvent(MinecraftServer.this.remoteConsole, s);
MinecraftServer.this.server.getPluginManager().callEvent(event);
// Event changes end
ServerCommand servercommand = new ServerCommand(event.getCommand(), RemoteControlCommandListener.instance);
// this.q.a(RemoteControlCommandListener.instance, s);
MinecraftServer.this.server.dispatchServerCommand(MinecraftServer.this.remoteConsole, servercommand); // CraftBukkit
return RemoteControlCommandListener.instance.c();
}};
processQueue.add(waitable);
try {
return waitable.get();
} catch (ExecutionException e) {
throw new RuntimeException("Exception processing rcon command " + s, e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Maintain interrupted state
throw new RuntimeException("Interrupted processing rcon command " + s, e);
}
// CraftBukkit end // CraftBukkit end
return RemoteControlCommandListener.instance.c();
} }
public boolean isDebugging() { public boolean isDebugging() {

View file

@ -13,11 +13,13 @@ import java.io.UnsupportedEncodingException;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.HashSet; import java.util.HashSet;
import org.bukkit.Bukkit;
import org.bukkit.Location; import org.bukkit.Location;
import org.bukkit.craftbukkit.CraftWorld; import org.bukkit.craftbukkit.CraftWorld;
import org.bukkit.craftbukkit.inventory.CraftInventoryView; import org.bukkit.craftbukkit.inventory.CraftInventoryView;
import org.bukkit.craftbukkit.inventory.CraftItemStack; import org.bukkit.craftbukkit.inventory.CraftItemStack;
import org.bukkit.craftbukkit.util.LazyPlayerSet; import org.bukkit.craftbukkit.util.LazyPlayerSet;
import org.bukkit.craftbukkit.util.Waitable;
import org.bukkit.craftbukkit.entity.CraftPlayer; import org.bukkit.craftbukkit.entity.CraftPlayer;
import org.bukkit.craftbukkit.event.CraftEventFactory; import org.bukkit.craftbukkit.event.CraftEventFactory;
import org.bukkit.entity.Player; import org.bukkit.entity.Player;
@ -832,9 +834,42 @@ public class NetServerHandler extends NetHandler {
if (PlayerChatEvent.getHandlerList().getRegisteredListeners().length != 0) { if (PlayerChatEvent.getHandlerList().getRegisteredListeners().length != 0) {
// Evil plugins still listening to deprecated event // Evil plugins still listening to deprecated event
PlayerChatEvent queueEvent = new PlayerChatEvent(player, event.getMessage(), event.getFormat(), event.getRecipients()); final PlayerChatEvent queueEvent = new PlayerChatEvent(player, event.getMessage(), event.getFormat(), event.getRecipients());
queueEvent.setCancelled(event.isCancelled()); queueEvent.setCancelled(event.isCancelled());
minecraftServer.chatQueue.add(queueEvent); Waitable waitable = new Waitable() {
@Override
protected Object evaluate() {
Bukkit.getPluginManager().callEvent(queueEvent);
if (queueEvent.isCancelled()) {
return null;
}
String message = String.format(queueEvent.getFormat(), queueEvent.getPlayer().getDisplayName(), queueEvent.getMessage());
NetServerHandler.this.minecraftServer.console.sendMessage(message);
if (((LazyPlayerSet) queueEvent.getRecipients()).isLazy()) {
for (Object player : NetServerHandler.this.minecraftServer.getServerConfigurationManager().players) {
((EntityPlayer) player).sendMessage(message);
}
} else {
for (Player player : queueEvent.getRecipients()) {
player.sendMessage(message);
}
}
return null;
}};
if (async) {
minecraftServer.processQueue.add(waitable);
} else {
waitable.run();
}
try {
waitable.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // This is proper habit for java. If we aren't handling it, pass it on!
} catch (java.util.concurrent.ExecutionException e) {
throw new RuntimeException("Exception processing chat event", e.getCause());
}
} else { } else {
if (event.isCancelled()) { if (event.isCancelled()) {
return; return;
@ -1047,7 +1082,7 @@ public class NetServerHandler extends NetHandler {
PlayerPortalEvent event = new PlayerPortalEvent(this.player.getBukkitEntity(), this.player.getBukkitEntity().getLocation(), toLocation, pta, PlayerPortalEvent.TeleportCause.END_PORTAL); PlayerPortalEvent event = new PlayerPortalEvent(this.player.getBukkitEntity(), this.player.getBukkitEntity().getLocation(), toLocation, pta, PlayerPortalEvent.TeleportCause.END_PORTAL);
event.useTravelAgent(false); event.useTravelAgent(false);
org.bukkit.Bukkit.getServer().getPluginManager().callEvent(event); Bukkit.getServer().getPluginManager().callEvent(event);
this.player = this.minecraftServer.getServerConfigurationManager().moveToWorld(this.player, 0, true, event.getTo()); this.player = this.minecraftServer.getServerConfigurationManager().moveToWorld(this.player, 0, true, event.getTo());
// CraftBukkit end // CraftBukkit end
} else if (this.player.q().getWorldData().isHardcore()) { } else if (this.player.q().getWorldData().isHardcore()) {

View file

@ -8,6 +8,7 @@ import java.net.URLEncoder;
// CraftBukkit start // CraftBukkit start
import org.bukkit.craftbukkit.CraftServer; import org.bukkit.craftbukkit.CraftServer;
import org.bukkit.craftbukkit.util.Waitable;
import org.bukkit.event.player.AsyncPlayerPreLoginEvent; import org.bukkit.event.player.AsyncPlayerPreLoginEvent;
import org.bukkit.event.player.PlayerPreLoginEvent; import org.bukkit.event.player.PlayerPreLoginEvent;
// CraftBukkit end // CraftBukkit end
@ -46,15 +47,28 @@ class ThreadLoginVerifier extends Thread {
AsyncPlayerPreLoginEvent asyncEvent = new AsyncPlayerPreLoginEvent(NetLoginHandler.d(this.netLoginHandler), this.netLoginHandler.getSocket().getInetAddress()); AsyncPlayerPreLoginEvent asyncEvent = new AsyncPlayerPreLoginEvent(NetLoginHandler.d(this.netLoginHandler), this.netLoginHandler.getSocket().getInetAddress());
this.server.getPluginManager().callEvent(asyncEvent); this.server.getPluginManager().callEvent(asyncEvent);
PlayerPreLoginEvent event = new PlayerPreLoginEvent(NetLoginHandler.d(this.netLoginHandler), this.netLoginHandler.getSocket().getInetAddress()); if (PlayerPreLoginEvent.getHandlerList().getRegisteredListeners().length != 0) {
if (asyncEvent.getResult() != PlayerPreLoginEvent.Result.ALLOWED) { final PlayerPreLoginEvent event = new PlayerPreLoginEvent(NetLoginHandler.d(this.netLoginHandler), this.netLoginHandler.getSocket().getInetAddress());
event.disallow(asyncEvent.getResult(), asyncEvent.getKickMessage()); if (asyncEvent.getResult() != PlayerPreLoginEvent.Result.ALLOWED) {
} event.disallow(asyncEvent.getResult(), asyncEvent.getKickMessage());
this.server.getPluginManager().callEvent(event); }
Waitable<PlayerPreLoginEvent.Result> waitable = new Waitable<PlayerPreLoginEvent.Result>() {
@Override
protected PlayerPreLoginEvent.Result evaluate() {
ThreadLoginVerifier.this.server.getPluginManager().callEvent(event);
return event.getResult();
}};
if (event.getResult() != PlayerPreLoginEvent.Result.ALLOWED) { NetLoginHandler.b(this.netLoginHandler).processQueue.add(waitable);
this.netLoginHandler.disconnect(event.getKickMessage()); if (waitable.get() != PlayerPreLoginEvent.Result.ALLOWED) {
return; this.netLoginHandler.disconnect(event.getKickMessage());
return;
}
} else {
if (asyncEvent.getLoginResult() != AsyncPlayerPreLoginEvent.Result.ALLOWED) {
this.netLoginHandler.disconnect(asyncEvent.getKickMessage());
return;
}
} }
// CraftBukkit end // CraftBukkit end

View file

@ -0,0 +1,46 @@
package org.bukkit.craftbukkit.util;
import java.util.concurrent.ExecutionException;
public abstract class Waitable<T> implements Runnable {
private enum Status {
WAITING,
RUNNING,
FINISHED,
}
Throwable t = null;
T value = null;
Status status = Status.WAITING;
public final void run() {
synchronized (this) {
if (status != Status.WAITING) {
throw new IllegalStateException("Invalid state " + status);
}
status = Status.RUNNING;
}
try {
value = evaluate();
} catch (Throwable t) {
this.t = t;
} finally {
synchronized (this) {
status = Status.FINISHED;
this.notifyAll();
}
}
}
protected abstract T evaluate();
public synchronized T get() throws InterruptedException, ExecutionException {
while (status != Status.FINISHED) {
this.wait();
}
if (t != null) {
throw new ExecutionException(t);
}
return value;
}
}