• Docs >
  • Program Listing for File graph.hpp
Shortcuts

Program Listing for File graph.hpp

Return to documentation for file (include/ripple/graph/graph.hpp)

#ifndef RIPPLE_GRAPH_GRAPH_HPP
#define RIPPLE_GRAPH_GRAPH_HPP

#include "memcopy.hpp"
#include "node.hpp"
#include "splitter.hpp"
#include "reducer.hpp"
#include <ripple/algorithm/unrolled_for.hpp>
#include <ripple/allocation/allocator.hpp>
#include <ripple/arch/cache.hpp>
#include <ripple/utility/spinlock.hpp>
#include <ripple/utility/type_traits.hpp>
#include <algorithm>
#include <cstring>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
#include <vector>

namespace ripple {

/*==--- [forward declarations] ---------------------------------------------==*/

/* Forward declaration of the Graph class. */
class Graph;

/* Forward declaration of the executor for the graph. */
class Executor;

/* Forward declaration of the execution function for a graph. */
auto execute(Graph& graph) noexcept -> void;

/**
 * A Graph is an ordered container of nodes, where each node wraps a callable,
 * together with the bookkeeping (join and split indices) which is used to add
 * successor relationships between the nodes as they are emplaced.
 *
 * Nodes and their info are allocated from static, pooled allocators which are
 * shared by all graphs. A graph recycles the nodes which it holds back into
 * the pools when it is reset or destroyed.
 *
 * Graphs can be moved, but not copied. Use clone() to duplicate a graph.
 */
class Graph {
  /* These types access the private state of the graph. */
  friend class Executor;
  friend struct Splitter;
  friend struct Memcopy;

  // clang-format off
  /** Alignment for the nodes. */
  static constexpr size_t node_alignment   = avoid_false_sharing_size;
  /** Minimum number of bytes of storage for a node. */
  static constexpr size_t node_min_storage = 2 * 72;

  /** Type of the arena for the allocators. */
  using ArenaType     = HeapArena;
  /** Type of the nodes in the graph. */
  using NodeType       = Node<node_alignment, node_min_storage>;
  /** Type of the allocator for the nodes. */
  using NodeAllocator = ThreadSafeObjectPoolAllocator<NodeType, ArenaType>;
  /** Type of the allocator for the node info. */
  using InfoAllocator = ThreadSafeObjectPoolAllocator<NodeInfo, ArenaType>;
  /** Type of the container for the (pointers to the) nodes. */
  using NodeContainer = std::vector<NodeType*>;
  /** Type of the container for indices into the node container. */
  using Connections   = std::vector<int>;
  /** Type of the lock for static initialization state. */
  using Lock          = Spinlock;
  /** Type of the guard for the lock. */
  using Guard         = std::lock_guard<Lock>;
  // clang-format on

  /**
   * Enables an overload only when the decayed type T is not the node type, so
   * that callables and nodes select different overloads.
   * \tparam T The type to compare against the node type.
   */
  template <typename T>
  using non_node_enable_t =
    std::enable_if_t<!std::is_same_v<NodeType, std::decay_t<T>>, int>;

 public:
  // clang-format off
  /** Default number of nodes in the allocation pools. */
  static constexpr size_t default_nodes = 1024;
  /** Default id for a node. */
  static constexpr auto   default_id    = NodeInfo::default_id;
  // clang-format on

  /*==--- [construction] ---------------------------------------------------==*/

  /** Creates an empty graph with the default execution kind. */
  Graph() = default;

  /**
   * Creates an empty graph whose default execution kind is \p exec_kind.
   * \param exec_kind The kind of execution for operations which do not
   *                  specify one.
   */
  Graph(ExecutionKind exec_kind) noexcept : execution_{exec_kind} {}

  /** Recycles all nodes held by the graph into the allocation pools. */
  ~Graph() noexcept {
    reset();
  }

  /**
   * Moves the nodes and connectivity of \p other into this graph.
   * \param other The graph to move from.
   */
  Graph(Graph&& other) noexcept
  : nodes_{ripple_move(other.nodes_)},
    join_ids_{ripple_move(other.join_ids_)},
    split_ids_{ripple_move(other.split_ids_)},
    exec_count_{other.exec_count_},
    execution_{other.execution_} {
    other.exec_count_ = 0;
  }

  /** Copy construction is disabled, use clone() instead. */
  Graph(const Graph&) = delete;

  /*==--- [operator overloads] ---------------------------------------------==*/

