496 lines
15 KiB
C
496 lines
15 KiB
C
|
/*
|
||
|
* Copyright © 2016 Mozilla Foundation
|
||
|
*
|
||
|
* This program is made available under an ISC-style license. See the
|
||
|
* accompanying file LICENSE for details.
|
||
|
*/
|
||
|
|
||
|
#ifndef CUBEB_RING_BUFFER_H
|
||
|
#define CUBEB_RING_BUFFER_H
|
||
|
|
||
|
#include "cubeb_utils.h"
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <limits>
#include <memory>
#include <thread>
|
||
|
|
||
|
/**
 * Single producer single consumer lock-free and wait-free ring buffer.
 *
 * This data structure allows producing data from one thread, and consuming it
 * on another thread, safely and without explicit synchronization. The read and
 * write indices are atomics; each index is only ever written by one side.
 *
 * The role for the producer and the consumer must be constant, i.e., the
 * producer should always be on one thread and the consumer should always be on
 * another thread. When asserts are enabled this is checked on every call; see
 * `reset_thread_ids()` if the threads legitimately change.
 *
 * Some words about the inner workings of this class:
 * - Capacity is fixed. Only one allocation is performed, in the constructor.
 *   When reading and writing, the return value of the method allows checking
 *   if the ring buffer is empty or full.
 * - The storage holds one more slot than the requested capacity, and that slot
 *   is never filled. This lets us distinguish between an empty and a full ring
 *   buffer: the buffer is empty when the write index is at the same position
 *   as the read index, and full when the write index is exactly one position
 *   before the read index (modulo the storage size).
 * - The producer publishes the write index (release store) only after having
 *   written the data, and the consumer publishes the read index (release
 *   store) only after having read the data. Each side loads the other side's
 *   index with acquire ordering. This means that each thread can only touch a
 *   portion of the buffer that is not touched by the other thread.
 * - Callers are expected to provide buffers. When writing to the queue,
 *   elements are copied into the internal storage from the buffer passed in.
 *   When reading from the queue, the user is expected to provide a buffer.
 *   Because this is a ring buffer, data might not be contiguous in memory;
 *   providing an external buffer to copy into is an easy way to have linear
 *   data for further processing.
 */
