Skip to content

refactor #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 103 commits into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
103 commits
Select commit Hold shift + click to select a range
c2ddb73
try locks on pop
Nov 14, 2021
ab78b82
new tpool
Nov 14, 2021
cf4ad82
run less tests
Nov 14, 2021
05599f0
fix compiler warnings
Nov 14, 2021
c8817ad
test runs :/
Nov 14, 2021
4e5a546
compiler warning
Nov 14, 2021
b83a8bd
just pushing
Nov 14, 2021
adf288c
try own check
Nov 14, 2021
9eaa372
tryfix
Nov 14, 2021
0da2427
meh
Nov 14, 2021
f1aea65
only thread
Nov 14, 2021
9f99b6a
push10
Nov 14, 2021
400d340
couts
Nov 14, 2021
d8d44ab
stupid
Nov 14, 2021
bb07ea2
more
Nov 14, 2021
5a24b13
enable push
Nov 14, 2021
bd8ea5c
dumdum
Nov 14, 2021
60f8416
now
Nov 14, 2021
557aebb
more
Nov 14, 2021
c15f23a
more
Nov 14, 2021
daac241
nowait
Nov 14, 2021
5b45fa4
no push lock
Nov 14, 2021
cd1e96f
waitk
Nov 14, 2021
97d16ae
msgs
Nov 20, 2021
702a810
stopped check
Nov 20, 2021
f646b96
cleanup
Nov 20, 2021
628cea3
other tests
Nov 20, 2021
aa68a1d
thread localized steals
Nov 20, 2021
057db7e
Rcout on wait
Nov 20, 2021
f1cf42e
no Rcout
Nov 20, 2021
d77cdad
progress
Nov 20, 2021
663fc8a
no pushreturn
Nov 20, 2021
9422cd4
mtx name
Nov 20, 2021
db7ee60
no pushreturn
Nov 20, 2021
3d96b74
wait on destruct
Nov 20, 2021
502f080
more progress
Nov 20, 2021
e34f225
more msgs
Nov 20, 2021
eb10245
time stamps
Nov 20, 2021
f780de4
fix
Nov 20, 2021
c06af80
no pushreturn
Nov 20, 2021
934faf1
away
Nov 20, 2021
719e130
wtf
Nov 20, 2021
f2f80fb
run nothing
Nov 20, 2021
bd3e87d
detach on destruct
Nov 20, 2021
7bb8482
fix
Nov 20, 2021
8d43acd
destruct print
Nov 20, 2021
7a7f6de
more
Nov 20, 2021
42ecefe
no print on wait
Nov 20, 2021
6245f19
time on destruct
Nov 20, 2021
7f7943c
delete print
Nov 20, 2021
82d9fcd
more
Nov 20, 2021
3d1474b
actually use multiple queues
Nov 20, 2021
25fd224
destruction order
Nov 20, 2021
a3a7187
more order
Nov 20, 2021
c86b851
time print
Nov 20, 2021
8a82b77
finish print
Nov 20, 2021
975bfda
fix
Nov 20, 2021
f59e3d2
no globals
Nov 20, 2021
dda6b3f
print too
Nov 20, 2021
b96e30b
clear on stop
Nov 21, 2021
0391bc9
fix
Nov 21, 2021
4251aaa
parfor
Nov 21, 2021
2b35ccc
don't wait on join
Nov 21, 2021
3be8f91
after cv
Nov 21, 2021
38424fb
before cv
Nov 21, 2021
84b5190
ptr to taskmanager
Nov 21, 2021
8b425e4
don't stop on mngr dtor
Nov 21, 2021
f6e2655
don't clear either
Nov 21, 2021
e72779e
reorder
Nov 21, 2021
fea63d3
manually destroy cv
Nov 21, 2021
b1176cb
wrap in pointer
Nov 21, 2021
761556b
learned: windows hangs when destructing static cv
Nov 21, 2021
762b0bc
no global instance in parallel for
Nov 21, 2021
d4d9033
5 test runs by default
Nov 21, 2021
4ba99a0
with print
Nov 21, 2021
eff6866
fix nBatches docs
Nov 21, 2021
1f911cc
get future before push
Nov 21, 2021
181de82
get after join
Nov 21, 2021
be81eea
copy function on push
Nov 21, 2021
6fb994e
check future result
Nov 21, 2021
8e02d1b
leftover
Nov 21, 2021
bcbd29c
prints
Nov 21, 2021
79a53ed
run 5x
Nov 21, 2021
0aade9a
get future?
Nov 21, 2021
9e410fa
get first
Nov 21, 2021
85dd007
no print
Nov 21, 2021
035df2d
run 10
Nov 21, 2021
9a60e43
other OSs
Nov 21, 2021
7fd5c23
fix2
Nov 21, 2021
36946e0
bench pdfs on gitignore
Nov 21, 2021
ac736d3
clean up prints
Nov 21, 2021
81fb243
other sync
Nov 22, 2021
58e48cb
some cleanup
Nov 22, 2021
1fdd37a
finish line -> todo list
Nov 22, 2021
05535a4
more clean up
Nov 22, 2021
7d51b4a
more clean up
Nov 22, 2021
e5f8361
even more
Nov 22, 2021
cbc10d7
update NEWS
Nov 22, 2021
2c4f9ab
more clean up
Nov 22, 2021
17716db
even more
Nov 22, 2021
2c97bb9
clean up unit tests
Nov 22, 2021
88dee98
pull tpool
Nov 22, 2021
bb47a20
restrict to 10 runs
Nov 22, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .Rbuildignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ revdep
^\.github$
^CRAN-RELEASE$
.vscode/
new-benchmarks.R
new-benchmarks.R
bench*
2 changes: 2 additions & 0 deletions .github/workflows/R-CMD-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ jobs:
_R_CHECK_CRAN_INCOMING_: false
run: rcmdcheck::rcmdcheck(args = c("--no-manual", "--as-cran"), error_on = "warning", check_dir = "check")
shell: Rscript {0}
# run: cd .. && R CMD build RcppThread && R CMD check RcppThread_1.1.0.tar.gz
# shell: bash

