From b86a89596f36280a77d9a7c98ef2a6782870310d Mon Sep 17 00:00:00 2001 From: oech3 <79379754+oech3@users.noreply.github.com> Date: Sat, 6 Jun 2026 01:32:42 +0900 Subject: [PATCH 1/2] cat, tail, head:allways splice with broker pipe for throughput --- src/uucore/src/lib/features/pipes.rs | 106 +++++++++------------------ 1 file changed, 36 insertions(+), 70 deletions(-) diff --git a/src/uucore/src/lib/features/pipes.rs b/src/uucore/src/lib/features/pipes.rs index 076e4fbdd59..e6be54c81d1 100644 --- a/src/uucore/src/lib/features/pipes.rs +++ b/src/uucore/src/lib/features/pipes.rs @@ -82,24 +82,22 @@ pub fn might_fuse(source: &impl AsFd) -> bool { rustix::fs::fstatfs(source).map_or(true, |stats| stats.f_type == 0x6573_5546) // FUSE magic number, too many platform specific clippy warning with const } -/// splice all of source to dest -/// returns Ok(()) at end of file -#[inline] -fn splice_unbounded(source: &impl AsFd, dest: &mut impl AsFd) -> rustix::io::Result<()> { - while splice(&source, &dest, MAX_ROOTLESS_PIPE_SIZE)? > 0 {} - Ok(()) -} - /// force-splice source to dest even both of them are not pipe via broker pipe /// -/// This should not be used if one of them are pipe to save resources +/// throughput is better than direct splice for the case one of in/output is pipe by unknown reason +/// This includes read ahead and optimization for stdout's pipe size #[inline] -fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes { +pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes { static PIPE_CACHE: OnceLock> = OnceLock::new(); let Some((pipe_rd, pipe_wr)) = PIPE_CACHE.get_or_init(|| pipe::().ok()) else { return Ok(Err(())); }; - + // fcntl for input would not improve throughput since + // - sender with splice probably increased size already + // - sender without splice is bottleneck + let _ = fcntl_setpipe_size(&mut *dest, MAX_ROOTLESS_PIPE_SIZE); + // pre-generate page caches for splice + let _ = rustix::fs::fadvise(source, 0, None, rustix::fs::Advice::Sequential); loop { match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) { Ok(0) => return Ok(Ok(())), @@ -113,23 +111,6 @@ fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes } } -/// try splice_unbounded 1st and splice_unbounded_broker if both of in/output are not pipe -/// This includes read ahead and optimization for stdout's pipe size -#[inline] -pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes { - // fcntl for input would not improve throughput since - // - sender with splice probably increased size already - // - sender without splice is bottleneck - let is_pipe_out = fcntl_setpipe_size(&mut *dest, MAX_ROOTLESS_PIPE_SIZE).is_ok(); - // pre-generate page caches for splice - let is_file_in = rustix::fs::fadvise(source, 0, None, rustix::fs::Advice::Sequential).is_ok(); - if (is_file_in && !is_pipe_out) || splice_unbounded(source, dest).is_err() { - // both of in/output are not pipe - return splice_unbounded_broker(source, dest); - } - Ok(Ok(())) -} - /// splice `n` bytes with read/write fallback /// return actually sent bytes #[inline] @@ -143,52 +124,37 @@ pub fn send_n_bytes(input: impl AsFd, target: impl AsFd, n: u64) -> std::io::Res } let mut n = n; let mut bytes_written: u64 = 0; - let succeed_or_fuse = loop { - if n == 0 { - // avoid unnecessary syscall - return Ok(bytes_written); - } - match splice(&input, &target, n as usize) { - Ok(0) => break true, - Ok(s) => { - n -= s as u64; - bytes_written += s as u64; + let succeed_or_fuse = if let Some((broker_r, broker_w)) = PIPE_CACHE + .get_or_init(|| { + // use std::io::pipe to avoid unnecessary fcntl + let pair = std::io::pipe().ok()?; + if pipe_size > KERNEL_DEFAULT_PIPE_SIZE { + let _ = fcntl_setpipe_size(&pair.0, pipe_size); } - _ => break false, // input or output is not pipe - } - }; - let succeed_or_fuse = succeed_or_fuse - || if let Some((broker_r, broker_w)) = PIPE_CACHE - .get_or_init(|| { - // use std::io::pipe to avoid unnecessary fcntl - let pair = std::io::pipe().ok()?; - if pipe_size > KERNEL_DEFAULT_PIPE_SIZE { - let _ = fcntl_setpipe_size(&pair.0, pipe_size); - } - Some(pair) - }) - .as_ref() - { - // todo: create fn splice_bounded_broker - loop { - if n == 0 { - return Ok(bytes_written); - } - match splice(&input, &broker_w, n as usize) { - Ok(0) => break true, - Ok(s) => { - n -= s as u64; - bytes_written += s as u64; - if drain_pipe(broker_r, &target, s)?.is_err() { - break false; - } + Some(pair) + }) + .as_ref() + { + // todo: create fn splice_bounded_broker + loop { + if n == 0 { + return Ok(bytes_written); + } + match splice(&input, &broker_w, n as usize) { + Ok(0) => break true, + Ok(s) => { + n -= s as u64; + bytes_written += s as u64; + if drain_pipe(broker_r, &target, s)?.is_err() { + break false; } - _ => break false, } + _ => break false, } - } else { - false - }; + } + } else { + false + }; // do not always fallback to write for fuse, or 2 Ctrl+D is required to exit on tty // todo: move fuse patch to callers if !succeed_or_fuse || might_fuse(&input) { From b3f2833ff01acff1b7e26647b0dc1b2a8aedcd2f Mon Sep 17 00:00:00 2001 From: oech3 <> Date: Sat, 13 Jun 2026 18:38:07 +0900 Subject: [PATCH 2/2] cat: catch strace injection of 2nd+ splice --- src/uucore/src/lib/features/pipes.rs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/uucore/src/lib/features/pipes.rs b/src/uucore/src/lib/features/pipes.rs index e6be54c81d1..2d53950a7ff 100644 --- a/src/uucore/src/lib/features/pipes.rs +++ b/src/uucore/src/lib/features/pipes.rs @@ -98,17 +98,25 @@ pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRe let _ = fcntl_setpipe_size(&mut *dest, MAX_ROOTLESS_PIPE_SIZE); // pre-generate page caches for splice let _ = rustix::fs::fadvise(source, 0, None, rustix::fs::Advice::Sequential); - loop { - match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) { - Ok(0) => return Ok(Ok(())), - Ok(n) => { - if drain_pipe(pipe_rd, dest, n)?.is_err() { - return Ok(Err(())); - } + // check support of splice and fallback + match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) { + Ok(0) => return Ok(Ok(())), + Ok(n) => { + if drain_pipe(pipe_rd, dest, n)?.is_err() { + return Ok(Err(())); } - Err(_) => return Ok(Err(())), } + Err(_) => return Ok(Err(())), } + // GNU catches all strace injections for except for 1st one + while let mut n @ 1.. = + splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE).map_err(std::io::Error::from)? + { + while n > 0 { + n -= splice(&pipe_rd, dest, n)?; + } + } + Ok(Ok(())) } /// splice `n` bytes with read/write fallback