/* Copyright (C) 2021 Wildfire Games.
* This file is part of 0 A.D.
*
* 0 A.D. is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* 0 A.D. is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with 0 A.D. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.h"
#include "TaskManager.h"
#include "lib/debug.h"
#include "maths/MathUtil.h"
#include "ps/CLogger.h"
#include "ps/ConfigDB.h"
#include "ps/Threading.h"
#include "ps/ThreadUtil.h"
#include "ps/Profiler2.h"
#include
#include
#include
#include
#include
#include
namespace Threading
{
/**
* Minimum number of TaskManager workers.
* Used as the lower bound when the TaskManager constructor clamps the requested worker count.
*/
static constexpr size_t MIN_THREADS = 3;
/**
* Maximum number of TaskManager workers.
* Used as the upper bound when the TaskManager constructor clamps the requested worker count.
*/
static constexpr size_t MAX_THREADS = 32;
// Global task manager instance: created by TaskManager::Initialise() and returned by TaskManager::Instance().
std::unique_ptr g_TaskManager;
// Forward declaration; the class is defined further down in this file.
class Thread;
// Callable type for a queued task (presumably the element type of the global queues - confirm).
using QueueItem = std::function;
}
/**
* Light wrapper around std::thread. Ensures Join has been called.
*
* The class can neither be copied nor moved. Its destructor is protected and
* asserts (via ENSURE) that the wrapped std::thread is no longer joinable, so
* derived classes are responsible for joining before destruction completes.
*/
class Threading::Thread
{
public:
Thread() = default;
Thread(const Thread&) = delete;
Thread(Thread&&) = delete;
/**
* Launch the underlying std::thread, handing @a object to the exception-handling wrapper.
* @param object passed as the sole argument to the thread entry point.
*/
template
void Start(T* object)
{
m_Thread = std::thread(Threading::HandleExceptions>::Wrapper, object);
}
/**
* Thread entry helper; defined at the bottom of this file, where it invokes
* the callable on @a object.
*/
template
static void DoStart(T* object);
protected:
~Thread()
{
// The derived class must have joined the thread by now.
ENSURE(!m_Thread.joinable());
}
std::thread m_Thread;
// Shutdown flag: set by WorkerThread's destructor and polled by WorkerThread::RunUntilDeath.
std::atomic m_Kill = false;
};
/**
 * Worker thread: process the taskManager queues until killed.
 *
 * The worker sleeps on its own condition variable until it is killed or one of
 * the task manager's "has work" flags is set, then pops and runs a task.
 */
class Threading::WorkerThread : public Thread
{
public:
	/**
	 * Store the reference to the owning implementation and start the thread.
	 * Explicit: an Impl must never convert silently into a worker.
	 * @param taskManager implementation whose queues this worker processes.
	 */
	explicit WorkerThread(Threading::TaskManager::Impl& taskManager);

	/**
	 * Request the thread to stop and join it.
	 */
	~WorkerThread();

	/**
	 * Wake the worker.
	 */
	void Wake();

protected:
	/**
	 * Thread main loop: wait for work, run it, repeat until m_Kill is set.
	 */
	void RunUntilDeath();

	// Mutex paired with m_ConditionVariable.
	std::mutex m_Mutex;
	// Signalled by Wake() and by the destructor.
	std::condition_variable m_ConditionVariable;

	// Implementation object holding the queues and the "has work" flags.
	Threading::TaskManager::Impl& m_TaskManager;
};
/**
* PImpl-ed implementation of the Task manager.
*
* The normal priority queue is processed first, the low priority only if there are no higher-priority tasks
*
* Each queue has its own mutex and its own atomic "has work" flag; workers
* read the flags in their wait predicate.
*/
class Threading::TaskManager::Impl
{
friend class TaskManager;
friend class WorkerThread;
public:
Impl(TaskManager& backref);
~Impl()
{
// Drop any pending tasks, then destroy the workers (each worker joins its thread in its destructor).
ClearQueue();
m_Workers.clear();
}
/**
* 2-phase init to avoid having to think too hard about the order of class members.
* Appends @a numberOfWorkers workers, each constructed with a reference to this object.
*/
void SetupWorkers(size_t numberOfWorkers);
/**
* Push a task on the global queue.
* Takes ownership of @a task.
* May be called from any thread.
* @param priority selects the normal or the low priority queue.
*/
void PushTask(std::function&& task, TaskPriority priority);
protected:
/**
* Empty both global queues, each under its own mutex.
*/
void ClearQueue();
/**
* Pop the front task of one queue into @a taskOut.
* @return true if a task was popped, false if the queue was empty.
*/
template
bool PopTask(std::function& taskOut);
// Back reference (keep this first).
TaskManager& m_TaskManager;
// Flags read by the workers' wait predicate; written by PushTask and PopTask.
// There is no default member initializer here, so the initial value depends on the constructor.
std::atomic m_HasWork;
std::atomic m_HasLowPriorityWork;
// One mutex per queue.
std::mutex m_GlobalMutex;
std::mutex m_GlobalLowPriorityMutex;
std::deque m_GlobalQueue;
std::deque m_GlobalLowPriorityQueue;
// Ideally this would be a vector, since it does get iterated, but that requires movable types.
std::deque m_Workers;
// Round-robin counter for GetWorker.
// NOTE(review): no GetWorker is visible in this part of the file - confirm this member is still used.
mutable size_t m_RoundRobinIdx = 0;
};
/**
 * Default number of workers: one fewer than the number of hardware threads.
 *
 * std::thread::hardware_concurrency() returns 0 when the value cannot be
 * determined. Subtracting 1 from that unsigned 0 would wrap around to a huge
 * number and end up clamped to the maximum worker count, so 0 is returned
 * instead and the delegated constructor's clamp raises it to the minimum.
 */
