Program Listing for File tensor.hpp
↰ Return to documentation for file (include/ripple/container/tensor.hpp)
#ifndef RIPPLE_CONTAINER_TENSOR_HPP
#define RIPPLE_CONTAINER_TENSOR_HPP
#include "block.hpp"
#include "tensor_traits.hpp"
#include <ripple/arch/topology.hpp>
#include <ripple/functional/invoke.hpp>
#include <ripple/math/math.hpp>
#include <ripple/space/dynamic_multidim_space.hpp>
#include <ripple/utility/dim.hpp>
#include <array>
#include <cassert>
#include <numeric>
#include <set>
namespace ripple {
struct BlockExtractor;
template <typename T, size_t Dimensions>
class Tensor {
friend struct BlockExtractor;
// clang-format off
using Block = Block<T, Dimensions>;
using Blocks = HostBlock<Block, Dimensions>;
using HostBlock = HostBlock<T, Dimensions>;
using DeviceBlock = DeviceBlock<T, Dimensions>;
using Space = DynamicMultidimSpace<Dimensions>;
using HostBlockIter = IndexedIterator<T, Space>;
static constexpr size_t min_split_threshold = 3e5;
static constexpr size_t dims = Dimensions;
static constexpr uint32_t default_blocks_per_partition = 1;
public:
using Size = uint32_t;
using BlockSplit = std::array<Size, dims>;
using Partitions = std::array<Size, dims>;
using Elements = std::array<Size, dims>;
using BlockIter = typename block_traits_t<Blocks>::Iter;
// clang-format on
/*==--- [friends] --------------------------------------------------------==*/
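// Swaps the two tensors by swapping the host and device data of each pair
// of blocks, as well as the streams associated with the device data.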
friend auto swap(Tensor& lhs, Tensor& rhs) noexcept -> void {
invoke_generic(
CpuExecutor(),
[&](auto&& left_it, auto&& right_it) {
using std::swap;
swap(left_it->host_data, right_it->host_data);
swap(left_it->device_data, right_it->device_data);
auto stream = left_it->stream();
left_it->device_data.set_stream(right_it->stream());
right_it->device_data.set_stream(stream);
},
lhs.blocks_,
rhs.blocks_);
}
/*==--- [construction] ---------------------------------------------------==*/
Tensor() noexcept {}
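// Creates a tensor with the given number of elements in each dimension,
// using the default number of blocks per partition. The dimension with the
// largest size is partitioned across all available gpus.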
template <typename... Sizes, all_arithmetic_size_enable_t<dims, Sizes...> = 0>
Tensor(Sizes&&... sizes)
: space_{ripple_forward(sizes)...}, blocks_per_part_set_{true} {
blocks_per_part_.fill(default_blocks_per_partition);
partitions_.fill(1);
size_t max_dim = 0, max_size = 0, dim = 0;
for_each(
[&max_dim, &dim, &max_size](auto&& dim_size) {
if (dim_size > max_size) {
max_size = dim_size;
max_dim = dim;
}
dim++;
},
ripple_forward(sizes)...);
partitions_[max_dim] = topology().num_gpus();
initialize();
}
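// The remaining constructors additionally take the number of partitions per
// dimension, and optionally the number of blocks per partition and the
// amount of padding for each block.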
template <typename... Sizes, all_arithmetic_size_enable_t<dims, Sizes...> = 0>
Tensor(Partitions partitions_per_dim, Sizes&&... sizes)
: space_{ripple_forward(sizes)...},
partitions_{partitions_per_dim},
blocks_per_part_set_{true} {
blocks_per_part_.fill(default_blocks_per_partition);
initialize();
}
template <
typename... Sizes,
all_arithmetic_size_enable_t<Dimensions, Sizes...> = 0>
Tensor(Partitions partitions_per_dim, uint32_t padding, Sizes&&... sizes)
: space_{padding, ripple_forward(sizes)...},
partitions_{partitions_per_dim},
blocks_per_part_set_{true} {
blocks_per_part_.fill(default_blocks_per_partition);
initialize();
}
template <
typename... Sizes,
all_arithmetic_size_enable_t<Dimensions, Sizes...> = 0>
Tensor(
Partitions partitions_per_dim,
BlockSplit blocks_per_partition,
Sizes&&... sizes)
: space_{ripple_forward(sizes)...},
partitions_{partitions_per_dim},
blocks_per_part_{blocks_per_partition},
blocks_per_part_set_{true} {
initialize();
}
template <
typename... Sizes,
all_arithmetic_size_enable_t<Dimensions, Sizes...> = 0>
Tensor(
Partitions partitions_per_dim,
BlockSplit blocks_per_partition,
uint32_t padding,
Sizes&&... sizes)
: space_{padding, ripple_forward(sizes)...},
partitions_{partitions_per_dim},
blocks_per_part_{blocks_per_partition},
blocks_per_part_set_{true} {
initialize();
}
/*===--- [operator overloads] --------------------------------------------==*/
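// Returns a host-side iterator to the element at the given global indices.
// The owning block is found by dividing each index by the block size, its
// host data is made available and synchronized, and the returned iterator
// is then offset to the element within the block.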
template <typename... Indices>
auto operator()(Indices&&... indices) const noexcept -> HostBlockIter {
std::array<int, dims> ids = {static_cast<int>(indices)...};
auto block_iter = blocks_.begin();
unrolled_for<dims>([&](auto dim) {
const int id = ids[dim] / block_sizes_[dim];
block_iter.shift(dim, id);
});
block_iter->ensure_host_data_available();
block_iter->synchronize();
auto host_iter = block_iter->host_iterator();
unrolled_for<dims>([&](auto dim) {
const int offset = ids[dim] % block_sizes_[dim];
host_iter.shift(dim, offset);
});
return host_iter;
}
/*==--- [interface] ------------------------------------------------------==*/
template <typename Dim>
auto size(Dim&& dim) const noexcept -> size_t {
return space_.internal_size(ripple_forward(dim));
}
template <typename Dim>
auto pitch(Dim&& dim) const noexcept -> size_t {
return space_.size(ripple_forward(dim));
}
auto padding() const noexcept -> size_t {
return space_.padding();
}
template <typename Dim>
auto set_partitions(Dim&& dim, size_t partitions) noexcept -> void {
partitions_[dim] = partitions;
}
template <typename Dim>
auto set_sub_partitions(Dim&& dim, size_t sub_partitions) noexcept -> void {
blocks_per_part_[dim] = sub_partitions;
blocks_per_part_set_ = true;
}
template <typename Dim>
auto resize_dim(Dim&& dim, size_t size) noexcept -> void {
space_.resize_dim(ripple_forward(dim), size);
}
auto set_padding(size_t padding) noexcept -> void {
space_.padding() = padding;
}
auto reallocate(const ripple::StreamMap& stream_map = ripple::StreamMap())
-> void {
initialize(stream_map);
}
auto partitions() const noexcept -> Size {
constexpr Size init = 1;
return std::accumulate(
partitions_.cbegin(), partitions_.cend(), init, std::multiplies<Size>());
}
auto partition_size() const -> Size {
return std::accumulate(
partition_sizes_.cbegin(),
partition_sizes_.cend(),
Size{1},
std::multiplies<Size>());
}
template <typename Dim>
auto blocks_in_dim(Dim&& dim) const noexcept -> size_t {
return blocks_.size(ripple_forward(dim));
}
auto begin() noexcept -> BlockIter {
return blocks_.begin();
}
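// The three diagonal functions below return, respectively, the number of
// blocks, the block size, and the global size along the diagonal, each
// computed as the ceiling of the Euclidean norm of the per-dimension values.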
auto diagonal_blocks() const noexcept -> size_t {
float sum = 0.0;
unrolled_for<dims>(
[&](auto dim) { sum += blocks_in_dim(dim) * blocks_in_dim(dim); });
return static_cast<size_t>(std::ceil(std::sqrt(sum)));
}
auto diagonal_block_size() const noexcept -> size_t {
float sum = 0.0;
unrolled_for<dims>(
[&](auto dim) { sum += block_sizes_[dim] * block_sizes_[dim]; });
return static_cast<size_t>(std::ceil(std::sqrt(sum)));
}
auto diagonal_size() const noexcept -> size_t {
float sum = 0.0;
unrolled_for<dims>([&](auto dim) {
sum += space_.internal_size(dim) * space_.internal_size(dim);
});
return static_cast<size_t>(std::ceil(std::sqrt(sum)));
}
private:
Space space_; // The global multidimensional space for the tensor data.
Partitions partitions_; // Number of partitions in each dimension.
BlockSplit blocks_per_part_; // Number of blocks per partition per dimension.
Elements partition_sizes_ = {}; // Elements per partition in each dimension.
Elements block_sizes_ = {}; // Elements per block in each dimension.
Blocks blocks_; // The blocks which make up the tensor.
bool blocks_per_part_set_ = false; // If blocks per partition were specified.
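// Initializes the tensor: validates the partitioning, computes the block
// sizes, resizes the block container, and allocates host and device storage
// for every block.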
auto initialize(ripple::StreamMap stream_map = ripple::StreamMap()) -> void {
blocks_.set_op_kind(BlockOpKind::synchronous);
check_partitions();
// Resize the block container:
unrolled_for<dims>([&](auto dim) {
if (!blocks_per_part_set_) {
blocks_per_part_[dim] = default_blocks_per_partition;
}
block_sizes_[dim] =
math::div_then_ceil(partition_sizes_[dim], blocks_per_part_[dim]);
blocks_.resize_dim(
dim, math::div_then_ceil(space_.internal_size(dim), block_sizes_[dim]));
// assert(
// blocks_.size(dim) <= blocks_per_part_[dim] * partition_sizes_[dim] &&
// "Invalid number of blocks per gpu!");
});
blocks_.reallocate_and_init();
allocate_data_for_blocks(stream_map);
}
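// Checks that no more partitions were requested than there are gpus, and
// computes the number of elements in each partition per dimension.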
auto check_partitions() -> void {
const auto parts = partitions();
if (parts > topology().num_gpus()) {
// TODO: Change this to some default behaviour:
assert(false && "More partitions specified than available gpus!");
}
if (parts <= topology().num_gpus()) {
unrolled_for<dims>([&](auto dim) {
partition_sizes_[dim] =
math::div_then_ceil(space_.internal_size(dim), partitions_[dim]);
});
return;
}
set_default_partition();
}
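// Returns the index of the dimension with the largest partition size.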
auto largest_partition_dim() const noexcept -> size_t {
size_t index = 0;
auto max = partition_sizes_[0];
unrolled_for<dims - 1>([&](auto d) {
constexpr size_t dim = d + 1;
if (partition_sizes_[dim] > max) {
max = partition_sizes_[dim];
index = dim;
}
});
return index;
}
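// Sets a default partitioning: all partitions are reset to one, and the
// largest dimension is then repeatedly split by the smallest factor of the
// gpu count until the partition size drops below min_split_threshold or all
// gpus are used.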
auto set_default_partition() noexcept -> void {
for (size_t i = 0; i < dims; ++i) {
partitions_[i] = 1;
partition_sizes_[i] = space_.internal_size(i);
}
// Find the scaling factor:
size_t scaling_factor = 2;
while (topology().num_gpus() % scaling_factor != 0) {
scaling_factor++;
}
size_t dim = 0;
while (partition_size() > min_split_threshold) {
// Split the largest dimension; this assumes that the number of gpus is
// a power of two.
dim = largest_partition_dim();
partitions_[dim] *= scaling_factor;
if (partitions() > topology().num_gpus()) {
partitions_[dim] /= scaling_factor;
return;
}
partition_sizes_[dim] /= scaling_factor;
}
}
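// Allocates host and device data for each block: sets the block indices and
// sizes, assigns the block to a gpu based on its partition, attaches the
// transfer and compute streams for that gpu, and then allocates the memory.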
auto allocate_data_for_blocks(const ripple::StreamMap& stream_map) -> void {
invoke(blocks_, [&](auto block) {
auto& host = block->host_data;
auto& device = block->device_data;
// Set the host component of the block to enable asynchronous operations
// so that compute and transfer can be overlapped:
host.set_op_kind(BlockOpKind::asynchronous);
// Now set the padding for the block:
host.set_padding(space_.padding());
device.set_padding(space_.padding());
size_t prev_dim_partitions = 1, id = 0;
unrolled_for<dims>([&](auto dim) {
block->indices[dim] = global_idx(dim);
block->block_sizes[dim] = block_sizes_[dim];
block->global_sizes[dim] = space_.internal_size(dim);
block->max_indices[dim] = blocks_.size(dim) - 1;
const Size elements_start = global_idx(dim) * block_sizes_[dim];
const Size block_size = std::min(
space_.internal_size(dim) - elements_start, block_sizes_[dim]);
host.resize_dim(dim, block_size);
device.resize_dim(dim, block_size);
id += block->indices[dim] / blocks_per_part_[dim] * prev_dim_partitions;
prev_dim_partitions *= partitions_[dim];
});
// Set all the gpu data:
GpuInfo& gpu = topology().gpus[id];
block->set_device_id(gpu.index);
block->set_transfer_stream(
gpu.streams[gpu.next_transfer_stream_id()].stream);
gpu::set_device(gpu.index);
// Allocate the host memory:
host.reallocate();
// Here we either use the supplied stream, or the first one.
auto stream_id =
stream_map.find(id) != stream_map.end() ? stream_map.at(id) : 0;
// Now allocate device data:
auto& stream = gpu.streams[stream_id].stream;
device.set_stream(stream);
device.reallocate();
gpu::synchronize_stream(stream);
block->data_state = DataState::updated_device;
});
}
};
} // namespace ripple
#endif // RIPPLE_CONTAINER_TENSOR_HPP
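A minimal usage sketch follows (not part of the header above). It assumes a build where topology() reports at least one gpu; the element type, sizes, and indices are hypothetical and the example is not taken from the Ripple documentation:

#include <ripple/container/tensor.hpp>

int main() {
  // 2D tensor of 2048 x 1024 floats. The size-only constructor partitions
  // the largest dimension across the available gpus.
  ripple::Tensor<float, 2> tensor(2048, 1024);

  // operator() locates the block which owns the element at the given global
  // indices, makes its host data available, and returns a host-side iterator
  // to that element.
  auto element = tensor(128, 64);
  float value  = *element;
  (void)value;
  return 0;
}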