Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cpp/benchmarks/bench_pack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <rapidsmpf/memory/cuda_memcpy_async.hpp>
#include <rapidsmpf/memory/pinned_memory_resource.hpp>
#include <rapidsmpf/utils/misc.hpp>

#include "utils/random_data.hpp"

Expand All @@ -44,7 +45,7 @@ void run_pack(

// Calculate number of rows for a single-column table of the desired size
auto const nrows =
static_cast<cudf::size_type>(table_size_bytes / sizeof(random_data_t));
rapidsmpf::safe_cast<cudf::size_type>(table_size_bytes / sizeof(random_data_t));
auto table = random_table(1, nrows, 0, 1000, stream, table_mr);

// Warm up
Expand Down Expand Up @@ -120,7 +121,8 @@ void run_chunked_pack(
rmm::cuda_stream_view stream
) {
// Calculate number of rows for a single-column table of the desired size
auto const nrows = static_cast<cudf::size_type>(table_size / sizeof(random_data_t));
auto const nrows =
rapidsmpf::safe_cast<cudf::size_type>(table_size / sizeof(random_data_t));
auto table = random_table(1, nrows, 0, 1000, stream, table_mr);

// Create the chunked_pack instance to get total output size
Expand Down
18 changes: 7 additions & 11 deletions cpp/benchmarks/bench_shuffle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <rapidsmpf/progress_thread.hpp>
#include <rapidsmpf/shuffler/shuffler.hpp>
#include <rapidsmpf/statistics.hpp>
#include <rapidsmpf/utils/misc.hpp>
#include <rapidsmpf/utils/string.hpp>

#ifdef RAPIDSMPF_HAVE_CUPTI
Expand Down Expand Up @@ -373,28 +374,23 @@ std::vector<InputPartitionsT> generate_input_partitions(
rapidsmpf::BufferResource* br,
TransformFn&& transform_fn
) {
auto const num_columns = rapidsmpf::safe_cast<cudf::size_type>(args.num_columns);
auto const num_local_rows =
rapidsmpf::safe_cast<cudf::size_type>(args.num_local_rows);
std::int32_t const min_val = 0;
std::int32_t const max_val = args.num_local_rows;
std::int32_t const max_val = num_local_rows;

std::vector<InputPartitionsT> input_partitions;
input_partitions.reserve(args.num_local_partitions);
for (rapidsmpf::shuffler::PartID i = 0; i < args.num_local_partitions; ++i) {
std::size_t size_lb = random_table_size_lower_bound(
static_cast<cudf::size_type>(args.num_columns),
static_cast<cudf::size_type>(args.num_local_rows)
);
std::size_t size_lb = random_table_size_lower_bound(num_columns, num_local_rows);

// reserve at least size_lb and spill if necessary.
auto res = br->reserve_device_memory_and_spill(
size_lb, args.input_data_allow_overbooking
);
cudf::table table = random_table(
static_cast<cudf::size_type>(args.num_columns),
static_cast<cudf::size_type>(args.num_local_rows),
min_val,
max_val,
stream,
br->device_mr()
num_columns, num_local_rows, min_val, max_val, stream, br->device_mr()
);
input_partitions.emplace_back(transform_fn(std::move(table)));
}
Expand Down
8 changes: 6 additions & 2 deletions cpp/benchmarks/streaming/bench_streaming_shuffle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <rapidsmpf/streaming/core/context.hpp>
#include <rapidsmpf/streaming/cudf/partition.hpp>
#include <rapidsmpf/streaming/cudf/table_chunk.hpp>
#include <rapidsmpf/utils/misc.hpp>
#include <rapidsmpf/utils/string.hpp>

#include "../utils/misc.hpp"
Expand Down Expand Up @@ -263,14 +264,17 @@ rapidsmpf::Duration run(
std::vector<rapidsmpf::streaming::Actor> actors;
{
auto ch1 = ctx->create_channel();
auto const num_columns = rapidsmpf::safe_cast<cudf::size_type>(args.num_columns);
auto const num_local_rows =
rapidsmpf::safe_cast<cudf::size_type>(args.num_local_rows);
actors.push_back(
rapidsmpf::streaming::actor::random_table_generator(
ctx,
stream,
ch1,
args.num_local_partitions,
static_cast<cudf::size_type>(args.num_columns),
static_cast<cudf::size_type>(args.num_local_rows),
num_columns,
num_local_rows,
min_val,
max_val
)
Expand Down
4 changes: 3 additions & 1 deletion cpp/benchmarks/streaming/data_generator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <rapidsmpf/streaming/core/channel.hpp>
#include <rapidsmpf/streaming/core/context.hpp>
#include <rapidsmpf/streaming/cudf/table_chunk.hpp>
#include <rapidsmpf/utils/misc.hpp>

#include "../utils/random_data.hpp"

Expand Down Expand Up @@ -52,7 +53,8 @@ inline Actor random_table_generator(
) {
ShutdownAtExit c{ch_out};
co_await ctx->executor()->schedule();
auto nbytes = static_cast<std::size_t>(ncolumns * nrows) * sizeof(std::int32_t);
auto nbytes = rapidsmpf::safe_cast<std::size_t>(ncolumns)
* rapidsmpf::safe_cast<std::size_t>(nrows) * sizeof(std::int32_t);
for (std::uint64_t seq = 0; seq < num_blocks; ++seq) {
auto res =
ctx->br()->reserve_device_memory_and_spill(nbytes, AllowOverbooking::NO);
Expand Down
34 changes: 26 additions & 8 deletions cpp/benchmarks/utils/random_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/

#include <algorithm>
#include <cstdint>
#include <limits>

#include <thrust/random.h>
#include <thrust/transform.h>

Expand All @@ -13,25 +17,32 @@
#include <rmm/exec_policy.hpp>

#include <rapidsmpf/memory/cuda_memcpy_async.hpp>
#include <rapidsmpf/utils/misc.hpp>

#include "random_data.hpp"

rmm::device_uvector<std::int32_t> random_device_vector(
cudf::size_type nelem,
std::size_t nelem,
std::int32_t min_val,
std::int32_t max_val,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr
) {
// Fill vector with random data.
rmm::device_uvector<std::int32_t> vec(static_cast<std::size_t>(nelem), stream, mr);
using index_t = std::int64_t;
auto const end_index = rapidsmpf::safe_cast<index_t>(nelem);
rmm::device_uvector<std::int32_t> vec(nelem, stream, mr);
thrust::counting_iterator<index_t> const begin(0);
thrust::counting_iterator<index_t> const end(end_index);
thrust::transform(
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(nelem),
begin,
end,
vec.begin(),
[min_val, max_val] __device__(cudf::size_type index) {
thrust::default_random_engine engine(index); // HACK: use the seed as index
[min_val, max_val] __device__(index_t index) {
thrust::default_random_engine engine(
static_cast<thrust::default_random_engine::result_type>(index)
);
thrust::uniform_int_distribution<std::int32_t> dist(min_val, max_val);
return dist(engine);
}
Expand All @@ -46,7 +57,9 @@ std::unique_ptr<cudf::column> random_column(
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr
) {
auto vec = random_device_vector(nrows, min_val, max_val, stream, mr);
auto vec = random_device_vector(
rapidsmpf::safe_cast<std::size_t>(nrows), min_val, max_val, stream, mr
);
return std::make_unique<cudf::column>(
std::move(vec), rmm::device_buffer{0, stream, mr}, 0
);
Expand All @@ -71,8 +84,13 @@ void random_fill(rapidsmpf::Buffer& buffer, rmm::device_async_resource_ref mr) {
switch (buffer.mem_type()) {
case rapidsmpf::MemoryType::DEVICE:
{
auto const num_elements = std::max<std::size_t>(
std::size_t{1},
buffer.size / sizeof(random_data_t)
+ (buffer.size % sizeof(random_data_t) != 0)
);
auto vec = random_device_vector(
buffer.size / sizeof(std::int32_t) + sizeof(std::int32_t),
num_elements,
std::numeric_limits<std::int32_t>::min(),
std::numeric_limits<std::int32_t>::max(),
buffer.stream(),
Expand Down
9 changes: 7 additions & 2 deletions cpp/benchmarks/utils/random_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
*/
#pragma once

#include <cstddef>
#include <cstdint>

#include <cudf/column/column.hpp>
#include <cudf/table/table.hpp>
#include <cudf/types.hpp>

#include <rapidsmpf/memory/buffer.hpp>
#include <rapidsmpf/utils/misc.hpp>


/**
Expand All @@ -25,7 +29,8 @@ using random_data_t = std::int32_t;
std::size_t constexpr random_table_size_lower_bound(
    cudf::size_type ncolumns, cudf::size_type nrows
) {
    // Convert each operand to std::size_t *before* multiplying. Forming
    // `ncolumns * nrows` in cudf::size_type first and widening the product
    // afterwards can overflow for large tables if cudf::size_type is narrower
    // than std::size_t (presumably a 32-bit signed integer -- confirm).
    // NOTE(review): rapidsmpf::safe_cast is assumed to be a range-checked,
    // constexpr-usable conversion; verify against <rapidsmpf/utils/misc.hpp>.
    return rapidsmpf::safe_cast<std::size_t>(ncolumns)
           * rapidsmpf::safe_cast<std::size_t>(nrows) * sizeof(random_data_t);
}

/**
Expand All @@ -44,7 +49,7 @@ std::size_t constexpr random_table_size_lower_bound(
* @note The function uses the specified CUDA stream for asynchronous operations.
*/
rmm::device_uvector<std::int32_t> random_device_vector(
cudf::size_type nelem,
std::size_t nelem,
std::int32_t min_val,
std::int32_t max_val,
rmm::cuda_stream_view stream,
Expand Down
Loading