pineapple/src/common/threadsafe_queue.h

246 lines
6.6 KiB
C
Raw Normal View History

2020-12-28 16:15:37 +01:00
// Copyright 2010 Dolphin Emulator Project
// Licensed under GPLv2+
// Refer to the license.txt file included.
#pragma once
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <mutex>
#include <optional>
#include <utility>
2020-12-28 16:15:37 +01:00
namespace Common {
2021-08-16 13:42:12 +02:00
/// A multiple-reader, multiple-writer FIFO queue.
///
/// All operations are serialised by a single internal mutex, and blocking
/// consumers sleep on a condition variable. An earlier hand-rolled lock-free
/// list freed nodes while other threads could still dereference them (no
/// hazard pointers / epochs), so the container is deliberately lock-based:
/// elements are owned by a std::deque and never touched outside the lock.
///
/// Requirements on T:
///  - constructible by brace-initialisation from the argument given to Push();
///  - move- or copy-assignable (Pop / PopWait);
///  - default-constructible (PopWait only).
template <typename T>
class MPMCQueue {
public:
    MPMCQueue() = default;
    MPMCQueue(const MPMCQueue&) = delete;
    MPMCQueue& operator=(const MPMCQueue&) = delete;

    /// Destroys any elements still queued.
    ///
    /// Aborts the process if a thread is still blocked in Wait() or PopWait():
    /// destroying a condition variable that has waiters is undefined behaviour,
    /// so failing loudly is the safer outcome.
    ~MPMCQueue() {
        // Taking the lock also guarantees that a Push() which just woke the
        // last consumer has finished using `condition` before it is destroyed.
        std::lock_guard lock{mutex};
        if (waiting != 0) {
            std::cerr << __FILE__ " ERR " << __LINE__ << std::endl;
            std::abort();
        }
    }

    /// Appends a new element constructed from `t` and wakes one blocked consumer, if any.
    template <typename Arg>
    void Push(Arg&& t) {
        std::lock_guard lock{mutex};
        items.emplace_back(std::forward<Arg>(t));
        if (waiting != 0) {
            // Notify while still holding the lock so the queue cannot be
            // destroyed by the woken consumer before notify_one() returns.
            condition.notify_one();
        }
    }

    /// Non-blocking pop.
    ///
    /// @param t Receives the oldest element (move-assigned) when one is available.
    /// @return true if an element was removed, false if the queue was empty
    ///         (in which case `t` is left untouched).
    bool Pop(T& t) {
        std::lock_guard lock{mutex};
        if (items.empty()) {
            return false;
        }
        t = std::move(items.front().value);
        items.pop_front();
        return true;
    }

    /// Blocks until an element is available, then removes and returns it.
    T PopWait() {
        T t;
        std::unique_lock lock{mutex};
        WaitUntilNonEmpty(lock);
        t = std::move(items.front().value);
        items.pop_front();
        return t;
    }

    /// Blocks until the queue is observed to be non-empty; does not remove anything.
    ///
    /// With several consumers the element may already be gone by the time the
    /// caller follows up with Pop().
    void Wait() {
        std::unique_lock lock{mutex};
        WaitUntilNonEmpty(lock);
        if (waiting != 0) {
            // Push() wakes only one waiter per element. This waiter does not
            // consume the element, so hand the wake-up on; otherwise a thread
            // blocked in PopWait() could sleep with an element queued.
            condition.notify_one();
        }
    }

    /// Removes and destroys every queued element.
    void Clear() {
        std::deque<Node> discarded;
        {
            std::lock_guard lock{mutex};
            discarded.swap(items);
        }
        // `discarded` is destroyed here, outside the lock, so element
        // destructors cannot deadlock against or stall other queue users.
    }

private:
    /// Storage slot; keeps the brace-initialisation of the stored value from
    /// the forwarded Push() argument (so narrowing conversions stay rejected).
    struct Node {
        template <typename Arg>
        explicit Node(Arg&& t) : value{std::forward<Arg>(t)} {}

        Node(const Node&) = delete;
        Node& operator=(const Node&) = delete;
        Node(Node&&) = delete;
        Node& operator=(Node&&) = delete;

        // Not const: Pop()/PopWait() move out of it, which also allows move-only T.
        T value;
    };

    /// Sleeps on `condition` until `items` is non-empty.
    /// `lock` must own `mutex`; it is owned again on return.
    void WaitUntilNonEmpty(std::unique_lock<std::mutex>& lock) {
        ++waiting;
        condition.wait(lock, [this] { return !items.empty(); });
        --waiting;
    }

    std::deque<Node> items{};            // guarded by `mutex`
    std::size_t waiting{0};              // threads inside WaitUntilNonEmpty(); guarded by `mutex`
    std::condition_variable condition{}; // signalled by Push() / Wait()
    std::mutex mutex{};
};
2020-12-28 16:15:37 +01:00
2021-08-16 13:42:12 +02:00
/// a simple lockless thread-safe,
/// single reader, single writer queue
2020-12-28 16:15:37 +01:00
template <typename T>
2021-08-16 13:42:12 +02:00
class /*[[deprecated("Transition to MPMCQueue")]]*/ SPSCQueue {
2020-12-28 16:15:37 +01:00
public:
2021-08-16 13:42:12 +02:00
template <typename Arg>
void Push(Arg&& t) {
queue.Push(std::forward<Arg>(t));
2020-12-28 16:15:37 +01:00
}
2021-08-16 13:42:12 +02:00
bool Pop(T& t) {
return queue.Pop(t);
2020-12-28 16:15:37 +01:00
}
2021-08-16 13:42:12 +02:00
void Wait() {
queue.Wait();
2020-12-28 16:15:37 +01:00
}
2021-08-16 13:42:12 +02:00
T PopWait() {
return queue.PopWait();
2020-12-28 16:15:37 +01:00
}
2021-08-16 13:42:12 +02:00
void Clear() {
queue.Clear();
2020-12-28 16:15:37 +01:00
}
2021-08-16 13:42:12 +02:00
private:
MPMCQueue<T> queue{};
};
2020-12-28 16:15:37 +01:00
2021-08-16 13:42:12 +02:00
/// a simple thread-safe,
/// single reader, multiple writer queue
template <typename T>
class /*[[deprecated("Transition to MPMCQueue")]]*/ MPSCQueue {
public:
template <typename Arg>
void Push(Arg&& t) {
queue.Push(std::forward<Arg>(t));
2021-04-07 20:28:12 +02:00
}
2021-08-16 13:42:12 +02:00
bool Pop(T& t) {
return queue.Pop(t);
2020-12-28 16:15:37 +01:00
}
2021-08-16 13:42:12 +02:00
T PopWait() {
return queue.PopWait();
2020-12-28 16:15:37 +01:00
}
private:
2021-08-16 13:42:12 +02:00
MPMCQueue<T> queue{};
2020-12-28 16:15:37 +01:00
};
} // namespace Common