Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ci/run_cpp_benchmark_smoketests.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

set -xeuo pipefail
Expand All @@ -21,8 +21,8 @@ python "${TIMEOUT_TOOL_PATH}" 30 \
python "${TIMEOUT_TOOL_PATH}" 30 \
mpirun --map-by node --bind-to none -np 3 ./bench_comm -m cuda

python "${TIMEOUT_TOOL_PATH}" 30 \
./bench_memory_resources --benchmark_min_time=0s
RAPIDSMPF_SMOKE_TEST_MODE="ON" \
python "${TIMEOUT_TOOL_PATH}" 30 ./bench_memory_resources

python "${TIMEOUT_TOOL_PATH}" 30 \
./bench_streaming_shuffle -m cuda
Expand Down
96 changes: 89 additions & 7 deletions cpp/benchmarks/bench_memory_resources.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

#include <cstdlib>
#include <cstring>

#include <benchmark/benchmark.h>
Expand All @@ -15,9 +16,28 @@
#include <rapidsmpf/memory/cuda_memcpy_async.hpp>
#include <rapidsmpf/memory/host_memory_resource.hpp>
#include <rapidsmpf/memory/pinned_memory_resource.hpp>
#include <rapidsmpf/utils/string.hpp>

using rapidsmpf::safe_cast;

// When the RAPIDSMPF_SMOKE_TEST_MODE env var is set to a truthy value (e.g.
// "1", "on", "true", "yes"), each benchmark's argument generator emits only a
// tiny subset of cases so the suite finishes quickly during CI smoke tests.
// Cached because Apply callbacks invoke this once per registered benchmark.
//
// We use an env var rather than a CLI flag because google-benchmark's
// BENCHMARK(...)->Apply(...) macros run during static initialization, before
// main() has a chance to parse argv. A CLI-flag approach would require moving
// every benchmark registration into main() (via benchmark::RegisterBenchmark),
// which is more invasive. std::getenv works fine during static init.
Comment on lines +28 to +32
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whelp, this is lame. 😞

// Returns true when the RAPIDSMPF_SMOKE_TEST_MODE environment variable is set
// to a truthy value. The lookup runs exactly once; the result is cached in a
// function-local static because the Apply() callbacks query it once per
// registered benchmark during static initialization.
static bool smoke_test_mode() {
    static bool const enabled = []() {
        char const* const env_value = std::getenv("RAPIDSMPF_SMOKE_TEST_MODE");
        if (env_value == nullptr) {
            return false;
        }
        return rapidsmpf::parse_string<bool>(env_value);
    }();
    return enabled;
}

