xref: /src/contrib/llvm-project/llvm/lib/Support/Parallel.cpp (revision 0fca6ea1d4eea4c934cfff25ac9ee8ad6fe95583)
16b3f41edSDimitry Andric //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
26b3f41edSDimitry Andric //
3e6d15924SDimitry Andric // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4e6d15924SDimitry Andric // See https://llvm.org/LICENSE.txt for license information.
5e6d15924SDimitry Andric // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
66b3f41edSDimitry Andric //
76b3f41edSDimitry Andric //===----------------------------------------------------------------------===//
86b3f41edSDimitry Andric 
96b3f41edSDimitry Andric #include "llvm/Support/Parallel.h"
106b3f41edSDimitry Andric #include "llvm/Config/llvm-config.h"
11706b4fc4SDimitry Andric #include "llvm/Support/ManagedStatic.h"
12044eb2f6SDimitry Andric #include "llvm/Support/Threading.h"
136b3f41edSDimitry Andric 
146b3f41edSDimitry Andric #include <atomic>
15706b4fc4SDimitry Andric #include <future>
166b3f41edSDimitry Andric #include <thread>
17706b4fc4SDimitry Andric #include <vector>
186b3f41edSDimitry Andric 
19cfca06d7SDimitry Andric llvm::ThreadPoolStrategy llvm::parallel::strategy;
20cfca06d7SDimitry Andric 
21e6d15924SDimitry Andric namespace llvm {
22e6d15924SDimitry Andric namespace parallel {
23e3b55780SDimitry Andric #if LLVM_ENABLE_THREADS
24e3b55780SDimitry Andric 
25e3b55780SDimitry Andric #ifdef _WIN32
267fa27ce4SDimitry Andric static thread_local unsigned threadIndex = UINT_MAX;
27e3b55780SDimitry Andric 
getThreadIndex()287fa27ce4SDimitry Andric unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
29e3b55780SDimitry Andric #else
307fa27ce4SDimitry Andric thread_local unsigned threadIndex = UINT_MAX;
31e3b55780SDimitry Andric #endif
32e3b55780SDimitry Andric 
33e6d15924SDimitry Andric namespace detail {
346b3f41edSDimitry Andric 
356b3f41edSDimitry Andric namespace {
366b3f41edSDimitry Andric 
37eb11fae6SDimitry Andric /// An abstract class that takes closures and runs them asynchronously.
386b3f41edSDimitry Andric class Executor {
396b3f41edSDimitry Andric public:
406b3f41edSDimitry Andric   virtual ~Executor() = default;
4154521a2fSDimitry Andric   virtual void add(std::function<void()> func) = 0;
427fa27ce4SDimitry Andric   virtual size_t getThreadCount() const = 0;
436b3f41edSDimitry Andric 
446b3f41edSDimitry Andric   static Executor *getDefaultExecutor();
456b3f41edSDimitry Andric };
466b3f41edSDimitry Andric 
47eb11fae6SDimitry Andric /// An implementation of an Executor that runs closures on a thread pool
486b3f41edSDimitry Andric ///   in filo order.
496b3f41edSDimitry Andric class ThreadPoolExecutor : public Executor {
506b3f41edSDimitry Andric public:
ThreadPoolExecutor(ThreadPoolStrategy S=hardware_concurrency ())51cfca06d7SDimitry Andric   explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
527fa27ce4SDimitry Andric     ThreadCount = S.compute_thread_count();
536b3f41edSDimitry Andric     // Spawn all but one of the threads in another thread as spawning threads
546b3f41edSDimitry Andric     // can take a while.
55706b4fc4SDimitry Andric     Threads.reserve(ThreadCount);
56706b4fc4SDimitry Andric     Threads.resize(1);
57706b4fc4SDimitry Andric     std::lock_guard<std::mutex> Lock(Mutex);
587fa27ce4SDimitry Andric     // Use operator[] before creating the thread to avoid data race in .size()
59*0fca6ea1SDimitry Andric     // in 'safe libc++' mode.
607fa27ce4SDimitry Andric     auto &Thread0 = Threads[0];
617fa27ce4SDimitry Andric     Thread0 = std::thread([this, S] {
62cfca06d7SDimitry Andric       for (unsigned I = 1; I < ThreadCount; ++I) {
63cfca06d7SDimitry Andric         Threads.emplace_back([=] { work(S, I); });
64706b4fc4SDimitry Andric         if (Stop)
65706b4fc4SDimitry Andric           break;
666b3f41edSDimitry Andric       }
67706b4fc4SDimitry Andric       ThreadsCreated.set_value();
68cfca06d7SDimitry Andric       work(S, 0);
69706b4fc4SDimitry Andric     });
70706b4fc4SDimitry Andric   }
71706b4fc4SDimitry Andric 
stop()72706b4fc4SDimitry Andric   void stop() {
73706b4fc4SDimitry Andric     {
74706b4fc4SDimitry Andric       std::lock_guard<std::mutex> Lock(Mutex);
75706b4fc4SDimitry Andric       if (Stop)
76706b4fc4SDimitry Andric         return;
77706b4fc4SDimitry Andric       Stop = true;
78706b4fc4SDimitry Andric     }
79706b4fc4SDimitry Andric     Cond.notify_all();
80706b4fc4SDimitry Andric     ThreadsCreated.get_future().wait();
816b3f41edSDimitry Andric   }
826b3f41edSDimitry Andric 
~ThreadPoolExecutor()836b3f41edSDimitry Andric   ~ThreadPoolExecutor() override {
84706b4fc4SDimitry Andric     stop();
85706b4fc4SDimitry Andric     std::thread::id CurrentThreadId = std::this_thread::get_id();
86706b4fc4SDimitry Andric     for (std::thread &T : Threads)
87706b4fc4SDimitry Andric       if (T.get_id() == CurrentThreadId)
88706b4fc4SDimitry Andric         T.detach();
89706b4fc4SDimitry Andric       else
90706b4fc4SDimitry Andric         T.join();
916b3f41edSDimitry Andric   }
926b3f41edSDimitry Andric 
93cfca06d7SDimitry Andric   struct Creator {
callllvm::parallel::detail::__anon441af0fe0111::ThreadPoolExecutor::Creator94cfca06d7SDimitry Andric     static void *call() { return new ThreadPoolExecutor(strategy); }
95cfca06d7SDimitry Andric   };
96706b4fc4SDimitry Andric   struct Deleter {
callllvm::parallel::detail::__anon441af0fe0111::ThreadPoolExecutor::Deleter97706b4fc4SDimitry Andric     static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
98706b4fc4SDimitry Andric   };
99706b4fc4SDimitry Andric 
add(std::function<void ()> F)10054521a2fSDimitry Andric   void add(std::function<void()> F) override {
101706b4fc4SDimitry Andric     {
102706b4fc4SDimitry Andric       std::lock_guard<std::mutex> Lock(Mutex);
10354521a2fSDimitry Andric       WorkStack.push_back(std::move(F));
104706b4fc4SDimitry Andric     }
1056b3f41edSDimitry Andric     Cond.notify_one();
1066b3f41edSDimitry Andric   }
1076b3f41edSDimitry Andric 
getThreadCount() const1087fa27ce4SDimitry Andric   size_t getThreadCount() const override { return ThreadCount; }
1097fa27ce4SDimitry Andric 
1106b3f41edSDimitry Andric private:
work(ThreadPoolStrategy S,unsigned ThreadID)111cfca06d7SDimitry Andric   void work(ThreadPoolStrategy S, unsigned ThreadID) {
112e3b55780SDimitry Andric     threadIndex = ThreadID;
113cfca06d7SDimitry Andric     S.apply_thread_strategy(ThreadID);
1146b3f41edSDimitry Andric     while (true) {
1156b3f41edSDimitry Andric       std::unique_lock<std::mutex> Lock(Mutex);
11654521a2fSDimitry Andric       Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
1176b3f41edSDimitry Andric       if (Stop)
1186b3f41edSDimitry Andric         break;
11954521a2fSDimitry Andric       auto Task = std::move(WorkStack.back());
12054521a2fSDimitry Andric       WorkStack.pop_back();
1216b3f41edSDimitry Andric       Lock.unlock();
1226b3f41edSDimitry Andric       Task();
1236b3f41edSDimitry Andric     }
1246b3f41edSDimitry Andric   }
1256b3f41edSDimitry Andric 
1266b3f41edSDimitry Andric   std::atomic<bool> Stop{false};
12754521a2fSDimitry Andric   std::vector<std::function<void()>> WorkStack;
1286b3f41edSDimitry Andric   std::mutex Mutex;
1296b3f41edSDimitry Andric   std::condition_variable Cond;
130706b4fc4SDimitry Andric   std::promise<void> ThreadsCreated;
131706b4fc4SDimitry Andric   std::vector<std::thread> Threads;
1327fa27ce4SDimitry Andric   unsigned ThreadCount;
1336b3f41edSDimitry Andric };
1346b3f41edSDimitry Andric 
getDefaultExecutor()1356b3f41edSDimitry Andric Executor *Executor::getDefaultExecutor() {
136706b4fc4SDimitry Andric   // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
137706b4fc4SDimitry Andric   // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
138706b4fc4SDimitry Andric   // stops the thread pool and waits for any worker thread creation to complete
139706b4fc4SDimitry Andric   // but does not wait for the threads to finish. The wait for worker thread
140706b4fc4SDimitry Andric   // creation to complete is important as it prevents intermittent crashes on
141706b4fc4SDimitry Andric   // Windows due to a race condition between thread creation and process exit.
142706b4fc4SDimitry Andric   //
143706b4fc4SDimitry Andric   // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
144706b4fc4SDimitry Andric   // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
145706b4fc4SDimitry Andric   // destructor ensures it has been stopped and waits for worker threads to
146706b4fc4SDimitry Andric   // finish. The wait is important as it prevents intermittent crashes on
147706b4fc4SDimitry Andric   // Windows when the process is doing a full exit.
148706b4fc4SDimitry Andric   //
149706b4fc4SDimitry Andric   // The Windows crashes appear to only occur with the MSVC static runtimes and
150706b4fc4SDimitry Andric   // are more frequent with the debug static runtime.
151706b4fc4SDimitry Andric   //
152706b4fc4SDimitry Andric   // This also prevents intermittent deadlocks on exit with the MinGW runtime.
153cfca06d7SDimitry Andric 
154cfca06d7SDimitry Andric   static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
155706b4fc4SDimitry Andric                        ThreadPoolExecutor::Deleter>
156706b4fc4SDimitry Andric       ManagedExec;
157706b4fc4SDimitry Andric   static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
158706b4fc4SDimitry Andric   return Exec.get();
1596b3f41edSDimitry Andric }
1601d5ae102SDimitry Andric } // namespace
161e3b55780SDimitry Andric } // namespace detail
1626b3f41edSDimitry Andric 
getThreadCount()1637fa27ce4SDimitry Andric size_t getThreadCount() {
1647fa27ce4SDimitry Andric   return detail::Executor::getDefaultExecutor()->getThreadCount();
1657fa27ce4SDimitry Andric }
1667fa27ce4SDimitry Andric #endif
167e6d15924SDimitry Andric 
168e6d15924SDimitry Andric // Latch::sync() called by the dtor may cause one thread to block. If is a dead
169e6d15924SDimitry Andric // lock if all threads in the default executor are blocked. To prevent the dead
1707fa27ce4SDimitry Andric // lock, only allow the root TaskGroup to run tasks parallelly. In the scenario
171e6d15924SDimitry Andric // of nested parallel_for_each(), only the outermost one runs parallelly.
TaskGroup()1727fa27ce4SDimitry Andric TaskGroup::TaskGroup()
1737fa27ce4SDimitry Andric #if LLVM_ENABLE_THREADS
1747fa27ce4SDimitry Andric     : Parallel((parallel::strategy.ThreadsRequested != 1) &&
1757fa27ce4SDimitry Andric                (threadIndex == UINT_MAX)) {}
1767fa27ce4SDimitry Andric #else
1777fa27ce4SDimitry Andric     : Parallel(false) {}
1787fa27ce4SDimitry Andric #endif
~TaskGroup()179c0981da4SDimitry Andric TaskGroup::~TaskGroup() {
180c0981da4SDimitry Andric   // We must ensure that all the workloads have finished before decrementing the
181c0981da4SDimitry Andric   // instances count.
182c0981da4SDimitry Andric   L.sync();
183c0981da4SDimitry Andric }
184e6d15924SDimitry Andric 
spawn(std::function<void ()> F)18554521a2fSDimitry Andric void TaskGroup::spawn(std::function<void()> F) {
186e3b55780SDimitry Andric #if LLVM_ENABLE_THREADS
187e6d15924SDimitry Andric   if (Parallel) {
1886b3f41edSDimitry Andric     L.inc();
18954521a2fSDimitry Andric     detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
1906b3f41edSDimitry Andric       F();
1916b3f41edSDimitry Andric       L.dec();
19254521a2fSDimitry Andric     });
193e3b55780SDimitry Andric     return;
194e3b55780SDimitry Andric   }
195e3b55780SDimitry Andric #endif
196e6d15924SDimitry Andric   F();
1976b3f41edSDimitry Andric }
198e6d15924SDimitry Andric 
199e6d15924SDimitry Andric } // namespace parallel
200e6d15924SDimitry Andric } // namespace llvm
2016f8fc217SDimitry Andric 
parallelFor(size_t Begin,size_t End,llvm::function_ref<void (size_t)> Fn)202145449b1SDimitry Andric void llvm::parallelFor(size_t Begin, size_t End,
2036f8fc217SDimitry Andric                        llvm::function_ref<void(size_t)> Fn) {
2046f8fc217SDimitry Andric #if LLVM_ENABLE_THREADS
2057fa27ce4SDimitry Andric   if (parallel::strategy.ThreadsRequested != 1) {
2066f8fc217SDimitry Andric     auto NumItems = End - Begin;
2076f8fc217SDimitry Andric     // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
2086f8fc217SDimitry Andric     // overhead on large inputs.
2096f8fc217SDimitry Andric     auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
2106f8fc217SDimitry Andric     if (TaskSize == 0)
2116f8fc217SDimitry Andric       TaskSize = 1;
2126f8fc217SDimitry Andric 
213e3b55780SDimitry Andric     parallel::TaskGroup TG;
2146f8fc217SDimitry Andric     for (; Begin + TaskSize < End; Begin += TaskSize) {
2156f8fc217SDimitry Andric       TG.spawn([=, &Fn] {
2166f8fc217SDimitry Andric         for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
2176f8fc217SDimitry Andric           Fn(I);
2186f8fc217SDimitry Andric       });
2196f8fc217SDimitry Andric     }
2207fa27ce4SDimitry Andric     if (Begin != End) {
2217fa27ce4SDimitry Andric       TG.spawn([=, &Fn] {
2227fa27ce4SDimitry Andric         for (size_t I = Begin; I != End; ++I)
2237fa27ce4SDimitry Andric           Fn(I);
2247fa27ce4SDimitry Andric       });
2257fa27ce4SDimitry Andric     }
2266f8fc217SDimitry Andric     return;
2276f8fc217SDimitry Andric   }
2286f8fc217SDimitry Andric #endif
2296f8fc217SDimitry Andric 
2306f8fc217SDimitry Andric   for (; Begin != End; ++Begin)
2316f8fc217SDimitry Andric     Fn(Begin);
2326f8fc217SDimitry Andric }
233