• Docs >
  • Program Listing for File executor.hpp
Shortcuts

Program Listing for File executor.hpp

Return to documentation for file (include/ripple/execution/executor.hpp)

//==--- ripple/core/graph/executor.hpp --------------------- -*- C++ -*- ---==//
//
//                                 Ripple
//
//                      Copyright (c) 2019, 2020 Rob Clucas
//
//  This file is distributed under the MIT License. See LICENSE for details.
//
//==------------------------------------------------------------------------==//
//

#ifndef RIPPLE_EXECUTION_EXECUTOR_HPP
#define RIPPLE_EXECUTION_EXECUTOR_HPP

#include <ripple/graph/graph.hpp>
#include <ripple/graph/stealer.hpp>
#include <ripple/arch/topology.hpp>
#include <ripple/container/static_stealable_dequeue.hpp>
#include <chrono>
#include <thread>
#include <variant>

namespace ripple {

/*==-- [forward declarations] ----------------------------------------------==*/

static auto executor() -> Executor&;

inline auto execute(Graph& graph) noexcept -> void;

template <typename Pred, typename... Args>
inline auto
execute_until(Graph& graph, Pred&& pred, Args&&... args) noexcept -> void;

inline auto fence() noexcept -> void;

inline auto barrier() noexcept -> void;

static thread_local uint32_t thread_id = 0;

class Executor {
 public:
  friend auto executor() -> Executor&;

  auto set_steal_policy(StealPolicy policy) noexcept -> void {
    /*
       // clang-format off
       switch (policy) {
         case StealPolicy::random     : stealer_ = RandomStealer()   ; return;
         case StealPolicy::neighbour  : stealer_ = NeighbourStealer(); return;
         case StealPolicy::topological: stealer_ = TopoStealer()     ; return;
         default: return;
       }
       // clang-format on
   */
  }

  auto execute(Graph& graph) noexcept -> void {
    // Check if the threads need to be woken up:
    if (exec_state_ != ExecutionState::run) {
      exec_state_.store(ExecutionState::run, std::memory_order_relaxed);
      for (auto& state : thread_states_) {
        state.run_state.store(
          ThreadState::RunState::running, std::memory_order_relaxed);
      }
    }

    auto& cpu_queue = cpu_queues_[thread_id];
    auto& gpu_queue = gpu_queues_[thread_id];
    for (auto& node : graph.nodes_) {
      auto& queue = node->execution_kind() == ExecutionKind::gpu ? gpu_queue
                                                                 : cpu_queue;
      while (!queue.try_push(&(*node))) {
        // Spin .. this should never happen ...
      }
      total_nodes_.fetch_add(1, std::memory_order_relaxed);
    }
    graph.exec_count_++;
  }

  auto execute_n(Graph& graph, size_t n) noexcept -> void {
    const auto last = graph.num_executions() + n;
    graph.sync([this, &graph, end = graph.num_executions() + n] {
      if (graph.num_executions() < end) {
        execute(graph);
      }
    });
    execute(graph);
  }

  template <typename Pred, typename... Args>
  auto
  execute_until(Graph& graph, Pred&& pred, Args&&... args) noexcept -> void {
    graph.sync(
      [this, &graph](auto&& predicate, auto&&... as) {
        if (predicate(ripple_forward(as)...)) {
          execute(graph);
        }
      },
      ripple_forward(pred),
      ripple_forward(args)...);
    if (pred(ripple_forward(args)...)) {
      execute(graph);
    }
  }

  auto barrier() noexcept -> void {
    size_t gpu_id = 0;
    for (const auto& thread_state : thread_states_) {
      if (gpu_id >= topology().num_gpus()) {
        break;
      }
      if (thread_state.has_gpu_priority()) {
        topology().gpus[gpu_id++].execute_barrier();
      }
    }
    auto& state = thread_states_[thread_id];
    while (has_unfinished_work()) {
      execute_node_work(state, state.first_priority());
      execute_node_work(state, state.second_priority());
    }
  }

  auto fence() noexcept -> void {
    size_t gpu_id = 0;
    for (const auto& thread_state : thread_states_) {
      if (gpu_id >= topology().num_gpus()) {
        break;
      }
      if (thread_state.has_gpu_priority()) {
        topology().gpus[gpu_id++].synchronize_streams();
      }
    }
    auto& state = thread_states_[thread_id];
    while (has_unfinished_work()) {
      execute_node_work(state, state.first_priority());
      execute_node_work(state, state.second_priority());
    }
  }

  auto num_executed_nodes() const noexcept -> uint32_t {
    uint32_t sum = 0;
    for (size_t i = 0; i < thread_states_.size(); ++i) {
      sum += thread_states_[i].processed_nodes();
    }
    return sum;
  }

  auto has_unfinished_work() noexcept -> bool {
    return num_executed_nodes() < total_nodes_.load(std::memory_order_relaxed);
  }