  /**
   * Moves the nodes and connectivity of \p other into this graph. Any nodes
   * which this graph held before the assignment are recycled.
   * \param other The graph to move from.
   * \return A reference to this graph.
   */
  auto operator=(Graph&& other) noexcept -> Graph& {
    if (this != &other) {
      // The container only stores raw pointers, so the nodes which are
      // currently held must be recycled before the container is overwritten,
      // otherwise they are never returned to the pools.
      reset();

      nodes_      = ripple_move(other.nodes_);
      join_ids_   = ripple_move(other.join_ids_);
      split_ids_  = ripple_move(other.split_ids_);
      exec_count_ = other.exec_count_;
      execution_  = other.execution_;

      // The state of a moved-from vector is unspecified after move
      // assignment, so make sure that other has nothing left to recycle.
      other.nodes_.clear();
      other.exec_count_ = 0;
    }
    return *this;
  }

  /** Copy assignment is disabled, use clone() instead. */
  auto operator=(const Graph& other) -> Graph& = delete;

  /*==--- [interface] ------------------------------------------------------==*/

  /**
   * Sets the number of nodes used to size the allocation pools.
   * \param nodes The number of nodes for the pools.
   * \return true if the size was set, false if it had already been set.
   */
  static auto set_allocation_pool_size(size_t nodes) noexcept -> bool;

  /**
   * Creates a node from the node pool, which is not added to any graph.
   * \param callable The callable for the node.
   * \param args     The arguments for the callable.
   * \return A reference to the new node.
   */
  template <typename F, typename... Args>
  static auto make_node(F&& callable, Args&&... args) -> NodeType& {
    return *node_allocator().create<NodeType>(
      ripple_forward(callable), ripple_forward(args)...);
  }

  /**
   * Creates a node from the node pool, which is not added to any graph, and
   * gives it node info created with the \p exec_kind.
   * \param callable  The callable for the node.
   * \param exec_kind The kind of execution for the node.
   * \param args      The arguments for the callable.
   * \return A reference to the new node.
   */
  template <typename F, typename... Args>
  static auto make_node(F&& callable, ExecutionKind exec_kind, Args&&... args)
    -> NodeType& {
    NodeType& node = *node_allocator().create<NodeType>(
      ripple_forward(callable), ripple_forward(args)...);
    node.info_ = info_allocator().create<NodeInfo>(exec_kind);
    return node;
  }

  /** Recycles the nodes held by the graph and resets the execution count. */
  auto reset() noexcept -> void;

  /**
   * Creates a new graph with copies of the nodes of this graph.
   * \return The new graph.
   */
  auto clone() const noexcept -> Graph;

  /** \return The number of nodes in the graph. */
  auto size() const -> size_t {
    return nodes_.size();
  }

  /** \return The number of nodes used to size the allocation pools. */
  auto allocation_pool_size() const noexcept -> size_t {
    return allocation_pool_nodes();
  }

  /*==--- [find] -----------------------------------------------------------==*/

  /**
   * Finds the first node in the graph with the given \p name.
   * \param name The name of the node to find.
   * \return A pointer to the node, or nullopt if there is no such node.
   */
  auto find(std::string name) noexcept -> std::optional<NodeType*>;

  /**
   * Finds the last node in the graph with the given \p name.
   * \param name The name of the node to find.
   * \return A pointer to the node, or nullopt if there is no such node.
   */
  auto find_last_of(std::string name) noexcept -> std::optional<NodeType*>;

  /*==--- [emplace] --------------------------------------------------------==*/

  /**
   * Emplaces a node with an empty name onto the graph.
   * \param callable The callable for the node.
   * \param args     The arguments for the callable.
   * \return A reference to this graph.
   */
  template <typename F, typename... Args, non_node_enable_t<F> = 0>
  auto emplace(F&& callable, Args&&... args) -> Graph& {
    const auto no_name = std::string("");
    return emplace_named(
      no_name, ripple_forward(callable), ripple_forward(args)...);
  }

  /**
   * Emplaces a node onto the graph whose info is created from the name, id,
   * kind, and execution kind of \p info, then sets up the successor
   * relationships for the node.
   * \param info     The info for the node.
   * \param callable The callable for the node.
   * \param args     The arguments for the callable.
   * \return A reference to this graph.
   */
  template <typename F, typename... Args, non_node_enable_t<F> = 0>
  auto emplace_named(NodeInfo info, F&& callable, Args&&... args) -> Graph& {
    auto& node = *nodes_.emplace_back(node_allocator().create<NodeType>(
      ripple_forward(callable), ripple_forward(args)...));
    node.info_ = info_allocator().create<NodeInfo>(
      info.name, info.id, info.kind, info.exec);
    setup_node(node);
    return *this;
  }

