Skip to content

use tnagler/tpool as work stealing backend #51

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .Rbuildignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ revdep
^\.github$
^CRAN-RELEASE$
.vscode/
new-benchmarks.R
new-benchmarks.R
bench*
2 changes: 2 additions & 0 deletions .github/workflows/R-CMD-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ jobs:
_R_CHECK_CRAN_INCOMING_: false
run: rcmdcheck::rcmdcheck(args = c("--no-manual", "--as-cran"), error_on = "warning", check_dir = "check")
shell: Rscript {0}
# run: cd .. && R CMD build RcppThread && R CMD check RcppThread_1.1.0.tar.gz
# shell: bash

- name: Show testthat output
if: always()
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ inst/doc
revdep
__pycache__
.vscode/
new-benchmarks.R
new-benchmarks.R
bench*
2 changes: 1 addition & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Add classes `ProgressCounter` and `ProgressBar` for tracking progress in long-
running loops (#49).

* Increased speed for short running tasks due to lock-free queue (#50).
* Increased speed due to lock-free queue (#51).


# RcppThread 1.0.0
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ or the [API documentation](https://tnagler.github.io/RcppThread/).

Since then, the following features have been added:

- Faster runtimes thanks to a work stealing queue with lock-free pops (from [tpool](https://github.com/tnagler/tpool)).

- An R function `RcppThread::detectCores()` to determine the number of (logical)
cores on your machine.

Expand Down
2 changes: 1 addition & 1 deletion inst/include/RcppThread/Progress.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class ProgressBar : public ProgressPrinter {
std::string makeBar(size_t pct, size_t numBars = 40) {
std::ostringstream msg;
msg << "[";
int i = 0;
size_t i = 0;
for (; i < pct / 100.0 * numBars; i++)
msg << "=";
for (; i < numBars; i++)
Expand Down
200 changes: 40 additions & 160 deletions inst/include/RcppThread/ThreadPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "RcppThread/Batch.hpp"
#include "RcppThread/RMonitor.hpp"
#include "RcppThread/Rcout.hpp"
#include "moodyCamel/blockingconcurrentqueue.h"
#include "RcppThread/tpool.hpp"

#include <atomic>
#include <condition_variable>
Expand All @@ -22,8 +22,6 @@

namespace RcppThread {

using RcppThreadJob = std::function<void()>;

//! Implementation of the thread pool pattern based on `Thread`.
class ThreadPool
{
Expand Down Expand Up @@ -61,30 +59,13 @@ class ThreadPool
void clear();

private:
void startWorker();
void doJob(RcppThreadJob&& job);
void waitForJobs(moodycamel::ConsumerToken& tk);
void processJobs(moodycamel::ConsumerToken& tk);
void announceStop();
void joinWorkers();
void execute(std::function<void()>& task);

bool hasErrored();
bool allJobsDone();
void waitForEvents();
void rethrowExceptions();

moodycamel::BlockingConcurrentQueue<RcppThreadJob> jobs_;

// variables for synchronization between workers
size_t nWorkers_;
// variables for synchronization between workers (destructed last)
tpool::detail::TaskManager taskManager_;
tpool::TodoList todoList_;
std::vector<std::thread> workers_;

alignas(64) std::atomic_size_t numJobs_{ 0 };
std::mutex mDone_;
std::condition_variable cvDone_;

alignas(64) std::atomic_bool stopped_{ false };
std::exception_ptr errorPtr_{ nullptr };
};

//! constructs a thread pool with as many workers as there are cores.
Expand All @@ -96,20 +77,31 @@ inline ThreadPool::ThreadPool()
//! @param nWorkers number of worker threads to create; if `nWorkers = 0`, all
//! work pushed to the pool will be done in the main thread.
inline ThreadPool::ThreadPool(size_t nWorkers)
: nWorkers_(nWorkers)
: taskManager_{ nWorkers }
{
for (size_t w = 0; w != nWorkers_; w++)
this->startWorker();
for (size_t id = 0; id < nWorkers; id++) {
workers_.emplace_back([this, id] {
std::function<void()> task;
while (!taskManager_.stopped()) {
taskManager_.wait_for_jobs(id);
do {
// use inner while to save a few cache misses calling done()
while (taskManager_.try_pop(task, id))
execute(task);
} while (!todoList_.empty());
}
});
}
}

//! destructor joins all threads if possible.
inline ThreadPool::~ThreadPool() noexcept
{
// destructors should never throw
try {
this->announceStop();
taskManager_.stop();
this->joinWorkers();
} catch (...) {
// destructors should never throw
}
}

Expand All @@ -123,14 +115,12 @@ template<class F, class... Args>
void
ThreadPool::push(F&& f, Args&&... args)
{
if (nWorkers_ == 0) {
if (workers_.size() == 0) {
f(args...); // if there are no workers, do the job in the main thread
} else {
if (stopped_.load(std::memory_order_relaxed))
throw std::runtime_error("cannot push to joined thread pool");
numJobs_.fetch_add(1, std::memory_order_release);
auto job = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
jobs_.enqueue(job);
todoList_.add();
taskManager_.push(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
}
}

Expand All @@ -145,11 +135,11 @@ auto
ThreadPool::pushReturn(F&& f, Args&&... args)
-> std::future<decltype(f(args...))>
{
using jobPackage = std::packaged_task<decltype(f(args...))()>;
auto job = std::make_shared<jobPackage>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
this->push([job] { (*job)(); });
return job->get_future();
auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
using pack_t = std::packaged_task<decltype(f(args...))()>;
auto ptr = std::make_shared<pack_t>(std::move(task));
this->push([ptr] { (*ptr)(); });
return ptr->get_future();
}

//! maps a function on a list of items, possibly running tasks in parallel.
Expand Down Expand Up @@ -199,7 +189,7 @@ ThreadPool::parallelFor(ptrdiff_t begin, size_t size, F&& f, size_t nBatches)
for (ptrdiff_t i = b.begin; i < b.end; i++)
f(i);
};
auto batches = createBatches(begin, size, nWorkers_, nBatches);
auto batches = createBatches(begin, size, workers_.size(), nBatches);
auto pushJob = [=] {
for (const auto& batch : batches)
this->push(doBatch, batch);
Expand Down Expand Up @@ -241,112 +231,38 @@ ThreadPool::parallelForEach(I& items, F&& f, size_t nBatches)
inline void
ThreadPool::wait()
{
while (numJobs_.load(std::memory_order_acquire) != 0) {
waitForEvents(); // non-spinning wait for mDone_
if (this->hasErrored()) {
this->announceStop(); // stop thread pool
continue; // wait for currently running jobs
}
if (this->hasErrored() | isInterrupted())
break;
while (!todoList_.empty()) {
todoList_.wait(50);
Rcout << "";
std::this_thread::yield();
checkUserInterrupt();
}

Rcout << "";
this->rethrowExceptions();
}

//! waits for all jobs to finish and joins all threads.
inline void
ThreadPool::join()
{
this->wait();
this->announceStop();
taskManager_.stop();
this->joinWorkers();
}

//! clears the pool from all open jobs.
inline void
ThreadPool::clear()
{
RcppThreadJob job;
while (numJobs_.load(std::memory_order_acquire) != 0) {
numJobs_.fetch_sub(jobs_.try_dequeue(job), std::memory_order_release);
}
}

//! spawns a worker thread waiting for jobs to arrive.
inline void
ThreadPool::startWorker()
{
workers_.emplace_back([this] {
thread_local moodycamel::ConsumerToken tk(jobs_);
while (!stopped_.load(std::memory_order_relaxed)) {
this->waitForJobs(tk);
this->processJobs(tk);
// if all jobs are done, notify potentially waiting threads
if (!numJobs_.load(std::memory_order_acquire))
cvDone_.notify_all();
}
});
}

//! blocking wait for elements in the queue; also processes first job.
inline void
ThreadPool::waitForJobs(moodycamel::ConsumerToken& tk)
{
RcppThreadJob job;
jobs_.wait_dequeue(tk, job);
// popped job needs to be done here
if (!stopped_.load(std::memory_order_relaxed))
this->doJob(std::move(job));
}

//! process jobs until none are left.
inline void
ThreadPool::processJobs(moodycamel::ConsumerToken& tk)
{
while (numJobs_.load(std::memory_order_acquire) != 0) {
// inner loop avoids acquire read above in hot path
while (true) {
if (stopped_.load(std::memory_order_relaxed))
return;
RcppThreadJob job;
if (jobs_.try_dequeue(tk, job))
this->doJob(std::move(job));
else
break;
}
}
taskManager_.clear();
}

//! executes a job safely and decrements the job count.
//! @param job job to be executed.
inline void
ThreadPool::doJob(RcppThreadJob&& job)
ThreadPool::execute(std::function<void()>& task)
{
try {
job();
numJobs_.fetch_sub(1, std::memory_order_release);
task();
todoList_.cross();
} catch (...) {
{
std::lock_guard<std::mutex> lk(mDone_);
errorPtr_ = std::current_exception();
}
cvDone_.notify_all();
}
}

//! signals threads that no more new work is coming.
inline void
ThreadPool::announceStop()
{
stopped_ = true;
// push empty jobs to wake up waiting workers
for (size_t i = 0; i < nWorkers_; i++) {
numJobs_.fetch_add(1, std::memory_order_release);
jobs_.enqueue([] {});
todoList_.clear(std::current_exception());
}
}

Expand All @@ -360,40 +276,4 @@ ThreadPool::joinWorkers()
}
}

//! checks if an error occured.
inline bool
ThreadPool::hasErrored()
{
return static_cast<bool>(errorPtr_);
}

//! check whether all jobs are done
inline bool
ThreadPool::allJobsDone()
{
// acquire might prevent an unnecessary loop
return numJobs_.load(std::memory_order_acquire) == 0;
}

//! checks whether wait() needs to wake up
inline void
ThreadPool::waitForEvents()
{
static auto timeout = std::chrono::milliseconds(50);
auto isWakeUpEvent = [this] {
return this->allJobsDone() | this->hasErrored();
};
std::unique_lock<std::mutex> lk(mDone_);
cvDone_.wait_for(lk, timeout, isWakeUpEvent);
}

//! rethrows exceptions (exceptions from workers are caught and stored; the
//! wait loop only checks, but does not throw for interruptions)
inline void
ThreadPool::rethrowExceptions()
{
checkUserInterrupt();
if (errorPtr_)
std::rethrow_exception(errorPtr_);
}
}
Loading