/*
* Copyright (C) Nemirtingas
* This file is part of System.
*
* System is free software; you can redistribute it
* and/or modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* System is distributed in the hope that it will be
* useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with the System; if not, see
* .
*/
#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace System {
class ThreadPool
{
using task_t = std::function;
std::atomic _StopWorkers;
std::atomic _ActiveCount;
std::condition_variable _WorkerNotifier;
std::mutex _Mutex;
std::vector _Workers;
std::queue _Tasks;
public:
explicit ThreadPool():
_ActiveCount(0)
{
}
~ThreadPool()
{
Join();
}
ThreadPool(ThreadPool const &) = delete;
ThreadPool(ThreadPool&&) = default;
ThreadPool&operator=(ThreadPool const &) = delete;
ThreadPool&operator=(ThreadPool&&) = default;
template
auto Push(Func &&fn, Args &&...args)
{
using return_type = typename std::result_of::type;
auto task{ std::make_shared>(
std::bind(std::forward(fn), std::forward(args)...)
) };
auto future{ task->get_future() };
{
std::lock_guard lock(_Mutex);
_Tasks.emplace([task]()
{
(*task)();
});
}
_WorkerNotifier.notify_one();
return future;
}
// Remove all pending tasks from the queue
void Clear()
{
std::lock_guard lock(_Mutex);
_Tasks = {};
}
// Stops all previous and creates new worker threads.
void Start(std::size_t worker_count = std::thread::hardware_concurrency())
{
Join();
_StopWorkers = false;
for (std::size_t i = 0; i < worker_count; ++i)
_Workers.emplace_back(std::bind(&ThreadPool::_WorkerLoop, this));
}
// Wait all workers to finish
void Join()
{
_StopWorkers = true;
_WorkerNotifier.notify_all();
for (auto &thread : _Workers)
{
if (thread.joinable())
thread.join();
}
_Workers.clear();
}
std::size_t WorkerCount() const
{
return _Workers.size();
}
// Get the number of active workers
std::size_t ActiveCount() const
{
return _ActiveCount;
}
private:
void _WorkerLoop()
{
while (true)
{
auto task{ _NextTask() };
if (task)
{
++_ActiveCount;
task();
--_ActiveCount;
}
else if (_StopWorkers)
{
break;
}
}
}
task_t _NextTask()
{
std::unique_lock lock{ _Mutex };
_WorkerNotifier.wait(lock, [this]() { return !_Tasks.empty() || _StopWorkers; });
if (_Tasks.empty())
return {};
auto task{ _Tasks.front() };
_Tasks.pop();
return task;
}
};
}