  auto set_active_threads(int cpu_threads, int gpu_threads) noexcept -> void {
    int cpu_count = 0, gpu_count = 0;
    for (auto& state : thread_states_) {
      gpu_count += state.has_gpu_priority() && state.is_running() ? 1 : 0;
      cpu_count += state.has_cpu_priority() && state.is_running() ? 1 : 0;
    }

    int gpu_change = gpu_threads - gpu_count;
    int cpu_change = cpu_threads - cpu_count;
    gpu_count = 0, cpu_count = 0;

    auto modify = [](auto& state, auto& count, const auto& change) {
      if (state.is_running() && change < 0) {
        state.pause();
        count++;
      } else if (state.is_paused() && change > 0) {
        state.run();
        count++;
      }
    };

    for (auto& state : thread_states_) {
      state.max_gpu_threads = gpu_threads;
      state.max_cpu_threads = cpu_threads;
      if (state.has_gpu_priority() && gpu_count < std::abs(gpu_change)) {
        modify(state, gpu_count, gpu_change);
        state.priority = ExecutionKind::gpu;
        continue;
      }
      if (state.has_cpu_priority() && cpu_count < std::abs(cpu_change)) {
        modify(state, cpu_count, cpu_change);
        state.priority = ExecutionKind::cpu;
      }
    }
  }

 private:
  enum class ExecutionState : uint8_t {
    paused    = 0,
    run       = 1,
    terminate = 2
  };

  static constexpr size_t max_nodes_per_queue = 4096;

  using NodeCounter = std::atomic<uint32_t>;

  struct alignas(avoid_false_sharing_size) ThreadState {
    enum class RunState : uint8_t {
      shutdown = 0,
      paused   = 1,
      running  = 2
    };

    using SafeRunState = std::atomic<RunState>;

    uint32_t      id              = 0;
    uint32_t      max_cpu_threads = 0;
    uint32_t      max_gpu_threads = 0;
    NodeCounter   processed       = 0;
    SafeRunState  run_state       = RunState::paused;
    ExecutionKind priority        = ExecutionKind::gpu;

    /*==--- [construction] -------------------------------------------------==*/

    ThreadState(
      uint32_t      index,
      uint32_t      cpu_threads,
      uint32_t      gpu_threads,
      RunState      state,
      ExecutionKind p) noexcept
    : id{index},
      max_cpu_threads{cpu_threads},
      max_gpu_threads{gpu_threads},
      run_state{state},
      priority{p} {}

    ThreadState(ThreadState&& other) noexcept
    : id{other.id},
      max_cpu_threads{other.max_cpu_threads},
      max_gpu_threads{other.max_gpu_threads},
      run_state{other.run_state.load()},
      priority{other.priority} {
      other.run_state.store(RunState::paused, std::memory_order_relaxed);
    }

    ThreadState(const ThreadState&) = delete;

    /*==--- [methods] ------------------------------------------------------==*/

    auto processed_nodes() const noexcept -> uint32_t {
      return processed.load(std::memory_order_relaxed);
    }

    auto inc_processed_nodes() noexcept -> void {
      processed.fetch_add(1, std::memory_order_relaxed);
    }

    auto max_threads(ExecutionKind kind) noexcept -> uint32_t {
      return kind == ExecutionKind::gpu ? max_gpu_threads : max_cpu_threads;
    }

    auto has_cpu_priority() const noexcept -> bool {
      return priority == ExecutionKind::cpu;
    }

    auto has_gpu_priority() const noexcept -> bool {
      return priority == ExecutionKind::gpu;
    }

    auto first_priority() const noexcept -> ExecutionKind {
      return priority;
    }

    auto second_priority() const noexcept -> ExecutionKind {
      return ExecutionKind::cpu;
    }

    auto is_shutdown() const noexcept -> bool {
      return run_state.load(std::memory_order_relaxed) == RunState::shutdown;
    }

    auto is_paused() const noexcept -> bool {
      return run_state.load(std::memory_order_relaxed) == RunState::paused;
    }

    auto is_running() const noexcept -> bool {
      return run_state.load(std::memory_order_relaxed) == RunState::running;
    }

    auto shutdown() noexcept -> void {
      run_state.store(RunState::shutdown, std::memory_order_relaxed);
    }

    auto pause() noexcept -> void {
      return run_state.store(RunState::paused, std::memory_order_relaxed);
    }

    auto run() noexcept -> void {
      return run_state.store(RunState::running, std::memory_order_relaxed);
    }
  };

  // clang-format off
  using Threads      = std::vector<std::thread>;
  using ThreadStates = std::vector<ThreadState>;
  using Stealer      = NeighbourStealer;
//  std::variant<
//    RandomStealer, NeighbourStealer, TopoStealer>;

  using Queue     =
    StaticStealableDequeue<Graph::NodeType*, max_nodes_per_queue>;
  using Queues    = std::vector<Queue>;
  using ExecState = std::atomic<ExecutionState>;
  // clang-format on

  /*==--- [construction] ---------------------------------------------------==*/

  Executor(size_t cpu_threads, size_t gpu_threads)
  : stealer_{NeighbourStealer{}}, exec_state_{ExecutionState::run} {
    create_threads(cpu_threads, gpu_threads);
  }

