diff --git a/patches/api/0419-Folia-scheduler-and-owned-region-API.patch b/patches/api/0419-Folia-scheduler-and-owned-region-API.patch
new file mode 100644
index 0000000000..17626be7ae
--- /dev/null
+++ b/patches/api/0419-Folia-scheduler-and-owned-region-API.patch
@@ -0,0 +1,790 @@
+From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
+From: Spottedleaf <Spottedleaf@users.noreply.github.com>
+Date: Sat, 17 Jun 2023 11:52:41 +0200
+Subject: [PATCH] Folia scheduler and owned region API
+
+Pulling Folia API to Paper is primarily intended for plugins
+that want to target both Paper and Folia without unnecessary
+compatibility layers.
+
+Add both a location based scheduler, an entity based scheduler,
+and a global region scheduler.
+
+Owned region API may be useful for plugins which want to perform
+operations over large areas outside of the buffer zone provided
+by the regionaliser, as it is not guaranteed that anything
+outside of the buffer zone is owned. Then, the plugins may use
+the schedulers depending on the result of the ownership check.
+
+diff --git a/src/main/java/io/papermc/paper/threadedregions/scheduler/AsyncScheduler.java b/src/main/java/io/papermc/paper/threadedregions/scheduler/AsyncScheduler.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..d9cdd04660c5e60e494a8fed91ae437e6cb733ed
+--- /dev/null
++++ b/src/main/java/io/papermc/paper/threadedregions/scheduler/AsyncScheduler.java
+@@ -0,0 +1,51 @@
++package io.papermc.paper.threadedregions.scheduler;
++
++import org.bukkit.plugin.Plugin;
++import org.jetbrains.annotations.NotNull;
++
++import java.util.concurrent.TimeUnit;
++import java.util.function.Consumer;
++
++/**
++ * Scheduler that may be used by plugins to schedule tasks to execute asynchronously from the server tick process.
++ */
++public interface AsyncScheduler {
++
++    /**
++     * Schedules the specified task to be executed asynchronously immediately.
++     * @param plugin Plugin which owns the specified task.
++     * @param task Specified task.
++     * @return The {@link ScheduledTask} that represents the scheduled task.
++     */
++    @NotNull ScheduledTask runNow(@NotNull Plugin plugin, @NotNull Consumer<ScheduledTask> task);
++
++    /**
++     * Schedules the specified task to be executed asynchronously after the time delay has passed.
++     * @param plugin Plugin which owns the specified task.
++     * @param task Specified task.
++     * @param delay The time delay to pass before the task should be executed.
++     * @param unit The time unit for the time delay.
++     * @return The {@link ScheduledTask} that represents the scheduled task.
++     */
++    @NotNull ScheduledTask runDelayed(@NotNull Plugin plugin, @NotNull Consumer<ScheduledTask> task, long delay,
++                                             @NotNull TimeUnit unit);
++
++    /**
++     * Schedules the specified task to be executed asynchronously after the initial delay has passed,
++     * and then periodically executed with the specified period.
++     * @param plugin Plugin which owns the specified task.
++     * @param task Specified task.
++     * @param initialDelay The time delay to pass before the first execution of the task.
++     * @param period The time between task executions after the first execution of the task.
++     * @param unit The time unit for the initial delay and period.
++     * @return The {@link ScheduledTask} that represents the scheduled task.
++     */
++    @NotNull ScheduledTask runAtFixedRate(@NotNull Plugin plugin, @NotNull Consumer<ScheduledTask> task,
++                                                 long initialDelay, long period, @NotNull TimeUnit unit);
++
++    /**
++     * Attempts to cancel all tasks scheduled by the specified plugin.
++     * @param plugin Specified plugin.
++     */
++    void cancelTasks(@NotNull Plugin plugin);
++}
+diff --git a/src/main/java/io/papermc/paper/threadedregions/scheduler/EntityScheduler.java b/src/main/java/io/papermc/paper/threadedregions/scheduler/EntityScheduler.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..9f69e189be8202a0ab1450540f5d12187ba6c987
+--- /dev/null
++++ b/src/main/java/io/papermc/paper/threadedregions/scheduler/EntityScheduler.java
+@@ -0,0 +1,104 @@
++package io.papermc.paper.threadedregions.scheduler;
++
++import org.bukkit.plugin.Plugin;
++import org.jetbrains.annotations.NotNull;
++import org.jetbrains.annotations.Nullable;
++
++import java.util.function.Consumer;
++
++/**
++ * An entity can move between worlds with an arbitrary tick delay, be temporarily removed
++ * for players (i.e end credits), be partially removed from world state (i.e inactive but not removed),
++ * teleport between ticking regions, teleport between worlds, and even be removed entirely from the server.
++ * The uncertainty of an entity's state can make it difficult to schedule tasks without worrying about undefined
++ * behaviors resulting from any of the states listed previously.
++ *
++ * <p>
++ * This class is designed to eliminate those states by providing an interface to run tasks only when an entity
++ * is contained in a world, on the owning thread for the region, and by providing the current Entity object.
++ * The scheduler also allows a task to provide a callback, the "retired" callback, that will be invoked
++ * if the entity is removed before a task that was scheduled could be executed. The scheduler is also
++ * completely thread-safe, allowing tasks to be scheduled from any thread context. The scheduler also indicates
++ * properly whether a task was scheduled successfully (i.e scheduler not retired), thus the code scheduling any task
++ * knows whether the given callbacks will be invoked eventually or not - which may be critical for off-thread
++ * contexts.
++ * </p>
++ */
++public interface EntityScheduler {
++
++    /**
++     * Schedules a task with the given delay. If the task failed to schedule because the scheduler is retired (entity
++     * removed), then returns {@code false}. Otherwise, either the run callback will be invoked after the specified delay,
++     * or the retired callback will be invoked if the scheduler is retired.
++     * Note that the retired callback is invoked in critical code, so it should not attempt to remove the entity, remove
++     * other entities, load chunks, load worlds, modify ticket levels, etc.
++     *
++     * <p>
++     * It is guaranteed that the run and retired callback are invoked on the region which owns the entity.
++     * </p>
++     * @param run The callback to run after the specified delay, may not be null.
++     * @param retired Retire callback to run if the entity is retired before the run callback can be invoked, may be null.
++     * @param delay The delay in ticks before the run callback is invoked. Any value less-than 1 is treated as 1.
++     * @return {@code true} if the task was scheduled, which means that either the run function or the retired function
++     *         will be invoked (but never both), or {@code false} indicating neither the run nor retired function will be invoked
++     *         since the scheduler has been retired.
++     */
++    boolean execute(@NotNull Plugin plugin, @NotNull Runnable run, @Nullable Runnable retired, long delay);
++
++    /**
++     * Schedules a task to execute on the next tick. If the task failed to schedule because the scheduler is retired (entity
++     * removed), then returns {@code null}. Otherwise, either the task callback will be invoked after the specified delay,
++     * or the retired callback will be invoked if the scheduler is retired.
++     * Note that the retired callback is invoked in critical code, so it should not attempt to remove the entity, remove
++     * other entities, load chunks, load worlds, modify ticket levels, etc.
++     *
++     * <p>
++     * It is guaranteed that the task and retired callback are invoked on the region which owns the entity.
++     * </p>
++     * @param plugin The plugin that owns the task
++     * @param task The task to execute
++     * @param retired Retire callback to run if the entity is retired before the run callback can be invoked, may be null.
++     * @return The {@link ScheduledTask} that represents the scheduled task, or {@code null} if the entity has been removed.
++     */
++    @Nullable ScheduledTask run(@NotNull Plugin plugin, @NotNull Consumer<ScheduledTask> task,
++                                       @Nullable Runnable retired);
++
++    /**
++     * Schedules a task with the given delay. If the task failed to schedule because the scheduler is retired (entity
++     * removed), then returns {@code null}. Otherwise, either the task callback will be invoked after the specified delay,
++     * or the retired callback will be invoked if the scheduler is retired.
++     * Note that the retired callback is invoked in critical code, so it should not attempt to remove the entity, remove
++     * other entities, load chunks, load worlds, modify ticket levels, etc.
++     *
++     * <p>
++     * It is guaranteed that the task and retired callback are invoked on the region which owns the entity.
++     * </p>
++     * @param plugin The plugin that owns the task
++     * @param task The task to execute
++     * @param retired Retire callback to run if the entity is retired before the run callback can be invoked, may be null.
++     * @param delayTicks The delay, in ticks.
++     * @return The {@link ScheduledTask} that represents the scheduled task, or {@code null} if the entity has been removed.
++     */
++    @Nullable ScheduledTask runDelayed(@NotNull Plugin plugin, @NotNull Consumer<ScheduledTask> task,
++                                              @Nullable Runnable retired, long delayTicks);
++
++    /**
++     * Schedules a repeating task with the given delay and period. If the task failed to schedule because the scheduler
++     * is retired (entity removed), then returns {@code null}. Otherwise, either the task callback will be invoked after
++     * the specified delay, or the retired callback will be invoked if the scheduler is retired.
++     * Note that the retired callback is invoked in critical code, so it should not attempt to remove the entity, remove
++     * other entities, load chunks, load worlds, modify ticket levels, etc.
++     *
++     * <p>
++     * It is guaranteed that the task and retired callback are invoked on the region which owns the entity.
++     * </p>
++     * @param plugin The plugin that owns the task
++     * @param task The task to execute
++     * @param retired Retire callback to run if the entity is retired before the run callback can be invoked, may be null.
++     * @param initialDelayTicks The initial delay, in ticks.
++     * @param periodTicks The period, in ticks.
++     * @return The {@link ScheduledTask} that represents the scheduled task, or {@code null} if the entity has been removed.
++     */
++    @Nullable ScheduledTask runAtFixedRate(@NotNull Plugin plugin, @NotNull Consumer<ScheduledTask> task,
++                                                  @Nullable Runnable retired, long initialDelayTicks, long periodTicks);
++}
+diff --git a/src/main/java/io/papermc/paper/threadedregions/scheduler/GlobalRegionScheduler.java b/src/main/java/io/papermc/paper/threadedregions/scheduler/GlobalRegionScheduler.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..365b53fea8dee09cdc11f4399dea5f00c6ee70e2
+--- /dev/null
++++ b/src/main/java/io/papermc/paper/threadedregions/scheduler/GlobalRegionScheduler.java
+@@ -0,0 +1,58 @@
++package io.papermc.paper.threadedregions.scheduler;
++
++import org.bukkit.plugin.Plugin;
++import org.jetbrains.annotations.NotNull;
++
++import java.util.function.Consumer;
++
++/**
++ * The global region task scheduler may be used to schedule tasks that will execute on the global region.
++ * <p>
++ * The global region is responsible for maintaining world day time, world game time, weather cycle,
++ * sleep night skipping, executing commands for console, and other misc. tasks that do not belong to any specific region.
++ * </p>
++ */
++public interface GlobalRegionScheduler {
++
++    /**
++     * Schedules a task to be executed on the global region.
++     * @param plugin The plugin that owns the task
++     * @param run The task to execute
++     */
++    void execute(@NotNull Plugin plugin, @NotNull Runnable run);
++
++    /**
++     * Schedules a task to be executed on the global region on the next tick.
++     * @param plugin The plugin that owns the task
++     * @param task The task to execute
++     * @return The {@link ScheduledTask} that represents the scheduled task.
++     */
++    @NotNull ScheduledTask run(@NotNull Plugin plugin, @NotNull Consumer<ScheduledTask> task);
++
++    /**
++     * Schedules a task to be executed on the global region after the specified delay in ticks.
++     * @param plugin The plugin that owns the task
++     * @param task The task to execute
++     * @param delayTicks The delay, in ticks.
++     * @return The {@link ScheduledTask} that represents the scheduled task.
++     */
++    @NotNull ScheduledTask runDelayed(@NotNull Plugin plugin, @NotNull Consumer<ScheduledTask> task, long delayTicks);
++
++    /**
++     * Schedules a repeating task to be executed on the global region after the initial delay with the
++     * specified period.
++     * @param plugin The plugin that owns the task
++     * @param task The task to execute
++     * @param initialDelayTicks The initial delay, in ticks.
++     * @param periodTicks The period, in ticks.
++     * @return The {@link ScheduledTask} that represents the scheduled task.
++     */
++    @NotNull ScheduledTask runAtFixedRate(@NotNull Plugin plugin, @NotNull Consumer<ScheduledTask> task,
++                                                 long initialDelayTicks, long periodTicks);
++
++    /**
++     * Attempts to cancel all tasks scheduled by the specified plugin.
++     * @param plugin Specified plugin.
++     */
++     void cancelTasks(@NotNull Plugin plugin);
++}
+diff --git a/src/main/java/io/papermc/paper/threadedregions/scheduler/RegionScheduler.java b/src/main/java/io/papermc/paper/threadedregions/scheduler/RegionScheduler.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..7557e170f84cde7292869fbd92b44b0e6eb43b4f
+--- /dev/null
++++ b/src/main/java/io/papermc/paper/threadedregions/scheduler/RegionScheduler.java
+@@ -0,0 +1,127 @@
++package io.papermc.paper.threadedregions.scheduler;
++
++import org.bukkit.Location;
++import org.bukkit.World;
++import org.bukkit.entity.Entity;
++import org.bukkit.plugin.Plugin;
++import org.jetbrains.annotations.NotNull;
++
++import java.util.function.Consumer;
++
++/**
++ * The region task scheduler can be used to schedule tasks by location to be executed on the region which owns the location.
++ * <p>
++ * <b>Note</b>: It is entirely inappropriate to use the region scheduler to schedule tasks for entities.
++ * If you wish to schedule tasks to perform actions on entities, you should be using {@link Entity#getScheduler()}
++ * as the entity scheduler will "follow" an entity if it is teleported, whereas the region task scheduler
++ * will not.
++ * </p>
++ */
++public interface RegionScheduler {
++
++    /**
++     * Schedules a task to be executed on the region which owns the location.
++     *
++     * @param plugin The plugin that owns the task
++     * @param world  The world of the region that owns the task
++     * @param chunkX The chunk X coordinate of the region that owns the task
++     * @param chunkZ The chunk Z coordinate of the region that owns the task
++     * @param run    The task to execute
++     */
++    void execute(@NotNull Plugin plugin, @NotNull World world, int chunkX, int chunkZ, @NotNull Runnable run);
++
++    /**
++     * Schedules a task to be executed on the region which owns the location.
++     *
++     * @param plugin   The plugin that owns the task
++     * @param location The location at which the region executing should own
++     * @param run      The task to execute
++     */
++    default void execute(@NotNull Plugin plugin, @NotNull Location location, @NotNull Runnable run) {
++        this.execute(plugin, location.getWorld(), location.getBlockX() >> 4, location.getBlockZ() >> 4, run);
++    }
++
++    /**
++     * Schedules a task to be executed on the region which owns the location on the next tick.
++     *
++     * @param plugin The plugin that owns the task
++     * @param world  The world of the region that owns the task
++     * @param chunkX The chunk X coordinate of the region that owns the task
++     * @param chunkZ The chunk Z coordinate of the region that owns the task
++     * @param task   The task to execute
++     * @return The {@link ScheduledTask} that represents the scheduled task.
++     */
++    @NotNull ScheduledTask run(@NotNull Plugin plugin, @NotNull World world, int chunkX, int chunkZ, @NotNull Consumer<ScheduledTask> task);
++
++    /**
++     * Schedules a task to be executed on the region which owns the location on the next tick.
++     *
++     * @param plugin   The plugin that owns the task
++     * @param location The location at which the region executing should own
++     * @param task     The task to execute
++     * @return The {@link ScheduledTask} that represents the scheduled task.
++     */
++    default @NotNull ScheduledTask run(@NotNull Plugin plugin, @NotNull Location location, @NotNull Consumer<ScheduledTask> task) {
++        return this.run(plugin, location.getWorld(), location.getBlockX() >> 4, location.getBlockZ() >> 4, task);
++    }
++
++    /**
++     * Schedules a task to be executed on the region which owns the location after the specified delay in ticks.
++     *
++     * @param plugin     The plugin that owns the task
++     * @param world      The world of the region that owns the task
++     * @param chunkX     The chunk X coordinate of the region that owns the task
++     * @param chunkZ     The chunk Z coordinate of the region that owns the task
++     * @param task       The task to execute
++     * @param delayTicks The delay, in ticks.
++     * @return The {@link ScheduledTask} that represents the scheduled task.
++     */
++    @NotNull ScheduledTask runDelayed(@NotNull Plugin plugin, @NotNull World world, int chunkX, int chunkZ, @NotNull Consumer<ScheduledTask> task,
++                                      long delayTicks);
++
++    /**
++     * Schedules a task to be executed on the region which owns the location after the specified delay in ticks.
++     *
++     * @param plugin     The plugin that owns the task
++     * @param location   The location at which the region executing should own
++     * @param task       The task to execute
++     * @param delayTicks The delay, in ticks.
++     * @return The {@link ScheduledTask} that represents the scheduled task.
++     */
++    default @NotNull ScheduledTask runDelayed(@NotNull Plugin plugin, @NotNull Location location, @NotNull Consumer<ScheduledTask> task,
++                                              long delayTicks) {
++        return this.runDelayed(plugin, location.getWorld(), location.getBlockX() >> 4, location.getBlockZ() >> 4, task, delayTicks);
++    }
++
++    /**
++     * Schedules a repeating task to be executed on the region which owns the location after the initial delay with the
++     * specified period.
++     *
++     * @param plugin            The plugin that owns the task
++     * @param world             The world of the region that owns the task
++     * @param chunkX            The chunk X coordinate of the region that owns the task
++     * @param chunkZ            The chunk Z coordinate of the region that owns the task
++     * @param task              The task to execute
++     * @param initialDelayTicks The initial delay, in ticks.
++     * @param periodTicks       The period, in ticks.
++     * @return The {@link ScheduledTask} that represents the scheduled task.
++     */
++    @NotNull ScheduledTask runAtFixedRate(@NotNull Plugin plugin, @NotNull World world, int chunkX, int chunkZ, @NotNull Consumer<ScheduledTask> task,
++                                          long initialDelayTicks, long periodTicks);
++
++    /**
++     * Schedules a repeating task to be executed on the region which owns the location after the initial delay with the
++     * specified period.
++     *
++     * @param plugin            The plugin that owns the task
++     * @param location          The location at which the region executing should own
++     * @param task              The task to execute
++     * @param initialDelayTicks The initial delay, in ticks.
++     * @param periodTicks       The period, in ticks.
++     * @return The {@link ScheduledTask} that represents the scheduled task.
++     */
++    default @NotNull ScheduledTask runAtFixedRate(@NotNull Plugin plugin, @NotNull Location location, @NotNull Consumer<ScheduledTask> task,
++                                                  long initialDelayTicks, long periodTicks) {
++        return this.runAtFixedRate(plugin, location.getWorld(), location.getBlockX() >> 4, location.getBlockZ() >> 4, task, initialDelayTicks, periodTicks);
++    }
++}
+diff --git a/src/main/java/io/papermc/paper/threadedregions/scheduler/ScheduledTask.java b/src/main/java/io/papermc/paper/threadedregions/scheduler/ScheduledTask.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..a6b50c9d8af589cc4747e14d343d2045216c249c
+--- /dev/null
++++ b/src/main/java/io/papermc/paper/threadedregions/scheduler/ScheduledTask.java
+@@ -0,0 +1,112 @@
++package io.papermc.paper.threadedregions.scheduler;
++
++import org.bukkit.plugin.Plugin;
++import org.jetbrains.annotations.NotNull;
++
++/**
++ * Represents a task scheduled to a scheduler.
++ */
++public interface ScheduledTask {
++
++    /**
++     * Returns the plugin that scheduled this task.
++     * @return the plugin that scheduled this task.
++     */
++    @NotNull Plugin getOwningPlugin();
++
++    /**
++     * Returns whether this task executes on a fixed period, as opposed to executing only once.
++     * @return whether this task executes on a fixed period, as opposed to executing only once.
++     */
++    boolean isRepeatingTask();
++
++    /**
++     * Attempts to cancel this task, returning the result of the attempt. In all cases, if the task is currently
++     * being executed no attempt is made to halt the task, however any executions in the future are halted.
++     * @return the result of the cancellation attempt.
++     */
++    @NotNull CancelledState cancel();
++
++    /**
++     * Returns the current execution state of this task.
++     * @return the current execution state of this task.
++     */
++    @NotNull ExecutionState getExecutionState();
++
++    /**
++     * Returns whether the current execution state is {@link ExecutionState#CANCELLED} or {@link ExecutionState#CANCELLED_RUNNING}.
++     * @return whether the current execution state is {@link ExecutionState#CANCELLED} or {@link ExecutionState#CANCELLED_RUNNING}.
++     */
++    default boolean isCancelled() {
++        final ExecutionState state = this.getExecutionState();
++        return state == ExecutionState.CANCELLED || state == ExecutionState.CANCELLED_RUNNING;
++    }
++
++    /**
++     * Represents the result of attempting to cancel a task.
++     */
++    enum CancelledState {
++        /**
++         * The task (repeating or not) has been successfully cancelled by the caller thread. The task is not executing
++         * currently, and it will not begin execution in the future.
++         */
++        CANCELLED_BY_CALLER,
++        /**
++         * The task (repeating or not) is already cancelled. The task is not executing currently, and it will not
++         * begin execution in the future.
++         */
++        CANCELLED_ALREADY,
++
++        /**
++         * The task is not a repeating task, and could not be cancelled because the task is being executed.
++         */
++        RUNNING,
++        /**
++         * The task is not a repeating task, and could not be cancelled because the task has already finished execution.
++         */
++        ALREADY_EXECUTED,
++
++        /**
++         * The caller thread successfully stopped future executions of a repeating task, but the task is currently
++         * being executed.
++         */
++        NEXT_RUNS_CANCELLED,
++
++        /**
++         * The repeating task's future executions are cancelled already, but the task is currently
++         * being executed.
++         */
++        NEXT_RUNS_CANCELLED_ALREADY,
++    }
++
++    /**
++     * Represents the current execution state of the task.
++     */
++    enum ExecutionState {
++        /**
++         * The task is currently not executing, but may begin execution in the future.
++         */
++        IDLE,
++
++        /**
++         * The task is currently executing.
++         */
++        RUNNING,
++
++        /**
++         * The task is not repeating, and the task finished executing.
++         */
++        FINISHED,
++
++        /**
++         * The task is not executing and will not begin execution in the future. If this task is not repeating, then
++         * this task was never executed.
++         */
++        CANCELLED,
++
++        /**
++         * The task is repeating and currently executing, but future executions are cancelled and will not occur.
++         */
++        CANCELLED_RUNNING;
++    }
++}
+diff --git a/src/main/java/org/bukkit/Bukkit.java b/src/main/java/org/bukkit/Bukkit.java
+index ef36d793ab77c7b7208f8f5994815599cff470d1..1ab3d8aee3101ca08d9adf5d9ff9f83de911594c 100644
+--- a/src/main/java/org/bukkit/Bukkit.java
++++ b/src/main/java/org/bukkit/Bukkit.java
+@@ -2496,6 +2496,141 @@ public final class Bukkit {
+     }
+     // Paper end
+ 
++    // Paper start - Folia region threading API
++    /**
++     * Returns the region task scheduler. The region task scheduler can be used to schedule
++     * tasks by location to be executed on the region which owns the location.
++     * <p>
++     * <b>Note</b>: It is entirely inappropriate to use the region scheduler to schedule tasks for entities.
++     * If you wish to schedule tasks to perform actions on entities, you should be using {@link Entity#getScheduler()}
++     * as the entity scheduler will "follow" an entity if it is teleported, whereas the region task scheduler
++     * will not.
++     * </p>
++     * <p><b>If you do not need/want to make your plugin run on Folia, use {@link #getScheduler()} instead.</b></p>
++     * @return the region task scheduler
++     */
++    public static @NotNull io.papermc.paper.threadedregions.scheduler.RegionScheduler getRegionScheduler() {
++        return server.getRegionScheduler();
++    }
++
++    /**
++     * Returns the async task scheduler. The async task scheduler can be used to schedule tasks
++     * that execute asynchronously from the server tick process.
++     * @return the async task scheduler
++     */
++    public static @NotNull io.papermc.paper.threadedregions.scheduler.AsyncScheduler getAsyncScheduler() {
++        return server.getAsyncScheduler();
++    }
++
++    /**
++     * Returns the global region task scheduler. The global task scheduler can be used to schedule
++     * tasks to execute on the global region.
++     * <p>
++     * The global region is responsible for maintaining world day time, world game time, weather cycle,
++     * sleep night skipping, executing commands for console, and other misc. tasks that do not belong to any specific region.
++     * </p>
++     * <p><b>If you do not need/want to make your plugin run on Folia, use {@link #getScheduler()} instead.</b></p>
++     * @return the global region scheduler
++     */
++    public static @NotNull io.papermc.paper.threadedregions.scheduler.GlobalRegionScheduler getGlobalRegionScheduler() {
++        return server.getGlobalRegionScheduler();
++    }
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the chunk at the specified world and block position.
++     * @param world Specified world.
++     * @param position Specified block position.
++     */
++    public static boolean isOwnedByCurrentRegion(@NotNull World world, @NotNull io.papermc.paper.math.Position position) {
++        return server.isOwnedByCurrentRegion(world, position);
++    }
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the chunks centered at the specified block position within the specified square radius.
++     * Specifically, this function checks that every chunk with position x in [centerX - radius, centerX + radius] and
++     * position z in [centerZ - radius, centerZ + radius] is owned by the current ticking region.
++     * @param world Specified world.
++     * @param position Specified block position.
++     * @param squareRadiusChunks Specified square radius. Must be >= 0. Note that this parameter is <i>not</i> a <i>squared</i>
++     *                           radius, but rather a <i>Chebyshev Distance</i>.
++     */
++    public static boolean isOwnedByCurrentRegion(@NotNull World world, @NotNull io.papermc.paper.math.Position position, int squareRadiusChunks) {
++        return server.isOwnedByCurrentRegion(world, position, squareRadiusChunks);
++    }
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the chunk at the specified world and block position as included in the specified location.
++     * @param location Specified location, must have a non-null world.
++     */
++    public static boolean isOwnedByCurrentRegion(@NotNull Location location) {
++        return server.isOwnedByCurrentRegion(location);
++    }
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the chunks centered at the specified world and block position as included in the specified location
++     * within the specified square radius.
++     * Specifically, this function checks that every chunk with position x in [centerX - radius, centerX + radius] and
++     * position z in [centerZ - radius, centerZ + radius] is owned by the current ticking region.
++     * @param location Specified location, must have a non-null world.
++     * @param squareRadiusChunks Specified square radius. Must be >= 0. Note that this parameter is <i>not</i> a <i>squared</i>
++     *                           radius, but rather a <i>Chebyshev Distance</i>.
++     */
++    public static boolean isOwnedByCurrentRegion(@NotNull Location location, int squareRadiusChunks) {
++        return server.isOwnedByCurrentRegion(location, squareRadiusChunks);
++    }
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the chunk at the specified block position.
++     * @param block Specified block position.
++     */
++    public static boolean isOwnedByCurrentRegion(@NotNull org.bukkit.block.Block block) {
++        return server.isOwnedByCurrentRegion(block.getLocation());
++    }
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the chunk at the specified world and chunk position.
++     * @param world Specified world.
++     * @param chunkX Specified x-coordinate of the chunk position.
++     * @param chunkZ Specified z-coordinate of the chunk position.
++     */
++    public static boolean isOwnedByCurrentRegion(@NotNull World world, int chunkX, int chunkZ) {
++        return server.isOwnedByCurrentRegion(world, chunkX, chunkZ);
++    }
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the chunks centered at the specified world and chunk position within the specified
++     * square radius.
++     * Specifically, this function checks that every chunk with position x in [centerX - radius, centerX + radius] and
++     * position z in [centerZ - radius, centerZ + radius] is owned by the current ticking region.
++     * @param world Specified world.
++     * @param chunkX Specified x-coordinate of the chunk position.
++     * @param chunkZ Specified z-coordinate of the chunk position.
++     * @param squareRadiusChunks Specified square radius. Must be >= 0. Note that this parameter is <i>not</i> a <i>squared</i>
++     *                           radius, but rather a <i>Chebyshev Distance</i>.
++     */
++    public static boolean isOwnedByCurrentRegion(@NotNull World world, int chunkX, int chunkZ, int squareRadiusChunks) {
++        return server.isOwnedByCurrentRegion(world, chunkX, chunkZ, squareRadiusChunks);
++    }
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the specified entity. Note that this function is the only appropriate method of checking
++     * for ownership of an entity, as retrieving the entity's location is undefined unless the entity is owned
++     * by the current region.
++     * @param entity Specified entity.
++     */
++    public static boolean isOwnedByCurrentRegion(@NotNull Entity entity) {
++        return server.isOwnedByCurrentRegion(entity);
++    }
++    // Paper end - Folia region threading API
++
+     @NotNull
+     public static Server.Spigot spigot() {
+         return server.spigot();
+diff --git a/src/main/java/org/bukkit/Server.java b/src/main/java/org/bukkit/Server.java
+index 5b225bbb128893d67251a96ab318035802a0cf76..96a267a676b41dc10f7b18ead826e45c5f6db425 100644
+--- a/src/main/java/org/bukkit/Server.java
++++ b/src/main/java/org/bukkit/Server.java
+@@ -2175,4 +2175,119 @@ public interface Server extends PluginMessageRecipient, net.kyori.adventure.audi
+      */
+     @NotNull org.bukkit.potion.PotionBrewer getPotionBrewer();
+     // Paper end
++
++    // Paper start - Folia region threading API
++    /**
++     * Returns the Folia region task scheduler. The region task scheduler can be used to schedule
++     * tasks by location to be executed on the region which owns the location.
++     * <p>
++     * <b>Note</b>: It is entirely inappropriate to use the region scheduler to schedule tasks for entities.
++     * If you wish to schedule tasks to perform actions on entities, you should be using {@link Entity#getScheduler()}
++     * as the entity scheduler will "follow" an entity if it is teleported, whereas the region task scheduler
++     * will not.
++     * </p>
++     * <p><b>If you do not need/want to make your plugin run on Folia, use {@link #getScheduler()} instead.</b></p>
++     * @return the region task scheduler
++     */
++    @NotNull io.papermc.paper.threadedregions.scheduler.RegionScheduler getRegionScheduler();
++
++    /**
++     * Returns the Folia async task scheduler. The async task scheduler can be used to schedule tasks
++     * that execute asynchronously from the server tick process.
++     * @return the async task scheduler
++     */
++    @NotNull io.papermc.paper.threadedregions.scheduler.AsyncScheduler getAsyncScheduler();
++
++    /**
++     * Returns the Folia global region task scheduler. The global task scheduler can be used to schedule
++     * tasks to execute on the global region.
++     * <p>
++     * The global region is responsible for maintaining world day time, world game time, weather cycle,
++     * sleep night skipping, executing commands for console, and other misc. tasks that do not belong to any specific region.
++     * </p>
++     * <p><b>If you do not need/want to make your plugin run on Folia, use {@link #getScheduler()} instead.</b></p>
++     * @return the global region scheduler
++     */
++    @NotNull io.papermc.paper.threadedregions.scheduler.GlobalRegionScheduler getGlobalRegionScheduler();
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the chunk at the specified world and block position.
++     * @param world Specified world.
++     * @param position Specified block position.
++     */
++    boolean isOwnedByCurrentRegion(@NotNull World world, @NotNull io.papermc.paper.math.Position position);
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the chunks centered at the specified block position within the specified square radius.
++     * Specifically, this function checks that every chunk with position x in [centerX - radius, centerX + radius] and
++     * position z in [centerZ - radius, centerZ + radius] is owned by the current ticking region.
++     * @param world Specified world.
++     * @param position Specified block position.
++     * @param squareRadiusChunks Specified square radius. Must be >= 0. Note that this parameter is <i>not</i> a <i>squared</i>
++     *                           radius, but rather a <i>Chebyshev Distance</i>.
++     */
++    boolean isOwnedByCurrentRegion(@NotNull World world, @NotNull io.papermc.paper.math.Position position, int squareRadiusChunks);
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the chunk at the specified world and block position as included in the specified location.
++     * @param location Specified location, must have a non-null world.
++     */
++    boolean isOwnedByCurrentRegion(@NotNull Location location);
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the chunks centered at the specified world and block position as included in the specified location
++     * within the specified square radius.
++     * Specifically, this function checks that every chunk with position x in [centerX - radius, centerX + radius] and
++     * position z in [centerZ - radius, centerZ + radius] is owned by the current ticking region.
++     * @param location Specified location, must have a non-null world.
++     * @param squareRadiusChunks Specified square radius. Must be >= 0. Note that this parameter is <i>not</i> a <i>squared</i>
++     *                           radius, but rather a <i>Chebyshev Distance</i>.
++     */
++    boolean isOwnedByCurrentRegion(@NotNull Location location, int squareRadiusChunks);
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the chunk at the specified block position.
++     * @param block Specified block position.
++     */
++    default boolean isOwnedByCurrentRegion(@NotNull org.bukkit.block.Block block) {
++        return isOwnedByCurrentRegion(block.getLocation());
++    }
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the chunk at the specified world and chunk position.
++     * @param world Specified world.
++     * @param chunkX Specified x-coordinate of the chunk position.
++     * @param chunkZ Specified z-coordinate of the chunk position.
++     */
++    boolean isOwnedByCurrentRegion(@NotNull World world, int chunkX, int chunkZ);
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the chunks centered at the specified world and chunk position within the specified
++     * square radius.
++     * Specifically, this function checks that every chunk with position x in [centerX - radius, centerX + radius] and
++     * position z in [centerZ - radius, centerZ + radius] is owned by the current ticking region.
++     * @param world Specified world.
++     * @param chunkX Specified x-coordinate of the chunk position.
++     * @param chunkZ Specified z-coordinate of the chunk position.
++     * @param squareRadiusChunks Specified square radius. Must be >= 0. Note that this parameter is <i>not</i> a <i>squared</i>
++     *                           radius, but rather a <i>Chebyshev Distance</i>.
++     */
++    boolean isOwnedByCurrentRegion(@NotNull World world, int chunkX, int chunkZ, int squareRadiusChunks);
++
++    /**
++     * Returns whether the current thread is ticking a region and that the region being ticked
++     * owns the specified entity. Note that this function is the only appropriate method of checking
++     * for ownership of an entity, as retrieving the entity's location is undefined unless the entity is owned
++     * by the current region.
++     * @param entity Specified entity.
++     */
++    boolean isOwnedByCurrentRegion(@NotNull Entity entity);
++    // Paper end - Folia region threading API
+ }
+diff --git a/src/main/java/org/bukkit/entity/Entity.java b/src/main/java/org/bukkit/entity/Entity.java
+index a2a423d4e4c2702ba5967223cab0432dd7d04732..6b842453589cf148ab32c1507cf374056826316e 100644
+--- a/src/main/java/org/bukkit/entity/Entity.java
++++ b/src/main/java/org/bukkit/entity/Entity.java
+@@ -954,4 +954,15 @@ public interface Entity extends Metadatable, CommandSender, Nameable, Persistent
+      */
+     boolean wouldCollideUsing(@NotNull BoundingBox boundingBox);
+     // Paper End - Collision API
++
++    // Paper start - Folia schedulers
++    /**
++     * Returns the task scheduler for this entity. The entity scheduler can be used to schedule tasks
++     * that are guaranteed to always execute on the tick thread that owns the entity.
++     * <p><b>If you do not need/want to make your plugin run on Folia, use {@link org.bukkit.Server#getScheduler()} instead.</b></p>
++     * @return the task scheduler for this entity.
++     * @see io.papermc.paper.threadedregions.scheduler.EntityScheduler
++     */
++    @NotNull io.papermc.paper.threadedregions.scheduler.EntityScheduler getScheduler();
++    // Paper end - Folia schedulers
+ }
diff --git a/patches/server/0977-Folia-scheduler-and-owned-region-API.patch b/patches/server/0977-Folia-scheduler-and-owned-region-API.patch
new file mode 100644
index 0000000000..dd099b06ef
--- /dev/null
+++ b/patches/server/0977-Folia-scheduler-and-owned-region-API.patch
@@ -0,0 +1,1337 @@
+From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
+From: Spottedleaf <Spottedleaf@users.noreply.github.com>
+Date: Sat, 17 Jun 2023 11:52:52 +0200
+Subject: [PATCH] Folia scheduler and owned region API
+
+Pulling Folia API to Paper is primarily intended for plugins
+that want to target both Paper and Folia without unnecessary
+compatibility layers.
+
+Add both a location based scheduler, an entity based scheduler,
+and a global region scheduler.
+
+Owned region API may be useful for plugins which want to perform
+operations over large areas outside of the buffer zone provided
+by the regionaliser, as it is not guaranteed that anything
+outside of the buffer zone is owned. Then, the plugins may use
+the schedulers depending on the result of the ownership check.
+
+diff --git a/src/main/java/io/papermc/paper/threadedregions/EntityScheduler.java b/src/main/java/io/papermc/paper/threadedregions/EntityScheduler.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..62484ebf4550b05182f693a3180bbac5d5fd906d
+--- /dev/null
++++ b/src/main/java/io/papermc/paper/threadedregions/EntityScheduler.java
+@@ -0,0 +1,181 @@
++package io.papermc.paper.threadedregions;
++
++import ca.spottedleaf.concurrentutil.util.Validate;
++import io.papermc.paper.util.TickThread;
++import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
++import net.minecraft.world.entity.Entity;
++import org.bukkit.craftbukkit.entity.CraftEntity;
++
++import java.util.ArrayDeque;
++import java.util.ArrayList;
++import java.util.List;
++import java.util.function.Consumer;
++
++/**
++ * An entity can move between worlds with an arbitrary tick delay, be temporarily removed
++ * for players (i.e end credits), be partially removed from world state (i.e inactive but not removed),
++ * teleport between ticking regions, teleport between worlds (which will change the underlying Entity object
++ * for non-players), and even be removed entirely from the server. The uncertainty of an entity's state can make
++ * it difficult to schedule tasks without worrying about undefined behaviors resulting from any of the states listed
++ * previously.
++ *
++ * <p>
++ * This class is designed to eliminate those states by providing an interface to run tasks only when an entity
++ * is contained in a world, on the owning thread for the region, and by providing the current Entity object.
++ * The scheduler also allows a task to provide a callback, the "retired" callback, that will be invoked
++ * if the entity is removed before a task that was scheduled could be executed. The scheduler is also
++ * completely thread-safe, allowing tasks to be scheduled from any thread context. The scheduler also indicates
++ * properly whether a task was scheduled successfully (i.e scheduler not retired), thus the code scheduling any task
++ * knows whether the given callbacks will be invoked eventually or not - which may be critical for off-thread
++ * contexts.
++ * </p>
++ */
++public final class EntityScheduler {
++
++    /**
++     * The Entity. Note that it is the CraftEntity, since only that class properly tracks world transfers.
++     */
++    public final CraftEntity entity;
++
++    private static final record ScheduledTask(Consumer<? extends Entity> run, Consumer<? extends Entity> retired) {}
++
++    private long tickCount = 0L;
++    private static final long RETIRED_TICK_COUNT = -1L;
++    private final Object stateLock = new Object();
++    private final Long2ObjectOpenHashMap<List<ScheduledTask>> oneTimeDelayed = new Long2ObjectOpenHashMap<>();
++
++    private final ArrayDeque<ScheduledTask> currentlyExecuting = new ArrayDeque<>();
++
++    public EntityScheduler(final CraftEntity entity) {
++        this.entity = Validate.notNull(entity);
++    }
++
++    /**
++     * Retires the scheduler, preventing new tasks from being scheduled and invoking the retired callback
++     * on all currently scheduled tasks.
++     *
++     * <p>
++     * Note: This should only be invoked after synchronously removing the entity from the world.
++     * </p>
++     *
++     * @throws IllegalStateException If the scheduler is already retired.
++     */
++    public void retire() {
++        synchronized (this.stateLock) {
++            if (this.tickCount == RETIRED_TICK_COUNT) {
++                throw new IllegalStateException("Already retired");
++            }
++            this.tickCount = RETIRED_TICK_COUNT;
++        }
++
++        final Entity thisEntity = this.entity.getHandleRaw();
++
++        // correctly handle and order retiring while running executeTick
++        for (int i = 0, len = this.currentlyExecuting.size(); i < len; ++i) {
++            final ScheduledTask task = this.currentlyExecuting.pollFirst();
++            final Consumer<Entity> retireTask = (Consumer<Entity>)task.retired;
++            if (retireTask == null) {
++                continue;
++            }
++
++            retireTask.accept(thisEntity);
++        }
++
++        for (final List<ScheduledTask> tasks : this.oneTimeDelayed.values()) {
++            for (int i = 0, len = tasks.size(); i < len; ++i) {
++                final ScheduledTask task = tasks.get(i);
++                final Consumer<Entity> retireTask = (Consumer<Entity>)task.retired;
++                if (retireTask == null) {
++                    continue;
++                }
++
++                retireTask.accept(thisEntity);
++            }
++        }
++    }
++
++    /**
++     * Schedules a task with the given delay. If the task failed to schedule because the scheduler is retired (entity
++     * removed), then returns {@code false}. Otherwise, either the run callback will be invoked after the specified delay,
++     * or the retired callback will be invoked if the scheduler is retired.
++     * Note that the retired callback is invoked in critical code, so it should not attempt to remove the entity, remove
++     * other entities, load chunks, load worlds, modify ticket levels, etc.
++     *
++     * <p>
++     * It is guaranteed that the run and retired callback are invoked on the region which owns the entity.
++     * </p>
++     * <p>
++     * The run and retired callback take an Entity parameter representing the current object entity that the scheduler
++     * is tied to. Since the scheduler is transferred when an entity changes dimensions, it is possible the entity parameter
++     * is not the same when the task was first scheduled. Thus, <b>only</b> the parameter provided should be used.
++     * </p>
++     * @param run The callback to run after the specified delay, may not be null.
++     * @param retired Retire callback to run if the entity is retired before the run callback can be invoked, may be null.
++     * @param delay The delay in ticks before the run callback is invoked. Any value less-than 1 is treated as 1.
++     * @return {@code true} if the task was scheduled, which means that either the run function or the retired function
++     *         will be invoked (but never both), or {@code false} indicating neither the run nor retired function will be invoked
++     *         since the scheduler has been retired.
++     */
++    public boolean schedule(final Consumer<? extends Entity> run, final Consumer<? extends Entity> retired, final long delay) {
++        Validate.notNull(run, "Run task may not be null");
++
++        final ScheduledTask task = new ScheduledTask(run, retired);
++        synchronized (this.stateLock) {
++            if (this.tickCount == RETIRED_TICK_COUNT) {
++                return false;
++            }
++            this.oneTimeDelayed.computeIfAbsent(this.tickCount + Math.max(1L, delay), (final long keyInMap) -> {
++                return new ArrayList<>();
++            }).add(task);
++        }
++
++        return true;
++    }
++
++    /**
++     * Executes a tick for the scheduler.
++     *
++     * @throws IllegalStateException If the scheduler is retired.
++     */
++    public void executeTick() {
++        final Entity thisEntity = this.entity.getHandleRaw();
++
++        TickThread.ensureTickThread(thisEntity, "May not tick entity scheduler asynchronously");
++        final List<ScheduledTask> toRun;
++        synchronized (this.stateLock) {
++            if (this.tickCount == RETIRED_TICK_COUNT) {
++                throw new IllegalStateException("Ticking retired scheduler");
++            }
++            ++this.tickCount;
++            if (this.oneTimeDelayed.isEmpty()) {
++                toRun = null;
++            } else {
++                toRun = this.oneTimeDelayed.remove(this.tickCount);
++            }
++        }
++
++        if (toRun != null) {
++            for (int i = 0, len = toRun.size(); i < len; ++i) {
++                this.currentlyExecuting.addLast(toRun.get(i));
++            }
++        }
++
++        // Note: It is allowed for the tasks executed to retire the entity in a given task.
++        for (int i = 0, len = this.currentlyExecuting.size(); i < len; ++i) {
++            if (!TickThread.isTickThreadFor(thisEntity)) {
++                // tp has been queued sync by one of the tasks
++                // in this case, we need to delay the tasks for next tick
++                break;
++            }
++            final ScheduledTask task = this.currentlyExecuting.pollFirst();
++
++            if (this.tickCount != RETIRED_TICK_COUNT) {
++                ((Consumer<Entity>)task.run).accept(thisEntity);
++            } else {
++                // retired synchronously
++                // note: here task is null
++                break;
++            }
++        }
++    }
++}
+diff --git a/src/main/java/io/papermc/paper/threadedregions/scheduler/FallbackRegionScheduler.java b/src/main/java/io/papermc/paper/threadedregions/scheduler/FallbackRegionScheduler.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..94056d61a304ee012ae1828a33412516095f996f
+--- /dev/null
++++ b/src/main/java/io/papermc/paper/threadedregions/scheduler/FallbackRegionScheduler.java
+@@ -0,0 +1,30 @@
++package io.papermc.paper.threadedregions.scheduler;
++
++import org.bukkit.World;
++import org.bukkit.plugin.Plugin;
++import org.jetbrains.annotations.NotNull;
++
++import java.util.function.Consumer;
++
++public final class FallbackRegionScheduler implements RegionScheduler {
++
++    @Override
++    public void execute(@NotNull final Plugin plugin, @NotNull final World world, final int chunkX, final int chunkZ, @NotNull final Runnable run) {
++        plugin.getServer().getGlobalRegionScheduler().execute(plugin, run);
++    }
++
++    @Override
++    public @NotNull ScheduledTask run(@NotNull final Plugin plugin, @NotNull final World world, final int chunkX, final int chunkZ, @NotNull final Consumer<ScheduledTask> task) {
++        return plugin.getServer().getGlobalRegionScheduler().run(plugin, task);
++    }
++
++    @Override
++    public @NotNull ScheduledTask runDelayed(@NotNull final Plugin plugin, @NotNull final World world, final int chunkX, final int chunkZ, @NotNull final Consumer<ScheduledTask> task, final long delayTicks) {
++        return plugin.getServer().getGlobalRegionScheduler().runDelayed(plugin, task, delayTicks);
++    }
++
++    @Override
++    public @NotNull ScheduledTask runAtFixedRate(@NotNull final Plugin plugin, @NotNull final World world, final int chunkX, final int chunkZ, @NotNull final Consumer<ScheduledTask> task, final long initialDelayTicks, final long periodTicks) {
++        return plugin.getServer().getGlobalRegionScheduler().runAtFixedRate(plugin, task, initialDelayTicks, periodTicks);
++    }
++}
+diff --git a/src/main/java/io/papermc/paper/threadedregions/scheduler/FoliaAsyncScheduler.java b/src/main/java/io/papermc/paper/threadedregions/scheduler/FoliaAsyncScheduler.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..374abffb9f1ce1a308822aed13038e77fe9ca08b
+--- /dev/null
++++ b/src/main/java/io/papermc/paper/threadedregions/scheduler/FoliaAsyncScheduler.java
+@@ -0,0 +1,328 @@
++package io.papermc.paper.threadedregions.scheduler;
++
++import ca.spottedleaf.concurrentutil.util.Validate;
++import com.mojang.logging.LogUtils;
++import org.bukkit.plugin.IllegalPluginAccessException;
++import org.bukkit.plugin.Plugin;
++import org.slf4j.Logger;
++
++import java.util.Set;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.Executor;
++import java.util.concurrent.Executors;
++import java.util.concurrent.ScheduledExecutorService;
++import java.util.concurrent.ScheduledFuture;
++import java.util.concurrent.SynchronousQueue;
++import java.util.concurrent.ThreadFactory;
++import java.util.concurrent.ThreadPoolExecutor;
++import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicInteger;
++import java.util.function.Consumer;
++import java.util.logging.Level;
++
++public final class FoliaAsyncScheduler implements AsyncScheduler {
++
++    private static final Logger LOGGER = LogUtils.getClassLogger();
++
++    private final Executor executors = new ThreadPoolExecutor(Math.max(4, Runtime.getRuntime().availableProcessors() / 2), Integer.MAX_VALUE,
++        30L, TimeUnit.SECONDS, new SynchronousQueue<>(),
++        new ThreadFactory() {
++            private final AtomicInteger idGenerator = new AtomicInteger();
++
++            @Override
++            public Thread newThread(final Runnable run) {
++                final Thread ret = new Thread(run);
++
++                ret.setName("Folia Async Scheduler Thread #" + this.idGenerator.getAndIncrement());
++                ret.setPriority(Thread.NORM_PRIORITY - 1);
++                ret.setUncaughtExceptionHandler((final Thread thread, final Throwable thr) -> {
++                    LOGGER.error("Uncaught exception in thread: " + thread.getName(), thr);
++                });
++
++                return ret;
++            }
++        }
++    );
++
++    private final ScheduledExecutorService timerThread = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
++        @Override
++        public Thread newThread(final Runnable run) {
++            final Thread ret = new Thread(run);
++
++            ret.setName("Folia Async Scheduler Thread Timer");
++            ret.setPriority(Thread.NORM_PRIORITY + 1);
++            ret.setUncaughtExceptionHandler((final Thread thread, final Throwable thr) -> {
++                LOGGER.error("Uncaught exception in thread: " + thread.getName(), thr);
++            });
++
++            return ret;
++        }
++    });
++
++    private final Set<AsyncScheduledTask> tasks = ConcurrentHashMap.newKeySet();
++
++    @Override
++    public ScheduledTask runNow(final Plugin plugin, final Consumer<ScheduledTask> task) {
++        Validate.notNull(plugin, "Plugin may not be null");
++        Validate.notNull(task, "Task may not be null");
++
++        if (!plugin.isEnabled()) {
++            throw new IllegalPluginAccessException("Plugin attempted to register task while disabled");
++        }
++
++        final AsyncScheduledTask ret = new AsyncScheduledTask(plugin, -1L, task, null, -1L);
++
++        this.tasks.add(ret);
++        this.executors.execute(ret);
++
++        if (!plugin.isEnabled()) {
++            // handle race condition where plugin is disabled asynchronously
++            ret.cancel();
++        }
++
++        return ret;
++    }
++
++    @Override
++    public ScheduledTask runDelayed(final Plugin plugin, final Consumer<ScheduledTask> task, final long delay,
++                                    final TimeUnit unit) {
++        Validate.notNull(plugin, "Plugin may not be null");
++        Validate.notNull(task, "Task may not be null");
++        Validate.notNull(unit, "Time unit may not be null");
++        if (delay < 0L) {
++            throw new IllegalArgumentException("Delay may not be < 0");
++        }
++
++        if (!plugin.isEnabled()) {
++            throw new IllegalPluginAccessException("Plugin attempted to register task while disabled");
++        }
++
++        return this.scheduleTimerTask(plugin, task, delay, -1L, unit);
++    }
++
++    @Override
++    public ScheduledTask runAtFixedRate(final Plugin plugin, final Consumer<ScheduledTask> task, final long initialDelay,
++                                        final long period, final TimeUnit unit) {
++        Validate.notNull(plugin, "Plugin may not be null");
++        Validate.notNull(task, "Task may not be null");
++        Validate.notNull(unit, "Time unit may not be null");
++        if (initialDelay < 0L) {
++            throw new IllegalArgumentException("Initial delay may not be < 0");
++        }
++        if (period <= 0L) {
++            throw new IllegalArgumentException("Period may not be <= 0");
++        }
++
++        if (!plugin.isEnabled()) {
++            throw new IllegalPluginAccessException("Plugin attempted to register task while disabled");
++        }
++
++        return this.scheduleTimerTask(plugin, task, initialDelay, period, unit);
++    }
++
++    private AsyncScheduledTask scheduleTimerTask(final Plugin plugin, final Consumer<ScheduledTask> task, final long initialDelay,
++                                                 final long period, final TimeUnit unit) {
++        final AsyncScheduledTask ret = new AsyncScheduledTask(
++            plugin, period <= 0 ? period : unit.toNanos(period), task, null,
++            System.nanoTime() + unit.toNanos(initialDelay)
++        );
++
++        synchronized (ret) {
++            // even though ret is not published, we need to synchronise while scheduling to avoid a race condition
++            // for when a scheduled task immediately executes before we update the delay field and state field
++            ret.setDelay(this.timerThread.schedule(ret, initialDelay, unit));
++            this.tasks.add(ret);
++        }
++
++        if (!plugin.isEnabled()) {
++            // handle race condition where plugin is disabled asynchronously
++            ret.cancel();
++        }
++
++        return ret;
++    }
++
++    @Override
++    public void cancelTasks(final Plugin plugin) {
++        Validate.notNull(plugin, "Plugin may not be null");
++
++        for (final AsyncScheduledTask task : this.tasks) {
++            if (task.plugin == plugin) {
++                task.cancel();
++            }
++        }
++    }
++
++    private final class AsyncScheduledTask implements ScheduledTask, Runnable {
++
++        private static final int STATE_ON_TIMER            = 0;
++        private static final int STATE_SCHEDULED_EXECUTOR  = 1;
++        private static final int STATE_EXECUTING           = 2;
++        private static final int STATE_EXECUTING_CANCELLED = 3;
++        private static final int STATE_FINISHED            = 4;
++        private static final int STATE_CANCELLED           = 5;
++
++        private final Plugin plugin;
++        private final long repeatDelay; // in ns
++        private Consumer<ScheduledTask> run;
++        private ScheduledFuture<?> delay;
++        private int state;
++        private long scheduleTarget;
++
++        public AsyncScheduledTask(final Plugin plugin, final long repeatDelay, final Consumer<ScheduledTask> run,
++                                  final ScheduledFuture<?> delay, final long firstTarget) {
++            this.plugin = plugin;
++            this.repeatDelay = repeatDelay;
++            this.run = run;
++            this.delay = delay;
++            this.state = delay == null ? STATE_SCHEDULED_EXECUTOR : STATE_ON_TIMER;
++            this.scheduleTarget = firstTarget;
++        }
++
++        private void setDelay(final ScheduledFuture<?> delay) {
++            this.delay = delay;
++            this.state = STATE_SCHEDULED_EXECUTOR;
++        }
++
++        @Override
++        public void run() {
++            final boolean repeating = this.isRepeatingTask();
++            // try to advance state
++            final boolean timer;
++            synchronized (this) {
++                if (this.state == STATE_ON_TIMER) {
++                    timer = true;
++                    this.delay = null;
++                    this.state = STATE_SCHEDULED_EXECUTOR;
++                } else if (this.state != STATE_SCHEDULED_EXECUTOR) {
++                    // cancelled
++                    if (this.state != STATE_CANCELLED) {
++                        throw new IllegalStateException("Wrong state: " + this.state);
++                    }
++                    return;
++                } else {
++                    timer = false;
++                    this.state = STATE_EXECUTING;
++                }
++            }
++
++            if (timer) {
++                // the scheduled executor is single thread, and unfortunately not expandable with threads
++                // so we just schedule onto the executor
++                FoliaAsyncScheduler.this.executors.execute(this);
++                return;
++            }
++
++            try {
++                this.run.accept(this);
++            } catch (final Throwable throwable) {
++                this.plugin.getLogger().log(Level.WARNING, "Async task for " + this.plugin.getDescription().getFullName() + " generated an exception", throwable);
++            } finally {
++                boolean removeFromTasks = false;
++                synchronized (this) {
++                    if (!repeating) {
++                        // only want to execute once, so we're done
++                        removeFromTasks = true;
++                        this.state = STATE_FINISHED;
++                    } else if (this.state != STATE_EXECUTING_CANCELLED) {
++                        this.state = STATE_ON_TIMER;
++                        // account for any delays, whether it be by task exec. or scheduler issues so that we keep
++                        // the fixed schedule
++                        final long currTime = System.nanoTime();
++                        final long delay = Math.max(0L, this.scheduleTarget + this.repeatDelay - currTime);
++                        this.scheduleTarget = currTime + delay;
++                        this.delay = FoliaAsyncScheduler.this.timerThread.schedule(this, delay, TimeUnit.NANOSECONDS);
++                    } else {
++                        // cancelled repeating task
++                        removeFromTasks = true;
++                    }
++                }
++
++                if (removeFromTasks) {
++                    this.run = null;
++                    FoliaAsyncScheduler.this.tasks.remove(this);
++                }
++            }
++        }
++
++        @Override
++        public Plugin getOwningPlugin() {
++            return this.plugin;
++        }
++
++        @Override
++        public boolean isRepeatingTask() {
++            return this.repeatDelay > 0L;
++        }
++
++        @Override
++        public CancelledState cancel() {
++            ScheduledFuture<?> delay = null;
++            CancelledState ret;
++            synchronized (this) {
++                switch (this.state) {
++                    case STATE_ON_TIMER: {
++                        delay = this.delay;
++                        this.delay = null;
++                        this.state = STATE_CANCELLED;
++                        ret = CancelledState.CANCELLED_BY_CALLER;
++                        break;
++                    }
++                    case STATE_SCHEDULED_EXECUTOR: {
++                        this.state = STATE_CANCELLED;
++                        ret = CancelledState.CANCELLED_BY_CALLER;
++                        break;
++                    }
++                    case STATE_EXECUTING: {
++                        if (!this.isRepeatingTask()) {
++                            return CancelledState.RUNNING;
++                        }
++                        this.state = STATE_EXECUTING_CANCELLED;
++                        return CancelledState.NEXT_RUNS_CANCELLED;
++                    }
++                    case STATE_EXECUTING_CANCELLED: {
++                        return CancelledState.NEXT_RUNS_CANCELLED_ALREADY;
++                    }
++                    case STATE_FINISHED: {
++                        return CancelledState.ALREADY_EXECUTED;
++                    }
++                    case STATE_CANCELLED: {
++                        return CancelledState.CANCELLED_ALREADY;
++                    }
++                    default: {
++                        throw new IllegalStateException("Unknown state: " + this.state);
++                    }
++                }
++            }
++
++            if (delay != null) {
++                delay.cancel(false);
++            }
++            this.run = null;
++            FoliaAsyncScheduler.this.tasks.remove(this);
++            return ret;
++        }
++
++        @Override
++        public ExecutionState getExecutionState() {
++            synchronized (this) {
++                switch (this.state) {
++                    case STATE_ON_TIMER:
++                    case STATE_SCHEDULED_EXECUTOR:
++                        return ExecutionState.IDLE;
++                    case STATE_EXECUTING:
++                        return ExecutionState.RUNNING;
++                    case STATE_EXECUTING_CANCELLED:
++                        return ExecutionState.CANCELLED_RUNNING;
++                    case STATE_FINISHED:
++                        return ExecutionState.FINISHED;
++                    case STATE_CANCELLED:
++                        return ExecutionState.CANCELLED;
++                    default: {
++                        throw new IllegalStateException("Unknown state: " + this.state);
++                    }
++                }
++            }
++        }
++    }
++}
+diff --git a/src/main/java/io/papermc/paper/threadedregions/scheduler/FoliaEntityScheduler.java b/src/main/java/io/papermc/paper/threadedregions/scheduler/FoliaEntityScheduler.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..011754962896e32f51ed4606dcbea18a430a2bc1
+--- /dev/null
++++ b/src/main/java/io/papermc/paper/threadedregions/scheduler/FoliaEntityScheduler.java
+@@ -0,0 +1,268 @@
++package io.papermc.paper.threadedregions.scheduler;
++
++import ca.spottedleaf.concurrentutil.util.ConcurrentUtil;
++import ca.spottedleaf.concurrentutil.util.Validate;
++import net.minecraft.world.entity.Entity;
++import org.bukkit.craftbukkit.entity.CraftEntity;
++import org.bukkit.plugin.IllegalPluginAccessException;
++import org.bukkit.plugin.Plugin;
++import org.jetbrains.annotations.Nullable;
++
++import java.lang.invoke.VarHandle;
++import java.util.function.Consumer;
++import java.util.logging.Level;
++
++public final class FoliaEntityScheduler implements EntityScheduler {
++
++    private final CraftEntity entity;
++
++    public FoliaEntityScheduler(final CraftEntity entity) {
++        this.entity = entity;
++    }
++
++    private static Consumer<? extends Entity> wrap(final Plugin plugin, final Runnable runnable) {
++        Validate.notNull(plugin, "Plugin may not be null");
++        Validate.notNull(runnable, "Runnable may not be null");
++
++        return (final Entity nmsEntity) -> {
++            if (!plugin.isEnabled()) {
++                // don't execute if the plugin is disabled
++                return;
++            }
++            try {
++                runnable.run();
++            } catch (final Throwable throwable) {
++                plugin.getLogger().log(Level.WARNING, "Entity task for " + plugin.getDescription().getFullName() + " generated an exception", throwable);
++            }
++        };
++    }
++
++    @Override
++    public boolean execute(final Plugin plugin, final Runnable run, final Runnable retired,
++                           final long delay) {
++        final Consumer<? extends Entity> runNMS = wrap(plugin, run);
++        final Consumer<? extends Entity> runRetired = retired == null ? null : wrap(plugin, retired);
++
++        return this.entity.taskScheduler.schedule(runNMS, runRetired, delay);
++    }
++
++    @Override
++    public @Nullable ScheduledTask run(final Plugin plugin, final Consumer<ScheduledTask> task, final Runnable retired) {
++        return this.runDelayed(plugin, task, retired, 1);
++    }
++
++    @Override
++    public @Nullable ScheduledTask runDelayed(final Plugin plugin, final Consumer<ScheduledTask> task, final Runnable retired,
++                                              final long delayTicks) {
++        Validate.notNull(plugin, "Plugin may not be null");
++        Validate.notNull(task, "Task may not be null");
++        if (delayTicks <= 0) {
++            throw new IllegalArgumentException("Delay ticks may not be <= 0");
++        }
++
++        if (!plugin.isEnabled()) {
++            throw new IllegalPluginAccessException("Plugin attempted to register task while disabled");
++        }
++
++        final EntityScheduledTask ret = new EntityScheduledTask(plugin, -1, task, retired);
++
++        if (!this.scheduleInternal(ret, delayTicks)) {
++            return null;
++        }
++
++        if (!plugin.isEnabled()) {
++            // handle race condition where plugin is disabled asynchronously
++            ret.cancel();
++        }
++
++        return ret;
++    }
++
++    @Override
++    public @Nullable ScheduledTask runAtFixedRate(final Plugin plugin, final Consumer<ScheduledTask> task,
++                                                  final Runnable retired, final long initialDelayTicks, final long periodTicks) {
++        Validate.notNull(plugin, "Plugin may not be null");
++        Validate.notNull(task, "Task may not be null");
++        if (initialDelayTicks <= 0) {
++            throw new IllegalArgumentException("Initial delay ticks may not be <= 0");
++        }
++        if (periodTicks <= 0) {
++            throw new IllegalArgumentException("Period ticks may not be <= 0");
++        }
++
++        if (!plugin.isEnabled()) {
++            throw new IllegalPluginAccessException("Plugin attempted to register task while disabled");
++        }
++
++        final EntityScheduledTask ret = new EntityScheduledTask(plugin, periodTicks, task, retired);
++
++        if (!this.scheduleInternal(ret, initialDelayTicks)) {
++            return null;
++        }
++
++        if (!plugin.isEnabled()) {
++            // handle race condition where plugin is disabled asynchronously
++            ret.cancel();
++        }
++
++        return ret;
++    }
++
++    private boolean scheduleInternal(final EntityScheduledTask ret, final long delay) {
++        return this.entity.taskScheduler.schedule(ret, ret, delay);
++    }
++
++    private final class EntityScheduledTask implements ScheduledTask, Consumer<Entity> {
++
++        private static final int STATE_IDLE                = 0;
++        private static final int STATE_EXECUTING           = 1;
++        private static final int STATE_EXECUTING_CANCELLED = 2;
++        private static final int STATE_FINISHED            = 3;
++        private static final int STATE_CANCELLED           = 4;
++
++        private final Plugin plugin;
++        private final long repeatDelay; // in ticks
++        private Consumer<ScheduledTask> run;
++        private Runnable retired;
++        private volatile int state;
++
++        private static final VarHandle STATE_HANDLE = ConcurrentUtil.getVarHandle(EntityScheduledTask.class, "state", int.class);
++
++        private EntityScheduledTask(final Plugin plugin, final long repeatDelay, final Consumer<ScheduledTask> run, final Runnable retired) {
++            this.plugin = plugin;
++            this.repeatDelay = repeatDelay;
++            this.run = run;
++            this.retired = retired;
++        }
++
++        private final int getStateVolatile() {
++            return (int)STATE_HANDLE.get(this);
++        }
++
++        private final int compareAndExchangeStateVolatile(final int expect, final int update) {
++            return (int)STATE_HANDLE.compareAndExchange(this, expect, update);
++        }
++
++        private final void setStateVolatile(final int value) {
++            STATE_HANDLE.setVolatile(this, value);
++        }
++
++        @Override
++        public void accept(final Entity entity) {
++            if (!this.plugin.isEnabled()) {
++                // don't execute if the plugin is disabled
++                this.setStateVolatile(STATE_CANCELLED);
++                return;
++            }
++
++            final boolean repeating = this.isRepeatingTask();
++            if (STATE_IDLE != this.compareAndExchangeStateVolatile(STATE_IDLE, STATE_EXECUTING)) {
++                // cancelled
++                return;
++            }
++
++            final boolean retired = entity.isRemoved();
++
++            try {
++                if (!retired) {
++                    this.run.accept(this);
++                } else {
++                    if (this.retired != null) {
++                        this.retired.run();
++                    }
++                }
++            } catch (final Throwable throwable) {
++                this.plugin.getLogger().log(Level.WARNING, "Entity task for " + this.plugin.getDescription().getFullName() + " generated an exception", throwable);
++            } finally {
++                boolean reschedule = false;
++                 if (!repeating && !retired) {
++                    this.setStateVolatile(STATE_FINISHED);
++                } else if (retired || !this.plugin.isEnabled()) {
++                     this.setStateVolatile(STATE_CANCELLED);
++                } else if (STATE_EXECUTING == this.compareAndExchangeStateVolatile(STATE_EXECUTING, STATE_IDLE)) {
++                    reschedule = true;
++                } // else: cancelled repeating task
++
++                if (!reschedule) {
++                    this.run = null;
++                    this.retired = null;
++                } else {
++                    if (!FoliaEntityScheduler.this.scheduleInternal(this, this.repeatDelay)) {
++                        // the task itself must have removed the entity, so in this case we need to mark as cancelled
++                        this.setStateVolatile(STATE_CANCELLED);
++                    }
++                }
++            }
++        }
++
++        @Override
++        public Plugin getOwningPlugin() {
++            return this.plugin;
++        }
++
++        @Override
++        public boolean isRepeatingTask() {
++            return this.repeatDelay > 0;
++        }
++
++        @Override
++        public CancelledState cancel() {
++            for (int curr = this.getStateVolatile();;) {
++                switch (curr) {
++                    case STATE_IDLE: {
++                        if (STATE_IDLE == (curr = this.compareAndExchangeStateVolatile(STATE_IDLE, STATE_CANCELLED))) {
++                            this.state = STATE_CANCELLED;
++                            this.run = null;
++                            this.retired = null;
++                            return CancelledState.CANCELLED_BY_CALLER;
++                        }
++                        // try again
++                        continue;
++                    }
++                    case STATE_EXECUTING: {
++                        if (!this.isRepeatingTask()) {
++                            return CancelledState.RUNNING;
++                        }
++                        if (STATE_EXECUTING == (curr = this.compareAndExchangeStateVolatile(STATE_EXECUTING, STATE_EXECUTING_CANCELLED))) {
++                            return CancelledState.NEXT_RUNS_CANCELLED;
++                        }
++                        // try again
++                        continue;
++                    }
++                    case STATE_EXECUTING_CANCELLED: {
++                        return CancelledState.NEXT_RUNS_CANCELLED_ALREADY;
++                    }
++                    case STATE_FINISHED: {
++                        return CancelledState.ALREADY_EXECUTED;
++                    }
++                    case STATE_CANCELLED: {
++                        return CancelledState.CANCELLED_ALREADY;
++                    }
++                    default: {
++                        throw new IllegalStateException("Unknown state: " + curr);
++                    }
++                }
++            }
++        }
++
++        @Override
++        public ExecutionState getExecutionState() {
++            final int state = this.getStateVolatile();
++            switch (state) {
++                case STATE_IDLE:
++                    return ExecutionState.IDLE;
++                case STATE_EXECUTING:
++                    return ExecutionState.RUNNING;
++                case STATE_EXECUTING_CANCELLED:
++                    return ExecutionState.CANCELLED_RUNNING;
++                case STATE_FINISHED:
++                    return ExecutionState.FINISHED;
++                case STATE_CANCELLED:
++                    return ExecutionState.CANCELLED;
++                default: {
++                    throw new IllegalStateException("Unknown state: " + state);
++                }
++            }
++        }
++    }
++}
+diff --git a/src/main/java/io/papermc/paper/threadedregions/scheduler/FoliaGlobalRegionScheduler.java b/src/main/java/io/papermc/paper/threadedregions/scheduler/FoliaGlobalRegionScheduler.java
+new file mode 100644
+index 0000000000000000000000000000000000000000..d306f911757a4d556c82c0070d4837db87afc497
+--- /dev/null
++++ b/src/main/java/io/papermc/paper/threadedregions/scheduler/FoliaGlobalRegionScheduler.java
+@@ -0,0 +1,267 @@
++package io.papermc.paper.threadedregions.scheduler;
++
++import ca.spottedleaf.concurrentutil.util.ConcurrentUtil;
++import ca.spottedleaf.concurrentutil.util.Validate;
++import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
++import org.bukkit.plugin.IllegalPluginAccessException;
++import org.bukkit.plugin.Plugin;
++
++import java.lang.invoke.VarHandle;
++import java.util.ArrayList;
++import java.util.List;
++import java.util.function.Consumer;
++import java.util.logging.Level;
++
++public class FoliaGlobalRegionScheduler implements GlobalRegionScheduler {
++
++    private long tickCount = 0L;
++    private final Object stateLock = new Object();
++    private final Long2ObjectOpenHashMap<List<GlobalScheduledTask>> tasksByDeadline = new Long2ObjectOpenHashMap<>();
++
++    public void tick() {
++        final List<GlobalScheduledTask> run;
++        synchronized (this.stateLock) {
++            ++this.tickCount;
++            if (this.tasksByDeadline.isEmpty()) {
++                run = null;
++            } else {
++                run = this.tasksByDeadline.remove(this.tickCount);
++            }
++        }
++
++        if (run == null) {
++            return;
++        }
++
++        for (int i = 0, len = run.size(); i < len; ++i) {
++            run.get(i).run();
++        }
++    }
++
++    @Override
++    public void execute(final Plugin plugin, final Runnable run) {
++        Validate.notNull(plugin, "Plugin may not be null");
++        Validate.notNull(run, "Runnable may not be null");
++
++        this.run(plugin, (final ScheduledTask task) -> {
++            run.run();
++        });
++    }
++
++    @Override
++    public ScheduledTask run(final Plugin plugin, final Consumer<ScheduledTask> task) {
++        return this.runDelayed(plugin, task, 1);
++    }
++
++    @Override
++    public ScheduledTask runDelayed(final Plugin plugin, final Consumer<ScheduledTask> task, final long delayTicks) {
++        Validate.notNull(plugin, "Plugin may not be null");
++        Validate.notNull(task, "Task may not be null");
++        if (delayTicks <= 0) {
++            throw new IllegalArgumentException("Delay ticks may not be <= 0");
++        }
++
++        if (!plugin.isEnabled()) {
++            throw new IllegalPluginAccessException("Plugin attempted to register task while disabled");
++        }
++
++        final GlobalScheduledTask ret = new GlobalScheduledTask(plugin, -1, task);
++
++        this.scheduleInternal(ret, delayTicks);
++
++        if (!plugin.isEnabled()) {
++            // handle race condition where plugin is disabled asynchronously
++            ret.cancel();
++        }
++
++        return ret;
++    }
++
++    @Override
++    public ScheduledTask runAtFixedRate(final Plugin plugin, final Consumer<ScheduledTask> task, final long initialDelayTicks, final long periodTicks) {
++        Validate.notNull(plugin, "Plugin may not be null");
++        Validate.notNull(task, "Task may not be null");
++        if (initialDelayTicks <= 0) {
++            throw new IllegalArgumentException("Initial delay ticks may not be <= 0");
++        }
++        if (periodTicks <= 0) {
++            throw new IllegalArgumentException("Period ticks may not be <= 0");
++        }
++
++        if (!plugin.isEnabled()) {
++            throw new IllegalPluginAccessException("Plugin attempted to register task while disabled");
++        }
++
++        final GlobalScheduledTask ret = new GlobalScheduledTask(plugin, periodTicks, task);
++
++        this.scheduleInternal(ret, initialDelayTicks);
++
++        if (!plugin.isEnabled()) {
++            // handle race condition where plugin is disabled asynchronously
++            ret.cancel();
++        }
++
++        return ret;
++    }
++
++    @Override
++    public void cancelTasks(final Plugin plugin) {
++        Validate.notNull(plugin, "Plugin may not be null");
++
++        final List<GlobalScheduledTask> toCancel = new ArrayList<>();
++        synchronized (this.stateLock) {
++            for (final List<GlobalScheduledTask> tasks : this.tasksByDeadline.values()) {
++                for (int i = 0, len = tasks.size(); i < len; ++i) {
++                    final GlobalScheduledTask task = tasks.get(i);
++                    if (task.plugin == plugin) {
++                        toCancel.add(task);
++                    }
++                }
++            }
++        }
++
++        for (int i = 0, len = toCancel.size(); i < len; ++i) {
++            toCancel.get(i).cancel();
++        }
++    }
++
++    private void scheduleInternal(final GlobalScheduledTask task, final long delay) {
++        // note: delay > 0
++        synchronized (this.stateLock) {
++            this.tasksByDeadline.computeIfAbsent(this.tickCount + delay, (final long keyInMap) -> {
++                return new ArrayList<>();
++            }).add(task);
++        }
++    }
++
++    private final class GlobalScheduledTask implements ScheduledTask, Runnable {
++
++        private static final int STATE_IDLE                = 0;
++        private static final int STATE_EXECUTING           = 1;
++        private static final int STATE_EXECUTING_CANCELLED = 2;
++        private static final int STATE_FINISHED            = 3;
++        private static final int STATE_CANCELLED           = 4;
++
++        private final Plugin plugin;
++        private final long repeatDelay; // in ticks
++        private Consumer<ScheduledTask> run;
++        private volatile int state;
++
++        private static final VarHandle STATE_HANDLE = ConcurrentUtil.getVarHandle(GlobalScheduledTask.class, "state", int.class);
++
++        private GlobalScheduledTask(final Plugin plugin, final long repeatDelay, final Consumer<ScheduledTask> run) {
++            this.plugin = plugin;
++            this.repeatDelay = repeatDelay;
++            this.run = run;
++        }
++
++        private final int getStateVolatile() {
++            return (int)STATE_HANDLE.get(this);
++        }
++
++        private final int compareAndExchangeStateVolatile(final int expect, final int update) {
++            return (int)STATE_HANDLE.compareAndExchange(this, expect, update);
++        }
++
++        private final void setStateVolatile(final int value) {
++            STATE_HANDLE.setVolatile(this, value);
++        }
++
++        @Override
++        public void run() {
++            final boolean repeating = this.isRepeatingTask();
++            if (STATE_IDLE != this.compareAndExchangeStateVolatile(STATE_IDLE, STATE_EXECUTING)) {
++                // cancelled
++                return;
++            }
++
++            try {
++                this.run.accept(this);
++            } catch (final Throwable throwable) {
++                this.plugin.getLogger().log(Level.WARNING, "Global task for " + this.plugin.getDescription().getFullName() + " generated an exception", throwable);
++            } finally {
++                boolean reschedule = false;
++                if (!repeating) {
++                    this.setStateVolatile(STATE_FINISHED);
++                } else if (STATE_EXECUTING == this.compareAndExchangeStateVolatile(STATE_EXECUTING, STATE_IDLE)) {
++                    reschedule = true;
++                } // else: cancelled repeating task
++
++                if (!reschedule) {
++                    this.run = null;
++                } else {
++                    FoliaGlobalRegionScheduler.this.scheduleInternal(this, this.repeatDelay);
++                }
++            }
++        }
++
++        @Override
++        public Plugin getOwningPlugin() {
++            return this.plugin;
++        }
++
++        @Override
++        public boolean isRepeatingTask() {
++            return this.repeatDelay > 0;
++        }
++
++        @Override
++        public CancelledState cancel() {
++            for (int curr = this.getStateVolatile();;) {
++                switch (curr) {
++                    case STATE_IDLE: {
++                        if (STATE_IDLE == (curr = this.compareAndExchangeStateVolatile(STATE_IDLE, STATE_CANCELLED))) {
++                            this.state = STATE_CANCELLED;
++                            this.run = null;
++                            return CancelledState.CANCELLED_BY_CALLER;
++                        }
++                        // try again
++                        continue;
++                    }
++                    case STATE_EXECUTING: {
++                        if (!this.isRepeatingTask()) {
++                            return CancelledState.RUNNING;
++                        }
++                        if (STATE_EXECUTING == (curr = this.compareAndExchangeStateVolatile(STATE_EXECUTING, STATE_EXECUTING_CANCELLED))) {
++                            return CancelledState.NEXT_RUNS_CANCELLED;
++                        }
++                        // try again
++                        continue;
++                    }
++                    case STATE_EXECUTING_CANCELLED: {
++                        return CancelledState.NEXT_RUNS_CANCELLED_ALREADY;
++                    }
++                    case STATE_FINISHED: {
++                        return CancelledState.ALREADY_EXECUTED;
++                    }
++                    case STATE_CANCELLED: {
++                        return CancelledState.CANCELLED_ALREADY;
++                    }
++                    default: {
++                        throw new IllegalStateException("Unknown state: " + curr);
++                    }
++                }
++            }
++        }
++
++        @Override
++        public ExecutionState getExecutionState() {
++            final int state = this.getStateVolatile();
++            switch (state) {
++                case STATE_IDLE:
++                    return ExecutionState.IDLE;
++                case STATE_EXECUTING:
++                    return ExecutionState.RUNNING;
++                case STATE_EXECUTING_CANCELLED:
++                    return ExecutionState.CANCELLED_RUNNING;
++                case STATE_FINISHED:
++                    return ExecutionState.FINISHED;
++                case STATE_CANCELLED:
++                    return ExecutionState.CANCELLED;
++                default: {
++                    throw new IllegalStateException("Unknown state: " + state);
++                }
++            }
++        }
++    }
++}
+diff --git a/src/main/java/net/minecraft/server/MinecraftServer.java b/src/main/java/net/minecraft/server/MinecraftServer.java
+index 80cf4852e4010eeeadaf920ab927a40df0179b40..912affb0ab1c4c40f7a655194e95ec6f0ebf1fcd 100644
+--- a/src/main/java/net/minecraft/server/MinecraftServer.java
++++ b/src/main/java/net/minecraft/server/MinecraftServer.java
+@@ -1479,6 +1479,18 @@ public abstract class MinecraftServer extends ReentrantBlockableEventLoop<TickTa
+         MinecraftTimings.bukkitSchedulerTimer.startTiming(); // Spigot // Paper
+         this.server.getScheduler().mainThreadHeartbeat(this.tickCount); // CraftBukkit
+         MinecraftTimings.bukkitSchedulerTimer.stopTiming(); // Spigot // Paper
++        // Paper start - Folia scheduler API
++        ((io.papermc.paper.threadedregions.scheduler.FoliaGlobalRegionScheduler) Bukkit.getGlobalRegionScheduler()).tick();
++        getAllLevels().forEach(level -> level.getAllEntities().forEach(entity -> {
++            if (entity.isRemoved()) {
++                return;
++            }
++            org.bukkit.craftbukkit.entity.CraftEntity bukkit = entity.getBukkitEntityRaw();
++            if (bukkit != null) {
++                bukkit.taskScheduler.executeTick();
++            }
++        }));
++        // Paper end - Folia scheduler API
+         io.papermc.paper.adventure.providers.ClickCallbackProviderImpl.CALLBACK_MANAGER.handleQueue(this.tickCount); // Paper
+         this.profiler.push("commandFunctions");
+         MinecraftTimings.commandFunctionsTimer.startTiming(); // Spigot // Paper
+diff --git a/src/main/java/net/minecraft/server/players/PlayerList.java b/src/main/java/net/minecraft/server/players/PlayerList.java
+index 8547e7ff2f1f5b7701fb0f3c3010c14601a5f83e..fff7ad7a45f310783ac96b44575ad3db13d537fa 100644
+--- a/src/main/java/net/minecraft/server/players/PlayerList.java
++++ b/src/main/java/net/minecraft/server/players/PlayerList.java
+@@ -643,6 +643,7 @@ public abstract class PlayerList {
+ 
+         entityplayer.unRide();
+         worldserver.removePlayerImmediately(entityplayer, Entity.RemovalReason.UNLOADED_WITH_PLAYER);
++        entityplayer.retireScheduler(); // Paper - Folia schedulers
+         entityplayer.getAdvancements().stopListening();
+         this.players.remove(entityplayer);
+         this.playersByName.remove(entityplayer.getScoreboardName().toLowerCase(java.util.Locale.ROOT)); // Spigot
+diff --git a/src/main/java/net/minecraft/world/entity/Entity.java b/src/main/java/net/minecraft/world/entity/Entity.java
+index b38c4cbcf0405d82c7b6e018e80a3174e460c1a4..513c34aa02d63f7e3c178eade818e156af4541db 100644
+--- a/src/main/java/net/minecraft/world/entity/Entity.java
++++ b/src/main/java/net/minecraft/world/entity/Entity.java
+@@ -246,11 +246,23 @@ public abstract class Entity implements Nameable, EntityAccess, CommandSource {
+     public @Nullable Throwable addedToWorldStack; // Paper - entity debug
+     public CraftEntity getBukkitEntity() {
+         if (this.bukkitEntity == null) {
+-            this.bukkitEntity = CraftEntity.getEntity(this.level.getCraftServer(), this);
++            // Paper start - Folia schedulers
++            synchronized (this) {
++                if (this.bukkitEntity == null) {
++                    return this.bukkitEntity = CraftEntity.getEntity(this.level.getCraftServer(), this);
++                }
++            }
++            // Paper end - Folia schedulers
+         }
+         return this.bukkitEntity;
+     }
+ 
++    // Paper start
++    public CraftEntity getBukkitEntityRaw() {
++        return this.bukkitEntity;
++    }
++    // Paper end
++
+     @Override
+     public CommandSender getBukkitSender(CommandSourceStack wrapper) {
+         return this.getBukkitEntity();
+@@ -665,6 +677,7 @@ public abstract class Entity implements Nameable, EntityAccess, CommandSource {
+     }
+ 
+     public final void discard() {
++        if (this.isRemoved()) return; // Paper
+         this.remove(Entity.RemovalReason.DISCARDED);
+     }
+ 
+@@ -4678,12 +4691,28 @@ public abstract class Entity implements Nameable, EntityAccess, CommandSource {
+ 
+         if (reason != RemovalReason.UNLOADED_TO_CHUNK) this.getPassengers().forEach(Entity::stopRiding); // Paper - chunk system - don't adjust passenger state when unloading, it's just not safe (and messes with our logic in entity chunk unload)
+         this.levelCallback.onRemove(reason);
++        // Paper start - Folia schedulers
++        if (!(this instanceof ServerPlayer) && reason != RemovalReason.CHANGED_DIMENSION) {
++            // Players need to be special cased, because they are regularly removed from the world
++            this.retireScheduler();
++        }
++        // Paper end - Folia schedulers
+     }
+ 
+     public void unsetRemoved() {
+         this.removalReason = null;
+     }
+ 
++    // Paper start - Folia schedulers
++    /**
++     * Invoked only when the entity is truly removed from the server, never to be added to any world.
++     */
++    public final void retireScheduler() {
++        // we need to force create the bukkit entity so that the scheduler can be retired...
++        this.getBukkitEntity().taskScheduler.retire();
++    }
++    // Paper end - Folia schedulers
++
+     @Override
+     public void setLevelCallback(EntityInLevelCallback changeListener) {
+         this.levelCallback = changeListener;
+diff --git a/src/main/java/org/bukkit/craftbukkit/CraftServer.java b/src/main/java/org/bukkit/craftbukkit/CraftServer.java
+index 59984cb8ba9ffb66b59a2c907e4f04b5a51ea8ed..0859f11567aecc8cae993a1409cfac7c53ab3dd5 100644
+--- a/src/main/java/org/bukkit/craftbukkit/CraftServer.java
++++ b/src/main/java/org/bukkit/craftbukkit/CraftServer.java
+@@ -302,6 +302,76 @@ public final class CraftServer implements Server {
+     private final io.papermc.paper.logging.SysoutCatcher sysoutCatcher = new io.papermc.paper.logging.SysoutCatcher(); // Paper
+     private final CraftPotionBrewer potionBrewer = new CraftPotionBrewer(); // Paper
+ 
++    // Paper start - Folia region threading API
++    private final io.papermc.paper.threadedregions.scheduler.FallbackRegionScheduler regionizedScheduler = new io.papermc.paper.threadedregions.scheduler.FallbackRegionScheduler();
++    private final io.papermc.paper.threadedregions.scheduler.FoliaAsyncScheduler asyncScheduler = new io.papermc.paper.threadedregions.scheduler.FoliaAsyncScheduler();
++    private final io.papermc.paper.threadedregions.scheduler.FoliaGlobalRegionScheduler globalRegionScheduler = new io.papermc.paper.threadedregions.scheduler.FoliaGlobalRegionScheduler();
++
++    @Override
++    public final io.papermc.paper.threadedregions.scheduler.RegionScheduler getRegionScheduler() {
++        return this.regionizedScheduler;
++    }
++
++    @Override
++    public final io.papermc.paper.threadedregions.scheduler.AsyncScheduler getAsyncScheduler() {
++        return this.asyncScheduler;
++    }
++
++    @Override
++    public final io.papermc.paper.threadedregions.scheduler.FoliaGlobalRegionScheduler getGlobalRegionScheduler() {
++        return this.globalRegionScheduler;
++    }
++
++    @Override
++    public final boolean isOwnedByCurrentRegion(World world, io.papermc.paper.math.Position position) {
++        return io.papermc.paper.util.TickThread.isTickThreadFor(
++            ((CraftWorld) world).getHandle(), position.blockX() >> 4, position.blockZ() >> 4
++        );
++    }
++
++    @Override
++    public final boolean isOwnedByCurrentRegion(World world, io.papermc.paper.math.Position position, int squareRadiusChunks) {
++        return io.papermc.paper.util.TickThread.isTickThreadFor(
++            ((CraftWorld) world).getHandle(), position.blockX() >> 4, position.blockZ() >> 4, squareRadiusChunks
++        );
++    }
++
++    @Override
++    public final boolean isOwnedByCurrentRegion(Location location) {
++        World world = location.getWorld();
++        return io.papermc.paper.util.TickThread.isTickThreadFor(
++            ((CraftWorld) world).getHandle(), location.getBlockX() >> 4, location.getBlockZ() >> 4
++        );
++    }
++
++    @Override
++    public final boolean isOwnedByCurrentRegion(Location location, int squareRadiusChunks) {
++        World world = location.getWorld();
++        return io.papermc.paper.util.TickThread.isTickThreadFor(
++            ((CraftWorld) world).getHandle(), location.getBlockX() >> 4, location.getBlockZ() >> 4, squareRadiusChunks
++        );
++    }
++
++    @Override
++    public final boolean isOwnedByCurrentRegion(World world, int chunkX, int chunkZ) {
++        return io.papermc.paper.util.TickThread.isTickThreadFor(
++            ((CraftWorld) world).getHandle(), chunkX, chunkZ
++        );
++    }
++
++    @Override
++    public final boolean isOwnedByCurrentRegion(World world, int chunkX, int chunkZ, int squareRadiusChunks) {
++        return io.papermc.paper.util.TickThread.isTickThreadFor(
++            ((CraftWorld) world).getHandle(), chunkX, chunkZ, squareRadiusChunks
++        );
++    }
++
++    @Override
++    public final boolean isOwnedByCurrentRegion(Entity entity) {
++        return io.papermc.paper.util.TickThread.isTickThreadFor(((org.bukkit.craftbukkit.entity.CraftEntity) entity).getHandleRaw());
++    }
++    // Paper end - Folia reagion threading API
++
+     static {
+         ConfigurationSerialization.registerClass(CraftOfflinePlayer.class);
+         ConfigurationSerialization.registerClass(CraftPlayerProfile.class);
+diff --git a/src/main/java/org/bukkit/craftbukkit/entity/CraftEntity.java b/src/main/java/org/bukkit/craftbukkit/entity/CraftEntity.java
+index 733158b6f2c2bd03fbe798562ff7bc33280548dc..fc0dc8e607cc24020106ea1af92b4421a5f9393d 100644
+--- a/src/main/java/org/bukkit/craftbukkit/entity/CraftEntity.java
++++ b/src/main/java/org/bukkit/craftbukkit/entity/CraftEntity.java
+@@ -203,6 +203,15 @@ public abstract class CraftEntity implements org.bukkit.entity.Entity {
+     private EntityDamageEvent lastDamageEvent;
+     private final CraftPersistentDataContainer persistentDataContainer = new CraftPersistentDataContainer(CraftEntity.DATA_TYPE_REGISTRY);
+     protected net.kyori.adventure.pointer.Pointers adventure$pointers; // Paper - implement pointers
++    // Paper start - Folia shedulers
++    public final io.papermc.paper.threadedregions.EntityScheduler taskScheduler = new io.papermc.paper.threadedregions.EntityScheduler(this);
++    private final io.papermc.paper.threadedregions.scheduler.FoliaEntityScheduler apiScheduler = new io.papermc.paper.threadedregions.scheduler.FoliaEntityScheduler(this);
++
++    @Override
++    public final io.papermc.paper.threadedregions.scheduler.EntityScheduler getScheduler() {
++        return this.apiScheduler;
++    };
++    // Paper end - Folia schedulers
+ 
+     public CraftEntity(final CraftServer server, final Entity entity) {
+         this.server = server;
+@@ -825,6 +834,12 @@ public abstract class CraftEntity implements org.bukkit.entity.Entity {
+         return this.entity;
+     }
+ 
++    // Paper start
++    public Entity getHandleRaw() {
++        return this.entity;
++    }
++    // Paper end
++
+     @Override
+     public void playEffect(EntityEffect type) {
+         Preconditions.checkArgument(type != null, "type");