mirror of
https://github.com/PaperMC/Paper.git
synced 2025-01-17 23:01:01 +01:00
Improved Async Task Scheduler
The Craft Scheduler still uses the primary thread for task scheduling. This results in the main thread still having to do work as part of the dispatching of async tasks. If plugins make use of lots of async tasks, such as particle emitters that want to keep the logic off the main thread, the main thread still receives quite a bit of load from processing all of these queued tasks. Additionally, resizing and managing the pending entries for all of these asynchronous tasks takes up time on the main thread too. This commit replaces the implementation of the scheduler when working with asynchronous tasks, by forwarding calls to the new scheduler. The Async Scheduler uses a single thread executor for "management" tasks. The Management Thread is responsible for all adding and dispatching of scheduled tasks. The mainThreadHeartbeat will send a heartbeat task to the management thread with the currentTick value, so that it can find which tasks to execute. Scheduling of an async tasks also dispatches a management task, ensuring that any Queue resizing operation occurs off of the main thread. The async queue uses a complete separate PriorityQueue, ensuring that resize operations are decoupled from the sync tasks queue.
This commit is contained in:
parent
3f0a574e10
commit
a13afc05a6
2 changed files with 194 additions and 8 deletions
|
@ -0,0 +1,122 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2018 Daniel Ennis (Aikar) MIT License
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person obtaining
|
||||||
|
* a copy of this software and associated documentation files (the
|
||||||
|
* "Software"), to deal in the Software without restriction, including
|
||||||
|
* without limitation the rights to use, copy, modify, merge, publish,
|
||||||
|
* distribute, sublicense, and/or sell copies of the Software, and to
|
||||||
|
* permit persons to whom the Software is furnished to do so, subject to
|
||||||
|
* the following conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be
|
||||||
|
* included in all copies or substantial portions of the Software.
|
||||||
|
*
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||||
|
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||||
|
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||||
|
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
|
||||||
|
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
||||||
|
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
||||||
|
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.bukkit.craftbukkit.scheduler;
|
||||||
|
|
||||||
|
import com.destroystokyo.paper.ServerSchedulerReportingWrapper;
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import org.bukkit.plugin.Plugin;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.SynchronousQueue;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class CraftAsyncScheduler extends CraftScheduler {
|
||||||
|
|
||||||
|
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
|
||||||
|
4, Integer.MAX_VALUE,30L, TimeUnit.SECONDS, new SynchronousQueue<>(),
|
||||||
|
new ThreadFactoryBuilder().setNameFormat("Craft Scheduler Thread - %1$d").build());
|
||||||
|
private final Executor management = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
|
||||||
|
.setNameFormat("Craft Async Scheduler Management Thread").build());
|
||||||
|
private final List<CraftTask> temp = new ArrayList<>();
|
||||||
|
|
||||||
|
CraftAsyncScheduler() {
|
||||||
|
super(true);
|
||||||
|
executor.allowCoreThreadTimeOut(true);
|
||||||
|
executor.prestartAllCoreThreads();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cancelTask(int taskId) {
|
||||||
|
this.management.execute(() -> this.removeTask(taskId));
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void removeTask(int taskId) {
|
||||||
|
parsePending();
|
||||||
|
this.pending.removeIf((task) -> {
|
||||||
|
if (task.getTaskId() == taskId) {
|
||||||
|
task.cancel0();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void mainThreadHeartbeat() {
|
||||||
|
this.currentTick++;
|
||||||
|
this.management.execute(() -> this.runTasks(currentTick));
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void runTasks(int currentTick) {
|
||||||
|
parsePending();
|
||||||
|
while (!this.pending.isEmpty() && this.pending.peek().getNextRun() <= currentTick) {
|
||||||
|
CraftTask task = this.pending.remove();
|
||||||
|
if (executeTask(task)) {
|
||||||
|
final long period = task.getPeriod();
|
||||||
|
if (period > 0) {
|
||||||
|
task.setNextRun(currentTick + period);
|
||||||
|
temp.add(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
parsePending();
|
||||||
|
}
|
||||||
|
this.pending.addAll(temp);
|
||||||
|
temp.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean executeTask(CraftTask task) {
|
||||||
|
if (isValid(task)) {
|
||||||
|
this.runners.put(task.getTaskId(), task);
|
||||||
|
this.executor.execute(new ServerSchedulerReportingWrapper(task));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void cancelTasks(Plugin plugin) {
|
||||||
|
parsePending();
|
||||||
|
for (Iterator<CraftTask> iterator = this.pending.iterator(); iterator.hasNext(); ) {
|
||||||
|
CraftTask task = iterator.next();
|
||||||
|
if (task.getTaskId() != -1 && (plugin == null || task.getOwner().equals(plugin))) {
|
||||||
|
task.cancel0();
|
||||||
|
iterator.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Task is not cancelled
|
||||||
|
* @param runningTask
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
static boolean isValid(CraftTask runningTask) {
|
||||||
|
return runningTask.getPeriod() >= CraftTask.NO_REPEATING;
|
||||||
|
}
|
||||||
|
}
|
|
@ -75,7 +75,7 @@ public class CraftScheduler implements BukkitScheduler {
|
||||||
/**
|
/**
|
||||||
* Main thread logic only
|
* Main thread logic only
|
||||||
*/
|
*/
|
||||||
private final PriorityQueue<CraftTask> pending = new PriorityQueue<CraftTask>(10,
|
final PriorityQueue<CraftTask> pending = new PriorityQueue<CraftTask>(10, // Paper
|
||||||
new Comparator<CraftTask>() {
|
new Comparator<CraftTask>() {
|
||||||
@Override
|
@Override
|
||||||
public int compare(final CraftTask o1, final CraftTask o2) {
|
public int compare(final CraftTask o1, final CraftTask o2) {
|
||||||
|
@ -92,12 +92,13 @@ public class CraftScheduler implements BukkitScheduler {
|
||||||
/**
|
/**
|
||||||
* These are tasks that are currently active. It's provided for 'viewing' the current state.
|
* These are tasks that are currently active. It's provided for 'viewing' the current state.
|
||||||
*/
|
*/
|
||||||
private final ConcurrentHashMap<Integer, CraftTask> runners = new ConcurrentHashMap<Integer, CraftTask>();
|
final ConcurrentHashMap<Integer, CraftTask> runners = new ConcurrentHashMap<Integer, CraftTask>(); // Paper
|
||||||
/**
|
/**
|
||||||
* The sync task that is currently running on the main thread.
|
* The sync task that is currently running on the main thread.
|
||||||
*/
|
*/
|
||||||
private volatile CraftTask currentTask = null;
|
private volatile CraftTask currentTask = null;
|
||||||
private volatile int currentTick = -1;
|
// Paper start - Improved Async Task Scheduler
|
||||||
|
volatile int currentTick = -1;/*
|
||||||
private final Executor executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("Craft Scheduler Thread - %d").build());
|
private final Executor executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("Craft Scheduler Thread - %d").build());
|
||||||
private CraftAsyncDebugger debugHead = new CraftAsyncDebugger(-1, null, null) {
|
private CraftAsyncDebugger debugHead = new CraftAsyncDebugger(-1, null, null) {
|
||||||
@Override
|
@Override
|
||||||
|
@ -106,12 +107,31 @@ public class CraftScheduler implements BukkitScheduler {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
private CraftAsyncDebugger debugTail = this.debugHead;
|
private CraftAsyncDebugger debugTail = this.debugHead;
|
||||||
|
|
||||||
|
*/ // Paper end
|
||||||
private static final int RECENT_TICKS;
|
private static final int RECENT_TICKS;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
RECENT_TICKS = 30;
|
RECENT_TICKS = 30;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Paper start
|
||||||
|
private final CraftScheduler asyncScheduler;
|
||||||
|
private final boolean isAsyncScheduler;
|
||||||
|
public CraftScheduler() {
|
||||||
|
this(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public CraftScheduler(boolean isAsync) {
|
||||||
|
this.isAsyncScheduler = isAsync;
|
||||||
|
if (isAsync) {
|
||||||
|
this.asyncScheduler = this;
|
||||||
|
} else {
|
||||||
|
this.asyncScheduler = new CraftAsyncScheduler();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Paper end
|
||||||
@Override
|
@Override
|
||||||
public int scheduleSyncDelayedTask(final Plugin plugin, final Runnable task) {
|
public int scheduleSyncDelayedTask(final Plugin plugin, final Runnable task) {
|
||||||
return this.scheduleSyncDelayedTask(plugin, task, 0L);
|
return this.scheduleSyncDelayedTask(plugin, task, 0L);
|
||||||
|
@ -228,7 +248,7 @@ public class CraftScheduler implements BukkitScheduler {
|
||||||
} else if (period < CraftTask.NO_REPEATING) {
|
} else if (period < CraftTask.NO_REPEATING) {
|
||||||
period = CraftTask.NO_REPEATING;
|
period = CraftTask.NO_REPEATING;
|
||||||
}
|
}
|
||||||
return this.handle(new CraftAsyncTask(this.runners, plugin, runnable, this.nextId(), period), delay);
|
return this.handle(new CraftAsyncTask(this.asyncScheduler.runners, plugin, runnable, this.nextId(), period), delay); // Paper
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -244,6 +264,11 @@ public class CraftScheduler implements BukkitScheduler {
|
||||||
if (taskId <= 0) {
|
if (taskId <= 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// Paper start
|
||||||
|
if (!this.isAsyncScheduler) {
|
||||||
|
this.asyncScheduler.cancelTask(taskId);
|
||||||
|
}
|
||||||
|
// Paper end
|
||||||
CraftTask task = this.runners.get(taskId);
|
CraftTask task = this.runners.get(taskId);
|
||||||
if (task != null) {
|
if (task != null) {
|
||||||
task.cancel0();
|
task.cancel0();
|
||||||
|
@ -286,6 +311,11 @@ public class CraftScheduler implements BukkitScheduler {
|
||||||
@Override
|
@Override
|
||||||
public void cancelTasks(final Plugin plugin) {
|
public void cancelTasks(final Plugin plugin) {
|
||||||
Preconditions.checkArgument(plugin != null, "Cannot cancel tasks of null plugin");
|
Preconditions.checkArgument(plugin != null, "Cannot cancel tasks of null plugin");
|
||||||
|
// Paper start
|
||||||
|
if (!this.isAsyncScheduler) {
|
||||||
|
this.asyncScheduler.cancelTasks(plugin);
|
||||||
|
}
|
||||||
|
// Paper end
|
||||||
final CraftTask task = new CraftTask(
|
final CraftTask task = new CraftTask(
|
||||||
new Runnable() {
|
new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -325,6 +355,13 @@ public class CraftScheduler implements BukkitScheduler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isCurrentlyRunning(final int taskId) {
|
public boolean isCurrentlyRunning(final int taskId) {
|
||||||
|
// Paper start
|
||||||
|
if (!this.isAsyncScheduler) {
|
||||||
|
if (this.asyncScheduler.isCurrentlyRunning(taskId)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Paper end
|
||||||
final CraftTask task = this.runners.get(taskId);
|
final CraftTask task = this.runners.get(taskId);
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -343,6 +380,11 @@ public class CraftScheduler implements BukkitScheduler {
|
||||||
if (taskId <= 0) {
|
if (taskId <= 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
// Paper start
|
||||||
|
if (!this.isAsyncScheduler && this.asyncScheduler.isQueued(taskId)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// Paper end
|
||||||
for (CraftTask task = this.head.getNext(); task != null; task = task.getNext()) {
|
for (CraftTask task = this.head.getNext(); task != null; task = task.getNext()) {
|
||||||
if (task.getTaskId() == taskId) {
|
if (task.getTaskId() == taskId) {
|
||||||
return task.getPeriod() >= CraftTask.NO_REPEATING; // The task will run
|
return task.getPeriod() >= CraftTask.NO_REPEATING; // The task will run
|
||||||
|
@ -354,6 +396,12 @@ public class CraftScheduler implements BukkitScheduler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<BukkitWorker> getActiveWorkers() {
|
public List<BukkitWorker> getActiveWorkers() {
|
||||||
|
// Paper start
|
||||||
|
if (!isAsyncScheduler) {
|
||||||
|
//noinspection TailRecursion
|
||||||
|
return this.asyncScheduler.getActiveWorkers();
|
||||||
|
}
|
||||||
|
// Paper end
|
||||||
final ArrayList<BukkitWorker> workers = new ArrayList<BukkitWorker>();
|
final ArrayList<BukkitWorker> workers = new ArrayList<BukkitWorker>();
|
||||||
for (final CraftTask taskObj : this.runners.values()) {
|
for (final CraftTask taskObj : this.runners.values()) {
|
||||||
// Iterator will be a best-effort (may fail to grab very new values) if called from an async thread
|
// Iterator will be a best-effort (may fail to grab very new values) if called from an async thread
|
||||||
|
@ -391,6 +439,11 @@ public class CraftScheduler implements BukkitScheduler {
|
||||||
pending.add(task);
|
pending.add(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Paper start
|
||||||
|
if (!this.isAsyncScheduler) {
|
||||||
|
pending.addAll(this.asyncScheduler.getPendingTasks());
|
||||||
|
}
|
||||||
|
// Paper end
|
||||||
return pending;
|
return pending;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -399,6 +452,11 @@ public class CraftScheduler implements BukkitScheduler {
|
||||||
*/
|
*/
|
||||||
public void mainThreadHeartbeat() {
|
public void mainThreadHeartbeat() {
|
||||||
this.currentTick++;
|
this.currentTick++;
|
||||||
|
// Paper start
|
||||||
|
if (!this.isAsyncScheduler) {
|
||||||
|
this.asyncScheduler.mainThreadHeartbeat();
|
||||||
|
}
|
||||||
|
// Paper end
|
||||||
final List<CraftTask> temp = this.temp;
|
final List<CraftTask> temp = this.temp;
|
||||||
this.parsePending();
|
this.parsePending();
|
||||||
while (this.isReady(this.currentTick)) {
|
while (this.isReady(this.currentTick)) {
|
||||||
|
@ -433,7 +491,7 @@ public class CraftScheduler implements BukkitScheduler {
|
||||||
this.parsePending();
|
this.parsePending();
|
||||||
} else {
|
} else {
|
||||||
// this.debugTail = this.debugTail.setNext(new CraftAsyncDebugger(this.currentTick + CraftScheduler.RECENT_TICKS, task.getOwner(), task.getTaskClass())); // Paper
|
// this.debugTail = this.debugTail.setNext(new CraftAsyncDebugger(this.currentTick + CraftScheduler.RECENT_TICKS, task.getOwner(), task.getTaskClass())); // Paper
|
||||||
this.executor.execute(new com.destroystokyo.paper.ServerSchedulerReportingWrapper(task)); // Paper
|
task.getOwner().getLogger().log(Level.SEVERE, "Unexpected Async Task in the Sync Scheduler. Report this to Paper"); // Paper
|
||||||
// We don't need to parse pending
|
// We don't need to parse pending
|
||||||
// (async tasks must live with race-conditions if they attempt to cancel between these few lines of code)
|
// (async tasks must live with race-conditions if they attempt to cancel between these few lines of code)
|
||||||
}
|
}
|
||||||
|
@ -450,12 +508,18 @@ public class CraftScheduler implements BukkitScheduler {
|
||||||
//this.debugHead = this.debugHead.getNextHead(this.currentTick); // Paper
|
//this.debugHead = this.debugHead.getNextHead(this.currentTick); // Paper
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addTask(final CraftTask task) {
|
protected void addTask(final CraftTask task) {
|
||||||
final CraftTask tailTask = this.tail.getAndSet(task);
|
final CraftTask tailTask = this.tail.getAndSet(task);
|
||||||
tailTask.setNext(task);
|
tailTask.setNext(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
private CraftTask handle(final CraftTask task, final long delay) {
|
protected CraftTask handle(final CraftTask task, final long delay) { // Paper
|
||||||
|
// Paper start
|
||||||
|
if (!this.isAsyncScheduler && !task.isSync()) {
|
||||||
|
this.asyncScheduler.handle(task, delay);
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
// Paper end
|
||||||
task.setNextRun(this.currentTick + delay);
|
task.setNextRun(this.currentTick + delay);
|
||||||
this.addTask(task);
|
this.addTask(task);
|
||||||
return task;
|
return task;
|
||||||
|
@ -478,7 +542,7 @@ public class CraftScheduler implements BukkitScheduler {
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void parsePending() {
|
void parsePending() { // Paper
|
||||||
CraftTask head = this.head;
|
CraftTask head = this.head;
|
||||||
CraftTask task = head.getNext();
|
CraftTask task = head.getNext();
|
||||||
CraftTask lastTask = head;
|
CraftTask lastTask = head;
|
||||||
|
|
Loading…
Reference in a new issue