Program Listing for File executor.hpp
↰ Return to documentation for file (include/ripple/execution/executor.hpp)
//==--- ripple/execution/executor.hpp ---------------------- -*- C++ -*- ---==//
//
// Ripple
//
// Copyright (c) 2019, 2020 Rob Clucas
//
// This file is distributed under the MIT License. See LICENSE for details.
//
//==------------------------------------------------------------------------==//
//
#ifndef RIPPLE_EXECUTION_EXECUTOR_HPP
#define RIPPLE_EXECUTION_EXECUTOR_HPP
#include <ripple/graph/graph.hpp>
#include <ripple/graph/stealer.hpp>
#include <ripple/arch/topology.hpp>
#include <ripple/container/static_stealable_dequeue.hpp>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>
#include <variant>
#include <vector>
namespace ripple {
/*==-- [forward declarations] ----------------------------------------------==*/
class Executor;
inline auto executor() -> Executor&;
inline auto execute(Graph& graph) noexcept -> void;
template <typename Pred, typename... Args>
inline auto
execute_until(Graph& graph, Pred&& pred, Args&&... args) noexcept -> void;
inline auto fence() noexcept -> void;
inline auto barrier() noexcept -> void;
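// Index of the calling thread into the executor's per-thread queues and
// thread states; the creating (main) thread always has index 0.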
static thread_local uint32_t thread_id = 0;
class Executor {
public:
friend auto executor() -> Executor&;
auto set_steal_policy(StealPolicy policy) noexcept -> void {
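    // NOTE: Policy selection is currently a no-op because the Stealer type
    // below is fixed to NeighbourStealer; the switch is kept for when the
    // variant-based stealer is re-enabled.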
/*
// clang-format off
switch (policy) {
case StealPolicy::random : stealer_ = RandomStealer() ; return;
case StealPolicy::neighbour : stealer_ = NeighbourStealer(); return;
case StealPolicy::topological: stealer_ = TopoStealer() ; return;
default: return;
}
// clang-format on
*/
}
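  // Enqueues every node of the graph onto this thread's CPU or GPU queue,
  // depending on each node's execution kind, and wakes the worker threads if
  // they are paused. This returns once the nodes are enqueued, not once they
  // have run; use fence() or barrier() to wait for completion.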
auto execute(Graph& graph) noexcept -> void {
// Check if the threads need to be woken up:
if (exec_state_ != ExecutionState::run) {
exec_state_.store(ExecutionState::run, std::memory_order_relaxed);
for (auto& state : thread_states_) {
state.run_state.store(
ThreadState::RunState::running, std::memory_order_relaxed);
}
}
auto& cpu_queue = cpu_queues_[thread_id];
auto& gpu_queue = gpu_queues_[thread_id];
for (auto& node : graph.nodes_) {
auto& queue = node->execution_kind() == ExecutionKind::gpu ? gpu_queue
: cpu_queue;
while (!queue.try_push(&(*node))) {
// Spin .. this should never happen ...
}
total_nodes_.fetch_add(1, std::memory_order_relaxed);
}
graph.exec_count_++;
}
  auto execute_n(Graph& graph, size_t n) noexcept -> void {
    const auto last = graph.num_executions() + n;
    graph.sync([this, &graph, end = last] {
if (graph.num_executions() < end) {
execute(graph);
}
});
execute(graph);
}
template <typename Pred, typename... Args>
auto
execute_until(Graph& graph, Pred&& pred, Args&&... args) noexcept -> void {
graph.sync(
[this, &graph](auto&& predicate, auto&&... as) {
if (predicate(ripple_forward(as)...)) {
execute(graph);
}
},
ripple_forward(pred),
ripple_forward(args)...);
if (pred(ripple_forward(args)...)) {
execute(graph);
}
}
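  // Executes a barrier on each active GPU and then helps process remaining
  // node work on the calling thread until every submitted node has run.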
auto barrier() noexcept -> void {
size_t gpu_id = 0;
for (const auto& thread_state : thread_states_) {
if (gpu_id >= topology().num_gpus()) {
break;
}
if (thread_state.has_gpu_priority()) {
topology().gpus[gpu_id++].execute_barrier();
}
}
auto& state = thread_states_[thread_id];
while (has_unfinished_work()) {
execute_node_work(state, state.first_priority());
execute_node_work(state, state.second_priority());
}
}
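  // Synchronizes the streams of each active GPU and then helps process
  // remaining node work on the calling thread until every submitted node has
  // run.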
auto fence() noexcept -> void {
size_t gpu_id = 0;
for (const auto& thread_state : thread_states_) {
if (gpu_id >= topology().num_gpus()) {
break;
}
if (thread_state.has_gpu_priority()) {
topology().gpus[gpu_id++].synchronize_streams();
}
}
auto& state = thread_states_[thread_id];
while (has_unfinished_work()) {
execute_node_work(state, state.first_priority());
execute_node_work(state, state.second_priority());
}
}
auto num_executed_nodes() const noexcept -> uint32_t {
uint32_t sum = 0;
for (size_t i = 0; i < thread_states_.size(); ++i) {
sum += thread_states_[i].processed_nodes();
}
return sum;
}
auto has_unfinished_work() noexcept -> bool {
return num_executed_nodes() < total_nodes_.load(std::memory_order_relaxed);
}
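  // Pauses or resumes worker threads so that the number of running CPU- and
  // GPU-priority threads matches the requested counts, and records the new
  // per-kind thread limits on every thread state.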
auto set_active_threads(int cpu_threads, int gpu_threads) noexcept -> void {
int cpu_count = 0, gpu_count = 0;
for (auto& state : thread_states_) {
gpu_count += state.has_gpu_priority() && state.is_running() ? 1 : 0;
cpu_count += state.has_cpu_priority() && state.is_running() ? 1 : 0;
}
int gpu_change = gpu_threads - gpu_count;
int cpu_change = cpu_threads - cpu_count;
gpu_count = 0, cpu_count = 0;
auto modify = [](auto& state, auto& count, const auto& change) {
if (state.is_running() && change < 0) {
state.pause();
count++;
} else if (state.is_paused() && change > 0) {
state.run();
count++;
}
};
for (auto& state : thread_states_) {
state.max_gpu_threads = gpu_threads;
state.max_cpu_threads = cpu_threads;
if (state.has_gpu_priority() && gpu_count < std::abs(gpu_change)) {
modify(state, gpu_count, gpu_change);
state.priority = ExecutionKind::gpu;
continue;
}
if (state.has_cpu_priority() && cpu_count < std::abs(cpu_change)) {
modify(state, cpu_count, cpu_change);
state.priority = ExecutionKind::cpu;
}
}
}
private:
enum class ExecutionState : uint8_t {
paused = 0,
run = 1,
terminate = 2
};
static constexpr size_t max_nodes_per_queue = 4096;
using NodeCounter = std::atomic<uint32_t>;
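  // Per-thread state, aligned to avoid false sharing. Tracks the thread's
  // index, its run state, the kind of work it prioritizes, and the number of
  // nodes it has processed.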
struct alignas(avoid_false_sharing_size) ThreadState {
enum class RunState : uint8_t {
shutdown = 0,
paused = 1,
running = 2
};
using SafeRunState = std::atomic<RunState>;
uint32_t id = 0;
uint32_t max_cpu_threads = 0;
uint32_t max_gpu_threads = 0;
NodeCounter processed = 0;
SafeRunState run_state = RunState::paused;
ExecutionKind priority = ExecutionKind::gpu;
/*==--- [construction] -------------------------------------------------==*/
ThreadState(
uint32_t index,
uint32_t cpu_threads,
uint32_t gpu_threads,
RunState state,
ExecutionKind p) noexcept
: id{index},
max_cpu_threads{cpu_threads},
max_gpu_threads{gpu_threads},
run_state{state},
priority{p} {}
ThreadState(ThreadState&& other) noexcept
: id{other.id},
max_cpu_threads{other.max_cpu_threads},
max_gpu_threads{other.max_gpu_threads},
run_state{other.run_state.load()},
priority{other.priority} {
other.run_state.store(RunState::paused, std::memory_order_relaxed);
}
ThreadState(const ThreadState&) = delete;
/*==--- [methods] ------------------------------------------------------==*/
auto processed_nodes() const noexcept -> uint32_t {
return processed.load(std::memory_order_relaxed);
}
auto inc_processed_nodes() noexcept -> void {
processed.fetch_add(1, std::memory_order_relaxed);
}
auto max_threads(ExecutionKind kind) noexcept -> uint32_t {
return kind == ExecutionKind::gpu ? max_gpu_threads : max_cpu_threads;
}
auto has_cpu_priority() const noexcept -> bool {
return priority == ExecutionKind::cpu;
}
auto has_gpu_priority() const noexcept -> bool {
return priority == ExecutionKind::gpu;
}
auto first_priority() const noexcept -> ExecutionKind {
return priority;
}
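    // The fall-back kind is always CPU work: GPU-priority threads help with
    // CPU nodes once they run out of GPU nodes.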
auto second_priority() const noexcept -> ExecutionKind {
return ExecutionKind::cpu;
}
auto is_shutdown() const noexcept -> bool {
return run_state.load(std::memory_order_relaxed) == RunState::shutdown;
}
auto is_paused() const noexcept -> bool {
return run_state.load(std::memory_order_relaxed) == RunState::paused;
}
auto is_running() const noexcept -> bool {
return run_state.load(std::memory_order_relaxed) == RunState::running;
}
auto shutdown() noexcept -> void {
run_state.store(RunState::shutdown, std::memory_order_relaxed);
}
auto pause() noexcept -> void {
return run_state.store(RunState::paused, std::memory_order_relaxed);
}
auto run() noexcept -> void {
return run_state.store(RunState::running, std::memory_order_relaxed);
}
};
// clang-format off
using Threads = std::vector<std::thread>;
using ThreadStates = std::vector<ThreadState>;
using Stealer = NeighbourStealer;
// std::variant<
// RandomStealer, NeighbourStealer, TopoStealer>;
using Queue =
StaticStealableDequeue<Graph::NodeType*, max_nodes_per_queue>;
using Queues = std::vector<Queue>;
using ExecState = std::atomic<ExecutionState>;
// clang-format on
/*==--- [construction] ---------------------------------------------------==*/
Executor(size_t cpu_threads, size_t gpu_threads)
: stealer_{NeighbourStealer{}}, exec_state_{ExecutionState::run} {
create_threads(cpu_threads, gpu_threads);
}
~Executor() {
fence();
for (auto& thread_state : thread_states_) {
thread_state.shutdown();
}
for (auto& thread : threads_) {
if (thread.joinable()) {
thread.join();
}
}
}
//==--- [members] --------------------------------------------------------==//
ThreadStates thread_states_;
Threads threads_;
Queues cpu_queues_;
Queues gpu_queues_;
Stealer stealer_;
ExecState exec_state_ = ExecutionState::run;
NodeCounter total_nodes_ = 0;
auto create_threads(uint32_t cpu_threads, uint32_t gpu_threads) -> void {
const uint32_t total_threads = cpu_threads + gpu_threads;
    // The creating thread is always the main thread and has GPU priority so
    // that it can push work to both queue types.
thread_id = 0;
set_affinity(thread_id);
gpu_queues_.emplace_back();
cpu_queues_.emplace_back();
thread_states_.emplace_back(
uint32_t{0},
total_threads,
gpu_threads + 1,
ThreadState::RunState::paused,
ExecutionKind::gpu);
for (uint32_t i = 1; i < total_threads; ++i) {
if (i <= gpu_threads) {
gpu_queues_.emplace_back();
}
cpu_queues_.emplace_back();
thread_states_.emplace_back(
i,
total_threads,
gpu_threads + 1,
ThreadState::RunState::running,
i <= gpu_threads ? ExecutionKind::gpu : ExecutionKind::cpu);
}
for (uint32_t i = 1; i < total_threads; ++i) {
threads_.emplace_back(
[&](ThreadState& state) {
using namespace std::chrono_literals;
thread_id = state.id;
set_affinity(state.id);
constexpr uint32_t sleep_fail_count = 100;
uint32_t fails = 0;
while (!state.is_shutdown()) {
if (state.is_paused()) {
std::this_thread::sleep_for(100us);
continue;
}
          // Try to do some work. If we did some work, there is likely more
          // work to do; otherwise no work could be found and the failure is
          // counted so that the thread can eventually back off.
if (!execute_node_work(state, state.first_priority())) {
if (!execute_node_work(state, state.second_priority())) {
if (++fails != sleep_fail_count) {
continue;
}
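            // Threshold reached: the pause and sleep below are currently
            // disabled, so the failure count is simply reset.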
// state.pause();
// std::this_thread::sleep_for(5us);
}
}
fails = 0;
}
},
std::ref(thread_states_[i]));
}
}
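  // Tries to pop and run a node of the given kind from this thread's own
  // queue. A node that cannot run yet is pushed back; if nothing was run the
  // thread then attempts to steal work from another queue.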
auto execute_node_work(ThreadState& thread_state, ExecutionKind kind) noexcept
-> bool {
auto& queue = get_queue(thread_state.id, kind);
if (auto node = queue.pop()) {
if (node.value()->try_run()) {
thread_state.inc_processed_nodes();
return true;
}
queue.push(*node);
}
// Steal if we are empty:
return steal(thread_state, kind);
}
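  // Asks the stealer for victim thread ids and tries to steal a node of the
  // given kind from each victim's queue. A stolen node that cannot run yet is
  // pushed onto this thread's own queue.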
auto steal(ThreadState& thread_state, ExecutionKind kind) noexcept -> bool {
// Keep trying to steal until we succeed or reach the max number of
// attempts.
uint32_t steal_id = thread_state.id;
const uint32_t max_threads = thread_state.max_threads(kind);
const uint32_t max_id = std::max(max_threads, uint32_t{1});
const uint32_t max_attempts = std::max(max_id, uint32_t{5});
for (uint32_t i = 0; i < max_attempts; ++i) {
steal_id = stealer_(steal_id, max_id);
// std::visit(
// [&thread_state, steal_id, max_id](auto&& stealer) {
// return stealer(steal_id, max_id);
// },
// stealer_);
if (steal_id == thread_state.id) {
continue;
}
if (auto node = get_queue(steal_id, kind).steal()) {
// If we stole a node, try and run it:
if (node.value()->try_run()) {
thread_state.inc_processed_nodes();
return true;
}
// Node couldn't run, so push it onto our queue.
get_queue(thread_state.id, kind).push(*node);
}
}
return false;
}
auto get_queue(size_t id, ExecutionKind kind) noexcept -> Queue& {
if (kind == ExecutionKind::cpu) {
return cpu_queues_[id];
}
return gpu_queues_[id];
}
};
inline auto executor() -> Executor& {
  static Executor exec(
    topology().num_cores() - topology().num_gpus(), topology().num_gpus());
return exec;
}
inline auto execute(Graph& graph) noexcept -> void {
executor().execute(graph);
}
template <typename Pred, typename... Args>
inline auto
execute_until(Graph& graph, Pred&& pred, Args&&... args) noexcept -> void {
executor().execute_until(
graph, ripple_forward(pred), ripple_forward(args)...);
}
inline auto fence() noexcept -> void {
executor().fence();
}
inline auto barrier() noexcept -> void {
executor().barrier();
}
} // namespace ripple
#endif // RIPPLE_EXECUTION_EXECUTOR_HPP
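
The free functions at the end of this listing (execute, execute_until, fence, barrier) are the intended entry points. Below is a minimal usage sketch, not part of the header itself; it assumes a ripple::Graph named graph has already been built elsewhere, and run_graph is an illustrative name:

#include <ripple/execution/executor.hpp>

auto run_graph(ripple::Graph& graph) -> void {
  // Submit every node of the graph to the executor's work queues; this
  // returns as soon as the nodes are enqueued.
  ripple::execute(graph);
  // Block until all submitted nodes (CPU and GPU) have been processed and the
  // GPU streams are synchronized.
  ripple::fence();
}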