Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 18 additions & 28 deletions cpp/include/rapidsmpf/memory/host_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
#include <span>
#include <vector>

#include <cuda/memory_resource>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/resource_ref.hpp>

#include <rapidsmpf/error.hpp>
#include <rapidsmpf/memory/host_memory_resource.hpp>
#include <rapidsmpf/memory/pinned_memory_resource.hpp>

namespace rapidsmpf {

Expand All @@ -29,17 +30,6 @@ namespace rapidsmpf {
*/
class HostBuffer {
public:
/**
* @brief Type-erased deleter for owned storage.
*
* This deleter holds a callable that releases the underlying storage when invoked.
* It enables `HostBuffer` to take ownership of different storage types
* (e.g., `rmm::device_buffer`, `std::vector<std::uint8_t>`) without exposing their
* types. The deleter captures the owned object and destroys it when the deleter
* itself is destroyed (the `void*` parameter is ignored).
*/
using OwnedStorageDeleter = std::function<void(void*)>;

/**
* @brief Allocate a new host buffer.
*
Expand All @@ -48,10 +38,15 @@ class HostBuffer {
*
* @param size Number of bytes to allocate.
* @param stream CUDA stream on which allocation and deallocation occur.
* @param mr RMM host memory resource used for allocation.
* @param mr Host-accessible memory resource used for allocation. Taken by value
* so the buffer shares ownership of the resource (e.g. bumps the refcount
* when constructed from a shared-ownership resource); an implicit
* conversion from `rmm::host_async_resource_ref` is also supported.
*/
HostBuffer(
std::size_t size, rmm::cuda_stream_view stream, rmm::host_async_resource_ref mr
std::size_t size,
rmm::cuda_stream_view stream,
cuda::mr::any_resource<cuda::mr::host_accessible> mr
);

~HostBuffer() noexcept;
Expand Down Expand Up @@ -176,14 +171,11 @@ class HostBuffer {
*
* @param data Vector to take ownership of (will be moved).
* @param stream CUDA stream to associate with this buffer.
* @param mr Host memory resource used to allocate the buffer.
*
* @return A new `HostBuffer` owning the vector's memory.
*/
static HostBuffer from_owned_vector(
std::vector<std::uint8_t>&& data,
rmm::cuda_stream_view stream,
rmm::host_async_resource_ref mr
std::vector<std::uint8_t>&& data, rmm::cuda_stream_view stream
);

/**
Expand All @@ -199,7 +191,6 @@ class HostBuffer {
*
* @param pinned_host_buffer Device buffer to take ownership of.
* @param stream CUDA stream to associate with this buffer.
* @param mr Pinned host memory resource used to allocate the buffer.
*
* @return A new `HostBuffer` owning the device buffer's memory.
*
Expand All @@ -212,8 +203,7 @@ class HostBuffer {
*/
static HostBuffer from_rmm_device_buffer(
std::unique_ptr<rmm::device_buffer> pinned_host_buffer,
rmm::cuda_stream_view stream,
PinnedMemoryResource& mr
rmm::cuda_stream_view stream
);

private:
Expand All @@ -222,21 +212,21 @@ class HostBuffer {
*
* @param span View of the owned memory.
* @param stream CUDA stream associated with this buffer.
* @param mr Dummy memory resource (not used for deallocation).
* @param owned_storage Unique pointer managing the owned storage lifetime.
* @param deallocate_fn Callable invoked with the current stream to release the
* underlying memory. It captures all resources needed for deallocation (e.g.
* memory resource, raw pointer, size).
*/
HostBuffer(
std::span<std::byte> span,
rmm::cuda_stream_view stream,
rmm::host_async_resource_ref mr,
std::unique_ptr<void, OwnedStorageDeleter> owned_storage
std::function<void(rmm::cuda_stream_view)> deallocate_fn
);

rmm::cuda_stream_view stream_;
rmm::host_async_resource_ref mr_;
std::span<std::byte> span_{};
/// @brief Optional owned storage that will be released when the buffer is destroyed.
std::unique_ptr<void, OwnedStorageDeleter> owned_storage_{nullptr, [](void*) {}};
/// @brief Callable that releases the underlying memory when invoked with the current
/// stream. Null when the buffer is empty.
std::function<void(rmm::cuda_stream_view)> deallocate_fn_{};
};

} // namespace rapidsmpf
74 changes: 34 additions & 40 deletions cpp/src/memory/host_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,47 +14,42 @@
namespace rapidsmpf {

HostBuffer::HostBuffer(
std::size_t size, rmm::cuda_stream_view stream, rmm::host_async_resource_ref mr
std::size_t size,
rmm::cuda_stream_view stream,
cuda::mr::any_resource<cuda::mr::host_accessible> mr
)
: stream_{stream}, mr_{std::move(mr)} {
: stream_{stream} {
if (size > 0) {
auto* ptr = static_cast<std::byte*>(
mr_.allocate(stream_, size, alignof(::cuda::std::max_align_t))
mr.allocate(stream_, size, alignof(::cuda::std::max_align_t))
);
span_ = std::span<std::byte>{ptr, size};
deallocate_fn_ =
[mr = std::move(mr), ptr, size](rmm::cuda_stream_view s) mutable {
mr.deallocate(s, ptr, size, alignof(::cuda::std::max_align_t));
};
}
}

HostBuffer::HostBuffer(
std::span<std::byte> span,
rmm::cuda_stream_view stream,
rmm::host_async_resource_ref mr,
std::unique_ptr<void, OwnedStorageDeleter> owned_storage
std::function<void(rmm::cuda_stream_view)> deallocate_fn
)
: stream_{stream},
mr_{std::move(mr)},
span_{span},
owned_storage_{std::move(owned_storage)} {}
: stream_{stream}, span_{span}, deallocate_fn_{std::move(deallocate_fn)} {}

void HostBuffer::deallocate_async() noexcept {
if (!span_.empty()) {
// If we have owned storage, release it; otherwise deallocate via mr_.
if (owned_storage_) {
owned_storage_.reset();
} else {
mr_.deallocate(
stream_, span_.data(), span_.size(), alignof(::cuda::std::max_align_t)
);
}
if (!span_.empty() && deallocate_fn_) {
deallocate_fn_(stream_);
deallocate_fn_ = nullptr;
}
span_ = {};
}

HostBuffer::HostBuffer(HostBuffer&& other) noexcept
: stream_{other.stream_},
mr_{other.mr_},
span_{std::exchange(other.span_, {})},
owned_storage_{std::move(other.owned_storage_)} {}
deallocate_fn_{std::exchange(other.deallocate_fn_, {})} {}

HostBuffer& HostBuffer::operator=(HostBuffer&& other) {
if (this != &other) {
Expand All @@ -64,9 +59,8 @@ HostBuffer& HostBuffer::operator=(HostBuffer&& other) {
std::invalid_argument
);
stream_ = other.stream_;
mr_ = other.mr_;
span_ = std::exchange(other.span_, {});
owned_storage_ = std::move(other.owned_storage_);
deallocate_fn_ = std::exchange(other.deallocate_fn_, {});
}
return *this;
}
Expand Down Expand Up @@ -125,27 +119,24 @@ HostBuffer HostBuffer::from_uint8_vector(
}

HostBuffer HostBuffer::from_owned_vector(
std::vector<std::uint8_t>&& data,
rmm::cuda_stream_view stream,
rmm::host_async_resource_ref mr
std::vector<std::uint8_t>&& data, rmm::cuda_stream_view stream
) {
// Wrap in shared_ptr so the lambda is copyable (required by std::function).
auto shared_vec = std::make_shared<std::vector<std::uint8_t>>(std::move(data));
auto* ptr = reinterpret_cast<std::byte*>(shared_vec->data());
auto size = shared_vec->size();
std::span<std::byte> span{ptr, size};
std::span<std::byte> span{ptr, shared_vec->size()};

std::unique_ptr<void, OwnedStorageDeleter> owned_storage{
ptr, [shared_vec_ = std::move(shared_vec)](void*) mutable { shared_vec_.reset(); }
return HostBuffer{
span,
stream,
[shared_vec_ = std::move(shared_vec)](rmm::cuda_stream_view) mutable {
shared_vec_.reset();
}
};

return HostBuffer{span, stream, std::move(mr), std::move(owned_storage)};
}

HostBuffer HostBuffer::from_rmm_device_buffer(
std::unique_ptr<rmm::device_buffer> pinned_host_buffer,
rmm::cuda_stream_view stream,
PinnedMemoryResource& mr
std::unique_ptr<rmm::device_buffer> pinned_host_buffer, rmm::cuda_stream_view stream
) {
RAPIDSMPF_EXPECTS(
pinned_host_buffer != nullptr,
Expand All @@ -161,14 +152,17 @@ HostBuffer HostBuffer::from_rmm_device_buffer(

// Wrap in shared_ptr so the lambda is copyable (required by std::function).
auto shared_db = std::make_shared<rmm::device_buffer>(std::move(*pinned_host_buffer));
auto* ptr = static_cast<std::byte*>(shared_db->data());
std::span<std::byte> span{ptr, shared_db->size()};

std::unique_ptr<void, OwnedStorageDeleter> owned_storage{
ptr, [shared_db_ = std::move(shared_db)](void*) mutable { shared_db_.reset(); }
std::span<std::byte> span{
static_cast<std::byte*>(shared_db->data()), shared_db->size()
};

return HostBuffer{std::move(span), stream, mr, std::move(owned_storage)};
return HostBuffer{
std::move(span),
stream,
[shared_db_ = std::move(shared_db)](rmm::cuda_stream_view) mutable {
shared_db_.reset();
}
};
}

} // namespace rapidsmpf
30 changes: 27 additions & 3 deletions cpp/tests/test_host_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ TEST_P(HostMemoryResource, from_owned_vector) {

// Create a host buffer by taking ownership of a vector
auto buffer = rapidsmpf::HostBuffer::from_owned_vector(
std::vector<std::uint8_t>(source_data), stream, mr
std::vector<std::uint8_t>(source_data), stream
);

EXPECT_NO_THROW(test_buffer(std::move(buffer), source_data));
Expand Down Expand Up @@ -181,7 +181,7 @@ TEST_P(PinnedResource, from_owned_vector) {

// Create a host buffer by taking ownership of a vector
auto buffer = rapidsmpf::HostBuffer::from_owned_vector(
std::vector<std::uint8_t>(source_data), stream, *mr
std::vector<std::uint8_t>(source_data), stream
);

EXPECT_NO_THROW(test_buffer(std::move(buffer), source_data));
Expand All @@ -200,7 +200,7 @@ TEST_P(PinnedResource, from_rmm_device_buffer) {

// Create a host buffer by taking ownership of an rmm::device_buffer
auto buffer = rapidsmpf::HostBuffer::from_rmm_device_buffer(
std::move(pinned_host_buffer), stream, *mr
std::move(pinned_host_buffer), stream
);

EXPECT_NO_THROW(test_buffer(std::move(buffer), source_data));
Expand All @@ -217,6 +217,30 @@ TEST(PinnedResource, equality) {
EXPECT_NE(mr1, mr3);
}

// Regression test for resource lifetime: a HostBuffer adopted via
// from_rmm_device_buffer() must keep the pinned memory resource alive even
// after the caller drops its own handle to it. NOTE(review): this assumes
// make_if_available() returns a shared-ownership handle (reset() only drops
// this reference) and that rmm::device_buffer retains the resource it was
// allocated from — confirm against PinnedMemoryResource's declaration.
TEST(PinnedResource, transient_mr) {
auto mr = rapidsmpf::PinnedMemoryResource::make_if_available();
// Pinned host memory may be unavailable on this platform/driver; skip then.
if (mr == rapidsmpf::PinnedMemoryResource::Disabled) {
GTEST_SKIP() << "PinnedMemoryResource is not supported";
}
// Default stream view; all allocation/copy/deallocation happens on it.
rmm::cuda_stream_view stream{};

auto source_data = random_vector<std::uint8_t>(0, 1024);

// Allocate a pinned host buffer from mr and fill it with source_data.
auto pinned_host_buffer = std::make_unique<rmm::device_buffer>(
source_data.data(), source_data.size(), stream, *mr
);

// Drop our handle to the resource. The device_buffer (and, after the
// transfer below, the HostBuffer's deallocation callable) is expected to
// keep the shared mr alive until the memory is released.
mr.reset();

// Transfer ownership into a HostBuffer; no resource argument is passed —
// deallocation state must have been captured at allocation time.
auto buffer = rapidsmpf::HostBuffer::from_rmm_device_buffer(
std::move(pinned_host_buffer), stream
);

// test_buffer consumes the buffer, verifying contents and destruction
// (which triggers deallocation through the now-transferred resource).
EXPECT_NO_THROW(test_buffer(std::move(buffer), source_data));
}

namespace {

/// Discover the actual pool size the driver creates when a small max is requested.
Expand Down
Loading