
Program Listing for File tensor.hpp

Return to documentation for file (include/ripple/container/tensor.hpp)
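
This header defines ripple::Tensor, a multi-dimensional, multi-gpu container whose data space is partitioned across the available devices and split into blocks that can be processed and transferred independently.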

#ifndef RIPPLE_CONTAINER_TENSOR_HPP
#define RIPPLE_CONTAINER_TENSOR_HPP

#include "block.hpp"
#include "tensor_traits.hpp"
#include <ripple/arch/topology.hpp>
#include <ripple/functional/invoke.hpp>
#include <ripple/math/math.hpp>
#include <ripple/space/dynamic_multidim_space.hpp>
#include <ripple/utility/dim.hpp>
#include <array>
#include <cassert>
#include <numeric>
#include <set>

namespace ripple {

struct BlockExtractor;

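// A Tensor is a multi-dimensional, multi-device container: its data space is
// split into partitions (at most one per gpu) and each partition into blocks,
// whose host and device data are managed per block.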
template <typename T, size_t Dimensions>
class Tensor {
  friend struct BlockExtractor;

  // clang-format off
  using Block         = Block<T, Dimensions>;
  using Blocks        = HostBlock<Block, Dimensions>;
  using HostBlock     = HostBlock<T, Dimensions>;
  using DeviceBlock   = DeviceBlock<T, Dimensions>;
  using Space         = DynamicMultidimSpace<Dimensions>;
  using HostBlockIter = IndexedIterator<T, Space>;

  static constexpr size_t   min_split_threshold          = 3e5;
  static constexpr size_t   dims                         = Dimensions;
  static constexpr uint32_t default_blocks_per_partition = 1;

 public:
  using Size       = uint32_t;
  using BlockSplit = std::array<Size, dims>;
  using Partitions = std::array<Size, dims>;
  using Elements   = std::array<Size, dims>;
  using BlockIter  = typename block_traits_t<Blocks>::Iter;
  // clang-format on

  /*==--- [friends] --------------------------------------------------------==*/

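  // Swaps the host and device data of each corresponding pair of blocks in
  // the two tensors, and exchanges the blocks' streams so that each block
  // keeps a stream that is valid for its device data.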
  friend auto swap(Tensor& lhs, Tensor& rhs) noexcept -> void {
    invoke_generic(
      CpuExecutor(),
      [&](auto&& left_it, auto&& right_it) {
        using std::swap;
        swap(left_it->host_data, right_it->host_data);
        swap(left_it->device_data, right_it->device_data);

        auto stream = left_it->stream();
        left_it->device_data.set_stream(right_it->stream());
        right_it->device_data.set_stream(stream);
      },
      lhs.blocks_,
      rhs.blocks_);
  }

  /*==--- [construction] ---------------------------------------------------==*/

  Tensor() noexcept {}

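  // Creates a tensor with the given sizes for each dimension, partitioning
  // the largest dimension across all available gpus.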
  template <typename... Sizes, all_arithmetic_size_enable_t<dims, Sizes...> = 0>
  Tensor(Sizes&&... sizes)
  : space_{ripple_forward(sizes)...}, blocks_per_part_set_{true} {
    blocks_per_part_.fill(default_blocks_per_partition);
    partitions_.fill(1);
    size_t max_dim = 0, max_size = 0, dim = 0;
    for_each(
      [&max_dim, &dim, &max_size](auto&& dim_size) {
        if (dim_size > max_size) {
          max_size = dim_size;
          max_dim  = dim;
        }
        dim++;
      },
      ripple_forward(sizes)...);
    partitions_[max_dim] = topology().num_gpus();
    initialize();
  }

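  // Creates a tensor with the given sizes, using the specified number of
  // partitions in each dimension.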
  template <typename... Sizes, all_arithmetic_size_enable_t<dims, Sizes...> = 0>
  Tensor(Partitions partitions_per_dim, Sizes&&... sizes)
  : space_{ripple_forward(sizes)...},
    partitions_{partitions_per_dim},
    blocks_per_part_set_{true} {
    blocks_per_part_.fill(default_blocks_per_partition);
    initialize();
  }

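  // As above, but additionally sets the padding for the data space.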
  template <
    typename... Sizes,
    all_arithmetic_size_enable_t<Dimensions, Sizes...> = 0>
  Tensor(Partitions partitions_per_dim, uint32_t padding, Sizes&&... sizes)
  : space_{padding, ripple_forward(sizes)...},
    partitions_{partitions_per_dim},
    blocks_per_part_set_{true} {
    blocks_per_part_.fill(default_blocks_per_partition);
    initialize();
  }

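  // Creates a tensor with the given sizes and partitions, additionally
  // specifying the number of blocks each partition is split into per
  // dimension.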
  template <
    typename... Sizes,
    all_arithmetic_size_enable_t<Dimensions, Sizes...> = 0>
  Tensor(
    Partitions partitions_per_dim,
    BlockSplit blocks_per_partition,
    Sizes&&... sizes)
  : space_{ripple_forward(sizes)...},
    partitions_{partitions_per_dim},
    blocks_per_part_{blocks_per_partition},
    blocks_per_part_set_{true} {
    initialize();
  }

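  // As above, but additionally sets the padding for the data space.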
  template <
    typename... Sizes,
    all_arithmetic_size_enable_t<Dimensions, Sizes...> = 0>
  Tensor(
    Partitions partitions_per_dim,
    BlockSplit blocks_per_partition,
    uint32_t   padding,
    Sizes&&... sizes)
  : space_{padding, ripple_forward(sizes)...},
    partitions_{partitions_per_dim},
    blocks_per_part_{blocks_per_partition},
    blocks_per_part_set_{true} {
    initialize();
  }

  /*==--- [operator overloads] ---------------------------------------------==*/

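  // Returns a host iterator to the element at the given global indices. The
  // owning block is found first and its host data made valid (copying from
  // the device and synchronizing if necessary), then the iterator is offset
  // to the element within the block. Since this may copy and synchronize, it
  // is not a fast access path.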
  template <typename... Indices>
  auto operator()(Indices&&... indices) const noexcept -> HostBlockIter {
    std::array<int, dims> ids        = {static_cast<int>(indices)...};
    auto                  block_iter = blocks_.begin();
    unrolled_for<dims>([&](auto dim) {
      const int id = ids[dim] / block_sizes_[dim];
      block_iter.shift(dim, id);
    });
    block_iter->ensure_host_data_available();
    block_iter->synchronize();

    auto host_iter = block_iter->host_iterator();
    unrolled_for<dims>([&](auto dim) {
      const int offset = ids[dim] % block_sizes_[dim];
      host_iter.shift(dim, offset);
    });
    return host_iter;
  }

  /*==--- [interface] ------------------------------------------------------==*/

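  // Returns the number of internal (unpadded) elements in the given
  // dimension; pitch() below returns the total size including padding.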
  template <typename Dim>
  auto size(Dim&& dim) const noexcept -> size_t {
    return space_.internal_size(ripple_forward(dim));
  }

  template <typename Dim>
  auto pitch(Dim&& dim) const noexcept -> size_t {
    return space_.size(ripple_forward(dim));
  }

  auto padding() const noexcept -> size_t {
    return space_.padding();
  }

  template <typename Dim>
  auto set_partitions(Dim&& dim, size_t partitions) noexcept -> void {
    partitions_[dim] = partitions;
  }

  template <typename Dim>
  auto set_sub_partitions(Dim&& dim, size_t sub_partitions) noexcept -> void {
    blocks_per_part_[dim] = sub_partitions;
    blocks_per_part_set_  = true;
  }

  template <typename Dim>
  auto resize_dim(Dim&& dim, size_t size) noexcept -> void {
    space_.resize_dim(ripple_forward(dim), size);
  }

  auto set_padding(size_t padding) noexcept -> void {
    space_.padding() = padding;
  }

  auto reallocate(const ripple::StreamMap& stream_map = ripple::StreamMap())
    -> void {
    initialize(stream_map);
  }

  auto partitions() const noexcept -> Size {
    constexpr Size init = 1;
    return std::accumulate(
      partitions_.cbegin(), partitions_.cend(), init, std::multiplies<Size>());
  }

  auto partition_size() const -> Size {
    return std::accumulate(
      partition_sizes_.cbegin(),
      partition_sizes_.cend(),
      Size{1},
      std::multiplies<Size>());
  }

  template <typename Dim>
  auto blocks_in_dim(Dim&& dim) const noexcept -> size_t {
    return blocks_.size(ripple_forward(dim));
  }

  auto begin() noexcept -> BlockIter {
    return blocks_.begin();
  }

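  // The diagonal_* functions below return, respectively, the number of
  // blocks, the block size, and the number of elements along the main
  // diagonal of the tensor, each computed as the euclidean norm (rounded up)
  // of the per-dimension values.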
  auto diagonal_blocks() const noexcept -> size_t {
    float sum = 0.0;
    unrolled_for<dims>(
      [&](auto dim) { sum += blocks_in_dim(dim) * blocks_in_dim(dim); });
    return static_cast<size_t>(std::ceil(std::sqrt(sum)));
  }

  auto diagonal_block_size() const noexcept -> size_t {
    float sum = 0.0;
    unrolled_for<dims>(
      [&](auto dim) { sum += block_sizes_[dim] * block_sizes_[dim]; });
    return static_cast<size_t>(std::ceil(std::sqrt(sum)));
  }

  auto diagonal_size() const noexcept -> size_t {
    float sum = 0.0;
    unrolled_for<dims>([&](auto dim) {
      sum += space_.internal_size(dim) * space_.internal_size(dim);
    });
    return static_cast<size_t>(std::ceil(std::sqrt(sum)));
  }

 private:
  Space      space_;
  Partitions partitions_;
  BlockSplit blocks_per_part_;
  Elements   partition_sizes_ = {};
  Elements   block_sizes_     = {};
  Blocks     blocks_;
  bool       blocks_per_part_set_ = false;

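  // Computes the block size for each dimension from the partition sizes and
  // the requested blocks per partition, resizes the block container
  // accordingly, and then allocates the per-block host and device data.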
  auto initialize(ripple::StreamMap stream_map = ripple::StreamMap()) -> void {
    blocks_.set_op_kind(BlockOpKind::synchronous);
    check_partitions();

    // Resize the block container:
    unrolled_for<dims>([&](auto dim) {
      if (!blocks_per_part_set_) {
        blocks_per_part_[dim] = default_blocks_per_partition;
      }

      block_sizes_[dim] =
        math::div_then_ceil(partition_sizes_[dim], blocks_per_part_[dim]);
      blocks_.resize_dim(
        dim, math::div_then_ceil(space_.internal_size(dim), block_sizes_[dim]));

      // assert(
      //  blocks_.size(dim) <= blocks_per_part_[dim] * partition_sizes_[dim] &&
      //  "Invalid number of blocks per gpu!");
    });
    blocks_.reallocate_and_init();
    allocate_data_for_blocks(stream_map);
  }

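  // Validates the requested partitioning against the number of available
  // gpus and computes the per-dimension partition sizes by dividing each
  // internal size (rounding up) by the partitions in that dimension.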
  auto check_partitions() -> void {
    const auto parts = partitions();
    if (parts > topology().num_gpus()) {
      // TODO: Change this to some default behaviour:
      assert(false && "More partitions specified than available gpus!");
    }

    if (parts <= topology().num_gpus()) {
      unrolled_for<dims>([&](auto dim) {
        partition_sizes_[dim] =
          math::div_then_ceil(space_.internal_size(dim), partitions_[dim]);
      });
      return;
    }

    // Only reached in release builds, where the assert above compiles away;
    // fall back to a default partitioning:
    set_default_partition();
  }

  auto largest_partition_dim() const noexcept -> size_t {
    size_t index = 0;
    auto   max   = partition_sizes_[0];
    unrolled_for<dims - 1>([&](auto d) {
      constexpr size_t dim = d + 1;
      if (partition_sizes_[dim] > max) {
        max   = partition_sizes_[dim];
        index = dim;
      }
    });
    return index;
  }

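  // Falls back to a default partitioning: start from a single partition over
  // the whole space, then repeatedly split the largest dimension by the
  // smallest factor of the gpu count until the partition size drops below
  // the split threshold or all gpus are used.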
  auto set_default_partition() noexcept -> void {
    for (size_t i = 0; i < dims; ++i) {
      partitions_[i]      = 1;
      partition_sizes_[i] = space_.internal_size(i);
    }

    // Find the scaling factor:
    size_t scaling_factor = 2;
    while (topology().num_gpus() % scaling_factor != 0) {
      scaling_factor++;
    }

    size_t dim = 0;
    while (partition_size() > min_split_threshold) {
      // Split the largest dimension; this assumes that the number of gpus is
      // a power of two.
      dim = largest_partition_dim();
      partitions_[dim] *= scaling_factor;

      if (partitions() > topology().num_gpus()) {
        partitions_[dim] /= scaling_factor;
        return;
      }

      partition_sizes_[dim] /= scaling_factor;
    }
  }

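  // Allocates the host and device data for every block: sets each block's
  // global indices and sizes, trims blocks at the end of a dimension to the
  // remaining elements, maps the block to a gpu from the partition it falls
  // in, then allocates host memory and device memory on the block's stream.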
  auto allocate_data_for_blocks(const ripple::StreamMap& stream_map) -> void {
    invoke(blocks_, [&](auto block) {
      auto& host   = block->host_data;
      auto& device = block->device_data;

      // Set the host component of the block to enable asynchronous operations
      // so that compute and transfer can be overlapped:
      host.set_op_kind(BlockOpKind::asynchronous);

      // Now set the padding for the block:
      host.set_padding(space_.padding());
      device.set_padding(space_.padding());

      size_t prev_dim_partitions = 1, id = 0;
      unrolled_for<dims>([&](auto dim) {
        block->indices[dim]      = global_idx(dim);
        block->block_sizes[dim]  = block_sizes_[dim];
        block->global_sizes[dim] = space_.internal_size(dim);
        block->max_indices[dim]  = blocks_.size(dim) - 1;

        const Size elements_start = global_idx(dim) * block_sizes_[dim];
        const Size block_size     = std::min(
          space_.internal_size(dim) - elements_start, block_sizes_[dim]);

        host.resize_dim(dim, block_size);
        device.resize_dim(dim, block_size);

        id += block->indices[dim] / blocks_per_part_[dim] * prev_dim_partitions;
        prev_dim_partitions *= partitions_[dim];
      });

      // Set all the gpu data:
      GpuInfo& gpu = topology().gpus[id];
      block->set_device_id(gpu.index);
      block->set_transfer_stream(
        gpu.streams[gpu.next_transfer_stream_id()].stream);
      gpu::set_device(gpu.index);

      // Allocate the host memory:
      host.reallocate();

      // Use the supplied stream if one is mapped for this gpu, otherwise the
      // first stream:
      auto stream_id =
        stream_map.find(id) != stream_map.end() ? stream_map.at(id) : 0;

      // Now allocate device data:
      auto& stream = gpu.streams[stream_id].stream;
      device.set_stream(stream);
      device.reallocate();
      gpu::synchronize_stream(stream);

      block->data_state = DataState::updated_device;
    });
  }
};

} // namespace ripple

#endif // RIPPLE_CONTAINER_TENSOR_HPP
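
Usage example

A minimal sketch of constructing and reading from a tensor. The sizes here are illustrative, and the sketch assumes a system with at least one gpu visible to ripple and that the returned IndexedIterator dereferences to the stored element:

    #include <ripple/container/tensor.hpp>

    int main() {
      // A 2D tensor of 2048 x 1024 floats. With no explicit partitioning,
      // the largest dimension is split across all available gpus:
      ripple::Tensor<float, 2> t(2048, 1024);

      // Fetch a host-side iterator to the element at global index (10, 20).
      // This locates the owning block, makes its host data valid (copying
      // from the device and synchronizing if necessary), and offsets the
      // iterator to the element within the block:
      auto it = t(10, 20);
      float value = *it;

      return 0;
    }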
