diff --git a/Cargo.toml b/Cargo.toml index 3128f0367..3117b8e21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ edition = "2018" [dependencies] jobserver = { version = "0.1.16", optional = true } +os_pipe = "1" [features] parallel = ["jobserver"] diff --git a/src/lib.rs b/src/lib.rs index 4bb110162..688adbd8e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1218,8 +1218,7 @@ impl Build { } #[cfg(feature = "parallel")] - fn compile_objects<'me>(&'me self, objs: &[Object]) -> Result<(), Error> { - use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; + fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> { use std::sync::Once; // Limit our parallelism globally with a jobserver. Start off by @@ -1242,56 +1241,28 @@ impl Build { // Note that this jobserver is cached globally so we only used one per // process and only worry about creating it once. // - // * Next we use a raw `thread::spawn` per thread to actually compile - // objects in parallel. We only actually spawn a thread after we've - // acquired a token to perform some work - // - // * Finally though we want to keep the dependencies of this crate - // pretty light, so we avoid using a safe abstraction like `rayon` and - // instead rely on some bits of `unsafe` code. We know that this stack - // frame persists while everything is compiling so we use all the - // stack-allocated objects without cloning/reallocating. We use a - // transmute to `State` with a `'static` lifetime to persist - // everything we need across the boundary, and the join-on-drop - // semantics of `JoinOnDrop` should ensure that our stack frame is - // alive while threads are alive. + // * Next we use spawn the process to actually compile objects in + // parallel after we've acquired a token to perform some work // // With all that in mind we compile all objects in a loop here, after we // acquire the appropriate tokens, Once all objects have been compiled - // we join on all the threads and propagate the results of compilation. - // - // Note that as a slight optimization we try to break out as soon as - // possible as soon as any compilation fails to ensure that errors get - // out to the user as fast as possible. - let error = AtomicBool::new(false); - let mut threads = Vec::new(); - for obj in objs { - if error.load(SeqCst) { - break; - } - let token = server.acquire()?; - let state = State { - build: self, - obj, - error: &error, - }; - let state = unsafe { std::mem::transmute::>(state) }; - let thread = thread::spawn(|| { - let state: State<'me> = state; // erase the `'static` lifetime - let result = state.build.compile_object(state.obj); - if result.is_err() { - state.error.store(true, SeqCst); - } - drop(token); // make sure our jobserver token is released after the compile - return result; - }); - threads.push(JoinOnDrop(Some(thread))); - } + // we wait on all the processes and propagate the results of compilation. + let print = PrintThread::new()?; - for mut thread in threads { - if let Some(thread) = thread.0.take() { - thread.join().expect("thread should not panic")?; - } + let children = objs + .iter() + .map(|obj| { + let (mut cmd, program) = self.create_compile_object_cmd(obj)?; + let token = server.acquire()?; + + let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?; + + Ok((cmd, program, KillOnDrop(child), token)) + }) + .collect::, Error>>()?; + + for (cmd, program, mut child, _token) in children { + wait_on_child(&cmd, &program, &mut child.0)?; } // Reacquire our process's token before we proceed, which we released @@ -1302,16 +1273,6 @@ impl Build { return Ok(()); - /// Shared state from the parent thread to the child thread. This - /// package of pointers is temporarily transmuted to a `'static` - /// lifetime to cross the thread boundary and then once the thread is - /// running we erase the `'static` to go back to an anonymous lifetime. - struct State<'a> { - build: &'a Build, - obj: &'a Object, - error: &'a AtomicBool, - } - /// Returns a suitable `jobserver::Client` used to coordinate /// parallelism between build scripts. fn jobserver() -> &'static jobserver::Client { @@ -1357,26 +1318,30 @@ impl Build { return client; } - struct JoinOnDrop(Option>>); + struct KillOnDrop(Child); - impl Drop for JoinOnDrop { + impl Drop for KillOnDrop { fn drop(&mut self) { - if let Some(thread) = self.0.take() { - drop(thread.join()); - } + let child = &mut self.0; + + child.kill().ok(); } } } #[cfg(not(feature = "parallel"))] fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> { + let print = PrintThread::new()?; + for obj in objs { - self.compile_object(obj)?; + let (mut cmd, name) = self.create_compile_object_cmd(obj)?; + run_inner(&mut cmd, &name, print.pipe_writer_cloned()?.unwrap())?; } + Ok(()) } - fn compile_object(&self, obj: &Object) -> Result<(), Error> { + fn create_compile_object_cmd(&self, obj: &Object) -> Result<(Command, String), Error> { let asm_ext = AsmFileExt::from_path(&obj.src); let is_asm = asm_ext.is_some(); let target = self.get_target()?; @@ -1425,8 +1390,7 @@ impl Build { self.fix_env_for_apple_os(&mut cmd)?; } - run(&mut cmd, &name)?; - Ok(()) + Ok((cmd, name)) } /// This will return a result instead of panicing; see expand() for the complete description. @@ -3463,21 +3427,19 @@ impl Tool { } } -fn run(cmd: &mut Command, program: &str) -> Result<(), Error> { - let (mut child, print) = spawn(cmd, program)?; +fn wait_on_child(cmd: &Command, program: &str, child: &mut Child) -> Result<(), Error> { let status = match child.wait() { Ok(s) => s, - Err(_) => { + Err(e) => { return Err(Error::new( ErrorKind::ToolExecError, &format!( - "Failed to wait on spawned child process, command {:?} with args {:?}.", - cmd, program + "Failed to wait on spawned child process, command {:?} with args {:?}: {}.", + cmd, program, e ), )); } }; - print.join().unwrap(); println!("{}", status); if status.success() { @@ -3493,9 +3455,28 @@ fn run(cmd: &mut Command, program: &str) -> Result<(), Error> { } } +fn run_inner( + cmd: &mut Command, + program: &str, + pipe_writer: os_pipe::PipeWriter, +) -> Result<(), Error> { + let mut child = spawn(cmd, program, pipe_writer)?; + wait_on_child(cmd, program, &mut child) +} + +fn run(cmd: &mut Command, program: &str) -> Result<(), Error> { + let mut print = PrintThread::new()?; + run_inner(cmd, program, print.pipe_writer().take().unwrap())?; + + Ok(()) +} + fn run_output(cmd: &mut Command, program: &str) -> Result, Error> { cmd.stdout(Stdio::piped()); - let (mut child, print) = spawn(cmd, program)?; + + let mut print = PrintThread::new()?; + let mut child = spawn(cmd, program, print.pipe_writer().take().unwrap())?; + let mut stdout = vec![]; child .stdout @@ -3503,53 +3484,33 @@ fn run_output(cmd: &mut Command, program: &str) -> Result, Error> { .unwrap() .read_to_end(&mut stdout) .unwrap(); - let status = match child.wait() { - Ok(s) => s, - Err(_) => { - return Err(Error::new( - ErrorKind::ToolExecError, - &format!( - "Failed to wait on spawned child process, command {:?} with args {:?}.", - cmd, program - ), - )); - } - }; - print.join().unwrap(); - println!("{}", status); - if status.success() { - Ok(stdout) - } else { - Err(Error::new( - ErrorKind::ToolExecError, - &format!( - "Command {:?} with args {:?} did not execute successfully (status code {}).", - cmd, program, status - ), - )) - } + wait_on_child(cmd, program, &mut child)?; + + Ok(stdout) } -fn spawn(cmd: &mut Command, program: &str) -> Result<(Child, JoinHandle<()>), Error> { - println!("running: {:?}", cmd); +fn spawn( + cmd: &mut Command, + program: &str, + pipe_writer: os_pipe::PipeWriter, +) -> Result { + struct ResetStderr<'cmd>(&'cmd mut Command); - // Capture the standard error coming from these programs, and write it out - // with cargo:warning= prefixes. Note that this is a bit wonky to avoid - // requiring the output to be UTF-8, we instead just ship bytes from one - // location to another. - match cmd.stderr(Stdio::piped()).spawn() { - Ok(mut child) => { - let stderr = BufReader::new(child.stderr.take().unwrap()); - let print = thread::spawn(move || { - for line in stderr.split(b'\n').filter_map(|l| l.ok()) { - print!("cargo:warning="); - std::io::stdout().write_all(&line).unwrap(); - println!(""); - } - }); - Ok((child, print)) + impl Drop for ResetStderr<'_> { + fn drop(&mut self) { + // Reset stderr to default to release pipe_writer so that print thread will + // not block forever. + self.0.stderr(Stdio::inherit()); } + } + + println!("running: {:?}", cmd); + + let cmd = ResetStderr(cmd); + + match cmd.0.stderr(pipe_writer).spawn() { + Ok(child) => Ok(child), Err(ref e) if e.kind() == io::ErrorKind::NotFound => { let extra = if cfg!(windows) { " (see https://github.com/rust-lang/cc-rs#compile-time-requirements \ @@ -3562,11 +3523,11 @@ fn spawn(cmd: &mut Command, program: &str) -> Result<(Child, JoinHandle<()>), Er &format!("Failed to find tool. Is `{}` installed?{}", program, extra), )) } - Err(ref e) => Err(Error::new( + Err(e) => Err(Error::new( ErrorKind::ToolExecError, &format!( "Command {:?} with args {:?} failed to start: {:?}", - cmd, program, e + cmd.0, program, e ), )), } @@ -3767,3 +3728,58 @@ impl AsmFileExt { None } } + +struct PrintThread { + handle: Option>, + pipe_writer: Option, +} + +impl PrintThread { + fn new() -> Result { + let (pipe_reader, pipe_writer) = os_pipe::pipe()?; + + // Capture the standard error coming from compilation, and write it out + // with cargo:warning= prefixes. Note that this is a bit wonky to avoid + // requiring the output to be UTF-8, we instead just ship bytes from one + // location to another. + let print = thread::spawn(move || { + let mut stderr = BufReader::with_capacity(4096, pipe_reader); + let mut line = String::with_capacity(20); + let mut stdout = io::stdout(); + + // read_line returns 0 on Eof + while stderr.read_line(&mut line).unwrap() != 0 { + writeln!(&mut stdout, "cargo:warning={}", line).ok(); + + // read_line does not clear the buffer + line.clear(); + } + }); + + Ok(Self { + handle: Some(print), + pipe_writer: Some(pipe_writer), + }) + } + + fn pipe_writer(&mut self) -> &mut Option { + &mut self.pipe_writer + } + + fn pipe_writer_cloned(&self) -> Result, Error> { + self.pipe_writer + .as_ref() + .map(os_pipe::PipeWriter::try_clone) + .transpose() + .map_err(From::from) + } +} + +impl Drop for PrintThread { + fn drop(&mut self) { + // Drop pipe_writer first to avoid deadlock + self.pipe_writer.take(); + + self.handle.take().unwrap().join().unwrap(); + } +}