- name: Show testthat output
if: always()
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ inst/doc
revdep
__pycache__
.vscode/
new-benchmarks.R
new-benchmarks.R
bench*
2 changes: 1 addition & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Add classes `ProgressCounter` and `ProgressBar` for tracking progress in long-
running loops (#49).

* Increased speed for short running tasks due to lock-free queue (#50, #51).
* Increased speed due to lock-free queue (#51).


# RcppThread 1.0.0
Expand Down
2 changes: 1 addition & 1 deletion inst/include/RcppThread/Progress.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class ProgressBar : public ProgressPrinter {
std::string makeBar(size_t pct, size_t numBars = 40) {
std::ostringstream msg;
msg << "[";
int i = 0;
size_t i = 0;
for (; i < pct / 100.0 * numBars; i++)
msg << "=";
for (; i < numBars; i++)
Expand Down
96 changes: 35 additions & 61 deletions inst/include/RcppThread/ThreadPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,6 @@

namespace RcppThread {

namespace util {
void
waitAndSync(tpool::FinishLine& finishLine)
{
finishLine.wait_for(std::chrono::milliseconds(50));
Rcout << "";
checkUserInterrupt();
}
}

//! Implemenation of the thread pool pattern based on `Thread`.
class ThreadPool
{
Expand All @@ -46,13 +36,6 @@ class ThreadPool
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool& operator=(ThreadPool&& other) = delete;

//! @brief returns a reference to the global thread pool instance.
static ThreadPool& globalInstance()
{
static ThreadPool instance_;
return instance_;
}

template<class F, class... Args>
void push(F&& f, Args&&... args);

Expand All @@ -76,21 +59,13 @@ class ThreadPool
void clear();

private:
void startWorker();
void joinWorkers();
void execute(std::function<void()>& task);

template<class Task>
void executeSafely(Task& task);

bool allJobsDone();
void waitForEvents();
void rethrowExceptions();

std::vector<std::thread> workers_;
size_t nWorkers_;
// variables for synchronization between workers
// variables for synchronization between workers (destructed last)
tpool::detail::TaskManager taskManager_;
tpool::FinishLine finishLine_{ 0 };
tpool::TodoList todoList_;
std::vector<std::thread> workers_;
};

//! constructs a thread pool with as many workers as there are cores.
Expand All @@ -102,10 +77,21 @@ inline ThreadPool::ThreadPool()
//! @param nWorkers number of worker threads to create; if `nWorkers = 0`, all
//! work pushed to the pool will be done in the main thread.
inline ThreadPool::ThreadPool(size_t nWorkers)
: nWorkers_(nWorkers)
: taskManager_{ nWorkers }
{
for (size_t w = 0; w < nWorkers_; w++)
this->startWorker();
for (size_t id = 0; id < nWorkers; id++) {
workers_.emplace_back([this, id] {
std::function<void()> task;
while (!taskManager_.stopped()) {
taskManager_.wait_for_jobs(id);
do {
// use inner while to save a few cash misses calling done()
while (taskManager_.try_pop(task, id))
execute(task);
} while (!todoList_.empty());
}
});
}
}

//! destructor joins all threads if possible.
Expand All @@ -129,9 +115,10 @@ template<class F, class... Args>
void
ThreadPool::push(F&& f, Args&&... args)
{
if (nWorkers_ == 0) {
if (workers_.size() == 0) {
f(args...); // if there are no workers, do the job in the main thread
} else {
todoList_.add();
taskManager_.push(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
}
Expand All @@ -148,11 +135,11 @@ auto
ThreadPool::pushReturn(F&& f, Args&&... args)
-> std::future<decltype(f(args...))>
{
using task = std::packaged_task<decltype(f(args...))()>;
auto pack = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
auto taskPtr = std::make_shared<task>(std::move(pack));
taskManager_.push([taskPtr] { (*taskPtr)(); });
return taskPtr->get_future();
auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
using pack_t = std::packaged_task<decltype(f(args...))()>;
auto ptr = std::make_shared<pack_t>(std::move(task));
this->push([ptr] { (*ptr)(); });
return ptr->get_future();
}

//! maps a function on a list of items, possibly running tasks in parallel.
Expand Down Expand Up @@ -202,7 +189,7 @@ ThreadPool::parallelFor(ptrdiff_t begin, size_t size, F&& f, size_t nBatches)
for (ptrdiff_t i = b.begin; i < b.end; i++)
f(i);
};
auto batches = createBatches(begin, size, nWorkers_, nBatches);
auto batches = createBatches(begin, size, workers_.size(), nBatches);
auto pushJob = [=] {
for (const auto& batch : batches)
this->push(doBatch, batch);
Expand Down Expand Up @@ -244,8 +231,12 @@ ThreadPool::parallelForEach(I& items, F&& f, size_t nBatches)
inline void
ThreadPool::wait()
{
while (!taskManager_.empty())
util::waitAndSync(finishLine_);
while (!todoList_.empty()) {
todoList_.wait(50);
Rcout << "";
checkUserInterrupt();
}
Rcout << "";
}

//! waits for all jobs to finish and joins all threads.
Expand All @@ -264,31 +255,14 @@ ThreadPool::clear()
taskManager_.clear();
}

//! spawns a worker thread waiting for jobs to arrive.
inline void
ThreadPool::startWorker()
{
workers_.emplace_back([this] {
std::function<void()> task;
while (!taskManager_.stopped()) {
taskManager_.wait_for_jobs();

finishLine_.start();
while (taskManager_.try_pop(task))
executeSafely(task);
finishLine_.cross();
}
});
}

template<class Task>
inline void
ThreadPool::executeSafely(Task& task)
ThreadPool::execute(std::function<void()>& task)
{
try {
task();
todoList_.cross();
} catch (...) {
finishLine_.abort(std::current_exception());
todoList_.clear(std::current_exception());
}
}

Expand Down
40 changes: 19 additions & 21 deletions inst/include/RcppThread/parallelFor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ namespace RcppThread {
//! @param begin first index of the loop.
//! @param end the loop runs in the range `[begin, end)`.
//! @param f a function (the 'loop body').
//! @param nThreads deprecated; loop is run on global thread pool.
//! @param nThreads the number of threads to use; the default uses the number
//! of cores in the machine; if `nThreads = 0`, all work will be done in the
//! main thread.
//! @param nBatches the number of batches to create; the default (0)
//! triggers a heuristic to automatically determine the number of batches.
//! @details Consider the following code:
Expand All @@ -31,8 +33,9 @@ namespace RcppThread {
//! x[i] = i;
//! });
//! ```
//! The function dispatches to a global thread pool, so it can safely be nested
//! or called multiple times with almost no overhead.
//! The function sets up a `ThreadPool` object to do the scheduling. If you
//! want to run multiple parallel for loops, consider creating a `ThreadPool`
//! yourself and using `ThreadPool::parallelFor()`.
//!
//! **Caution**: if the iterations are not independent from another,
//! the tasks need to be synchronized manually (e.g., using mutexes).
Expand All @@ -49,24 +52,18 @@ parallelFor(int begin,
if (end == begin)
return;

nThreads = std::thread::hardware_concurrency();
auto batches = createBatches(begin, end - begin, nThreads, nBatches);
tpool::FinishLine finishLine{ batches.size() };
auto doBatch = [f, &finishLine](const Batch& b) {
for (ptrdiff_t i = b.begin; i < b.end; i++)
f(i);
finishLine.cross();
};
for (const auto& batch : batches)
ThreadPool::globalInstance().push(doBatch, batch);
util::waitAndSync(finishLine);
ThreadPool pool(nThreads);
pool.parallelFor(begin, end, std::forward<F>(f), nBatches);
pool.join();
}

//! computes a range-based for loop in parallel batches.
//! @param items an object allowing for `items.size()` and whose elements
//! are accessed by the `[]` operator.
//! @param f a function (the 'loop body').
//! @param nThreads deprecated; loop is run on global thread pool.
//! @param nThreads the number of threads to use; the default uses the number
//! of cores in the machine; if `nThreads = 0`, all work will be done in the
//! main thread.
//! @param nBatches the number of batches to create; the default (0)
//! triggers a heuristic to automatically determine the number of batches.
//! @details Consider the following code:
Expand All @@ -82,8 +79,9 @@ parallelFor(int begin,
//! xx *= 2;
//! });
//! ```
//! The function dispatches to a global thread pool, so it can safely be nested
//! or called multiple times with almost no overhead.
//! The function sets up a `ThreadPool` object to do the scheduling. If you
//! want to run multiple parallel for loops, consider creating a `ThreadPool`
//! yourself and using `ThreadPool::parallelForEach()`.
//!
//! **Caution**: if the iterations are not independent from another,
//! the tasks need to be synchronized manually (e.g., using mutexes).
Expand All @@ -95,9 +93,9 @@ parallelForEach(I& items,
size_t nBatches = 0)
{
// loop ranges ranges indicate iterator offset
const auto begin_it = std::begin(items);
const auto end_it = std::end(items);
auto size = std::distance(begin_it, end_it);
parallelFor(0, size, [f, &items, &begin_it](int i) { f(*(begin_it + i)); });
ThreadPool pool(nThreads);
pool.parallelForEach(items, std::forward<F>(f), nBatches);
pool.join();
}

}
Loading