diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 1341f2c3ffad5..56dd236808a2d 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -189,6 +189,10 @@ jobs: key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }} - name: Run benchmarks run: | + # Raise RLIMIT_MEMLOCK so the Netty io_uring transport (used by AUTO + # on Linux 5.10+) can allocate its submission/completion queue rings. + # Without this, NettyTransportBenchmark silently falls back to EPOLL. + sudo prlimit --pid $$ --memlock=unlimited:unlimited 2>/dev/null || true ./build/sbt -Pscala-${{ inputs.scala }} -Pyarn -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pspark-ganglia-lgpl Test/package # Make less noisy cp conf/log4j2.properties.template conf/log4j2.properties diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index d722444379acd..937dd6d66c54a 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -406,6 +406,11 @@ jobs: run: | # Fix for TTY related issues when launching the Ammonite REPL in tests. export TERM=vt100 + # Raise RLIMIT_MEMLOCK so the Netty io_uring transport can allocate + # its submission/completion queue rings. Without this, AUTO transport + # selection silently falls back to EPOLL on the default 64KB memlock, + # leaving io_uring code paths untested. + sudo prlimit --pid $$ --memlock=unlimited:unlimited 2>/dev/null || true # Hive "other tests" test needs larger metaspace size based on experiment. 
if [[ "$MODULES_TO_TEST" == "hive" ]] && [[ "$EXCLUDED_TAGS" == "org.apache.spark.tags.SlowHiveTest" ]]; then export METASPACE_SIZE=2g; fi # SPARK-46283: should delete the following env replacement after SPARK 3.x EOL diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index d4cc71998604a..2aa13412bd7dc 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -51,6 +51,16 @@ netty-transport-native-epoll linux-aarch_64 + + io.netty + netty-transport-native-io_uring + linux-x86_64 + + + io.netty + netty-transport-native-io_uring + linux-aarch_64 + io.netty netty-transport-native-kqueue diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java index 8709d30ef1be1..437b63af43c1e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java @@ -34,7 +34,15 @@ public enum IOMode { */ KQUEUE, /** - * Prefer to use native EPOLL on Linux (or KQUEUE on MacOS) if available. Then, fallback to NIO. + * Native io_uring via JNI, Linux only. Requires kernel 5.10+. + */ + IO_URING, + /** + * Prefer to use a native transport when available. On Linux, io_uring is preferred over EPOLL + * when the running kernel supports it AND the JVM can actually allocate an io_uring ring + * (probed once via {@link NettyUtils#isIoUringUsable()}; environments with low + * {@code RLIMIT_MEMLOCK} fall through to EPOLL). On MacOS/BSD, KQUEUE is used. Falls back to + * NIO when no native transport is available. 
*/ AUTO } diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index c113b72f557cf..6542df6bd6027 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -18,6 +18,7 @@ package org.apache.spark.network.util; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; @@ -32,15 +33,24 @@ import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.uring.IoUring; +import io.netty.channel.uring.IoUringIoHandler; +import io.netty.channel.uring.IoUringServerSocketChannel; +import io.netty.channel.uring.IoUringSocketChannel; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.internal.PlatformDependent; +import org.apache.spark.internal.SparkLogger; +import org.apache.spark.internal.SparkLoggerFactory; + /** * Utilities for creating various Netty constructs based on whether we're using NIO, EPOLL, - * , KQUEUE, or AUTO. + * KQUEUE, IO_URING, or AUTO. */ public class NettyUtils { + private static final SparkLogger logger = SparkLoggerFactory.getLogger(NettyUtils.class); + /** * Specifies an upper bound on the number of Netty threads that Spark requires by default. * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core @@ -56,10 +66,70 @@ public class NettyUtils { private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator = new PooledByteBufAllocator[2]; + /** + * Cached result of probing whether io_uring can actually allocate the rings Spark needs on + * this JVM. `null` means not yet probed; non-null is the probed value. + * + *
<p>
{@link IoUring#isAvailable()} only checks that the JNI library loaded and basic syscalls + * work; it does not detect environments where the kernel allows io_uring but + * {@code RLIMIT_MEMLOCK} is too low for the submission/completion queue rings (common in + * containers, GitHub Actions runners, and other restricted environments). The probe creates + * a {@link MultiThreadIoEventLoopGroup} sized to {@link #MAX_DEFAULT_NETTY_THREADS} -- which + * is the worst case Spark allocates by default for a single event loop group -- and shuts it + * down to verify ring allocation actually succeeds. Probing with one thread is insufficient + * because some restricted environments allow a single ring to fit in memlock but not the eight + * rings Spark needs in practice. + */ + private static volatile Boolean ioUringUsable = null; + public static long freeDirectMemory() { return PlatformDependent.maxDirectMemory() - PlatformDependent.usedDirectMemory(); } + /** + * Returns true if io_uring can actually be used on the running JVM. Probes once (with the + * result cached) by attempting a real ring allocation, which catches environments where + * {@link IoUring#isAvailable()} returns true but {@code RLIMIT_MEMLOCK} is too low to allocate + * the submission/completion queues (common in containers, GitHub Actions runners, and other + * restricted environments). + * + *
<p>
Used by AUTO mode and by tests that gate execution on io_uring being usable. An explicit + * {@link IOMode#IO_URING} mode does not consult this and surfaces the underlying error. + */ + public static boolean isIoUringUsable() { + Boolean cached = ioUringUsable; + if (cached != null) { + return cached; + } + synchronized (NettyUtils.class) { + if (ioUringUsable != null) { + return ioUringUsable; + } + if (!JavaUtils.isLinux || !IoUring.isAvailable()) { + ioUringUsable = false; + return false; + } + MultiThreadIoEventLoopGroup probe = null; + try { + probe = new MultiThreadIoEventLoopGroup( + MAX_DEFAULT_NETTY_THREADS, IoUringIoHandler.newFactory()); + ioUringUsable = true; + } catch (Throwable t) { + logger.warn("io_uring is reported as available but allocation of " + + MAX_DEFAULT_NETTY_THREADS + " rings failed; " + + "AUTO will fall back to EPOLL on this JVM. " + + "Common cause: RLIMIT_MEMLOCK too low (containers, restricted environments). " + + "To force io_uring, set spark.shuffle.io.mode=IO_URING explicitly.", t); + ioUringUsable = false; + } finally { + if (probe != null) { + probe.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS); + } + } + return ioUringUsable; + } + } + /** Creates a new ThreadFactory which prefixes each thread with the given name. 
*/ public static ThreadFactory createThreadFactory(String threadPoolPrefix) { return new DefaultThreadFactory(threadPoolPrefix, true); @@ -73,8 +143,11 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String case NIO -> NioIoHandler.newFactory(); case EPOLL -> EpollIoHandler.newFactory(); case KQUEUE -> KQueueIoHandler.newFactory(); + case IO_URING -> IoUringIoHandler.newFactory(); case AUTO -> { - if (JavaUtils.isLinux && Epoll.isAvailable()) { + if (isIoUringUsable()) { + yield IoUringIoHandler.newFactory(); + } else if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollIoHandler.newFactory(); } else if (JavaUtils.isMac && KQueue.isAvailable()) { yield KQueueIoHandler.newFactory(); @@ -92,8 +165,11 @@ public static Class getClientChannelClass(IOMode mode) { case NIO -> NioSocketChannel.class; case EPOLL -> EpollSocketChannel.class; case KQUEUE -> KQueueSocketChannel.class; + case IO_URING -> IoUringSocketChannel.class; case AUTO -> { - if (JavaUtils.isLinux && Epoll.isAvailable()) { + if (isIoUringUsable()) { + yield IoUringSocketChannel.class; + } else if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollSocketChannel.class; } else if (JavaUtils.isMac && KQueue.isAvailable()) { yield KQueueSocketChannel.class; @@ -110,8 +186,11 @@ public static Class getServerChannelClass(IOMode mode) case NIO -> NioServerSocketChannel.class; case EPOLL -> EpollServerSocketChannel.class; case KQUEUE -> KQueueServerSocketChannel.class; + case IO_URING -> IoUringServerSocketChannel.class; case AUTO -> { - if (JavaUtils.isLinux && Epoll.isAvailable()) { + if (isIoUringUsable()) { + yield IoUringServerSocketChannel.class; + } else if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollServerSocketChannel.class; } else if (JavaUtils.isMac && KQueue.isAvailable()) { yield KQueueServerSocketChannel.class; diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index cf00d806ba835..606b49758b750 100644 --- 
a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -184,6 +184,12 @@ tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_kqueue_aarch_64.jnilib" /> + + + netty-transport-native-epoll linux-aarch_64 + + io.netty + netty-transport-native-io_uring + linux-x86_64 + + + io.netty + netty-transport-native-io_uring + linux-aarch_64 + io.netty netty-transport-native-kqueue diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala index de47c79360357..b80d3607d41b1 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala @@ -20,7 +20,7 @@ package org.apache.spark import org.scalactic.source.Position import org.scalatest.Tag -import org.apache.spark.network.util.IOMode +import org.apache.spark.network.util.{IOMode, NettyUtils} import org.apache.spark.util.Utils abstract class ShuffleNettySuite extends ShuffleSuite { @@ -56,6 +56,11 @@ class ShuffleNettyKQueueSuite extends ShuffleNettySuite { override def ioMode: IOMode = IOMode.KQUEUE } +class ShuffleNettyIoUringSuite extends ShuffleNettySuite { + override def shouldRunTests: Boolean = Utils.isLinux && NettyUtils.isIoUringUsable + override def ioMode: IOMode = IOMode.IO_URING +} + class ShuffleNettyAutoSuite extends ShuffleNettySuite { override def ioMode: IOMode = IOMode.AUTO } diff --git a/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala b/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala index 0a3831a675c38..afed4b4fcc6e5 100644 --- a/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala @@ -281,9 +281,10 @@ object NettyTransportBenchmark extends BenchmarkBase { /** * Suite 3: IOMode Comparison (NIO vs AUTO). 
* AUTO selects the best native transport via NettyUtils.createEventLoop - * (EPOLL on Linux, KQUEUE on macOS, NIO fallback), so comparing NIO vs AUTO - * shows the benefit of native transport without needing manual probing. - * Uses concurrent load (8 clients) to amplify transport-level differences. + * (IO_URING on Linux 5.10+, then EPOLL on Linux, KQUEUE on macOS, NIO fallback), + * so comparing NIO vs AUTO shows the benefit of native transport without needing + * manual probing. Uses concurrent load (8 clients) to amplify transport-level + * differences. */ private def ioModeComparisonBenchmark(): Unit = { val payload = new Array[Byte](MEDIUM_PAYLOAD) @@ -562,8 +563,9 @@ object NettyTransportBenchmark extends BenchmarkBase { * and fetches them using client.fetchChunk(). This exercises the DefaultFileRegion * zero-copy sendfile/splice path. * - * Compares NIO vs AUTO to verify that native transports (EPOLL/KQUEUE) use sendfile() - * for file-backed transfers. AUTO should be equal to or faster than NIO. + * Compares NIO vs AUTO to verify that native transports use the kernel zero-copy + * path for file-backed transfers (sendfile() for EPOLL/KQUEUE, splice() via the + * io_uring submission queue for IO_URING). AUTO should be equal to or faster than NIO. 
*/ private def fileBackedShuffleBenchmark(): Unit = { val numFiles = 100 diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 7c182a16d8d78..80d7fe49e4e2c 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -217,10 +217,14 @@ netty-tcnative-boringssl-static/2.0.77.Final/osx-x86_64/netty-tcnative-boringssl netty-tcnative-boringssl-static/2.0.77.Final/windows-x86_64/netty-tcnative-boringssl-static-2.0.77.Final-windows-x86_64.jar netty-tcnative-classes/2.0.77.Final//netty-tcnative-classes-2.0.77.Final.jar netty-transport-classes-epoll/4.2.13.Final//netty-transport-classes-epoll-4.2.13.Final.jar +netty-transport-classes-io_uring/4.2.13.Final//netty-transport-classes-io_uring-4.2.13.Final.jar netty-transport-classes-kqueue/4.2.13.Final//netty-transport-classes-kqueue-4.2.13.Final.jar netty-transport-native-epoll/4.2.13.Final/linux-aarch_64/netty-transport-native-epoll-4.2.13.Final-linux-aarch_64.jar netty-transport-native-epoll/4.2.13.Final/linux-riscv64/netty-transport-native-epoll-4.2.13.Final-linux-riscv64.jar netty-transport-native-epoll/4.2.13.Final/linux-x86_64/netty-transport-native-epoll-4.2.13.Final-linux-x86_64.jar +netty-transport-native-io_uring/4.2.13.Final/linux-aarch_64/netty-transport-native-io_uring-4.2.13.Final-linux-aarch_64.jar +netty-transport-native-io_uring/4.2.13.Final/linux-riscv64/netty-transport-native-io_uring-4.2.13.Final-linux-riscv64.jar +netty-transport-native-io_uring/4.2.13.Final/linux-x86_64/netty-transport-native-io_uring-4.2.13.Final-linux-x86_64.jar netty-transport-native-kqueue/4.2.13.Final/osx-aarch_64/netty-transport-native-kqueue-4.2.13.Final-osx-aarch_64.jar netty-transport-native-kqueue/4.2.13.Final/osx-x86_64/netty-transport-native-kqueue-4.2.13.Final-osx-x86_64.jar netty-transport-native-unix-common/4.2.13.Final//netty-transport-native-unix-common-4.2.13.Final.jar diff --git a/pom.xml b/pom.xml index 1d2b847a2f8f6..534a3fe3fa42a 100644 
--- a/pom.xml +++ b/pom.xml @@ -1004,14 +1004,6 @@ io.netty netty-transport-udt - - io.netty - netty-transport-classes-io_uring - - - io.netty - netty-transport-native-io_uring - io.netty netty-handler-ssl-ocsp @@ -1037,6 +1029,18 @@ ${netty.version} linux-aarch_64 + + io.netty + netty-transport-native-io_uring + ${netty.version} + linux-x86_64 + + + io.netty + netty-transport-native-io_uring + ${netty.version} + linux-aarch_64 + io.netty netty-transport-native-kqueue