mirror of
https://github.com/PaperMC/Paper.git
synced 2025-01-16 06:30:46 +01:00
Merge chunk task urgent executor thread into the worker queue
By keeping them separate, urgent tasks could not be executed by the worker queue.
This commit is contained in:
parent
b8de21cf48
commit
9dd2c32156
1 changed files with 52 additions and 26 deletions
|
@ -1022,6 +1022,27 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ }
|
||||
+
|
||||
+ /**
|
||||
+ * Polls the highest priority task currently available. {@code null} if none.
|
||||
+ */
|
||||
+ public T poll(final int lowestPriority) {
|
||||
+ T task;
|
||||
+ final int max = Math.min(LOWEST_PRIORITY, lowestPriority);
|
||||
+ for (int i = 0; i <= max; ++i) {
|
||||
+ final ConcurrentLinkedQueue<T> queue = this.queues[i];
|
||||
+
|
||||
+ while ((task = queue.poll()) != null) {
|
||||
+ final int prevPriority = task.tryComplete(i);
|
||||
+ if (prevPriority != COMPLETING_PRIORITY && prevPriority <= i) {
|
||||
+ // if the prev priority was greater-than or equal to our current priority
|
||||
+ return task;
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+ return null;
|
||||
+ }
|
||||
+
|
||||
+ /**
|
||||
+ * Returns whether this queue may have tasks queued.
|
||||
+ * <p>
|
||||
+ * This operation is not atomic, but is MT-Safe.
|
||||
|
@ -1222,6 +1243,19 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ protected volatile ConcurrentLinkedQueue<Thread> flushQueue = new ConcurrentLinkedQueue<>();
|
||||
+ protected volatile long flushCycles;
|
||||
+
|
||||
+ protected int lowestPriorityToPoll = PrioritizedTaskQueue.LOWEST_PRIORITY;
|
||||
+
|
||||
+ public int getLowestPriorityToPoll() {
|
||||
+ return this.lowestPriorityToPoll;
|
||||
+ }
|
||||
+
|
||||
+ public void setLowestPriorityToPoll(final int lowestPriorityToPoll) {
|
||||
+ if (this.isAlive()) {
|
||||
+ throw new IllegalStateException("Cannot set after starting");
|
||||
+ }
|
||||
+ this.lowestPriorityToPoll = lowestPriorityToPoll;
|
||||
+ }
|
||||
+
|
||||
+ public QueueExecutorThread(final PrioritizedTaskQueue<T> queue) {
|
||||
+ this(queue, (int)(1.e6)); // 1.0ms
|
||||
+ }
|
||||
|
@ -1300,7 +1334,7 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ Runnable task;
|
||||
+ boolean ret = false;
|
||||
+
|
||||
+ while ((task = this.queue.poll()) != null) {
|
||||
+ while ((task = this.queue.poll(this.lowestPriorityToPoll)) != null) {
|
||||
+ ret = true;
|
||||
+ try {
|
||||
+ task.run();
|
||||
|
@ -1795,9 +1829,7 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ private final PrioritizedTaskQueue<ChunkTask> chunkTasks = new PrioritizedTaskQueue<>(); // used if async chunks are disabled in config
|
||||
+
|
||||
+ protected static QueueExecutorThread<ChunkTask>[] globalWorkers;
|
||||
+ protected static QueueExecutorThread<ChunkTask> globalUrgentWorker;
|
||||
+ protected static PrioritizedTaskQueue<ChunkTask> globalQueue;
|
||||
+ protected static PrioritizedTaskQueue<ChunkTask> globalUrgentQueue;
|
||||
+
|
||||
+ protected static final ConcurrentLinkedQueue<Runnable> CHUNK_WAIT_QUEUE = new ConcurrentLinkedQueue<>();
|
||||
+
|
||||
|
@ -1891,12 +1923,12 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ if (threads <= 0 || globalWorkers != null) {
|
||||
+ return;
|
||||
+ }
|
||||
+ ++threads; // add one for urgent executor
|
||||
+
|
||||
+ globalWorkers = new QueueExecutorThread[threads];
|
||||
+ globalQueue = new PrioritizedTaskQueue<>();
|
||||
+ globalUrgentQueue = new PrioritizedTaskQueue<>();
|
||||
+
|
||||
+ for (int i = 0; i < threads; ++i) {
|
||||
+ for (int i = 0; i < (threads - 1); ++i) {
|
||||
+ globalWorkers[i] = new QueueExecutorThread<>(globalQueue, (long)0.10e6); //0.1ms
|
||||
+ globalWorkers[i].setName("Paper Async Chunk Task Thread #" + i);
|
||||
+ globalWorkers[i].setPriority(Thread.NORM_PRIORITY - 1);
|
||||
|
@ -1907,14 +1939,14 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ globalWorkers[i].start();
|
||||
+ }
|
||||
+
|
||||
+ globalUrgentWorker = new QueueExecutorThread<>(globalUrgentQueue, (long)0.10e6); //0.1ms
|
||||
+ globalUrgentWorker.setName("Paper Async Chunk Urgent Task Thread");
|
||||
+ globalUrgentWorker.setPriority(Thread.NORM_PRIORITY+1);
|
||||
+ globalUrgentWorker.setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> {
|
||||
+ globalWorkers[threads - 1] = new QueueExecutorThread<>(globalQueue, (long)0.10e6); //0.1ms
|
||||
+ globalWorkers[threads - 1].setName("Paper Async Chunk Urgent Task Thread");
|
||||
+ globalWorkers[threads - 1].setPriority(Thread.NORM_PRIORITY+1);
|
||||
+ globalWorkers[threads - 1].setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> {
|
||||
+ PaperFileIOThread.LOGGER.fatal("Thread '" + thread.getName() + "' threw an uncaught exception!", throwable);
|
||||
+ });
|
||||
+
|
||||
+ globalUrgentWorker.start();
|
||||
+ globalWorkers[threads - 1].setLowestPriorityToPoll(PrioritizedTaskQueue.HIGHEST_PRIORITY);
|
||||
+ globalWorkers[threads - 1].start();
|
||||
+ }
|
||||
+
|
||||
+ /**
|
||||
|
@ -2165,7 +2197,6 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ worker.flush();
|
||||
+ }
|
||||
+ }
|
||||
+ if (globalUrgentWorker != null) globalUrgentWorker.flush();
|
||||
+
|
||||
+ // flush again since tasks we execute async saves
|
||||
+ drainChunkWaitQueue();
|
||||
|
@ -2215,15 +2246,10 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ if (task.isScheduled() && raised && this.workers != null) {
|
||||
+ // only notify if we're in queue to be executed
|
||||
+ if (priority == PrioritizedTaskQueue.HIGHEST_PRIORITY) {
|
||||
+ // was in another queue but became urgent later, add to urgent queue and the previous
|
||||
+ // queue will just have to ignore this task if it has already been started.
|
||||
+ // Ultimately, we now have 2 potential queues that can pull it out whoever gets it first
|
||||
+ // but the urgent queue has dedicated thread(s) so it's likely to win....
|
||||
+ globalUrgentQueue.add(task);
|
||||
+ // notify urgent worker as well
|
||||
+ this.internalScheduleNotifyUrgent();
|
||||
+ } else {
|
||||
+ this.internalScheduleNotify();
|
||||
+ }
|
||||
+ this.internalScheduleNotify();
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
|
@ -2235,12 +2261,11 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+
|
||||
+ // It's important we order the task to be executed before notifying. Avoid a race condition where the worker thread
|
||||
+ // wakes up and goes to sleep before we actually schedule (or it's just about to sleep)
|
||||
+ this.queue.add(task);
|
||||
+ this.internalScheduleNotify();
|
||||
+ if (task.getPriority() == PrioritizedTaskQueue.HIGHEST_PRIORITY) {
|
||||
+ globalUrgentQueue.add(task);
|
||||
+ // notify urgent too
|
||||
+ this.internalScheduleNotifyUrgent();
|
||||
+ } else {
|
||||
+ this.queue.add(task);
|
||||
+ this.internalScheduleNotify();
|
||||
+ }
|
||||
+
|
||||
+ }
|
||||
|
@ -2249,7 +2274,8 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+ if (this.workers == null) {
|
||||
+ return;
|
||||
+ }
|
||||
+ for (final QueueExecutorThread<ChunkTask> worker : this.workers) {
|
||||
+ for (int i = 0, len = this.workers.length - 1; i < len; ++i) {
|
||||
+ final QueueExecutorThread<ChunkTask> worker = this.workers[i];
|
||||
+ if (worker.notifyTasks()) {
|
||||
+ // break here since we only want to wake up one worker for scheduling one task
|
||||
+ break;
|
||||
|
@ -2259,10 +2285,10 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000
|
|||
+
|
||||
+
|
||||
+ protected void internalScheduleNotifyUrgent() {
|
||||
+ if (globalUrgentWorker == null) {
|
||||
+ if (this.workers == null) {
|
||||
+ return;
|
||||
+ }
|
||||
+ globalUrgentWorker.notifyTasks();
|
||||
+ this.workers[this.workers.length - 1].notifyTasks();
|
||||
+ }
|
||||
+
|
||||
+}
|
||||
|
|
Loading…
Reference in a new issue