diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 72c21f7e2..a892cbce4 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -177,6 +177,7 @@ add_library( src/config.cpp src/cuda_event.cpp src/integrations/cudf/bloom_filter.cu + src/integrations/cudf/pack.cpp src/integrations/cudf/partition.cpp src/integrations/cudf/utils.cpp src/memory/buffer.cpp diff --git a/cpp/include/rapidsmpf/integrations/cudf/pack.hpp b/cpp/include/rapidsmpf/integrations/cudf/pack.hpp new file mode 100644 index 000000000..c7c45e187 --- /dev/null +++ b/cpp/include/rapidsmpf/integrations/cudf/pack.hpp @@ -0,0 +1,150 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include + +#include +#include + +#include +#include +#include +#include + +namespace rapidsmpf { + + +/** + * @brief Pack a cudf table view into a contiguous buffer using chunked packing. + * + * This function serializes the given table view into a `PackedData` object + * using a bounce buffer for chunked transfer. This is useful when packing to + * host memory to avoid allocating temporary device memory for the entire table. + * + * @param table The table view to pack. + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param bounce_buffer Device buffer used as intermediate storage during chunked packing. + * @param pack_temp_mr Temporary memory resource used for packing. + * @param reservation Memory reservation to use for allocating the packed data buffer. + * @return A unique pointer to the packed data containing the serialized table. + * + * @throws rapidsmpf::reservation_error If the allocation size exceeds the reservation. 
+ * + * @see cudf::chunked_pack + */ +[[nodiscard]] std::unique_ptr chunked_pack( + cudf::table_view const& table, + rmm::cuda_stream_view stream, + rmm::device_buffer& bounce_buffer, + rmm::device_async_resource_ref pack_temp_mr, + MemoryReservation& reservation +); + +namespace detail { + +/** + * @brief Pack a cudf table view into a contiguous device buffer. + * + * Uses cudf::pack(). Returns a `PackedData` with a `Buffer` backed by + * `rmm::device_buffer`. The memory is allocated using the provided reservation. + * + * @param table The table view to pack. + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param reservation Device memory reservation. Must have memory type DEVICE. + * @return A unique pointer to the packed data containing the serialized table. + * + * @throws std::invalid_argument If the reservation's memory type is not DEVICE. + * @throws rapidsmpf::reservation_error If the allocation size exceeds the reservation. + * + * @see cudf::pack + */ +[[nodiscard]] std::unique_ptr pack_device( + cudf::table_view const& table, + rmm::cuda_stream_view stream, + MemoryReservation& reservation +); + +/** + * @brief Pack a cudf table view into a contiguous pinned host buffer. + * + * Uses cudf::pack() with a pinned memory resource. Returns a `PackedData` with + * a `Buffer` backed by a pinned `HostBuffer`. The memory is allocated using + * the provided reservation. + * + * @param table The table view to pack. + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param reservation Pinned host memory reservation. Must have memory type PINNED_HOST. + * @return A unique pointer to the packed data containing the serialized table. + * + * @throws std::invalid_argument If the reservation's memory type is not PINNED_HOST. + * @throws rapidsmpf::reservation_error If the allocation size exceeds the reservation. 
+ * + * @see cudf::pack + */ +[[nodiscard]] std::unique_ptr pack_pinned_host( + cudf::table_view const& table, + rmm::cuda_stream_view stream, + MemoryReservation& reservation +); + +/** + * @brief Pack a cudf table view into a contiguous host buffer. + * + * Uses cudf::chunked_pack() with a device bounce buffer when available, + * otherwise a pinned bounce buffer. Returns a `PackedData` with a `Buffer` + * backed by a `HostBuffer`. The memory is allocated using the provided reservation. + * + * Algorithm: + * 1. Special case: empty tables return immediately with empty packed data. + * 2. Fast path for small tables (< 1MB): pack directly on device and copy to host. + * 3. Estimate the table size (est_size), with a minimum of 1MB. + * 4. Try to reserve device memory for est_size with overbooking allowed. + * 5. If available device memory (reservation - overbooking) >= 1MB, + * use chunked packing with the device bounce buffer. + * 6. Otherwise, if pinned memory is available, retry with pinned memory (steps 4-5). + * 7. If all attempts fail, throw an error. + * + * @param table The table view to pack. + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param reservation Host memory reservation. Must have memory type HOST. + * @return A unique pointer to the packed data containing the serialized table. + * + * @throws std::invalid_argument If the reservation's memory type is not HOST. + * @throws rapidsmpf::reservation_error If the allocation size exceeds the reservation. + * + * @see cudf::chunked_pack + */ +[[nodiscard]] std::unique_ptr pack_host( + cudf::table_view const& table, + rmm::cuda_stream_view stream, + MemoryReservation& reservation +); + +} // namespace detail + +/** + * @brief Pack a cudf table view into a contiguous buffer. + * + * This function serializes the given table view into a `PackedData` object + * with the data buffer residing in the memory type of the provided reservation. 
+ * The memory for the packed data is allocated using the provided reservation. + * + * @param table The table view to pack. + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param reservation Memory reservation to use for allocating the packed data buffer. + * @return A unique pointer to the packed data containing the serialized table. + * + * @throws rapidsmpf::reservation_error If the allocation size exceeds the reservation. + * + * @see cudf::pack + */ +[[nodiscard]] std::unique_ptr pack( + cudf::table_view const& table, + rmm::cuda_stream_view stream, + MemoryReservation& reservation +); + +} // namespace rapidsmpf diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp index 3b1dc9a6b..62bb2c6ee 100644 --- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp +++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp @@ -136,6 +136,22 @@ class BufferResource { */ [[nodiscard]] rmm::host_async_resource_ref pinned_mr(); + /** + * @brief Get the RMM pinned host memory resource as a device resource reference. + * + * @return Reference to the RMM resource used for pinned host allocations. + */ + [[nodiscard]] rmm::device_async_resource_ref pinned_mr_as_device(); + + /** + * @brief Check if pinned memory is available. + * + * @return true if pinned memory is available, false otherwise. + */ + [[nodiscard]] bool is_pinned_memory_available() const noexcept { + return pinned_mr_ != PinnedMemoryResource::Disabled; + } + /** * @brief Retrieves the memory availability function for a given memory type. * @@ -311,6 +327,28 @@ class BufferResource { std::unique_ptr data, rmm::cuda_stream_view stream ); + /** + * @brief Move host buffer data into a Buffer. + * + * This operation is cheap; no copy is performed. The resulting Buffer resides in + * host memory. 
+ *
 + * If @p stream differs from the host buffer's current stream: + * - @p stream is synchronized with the host buffer's current stream, and + * - the host buffer's current stream is updated to @p stream. + * + * @param data Unique pointer to the host buffer. + * @param stream CUDA stream associated with the new Buffer. Use or synchronize with + * this stream when operating on the Buffer. + * @param host_mem_type The memory type of the underlying @p data. + * @return Unique pointer to the resulting Buffer. + */ + std::unique_ptr move( + std::unique_ptr data, + rmm::cuda_stream_view stream, + MemoryType host_mem_type + ); + /** * @brief Move a Buffer to the memory type specified by the reservation. * diff --git a/cpp/include/rapidsmpf/memory/host_buffer.hpp b/cpp/include/rapidsmpf/memory/host_buffer.hpp index 3ad35d4a4..7078d1c72 100644 --- a/cpp/include/rapidsmpf/memory/host_buffer.hpp +++ b/cpp/include/rapidsmpf/memory/host_buffer.hpp @@ -209,7 +209,7 @@ class HostBuffer { static HostBuffer from_rmm_device_buffer( std::unique_ptr pinned_host_buffer, rmm::cuda_stream_view stream, - PinnedMemoryResource& mr + rmm::host_async_resource_ref mr ); private: diff --git a/cpp/include/rapidsmpf/memory/memory_type.hpp b/cpp/include/rapidsmpf/memory/memory_type.hpp index 621909d1b..410d97daf 100644 --- a/cpp/include/rapidsmpf/memory/memory_type.hpp +++ b/cpp/include/rapidsmpf/memory/memory_type.hpp @@ -10,6 +10,8 @@ #include #include +#include + namespace rapidsmpf { /// @brief Enum representing the type of memory sorted in decreasing order of preference. @@ -66,6 +68,53 @@ static_assert(std::ranges::equal( leq_memory_types(static_cast(-1)), std::ranges::empty_view{} )); +/** + * @brief Get the memory types that are device accessible. + * + * @return A span of memory types that are device accessible. 
