reference, declarationdefinition
definition → references, declarations, derived classes, virtual overrides
reference to multiple definitions → definitions
unreferenced
    1
    2
    3
    4
    5
    6
    7
    8
    9
   10
   11
   12
   13
   14
   15
   16
   17
   18
   19
   20
   21
   22
   23
   24
   25
   26
   27
   28
   29
   30
   31
   32
   33
   34
   35
   36
   37
   38
   39
   40
   41
   42
   43
   44
   45
   46
   47
   48
   49
   50
   51
   52
   53
   54
   55
   56
   57
   58
   59
   60
   61
   62
   63
   64
   65
   66
   67
   68
   69
   70
   71
   72
   73
   74
   75
   76
   77
   78
   79
   80
   81
   82
   83
   84
   85
   86
   87
   88
   89
   90
   91
   92
   93
   94
   95
   96
   97
   98
   99
  100
  101
  102
  103
  104
  105
  106
  107
  108
  109
  110
  111
  112
  113
  114
  115
  116
  117
  118
  119
  120
  121
  122
  123
  124
  125
  126
  127
  128
  129
  130
  131
  132
  133
  134
  135
  136
  137
  138
  139
  140
  141
  142
  143
  144
  145
//==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file implements a crude C++11 based thread pool.
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/ThreadPool.h"

#include "llvm/Config/llvm-config.h"
#include "llvm/Support/Threading.h"
#include "llvm/Support/raw_ostream.h"

using namespace llvm;

#if LLVM_ENABLE_THREADS

// Default to hardware_concurrency
ThreadPool::ThreadPool() : ThreadPool(hardware_concurrency()) {}

ThreadPool::ThreadPool(unsigned ThreadCount)
    : ActiveThreads(0), EnableFlag(true) {
  // Create ThreadCount threads that will loop forever, wait on QueueCondition
  // for tasks to be queued or the Pool to be destroyed.
  Threads.reserve(ThreadCount);
  for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) {
    Threads.emplace_back([&] {
      while (true) {
        PackagedTaskTy Task;
        {
          std::unique_lock<std::mutex> LockGuard(QueueLock);
          // Wait for tasks to be pushed in the queue
          QueueCondition.wait(LockGuard,
                              [&] { return !EnableFlag || !Tasks.empty(); });
          // Exit condition
          if (!EnableFlag && Tasks.empty())
            return;
          // Yeah, we have a task, grab it and release the lock on the queue

          // We first need to signal that we are active before popping the queue
          // in order for wait() to properly detect that even if the queue is
          // empty, there is still a task in flight.
          {
            std::unique_lock<std::mutex> LockGuard(CompletionLock);
            ++ActiveThreads;
          }
          Task = std::move(Tasks.front());
          Tasks.pop();
        }
        // Run the task we just grabbed
        Task();

        {
          // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
          std::unique_lock<std::mutex> LockGuard(CompletionLock);
          --ActiveThreads;
        }

        // Notify task completion, in case someone waits on ThreadPool::wait()
        CompletionCondition.notify_all();
      }
    });
  }
}

void ThreadPool::wait() {
  // Wait for all threads to complete and the queue to be empty
  std::unique_lock<std::mutex> LockGuard(CompletionLock);
  // The order of the checks for ActiveThreads and Tasks.empty() matters because
  // any active threads might be modifying the Tasks queue, and this would be a
  // race.
  CompletionCondition.wait(LockGuard,
                           [&] { return !ActiveThreads && Tasks.empty(); });
}

std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
  /// Wrap the Task in a packaged_task to return a future object.
  PackagedTaskTy PackagedTask(std::move(Task));
  auto Future = PackagedTask.get_future();
  {
    // Lock the queue and push the new task
    std::unique_lock<std::mutex> LockGuard(QueueLock);

    // Don't allow enqueueing after disabling the pool
    assert(EnableFlag && "Queuing a thread during ThreadPool destruction");

    Tasks.push(std::move(PackagedTask));
  }
  QueueCondition.notify_one();
  return Future.share();
}

// The destructor joins all threads, waiting for completion.
ThreadPool::~ThreadPool() {
  {
    std::unique_lock<std::mutex> LockGuard(QueueLock);
    EnableFlag = false;
  }
  QueueCondition.notify_all();
  for (auto &Worker : Threads)
    Worker.join();
}

#else // LLVM_ENABLE_THREADS Disabled

ThreadPool::ThreadPool() : ThreadPool(0) {}

// No threads are launched, issue a warning if ThreadCount is not 0
ThreadPool::ThreadPool(unsigned ThreadCount)
    : ActiveThreads(0) {
  if (ThreadCount) {
    errs() << "Warning: request a ThreadPool with " << ThreadCount
           << " threads, but LLVM_ENABLE_THREADS has been turned off\n";
  }
}

void ThreadPool::wait() {
  // Sequential implementation running the tasks
  while (!Tasks.empty()) {
    auto Task = std::move(Tasks.front());
    Tasks.pop();
    Task();
  }
}

std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
  // Get a Future with launch::deferred execution using std::async
  auto Future = std::async(std::launch::deferred, std::move(Task)).share();
  // Wrap the future so that both ThreadPool::wait() can operate and the
  // returned future can be sync'ed on.
  PackagedTaskTy PackagedTask([Future]() { Future.get(); });
  Tasks.push(std::move(PackagedTask));
  return Future;
}

ThreadPool::~ThreadPool() {
  wait();
}

#endif