Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 43 additions & 51 deletions src/uucore/src/lib/features/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,31 @@ pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRe
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE.get_or_init(|| pipe::<false>().ok()) else {
return Ok(Err(()));
};

// fcntl for input would not improve throughput since
// - sender with splice probably increased size already
// - sender without splice is bottleneck
let _ = fcntl_setpipe_size(&mut *dest, MAX_ROOTLESS_PIPE_SIZE);
// pre-generate page caches for splice
let _ = rustix::fs::fadvise(source, 0, None, rustix::fs::Advice::Sequential);
loop {
match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
Ok(0) => return Ok(Ok(())),
Ok(n) => {
if drain_pipe(pipe_rd, dest, n)?.is_err() {
return Ok(Err(()));
}
// check support of splice and fallback
match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
Ok(0) => return Ok(Ok(())),
Ok(n) => {
if drain_pipe(pipe_rd, dest, n)?.is_err() {
return Ok(Err(()));
}
Err(_) => return Ok(Err(())),
}
Err(_) => return Ok(Err(())),
}
// GNU catches all strace injections for except for 1st one
while let mut n @ 1.. =
splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE).map_err(std::io::Error::from)?
{
while n > 0 {
n -= splice(&pipe_rd, dest, n)?;
}
}
Ok(Ok(()))
}

/// splice `n` bytes with read/write fallback
Expand All @@ -125,52 +132,37 @@ pub fn send_n_bytes(input: impl AsFd, target: impl AsFd, n: u64) -> std::io::Res
}
let mut n = n;
let mut bytes_written: u64 = 0;
let succeed_or_fuse = loop {
if n == 0 {
// avoid unnecessary syscall
return Ok(bytes_written);
}
match splice(&input, &target, n as usize) {
Ok(0) => break true,
Ok(s) => {
n -= s as u64;
bytes_written += s as u64;
let succeed_or_fuse = if let Some((broker_r, broker_w)) = PIPE_CACHE
.get_or_init(|| {
// use std::io::pipe to avoid unnecessary fcntl
let pair = std::io::pipe().ok()?;
if pipe_size > KERNEL_DEFAULT_PIPE_SIZE {
let _ = fcntl_setpipe_size(&pair.0, pipe_size);
}
_ => break false, // input or output is not pipe
}
};
let succeed_or_fuse = succeed_or_fuse
|| if let Some((broker_r, broker_w)) = PIPE_CACHE
.get_or_init(|| {
// use std::io::pipe to avoid unnecessary fcntl
let pair = std::io::pipe().ok()?;
if pipe_size > KERNEL_DEFAULT_PIPE_SIZE {
let _ = fcntl_setpipe_size(&pair.0, pipe_size);
}
Some(pair)
})
.as_ref()
{
// todo: create fn splice_bounded_broker
loop {
if n == 0 {
return Ok(bytes_written);
}
match splice(&input, &broker_w, n as usize) {
Ok(0) => break true,
Ok(s) => {
n -= s as u64;
bytes_written += s as u64;
if drain_pipe(broker_r, &target, s)?.is_err() {
break false;
}
Some(pair)
})
.as_ref()
{
// todo: create fn splice_bounded_broker
loop {
if n == 0 {
return Ok(bytes_written);
}
match splice(&input, &broker_w, n as usize) {
Ok(0) => break true,
Ok(s) => {
n -= s as u64;
bytes_written += s as u64;
if drain_pipe(broker_r, &target, s)?.is_err() {
break false;
}
_ => break false,
}
_ => break false,
}
} else {
false
};
}
} else {
false
};
// do not always fallback to write for fuse, or 2 Ctrl+D is required to exit on tty
// todo: move fuse patch to callers
if !succeed_or_fuse || might_fuse(&input) {
Expand Down
Loading