  /**
   * Emplaces already created nodes onto the graph. The graph stores the
   * addresses of the \p nodes and recycles them when it is reset, so they
   * should be nodes created by make_node().
   * \param nodes The nodes to add to the graph.
   * \return A reference to this graph.
   */
  template <
    typename... Nodes,
    all_same_enable_t<NodeType, std::decay_t<Nodes>...> = 0>
  auto emplace(Nodes&&... nodes) -> Graph& {
    constexpr size_t node_count = sizeof...(Nodes);

    // Make sure that we have a tuple of __references__:
    auto node_tuple =
      std::tuple<std::decay_t<Nodes>&...>{ripple_forward(nodes)...};
    unrolled_for<node_count>([&](auto i) {
      setup_node(*nodes_.emplace_back(&std::get<i>(node_tuple)));
    });
    return *this;
  }

  /**
   * Connects the \p graph to this graph and emplaces a node which executes
   * the \p graph. The \p graph is captured by reference, so it must outlive
   * the execution of this graph.
   * \param graph The graph to emplace.
   * \return A reference to this graph.
   */
  auto emplace(Graph& graph) noexcept -> Graph& {
    connect(graph);
    return emplace([&graph] { execute(graph); });
  }

  /*==--- [sync] -----------------------------------------------------------==*/

  /**
   * Adds a join index at the current end of the graph, then emplaces a sync
   * node with the execution kind of the graph.
   * \param callable The callable for the node.
   * \param args     The arguments for the callable.
   * \return A reference to this graph.
   */
  template <typename F, typename... Args, non_node_enable_t<F> = 0>
  auto sync(F&& callable, Args&&... args) -> Graph& {
    join_ids_.emplace_back(nodes_.size());
    NodeInfo info{NodeKind::sync, execution_};
    return emplace_named(
      info, ripple_forward(callable), ripple_forward(args)...);
  }

  /*
   * \note These synchronization functions need to be implemented here, to
   *       ensure that the cuda functions are called when device functionality
   *       is required.
   *
   *       If in a cpp file, then if compiled as c++ code and linked against
   *       a cuda executable, the cuda synchronization won't run.
   */

  /**
   * Emplaces a node for each gpu in the topology which synchronizes the
   * streams of the gpu, followed by a sync node which runs the \p callable.
   * \param callable The callable for the sync node.
   * \param args     The arguments for the callable.
   * \return A reference to this graph.
   */
  template <typename F, typename... Args, non_node_enable_t<F> = 0>
  auto gpu_fence(F&& callable, Args&&... args) -> Graph& {
    join_ids_.emplace_back(nodes_.size());

    NodeInfo info{NodeKind::normal, ExecutionKind::gpu};
    for (const auto& gpu : topology().gpus) {
      emplace_named(info, [&gpu] { gpu.synchronize_streams(); });
    }

    join_ids_.emplace_back(nodes_.size());
    info.kind = NodeKind::sync;
    return emplace_named(
      info, ripple_forward(callable), ripple_forward(args)...);
  }

  /**
   * Prepares a barrier on each gpu in the topology and emplaces a node per
   * gpu which executes the barrier, followed by a sync node which spins
   * until the barriers are reported as down.
   * \return A reference to this graph.
   */
  auto gpu_barrier() -> Graph& {
    join_ids_.emplace_back(nodes_.size());

    NodeInfo info{NodeKind::normal, ExecutionKind::gpu};
    for (auto& gpu : topology().gpus) {
      gpu.prepare_barrier();
      emplace_named(info, [&gpu] { gpu.execute_barrier(); });
    }

    join_ids_.emplace_back(nodes_.size());
    info.kind = NodeKind::sync;
    return emplace_named(info, [] {
      // NOTE(review): count is not reset between passes over the gpus, so a
      // gpu whose barrier stays down is counted again on every pass, and count
      // can wrap below zero in the middle of a pass. This is only correct if
      // is_barrier_down() reports each barrier a single time -- confirm.
      size_t count = topology().num_gpus();
      while (count > 0) {
        for (const auto& gpu : topology().gpus) {
          if (gpu.is_barrier_down()) {
            count--;
          }
        }
      }
    });
  }

  /*==--- [then] -----------------------------------------------------------==*/