template <typename T>
class ring_buffer_base
{
public:
  /**
   * Constructor for a ring buffer.
   *
   * This performs an allocation, but is the only allocation that will happen
   * for the life time of a `ring_buffer_base`.
   *
   * @param capacity The maximum number of elements this ring buffer will hold.
   */
  ring_buffer_base(int capacity)
    /* One more element to distinguish from empty and full buffer. */
    : capacity_(capacity + 1)
  {
    assert(storage_capacity() < std::numeric_limits<int>::max() / 2 &&
           "buffer too large for the type of index used.");
    assert(capacity_ > 0);

    data_.reset(new T[storage_capacity()]);
    /* The indices are set last, once the storage is in place. */
    write_index_ = 0;
    read_index_ = 0;
  }
  /**
   * Push `count` zero or default constructed elements in the array.
   *
   * Only safely called on the producer thread.
   *
   * @param count The number of elements to enqueue.
   * @return The number of elements enqueued.
   */
  int enqueue_default(int count)
  {
    return enqueue(nullptr, count);
  }
  /**
   * @brief Put an element in the queue.
   *
   * Only safely called on the producer thread.
   *
   * @param element The element to put in the queue.
   *
   * @return 1 if the element was inserted, 0 otherwise.
   */
  int enqueue(T & element)
  {
    return enqueue(&element, 1);
  }
  /**
   * Push `count` elements in the ring buffer.
   *
   * Only safely called on the producer thread.
   *
   * @param elements a pointer to a buffer containing at least `count`
   * elements. If `elements` is nullptr, zero or default constructed elements
   * are enqueued.
   * @param count The number of elements to read from `elements`. Must not be
   * negative.
   * @return The number of elements successfully copied from `elements` and
   * inserted into the ring buffer. This is 0 when the buffer is full or when
   * `count` is not positive.
   */
  int enqueue(T * elements, int count)
  {
#ifndef NDEBUG
    assert_correct_thread(producer_id);
#endif
    /* A negative count would otherwise turn into a huge unsigned length in
     * the copy below. */
    assert(count >= 0);
    if (count <= 0) {
      return 0;
    }

    /* The write index is only modified by this thread: relaxed is enough.
     * The read index is published by the consumer: acquire it, so that the
     * consumer is known to be done with the slots we are about to reuse. */
    int wr_idx = write_index_.load(std::memory_order_relaxed);
    int rd_idx = read_index_.load(std::memory_order_acquire);

    if (full_internal(rd_idx, wr_idx)) {
      return 0;
    }

    int to_write = std::min(available_write_internal(rd_idx, wr_idx), count);

    /* First part, from the write index to the end of the array. */
    int first_part = std::min(storage_capacity() - wr_idx, to_write);
    /* Second part, from the beginning of the array. */
    int second_part = to_write - first_part;

    if (elements) {
      std::copy_n(elements, first_part, data_.get() + wr_idx);
      std::copy_n(elements + first_part, second_part, data_.get());
    } else {
      std::fill_n(data_.get() + wr_idx, first_part, T());
      std::fill_n(data_.get(), second_part, T());
    }

    /* Publish the new elements to the consumer. */
    write_index_.store(increment_index(wr_idx, to_write),
                       std::memory_order_release);

    return to_write;
  }
  /**
   * Retrieve at most `count` elements from the ring buffer, and copy them to
   * `elements`, if non-null.
   *
   * Only safely called on the consumer side.
   *
   * @param elements A pointer to a buffer with space for at least `count`
   * elements. If `elements` is `nullptr`, the elements are discarded.
   * @param count The maximum number of elements to dequeue. Must not be
   * negative.
   * @return The number of elements removed from the ring buffer (and written
   * to `elements` when it is non-null).
   */
  int dequeue(T * elements, int count)
  {
#ifndef NDEBUG
    assert_correct_thread(consumer_id);
#endif
    assert(count >= 0);
    if (count <= 0) {
      return 0;
    }

    /* The read index is only modified by this thread: relaxed is enough.
     * The write index is published by the producer: acquire it, so that the
     * element writes that precede it are visible here. */
    int rd_idx = read_index_.load(std::memory_order_relaxed);
    int wr_idx = write_index_.load(std::memory_order_acquire);

    if (empty_internal(rd_idx, wr_idx)) {
      return 0;
    }

    int to_read = std::min(available_read_internal(rd_idx, wr_idx), count);

    /* First part, from the read index to the end of the array. */
    int first_part = std::min(storage_capacity() - rd_idx, to_read);
    /* Second part, from the beginning of the array. */
    int second_part = to_read - first_part;

    if (elements) {
      std::copy_n(data_.get() + rd_idx, first_part, elements);
      std::copy_n(data_.get(), second_part, elements + first_part);
    }

    /* Release, so the producer only reuses these slots once the reads above
     * are complete. */
    read_index_.store(increment_index(rd_idx, to_read),
                      std::memory_order_release);

    return to_read;
  }
  /**
   * Get the number of available elements for consuming.
   *
   * Only safely called on the consumer thread.
   *
   * @return The number of available elements for reading.
   */
  int available_read() const
  {
#ifndef NDEBUG
    assert_correct_thread(consumer_id);
#endif
    return available_read_internal(
      read_index_.load(std::memory_order_relaxed),
      write_index_.load(std::memory_order_acquire));
  }
  /**
   * Get the number of empty slots available for producing.
   *
   * Only safely called on the producer thread.
   *
   * @return The number of empty slots in the buffer, available for writing.
   */
  int available_write() const
  {
#ifndef NDEBUG
    assert_correct_thread(producer_id);
#endif
    return available_write_internal(
      read_index_.load(std::memory_order_acquire),
      write_index_.load(std::memory_order_relaxed));
  }
  /**
   * Get the total capacity, for this ring buffer.
   *
   * Can be called safely on any thread.
   *
   * @return The maximum capacity of this ring buffer.
   */
  int capacity() const
  {
    return storage_capacity() - 1;
  }
  /**
   * Reset the consumer and producer thread identifier, in case the threads are
   * being changed. This has to be externally synchronized. This is no-op when
   * asserts are disabled.
   */
  void reset_thread_ids()
  {
#ifndef NDEBUG
    consumer_id = producer_id = std::thread::id();
#endif
  }
private:
  /** Return true if the ring buffer is empty.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return true if the ring buffer is empty, false otherwise.
   **/
  bool empty_internal(int read_index, int write_index) const
  {
    return write_index == read_index;
  }
  /** Return true if the ring buffer is full.
   *
   * This happens if the write index is exactly one element behind the read
   * index.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return true if the ring buffer is full, false otherwise.
   **/
  bool full_internal(int read_index, int write_index) const
  {
    return (write_index + 1) % storage_capacity() == read_index;
  }
  /**
   * Return the size of the storage. It is one more than the number of elements
   * that can be stored in the buffer.
   *
   * @return the number of slots in the underlying storage.
   */
  int storage_capacity() const
  {
    return capacity_;
  }
  /**
   * Returns the number of elements available for reading.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return the number of available elements for reading.
   */
  int available_read_internal(int read_index, int write_index) const
  {
    if (write_index >= read_index) {
      return write_index - read_index;
    }
    /* The write index has wrapped around and the read index has not. */
    return write_index + storage_capacity() - read_index;
  }
  /**
   * Returns the number of empty elements, available for writing.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return the number of elements that can be written into the array.
   */
  int available_write_internal(int read_index, int write_index) const
  {
    /* We subtract one element here to always keep at least one sample
     * free in the buffer, to distinguish between full and empty array. */
    int rv = read_index - write_index - 1;
    if (write_index >= read_index) {
      rv += storage_capacity();
    }
    return rv;
  }
  /**
   * Increments an index, wrapping it around the storage.
   *
   * @param index the index to increment.
   * @param increment the number by which `index` is incremented. Must not be
   * negative.
   * @return the new index.
   */
  int increment_index(int index, int increment) const
  {
    assert(increment >= 0);
    return (index + increment) % storage_capacity();
  }
  /**
   * @brief This allows checking that enqueue (resp. dequeue) are always called
   * by the right thread.
   *
   * On the first call `id` is still the default-constructed thread id: it is
   * set to the calling thread. Every later call asserts that the calling
   * thread is the recorded one.
   *
   * @param id the id of the thread that has called the calling method first.
   */
#ifndef NDEBUG
  static void assert_correct_thread(std::thread::id & id)
  {
    if (id == std::thread::id()) {
      id = std::this_thread::get_id();
      return;
    }
    assert(id == std::this_thread::get_id());
  }
#endif
  /** Index at which the oldest element is at, in samples. Only written by the
   * consumer. */
  std::atomic<int> read_index_;
  /** Index at which to write new elements. Only written by the producer. It
   * is equal to `read_index_` when the buffer is empty. */
  std::atomic<int> write_index_;
  /** Size of the storage: one more than the maximum number of elements that
   * can be stored in the ring buffer. */
  const int capacity_;
  /** Data storage */
  std::unique_ptr<T[]> data_;
#ifndef NDEBUG
  /** The id of the only thread that is allowed to read from the queue. */
  mutable std::thread::id consumer_id;
  /** The id of the only thread that is allowed to write to the queue. */
  mutable std::thread::id producer_id;
#endif
};
|
||
|
|
||
|
/**
|
||
|
* Adapter for `ring_buffer_base` that exposes an interface in frames.
|
||
|
*/
|
||
|
template <typename T>
|
||
|
class audio_ring_buffer_base
|
||
|
{
|
||
|
public:
|
||
|
/**
|
||
|
* @brief Constructor.
|
||
|
*
|
||
|
* @param channel_count Number of channels.
|
||
|
* @param capacity_in_frames The capacity in frames.
|
||
|
*/
|
||
|
audio_ring_buffer_base(int channel_count, int capacity_in_frames)
|
||
|
: channel_count(channel_count)
|
||
|
, ring_buffer(frames_to_samples(capacity_in_frames))
|
||
|
{
|
||
|
assert(channel_count > 0);
|
||
|
}
|
||
|
/**
|
||
|
* @brief Enqueue silence.
|
||
|
*
|
||
|
* Only safely called on the producer thread.
|
||
|
*
|
||
|
* @param frame_count The number of frames of silence to enqueue.
|
||
|
* @return The number of frames of silence actually written to the queue.
|
||
|
*/
|
||
|
int enqueue_default(int frame_count)
|
||
|
{
|
||
|
return samples_to_frames(ring_buffer.enqueue(nullptr, frames_to_samples(frame_count)));
|
||
|
}
|
||
|
/**
|
||
|
* @brief Enqueue `frames_count` frames of audio.
|
||
|
*
|
||
|
* Only safely called from the producer thread.
|
||
|
*
|
||
|
* @param [in] frames If non-null, the frames to enqueue.
|
||
|
* Otherwise, silent frames are enqueued.
|
||
|
* @param frame_count The number of frames to enqueue.
|
||
|
*
|
||
|
* @return The number of frames enqueued
|
||
|
*/
|
||
|
|
||
|
int enqueue(T * frames, int frame_count)
|
||
|
{
|
||
|
return samples_to_frames(ring_buffer.enqueue(frames, frames_to_samples(frame_count)));
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* @brief Removes `frame_count` frames from the buffer, and
|
||
|
* write them to `frames` if it is non-null.
|
||
|
*
|
||
|
* Only safely called on the consumer thread.
|
||
|
*
|
||
|
* @param frames If non-null, the frames are copied to `frames`.
|
||
|
* Otherwise, they are dropped.
|
||
|
* @param frame_count The number of frames to remove.
|
||
|
*
|
||
|
* @return The number of frames actually dequeud.
|
||
|
*/
|
||
|
int dequeue(T * frames, int frame_count)
|
||
|
{
|
||
|
return samples_to_frames(ring_buffer.dequeue(frames, frames_to_samples(frame_count)));
|
||
|
}
|
||
|
/**
|
||
|
* Get the number of available frames of audio for consuming.
|
||
|
*
|
||
|
* Only safely called on the consumer thread.
|
||
|
*
|
||
|
* @return The number of available frames of audio for reading.
|
||
|
*/
|
||
|
int available_read() const
|
||
|
{
|
||
|
return samples_to_frames(ring_buffer.available_read());
|
||
|
}
|
||
|
/**
|
||
|
* Get the number of available frames of audio for consuming.
|
||
|
*
|
||
|
* Only safely called on the producer thread.
|
||
|
*
|
||
|
* @return The number of empty slots in the buffer, available for writing.
|
||
|
*/
|
||
|
int available_write() const
|
||
|
{
|
||
|
return samples_to_frames(ring_buffer.available_write());
|
||
|
}
|
||
|
/**
|
||
|
* Get the total capacity, for this ring buffer.
|
||
|
*
|
||
|
* Can be called safely on any thread.
|
||
|
*
|
||
|
* @return The maximum capacity of this ring buffer.
|
||
|
*/
|
||
|
int capacity() const
|
||
|
{
|
||
|
return samples_to_frames(ring_buffer.capacity());
|
||
|
}
|
||
|
private:
|
||
|
/**
|
||
|
* @brief Frames to samples conversion.
|
||
|
*
|
||
|
* @param frames The number of frames.
|
||
|
*
|
||
|
* @return A number of samples.
|
||
|
*/
|
||
|
int frames_to_samples(int frames) const
|
||
|
{
|
||
|
return frames * channel_count;
|
||
|
}
|
||
|
/**
|
||
|
* @brief Samples to frames conversion.
|
||
|
*
|
||
|
* @param samples The number of samples.
|
||
|
*
|
||
|
* @return A number of frames.
|
||
|
*/
|
||
|
int samples_to_frames(int samples) const
|
||
|
{
|
||
|
return samples / channel_count;
|
||
|
}
|
||
|
/** Number of channels of audio that will stream through this ring buffer. */
|
||
|
int channel_count;
|
||
|
/** The underlying ring buffer that is used to store the data. */
|
||
|
ring_buffer_base<T> ring_buffer;
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* Lock-free instantiation of the `ring_buffer_base` type. This is safe to use
|
||
|
* from two threads, one producer, one consumer (that never change role),
|
||
|
* without explicit synchronization.
|
||
|
*/
|
||
|
template<typename T>
|
||
|
using lock_free_queue = ring_buffer_base<T>;
|
||
|
/**
|
||
|
* Lock-free instantiation of the `audio_ring_buffer` type. This is safe to use
|
||
|
* from two threads, one producer, one consumer (that never change role),
|
||
|
* without explicit synchronization.
|
||
|
*/
|
||
|
template<typename T>
|
||
|
using lock_free_audio_ring_buffer = audio_ring_buffer_base<T>;
|
||
|
|
||
|
#endif // CUBEB_RING_BUFFER_H
|