From 81e5978088bb8d24321fbe1b93591bea61cb5cdd Mon Sep 17 00:00:00 2001 From: Aikar Date: Fri, 2 Nov 2018 22:48:33 -0400 Subject: [PATCH] Many major improvements to Async Chunk Loading Fixes some bugs with urgent priority, improves priority all around to optimize blocking chunk requests as much as possible. fixes casing on the -Dpaper.maxchunkthreads to now be -Dpaper.maxChunkThreads adds -Dpaper.genThreadPriority=3 -Dpaper.loadThreadPriority=4 lowering thread priorities will help ensure main has more priority over chunk threads --- ...1-Async-Chunk-Loading-and-Generation.patch | 389 +++++++++++------- 1 file changed, 233 insertions(+), 156 deletions(-) diff --git a/Spigot-Server-Patches/0371-Async-Chunk-Loading-and-Generation.patch b/Spigot-Server-Patches/0371-Async-Chunk-Loading-and-Generation.patch index 094e737e46..2a68b2a457 100644 --- a/Spigot-Server-Patches/0371-Async-Chunk-Loading-and-Generation.patch +++ b/Spigot-Server-Patches/0371-Async-Chunk-Loading-and-Generation.patch @@ -1,4 +1,4 @@ -From b224cfd87e0c3056aad5899c80a7caa90e1c2360 Mon Sep 17 00:00:00 2001 +From e0e4f447246b2747683ec10f598abca50cc5fe85 Mon Sep 17 00:00:00 2001 From: Aikar Date: Sat, 21 Jul 2018 16:55:04 -0400 Subject: [PATCH] Async Chunk Loading and Generation @@ -43,7 +43,7 @@ reading or writing to the chunk will be safe, so plugins still should not be touching chunks asynchronously! diff --git a/src/main/java/com/destroystokyo/paper/PaperConfig.java b/src/main/java/com/destroystokyo/paper/PaperConfig.java -index b703e08486..77d35ac99d 100644 +index b703e08486..73b0c23944 100644 --- a/src/main/java/com/destroystokyo/paper/PaperConfig.java +++ b/src/main/java/com/destroystokyo/paper/PaperConfig.java @@ -385,4 +385,57 @@ public class PaperConfig { @@ -70,7 +70,7 @@ index b703e08486..77d35ac99d 100644 + asyncChunkGenThreadPerWorld = getBoolean("settings.async-chunks.thread-per-world-generation", true); + asyncChunkLoadThreads = getInt("settings.async-chunks.load-threads", -1); + if (asyncChunkLoadThreads <= 0) { -+ asyncChunkLoadThreads = (int) Math.min(Integer.getInteger("paper.maxchunkthreads", 8), Runtime.getRuntime().availableProcessors() * 1.5); ++ asyncChunkLoadThreads = (int) Math.min(Integer.getInteger("paper.maxChunkThreads", 8), Runtime.getRuntime().availableProcessors() * 1.5); + } + + // Let Shared Host set some limits @@ -106,15 +106,12 @@ index b703e08486..77d35ac99d 100644 } diff --git a/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java b/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java new file mode 100644 -index 0000000000..e589aa356c +index 0000000000..8f18c28695 --- /dev/null +++ b/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java -@@ -0,0 +1,298 @@ +@@ -0,0 +1,347 @@ +package com.destroystokyo.paper.util; + -+import com.google.common.util.concurrent.ThreadFactoryBuilder; -+import net.minecraft.server.NamedIncrementingThreadFactory; -+ +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.List; @@ -122,7 +119,6 @@ index 0000000000..e589aa356c +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.RejectedExecutionException; -+import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; @@ -132,33 +128,47 @@ index 0000000000..e589aa356c + * Implements an Executor Service that allows specifying Task Priority + * and bumping of task priority. + * -+ * @author aikar ++ * This is a non blocking executor with 3 priority levels. ++ * ++ * URGENT: Rarely used, something that is critical to take action now. ++ * HIGH: Something with more importance than the base tasks ++ * ++ * @author Daniel Ennis <aikar@aikar.co> + */ +@SuppressWarnings({"WeakerAccess", "UnusedReturnValue", "unused"}) +public class PriorityQueuedExecutor extends AbstractExecutorService { ++ + private final ConcurrentLinkedQueue urgent = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue high = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue normal = new ConcurrentLinkedQueue<>(); ++ private final List threads = new ArrayList<>(); + private final RejectionHandler handler; ++ + private volatile boolean shuttingDown = false; + private volatile boolean shuttingDownNow = false; -+ private final List threads = new ArrayList<>(); + + public PriorityQueuedExecutor(String name) { -+ this(name, Runtime.getRuntime().availableProcessors(), null); ++ this(name, Math.max(1, Runtime.getRuntime().availableProcessors() - 1)); + } + + public PriorityQueuedExecutor(String name, int threads) { -+ this(name, threads, null); ++ this(name, threads, Thread.NORM_PRIORITY, null); ++ } ++ ++ public PriorityQueuedExecutor(String name, int threads, int threadPriority) { ++ this(name, threads, threadPriority, null); + } + + public PriorityQueuedExecutor(String name, int threads, RejectionHandler handler) { -+ ThreadFactory factory = new ThreadFactoryBuilder() -+ .setThreadFactory(new NamedIncrementingThreadFactory(name)) -+ .setDaemon(true) -+ .build(); ++ this(name, threads, Thread.NORM_PRIORITY, handler); ++ } ++ ++ public PriorityQueuedExecutor(String name, int threads, int threadPriority, RejectionHandler handler) { + for (int i = 0; i < threads; i++) { -+ final Thread thread = factory.newThread(this::processQueues); ++ ExecutorThread thread = new ExecutorThread(this::processQueues); ++ thread.setDaemon(true); ++ thread.setName(threads == 1 ? name : name + "-" + (i + 1)); ++ thread.setPriority(threadPriority); + thread.start(); + this.threads.add(thread); + } @@ -168,6 +178,17 @@ index 0000000000..e589aa356c + this.handler = handler; + } + ++ /** ++ * If the Current thread belongs to a PriorityQueuedExecutor, return that Executro ++ * @return The executor that controls this thread ++ */ ++ public static PriorityQueuedExecutor getExecutor() { ++ if (!(Thread.currentThread() instanceof ExecutorThread)) { ++ return null; ++ } ++ return ((ExecutorThread) Thread.currentThread()).getExecutor(); ++ } ++ + public void shutdown() { + shuttingDown = true; + synchronized (this) { @@ -235,28 +256,20 @@ index 0000000000..e589aa356c + } + + public PendingTask submitTask(Runnable run) { -+ return submitTask(createPendingTask(run)); ++ return createPendingTask(run).submit(); + } + + public PendingTask submitTask(Runnable run, Priority priority) { -+ return submitTask(createPendingTask(run, priority)); ++ return createPendingTask(run, priority).submit(); + } + + public PendingTask submitTask(Supplier run) { -+ return submitTask(createPendingTask(run)); ++ return createPendingTask(run).submit(); + } + + public PendingTask submitTask(Supplier run, Priority priority) { -+ return submitTask(createPendingTask(run, priority)); -+ } -+ -+ public PendingTask submitTask(PendingTask task) { -+ if (shuttingDown) { -+ handler.onRejection(task, this); -+ return task; -+ } -+ task.submit(this); -+ return task; ++ PendingTask task = createPendingTask(run, priority); ++ return task.submit(); + } + + @Override @@ -264,7 +277,19 @@ index 0000000000..e589aa356c + submitTask(command); + } + -+ private Runnable getTask() { ++ public boolean isCurrentThread() { ++ final Thread thread = Thread.currentThread(); ++ if (!(thread instanceof ExecutorThread)) { ++ return false; ++ } ++ return ((ExecutorThread) thread).getExecutor() == this; ++ } ++ ++ public Runnable getUrgentTask() { ++ return urgent.poll(); ++ } ++ ++ public Runnable getTask() { + Runnable run = urgent.poll(); + if (run != null) { + return run; @@ -304,10 +329,30 @@ index 0000000000..e589aa356c + } + } + ++ public boolean processUrgentTasks() { ++ Runnable run; ++ boolean hadTask = false; ++ while ((run = getUrgentTask()) != null) { ++ run.run(); ++ hadTask = true; ++ } ++ return hadTask; ++ } ++ + public enum Priority { + NORMAL, HIGH, URGENT + } + ++ public class ExecutorThread extends Thread { ++ public ExecutorThread(Runnable runnable) { ++ super(runnable); ++ } ++ ++ public PriorityQueuedExecutor getExecutor() { ++ return PriorityQueuedExecutor.this; ++ } ++ } ++ + public class PendingTask implements Runnable { + + private final AtomicBoolean hasRan = new AtomicBoolean(); @@ -350,31 +395,35 @@ index 0000000000..e589aa356c + public void bumpPriority(Priority newPriority) { + for (;;) { + int current = this.priority.get(); -+ if (current >= newPriority.ordinal()) { -+ return; -+ } -+ if (priority.compareAndSet(current, newPriority.ordinal())) { ++ int ordinal = newPriority.ordinal(); ++ if (current >= ordinal || priority.compareAndSet(current, ordinal)) { + break; + } + } + -+ if (this.executor == null) { ++ ++ if (this.submitted.get() == -1 || this.hasRan.get()) { + return; + } -+ // If we have already been submitted, resubmit with new priority -+ submit(this.executor); ++ ++ // Only resubmit if it hasnt ran yet and has been submitted ++ submit(); + } + + public CompletableFuture onDone() { + return future; + } + -+ public void submit(PriorityQueuedExecutor executor) { ++ public PendingTask submit() { ++ if (shuttingDown) { ++ handler.onRejection(this, PriorityQueuedExecutor.this); ++ return this; ++ } + for (;;) { + final int submitted = this.submitted.get(); + final int priority = this.priority.get(); + if (submitted == priority) { -+ return; ++ return this; + } + if (this.submitted.compareAndSet(submitted, priority)) { + if (priority == Priority.URGENT.ordinal()) { @@ -389,11 +438,11 @@ index 0000000000..e589aa356c + } + } + -+ //noinspection SynchronizationOnLocalVariableOrMethodParameter -+ synchronized (executor) { ++ synchronized (PriorityQueuedExecutor.this) { + // Wake up a thread to take this work -+ executor.notify(); ++ PriorityQueuedExecutor.this.notify(); + } ++ return this; + } + } + public interface RejectionHandler { @@ -933,7 +982,7 @@ index 49fba0979e..9ad646f8d4 100644 fx = fx % 360.0F; if (fx >= 180.0F) { diff --git a/src/main/java/net/minecraft/server/MinecraftServer.java b/src/main/java/net/minecraft/server/MinecraftServer.java -index 763130b036..67722440fd 100644 +index 763130b036..69b3218756 100644 --- a/src/main/java/net/minecraft/server/MinecraftServer.java +++ b/src/main/java/net/minecraft/server/MinecraftServer.java @@ -503,6 +503,7 @@ public abstract class MinecraftServer implements IAsyncTaskHandler, IMojangStati @@ -978,7 +1027,7 @@ index 763130b036..67722440fd 100644 + while (waitForChunks && !completablefuture.isDone() && isRunning()) { // Paper try { - completablefuture.get(1L, TimeUnit.SECONDS); -+ PaperAsyncChunkProvider.processChunkLoads(worldserver); // Paper ++ PaperAsyncChunkProvider.processMainThreadQueue(this); // Paper + completablefuture.get(50L, TimeUnit.MILLISECONDS); // Paper } catch (InterruptedException interruptedexception) { throw new RuntimeException(interruptedexception); @@ -1009,7 +1058,7 @@ index 763130b036..67722440fd 100644 while ((futuretask = (FutureTask) this.f.poll()) != null) { SystemUtils.a(futuretask, MinecraftServer.LOGGER); } -+ PaperAsyncChunkProvider.processChunkLoads(this); // Paper ++ PaperAsyncChunkProvider.processMainThreadQueue(this); // Paper MinecraftTimings.minecraftSchedulerTimer.stopTiming(); // Paper this.methodProfiler.c("commandFunctions"); @@ -1017,24 +1066,16 @@ index 763130b036..67722440fd 100644 // CraftBukkit - dropTickTime for (Iterator iterator = this.getWorlds().iterator(); iterator.hasNext();) { WorldServer worldserver = (WorldServer) iterator.next(); -+ PaperAsyncChunkProvider.processChunkLoads(worldserver); // Paper ++ PaperAsyncChunkProvider.processMainThreadQueue(worldserver); // Paper TileEntityHopper.skipHopperEvents = worldserver.paperConfig.disableHopperMoveEvents || org.bukkit.event.inventory.InventoryMoveItemEvent.getHandlerList().getRegisteredListeners().length == 0; // Paper i = SystemUtils.c(); if (true || worldserver.worldProvider.getDimensionManager() == DimensionManager.OVERWORLD || this.getAllowNether()) { // CraftBukkit -@@ -1109,6 +1109,7 @@ public abstract class MinecraftServer implements IAsyncTaskHandler, IMojangStati - this.methodProfiler.e(); - this.methodProfiler.e(); - worldserver.explosionDensityCache.clear(); // Paper - Optimize explosions -+ PaperAsyncChunkProvider.processChunkLoads(worldserver); // Paper - } - } - diff --git a/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java b/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java new file mode 100644 -index 0000000000..c334462f20 +index 0000000000..e9a38f9d90 --- /dev/null +++ b/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java -@@ -0,0 +1,619 @@ +@@ -0,0 +1,655 @@ +/* + * This file is licensed under the MIT License (MIT). + * @@ -1076,7 +1117,7 @@ index 0000000000..c334462f20 +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; -+import java.util.concurrent.ConcurrentLinkedQueue; ++import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; @@ -1084,11 +1125,11 @@ index 0000000000..c334462f20 +@SuppressWarnings("unused") +public class PaperAsyncChunkProvider extends ChunkProviderServer { + -+ private static final PriorityQueuedExecutor EXECUTOR = new PriorityQueuedExecutor("PaperChunkLoader", PaperConfig.asyncChunks ? PaperConfig.asyncChunkLoadThreads : 0); -+ private static final PriorityQueuedExecutor SINGLE_GEN_EXECUTOR = new PriorityQueuedExecutor("PaperChunkGenerator", PaperConfig.asyncChunks && PaperConfig.asyncChunkGeneration && !PaperConfig.asyncChunkGenThreadPerWorld ? 1 : 0); -+ private static final ConcurrentLinkedQueue MAIN_THREAD_QUEUE = new ConcurrentLinkedQueue<>(); -+ private static final ThreadLocal IS_CHUNK_THREAD = ThreadLocal.withInitial(() -> false); -+ private static final ThreadLocal IS_CHUNK_GEN_THREAD = ThreadLocal.withInitial(() -> false); ++ private static final int GEN_THREAD_PRIORITY = Integer.getInteger("paper.genThreadPriority", 3); ++ private static final int LOAD_THREAD_PRIORITY = Integer.getInteger("paper.loadThreadPriority", 4); ++ private static final PriorityQueuedExecutor EXECUTOR = new PriorityQueuedExecutor("PaperChunkLoader", PaperConfig.asyncChunks ? PaperConfig.asyncChunkLoadThreads : 0, LOAD_THREAD_PRIORITY); ++ private static final PriorityQueuedExecutor SINGLE_GEN_EXECUTOR = new PriorityQueuedExecutor("PaperChunkGenerator", PaperConfig.asyncChunks && PaperConfig.asyncChunkGeneration && !PaperConfig.asyncChunkGenThreadPerWorld ? 1 : 0, GEN_THREAD_PRIORITY); ++ private static final ConcurrentLinkedDeque MAIN_THREAD_QUEUE = new ConcurrentLinkedDeque<>(); + + private final PriorityQueuedExecutor generationExecutor; + //private static final PriorityQueuedExecutor generationExecutor = new PriorityQueuedExecutor("PaperChunkGen", 1); @@ -1109,26 +1150,7 @@ index 0000000000..c334462f20 + this.chunkLoader = chunkLoader; + String worldName = this.world.getWorld().getName(); + this.shouldGenSync = generator instanceof CustomChunkGenerator && !(((CustomChunkGenerator) generator).asyncSupported) || !PaperConfig.asyncChunkGeneration; -+ this.generationExecutor = PaperConfig.asyncChunkGenThreadPerWorld ? new PriorityQueuedExecutor("PaperChunkGen-" + worldName, shouldGenSync ? 0 : 1) : SINGLE_GEN_EXECUTOR; -+ } -+ -+ static void processChunkLoads(MinecraftServer server) { -+ for (WorldServer world : server.getWorlds()) { -+ processChunkLoads(world); -+ } -+ } -+ -+ static void processChunkLoads(World world) { -+ IChunkProvider chunkProvider = world.getChunkProvider(); -+ if (chunkProvider instanceof PaperAsyncChunkProvider) { -+ ((PaperAsyncChunkProvider) chunkProvider).processChunkLoads(); -+ } -+ } -+ -+ static void stop(MinecraftServer server) { -+ for (WorldServer world : server.getWorlds()) { -+ world.getPlayerChunkMap().shutdown(); -+ } ++ this.generationExecutor = PaperConfig.asyncChunkGenThreadPerWorld ? new PriorityQueuedExecutor("PaperChunkGen-" + worldName, shouldGenSync ? 0 : 1, GEN_THREAD_PRIORITY) : SINGLE_GEN_EXECUTOR; + } + + private static Priority calculatePriority(boolean isBlockingMain, boolean priority) { @@ -1143,19 +1165,44 @@ index 0000000000..c334462f20 + return Priority.NORMAL; + } + -+ private boolean processChunkLoads() { ++ static void stop(MinecraftServer server) { ++ for (WorldServer world : server.getWorlds()) { ++ world.getPlayerChunkMap().shutdown(); ++ } ++ } ++ ++ static void processMainThreadQueue(MinecraftServer server) { ++ for (WorldServer world : server.getWorlds()) { ++ processMainThreadQueue(world); ++ } ++ } ++ ++ static void processMainThreadQueue(World world) { ++ IChunkProvider chunkProvider = world.getChunkProvider(); ++ if (chunkProvider instanceof PaperAsyncChunkProvider) { ++ ((PaperAsyncChunkProvider) chunkProvider).processMainThreadQueue(); ++ } ++ } ++ ++ private void processMainThreadQueue() { ++ processMainThreadQueue((PendingChunk) null); ++ } ++ private boolean processMainThreadQueue(PendingChunk pending) { + Runnable run; + boolean hadLoad = false; + while ((run = MAIN_THREAD_QUEUE.poll()) != null) { + run.run(); + hadLoad = true; ++ if (pending != null && pending.hasPosted) { ++ break; ++ } + } + return hadLoad; + } + + @Override + public void bumpPriority(ChunkCoordIntPair coords) { -+ PendingChunk pending = pendingChunks.get(coords.asLong()); ++ final PendingChunk pending = pendingChunks.get(coords.asLong()); + if (pending != null) { + pending.bumpPriority(Priority.HIGH); + } @@ -1170,8 +1217,8 @@ index 0000000000..c334462f20 + @Nullable + @Override + public Chunk getChunkAt(int x, int z, boolean load, boolean gen, boolean priority, Consumer consumer) { -+ long key = ChunkCoordIntPair.asLong(x, z); -+ Chunk chunk = this.chunks.get(key); ++ final long key = ChunkCoordIntPair.asLong(x, z); ++ final Chunk chunk = this.chunks.get(key); + if (chunk != null || !load) { // return null if we aren't loading + if (consumer != null) { + consumer.accept(chunk); @@ -1185,50 +1232,55 @@ index 0000000000..c334462f20 + return requestChunk(x, z, gen, priority, consumer).getChunk(); + } + -+ PendingChunkRequest requestChunk(int x, int z, boolean gen, boolean priority, Consumer consumer) { -+ final long key = ChunkCoordIntPair.asLong(x, z); ++ final PendingChunkRequest requestChunk(int x, int z, boolean gen, boolean priority, Consumer consumer) { ++ try (co.aikar.timings.Timing timing = world.timings.syncChunkLoadTimer.startTiming()) { ++ final long key = ChunkCoordIntPair.asLong(x, z); ++ final boolean isChunkThread = isChunkThread(); ++ final boolean isBlockingMain = consumer == null && server.isMainThread(); ++ final boolean loadOnThisThread = isChunkThread || isBlockingMain; ++ final Priority taskPriority = calculatePriority(isBlockingMain, priority); + -+ // Obtain a PendingChunk -+ final PendingChunk pending; -+ final boolean isBlockingMain = consumer == null && server.isMainThread(); -+ synchronized (pendingChunks) { -+ PendingChunk pendingChunk = pendingChunks.get(key); -+ if (pendingChunk == null) { -+ pending = new PendingChunk(x, z, key, gen, calculatePriority(isBlockingMain, priority)); -+ pendingChunks.put(key, pending); -+ } else if (pendingChunk.hasFinished && gen && !pendingChunk.canGenerate && pendingChunk.chunk == null) { -+ // need to overwrite the old -+ pending = new PendingChunk(x, z, key, true, calculatePriority(isBlockingMain, priority)); -+ pendingChunks.put(key, pending); -+ } else { -+ pending = pendingChunk; -+ -+ Priority newPriority = calculatePriority(isBlockingMain, priority); -+ if (pending.taskPriority != newPriority) { -+ pending.bumpPriority(newPriority); ++ // Obtain a PendingChunk ++ final PendingChunk pending; ++ synchronized (pendingChunks) { ++ PendingChunk pendingChunk = pendingChunks.get(key); ++ if (pendingChunk == null) { ++ pending = new PendingChunk(x, z, key, gen, taskPriority); ++ pendingChunks.put(key, pending); ++ } else if (pendingChunk.hasFinished && gen && !pendingChunk.canGenerate && pendingChunk.chunk == null) { ++ // need to overwrite the old ++ pending = new PendingChunk(x, z, key, true, taskPriority); ++ pendingChunks.put(key, pending); ++ } else { ++ pending = pendingChunk; ++ if (pending.taskPriority != taskPriority) { ++ pending.bumpPriority(taskPriority); ++ } + } + } -+ } -+ // Listen for when result is ready -+ final CompletableFuture future = new CompletableFuture<>(); -+ PendingChunkRequest request = pending.addListener(future, gen); -+ if (IS_CHUNK_THREAD.get()) { -+ pending.loadTask.run(); -+ } + -+ if (isBlockingMain && pending.hasFinished) { -+ processChunkLoads(); -+ request.initialReturnChunk = pending.postChunk(); -+ return request; -+ } ++ // Listen for when result is ready ++ final CompletableFuture future = new CompletableFuture<>(); ++ final PendingChunkRequest request = pending.addListener(future, gen, !loadOnThisThread); + -+ if (isBlockingMain) { -+ try (co.aikar.timings.Timing timing = world.timings.syncChunkLoadTimer.startTiming()) { ++ // Chunk Generation can trigger Chunk Loading, those loads may need to convert, and could be slow ++ // Give an opportunity for urgent tasks to jump in at these times ++ if (isChunkThread) { ++ processUrgentTasks(); ++ } ++ ++ if (loadOnThisThread) { ++ // do loads on main if blocking, or on current if we are a load/gen thread ++ // gen threads do trigger chunk loads ++ pending.loadTask.run(); ++ } ++ ++ if (isBlockingMain) { + while (!future.isDone()) { + // We aren't done, obtain lock on queue + synchronized (MAIN_THREAD_QUEUE) { + // We may of received our request now, check it -+ if (processChunkLoads()) { ++ if (processMainThreadQueue(pending)) { + // If we processed SOMETHING, don't wait + continue; + } @@ -1239,29 +1291,36 @@ index 0000000000..c334462f20 + } + } + // Queue has been notified or timed out, process it -+ processChunkLoads(); ++ processMainThreadQueue(pending); + } + // We should be done AND posted into chunk map now, return it ++ request.initialReturnChunk = pending.postChunk(); ++ } else if (consumer == null) { ++ // This is on another thread + request.initialReturnChunk = future.join(); ++ } else { ++ future.thenAccept((c) -> this.asyncHandler.postToMainThread(() -> consumer.accept(c))); + } -+ } else if (consumer == null) { -+ // This is on another thread -+ request.initialReturnChunk = future.join(); -+ } else { -+ future.thenAccept((c) -> this.asyncHandler.postToMainThread(() -> consumer.accept(c))); -+ } + -+ return request; ++ return request; ++ } ++ } ++ ++ private void processUrgentTasks() { ++ final PriorityQueuedExecutor executor = PriorityQueuedExecutor.getExecutor(); ++ if (executor != null) { ++ executor.processUrgentTasks(); ++ } + } + + @Override + public CompletableFuture loadAllChunks(Iterable iterable, Consumer consumer) { -+ Iterator iterator = iterable.iterator(); ++ final Iterator iterator = iterable.iterator(); + -+ List> all = new ArrayList<>(); ++ final List> all = new ArrayList<>(); + while (iterator.hasNext()) { -+ ChunkCoordIntPair chunkcoordintpair = iterator.next(); -+ CompletableFuture future = new CompletableFuture<>(); ++ final ChunkCoordIntPair chunkcoordintpair = iterator.next(); ++ final CompletableFuture future = new CompletableFuture<>(); + all.add(future); + this.getChunkAt(chunkcoordintpair.x, chunkcoordintpair.z, true, true, chunk -> { + future.complete(chunk); @@ -1275,8 +1334,7 @@ index 0000000000..c334462f20 + + boolean chunkGoingToExists(int x, int z) { + synchronized (pendingChunks) { -+ long key = ChunkCoordIntPair.asLong(x, z); -+ PendingChunk pendingChunk = pendingChunks.get(key); ++ PendingChunk pendingChunk = pendingChunks.get(ChunkCoordIntPair.asLong(x, z)); + return pendingChunk != null && pendingChunk.canGenerate; + } + } @@ -1350,10 +1408,22 @@ index 0000000000..c334462f20 + } + } + ++ private boolean isLoadThread() { ++ return EXECUTOR.isCurrentThread(); ++ } ++ ++ private boolean isGenThread() { ++ return generationExecutor.isCurrentThread(); ++ } ++ private boolean isChunkThread() { ++ return isLoadThread() || isGenThread(); ++ } ++ + private class PendingChunk implements Runnable { + private final int x; + private final int z; + private final long key; ++ private final long started = System.currentTimeMillis(); + private final CompletableFuture loadOnly = new CompletableFuture<>(); + private final CompletableFuture generate = new CompletableFuture<>(); + private final AtomicInteger requests = new AtomicInteger(0); @@ -1402,11 +1472,6 @@ index 0000000000..c334462f20 + } + } + -+ private Chunk generateChunkExecutor() { -+ IS_CHUNK_THREAD.set(true); -+ IS_CHUNK_GEN_THREAD.set(true); -+ return generateChunk(); -+ } + private Chunk generateChunk() { + synchronized (this) { + if (requests.get() <= 0) { @@ -1494,10 +1559,19 @@ index 0000000000..c334462f20 + this.hasFinished = true; + } + ++ if (server.isMainThread()) { ++ postChunk(); ++ return; ++ } ++ + // Don't post here, even if on main, it must enter the queue so we can exit any open batch + // schedulers, as post stage may trigger a new generation and cause errors + synchronized (MAIN_THREAD_QUEUE) { -+ MAIN_THREAD_QUEUE.add(this::postChunk); ++ if (this.taskPriority == Priority.URGENT) { ++ MAIN_THREAD_QUEUE.addFirst(this::postChunk); ++ } else { ++ MAIN_THREAD_QUEUE.addLast(this::postChunk); ++ } + MAIN_THREAD_QUEUE.notify(); + } + } @@ -1544,7 +1618,7 @@ index 0000000000..c334462f20 + } + } + -+ synchronized PendingChunkRequest addListener(CompletableFuture future, boolean gen) { ++ synchronized PendingChunkRequest addListener(CompletableFuture future, boolean gen, boolean autoSubmit) { + if (hasFinished) { + future.complete(chunk); + return new PendingChunkRequest(this); @@ -1564,24 +1638,18 @@ index 0000000000..c334462f20 + if (loadTask == null) { + // Take care of a race condition in that a request could be cancelled after the synchronize + // on pendingChunks, but before a listener is added, which would erase these pending tasks. -+ if (shouldGenSync) { -+ genTask = generationExecutor.createPendingTask(this::generateChunk, taskPriority); -+ } else { -+ genTask = generationExecutor.createPendingTask(this::generateChunkExecutor, taskPriority); -+ } ++ genTask = generationExecutor.createPendingTask(this::generateChunk, taskPriority); + loadTask = EXECUTOR.createPendingTask(this, taskPriority); -+ if (!IS_CHUNK_THREAD.get()) { ++ if (autoSubmit) { + // We will execute it outside of the synchronized context immediately after -+ EXECUTOR.submitTask(loadTask); ++ loadTask.submit(); + } + } + return new PendingChunkRequest(this, gen); + } + -+ + @Override + public void run() { -+ IS_CHUNK_THREAD.set(true); + try { + if (!loadFinished(loadChunk(x, z))) { + return; @@ -1597,18 +1665,23 @@ index 0000000000..c334462f20 + if (shouldGenSync) { + synchronized (this) { + setStatus(PendingStatus.GENERATION_PENDING); -+ MAIN_THREAD_QUEUE.add(() -> generateFinished(this.generateChunk())); ++ if (this.taskPriority == Priority.URGENT) { ++ MAIN_THREAD_QUEUE.addFirst(() -> generateFinished(this.generateChunk())); ++ } else { ++ MAIN_THREAD_QUEUE.addLast(() -> generateFinished(this.generateChunk())); ++ } ++ + } + synchronized (MAIN_THREAD_QUEUE) { + MAIN_THREAD_QUEUE.notify(); + } + } else { -+ if (IS_CHUNK_GEN_THREAD.get()) { ++ if (isGenThread()) { + // ideally we should never run into 1 chunk generating another chunk... + // but if we do, let's apply same solution + genTask.run(); + } else { -+ generationExecutor.submitTask(genTask); ++ genTask.submit(); + } + } + } @@ -1618,6 +1691,10 @@ index 0000000000..c334462f20 + } + + void bumpPriority(Priority newPriority) { ++ if (taskPriority.ordinal() >= newPriority.ordinal()) { ++ return; ++ } ++ + this.taskPriority = newPriority; + PriorityQueuedExecutor.PendingTask loadTask = this.loadTask; + PriorityQueuedExecutor.PendingTask genTask = this.genTask;