From ed86b48cc9a8349e8e99de5f013f68f1edff4121 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 25 Nov 2013 18:27:27 -0800 Subject: [PATCH 1/2] Clean up statically initialized data on shutdown Whenever the runtime is shut down, add a few hooks to clean up some of the statically initialized data of the runtime. Note that this is an unsafe operation because there's no guarantee on behalf of the runtime that there's no other code running which is using the runtime. This helps turn down the noise a bit in the valgrind output related to statically initialized mutexes. It doesn't turn the noise down to 0 because there are still statically initialized mutexes in dynamic_lib and os::with_env_lock, but I believe that it would be easy enough to add exceptions for those cases and I don't think that it's the runtime's job to go and clean up that data. --- src/libstd/rt/args.rs | 9 ++++---- src/libstd/rt/local_ptr.rs | 31 +++++++++++++++++++++------ src/libstd/rt/mod.rs | 17 ++++++++++++--- src/libstd/rt/thread_local_storage.rs | 12 +++++++++++ 4 files changed, 55 insertions(+), 14 deletions(-) diff --git a/src/libstd/rt/args.rs b/src/libstd/rt/args.rs index 43e8096a8b113..7b27161ab5d7c 100644 --- a/src/libstd/rt/args.rs +++ b/src/libstd/rt/args.rs @@ -32,8 +32,8 @@ pub unsafe fn init(argc: int, argv: **u8) { imp::init(argc, argv) } pub unsafe fn init(argc: int, argv: **u8) { realargs::init(argc, argv) } /// One-time global cleanup. -#[cfg(not(test))] pub fn cleanup() { imp::cleanup() } -#[cfg(test)] pub fn cleanup() { realargs::cleanup() } +#[cfg(not(test))] pub unsafe fn cleanup() { imp::cleanup() } +#[cfg(test)] pub unsafe fn cleanup() { realargs::cleanup() } /// Take the global arguments from global storage. #[cfg(not(test))] pub fn take() -> Option<~[~str]> { imp::take() } @@ -74,14 +74,16 @@ mod imp { use vec; static mut global_args_ptr: uint = 0; + static mut lock: Mutex = MUTEX_INIT; pub unsafe fn init(argc: int, argv: **u8) { let args = load_argc_and_argv(argc, argv); put(args); } - pub fn cleanup() { + pub unsafe fn cleanup() { rtassert!(take().is_some()); + lock.destroy(); } pub fn take() -> Option<~[~str]> { @@ -108,7 +110,6 @@ mod imp { } fn with_lock(f: || -> T) -> T { - static mut lock: Mutex = MUTEX_INIT; (|| { unsafe { lock.lock(); diff --git a/src/libstd/rt/local_ptr.rs b/src/libstd/rt/local_ptr.rs index e0e8750e146fc..86f0f643c9962 100644 --- a/src/libstd/rt/local_ptr.rs +++ b/src/libstd/rt/local_ptr.rs @@ -41,28 +41,45 @@ pub static mut RT_TLS_PTR: *mut c_void = 0 as *mut c_void; #[cfg(stage0)] #[cfg(windows)] static mut RT_TLS_KEY: tls::Key = -1; +static mut tls_lock: Mutex = MUTEX_INIT; +static mut tls_initialized: bool = false; /// Initialize the TLS key. Other ops will fail if this isn't executed first. #[inline(never)] #[cfg(stage0)] #[cfg(windows)] pub fn init_tls_key() { - static mut lock: Mutex = MUTEX_INIT; - static mut initialized: bool = false; - unsafe { - lock.lock(); - if !initialized { + tls_lock.lock(); + if !tls_initialized { tls::create(&mut RT_TLS_KEY); - initialized = true; + tls_initialized = true; } - lock.unlock(); + tls_lock.unlock(); } } #[cfg(not(stage0), not(windows))] pub fn init_tls_key() {} +#[cfg(windows)] +pub unsafe fn cleanup() { + // No real use to acquiring a lock around these operations. All we're + // going to do is destroy the lock anyway which races locking itself. This + // is why the whole function is labeled as 'unsafe' + assert!(tls_initialized); + tls::destroy(RT_TLS_KEY); + tls_lock.destroy(); + tls_initialized = false; +} + +#[cfg(not(windows))] +pub unsafe fn cleanup() { + assert!(tls_initialized); + tls_lock.destroy(); + tls_initialized = false; +} + /// Give a pointer to thread-local storage. /// /// # Safety note diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 860b65b20c665..79b7dbf2aabf4 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -215,7 +215,8 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int { init(argc, argv); let exit_code = run(main); - cleanup(); + // unsafe is ok b/c we're sure that the runtime is gone + unsafe { cleanup(); } return exit_code; } @@ -228,7 +229,8 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int { pub fn start_on_main_thread(argc: int, argv: **u8, main: proc()) -> int { init(argc, argv); let exit_code = run_on_main_thread(main); - cleanup(); + // unsafe is ok b/c we're sure that the runtime is gone + unsafe { cleanup(); } return exit_code; } @@ -249,8 +251,17 @@ pub fn init(argc: int, argv: **u8) { } /// One-time runtime cleanup. -pub fn cleanup() { +/// +/// This function is unsafe because it performs no checks to ensure that the +/// runtime has completely ceased running. It is the responsibility of the +/// caller to ensure that the runtime is entirely shut down and nothing will be +/// poking around at the internal components. +/// +/// Invoking cleanup while portions of the runtime are still in use may cause +/// undefined behavior. +pub unsafe fn cleanup() { args::cleanup(); + local_ptr::cleanup(); } /// Execute the main function in a scheduler. diff --git a/src/libstd/rt/thread_local_storage.rs b/src/libstd/rt/thread_local_storage.rs index 8fa64852846a8..62e1b6c50d65f 100644 --- a/src/libstd/rt/thread_local_storage.rs +++ b/src/libstd/rt/thread_local_storage.rs @@ -34,6 +34,11 @@ pub unsafe fn get(key: Key) -> *mut c_void { pthread_getspecific(key) } +#[cfg(unix)] +pub unsafe fn destroy(key: Key) { + assert_eq!(0, pthread_key_delete(key)); +} + #[cfg(target_os="macos")] #[allow(non_camel_case_types)] // foreign type type pthread_key_t = ::libc::c_ulong; @@ -47,6 +52,7 @@ type pthread_key_t = ::libc::c_uint; #[cfg(unix)] extern { fn pthread_key_create(key: *mut pthread_key_t, dtor: *u8) -> c_int; + fn pthread_key_delete(key: pthread_key_t) -> c_int; fn pthread_getspecific(key: pthread_key_t) -> *mut c_void; fn pthread_setspecific(key: pthread_key_t, value: *mut c_void) -> c_int; } @@ -71,9 +77,15 @@ pub unsafe fn get(key: Key) -> *mut c_void { TlsGetValue(key) } +#[cfg(windows)] +pub unsafe fn destroy(key: Key) { + assert!(TlsFree(key) != 0); +} + #[cfg(windows)] extern "system" { fn TlsAlloc() -> DWORD; + fn TlsFree(dwTlsIndex: DWORD) -> BOOL; fn TlsGetValue(dwTlsIndex: DWORD) -> LPVOID; fn TlsSetValue(dwTlsIndex: DWORD, lpTlsvalue: LPVOID) -> BOOL; } From 5d6dbf3f262fabcb6cb920dd08be6f9d8df75d5c Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 25 Nov 2013 18:08:31 -0800 Subject: [PATCH 2/2] Improve the rt::thread module * Added doc comments explaining what all public functionality does. * Added the ability to spawn a detached thread * Added the ability for the procs to return a value in 'join' --- src/libstd/rt/local_ptr.rs | 9 +- src/libstd/rt/test.rs | 2 +- src/libstd/rt/thread.rs | 203 ++++++++++++++++++++++++++----------- src/libstd/task/spawn.rs | 2 +- 4 files changed, 154 insertions(+), 62 deletions(-) diff --git a/src/libstd/rt/local_ptr.rs b/src/libstd/rt/local_ptr.rs index 86f0f643c9962..6355de36d43bb 100644 --- a/src/libstd/rt/local_ptr.rs +++ b/src/libstd/rt/local_ptr.rs @@ -41,6 +41,8 @@ pub static mut RT_TLS_PTR: *mut c_void = 0 as *mut c_void; #[cfg(stage0)] #[cfg(windows)] static mut RT_TLS_KEY: tls::Key = -1; +#[cfg(stage0)] +#[cfg(windows)] static mut tls_lock: Mutex = MUTEX_INIT; static mut tls_initialized: bool = false; @@ -60,7 +62,11 @@ pub fn init_tls_key() { } #[cfg(not(stage0), not(windows))] -pub fn init_tls_key() {} +pub fn init_tls_key() { + unsafe { + tls_initialized = true; + } +} #[cfg(windows)] pub unsafe fn cleanup() { @@ -76,7 +82,6 @@ pub unsafe fn cleanup() { #[cfg(not(windows))] pub unsafe fn cleanup() { assert!(tls_initialized); - tls_lock.destroy(); tls_initialized = false; } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index 867d997e98d15..943b76dd1a0ec 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -336,7 +336,7 @@ pub fn spawntask_try(f: proc()) -> Result<(),()> { } /// Spawn a new task in a new scheduler and return a thread handle. -pub fn spawntask_thread(f: proc()) -> Thread { +pub fn spawntask_thread(f: proc()) -> Thread<()> { let f = Cell::new(f); diff --git a/src/libstd/rt/thread.rs b/src/libstd/rt/thread.rs index a0e66d2fd4eb4..9031147f8b139 100644 --- a/src/libstd/rt/thread.rs +++ b/src/libstd/rt/thread.rs @@ -8,13 +8,21 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +//! Native os-thread management +//! +//! This modules contains bindings necessary for managing OS-level threads. +//! These functions operate outside of the rust runtime, creating threads +//! which are not used for scheduling in any way. + #[allow(non_camel_case_types)]; use cast; +use kinds::Send; use libc; use ops::Drop; -use uint; +use option::{Option, Some, None}; use ptr; +use uint; #[cfg(windows)] use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, @@ -22,112 +30,191 @@ use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, #[cfg(windows)] type rust_thread = HANDLE; #[cfg(unix)] type rust_thread = libc::pthread_t; +#[cfg(windows)] type rust_thread_return = DWORD; +#[cfg(unix)] type rust_thread_return = *libc::c_void; -pub struct Thread { +type StartFn = extern "C" fn(*libc::c_void) -> rust_thread_return; + +/// This struct represents a native thread's state. This is used to join on an +/// existing thread created in the join-able state. +pub struct Thread { priv native: rust_thread, - priv joined: bool + priv joined: bool, + priv packet: ~Option, } static DEFAULT_STACK_SIZE: libc::size_t = 1024*1024; -#[cfg(windows)] type rust_thread_return = DWORD; -#[cfg(unix)] type rust_thread_return = *libc::c_void; +// This is the starting point of rust os threads. The first thing we do +// is make sure that we don't trigger __morestack (also why this has a +// no_split_stack annotation), and then we extract the main function +// and invoke it. +#[no_split_stack] +extern fn thread_start(main: *libc::c_void) -> rust_thread_return { + use rt::context; + unsafe { + context::record_stack_bounds(0, uint::max_value); + let f: ~proc() = cast::transmute(main); + (*f)(); + cast::transmute(0 as rust_thread_return) + } +} -impl Thread { - - pub fn start(main: proc()) -> Thread { - // This is the starting point of rust os threads. The first thing we do - // is make sure that we don't trigger __morestack (also why this has a - // no_split_stack annotation), and then we extract the main function - // and invoke it. - #[no_split_stack] - extern "C" fn thread_start(trampoline: *libc::c_void) -> rust_thread_return { - use rt::context; - unsafe { - context::record_stack_bounds(0, uint::max_value); - let f: ~proc() = cast::transmute(trampoline); - (*f)(); - } - unsafe { cast::transmute(0 as rust_thread_return) } - } +// There are two impl blocks b/c if T were specified at the top then it's just a +// pain to specify a type parameter on Thread::spawn (which doesn't need the +// type parameter). +impl Thread<()> { + + /// Starts execution of a new OS thread. + /// + /// This function will not wait for the thread to join, but a handle to the + /// thread will be returned. + /// + /// Note that the handle returned is used to acquire the return value of the + /// procedure `main`. The `join` function will wait for the thread to finish + /// and return the value that `main` generated. + /// + /// Also note that the `Thread` returned will *always* wait for the thread + /// to finish executing. This means that even if `join` is not explicitly + /// called, when the `Thread` falls out of scope its destructor will block + /// waiting for the OS thread. + pub fn start(main: proc() -> T) -> Thread { + + // We need the address of the packet to fill in to be stable so when + // `main` fills it in it's still valid, so allocate an extra ~ box to do + // so. + let packet = ~None; + let packet2: *mut Option = unsafe { + *cast::transmute::<&~Option, **mut Option>(&packet) + }; + let main: proc() = proc() unsafe { *packet2 = Some(main()); }; + let native = unsafe { native_thread_create(~main) }; - let native = native_thread_create(thread_start, ~main); Thread { native: native, joined: false, + packet: packet, } } - pub fn join(mut self) { + /// This will spawn a new thread, but it will not wait for the thread to + /// finish, nor is it possible to wait for the thread to finish. + /// + /// This corresponds to creating threads in the 'detached' state on unix + /// systems. Note that platforms may not keep the main program alive even if + /// there are detached thread still running around. + pub fn spawn(main: proc()) { + unsafe { + let handle = native_thread_create(~main); + native_thread_detach(handle); + } + } +} + +impl Thread { + /// Wait for this thread to finish, returning the result of the thread's + /// calculation. + pub fn join(mut self) -> T { assert!(!self.joined); - native_thread_join(self.native); + unsafe { native_thread_join(self.native) }; self.joined = true; + assert!(self.packet.is_some()); + self.packet.take_unwrap() } } -#[cfg(windows)] -fn native_thread_create(thread_start: extern "C" fn(*libc::c_void) -> rust_thread_return, - tramp: ~proc()) -> rust_thread { - unsafe { - let ptr: *mut libc::c_void = cast::transmute(tramp); - CreateThread(ptr::mut_null(), DEFAULT_STACK_SIZE, thread_start, ptr, 0, ptr::mut_null()) +#[unsafe_destructor] +impl Drop for Thread { + fn drop(&mut self) { + // This is required for correctness. If this is not done then the thread + // would fill in a return box which no longer exists. + if !self.joined { + unsafe { native_thread_join(self.native) }; + } } } #[cfg(windows)] -fn native_thread_join(native: rust_thread) { +unsafe fn native_thread_create(p: ~proc()) -> rust_thread { + let arg: *mut libc::c_void = cast::transmute(p); + CreateThread(ptr::mut_null(), DEFAULT_STACK_SIZE, thread_start, + arg, 0, ptr::mut_null()) +} + +#[cfg(windows)] +unsafe fn native_thread_join(native: rust_thread) { use libc::consts::os::extra::INFINITE; - unsafe { WaitForSingleObject(native, INFINITE); } + WaitForSingleObject(native, INFINITE); +} + +#[cfg(windows)] +unsafe fn native_thread_detach(native: rust_thread) { + assert!(libc::CloseHandle(native) != 0); } #[cfg(unix)] -fn native_thread_create(thread_start: extern "C" fn(*libc::c_void) -> rust_thread_return, - tramp: ~proc()) -> rust_thread { +unsafe fn native_thread_create(p: ~proc()) -> rust_thread { use unstable::intrinsics; - let mut native: libc::pthread_t = unsafe { intrinsics::uninit() }; - - unsafe { - use libc::consts::os::posix01::PTHREAD_CREATE_JOINABLE; + use libc::consts::os::posix01::PTHREAD_CREATE_JOINABLE; - let mut attr: libc::pthread_attr_t = intrinsics::uninit(); - assert!(pthread_attr_init(&mut attr) == 0); - assert!(pthread_attr_setstacksize(&mut attr, DEFAULT_STACK_SIZE) == 0); - assert!(pthread_attr_setdetachstate(&mut attr, PTHREAD_CREATE_JOINABLE) == 0); + let mut native: libc::pthread_t = intrinsics::uninit(); + let mut attr: libc::pthread_attr_t = intrinsics::uninit(); + assert_eq!(pthread_attr_init(&mut attr), 0); + assert_eq!(pthread_attr_setstacksize(&mut attr, DEFAULT_STACK_SIZE), 0); + assert_eq!(pthread_attr_setdetachstate(&mut attr, PTHREAD_CREATE_JOINABLE), 0); - let ptr: *libc::c_void = cast::transmute(tramp); - assert!(pthread_create(&mut native, &attr, thread_start, ptr) == 0); - } + let arg: *libc::c_void = cast::transmute(p); + assert_eq!(pthread_create(&mut native, &attr, thread_start, arg), 0); native } #[cfg(unix)] -fn native_thread_join(native: rust_thread) { - unsafe { assert!(pthread_join(native, ptr::null()) == 0) } +unsafe fn native_thread_join(native: rust_thread) { + assert_eq!(pthread_join(native, ptr::null()), 0); } -impl Drop for Thread { - fn drop(&mut self) { - assert!(self.joined); - } +#[cfg(unix)] +fn native_thread_detach(native: rust_thread) { + unsafe { assert_eq!(pthread_detach(native), 0) } } #[cfg(windows)] extern "system" { - fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES, dwStackSize: SIZE_T, - lpStartAddress: extern "C" fn(*libc::c_void) -> rust_thread_return, - lpParameter: LPVOID, dwCreationFlags: DWORD, lpThreadId: LPDWORD) -> HANDLE; + fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES, + dwStackSize: SIZE_T, + lpStartAddress: StartFn, + lpParameter: LPVOID, + dwCreationFlags: DWORD, + lpThreadId: LPDWORD) -> HANDLE; fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD; } #[cfg(unix)] extern { - fn pthread_create(native: *mut libc::pthread_t, attr: *libc::pthread_attr_t, - f: extern "C" fn(*libc::c_void) -> rust_thread_return, + fn pthread_create(native: *mut libc::pthread_t, + attr: *libc::pthread_attr_t, + f: StartFn, value: *libc::c_void) -> libc::c_int; - fn pthread_join(native: libc::pthread_t, value: **libc::c_void) -> libc::c_int; + fn pthread_join(native: libc::pthread_t, + value: **libc::c_void) -> libc::c_int; fn pthread_attr_init(attr: *mut libc::pthread_attr_t) -> libc::c_int; fn pthread_attr_setstacksize(attr: *mut libc::pthread_attr_t, stack_size: libc::size_t) -> libc::c_int; fn pthread_attr_setdetachstate(attr: *mut libc::pthread_attr_t, state: libc::c_int) -> libc::c_int; + fn pthread_detach(thread: libc::pthread_t) -> libc::c_int; +} + +#[cfg(test)] +mod tests { + use super::Thread; + + #[test] + fn smoke() { do Thread::start {}.join(); } + + #[test] + fn data() { assert_eq!(do Thread::start { 1 }.join(), 1); } + + #[test] + fn detached() { do Thread::spawn {} } } diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 6c1c28c980559..198fe596a896e 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -139,7 +139,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) { let join_task = do Task::build_child(None) { debug!("running join task"); let thread_port = thread_port_cell.take(); - let thread: Thread = thread_port.recv(); + let thread: Thread<()> = thread_port.recv(); thread.join(); };