Program Listing for File graph.hpp
↰ Return to documentation for file (include/ripple/graph/graph.hpp)
#ifndef RIPPLE_GRAPH_GRAPH_HPP
#define RIPPLE_GRAPH_GRAPH_HPP
#include "memcopy.hpp"
#include "node.hpp"
#include "splitter.hpp"
#include "reducer.hpp"
#include <ripple/algorithm/unrolled_for.hpp>
#include <ripple/allocation/allocator.hpp>
#include <ripple/arch/cache.hpp>
#include <ripple/utility/spinlock.hpp>
#include <ripple/utility/type_traits.hpp>
#include <algorithm>
#include <cstring>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
#include <vector>
namespace ripple {
/*==--- [forward declarations] ---------------------------------------------==*/
/* Forward declaration of the Graph class. */
class Graph;
/* Forward declaration of the executor for the graph. */
class Executor;
/* Forward declaration of the execution function for a graph. */
auto execute(Graph& graph) noexcept -> void;
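/* A Graph is a collection of nodes of work and of the dependencies
 * between them, which can be executed by the Executor. */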
class Graph {
friend class Executor;
friend struct Splitter;
friend struct Memcopy;
// clang-format off
static constexpr size_t node_alignment = avoid_false_sharing_size;
static constexpr size_t node_min_storage = 2 * 72;
using ArenaType = HeapArena;
using NodeType = Node<node_alignment, node_min_storage>;
using NodeAllocator = ThreadSafeObjectPoolAllocator<NodeType, ArenaType>;
using InfoAllocator = ThreadSafeObjectPoolAllocator<NodeInfo, ArenaType>;
using NodeContainer = std::vector<NodeType*>;
using Connections = std::vector<int>;
using Lock = Spinlock;
using Guard = std::lock_guard<Lock>;
// clang-format on
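/* Defines a valid type if T is not a node type, allowing the emplace
 * overloads for callables and for nodes to be disambiguated. */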
template <typename T>
using non_node_enable_t =
std::enable_if_t<!std::is_same_v<NodeType, std::decay_t<T>>, int>;
public:
// clang-format off
static constexpr size_t default_nodes = 1024;
static constexpr auto default_id = NodeInfo::default_id;
// clang-format on
/*==--- [construction] ---------------------------------------------------==*/
Graph() = default;
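/* Constructs the graph with the given execution kind for its nodes. */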
Graph(ExecutionKind exec_kind) noexcept : execution_{exec_kind} {}
~Graph() noexcept {
reset();
}
Graph(Graph&& other) noexcept
: nodes_{ripple_move(other.nodes_)},
join_ids_{ripple_move(other.join_ids_)},
split_ids_{ripple_move(other.split_ids_)},
exec_count_{other.exec_count_},
execution_{other.execution_} {
other.exec_count_ = 0;
}
Graph(const Graph&) = delete;
/*==--- [operator overloads] ---------------------------------------------==*/
auto operator=(Graph&& other) noexcept -> Graph& {
if (this != &other) {
nodes_ = ripple_move(other.nodes_);
join_ids_ = ripple_move(other.join_ids_);
split_ids_ = ripple_move(other.split_ids_);
exec_count_ = other.exec_count_;
execution_ = other.execution_;
other.exec_count_ = 0;
}
return *this;
}
auto operator=(const Graph& other) = delete;
/*==--- [interface] ------------------------------------------------------==*/
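/* Sets the size of the node allocation pool, returning false if the pool
 * has already been initialized. */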
static auto set_allocation_pool_size(size_t nodes) noexcept -> bool;
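/* Creates a node which runs the callable with the given args. The node
 * is created in the static node pool but is not added to any graph. */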
template <typename F, typename... Args>
static auto make_node(F&& callable, Args&&... args) -> NodeType& {
return *node_allocator().create<NodeType>(
ripple_forward(callable), ripple_forward(args)...);
}
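/* As above, but also sets the execution kind of the created node. */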
template <typename F, typename... Args>
static auto make_node(F&& callable, ExecutionKind exec_kind, Args&&... args)
-> NodeType& {
NodeType& node = *node_allocator().create<NodeType>(
ripple_forward(callable), ripple_forward(args)...);
node.info_ = info_allocator().create<NodeInfo>(exec_kind);
return node;
}
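/* Resets the graph, recycling all of its nodes back into the pool. */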
auto reset() noexcept -> void;
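/* Returns a copy of the graph, cloning each of its nodes. */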
auto clone() const noexcept -> Graph;
auto size() const -> size_t {
return nodes_.size();
}
auto allocation_pool_size() const noexcept -> size_t {
return allocation_pool_nodes();
}
/*==--- [find] -----------------------------------------------------------==*/
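/* Finds the first node with the given name, if one exists. */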
auto find(std::string name) noexcept -> std::optional<NodeType*>;
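/* Finds the last node with the given name, if one exists. */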
auto find_last_of(std::string name) noexcept -> std::optional<NodeType*>;
/*==--- [emplace] --------------------------------------------------------==*/
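/* Emplaces a node which runs the callable with the given args. This
 * overload is enabled only when the callable is not itself a node. */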
template <typename F, typename... Args, non_node_enable_t<F> = 0>
auto emplace(F&& callable, Args&&... args) -> Graph& {
const auto no_name = std::string("");
return emplace_named(
no_name, ripple_forward(callable), ripple_forward(args)...);
}
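/* Emplaces a node with the given info which runs the callable with the
 * given args. */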
template <typename F, typename... Args, non_node_enable_t<F> = 0>
auto emplace_named(NodeInfo info, F&& callable, Args&&... args) -> Graph& {
auto& node = *nodes_.emplace_back(node_allocator().create<NodeType>(
ripple_forward(callable), ripple_forward(args)...));
node.info_ = info_allocator().create<NodeInfo>(
info.name, info.id, info.kind, info.exec);
setup_node(node);
return *this;
}
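/* Emplaces the already created nodes into the graph, in order. */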
template <
typename... Nodes,
all_same_enable_t<NodeType, std::decay_t<Nodes>...> = 0>
auto emplace(Nodes&&... nodes) -> Graph& {
constexpr size_t node_count = sizeof...(Nodes);
// Make sure that we have a tuple of __references__:
auto node_tuple =
std::tuple<std::decay_t<Nodes>&...>{ripple_forward(nodes)...};
unrolled_for<node_count>([&](auto i) {
setup_node(*nodes_.emplace_back(&std::get<i>(node_tuple)));
});
return *this;
}
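/* Emplaces the given graph as a single node which executes it, first
 * connecting it to the nodes already in this graph. */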
auto emplace(Graph& graph) noexcept -> Graph& {
connect(graph);
return emplace([&graph] { execute(graph); });
}
/*==--- [sync] -----------------------------------------------------------==*/
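/* Emplaces a sync node which runs the callable with the given args once
 * all previously emplaced nodes have completed. */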
template <typename F, typename... Args, non_node_enable_t<F> = 0>
auto sync(F&& callable, Args&&... args) -> Graph& {
join_ids_.emplace_back(nodes_.size());
NodeInfo info{NodeKind::sync, execution_};
return emplace_named(
info, ripple_forward(callable), ripple_forward(args)...);
}
/*
 * \note These synchronization functions must be implemented here so that
 * the cuda functions are called when device functionality is
 * required.
 *
 * If they were defined in a cpp file which is compiled as c++ code and
 * linked against a cuda executable, the cuda synchronization would not
 * run.
 */
template <typename F, typename... Args, non_node_enable_t<F> = 0>
auto gpu_fence(F&& callable, Args&&... args) -> Graph& {
join_ids_.emplace_back(nodes_.size());
NodeInfo info{NodeKind::normal, ExecutionKind::gpu};
for (const auto& gpu : topology().gpus) {
emplace_named(info, [&gpu] { gpu.synchronize_streams(); });
}
join_ids_.emplace_back(nodes_.size());
info.kind = NodeKind::sync;
return emplace_named(
info, ripple_forward(callable), ripple_forward(args)...);
}
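/* Emplaces a barrier node for each gpu, followed by a sync node which
 * waits until every gpu has brought its barrier down. */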
auto gpu_barrier() -> Graph& {
join_ids_.emplace_back(nodes_.size());
NodeInfo info{NodeKind::normal, ExecutionKind::gpu};
for (auto& gpu : topology().gpus) {
gpu.prepare_barrier();
emplace_named(info, [&gpu] { gpu.execute_barrier(); });
}
join_ids_.emplace_back(nodes_.size());
info.kind = NodeKind::sync;
return emplace_named(info, [] {
/* Spin until every gpu reports that its barrier is down, recounting on
 * each pass so that a gpu whose barrier is already down is not counted
 * more than once. */
size_t remaining = topology().num_gpus();
while (remaining > 0) {
remaining = topology().num_gpus();
for (const auto& gpu : topology().gpus) {
if (gpu.is_barrier_down()) {
remaining--;
}
}
}
});
}
/*==--- [then] -----------------------------------------------------------==*/
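/* Emplaces a node which runs the callable with the given args after all
 * previously emplaced nodes have completed. */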
template <typename F, typename... Args, non_node_enable_t<F> = 0>
auto then(F&& callable, Args&&... args) -> Graph& {
join_ids_.emplace_back(nodes_.size());
return emplace(ripple_forward(callable), ripple_forward(args)...);
}
template <
typename... Nodes,
all_same_enable_t<NodeType, std::decay_t<Nodes>...> = 0>
auto then(Nodes&&... nodes) -> Graph& {
join_ids_.emplace_back(nodes_.size());
return emplace(ripple_forward(nodes)...);
}
auto then(Graph& graph) noexcept -> Graph& {
return then([&graph] { execute(graph); });
}
/*==--- [split] ----------------------------------------------------------==*/
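/* Emplaces a sync node which executes this graph again if the predicate
 * returns true when invoked with the given args. */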
template <typename Pred, typename... Args>
auto conditional(Pred&& pred, Args&&... args) -> Graph& {
return sync(
[this](auto&& predicate, auto&&... as) {
if (predicate(ripple_forward(as)...)) {
execute(*this);
}
},
ripple_forward(pred),
ripple_forward(args)...);
}
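/* Uses the Splitter to split the callable and args into nodes which are
 * emplaced into the graph, using the graph's execution kind. */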
template <typename F, typename... Args>
auto split(F&& callable, Args&&... args) noexcept -> Graph& {
Splitter::split(
*this, execution_, ripple_forward(callable), ripple_forward(args)...);
return *this;
}
template <typename F, typename... Args>
auto split(ExecutionKind exec_kind, F&& callable, Args&&... args) noexcept
-> Graph& {
Splitter::split(
*this, exec_kind, ripple_forward(callable), ripple_forward(args)...);
return *this;
}
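/* As for split, but the created nodes depend on all previously emplaced
 * nodes. */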
template <typename F, typename... Args>
auto then_split(F&& callable, Args&&... args) noexcept -> Graph& {
join_ids_.emplace_back(nodes_.size());
split_ids_.emplace_back(nodes_.size());
Splitter::split(
*this, execution_, ripple_forward(callable), ripple_forward(args)...);
return *this;
}
template <typename F, typename... Args>
auto
then_split(ExecutionKind exec_kind, F&& callable, Args&&... args) noexcept
-> Graph& {
join_ids_.emplace_back(nodes_.size());
split_ids_.emplace_back(nodes_.size());
Splitter::split(
*this, exec_kind, ripple_forward(callable), ripple_forward(args)...);
return *this;
}
/*==--- [memcpy] ---------------------------------------------------------==*/
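/* Uses Memcopy to emplace nodes which asynchronously copy the padding
 * data for the given args, using the graph's execution kind. */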
template <typename... Args>
auto memcopy_padding(Args&&... args) noexcept -> Graph& {
Memcopy::memcopy(
*this, execution_, TransferKind::asynchronous, ripple_forward(args)...);
return *this;
}
template <typename... Args>
auto then_memcopy_padding(Args&&... args) noexcept -> Graph& {
join_ids_.emplace_back(nodes_.size());
split_ids_.emplace_back(nodes_.size());
Memcopy::memcopy(
*this, execution_, TransferKind::synchronous, ripple_forward(args)...);
return *this;
}
template <typename... Args>
auto memcopy_padding(ExecutionKind exec, Args&&... args) noexcept -> Graph& {
Memcopy::memcopy(
*this, exec, TransferKind::asynchronous, ripple_forward(args)...);
return *this;
}
template <typename... Args>
auto
then_memcopy_padding(ExecutionKind exec, Args&&... args) noexcept -> Graph& {
join_ids_.emplace_back(nodes_.size());
split_ids_.emplace_back(nodes_.size());
Memcopy::memcopy(
*this, exec, TransferKind::synchronous, ripple_forward(args)...);
return *this;
}
/*==--- [reduction] ------------------------------------------------------==*/
// clang-format off
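/* Uses the Reducer to emplace nodes which reduce the tensor data into
 * the result using the predicate, followed by a sync node which marks
 * the result as finished. */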
template <typename T, size_t Dims, typename Pred, typename... Args>
auto reduce(
Tensor<T, Dims>& data,
ReductionResult<T>& result,
Pred&& pred,
Args&&... args) noexcept -> Graph& {
Reducer::reduce(
*this,
execution_,
data,
result,
ripple_forward(pred),
ripple_forward(args)...);
// Add a synchronization which sets that the reduction is complete.
return sync([&result] { result.set_finished(); });
}
template <typename T, size_t Dims, typename Pred, typename... Args>
auto reduce(
ExecutionKind exec_kind,
Tensor<T, Dims>& data,
ReductionResult<T>& result,
Pred&& pred,
Args&&... args) noexcept -> Graph& {
Reducer::reduce(
*this,
exec_kind,
data,
result,
ripple_forward(pred),
ripple_forward(args)...);
// Add a synchronization which sets that the reduction is complete.
return sync([&result] { result.set_finished(); });
}
template <typename T, size_t Dims, typename Pred, typename... Args>
auto then_reduce(
Tensor<T, Dims>& data,
ReductionResult<T>& result,
Pred&& pred,
Args&&... args) noexcept -> Graph& {
join_ids_.emplace_back(nodes_.size());
split_ids_.emplace_back(nodes_.size());
return this->reduce(
data, result, ripple_forward(pred), ripple_forward(args)...);
}
template <typename T, size_t Dims, typename Pred, typename... Args>
auto then_reduce(
ExecutionKind exec_kind,
Tensor<T, Dims>& data,
ReductionResult<T>& result,
Pred&& pred,
Args&&... args) noexcept -> Graph& {
join_ids_.emplace_back(nodes_.size());
split_ids_.emplace_back(nodes_.size());
return this->reduce(
exec_kind, data, result, ripple_forward(pred), ripple_forward(args)...);
}
// clang-format on
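/* Returns the number of times the graph has been executed. */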
auto num_executions() const noexcept -> size_t {
return exec_count_;
}
private:
NodeContainer nodes_ = {};      /* Nodes in the graph. */
Connections join_ids_ = {1, 0}; /* Node indices of join points. */
Connections split_ids_ = {1, 0}; /* Node indices of split points. */
size_t exec_count_ = 0;         /* Number of times the graph has run. */
ExecutionKind execution_ = ExecutionKind::gpu; /* Default execution kind. */
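/* Connects the first nodes of the given graph to the last nodes of this
 * graph. */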
auto connect(Graph& graph) noexcept -> void;
auto find_in_last_split(typename NodeInfo::IdType id) noexcept
-> std::optional<NodeType*>;
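/* Sets up the dependencies between the given node and the nodes already
 * in the graph. */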
auto setup_node(NodeType& node) noexcept -> void {
setup_split_node(node);
setup_nonsplit_node(node);
}
auto setup_split_node(NodeType& node) noexcept -> void;
auto setup_nonsplit_node(NodeType& node) noexcept -> void;
/*==--- [static methods] -------------------------------------------------==*/
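/* Returns a reference to the static pool allocator for nodes. */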
static auto node_allocator() noexcept -> NodeAllocator& {
static NodeAllocator allocator(allocation_pool_nodes() * sizeof(NodeType));
return allocator;
}
static auto info_allocator() noexcept -> InfoAllocator& {
static InfoAllocator allocator(allocation_pool_nodes() * sizeof(NodeInfo));
return allocator;
}
static auto allocation_pool_nodes() noexcept -> size_t& {
static size_t nodes_in_pool{default_nodes};
return nodes_in_pool;
}
static auto is_initialized() noexcept -> bool& {
static bool initialized{false};
return initialized;
}
static auto initialization_lock() noexcept -> Lock& {
static Lock lock;
return lock;
}
};
/*==--- [implementation] ---------------------------------------------------==*/
auto Graph::set_allocation_pool_size(size_t nodes) noexcept -> bool {
Guard g(initialization_lock());
if (is_initialized()) {
return false;
}
allocation_pool_nodes() = nodes;
is_initialized() = true;
return true;
}
auto Graph::reset() noexcept -> void {
for (auto& node : nodes_) {
if (node) {
info_allocator().recycle(node->info_);
node_allocator().recycle(node);
}
}
// Clear the recycled pointers so that a subsequent reset, including the
// one performed by the destructor, does not recycle a node twice.
nodes_.clear();
exec_count_ = 0;
}
auto Graph::clone() const noexcept -> Graph {
Graph graph;
for (const auto& node : nodes_) {
// TODO: Copy the remaining node info (kind and execution) for the new node.
graph.nodes_.emplace_back(node_allocator().create<NodeType>(*node));
graph.nodes_.back()->info_ =
info_allocator().create<NodeInfo>(node->info_->name, node->info_->id);
}
graph.join_ids_.clear();
for (const auto& id : join_ids_) {
graph.join_ids_.push_back(id);
}
graph.split_ids_.clear();
for (const auto& id : split_ids_) {
graph.split_ids_.push_back(id);
}
graph.exec_count_ = exec_count_;
return graph;
}
auto Graph::find(std::string name) noexcept -> std::optional<NodeType*> {
for (auto& node : nodes_) {
if (std::strcmp(node->info_->name.c_str(), name.c_str()) == 0) {
return std::make_optional<NodeType*>(node);
}
}
return std::nullopt;
}
auto Graph::find_last_of(std::string name) noexcept
-> std::optional<NodeType*> {
for (int i = static_cast<int>(nodes_.size()) - 1; i >= 0; --i) {
auto* node = nodes_[i];
if (std::strcmp(node->info_->name.c_str(), name.c_str()) == 0) {
return std::make_optional<NodeType*>(node);
}
}
return std::nullopt;
}
/*==--- [private] ----------------------------------------------------------==*/
auto Graph::connect(Graph& graph) noexcept -> void {
const size_t start = std::min(split_ids_.back(), join_ids_.back());
const size_t end = std::max(graph.split_ids_[0], graph.join_ids_[0]);
for (size_t i = 0; i < end; ++i) {
auto* other_node = graph.nodes_[i];
for (size_t j = start; j < nodes_.size(); ++j) {
auto* this_node = nodes_[j];
// clang-format off
const bool unmatched_split =
other_node->kind() == this_node->kind() &&
other_node->id() != this_node->id() &&
this_node->kind() == NodeKind::split;
// clang-format on
if (unmatched_split) {
continue;
}
this_node->add_successor(*other_node);
}
}
}
auto Graph::find_in_last_split(typename NodeInfo::IdType id) noexcept
-> std::optional<NodeType*> {
const int start = split_ids_[split_ids_.size() - 2];
const int end = split_ids_[split_ids_.size() - 1];
for (int i = start; i < end; ++i) {
auto* node = nodes_[i];
if (node->id() == id) {
return std::make_optional<NodeType*>(node);
}
}
return std::nullopt;
}
auto Graph::setup_split_node(NodeType& node) noexcept -> void {
/* For a split node, we need to find all the nodes in the previous split,
 * and for any node with the same id, add a dependency between that node
 * and this node. The same must be done for any friends of those
 * dependents. */
if (!(node.kind() == NodeKind::split && split_ids_.size() > 1)) {
return;
}
const int start = split_ids_[split_ids_.size() - 2];
const int end = split_ids_[split_ids_.size() - 1];
for (int i = start; i < end; ++i) {
auto* other = nodes_[i];
if (
other->kind() != NodeKind::sync &&
(other->kind() != NodeKind::split || other->id() != node.id())) {
continue;
}
other->add_successor(node);
for (const auto& friend_id : other->friends()) {
if (auto friend_node = find_in_last_split(friend_id)) {
friend_node.value()->add_successor(node);
}
}
}
}
auto Graph::setup_nonsplit_node(NodeType& node) noexcept -> void {
if (join_ids_.size() <= 1) {
return;
}
/* This node is a successor of all nodes between the node with the last
 * index in the join index vector and the last node in the node
 * container, so we need to set the number of dependents for the node and
 * also add this node as a successor to those other nodes, if there are
 * enough join indices. */
constexpr auto split = NodeKind::split;
const int start = join_ids_[join_ids_.size() - 2];
const int end = join_ids_[join_ids_.size() - 1];
for (int i = start; i < end; ++i) {
auto& other_node = *nodes_[i];
if (other_node.kind() == split && node.kind() == split) {
continue;
}
// One of the node kinds is not split, so there is a dependence.
other_node.add_successor(node);
}
}
} // namespace ripple
#endif // RIPPLE_GRAPH_GRAPH_HPP