
Program Listing for File static_stealable_dequeue.hpp

Return to documentation for file (include/ripple/container/static_stealable_dequeue.hpp)

#ifndef RIPPLE_CONTAINER_STATIC_STEALABLE_DEQUEUE_HPP
#define RIPPLE_CONTAINER_STATIC_STEALABLE_DEQUEUE_HPP

#include <atomic>
#include <cstdint>
#include <optional>
#include <vector>

namespace ripple {

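/*
 * A fixed-capacity work-stealing dequeue in the style of the Chase-Lev
 * dequeue: a single owning thread pushes and pops at the bottom, while any
 * number of other threads steal from the top.
 */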
template <typename Type, uint32_t MaxElements>
class alignas(avoid_false_sharing_size) StaticStealableDequeue {
 public:
  // clang-format off
  using Size      = uint32_t;
  using Atomic    = std::atomic<Size>;
  using Element   = Type;
  using Optional  = std::optional<Element>;
  using Container = std::vector<Element>;
  // clang-format on

 private:
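  // Enables an overload only when every type in the pack is a pointer type.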
  template <typename... Args>
  using pointer_enable_t =
    std::enable_if_t<(std::is_pointer_v<Args> * ... * true), int>;

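  // Enables an overload only when no type in the pack is a pointer type.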
  template <typename... Args>
  using non_pointer_enable_t =
    std::enable_if_t<(!std::is_pointer_v<Args> * ... * true), int>;

 public:
  StaticStealableDequeue() = default;

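  // Note: moving is not thread-safe; the moved-from queue must not be in
  // concurrent use, since the element storage is moved non-atomically.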
  StaticStealableDequeue(StaticStealableDequeue&& other) noexcept
  : elements_{ripple_move(other.elements_)},
    top_{other.top_.load()},
    bottom_{other.bottom_.load()} {}

  StaticStealableDequeue(const StaticStealableDequeue&) = delete;

  /*==--- [interface] ------------------------------------------------------==*/

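  // Overload for pointer types: stores the pointer directly into the slot.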
  template <typename T, pointer_enable_t<T> = 0>
  auto push(T ptr) noexcept -> void {
    const auto bottom = bottom_.load(std::memory_order_relaxed);
    const auto index  = wrapped_index(bottom);

    elements_[index] = ptr;

    // The release store ensures that the write of the element above is not
    // reordered after this store, otherwise a concurrent steal could see the
    // new bottom index and read a slot that has not yet been written.
    bottom_.store(bottom + 1, std::memory_order_release);
  }

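  // Overload for non-pointer types: constructs the element in place in the
  // slot from the given arguments.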
  template <typename... Args, non_pointer_enable_t<Args...> = 0>
  auto push(Args&&... args) noexcept -> void {
    const auto bottom = bottom_.load(std::memory_order_relaxed);
    const auto index  = wrapped_index(bottom);

    new (&elements_[index]) Element(ripple_forward(args)...);

    // The release store ensures that the construction of the element above is
    // not reordered after this store, otherwise a concurrent steal could see
    // the new bottom index and read a slot that has not yet been written.
    bottom_.store(bottom + 1, std::memory_order_release);
  }

  template <typename... Args>
  auto try_push(Args&&... args) noexcept -> bool {
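    // Only the owning thread pushes, and concurrent steals can only shrink
    // the size, so once this check passes it cannot be invalidated before
    // the push below completes.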
    if (size() >= MaxElements) {
      return false;
    }

    push(ripple_forward(args)...);
    return true;
  }

  auto pop() noexcept -> Optional {
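    // Speculatively claim the bottom slot by decrementing bottom before
    // reading top; a steal that observes the decremented value will back
    // off, and the race on the last element is resolved below.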
    auto bottom = bottom_.load(std::memory_order_relaxed) - 1;

    // Sequentially consistent ordering is used here so that the load of top
    // below cannot be reordered before this store to bottom. This store-load
    // ordering requires a full fence (e.g. an mfence instruction on x86).
    bottom_.store(bottom, std::memory_order_seq_cst);

    auto top = top_.load(std::memory_order_relaxed);
    if (top > bottom) {
      bottom_.store(top, std::memory_order_relaxed);
      return std::nullopt;
    }

    auto object = std::make_optional(elements_[wrapped_index(bottom)]);
    if (top != bottom) {
      return object;
    }

    // If we are here there is only one element left in the queue, and there
    // may be a race between this method and steal() to get it. If the
    // exchange succeeds then this method won the race (or there was no race)
    // and the object can be returned, otherwise the queue has already been
    // emptied.
    bool exchanged =
      top_.compare_exchange_strong(top, top + 1, std::memory_order_release);

    // This is also a little tricky: if we lost the race, top holds the new
    // value set by the stealing thread (i.e. it is already incremented), and
    // incrementing it again would leave bottom_ > top_ when the queue was in
    // fact just emptied. This is also the unlikely case -- this path is only
    // executed when there is contention on the last element -- so the cost of
    // the branch is acceptable.
    bottom_.store(top + (exchanged ? 1 : 0), std::memory_order_relaxed);

    return exchanged ? object : std::nullopt;
  }

  auto steal() noexcept -> Optional {
    auto top = top_.load(std::memory_order_relaxed);

    // Top must always be loaded before bottom, so that (bottom - top) gives
    // an accurate enough (to prevent error) view of the queue size. The
    // compiler barrier below stops the compiler from reordering the two
    // loads; on x86 the hardware does not reorder loads with other loads.
    asm volatile("" ::: "memory");

    auto bottom = bottom_.load(std::memory_order_relaxed);
    if (top >= bottom) {
      return std::nullopt;
    }

    // Here the object at top is fetched and an update to top_ is attempted,
    // since the top element is being stolen. __If__ the exchange succeeds,
    // then this method won the race against any other stealing threads (and,
    // when only a single element is left, potentially against pop()), or
    // there was no race. In summary: if the exchange succeeds the object can
    // be returned, otherwise it can't.
    //
    // Also note that the object __must__ be copied before the exchange on
    // top_, otherwise there is a potential race between constructing the
    // object here and a thread pushing onto the queue reusing the slot.
    //
    // This introduces overhead when there is contention to steal and the
    // steal is unsuccessful, but in the successful case there is no overhead.
    auto object = std::make_optional(elements_[wrapped_index(top)]);
    bool exchanged =
      top_.compare_exchange_strong(top, top + 1, std::memory_order_release);

    return exchanged ? object : std::nullopt;
  }

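  // Note: this is computed from two relaxed loads, so under concurrent pops
  // and steals the result is only an approximation of the true size.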
  auto size() const noexcept -> Size {
    return bottom_.load(std::memory_order_relaxed) -
           top_.load(std::memory_order_relaxed);
  }

 private:
  /* Note: pop() reads the element at (bottom - 1), so if bottom started at
   *       0 then (0 - 1) would wrap around to the maximum Size value and the
   *       pop would index out of range. Both indices therefore start at 1. */

  Container elements_ = Container(MaxElements);
  Atomic    top_      = 1;
  Atomic    bottom_   = 1;

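  // Maps a monotonically increasing index onto the circular buffer.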
  auto wrapped_index(Size index) const noexcept -> Size {
    return index % MaxElements;
  }
};

} // namespace ripple

#endif // RIPPLE_CONTAINER_STATIC_STEALABLE_DEQUEUE_HPP
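
A minimal usage sketch (the element type, capacity, and item processing here are illustrative assumptions, not part of the library): under the usual work-stealing contract, a single owner thread calls push()/pop() while any other thread calls steal().

#include <ripple/container/static_stealable_dequeue.hpp>

#include <thread>

int main() {
  ripple::StaticStealableDequeue<int, 1024> queue;

  // Owning thread: the only thread that may call push()/try_push() and pop().
  std::thread owner([&queue] {
    for (int i = 0; i < 100; ++i) {
      queue.try_push(i);
    }
    while (auto item = queue.pop()) {
      // ... process *item
    }
  });

  // Stealing thread: takes items from the top until a steal attempt fails
  // (queue observed empty, or the race for an element was lost).
  std::thread thief([&queue] {
    while (auto item = queue.steal()) {
      // ... process *item
    }
  });

  owner.join();
  thief.join();
}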
