//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
86b3f41edSDimitry Andric
96b3f41edSDimitry Andric #include "llvm/Support/Parallel.h"
106b3f41edSDimitry Andric #include "llvm/Config/llvm-config.h"
11706b4fc4SDimitry Andric #include "llvm/Support/ManagedStatic.h"
12044eb2f6SDimitry Andric #include "llvm/Support/Threading.h"
136b3f41edSDimitry Andric
146b3f41edSDimitry Andric #include <atomic>
15706b4fc4SDimitry Andric #include <future>
166b3f41edSDimitry Andric #include <thread>
17706b4fc4SDimitry Andric #include <vector>
186b3f41edSDimitry Andric
19cfca06d7SDimitry Andric llvm::ThreadPoolStrategy llvm::parallel::strategy;
20cfca06d7SDimitry Andric
21e6d15924SDimitry Andric namespace llvm {
22e6d15924SDimitry Andric namespace parallel {
23e3b55780SDimitry Andric #if LLVM_ENABLE_THREADS
24e3b55780SDimitry Andric
#ifdef _WIN32
// Per-thread index of the pool worker running on this thread. It stays at
// UINT_MAX on any thread that never entered ThreadPoolExecutor::work(). This
// variant has internal linkage; getThreadIndex() is defined right after it.
static thread_local unsigned threadIndex = UINT_MAX;

// NOTE(review): GET_THREAD_INDEX_IMPL is a macro defined outside this file
// (presumably llvm/Support/Parallel.h) and supplies the whole function body.
unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
// Same per-thread index, here with external linkage.
thread_local unsigned threadIndex = UINT_MAX;
#endif
32e3b55780SDimitry Andric
33e6d15924SDimitry Andric namespace detail {
346b3f41edSDimitry Andric
356b3f41edSDimitry Andric namespace {
366b3f41edSDimitry Andric
/// Interface for objects that accept closures and run them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;

  /// Hands \p func over to the executor to be run.
  virtual void add(std::function<void()> func) = 0;

  /// Reports the number of threads the executor uses.
  virtual size_t getThreadCount() const = 0;

  /// Returns the executor instance shared by all users in this file.
  static Executor *getDefaultExecutor();
};
466b3f41edSDimitry Andric
47eb11fae6SDimitry Andric /// An implementation of an Executor that runs closures on a thread pool
486b3f41edSDimitry Andric /// in filo order.
496b3f41edSDimitry Andric class ThreadPoolExecutor : public Executor {
506b3f41edSDimitry Andric public:
ThreadPoolExecutor(ThreadPoolStrategy S=hardware_concurrency ())51cfca06d7SDimitry Andric explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
527fa27ce4SDimitry Andric ThreadCount = S.compute_thread_count();
536b3f41edSDimitry Andric // Spawn all but one of the threads in another thread as spawning threads
546b3f41edSDimitry Andric // can take a while.
55706b4fc4SDimitry Andric Threads.reserve(ThreadCount);
56706b4fc4SDimitry Andric Threads.resize(1);
57706b4fc4SDimitry Andric std::lock_guard<std::mutex> Lock(Mutex);
587fa27ce4SDimitry Andric // Use operator[] before creating the thread to avoid data race in .size()
59*0fca6ea1SDimitry Andric // in 'safe libc++' mode.
607fa27ce4SDimitry Andric auto &Thread0 = Threads[0];
617fa27ce4SDimitry Andric Thread0 = std::thread([this, S] {
62cfca06d7SDimitry Andric for (unsigned I = 1; I < ThreadCount; ++I) {
63cfca06d7SDimitry Andric Threads.emplace_back([=] { work(S, I); });
64706b4fc4SDimitry Andric if (Stop)
65706b4fc4SDimitry Andric break;
666b3f41edSDimitry Andric }
67706b4fc4SDimitry Andric ThreadsCreated.set_value();
68cfca06d7SDimitry Andric work(S, 0);
69706b4fc4SDimitry Andric });
70706b4fc4SDimitry Andric }
71706b4fc4SDimitry Andric
stop()72706b4fc4SDimitry Andric void stop() {
73706b4fc4SDimitry Andric {
74706b4fc4SDimitry Andric std::lock_guard<std::mutex> Lock(Mutex);
75706b4fc4SDimitry Andric if (Stop)
76706b4fc4SDimitry Andric return;
77706b4fc4SDimitry Andric Stop = true;
78706b4fc4SDimitry Andric }
79706b4fc4SDimitry Andric Cond.notify_all();
80706b4fc4SDimitry Andric ThreadsCreated.get_future().wait();
816b3f41edSDimitry Andric }
826b3f41edSDimitry Andric
~ThreadPoolExecutor()836b3f41edSDimitry Andric ~ThreadPoolExecutor() override {
84706b4fc4SDimitry Andric stop();
85706b4fc4SDimitry Andric std::thread::id CurrentThreadId = std::this_thread::get_id();
86706b4fc4SDimitry Andric for (std::thread &T : Threads)
87706b4fc4SDimitry Andric if (T.get_id() == CurrentThreadId)
88706b4fc4SDimitry Andric T.detach();
89706b4fc4SDimitry Andric else
90706b4fc4SDimitry Andric T.join();
916b3f41edSDimitry Andric }
926b3f41edSDimitry Andric
93cfca06d7SDimitry Andric struct Creator {
callllvm::parallel::detail::__anon441af0fe0111::ThreadPoolExecutor::Creator94cfca06d7SDimitry Andric static void *call() { return new ThreadPoolExecutor(strategy); }
95cfca06d7SDimitry Andric };
96706b4fc4SDimitry Andric struct Deleter {
callllvm::parallel::detail::__anon441af0fe0111::ThreadPoolExecutor::Deleter97706b4fc4SDimitry Andric static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
98706b4fc4SDimitry Andric };
99706b4fc4SDimitry Andric
add(std::function<void ()> F)10054521a2fSDimitry Andric void add(std::function<void()> F) override {
101706b4fc4SDimitry Andric {
102706b4fc4SDimitry Andric std::lock_guard<std::mutex> Lock(Mutex);
10354521a2fSDimitry Andric WorkStack.push_back(std::move(F));
104706b4fc4SDimitry Andric }
1056b3f41edSDimitry Andric Cond.notify_one();
1066b3f41edSDimitry Andric }
1076b3f41edSDimitry Andric
getThreadCount() const1087fa27ce4SDimitry Andric size_t getThreadCount() const override { return ThreadCount; }
1097fa27ce4SDimitry Andric
1106b3f41edSDimitry Andric private:
work(ThreadPoolStrategy S,unsigned ThreadID)111cfca06d7SDimitry Andric void work(ThreadPoolStrategy S, unsigned ThreadID) {
112e3b55780SDimitry Andric threadIndex = ThreadID;
113cfca06d7SDimitry Andric S.apply_thread_strategy(ThreadID);
1146b3f41edSDimitry Andric while (true) {
1156b3f41edSDimitry Andric std::unique_lock<std::mutex> Lock(Mutex);
11654521a2fSDimitry Andric Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
1176b3f41edSDimitry Andric if (Stop)
1186b3f41edSDimitry Andric break;
11954521a2fSDimitry Andric auto Task = std::move(WorkStack.back());
12054521a2fSDimitry Andric WorkStack.pop_back();
1216b3f41edSDimitry Andric Lock.unlock();
1226b3f41edSDimitry Andric Task();
1236b3f41edSDimitry Andric }
1246b3f41edSDimitry Andric }
1256b3f41edSDimitry Andric
1266b3f41edSDimitry Andric std::atomic<bool> Stop{false};
12754521a2fSDimitry Andric std::vector<std::function<void()>> WorkStack;
1286b3f41edSDimitry Andric std::mutex Mutex;
1296b3f41edSDimitry Andric std::condition_variable Cond;
130706b4fc4SDimitry Andric std::promise<void> ThreadsCreated;
131706b4fc4SDimitry Andric std::vector<std::thread> Threads;
1327fa27ce4SDimitry Andric unsigned ThreadCount;
1336b3f41edSDimitry Andric };
1346b3f41edSDimitry Andric
getDefaultExecutor()1356b3f41edSDimitry Andric Executor *Executor::getDefaultExecutor() {
136706b4fc4SDimitry Andric // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
137706b4fc4SDimitry Andric // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
138706b4fc4SDimitry Andric // stops the thread pool and waits for any worker thread creation to complete
139706b4fc4SDimitry Andric // but does not wait for the threads to finish. The wait for worker thread
140706b4fc4SDimitry Andric // creation to complete is important as it prevents intermittent crashes on
141706b4fc4SDimitry Andric // Windows due to a race condition between thread creation and process exit.
142706b4fc4SDimitry Andric //
143706b4fc4SDimitry Andric // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
144706b4fc4SDimitry Andric // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
145706b4fc4SDimitry Andric // destructor ensures it has been stopped and waits for worker threads to
146706b4fc4SDimitry Andric // finish. The wait is important as it prevents intermittent crashes on
147706b4fc4SDimitry Andric // Windows when the process is doing a full exit.
148706b4fc4SDimitry Andric //
149706b4fc4SDimitry Andric // The Windows crashes appear to only occur with the MSVC static runtimes and
150706b4fc4SDimitry Andric // are more frequent with the debug static runtime.
151706b4fc4SDimitry Andric //
152706b4fc4SDimitry Andric // This also prevents intermittent deadlocks on exit with the MinGW runtime.
153cfca06d7SDimitry Andric
154cfca06d7SDimitry Andric static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
155706b4fc4SDimitry Andric ThreadPoolExecutor::Deleter>
156706b4fc4SDimitry Andric ManagedExec;
157706b4fc4SDimitry Andric static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
158706b4fc4SDimitry Andric return Exec.get();
1596b3f41edSDimitry Andric }
1601d5ae102SDimitry Andric } // namespace
161e3b55780SDimitry Andric } // namespace detail
1626b3f41edSDimitry Andric
getThreadCount()1637fa27ce4SDimitry Andric size_t getThreadCount() {
1647fa27ce4SDimitry Andric return detail::Executor::getDefaultExecutor()->getThreadCount();
1657fa27ce4SDimitry Andric }
1667fa27ce4SDimitry Andric #endif
167e6d15924SDimitry Andric
168e6d15924SDimitry Andric // Latch::sync() called by the dtor may cause one thread to block. If is a dead
169e6d15924SDimitry Andric // lock if all threads in the default executor are blocked. To prevent the dead
1707fa27ce4SDimitry Andric // lock, only allow the root TaskGroup to run tasks parallelly. In the scenario
171e6d15924SDimitry Andric // of nested parallel_for_each(), only the outermost one runs parallelly.
TaskGroup()1727fa27ce4SDimitry Andric TaskGroup::TaskGroup()
1737fa27ce4SDimitry Andric #if LLVM_ENABLE_THREADS
1747fa27ce4SDimitry Andric : Parallel((parallel::strategy.ThreadsRequested != 1) &&
1757fa27ce4SDimitry Andric (threadIndex == UINT_MAX)) {}
1767fa27ce4SDimitry Andric #else
1777fa27ce4SDimitry Andric : Parallel(false) {}
1787fa27ce4SDimitry Andric #endif
~TaskGroup()179c0981da4SDimitry Andric TaskGroup::~TaskGroup() {
180c0981da4SDimitry Andric // We must ensure that all the workloads have finished before decrementing the
181c0981da4SDimitry Andric // instances count.
182c0981da4SDimitry Andric L.sync();
183c0981da4SDimitry Andric }
184e6d15924SDimitry Andric
spawn(std::function<void ()> F)18554521a2fSDimitry Andric void TaskGroup::spawn(std::function<void()> F) {
186e3b55780SDimitry Andric #if LLVM_ENABLE_THREADS
187e6d15924SDimitry Andric if (Parallel) {
1886b3f41edSDimitry Andric L.inc();
18954521a2fSDimitry Andric detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
1906b3f41edSDimitry Andric F();
1916b3f41edSDimitry Andric L.dec();
19254521a2fSDimitry Andric });
193e3b55780SDimitry Andric return;
194e3b55780SDimitry Andric }
195e3b55780SDimitry Andric #endif
196e6d15924SDimitry Andric F();
1976b3f41edSDimitry Andric }
198e6d15924SDimitry Andric
199e6d15924SDimitry Andric } // namespace parallel
200e6d15924SDimitry Andric } // namespace llvm
2016f8fc217SDimitry Andric
parallelFor(size_t Begin,size_t End,llvm::function_ref<void (size_t)> Fn)202145449b1SDimitry Andric void llvm::parallelFor(size_t Begin, size_t End,
2036f8fc217SDimitry Andric llvm::function_ref<void(size_t)> Fn) {
2046f8fc217SDimitry Andric #if LLVM_ENABLE_THREADS
2057fa27ce4SDimitry Andric if (parallel::strategy.ThreadsRequested != 1) {
2066f8fc217SDimitry Andric auto NumItems = End - Begin;
2076f8fc217SDimitry Andric // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
2086f8fc217SDimitry Andric // overhead on large inputs.
2096f8fc217SDimitry Andric auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
2106f8fc217SDimitry Andric if (TaskSize == 0)
2116f8fc217SDimitry Andric TaskSize = 1;
2126f8fc217SDimitry Andric
213e3b55780SDimitry Andric parallel::TaskGroup TG;
2146f8fc217SDimitry Andric for (; Begin + TaskSize < End; Begin += TaskSize) {
2156f8fc217SDimitry Andric TG.spawn([=, &Fn] {
2166f8fc217SDimitry Andric for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
2176f8fc217SDimitry Andric Fn(I);
2186f8fc217SDimitry Andric });
2196f8fc217SDimitry Andric }
2207fa27ce4SDimitry Andric if (Begin != End) {
2217fa27ce4SDimitry Andric TG.spawn([=, &Fn] {
2227fa27ce4SDimitry Andric for (size_t I = Begin; I != End; ++I)
2237fa27ce4SDimitry Andric Fn(I);
2247fa27ce4SDimitry Andric });
2257fa27ce4SDimitry Andric }
2266f8fc217SDimitry Andric return;
2276f8fc217SDimitry Andric }
2286f8fc217SDimitry Andric #endif
2296f8fc217SDimitry Andric
2306f8fc217SDimitry Andric for (; Begin != End; ++Begin)
2316f8fc217SDimitry Andric Fn(Begin);
2326f8fc217SDimitry Andric }
233