diff --git a/.Rbuildignore b/.Rbuildignore index 6604e4c..5c34272 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -13,4 +13,5 @@ revdep ^\.github$ ^CRAN-RELEASE$ .vscode/ -new-benchmarks.R \ No newline at end of file +new-benchmarks.R +bench* diff --git a/.github/workflows/R-CMD-check.yaml b/.github/workflows/R-CMD-check.yaml index f58ad71..76c6558 100644 --- a/.github/workflows/R-CMD-check.yaml +++ b/.github/workflows/R-CMD-check.yaml @@ -80,6 +80,8 @@ jobs: _R_CHECK_CRAN_INCOMING_: false run: rcmdcheck::rcmdcheck(args = c("--no-manual", "--as-cran"), error_on = "warning", check_dir = "check") shell: Rscript {0} + # run: cd .. && R CMD build RcppThread && R CMD check RcppThread_1.1.0.tar.gz + # shell: bash - name: Show testthat output if: always() diff --git a/.gitignore b/.gitignore index 896c7fb..9ddbf5f 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ inst/doc revdep __pycache__ .vscode/ -new-benchmarks.R \ No newline at end of file +new-benchmarks.R +bench* diff --git a/NEWS.md b/NEWS.md index ea61483..ef190f2 100644 --- a/NEWS.md +++ b/NEWS.md @@ -5,7 +5,7 @@ * Add classes `ProgressCounter` and `ProgressBar` for tracking progress in long- running loops (#49). -* Increased speed for short running tasks due to lock-free queue (#50, #51). +* Increased speed due to lock-free queue (#51). # RcppThread 1.0.0 diff --git a/inst/include/RcppThread/Progress.hpp b/inst/include/RcppThread/Progress.hpp index 1868eb5..9fb353c 100644 --- a/inst/include/RcppThread/Progress.hpp +++ b/inst/include/RcppThread/Progress.hpp @@ -181,7 +181,7 @@ class ProgressBar : public ProgressPrinter { std::string makeBar(size_t pct, size_t numBars = 40) { std::ostringstream msg; msg << "["; - int i = 0; + size_t i = 0; for (; i < pct / 100.0 * numBars; i++) msg << "="; for (; i < numBars; i++) diff --git a/inst/include/RcppThread/ThreadPool.hpp b/inst/include/RcppThread/ThreadPool.hpp index 52f151c..29669ed 100644 --- a/inst/include/RcppThread/ThreadPool.hpp +++ b/inst/include/RcppThread/ThreadPool.hpp @@ -22,16 +22,6 @@ namespace RcppThread { -namespace util { -void -waitAndSync(tpool::FinishLine& finishLine) -{ - finishLine.wait_for(std::chrono::milliseconds(50)); - Rcout << ""; - checkUserInterrupt(); -} -} - //! Implemenation of the thread pool pattern based on `Thread`. class ThreadPool { @@ -46,13 +36,6 @@ class ThreadPool ThreadPool& operator=(const ThreadPool&) = delete; ThreadPool& operator=(ThreadPool&& other) = delete; - //! @brief returns a reference to the global thread pool instance. - static ThreadPool& globalInstance() - { - static ThreadPool instance_; - return instance_; - } - template void push(F&& f, Args&&... args); @@ -76,21 +59,13 @@ class ThreadPool void clear(); private: - void startWorker(); void joinWorkers(); + void execute(std::function& task); - template - void executeSafely(Task& task); - - bool allJobsDone(); - void waitForEvents(); - void rethrowExceptions(); - - std::vector workers_; - size_t nWorkers_; - // variables for synchronization between workers + // variables for synchronization between workers (destructed last) tpool::detail::TaskManager taskManager_; - tpool::FinishLine finishLine_{ 0 }; + tpool::TodoList todoList_; + std::vector workers_; }; //! constructs a thread pool with as many workers as there are cores. @@ -102,10 +77,21 @@ inline ThreadPool::ThreadPool() //! @param nWorkers number of worker threads to create; if `nWorkers = 0`, all //! work pushed to the pool will be done in the main thread. inline ThreadPool::ThreadPool(size_t nWorkers) - : nWorkers_(nWorkers) + : taskManager_{ nWorkers } { - for (size_t w = 0; w < nWorkers_; w++) - this->startWorker(); + for (size_t id = 0; id < nWorkers; id++) { + workers_.emplace_back([this, id] { + std::function task; + while (!taskManager_.stopped()) { + taskManager_.wait_for_jobs(id); + do { + // use inner while to save a few cash misses calling done() + while (taskManager_.try_pop(task, id)) + execute(task); + } while (!todoList_.empty()); + } + }); + } } //! destructor joins all threads if possible. @@ -129,9 +115,10 @@ template void ThreadPool::push(F&& f, Args&&... args) { - if (nWorkers_ == 0) { + if (workers_.size() == 0) { f(args...); // if there are no workers, do the job in the main thread } else { + todoList_.add(); taskManager_.push( std::bind(std::forward(f), std::forward(args)...)); } @@ -148,11 +135,11 @@ auto ThreadPool::pushReturn(F&& f, Args&&... args) -> std::future { - using task = std::packaged_task; - auto pack = std::bind(std::forward(f), std::forward(args)...); - auto taskPtr = std::make_shared(std::move(pack)); - taskManager_.push([taskPtr] { (*taskPtr)(); }); - return taskPtr->get_future(); + auto task = std::bind(std::forward(f), std::forward(args)...); + using pack_t = std::packaged_task; + auto ptr = std::make_shared(std::move(task)); + this->push([ptr] { (*ptr)(); }); + return ptr->get_future(); } //! maps a function on a list of items, possibly running tasks in parallel. @@ -202,7 +189,7 @@ ThreadPool::parallelFor(ptrdiff_t begin, size_t size, F&& f, size_t nBatches) for (ptrdiff_t i = b.begin; i < b.end; i++) f(i); }; - auto batches = createBatches(begin, size, nWorkers_, nBatches); + auto batches = createBatches(begin, size, workers_.size(), nBatches); auto pushJob = [=] { for (const auto& batch : batches) this->push(doBatch, batch); @@ -244,8 +231,12 @@ ThreadPool::parallelForEach(I& items, F&& f, size_t nBatches) inline void ThreadPool::wait() { - while (!taskManager_.empty()) - util::waitAndSync(finishLine_); + while (!todoList_.empty()) { + todoList_.wait(50); + Rcout << ""; + checkUserInterrupt(); + } + Rcout << ""; } //! waits for all jobs to finish and joins all threads. @@ -264,31 +255,14 @@ ThreadPool::clear() taskManager_.clear(); } -//! spawns a worker thread waiting for jobs to arrive. -inline void -ThreadPool::startWorker() -{ - workers_.emplace_back([this] { - std::function task; - while (!taskManager_.stopped()) { - taskManager_.wait_for_jobs(); - - finishLine_.start(); - while (taskManager_.try_pop(task)) - executeSafely(task); - finishLine_.cross(); - } - }); -} - -template inline void -ThreadPool::executeSafely(Task& task) +ThreadPool::execute(std::function& task) { try { task(); + todoList_.cross(); } catch (...) { - finishLine_.abort(std::current_exception()); + todoList_.clear(std::current_exception()); } } diff --git a/inst/include/RcppThread/parallelFor.hpp b/inst/include/RcppThread/parallelFor.hpp index a4baf79..994ccb7 100644 --- a/inst/include/RcppThread/parallelFor.hpp +++ b/inst/include/RcppThread/parallelFor.hpp @@ -15,7 +15,9 @@ namespace RcppThread { //! @param begin first index of the loop. //! @param end the loop runs in the range `[begin, end)`. //! @param f a function (the 'loop body'). -//! @param nThreads deprecated; loop is run on global thread pool. +//! @param nThreads the number of threads to use; the default uses the number +//! of cores in the machine; if `nThreads = 0`, all work will be done in the +//! main thread. //! @param nBatches the number of batches to create; the default (0) //! triggers a heuristic to automatically determine the number of batches. //! @details Consider the following code: @@ -31,8 +33,9 @@ namespace RcppThread { //! x[i] = i; //! }); //! ``` -//! The function dispatches to a global thread pool, so it can safely be nested -//! or called multiple times with almost no overhead. +//! The function sets up a `ThreadPool` object to do the scheduling. If you +//! want to run multiple parallel for loops, consider creating a `ThreadPool` +//! yourself and using `ThreadPool::parallelFor()`. //! //! **Caution**: if the iterations are not independent from another, //! the tasks need to be synchronized manually (e.g., using mutexes). @@ -49,24 +52,18 @@ parallelFor(int begin, if (end == begin) return; - nThreads = std::thread::hardware_concurrency(); - auto batches = createBatches(begin, end - begin, nThreads, nBatches); - tpool::FinishLine finishLine{ batches.size() }; - auto doBatch = [f, &finishLine](const Batch& b) { - for (ptrdiff_t i = b.begin; i < b.end; i++) - f(i); - finishLine.cross(); - }; - for (const auto& batch : batches) - ThreadPool::globalInstance().push(doBatch, batch); - util::waitAndSync(finishLine); + ThreadPool pool(nThreads); + pool.parallelFor(begin, end, std::forward(f), nBatches); + pool.join(); } //! computes a range-based for loop in parallel batches. //! @param items an object allowing for `items.size()` and whose elements //! are accessed by the `[]` operator. //! @param f a function (the 'loop body'). -//! @param nThreads deprecated; loop is run on global thread pool. +//! @param nThreads the number of threads to use; the default uses the number +//! of cores in the machine; if `nThreads = 0`, all work will be done in the +//! main thread. //! @param nBatches the number of batches to create; the default (0) //! triggers a heuristic to automatically determine the number of batches. //! @details Consider the following code: @@ -82,8 +79,9 @@ parallelFor(int begin, //! xx *= 2; //! }); //! ``` -//! The function dispatches to a global thread pool, so it can safely be nested -//! or called multiple times with almost no overhead. +//! The function sets up a `ThreadPool` object to do the scheduling. If you +//! want to run multiple parallel for loops, consider creating a `ThreadPool` +//! yourself and using `ThreadPool::parallelForEach()`. //! //! **Caution**: if the iterations are not independent from another, //! the tasks need to be synchronized manually (e.g., using mutexes). @@ -95,9 +93,9 @@ parallelForEach(I& items, size_t nBatches = 0) { // loop ranges ranges indicate iterator offset - const auto begin_it = std::begin(items); - const auto end_it = std::end(items); - auto size = std::distance(begin_it, end_it); - parallelFor(0, size, [f, &items, &begin_it](int i) { f(*(begin_it + i)); }); + ThreadPool pool(nThreads); + pool.parallelForEach(items, std::forward(f), nBatches); + pool.join(); } + } diff --git a/inst/include/RcppThread/tpool.hpp b/inst/include/RcppThread/tpool.hpp index af04a26..9600b23 100644 --- a/inst/include/RcppThread/tpool.hpp +++ b/inst/include/RcppThread/tpool.hpp @@ -30,70 +30,70 @@ //! tpool namespace namespace tpool { -//! @brief Finish line - a synchronization primitive. -//! -//! Lets some threads wait until others reach a control point. Start a runner -//! with `FinishLine::start()`, and wait for all runners to finish with -//! `FinishLine::wait()`. -class FinishLine +//! @brief Todo list - a synchronization primitive. +//! @details Add a task with `add()`, cross it off with `cross()`, and wait for +//! all tasks to complete with `wait()`. +class TodoList { public: - //! constructs a finish line. - //! @param runners number of initial runners. - FinishLine(size_t runners = 0) noexcept - : runners_(runners) + //! constructs the todo list. + //! @param num_tasks initial number of tasks. + TodoList(size_t num_tasks = 0) noexcept + : num_tasks_(num_tasks) {} - //! adds runners. - //! @param runners adds runners to the race. - void add(size_t runners = 1) noexcept { runners_ = runners_ + runners; } + //! adds tasks to the list. + //! @param num_tasks add that many tasks to the list. + void add(size_t num_tasks = 1) noexcept { num_tasks_.fetch_add(num_tasks); } - //! adds a single runner. - void start() noexcept { ++runners_; } - - //! indicates that a runner has crossed the finish line. - void cross() noexcept + //! crosses tasks from the list. + //! @param num_tasks cross that many tasks to the list. + void cross(size_t num_tasks = 1) { - if (--runners_ <= 0) { + num_tasks_.fetch_sub(num_tasks); + if (num_tasks_ <= 0) { + { + std::lock_guard lk(mtx_); // must lock before signal + } cv_.notify_all(); } } - //! waits for all active runners to cross the finish line. - void wait() noexcept - { - std::unique_lock lk(mtx_); - while ((runners_ > 0) && !exception_ptr_) - cv_.wait(lk); - if (exception_ptr_) - std::rethrow_exception(exception_ptr_); - } + //! checks whether list is empty. + bool empty() const noexcept { return num_tasks_ == 0; } - //! waits for all active runners to cross the finish line. - //! @param duration maximal waiting time (as `std::chrono::duration`). - template - void wait_for(const Duration& duration) noexcept + //! waits for the list to be empty. + //! @param millis if > 0; waiting aborts after waiting that many + //! milliseconds. + void wait(size_t millis = 0) { + std::this_thread::yield(); + auto wake_up = [this] { return (num_tasks_ <= 0) || exception_ptr_; }; std::unique_lock lk(mtx_); - while ((runners_ > 0) && !exception_ptr_) - cv_.wait_for(lk, duration); + if (millis == 0) { + cv_.wait(lk, wake_up); + } else { + cv_.wait_for(lk, std::chrono::milliseconds(millis), wake_up); + } if (exception_ptr_) std::rethrow_exception(exception_ptr_); } - //! aborts the race. + //! clears the list. //! @param eptr (optional) pointer to an active exception to be rethrown by //! a waiting thread; typically retrieved from `std::current_exception()`. - void abort(std::exception_ptr eptr = nullptr) noexcept + void clear(std::exception_ptr eptr = nullptr) noexcept { - std::lock_guard lk(mtx_); - runners_ = 0; - exception_ptr_ = eptr; + { + std::lock_guard lk(mtx_); + num_tasks_ = 0; + exception_ptr_ = eptr; + } cv_.notify_all(); } private: - alignas(64) std::atomic runners_; + alignas(64) std::atomic_int num_tasks_{ 0 }; std::mutex mtx_; std::condition_variable cv_; std::exception_ptr exception_ptr_{ nullptr }; @@ -108,9 +108,10 @@ class RingBuffer { public: explicit RingBuffer(size_t capacity) - : capacity_{ capacity } + : buffer_{ std::unique_ptr(new T[capacity]) } + , capacity_{ capacity } , mask_{ capacity - 1 } - , buffer_{ std::unique_ptr(new T[capacity]) } + { if (capacity_ & (capacity_ - 1)) throw std::runtime_error("capacity must be a power of two"); @@ -118,16 +119,15 @@ class RingBuffer size_t capacity() const { return capacity_; } - void store(size_t i, T&& x) { buffer_[i & mask_] = std::move(x); } + void set_entry(size_t i, T val) { buffer_[i & mask_] = val; } - T load(size_t i) const { return buffer_[i & mask_]; } + T get_entry(size_t i) const { return buffer_[i & mask_]; } - // creates a new ring buffer with pointers to current elements. - RingBuffer* enlarge(size_t bottom, size_t top) const + RingBuffer* enlarged_copy(size_t bottom, size_t top) const { RingBuffer* new_buffer = new RingBuffer{ 2 * capacity_ }; for (size_t i = top; i != bottom; ++i) - new_buffer->store(i, this->load(i)); + new_buffer->set_entry(i, this->get_entry(i)); return new_buffer; } @@ -139,12 +139,12 @@ class RingBuffer // exchange is not available in C++11, use implementatino from // https://en.cppreference.com/w/cpp/utility/exchange -template +template T -exchange(T& obj, U&& new_value) noexcept +exchange(T& obj, T&& new_value) noexcept { T old_value = std::move(obj); - obj = std::forward(new_value); + obj = std::forward(new_value); return old_value; } @@ -154,63 +154,91 @@ class TaskQueue using Task = std::function; public: - //! constructs the que with a given capacity. + //! constructs the queue with a given capacity. //! @param capacity must be a power of two. TaskQueue(size_t capacity = 256) - : buffer_{ new RingBuffer(capacity) } + : buffer_{ new RingBuffer(capacity) } {} - ~TaskQueue() noexcept { delete buffer_.load(); } - TaskQueue(TaskQueue const& other) = delete; - TaskQueue& operator=(TaskQueue const& other) = delete; - - //! queries the size. - size_t size() const + ~TaskQueue() noexcept { - auto b = bottom_.load(m_relaxed); - auto t = top_.load(m_relaxed); - return static_cast(b >= t ? b - t : 0); + // must free memory allocated by push(), but not deallocated by pop() + auto buf_ptr = buffer_.load(); + for (int i = top_; i < bottom_.load(m_relaxed); ++i) + delete buf_ptr->get_entry(i); + delete buf_ptr; } - //! queries the capacity. - size_t capacity() const { return buffer_.load(m_relaxed)->capacity(); } + TaskQueue(TaskQueue const& other) = delete; + TaskQueue& operator=(TaskQueue const& other) = delete; //! checks if queue is empty. - bool empty() const { return (this->size() == 0); } + bool empty() const + { + return (bottom_.load(m_relaxed) <= top_.load(m_relaxed)); + } //! clears the queue. void clear() { + std::lock_guard lk(mutex_); // prevents concurrent push + auto buf_ptr = buffer_.load(); auto b = bottom_.load(m_relaxed); - top_.store(b, m_release); + int t; + while (true) { + // try until we can set top = bottom; this might fail if someone + // pops concurrently + t = top_.load(m_relaxed); + if (top_.compare_exchange_weak(t, b, m_release, m_relaxed)) + break; + } + for (int i = t; i < b; ++i) + delete buf_ptr->get_entry(i); // free memory for unpopped items } //! pushes a task to the bottom of the queue; returns false if queue is //! currently locked; enlarges the queue if full. bool try_push(Task&& task) { - // must hold lock in case there are multiple producers, abort if already - // taken, so we can check out next queue - std::unique_lock lk(mutex_, std::try_to_lock); - if (!lk) - return false; + { + // must hold lock in case of multiple producers, abort if already + // taken, so we can check out next queue + std::unique_lock lk(mutex_, std::try_to_lock); + if (!lk) + return false; + this->push_unsafe(std::forward(task)); + } + cv_.notify_one(); + return true; + } + + //! pushes a task to the bottom of the queue; enlarges the queue if full. + void force_push(Task&& task) + { + { + // must hold lock in case of multiple producers + std::lock_guard lk(mutex_); + this->push_unsafe(std::forward(task)); + } + cv_.notify_one(); + } + //! pushes a task without locking the queue (enough for single producer) + void push_unsafe(Task&& task) + { auto b = bottom_.load(m_relaxed); auto t = top_.load(m_acquire); - RingBuffer* buf_ptr = buffer_.load(m_relaxed); + RingBuffer* buf_ptr = buffer_.load(m_relaxed); - if (buf_ptr->capacity() < (b - t) + 1) { + if (static_cast(buf_ptr->capacity()) < (b - t) + 1) { + // buffer is full, create enlarged copy before continuing old_buffers_.emplace_back( - exchange(buf_ptr, buf_ptr->enlarge(b, t))); + exchange(buf_ptr, buf_ptr->enlarged_copy(b, t))); buffer_.store(buf_ptr, m_relaxed); } - buf_ptr->store(b, std::move(task)); - - std::atomic_thread_fence(m_release); - bottom_.store(b + 1, m_relaxed); - - return true; + buf_ptr->set_entry(b, new Task{ std::forward(task) }); + bottom_.store(b + 1, m_release); } //! pops a task from the top of the queue; returns false if lost race. @@ -221,69 +249,81 @@ class TaskQueue auto b = bottom_.load(m_acquire); if (t < b) { - // must load task pointer before acquiring the slot - task = buffer_.load(m_consume)->load(t); + // must load task pointer before acquiring the slot, because it + // could be overwritten immediately after + auto task_ptr = buffer_.load(m_acquire)->get_entry(t); + if (top_.compare_exchange_strong(t, t + 1, m_seq_cst, m_relaxed)) { - return true; // won race + task = std::move(*task_ptr); // won race, get task + delete task_ptr; // fre memory allocated in push_unsafe() + return true; } } return false; // queue is empty or lost race } + void wait() + { + std::unique_lock lk(mutex_); + cv_.wait(lk, [this] { return !this->empty() || stopped_; }); + } + + void stop() + { + { + std::lock_guard lk(mutex_); + stopped_ = true; + } + cv_.notify_all(); + } + private: - alignas(64) std::atomic_ptrdiff_t top_{ 0 }; - alignas(64) std::atomic_ptrdiff_t bottom_{ 0 }; - alignas(64) std::atomic*> buffer_{ nullptr }; - std::vector>> old_buffers_; + alignas(64) std::atomic_int top_{ 0 }; + alignas(64) std::atomic_int bottom_{ 0 }; + alignas(64) std::atomic*> buffer_{ nullptr }; + std::vector>> old_buffers_; std::mutex mutex_; + std::condition_variable cv_; + std::atomic stopped_; // convenience aliases static constexpr std::memory_order m_relaxed = std::memory_order_relaxed; static constexpr std::memory_order m_acquire = std::memory_order_acquire; static constexpr std::memory_order m_release = std::memory_order_release; static constexpr std::memory_order m_seq_cst = std::memory_order_seq_cst; - static constexpr std::memory_order m_consume = std::memory_order_consume; }; //! Task manager based on work stealing struct TaskManager { std::vector queues_; - std::mutex m_; - std::condition_variable cv_; - std::atomic_bool stopped_{ false }; - alignas(64) std::atomic_size_t push_idx_; - alignas(64) std::atomic_size_t pop_idx_; size_t num_queues_; + alignas(64) std::atomic_size_t push_idx_{ 0 }; + std::atomic_bool stopped_{ false }; + std::atomic_size_t todo_list_{ 0 }; - TaskManager(size_t num_queues = 1) - : num_queues_{ num_queues } - , queues_{ std::vector(num_queues) } + explicit TaskManager(size_t num_queues) + : queues_{ std::vector(num_queues) } + , num_queues_{ num_queues } {} template void push(Task&& task) { - while (!queues_[push_idx_++ % num_queues_].try_push(task)) - continue; - cv_.notify_one(); - } - - bool empty() - { - for (auto& q : queues_) { - if (!q.empty()) - return false; + for (size_t count = 0; count < num_queues_ * 20; count++) { + if (queues_[push_idx_++ % num_queues_].try_push(task)) + return; } - return true; + queues_[push_idx_++ % num_queues_].force_push(task); } - bool try_pop(std::function& task) + template + bool try_pop(Task& task, size_t worker_id = 0) { - do { - if (queues_[pop_idx_++ % num_queues_].try_pop(task)) + for (size_t k = 0; k <= num_queues_; k++) { + if (queues_[(worker_id + k) % num_queues_].try_pop(task)) return true; - } while (!this->empty()); + } return false; } @@ -293,208 +333,18 @@ struct TaskManager q.clear(); } - bool stopped() { return stopped_; } - - void wait_for_jobs() - { - std::unique_lock lk(m_); - cv_.wait(lk, [this] { return !this->empty() || stopped_; }); - } + void wait_for_jobs(size_t id) { queues_[id].wait(); } void stop() { - { - std::lock_guard lk(m_); - stopped_ = true; - } - cv_.notify_all(); - this->clear(); - } -}; - -} // end namespace detail - -//! A work stealing thread pool. -class ThreadPool -{ - public: - //! constructs a thread pool. - //! @param n_workers number of worker threads to create; defaults to number - //! of available (virtual) hardware cores. - explicit ThreadPool( - size_t num_threads = std::thread::hardware_concurrency()); - - ThreadPool(ThreadPool&&) = delete; - ThreadPool(const ThreadPool&) = delete; - ThreadPool& operator=(const ThreadPool&) = delete; - ThreadPool& operator=(ThreadPool&& other) = delete; - ~ThreadPool() noexcept; - - //! @brief returns a reference to the global thread pool instance. - static ThreadPool& global_instance() - { - static ThreadPool instance_; - return instance_; + for (auto& q : queues_) + q.stop(); + stopped_ = true; } - //! @brief pushes a job to the thread pool. - //! @param f a function. - //! @param args (optional) arguments passed to `f`. - template - void push(Function&& f, Args&&... args); - - //! @brief executes a job asynchronously the global thread pool. - //! @param f a function. - //! @param args (optional) arguments passed to `f`. - //! @return A `std::future` for the task. Call `future.get()` to retrieve - //! the results at a later point in time (blocking). - template - auto async(Function&& f, Args&&... args) - -> std::future; - - //! @brief waits for all jobs currently running on the global thread pool. - void wait(); - //! @brief clears all jobs currently running on the global thread pool. - void clear(); - - private: - void start_worker(); - void join_workers(); - template - void execute_safely(Task& task); - - std::vector workers_; - detail::TaskManager task_manager_; - size_t n_workers_; - std::exception_ptr error_ptr_{ nullptr }; - FinishLine finish_line_{ 0 }; + bool stopped() { return stopped_; } }; -inline ThreadPool::ThreadPool(size_t n_workers) - : n_workers_{ n_workers } -{ - for (size_t id = 0; id < n_workers; ++id) - this->start_worker(); -} - -inline ThreadPool::~ThreadPool() noexcept -{ - try { - task_manager_.stop(); - this->join_workers(); - } catch (...) { - // destructors should never throw - } -} - -template -void -ThreadPool::push(Function&& f, Args&&... args) -{ - task_manager_.push( - std::bind(std::forward(f), std::forward(args)...)); -} - -template -auto -ThreadPool::async(Function&& f, Args&&... args) - -> std::future -{ - using task = std::packaged_task; - auto pack = - std::bind(std::forward(f), std::forward(args)...); - auto task_ptr = std::make_shared(std::move(pack)); - task_manager_.push([task_ptr] { (*task_ptr)(); }); - return task_ptr->get_future(); -} - -void -ThreadPool::wait() -{ - while (!task_manager_.empty()) - finish_line_.wait(); -} - -void -ThreadPool::clear() -{ - task_manager_.clear(); -} - -inline void -ThreadPool::start_worker() -{ - workers_.emplace_back([this] { - std::function task; - while (!task_manager_.stopped()) { - task_manager_.wait_for_jobs(); - - finish_line_.start(); - while (task_manager_.try_pop(task)) - execute_safely(task); - finish_line_.cross(); - } - }); -} - -template -inline void -ThreadPool::execute_safely(Task& task) -{ - try { - task(); - } catch (...) { - finish_line_.abort(std::current_exception()); - } -} - -inline void -ThreadPool::join_workers() -{ - for (auto& worker : workers_) { - if (worker.joinable()) - worker.join(); - } -} - -//! Direct access to the global thread pool ------------------------------------ - -//! @brief push a job to the global thread pool. -//! @param f a function. -//! @param args (optional) arguments passed to `f`. -template -void -push(Function&& f, Args&&... args) -{ - ThreadPool::global_instance().push(std::forward(f), - std::forward(args)...); -} - -//! @brief executes a job asynchronously the global thread pool. -//! @param f a function. -//! @param args (optional) arguments passed to `f`. -//! @return A `std::future` for the task. Call `future.get()` to retrieve the -//! results at a later point in time (blocking). -template -auto -async(Function&& f, Args&&... args) -> std::future -{ - return ThreadPool::global_instance().async(std::forward(f), - std::forward(args)...); -} - -//! @brief waits for all jobs currently running on the global thread pool. -void -wait() -{ - ThreadPool::global_instance().wait(); -} - -//! @brief clears all jobs currently running on the global thread pool. -void -clear() -{ - ThreadPool::global_instance().clear(); -} +} // end namespace detail -} \ No newline at end of file +} // end namespace tpool diff --git a/tests/tests.cpp b/tests/tests.cpp index 68a3818..9e78b3a 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -11,15 +11,18 @@ using namespace RcppThread; // [[Rcpp::export]] -void testMonitor() +void +testMonitor() { - auto checks = [] () -> void { - checkUserInterrupt(); // should have no effect since not main - Rcout << "RcppThread says hi!" << std::endl; // should print to R console + auto checks = []() -> void { + checkUserInterrupt(); // should have no effect since not main + Rcout << "RcppThread says hi!" + << std::endl; // should print to R console if (isInterrupted()) - throw std::runtime_error("isInterrupted should not return 'true'"); + Rcout << "isInterrupted should not return 'true'" << std::endl; if (isInterrupted(false)) - throw std::runtime_error("isInterrupted checks despite condition is 'false'"); + Rcout << "isInterrupted checks despite condition is 'false'" + << std::endl; }; std::thread t = std::thread(checks); @@ -28,12 +31,13 @@ void testMonitor() } // [[Rcpp::export]] -void testThreadClass() +void +testThreadClass() { // check if all methods work std::atomic printID; printID = 1; - auto dummy = [&] () -> void { + auto dummy = [&]() -> void { checkUserInterrupt(); Rcout << printID++; std::this_thread::sleep_for(std::chrono::milliseconds(200)); @@ -43,7 +47,7 @@ void testThreadClass() std::this_thread::sleep_for(std::chrono::milliseconds(200)); t0.detach(); if (t0.joinable()) - throw std::runtime_error("thread wasn't detached"); + Rcout << "thread wasn't detached" << std::endl; Thread t1 = Thread(dummy); Thread t2 = Thread(dummy); t1.swap(t2); @@ -53,114 +57,111 @@ void testThreadClass() } // [[Rcpp::export]] -void testThreadPoolPush() +void +testThreadPoolPush() { - ThreadPool pool(2); - std::vector x(1000000, 1); - auto dummy = [&] (size_t i) -> void { + ThreadPool pool; + std::vector x(100000, 1); + auto dummy = [&](size_t i) -> void { checkUserInterrupt(); x[i] = 2 * x[i]; }; - for (size_t i = 0; i < x.size() / 2; i++) + for (size_t i = 0; i < x.size(); i++) pool.push(dummy, i); - pool.join(); + pool.wait(); size_t count_wrong = 0; - for (size_t i = 0; i < x.size() / 2; i++) + for (size_t i = 0; i < x.size(); i++) count_wrong += (x[i] != 2); - for (size_t i = x.size() / 2 + 1; i < x.size(); i++) - count_wrong += (x[i] != 1); if (count_wrong > 0) - throw std::runtime_error("push gives wrong result"); + Rcout << "push gives wrong result" << std::endl; } // [[Rcpp::export]] -void testThreadPoolPushReturn() +void +testThreadPoolPushReturn() { - ThreadPool pool(2); - std::vector x(1000000, 1); - auto dummy = [&] (size_t i) { + ThreadPool pool; + std::vector x(100000, 1); + auto dummy = [&x](size_t i) -> size_t { checkUserInterrupt(); - return 2 * x[i]; + x[i] = 2 * x[i]; + return x[i]; }; std::vector> fut(x.size()); - for (size_t i = 0; i < x.size() / 2; i++) + for (size_t i = 0; i < x.size(); i++) fut[i] = pool.pushReturn(dummy, i); - for (size_t i = 0; i < x.size() / 2; i++) - x[i] = fut[i].get(); + for (size_t i = 0; i < x.size(); i++) + fut[i].get(); pool.join(); size_t count_wrong = 0; - for (size_t i = 0; i < x.size() / 2; i++) + for (size_t i = 0; i < x.size(); i++) count_wrong += (x[i] != 2); - for (size_t i = x.size() / 2 + 1; i < x.size(); i++) - count_wrong += (x[i] != 1); if (count_wrong > 0) - throw std::runtime_error("push gives wrong result"); + Rcout << "push gives wrong result" << std::endl; } // [[Rcpp::export]] -void testThreadPoolMap() +void +testThreadPoolMap() { - ThreadPool pool(2); + ThreadPool pool; std::vector x(1000000, 1); - auto dummy = [&] (size_t i) -> void { + auto dummy = [&](size_t i) -> void { checkUserInterrupt(); x[i] = 2 * x[i]; }; - auto ids = std::vector(x.size() / 2); + auto ids = std::vector(x.size()); for (size_t i = 0; i < ids.size(); i++) ids[i] = i; pool.map(dummy, ids); pool.join(); size_t count_wrong = 0; - for (size_t i = 0; i < x.size() / 2; i++) + for (size_t i = 0; i < x.size(); i++) count_wrong += (x[i] != 2); - for (size_t i = x.size() / 2 + 1; i < x.size(); i++) - count_wrong += (x[i] != 1); if (count_wrong > 0) - throw std::runtime_error("map gives wrong result"); + Rcout << "map gives wrong result" << std::endl; + ; } // [[Rcpp::export]] -void testThreadPoolParallelFor() +void +testThreadPoolParallelFor() { - ThreadPool pool(2); + ThreadPool pool; std::vector x(1000000, 1); - auto dummy = [&] (size_t i) -> void { + auto dummy = [&](size_t i) -> void { checkUserInterrupt(); x[i] = 2 * x[i]; }; - pool.parallelFor(0, x.size() / 2, dummy, 1); + pool.parallelFor(0, x.size(), dummy, 1); pool.join(); size_t count_wrong = 0; - for (size_t i = 0; i < x.size() / 2; i++) + for (size_t i = 0; i < x.size(); i++) count_wrong += (x[i] != 2); - for (size_t i = x.size() / 2 + 1; i < x.size(); i++) - count_wrong += (x[i] != 1); if (count_wrong > 0) - throw std::runtime_error("parallelFor gives wrong result"); + Rcout << "parallelFor gives wrong result" << std::endl; } // [[Rcpp::export]] -void testThreadPoolNestedParallelFor() +void +testThreadPoolNestedParallelFor() { - ThreadPool pool(2); + ThreadPool pool; std::vector> x(100); - for (auto &xx : x) + for (auto& xx : x) xx = std::vector(100, 1.0); - pool.parallelFor(0, x.size(), [&] (int i) { - pool.parallelFor(0, x[i].size(), [&x, i] (int j) { - x[i][j] *= 2; - }); + pool.parallelFor(0, x.size(), [&](int i) { + pool.parallelFor(0, x[i].size(), [&x, i](int j) { x[i][j] *= 2; }); }); pool.wait(); @@ -170,53 +171,46 @@ void testThreadPoolNestedParallelFor() count_wrong += xxx != 2; } if (count_wrong > 0) { - for (auto xx : x) { - for (auto xxx : xx) - std::cout << xxx; - std::cout << std::endl; - } - throw std::runtime_error("nested parallelFor gives wrong result"); + Rcout << "nested parallelFor gives wrong result" << std::endl; } } // [[Rcpp::export]] -void testThreadPoolParallelForEach() +void +testThreadPoolParallelForEach() { - ThreadPool pool(2); + ThreadPool pool; std::vector x(1000000, 1); - auto dummy = [&] (size_t i) -> void { + auto dummy = [&](size_t i) -> void { checkUserInterrupt(); x[i] = 2 * x[i]; }; - auto ids = std::vector(x.size() / 2); + auto ids = std::vector(x.size()); for (size_t i = 0; i < ids.size(); i++) ids[i] = i; pool.parallelForEach(ids, dummy); pool.join(); size_t count_wrong = 0; - for (size_t i = 0; i < x.size() / 2; i++) + for (size_t i = 0; i < x.size(); i++) count_wrong += (x[i] != 2); - for (size_t i = x.size() / 2 + 1; i < x.size(); i++) - count_wrong += (x[i] != 1); if (count_wrong > 0) - throw std::runtime_error("parallelForEach gives wrong result"); + Rcout << "parallelForEach gives wrong result" << std::endl; } // [[Rcpp::export]] -void testThreadPoolNestedParallelForEach() +void +testThreadPoolNestedParallelForEach() { - ThreadPool pool(2); + ThreadPool pool; std::vector> x(100); - for (auto &xx : x) + for (auto& xx : x) xx = std::vector(100, 1.0); - pool.parallelForEach(x, [&pool] (std::vector& xx) { - pool.parallelForEach(xx, [] (double& xxx) { - xxx *= 2; - }); + pool.parallelForEach(x, [&pool](std::vector& xx) { + pool.parallelForEach(xx, [](double& xxx) { xxx *= 2; }); }); pool.wait(); @@ -226,79 +220,76 @@ void testThreadPoolNestedParallelForEach() count_wrong += xxx != 2; } if (count_wrong > 0) { - for (auto xx : x) { - for (auto xxx : xx) - std::cout << xxx; - std::cout << std::endl; - } - throw std::runtime_error("nested parallelFor gives wrong result"); + Rcout << "nested parallelFor gives wrong result" << std::endl; } } // [[Rcpp::export]] -void testThreadPoolSingleThreaded() +void +testThreadPoolSingleThreaded() { ThreadPool pool(0); std::vector x(1000000, 1); - auto dummy = [&] (size_t i) -> void { + auto dummy = [&](size_t i) -> void { checkUserInterrupt(); x[i] = 2 * x[i]; }; - for (size_t i = 0; i < x.size() / 2; i++) + for (size_t i = 0; i < x.size(); i++) pool.push(dummy, i); pool.wait(); size_t count_wrong = 0; - for (size_t i = 0; i < x.size() / 2; i++) + for (size_t i = 0; i < x.size(); i++) count_wrong += (x[i] != 2); - for (size_t i = x.size() / 2 + 1; i < x.size(); i++) - count_wrong += (x[i] != 1); if (count_wrong > 0) - throw std::runtime_error("push gives wrong result"); + Rcout << "push gives wrong result" << std::endl; pool.join(); } // [[Rcpp::export]] -void testThreadPoolDestructWOJoin() +void +testThreadPoolDestructWOJoin() { - ThreadPool pool(2); + ThreadPool pool; } - // [[Rcpp::export]] -void testParallelFor() +void +testParallelFor() { std::vector x(1000000, 1); - auto dummy = [&] (size_t i) -> void { + auto dummy = [&](size_t i) -> void { checkUserInterrupt(); x[i] = 2 * x[i]; }; - parallelFor(0, x.size() / 2, dummy, 2); - parallelFor(0, x.size() / 2, dummy, 0); + parallelFor(0, x.size(), dummy, 2); + parallelFor(0, x.size(), dummy, 0); size_t count_wrong = 0; - for (size_t i = 0; i < x.size() / 2; i++) + for (size_t i = 0; i < x.size(); i++) count_wrong += (x[i] != 4); - for (size_t i = x.size() / 2 + 1; i < x.size(); i++) - count_wrong += (x[i] != 1); if (count_wrong > 0) - throw std::runtime_error("parallelFor gives wrong result"); + Rcout << "parallelFor gives wrong result" << std::endl; } // [[Rcpp::export]] -void testNestedParallelFor() +void +testNestedParallelFor() { std::vector> x(1); - for (auto &xx : x) + for (auto& xx : x) xx = std::vector(1, 1.0); - parallelFor(0, x.size(), [&x] (int i) { - parallelFor(0, x[i].size(), [&x, i] (int j) { - x[i][j] *= 2; - }, 1); - }, 1); + parallelFor( + 0, + x.size(), + [&x](int i) { + parallelFor( + 0, x[i].size(), [&x, i](int j) { x[i][j] *= 2; }, 1); + }, + 1); size_t count_wrong = 0; for (auto xx : x) { @@ -306,51 +297,48 @@ void testNestedParallelFor() count_wrong += xxx != 2; } if (count_wrong > 0) { - for (auto xx : x) { - for (auto xxx : xx) - std::cout << xxx; - std::cout << std::endl; - } - throw std::runtime_error("nested parallelFor gives wrong result"); + Rcout << "nested parallelFor gives wrong result" << std::endl; } } // [[Rcpp::export]] -void testParallelForEach() +void +testParallelForEach() { std::vector x(1000000, 1); - auto dummy = [&] (size_t i) -> void { + auto dummy = [&](size_t i) -> void { checkUserInterrupt(); x[i] = 2 * x[i]; }; - auto ids = std::vector(x.size() / 2); + auto ids = std::vector(x.size()); for (size_t i = 0; i < ids.size(); i++) ids[i] = i; parallelForEach(ids, dummy, 2); parallelForEach(ids, dummy, 0); size_t count_wrong = 0; - for (size_t i = 0; i < x.size() / 2; i++) + for (size_t i = 0; i < x.size(); i++) count_wrong += (x[i] != 4); - for (size_t i = x.size() / 2 + 1; i < x.size(); i++) - count_wrong += (x[i] != 1); if (count_wrong > 0) - throw std::runtime_error("forEach gives wrong result"); + Rcout << "forEach gives wrong result" << std::endl; } // [[Rcpp::export]] -void testNestedParallelForEach() +void +testNestedParallelForEach() { std::vector> x(1); - for (auto &xx : x) + for (auto& xx : x) xx = std::vector(1, 1.0); - parallelForEach(x, [&] (std::vector& xx) { - parallelForEach(xx, [&] (double& xxx) { - xxx *= 2; - }, 1); - }, 1); + parallelForEach( + x, + [&](std::vector& xx) { + parallelForEach( + xx, [&](double& xxx) { xxx *= 2; }, 1); + }, + 1); size_t count_wrong = 0; for (auto xx : x) { @@ -358,8 +346,7 @@ void testNestedParallelForEach() count_wrong += xxx != 2; } if (count_wrong > 0) - throw std::runtime_error("nested parallelForEach gives wrong result"); - + Rcout << "nested parallelForEach gives wrong result" << std::endl; } // // [[Rcpp::export]] @@ -373,53 +360,55 @@ void testNestedParallelForEach() // t.join(); // std::this_thread::sleep_for(std::chrono::milliseconds(5000)); // } -// -// // [[Rcpp::export]] -// void testThreadPoolInterruptJoin() -// { -// ThreadPool pool(2); -// auto dummy = [] { -// std::this_thread::sleep_for(std::chrono::milliseconds(100)); -// checkUserInterrupt(); -// }; -// for (size_t i = 0; i < 20; i++) -// pool.push(dummy); -// pool.join(); -// std::this_thread::sleep_for(std::chrono::milliseconds(5000)); -// } -// -// // [[Rcpp::export]] -// void testThreadPoolInterruptWait() -// { -// ThreadPool pool(2); -// auto dummy = [] { -// std::this_thread::sleep_for(std::chrono::milliseconds(100)); -// checkUserInterrupt(); -// }; -// for (size_t i = 0; i < 20; i++) { -// pool.push(dummy); -// } -// pool.wait(); -// std::this_thread::sleep_for(std::chrono::milliseconds(3000)); -// } +// [[Rcpp::export]] +void testThreadPoolInterruptJoin() +{ + ThreadPool pool; + auto dummy = [] { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + checkUserInterrupt(); + }; + for (size_t i = 0; i < 20; i++) + pool.push(dummy); + pool.join(); + std::this_thread::sleep_for(std::chrono::milliseconds(5000)); +} + +// [[Rcpp::export]] +void testThreadPoolInterruptWait() +{ + ThreadPool pool; + auto dummy = [] { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + checkUserInterrupt(); + }; + for (size_t i = 0; i < 20; i++) { + pool.push(dummy); + } + pool.wait(); + std::this_thread::sleep_for(std::chrono::milliseconds(3000)); +} // [[Rcpp::export]] -void testProgressCounter() +void +testProgressCounter() { + // 20 iterations in loop, update progress every 1 sec RcppThread::ProgressCounter cntr(20, 1); - RcppThread::parallelFor(0, 20, [&] (int i) { + RcppThread::parallelFor(0, 20, [&](int i) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); cntr++; }); } // [[Rcpp::export]] -void testProgressBar() +void +testProgressBar() { // 20 iterations in loop, update progress every 1 sec RcppThread::ProgressBar bar(20, 1); - RcppThread::parallelFor(0, 20, [&] (int i) { + RcppThread::parallelFor(0, 20, [&](int i) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); ++bar; }); diff --git a/tests/testthat/tests.R b/tests/testthat/tests.R index e82b87a..8fc8f2a 100644 --- a/tests/testthat/tests.R +++ b/tests/testthat/tests.R @@ -2,84 +2,93 @@ context("Compile test functions") Rcpp::sourceCpp(file = normalizePath("../tests.cpp")) -## ------------------------------------------------------- -context("R-monitor") -test_that("R-monitor works", { - expect_output(testMonitor(), "RcppThread says hi!", fixed = TRUE) -}) +runs <- 10 +for (run in seq_len(runs)) { + context(paste0("---------------------------- run ", run, "/", runs)) + test_that("start", expect_true(TRUE)) + ## ------------------------------------------------------- + context("R-monitor") + test_that("R-monitor works", { + expect_output(testMonitor(), "RcppThread says hi!", fixed = TRUE) + }) -## ------------------------------------------------------- -context("Thread class") -test_that("Thread class works", { - expect_output(testThreadClass()) -}) + ## ------------------------------------------------------- + context("Thread class") + test_that("Thread class works", { + expect_output(testThreadClass()) + }) -## ------------------------------------------------------- -context("Thread pool") -test_that("push works", { - expect_silent(testThreadPoolPush()) -}) -test_that("pushReturn works", { - expect_silent(testThreadPoolPushReturn()) -}) + ## ------------------------------------------------------- + context("Thread pool") -test_that("map works", { - expect_silent(testThreadPoolMap()) -}) + test_that("push works", { + testThreadPoolPush() + expect_silent(testThreadPoolPush()) + }) -test_that("parallelFor works", { - expect_silent(testThreadPoolParallelFor()) -}) + test_that("pushReturn works", { + expect_silent(testThreadPoolPushReturn()) + }) -test_that("nested parallelFor works", { - expect_silent(testThreadPoolNestedParallelFor()) -}) + test_that("map works", { + expect_silent(testThreadPoolMap()) + }) -test_that("parallelForEach works", { - expect_silent(testThreadPoolParallelForEach()) -}) + test_that("parallelFor works", { + expect_silent(testThreadPoolParallelFor()) + }) -test_that("nested parallelForEach works", { - expect_silent(testThreadPoolNestedParallelForEach()) -}) + test_that("nested parallelFor works", { + expect_silent(testThreadPoolNestedParallelFor()) + }) -test_that("works single threaded", { - expect_silent(testThreadPoolSingleThreaded()) -}) + test_that("parallelForEach works", { + expect_silent(testThreadPoolParallelForEach()) + }) -test_that("destructible without join", { - expect_silent(testThreadPoolDestructWOJoin()) -}) + test_that("nested parallelForEach works", { + expect_silent(testThreadPoolNestedParallelForEach()) + }) + test_that("works single threaded", { + expect_silent(testThreadPoolSingleThreaded()) + }) -## ------------------------------------------------------- -context("Parallel for functions") -test_that("parallelFor works", { - expect_silent(testParallelFor()) -}) + test_that("destructible without join", { + expect_silent(testThreadPoolDestructWOJoin()) + }) + + + ## ------------------------------------------------------- + context("Parallel for functions") + + test_that("parallelFor works", { + expect_silent(testParallelFor()) + }) -test_that("nested parallelFor works", { - expect_silent(testNestedParallelFor()) -}) + test_that("nested parallelFor works", { + expect_silent(testNestedParallelFor()) + }) -test_that("parallelForEach works", { - expect_silent(testParallelForEach()) -}) + test_that("parallelForEach works", { + expect_silent(testParallelForEach()) + }) -test_that("nested parallelForEach works", { - expect_silent(testNestedParallelForEach()) -}) + test_that("nested parallelForEach works", { + expect_silent(testNestedParallelForEach()) + }) + # ------------------------------------------------------ + context("Progress tracking") + test_that("ProgressCounter works", { + expect_output(testProgressCounter(), "100% \\(done\\)") + }) -## ------------------------------------------------------ -context("Progress tracking") -test_that("ProgressCounter works", { - expect_output(testProgressCounter(), "100% \\(done\\)") -}) + test_that("ProgressBar works", { + expect_output(testProgressBar(), "100% \\(done\\)") + }) -test_that("ProgressBar works", { - expect_output(testProgressBar(),"100% \\(done\\)") -}) +}