diff --git a/Cargo.toml b/Cargo.toml index a5aaee9cd..527ce8fb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,13 +19,16 @@ exclude = ["/.github", "tests", "src/bin"] edition = "2018" rust-version = "1.53" +[dependencies] +jobserver = { version = "0.1.20", default-features = false, optional = true } + [target.'cfg(unix)'.dependencies] # Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866 # which is still an issue with `resolver = "1"`. libc = { version = "0.2.62", default-features = false, optional = true } [features] -parallel = ["libc"] +parallel = ["libc", "jobserver"] [dev-dependencies] tempfile = "3" diff --git a/src/lib.rs b/src/lib.rs index 0018d4ff2..fbcf7d29a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -312,6 +312,9 @@ enum ErrorKind { ToolNotFound, /// One of the function arguments failed validation. InvalidArgument, + #[cfg(feature = "parallel")] + /// jobserver helpthread failure + JobserverHelpThreadError, } /// Represents an internal error that occurred, with an explanation. @@ -1505,13 +1508,7 @@ impl Build { let spawn_future = async { for obj in objs { let (mut cmd, program) = self.create_compile_object_cmd(obj)?; - let token = loop { - if let Some(token) = tokens.try_acquire()? { - break token; - } else { - YieldOnce::default().await - } - }; + let token = tokens.acquire().await?; let mut child = spawn(&mut cmd, &program, &self.cargo_output)?; let mut stderr_forwarder = StderrForwarder::new(&mut child); stderr_forwarder.set_non_blocking()?; diff --git a/src/parallel/job_token.rs b/src/parallel/job_token.rs new file mode 100644 index 000000000..4fec982f8 --- /dev/null +++ b/src/parallel/job_token.rs @@ -0,0 +1,255 @@ +use std::{marker::PhantomData, mem::MaybeUninit, sync::Once}; + +use crate::Error; + +pub(crate) struct JobToken(PhantomData<()>); + +impl JobToken { + fn new() -> Self { + Self(PhantomData) + } +} + +impl Drop for JobToken { + fn drop(&mut self) { + match JobTokenServer::new() { + JobTokenServer::Inherited(jobserver) => jobserver.release_token_raw(), + JobTokenServer::InProcess(jobserver) => jobserver.release_token_raw(), + } + } +} + +enum JobTokenServer { + Inherited(inherited_jobserver::JobServer), + InProcess(inprocess_jobserver::JobServer), +} + +impl JobTokenServer { + /// This function returns a static reference to the jobserver because + /// - creating a jobserver from env is a bit fd-unsafe (e.g. the fd might + /// be closed by other jobserver users in the process) and better do it + /// at the start of the program. + /// - in case a jobserver cannot be created from env (e.g. it's not + /// present), we will create a global in-process only jobserver + /// that has to be static so that it will be shared by all cc + /// compilation. + fn new() -> &'static Self { + static INIT: Once = Once::new(); + static mut JOBSERVER: MaybeUninit = MaybeUninit::uninit(); + + unsafe { + INIT.call_once(|| { + let server = inherited_jobserver::JobServer::from_env() + .map(Self::Inherited) + .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new())); + JOBSERVER = MaybeUninit::new(server); + }); + // TODO: Poor man's assume_init_ref, as that'd require a MSRV of 1.55. + &*JOBSERVER.as_ptr() + } + } +} + +pub(crate) enum ActiveJobTokenServer { + Inherited(inherited_jobserver::ActiveJobServer<'static>), + InProcess(&'static inprocess_jobserver::JobServer), +} + +impl ActiveJobTokenServer { + pub(crate) fn new() -> Result { + match JobTokenServer::new() { + JobTokenServer::Inherited(inherited_jobserver) => { + inherited_jobserver.enter_active().map(Self::Inherited) + } + JobTokenServer::InProcess(inprocess_jobserver) => { + Ok(Self::InProcess(inprocess_jobserver)) + } + } + } + + pub(crate) async fn acquire(&self) -> Result { + match &self { + Self::Inherited(jobserver) => jobserver.acquire().await, + Self::InProcess(jobserver) => Ok(jobserver.acquire().await), + } + } +} + +mod inherited_jobserver { + use super::JobToken; + + use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind}; + + use std::{ + io, mem, + sync::{mpsc, Mutex, MutexGuard, PoisonError}, + }; + + pub(super) struct JobServer { + /// Implicit token for this process which is obtained and will be + /// released in parent. Since JobTokens only give back what they got, + /// there should be at most one global implicit token in the wild. + /// + /// Since Rust does not execute any `Drop` for global variables, + /// we can't just put it back to jobserver and then re-acquire it at + /// the end of the process. + /// + /// Use `Mutex` to avoid race between acquire and release. + /// If an `AtomicBool` is used, then it's possible for: + /// - `release_token_raw`: Tries to set `global_implicit_token` to true, but it is already + /// set to `true`, continue to release it to jobserver + /// - `acquire` takes the global implicit token, set `global_implicit_token` to false + /// - `release_token_raw` now writes the token back into the jobserver, while + /// `global_implicit_token` is `false` + /// + /// If the program exits here, then cc effectively increases parallelism by one, which is + /// incorrect, hence we use a `Mutex` here. + global_implicit_token: Mutex, + inner: jobserver::Client, + } + + impl JobServer { + pub(super) unsafe fn from_env() -> Option { + jobserver::Client::from_env().map(|inner| Self { + inner, + global_implicit_token: Mutex::new(true), + }) + } + + fn get_global_implicit_token(&self) -> MutexGuard<'_, bool> { + self.global_implicit_token + .lock() + .unwrap_or_else(PoisonError::into_inner) + } + + /// All tokens except for the global implicit token will be put back into the jobserver + /// immediately and they cannot be cached, since Rust does not call `Drop::drop` on + /// global variables. + pub(super) fn release_token_raw(&self) { + let mut global_implicit_token = self.get_global_implicit_token(); + + if *global_implicit_token { + // There's already a global implicit token, so this token must + // be released back into jobserver. + // + // `release_raw` should not block + let _ = self.inner.release_raw(); + } else { + *global_implicit_token = true; + } + } + + pub(super) fn enter_active(&self) -> Result, Error> { + ActiveJobServer::new(self) + } + } + + pub(crate) struct ActiveJobServer<'a> { + jobserver: &'a JobServer, + helper_thread: jobserver::HelperThread, + /// When rx is dropped, all the token stored within it will be dropped. + rx: mpsc::Receiver>, + } + + impl<'a> ActiveJobServer<'a> { + fn new(jobserver: &'a JobServer) -> Result { + let (tx, rx) = mpsc::channel(); + + Ok(Self { + rx, + helper_thread: jobserver.inner.clone().into_helper_thread(move |res| { + let _ = tx.send(res); + })?, + jobserver, + }) + } + + pub(super) async fn acquire(&self) -> Result { + let mut has_requested_token = false; + + loop { + // Fast path + if mem::replace(&mut *self.jobserver.get_global_implicit_token(), false) { + break Ok(JobToken::new()); + } + + // Cold path, no global implicit token, obtain one + match self.rx.try_recv() { + Ok(res) => { + let acquired = res?; + acquired.drop_without_releasing(); + break Ok(JobToken::new()); + } + Err(mpsc::TryRecvError::Disconnected) => { + break Err(Error::new( + ErrorKind::JobserverHelpThreadError, + "jobserver help thread has returned before ActiveJobServer is dropped", + )) + } + Err(mpsc::TryRecvError::Empty) => { + if !has_requested_token { + self.helper_thread.request_token(); + has_requested_token = true; + } + YieldOnce::default().await + } + } + } + } + } +} + +mod inprocess_jobserver { + use super::JobToken; + + use crate::parallel::async_executor::YieldOnce; + + use std::{ + env::var, + sync::atomic::{ + AtomicU32, + Ordering::{AcqRel, Acquire}, + }, + }; + + pub(crate) struct JobServer(AtomicU32); + + impl JobServer { + pub(super) fn new() -> Self { + // Use `NUM_JOBS` if set (it's configured by Cargo) and otherwise + // just fall back to a semi-reasonable number. + // + // Note that we could use `num_cpus` here but it's an extra + // dependency that will almost never be used, so + // it's generally not too worth it. + let mut parallelism = 4; + // TODO: Use std::thread::available_parallelism as an upper bound + // when MSRV is bumped. + if let Ok(amt) = var("NUM_JOBS") { + if let Ok(amt) = amt.parse() { + parallelism = amt; + } + } + + Self(AtomicU32::new(parallelism)) + } + + pub(super) async fn acquire(&self) -> JobToken { + loop { + let res = self + .0 + .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1)); + + if res.is_ok() { + break JobToken::new(); + } + + YieldOnce::default().await + } + } + + pub(super) fn release_token_raw(&self) { + self.0.fetch_add(1, AcqRel); + } + } +} diff --git a/src/parallel/job_token/mod.rs b/src/parallel/job_token/mod.rs deleted file mode 100644 index a04d7625b..000000000 --- a/src/parallel/job_token/mod.rs +++ /dev/null @@ -1,257 +0,0 @@ -use std::{mem::MaybeUninit, sync::Once}; - -use crate::Error; - -#[cfg(unix)] -#[path = "unix.rs"] -mod sys; - -#[cfg(windows)] -#[path = "windows.rs"] -mod sys; - -pub(crate) struct JobToken(); - -impl Drop for JobToken { - fn drop(&mut self) { - match JobTokenServer::new() { - JobTokenServer::Inherited(jobserver) => jobserver.release_token_raw(), - JobTokenServer::InProcess(jobserver) => jobserver.release_token_raw(), - } - } -} - -enum JobTokenServer { - Inherited(inherited_jobserver::JobServer), - InProcess(inprocess_jobserver::JobServer), -} - -impl JobTokenServer { - /// This function returns a static reference to the jobserver because - /// - creating a jobserver from env is a bit fd-unsafe (e.g. the fd might - /// be closed by other jobserver users in the process) and better do it - /// at the start of the program. - /// - in case a jobserver cannot be created from env (e.g. it's not - /// present), we will create a global in-process only jobserver - /// that has to be static so that it will be shared by all cc - /// compilation. - fn new() -> &'static Self { - static INIT: Once = Once::new(); - static mut JOBSERVER: MaybeUninit = MaybeUninit::uninit(); - - unsafe { - INIT.call_once(|| { - let server = inherited_jobserver::JobServer::from_env() - .map(Self::Inherited) - .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new())); - JOBSERVER = MaybeUninit::new(server); - }); - // TODO: Poor man's assume_init_ref, as that'd require a MSRV of 1.55. - &*JOBSERVER.as_ptr() - } - } -} - -pub(crate) struct ActiveJobTokenServer(&'static JobTokenServer); - -impl ActiveJobTokenServer { - pub(crate) fn new() -> Result { - let jobserver = JobTokenServer::new(); - - #[cfg(unix)] - if let JobTokenServer::Inherited(inherited_jobserver) = &jobserver { - inherited_jobserver.enter_active()?; - } - - Ok(Self(jobserver)) - } - - pub(crate) fn try_acquire(&self) -> Result, Error> { - match &self.0 { - JobTokenServer::Inherited(jobserver) => jobserver.try_acquire(), - JobTokenServer::InProcess(jobserver) => Ok(jobserver.try_acquire()), - } - } -} - -impl Drop for ActiveJobTokenServer { - fn drop(&mut self) { - #[cfg(unix)] - if let JobTokenServer::Inherited(inherited_jobserver) = &self.0 { - inherited_jobserver.exit_active(); - } - } -} - -mod inherited_jobserver { - use super::{sys, Error, JobToken}; - - use std::{ - env::var_os, - sync::atomic::{ - AtomicBool, - Ordering::{AcqRel, Acquire}, - }, - }; - - #[cfg(unix)] - use std::sync::{Mutex, MutexGuard, PoisonError}; - - pub(crate) struct JobServer { - /// Implicit token for this process which is obtained and will be - /// released in parent. Since JobTokens only give back what they got, - /// there should be at most one global implicit token in the wild. - /// - /// Since Rust does not execute any `Drop` for global variables, - /// we can't just put it back to jobserver and then re-acquire it at - /// the end of the process. - global_implicit_token: AtomicBool, - inner: sys::JobServerClient, - /// number of active clients is required to know when it is safe to clear non-blocking - /// flags - #[cfg(unix)] - active_clients_cnt: Mutex, - } - - impl JobServer { - pub(super) unsafe fn from_env() -> Option { - let var = var_os("CARGO_MAKEFLAGS") - .or_else(|| var_os("MAKEFLAGS")) - .or_else(|| var_os("MFLAGS"))?; - - #[cfg(unix)] - let var = std::os::unix::ffi::OsStrExt::as_bytes(var.as_os_str()); - #[cfg(not(unix))] - let var = var.to_str()?.as_bytes(); - - let makeflags = var.split(u8::is_ascii_whitespace); - - // `--jobserver-auth=` is the only documented makeflags. - // `--jobserver-fds=` is actually an internal only makeflags, so we should - // always prefer `--jobserver-auth=`. - // - // Also, according to doc of makeflags, if there are multiple `--jobserver-auth=` - // the last one is used - if let Some(flag) = makeflags - .clone() - .filter_map(|s| s.strip_prefix(b"--jobserver-auth=")) - .last() - { - sys::JobServerClient::open(flag) - } else { - sys::JobServerClient::open( - makeflags - .filter_map(|s| s.strip_prefix(b"--jobserver-fds=")) - .last()?, - ) - } - .map(|inner| Self { - inner, - global_implicit_token: AtomicBool::new(true), - #[cfg(unix)] - active_clients_cnt: Mutex::new(0), - }) - } - - #[cfg(unix)] - fn get_locked_active_cnt(&self) -> MutexGuard<'_, usize> { - self.active_clients_cnt - .lock() - .unwrap_or_else(PoisonError::into_inner) - } - - #[cfg(unix)] - pub(super) fn enter_active(&self) -> Result<(), Error> { - let mut active_cnt = self.get_locked_active_cnt(); - if *active_cnt == 0 { - self.inner.prepare_for_acquires()?; - } - - *active_cnt += 1; - - Ok(()) - } - - #[cfg(unix)] - pub(super) fn exit_active(&self) { - let mut active_cnt = self.get_locked_active_cnt(); - *active_cnt -= 1; - - if *active_cnt == 0 { - self.inner.done_acquires(); - } - } - - pub(super) fn try_acquire(&self) -> Result, Error> { - if !self.global_implicit_token.swap(false, AcqRel) { - // Cold path, no global implicit token, obtain one - if self.inner.try_acquire()?.is_none() { - return Ok(None); - } - } - Ok(Some(JobToken())) - } - - pub(super) fn release_token_raw(&self) { - // All tokens will be put back into the jobserver immediately - // and they cannot be cached, since Rust does not call `Drop::drop` - // on global variables. - if self - .global_implicit_token - .compare_exchange(false, true, AcqRel, Acquire) - .is_err() - { - // There's already a global implicit token, so this token must - // be released back into jobserver - let _ = self.inner.release(); - } - } - } -} - -mod inprocess_jobserver { - use super::JobToken; - - use std::{ - env::var, - sync::atomic::{ - AtomicU32, - Ordering::{AcqRel, Acquire}, - }, - }; - - pub(crate) struct JobServer(AtomicU32); - - impl JobServer { - pub(super) fn new() -> Self { - // Use `NUM_JOBS` if set (it's configured by Cargo) and otherwise - // just fall back to a semi-reasonable number. - // - // Note that we could use `num_cpus` here but it's an extra - // dependency that will almost never be used, so - // it's generally not too worth it. - let mut parallelism = 4; - // TODO: Use std::thread::available_parallelism as an upper bound - // when MSRV is bumped. - if let Ok(amt) = var("NUM_JOBS") { - if let Ok(amt) = amt.parse() { - parallelism = amt; - } - } - - Self(AtomicU32::new(parallelism)) - } - - pub(super) fn try_acquire(&self) -> Option { - let res = self - .0 - .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1)); - - res.ok().map(|_| JobToken()) - } - - pub(super) fn release_token_raw(&self) { - self.0.fetch_add(1, AcqRel); - } - } -} diff --git a/src/parallel/job_token/unix.rs b/src/parallel/job_token/unix.rs deleted file mode 100644 index 7f8e1b881..000000000 --- a/src/parallel/job_token/unix.rs +++ /dev/null @@ -1,176 +0,0 @@ -use std::{ - ffi::OsStr, - fs::{self, File}, - io::{self, Read, Write}, - mem::ManuallyDrop, - os::{raw::c_int, unix::prelude::*}, - path::Path, -}; - -use crate::parallel::stderr::{set_blocking, set_non_blocking}; - -pub(super) struct JobServerClient { - read: File, - write: Option, -} - -impl JobServerClient { - pub(super) unsafe fn open(var: &[u8]) -> Option { - if let Some(fifo) = var.strip_prefix(b"fifo:") { - Self::from_fifo(Path::new(OsStr::from_bytes(fifo))) - } else { - Self::from_pipe(OsStr::from_bytes(var).to_str()?) - } - } - - /// `--jobserver-auth=fifo:PATH` - fn from_fifo(path: &Path) -> Option { - let file = fs::OpenOptions::new() - .read(true) - .write(true) - .open(path) - .ok()?; - - if is_pipe(&file)? { - // File in Rust is always closed-on-exec as long as it's opened by - // `File::open` or `fs::OpenOptions::open`. - set_non_blocking(&file).ok()?; - - Some(Self { - read: file, - write: None, - }) - } else { - None - } - } - - /// `--jobserver-auth=fd-for-R,fd-for-W` - unsafe fn from_pipe(s: &str) -> Option { - let (read, write) = s.split_once(',')?; - - let read = read.parse().ok()?; - let write = write.parse().ok()?; - - let read = ManuallyDrop::new(File::from_raw_fd(read)); - let write = ManuallyDrop::new(File::from_raw_fd(write)); - - // Ok so we've got two integers that look like file descriptors, but - // for extra sanity checking let's see if they actually look like - // instances of a pipe before we return the client. - // - // If we're called from `make` *without* the leading + on our rule - // then we'll have `MAKEFLAGS` env vars but won't actually have - // access to the file descriptors. - match ( - is_pipe(&read), - is_pipe(&write), - get_access_mode(&read), - get_access_mode(&write), - ) { - ( - Some(true), - Some(true), - Some(libc::O_RDONLY) | Some(libc::O_RDWR), - Some(libc::O_WRONLY) | Some(libc::O_RDWR), - ) => { - // Optimization: Try converting it to a fifo by using /dev/fd - if let Some(jobserver) = - Self::from_fifo(Path::new(&format!("/dev/fd/{}", read.as_raw_fd()))) - { - return Some(jobserver); - } - - let read = read.try_clone().ok()?; - let write = write.try_clone().ok()?; - - Some(Self { - read, - write: Some(write), - }) - } - _ => None, - } - } - - pub(super) fn prepare_for_acquires(&self) -> Result<(), crate::Error> { - if let Some(write) = self.write.as_ref() { - set_non_blocking(&self.read)?; - set_non_blocking(write)?; - } - - Ok(()) - } - - pub(super) fn done_acquires(&self) { - if let Some(write) = self.write.as_ref() { - let _ = set_blocking(&self.read); - let _ = set_blocking(write); - } - } - - /// Must call `prepare_for_acquire` before using it. - pub(super) fn try_acquire(&self) -> io::Result> { - let mut fds = [libc::pollfd { - fd: self.read.as_raw_fd(), - events: libc::POLLIN, - revents: 0, - }]; - - let ret = cvt(unsafe { libc::poll(fds.as_mut_ptr(), 1, 0) })?; - if ret == 1 { - let mut buf = [0]; - match (&self.read).read(&mut buf) { - Ok(1) => Ok(Some(())), - Ok(_) => Ok(None), // 0, eof - Err(e) - if e.kind() == io::ErrorKind::Interrupted - || e.kind() == io::ErrorKind::WouldBlock => - { - Ok(None) - } - Err(e) => Err(e), - } - } else { - Ok(None) - } - } - - pub(super) fn release(&self) -> io::Result<()> { - // For write to block, this would mean that pipe is full. - // If all every release are pair with an acquire, then this cannot - // happen. - // - // If it does happen, it is likely a bug in the program using this - // crate or some other programs that use the same jobserver have a - // bug in their code. - // - // If that turns out to not be the case we'll get an error anyway! - let mut write = self.write.as_ref().unwrap_or(&self.read); - match write.write(&[b'+'])? { - 1 => Ok(()), - _ => Err(io::Error::from(io::ErrorKind::UnexpectedEof)), - } - } -} - -fn cvt(t: c_int) -> io::Result { - if t == -1 { - Err(io::Error::last_os_error()) - } else { - Ok(t) - } -} - -fn is_pipe(file: &File) -> Option { - Some(file.metadata().ok()?.file_type().is_fifo()) -} - -fn get_access_mode(file: &File) -> Option { - let ret = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_GETFL) }; - if ret == -1 { - return None; - } - - Some(ret & libc::O_ACCMODE) -} diff --git a/src/parallel/job_token/windows.rs b/src/parallel/job_token/windows.rs deleted file mode 100644 index 434fe169e..000000000 --- a/src/parallel/job_token/windows.rs +++ /dev/null @@ -1,68 +0,0 @@ -use std::{ffi::CString, io, ptr, str}; - -use crate::windows::windows_sys::{ - OpenSemaphoreA, ReleaseSemaphore, WaitForSingleObject, FALSE, HANDLE, SEMAPHORE_MODIFY_STATE, - THREAD_SYNCHRONIZE, WAIT_ABANDONED, WAIT_FAILED, WAIT_OBJECT_0, WAIT_TIMEOUT, -}; - -pub(super) struct JobServerClient { - sem: HANDLE, -} - -unsafe impl Sync for JobServerClient {} -unsafe impl Send for JobServerClient {} - -impl JobServerClient { - pub(super) unsafe fn open(var: &[u8]) -> Option { - let var = str::from_utf8(var).ok()?; - if !var.is_ascii() { - // `OpenSemaphoreA` only accepts ASCII, not utf-8. - // - // Upstream implementation jobserver and jobslot also uses the - // same function and they works without problem, so there's no - // motivation to support utf-8 here using `OpenSemaphoreW` - // which only makes the code harder to maintain by making it more - // different than upstream. - return None; - } - - let name = CString::new(var).ok()?; - - let sem = OpenSemaphoreA( - THREAD_SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, - FALSE, - name.as_bytes().as_ptr(), - ); - if sem != ptr::null_mut() { - Some(Self { sem }) - } else { - None - } - } - - pub(super) fn try_acquire(&self) -> io::Result> { - match unsafe { WaitForSingleObject(self.sem, 0) } { - WAIT_OBJECT_0 => Ok(Some(())), - WAIT_TIMEOUT => Ok(None), - WAIT_FAILED => Err(io::Error::last_os_error()), - // We believe this should be impossible for a semaphore, but still - // check the error code just in case it happens. - WAIT_ABANDONED => Err(io::Error::new( - io::ErrorKind::Other, - "Wait on jobserver semaphore returned WAIT_ABANDONED", - )), - _ => unreachable!("Unexpected return value from WaitForSingleObject"), - } - } - - pub(super) fn release(&self) -> io::Result<()> { - // SAFETY: ReleaseSemaphore will write to prev_count is it is Some - // and release semaphore self.sem by 1. - let r = unsafe { ReleaseSemaphore(self.sem, 1, ptr::null_mut()) }; - if r != 0 { - Ok(()) - } else { - Err(io::Error::last_os_error()) - } - } -} diff --git a/src/parallel/stderr.rs b/src/parallel/stderr.rs index 2b85772ad..47fa085db 100644 --- a/src/parallel/stderr.rs +++ b/src/parallel/stderr.rs @@ -39,16 +39,6 @@ fn set_flags(fd: std::os::unix::io::RawFd, flags: std::os::raw::c_int) -> Result } } -#[cfg(unix)] -pub fn set_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> { - // On Unix, switch the pipe to non-blocking mode. - // On Windows, we have a different way to be non-blocking. - let fd = pipe.as_raw_fd(); - - let flags = get_flags(fd)?; - set_flags(fd, flags & (!libc::O_NONBLOCK)) -} - #[cfg(unix)] pub fn set_non_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> { // On Unix, switch the pipe to non-blocking mode.