Program Listing for File static_stealable_dequeue.hpp
#ifndef RIPPLE_CONTAINER_STATIC_STEALABLE_DEQUEUE_HPP
#define RIPPLE_CONTAINER_STATIC_STEALABLE_DEQUEUE_HPP
#include <array>
#include <atomic>
#include <cstdint>
#include <optional>
#include <type_traits>
#include <vector>
// Note: ripple_move, ripple_forward, and avoid_false_sharing_size are
// provided elsewhere in the ripple library.
namespace ripple {
template <typename Type, uint32_t MaxElements>
class alignas(avoid_false_sharing_size) StaticStealableDequeue {
public:
// clang-format off
using Size = uint32_t;
using Atomic = std::atomic<Size>;
using Element = Type;
using Optional = std::optional<Element>;
using Container = std::vector<Element>;
// clang-format on
private:
template <typename... Args>
using pointer_enable_t =
std::enable_if_t<(std::is_pointer_v<Args> * ... * true), int>;
template <typename... Args>
using non_pointer_enable_t =
std::enable_if_t<(!std::is_pointer_v<Args> * ... * true), int>;
public:
StaticStealableDequeue() = default;
StaticStealableDequeue(StaticStealableDequeue&& other) noexcept
: elements_{ripple_move(other.elements_)},
top_{other.top_.load()},
bottom_{other.bottom_.load()} {}
StaticStealableDequeue(const StaticStealableDequeue&) = delete;
/*==--- [interface] ------------------------------------------------------==*/
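  // Pushes a pointer onto the bottom of the queue. Note that this does not
  // check whether the queue has space; see try_push() for a checked push.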
template <typename T, pointer_enable_t<T> = 0>
auto push(T ptr) noexcept -> void {
const auto bottom = bottom_.load(std::memory_order_relaxed);
const auto index = wrapped_index(bottom);
elements_[index] = ptr;
    // Ensure that the compiler does not reorder this store with the write of
    // the element above, otherwise the element would appear to have been added
    // (since the bottom index has moved) before it has actually been written.
bottom_.store(bottom + 1, std::memory_order_release);
}
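  // Constructs an element in place at the bottom of the queue from the given
  // arguments. As with the pointer overload, no check is made for available
  // space; see try_push() for a checked push.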
template <typename... Args, non_pointer_enable_t<Args...> = 0>
auto push(Args&&... args) noexcept -> void {
const auto bottom = bottom_.load(std::memory_order_relaxed);
const auto index = wrapped_index(bottom);
new (&elements_[index]) Element(ripple_forward(args)...);
    // Ensure that the compiler does not reorder this store with the
    // construction of the element above, otherwise the element would appear to
    // have been added (since the bottom index has moved) before it has
    // actually been constructed.
bottom_.store(bottom + 1, std::memory_order_release);
}
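  // Tries to push an element onto the queue, returning true if the push
  // succeeded and false if the queue is already full.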
template <typename... Args>
auto try_push(Args&&... args) noexcept -> bool {
if (size() >= MaxElements) {
return false;
}
push(ripple_forward(args)...);
return true;
}
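  // Pops an element from the bottom of the queue, returning an empty optional
  // if the queue is empty. Only the thread which owns the queue should push
  // and pop; other threads must use steal().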
auto pop() noexcept -> Optional {
auto bottom = bottom_.load(std::memory_order_relaxed) - 1;
    // Sequentially consistent memory ordering is used here to ensure that the
    // load of top below always happens after this store to bottom, i.e. that
    // the compiler emits a full StoreLoad fence (mfence on x86).
bottom_.store(bottom, std::memory_order_seq_cst);
auto top = top_.load(std::memory_order_relaxed);
if (top > bottom) {
bottom_.store(top, std::memory_order_relaxed);
return std::nullopt;
}
auto object = std::make_optional(elements_[wrapped_index(bottom)]);
if (top != bottom) {
return object;
}
    // If we are here there is only one element left in the queue, and there
    // may be a race between this method and steal() to get it. If the exchange
    // below succeeds then this method won the race (or was not in one) and the
    // object can be returned, otherwise the queue has already been emptied.
bool exchanged =
top_.compare_exchange_strong(top, top + 1, std::memory_order_release);
    // This is also a little tricky: if we lost the race, the failed exchange
    // has updated top to the new value set by the stealing thread (i.e. it is
    // already incremented). Storing top + 1 into bottom_ in that case would
    // make bottom_ > top_ even though the last item has just been taken. This
    // is also the unlikely case -- this path only executes when there is
    // contention on the last element -- so the cost of the branch is
    // acceptable.
bottom_.store(top + (exchanged ? 1 : 0), std::memory_order_relaxed);
return exchanged ? object : std::nullopt;
}
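  // Steals an element from the top of the queue, returning an empty optional
  // if the queue is empty or if another thread won the race for the element.
  // This may be called from any thread.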
auto steal() noexcept -> Optional {
auto top = top_.load(std::memory_order_relaxed);
    // Top must always be loaded before bottom, so that bottom - top gives an
    // accurate enough (to prevent error) view of the queue size. Loads to
    // different addresses aren't reordered on x86 (load-load ordering), so a
    // compiler barrier is sufficient here.
asm volatile("" ::: "memory");
auto bottom = bottom_.load(std::memory_order_relaxed);
if (top >= bottom) {
return std::nullopt;
}
    // Here the object at top is fetched, and an update to top_ is attempted,
// since the top element is being stolen. __If__ the exchange succeeds,
// then this method won a race with another thread to steal the element,
// or if there is only a single element left, then it won a race between
// other threads and the pop() method (potentially), or there was no race.
// In summary, if the exchange succeeds, the object can be returned,
// otherwise it can't.
//
    // Also note that the object __must__ be created before the exchange on
    // top_, otherwise there could be a race between constructing the object
    // here and a thread pushing onto the queue.
//
// This introduces overhead when there is contention to steal, and the steal
    // is unsuccessful, but in the successful case there is no overhead.
auto object = std::make_optional(elements_[wrapped_index(top)]);
bool exchanged =
top_.compare_exchange_strong(top, top + 1, std::memory_order_release);
return exchanged ? object : std::nullopt;
}
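  // Returns the current number of elements in the queue. Since other threads
  // may be pushing or stealing concurrently, the result is only approximate.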
auto size() const noexcept -> Size {
return bottom_.load(std::memory_order_relaxed) -
top_.load(std::memory_order_relaxed);
}
private:
  /* Note: The starting values are 1 since popping accesses (bottom - 1), so if
   * bottom = 0 to start, then (0 - 1) wraps to the maximum value of Size, and
   * the pop function tries to access an out of range element. */
Container elements_{MaxElements};
Atomic top_ = 1;
Atomic bottom_ = 1;
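  // Wraps a monotonically increasing index into the range of valid indices for
  // the element container.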
auto wrapped_index(Size index) const noexcept -> Size {
return index % MaxElements;
}
};
} // namespace ripple
#endif // RIPPLE_CONTAINER_STATIC_STEALABLE_DEQUEUE_HPP
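A minimal usage sketch (not part of the header above): one owner thread pushes work and pops it from the bottom while a second thread steals from the top. The Task type, the element count of 1024, and the loop bounds are illustrative assumptions, and the sketch assumes the ripple support headers that define ripple_forward, ripple_move, and avoid_false_sharing_size are available on the include path.

#include "ripple/container/static_stealable_dequeue.hpp"

#include <cstdio>
#include <thread>

// Hypothetical work item used only for this example.
struct Task { int id; };

int main() {
  ripple::StaticStealableDequeue<Task, 1024> queue;

  // A thief thread steals work from the top of the queue.
  std::thread thief([&queue] {
    for (int i = 0; i < 64; ++i) {
      if (auto task = queue.steal()) {
        std::printf("stolen: %d\n", task->id);
      }
    }
  });

  // The owner thread (here, main) pushes work and pops it from the bottom.
  for (int i = 0; i < 128; ++i) {
    queue.try_push(Task{i});
  }
  while (auto task = queue.pop()) {
    std::printf("popped: %d\n", task->id);
  }

  thief.join();
  return 0;
}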