diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp index f8285b4d4..5be8cf09b 100644 --- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp +++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp @@ -133,7 +133,7 @@ class BufferResource { * * @return Reference to the RMM resource used for pinned host allocations. */ - [[nodiscard]] rmm::host_async_resource_ref pinned_mr(); + [[nodiscard]] rmm::host_device_async_resource_ref pinned_mr(); /** * @brief Retrieves the memory availability function for a given memory type. @@ -297,22 +297,29 @@ class BufferResource { ); /** - * @brief Move device buffer data into a Buffer. + * @brief Move device or pinned host buffer data into a Buffer. * - * This operation is cheap; no copy is performed. The resulting Buffer resides in - * device memory. + * This operation is cheap; no copy is performed. * * If @p stream differs from the device buffer's current stream: * - @p stream is synchronized with the device buffer's current stream, and * - the device buffer's current stream is updated to @p stream. * - * @param data Unique pointer to the device buffer. + * @param data Unique pointer to the device or pinned host buffer. * @param stream CUDA stream associated with the new Buffer. Use or synchronize with * this stream when operating on the Buffer. + * @param mem_type The memory type of the underlying @p data. A device-accessible + * memory type is required (i.e., MemoryType::DEVICE or MemoryType::PINNED_HOST). * @return Unique pointer to the resulting Buffer. + * + * @throws std::invalid_argument If the memory type is invalid. + * @throws std::runtime_error If @p mem_type is MemoryType::PINNED_HOST and the + * pinned memory resource is not available. 
*/ std::unique_ptr move( - std::unique_ptr data, rmm::cuda_stream_view stream + std::unique_ptr data, + rmm::cuda_stream_view stream, + MemoryType mem_type = MemoryType::DEVICE ); /** diff --git a/cpp/include/rapidsmpf/memory/host_buffer.hpp b/cpp/include/rapidsmpf/memory/host_buffer.hpp index 3f3934664..89df25984 100644 --- a/cpp/include/rapidsmpf/memory/host_buffer.hpp +++ b/cpp/include/rapidsmpf/memory/host_buffer.hpp @@ -213,7 +213,7 @@ class HostBuffer { static HostBuffer from_rmm_device_buffer( std::unique_ptr pinned_host_buffer, rmm::cuda_stream_view stream, - PinnedMemoryResource& mr + rmm::host_device_async_resource_ref mr ); private: diff --git a/cpp/src/memory/buffer_resource.cpp b/cpp/src/memory/buffer_resource.cpp index 5f89dfd68..795e7eb0a 100644 --- a/cpp/src/memory/buffer_resource.cpp +++ b/cpp/src/memory/buffer_resource.cpp @@ -86,7 +86,7 @@ rmm::host_async_resource_ref BufferResource::host_mr() noexcept { return host_mr_; } -rmm::host_async_resource_ref BufferResource::pinned_mr() { +rmm::host_device_async_resource_ref BufferResource::pinned_mr() { RAPIDSMPF_EXPECTS( pinned_mr_, "no pinned memory resource is available", std::invalid_argument ); @@ -200,14 +200,37 @@ std::unique_ptr BufferResource::allocate( } std::unique_ptr BufferResource::move( - std::unique_ptr data, rmm::cuda_stream_view stream + std::unique_ptr data, + rmm::cuda_stream_view stream, + MemoryType mem_type ) { auto upstream = data->stream(); if (upstream.value() != stream.value()) { cuda_stream_join(stream, upstream); data->set_stream(stream); } - return std::unique_ptr(new Buffer(std::move(data), MemoryType::DEVICE)); + + if (mem_type == MemoryType::DEVICE) { + return std::unique_ptr(new Buffer(std::move(data), MemoryType::DEVICE)); + } else if (mem_type == MemoryType::PINNED_HOST) { + RAPIDSMPF_EXPECTS( + pinned_mr_ != PinnedMemoryResource::Disabled, + "pinned memory resource is not available", + std::runtime_error + ); + + auto pinned_host_buffer = std::make_unique( + 
HostBuffer::from_rmm_device_buffer(std::move(data), stream, pinned_mr()) + ); + return std::unique_ptr( + new Buffer(std::move(pinned_host_buffer), stream, MemoryType::PINNED_HOST) + ); + } else { + RAPIDSMPF_FAIL( + "Invalid memory type: " + std::string(to_string(mem_type)), + std::invalid_argument + ); + } } std::unique_ptr BufferResource::move( diff --git a/cpp/src/memory/host_buffer.cpp b/cpp/src/memory/host_buffer.cpp index bf376131a..ca52b4776 100644 --- a/cpp/src/memory/host_buffer.cpp +++ b/cpp/src/memory/host_buffer.cpp @@ -145,7 +145,7 @@ HostBuffer HostBuffer::from_owned_vector( HostBuffer HostBuffer::from_rmm_device_buffer( std::unique_ptr pinned_host_buffer, rmm::cuda_stream_view stream, - PinnedMemoryResource& mr + rmm::host_device_async_resource_ref mr ) { RAPIDSMPF_EXPECTS( pinned_host_buffer != nullptr, diff --git a/cpp/src/streaming/cudf/table_chunk.cpp b/cpp/src/streaming/cudf/table_chunk.cpp index 26e527655..4ac8979a9 100644 --- a/cpp/src/streaming/cudf/table_chunk.cpp +++ b/cpp/src/streaming/cudf/table_chunk.cpp @@ -161,13 +161,21 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { // 2. The chunk is available and the data is a generic cudf table that is // not already packed. In this case, the table data must first be packed // before copying it to host or pinned memory. + // a. reservation in pinned memory - Use cudf::pack to directly copy the table + // data to pinned memory. + // b. reservation in host memory - Use cudf::pack to copy the table data to + // intermediate device memory and then copy to host memory. // // 3. The chunk data is already packed (packed_data_ != nullptr). // In this case, we simply use buffer_copy() to copy the packed data // into the reservation-specified memory type. The original memory // type of the chunk does not matter. 
BufferResource* br = reservation.br(); - if (is_available()) { + + // If the table view is available and the table is not packed, we can use libcudf to + // copy the table in device memory, or pack it to pinned/host memory. Else, fall + // through to case 3 (i.e., use buffer_copy). + if (is_available() && packed_data_ == nullptr) { switch (reservation.mem_type()) { case MemoryType::DEVICE: // Case 1. { @@ -184,10 +192,30 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { br->release(reservation, nbytes); return TableChunk(std::move(table), stream()); } - case MemoryType::HOST: - case MemoryType::PINNED_HOST: - // Case 2. - if (packed_data_ == nullptr) { + case MemoryType::PINNED_HOST: // Case 2a. + { + StreamOrderedTiming timing{stream(), br->statistics()}; + + // Use cudf::pack with the pinned memory resource. + auto packed_pinned = cudf::pack(table_view(), stream(), br->pinned_mr()); + auto nbytes = packed_pinned.gpu_data->size(); + + br->statistics()->record_copy( + MemoryType::DEVICE, MemoryType::PINNED_HOST, nbytes, std::move(timing) + ); + // update the provided `reservation` + br->release(reservation, nbytes); + auto host_buffer = br->move( + std::move(packed_pinned.gpu_data), stream(), MemoryType::PINNED_HOST + ); + return TableChunk( + std::make_unique( + std::move(packed_pinned.metadata), std::move(host_buffer) + ) + ); + } + case MemoryType::HOST: // Case 2b. + { // We use libcudf's pack() to serialize `table_view()` into a // packed_columns and then we move the packed_columns' gpu_data to a // new host buffer. @@ -218,7 +246,6 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { packed_data->data = br->move(std::move(packed_data->data), reservation); return TableChunk(std::move(packed_data)); } - break; default: RAPIDSMPF_FAIL("MemoryType: unknown"); } @@ -226,7 +253,8 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const { // Note, `is_available() == false` implies `packed_data_ != nullptr`. 
RAPIDSMPF_EXPECTS(packed_data_ != nullptr, "something went wrong"); - // Case 3. + // Case 3. The chunk data is already packed (packed_data_ != nullptr). We need + // to copy the packed data into the reservation-specified memory type. auto const nbytes = packed_data_->data->size; auto metadata = std::make_unique>(*packed_data_->metadata); auto data = br->allocate(nbytes, packed_data_->stream(), reservation);