static size_t GetDefaultNumberOfWorkers()
{
	const size_t hardwareThreads = std::thread::hardware_concurrency();
	return hardwareThreads > 0 ? hardwareThreads - 1 : 0;
}

/**
 * Construct a task manager with the default number of workers.
 * The count is still clamped by the delegated constructor.
 */
Threading::TaskManager::TaskManager() : TaskManager(GetDefaultNumberOfWorkers())
{
}
/**
* Construct the task manager: create the implementation object, then start the workers.
* @param numberOfWorkers requested worker count; it is passed through Clamp with
* MIN_THREADS and MAX_THREADS as bounds before the workers are created.
*/
Threading::TaskManager::TaskManager(size_t numberOfWorkers)
{
m = std::make_unique(*this);
numberOfWorkers = Clamp(numberOfWorkers, MIN_THREADS, MAX_THREADS);
// Second phase of the init: the workers are created only once the Impl is fully constructed.
m->SetupWorkers(numberOfWorkers);
}
// Defined in this file, where Impl is a complete type, so that the implementation pointer can be destroyed.
Threading::TaskManager::~TaskManager() = default;
/**
 * Store the back reference and start with both "has work" flags cleared.
 *
 * The flags have no default member initializer, and a default-constructed
 * std::atomic holds an indeterminate value in C++17. They are read by the
 * workers' wait predicate, so they must be initialised explicitly: a stray
 * 'true' would make the workers spin on empty queues.
 *
 * @param backref the TaskManager owning this implementation.
 */
Threading::TaskManager::Impl::Impl(TaskManager& backref)
	: m_TaskManager(backref)
	, m_HasWork(false)
	, m_HasLowPriorityWork(false)
{
}
/**
 * Append @a numberOfWorkers workers to m_Workers.
 * Each worker is constructed in place with a reference to this implementation.
 */
void Threading::TaskManager::Impl::SetupWorkers(size_t numberOfWorkers)
{
	size_t remaining = numberOfWorkers;
	while (remaining > 0)
	{
		m_Workers.emplace_back(*this);
		--remaining;
	}
}
/**
 * Forward to the implementation, which empties both global queues.
 */
void Threading::TaskManager::ClearQueue()
{
	m->ClearQueue();
}
void Threading::TaskManager::Impl::ClearQueue()
{
{
std::lock_guard lock(m_GlobalMutex);
m_GlobalQueue.clear();
}
{
std::lock_guard lock(m_GlobalLowPriorityMutex);
m_GlobalLowPriorityQueue.clear();
}
}
/**
 * @return the number of workers currently held by the implementation.
 */