  ~Executor() {
    fence();

    for (auto& thread_state : thread_states_) {
      thread_state.shutdown();
    }

    for (auto& thread : threads_) {
      if (thread.joinable()) {
        thread.join();
      }
    }
  }

  //==--- [members] --------------------------------------------------------==//

  ThreadStates thread_states_;
  Threads      threads_;
  Queues       cpu_queues_;
  Queues       gpu_queues_;
  Stealer      stealer_;
  ExecState    exec_state_  = ExecutionState::run;
  NodeCounter  total_nodes_ = 0;

  auto create_threads(uint32_t cpu_threads, uint32_t gpu_threads) -> void {
    const uint32_t total_threads = cpu_threads + gpu_threads;

    // Creating thread is always the main thread and has GPU priority so that
    // it can push work to both queue types.
    thread_id = 0;
    set_affinity(thread_id);
    gpu_queues_.emplace_back();
    cpu_queues_.emplace_back();
    thread_states_.emplace_back(
      uint32_t{0},
      total_threads,
      gpu_threads + 1,
      ThreadState::RunState::paused,
      ExecutionKind::gpu);

    for (uint32_t i = 1; i < total_threads; ++i) {
      if (i <= gpu_threads) {
        gpu_queues_.emplace_back();
      }
      cpu_queues_.emplace_back();
      thread_states_.emplace_back(
        i,
        total_threads,
        gpu_threads + 1,
        ThreadState::RunState::running,
        i <= gpu_threads ? ExecutionKind::gpu : ExecutionKind::cpu);
    }
    for (uint32_t i = 1; i < total_threads; ++i) {
      threads_.emplace_back(
        [&](ThreadState& state) {
          using namespace std::chrono_literals;
          thread_id = state.id;
          set_affinity(state.id);
          constexpr uint32_t sleep_fail_count = 100;

          uint32_t fails = 0;
          while (!state.is_shutdown()) {
            if (state.is_paused()) {
              std::this_thread::sleep_for(100us);
              continue;
            }

            // Try and do some work. If we did some work, then there is likely
            // some more work to do, otherwise we couldn't find any work to
            // execute, so we aren't executing.
            if (!execute_node_work(state, state.first_priority())) {
              if (!execute_node_work(state, state.second_priority())) {
                if (++fails != sleep_fail_count) {
                  continue;
                }
                // state.pause();
                // std::this_thread::sleep_for(5us);
              }
            }
            fails = 0;
          }
        },
        std::ref(thread_states_[i]));
    }
  }

  auto execute_node_work(ThreadState& thread_state, ExecutionKind kind) noexcept
    -> bool {
    auto& queue = get_queue(thread_state.id, kind);
    if (auto node = queue.pop()) {
      if (node.value()->try_run()) {
        thread_state.inc_processed_nodes();
        return true;
      }
      queue.push(*node);
    }

    // Steal if we are empty:
    return steal(thread_state, kind);
  }

  auto steal(ThreadState& thread_state, ExecutionKind kind) noexcept -> bool {
    // Keep trying to steal until we succeed or reach the max number of
    // attempts.
    uint32_t       steal_id     = thread_state.id;
    const uint32_t max_threads  = thread_state.max_threads(kind);
    const uint32_t max_id       = std::max(max_threads, uint32_t{1});
    const uint32_t max_attempts = std::max(max_id, uint32_t{5});
    for (uint32_t i = 0; i < max_attempts; ++i) {
      steal_id = stealer_(steal_id, max_id);
      //      std::visit(
      //        [&thread_state, steal_id, max_id](auto&& stealer) {
      //          return stealer(steal_id, max_id);
      //        },
      //        stealer_);

      if (steal_id == thread_state.id) {
        continue;
      }

      if (auto node = get_queue(steal_id, kind).steal()) {
        // If we stole a node, try and run it:
        if (node.value()->try_run()) {
          thread_state.inc_processed_nodes();
          return true;
        }
        // Node couldn't  run, so push it onto our queue.
        get_queue(thread_state.id, kind).push(*node);
      }
    }
    return false;
  }

  auto get_queue(size_t id, ExecutionKind kind) noexcept -> Queue& {
    if (kind == ExecutionKind::cpu) {
      return cpu_queues_[id];
    }
    return gpu_queues_[id];
  }
};

inline auto executor() -> Executor& {
  static Executor exec(13, 3);
  // topology().num_cores() - topology().num_gpus(), topology().num_gpus());

  return exec;
}

inline auto execute(Graph& graph) noexcept -> void {
  executor().execute(graph);
}

template <typename Pred, typename... Args>
inline auto
execute_until(Graph& graph, Pred&& pred, Args&&... args) noexcept -> void {
  executor().execute_until(
    graph, ripple_forward(pred), ripple_forward(args)...);
}

inline auto fence() noexcept -> void {
  executor().fence();
}

inline auto barrier() noexcept -> void {
  executor().barrier();
}

} // namespace ripple

#endif // RIPPLE_EXECUTION_EXECUTOR_HPP

Docs

Access comprehensive developer documentation for Ripple

View Docs

Tutorials

Get tutorials to help with understand all features

View Tutorials

Examples

Find examples to help get started

View Examples