  /**
   * Adds a join index at the current end of the graph, then emplaces a node,
   * so that the node becomes a successor of the nodes which were emplaced
   * since the previous join index.
   * \param callable The callable for the node.
   * \param args     The arguments for the callable.
   * \return A reference to this graph.
   */
  template <typename F, typename... Args, non_node_enable_t<F> = 0>
  auto then(F&& callable, Args&&... args) -> Graph& {
    join_ids_.emplace_back(nodes_.size());
    return emplace(ripple_forward(callable), ripple_forward(args)...);
  }

  /**
   * Adds a join index at the current end of the graph, then emplaces the
   * already created \p nodes.
   * \param nodes The nodes to add to the graph.
   * \return A reference to this graph.
   */
  template <
    typename... Nodes,
    all_same_enable_t<NodeType, std::decay_t<Nodes>...> = 0>
  auto then(Nodes&&... nodes) -> Graph& {
    join_ids_.emplace_back(nodes_.size());
    return emplace(ripple_forward(nodes)...);
  }

  /**
   * Emplaces a node after a join index which executes the \p graph. The
   * \p graph is captured by reference, so it must outlive the execution of
   * this graph.
   * \param graph The graph to execute.
   * \return A reference to this graph.
   */
  auto then(Graph& graph) noexcept -> Graph& {
    return then([&graph] { execute(graph); });
  }

  /*==--- [split] ----------------------------------------------------------==*/

  /**
   * Emplaces a sync node which invokes the \p pred with the \p args, and
   * which executes this graph if the predicate returns true. The node
   * captures this graph by address, so the graph must not be moved or
   * destroyed while the node can still run.
   * \param pred The predicate to invoke.
   * \param args The arguments for the predicate.
   * \return A reference to this graph.
   */
  template <typename Pred, typename... Args>
  auto conditional(Pred&& pred, Args&&... args) -> Graph& {
    return sync(
      [this](auto&& predicate, auto&&... as) {
        if (predicate(ripple_forward(as)...)) {
          execute(*this);
        }
      },
      ripple_forward(pred),
      ripple_forward(args)...);
  }

  /**
   * Forwards the \p callable and \p args to the Splitter, with the execution
   * kind of the graph.
   * \param callable The callable for the split.
   * \param args     The arguments for the callable.
   * \return A reference to this graph.
   */
  template <typename F, typename... Args>
  auto split(F&& callable, Args&&... args) noexcept -> Graph& {
    Splitter::split(
      *this, execution_, ripple_forward(callable), ripple_forward(args)...);
    return *this;
  }

  /**
   * Forwards the \p callable and \p args to the Splitter, with the given
   * \p exec_kind.
   * \param exec_kind The kind of execution for the split.
   * \param callable  The callable for the split.
   * \param args      The arguments for the callable.
   * \return A reference to this graph.
   */
  template <typename F, typename... Args>
  auto split(ExecutionKind exec_kind, F&& callable, Args&&... args) noexcept
    -> Graph& {
    Splitter::split(
      *this, exec_kind, ripple_forward(callable), ripple_forward(args)...);
    return *this;
  }

  /**
   * Adds a join index and a split index at the current end of the graph,
   * then performs a split with the execution kind of the graph.
   * \param callable The callable for the split.
   * \param args     The arguments for the callable.
   * \return A reference to this graph.
   */
  template <typename F, typename... Args>
  auto then_split(F&& callable, Args&&... args) noexcept -> Graph& {
    join_ids_.emplace_back(nodes_.size());
    split_ids_.emplace_back(nodes_.size());
    Splitter::split(
      *this, execution_, ripple_forward(callable), ripple_forward(args)...);
    return *this;
  }

  /**
   * Adds a join index and a split index at the current end of the graph,
   * then performs a split with the given \p exec_kind.
   * \param exec_kind The kind of execution for the split.
   * \param callable  The callable for the split.
   * \param args      The arguments for the callable.
   * \return A reference to this graph.
   */
  template <typename F, typename... Args>
  auto
  then_split(ExecutionKind exec_kind, F&& callable, Args&&... args) noexcept
    -> Graph& {
    join_ids_.emplace_back(nodes_.size());
    split_ids_.emplace_back(nodes_.size());
    Splitter::split(
      *this, exec_kind, ripple_forward(callable), ripple_forward(args)...);
    return *this;
  }

  /*==--- [memcpy] ---------------------------------------------------------==*/

  /**
   * Forwards the \p args to Memcopy, with the execution kind of the graph
   * and an asynchronous transfer kind.
   * \param args The arguments for the memcopy.
   * \return A reference to this graph.
   */
  template <typename... Args>
  auto memcopy_padding(Args&&... args) noexcept -> Graph& {
    Memcopy::memcopy(
      *this, execution_, TransferKind::asynchronous, ripple_forward(args)...);
    return *this;
  }

