From 0b214638e3660629c7bc2252f61a0478e0b36f33 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 14 May 2026 20:01:25 +0800 Subject: [PATCH 1/7] [SPARK-XXXXX][CORE] Add IO_URING transport mode for Linux 5.10+ ### What changes were proposed in this pull request? Enable Netty's io_uring native transport in Spark by: 1. Removing the `netty-transport-classes-io_uring` and `netty-transport-native-io_uring` exclusions from `netty-all` in the root `pom.xml`, and adding explicit `linux-x86_64` / `linux-aarch_64` classifier dependencies under `dependencyManagement`. 2. Mirroring the same native classifier dependencies in `common/network-common/pom.xml` and `core/pom.xml`. 3. Adding `IO_URING` to `IOMode` and wiring it into `NettyUtils.createEventLoop` / `getClientChannelClass` / `getServerChannelClass`. In `AUTO`, io_uring is preferred on Linux when `IoUring.isAvailable()` reports the running kernel supports it, then EPOLL, then KQUEUE on macOS, then NIO. 4. Adding `ShuffleNettyIoUringSuite` (gated on `Utils.isLinux && IoUring.isAvailable`) so the existing shuffle coverage exercises the new mode where the platform supports it. 5. Refreshing `NettyTransportBenchmark` comments so the AUTO behavior change is visible at the call sites; the existing `NIO vs AUTO` suites automatically exercise io_uring on Linux 5.10+. 6. Regenerating `dev/deps/spark-deps-hadoop-3-hive-2.3` to include the new `netty-transport-classes-io_uring` and `netty-transport-native-io_uring` (linux-x86_64 / linux-aarch_64 / linux-riscv64) entries. ### Why are the changes needed? io_uring graduated from incubator to a first-class transport in Netty 4.2 (`io.netty.channel.uring`). Compared to EPOLL it batches I/O operations through submission/completion queues, reducing per-op syscall overhead on busy executors, and uses `IORING_OP_SPLICE` for `FileRegion` writes -- functionally equivalent to `sendfile()` but fully asynchronous. 
SPARK-56279 already updated `MessageEncoder` to emit the header `ByteBuf` and the bare `DefaultFileRegion` separately when the body is a `FileSegmentManagedBuffer`, which means the io_uring write path can recognize the `DefaultFileRegion` and apply splice without any additional Spark-side change. ### Does this PR introduce _any_ user-facing change? Yes. On Linux kernels 5.10+, `spark.shuffle.io.mode=AUTO` (the default) now selects io_uring instead of EPOLL when io_uring is available. Operators who want the previous behavior can set `spark.shuffle.io.mode=EPOLL` explicitly. A new explicit `IO_URING` mode is also available. ### How was this patch tested? - Manual SBT compile of `network-common`, `core`, `core/Test`, `network-shuffle`, and (with `-Pyarn`) `network-yarn` on macOS. - `ShuffleNettyIoUringSuite` mirrors `ShuffleNettyEpollSuite` and runs the existing `ShuffleSuite` cases under `IO_URING` on Linux 5.10+ via GitHub Actions. - macOS runs continue to take the KQUEUE path; Linux runs without io_uring kernel support fall back to EPOLL. ### Was this patch authored or co-authored using generative AI tooling? 
Generated-by: Cursor 1.x --- common/network-common/pom.xml | 10 +++++++++ .../org/apache/spark/network/util/IOMode.java | 8 ++++++- .../apache/spark/network/util/NettyUtils.java | 21 +++++++++++++++---- core/pom.xml | 10 +++++++++ .../org/apache/spark/ShuffleNettySuite.scala | 7 +++++++ .../network/NettyTransportBenchmark.scala | 12 ++++++----- dev/deps/spark-deps-hadoop-3-hive-2.3 | 4 ++++ pom.xml | 20 +++++++++++------- 8 files changed, 74 insertions(+), 18 deletions(-) diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index d4cc71998604a..2aa13412bd7dc 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -51,6 +51,16 @@ netty-transport-native-epoll linux-aarch_64 + + io.netty + netty-transport-native-io_uring + linux-x86_64 + + + io.netty + netty-transport-native-io_uring + linux-aarch_64 + io.netty netty-transport-native-kqueue diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java index 8709d30ef1be1..19d223d4a44ab 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java @@ -34,7 +34,13 @@ public enum IOMode { */ KQUEUE, /** - * Prefer to use native EPOLL on Linux (or KQUEUE on MacOS) if available. Then, fallback to NIO. + * Native io_uring via JNI, Linux only. Requires kernel 5.10+. + */ + IO_URING, + /** + * Prefer to use a native transport when available. On Linux, io_uring is preferred over EPOLL + * when the running kernel supports it; otherwise EPOLL is used. On MacOS/BSD, KQUEUE is used. + * Falls back to NIO when no native transport is available. 
*/ AUTO } diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index c113b72f557cf..e7a0849cc6b21 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -32,12 +32,16 @@ import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.uring.IoUring; +import io.netty.channel.uring.IoUringIoHandler; +import io.netty.channel.uring.IoUringServerSocketChannel; +import io.netty.channel.uring.IoUringSocketChannel; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.internal.PlatformDependent; /** * Utilities for creating various Netty constructs based on whether we're using NIO, EPOLL, - * , KQUEUE, or AUTO. + * KQUEUE, IO_URING, or AUTO. 
*/ public class NettyUtils { @@ -73,8 +77,11 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String case NIO -> NioIoHandler.newFactory(); case EPOLL -> EpollIoHandler.newFactory(); case KQUEUE -> KQueueIoHandler.newFactory(); + case IO_URING -> IoUringIoHandler.newFactory(); case AUTO -> { - if (JavaUtils.isLinux && Epoll.isAvailable()) { + if (JavaUtils.isLinux && IoUring.isAvailable()) { + yield IoUringIoHandler.newFactory(); + } else if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollIoHandler.newFactory(); } else if (JavaUtils.isMac && KQueue.isAvailable()) { yield KQueueIoHandler.newFactory(); @@ -92,8 +99,11 @@ public static Class getClientChannelClass(IOMode mode) { case NIO -> NioSocketChannel.class; case EPOLL -> EpollSocketChannel.class; case KQUEUE -> KQueueSocketChannel.class; + case IO_URING -> IoUringSocketChannel.class; case AUTO -> { - if (JavaUtils.isLinux && Epoll.isAvailable()) { + if (JavaUtils.isLinux && IoUring.isAvailable()) { + yield IoUringSocketChannel.class; + } else if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollSocketChannel.class; } else if (JavaUtils.isMac && KQueue.isAvailable()) { yield KQueueSocketChannel.class; @@ -110,8 +120,11 @@ public static Class getServerChannelClass(IOMode mode) case NIO -> NioServerSocketChannel.class; case EPOLL -> EpollServerSocketChannel.class; case KQUEUE -> KQueueServerSocketChannel.class; + case IO_URING -> IoUringServerSocketChannel.class; case AUTO -> { - if (JavaUtils.isLinux && Epoll.isAvailable()) { + if (JavaUtils.isLinux && IoUring.isAvailable()) { + yield IoUringServerSocketChannel.class; + } else if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollServerSocketChannel.class; } else if (JavaUtils.isMac && KQueue.isAvailable()) { yield KQueueServerSocketChannel.class; diff --git a/core/pom.xml b/core/pom.xml index 88f6525b8caf3..414486ce7c48d 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -323,6 +323,16 @@ netty-transport-native-epoll 
linux-aarch_64 + + io.netty + netty-transport-native-io_uring + linux-x86_64 + + + io.netty + netty-transport-native-io_uring + linux-aarch_64 + io.netty netty-transport-native-kqueue diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala index de47c79360357..a5cd498e83da5 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala @@ -20,6 +20,8 @@ package org.apache.spark import org.scalactic.source.Position import org.scalatest.Tag +import _root_.io.netty.channel.uring.IoUring + import org.apache.spark.network.util.IOMode import org.apache.spark.util.Utils @@ -56,6 +58,11 @@ class ShuffleNettyKQueueSuite extends ShuffleNettySuite { override def ioMode: IOMode = IOMode.KQUEUE } +class ShuffleNettyIoUringSuite extends ShuffleNettySuite { + override def shouldRunTests: Boolean = Utils.isLinux && IoUring.isAvailable + override def ioMode: IOMode = IOMode.IO_URING +} + class ShuffleNettyAutoSuite extends ShuffleNettySuite { override def ioMode: IOMode = IOMode.AUTO } diff --git a/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala b/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala index 0a3831a675c38..afed4b4fcc6e5 100644 --- a/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala @@ -281,9 +281,10 @@ object NettyTransportBenchmark extends BenchmarkBase { /** * Suite 3: IOMode Comparison (NIO vs AUTO). * AUTO selects the best native transport via NettyUtils.createEventLoop - * (EPOLL on Linux, KQUEUE on macOS, NIO fallback), so comparing NIO vs AUTO - * shows the benefit of native transport without needing manual probing. - * Uses concurrent load (8 clients) to amplify transport-level differences. 
+ * (IO_URING on Linux 5.10+, then EPOLL on Linux, KQUEUE on macOS, NIO fallback), + * so comparing NIO vs AUTO shows the benefit of native transport without needing + * manual probing. Uses concurrent load (8 clients) to amplify transport-level + * differences. */ private def ioModeComparisonBenchmark(): Unit = { val payload = new Array[Byte](MEDIUM_PAYLOAD) @@ -562,8 +563,9 @@ object NettyTransportBenchmark extends BenchmarkBase { * and fetches them using client.fetchChunk(). This exercises the DefaultFileRegion * zero-copy sendfile/splice path. * - * Compares NIO vs AUTO to verify that native transports (EPOLL/KQUEUE) use sendfile() - * for file-backed transfers. AUTO should be equal to or faster than NIO. + * Compares NIO vs AUTO to verify that native transports use the kernel zero-copy + * path for file-backed transfers (sendfile() for EPOLL/KQUEUE, splice() via the + * io_uring submission queue for IO_URING). AUTO should be equal to or faster than NIO. */ private def fileBackedShuffleBenchmark(): Unit = { val numFiles = 100 diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 7c182a16d8d78..80d7fe49e4e2c 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -217,10 +217,14 @@ netty-tcnative-boringssl-static/2.0.77.Final/osx-x86_64/netty-tcnative-boringssl netty-tcnative-boringssl-static/2.0.77.Final/windows-x86_64/netty-tcnative-boringssl-static-2.0.77.Final-windows-x86_64.jar netty-tcnative-classes/2.0.77.Final//netty-tcnative-classes-2.0.77.Final.jar netty-transport-classes-epoll/4.2.13.Final//netty-transport-classes-epoll-4.2.13.Final.jar +netty-transport-classes-io_uring/4.2.13.Final//netty-transport-classes-io_uring-4.2.13.Final.jar netty-transport-classes-kqueue/4.2.13.Final//netty-transport-classes-kqueue-4.2.13.Final.jar netty-transport-native-epoll/4.2.13.Final/linux-aarch_64/netty-transport-native-epoll-4.2.13.Final-linux-aarch_64.jar 
netty-transport-native-epoll/4.2.13.Final/linux-riscv64/netty-transport-native-epoll-4.2.13.Final-linux-riscv64.jar netty-transport-native-epoll/4.2.13.Final/linux-x86_64/netty-transport-native-epoll-4.2.13.Final-linux-x86_64.jar +netty-transport-native-io_uring/4.2.13.Final/linux-aarch_64/netty-transport-native-io_uring-4.2.13.Final-linux-aarch_64.jar +netty-transport-native-io_uring/4.2.13.Final/linux-riscv64/netty-transport-native-io_uring-4.2.13.Final-linux-riscv64.jar +netty-transport-native-io_uring/4.2.13.Final/linux-x86_64/netty-transport-native-io_uring-4.2.13.Final-linux-x86_64.jar netty-transport-native-kqueue/4.2.13.Final/osx-aarch_64/netty-transport-native-kqueue-4.2.13.Final-osx-aarch_64.jar netty-transport-native-kqueue/4.2.13.Final/osx-x86_64/netty-transport-native-kqueue-4.2.13.Final-osx-x86_64.jar netty-transport-native-unix-common/4.2.13.Final//netty-transport-native-unix-common-4.2.13.Final.jar diff --git a/pom.xml b/pom.xml index 1d2b847a2f8f6..534a3fe3fa42a 100644 --- a/pom.xml +++ b/pom.xml @@ -1004,14 +1004,6 @@ io.netty netty-transport-udt - - io.netty - netty-transport-classes-io_uring - - - io.netty - netty-transport-native-io_uring - io.netty netty-handler-ssl-ocsp @@ -1037,6 +1029,18 @@ ${netty.version} linux-aarch_64 + + io.netty + netty-transport-native-io_uring + ${netty.version} + linux-x86_64 + + + io.netty + netty-transport-native-io_uring + ${netty.version} + linux-aarch_64 + io.netty netty-transport-native-kqueue From a03118b890c15886511645a802127abec2e608d8 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 14 May 2026 20:12:20 +0800 Subject: [PATCH 2/7] [SPARK-XXXXX][CORE][FOLLOWUP] Hide io_uring availability check behind NettyUtils helper Replace the `_root_.io.netty.channel.uring.IoUring.isAvailable` reference in `ShuffleNettyIoUringSuite` with a small `NettyUtils.isIoUringAvailable()` helper. The `_root_.` prefix was needed because `org.apache.spark.io` shadows `io.netty.*` in this file's package, but it reads as unusual. 
The helper keeps the Netty-specific class out of the test scope. --- .../main/java/org/apache/spark/network/util/NettyUtils.java | 5 +++++ .../src/test/scala/org/apache/spark/ShuffleNettySuite.scala | 6 ++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index e7a0849cc6b21..361122a1579ab 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -64,6 +64,11 @@ public static long freeDirectMemory() { return PlatformDependent.maxDirectMemory() - PlatformDependent.usedDirectMemory(); } + /** Returns true if the io_uring native transport is available on the running JVM. */ + public static boolean isIoUringAvailable() { + return IoUring.isAvailable(); + } + /** Creates a new ThreadFactory which prefixes each thread with the given name. 
*/ public static ThreadFactory createThreadFactory(String threadPoolPrefix) { return new DefaultThreadFactory(threadPoolPrefix, true); diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala index a5cd498e83da5..ddad931d4150e 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala @@ -20,9 +20,7 @@ package org.apache.spark import org.scalactic.source.Position import org.scalatest.Tag -import _root_.io.netty.channel.uring.IoUring - -import org.apache.spark.network.util.IOMode +import org.apache.spark.network.util.{IOMode, NettyUtils} import org.apache.spark.util.Utils abstract class ShuffleNettySuite extends ShuffleSuite { @@ -59,7 +57,7 @@ class ShuffleNettyKQueueSuite extends ShuffleNettySuite { } class ShuffleNettyIoUringSuite extends ShuffleNettySuite { - override def shouldRunTests: Boolean = Utils.isLinux && IoUring.isAvailable + override def shouldRunTests: Boolean = Utils.isLinux && NettyUtils.isIoUringAvailable override def ioMode: IOMode = IOMode.IO_URING } From 9e65d44928019b767a51700f2120f1c559cc65c6 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 14 May 2026 20:16:30 +0800 Subject: [PATCH 3/7] [SPARK-XXXXX][CORE][FOLLOWUP] Shade io_uring native libs in YARN shuffle service Add antrun `move` rules so the YARN external shuffle service relocates the io_uring native libraries alongside the existing epoll/kqueue/tcnative ones. Without this, the shaded `org.sparkproject.io.netty` classes look for `liborg_sparkproject_netty_transport_native_io_uring42_*.so`, but the unshaded files are named `libnetty_transport_native_io_uring42_*.so`, so `IoUring.isAvailable()` returns `false` inside the YARN shuffle service JVM and io_uring is silently unused. 
Note: Netty 4.2 names the io_uring native lib `io_uring42_` (with the major+minor version suffix to allow multiple Netty versions to coexist), unlike epoll which uses the unsuffixed `epoll_`. Verified with `build/mvn -pl common/network-yarn -am -Pyarn -DskipTests package`: the resulting `spark-*-yarn-shuffle.jar` contains `META-INF/native/liborg_sparkproject_netty_transport_native_io_uring42_{x86_64,aarch_64,riscv64}.so`. --- common/network-yarn/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index cf00d806ba835..606b49758b750 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -184,6 +184,12 @@ tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_kqueue_aarch_64.jnilib" /> + + + Date: Thu, 14 May 2026 23:32:40 +0800 Subject: [PATCH 4/7] [SPARK-XXXXX][CORE][FOLLOWUP] Probe io_uring ring allocation for AUTO fallback `IoUring.isAvailable()` only verifies that the JNI library loaded and the basic syscalls work; it does not detect environments where the kernel supports io_uring but `RLIMIT_MEMLOCK` is too low to actually allocate the submission/completion queue rings. This is common in containers, GitHub Actions runners, and other restricted environments, and surfaces as: java.lang.IllegalStateException: failed to create a child event loop Caused by: java.lang.RuntimeException: failed to allocate memory for io_uring ring; try raising memlock limit (see getrlimit(RLIMIT_MEMLOCK, ...) or ulimit -l): Cannot allocate memory at io.netty.channel.uring.IoUringIoHandler.<init>(...) After SPARK-XXXXX (the parent change) made AUTO prefer io_uring on Linux, this caused unconditional failures in such environments rather than graceful fallback to EPOLL. This change adds a one-time JVM-wide probe in `NettyUtils` that creates a single-thread `MultiThreadIoEventLoopGroup` with the io_uring handler factory and shuts it down. 
If construction throws, the result is cached as `false` and AUTO falls back to EPOLL. The probe is consulted by AUTO mode and by `ShuffleNettyIoUringSuite.shouldRunTests`. An explicit `IOMode.IO_URING` does not consult the probe and surfaces the underlying error so users see what's wrong. The previous `isIoUringAvailable()` helper (which just delegated to `IoUring.isAvailable()`) is replaced by `isIoUringUsable()`, which returns the probed result. --- .../org/apache/spark/network/util/IOMode.java | 6 +- .../apache/spark/network/util/NettyUtils.java | 68 +++++++++++++++++-- .../org/apache/spark/ShuffleNettySuite.scala | 2 +- 3 files changed, 67 insertions(+), 9 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java index 19d223d4a44ab..437b63af43c1e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java @@ -39,8 +39,10 @@ public enum IOMode { IO_URING, /** * Prefer to use a native transport when available. On Linux, io_uring is preferred over EPOLL - * when the running kernel supports it; otherwise EPOLL is used. On MacOS/BSD, KQUEUE is used. - * Falls back to NIO when no native transport is available. + * when the running kernel supports it AND the JVM can actually allocate an io_uring ring + * (probed once via {@link NettyUtils#isIoUringUsable()}; environments with low + * {@code RLIMIT_MEMLOCK} fall through to EPOLL). On MacOS/BSD, KQUEUE is used. Falls back to + * NIO when no native transport is available. 
*/ AUTO } diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index 361122a1579ab..32e09d7fae4ad 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -18,6 +18,7 @@ package org.apache.spark.network.util; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; @@ -39,12 +40,17 @@ import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.internal.PlatformDependent; +import org.apache.spark.internal.SparkLogger; +import org.apache.spark.internal.SparkLoggerFactory; + /** * Utilities for creating various Netty constructs based on whether we're using NIO, EPOLL, * KQUEUE, IO_URING, or AUTO. */ public class NettyUtils { + private static final SparkLogger logger = SparkLoggerFactory.getLogger(NettyUtils.class); + /** * Specifies an upper bound on the number of Netty threads that Spark requires by default. * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core @@ -60,13 +66,63 @@ public class NettyUtils { private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator = new PooledByteBufAllocator[2]; + /** + * Cached result of probing whether io_uring can actually allocate a ring on this JVM. + * `null` means not yet probed; non-null is the probed value. + * + *
<p>
{@link IoUring#isAvailable()} only checks that the JNI library loaded and basic syscalls + * work; it does not detect environments where the kernel allows io_uring but + * {@code RLIMIT_MEMLOCK} is too low for the submission/completion queue rings (common in + * containers, GitHub Actions runners, and other restricted environments). The probe creates + * a one-thread {@link MultiThreadIoEventLoopGroup} and shuts it down to verify ring + * allocation actually succeeds. + */ + private static volatile Boolean ioUringUsable = null; + public static long freeDirectMemory() { return PlatformDependent.maxDirectMemory() - PlatformDependent.usedDirectMemory(); } - /** Returns true if the io_uring native transport is available on the running JVM. */ - public static boolean isIoUringAvailable() { - return IoUring.isAvailable(); + /** + * Returns true if io_uring can actually be used on the running JVM. Probes once (with the + * result cached) by attempting a real ring allocation, which catches environments where + * {@link IoUring#isAvailable()} returns true but {@code RLIMIT_MEMLOCK} is too low to allocate + * the submission/completion queues (common in containers, GitHub Actions runners, and other + * restricted environments). + * + *
<p>
Used by AUTO mode and by tests that gate execution on io_uring being usable. An explicit + * {@link IOMode#IO_URING} mode does not consult this and surfaces the underlying error. + */ + public static boolean isIoUringUsable() { + Boolean cached = ioUringUsable; + if (cached != null) { + return cached; + } + synchronized (NettyUtils.class) { + if (ioUringUsable != null) { + return ioUringUsable; + } + if (!JavaUtils.isLinux || !IoUring.isAvailable()) { + ioUringUsable = false; + return false; + } + MultiThreadIoEventLoopGroup probe = null; + try { + probe = new MultiThreadIoEventLoopGroup(1, IoUringIoHandler.newFactory()); + ioUringUsable = true; + } catch (Throwable t) { + logger.warn("io_uring is reported as available but ring allocation failed; " + + "AUTO will fall back to EPOLL on this JVM. " + + "Common cause: RLIMIT_MEMLOCK too low (containers, restricted environments). " + + "To force io_uring, set spark.shuffle.io.mode=IO_URING explicitly.", t); + ioUringUsable = false; + } finally { + if (probe != null) { + probe.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS); + } + } + return ioUringUsable; + } } /** Creates a new ThreadFactory which prefixes each thread with the given name. 
*/ @@ -84,7 +140,7 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String case KQUEUE -> KQueueIoHandler.newFactory(); case IO_URING -> IoUringIoHandler.newFactory(); case AUTO -> { - if (JavaUtils.isLinux && IoUring.isAvailable()) { + if (isIoUringUsable()) { yield IoUringIoHandler.newFactory(); } else if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollIoHandler.newFactory(); @@ -106,7 +162,7 @@ public static Class getClientChannelClass(IOMode mode) { case KQUEUE -> KQueueSocketChannel.class; case IO_URING -> IoUringSocketChannel.class; case AUTO -> { - if (JavaUtils.isLinux && IoUring.isAvailable()) { + if (isIoUringUsable()) { yield IoUringSocketChannel.class; } else if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollSocketChannel.class; @@ -127,7 +183,7 @@ public static Class getServerChannelClass(IOMode mode) case KQUEUE -> KQueueServerSocketChannel.class; case IO_URING -> IoUringServerSocketChannel.class; case AUTO -> { - if (JavaUtils.isLinux && IoUring.isAvailable()) { + if (isIoUringUsable()) { yield IoUringServerSocketChannel.class; } else if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollServerSocketChannel.class; diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala index ddad931d4150e..b80d3607d41b1 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala @@ -57,7 +57,7 @@ class ShuffleNettyKQueueSuite extends ShuffleNettySuite { } class ShuffleNettyIoUringSuite extends ShuffleNettySuite { - override def shouldRunTests: Boolean = Utils.isLinux && NettyUtils.isIoUringAvailable + override def shouldRunTests: Boolean = Utils.isLinux && NettyUtils.isIoUringUsable override def ioMode: IOMode = IOMode.IO_URING } From aecb9a16436cef910f9e4bc8f5bdcf4762515f59 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 14 May 2026 23:50:42 +0800 
Subject: [PATCH 5/7] [SPARK-XXXXX][INFRA][FOLLOWUP] Raise RLIMIT_MEMLOCK in CI so io_uring is exercised The probe-based fallback added by the previous follow-up makes Spark gracefully degrade to EPOLL when AUTO cannot allocate an io_uring ring (low `RLIMIT_MEMLOCK`, common in containers and GitHub Actions runners). Without raising the limit in CI, the io_uring code path would be silently skipped on every PR and never exercised before release. Add `sudo prlimit --pid $$ --memlock=unlimited:unlimited` (Linux-only, fail-soft via `2>/dev/null || true` so it's a no-op on macOS/Windows runners) at the top of: - `.github/workflows/build_and_test.yml` "Run tests" step, so module builds (yarn, core, network-shuffle, mllib, etc.) that hit `IOMode.AUTO` actually use io_uring on Linux 5.10+ and `ShuffleNettyIoUringSuite` runs instead of skipping via `NettyUtils.isIoUringUsable`. - `.github/workflows/benchmark.yml` "Run benchmarks" step, so `NettyTransportBenchmark`'s NIO-vs-AUTO comparison and file-backed shuffle suite measure io_uring rather than EPOLL. The fail-soft is important: stock GHA Linux runners support sudo prlimit, but stripped-down environments (e.g., custom containers used by some matrix jobs) might not, and we don't want the CI step to fail just because memlock could not be raised. The probe in `NettyUtils` will then degrade to EPOLL as designed. 
--- .github/workflows/benchmark.yml | 4 ++++ .github/workflows/build_and_test.yml | 7 ++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 1341f2c3ffad5..56dd236808a2d 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -189,6 +189,10 @@ jobs: key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }} - name: Run benchmarks run: | + # Raise RLIMIT_MEMLOCK so the Netty io_uring transport (used by AUTO + # on Linux 5.10+) can allocate its submission/completion queue rings. + # Without this, NettyTransportBenchmark silently falls back to EPOLL. + sudo prlimit --pid $$ --memlock=unlimited:unlimited 2>/dev/null || true ./build/sbt -Pscala-${{ inputs.scala }} -Pyarn -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pspark-ganglia-lgpl Test/package # Make less noisy cp conf/log4j2.properties.template conf/log4j2.properties diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index d722444379acd..519e7d01f72f8 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -406,10 +406,15 @@ jobs: run: | # Fix for TTY related issues when launching the Ammonite REPL in tests. export TERM=vt100 + # Raise RLIMIT_MEMLOCK so the Netty io_uring transport can allocate + # its submission/completion queue rings. Without this, AUTO transport + # selection silently falls back to EPOLL on the default 64KB memlock, + # leaving io_uring code paths untested. + sudo prlimit --pid $$ --memlock=unlimited:unlimited 2>/dev/null || true # Hive "other tests" test needs larger metaspace size based on experiment. 
if [[ "$MODULES_TO_TEST" == "hive" ]] && [[ "$EXCLUDED_TAGS" == "org.apache.spark.tags.SlowHiveTest" ]]; then export METASPACE_SIZE=2g; fi # SPARK-46283: should delete the following env replacement after SPARK 3.x EOL - if [[ "$MODULES_TO_TEST" == *"streaming-kinesis-asl"* ]] && [[ "${{ inputs.branch }}" =~ ^branch-3 ]]; then + if [[ "$MODULES_TO_TEST" == *"streaming-kinesis-asl"* ]] && [[ "${{ inputs.branch }}" =~ ^branch-3 ]]; then MODULES_TO_TEST=${MODULES_TO_TEST//streaming-kinesis-asl, /} fi export SERIAL_SBT_TESTS=1 From 645500757027fc851399a22049d0619de03fb223 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 14 May 2026 23:56:18 +0800 Subject: [PATCH 6/7] [SPARK-XXXXX][INFRA][FOLLOWUP] Restore trailing whitespace on build_and_test.yml line 417 Inadvertently stripped by the previous CI commit's surrounding edit. Pure whitespace; no behavioral change. --- .github/workflows/build_and_test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 519e7d01f72f8..937dd6d66c54a 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -414,7 +414,7 @@ jobs: # Hive "other tests" test needs larger metaspace size based on experiment. 
if [[ "$MODULES_TO_TEST" == "hive" ]] && [[ "$EXCLUDED_TAGS" == "org.apache.spark.tags.SlowHiveTest" ]]; then export METASPACE_SIZE=2g; fi # SPARK-46283: should delete the following env replacement after SPARK 3.x EOL - if [[ "$MODULES_TO_TEST" == *"streaming-kinesis-asl"* ]] && [[ "${{ inputs.branch }}" =~ ^branch-3 ]]; then + if [[ "$MODULES_TO_TEST" == *"streaming-kinesis-asl"* ]] && [[ "${{ inputs.branch }}" =~ ^branch-3 ]]; then MODULES_TO_TEST=${MODULES_TO_TEST//streaming-kinesis-asl, /} fi export SERIAL_SBT_TESTS=1 From aaec23eadbe6a36da58e837182bb76c59a66c6a8 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Fri, 15 May 2026 11:09:43 +0800 Subject: [PATCH 7/7] [SPARK-XXXXX][CORE][FOLLOWUP] Probe io_uring with worst-case thread count, not 1 The previous probe created a one-thread MultiThreadIoEventLoopGroup to verify io_uring ring allocation works. This is insufficient in environments (e.g., GHA Docker container jobs for pyspark) where the container's RLIMIT_MEMLOCK is just large enough for one io_uring ring but not the eight rings Spark allocates by default per event loop group. The probe would succeed, AUTO would pick io_uring, then TransportServer.init -> createEventLoop(numThreads=8) would crash with `failed to allocate memory for io_uring ring` and propagate the exception out of SparkContext construction. Probe with MAX_DEFAULT_NETTY_THREADS rings instead. This matches the worst-case allocation size Spark uses by default for a single event loop group, so any environment whose memlock can't support real Spark usage now correctly falls back to EPOLL at probe time. Users who explicitly raise spark.shuffle.io.serverThreads (or the analogous client/chunk-fetch knobs) above MAX_DEFAULT_NETTY_THREADS remain responsible for ensuring their environment can support the larger ring count; otherwise they should set spark.shuffle.io.mode to EPOLL explicitly. 
Observed in the pyspark CI matrix where runs sit inside a Docker container that does not honor `sudo prlimit --memlock=unlimited` from the workflow shell, leaving the JVM with the container's default memlock. --- .../apache/spark/network/util/NettyUtils.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index 32e09d7fae4ad..6542df6bd6027 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -67,15 +67,18 @@ public class NettyUtils { new PooledByteBufAllocator[2]; /** - * Cached result of probing whether io_uring can actually allocate a ring on this JVM. - * `null` means not yet probed; non-null is the probed value. + * Cached result of probing whether io_uring can actually allocate the rings Spark needs on + * this JVM. `null` means not yet probed; non-null is the probed value. * *
<p>
{@link IoUring#isAvailable()} only checks that the JNI library loaded and basic syscalls * work; it does not detect environments where the kernel allows io_uring but * {@code RLIMIT_MEMLOCK} is too low for the submission/completion queue rings (common in * containers, GitHub Actions runners, and other restricted environments). The probe creates - * a one-thread {@link MultiThreadIoEventLoopGroup} and shuts it down to verify ring - * allocation actually succeeds. + * a {@link MultiThreadIoEventLoopGroup} sized to {@link #MAX_DEFAULT_NETTY_THREADS} -- which + * is the worst case Spark allocates by default for a single event loop group -- and shuts it + * down to verify ring allocation actually succeeds. Probing with one thread is insufficient + * because some restricted environments allow a single ring to fit in memlock but not the eight + * rings Spark needs in practice. */ private static volatile Boolean ioUringUsable = null; @@ -108,10 +111,12 @@ public static boolean isIoUringUsable() { } MultiThreadIoEventLoopGroup probe = null; try { - probe = new MultiThreadIoEventLoopGroup(1, IoUringIoHandler.newFactory()); + probe = new MultiThreadIoEventLoopGroup( + MAX_DEFAULT_NETTY_THREADS, IoUringIoHandler.newFactory()); ioUringUsable = true; } catch (Throwable t) { - logger.warn("io_uring is reported as available but ring allocation failed; " + + logger.warn("io_uring is reported as available but allocation of " + + MAX_DEFAULT_NETTY_THREADS + " rings failed; " + "AUTO will fall back to EPOLL on this JVM. " + "Common cause: RLIMIT_MEMLOCK too low (containers, restricted environments). " + "To force io_uring, set spark.shuffle.io.mode=IO_URING explicitly.", t);