Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions cpp/include/rapidsmpf/memory/buffer_resource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class BufferResource {
*
* @return Reference to the RMM resource used for pinned host allocations.
*/
[[nodiscard]] rmm::host_async_resource_ref pinned_mr();
[[nodiscard]] rmm::host_device_async_resource_ref pinned_mr();

/**
* @brief Retrieves the memory availability function for a given memory type.
Expand Down Expand Up @@ -297,22 +297,29 @@ class BufferResource {
);

/**
* @brief Move device buffer data into a Buffer.
* @brief Move device or pinned host buffer data into a Buffer.
*
* This operation is cheap; no copy is performed. The resulting Buffer resides in
* device memory.
* This operation is cheap; no copy is performed.
*
* If @p stream differs from the device buffer's current stream:
* - @p stream is synchronized with the device buffer's current stream, and
* - the device buffer's current stream is updated to @p stream.
*
* @param data Unique pointer to the device buffer.
* @param data Unique pointer to the device or pinned host buffer.
* @param stream CUDA stream associated with the new Buffer. Use or synchronize with
* this stream when operating on the Buffer.
* @param mem_type The memory type of the underlying @p data. A device-accessible
* memory type is required (i.e., MemoryType::DEVICE or MemoryType::PINNED_HOST).
* @return Unique pointer to the resulting Buffer.
*
* @throws std::invalid_argument If the memory type is invalid.
* @throws std::runtime_error If @p mem_type is MemoryType::PINNED_HOST and the
* pinned memory resource is not available.
*/
std::unique_ptr<Buffer> move(
std::unique_ptr<rmm::device_buffer> data, rmm::cuda_stream_view stream
std::unique_ptr<rmm::device_buffer> data,
rmm::cuda_stream_view stream,
MemoryType mem_type = MemoryType::DEVICE
);

/**
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/rapidsmpf/memory/host_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class HostBuffer {
static HostBuffer from_rmm_device_buffer(
std::unique_ptr<rmm::device_buffer> pinned_host_buffer,
rmm::cuda_stream_view stream,
PinnedMemoryResource& mr
rmm::host_device_async_resource_ref mr
);

private:
Expand Down
29 changes: 26 additions & 3 deletions cpp/src/memory/buffer_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ rmm::host_async_resource_ref BufferResource::host_mr() noexcept {
return host_mr_;
}

rmm::host_async_resource_ref BufferResource::pinned_mr() {
rmm::host_device_async_resource_ref BufferResource::pinned_mr() {
RAPIDSMPF_EXPECTS(
pinned_mr_, "no pinned memory resource is available", std::invalid_argument
);
Expand Down Expand Up @@ -200,14 +200,37 @@ std::unique_ptr<Buffer> BufferResource::allocate(
}

std::unique_ptr<Buffer> BufferResource::move(
std::unique_ptr<rmm::device_buffer> data, rmm::cuda_stream_view stream
std::unique_ptr<rmm::device_buffer> data,
rmm::cuda_stream_view stream,
MemoryType mem_type
) {
auto upstream = data->stream();
if (upstream.value() != stream.value()) {
cuda_stream_join(stream, upstream);
data->set_stream(stream);
}
return std::unique_ptr<Buffer>(new Buffer(std::move(data), MemoryType::DEVICE));

if (mem_type == MemoryType::DEVICE) {
return std::unique_ptr<Buffer>(new Buffer(std::move(data), MemoryType::DEVICE));
} else if (mem_type == MemoryType::PINNED_HOST) {
RAPIDSMPF_EXPECTS(
pinned_mr_ != PinnedMemoryResource::Disabled,
"pinned memory resource is not available",
std::runtime_error
);

auto pinned_host_buffer = std::make_unique<HostBuffer>(
HostBuffer::from_rmm_device_buffer(std::move(data), stream, pinned_mr())
);
return std::unique_ptr<Buffer>(
new Buffer(std::move(pinned_host_buffer), stream, MemoryType::PINNED_HOST)
);
Comment on lines +222 to +227
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to think about this going forward. I don't really like the split in the type system between the pinned host buffer and the rmm::device_buffer storage.

Remind me why we need the pinned host buffer to be stored as a HostBuffer inside the Buffer object?

Copy link
Copy Markdown
Contributor Author

@nirandaperera nirandaperera Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really like the split in the type system between the pinned host buffer and the rmm::device_buffer storage.

This was required because, say, we pass a pinned MR to a cudf operation: all allocations made by cudf are returned as rmm::device_buffer. So, we need a way to move it into a Host buffer.

Remind me why we need the pinned host buffer to be stored as a HostBuffer inside the Buffer object?

PinnedBuffer and HostBuffer were merged together because both impls were very similar. So, now both pinned and host data ptr is passed on to the HostBuffer.
TBH I think we may be able to merge all buffer classes into a single impl, with the rmm MR changes. Let's see.

} else {
RAPIDSMPF_FAIL(
"Invalid memory type: " + std::string(to_string(mem_type)),
std::invalid_argument
);
}
}

std::unique_ptr<Buffer> BufferResource::move(
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/memory/host_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ HostBuffer HostBuffer::from_owned_vector(
HostBuffer HostBuffer::from_rmm_device_buffer(
std::unique_ptr<rmm::device_buffer> pinned_host_buffer,
rmm::cuda_stream_view stream,
PinnedMemoryResource& mr
rmm::host_device_async_resource_ref mr
) {
RAPIDSMPF_EXPECTS(
pinned_host_buffer != nullptr,
Expand Down
42 changes: 35 additions & 7 deletions cpp/src/streaming/cudf/table_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,21 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const {
// 2. The chunk is available and the data is a generic cudf table that is
// not already packed. In this case, the table data must first be packed
// before copying it to host or pinned memory.
// a. reservation in pinned memory - Use cudf::pack to directly copy the table
// data to pinned memory.
// b. reservation in host memory - Use cudf::pack to copy the table data to
// intermediate device memory and then copy to host memory.
//
// 3. The chunk data is already packed (packed_data_ != nullptr).
// In this case, we simply use buffer_copy() to copy the packed data
// into the reservation-specified memory type. The original memory
// type of the chunk does not matter.
BufferResource* br = reservation.br();
if (is_available()) {

// If the table view is available and the table is not packed, we can use libcudf to
// copy the table in device memory, or pack it to pinned/host memory. Otherwise, fall
// through to case 3 (i.e., use buffer_copy).
if (is_available() && packed_data_ == nullptr) {
switch (reservation.mem_type()) {
case MemoryType::DEVICE: // Case 1.
{
Expand All @@ -184,10 +192,30 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const {
br->release(reservation, nbytes);
return TableChunk(std::move(table), stream());
}
case MemoryType::HOST:
case MemoryType::PINNED_HOST:
// Case 2.
if (packed_data_ == nullptr) {
case MemoryType::PINNED_HOST: // Case 2a.
{
StreamOrderedTiming timing{stream(), br->statistics()};

// use cudf pack with pinned mr
auto packed_pinned = cudf::pack(table_view(), stream(), br->pinned_mr());
auto nbytes = packed_pinned.gpu_data->size();

br->statistics()->record_copy(
MemoryType::DEVICE, MemoryType::PINNED_HOST, nbytes, std::move(timing)
);
// update the provided `reservation`
br->release(reservation, nbytes);
auto host_buffer = br->move(
std::move(packed_pinned.gpu_data), stream(), MemoryType::PINNED_HOST
);
return TableChunk(
std::make_unique<PackedData>(
std::move(packed_pinned.metadata), std::move(host_buffer)
)
);
}
case MemoryType::HOST: // Case 2b.
{
// We use libcudf's pack() to serialize `table_view()` into a
// packed_columns and then we move the packed_columns' gpu_data to a
// new host buffer.
Expand Down Expand Up @@ -218,15 +246,15 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const {
packed_data->data = br->move(std::move(packed_data->data), reservation);
return TableChunk(std::move(packed_data));
}
break;
default:
RAPIDSMPF_FAIL("MemoryType: unknown");
}
}
// Note, `is_available() == false` implies `packed_data_ != nullptr`.
RAPIDSMPF_EXPECTS(packed_data_ != nullptr, "something went wrong");

// Case 3.
// Case 3. The chunk data is already packed (packed_data_ != nullptr). We need
// to copy the packed data into the reservation-specified memory type.
auto const nbytes = packed_data_->data->size;
auto metadata = std::make_unique<std::vector<std::uint8_t>>(*packed_data_->metadata);
auto data = br->allocate(nbytes, packed_data_->stream(), reservation);
Expand Down
Loading