+ */ +constexpr std::span device_accessible_memory_types() noexcept { + return std::span{MEMORY_TYPES}.first<2>(); +} + +static_assert(std::ranges::equal( + device_accessible_memory_types(), + std::array{MemoryType::DEVICE, MemoryType::PINNED_HOST} +)); + +/** + * @brief Check if a memory type is device accessible. + * + * @param mem_type The memory type to check. + * @return true if the memory type is device accessible, false otherwise. + */ +constexpr bool is_device_accessible(MemoryType mem_type) noexcept { + return contains(device_accessible_memory_types(), mem_type); +} + +/** + * @brief Get the memory types that are host accessible. + * + * @return A span of memory types that are host accessible. + */ +constexpr std::span host_accessible_memory_types() { + return std::span{MEMORY_TYPES}.last<2>(); +} + +static_assert(std::ranges::equal( + host_accessible_memory_types(), std::array{MemoryType::PINNED_HOST, MemoryType::HOST} +)); + +/** + * @brief Check if a memory type is host accessible. + * + * @param mem_type The memory type to check. + * @return true if the memory type is host accessible, false otherwise. + */ +constexpr bool is_host_accessible(MemoryType mem_type) noexcept { + return contains(host_accessible_memory_types(), mem_type); +} + /** * @brief Get the name of a MemoryType. * diff --git a/cpp/src/integrations/cudf/pack.cpp b/cpp/src/integrations/cudf/pack.cpp new file mode 100644 index 000000000..849bc8e16 --- /dev/null +++ b/cpp/src/integrations/cudf/pack.cpp @@ -0,0 +1,249 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace rapidsmpf { + +namespace detail { + +std::unique_ptr pack_device( + cudf::table_view const& table, + rmm::cuda_stream_view stream, + MemoryReservation& reservation +) { + RAPIDSMPF_EXPECTS( + reservation.mem_type() == MemoryType::DEVICE, + "pack_device requires a device memory reservation", + std::invalid_argument + ); + auto packed_columns = cudf::pack(table, stream, reservation.br()->device_mr()); + reservation.br()->release(reservation, packed_columns.gpu_data->size()); + + return std::make_unique( + std::move(packed_columns.metadata), + reservation.br()->move(std::move(packed_columns.gpu_data), stream) + ); +} + +std::unique_ptr pack_pinned_host( + cudf::table_view const& table, + rmm::cuda_stream_view stream, + MemoryReservation& reservation +) { + RAPIDSMPF_EXPECTS( + reservation.mem_type() == MemoryType::PINNED_HOST, + "pack_pinned_host requires a pinned host memory reservation", + std::invalid_argument + ); + // benchmarks shows that using cudf::pack with pinned mr is sufficient. + + auto packed_columns = + cudf::pack(table, stream, reservation.br()->pinned_mr_as_device()); + reservation.br()->release(reservation, packed_columns.gpu_data->size()); + + // packed table is now in rmm::device_buffer. We need to move it to a + // HostBuffer as Pinned memory allocations are only used with HostBuffers. + auto pinned_host_buffer = + std::make_unique(HostBuffer::from_rmm_device_buffer( + std::move(packed_columns.gpu_data), stream, reservation.br()->pinned_mr() + )); + return std::make_unique( + std::move(packed_columns.metadata), + reservation.br()->move( + std::move(pinned_host_buffer), stream, MemoryType::PINNED_HOST + ) + ); +} + +// Handle the case where packing allocates slightly more than the input size. This can +// occur because cudf uses aligned allocations, which may exceed the requested size. 
To +// accommodate this, we allow some wiggle room. +void resize_res_if_possible( + size_t packed_size, + cudf::size_type num_columns, + MemoryReservation& reservation, + BufferResource* br +) { + if (packed_size > reservation.size()) { + auto const wiggle_room = 1024 * static_cast(num_columns); + if (packed_size <= reservation.size() + wiggle_room) { + reservation = + br->reserve(reservation.mem_type(), packed_size, AllowOverbooking::YES) + .first; + } + } +} + +std::unique_ptr pack_host( + cudf::table_view const& table, + rmm::cuda_stream_view stream, + MemoryReservation& reservation +) { + RAPIDSMPF_EXPECTS( + reservation.mem_type() == MemoryType::HOST, + "pack_host requires a host memory reservation", + std::invalid_argument + ); + + auto br = reservation.br(); + + if (table.num_rows() == 0) { // special case for empty table + auto empty_buf = br->allocate(0, stream, reservation); + return std::make_unique( + std::make_unique>(cudf::pack_metadata( + table, reinterpret_cast(empty_buf->data()), 0 + )), + std::move(empty_buf) + ); + } + + // minimum buffer size for chunked packing + static constexpr size_t chunked_pack_min_buffer_size = 1024 * 1024; + + // Fast path for small tables (< 1MB): pack directly on device and copy to host. + // This avoids the overhead of allocating a 1MB bounce buffer for chunked packing. 
+ auto const raw_est_size = estimated_memory_usage(table, stream); + if (raw_est_size < chunked_pack_min_buffer_size) { + auto packed_columns = cudf::pack(table, stream, br->device_mr()); + auto const packed_size = packed_columns.gpu_data->size(); + + // Allocate host buffer from the host reservation + detail::resize_res_if_possible(packed_size, table.num_columns(), reservation, br); + auto dest_buf = br->allocate(packed_size, stream, reservation); + dest_buf->write_access([&](std::byte* dest_ptr, rmm::cuda_stream_view) { + RAPIDSMPF_CUDA_TRY(cudaMemcpyAsync( + dest_ptr, + packed_columns.gpu_data->data(), + packed_size, + cudaMemcpyDefault, + stream + )); + }); + + return std::make_unique( + std::move(packed_columns.metadata), std::move(dest_buf) + ); + } + + // estimate the size of the table with a minimum of 1MiB + auto const est_size = std::max(raw_est_size, chunked_pack_min_buffer_size); + + // max available memory for bounce buffer + auto max_available = [](size_t res_sz, size_t ob) -> size_t { + return res_sz > ob ? 
res_sz - ob : 0; + }; + + { // try device memory first + auto [dev_res, dev_ob] = + br->reserve(MemoryType::DEVICE, est_size, AllowOverbooking::YES); + + auto dev_avail = max_available(dev_res.size(), dev_ob); + if (dev_avail >= chunked_pack_min_buffer_size) { + rmm::device_buffer bounce_buffer(dev_avail, stream, br->device_mr()); + dev_res.clear(); + return chunked_pack( + table, stream, bounce_buffer, br->device_mr(), reservation + ); + } + } + + if (br->is_pinned_memory_available()) { + auto [pinned_res, pinned_ob] = + br->reserve(MemoryType::PINNED_HOST, est_size, AllowOverbooking::YES); + + auto pinned_avail = max_available(pinned_res.size(), pinned_ob); + if (pinned_avail >= chunked_pack_min_buffer_size) { + rmm::device_buffer bounce_buffer( + pinned_avail, stream, br->pinned_mr_as_device() + ); + pinned_res.clear(); + return chunked_pack( + table, stream, bounce_buffer, br->pinned_mr_as_device(), reservation + ); + } + } + + // all attempts failed. + RAPIDSMPF_FAIL( + "Failed to allocate bounce buffer for chunked packing to host memory.", + std::runtime_error + ); +} + +} // namespace detail + +std::unique_ptr pack( + cudf::table_view const& table, + rmm::cuda_stream_view stream, + MemoryReservation& reservation +) { + switch (reservation.mem_type()) { + case MemoryType::DEVICE: + return detail::pack_device(table, stream, reservation); + case MemoryType::PINNED_HOST: + return detail::pack_pinned_host(table, stream, reservation); + case MemoryType::HOST: + return detail::pack_host(table, stream, reservation); + default: + RAPIDSMPF_FAIL("unknown memory type"); + } +} + +std::unique_ptr chunked_pack( + cudf::table_view const& table, + rmm::cuda_stream_view stream, + rmm::device_buffer& bounce_buffer, + rmm::device_async_resource_ref pack_temp_mr, + MemoryReservation& reservation +) { + auto br = reservation.br(); + + cudf::chunked_pack cpack(table, bounce_buffer.size(), stream, pack_temp_mr); + const auto packed_size = cpack.get_total_contiguous_size(); + + 
detail::resize_res_if_possible(packed_size, table.num_columns(), reservation, br); + + auto dest_buf = br->allocate(packed_size, stream, reservation); + + if (packed_size > 0) { + dest_buf->write_access([&](std::byte* dest_ptr, + rmm::cuda_stream_view dest_buf_stream) { + // join the destination stream with the bounce buffer stream so that the + // copies can be queued + cuda_stream_join(dest_buf_stream, bounce_buffer.stream()); + bounce_buffer.set_stream(dest_buf_stream); + + std::size_t off = 0; + cudf::device_span bounce_buf_span( + static_cast(bounce_buffer.data()), bounce_buffer.size() + ); + while (cpack.has_next()) { + auto const bytes_copied = cpack.next(bounce_buf_span); + RAPIDSMPF_CUDA_TRY(cudaMemcpyAsync( + dest_ptr + off, + bounce_buf_span.data(), + bytes_copied, + cudaMemcpyDefault, + dest_buf_stream + )); + off += bytes_copied; + } + }); + } + + return std::make_unique(cpack.build_metadata(), std::move(dest_buf)); +} + +} // namespace rapidsmpf diff --git a/cpp/src/memory/buffer_resource.cpp b/cpp/src/memory/buffer_resource.cpp index fe1f020d0..9cc4d4827 100644 --- a/cpp/src/memory/buffer_resource.cpp +++ b/cpp/src/memory/buffer_resource.cpp @@ -7,6 +7,8 @@ #include #include +#include + #include #include #include @@ -30,6 +32,16 @@ auto add_missing_availability_functions( } return memory_available; } + +// Helper function to join the stream of a buffer with a new stream. 
+void buffer_stream_join(auto& data, rmm::cuda_stream_view stream) { + auto const& upstream = data->stream(); + if (upstream.value() != stream.value()) { + cuda_stream_join(stream, upstream); + data->set_stream(stream); + } +} + } // namespace BufferResource::BufferResource( @@ -80,6 +92,13 @@ rmm::host_async_resource_ref BufferResource::pinned_mr() { return *pinned_mr_; } +rmm::device_async_resource_ref BufferResource::pinned_mr_as_device() { + RAPIDSMPF_EXPECTS( + pinned_mr_, "no pinned memory resource is available", std::invalid_argument + ); + return *pinned_mr_; +} + std::pair BufferResource::reserve( MemoryType mem_type, std::size_t size, AllowOverbooking allow_overbooking ) { @@ -180,14 +199,19 @@ std::unique_ptr BufferResource::allocate( std::unique_ptr BufferResource::move( std::unique_ptr data, rmm::cuda_stream_view stream ) { - auto upstream = data->stream(); - if (upstream.value() != stream.value()) { - cuda_stream_join(stream, upstream); - data->set_stream(stream); - } + buffer_stream_join(data, stream); return std::unique_ptr(new Buffer(std::move(data), MemoryType::DEVICE)); } +std::unique_ptr BufferResource::move( + std::unique_ptr data, + rmm::cuda_stream_view stream, + MemoryType host_mem_type +) { + buffer_stream_join(data, stream); + return std::unique_ptr(new Buffer(std::move(data), stream, host_mem_type)); +} + std::unique_ptr BufferResource::move( std::unique_ptr buffer, MemoryReservation& reservation ) { diff --git a/cpp/src/memory/host_buffer.cpp b/cpp/src/memory/host_buffer.cpp index 4c2594257..2fb62fdd4 100644 --- a/cpp/src/memory/host_buffer.cpp +++ b/cpp/src/memory/host_buffer.cpp @@ -141,7 +141,7 @@ HostBuffer HostBuffer::from_owned_vector( HostBuffer HostBuffer::from_rmm_device_buffer( std::unique_ptr pinned_host_buffer, rmm::cuda_stream_view stream, - PinnedMemoryResource& mr + rmm::host_async_resource_ref mr ) { RAPIDSMPF_EXPECTS( pinned_host_buffer != nullptr, @@ -149,8 +149,10 @@ HostBuffer HostBuffer::from_rmm_device_buffer( 
std::invalid_argument ); + // if the buffer is not empty, it must be host accessible RAPIDSMPF_EXPECTS( - cuda::is_host_accessible(pinned_host_buffer->data()), + pinned_host_buffer->size() == 0 + || cuda::is_host_accessible(pinned_host_buffer->data()), "pinned_host_buffer must be host accessible", std::invalid_argument ); diff --git a/cpp/src/streaming/cudf/table_chunk.cpp b/cpp/src/streaming/cudf/table_chunk.cpp index 11b73fc04..6f2ae8992 100644 --- a/cpp/src/streaming/cudf/table_chunk.cpp +++ b/cpp/src/streaming/cudf/table_chunk.cpp @@ -7,6 +7,7 @@ #include +#include #include #include #include @@ -176,41 +177,20 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { br->release(reservation, data_alloc_size(MemoryType::DEVICE)); return TableChunk(std::move(table), stream()); } - case MemoryType::HOST: case MemoryType::PINNED_HOST: - // Case 2. + // if data is not in a packed format (cudf table, non-owning table_view, + // etc), use packing; otherwise use buffer_copy below. if (packed_data_ == nullptr) { - // We use libcudf's pack() to serialize `table_view()` into a - // packed_columns and then we move the packed_columns' gpu_data to a - // new host buffer. - // TODO: use `cudf::chunked_pack()` with a bounce buffer. Currently, - // `cudf::pack()` allocates device memory we haven't reserved. - auto packed_columns = cudf::pack(table_view(), stream(), br->device_mr()); - auto packed_data = std::make_unique( - std::move(packed_columns.metadata), - br->move(std::move(packed_columns.gpu_data), stream()) - ); - - // Handle the case where `cudf::pack` allocates slightly more than the - // input size. This can occur because cudf uses aligned allocations, - // which may exceed the requested size. To accommodate this, we - // allow some wiggle room. 
- if (packed_data->data->size > reservation.size()) { - auto const wiggle_room = - 1024 * static_cast(table_view().num_columns()); - if (packed_data->data->size <= reservation.size() + wiggle_room) { - reservation = br->reserve( - reservation.mem_type(), - packed_data->data->size, - AllowOverbooking::YES - ) - .first; - } - } - packed_data->data = br->move(std::move(packed_data->data), reservation); - return TableChunk(std::move(packed_data)); + return detail::pack_pinned_host(table_view(), stream(), reservation); + } + break; // use buffer_copy below. + case MemoryType::HOST: + // if data is not in a packed format (cudf table, non-owning table_view, + // etc), use packing; otherwise use buffer_copy below. + if (packed_data_ == nullptr) { + return detail::pack_host(table_view(), stream(), reservation); } - break; + break; // use buffer_copy below. default: RAPIDSMPF_FAIL("MemoryType: unknown"); } diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 54810cf46..1a35a291a 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -79,6 +79,7 @@ target_sources( test_communicator.cpp test_config.cpp test_cuda_stream.cpp + test_cudf_pack.cpp test_cudf_utils.cpp test_cupti_monitor.cpp test_error_macros.cpp diff --git a/cpp/tests/test_cudf_pack.cpp b/cpp/tests/test_cudf_pack.cpp new file mode 100644 index 000000000..6d6384ffb --- /dev/null +++ b/cpp/tests/test_cudf_pack.cpp @@ -0,0 +1,205 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "utils.hpp" + +using namespace rapidsmpf; + +namespace { + +/** + * @brief Verify that packed data can be unpacked and matches the expected table. + * + * @param expect The expected table to compare against. + * @param packed_data The packed data to verify. 
+ * @param expected_mem_type The expected memory type of the packed data. + * @param br The buffer resource to use for moving data if needed. + * @param stream The CUDA stream to use. + */ +void verify_packed_data( + cudf::table const& expect, + std::unique_ptr& packed_data, + MemoryType expected_mem_type, + BufferResource* br, + rmm::cuda_stream_view stream +) { + EXPECT_NE(packed_data, nullptr); + EXPECT_NE(packed_data->metadata, nullptr); + EXPECT_NE(packed_data->data, nullptr); + + if (expect.num_rows() > 0) { + EXPECT_EQ(packed_data->data->mem_type(), expected_mem_type); + } + EXPECT_FALSE(packed_data->empty()); + + // copy to device to unpack + rmm::device_buffer copy_data_buffer( + packed_data->data->data(), packed_data->data->size, stream, br->device_mr() + ); + stream.synchronize(); + + auto unpacked = cudf::unpack( + packed_data->metadata->data(), + reinterpret_cast(copy_data_buffer.data()) + ); + + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(expect, unpacked); +} + +} // namespace + +class CudfPackTest + : public cudf::test::BaseFixtureWithParam> { + protected: + void SetUp() override { + std::tie(mem_type_, num_rows_) = GetParam(); + + stream_ = cudf::get_default_stream(); + + if (mem_type_ == MemoryType::PINNED_HOST + && !is_pinned_memory_resources_supported()) + { + GTEST_SKIP() << "Pinned memory resources are not supported on this system"; + } + + br_ = std::make_unique( + cudf::get_current_device_resource_ref(), + PinnedMemoryResource::make_if_available() + ); + } + + static constexpr std::int64_t seed = 42; + MemoryType mem_type_; + std::size_t num_rows_; + rmm::cuda_stream_view stream_; + std::unique_ptr br_; +}; + +INSTANTIATE_TEST_SUITE_P( + MemoryTypesAndRows, + CudfPackTest, + ::testing::Combine( + ::testing::Values(MemoryType::DEVICE, MemoryType::PINNED_HOST, MemoryType::HOST), + ::testing::Values(0, 1, 100, 1000000) + ), + [](const ::testing::TestParamInfo>& info) { + auto mem_type = std::get<0>(info.param); + auto num_rows = std::get<1>(info.param); 
+ return std::string(to_string(mem_type)) + "_" + std::to_string(num_rows); + } +); + +TEST_P(CudfPackTest, PackAndUnpack) { + cudf::table expect = random_table_with_index(seed, num_rows_, 0, 1000); + + auto cudf_packed = cudf::pack(expect, stream_, br_->device_mr()); + std::size_t packed_size = cudf_packed.gpu_data->size(); + + auto reservation = br_->reserve_or_fail(packed_size, mem_type_); + auto packed_data = pack(expect.view(), stream_, reservation); + + EXPECT_NO_FATAL_FAILURE( + verify_packed_data(expect, packed_data, mem_type_, br_.get(), stream_) + ); +} + +// test chunked pack and unpack with device bounce buffer +TEST_P(CudfPackTest, ChunkedPackAndUnpackDevice) { + cudf::table expect = random_table_with_index(seed, num_rows_, 0, 1000); + + auto cudf_packed = cudf::pack(expect, stream_, br_->device_mr()); + std::size_t packed_size = cudf_packed.gpu_data->size(); + + auto reservation = br_->reserve_or_fail(packed_size, mem_type_); + + rmm::device_async_resource_ref pack_temp_mr = br_->device_mr(); + rmm::device_buffer bounce_buffer(1_MiB, stream_, pack_temp_mr); + auto packed_data = + chunked_pack(expect.view(), stream_, bounce_buffer, pack_temp_mr, reservation); + + EXPECT_NO_FATAL_FAILURE( + verify_packed_data(expect, packed_data, mem_type_, br_.get(), stream_) + ); +} + +// test chunked pack and unpack with pinned bounce buffer +TEST_P(CudfPackTest, ChunkedPackAndUnpackPinned) { + if (!rapidsmpf::is_pinned_memory_resources_supported()) { + GTEST_SKIP() << "Skipping test for non-pinned memory type"; + } + + cudf::table expect = random_table_with_index(seed, num_rows_, 0, 1000); + + auto cudf_packed = cudf::pack(expect, stream_, br_->device_mr()); + std::size_t packed_size = cudf_packed.gpu_data->size(); + + auto reservation = br_->reserve_or_fail(packed_size, mem_type_); + + rmm::device_async_resource_ref pack_temp_mr = br_->pinned_mr_as_device(); + rmm::device_buffer bounce_buffer(1_MiB, stream_, pack_temp_mr); + auto packed_data = + 
chunked_pack(expect.view(), stream_, bounce_buffer, pack_temp_mr, reservation); + + EXPECT_NO_FATAL_FAILURE( + verify_packed_data(expect, packed_data, mem_type_, br_.get(), stream_) + ); +} + +/** + * @brief Test pack with zero device memory available. + * + * This test verifies that packing to host memory works when no device memory is + * available for the bounce buffer, falling back to pinned memory. If pinned memory + * is also unavailable, the test expects an exception. + */ +TEST(CudfPackHostTest, PackToHostWithZeroDeviceMemory) { + static constexpr std::int64_t seed = 42; + static constexpr std::size_t num_rows = 1000000; + + auto stream = cudf::get_default_stream(); + + // Create a buffer resource with 0 device memory available. + auto pinned_mr = PinnedMemoryResource::make_if_available(); + auto br = std::make_unique( + cudf::get_current_device_resource_ref(), + pinned_mr, + std::unordered_map{ + {MemoryType::DEVICE, []() -> std::int64_t { return 0; }} + } + ); + + cudf::table expect = random_table_with_index(seed, num_rows, 0, 1000); + + // Get packed size using cudf::pack (we need device memory for this estimation). + auto cudf_packed = cudf::pack(expect, stream, br->device_mr()); + std::size_t packed_size = cudf_packed.gpu_data->size(); + + auto reservation = br->reserve_or_fail(packed_size, MemoryType::HOST); + + if (is_pinned_memory_resources_supported()) { + auto packed_data = pack(expect.view(), stream, reservation); + EXPECT_NO_FATAL_FAILURE( + verify_packed_data(expect, packed_data, MemoryType::HOST, br.get(), stream) + ); + } else { + EXPECT_THROW( + std::ignore = pack(expect.view(), stream, reservation), std::runtime_error + ); + } +}