Fix race conditions in flush allowing for previously scheduled tasks to execute later than the flush call (#2548)

This commit is contained in:
Spottedleaf 2019-09-15 21:02:13 -07:00 committed by Zach
parent 9e1620e3d3
commit 30f9955e2a

View file

@ -1,4 +1,4 @@
From 78fb1395fde968993c5cc8d1f76ed4989fde3b7f Mon Sep 17 00:00:00 2001 From b0905f4b72eaba949fcedd2c1fdbbb90d24aced4 Mon Sep 17 00:00:00 2001
From: Spottedleaf <Spottedleaf@users.noreply.github.com> From: Spottedleaf <Spottedleaf@users.noreply.github.com>
Date: Sat, 13 Jul 2019 09:23:10 -0700 Date: Sat, 13 Jul 2019 09:23:10 -0700
Subject: [PATCH] Asynchronous chunk IO and loading Subject: [PATCH] Asynchronous chunk IO and loading
@ -1053,10 +1053,10 @@ index 000000000..4f10a8311
+} +}
diff --git a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java diff --git a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
new file mode 100644 new file mode 100644
index 000000000..c3ca3c4a1 index 000000000..78bd238f4
--- /dev/null --- /dev/null
+++ b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java +++ b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
@@ -0,0 +1,258 @@ @@ -0,0 +1,276 @@
+package com.destroystokyo.paper.io; +package com.destroystokyo.paper.io;
+ +
+import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentLinkedQueue;
@ -1160,6 +1160,24 @@ index 000000000..c3ca3c4a1
+ } + }
+ +
+ /** + /**
+ * Returns whether this queue may have tasks queued.
+ * <p>
+ * This operation is not atomic, but is MT-Safe.
+ * </p>
+ * @return {@code true} if tasks may be queued, {@code false} otherwise
+ */
+ public boolean hasTasks() {
+ for (int i = 0; i < TOTAL_PRIORITIES; ++i) {
+ final ConcurrentLinkedQueue<T> queue = this.queues[i];
+
+ if (queue.peek() != null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Prevent further additions to this queue. Attempts to add after this call has completed (potentially during) will + * Prevent further additions to this queue. Attempts to add after this call has completed (potentially during) will
+ * result in {@link IllegalStateException} being thrown. + * result in {@link IllegalStateException} being thrown.
+ * <p> + * <p>
@ -1317,10 +1335,10 @@ index 000000000..c3ca3c4a1
+} +}
diff --git a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java diff --git a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
new file mode 100644 new file mode 100644
index 000000000..f127ef236 index 000000000..ee906b594
--- /dev/null --- /dev/null
+++ b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java +++ b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
@@ -0,0 +1,244 @@ @@ -0,0 +1,241 @@
+package com.destroystokyo.paper.io; +package com.destroystokyo.paper.io;
+ +
+import net.minecraft.server.MinecraftServer; +import net.minecraft.server.MinecraftServer;
@ -1342,11 +1360,7 @@ index 000000000..f127ef236
+ protected final AtomicBoolean parked = new AtomicBoolean(); + protected final AtomicBoolean parked = new AtomicBoolean();
+ +
+ protected volatile ConcurrentLinkedQueue<Thread> flushQueue = new ConcurrentLinkedQueue<>(); + protected volatile ConcurrentLinkedQueue<Thread> flushQueue = new ConcurrentLinkedQueue<>();
+ + protected volatile long flushCycles;
+ // this is required to synchronize LockSupport#park()
+ // LockSupport explicitly states that it will only follow ordering with respect to volatile access
+ // see flush() for more details
+ protected volatile long flushCounter;
+ +
+ public QueueExecutorThread(final PrioritizedTaskQueue<T> queue) { + public QueueExecutorThread(final PrioritizedTaskQueue<T> queue) {
+ this(queue, (int)(1.e6)); // 1.0ms + this(queue, (int)(1.e6)); // 1.0ms
@ -1392,20 +1406,14 @@ index 000000000..f127ef236
+ } + }
+ +
+ this.parked.set(true); + this.parked.set(true);
+
+ // We need to parse here to avoid a race condition where a thread queues a task before we set parked to true + // We need to parse here to avoid a race condition where a thread queues a task before we set parked to true
+ // (i.e it will not notify us) + // (i.e it will not notify us)
+
+ // it also resolves race condition where we've overriden a concurrent thread's flush call which set parked to false
+ // the important ordering: (volatile guarantees we cannot re-order the below events)
+ // us: parked -> true, parse tasks -> writeCounter + 1 -> drain flush queue
+ // them: read write counter -> add to flush queue -> write parked to false -> park loop
+
+ // if we overwrite their set parked to false call then they're in the park loop or about to be, and we're about to
+ // drain the flush queue
+ if (this.pollTasks(true)) { + if (this.pollTasks(true)) {
+ this.parked.set(false); + this.parked.set(false);
+ continue; + continue;
+ } + }
+
+ if (this.handleClose()) { + if (this.handleClose()) {
+ return; + return;
+ } + }
@ -1452,19 +1460,22 @@ index 000000000..f127ef236
+ } + }
+ +
+ protected void handleFlushThreads(final boolean shutdown) { + protected void handleFlushThreads(final boolean shutdown) {
+ final ConcurrentLinkedQueue<Thread> flushQueue = this.flushQueue; // Note: this can be a plain read + Thread parking;
+ if (shutdown) { + ConcurrentLinkedQueue<Thread> flushQueue = this.flushQueue;
+ this.flushQueue = null; // Note: this can be a release write + do {
+ ++flushCycles; // may be plain read opaque write
+ while ((parking = flushQueue.poll()) != null) {
+ LockSupport.unpark(parking);
+ } + }
+ } while (this.pollTasks(false));
+ +
+ Thread current; + if (shutdown) {
+ this.flushQueue = null;
+ +
+ while ((current = flushQueue.poll()) != null) { + // defend against a race condition where a flush thread double-checks right before we set to null
+ this.pollTasks(false); + while ((parking = flushQueue.poll()) != null) {
+ // increment flush counter so threads will wake up after being unparked() + LockSupport.unpark(parking);
+ //noinspection NonAtomicOperationOnVolatileField + }
+ ++this.flushCounter; // may be plain read plain write if we order before poll() (also would need to re-order pollTasks)
+ LockSupport.unpark(current);
+ } + }
+ } + }
+ +
@ -1485,7 +1496,6 @@ index 000000000..f127ef236
+ this.notifyTasks(); + this.notifyTasks();
+ } + }
+ +
+
+ /** + /**
+ * Waits until this thread's queue is empty. + * Waits until this thread's queue is empty.
+ * + *
@ -1501,42 +1511,47 @@ index 000000000..f127ef236
+ +
+ // order is important + // order is important
+ +
+ long flushCounter = this.flushCounter; + int successes = 0;
+ long lastCycle = -1L;
+ +
+ ConcurrentLinkedQueue<Thread> flushQueue = this.flushQueue; + do {
+ + final ConcurrentLinkedQueue<Thread> flushQueue = this.flushQueue;
+ // it's important to read the flush queue after the flush counter to ensure that if we proceed from here
+ // we have a flush counter that would be different from the final flush counter if the queue executor shuts down
+ // the double read of the flush queue is not enough to account for this since
+ if (flushQueue == null) { + if (flushQueue == null) {
+ return; // queue executor has received shutdown and emptied queue + return;
+ } + }
+ +
+ flushQueue.add(currentThread); + flushQueue.add(currentThread);
+ +
+ // re-check null flush queue, we need to guarantee the executor is not shutting down before parking + // double check flush queue
+
+ if (this.flushQueue == null) { + if (this.flushQueue == null) {
+ // cannot guarantee state of flush queue now, the executor is done though
+ return; + return;
+ } + }
+ +
+ // force a response from the IO thread, we're not sure of its state currently + final long currentCycle = this.flushCycles; // may be opaque read
+
+ if (currentCycle == lastCycle) {
+ Thread.yield();
+ continue;
+ }
+
+ // force response
+ this.parked.set(false); + this.parked.set(false);
+ LockSupport.unpark(this); + LockSupport.unpark(this);
+ +
+ // Note: see the run() function for handling of a race condition where the queue executor overwrites our parked write + LockSupport.park("flushing queue executor thread");
+ +
+ boolean interrupted = false; // preserve interrupted status + // returns whether there are tasks queued, does not return whether there are tasks executing
+ + // this is why we cycle twice twice through flush (we know a pollTask call is made after a flush cycle)
+ while (this.flushCounter == flushCounter) { + // we really only need to guarantee that the tasks this thread has queued has gone through, and can leave
+ interrupted |= Thread.interrupted(); + // tasks queued concurrently that are unsychronized with this thread as undefined behavior
+ LockSupport.park(); + if (this.queue.hasTasks()) {
+ successes = 0;
+ } else {
+ ++successes;
+ } + }
+ +
+ if (interrupted) { + } while (successes != 2);
+ Thread.currentThread().interrupt(); +
+ }
+ } + }
+ +
+ /** + /**