.. _program_listing_file_include_ripple_graph_graph.hpp:

Program Listing for File graph.hpp
==================================

|exhale_lsh| :ref:`Return to documentation for file <file_include_ripple_graph_graph.hpp>` (``include/ripple/graph/graph.hpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   #ifndef RIPPLE_GRAPH_GRAPH_HPP
   #define RIPPLE_GRAPH_GRAPH_HPP

   #include "memcopy.hpp"
   #include "node.hpp"
   #include "splitter.hpp"
   #include "reducer.hpp"
   #include <algorithm>
   #include <cstddef>
   #include <cstring>
   #include <mutex>
   #include <optional>
   #include <string>
   #include <tuple>
   #include <type_traits>
   #include <vector>

   namespace ripple {

   /*==--- [forward declarations] ---------------------------------------------==*/

   /* Forward declaration of the Graph class. */
   class Graph;

   /* Forward declaration of the executor for the graph. */
   class Executor;

   /* Forward declaration of the execution function for a graph. */
   auto execute(Graph& graph) noexcept -> void;

   class Graph {
     friend class Executor;
     friend struct Splitter;
     friend struct Memcopy;

     // clang-format off
     static constexpr size_t node_alignment   = avoid_false_sharing_size;
     static constexpr size_t node_min_storage = 2 * 72;

     using ArenaType     = HeapArena;
     using NodeType      = Node<node_min_storage, node_alignment>;
     using NodeAllocator = ThreadSafeObjectPoolAllocator<NodeType, ArenaType>;
     using InfoAllocator = ThreadSafeObjectPoolAllocator<NodeInfo, ArenaType>;
     using NodeContainer = std::vector<NodeType*>;
     using Connections   = std::vector<size_t>;
     using Lock          = Spinlock;
     using Guard         = std::lock_guard<Lock>;
     // clang-format on

     template <typename T>
     using non_node_enable_t =
       std::enable_if_t<!std::is_same_v<NodeType, std::decay_t<T>>, int>;

    public:
     // clang-format off
     static constexpr size_t default_nodes = 1024;
     static constexpr auto   default_id    = NodeInfo::default_id;
     // clang-format on

     /*==--- [construction] ---------------------------------------------------==*/

     Graph() = default;

     Graph(ExecutionKind exec_kind) noexcept : execution_{exec_kind} {}

     ~Graph() noexcept {
       reset();
     }

     Graph(Graph&& other) noexcept
     : nodes_{ripple_move(other.nodes_)},
       join_ids_{ripple_move(other.join_ids_)},
       split_ids_{ripple_move(other.split_ids_)},
       exec_count_{other.exec_count_},
       execution_{other.execution_} {
       other.exec_count_ = 0;
     }

     Graph(const Graph&) = delete;

     /*==--- [operator overloads] ---------------------------------------------==*/

     auto operator=(Graph&& other) noexcept -> Graph& {
       if (this != &other) {
         nodes_      = ripple_move(other.nodes_);
         join_ids_   = ripple_move(other.join_ids_);
         split_ids_  = ripple_move(other.split_ids_);
         exec_count_ = other.exec_count_;
         execution_  = other.execution_;
         other.exec_count_ = 0;
       }
       return *this;
     }

     auto operator=(const Graph& other) = delete;

     /*==--- [interface] ------------------------------------------------------==*/

     static auto set_allocation_pool_size(size_t nodes) noexcept -> bool;

     template <typename F, typename... Args>
     static auto make_node(F&& callable, Args&&... args) -> NodeType& {
       return *node_allocator().create(
         ripple_forward(callable), ripple_forward(args)...);
     }

     template <typename F, typename... Args>
     static auto make_node(F&& callable, ExecutionKind exec_kind, Args&&... args)
       -> NodeType& {
       NodeType& node = *node_allocator().create(
         ripple_forward(callable), ripple_forward(args)...);
       node.info_ = info_allocator().create(exec_kind);
       return node;
     }

     auto reset() noexcept -> void;

     auto clone() const noexcept -> Graph;

     auto size() const -> size_t {
       return nodes_.size();
     }

     auto allocation_pool_size() const noexcept -> size_t {
       return allocation_pool_nodes();
     }

     /*==--- [find] -----------------------------------------------------------==*/

     auto find(std::string name) noexcept -> std::optional<NodeType*>;

     auto find_last_of(std::string name) noexcept -> std::optional<NodeType*>;

     /*==--- [emplace] --------------------------------------------------------==*/

     template <typename F, typename... Args, non_node_enable_t<F> = 0>
     auto emplace(F&& callable, Args&&... args) -> Graph& {
       const auto no_name = std::string("");
       return emplace_named(
         no_name, ripple_forward(callable), ripple_forward(args)...);
     }

     template <typename F, typename... Args, non_node_enable_t<F> = 0>
     auto emplace_named(NodeInfo info, F&& callable, Args&&... args) -> Graph& {
       auto& node = *nodes_.emplace_back(node_allocator().create(
         ripple_forward(callable), ripple_forward(args)...));
       node.info_ = info_allocator().create(
         info.name, info.id, info.kind, info.exec);
       setup_node(node);
       return *this;
     }

     template <
       typename... Nodes,
       all_same_enable_t<NodeType, std::decay_t<Nodes>...> = 0>
     auto emplace(Nodes&&... nodes) -> Graph& {
       constexpr size_t node_count = sizeof...(Nodes);
       // Make sure that we have a tuple of __references__:
       auto node_tuple =
         std::tuple<std::decay_t<Nodes>&...>{ripple_forward(nodes)...};
       unrolled_for<node_count>([&](auto i) {
         setup_node(*nodes_.emplace_back(&std::get<i>(node_tuple)));
       });
       return *this;
     }

     auto emplace(Graph& graph) noexcept -> Graph& {
       connect(graph);
       return emplace([&graph] { execute(graph); });
     }

     /*==--- [sync] -----------------------------------------------------------==*/

     template <typename F, typename... Args, non_node_enable_t<F> = 0>
     auto sync(F&& callable, Args&&... args) -> Graph& {
       join_ids_.emplace_back(nodes_.size());
       NodeInfo info{NodeKind::sync, execution_};
       return emplace_named(
         info, ripple_forward(callable), ripple_forward(args)...);
     }

     /*
      * \note These synchronization functions need to be implemented here to
      *       ensure that the cuda functions are called when device
      *       functionality is required.
      *
      *       If they lived in a cpp file compiled as c++ code and linked
      *       against a cuda executable, the cuda synchronization would not
      *       run.
      */

     template <typename F, typename... Args, non_node_enable_t<F> = 0>
     auto gpu_fence(F&& callable, Args&&... args) -> Graph& {
       join_ids_.emplace_back(nodes_.size());
       NodeInfo info{NodeKind::normal, ExecutionKind::gpu};
       for (const auto& gpu : topology().gpus) {
         emplace_named(info, [&gpu] { gpu.synchronize_streams(); });
       }
       join_ids_.emplace_back(nodes_.size());
       info.kind = NodeKind::sync;
       return emplace_named(
         info, ripple_forward(callable), ripple_forward(args)...);
     }

     auto gpu_barrier() -> Graph& {
       join_ids_.emplace_back(nodes_.size());
       NodeInfo info{NodeKind::normal, ExecutionKind::gpu};
       for (auto& gpu : topology().gpus) {
         gpu.prepare_barrier();
         emplace_named(info, [&gpu] { gpu.execute_barrier(); });
       }
       join_ids_.emplace_back(nodes_.size());
       info.kind = NodeKind::sync;
       return emplace_named(info, [] {
         size_t count = topology().num_gpus();
         while (count > 0) {
           for (const auto& gpu : topology().gpus) {
             if (gpu.is_barrier_down()) {
               count--;
             }
           }
         }
       });
     }

     /*==--- [then] -----------------------------------------------------------==*/

     template <typename F, typename... Args, non_node_enable_t<F> = 0>
     auto then(F&& callable, Args&&... args) -> Graph& {
       join_ids_.emplace_back(nodes_.size());
       return emplace(ripple_forward(callable), ripple_forward(args)...);
     }

     template <
       typename... Nodes,
       all_same_enable_t<NodeType, std::decay_t<Nodes>...> = 0>
     auto then(Nodes&&... nodes) -> Graph& {
       join_ids_.emplace_back(nodes_.size());
       return emplace(ripple_forward(nodes)...);
     }

     auto then(Graph& graph) noexcept -> Graph& {
       return then([&graph] { execute(graph); });
     }

     /*==--- [split] ----------------------------------------------------------==*/

     template <typename Pred, typename... Args>
     auto conditional(Pred&& pred, Args&&... args) -> Graph& {
       return sync(
         [this](auto&& predicate, auto&&... as) {
           if (predicate(ripple_forward(as)...)) {
             execute(*this);
           }
         },
         ripple_forward(pred),
         ripple_forward(args)...);
     }

     template <typename F, typename... Args>
     auto split(F&& callable, Args&&... args) noexcept -> Graph& {
       Splitter::split(
         *this, execution_, ripple_forward(callable), ripple_forward(args)...);
       return *this;
     }

     template <typename F, typename... Args>
     auto split(ExecutionKind exec_kind, F&& callable, Args&&... args) noexcept
       -> Graph& {
       Splitter::split(
         *this, exec_kind, ripple_forward(callable), ripple_forward(args)...);
       return *this;
     }

     template <typename F, typename... Args>
     auto then_split(F&& callable, Args&&... args) noexcept -> Graph& {
       join_ids_.emplace_back(nodes_.size());
       split_ids_.emplace_back(nodes_.size());
       Splitter::split(
         *this, execution_, ripple_forward(callable), ripple_forward(args)...);
       return *this;
     }

     template <typename F, typename... Args>
     auto
     then_split(ExecutionKind exec_kind, F&& callable, Args&&... args) noexcept
       -> Graph& {
       join_ids_.emplace_back(nodes_.size());
       split_ids_.emplace_back(nodes_.size());
       Splitter::split(
         *this, exec_kind, ripple_forward(callable), ripple_forward(args)...);
       return *this;
     }

     /*==--- [memcpy] ---------------------------------------------------------==*/

     template <typename... Args>
     auto memcopy_padding(Args&&... args) noexcept -> Graph& {
       Memcopy::memcopy(
         *this, execution_, TransferKind::asynchronous, ripple_forward(args)...);
       return *this;
     }

     template <typename... Args>
     auto then_memcopy_padding(Args&&... args) noexcept -> Graph& {
       // join_ids_.emplace_back(nodes_.size());
       // split_ids_.emplace_back(nodes_.size());
       Memcopy::memcopy(
         *this, execution_, TransferKind::synchronous, ripple_forward(args)...);
       return *this;
     }

     template <typename... Args>
     auto memcopy_padding(ExecutionKind exec, Args&&... args) noexcept -> Graph& {
       Memcopy::memcopy(*this, exec, ripple_forward(args)...);
       return *this;
     }

     template <typename... Args>
     auto
     then_memcopy_padding(ExecutionKind exec, Args&&... args) noexcept -> Graph& {
       join_ids_.emplace_back(nodes_.size());
       split_ids_.emplace_back(nodes_.size());
       Memcopy::memcopy(*this, exec, ripple_forward(args)...);
       return *this;
     }

     /*==--- [reduction] ------------------------------------------------------==*/

     // clang-format off
     template <typename T, size_t Dims, typename Pred, typename... Args>
     auto reduce(
       Tensor<T, Dims>& data,
       ReductionResult<T>& result,
       Pred&& pred,
       Args&&... args) noexcept -> Graph& {
       Reducer::reduce(
         *this, execution_, data, result,
         ripple_forward(pred), ripple_forward(args)...);
       // Add a synchronization which sets that the reduction is complete.
       return sync([&result] { result.set_finished(); });
     }

     template <typename T, size_t Dims, typename Pred, typename... Args>
     auto reduce(
       ExecutionKind exec_kind,
       Tensor<T, Dims>& data,
       ReductionResult<T>& result,
       Pred&& pred,
       Args&&... args) noexcept -> Graph& {
       Reducer::reduce(
         *this, exec_kind, data, result,
         ripple_forward(pred), ripple_forward(args)...);
       // Add a synchronization which sets that the reduction is complete.
       return sync([&result] { result.set_finished(); });
     }

     template <typename T, size_t Dims, typename Pred, typename... Args>
     auto then_reduce(
       Tensor<T, Dims>& data,
       ReductionResult<T>& result,
       Pred&& pred,
       Args&&... args) noexcept -> Graph& {
       join_ids_.emplace_back(nodes_.size());
       split_ids_.emplace_back(nodes_.size());
       return this->reduce(
         data, result, ripple_forward(pred), ripple_forward(args)...);
     }

     template <typename T, size_t Dims, typename Pred, typename... Args>
     auto then_reduce(
       ExecutionKind exec_kind,
       Tensor<T, Dims>& data,
       ReductionResult<T>& result,
       Pred&& pred,
       Args&&... args) noexcept -> Graph& {
       join_ids_.emplace_back(nodes_.size());
       split_ids_.emplace_back(nodes_.size());
       return this->reduce(
         exec_kind, data, result, ripple_forward(pred), ripple_forward(args)...);
     }
     // clang-format on

     auto num_executions() const noexcept -> size_t {
       return exec_count_;
     }

    private:
     NodeContainer nodes_      = {};
     Connections   join_ids_   = {1, 0};
     Connections   split_ids_  = {1, 0};
     size_t        exec_count_ = 0;
     ExecutionKind execution_  = ExecutionKind::gpu;

     auto connect(Graph& graph) noexcept -> void;

     auto find_in_last_split(typename NodeInfo::IdType id) noexcept
       -> std::optional<NodeType*>;

     auto setup_node(NodeType& node) noexcept -> void {
       setup_split_node(node);
       setup_nonsplit_node(node);
     }

     auto setup_split_node(NodeType& node) noexcept -> void;

     auto setup_nonsplit_node(NodeType& node) noexcept -> void;

     /*==--- [static methods] -------------------------------------------------==*/

     static auto node_allocator() noexcept -> NodeAllocator& {
       static NodeAllocator allocator(allocation_pool_nodes() * sizeof(NodeType));
       return allocator;
     }

     static auto info_allocator() noexcept -> InfoAllocator& {
       static InfoAllocator allocator(allocation_pool_nodes() * sizeof(NodeInfo));
       return allocator;
     }

     static auto allocation_pool_nodes() noexcept -> size_t& {
       static size_t nodes_in_pool{default_nodes};
       return nodes_in_pool;
     }

     static auto is_initialized() noexcept -> bool& {
       static bool initialized{false};
       return initialized;
     }

     static auto initialization_lock() noexcept -> Lock& {
       static Lock lock;
       return lock;
     }
   };

   /*==--- [implementation] ---------------------------------------------------==*/

   auto Graph::set_allocation_pool_size(size_t nodes) noexcept -> bool {
     Guard g(initialization_lock());
     if (is_initialized()) {
       return false;
     }
     allocation_pool_nodes() = nodes;
     is_initialized()        = true;
     return true;
   }

   auto Graph::reset() noexcept -> void {
     for (auto& node : nodes_) {
       if (node) {
         info_allocator().recycle(node->info_);
         node_allocator().recycle(node);
       }
     }
     exec_count_ = 0;
   }

   auto Graph::clone() const noexcept -> Graph {
     Graph graph;
     for (const auto& node : nodes_) {
       // TODO: Add copying of node info for new node.
       graph.nodes_.emplace_back(node_allocator().create(*node));
       graph.nodes_.back()->info_ =
         info_allocator().create(node->info_->name, node->info_->id);
     }
     graph.join_ids_.clear();
     for (const auto& id : join_ids_) {
       graph.join_ids_.push_back(id);
     }
     graph.split_ids_.clear();
     for (const auto& id : split_ids_) {
       graph.split_ids_.push_back(id);
     }
     graph.exec_count_ = exec_count_;
     return graph;
   }

   auto Graph::find(std::string name) noexcept -> std::optional<NodeType*> {
     for (auto& node : nodes_) {
       if (std::strcmp(node->info_->name.c_str(), name.c_str()) == 0) {
         return std::make_optional(node);
       }
     }
     return std::nullopt;
   }

   auto Graph::find_last_of(std::string name) noexcept
     -> std::optional<NodeType*> {
     for (int i = nodes_.size() - 1; i >= 0; --i) {
       auto* node = nodes_[i];
       if (std::strcmp(node->info_->name.c_str(), name.c_str()) == 0) {
         return std::make_optional(node);
       }
     }
     return std::nullopt;
   }

   /*==--- [private] ----------------------------------------------------------==*/

   auto Graph::connect(Graph& graph) noexcept -> void {
     const size_t start = std::min(split_ids_.back(), join_ids_.back());
     const size_t end   = std::max(graph.split_ids_[0], graph.join_ids_[0]);
     for (size_t i = 0; i < end; ++i) {
       auto* other_node = graph.nodes_[i];
       for (size_t j = start; j < nodes_.size(); ++j) {
         auto* this_node = nodes_[j];
         // clang-format off
         const bool unmatched_split =
           other_node->kind() == this_node->kind() &&
           other_node->id()   != this_node->id()   &&
           this_node->kind()  == NodeKind::split;
         // clang-format on
         if (unmatched_split) {
           continue;
         }
         this_node->add_successor(*other_node);
       }
     }
   }

   auto Graph::find_in_last_split(typename NodeInfo::IdType id) noexcept
     -> std::optional<NodeType*> {
     const int start = split_ids_[split_ids_.size() - 2];
     const int end   = split_ids_[split_ids_.size() - 1];
     for (int i = start; i < end; ++i) {
       auto* node = nodes_[i];
       if (node->id() == id) {
         return std::make_optional(node);
       }
     }
     return std::nullopt;
   }

   auto Graph::setup_split_node(NodeType& node) noexcept -> void {
     /* For a split node, we need to find all the indices in the previous split
      * and for any node with the same id, we need to add dependencies between
      * that node and this node. We need to do the same for any friends of the
      * dependents. */
     if (!(node.kind() == NodeKind::split && split_ids_.size() > 1)) {
       return;
     }

     const int start = split_ids_[split_ids_.size() - 2];
     const int end   = split_ids_[split_ids_.size() - 1];
     for (int i = start; i < end; ++i) {
       auto* other = nodes_[i];
       if (
         other->kind() != NodeKind::sync &&
         (other->kind() != NodeKind::split || other->id() != node.id())) {
         continue;
       }
       other->add_successor(node);
       for (const auto& friend_id : other->friends()) {
         if (auto friend_node = find_in_last_split(friend_id)) {
           friend_node.value()->add_successor(node);
         }
       }
     }
   }

   auto Graph::setup_nonsplit_node(NodeType& node) noexcept -> void {
     if (join_ids_.size() <= 1) {
       return;
     }

     /* This node is a successor of all nodes between the node with the
      * last index in the join index vector and the last node in the node
      * container, so we need to set the number of dependents for the node and
      * also add this node as a successor to those other nodes, if there are
      * enough join indices. */
     constexpr auto split = NodeKind::split;
     const int      start = join_ids_[join_ids_.size() - 2];
     const int      end   = join_ids_[join_ids_.size() - 1];
     for (int i = start; i < end; ++i) {
       auto& other_node = *nodes_[i];
       if (other_node.kind() == split && node.kind() == split) {
         continue;
       }
       // One of the node kinds is not split, so there is a dependence.
       other_node.add_successor(node);
     }
   }

   } // namespace ripple

   #endif // RIPPLE_GRAPH_GRAPH_HPP
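
For orientation, the snippet below is a minimal usage sketch of the public interface shown in the listing. It is not part of ``graph.hpp``; it assumes the header is reachable as ``ripple/graph/graph.hpp``, that ``ExecutionKind`` provides a ``cpu`` enumerator alongside the ``gpu`` one used above, and that the program is linked against the library that defines ``ripple::execute``.

.. code-block:: cpp

   // Hypothetical usage sketch, not part of graph.hpp.
   #include <ripple/graph/graph.hpp> // assumed install path

   auto main() -> int {
     // Build a graph whose nodes run on the CPU (assumes ExecutionKind::cpu).
     ripple::Graph graph(ripple::ExecutionKind::cpu);

     graph
       .emplace([] { /* first node: no dependencies */ })
       .emplace([] { /* second node: independent of the first */ })
       .then([] { /* then() joins: runs after both nodes above finish */ });

     // Hand the graph to the executor via the free function declared above.
     ripple::execute(graph);
     return 0;
   }

Here ``emplace`` adds nodes to the current wave (independent of each other, since no join index separates them), while ``then`` records a join index before emplacing, so its node becomes a successor of everything emplaced since the previous join.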