diff --git a/cpp/include/rapidsmpf/memory/host_buffer.hpp b/cpp/include/rapidsmpf/memory/host_buffer.hpp index 3f3934664..a7140b0b4 100644 --- a/cpp/include/rapidsmpf/memory/host_buffer.hpp +++ b/cpp/include/rapidsmpf/memory/host_buffer.hpp @@ -13,13 +13,14 @@ #include #include +#include + #include #include #include #include #include -#include namespace rapidsmpf { @@ -29,17 +30,6 @@ namespace rapidsmpf { */ class HostBuffer { public: - /** - * @brief Type-erased deleter for owned storage. - * - * This deleter holds a callable that releases the underlying storage when invoked. - * It enables `HostBuffer` to take ownership of different storage types - * (e.g., `rmm::device_buffer`, `std::vector<uint8_t>`) without exposing their - * types. The deleter captures the owned object and destroys it when the deleter - * itself is destroyed (the `void*` parameter is ignored). - */ - using OwnedStorageDeleter = std::function<void(void*)>; - /** * @brief Allocate a new host buffer. * @@ -48,10 +38,15 @@ class HostBuffer { * * @param size Number of bytes to allocate. * @param stream CUDA stream on which allocation and deallocation occur. - * @param mr RMM host memory resource used for allocation. + * @param mr Host-accessible memory resource used for allocation. Taken by value + * so the buffer shares ownership of the resource (e.g. bumps the refcount + * when constructed from a shared-ownership resource); an implicit + * conversion from `rmm::host_async_resource_ref` is also supported. */ HostBuffer( - std::size_t size, rmm::cuda_stream_view stream, rmm::host_async_resource_ref mr + std::size_t size, + rmm::cuda_stream_view stream, + cuda::mr::any_resource<cuda::mr::host_accessible> mr ); ~HostBuffer() noexcept; @@ -176,14 +171,11 @@ class HostBuffer { * * @param data Vector to take ownership of (will be moved). * @param stream CUDA stream to associate with this buffer. - * @param mr Host memory resource used to allocate the buffer. * * @return A new `HostBuffer` owning the vector's memory.
*/ static HostBuffer from_owned_vector( - std::vector<uint8_t>&& data, - rmm::cuda_stream_view stream, - rmm::host_async_resource_ref mr + std::vector<uint8_t>&& data, rmm::cuda_stream_view stream ); /** @@ -199,7 +191,6 @@ class HostBuffer { * * @param pinned_host_buffer Device buffer to take ownership of. * @param stream CUDA stream to associate with this buffer. - * @param mr Pinned host memory resource used to allocate the buffer. * * @return A new `HostBuffer` owning the device buffer's memory. * @@ -212,8 +203,7 @@ class HostBuffer { */ static HostBuffer from_rmm_device_buffer( std::unique_ptr<rmm::device_buffer> pinned_host_buffer, - rmm::cuda_stream_view stream, - PinnedMemoryResource& mr + rmm::cuda_stream_view stream ); private: /** @@ -222,21 +212,21 @@ class HostBuffer { * * @param span View of the owned memory. * @param stream CUDA stream associated with this buffer. - * @param mr Dummy memory resource (not used for deallocation). - * @param owned_storage Unique pointer managing the owned storage lifetime. + * @param deallocate_fn Callable invoked with the current stream to release the + * underlying memory. It captures all resources needed for deallocation (e.g. + * memory resource, raw pointer, size). */ HostBuffer( std::span<std::byte> span, rmm::cuda_stream_view stream, - rmm::host_async_resource_ref mr, - std::unique_ptr<void, OwnedStorageDeleter> owned_storage + std::function<void(rmm::cuda_stream_view)> deallocate_fn ); rmm::cuda_stream_view stream_; - rmm::host_async_resource_ref mr_; std::span<std::byte> span_{}; - /// @brief Optional owned storage that will be released when the buffer is destroyed. - std::unique_ptr<void, OwnedStorageDeleter> owned_storage_{nullptr, [](void*) {}}; + /// @brief Callable that releases the underlying memory when invoked with the current + /// stream. Null when the buffer is empty.
+ std::function<void(rmm::cuda_stream_view)> deallocate_fn_{}; }; } // namespace rapidsmpf diff --git a/cpp/src/memory/host_buffer.cpp b/cpp/src/memory/host_buffer.cpp index bf376131a..43c288c2a 100644 --- a/cpp/src/memory/host_buffer.cpp +++ b/cpp/src/memory/host_buffer.cpp @@ -14,47 +14,42 @@ namespace rapidsmpf { HostBuffer::HostBuffer( - std::size_t size, rmm::cuda_stream_view stream, rmm::host_async_resource_ref mr + std::size_t size, + rmm::cuda_stream_view stream, + cuda::mr::any_resource<cuda::mr::host_accessible> mr ) - : stream_{stream}, mr_{std::move(mr)} { + : stream_{stream} { if (size > 0) { auto* ptr = static_cast<std::byte*>( - mr_.allocate(stream_, size, alignof(::cuda::std::max_align_t)) + mr.allocate(stream_, size, alignof(::cuda::std::max_align_t)) ); span_ = std::span{ptr, size}; + deallocate_fn_ = + [mr = std::move(mr), ptr, size](rmm::cuda_stream_view s) mutable { + mr.deallocate(s, ptr, size, alignof(::cuda::std::max_align_t)); + }; } } HostBuffer::HostBuffer( std::span<std::byte> span, rmm::cuda_stream_view stream, - rmm::host_async_resource_ref mr, - std::unique_ptr<void, OwnedStorageDeleter> owned_storage + std::function<void(rmm::cuda_stream_view)> deallocate_fn ) - : stream_{stream}, - mr_{std::move(mr)}, - span_{span}, - owned_storage_{std::move(owned_storage)} {} + : stream_{stream}, span_{span}, deallocate_fn_{std::move(deallocate_fn)} {} void HostBuffer::deallocate_async() noexcept { - if (!span_.empty()) { - // If we have owned storage, release it; otherwise deallocate via mr_.
- if (owned_storage_) { - owned_storage_.reset(); - } else { - mr_.deallocate( - stream_, span_.data(), span_.size(), alignof(::cuda::std::max_align_t) - ); - } + if (!span_.empty() && deallocate_fn_) { + deallocate_fn_(stream_); + deallocate_fn_ = nullptr; } span_ = {}; } HostBuffer::HostBuffer(HostBuffer&& other) noexcept : stream_{other.stream_}, - mr_{other.mr_}, span_{std::exchange(other.span_, {})}, - owned_storage_{std::move(other.owned_storage_)} {} + deallocate_fn_{std::exchange(other.deallocate_fn_, {})} {} HostBuffer& HostBuffer::operator=(HostBuffer&& other) { if (this != &other) { @@ -64,9 +59,8 @@ HostBuffer& HostBuffer::operator=(HostBuffer&& other) { std::invalid_argument ); stream_ = other.stream_; - mr_ = other.mr_; span_ = std::exchange(other.span_, {}); - owned_storage_ = std::move(other.owned_storage_); + deallocate_fn_ = std::exchange(other.deallocate_fn_, {}); } return *this; } @@ -125,27 +119,24 @@ HostBuffer HostBuffer::from_uint8_vector( } HostBuffer HostBuffer::from_owned_vector( - std::vector<uint8_t>&& data, - rmm::cuda_stream_view stream, - rmm::host_async_resource_ref mr + std::vector<uint8_t>&& data, rmm::cuda_stream_view stream ) { // Wrap in shared_ptr so the lambda is copyable (required by std::function).
auto shared_vec = std::make_shared<std::vector<uint8_t>>(std::move(data)); auto* ptr = reinterpret_cast<std::byte*>(shared_vec->data()); - auto size = shared_vec->size(); - std::span span{ptr, size}; + std::span span{ptr, shared_vec->size()}; - std::unique_ptr<void, OwnedStorageDeleter> owned_storage{ - ptr, [shared_vec_ = std::move(shared_vec)](void*) mutable { shared_vec_.reset(); } + return HostBuffer{ + span, + stream, + [shared_vec_ = std::move(shared_vec)](rmm::cuda_stream_view) mutable { + shared_vec_.reset(); + } }; - - return HostBuffer{span, stream, std::move(mr), std::move(owned_storage)}; } HostBuffer HostBuffer::from_rmm_device_buffer( - std::unique_ptr<rmm::device_buffer> pinned_host_buffer, - rmm::cuda_stream_view stream, - PinnedMemoryResource& mr + std::unique_ptr<rmm::device_buffer> pinned_host_buffer, rmm::cuda_stream_view stream ) { RAPIDSMPF_EXPECTS( pinned_host_buffer != nullptr, @@ -161,14 +152,17 @@ HostBuffer HostBuffer::from_rmm_device_buffer( // Wrap in shared_ptr so the lambda is copyable (required by std::function). auto shared_db = std::make_shared<rmm::device_buffer>(std::move(*pinned_host_buffer)); - auto* ptr = static_cast<std::byte*>(shared_db->data()); - std::span span{ptr, shared_db->size()}; - - std::unique_ptr<void, OwnedStorageDeleter> owned_storage{ - ptr, [shared_db_ = std::move(shared_db)](void*) mutable { shared_db_.reset(); } + std::span span{ + static_cast<std::byte*>(shared_db->data()), shared_db->size() }; - return HostBuffer{std::move(span), stream, mr, std::move(owned_storage)}; + return HostBuffer{ + std::move(span), + stream, + [shared_db_ = std::move(shared_db)](rmm::cuda_stream_view) mutable { + shared_db_.reset(); + } }; } } // namespace rapidsmpf diff --git a/cpp/tests/test_host_buffer.cpp b/cpp/tests/test_host_buffer.cpp index 408ed37e7..f324cee3d 100644 --- a/cpp/tests/test_host_buffer.cpp +++ b/cpp/tests/test_host_buffer.cpp @@ -111,7 +111,7 @@ TEST_P(HostMemoryResource, from_owned_vector) { // Create a host buffer by taking ownership of a vector auto buffer = rapidsmpf::HostBuffer::from_owned_vector( - std::vector(source_data), stream, mr + std::vector(source_data), stream
); EXPECT_NO_THROW(test_buffer(std::move(buffer), source_data)); @@ -181,7 +181,7 @@ TEST_P(PinnedResource, from_owned_vector) { // Create a host buffer by taking ownership of a vector auto buffer = rapidsmpf::HostBuffer::from_owned_vector( - std::vector(source_data), stream, *mr + std::vector(source_data), stream ); EXPECT_NO_THROW(test_buffer(std::move(buffer), source_data)); @@ -200,7 +200,7 @@ TEST_P(PinnedResource, from_rmm_device_buffer) { // Create a host buffer by taking ownership of an rmm::device_buffer auto buffer = rapidsmpf::HostBuffer::from_rmm_device_buffer( - std::move(pinned_host_buffer), stream, *mr + std::move(pinned_host_buffer), stream ); EXPECT_NO_THROW(test_buffer(std::move(buffer), source_data)); @@ -217,6 +217,30 @@ TEST(PinnedResource, equality) { EXPECT_NE(mr1, mr3); } +TEST(PinnedResource, transient_mr) { + auto mr = rapidsmpf::PinnedMemoryResource::make_if_available(); + if (mr == rapidsmpf::PinnedMemoryResource::Disabled) { + GTEST_SKIP() << "PinnedMemoryResource is not supported"; + } + rmm::cuda_stream_view stream{}; + + auto source_data = random_vector(0, 1024); + + // create a pinned host buffer using mr + auto pinned_host_buffer = std::make_unique<rmm::device_buffer>( + source_data.data(), source_data.size(), stream, *mr + ); + + // now reset mr, but pinned_host_buffer should keep the shared mr alive + mr.reset(); + + auto buffer = rapidsmpf::HostBuffer::from_rmm_device_buffer( + std::move(pinned_host_buffer), stream + ); + + EXPECT_NO_THROW(test_buffer(std::move(buffer), source_data)); +} + namespace { /// Discover the actual pool size the driver creates when a small max is requested.