  /**
   * Forwards the \p args to Memcopy, with the execution kind of the graph
   * and a synchronous transfer kind.
   *
   * \note Unlike the overload which takes an execution kind, this does not
   *       add join or split indices (the statements are commented out).
   *
   * \param args The arguments for the memcopy.
   * \return A reference to this graph.
   */
  template <typename... Args>
  auto then_memcopy_padding(Args&&... args) noexcept -> Graph& {
    // join_ids_.emplace_back(nodes_.size());
    // split_ids_.emplace_back(nodes_.size());
    Memcopy::memcopy(
      *this, execution_, TransferKind::synchronous, ripple_forward(args)...);
    return *this;
  }

  /**
   * Forwards the \p args to Memcopy, with the given execution kind.
   * \param exec The kind of execution for the memcopy.
   * \param args The arguments for the memcopy.
   * \return A reference to this graph.
   */
  template <typename... Args>
  auto memcopy_padding(ExecutionKind exec, Args&&... args) noexcept -> Graph& {
    Memcopy::memcopy(*this, exec, ripple_forward(args)...);
    return *this;
  }

  /**
   * Adds a join index and a split index at the current end of the graph,
   * then forwards the \p args to Memcopy with the given execution kind.
   * \param exec The kind of execution for the memcopy.
   * \param args The arguments for the memcopy.
   * \return A reference to this graph.
   */
  template <typename... Args>
  auto
  then_memcopy_padding(ExecutionKind exec, Args&&... args) noexcept -> Graph& {
    join_ids_.emplace_back(nodes_.size());
    split_ids_.emplace_back(nodes_.size());
    Memcopy::memcopy(*this, exec, ripple_forward(args)...);
    return *this;
  }

  /*==--- [reduction] ------------------------------------------------------==*/

  // clang-format off
  /**
   * Forwards the reduction of the \p data to the Reducer, with the execution
   * kind of the graph, then adds a sync node which marks the \p result as
   * finished. The \p result is captured by reference, so it must outlive the
   * execution of the graph.
   * \param data   The tensor to reduce.
   * \param result The result of the reduction.
   * \param pred   The predicate for the reduction.
   * \param args   The arguments for the predicate.
   * \return A reference to this graph.
   */
  template <typename T, size_t Dims, typename Pred, typename... Args>
  auto reduce(
    Tensor<T, Dims>&    data,
    ReductionResult<T>& result,
    Pred&&              pred,
    Args&&...           args) noexcept -> Graph& {
    Reducer::reduce(
      *this,
      execution_,
      data,
      result,
      ripple_forward(pred),
      ripple_forward(args)...);

    // Add a synchronization which sets that the reduction is complete.
    return sync([&result] { result.set_finished(); });
  }

  /**
   * Forwards the reduction of the \p data to the Reducer, with the given
   * \p exec_kind, then adds a sync node which marks the \p result as
   * finished. The \p result is captured by reference, so it must outlive the
   * execution of the graph.
   * \param exec_kind The kind of execution for the reduction.
   * \param data      The tensor to reduce.
   * \param result    The result of the reduction.
   * \param pred      The predicate for the reduction.
   * \param args      The arguments for the predicate.
   * \return A reference to this graph.
   */
  template <typename T, size_t Dims, typename Pred, typename... Args>
  auto reduce(
    ExecutionKind       exec_kind,
    Tensor<T, Dims>&    data,
    ReductionResult<T>& result,
    Pred&&              pred,
    Args&&...           args) noexcept -> Graph& {
    Reducer::reduce(
      *this,
      exec_kind,
      data,
      result,
      ripple_forward(pred),
      ripple_forward(args)...);

    // Add a synchronization which sets that the reduction is complete.
    return sync([&result] { result.set_finished(); });
  }

  /**
   * Adds a join index and a split index at the current end of the graph,
   * then adds a reduction with the execution kind of the graph.
   * \param data   The tensor to reduce.
   * \param result The result of the reduction.
   * \param pred   The predicate for the reduction.
   * \param args   The arguments for the predicate.
   * \return A reference to this graph.
   */
  template <typename T, size_t Dims, typename Pred, typename... Args>
  auto then_reduce(
    Tensor<T, Dims>&    data,
    ReductionResult<T>& result,
    Pred&&              pred,
    Args&&...           args) noexcept -> Graph& {
    join_ids_.emplace_back(nodes_.size());
    split_ids_.emplace_back(nodes_.size());
    return this->reduce(
      data, result, ripple_forward(pred), ripple_forward(args)...);
  }

