diff --git a/core/src/main/java/org/geysermc/geyser/network/netty/Bootstraps.java b/core/src/main/java/org/geysermc/geyser/network/netty/Bootstraps.java new file mode 100644 index 000000000..9ffc45650 --- /dev/null +++ b/core/src/main/java/org/geysermc/geyser/network/netty/Bootstraps.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2019-2023 GeyserMC. http://geysermc.org + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + * + * @author GeyserMC + * @link https://github.com/GeyserMC/Geyser + */ + +package org.geysermc.geyser.network.netty; + +import io.netty.bootstrap.AbstractBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.epoll.Native; +import io.netty.channel.unix.UnixChannelOption; +import lombok.experimental.UtilityClass; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +@SuppressWarnings("OptionalUsedAsFieldOrParameterType") +@UtilityClass +public final class Bootstraps { + private static final Optional KERNEL_VERSION; + + // The REUSEPORT_AVAILABLE socket option is available starting from kernel version 3.9. + // This option allows multiple sockets to listen on the same IP address and port without conflict. + private static final int[] REUSEPORT_VERSION = new int[]{3, 9, 0}; + private static final boolean REUSEPORT_AVAILABLE; + + static { + String kernelVersion; + try { + kernelVersion = Native.KERNEL_VERSION; + } catch (Throwable e) { + kernelVersion = null; + } + if (kernelVersion != null && kernelVersion.contains("-")) { + int index = kernelVersion.indexOf('-'); + if (index > -1) { + kernelVersion = kernelVersion.substring(0, index); + } + int[] kernelVer = fromString(kernelVersion); + KERNEL_VERSION = Optional.of(kernelVer); + REUSEPORT_AVAILABLE = checkVersion(kernelVer, 0); + } else { + KERNEL_VERSION = Optional.empty(); + REUSEPORT_AVAILABLE = false; + } + } + + public static Optional getKernelVersion() { + return KERNEL_VERSION; + } + + public static boolean isReusePortAvailable() { + return REUSEPORT_AVAILABLE; + } + + @SuppressWarnings({"rawtypes, unchecked"}) + public static void setupBootstrap(AbstractBootstrap bootstrap) { + if (REUSEPORT_AVAILABLE) { + bootstrap.option(UnixChannelOption.SO_REUSEPORT, true); + } + } + + private static int[] fromString(String ver) { + String[] parts = ver.split("\\."); + if (parts.length < 2) { + throw new IllegalArgumentException("At least 2 version numbers required"); + } + + return new int[]{ + Integer.parseInt(parts[0]), + Integer.parseInt(parts[1]), + parts.length == 2 ? 0 : Integer.parseInt(parts[2]) + }; + } + + private static boolean checkVersion(int[] ver, int i) { + if (ver[i] > REUSEPORT_VERSION[i]) { + return true; + } else if (ver[i] == REUSEPORT_VERSION[i]) { + if (ver.length == (i + 1)) { + return true; + } else { + return checkVersion(ver, i + 1); + } + } + return false; + } + + public static CompletableFuture allOf(ChannelFuture... futures) { + if (futures == null || futures.length == 0) { + return CompletableFuture.completedFuture(null); + } + @SuppressWarnings("unchecked") + CompletableFuture[] completableFutures = new CompletableFuture[futures.length]; + for (int i = 0; i < futures.length; i++) { + ChannelFuture channelFuture = futures[i]; + CompletableFuture completableFuture = new CompletableFuture<>(); + channelFuture.addListener(future -> { + if (future.cause() != null) { + completableFuture.completeExceptionally(future.cause()); + } + completableFuture.complete(channelFuture.channel()); + }); + completableFutures[i] = completableFuture; + } + + return CompletableFuture.allOf(completableFutures); + } +} diff --git a/core/src/main/java/org/geysermc/geyser/network/netty/GeyserServer.java b/core/src/main/java/org/geysermc/geyser/network/netty/GeyserServer.java index 401a7f2cf..ea1dcb509 100644 --- a/core/src/main/java/org/geysermc/geyser/network/netty/GeyserServer.java +++ b/core/src/main/java/org/geysermc/geyser/network/netty/GeyserServer.java @@ -94,13 +94,16 @@ public final class GeyserServer { private final GeyserImpl geyser; private EventLoopGroup group; + // Split childGroup may improve IO + private EventLoopGroup childGroup; private final ServerBootstrap bootstrap; private EventLoopGroup playerGroup; @Getter private final ExpiringMap proxiedAddresses; + private final int listenCount; - private ChannelFuture bootstrapFuture; + private ChannelFuture[] bootstrapFutures; /** * The port to broadcast in the pong. This can be different from the port the server is bound to, e.g. due to port forwarding. @@ -109,9 +112,14 @@ public final class GeyserServer { public GeyserServer(GeyserImpl geyser, int threadCount) { this.geyser = geyser; - this.group = TRANSPORT.eventLoopGroupFactory().apply(threadCount); + this.listenCount = Bootstraps.isReusePortAvailable() ? Integer.getInteger("Geyser.ListenCount", 2) : 1; + GeyserImpl.getInstance().getLogger().debug("Listen thread count: " + listenCount); + this.group = TRANSPORT.eventLoopGroupFactory().apply(listenCount); + this.childGroup = TRANSPORT.eventLoopGroupFactory().apply(threadCount); - this.bootstrap = this.createBootstrap(this.group); + this.bootstrap = this.createBootstrap(); + // setup SO_REUSEPORT if exists + Bootstraps.setupBootstrap(this.bootstrap); if (this.geyser.getConfig().getBedrock().isEnableProxyProtocol()) { this.proxiedAddresses = ExpiringMap.builder() @@ -130,46 +138,51 @@ public final class GeyserServer { } public CompletableFuture bind(InetSocketAddress address) { - CompletableFuture future = new CompletableFuture<>(); - this.bootstrapFuture = this.bootstrap.bind(address).addListener(bindResult -> { - if (bindResult.cause() != null) { - future.completeExceptionally(bindResult.cause()); - return; - } - future.complete(null); - }); + bootstrapFutures = new ChannelFuture[listenCount]; + for (int i = 0; i < listenCount; i++) { + ChannelFuture future = bootstrap.bind(address); + addHandlers(future); + bootstrapFutures[i] = future; + } - Channel channel = this.bootstrapFuture.channel(); + return Bootstraps.allOf(bootstrapFutures); + } + private void addHandlers(ChannelFuture future) { + Channel channel = future.channel(); // Add our ping handler channel.pipeline() .addFirst(RakConnectionRequestHandler.NAME, new RakConnectionRequestHandler(this)) .addAfter(RakServerOfflineHandler.NAME, RakPingHandler.NAME, new RakPingHandler(this)); - + // Add proxy handler if (this.geyser.getConfig().getBedrock().isEnableProxyProtocol()) { channel.pipeline().addFirst("proxy-protocol-decoder", new ProxyServerHandler()); } - - return future; } public void shutdown() { try { - Future future1 = this.group.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS); + Future futureChildGroup = this.childGroup.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS); + this.childGroup = null; + Future futureGroup = this.group.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS); this.group = null; - Future future2 = this.playerGroup.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS); + Future futurePlayerGroup = this.playerGroup.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS); this.playerGroup = null; - future1.sync(); - future2.sync(); + + futureChildGroup.sync(); + futureGroup.sync(); + futurePlayerGroup.sync(); SkinProvider.shutdown(); } catch (InterruptedException e) { GeyserImpl.getInstance().getLogger().severe("Exception in shutdown process", e); } - this.bootstrapFuture.channel().closeFuture().syncUninterruptibly(); + for (ChannelFuture f : bootstrapFutures) { + f.channel().closeFuture().syncUninterruptibly(); + } } - private ServerBootstrap createBootstrap(EventLoopGroup group) { + private ServerBootstrap createBootstrap() { if (this.geyser.getConfig().isDebugMode()) { this.geyser.getLogger().debug("EventLoop type: " + TRANSPORT.datagramChannel()); if (TRANSPORT.datagramChannel() == NioDatagramChannel.class) { @@ -188,7 +201,7 @@ public final class GeyserServer { this.geyser.getLogger().debug("Setting MTU to " + this.geyser.getConfig().getMtu()); return new ServerBootstrap() .channelFactory(RakChannelFactory.server(TRANSPORT.datagramChannel())) - .group(group) + .group(group, childGroup) .option(RakChannelOption.RAK_HANDLE_PING, true) .option(RakChannelOption.RAK_MAX_MTU, this.geyser.getConfig().getMtu()) .childHandler(serverInitializer); @@ -224,7 +237,7 @@ public final class GeyserServer { return true; } - public BedrockPong onQuery(InetSocketAddress inetSocketAddress) { + public BedrockPong onQuery(Channel channel, InetSocketAddress inetSocketAddress) { if (geyser.getConfig().isDebugMode() && PRINT_DEBUG_PINGS) { String ip; if (geyser.getConfig().isLogPlayerIpAddresses()) { @@ -257,7 +270,7 @@ public final class GeyserServer { .version(GameProtocol.DEFAULT_BEDROCK_CODEC.getMinecraftVersion()) // Required to not be empty as of 1.16.210.59. Can only contain . and numbers. .ipv4Port(this.broadcastPort) .ipv6Port(this.broadcastPort) - .serverId(bootstrapFuture.channel().config().getOption(RakChannelOption.RAK_GUID)); + .serverId(channel.config().getOption(RakChannelOption.RAK_GUID)); if (config.isPassthroughMotd() && pingInfo != null && pingInfo.getDescription() != null) { String[] motd = MessageTranslator.convertMessageLenient(pingInfo.getDescription()).split("\n"); diff --git a/core/src/main/java/org/geysermc/geyser/network/netty/handler/RakPingHandler.java b/core/src/main/java/org/geysermc/geyser/network/netty/handler/RakPingHandler.java index e63bf9dd2..62b9c6d12 100644 --- a/core/src/main/java/org/geysermc/geyser/network/netty/handler/RakPingHandler.java +++ b/core/src/main/java/org/geysermc/geyser/network/netty/handler/RakPingHandler.java @@ -45,7 +45,7 @@ public class RakPingHandler extends SimpleChannelInboundHandler { protected void channelRead0(ChannelHandlerContext ctx, RakPing msg) { long guid = ctx.channel().config().getOption(RakChannelOption.RAK_GUID); - RakPong pong = msg.reply(guid, this.server.onQuery(msg.getSender()).toByteBuf()); + RakPong pong = msg.reply(guid, this.server.onQuery(ctx.channel(), msg.getSender()).toByteBuf()); ctx.writeAndFlush(pong); } }