size_t Threading::TaskManager::GetNumberOfWorkers() const
{
	const auto& workers = m->m_Workers;
	return workers.size();
}
/**
* Forward @a task to the implementation's PushTask, moving it along.
* @param priority queue selector, passed through unchanged.
*/
void Threading::TaskManager::DoPushTask(std::function&& task, TaskPriority priority)
{
m->PushTask(std::move(task), priority);
}
/**
* Append @a task to the queue matching @a priority, flag that queue as having
* work, then wake every worker.
* @param task moved into the queue.
* @param priority TaskPriority::NORMAL selects the normal queue; any other value selects the low priority queue.
*/
void Threading::TaskManager::Impl::PushTask(std::function&& task, TaskPriority priority)
{
// Select the mutex / queue / flag triple for the requested priority.
std::mutex& mutex = priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex;
std::deque& queue = priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue;
std::atomic& hasWork = priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork;
{
// The flag is set while the queue mutex is held, together with the insertion.
std::lock_guard lock(mutex);
queue.emplace_back(std::move(task));
hasWork = true;
}
// The queue mutex has been released before the workers are notified.
for (WorkerThread& worker : m_Workers)
worker.Wake();
}
/**
* Pop the front task of the queue selected by the Priority template argument.
* @param taskOut receives the popped task (moved out of the queue); left untouched if the queue is empty.
* @return true if a task was popped, false if the queue was empty.
*/
template
bool Threading::TaskManager::Impl::PopTask(std::function& taskOut)
{
// Select the mutex / queue / flag triple for the requested priority.
std::mutex& mutex = Priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex;
std::deque& queue = Priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue;
std::atomic& hasWork = Priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork;
// Particularly critical section since we're locking the global queue.
std::lock_guard globalLock(mutex);
if (!queue.empty())
{
taskOut = std::move(queue.front());
queue.pop_front();
// The flag is only refreshed on this path, i.e. after a successful pop.
hasWork = !queue.empty();
return true;
}
return false;
}
/**
* Create the global task manager if it does not exist yet.
* Does nothing when g_TaskManager is already set.
*/
void Threading::TaskManager::Initialise()
{
if (!g_TaskManager)
g_TaskManager = std::make_unique();
}
/**
 * Access the global task manager.
 * Asserts (via ENSURE) that the global instance has been created.
 * @return reference to the object owned by g_TaskManager.
 */
Threading::TaskManager& Threading::TaskManager::Instance()
{
	ENSURE(g_TaskManager);
	TaskManager& instance = *g_TaskManager;
	return instance;
}
// Thread definition
/**
* Store the reference to the owning implementation and start the thread right away.
* @param taskManager implementation whose queues and flags this worker reads.
*/
Threading::WorkerThread::WorkerThread(Threading::TaskManager::Impl& taskManager)
: m_TaskManager(taskManager)
{
// Called from the constructor body, i.e. after all members have been initialised.
Start(this);
}
/**
 * Ask the worker to stop, then join its thread.
 *
 * The kill flag is set while holding m_Mutex. The worker evaluates its wait
 * predicate under that same mutex, so it either sees the flag before going to
 * sleep or is already waiting when the notification is sent. Without the lock
 * the notification could fall between the predicate check and the wait,
 * leaving the worker asleep and join() blocked forever.
 */
Threading::WorkerThread::~WorkerThread()
{
	{
		std::lock_guard<std::mutex> lock(m_Mutex);
		m_Kill = true;
	}
	m_ConditionVariable.notify_one();

	// Joining here satisfies the base class destructor, which asserts the thread is not joinable.
	if (m_Thread.joinable())
		m_Thread.join();
}
void Threading::WorkerThread::Wake()
{
m_ConditionVariable.notify_one();
}
/**
* Thread main loop.
* Names and registers the thread, then repeatedly waits until it is killed or
* work is flagged, pops a task from the global queues and runs it.
* Returns once m_Kill is observed.
*/
void Threading::WorkerThread::RunUntilDeath()
{
// The profiler does better if the names are unique.
// The counter is a function-local static shared by all workers.
static std::atomic n = 0;
std::string name = "Task Mgr #" + std::to_string(n++);
debug_SetThreadName(name.c_str());
g_Profiler2.RegisterCurrentThread(name);
std::function task;
bool hasTask = false;
// Created unlocked; the mutex is only held around the wait below.
std::unique_lock lock(m_Mutex, std::defer_lock);
while (!m_Kill)
{
lock.lock();
// Sleep until killed or until either queue is flagged as non-empty.
m_ConditionVariable.wait(lock, [this](){
return m_Kill || m_TaskManager.m_HasWork || m_TaskManager.m_HasLowPriorityWork;
});
// Release the worker mutex before touching the global queues or running a task.
lock.unlock();
if (m_Kill)
break;
// Fetch work from the global queues.
hasTask = m_TaskManager.PopTask(task);
// Second attempt only if the first pop found nothing.
if (!hasTask)
hasTask = m_TaskManager.PopTask(task);
if (hasTask)
task();
}
}
// Defined here - needs access to derived types.
/**
* Thread entry helper: calls the callable on @a object through std::invoke.
* @param object passed to std::invoke as the argument of the callable.
*/
template
void Threading::Thread::DoStart(T* object)
{
std::invoke(callable, object);
}