Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions cpp/benchmarks/bench_comm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,16 +317,7 @@ int main(int argc, char** argv) {
set_current_rmm_resource(args.rmm_mr);

rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref();
BufferResource br{
mr,
PinnedMemoryResource::Disabled,
{},
std::chrono::milliseconds{1},
std::make_shared<rmm::cuda_stream_pool>(
16, rmm::cuda_stream::flags::non_blocking
),
stats
};
BufferResource br{stats, mr};

// Print benchmark/hardware info.
{
Expand Down
4 changes: 2 additions & 2 deletions cpp/benchmarks/bench_partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ static void BM_PartitionAndPack(benchmark::State& state) {

// Create a pool memory resource with 50% of GPU memory
rmm::mr::pool_memory_resource pool_mr{rmm::mr::cuda_memory_resource{}, pool_size};
rapidsmpf::BufferResource br{pool_mr};
rapidsmpf::BufferResource br{rapidsmpf::Statistics::disabled(), pool_mr};

// Create input table
auto table = create_int_table(num_rows, stream);
Expand Down Expand Up @@ -111,7 +111,7 @@ static void BM_PartitionAndPackCurrentImpl(benchmark::State& state) {

// Create a pool memory resource with 50% of GPU memory
rmm::mr::pool_memory_resource pool_mr{rmm::mr::cuda_memory_resource{}, pool_size};
rapidsmpf::BufferResource br{pool_mr};
rapidsmpf::BufferResource br{rapidsmpf::Statistics::disabled(), pool_mr};

// Create input table
auto table = create_int_table(num_rows, stream);
Expand Down
10 changes: 3 additions & 7 deletions cpp/benchmarks/bench_shuffle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -550,20 +550,16 @@ int main(int argc, char** argv) {
};
}

auto stats = std::make_shared<rapidsmpf::Statistics>(/* enable = */ true);
auto stats = rapidsmpf::Statistics::create();

// We're only going to measure the last run, so disable initially.
stats->disable();
rapidsmpf::BufferResource br{
stats,
stat_enabled_mr,
args.pinned_mem_disable ? rapidsmpf::PinnedMemoryResource::Disabled
: rapidsmpf::PinnedMemoryResource::make_if_available(),
std::move(memory_available),
std::chrono::milliseconds{1},
std::make_shared<rmm::cuda_stream_pool>(
16, rmm::cuda_stream::flags::non_blocking
),
stats
std::move(memory_available)
};

std::shared_ptr<rapidsmpf::Communicator> comm;
Expand Down
20 changes: 6 additions & 14 deletions cpp/benchmarks/streaming/bench_streaming_shuffle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ int main(int argc, char** argv) {

// Initialize configuration options from environment variables.
rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()};
auto progress_thread = std::make_shared<rapidsmpf::ProgressThread>();
auto stats = rapidsmpf::Statistics::from_options(options);
auto progress_thread = std::make_shared<rapidsmpf::ProgressThread>(stats);

std::shared_ptr<rapidsmpf::Communicator> comm;
if (args.comm_type == "mpi") {
Expand Down Expand Up @@ -363,20 +364,11 @@ int main(int argc, char** argv) {
};
}

auto stats = std::make_shared<rapidsmpf::Statistics>(/* enable = */ true);

auto pinned_mr = args.pinned_mem_disable
? rapidsmpf::PinnedMemoryResource::Disabled
: rapidsmpf::PinnedMemoryResource::make_if_available();
auto br = std::make_shared<rapidsmpf::BufferResource>(
stat_enabled_mr,
pinned_mr,
std::move(memory_available),
std::nullopt,
std::make_shared<rmm::cuda_stream_pool>(
16, rmm::cuda_stream::flags::non_blocking
),
stats
stats, stat_enabled_mr, pinned_mr, std::move(memory_available), std::nullopt
);

auto& log = *comm->logger();
Expand Down Expand Up @@ -410,7 +402,7 @@ int main(int argc, char** argv) {
for (std::uint64_t i = 0; i < total_num_runs; ++i) {
// Clear statistics before the last run so only the final run is reported.
if (i == total_num_runs - 1) {
ctx->statistics()->clear();
stats->clear();
}
double const elapsed = run(ctx, comm, args, stream).count();
std::stringstream ss;
Expand Down Expand Up @@ -458,13 +450,13 @@ int main(int argc, char** argv) {
}

if (args.enable_memory_profiler) {
log.print(ctx->statistics()->report({
log.print(stats->report({
.mr = stat_enabled_mr,
.pinned_mr = pinned_mr,
.header = "Statistics (of the last run):",
}));
} else {
log.print(ctx->statistics()->report({.header = "Statistics (of the last run):"}));
log.print(stats->report({.header = "Statistics (of the last run):"}));
}

if (!use_bootstrap) {
Expand Down
6 changes: 3 additions & 3 deletions cpp/benchmarks/streaming/ndsh/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ create_context(ProgramOptions& arguments, RmmResourceAdaptor&& mr) {
memory_available[MemoryType::DEVICE] =
LimitAvailableMemory{mr, static_cast<std::int64_t>(limit_size)};
}
auto statistics = std::make_shared<Statistics>(/* enable = */ true);
auto statistics = Statistics::create();

RAPIDSMPF_EXPECTS(
arguments.no_pinned_host_memory || is_pinned_memory_resources_supported(),
Expand All @@ -156,15 +156,15 @@ create_context(ProgramOptions& arguments, RmmResourceAdaptor&& mr) {
);

auto br = std::make_shared<BufferResource>(
statistics,
std::move(mr),
arguments.no_pinned_host_memory ? PinnedMemoryResource::Disabled
: PinnedMemoryResource::make_if_available(),
std::move(memory_available),
arguments.periodic_spill,
std::make_shared<rmm::cuda_stream_pool>(
arguments.num_streams, rmm::cuda_stream::flags::non_blocking
),
statistics
)
);
auto environment = config::get_environment_variables();
environment["NUM_STREAMING_THREADS"] =
Expand Down
4 changes: 2 additions & 2 deletions cpp/examples/example_shuffle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ int main(int argc, char** argv) {
rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()};

// Create a statistics instance for the shuffler that tracks useful information.
auto stats = std::make_shared<rapidsmpf::Statistics>();
auto stats = rapidsmpf::Statistics::create();

// The communicator has a progress thread where the shuffler event loop executes. A
// single progress thread may be used by multiple shufflers simultaneously.
Expand All @@ -46,7 +46,7 @@ int main(int argc, char** argv) {
// We will use the same stream, memory, and buffer resource throughout the example.
rmm::cuda_stream_view stream = cudf::get_default_stream();
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref();
rapidsmpf::BufferResource br{mr};
rapidsmpf::BufferResource br{stats, mr};

// As input data, we use a helper function from the benchmark suite. It creates a
// random cudf table with 2 columns and 100 rows. In this example, each MPI rank
Expand Down
4 changes: 3 additions & 1 deletion cpp/include/rapidsmpf/bootstrap/ucxx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ namespace bootstrap {
* @throws std::runtime_error if initialization fails.
*
* @code
* auto progress = std::make_shared<rapidsmpf::ProgressThread>();
* auto progress = std::make_shared<rapidsmpf::ProgressThread>(
* rapidsmpf::Statistics::disabled()
* );
* auto comm = rapidsmpf::bootstrap::create_ucxx_comm(progress);
* comm->logger().print("Hello from rank " + std::to_string(comm->rank()));
* @endcode
Expand Down
5 changes: 0 additions & 5 deletions cpp/include/rapidsmpf/coll/allgather.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <rapidsmpf/memory/packed_data.hpp>
#include <rapidsmpf/memory/spill_manager.hpp>
#include <rapidsmpf/progress_thread.hpp>
#include <rapidsmpf/statistics.hpp>

/**
* @namespace rapidsmpf::coll
Expand Down Expand Up @@ -101,8 +100,6 @@ class AllGather {
* @param comm The communicator for communication.
* @param op_id Unique operation identifier for this allgather.
* @param br Buffer resource for memory allocation.
* @param statistics Statistics collection instance (disabled by
* default).
* @param finished_callback Optional callback run when partitions are locally
* finished. The callback is guaranteed to be called by the progress thread exactly
* once when the allgather is locally ready.
Expand All @@ -118,7 +115,6 @@ class AllGather {
std::shared_ptr<Communicator> comm,
OpID op_id,
BufferResource* br,
std::shared_ptr<Statistics> statistics = Statistics::disabled(),
std::function<void(void)>&& finished_callback = nullptr
);

Expand Down Expand Up @@ -195,7 +191,6 @@ class AllGather {

std::shared_ptr<Communicator> comm_; ///< Communicator
BufferResource* br_; ///< Buffer resource for memory allocation
std::shared_ptr<Statistics> statistics_; ///< Statistics collection instance
std::function<void(void)> finished_callback_{
nullptr
}; ///< Optional callback to run when allgather is finished and ready for extraction.
Expand Down
18 changes: 13 additions & 5 deletions cpp/include/rapidsmpf/memory/buffer_resource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class BufferResource {
/**
* @brief Constructs a buffer resource.
*
* @param statistics The statistics instance to use. Pass `Statistics::disabled()`
* to opt out of statistics collection.
* @param device_mr The RMM device memory resource used for device allocations.
* Ownership is transferred to the BufferResource.
* @param pinned_mr The pinned host memory resource used for `MemoryType::PINNED_HOST`
Expand All @@ -85,16 +87,15 @@ class BufferResource {
* If `std::nullopt`, no periodic spill check is performed.
* @param stream_pool Pool of CUDA streams. Used throughout RapidsMPF for operations
* that do not take an explicit CUDA stream.
* @param statistics The statistics instance to use (disabled by default).
*/
BufferResource(
std::shared_ptr<Statistics> statistics,
cuda::mr::any_resource<cuda::mr::device_accessible> device_mr,
std::optional<PinnedMemoryResource> pinned_mr = PinnedMemoryResource::Disabled,
std::unordered_map<MemoryType, MemoryAvailable> memory_available = {},
std::optional<Duration> periodic_spill_check = std::chrono::milliseconds{1},
std::shared_ptr<rmm::cuda_stream_pool> stream_pool = std::make_shared<
rmm::cuda_stream_pool>(16, rmm::cuda_stream::flags::non_blocking),
std::shared_ptr<Statistics> statistics = Statistics::disabled()
rmm::cuda_stream_pool>(16, rmm::cuda_stream::flags::non_blocking)
);

/**
Expand All @@ -105,12 +106,17 @@ class BufferResource {
*
* @param mr The RMM resource adaptor.
* @param options Configuration options.
* @param statistics The statistics instance to use. Pass `Statistics::disabled()`
* to opt out of statistics collection. The caller is responsible for creating and
* owning this object.
*
* @return A shared pointer to a BufferResource instance configured according to the
* options.
*/
static std::shared_ptr<BufferResource> from_options(
RmmResourceAdaptor mr, config::Options options
RmmResourceAdaptor mr,
config::Options options,
std::shared_ptr<Statistics> statistics
);

~BufferResource() noexcept = default;
Expand Down Expand Up @@ -400,7 +406,7 @@ class BufferResource {
*
* @return Shared pointer to the Statistics instance.
*/
std::shared_ptr<Statistics> statistics();
std::shared_ptr<Statistics> const& statistics() const noexcept;

private:
std::mutex mutex_;
Expand All @@ -415,6 +421,8 @@ class BufferResource {
std::shared_ptr<Statistics> statistics_;
};

static_assert(StatisticsProvider<BufferResource>);

/**
* @brief A functor for querying the remaining available memory within a defined limit
* from an RMM statistics resource.
Expand Down
9 changes: 6 additions & 3 deletions cpp/include/rapidsmpf/progress_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,15 @@ class ProgressThread {
/**
* @brief Construct a new progress thread that can handle multiple functions.
*
* @param statistics The statistics instance to use (disabled by default).
* @param statistics The statistics instance to use. Pass `Statistics::disabled()`
* to opt out of statistics collection.
* @param sleep The duration to sleep between each progress loop iteration.
* If 0, the thread yields execution instead of sleeping. Anecdotally, a 1 us
* sleep time (the default) is sufficient to avoid starvation and get smooth
* progress.
*/
ProgressThread(
std::shared_ptr<Statistics> statistics = Statistics::disabled(),
std::shared_ptr<Statistics> statistics,
Duration sleep = std::chrono::microseconds{1}
);

Expand Down Expand Up @@ -181,7 +182,7 @@ class ProgressThread {
/**
* @brief @return The statistics instance on this progress thread.
*/
std::shared_ptr<Statistics> statistics() const noexcept;
std::shared_ptr<Statistics> const& statistics() const noexcept;

private:
/**
Expand All @@ -205,4 +206,6 @@ class ProgressThread {
detail::PausableThreadLoop thread_;
};

static_assert(StatisticsProvider<ProgressThread>);

} // namespace rapidsmpf
Loading
Loading