From d7051d431121359f8956bdec4ce2c02081d9d12a Mon Sep 17 00:00:00 2001 From: niranda perera Date: Mon, 13 Apr 2026 16:23:38 -0700 Subject: [PATCH 1/5] use cudf pack with pinned mr Signed-off-by: niranda perera --- .../rapidsmpf/memory/buffer_resource.hpp | 14 +++++-- cpp/include/rapidsmpf/memory/host_buffer.hpp | 2 +- cpp/src/memory/buffer_resource.cpp | 29 ++++++++++++-- cpp/src/memory/host_buffer.cpp | 2 +- cpp/src/streaming/cudf/table_chunk.cpp | 38 +++++++++++++++++-- 5 files changed, 73 insertions(+), 12 deletions(-) diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp index f8efd4968..4da2ff5d3 100644 --- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp +++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp @@ -134,7 +134,7 @@ class BufferResource { * * @return Reference to the RMM resource used for pinned host allocations. */ - [[nodiscard]] rmm::host_async_resource_ref pinned_mr(); + [[nodiscard]] rmm::host_device_async_resource_ref pinned_mr(); /** * @brief Retrieves the memory availability function for a given memory type. @@ -298,7 +298,7 @@ class BufferResource { ); /** - * @brief Move device buffer data into a Buffer. + * @brief Move device/ pinned host buffer data into a Buffer. * * This operation is cheap; no copy is performed. The resulting Buffer resides in * device memory. @@ -310,10 +310,18 @@ class BufferResource { * @param data Unique pointer to the device buffer. * @param stream CUDA stream associated with the new Buffer. Use or synchronize with * this stream when operating on the Buffer. + * @param mem_type The memory type of the underlying @p data. This requires to be a + * device accessible memory type (ie. MemoryType::DEVICE or MemoryType::PINNED_HOST). * @return Unique pointer to the resulting Buffer. + * + * @throws std::invalid_argument If the memory type is invalid. + * @throws std::invalid_argument If @p mem_type is MemoryType::PINNED_HOST and the + * pinned memory resource is not available. */ std::unique_ptr move( - std::unique_ptr data, rmm::cuda_stream_view stream + std::unique_ptr data, + rmm::cuda_stream_view stream, + MemoryType mem_type = MemoryType::DEVICE ); /** diff --git a/cpp/include/rapidsmpf/memory/host_buffer.hpp b/cpp/include/rapidsmpf/memory/host_buffer.hpp index 3f3934664..89df25984 100644 --- a/cpp/include/rapidsmpf/memory/host_buffer.hpp +++ b/cpp/include/rapidsmpf/memory/host_buffer.hpp @@ -213,7 +213,7 @@ class HostBuffer { static HostBuffer from_rmm_device_buffer( std::unique_ptr pinned_host_buffer, rmm::cuda_stream_view stream, - PinnedMemoryResource& mr + rmm::host_device_async_resource_ref mr ); private: diff --git a/cpp/src/memory/buffer_resource.cpp b/cpp/src/memory/buffer_resource.cpp index 042700439..251f3acba 100644 --- a/cpp/src/memory/buffer_resource.cpp +++ b/cpp/src/memory/buffer_resource.cpp @@ -82,7 +82,7 @@ rmm::host_async_resource_ref BufferResource::host_mr() noexcept { return host_mr_; } -rmm::host_async_resource_ref BufferResource::pinned_mr() { +rmm::host_device_async_resource_ref BufferResource::pinned_mr() { RAPIDSMPF_EXPECTS( pinned_mr_, "no pinned memory resource is available", std::invalid_argument ); @@ -196,14 +196,37 @@ std::unique_ptr BufferResource::allocate( } std::unique_ptr BufferResource::move( - std::unique_ptr data, rmm::cuda_stream_view stream + std::unique_ptr data, + rmm::cuda_stream_view stream, + MemoryType mem_type ) { auto upstream = data->stream(); if (upstream.value() != stream.value()) { cuda_stream_join(stream, upstream); data->set_stream(stream); } - return std::unique_ptr(new Buffer(std::move(data), MemoryType::DEVICE)); + + if (mem_type == MemoryType::DEVICE) { + return std::unique_ptr(new Buffer(std::move(data), MemoryType::DEVICE)); + } else if (mem_type == MemoryType::PINNED_HOST) { + RAPIDSMPF_EXPECTS( + pinned_mr_ != PinnedMemoryResource::Disabled, + "pinned memory resource is not available", + std::invalid_argument + ); + + auto pinned_host_buffer = std::make_unique( + HostBuffer::from_rmm_device_buffer(std::move(data), stream, pinned_mr()) + ); + return std::unique_ptr( + new Buffer(std::move(pinned_host_buffer), stream, MemoryType::PINNED_HOST) + ); + } else { + RAPIDSMPF_FAIL( + "Invalid memory type: " + std::string(to_string(mem_type)), + std::invalid_argument + ); + } } std::unique_ptr BufferResource::move( diff --git a/cpp/src/memory/host_buffer.cpp b/cpp/src/memory/host_buffer.cpp index 99011634b..e2efa0bd0 100644 --- a/cpp/src/memory/host_buffer.cpp +++ b/cpp/src/memory/host_buffer.cpp @@ -141,7 +141,7 @@ HostBuffer HostBuffer::from_owned_vector( HostBuffer HostBuffer::from_rmm_device_buffer( std::unique_ptr pinned_host_buffer, rmm::cuda_stream_view stream, - PinnedMemoryResource& mr + rmm::host_device_async_resource_ref mr ) { RAPIDSMPF_EXPECTS( pinned_host_buffer != nullptr, diff --git a/cpp/src/streaming/cudf/table_chunk.cpp b/cpp/src/streaming/cudf/table_chunk.cpp index 313dc1e85..f81eaf2d9 100644 --- a/cpp/src/streaming/cudf/table_chunk.cpp +++ b/cpp/src/streaming/cudf/table_chunk.cpp @@ -160,6 +160,10 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { // 2. The chunk is available and the data is a generic cudf table that is // not already packed. In this case, the table data must first be packed // before copying it to host or pinned memory. + // a. reservaiton in pinned memory - Use cudf::pack to directly copy the table + // data to pinned memory. + // b. reservaiton in host memory - Use cudf::pack to copy the table data to + // intermediate device memory and then copy to host memory. // // 3. The chunk data is already packed (packed_data_ != nullptr). // In this case, we simply use buffer_copy() to copy the packed data @@ -183,9 +187,33 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { br->release(reservation, nbytes); return TableChunk(std::move(table), stream()); } - case MemoryType::HOST: - case MemoryType::PINNED_HOST: - // Case 2. + case MemoryType::PINNED_HOST: // Case 2a. + if (packed_data_ == nullptr) { // data is in device memory as a table + auto stream = this->stream(); + + StreamOrderedTiming timing{stream, br->statistics()}; + + // use cudf pack with pinned mr. When the pinned mr is warmed up, the + // performance + auto packed_pinned = cudf::pack(table_view(), stream, br->pinned_mr()); + auto nbytes = packed_pinned.gpu_data->size(); + + br->statistics()->record_copy( + MemoryType::DEVICE, MemoryType::PINNED_HOST, nbytes, std::move(timing) + ); + // update the provided `reservation` + br->release(reservation, nbytes); + auto host_buffer = br->move( + std::move(packed_pinned.gpu_data), stream, MemoryType::PINNED_HOST + ); + return TableChunk( + std::make_unique( + std::move(packed_pinned.metadata), std::move(host_buffer) + ) + ); + } // else fall through to the default Case 3. + break; + case MemoryType::HOST: // Case 2b. if (packed_data_ == nullptr) { // We use libcudf's pack() to serialize `table_view()` into a // packed_columns and then we move the packed_columns' gpu_data to a @@ -217,6 +245,7 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { packed_data->data = br->move(std::move(packed_data->data), reservation); return TableChunk(std::move(packed_data)); } + // else fall through to the Case 3. break; default: RAPIDSMPF_FAIL("MemoryType: unknown"); @@ -225,7 +254,8 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { // Note, `is_available() == false` implies `packed_data_ != nullptr`. RAPIDSMPF_EXPECTS(packed_data_ != nullptr, "something went wrong"); - // Case 3. + // Case 3. The chunk data is already packed (packed_data_ != nullptr). We need + // to copy the packed data into the reservation-specified memory type. auto const nbytes = packed_data_->data->size; auto metadata = std::make_unique>(*packed_data_->metadata); auto data = br->allocate(nbytes, packed_data_->stream(), reservation); From b4fc954e19a319f357ae50c8254212235f12e44a Mon Sep 17 00:00:00 2001 From: Niranda Perera Date: Fri, 17 Apr 2026 08:17:54 -0700 Subject: [PATCH 2/5] Apply suggestions from code review Co-authored-by: Peter Andreas Entschev --- cpp/include/rapidsmpf/memory/buffer_resource.hpp | 6 +++--- cpp/src/streaming/cudf/table_chunk.cpp | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp index 4da2ff5d3..872c42d58 100644 --- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp +++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp @@ -298,7 +298,7 @@ class BufferResource { ); /** - * @brief Move device/ pinned host buffer data into a Buffer. + * @brief Move device or pinned host buffer data into a Buffer. * * This operation is cheap; no copy is performed. The resulting Buffer resides in * device memory. @@ -310,8 +310,8 @@ class BufferResource { * @param data Unique pointer to the device buffer. * @param stream CUDA stream associated with the new Buffer. Use or synchronize with * this stream when operating on the Buffer. - * @param mem_type The memory type of the underlying @p data. This requires to be a - * device accessible memory type (ie. MemoryType::DEVICE or MemoryType::PINNED_HOST). + * @param mem_type The memory type of the underlying @p data. A device accessible memory + * type is required (ie. MemoryType::DEVICE or MemoryType::PINNED_HOST). * @return Unique pointer to the resulting Buffer. * * @throws std::invalid_argument If the memory type is invalid. diff --git a/cpp/src/streaming/cudf/table_chunk.cpp b/cpp/src/streaming/cudf/table_chunk.cpp index ab1fec50c..a92d3fb72 100644 --- a/cpp/src/streaming/cudf/table_chunk.cpp +++ b/cpp/src/streaming/cudf/table_chunk.cpp @@ -161,10 +161,10 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { // 2. The chunk is available and the data is a generic cudf table that is // not already packed. In this case, the table data must first be packed // before copying it to host or pinned memory. - // a. reservaiton in pinned memory - Use cudf::pack to directly copy the table - // data to pinned memory. - // b. reservaiton in host memory - Use cudf::pack to copy the table data to - // intermediate device memory and then copy to host memory. + // a. reservation in pinned memory - Use cudf::pack to directly copy the table + // data to pinned memory. + // b. reservation in host memory - Use cudf::pack to copy the table data to + // intermediate device memory and then copy to host memory. // // 3. The chunk data is already packed (packed_data_ != nullptr). // In this case, we simply use buffer_copy() to copy the packed data From 0b00c6eedf046255e2760aaacba77142eeb36843 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Mon, 20 Apr 2026 15:45:57 -0700 Subject: [PATCH 3/5] addressing PR comments Signed-off-by: niranda perera --- cpp/include/rapidsmpf/memory/buffer_resource.hpp | 6 +++--- cpp/src/streaming/cudf/table_chunk.cpp | 13 +++++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp index 872c42d58..f1429ce07 100644 --- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp +++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp @@ -298,7 +298,7 @@ class BufferResource { ); /** - * @brief Move device or pinned host buffer data into a Buffer. + * @brief Move device or pinned host buffer data into a Buffer. * * This operation is cheap; no copy is performed. The resulting Buffer resides in * device memory. @@ -310,8 +310,8 @@ class BufferResource { * @param data Unique pointer to the device buffer. * @param stream CUDA stream associated with the new Buffer. Use or synchronize with * this stream when operating on the Buffer. - * @param mem_type The memory type of the underlying @p data. A device accessible memory - * type is required (ie. MemoryType::DEVICE or MemoryType::PINNED_HOST). + * @param mem_type The memory type of the underlying @p data. A device accessible + * memory type is required (ie. MemoryType::DEVICE or MemoryType::PINNED_HOST). * @return Unique pointer to the resulting Buffer. * * @throws std::invalid_argument If the memory type is invalid. diff --git a/cpp/src/streaming/cudf/table_chunk.cpp b/cpp/src/streaming/cudf/table_chunk.cpp index a92d3fb72..46521d4b9 100644 --- a/cpp/src/streaming/cudf/table_chunk.cpp +++ b/cpp/src/streaming/cudf/table_chunk.cpp @@ -161,10 +161,10 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { // 2. The chunk is available and the data is a generic cudf table that is // not already packed. In this case, the table data must first be packed // before copying it to host or pinned memory. - // a. reservation in pinned memory - Use cudf::pack to directly copy the table - // data to pinned memory. - // b. reservation in host memory - Use cudf::pack to copy the table data to - // intermediate device memory and then copy to host memory. + // a. reservation in pinned memory - Use cudf::pack to directly copy the table + // data to pinned memory. + // b. reservation in host memory - Use cudf::pack to copy the table data to + // intermediate device memory and then copy to host memory. // // 3. The chunk data is already packed (packed_data_ != nullptr). // In this case, we simply use buffer_copy() to copy the packed data @@ -194,8 +194,9 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { StreamOrderedTiming timing{stream, br->statistics()}; - // use cudf pack with pinned mr. When the pinned mr is warmed up, the - // performance + // use cudf pack with pinned mr, because it is more device memory + // efficient. When the pinned mr is warmed up, the performance is in par + // with packing to device memory and copying. auto packed_pinned = cudf::pack(table_view(), stream, br->pinned_mr()); auto nbytes = packed_pinned.gpu_data->size(); From 5fe889b79330f7c8a6329d55f6653575f3ce443f Mon Sep 17 00:00:00 2001 From: niranda perera Date: Wed, 22 Apr 2026 10:22:18 -0700 Subject: [PATCH 4/5] addressing comments --- cpp/src/memory/buffer_resource.cpp | 2 +- cpp/src/streaming/cudf/table_chunk.cpp | 27 ++++++++++++-------------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/cpp/src/memory/buffer_resource.cpp b/cpp/src/memory/buffer_resource.cpp index 3656aaf52..8518e75ae 100644 --- a/cpp/src/memory/buffer_resource.cpp +++ b/cpp/src/memory/buffer_resource.cpp @@ -214,7 +214,7 @@ std::unique_ptr BufferResource::move( RAPIDSMPF_EXPECTS( pinned_mr_ != PinnedMemoryResource::Disabled, "pinned memory resource is not available", - std::invalid_argument + std::runtime_error ); auto pinned_host_buffer = std::make_unique( diff --git a/cpp/src/streaming/cudf/table_chunk.cpp b/cpp/src/streaming/cudf/table_chunk.cpp index 46521d4b9..adce174c1 100644 --- a/cpp/src/streaming/cudf/table_chunk.cpp +++ b/cpp/src/streaming/cudf/table_chunk.cpp @@ -171,7 +171,11 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { // into the reservation-specified memory type. The original memory // type of the chunk does not matter. BufferResource* br = reservation.br(); - if (is_available()) { + + // If the table view is available and the table is not packed, we can use libcudf to + // copy the table in device memory, or pack it to pinned/ host memory. Else, fall + // through to case 3 (ie. use buffer_copy). + if (is_available() && packed_data_ == nullptr) { switch (reservation.mem_type()) { case MemoryType::DEVICE: // Case 1. { @@ -189,15 +193,11 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { return TableChunk(std::move(table), stream()); } case MemoryType::PINNED_HOST: // Case 2a. - if (packed_data_ == nullptr) { // data is in device memory as a table - auto stream = this->stream(); - - StreamOrderedTiming timing{stream, br->statistics()}; + { + StreamOrderedTiming timing{stream(), br->statistics()}; - // use cudf pack with pinned mr, because it is more device memory - // efficient. When the pinned mr is warmed up, the performance is in par - // with packing to device memory and copying. - auto packed_pinned = cudf::pack(table_view(), stream, br->pinned_mr()); + // use cudf pack with pinned mr + auto packed_pinned = cudf::pack(table_view(), stream(), br->pinned_mr()); auto nbytes = packed_pinned.gpu_data->size(); br->statistics()->record_copy( @@ -206,17 +206,16 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { // update the provided `reservation` br->release(reservation, nbytes); auto host_buffer = br->move( - std::move(packed_pinned.gpu_data), stream, MemoryType::PINNED_HOST + std::move(packed_pinned.gpu_data), stream(), MemoryType::PINNED_HOST ); return TableChunk( std::make_unique( std::move(packed_pinned.metadata), std::move(host_buffer) ) ); - } // else fall through to the default Case 3. - break; + } case MemoryType::HOST: // Case 2b. - if (packed_data_ == nullptr) { + { // We use libcudf's pack() to serialize `table_view()` into a // packed_columns and then we move the packed_columns' gpu_data to a // new host buffer. @@ -247,8 +246,6 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { packed_data->data = br->move(std::move(packed_data->data), reservation); return TableChunk(std::move(packed_data)); } - // else fall through to the Case 3. - break; default: RAPIDSMPF_FAIL("MemoryType: unknown"); } From 7769d1fe635061062ebd1798b3e614a6c0f125b6 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Wed, 22 Apr 2026 10:30:46 -0700 Subject: [PATCH 5/5] minor doc change Signed-off-by: niranda perera --- cpp/include/rapidsmpf/memory/buffer_resource.hpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp index 62d3fc640..a5843889f 100644 --- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp +++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp @@ -299,14 +299,13 @@ class BufferResource { /** * @brief Move device or pinned host buffer data into a Buffer. * - * This operation is cheap; no copy is performed. The resulting Buffer resides in - * device memory. + * This operation is cheap; no copy is performed. * * If @p stream differs from the device buffer's current stream: * - @p stream is synchronized with the device buffer's current stream, and * - the device buffer's current stream is updated to @p stream. * - * @param data Unique pointer to the device buffer. + * @param data Unique pointer to the device or pinned host buffer. * @param stream CUDA stream associated with the new Buffer. Use or synchronize with * this stream when operating on the Buffer. * @param mem_type The memory type of the underlying @p data. A device accessible @@ -314,7 +313,7 @@ class BufferResource { * @return Unique pointer to the resulting Buffer. * * @throws std::invalid_argument If the memory type is invalid. - * @throws std::invalid_argument If @p mem_type is MemoryType::PINNED_HOST and the + * @throws std::runtime_error If @p mem_type is MemoryType::PINNED_HOST and the * pinned memory resource is not available. */ std::unique_ptr move(