Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions code/utils/threading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@

namespace threading {
static size_t num_threads = 1;

static std::condition_variable wait_for_task;
static std::mutex wait_for_task_mutex;
static bool wait_for_task_condition;
static std::atomic_uint32_t wait_for_spindown_tasks;

static std::condition_variable wait_for_spindown_tasks;
static std::mutex wait_for_spindown_task_mutex;
static size_t wait_for_spindown_tasks_counter;

static std::condition_variable wait_for_spinup_tasks;
static std::mutex wait_for_spinup_task_mutex;
static size_t wait_for_spinup_tasks_counter;

static std::atomic<WorkerThreadTask> worker_task;

static SCP_vector<std::thread> worker_threads;
Expand All @@ -29,11 +38,21 @@ namespace threading {
static void mp_worker_thread_main(size_t threadIdx) {
while(true) {
{
//We're waiting for a new task, so spindown was successful
wait_for_spindown_tasks.fetch_add(1, std::memory_order_release);
std::scoped_lock lock {wait_for_spindown_task_mutex};
++wait_for_spindown_tasks_counter;
wait_for_spindown_tasks.notify_all();
}
//We're waiting for a new task, so spindown was successful
{
std::unique_lock<std::mutex> lk(wait_for_task_mutex);
wait_for_task.wait(lk, []() { return wait_for_task_condition; });
}
//Notify that we passed the wait and can now start processing. This is necessary, as slow thread wakeups in very low workloads could cause the wait_for_task_condition to be false, never waking up the thread, locking on spindown wait
{
std::scoped_lock lock {wait_for_spinup_task_mutex};
++wait_for_spinup_tasks_counter;
wait_for_spinup_tasks.notify_all();
}

switch (worker_task.load(std::memory_order_acquire)) {
case WorkerThreadTask::EXIT:
Expand Down Expand Up @@ -148,7 +167,11 @@ namespace threading {
//External Functions

void spin_up_threaded_task(WorkerThreadTask task) {
wait_for_spindown_tasks.store(0, std::memory_order_release);
{
std::scoped_lock lock {wait_for_spindown_task_mutex};
wait_for_spindown_tasks_counter = 0;
//No notify here cause we only ever lock, never unlock the wait here.
}
worker_task.store(task);
{
std::scoped_lock lock {wait_for_task_mutex};
Expand All @@ -158,13 +181,24 @@ namespace threading {
}

void spin_down_threaded_task() {
std::scoped_lock lock {wait_for_task_mutex};
wait_for_task_condition = false;
//Make sure all threads are actually running before stopping the pool again
{
std::unique_lock<std::mutex> lk(wait_for_spinup_task_mutex);
wait_for_spinup_tasks.wait(lk, []() { return wait_for_spinup_tasks_counter >= num_threads; });
wait_for_spinup_tasks_counter = 0;
}
{
std::scoped_lock lock {wait_for_task_mutex};
wait_for_task_condition = false;
}
}

void spin_down_wait_complete() {
//Technically, spindowns should only occur when the actual code is confirmed to be complete. So busy-waiting here is not an issue.
while (wait_for_spindown_tasks.load(std::memory_order_acquire) < num_threads);
//Technically, spindowns should only occur when the actual code is confirmed to be complete. So we shouldn't be waiting for long here
{
std::unique_lock<std::mutex> lk(wait_for_spindown_task_mutex);
wait_for_spindown_tasks.wait(lk, []() { return wait_for_spindown_tasks_counter >= num_threads; });
};
}

void init_task_pool() {
Expand Down
Loading