  /**
   * Adds a join index and a split index at the current end of the graph,
   * then adds a reduction with the given \p exec_kind.
   * \param exec_kind The kind of execution for the reduction.
   * \param data      The tensor to reduce.
   * \param result    The result of the reduction.
   * \param pred      The predicate for the reduction.
   * \param args      The arguments for the predicate.
   * \return A reference to this graph.
   */
  template <typename T, size_t Dims, typename Pred, typename... Args>
  auto then_reduce(
    ExecutionKind       exec_kind,
    Tensor<T, Dims>&    data,
    ReductionResult<T>& result,
    Pred&&              pred,
    Args&&...           args) noexcept -> Graph& {
    join_ids_.emplace_back(nodes_.size());
    split_ids_.emplace_back(nodes_.size());
    return this->reduce(
      exec_kind, data, result, ripple_forward(pred), ripple_forward(args)...);
  }
  // clang-format on

  /** \return The execution count stored for the graph. */
  auto num_executions() const noexcept -> size_t {
    return exec_count_;
  }

 private:
  /** Pointers to the nodes for the graph, in the order they were emplaced. */
  NodeContainer nodes_      = {};
  /** Indices into the nodes at which joins were added. Note that the
   *  initializer is a list, so this starts with the two elements 1 and 0. */
  Connections   join_ids_   = {1, 0};
  /** Indices into the nodes at which splits were added, which starts with
   *  the two elements 1 and 0. */
  Connections   split_ids_  = {1, 0};
  /** Execution count for the graph. */
  size_t        exec_count_ = 0;
  /** Kind of execution for operations which do not specify one. */
  ExecutionKind execution_  = ExecutionKind::gpu;

  /**
   * Adds nodes of the \p graph as successors of the most recent nodes of
   * this graph.
   * \param graph The graph to connect to this graph.
   */
  auto connect(Graph& graph) noexcept -> void;

  /**
   * Finds a node with the given \p id in the range of nodes between the last
   * two split indices.
   * \param id The id of the node to find.
   * \return A pointer to the node, or nullopt if there is no such node.
   */
  auto find_in_last_split(typename NodeInfo::IdType id) noexcept
    -> std::optional<NodeType*>;

  /**
   * Sets up the successor relationships for a node which has just been
   * added to the node container.
   * \param node The node to set up.
   */
  auto setup_node(NodeType& node) noexcept -> void {
    setup_split_node(node);
    setup_nonsplit_node(node);
  }

  /**
   * Adds the \p node as a successor of the matching nodes in the previous
   * split, if the \p node is a split node.
   * \param node The node to set up.
   */
  auto setup_split_node(NodeType& node) noexcept -> void;

  /**
   * Adds the \p node as a successor of the nodes between the last two join
   * indices.
   * \param node The node to set up.
   */
  auto setup_nonsplit_node(NodeType& node) noexcept -> void;

  /*==--- [static methods] -------------------------------------------------==*/

  /**
   * \return A reference to the allocator for nodes, which is created on the
   *         first call and sized from the allocation pool size at that time.
   */
  static auto node_allocator() noexcept -> NodeAllocator& {
    static NodeAllocator allocator(allocation_pool_nodes() * sizeof(NodeType));
    return allocator;
  }

  /**
   * \return A reference to the allocator for node info, which is created on
   *         the first call and sized from the allocation pool size at that
   *         time.
   */
  static auto info_allocator() noexcept -> InfoAllocator& {
    static InfoAllocator allocator(allocation_pool_nodes() * sizeof(NodeInfo));
    return allocator;
  }

  /** \return A reference to the number of nodes for the allocation pools. */
  static auto allocation_pool_nodes() noexcept -> size_t& {
    static size_t nodes_in_pool{default_nodes};
    return nodes_in_pool;
  }

  /** \return A reference to the flag for whether the pool size was set. */
  static auto is_initialized() noexcept -> bool& {
    static bool initialized{false};
    return initialized;
  }