enum ResourceType : int {
NEW_DELETE = 0,
HOST_MEMORY_RESOURCE = 1,
Expand Down Expand Up @@ -351,11 +371,15 @@ void BM_DeviceToDeviceCopy(benchmark::State& state) {

// Custom argument generator for the benchmark
// Custom argument generator for the benchmark.
//
// Emits one benchmark case per (allocation size, resource type) pair. In
// smoke-test mode (see smoke_test_mode()) only the smallest allocation size is
// used and each case runs a single iteration so the suite finishes quickly in
// CI.
//
// NOTE: the diffed span interleaved the old and new loop bodies; this is the
// reconstructed new-side implementation.
void CustomArguments(benchmark::Benchmark* b) {
    // Allocation sizes to sweep, from 1 KiB up to 1 GiB.
    constexpr std::array all_sizes{1 << 10, 500 << 10, 1 << 20, 500 << 20, 1 << 30};
    std::size_t num_sizes = all_sizes.size();
    if (smoke_test_mode()) {
        num_sizes = 1;  // only the smallest size
        b->Iterations(1);
    }
    for (std::size_t i = 0; i < num_sizes; ++i) {
        // Test all memory resource types for each size.
        for (auto resource_type : RESOURCE_TYPES) {
            b->Args({all_sizes[i], resource_type});
        }
    }
}
Expand Down Expand Up @@ -435,9 +459,15 @@ void BM_PinnedFirstAlloc_InitialPoolSize(benchmark::State& state) {
}

// Argument generator: sweeps initial pool sizes (in MB), registering each size
// both primed (second arg 1) and unprimed (second arg 0). In smoke-test mode
// only the smallest size is emitted and each case runs a single iteration.
//
// NOTE: the diffed span interleaved the old and new loop bodies; this is the
// reconstructed new-side implementation.
void PinnedFirstAlloc_InitialPoolSize_Args(benchmark::Benchmark* b) {
    constexpr std::array all_sizes_mb{1, 256, 1024};
    std::size_t num_sizes = all_sizes_mb.size();
    if (smoke_test_mode()) {
        num_sizes = 1;  // only the smallest size
        b->Iterations(1);
    }
    for (std::size_t i = 0; i < num_sizes; ++i) {
        b->Args({all_sizes_mb[i], 1});  // primed
        b->Args({all_sizes_mb[i], 0});  // no priming
    }
}

Expand All @@ -446,4 +476,56 @@ BENCHMARK(BM_PinnedFirstAlloc_InitialPoolSize)
->UseRealTime()
->Unit(benchmark::kMicrosecond);

// Pool initialization time as a function of initial pool size.
// max_pool_size is fixed at 100% of host memory per GPU.
// initial_pool_size sweeps 0%, 10%, 20%, ..., 100% of max_pool_size.
// Measures pinned-pool construction time as a function of the initial pool
// size. max_pool_size is fixed at 100% of host memory per GPU, and the
// benchmark argument is the initial size expressed as a percentage of that
// maximum (sweeping 0%, 10%, ..., 100% via the Apply() generator below).
void BM_PinnedPoolInit_InitialPoolSize(benchmark::State& state) {
    if (!rapidsmpf::is_pinned_memory_resources_supported()) {
        state.SkipWithMessage("pinned memory not supported on system");
        return;
    }

    // A no-op CUDA call guarantees the device context exists before timing.
    RAPIDSMPF_CUDA_TRY(cudaFree(nullptr));

    auto const percent = safe_cast<std::size_t>(state.range(0));
    std::size_t const max_bytes = rapidsmpf::get_host_memory_per_gpu();
    std::size_t const initial_bytes = safe_cast<std::size_t>(max_bytes * percent / 100);

    rapidsmpf::PinnedPoolProperties props{
        .initial_pool_size = initial_bytes,
        .max_pool_size = max_bytes,
    };

    for (auto _ : state) {
        auto resource = rapidsmpf::PinnedMemoryResource::make_if_available(
            rapidsmpf::get_current_numa_node(), props
        );
        benchmark::DoNotOptimize(resource);
        // Release the resource with the clock stopped so that pool teardown
        // is excluded from the measured time.
        state.PauseTiming();
        resource.reset();
        state.ResumeTiming();
    }

    state.counters["initial_pool_size_bytes"] = static_cast<double>(initial_bytes);
    state.counters["max_pool_size_bytes"] = static_cast<double>(max_bytes);
    state.counters["initial_pool_pct"] = static_cast<double>(percent);
}

// Argument generator: sweeps the initial pool size from 0% to 100% of the
// maximum in 10% steps, or emits a single 1% case in smoke-test mode.
void PinnedPoolInit_InitialPoolSize_Args(benchmark::Benchmark* b) {
    if (!smoke_test_mode()) {
        b->DenseRange(0, 100, 10);
        return;
    }
    // Smoke test: one tiny case, one iteration, so CI stays fast.
    b->Iterations(1);
    b->Args({1});
}

BENCHMARK(BM_PinnedPoolInit_InitialPoolSize)
->Apply(PinnedPoolInit_InitialPoolSize_Args)
->UseRealTime()
->Unit(benchmark::kMillisecond);

BENCHMARK_MAIN();
26 changes: 26 additions & 0 deletions cpp/include/rapidsmpf/memory/pinned_memory_resource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,25 @@ class PinnedMemoryResource final
/// @brief Sentinel value indicating that pinned host memory is disabled.
static constexpr std::nullopt_t Disabled = std::nullopt;

/**
* @brief Fraction of total host memory per GPU used as the initial pinned pool size
* when no explicit `pinned_initial_pool_size` option is provided.
*
* Applied as: `initial_pool_size = get_host_memory_per_gpu() *
* DefaultInitiPoolSizeFactor`.
*/
static constexpr std::string_view DefaultInitiPoolSizeFactor = "10%";

/**
* @brief Fraction of total host memory per GPU used as the maximum pinned pool size
* when no explicit `pinned_max_pool_size` option is provided.
*
* Applied as: `max_pool_size = get_host_memory_per_gpu() *
* DefaultMaxPoolSizeFactor`. `get_host_memory_per_gpu()` is computed as total
* host memory divided by the number of GPUs visible to the system.
*/
static constexpr std::string_view DefaultMaxPoolSizeFactor = "80%";

/**
* @brief Create a pinned memory resource if the system supports pinned memory.
*
Expand All @@ -118,6 +137,13 @@ class PinnedMemoryResource final
/**
* @brief Construct from configuration options.
*
* Recognized options:
* - `pinned_memory` (bool): enables pinned memory; defaults to `true`.
* - `pinned_initial_pool_size` (nbytes string): initial pool size; defaults to
* `get_host_memory_per_gpu() * DefaultInitiPoolSizeFactor`.
* - `pinned_max_pool_size` (nbytes string or empty): maximum pool size; defaults to
* `get_host_memory_per_gpu() * DefaultMaxPoolSizeFactor`.
*
* @param options Configuration options.
*
* @return A `PinnedMemoryResource` if pinned memory is enabled and supported,
Expand Down
11 changes: 11 additions & 0 deletions cpp/include/rapidsmpf/system_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,16 @@ std::vector<int> get_current_numa_nodes() noexcept;
*/
std::uint64_t get_numa_node_host_memory(int numa_id = get_current_numa_node()) noexcept;

/**
* @brief Get the amount of host memory per GPU.
*
 *
* This is calculated as the total host memory available for the current NUMA node divided
* by the number of GPUs bound to that NUMA node.
*
* @throws std::runtime_error if no GPUs are found on the current NUMA node.
*
* @return Amount of host memory per GPU in bytes.
*/
std::uint64_t get_host_memory_per_gpu();

} // namespace rapidsmpf
22 changes: 14 additions & 8 deletions cpp/src/memory/pinned_memory_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,27 @@ std::optional<PinnedMemoryResource> PinnedMemoryResource::from_options(
config::Options options
) {
bool const pinned_memory = options.get<bool>("pinned_memory", [](auto const& s) {
return parse_string<bool>(s.empty() ? "True" : s);
return s.empty() ? true : parse_string<bool>(s);
});

if (pinned_memory && is_pinned_memory_resources_supported()) {
auto const host_memory_per_gpu = get_host_memory_per_gpu();
PinnedPoolProperties pool_properties{
.initial_pool_size = options.get<size_t>(
"pinned_initial_pool_size",
[](auto const& s) { return s.empty() ? 0 : parse_nbytes_unsigned(s); }
[&](auto const& s) {
return parse_nbytes_or_percent(
s.empty() ? DefaultInitiPoolSizeFactor : s,
safe_cast<double>(host_memory_per_gpu)
);
}
),
.max_pool_size = options.get<std::optional<size_t>>(
"pinned_max_pool_size", [](auto const& s) -> std::optional<size_t> {
auto parsed = parse_optional(s);
if (parsed.has_value() && !parsed->empty()) {
return parse_nbytes_unsigned(*parsed);
}
return std::nullopt;
"pinned_max_pool_size", [&](auto const& s) {
return parse_nbytes_or_percent(
s.empty() ? DefaultMaxPoolSizeFactor : s,
safe_cast<double>(host_memory_per_gpu)
);
}
)
};
Expand Down
35 changes: 35 additions & 0 deletions cpp/src/system_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
*/


#include <algorithm>

#include <sched.h>
#include <unistd.h>

#include <cucascade/memory/topology_discovery.hpp>

#include <rapidsmpf/error.hpp>
#include <rapidsmpf/system_info.hpp>

Expand Down Expand Up @@ -79,4 +83,35 @@ std::uint64_t get_numa_node_host_memory([[maybe_unused]] int numa_id) noexcept {
return safe_cast<std::uint64_t>(ret);
}

namespace {
const auto& get_topology() {
static const auto topo = [] {
cucascade::memory::topology_discovery discovery;
RAPIDSMPF_EXPECTS(
discovery.discover(), "Failed to discover system topology", std::runtime_error
);
return discovery;
}();
return topo.get_topology();
}
} // namespace

std::uint64_t get_host_memory_per_gpu() {
auto const current_numa_node = get_current_numa_node();
auto const& gpus = get_topology().gpus;
// gpu.numa_node == -1 means the kernel has no NUMA affinity info for the
// device (common in VMs and single-socket machines without ACPI SRAT/SLIT
// entries for PCIe). Treat those GPUs as local to every NUMA node.
auto const num_local_gpus = std::ranges::count_if(gpus, [&](auto const& gpu) {
return gpu.numa_node == current_numa_node || gpu.numa_node == -1;
});
RAPIDSMPF_EXPECTS(
num_local_gpus > 0,
"No GPUs found on current NUMA node " + std::to_string(current_numa_node),
std::runtime_error
);
return get_numa_node_host_memory(current_numa_node)
/ safe_cast<std::uint64_t>(num_local_gpus);
}

} // namespace rapidsmpf
1 change: 0 additions & 1 deletion cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ add_executable(
"${PROJECT_SOURCE_DIR}/src/bootstrap/bootstrap.cpp"
"${PROJECT_SOURCE_DIR}/src/bootstrap/file_backend.cpp"
"${PROJECT_SOURCE_DIR}/src/bootstrap/utils.cpp"
"${PROJECT_SOURCE_DIR}/src/system_info.cpp"
"$<$<BOOL:${RAPIDSMPF_HAVE_SLURM}>:${PROJECT_SOURCE_DIR}/src/bootstrap/slurm_backend.cpp>"
)
set_target_properties(
Expand Down
27 changes: 26 additions & 1 deletion cpp/tests/test_host_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>

#include <rapidsmpf/config.hpp>
#include <rapidsmpf/cuda_stream.hpp>
#include <rapidsmpf/memory/pinned_memory_resource.hpp>
#include <rapidsmpf/system_info.hpp>
#include <rapidsmpf/utils/misc.hpp>
#include <rapidsmpf/utils/string.hpp>

#include "utils.hpp"

Expand Down Expand Up @@ -265,7 +268,7 @@ std::size_t discover_pinned_pool_actual_size(

} // namespace

TEST(PinnedResourceMaxSize, max_pool_size_limit) {
TEST(PinnedResource, max_pool_size_limit) {
// Ensure CUDA device context is initialized (required for pinned memory pools).
RAPIDSMPF_CUDA_TRY(cudaFree(nullptr));
auto stream = cudf::get_default_stream();
Expand All @@ -292,3 +295,25 @@ TEST(PinnedResourceMaxSize, max_pool_size_limit) {
EXPECT_THROW(alloc_and_dealloc(actual_pool_size + 1), cuda::cuda_error);
stream.synchronize();
}

// Verifies that from_options() with empty options applies the documented
// default pool-size factors relative to per-GPU host memory.
TEST(PinnedResource, from_default_options) {
    auto mr = rapidsmpf::PinnedMemoryResource::from_options(rapidsmpf::config::Options{});
    if (mr == rapidsmpf::PinnedMemoryResource::Disabled) {
        GTEST_SKIP() << "PinnedMemoryResource is not supported";
    }
    auto const host_mem_per_gpu =
        static_cast<double>(rapidsmpf::get_host_memory_per_gpu());
    auto const expected_initial = rapidsmpf::parse_nbytes_or_percent(
        rapidsmpf::PinnedMemoryResource::DefaultInitiPoolSizeFactor, host_mem_per_gpu
    );
    auto const expected_max = rapidsmpf::parse_nbytes_or_percent(
        rapidsmpf::PinnedMemoryResource::DefaultMaxPoolSizeFactor, host_mem_per_gpu
    );
    EXPECT_EQ(mr->properties().initial_pool_size, expected_initial);
    EXPECT_EQ(mr->properties().max_pool_size.value(), expected_max);
}
20 changes: 11 additions & 9 deletions docs/source/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables(

- **`pinned_memory`**
- **Environment Variable**: `RAPIDSMPF_PINNED_MEMORY`
- **Default**: `false`
- **Default**: `true`
- **Description**: Enables pinned host memory if it is available on the system.
Pinned host memory provides higher bandwidth and lower latency for device-to-host
transfers compared to regular pageable host memory. When enabled, RapidsMPF
Expand All @@ -105,17 +105,19 @@ rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables(

- **`pinned_initial_pool_size`**
- **Environment Variable**: `RAPIDSMPF_PINNED_INITIAL_POOL_SIZE`
- **Default**: `0`
- **Description**: Initial size (in bytes) of the pinned host memory pool when
`pinned_memory` is enabled. A value of `0` means the pool starts empty and grows
on demand. Accepts byte counts (e.g. `"1GiB"`, `"512MiB"`).
- **Default**: 10% of per-GPU host memory
- **Description**: Initial size of the pinned host memory pool when `pinned_memory` is
enabled. When unset or empty, the pool is pre-allocated to 10% of total host memory
available in the current NUMA node divided by the number of GPUs in that NUMA node.
Accepts byte counts or percentage (e.g. `"1GiB"`, `"512MiB"`).

- **`pinned_max_pool_size`**
- **Environment Variable**: `RAPIDSMPF_PINNED_MAX_POOL_SIZE`
- **Default**: `"disabled"`
- **Description**: Maximum size (in bytes) of the pinned host memory pool when
`pinned_memory` is enabled. When unset or empty, the pool is allowed to grow
without an upper bound. Accepts byte counts (e.g. `"4GiB"`, `"2048MiB"`).
- **Default**: 80% of per-GPU host memory
- **Description**: Maximum size of the pinned host memory pool when `pinned_memory` is
enabled. When unset or empty, the pool is capped at 80% of total host memory
available in the current NUMA node divided by the number of GPUs in that NUMA node.
Accepts byte counts or percentage (e.g. `"4GiB"`, `"2048MiB"`).

- **`spill_device_limit`**
- **Environment Variable**: `RAPIDSMPF_SPILL_DEVICE_LIMIT`
Expand Down
1 change: 1 addition & 0 deletions python/rapidsmpf/rapidsmpf/utils/system_info.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
def get_total_host_memory() -> int: ...
def get_current_numa_node() -> int: ...
def get_numa_node_host_memory(numa_id: int | None = None) -> int: ...
def get_host_memory_per_gpu() -> int: ...
Loading
Loading