#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
/// Bundles a value of type T with the lockable object meant to guard it.
///
/// The wrapper never locks on its own: Get() hands out the value directly and
/// GetLockable() hands out the lock, so callers are expected to hold the lock
/// (e.g. via std::unique_lock) for as long as they touch the value.
template <typename T, typename Lockable = std::mutex>
class Synchronized {
public:
    /// Builds the wrapped value in place, forwarding every argument to T's
    /// constructor. With no arguments the value is value-initialized.
    template <typename... CtorArgs>
    Synchronized(CtorArgs&&... ctor_args)
        : m_value(std::forward<CtorArgs>(ctor_args)...) {}

    // The value and its lock form a unit, so copying is disabled.
    Synchronized(const Synchronized&) = delete;
    Synchronized& operator=(const Synchronized&) = delete;

    /// Mutable access to the wrapped value (no locking is performed).
    T& Get() { return m_value; }

    /// Read-only access to the wrapped value (no locking is performed).
    const T& Get() const { return m_value; }

    /// The lock associated with the value. Available on const instances too,
    /// because the lock is declared mutable.
    Lockable& GetLockable() const { return m_lockable; }

private:
    T m_value;
    mutable Lockable m_lockable;
};
class ThreadPool final {
public:
enum class TaskPriority { LOW, NORMAL, HIGH };
struct Task {
template <typename F>
Task(F&& func, TaskPriority priority) :
func(std::forward<F>(func)), priority(priority) {}
std::function<void()> func;
TaskPriority priority;
bool operator<(const Task& rhs) const {
return priority < rhs.priority;
}
};
ThreadPool(unsigned numThreads = std::thread::hardware_concurrency()) : m_run(true) {
for (unsigned i = 0; i < numThreads; i++) {
m_worker_threads.emplace_back([this]() { Run(); });
}
}
~ThreadPool() {
if (!m_worker_threads.empty())
Shutdown();
}
void Shutdown() {
m_run = false;
m_wakeup_condition.notify_all();
for (auto& thread : m_worker_threads)
if (thread.joinable())
thread.join();
m_worker_threads.clear();
}
template <typename F, typename... Args>
void AddTask(F&& func, TaskPriority priority = TaskPriority::NORMAL, Args&&... args) {
{
std::unique_lock<std::mutex> queue_lock(m_task_queue.GetLockable());
m_task_queue.Get().emplace([func = std::forward<F>(func), tupple_args = std::forward_as_tuple(args...)](){
std::apply(func, tupple_args);
}, priority);
}
m_wakeup_condition.notify_one();
}
template <typename F, typename... Args>
std::future<typename std::result_of<F(Args...)>::type> AddTrackedTask(F&& func, TaskPriority priority = TaskPriority::NORMAL, Args&&... args) {
auto task = std::make_shared<std::packaged_task<typename std::result_of<F(Args...)>::type(Args...)>>(std::forward<F>(func));
auto future = task->get_future();
{
std::unique_lock<std::mutex> queue_lock(m_task_queue.GetLockable());
m_task_queue.Get().emplace([task = std::move(task), tupple_args = std::forward_as_tuple(args...)](){
std::apply(*task, tupple_args); }, priority);
}
m_wakeup_condition.notify_one();
return future;
}
void SetThreadCount(size_t count) {
Shutdown();
m_run = true;
for (unsigned i = 0; i < count; i++) {
m_worker_threads.emplace_back([this]() { Run(); });
}
}
size_t GetThreadCount() const {
return m_worker_threads.size();
}
private:
void Run() {
while (m_run) {
std::function<void()> task;
{
std::unique_lock<std::mutex> queue_lock(m_task_queue.GetLockable());
m_wakeup_condition.wait(queue_lock, [&]() { return !m_task_queue.Get().empty() || !m_run; });
if (!m_run)
return;
task = std::move(m_task_queue.Get().top().func);
m_task_queue.Get().pop();
}
task();
}
}
private:
std::list<std::thread> m_worker_threads;
Synchronized<std::priority_queue<Task>> m_task_queue;
std::condition_variable m_wakeup_condition;
bool m_run;
private:
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
};
int main() {
ThreadPool tp;
std::mutex cout_mutex;
auto func = [&cout_mutex](const std::string& message) {
std::lock_guard<std::mutex> cout_lock(cout_mutex);
std::cout << message << " from " << std::this_thread::get_id() << std::endl;
};
tp.AddTask(func, ThreadPool::TaskPriority::NORMAL, "Untracked task");
auto f = tp.AddTrackedTask(func, ThreadPool::TaskPriority::HIGH, "Tracked task");
f.get();
tp.Shutdown();
return 0;
}