  /** \return A reference to the lock which guards setting the pool size. */
  static auto initialization_lock() noexcept -> Lock& {
    static Lock lock;
    return lock;
  }
};

/*==--- [implementation] ---------------------------------------------------==*/

/**
 * Sets the number of nodes which is used to size the allocation pools. The
 * size can only be set once, and the check is guarded by the initialization
 * lock.
 *
 * \note The allocators read the pool size when they are first used, so a call
 *       which is made after a node has already been allocated still returns
 *       true, but does not change the allocators which already exist.
 *
 * \note This is defined in a header, so it is inline to avoid multiple
 *       definitions when the header is included in several translation units.
 *
 * \param nodes The number of nodes for the allocation pools.
 * \return true if the size was set, false if it had already been set.
 */
inline auto Graph::set_allocation_pool_size(size_t nodes) noexcept -> bool {
  Guard g(initialization_lock());
  if (is_initialized()) {
    return false;
  }

  allocation_pool_nodes() = nodes;
  is_initialized()        = true;
  return true;
}

auto Graph::reset() noexcept -> void {
  for (auto& node : nodes_) {
    if (node) {
      info_allocator().recycle(node->info_);
      node_allocator().recycle(node);
    }
  }
  exec_count_ = 0;
}

/**
 * Creates a new graph which holds a copy of each node in this graph, with the
 * same join indices, split indices, execution count, and execution kind.
 *
 * The info for each new node is created from the name, id, kind, and
 * execution kind of the info of the node which is copied.
 *
 * \note NOTE(review): each node is copy constructed, so whether the successors
 *       of the copies refer to nodes of the clone or of this graph depends on
 *       the node copy constructor -- confirm.
 *
 * \note This is defined in a header, so it is inline to avoid multiple
 *       definitions when the header is included in several translation units.
 *
 * \return The new graph.
 */
inline auto Graph::clone() const noexcept -> Graph {
  Graph graph;
  graph.nodes_.reserve(nodes_.size());
  for (auto* node : nodes_) {
    auto* copy =
      graph.nodes_.emplace_back(node_allocator().create<NodeType>(*node));

    // The info must be a separate allocation for each node, since each graph
    // recycles the info of its own nodes.
    if (node->info_) {
      copy->info_ = info_allocator().create<NodeInfo>(
        node->info_->name,
        node->info_->id,
        node->info_->kind,
        node->info_->exec);
    }
  }

  graph.join_ids_   = join_ids_;
  graph.split_ids_  = split_ids_;
  graph.exec_count_ = exec_count_;
  graph.execution_  = execution_;
  return graph;
}

/**
 * Finds the first node in the graph whose info has the given \p name. Nodes
 * which are null, or which have no info, are skipped.
 *
 * \note This is defined in a header, so it is inline to avoid multiple
 *       definitions when the header is included in several translation units.
 *
 * \param name The name of the node to find.
 * \return A pointer to the node, or nullopt if no node has the name.
 */
inline auto Graph::find(std::string name) noexcept -> std::optional<NodeType*> {
  for (auto* node : nodes_) {
    if (node == nullptr || !node->info_) {
      continue;
    }
    if (name.compare(node->info_->name.c_str()) == 0) {
      return std::make_optional<NodeType*>(node);
    }
  }
  return std::nullopt;
}

/**
 * Finds the last node in the graph whose info has the given \p name, by
 * searching the nodes from the back of the container. Nodes which are null,
 * or which have no info, are skipped.
 *
 * \note This is defined in a header, so it is inline to avoid multiple
 *       definitions when the header is included in several translation units.
 *
 * \param name The name of the node to find.
 * \return A pointer to the node, or nullopt if no node has the name.
 */
inline auto Graph::find_last_of(std::string name) noexcept
  -> std::optional<NodeType*> {
  // Reverse iterators avoid narrowing the container size to an int.
  for (auto it = nodes_.rbegin(); it != nodes_.rend(); ++it) {
    auto* node = *it;
    if (node == nullptr || !node->info_) {
      continue;
    }
    if (name.compare(node->info_->name.c_str()) == 0) {
      return std::make_optional<NodeType*>(node);
    }
  }
  return std::nullopt;
}

/*==--- [private] ----------------------------------------------------------==*/

/**
 * Connects the \p graph to this graph. Each node of this graph from the
 * smaller of the last split index and the last join index onwards gets, as
 * successors, the nodes of the \p graph which come before the larger of its
 * first split index and its first join index.
 *
 * A pair of nodes is not connected when both are split nodes and their ids
 * are different.
 *
 * \note This is defined in a header, so it is inline to avoid multiple
 *       definitions when the header is included in several translation units.
 *
 * \param graph The graph to connect to this graph.
 */
inline auto Graph::connect(Graph& graph) noexcept -> void {
  const size_t start = std::min(split_ids_.back(), join_ids_.back());

  // The first indices are non-zero for a graph without any nodes, so the end
  // is limited to the number of nodes which the other graph actually holds.
  const size_t end = std::min<size_t>(
    std::max(graph.split_ids_[0], graph.join_ids_[0]), graph.nodes_.size());

  for (size_t i = 0; i < end; ++i) {
    auto* other_node = graph.nodes_[i];
    for (size_t j = start; j < nodes_.size(); ++j) {
      auto* this_node = nodes_[j];

      // clang-format off
      const bool unmatched_split =
        other_node->kind() == this_node->kind() &&
        other_node->id()   != this_node->id()   &&
        this_node->kind()  == NodeKind::split;
      // clang-format on

      if (unmatched_split) {
        continue;
      }
      this_node->add_successor(*other_node);
    }
  }
}

/**
 * Finds the first node with the given \p id among the nodes whose indices
 * are in the range defined by the last two split indices.
 *
 * \note This is defined in a header, so it is inline to avoid multiple
 *       definitions when the header is included in several translation units.
 *
 * \param id The id of the node to find.
 * \return A pointer to the node, or nullopt if there are fewer than two split
 *         indices or no node in the range has the id.
 */
inline auto Graph::find_in_last_split(typename NodeInfo::IdType id) noexcept
  -> std::optional<NodeType*> {
  // Two indices are needed to define the range, and the subtraction below
  // would wrap around without them.
  const size_t num_splits = split_ids_.size();
  if (num_splits < 2) {
    return std::nullopt;
  }

  const int start = split_ids_[num_splits - 2];
  const int end   = split_ids_[num_splits - 1];
  for (int i = start; i < end; ++i) {
    auto* node = nodes_[i];
    if (node->id() == id) {
      return std::make_optional<NodeType*>(node);
    }
  }
  return std::nullopt;
}

/**
 * Sets up the dependencies for a split node. This does nothing unless the
 * \p node is a split node and there is more than one split index.
 *
 * The \p node is added as a successor of each node between the last two split
 * indices which is either a sync node, or a split node with the same id as
 * the \p node. It is also added as a successor of any friend of such a node
 * which can be found in the same range.
 *
 * \note This is defined in a header, so it is inline to avoid multiple
 *       definitions when the header is included in several translation units.
 *
 * \param node The node to set up.
 */
inline auto Graph::setup_split_node(NodeType& node) noexcept -> void {
  /* For a split node, we need to find all the indices in the previous split
   * and for any node with the same id, we need to add dependencies between
   * that node and this node. We need to do the same for any friends of the
   * dependents. */
  const bool is_split = node.kind() == NodeKind::split;
  if (!is_split || split_ids_.size() <= 1) {
    return;
  }

  const int start = split_ids_[split_ids_.size() - 2];
  const int end   = split_ids_[split_ids_.size() - 1];
  for (int i = start; i < end; ++i) {
    auto* other = nodes_[i];

    const bool other_is_sync = other->kind() == NodeKind::sync;
    const bool same_split =
      other->kind() == NodeKind::split && other->id() == node.id();
    if (!other_is_sync && !same_split) {
      continue;
    }
    other->add_successor(node);

    for (const auto& friend_id : other->friends()) {
      if (auto friend_node = find_in_last_split(friend_id)) {
        friend_node.value()->add_successor(node);
      }
    }
  }
}

/**
 * Adds the \p node as a successor of each node between the last two join
 * indices, except when both that node and the \p node are split nodes. This
 * does nothing if there are fewer than two join indices.
 *
 * \note This is defined in a header, so it is inline to avoid multiple
 *       definitions when the header is included in several translation units.
 *
 * \param node The node to set up.
 */
inline auto Graph::setup_nonsplit_node(NodeType& node) noexcept -> void {
  if (join_ids_.size() <= 1) {
    return;
  }

  /* This node is a successor of all nodes between the node with the
   * second last index in the join index vector and the node with the last
   * index in the join index vector, so we need to add this node as a
   * successor to those other nodes, if there are enough join indices. */
  constexpr auto split = NodeKind::split;
  const int      start = join_ids_[join_ids_.size() - 2];
  const int      end   = join_ids_[join_ids_.size() - 1];
  for (int i = start; i < end; ++i) {
    auto& other_node = *nodes_[i];

    // Split nodes are connected to each other by setup_split_node.
    const bool both_split = other_node.kind() == split && node.kind() == split;
    if (both_split) {
      continue;
    }

    // One of the node kinds is not split, so there is a dependence.
    other_node.add_successor(node);
  }
}

} // namespace ripple

#endif // RIPPLE_GRAPH_GRAPH_HPP

Docs

Access comprehensive developer documentation for Ripple

View Docs

Tutorials

Get tutorials to help you understand all of the features

View Tutorials

Examples

Find examples to help get started

View Examples