diff --git a/cpp/benchmarks/bench_comm.cpp b/cpp/benchmarks/bench_comm.cpp index 6fcb076af..d1fa2ae5f 100644 --- a/cpp/benchmarks/bench_comm.cpp +++ b/cpp/benchmarks/bench_comm.cpp @@ -317,16 +317,7 @@ int main(int argc, char** argv) { set_current_rmm_resource(args.rmm_mr); rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref(); - BufferResource br{ - mr, - PinnedMemoryResource::Disabled, - {}, - std::chrono::milliseconds{1}, - std::make_shared( - 16, rmm::cuda_stream::flags::non_blocking - ), - stats - }; + BufferResource br{stats, mr}; // Print benchmark/hardware info. { diff --git a/cpp/benchmarks/bench_partition.cpp b/cpp/benchmarks/bench_partition.cpp index ca286885b..24f971a45 100644 --- a/cpp/benchmarks/bench_partition.cpp +++ b/cpp/benchmarks/bench_partition.cpp @@ -60,7 +60,7 @@ static void BM_PartitionAndPack(benchmark::State& state) { // Create a pool memory resource with 50% of GPU memory rmm::mr::pool_memory_resource pool_mr{rmm::mr::cuda_memory_resource{}, pool_size}; - rapidsmpf::BufferResource br{pool_mr}; + rapidsmpf::BufferResource br{rapidsmpf::Statistics::disabled(), pool_mr}; // Create input table auto table = create_int_table(num_rows, stream); @@ -111,7 +111,7 @@ static void BM_PartitionAndPackCurrentImpl(benchmark::State& state) { // Create a pool memory resource with 50% of GPU memory rmm::mr::pool_memory_resource pool_mr{rmm::mr::cuda_memory_resource{}, pool_size}; - rapidsmpf::BufferResource br{pool_mr}; + rapidsmpf::BufferResource br{rapidsmpf::Statistics::disabled(), pool_mr}; // Create input table auto table = create_int_table(num_rows, stream); diff --git a/cpp/benchmarks/bench_shuffle.cpp b/cpp/benchmarks/bench_shuffle.cpp index 1744f488a..d4b630123 100644 --- a/cpp/benchmarks/bench_shuffle.cpp +++ b/cpp/benchmarks/bench_shuffle.cpp @@ -550,20 +550,16 @@ int main(int argc, char** argv) { }; } - auto stats = std::make_shared(/* enable = */ true); + auto stats = rapidsmpf::Statistics::create(); // We're only going to measure the last run, so disable initially. stats->disable(); rapidsmpf::BufferResource br{ + stats, stat_enabled_mr, args.pinned_mem_disable ? rapidsmpf::PinnedMemoryResource::Disabled : rapidsmpf::PinnedMemoryResource::make_if_available(), - std::move(memory_available), - std::chrono::milliseconds{1}, - std::make_shared( - 16, rmm::cuda_stream::flags::non_blocking - ), - stats + std::move(memory_available) }; std::shared_ptr comm; diff --git a/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp b/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp index 63a6cd164..3aae525f0 100644 --- a/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp +++ b/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp @@ -320,7 +320,8 @@ int main(int argc, char** argv) { // Initialize configuration options from environment variables. rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()}; - auto progress_thread = std::make_shared(); + auto stats = rapidsmpf::Statistics::from_options(options); + auto progress_thread = std::make_shared(stats); std::shared_ptr comm; if (args.comm_type == "mpi") { @@ -363,20 +364,11 @@ int main(int argc, char** argv) { }; } - auto stats = std::make_shared(/* enable = */ true); - auto pinned_mr = args.pinned_mem_disable ? rapidsmpf::PinnedMemoryResource::Disabled : rapidsmpf::PinnedMemoryResource::make_if_available(); auto br = std::make_shared( - stat_enabled_mr, - pinned_mr, - std::move(memory_available), - std::nullopt, - std::make_shared( - 16, rmm::cuda_stream::flags::non_blocking - ), - stats + stats, stat_enabled_mr, pinned_mr, std::move(memory_available), std::nullopt ); auto& log = *comm->logger(); @@ -410,7 +402,7 @@ int main(int argc, char** argv) { for (std::uint64_t i = 0; i < total_num_runs; ++i) { // Clear statistics before the last run so only the final run is reported. if (i == total_num_runs - 1) { - ctx->statistics()->clear(); + stats->clear(); } double const elapsed = run(ctx, comm, args, stream).count(); std::stringstream ss; @@ -458,13 +450,13 @@ int main(int argc, char** argv) { } if (args.enable_memory_profiler) { - log.print(ctx->statistics()->report({ + log.print(stats->report({ .mr = stat_enabled_mr, .pinned_mr = pinned_mr, .header = "Statistics (of the last run):", })); } else { - log.print(ctx->statistics()->report({.header = "Statistics (of the last run):"})); + log.print(stats->report({.header = "Statistics (of the last run):"})); } if (!use_bootstrap) { diff --git a/cpp/benchmarks/streaming/ndsh/utils.cpp b/cpp/benchmarks/streaming/ndsh/utils.cpp index f1cb64b84..ff81ad1cc 100644 --- a/cpp/benchmarks/streaming/ndsh/utils.cpp +++ b/cpp/benchmarks/streaming/ndsh/utils.cpp @@ -143,7 +143,7 @@ create_context(ProgramOptions& arguments, RmmResourceAdaptor&& mr) { memory_available[MemoryType::DEVICE] = LimitAvailableMemory{mr, static_cast(limit_size)}; } - auto statistics = std::make_shared(/* enable = */ true); + auto statistics = Statistics::create(); RAPIDSMPF_EXPECTS( arguments.no_pinned_host_memory || is_pinned_memory_resources_supported(), @@ -156,6 +156,7 @@ create_context(ProgramOptions& arguments, RmmResourceAdaptor&& mr) { ); auto br = std::make_shared( + statistics, std::move(mr), arguments.no_pinned_host_memory ? PinnedMemoryResource::Disabled : PinnedMemoryResource::make_if_available(), @@ -163,8 +164,7 @@ create_context(ProgramOptions& arguments, RmmResourceAdaptor&& mr) { arguments.periodic_spill, std::make_shared( arguments.num_streams, rmm::cuda_stream::flags::non_blocking - ), - statistics + ) ); auto environment = config::get_environment_variables(); environment["NUM_STREAMING_THREADS"] = diff --git a/cpp/examples/example_shuffle.cpp b/cpp/examples/example_shuffle.cpp index a6b95dd89..0fc8edf85 100644 --- a/cpp/examples/example_shuffle.cpp +++ b/cpp/examples/example_shuffle.cpp @@ -27,7 +27,7 @@ int main(int argc, char** argv) { rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()}; // Create a statistics instance for the shuffler that tracks useful information. - auto stats = std::make_shared(); + auto stats = rapidsmpf::Statistics::create(); // The communicator has a progress thread where the shuffler event loop executes. A // single progress thread may be used by multiple shufflers simultaneously. @@ -46,7 +46,7 @@ int main(int argc, char** argv) { // We will use the same stream, memory, and buffer resource throughout the example. rmm::cuda_stream_view stream = cudf::get_default_stream(); rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref(); - rapidsmpf::BufferResource br{mr}; + rapidsmpf::BufferResource br{stats, mr}; // As input data, we use a helper function from the benchmark suite. It creates a // random cudf table with 2 columns and 100 rows. In this example, each MPI rank diff --git a/cpp/include/rapidsmpf/bootstrap/ucxx.hpp b/cpp/include/rapidsmpf/bootstrap/ucxx.hpp index 52d7d7623..35718e1ab 100644 --- a/cpp/include/rapidsmpf/bootstrap/ucxx.hpp +++ b/cpp/include/rapidsmpf/bootstrap/ucxx.hpp @@ -41,7 +41,9 @@ namespace bootstrap { * @throws std::runtime_error if initialization fails. * * @code - * auto progress = std::make_shared(); + * auto progress = std::make_shared( + * rapidsmpf::Statistics::disabled() + * ); * auto comm = rapidsmpf::bootstrap::create_ucxx_comm(progress); * comm->logger().print("Hello from rank " + std::to_string(comm->rank())); * @endcode diff --git a/cpp/include/rapidsmpf/coll/allgather.hpp b/cpp/include/rapidsmpf/coll/allgather.hpp index d0c3684c5..70f135f93 100644 --- a/cpp/include/rapidsmpf/coll/allgather.hpp +++ b/cpp/include/rapidsmpf/coll/allgather.hpp @@ -23,7 +23,6 @@ #include #include #include -#include /** * @namespace rapidsmpf::coll @@ -101,8 +100,6 @@ class AllGather { * @param comm The communicator for communication. * @param op_id Unique operation identifier for this allgather. * @param br Buffer resource for memory allocation. - * @param statistics Statistics collection instance (disabled by - * default). * @param finished_callback Optional callback run when partitions are locally * finished. The callback is guaranteed to be called by the progress thread exactly * once when the allgather is locally ready. @@ -118,7 +115,6 @@ class AllGather { std::shared_ptr comm, OpID op_id, BufferResource* br, - std::shared_ptr statistics = Statistics::disabled(), std::function&& finished_callback = nullptr ); @@ -195,7 +191,6 @@ class AllGather { std::shared_ptr comm_; ///< Communicator BufferResource* br_; ///< Buffer resource for memory allocation - std::shared_ptr statistics_; ///< Statistics collection instance std::function finished_callback_{ nullptr }; ///< Optional callback to run when allgather is finished and ready for extraction. diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp index fe0216cfa..739c088a7 100644 --- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp +++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp @@ -69,6 +69,8 @@ class BufferResource { /** * @brief Constructs a buffer resource. * + * @param statistics The statistics instance to use. Pass `Statistics::disabled()` + * to opt out of statistics collection. * @param device_mr The RMM device memory resource used for device allocations. * Ownership is transferred to the BufferResource. * @param pinned_mr The pinned host memory resource used for `MemoryType::PINNED_HOST` @@ -85,16 +87,15 @@ class BufferResource { * If `std::nullopt`, no periodic spill check is performed. * @param stream_pool Pool of CUDA streams. Used throughout RapidsMPF for operations * that do not take an explicit CUDA stream. - * @param statistics The statistics instance to use (disabled by default). */ BufferResource( + std::shared_ptr statistics, cuda::mr::any_resource device_mr, std::optional pinned_mr = PinnedMemoryResource::Disabled, std::unordered_map memory_available = {}, std::optional periodic_spill_check = std::chrono::milliseconds{1}, std::shared_ptr stream_pool = std::make_shared< - rmm::cuda_stream_pool>(16, rmm::cuda_stream::flags::non_blocking), - std::shared_ptr statistics = Statistics::disabled() + rmm::cuda_stream_pool>(16, rmm::cuda_stream::flags::non_blocking) ); /** @@ -105,12 +106,17 @@ class BufferResource { * * @param mr The RMM resource adaptor. * @param options Configuration options. + * @param statistics The statistics instance to use. Pass `Statistics::disabled()` + * to opt out of statistics collection. The caller is responsible for creating and + * owning this object. * * @return A shared pointer to a BufferResource instance configured according to the * options. */ static std::shared_ptr from_options( - RmmResourceAdaptor mr, config::Options options + RmmResourceAdaptor mr, + config::Options options, + std::shared_ptr statistics ); ~BufferResource() noexcept = default; @@ -400,7 +406,7 @@ class BufferResource { * * @return Shared pointer the Statistics instance. */ - std::shared_ptr statistics(); + std::shared_ptr const& statistics() const noexcept; private: std::mutex mutex_; @@ -415,6 +421,8 @@ class BufferResource { std::shared_ptr statistics_; }; +static_assert(StatisticsProvider); + /** * @brief A functor for querying the remaining available memory within a defined limit * from an RMM statistics resource. diff --git a/cpp/include/rapidsmpf/progress_thread.hpp b/cpp/include/rapidsmpf/progress_thread.hpp index 3a566024a..3a7284857 100644 --- a/cpp/include/rapidsmpf/progress_thread.hpp +++ b/cpp/include/rapidsmpf/progress_thread.hpp @@ -117,14 +117,15 @@ class ProgressThread { /** * @brief Construct a new progress thread that can handle multiple functions. * - * @param statistics The statistics instance to use (disabled by default). + * @param statistics The statistics instance to use. Pass `Statistics::disabled()` + * to opt out of statistics collection. * @param sleep The duration to sleep between each progress loop iteration. * If 0, the thread yields execution instead of sleeping. Anecdotally, a 1 us * sleep time (the default) is sufficient to avoid starvation and get smooth * progress. */ ProgressThread( - std::shared_ptr statistics = Statistics::disabled(), + std::shared_ptr statistics, Duration sleep = std::chrono::microseconds{1} ); @@ -181,7 +182,7 @@ class ProgressThread { /** * @brief @return The statistics instance on this progress thread. */ - std::shared_ptr statistics() const noexcept; + std::shared_ptr const& statistics() const noexcept; private: /** @@ -205,4 +206,6 @@ class ProgressThread { detail::PausableThreadLoop thread_; }; +static_assert(StatisticsProvider); + } // namespace rapidsmpf diff --git a/cpp/include/rapidsmpf/statistics.hpp b/cpp/include/rapidsmpf/statistics.hpp index 79485fc80..e6da53fe0 100644 --- a/cpp/include/rapidsmpf/statistics.hpp +++ b/cpp/include/rapidsmpf/statistics.hpp @@ -4,6 +4,7 @@ */ #pragma once #include +#include #include #include #include @@ -20,6 +21,7 @@ #include #include +#include #include #include #include @@ -69,7 +71,7 @@ class StreamOrderedTiming; * std::cout << stats.report(); * @endcode */ -class Statistics { +class Statistics : public std::enable_shared_from_this { public: /** * @brief Identifies a predefined formatter used by `report()`. @@ -107,12 +109,13 @@ class Statistics { }; /** - * @brief Constructs a Statistics object without memory profiling. + * @brief Creates a Statistics instance. * * @param enabled If true, enables tracking of statistics. If false, all operations * are no-ops. + * @return A shared pointer to a new Statistics instance. */ - Statistics(bool enabled = true); + static std::shared_ptr create(bool enabled = true); /** * @brief Construct from configuration options. @@ -125,8 +128,8 @@ class Statistics { ~Statistics() noexcept; - // `Statistics` is owned exclusively through `std::shared_ptr` (see `disabled()` and - // `from_options()`). + // `Statistics` is owned exclusively through `std::shared_ptr`. Use `create()`, + // `disabled()`, or `from_options()` to construct instances. Statistics(Statistics const&) = delete; Statistics& operator=(Statistics const&) = delete; Statistics(Statistics&&) = delete; @@ -566,11 +569,14 @@ class Statistics { /** * @brief Constructs an active MemoryRecorder. * - * @param stats Pointer to Statistics object that will store the result. + * @param stats Shared pointer to the Statistics object that will store + * the result. Must not be null. * @param mr The RMM resource adaptor providing scoped memory statistics. * @param name Name of the scope. */ - MemoryRecorder(Statistics* stats, RmmResourceAdaptor mr, std::string name); + MemoryRecorder( + std::shared_ptr stats, RmmResourceAdaptor mr, std::string name + ); /** * @brief Destructor. @@ -586,9 +592,10 @@ class Statistics { MemoryRecorder& operator=(MemoryRecorder&&) = delete; private: - Statistics* stats_{ - nullptr - }; // TODO: make this shared_ptr using make_shared_from_this + // Holds the owning Statistics so the recorder can safely outlive any + // raw reference the caller might have. `nullptr` indicates a no-op + // recorder. + std::shared_ptr stats_; std::optional mr_; // optional because RmmResourceAdaptor is not default constructible std::string name_; @@ -624,6 +631,8 @@ class Statistics { Formatter formatter; }; + explicit Statistics(bool enabled); + mutable std::mutex mutex_; std::atomic enabled_; std::map stats_; @@ -631,6 +640,28 @@ class Statistics { std::unordered_map memory_records_; }; +/** + * @brief Satisfied by any type that exposes a `statistics()` method returning + * `std::shared_ptr const&`. + * + * Classes satisfying this concept are *statistics providers* — secondary + * classes that receive a provider as a constructor argument should derive their + * `Statistics` instance by calling `.statistics()` on it rather than accepting + * a separate `std::shared_ptr` argument. + * + * Returning by const-reference avoids the per-call atomic refcount bump that a + * by-value `std::shared_ptr` return would incur on hot paths. + * + * Each provider asserts this concept via `static_assert` in its own header. + * Current providers: `BufferResource`, `ProgressThread`, `streaming::Context`. + */ +template +concept StatisticsProvider = requires(T const& t) { + { + t.statistics() + } noexcept -> std::same_as const&>; +}; + /** * @brief Macro for automatic memory profiling of a code scope. * @@ -644,13 +675,15 @@ class Statistics { * * Example usage: * @code - * void foo(Statistics& stats, RmmResourceAdaptor& mr) { + * void foo(std::shared_ptr stats, RmmResourceAdaptor& mr) { * RAPIDSMPF_MEMORY_PROFILE(stats, mr); * RAPIDSMPF_MEMORY_PROFILE(stats, mr, "custom_name"); * } * @endcode * - * The first argument is a reference or pointer to a Statistics object. + * The first argument is a non-null `std::shared_ptr`. Pass + * `Statistics::disabled()` to disable recording (`create_memory_recorder` returns + * a no-op recorder when the statistics instance is disabled). * The second argument is the RMM resource adaptor (or `std::nullopt` for no-op). * The third argument (optional) is a custom function name string to use instead of * __func__. @@ -669,18 +702,15 @@ class Statistics { RAPIDSMPF_MEMORY_PROFILE_3(stats, mr, __func__) // Version with custom function name -#define RAPIDSMPF_MEMORY_PROFILE_3(stats, mr, funcname) \ - auto&& RAPIDSMPF_CONCAT(_rapidsmpf_stats_, __LINE__) = (stats); \ - auto const RAPIDSMPF_CONCAT(_rapidsmpf_memory_recorder_, __LINE__) = \ - (rapidsmpf::detail::to_pointer(RAPIDSMPF_CONCAT(_rapidsmpf_stats_, __LINE__))) \ - ? rapidsmpf::detail::to_pointer( \ - RAPIDSMPF_CONCAT(_rapidsmpf_stats_, __LINE__) \ - ) \ - -> create_memory_recorder( \ - (mr), \ - std::string(__FILE__) + ":" + RAPIDSMPF_STRINGIFY(__LINE__) + "(" \ - + std::string(funcname) + ")" \ - ) \ - : rapidsmpf::Statistics::MemoryRecorder {} +#define RAPIDSMPF_MEMORY_PROFILE_3(stats, mr, funcname) \ + RAPIDSMPF_EXPECTS( \ + (stats) != nullptr, "RAPIDSMPF_MEMORY_PROFILE: stats must not be null" \ + ); \ + auto const RAPIDSMPF_CONCAT(_rapidsmpf_memory_recorder_, __LINE__) = \ + (stats) -> create_memory_recorder( \ + (mr), \ + std::string(__FILE__) + ":" + RAPIDSMPF_STRINGIFY(__LINE__) + "(" \ + + std::string(funcname) + ")" \ + ) } // namespace rapidsmpf diff --git a/cpp/include/rapidsmpf/streaming/core/context.hpp b/cpp/include/rapidsmpf/streaming/core/context.hpp index 6c9dc4091..594cc518a 100644 --- a/cpp/include/rapidsmpf/streaming/core/context.hpp +++ b/cpp/include/rapidsmpf/streaming/core/context.hpp @@ -83,6 +83,9 @@ class Context { * @param logger The logger to use. * @param options Configuration options used to initialize the Context and its * components. + * @param statistics The statistics instance to use. Pass `Statistics::disabled()` + * to opt out of statistics collection. The caller is responsible for creating and + * owning this object. * @return A fully initialized Context. * * @throws std::invalid_argument If an option value is invalid. @@ -102,7 +105,8 @@ class Context { static std::shared_ptr from_options( RmmResourceAdaptor mr, std::shared_ptr logger, - config::Options options + config::Options options, + std::shared_ptr statistics ); // No copy constructor and assignment operator. @@ -179,7 +183,7 @@ class Context { * * @return Shared pointer to the statistics instance. */ - [[nodiscard]] std::shared_ptr statistics() const noexcept; + [[nodiscard]] std::shared_ptr const& statistics() const noexcept; /** * @brief Create a new channel associated with this context. @@ -231,4 +235,6 @@ class Context { SpillManager::SpillFunctionID spill_function_id_{}; }; +static_assert(StatisticsProvider); + } // namespace rapidsmpf::streaming diff --git a/cpp/src/coll/allgather.cpp b/cpp/src/coll/allgather.cpp index 31b3b2fdb..058c9104e 100644 --- a/cpp/src/coll/allgather.cpp +++ b/cpp/src/coll/allgather.cpp @@ -122,12 +122,10 @@ AllGather::AllGather( std::shared_ptr comm, OpID op_id, BufferResource* br, - std::shared_ptr statistics, std::function&& finished_callback ) : comm_{std::move(comm)}, br_{br}, - statistics_{std::move(statistics)}, finished_callback_{std::move(finished_callback)}, finish_counter_{comm_->nranks()}, op_id_{op_id}, diff --git a/cpp/src/memory/buffer_resource.cpp b/cpp/src/memory/buffer_resource.cpp index 34d0f0500..1ff711871 100644 --- a/cpp/src/memory/buffer_resource.cpp +++ b/cpp/src/memory/buffer_resource.cpp @@ -37,12 +37,12 @@ auto add_missing_availability_functions( } // namespace BufferResource::BufferResource( + std::shared_ptr statistics, cuda::mr::any_resource device_mr, std::optional pinned_mr, std::unordered_map memory_available, std::optional periodic_spill_check, - std::shared_ptr stream_pool, - std::shared_ptr statistics + std::shared_ptr stream_pool ) : device_mr_{std::move(device_mr)}, pinned_mr_{std::move(pinned_mr)}, @@ -58,7 +58,7 @@ BufferResource::BufferResource( } std::shared_ptr BufferResource::from_options( - RmmResourceAdaptor mr, config::Options options + RmmResourceAdaptor mr, config::Options options, std::shared_ptr statistics ) { auto pinned_mr = PinnedMemoryResource::from_options(options); auto mem_available = memory_available_from_options(mr, options); @@ -67,14 +67,13 @@ std::shared_ptr BufferResource::from_options( mem_available[MemoryType::PINNED_HOST] = pinned_mr->get_memory_available_cb(); } - auto statistics = Statistics::from_options(options); return std::make_shared( + std::move(statistics), std::move(mr), std::move(pinned_mr), std::move(mem_available), periodic_spill_check_from_options(options), - stream_pool_from_options(options), - std::move(statistics) + stream_pool_from_options(options) ); } @@ -267,7 +266,7 @@ SpillManager& BufferResource::spill_manager() { return spill_manager_; } -std::shared_ptr BufferResource::statistics() { +std::shared_ptr const& BufferResource::statistics() const noexcept { return statistics_; } diff --git a/cpp/src/progress_thread.cpp b/cpp/src/progress_thread.cpp index 42846b421..d4bbcbc02 100644 --- a/cpp/src/progress_thread.cpp +++ b/cpp/src/progress_thread.cpp @@ -115,7 +115,7 @@ bool ProgressThread::is_running() const { return active_; } -std::shared_ptr ProgressThread::statistics() const noexcept { +std::shared_ptr const& ProgressThread::statistics() const noexcept { return statistics_; } diff --git a/cpp/src/statistics.cpp b/cpp/src/statistics.cpp index df49a33a4..42156dc84 100644 --- a/cpp/src/statistics.cpp +++ b/cpp/src/statistics.cpp @@ -162,19 +162,24 @@ Statistics::~Statistics() noexcept { StreamOrderedTiming::cancel_inflight_timings(this); } -// TODO: remove this constructor and add a factory method, because -// make_shared(false) != Statistics::disabled() Statistics::Statistics(bool enabled) : enabled_{enabled} {} +std::shared_ptr Statistics::create(bool enabled) { + if (!enabled) { + return disabled(); + } + return std::shared_ptr(new Statistics(true)); +} + std::shared_ptr Statistics::from_options(config::Options options) { bool const statistics = options.get("statistics", [](auto const& s) { - return parse_string(s.empty() ? "False" : s); + return s.empty() ? false : parse_string(s); }); - return statistics ? std::make_shared(statistics) : Statistics::disabled(); + return statistics ? Statistics::create() : Statistics::disabled(); } std::shared_ptr Statistics::disabled() { - static std::shared_ptr ret = std::make_shared(false); + static std::shared_ptr ret{new Statistics(false)}; return ret; } @@ -251,15 +256,15 @@ void Statistics::clear() { } Statistics::MemoryRecorder::MemoryRecorder( - Statistics* stats, RmmResourceAdaptor mr, std::string name + std::shared_ptr stats, RmmResourceAdaptor mr, std::string name ) - : stats_{stats}, mr_{std::move(mr)}, name_{std::move(name)} { + : stats_{std::move(stats)}, mr_{std::move(mr)}, name_{std::move(name)} { RAPIDSMPF_EXPECTS(stats_ != nullptr, "the statistics cannot be null"); mr_->begin_scoped_memory_record(); } Statistics::MemoryRecorder::~MemoryRecorder() { - if (stats_ == nullptr) { + if (stats_ == nullptr || !stats_->enabled()) { return; } auto const scope = mr_->end_scoped_memory_record(); @@ -275,10 +280,10 @@ Statistics::MemoryRecorder Statistics::create_memory_recorder( std::optional mr, std::string name ) { auto* rma = get_optional_resource_as(mr); - if (!rma) { + if (!enabled() || !rma) { return MemoryRecorder{}; } - return MemoryRecorder{this, *rma, std::move(name)}; + return MemoryRecorder{shared_from_this(), *rma, std::move(name)}; } std::unordered_map const& @@ -485,7 +490,7 @@ void Statistics::write_json(std::filesystem::path const& filepath) const { std::shared_ptr Statistics::copy() const { std::lock_guard lock(mutex_); - auto ret = std::make_shared(enabled_.load(std::memory_order_acquire)); + auto ret = Statistics::create(enabled()); ret->stats_ = stats_; ret->report_entries_ = report_entries_; return ret; @@ -579,7 +584,7 @@ std::shared_ptr Statistics::deserialize(std::span(enabled != 0); + auto ret = Statistics::create(enabled != 0); std::uint64_t num_stats{}; data = read_pod(data, num_stats); @@ -656,7 +661,7 @@ std::shared_ptr Statistics::merge( bool const any_enabled = std::ranges::any_of(snapshots, [](auto const& s) { return s.enabled; }); - auto ret = std::make_shared(any_enabled); + auto ret = Statistics::create(any_enabled); for (auto const& snap : snapshots) { for (auto const& [name, stat] : snap.stats) { diff --git a/cpp/src/streaming/coll/allgather.cpp b/cpp/src/streaming/coll/allgather.cpp index 6d58bf40b..033e2ec0c 100644 --- a/cpp/src/streaming/coll/allgather.cpp +++ b/cpp/src/streaming/coll/allgather.cpp @@ -15,14 +15,12 @@ AllGather::AllGather( std::shared_ptr ctx, std::shared_ptr comm, OpID op_id ) : ctx_{std::move(ctx)}, - gatherer_{coll::AllGather( - std::move(comm), op_id, ctx_->br().get(), ctx_->statistics(), [this]() { - // Schedule waiters to resume on the executor. - // This doesn't resume the frame immediately so we don't have to track - // completion of this callback with a task_group. - event_.set(ctx_->executor()->get()); - } - )} {} + gatherer_{coll::AllGather(std::move(comm), op_id, ctx_->br().get(), [this]() { + // Schedule waiters to resume on the executor. + // This doesn't resume the frame immediately so we don't have to track + // completion of this callback with a task_group. + event_.set(ctx_->executor()->get()); + })} {} AllGather::~AllGather() noexcept { RAPIDSMPF_EXPECTS_FATAL( diff --git a/cpp/src/streaming/core/context.cpp b/cpp/src/streaming/core/context.cpp index ad8911eff..e42bb5c98 100644 --- a/cpp/src/streaming/core/context.cpp +++ b/cpp/src/streaming/core/context.cpp @@ -106,10 +106,14 @@ Context::Context( std::shared_ptr Context::from_options( RmmResourceAdaptor mr, std::shared_ptr logger, - config::Options options + config::Options options, + std::shared_ptr statistics ) { + RAPIDSMPF_EXPECTS(statistics != nullptr, "the statistics pointer cannot be NULL"); return std::make_shared( - options, std::move(logger), BufferResource::from_options(std::move(mr), options) + options, + std::move(logger), + BufferResource::from_options(std::move(mr), options, std::move(statistics)) ); } @@ -154,7 +158,7 @@ std::shared_ptr const& Context::memory( return memory_[static_cast(mem_type)]; } -std::shared_ptr Context::statistics() const noexcept { +std::shared_ptr const& Context::statistics() const noexcept { return br_->statistics(); } diff --git a/cpp/tests/main/mpi.cpp b/cpp/tests/main/mpi.cpp index a1fd1f2bf..4e23bde5e 100644 --- a/cpp/tests/main/mpi.cpp +++ b/cpp/tests/main/mpi.cpp @@ -7,6 +7,8 @@ #include #include +#include +#include #include "../environment.hpp" @@ -26,7 +28,11 @@ void Environment::SetUp() { options_ = rapidsmpf::config::Options(rapidsmpf::config::get_environment_variables()); comm_ = std::make_shared( - mpi_comm_, options_, std::make_shared() + mpi_comm_, + options_, + std::make_shared( + rapidsmpf::Statistics::from_options(options_) + ) ); } diff --git a/cpp/tests/main/single.cpp b/cpp/tests/main/single.cpp index a8c81ac2e..ddb672b4b 100644 --- a/cpp/tests/main/single.cpp +++ b/cpp/tests/main/single.cpp @@ -8,6 +8,7 @@ #include #include +#include #include "../environment.hpp" @@ -22,7 +23,10 @@ TestEnvironmentType Environment::type() const { void Environment::SetUp() { options_ = rapidsmpf::config::Options(rapidsmpf::config::get_environment_variables()); comm_ = std::make_shared( - options_, std::make_shared() + options_, + std::make_shared( + rapidsmpf::Statistics::from_options(options_) + ) ); split_comm_ = comm_; } diff --git a/cpp/tests/main/ucxx.cpp b/cpp/tests/main/ucxx.cpp index 82e891a58..9d3a5589a 100644 --- a/cpp/tests/main/ucxx.cpp +++ b/cpp/tests/main/ucxx.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include "../environment.hpp" @@ -36,7 +37,11 @@ void Environment::SetUp() { options_ = rapidsmpf::config::Options(rapidsmpf::config::get_environment_variables()); comm_ = rapidsmpf::ucxx::init_using_mpi( - MPI_COMM_WORLD, options_, std::make_shared() + MPI_COMM_WORLD, + options_, + std::make_shared( + rapidsmpf::Statistics::from_options(options_) + ) ); } diff --git a/cpp/tests/streaming/base_streaming_fixture.hpp b/cpp/tests/streaming/base_streaming_fixture.hpp index d6d04c44b..8b35b54fd 100644 --- a/cpp/tests/streaming/base_streaming_fixture.hpp +++ b/cpp/tests/streaming/base_streaming_fixture.hpp @@ -46,7 +46,10 @@ class BaseStreamingFixture : public ::testing::Test { stream = cudf::get_default_stream(); br = std::make_shared( - mr_cuda, rapidsmpf::PinnedMemoryResource::Disabled, memory_available + rapidsmpf::Statistics::disabled(), + mr_cuda, + rapidsmpf::PinnedMemoryResource::Disabled, + memory_available ); ctx = std::make_shared( std::move(options), GlobalEnvironment->comm_->logger(), br diff --git a/cpp/tests/streaming/test_channel_metadata.cpp b/cpp/tests/streaming/test_channel_metadata.cpp index 603ae2006..bfa958cbc 100644 --- a/cpp/tests/streaming/test_channel_metadata.cpp +++ b/cpp/tests/streaming/test_channel_metadata.cpp @@ -166,7 +166,9 @@ TEST_F(StreamingChannelMetadata, MessageRoundTrip) { class StreamingChannelMetadataGPU : public ::testing::Test { protected: rmm::cuda_stream_view stream{cudf::get_default_stream()}; - rapidsmpf::BufferResource br{cudf::get_current_device_resource_ref()}; + rapidsmpf::BufferResource br{ + rapidsmpf::Statistics::disabled(), cudf::get_current_device_resource_ref() + }; std::shared_ptr make_chunk(std::vector vals) { rmm::device_buffer buf(vals.data(), vals.size() * sizeof(int32_t), stream); diff --git a/cpp/tests/streaming/test_fanout.cpp b/cpp/tests/streaming/test_fanout.cpp index a9fb0f3de..d7774abf7 100644 --- a/cpp/tests/streaming/test_fanout.cpp +++ b/cpp/tests/streaming/test_fanout.cpp @@ -547,13 +547,14 @@ class SpillingStreamingFanout : public BaseStreamingFixture { { {MemoryType::DEVICE, []() -> std::int64_t { return 0; }}, }; - br = std::make_shared( - mr_cuda, rapidsmpf::PinnedMemoryResource::Disabled, memory_available + br = std::make_shared( + Statistics::disabled(), + mr_cuda, + PinnedMemoryResource::Disabled, + memory_available ); auto options = ctx->options(); - ctx = std::make_shared( - options, GlobalEnvironment->comm_->logger(), br - ); + ctx = std::make_shared(options, GlobalEnvironment->comm_->logger(), br); } }; diff --git a/cpp/tests/streaming/test_message.cpp b/cpp/tests/streaming/test_message.cpp index c34207896..9d9272fcd 100644 --- a/cpp/tests/streaming/test_message.cpp +++ b/cpp/tests/streaming/test_message.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -18,7 +19,9 @@ using namespace rapidsmpf::streaming; class StreamingMessage : public ::testing::Test { protected: void SetUp() override { - br = std::make_unique(cudf::get_current_device_resource_ref()); + br = std::make_unique( + Statistics::disabled(), cudf::get_current_device_resource_ref() + ); stream = cudf::get_default_stream(); } diff --git a/cpp/tests/streaming/test_table_chunk.cpp b/cpp/tests/streaming/test_table_chunk.cpp index b201e4ff3..3122e4e33 100644 --- a/cpp/tests/streaming/test_table_chunk.cpp +++ b/cpp/tests/streaming/test_table_chunk.cpp @@ -29,36 +29,22 @@ using namespace rapidsmpf; using namespace rapidsmpf::streaming; class StreamingTableChunk : public BaseStreamingFixture, - public ::testing::WithParamInterface { + public ::testing::WithParamInterface { protected: void SetUp() override { - rapidsmpf::config::Options options( - rapidsmpf::config::get_environment_variables() - ); + config::Options options(config::get_environment_variables()); - std::unordered_map - memory_available{}; - auto stream_pool = std::make_shared( - 16, rmm::cuda_stream::flags::non_blocking - ); stream = cudf::get_default_stream(); - br = std::make_shared( - mr_cuda, // device_mr - rapidsmpf::PinnedMemoryResource::make_if_available(), // pinned_mr - memory_available, // memory_available - std::chrono::milliseconds{1}, // periodic_spill_check - stream_pool, // stream_pool - Statistics::disabled() // statistics - ); - ctx = std::make_shared( - options, GlobalEnvironment->comm_->logger(), br + br = std::make_shared( + Statistics::disabled(), mr_cuda, PinnedMemoryResource::make_if_available() ); + ctx = std::make_shared(options, GlobalEnvironment->comm_->logger(), br); } rmm::cuda_stream_view stream; rmm::mr::cuda_memory_resource mr_cuda; - std::shared_ptr br; - std::shared_ptr ctx; + std::shared_ptr br; + std::shared_ptr ctx; }; TEST_F(StreamingTableChunk, FromTable) { diff --git a/cpp/tests/test_allgather.cpp b/cpp/tests/test_allgather.cpp index 4320cc1b8..2d6e5156a 100644 --- a/cpp/tests/test_allgather.cpp +++ b/cpp/tests/test_allgather.cpp @@ -34,7 +34,9 @@ class BaseAllGatherTest : public ::testing::Test { protected: void SetUp() override { stream = cudf::get_default_stream(); - br = std::make_unique(rmm::mr::cuda_memory_resource{}); + br = std::make_unique( + rapidsmpf::Statistics::disabled(), rmm::mr::cuda_memory_resource{} + ); } void TearDown() override { @@ -295,6 +297,7 @@ TEST_F(BaseAllGatherTest, opid_reuse) { if (this_rank == 0) { // Recreate the buffer resource and allgather with the delayed MR. delay_br = std::make_unique( + rapidsmpf::Statistics::disabled(), DelayedMemoryResource{br->device_mr(), std::chrono::milliseconds(500)} ); allgather = @@ -363,7 +366,9 @@ TEST_F(BaseAllGatherTest, opid_reuse) { TEST(PostBox, spill_uses_remaining_amount) { auto stream = cudf::get_default_stream(); auto mr = std::make_unique(); - auto br = std::make_unique(*mr); + auto br = std::make_unique( + rapidsmpf::Statistics::disabled(), *mr + ); rapidsmpf::coll::detail::PostBox postbox; diff --git a/cpp/tests/test_allreduce.cu b/cpp/tests/test_allreduce.cu index 2f047d204..65df2a6eb 100644 --- a/cpp/tests/test_allreduce.cu +++ b/cpp/tests/test_allreduce.cu @@ -28,6 +28,7 @@ #include #include #include +#include #include "environment.hpp" @@ -207,7 +208,9 @@ class BaseAllReduceTest : public ::testing::Test { protected: void SetUp() override { mr = std::make_unique(); - br = std::make_unique(*mr); + br = std::make_unique( + rapidsmpf::Statistics::disabled(), *mr + ); comm = GlobalEnvironment->comm_.get(); } diff --git a/cpp/tests/test_buffer.cpp b/cpp/tests/test_buffer.cpp index 7a5f3e4ac..3fb40adb2 100644 --- a/cpp/tests/test_buffer.cpp +++ b/cpp/tests/test_buffer.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include "utils.hpp" @@ -54,6 +55,7 @@ class BufferRebindStreamTest : public ::testing::TestWithParam { } br = std::make_unique( + Statistics::disabled(), cudf::get_current_device_resource_ref(), PinnedMemoryResource::make_if_available(), std::unordered_map{}, diff --git a/cpp/tests/test_buffer_resource.cpp b/cpp/tests/test_buffer_resource.cpp index 2545497b8..ddcd0ef2e 100644 --- a/cpp/tests/test_buffer_resource.cpp +++ b/cpp/tests/test_buffer_resource.cpp @@ -54,6 +54,7 @@ TEST(BufferResource, ReservationOverbooking) { // Create a buffer resource that always have 10 KiB of available device memory. auto dev_mem_available = []() -> std::int64_t { return 10_KiB; }; BufferResource br{ + Statistics::disabled(), cudf::get_current_device_resource_ref(), PinnedMemoryResource::Disabled, {{MemoryType::DEVICE, dev_mem_available}} @@ -122,6 +123,7 @@ TEST(BufferResource, ReservationReleasing) { // memory. auto dev_mem_available = []() -> std::int64_t { return 10_KiB; }; BufferResource br{ + Statistics::disabled(), cudf::get_current_device_resource_ref(), PinnedMemoryResource::Disabled, {{MemoryType::DEVICE, dev_mem_available}, {MemoryType::HOST, dev_mem_available}} @@ -177,7 +179,10 @@ TEST(BufferResource, LimitAvailableMemory) { // Create a buffer resource that limit available device memory to 10 KiB. LimitAvailableMemory dev_mem_available{mr, 10_KiB}; BufferResource br{ - mr, PinnedMemoryResource::Disabled, {{MemoryType::DEVICE, dev_mem_available}} + Statistics::disabled(), + mr, + PinnedMemoryResource::Disabled, + {{MemoryType::DEVICE, dev_mem_available}} }; EXPECT_EQ(dev_mem_available(), 10_KiB); EXPECT_EQ(br.memory_reserved(MemoryType::DEVICE), 0); @@ -262,7 +267,10 @@ TEST_P(PinnedMaxPoolSizeReservationLimitTest, TwoReservations) { ASSERT_NE(pinned_mr, PinnedMemoryResource::Disabled); BufferResource br{ - mr, pinned_mr, {{MemoryType::PINNED_HOST, pinned_mr->get_memory_available_cb()}} + Statistics::disabled(), + mr, + pinned_mr, + {{MemoryType::PINNED_HOST, pinned_mr->get_memory_available_cb()}} }; // First 1 KiB reservation always succeeds. @@ -289,15 +297,15 @@ INSTANTIATE_TEST_SUITE_P( TEST(BufferResource, AllocStatistics) { rmm::mr::cuda_memory_resource mr_cuda; RmmResourceAdaptor mr{mr_cuda}; - auto stats = std::make_shared(/* enable = */ true); + auto stats = Statistics::create(); auto pinned_mr = PinnedMemoryResource::make_if_available(); BufferResource br{ + stats, mr, pinned_mr, {}, std::nullopt, - std::make_shared(1, rmm::cuda_stream::flags::non_blocking), - stats + std::make_shared(1, rmm::cuda_stream::flags::non_blocking) }; auto stream = cudf::get_default_stream(); @@ -358,6 +366,7 @@ class BufferResourceReserveOrFailTest : public ::testing::Test { // host memory. mr = std::make_unique(rmm::mr::cuda_memory_resource{}); br = std::make_unique( + Statistics::disabled(), *mr, PinnedMemoryResource::Disabled, std::unordered_map{ @@ -422,7 +431,9 @@ TEST_F(BufferResourceReserveOrFailTest, MultipleTypes) { class BaseBufferResourceCopyTest : public ::testing::Test { protected: void SetUp() override { - br = std::make_unique(cudf::get_current_device_resource_ref()); + br = std::make_unique( + Statistics::disabled(), cudf::get_current_device_resource_ref() + ); stream = cudf::get_default_stream(); // initialize the host pattern @@ -644,11 +655,11 @@ class BufferResourceDifferentResourcesTest : public ::testing::Test { // Setup br1 with statistics for its device memory mr1 = std::make_unique(rmm::mr::cuda_memory_resource{}); - br1 = std::make_unique(*mr1); + br1 = std::make_unique(Statistics::disabled(), *mr1); // Setup br2 with statistics for its device memory mr2 = std::make_unique(rmm::mr::cuda_memory_resource{}); - br2 = std::make_unique(*mr2); + br2 = std::make_unique(Statistics::disabled(), *mr2); } std::unique_ptr create_source_buffer() { diff --git a/cpp/tests/test_chunk.cpp b/cpp/tests/test_chunk.cpp index 77af94a82..eb9bf7cfb 100644 --- a/cpp/tests/test_chunk.cpp +++ b/cpp/tests/test_chunk.cpp @@ -16,6 +16,7 @@ #include #include #include +#include using namespace rapidsmpf; using namespace rapidsmpf::shuffler; @@ -24,7 +25,9 @@ using namespace rapidsmpf::shuffler::detail; class ChunkTest : public ::testing::Test { protected: void SetUp() override { - br = std::make_unique(cudf::get_current_device_resource_ref()); + br = std::make_unique( + Statistics::disabled(), cudf::get_current_device_resource_ref() + ); stream = cudf::get_default_stream(); } diff --git a/cpp/tests/test_communicator.cpp b/cpp/tests/test_communicator.cpp index 055b5a71d..e17af018a 100644 --- a/cpp/tests/test_communicator.cpp +++ b/cpp/tests/test_communicator.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include "environment.hpp" #include "utils.hpp" @@ -24,7 +25,9 @@ class BaseCommunicatorTest : public ::testing::Test { void SetUp() override { comm = GlobalEnvironment->comm_.get(); mr = std::make_unique(); - br = std::make_unique(*mr); + br = std::make_unique( + rapidsmpf::Statistics::disabled(), *mr + ); stream = rmm::cuda_stream_default; } diff --git a/cpp/tests/test_config.cpp b/cpp/tests/test_config.cpp index 61d948172..0db3d9b9d 100644 --- a/cpp/tests/test_config.cpp +++ b/cpp/tests/test_config.cpp @@ -653,7 +653,7 @@ TEST(OptionsTest, BufferResourceFromOptionsCreatesInstanceWithExplicitOptions) { rmm::mr::cuda_memory_resource cuda_mr; RmmResourceAdaptor mr{cuda_mr}; - auto br = BufferResource::from_options(mr, opts); + auto br = BufferResource::from_options(mr, opts, Statistics::from_options(opts)); EXPECT_TRUE(br->statistics()->enabled()); EXPECT_EQ(br->stream_pool().get_pool_size(), 8); @@ -666,7 +666,7 @@ TEST(OptionsTest, BufferResourceFromOptionsUsesDefaultWhenOptionsEmpty) { rmm::mr::cuda_memory_resource cuda_mr; RmmResourceAdaptor mr{cuda_mr}; - auto br = BufferResource::from_options(mr, opts); + auto br = BufferResource::from_options(mr, opts, Statistics::from_options(opts)); EXPECT_FALSE(br->statistics()->enabled()); EXPECT_EQ(br->stream_pool().get_pool_size(), 16); auto [_, total_mem] = rmm::available_device_memory(); @@ -681,7 +681,7 @@ TEST(OptionsTest, BufferResourceFromOptionsEnablesStatisticsWhenRequested) { rmm::mr::cuda_memory_resource cuda_mr; RmmResourceAdaptor mr{cuda_mr}; - auto br = BufferResource::from_options(mr, opts); + auto br = BufferResource::from_options(mr, opts, Statistics::from_options(opts)); EXPECT_TRUE(br->statistics()->enabled()); } @@ -694,7 +694,7 @@ TEST(OptionsTest, BufferResourceFromOptionsAcceptsPercentageForDeviceLimit) { rmm::mr::cuda_memory_resource cuda_mr; RmmResourceAdaptor mr{cuda_mr}; - auto br = BufferResource::from_options(mr, opts); + auto br = BufferResource::from_options(mr, opts, Statistics::from_options(opts)); // Verify device memory limit is 50% of total auto [_, total_mem] = rmm::available_device_memory(); @@ -713,7 +713,7 @@ TEST(OptionsTest, BufferResourceFromOptionsEnablesPinnedMemoryWhenSupported) { rmm::mr::cuda_memory_resource cuda_mr; RmmResourceAdaptor mr{cuda_mr}; - auto br = BufferResource::from_options(mr, opts); + auto br = BufferResource::from_options(mr, opts, Statistics::from_options(opts)); // Should not throw when accessing pinned_mr EXPECT_NO_THROW(std::ignore = br->pinned_mr()); @@ -729,9 +729,9 @@ TEST(OptionsTest, ContextFromOptionsCreatesInstanceWithExplicitOptions) { rmm::mr::cuda_memory_resource cuda_mr; RmmResourceAdaptor mr{cuda_mr}; - auto comm = - std::make_shared(opts, std::make_shared()); - auto ctx = streaming::Context::from_options(mr, comm->logger(), opts); + auto stats = Statistics::from_options(opts); + auto comm = std::make_shared(opts, std::make_shared(stats)); + auto ctx = streaming::Context::from_options(mr, comm->logger(), opts, stats); ASSERT_NE(ctx, nullptr); EXPECT_TRUE(ctx->statistics()->enabled()); @@ -743,9 +743,9 @@ TEST(OptionsTest, ContextFromOptionsUsesDefaultWhenOptionsEmpty) { rmm::mr::cuda_memory_resource cuda_mr; RmmResourceAdaptor mr{cuda_mr}; - auto comm = - std::make_shared(opts, std::make_shared()); - auto ctx = streaming::Context::from_options(mr, comm->logger(), opts); + auto stats = Statistics::from_options(opts); + auto comm = std::make_shared(opts, std::make_shared(stats)); + auto ctx = streaming::Context::from_options(mr, comm->logger(), opts, stats); ASSERT_NE(ctx, nullptr); EXPECT_FALSE(ctx->statistics()->enabled()); @@ -758,9 +758,9 @@ TEST(OptionsTest, ContextFromOptionsEnablesStatisticsWhenRequested) { rmm::mr::cuda_memory_resource cuda_mr; RmmResourceAdaptor mr{cuda_mr}; - auto comm = - std::make_shared(opts, std::make_shared()); - auto ctx = streaming::Context::from_options(mr, comm->logger(), opts); + auto stats = Statistics::from_options(opts); + auto comm = std::make_shared(opts, std::make_shared(stats)); + auto ctx = streaming::Context::from_options(mr, comm->logger(), opts, stats); ASSERT_NE(ctx, nullptr); EXPECT_TRUE(ctx->statistics()->enabled()); @@ -771,9 +771,9 @@ TEST(OptionsTest, ContextFromOptionsCreatesProgressThread) { rmm::mr::cuda_memory_resource cuda_mr; RmmResourceAdaptor mr{cuda_mr}; - auto comm = - std::make_shared(opts, std::make_shared()); - auto ctx = streaming::Context::from_options(mr, comm->logger(), opts); + auto stats = Statistics::from_options(opts); + auto comm = std::make_shared(opts, std::make_shared(stats)); + auto ctx = streaming::Context::from_options(mr, comm->logger(), opts, stats); ASSERT_NE(ctx, nullptr); } @@ -783,9 +783,9 @@ TEST(OptionsTest, ContextFromOptionsCreatesExecutor) { rmm::mr::cuda_memory_resource cuda_mr; RmmResourceAdaptor mr{cuda_mr}; - auto comm = - std::make_shared(opts, std::make_shared()); - auto ctx = streaming::Context::from_options(mr, comm->logger(), opts); + auto stats = Statistics::from_options(opts); + auto comm = std::make_shared(opts, std::make_shared(stats)); + auto ctx = streaming::Context::from_options(mr, comm->logger(), opts, stats); ASSERT_NE(ctx, nullptr); EXPECT_NE(ctx->executor(), nullptr); diff --git a/cpp/tests/test_metadata_payload_exchange.cpp b/cpp/tests/test_metadata_payload_exchange.cpp index d6b166d0a..8c4b70fc2 100644 --- a/cpp/tests/test_metadata_payload_exchange.cpp +++ b/cpp/tests/test_metadata_payload_exchange.cpp @@ -27,9 +27,9 @@ class MetadataPayloadExchangeTest : public ::testing::Test { void SetUp() override { comm = GlobalEnvironment->comm_.get(); mr = std::make_unique(); - br = std::make_unique(*mr); + statistics = Statistics::create(); + br = std::make_unique(statistics, *mr); stream = rmm::cuda_stream_default; - statistics = std::make_shared(); auto allocate_fn = [this](std::size_t size) { return allocate_receive_buffer(size); diff --git a/cpp/tests/test_partition.cpp b/cpp/tests/test_partition.cpp index 9bae901b4..d59215eda 100644 --- a/cpp/tests/test_partition.cpp +++ b/cpp/tests/test_partition.cpp @@ -44,7 +44,7 @@ TEST_P(NumOfPartitions, partition_and_pack) { std::int64_t const seed = 42; cudf::hash_id const hash_fn = cudf::hash_id::HASH_MURMUR3; auto stream = cudf::get_default_stream(); - rapidsmpf::BufferResource br{mr()}; + BufferResource br{Statistics::disabled(), mr()}; cudf::table expect = random_table_with_index(seed, static_cast(num_rows), 0, 10); @@ -72,7 +72,7 @@ TEST_P(NumOfPartitions, split_and_pack) { int const num_rows = std::get<1>(GetParam()); std::int64_t const seed = 42; auto stream = cudf::get_default_stream(); - rapidsmpf::BufferResource br{cudf::get_current_device_resource_ref()}; + BufferResource br{Statistics::disabled(), cudf::get_current_device_resource_ref()}; cudf::table expect = random_table_with_index(seed, num_rows, 0, 10); @@ -99,7 +99,9 @@ TEST_P(NumOfPartitions, split_and_pack) { class SpillingTest : public ::testing::Test { protected: void SetUp() override { - br = std::make_unique(cudf::get_current_device_resource_ref()); + br = std::make_unique( + Statistics::disabled(), cudf::get_current_device_resource_ref() + ); stream = cudf::get_default_stream(); } diff --git a/cpp/tests/test_progress_thread.cpp b/cpp/tests/test_progress_thread.cpp index 38d8e8e9c..82a3dde1c 100644 --- a/cpp/tests/test_progress_thread.cpp +++ b/cpp/tests/test_progress_thread.cpp @@ -41,7 +41,7 @@ TEST_P(ProgressThreadEvents, events) { std::size_t const num_functions = std::get<1>(GetParam()); bool const enable_statistics = std::get<2>(GetParam()); - auto statistics = std::make_shared(enable_statistics); + auto statistics = rapidsmpf::Statistics::create(enable_statistics); std::vector> progress_threads; std::vector>> test_functions(num_threads); @@ -86,7 +86,7 @@ TEST_P(ProgressThreadEvents, events) { } TEST(ProgressThreadTests, RemoveFunctionWithDelayedPause) { - ProgressThread progress_thread{}; + ProgressThread progress_thread{rapidsmpf::Statistics::disabled()}; // add a function to the progress thread that never completes auto id = progress_thread.add_function([] { diff --git a/cpp/tests/test_shuffler.cpp b/cpp/tests/test_shuffler.cpp index 51f7030f0..8966bf130 100644 --- a/cpp/tests/test_shuffler.cpp +++ b/cpp/tests/test_shuffler.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include "environment.hpp" @@ -32,7 +33,9 @@ extern Environment* GlobalEnvironment; TEST(ReceivedChunks, spill_skips_control_messages) { auto mr = cudf::get_current_device_resource_ref(); - auto br = std::make_unique(mr); + auto br = std::make_unique( + rapidsmpf::Statistics::disabled(), mr + ); rapidsmpf::shuffler::detail::ReceivedChunks received; @@ -49,7 +52,9 @@ TEST(ReceivedChunks, spill_skips_control_messages) { TEST(ReceivedChunks, spill_respects_amount) { auto mr = cudf::get_current_device_resource_ref(); - auto br = std::make_unique(mr); + auto br = std::make_unique( + rapidsmpf::Statistics::disabled(), mr + ); auto stream = cudf::get_default_stream(); rapidsmpf::shuffler::detail::ReceivedChunks received; @@ -75,7 +80,9 @@ TEST(ReceivedChunks, spill_respects_amount) { TEST(MetadataMessage, round_trip) { auto stream = cudf::get_default_stream(); auto mr = cudf::get_current_device_resource_ref(); - auto br = std::make_unique(mr); + auto br = std::make_unique( + rapidsmpf::Statistics::disabled(), mr + ); auto metadata = iota_vector(100); @@ -239,7 +246,10 @@ class MemoryAvailable_NumPartition total_num_partitions = std::get<1>(GetParam()); total_num_rows = std::get<2>(GetParam()); br = std::make_unique( - mr(), rapidsmpf::PinnedMemoryResource::Disabled, memory_available + rapidsmpf::Statistics::disabled(), + mr(), + rapidsmpf::PinnedMemoryResource::Disabled, + memory_available ); shuffler = std::make_unique( @@ -310,7 +320,9 @@ class ConcurrentShuffleTest static_cast(std::get<1>(GetParam())); // these resources will be used by multiple threads to instantiate shufflers - br = std::make_unique(mr()); + br = std::make_unique( + rapidsmpf::Statistics::disabled(), mr() + ); stream = cudf::get_default_stream(); } @@ -411,6 +423,7 @@ TEST(Shuffler, SpillOnInsertAndExtraction) { // through the variable `device_memory_available`. std::int64_t device_memory_available{0}; rapidsmpf::BufferResource br{ + rapidsmpf::Statistics::disabled(), mr, rapidsmpf::PinnedMemoryResource::Disabled, {{rapidsmpf::MemoryType::DEVICE, @@ -678,7 +691,9 @@ TEST(Shuffler, ShutdownWhilePaused) { auto progress_thread = GlobalEnvironment->comm_->progress_thread(); auto mr = cudf::get_current_device_resource_ref(); - auto br = std::make_unique(mr); + auto br = std::make_unique( + rapidsmpf::Statistics::disabled(), mr + ); auto shuffler = rapidsmpf::shuffler::Shuffler(GlobalEnvironment->comm_, 0, 1, br.get()); @@ -713,7 +728,9 @@ class ExtractEmptyPartitionsTest : public cudf::test::BaseFixture { void SetUp() override { stream = cudf::get_default_stream(); - br = std::make_unique(mr()); + br = std::make_unique( + rapidsmpf::Statistics::disabled(), mr() + ); shuffler = std::make_unique( GlobalEnvironment->comm_, 0, nparts, br.get() @@ -812,7 +829,9 @@ TEST_F(ExtractEmptyPartitionsTest, SomeEmptyAndNonEmptyInsertions) { TEST(ShufflerTest, multiple_shutdowns) { auto& comm = GlobalEnvironment->comm_; - rapidsmpf::BufferResource br(cudf::get_current_device_resource_ref()); + rapidsmpf::BufferResource br( + rapidsmpf::Statistics::disabled(), cudf::get_current_device_resource_ref() + ); auto shuffler = std::make_unique(comm, 0, comm->nranks(), &br); @@ -836,7 +855,9 @@ TEST(ShufflerTest, multiple_shutdowns) { TEST(Shuffler, concurrent_wait) { auto const& comm = GlobalEnvironment->comm_; auto stream = cudf::get_default_stream(); - rapidsmpf::BufferResource br(cudf::get_current_device_resource_ref()); + rapidsmpf::BufferResource br( + rapidsmpf::Statistics::disabled(), cudf::get_current_device_resource_ref() + ); // Use more partitions than ranks so each rank owns multiple partitions, ensuring // multiple threads call wait() concurrently on the same shuffler. @@ -940,7 +961,9 @@ TEST(Shuffler, opid_reuse) { constexpr auto wait_timeout = std::chrono::seconds{30}; rmm::mr::cuda_memory_resource mr; - auto br = std::make_unique(mr); + auto br = std::make_unique( + rapidsmpf::Statistics::disabled(), mr + ); // On rank 0, wrap the device MR with a delayed version for the shuffler. std::unique_ptr delayed_mr; @@ -949,7 +972,9 @@ TEST(Shuffler, opid_reuse) { if (comm->rank() == 0) { delayed_mr = std::make_unique(mr, std::chrono::milliseconds(500)); - delayed_br = std::make_unique(*delayed_mr); + delayed_br = std::make_unique( + rapidsmpf::Statistics::disabled(), *delayed_mr + ); shuffler_br = delayed_br.get(); } @@ -1047,7 +1072,9 @@ TEST(Shuffler, opid_reuse_with_empty_partitions) { constexpr auto wait_timeout = std::chrono::seconds{30}; rmm::mr::cuda_memory_resource mr; - auto br = std::make_unique(mr); + auto br = std::make_unique( + rapidsmpf::Statistics::disabled(), mr + ); // On rank 0, wrap the device MR with a delayed version for the shuffler. std::unique_ptr delayed_mr; @@ -1056,7 +1083,9 @@ TEST(Shuffler, opid_reuse_with_empty_partitions) { if (comm->rank() == 0) { delayed_mr = std::make_unique(mr, std::chrono::milliseconds(500)); - delayed_br = std::make_unique(*delayed_mr); + delayed_br = std::make_unique( + rapidsmpf::Statistics::disabled(), *delayed_mr + ); shuffler_br = delayed_br.get(); } diff --git a/cpp/tests/test_shuffler_many_streams.cpp b/cpp/tests/test_shuffler_many_streams.cpp index a42fea215..ea024b0ae 100644 --- a/cpp/tests/test_shuffler_many_streams.cpp +++ b/cpp/tests/test_shuffler_many_streams.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include "environment.hpp" #include "utils.hpp" @@ -50,7 +51,9 @@ TEST(ShufflerManyStreams, Test) { std::mt19937 random_generator{42}; constexpr std::size_t chunksize = 1 << 20; constexpr int num_partitions = 100; - auto br = std::make_unique(cudf::get_current_device_resource_ref()); + auto br = std::make_unique( + Statistics::disabled(), cudf::get_current_device_resource_ref() + ); // Create a CUDA stream for each partition. // To stress-test stream handling, assign random priorities so streams are more diff --git a/cpp/tests/test_sparse_alltoall.cpp b/cpp/tests/test_sparse_alltoall.cpp index ad575680d..5fc483d86 100644 --- a/cpp/tests/test_sparse_alltoall.cpp +++ b/cpp/tests/test_sparse_alltoall.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include "environment.hpp" @@ -105,7 +106,9 @@ int decode_payload(rapidsmpf::PackedData const& packed_data) { class SparseAlltoallTest : public ::testing::Test { protected: void SetUp() override { - br = std::make_unique(rmm::mr::cuda_memory_resource{}); + br = std::make_unique( + rapidsmpf::Statistics::disabled(), rmm::mr::cuda_memory_resource{} + ); } std::unique_ptr br; diff --git a/cpp/tests/test_spill_manager.cpp b/cpp/tests/test_spill_manager.cpp index e4cbc541e..9bf9d9d82 100644 --- a/cpp/tests/test_spill_manager.cpp +++ b/cpp/tests/test_spill_manager.cpp @@ -27,6 +27,7 @@ TEST(SpillManager, SpillFunction) { // Create a buffer resource that report `mem_available` as the available memory. std::int64_t mem_available = 10_KiB; BufferResource br{ + Statistics::disabled(), cudf::get_current_device_resource_ref(), rapidsmpf::PinnedMemoryResource::Disabled, {{MemoryType::DEVICE, diff --git a/cpp/tests/test_statistics.cpp b/cpp/tests/test_statistics.cpp index 2d317fed2..d4211598e 100644 --- a/cpp/tests/test_statistics.cpp +++ b/cpp/tests/test_statistics.cpp @@ -37,43 +37,43 @@ class StatisticsTest : public ::testing::Test { }; TEST_F(StatisticsTest, Disabled) { - rapidsmpf::Statistics stats(false); - EXPECT_FALSE(stats.enabled()); + auto stats = rapidsmpf::Statistics::create(false); + EXPECT_FALSE(stats->enabled()); // Disabed statistics is a no-op. - stats.add_bytes_stat("name", 1); - EXPECT_THROW(stats.get_stat("name"), std::out_of_range); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("Statistics: disabled")); + stats->add_bytes_stat("name", 1); + EXPECT_THROW(stats->get_stat("name"), std::out_of_range); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("Statistics: disabled")); } TEST_F(StatisticsTest, Communication) { - rapidsmpf::Statistics stats; - EXPECT_TRUE(stats.enabled()); + auto stats = rapidsmpf::Statistics::create(); + EXPECT_TRUE(stats->enabled()); - EXPECT_THROW(stats.get_stat("unknown-name"), std::out_of_range); + EXPECT_THROW(stats->get_stat("unknown-name"), std::out_of_range); // Default-formatted stat (no report entry needed). - stats.add_stat("plain-stat", 10); - stats.add_stat("plain-stat", 1); - EXPECT_EQ(stats.get_stat("plain-stat").count(), 2); - EXPECT_EQ(stats.get_stat("plain-stat").value(), 11); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("plain-stat")); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("11 (count 2)")); + stats->add_stat("plain-stat", 10); + stats->add_stat("plain-stat", 1); + EXPECT_EQ(stats->get_stat("plain-stat").count(), 2); + EXPECT_EQ(stats->get_stat("plain-stat").value(), 11); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("plain-stat")); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("11 (count 2)")); - stats.add_bytes_stat("byte-statistics", 20); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("byte-statistics")); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("20 B")); + stats->add_bytes_stat("byte-statistics", 20); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("byte-statistics")); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("20 B")); } TEST_F(StatisticsTest, AddReportEntryArityMismatchThrowsOnRender) { - rapidsmpf::Statistics stats; + auto stats = rapidsmpf::Statistics::create(); // MemoryThroughput expects 3 stats; passing one is accepted at registration but // fails when report() tries to render the entry. - stats.add_report_entry( + stats->add_report_entry( "bad", {"only-one"}, rapidsmpf::Statistics::Formatter::MemoryThroughput ); - stats.add_stat("only-one", 1.0); - EXPECT_THROW(std::ignore = stats.report(), std::out_of_range); + stats->add_stat("only-one", 1.0); + EXPECT_THROW(std::ignore = stats->report(), std::out_of_range); } TEST_F(StatisticsTest, StatMax) { @@ -91,73 +91,73 @@ TEST_F(StatisticsTest, StatMax) { } TEST_F(StatisticsTest, AddReportEntryFirstWins) { - rapidsmpf::Statistics stats; + auto stats = rapidsmpf::Statistics::create(); // The first add_report_entry wins: a Default (count-aware) entry stays // in place even after add_bytes_stat tries to upgrade it to Bytes. - stats.add_report_entry( + stats->add_report_entry( "my-bytes", {"my-bytes"}, rapidsmpf::Statistics::Formatter::Default ); - stats.add_bytes_stat("my-bytes", 1024); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("my-bytes")); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("1024")); - EXPECT_THAT(stats.report(), ::testing::Not(::testing::HasSubstr("KiB"))); + stats->add_bytes_stat("my-bytes", 1024); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("my-bytes")); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("1024")); + EXPECT_THAT(stats->report(), ::testing::Not(::testing::HasSubstr("KiB"))); } TEST_F(StatisticsTest, MultiStatReportEntry) { - rapidsmpf::Statistics stats; + auto stats = rapidsmpf::Statistics::create(); // Build a MemoryThroughput-style 3-stat report entry. - stats.add_report_entry( + stats->add_report_entry( "copy-summary", {"copy-summary-bytes", "copy-summary-time", "copy-summary-stream-delay"}, rapidsmpf::Statistics::Formatter::MemoryThroughput ); - stats.add_stat("copy-summary-bytes", 1024 * 1024); - stats.add_stat("copy-summary-time", 0.001); - stats.add_stat("copy-summary-stream-delay", 0.0001); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("copy-summary")); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("1 MiB")); + stats->add_stat("copy-summary-bytes", 1024 * 1024); + stats->add_stat("copy-summary-time", 0.001); + stats->add_stat("copy-summary-stream-delay", 0.0001); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("copy-summary")); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("1 MiB")); // Component stats are consumed by the report entry and don't emit // their own lines. EXPECT_THAT( - stats.report(), ::testing::Not(::testing::HasSubstr("copy-summary-bytes")) + stats->report(), ::testing::Not(::testing::HasSubstr("copy-summary-bytes")) ); } TEST_F(StatisticsTest, ReportNoDataCollected) { - rapidsmpf::Statistics stats; - stats.add_report_entry( + auto stats = rapidsmpf::Statistics::create(); + stats->add_report_entry( "spill-summary", {"spill-bytes", "spill-time", "spill-delay"}, rapidsmpf::Statistics::Formatter::MemoryThroughput ); // No stats recorded — entry should still appear with "No data collected". - EXPECT_THAT(stats.report(), ::testing::HasSubstr("spill-summary")); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("No data collected")); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("spill-summary")); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("No data collected")); // Adding only one of the three required stats still yields "No data collected". - stats.add_stat("spill-bytes", 1024 * 1024); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("No data collected")); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("spill-bytes")); // uncovered + stats->add_stat("spill-bytes", 1024 * 1024); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("No data collected")); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("spill-bytes")); // uncovered } TEST_F(StatisticsTest, ReportSorting) { - rapidsmpf::Statistics stats; + auto stats = rapidsmpf::Statistics::create(); - stats.add_stat("banana", 2); - stats.add_report_entry( + stats->add_stat("banana", 2); + stats->add_report_entry( "banana", {"banana"}, rapidsmpf::Statistics::Formatter::Default ); - stats.add_stat("cherry", 3); - stats.add_report_entry( + stats->add_stat("cherry", 3); + stats->add_report_entry( "cherry", {"cherry"}, rapidsmpf::Statistics::Formatter::Default ); // Uncovered raw stats for "apple" and "date". - stats.add_stat("apple", 1); - stats.add_stat("date", 4); + stats->add_stat("apple", 1); + stats->add_stat("date", 4); - auto const r = stats.report(); + auto const r = stats->report(); auto const pos_apple = r.find("apple"); auto const pos_banana = r.find("banana"); auto const pos_cherry = r.find("cherry"); @@ -176,12 +176,12 @@ TEST_F(StatisticsTest, ReportSorting) { TEST_F(StatisticsTest, MemoryProfiler) { rapidsmpf::RmmResourceAdaptor mr{cudf::get_current_device_resource_ref()}; auto pinned_mr = rapidsmpf::PinnedMemoryResource::make_if_available(); - rapidsmpf::Statistics stats; + auto stats = rapidsmpf::Statistics::create(); auto stream = cudf::get_default_stream(); // Outer scope { - auto outer = stats.create_memory_recorder(mr, "outer"); + auto outer = stats->create_memory_recorder(mr, "outer"); void* ptr1 = mr.allocate_sync(1_MiB); // +1 MiB void* ptr2 = mr.allocate_sync(1_MiB); // +2 MiB mr.deallocate_sync(ptr1, 1_MiB); @@ -189,7 +189,7 @@ TEST_F(StatisticsTest, MemoryProfiler) { // Nested scope { - auto inner = stats.create_memory_recorder(mr, "inner"); + auto inner = stats->create_memory_recorder(mr, "inner"); void* ptr3 = mr.allocate_sync(1_MiB); // +1 MiB mr.deallocate_sync(ptr3, 1_MiB); } @@ -205,7 +205,7 @@ TEST_F(StatisticsTest, MemoryProfiler) { } stream.synchronize(); } - auto const& records = stats.get_memory_records(); + auto const& records = stats->get_memory_records(); // Verify outer EXPECT_EQ(records.at("outer").num_calls, 1); @@ -221,7 +221,7 @@ TEST_F(StatisticsTest, MemoryProfiler) { // We can call the same name multiple times. { - auto outer = stats.create_memory_recorder(mr, "outer"); + auto outer = stats->create_memory_recorder(mr, "outer"); mr.deallocate_sync(mr.allocate_sync(1_MiB), 1_MiB); } EXPECT_EQ(records.at("outer").num_calls, 2); @@ -229,7 +229,7 @@ TEST_F(StatisticsTest, MemoryProfiler) { EXPECT_EQ(records.at("outer").scoped.peak(), 2_MiB); EXPECT_EQ(records.at("outer").scoped.total(), 4_MiB); - auto const report = stats.report({.mr = mr, .pinned_mr = pinned_mr}); + auto const report = stats->report({.mr = mr, .pinned_mr = pinned_mr}); // Split the report on newlines and find the "main" record line. std::string main_line, pinned_line; @@ -270,11 +270,14 @@ TEST_F(StatisticsTest, MemoryProfiler) { TEST_F(StatisticsTest, MemoryProfilerDisabled) { rapidsmpf::RmmResourceAdaptor mr{cudf::get_current_device_resource_ref()}; - rapidsmpf::Statistics stats(false); - + auto stats = rapidsmpf::Statistics::create(false); + { + auto const& records = stats->get_memory_records(); + EXPECT_TRUE(records.empty()); + } // Outer scope — pass nullopt so the recorder is a no-op. { - auto outer = stats.create_memory_recorder(std::nullopt, "outer"); + auto outer = stats->create_memory_recorder(std::nullopt, "outer"); void* ptr1 = mr.allocate_sync(1_MiB); // +1 MiB void* ptr2 = mr.allocate_sync(1_MiB); // +2 MiB mr.deallocate_sync(ptr1, 1_MiB); @@ -282,23 +285,23 @@ TEST_F(StatisticsTest, MemoryProfilerDisabled) { // Nested scope { - auto inner = stats.create_memory_recorder(std::nullopt, "inner"); + auto inner = stats->create_memory_recorder(std::nullopt, "inner"); void* ptr3 = mr.allocate_sync(1_MiB); // +1 MiB mr.deallocate_sync(ptr3, 1_MiB); } } - auto const& records = stats.get_memory_records(); + auto const& records = stats->get_memory_records(); EXPECT_TRUE(records.empty()); } TEST_F(StatisticsTest, MemoryProfilerMacro) { rapidsmpf::RmmResourceAdaptor mr{cudf::get_current_device_resource_ref()}; - rapidsmpf::Statistics stats; + auto stats = rapidsmpf::Statistics::create(); { RAPIDSMPF_MEMORY_PROFILE(stats, mr); mr.deallocate_sync(mr.allocate_sync(1_MiB), 1_MiB); } - auto const& records = stats.get_memory_records(); + auto const& records = stats->get_memory_records(); ASSERT_EQ(records.size(), 1); auto const& entry = *records.begin(); EXPECT_TRUE(entry.first.find("test_statistics.cpp") != std::string::npos); @@ -308,23 +311,23 @@ TEST_F(StatisticsTest, MemoryProfilerMacro) { TEST_F(StatisticsTest, MemoryProfilerMacroDisabled) { rapidsmpf::RmmResourceAdaptor mr{cudf::get_current_device_resource_ref()}; - rapidsmpf::Statistics stats(false); + auto stats = rapidsmpf::Statistics::create(false); { RAPIDSMPF_MEMORY_PROFILE(stats, std::nullopt); mr.deallocate_sync(mr.allocate_sync(1_MiB), 1_MiB); } - auto const& records = stats.get_memory_records(); + auto const& records = stats->get_memory_records(); EXPECT_TRUE(records.empty()); } TEST_F(StatisticsTest, JsonStream) { - rapidsmpf::Statistics stats; - stats.add_stat("foo", 10.0); - stats.add_stat("foo", 5.0); // count=2, value=15, max=10 - stats.add_bytes_stat("bar", 1024); + auto stats = rapidsmpf::Statistics::create(); + stats->add_stat("foo", 10.0); + stats->add_stat("foo", 5.0); // count=2, value=15, max=10 + stats->add_bytes_stat("bar", 1024); std::ostringstream ss; - stats.write_json(ss); + stats->write_json(ss); auto const& s = ss.str(); EXPECT_THAT(s, ::testing::HasSubstr(R"("foo")")); @@ -339,31 +342,31 @@ TEST_F(StatisticsTest, JsonStream) { } TEST_F(StatisticsTest, InvalidStatNames) { - rapidsmpf::Statistics stats; - stats.add_stat("has\"quote", 1.0); - stats.add_stat("has\\backslash", 2.0); + auto stats = rapidsmpf::Statistics::create(); + stats->add_stat("has\"quote", 1.0); + stats->add_stat("has\\backslash", 2.0); std::ostringstream ss; - EXPECT_THROW(stats.write_json(ss), std::invalid_argument); + EXPECT_THROW(stats->write_json(ss), std::invalid_argument); } TEST_F(StatisticsTest, InvalidMemoryRecordNames) { rapidsmpf::RmmResourceAdaptor mr{cudf::get_current_device_resource_ref()}; - rapidsmpf::Statistics stats; - std::ignore = stats.create_memory_recorder(mr, "bad\"name"); + auto stats = rapidsmpf::Statistics::create(); + std::ignore = stats->create_memory_recorder(mr, "bad\"name"); std::ostringstream ss; - EXPECT_THROW(stats.write_json(ss), std::invalid_argument); + EXPECT_THROW(stats->write_json(ss), std::invalid_argument); } TEST_F(StatisticsTest, JsonMemoryRecords) { rapidsmpf::RmmResourceAdaptor mr{cudf::get_current_device_resource_ref()}; - rapidsmpf::Statistics stats; + auto stats = rapidsmpf::Statistics::create(); { - auto rec = stats.create_memory_recorder(mr, "alloc"); + auto rec = stats->create_memory_recorder(mr, "alloc"); mr.deallocate_sync(mr.allocate_sync(1_MiB), 1_MiB); } std::ostringstream ss; - stats.write_json(ss); + stats->write_json(ss); auto const& s = ss.str(); EXPECT_THAT(s, ::testing::HasSubstr("memory_records")); @@ -375,14 +378,14 @@ TEST_F(StatisticsTest, JsonMemoryRecords) { } TEST_F(StatisticsTest, JsonReport) { - rapidsmpf::Statistics stats; - stats.add_stat("foo", 10.0); - stats.add_stat("foo", 5.0); // count=2, value=15, max=10 - stats.add_bytes_stat("bar", 1024); + auto stats = rapidsmpf::Statistics::create(); + stats->add_stat("foo", 10.0); + stats->add_stat("foo", 5.0); // count=2, value=15, max=10 + stats->add_bytes_stat("bar", 1024); TempDir tmp_dir; auto const path = tmp_dir.path() / "stats.json"; - stats.write_json(path); + stats->write_json(path); std::ifstream f(path); ASSERT_TRUE(f.is_open()); @@ -391,7 +394,7 @@ TEST_F(StatisticsTest, JsonReport) { ); std::ostringstream ss; - stats.write_json(ss); + stats->write_json(ss); EXPECT_EQ(file_contents, ss.str()); } @@ -403,23 +406,23 @@ TEST_F(StatisticsTest, StatConstructor) { } TEST_F(StatisticsTest, SerializeRoundTrip) { - rapidsmpf::Statistics stats; - stats.add_stat("alpha", 10.0); - stats.add_stat("alpha", 5.0); // count=2, value=15, max=10 - stats.add_stat("beta", 3.0); + auto stats = rapidsmpf::Statistics::create(); + stats->add_stat("alpha", 10.0); + stats->add_stat("alpha", 5.0); // count=2, value=15, max=10 + stats->add_stat("beta", 3.0); - auto const bytes = stats.serialize(); + auto const bytes = stats->serialize(); auto deserialized = rapidsmpf::Statistics::deserialize(bytes); EXPECT_TRUE(deserialized->enabled()); - EXPECT_EQ(deserialized->get_stat("alpha"), stats.get_stat("alpha")); - EXPECT_EQ(deserialized->get_stat("beta"), stats.get_stat("beta")); + EXPECT_EQ(deserialized->get_stat("alpha"), stats->get_stat("alpha")); + EXPECT_EQ(deserialized->get_stat("beta"), stats->get_stat("beta")); EXPECT_EQ(deserialized->list_stat_names().size(), 2); } TEST_F(StatisticsTest, SerializeEmpty) { - rapidsmpf::Statistics stats; - auto const bytes = stats.serialize(); + auto stats = rapidsmpf::Statistics::create(); + auto const bytes = stats->serialize(); auto deserialized = rapidsmpf::Statistics::deserialize(bytes); EXPECT_TRUE(deserialized->enabled()); @@ -467,38 +470,38 @@ TEST_F(StatisticsTest, DeserializeRejectsOutOfRangeFormatter) { TEST_F(StatisticsTest, SerializeRoundTripPreservesEnabledFlag) { // A disabled Statistics should come back disabled after a round-trip. - rapidsmpf::Statistics disabled(false); - auto const bytes = disabled.serialize(); + auto disabled = rapidsmpf::Statistics::create(false); + auto const bytes = disabled->serialize(); auto deserialized = rapidsmpf::Statistics::deserialize(bytes); EXPECT_FALSE(deserialized->enabled()); // And an enabled one comes back enabled. - rapidsmpf::Statistics enabled(true); - auto const bytes2 = enabled.serialize(); + auto enabled = rapidsmpf::Statistics::create(true); + auto const bytes2 = enabled->serialize(); auto deserialized2 = rapidsmpf::Statistics::deserialize(bytes2); EXPECT_TRUE(deserialized2->enabled()); } TEST_F(StatisticsTest, SerializeRoundTripWithReportEntries) { - rapidsmpf::Statistics stats; - stats.add_bytes_stat("alpha", 2048); // Bytes entry - stats.add_duration_stat("beta", rapidsmpf::Duration{0.005}); // Duration entry - stats.add_report_entry( + auto stats = rapidsmpf::Statistics::create(); + stats->add_bytes_stat("alpha", 2048); // Bytes entry + stats->add_duration_stat("beta", rapidsmpf::Duration{0.005}); // Duration entry + stats->add_report_entry( "copy", {"copy-bytes", "copy-time", "copy-delay"}, rapidsmpf::Statistics::Formatter::MemoryThroughput ); - stats.add_stat("copy-bytes", 1024.0 * 1024.0); - stats.add_stat("copy-time", 0.002); - stats.add_stat("copy-delay", 0.00001); + stats->add_stat("copy-bytes", 1024.0 * 1024.0); + stats->add_stat("copy-time", 0.002); + stats->add_stat("copy-delay", 0.00001); - auto const bytes = stats.serialize(); + auto const bytes = stats->serialize(); auto deserialized = rapidsmpf::Statistics::deserialize(bytes); // Stats round-trip numerically. - EXPECT_EQ(deserialized->get_stat("alpha"), stats.get_stat("alpha")); - EXPECT_EQ(deserialized->get_stat("beta"), stats.get_stat("beta")); - EXPECT_EQ(deserialized->get_stat("copy-bytes"), stats.get_stat("copy-bytes")); + EXPECT_EQ(deserialized->get_stat("alpha"), stats->get_stat("alpha")); + EXPECT_EQ(deserialized->get_stat("beta"), stats->get_stat("beta")); + EXPECT_EQ(deserialized->get_stat("copy-bytes"), stats->get_stat("copy-bytes")); // And crucially, formatter metadata is preserved: the deserialized // report renders formatted values, not raw numbers. @@ -508,22 +511,22 @@ TEST_F(StatisticsTest, SerializeRoundTripWithReportEntries) { } TEST_F(StatisticsTest, Copy) { - rapidsmpf::Statistics stats; - stats.add_bytes_stat("x", 2048); // registers a Bytes report entry + auto stats = rapidsmpf::Statistics::create(); + stats->add_bytes_stat("x", 2048); // registers a Bytes report entry - auto copied = stats.copy(); + auto copied = stats->copy(); EXPECT_TRUE(copied->enabled()); - EXPECT_EQ(copied->get_stat("x"), stats.get_stat("x")); + EXPECT_EQ(copied->get_stat("x"), stats->get_stat("x")); // The Bytes formatter carried over the copy. EXPECT_THAT(copied->report(), ::testing::HasSubstr("2 KiB")); } TEST_F(StatisticsTest, MergeOverlapping) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_stat("x", 10.0); a->add_stat("x", 3.0); // count=2, value=13, max=10 - auto b = std::make_shared(); + auto b = rapidsmpf::Statistics::create(); b->add_stat("x", 7.0); // count=1, value=7, max=7 std::vector> inputs{a, b}; @@ -535,10 +538,10 @@ TEST_F(StatisticsTest, MergeOverlapping) { } TEST_F(StatisticsTest, MergeDisjoint) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_stat("x", 1.0); - auto b = std::make_shared(); + auto b = rapidsmpf::Statistics::create(); b->add_stat("y", 2.0); std::vector> inputs{a, b}; @@ -549,10 +552,10 @@ TEST_F(StatisticsTest, MergeDisjoint) { } TEST_F(StatisticsTest, MergeWithEmpty) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_stat("x", 5.0); - auto empty = std::make_shared(); + auto empty = rapidsmpf::Statistics::create(); std::vector> inputs{a, empty}; auto merged = rapidsmpf::Statistics::merge(std::span{inputs}); @@ -566,10 +569,10 @@ TEST_F(StatisticsTest, MergeWithEmpty) { } TEST_F(StatisticsTest, MergeCombinesReportEntries) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_bytes_stat("x", 10); // Bytes report entry - auto b = std::make_shared(); + auto b = rapidsmpf::Statistics::create(); b->add_stat("x", 5.0); // no formatter on this side // Merging a (has Bytes entry) with b: result uses a's entry. @@ -584,13 +587,13 @@ TEST_F(StatisticsTest, MergeCombinesReportEntries) { } TEST_F(StatisticsTest, MergeMultiple) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_stat("x", 1.0); - auto b = std::make_shared(); + auto b = rapidsmpf::Statistics::create(); b->add_stat("x", 2.0); - auto c = std::make_shared(); + auto c = rapidsmpf::Statistics::create(); c->add_stat("y", 10.0); std::vector> inputs{a, b, c}; @@ -610,7 +613,7 @@ TEST_F(StatisticsTest, MergeRejectsEmptySpan) { } TEST_F(StatisticsTest, MergeRejectsNullElement) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); std::vector> inputs{a, nullptr}; EXPECT_THROW( std::ignore = rapidsmpf::Statistics::merge(std::span{inputs}), @@ -619,11 +622,11 @@ TEST_F(StatisticsTest, MergeRejectsNullElement) { } TEST_F(StatisticsTest, MergeRejectsConflictingFormatter) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_report_entry("x", {"x"}, rapidsmpf::Statistics::Formatter::Bytes); a->add_stat("x", 1.0); - auto b = std::make_shared(); + auto b = rapidsmpf::Statistics::create(); b->add_report_entry("x", {"x"}, rapidsmpf::Statistics::Formatter::Duration); b->add_stat("x", 2.0); @@ -637,10 +640,10 @@ TEST_F(StatisticsTest, MergeRejectsConflictingFormatter) { TEST_F(StatisticsTest, MergeIdenticalReportEntries) { // Two inputs with the same report entry (same formatter + stat_names) // must merge cleanly — no conflict. - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_bytes_stat("x", 10); - auto b = std::make_shared(); + auto b = rapidsmpf::Statistics::create(); b->add_bytes_stat("x", 20); std::vector> inputs{a, b}; @@ -649,8 +652,8 @@ TEST_F(StatisticsTest, MergeIdenticalReportEntries) { } TEST_F(StatisticsTest, MergeEnabledFlagPropagates) { - auto enabled = std::make_shared(true); - auto disabled = std::make_shared(false); + auto enabled = rapidsmpf::Statistics::create(true); + auto disabled = rapidsmpf::Statistics::create(false); // disabled + disabled -> disabled. std::vector> both_off{disabled, disabled}; @@ -662,12 +665,12 @@ TEST_F(StatisticsTest, MergeEnabledFlagPropagates) { } TEST_F(StatisticsTest, MergeRejectsConflictingStatNames) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_report_entry( "copy", {"b1", "t1", "d1"}, rapidsmpf::Statistics::Formatter::MemoryThroughput ); - auto b = std::make_shared(); + auto b = rapidsmpf::Statistics::create(); b->add_report_entry( "copy", {"b2", "t2", "d2"}, rapidsmpf::Statistics::Formatter::MemoryThroughput ); diff --git a/cpp/tests/test_stream_ordered_timing.cpp b/cpp/tests/test_stream_ordered_timing.cpp index 9ad786ef1..7661b166d 100644 --- a/cpp/tests/test_stream_ordered_timing.cpp +++ b/cpp/tests/test_stream_ordered_timing.cpp @@ -32,7 +32,7 @@ TEST(StreamOrderedTiming, Disabled) { TEST(StreamOrderedTiming, RecordsDuration) { // A positive duration is recorded under the given name after stream sync. rmm::cuda_stream stream; - auto stats = std::make_shared(); + auto stats = Statistics::create(); { StreamOrderedTiming timing{stream.view(), stats}; // Do a small GPU operation so there is measurable work between start and stop. @@ -49,7 +49,7 @@ TEST(StreamOrderedTiming, RecordsDuration) { TEST(StreamOrderedTiming, MultipleTimings) { // Repeated timings for the same name accumulate: count equals the number of timings. rmm::cuda_stream stream; - auto stats = std::make_shared(); + auto stats = Statistics::create(); constexpr int n = 3; for (int i = 0; i < n; ++i) { StreamOrderedTiming timing{stream.view(), stats}; @@ -67,8 +67,8 @@ TEST(StreamOrderedTiming, Cancel) { // cancelled timings are not recorded, while timings for other Statistics objects // are still recorded. Calling cancel when no timings are pending is a no-op. rmm::cuda_stream stream; - auto stats_a = std::make_shared(); - auto stats_b = std::make_shared(); + auto stats_a = Statistics::create(); + auto stats_b = Statistics::create(); // Block stream: no callbacks will execute until gate.open() is called. struct Gate { @@ -109,7 +109,7 @@ TEST(StreamOrderedTiming, Cancel) { TEST(StreamOrderedTiming, StreamDelay) { rmm::cuda_stream stream; - auto stats = std::make_shared(); + auto stats = Statistics::create(); // With a stream_delay_name, the delay stat is recorded and is >= 0. { @@ -122,7 +122,7 @@ TEST(StreamOrderedTiming, StreamDelay) { EXPECT_GE(delay.value(), 0.0); // With std::nullopt (the default), no stream-delay stat is written. - auto stats2 = std::make_shared(); + auto stats2 = Statistics::create(); { StreamOrderedTiming timing{stream.view(), stats2}; timing.stop_and_record("my-timing"); @@ -136,7 +136,7 @@ TEST(StreamOrderedTiming, StatisticsDestroyedBeforeStreamSync) { // the callbacks detect the expired weak_ptr and return without crashing. rmm::cuda_stream stream; { - auto stats = std::make_shared(); + auto stats = Statistics::create(); StreamOrderedTiming timing{stream.view(), stats}; timing.stop_and_record("my-timing"); // Both `timing` and `stats` go out of scope here. The shared_ptr count diff --git a/python/rapidsmpf/rapidsmpf/benchmarks/streaming_benchmark.py b/python/rapidsmpf/rapidsmpf/benchmarks/streaming_benchmark.py index bfd568907..4508fbcfe 100644 --- a/python/rapidsmpf/rapidsmpf/benchmarks/streaming_benchmark.py +++ b/python/rapidsmpf/rapidsmpf/benchmarks/streaming_benchmark.py @@ -262,7 +262,7 @@ def setup_and_run(args: argparse.Namespace) -> None: if args.spill_device is None else {MemoryType.DEVICE: LimitAvailableMemory(mr, limit=args.spill_device)} ) - br = BufferResource(mr, memory_available=memory_available, statistics=stats) + br = BufferResource(stats, mr, memory_available=memory_available) args.out_nparts = args.out_nparts if args.out_nparts is not None else comm.nranks args.part_size = args.part_size if args.part_size is not None else args.local_size diff --git a/python/rapidsmpf/rapidsmpf/coll/allgather.pxd b/python/rapidsmpf/rapidsmpf/coll/allgather.pxd index 09ba8bf2c..c3f9fd533 100644 --- a/python/rapidsmpf/rapidsmpf/coll/allgather.pxd +++ b/python/rapidsmpf/rapidsmpf/coll/allgather.pxd @@ -14,7 +14,6 @@ from rapidsmpf.memory.buffer_resource cimport (BufferResource, cpp_BufferResource) from rapidsmpf.memory.packed_data cimport cpp_PackedData from rapidsmpf.progress_thread cimport cpp_ProgressThread -from rapidsmpf.statistics cimport cpp_Statistics cdef extern from "" namespace \ @@ -31,7 +30,6 @@ cdef extern from "" nogil: shared_ptr[cpp_Communicator] comm, int32_t op_id, cpp_BufferResource *br, - shared_ptr[cpp_Statistics] statistics ) except +ex_handler void insert(uint64_t sequence_number, cpp_PackedData packed_data) \ except +ex_handler diff --git a/python/rapidsmpf/rapidsmpf/coll/allgather.pyi b/python/rapidsmpf/rapidsmpf/coll/allgather.pyi index e89d6858a..108f2d037 100644 --- a/python/rapidsmpf/rapidsmpf/coll/allgather.pyi +++ b/python/rapidsmpf/rapidsmpf/coll/allgather.pyi @@ -5,7 +5,6 @@ from __future__ import annotations from rapidsmpf.communicator.communicator import Communicator from rapidsmpf.memory.buffer_resource import BufferResource from rapidsmpf.memory.packed_data import PackedData -from rapidsmpf.statistics import Statistics class AllGather: def __init__( @@ -13,7 +12,6 @@ class AllGather: comm: Communicator, op_id: int, br: BufferResource, - statistics: Statistics | None = None, ) -> None: ... @property def comm(self) -> Communicator: ... diff --git a/python/rapidsmpf/rapidsmpf/coll/allgather.pyx b/python/rapidsmpf/rapidsmpf/coll/allgather.pyx index 01d345897..3c84634be 100644 --- a/python/rapidsmpf/rapidsmpf/coll/allgather.pyx +++ b/python/rapidsmpf/rapidsmpf/coll/allgather.pyx @@ -14,7 +14,6 @@ from rapidsmpf.memory.buffer_resource cimport (BufferResource, cpp_BufferResource) from rapidsmpf.memory.packed_data cimport (PackedData, cpp_PackedData, packed_data_vector_to_list) -from rapidsmpf.statistics cimport Statistics cdef class AllGather: @@ -37,8 +36,6 @@ cdef class AllGather: between 0 and 2^20 - 1. br Buffer resource for memory allocation. - statistics - Statistics collection instance. If None, statistics is disabled. Notes ----- @@ -52,19 +49,15 @@ cdef class AllGather: Communicator comm not None, int32_t op_id, BufferResource br not None, - Statistics statistics = None, ): self._br = br self._comm = comm cdef cpp_BufferResource* br_ = br.ptr() - if statistics is None: - statistics = Statistics(enable=False) # Disables statistics. with nogil: self._handle = make_unique[cpp_AllGather]( comm._handle, op_id, br_, - statistics._handle, ) def __dealloc__(self): diff --git a/python/rapidsmpf/rapidsmpf/examples/bulk_mpi_shuffle.py b/python/rapidsmpf/rapidsmpf/examples/bulk_mpi_shuffle.py index 892dbb1fd..ec3f8880b 100644 --- a/python/rapidsmpf/rapidsmpf/examples/bulk_mpi_shuffle.py +++ b/python/rapidsmpf/rapidsmpf/examples/bulk_mpi_shuffle.py @@ -158,7 +158,7 @@ def bulk_mpi_shuffle( baseline Whether to skip the shuffle and run a simple IO baseline. statistics - The statistics instance to use. If None, statistics is disabled. + The statistics instance to use. If None, ``Statistics.disabled()`` is used. Notes ----- @@ -192,7 +192,10 @@ def bulk_mpi_shuffle( columns, ) else: - br = BufferResource(rmm.mr.get_current_device_resource()) + br = BufferResource( + statistics or Statistics.disabled(), + rmm.mr.get_current_device_resource(), + ) shuffler = Shuffler( comm, op_id=0, @@ -314,9 +317,8 @@ def setup_and_run(args: argparse.Namespace) -> None: if args.spill_device is None else {MemoryType.DEVICE: LimitAvailableMemory(mr, limit=args.spill_device)} ) - br = BufferResource(mr, memory_available=memory_available) - stats = Statistics(enable=args.statistics) + br = BufferResource(stats, mr, memory_available=memory_available) progress_thread = ProgressThread(stats) if args.cluster_type == "mpi": diff --git a/python/rapidsmpf/rapidsmpf/examples/ray/bulk_ray_shuffle.py b/python/rapidsmpf/rapidsmpf/examples/ray/bulk_ray_shuffle.py index 7d86a3eaf..fc2bc4b20 100644 --- a/python/rapidsmpf/rapidsmpf/examples/ray/bulk_ray_shuffle.py +++ b/python/rapidsmpf/rapidsmpf/examples/ray/bulk_ray_shuffle.py @@ -97,9 +97,10 @@ def __init__( ) } ) - br = BufferResource(self.mr, memory_available=memory_available) + stats = Statistics(enable=enable_statistics) + br = BufferResource(stats, self.mr, memory_available=memory_available) self.br = br - super().__init__(nranks, Statistics(enable=enable_statistics)) + super().__init__(nranks, stats) def setup_worker(self, root_address_bytes: bytes) -> None: """ diff --git a/python/rapidsmpf/rapidsmpf/examples/ray/ray_shuffle_example.py b/python/rapidsmpf/rapidsmpf/examples/ray/ray_shuffle_example.py index 6b99c2ef2..250f99863 100644 --- a/python/rapidsmpf/rapidsmpf/examples/ray/ray_shuffle_example.py +++ b/python/rapidsmpf/rapidsmpf/examples/ray/ray_shuffle_example.py @@ -21,6 +21,7 @@ from rapidsmpf.integrations.ray import RapidsMPFActor, setup_ray_ucxx_cluster from rapidsmpf.memory.buffer_resource import BufferResource from rapidsmpf.shuffler import Shuffler +from rapidsmpf.statistics import Statistics from rapidsmpf.testing import assert_eq from rapidsmpf.utils.cudf import ( cudf_to_pylibcudf_table, @@ -89,7 +90,7 @@ def run(self) -> None: column_names = list(df.columns) mr = rmm.mr.get_current_device_resource() - br = BufferResource(mr) + br = BufferResource(Statistics.disabled(), mr) stream = DEFAULT_STREAM # use the default stream # Calculate the expected output partitions on all ranks diff --git a/python/rapidsmpf/rapidsmpf/examples/streaming/basic_example.py b/python/rapidsmpf/rapidsmpf/examples/streaming/basic_example.py index 61d3f52ff..5dbba9ed5 100644 --- a/python/rapidsmpf/rapidsmpf/examples/streaming/basic_example.py +++ b/python/rapidsmpf/rapidsmpf/examples/streaming/basic_example.py @@ -18,6 +18,7 @@ from rapidsmpf.memory.buffer_resource import BufferResource from rapidsmpf.progress_thread import ProgressThread from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor +from rapidsmpf.statistics import Statistics from rapidsmpf.streaming.core.actor import ( define_actor, run_actor_network, @@ -41,10 +42,14 @@ def main() -> int: options = Options(get_environment_variables()) # Create a communicator and context that will be used by all streaming actors. - comm = single_process_comm(options, ProgressThread()) + stats = Statistics.from_options(options) + comm = single_process_comm(options, ProgressThread(stats)) ctx = Context( logger=comm.logger, - br=BufferResource(RmmResourceAdaptor(rmm.mr.get_current_device_resource())), + br=BufferResource( + stats, + RmmResourceAdaptor(rmm.mr.get_current_device_resource()), + ), options=options, ) diff --git a/python/rapidsmpf/rapidsmpf/integrations/core.py b/python/rapidsmpf/rapidsmpf/integrations/core.py index 48713cc70..d8893acc2 100644 --- a/python/rapidsmpf/rapidsmpf/integrations/core.py +++ b/python/rapidsmpf/rapidsmpf/integrations/core.py @@ -24,12 +24,12 @@ from rapidsmpf.memory.spill_collection import SpillCollection from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor from rapidsmpf.shuffler import Shuffler +from rapidsmpf.statistics import Statistics if TYPE_CHECKING: from collections.abc import Callable, Sequence from rapidsmpf.communicator.communicator import Communicator - from rapidsmpf.statistics import Statistics DataFrameT = TypeVar("DataFrameT") @@ -95,7 +95,7 @@ class WorkerContext: br The buffer resource used by the worker exclusively. statistics - The statistics used by the worker. If None, statistics is disabled. + The statistics used by the worker. comm The communicator connected to all other workers. spill_collection @@ -748,8 +748,8 @@ def rmpf_worker_local_setup( options = Options(options_map) # use options to create the buffer resource - br = BufferResource.from_options(mr, options) - statistics = br.statistics + statistics = Statistics.from_options(options) + br = BufferResource.from_options(statistics, mr, options) # If enabled, create a staging device buffer for the spilling to reduce # device memory pressure. diff --git a/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyi b/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyi index b3577f8c4..fdb713641 100644 --- a/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyi +++ b/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyi @@ -18,6 +18,7 @@ from rapidsmpf.statistics import Statistics class BufferResource: def __init__( self, + statistics: Statistics, device_mr: DeviceMemoryResource, *, pinned_mr: PinnedMemoryResource | None = None, @@ -26,11 +27,13 @@ class BufferResource: | None = None, periodic_spill_check: float | None = 1e-3, stream_pool: CudaStreamPool | None = None, - statistics: Statistics | None = None, ) -> None: ... @classmethod def from_options( - cls: type[Self], mr: RmmResourceAdaptor, options: Options + cls: type[Self], + statistics: Statistics, + mr: RmmResourceAdaptor, + options: Options, ) -> Self: ... @property def device_mr(self) -> DeviceMemoryResource: ... diff --git a/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyx b/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyx index 0104e85be..5b9f8ad75 100644 --- a/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyx +++ b/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyx @@ -158,18 +158,18 @@ cdef class BufferResource: will be created. Must be an instance of ``rmm.pylibrmm.cuda_stream_pool.CudaStreamPool``. statistics - The statistics instance to use. If None, a disabled statistics instance - will be created. + The statistics instance to use. Required. Pass + ``Statistics.disabled()`` for a no-op recorder. """ def __cinit__( self, + Statistics statistics not None, DeviceMemoryResource device_mr not None, *, PinnedMemoryResource pinned_mr = None, memory_available = None, periodic_spill_check = 1e-3, stream_pool = None, - statistics = None, ): cdef unordered_map[MemoryType, cpp_MemoryAvailable] _mem_available if isinstance(memory_available, AvailableMemoryMap): @@ -212,13 +212,9 @@ cdef class BufferResource: ) ) - if statistics is None: - statistics = Statistics(enable=False) - # Keep statistics alive self._statistics = statistics - # checked cast requires the GIL - stats_handle = (statistics)._handle + stats_handle = statistics._handle # Stored for the Python device_mr/pinned_mr property accessors. # The C++ BufferResource owns the resource via any_resource. @@ -229,17 +225,22 @@ cdef class BufferResource: cpp_pinned_mr = self._pinned_mr._handle with nogil: self._handle = make_shared[cpp_BufferResource]( + stats_handle, make_any_device_resource(device_mr.get_mr()), cpp_pinned_mr, move(_mem_available), period, cpp_stream_pool, - stats_handle, ) self.spill_manager = SpillManager._create(self) @classmethod - def from_options(cls, RmmResourceAdaptor mr not None, Options options not None): + def from_options( + cls, + Statistics statistics not None, + RmmResourceAdaptor mr not None, + Options options not None, + ): """ Construct a BufferResource from configuration options. @@ -248,6 +249,9 @@ cdef class BufferResource: Parameters ---------- + statistics + The statistics instance to use. The caller is responsible for creating and + owning this object. Pass ``Statistics.disabled()`` for a no-op recorder. mr RMM resource adaptor. The adaptor must outlive the returned BufferResource. options @@ -259,12 +263,12 @@ cdef class BufferResource: """ cdef PinnedMemoryResource pinned_mr = PinnedMemoryResource.from_options(options) return cls( + statistics=statistics, device_mr=mr, pinned_mr=pinned_mr, memory_available=AvailableMemoryMap.from_options(mr, options), periodic_spill_check=periodic_spill_check_from_options(options), stream_pool=stream_pool_from_options(options), - statistics=Statistics.from_options(options), ) def __dealloc__(self): diff --git a/python/rapidsmpf/rapidsmpf/progress_thread.pyi b/python/rapidsmpf/rapidsmpf/progress_thread.pyi index fba04eb35..3207f6ec9 100644 --- a/python/rapidsmpf/rapidsmpf/progress_thread.pyi +++ b/python/rapidsmpf/rapidsmpf/progress_thread.pyi @@ -5,4 +5,4 @@ from __future__ import annotations from rapidsmpf.statistics import Statistics class ProgressThread: - def __init__(self, statistics: Statistics | None = None) -> None: ... + def __init__(self, statistics: Statistics) -> None: ... diff --git a/python/rapidsmpf/rapidsmpf/progress_thread.pyx b/python/rapidsmpf/rapidsmpf/progress_thread.pyx index f4deee800..2de803241 100644 --- a/python/rapidsmpf/rapidsmpf/progress_thread.pyx +++ b/python/rapidsmpf/rapidsmpf/progress_thread.pyx @@ -19,7 +19,8 @@ cdef class ProgressThread: Parameters ---------- statistics - The statistics instance to use. If None, statistics is disabled. + The statistics instance to use. Required. Pass + ``Statistics.disabled()`` for a no-op recorder. Notes ----- @@ -29,11 +30,8 @@ cdef class ProgressThread: """ def __init__( self, - Statistics statistics = None, + Statistics statistics not None, ): - if statistics is None: - statistics = Statistics(enable=False) # Disables statistics. - with nogil: self._handle = make_shared[cpp_ProgressThread](statistics._handle) diff --git a/python/rapidsmpf/rapidsmpf/statistics.pxd b/python/rapidsmpf/rapidsmpf/statistics.pxd index ef6b57b5b..134620f2a 100644 --- a/python/rapidsmpf/rapidsmpf/statistics.pxd +++ b/python/rapidsmpf/rapidsmpf/statistics.pxd @@ -51,7 +51,7 @@ cdef extern from "" nogil: cdef cppclass cpp_MemoryRecorder "rapidsmpf::Statistics::MemoryRecorder": cpp_MemoryRecorder( - cpp_Statistics* stats, + shared_ptr[cpp_Statistics] stats, cpp_RmmResourceAdaptor mr, string name ) except +ex_handler diff --git a/python/rapidsmpf/rapidsmpf/statistics.pyi b/python/rapidsmpf/rapidsmpf/statistics.pyi index cbd0f669d..e569c5824 100644 --- a/python/rapidsmpf/rapidsmpf/statistics.pyi +++ b/python/rapidsmpf/rapidsmpf/statistics.pyi @@ -24,6 +24,8 @@ class Statistics: def __init__(self, *, enable: bool) -> None: ... @classmethod def from_options(cls: type[Self], options: Options) -> Self: ... + @classmethod + def disabled(cls: type[Self]) -> Self: ... @property def enabled(self) -> bool: ... def report( diff --git a/python/rapidsmpf/rapidsmpf/statistics.pyx b/python/rapidsmpf/rapidsmpf/statistics.pyx index ce784fbf9..89ea6a1b0 100644 --- a/python/rapidsmpf/rapidsmpf/statistics.pyx +++ b/python/rapidsmpf/rapidsmpf/statistics.pyx @@ -7,7 +7,7 @@ from cython.operator cimport preincrement from libc.stdint cimport uint8_t from libc.string cimport memcpy from libcpp cimport bool as bool_t -from libcpp.memory cimport make_shared, make_unique, shared_ptr +from libcpp.memory cimport make_unique, shared_ptr from libcpp.optional cimport optional from libcpp.string cimport string from libcpp.vector cimport vector @@ -27,11 +27,22 @@ import os cdef extern from "" nogil: + cdef shared_ptr[cpp_Statistics] cpp_create \ + "rapidsmpf::Statistics::create"( + bool_t enabled, + ) except +ex_handler + + cdef shared_ptr[cpp_Statistics] cpp_disabled \ + "rapidsmpf::Statistics::disabled"() except +ex_handler + cdef shared_ptr[cpp_Statistics] cpp_from_options \ "rapidsmpf::Statistics::from_options"( cpp_Options options, ) except +ex_handler + cdef shared_ptr[cpp_Statistics] cpp_disabled \ + "rapidsmpf::Statistics::disabled"() except +ex_handler + cdef extern from *: """ @@ -148,7 +159,7 @@ cdef class Statistics: """ def __init__(self, *, bool_t enable): with nogil: - self._handle = make_shared[cpp_Statistics](enable) + self._handle = cpp_create(enable) @classmethod def from_options(cls, Options options not None): @@ -169,6 +180,23 @@ cdef class Statistics: ret._handle = cpp_from_options(options._handle) return ret + @classmethod + def disabled(cls): + """ + Get a shared, no-op Statistics instance. + + Returns the same disabled instance on every call. Use this when an API + requires a Statistics object but no recording is desired. + + Returns + ------- + A disabled Statistics instance. + """ + cdef Statistics ret = cls.__new__(cls) + with nogil: + ret._handle = cpp_disabled() + return ret + def __dealloc__(self): with nogil: self._handle.reset() @@ -673,7 +701,7 @@ cdef class MemoryRecorder: cdef cpp_RmmResourceAdaptor* mr = self._mr.get_handle() with nogil: self._handle = make_unique[cpp_MemoryRecorder]( - self._stats._handle.get(), deref(mr), self._name + self._stats._handle, deref(mr), self._name ) def __exit__(self, exc_type, exc_value, traceback): diff --git a/python/rapidsmpf/rapidsmpf/streaming/core/context.pyi b/python/rapidsmpf/rapidsmpf/streaming/core/context.pyi index d8aca830f..8408924d3 100644 --- a/python/rapidsmpf/rapidsmpf/streaming/core/context.pyi +++ b/python/rapidsmpf/rapidsmpf/streaming/core/context.pyi @@ -27,7 +27,11 @@ class Context: ) -> None: ... @classmethod def from_options( - cls: type[Self], logger: Logger, mr: RmmResourceAdaptor, options: Options + cls: type[Self], + logger: Logger, + mr: RmmResourceAdaptor, + options: Options, + statistics: Statistics, ) -> Self: ... def __enter__(self) -> Context: ... def __exit__( diff --git a/python/rapidsmpf/rapidsmpf/streaming/core/context.pyx b/python/rapidsmpf/rapidsmpf/streaming/core/context.pyx index 0d7437eee..5920e0a8d 100644 --- a/python/rapidsmpf/rapidsmpf/streaming/core/context.pyx +++ b/python/rapidsmpf/rapidsmpf/streaming/core/context.pyx @@ -15,6 +15,7 @@ from libcpp.memory cimport make_shared from rmm.pylibrmm.stream cimport Stream from rapidsmpf.rmm_resource_adaptor cimport RmmResourceAdaptor +from rapidsmpf.statistics cimport Statistics from rapidsmpf.streaming.core.channel cimport Channel, cpp_Channel from rapidsmpf.streaming.core.memory_reserve_or_wait cimport \ MemoryReserveOrWait @@ -97,11 +98,12 @@ cdef class Context: cls, Logger logger not None, RmmResourceAdaptor mr not None, - Options options not None + Options options not None, + Statistics statistics not None, ): return cls( logger=logger, - br=BufferResource.from_options(mr, options), + br=BufferResource.from_options(statistics, mr, options), options=options, ) diff --git a/python/rapidsmpf/rapidsmpf/tests/conftest.py b/python/rapidsmpf/rapidsmpf/tests/conftest.py index 479b43440..45b5c05ba 100644 --- a/python/rapidsmpf/rapidsmpf/tests/conftest.py +++ b/python/rapidsmpf/rapidsmpf/tests/conftest.py @@ -12,6 +12,7 @@ from rapidsmpf.communicator import COMMUNICATORS from rapidsmpf.config import Options, get_environment_variables from rapidsmpf.progress_thread import ProgressThread +from rapidsmpf.statistics import Statistics if TYPE_CHECKING: from collections.abc import Generator @@ -99,8 +100,11 @@ def _mpi_comm(*, _mpi_disabled: bool) -> Communicator: from rapidsmpf.communicator.mpi import new_communicator + options = Options(get_environment_variables()) return new_communicator( - MPI.COMM_WORLD, Options(get_environment_variables()), ProgressThread() + MPI.COMM_WORLD, + options, + ProgressThread(Statistics.from_options(options)), ) @@ -116,7 +120,12 @@ def _ucxx_comm() -> Communicator: """ from rapidsmpf.communicator.testing import ucxx_mpi_setup - return ucxx_mpi_setup(None, Options(get_environment_variables()), ProgressThread()) + options = Options(get_environment_variables()) + return ucxx_mpi_setup( + None, + options, + ProgressThread(Statistics.from_options(options)), + ) @pytest.fixture( diff --git a/python/rapidsmpf/rapidsmpf/tests/streaming/conftest.py b/python/rapidsmpf/rapidsmpf/tests/streaming/conftest.py index 3779597e5..24a622b8b 100644 --- a/python/rapidsmpf/rapidsmpf/tests/streaming/conftest.py +++ b/python/rapidsmpf/rapidsmpf/tests/streaming/conftest.py @@ -12,6 +12,7 @@ from rapidsmpf.config import Options, get_environment_variables from rapidsmpf.memory.buffer_resource import BufferResource from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor +from rapidsmpf.statistics import Statistics from rapidsmpf.streaming.core.context import Context if TYPE_CHECKING: @@ -27,7 +28,7 @@ def context(comm: Communicator) -> Generator[Context, None, None]: """ options = Options(get_environment_variables()) mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) - br = BufferResource(mr) + br = BufferResource(Statistics.from_options(options), mr) with Context(comm.logger, br, options) as ctx: yield ctx diff --git a/python/rapidsmpf/rapidsmpf/tests/streaming/test_memory_reserve_or_wait.py b/python/rapidsmpf/rapidsmpf/tests/streaming/test_memory_reserve_or_wait.py index beb741203..12ad8ee0e 100644 --- a/python/rapidsmpf/rapidsmpf/tests/streaming/test_memory_reserve_or_wait.py +++ b/python/rapidsmpf/rapidsmpf/tests/streaming/test_memory_reserve_or_wait.py @@ -19,6 +19,7 @@ from rapidsmpf.memory.buffer_resource import BufferResource, LimitAvailableMemory from rapidsmpf.progress_thread import ProgressThread from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor +from rapidsmpf.statistics import Statistics from rapidsmpf.streaming.core.actor import define_actor, run_actor_network from rapidsmpf.streaming.core.context import Context from rapidsmpf.streaming.core.memory_reserve_or_wait import ( @@ -37,9 +38,11 @@ def make_context( if overwrite_options is not None: env.update(overwrite_options) options = Options(env) - comm = single_process_comm(options, ProgressThread()) + stats = Statistics.from_options(options) + comm = single_process_comm(options, ProgressThread(stats)) mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) br = BufferResource( + stats, mr, memory_available={MemoryType.DEVICE: LimitAvailableMemory(mr, limit=dev_limit)}, ) diff --git a/python/rapidsmpf/rapidsmpf/tests/test_allgather.py b/python/rapidsmpf/rapidsmpf/tests/test_allgather.py index 7493b8509..8cdf05819 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_allgather.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_allgather.py @@ -131,15 +131,13 @@ def test_basic_allgather( Each rank inserts n_inserts pieces of data, then all ranks should receive all data from all ranks. """ - br = BufferResource(device_mr) - statistics = Statistics(enable=False) + br = BufferResource(Statistics.disabled(), device_mr) # Create AllGather instance allgather = AllGather( comm=comm, op_id=0, # Use operation ID 0 br=br, - statistics=statistics, ) this_rank = comm.rank diff --git a/python/rapidsmpf/rapidsmpf/tests/test_buffer_resource.py b/python/rapidsmpf/rapidsmpf/tests/test_buffer_resource.py index ca5fa4148..dea63c98e 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_buffer_resource.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_buffer_resource.py @@ -57,10 +57,18 @@ def test_buffer_resource() -> None: NotImplementedError, match="only accept `LimitAvailableMemory` as memory available functions", ): - BufferResource(mr, memory_available={MemoryType.DEVICE: lambda: 42}) + BufferResource( + Statistics.disabled(), + mr, + memory_available={MemoryType.DEVICE: lambda: 42}, + ) mem_available = LimitAvailableMemory(mr, limit=KiB(100)) - br = BufferResource(mr, memory_available={MemoryType.DEVICE: mem_available}) + br = BufferResource( + Statistics.disabled(), + mr, + memory_available={MemoryType.DEVICE: mem_available}, + ) assert br.memory_reserved(MemoryType.DEVICE) == 0 assert br.memory_reserved(MemoryType.HOST) == 0 @@ -76,7 +84,9 @@ def test_buffer_resource() -> None: def test_memory_reservation(mem_type: MemoryType) -> None: mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) br = BufferResource( - mr, memory_available={mem_type: LimitAvailableMemory(mr, limit=KiB(100))} + Statistics.disabled(), + mr, + memory_available={mem_type: LimitAvailableMemory(mr, limit=KiB(100))}, ) res1, ob = br.reserve(mem_type, KiB(100), allow_overbooking=False) assert res1.br is br @@ -126,12 +136,12 @@ def test_stream_pool() -> None: mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) # Test with default stream pool size (16) - br_default = BufferResource(mr) + br_default = BufferResource(Statistics.disabled(), mr) assert br_default.stream_pool_size() == 16 # Test with custom stream pool custom_pool = rmm.pylibrmm.cuda_stream_pool.CudaStreamPool(pool_size=32) - br_custom = BufferResource(mr, stream_pool=custom_pool) + br_custom = BufferResource(Statistics.disabled(), mr, stream_pool=custom_pool) assert br_custom.stream_pool_size() == 32 @@ -139,13 +149,13 @@ def test_statistics() -> None: """Test that statistics parameter can be configured.""" mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) - # Disabled by default - br_default = BufferResource(mr) + # Disabled + br_default = BufferResource(Statistics.disabled(), mr) assert not br_default.statistics.enabled # Test with enabled statistics (with memory profiling) stats = Statistics(enable=True) - br_with_mr = BufferResource(mr, statistics=stats) + br_with_mr = BufferResource(stats, mr) assert br_with_mr.statistics is stats @@ -153,7 +163,9 @@ def test_statistics() -> None: def test_opaque_memory_usage_basic(mem_type: MemoryType) -> None: mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) br = BufferResource( - mr, memory_available={mem_type: LimitAvailableMemory(mr, limit=KiB(100))} + Statistics.disabled(), + mr, + memory_available={mem_type: LimitAvailableMemory(mr, limit=KiB(100))}, ) res, _ = br.reserve(mem_type, KiB(50), allow_overbooking=False) @@ -170,7 +182,9 @@ def test_opaque_memory_usage_basic(mem_type: MemoryType) -> None: def test_opaque_memory_usage_clears_on_exception(mem_type: MemoryType) -> None: mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) br = BufferResource( - mr, memory_available={mem_type: LimitAvailableMemory(mr, limit=KiB(100))} + Statistics.disabled(), + mr, + memory_available={mem_type: LimitAvailableMemory(mr, limit=KiB(100))}, ) res, _ = br.reserve(mem_type, KiB(60), allow_overbooking=False) @@ -186,7 +200,9 @@ def test_opaque_memory_usage_clears_on_exception(mem_type: MemoryType) -> None: def test_opaque_memory_usage_multiple_reservations(mem_type: MemoryType) -> None: mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) br = BufferResource( - mr, memory_available={mem_type: LimitAvailableMemory(mr, limit=KiB(200))} + Statistics.disabled(), + mr, + memory_available={mem_type: LimitAvailableMemory(mr, limit=KiB(200))}, ) res1, _ = br.reserve(mem_type, KiB(50), allow_overbooking=False) @@ -210,7 +226,9 @@ def test_opaque_memory_usage_multiple_reservations(mem_type: MemoryType) -> None def test_opaque_memory_usage_nested(mem_type: MemoryType) -> None: mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) br = BufferResource( - mr, memory_available={mem_type: LimitAvailableMemory(mr, limit=KiB(200))} + Statistics.disabled(), + mr, + memory_available={mem_type: LimitAvailableMemory(mr, limit=KiB(200))}, ) res1, _ = br.reserve(mem_type, KiB(40), allow_overbooking=False) @@ -234,7 +252,9 @@ def test_opaque_memory_usage_nested(mem_type: MemoryType) -> None: def test_opaque_memory_usage_partial_consumption(mem_type: MemoryType) -> None: mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) br = BufferResource( - mr, memory_available={mem_type: LimitAvailableMemory(mr, limit=KiB(100))} + Statistics.disabled(), + mr, + memory_available={mem_type: LimitAvailableMemory(mr, limit=KiB(100))}, ) res, _ = br.reserve(mem_type, KiB(80), allow_overbooking=False) @@ -253,6 +273,7 @@ def test_opaque_memory_usage_partial_consumption(mem_type: MemoryType) -> None: def test_reserve_or_fail() -> None: mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) br = BufferResource( + Statistics.disabled(), mr, memory_available={MemoryType.DEVICE: LimitAvailableMemory(mr, limit=KiB(100))}, ) diff --git a/python/rapidsmpf/rapidsmpf/tests/test_communicator.py b/python/rapidsmpf/rapidsmpf/tests/test_communicator.py index 62de32f30..6a5d95664 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_communicator.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_communicator.py @@ -5,6 +5,7 @@ import pytest from rapidsmpf.progress_thread import ProgressThread +from rapidsmpf.statistics import Statistics MPI = pytest.importorskip("mpi4py.MPI") from typing import TYPE_CHECKING # noqa: E402 @@ -28,8 +29,9 @@ def test_log_level(capfd: pytest.CaptureFixture[str], level: LOG_LEVEL) -> None: with pytest.MonkeyPatch.context() as monkeypatch: monkeypatch.setenv("RAPIDSMPF_LOG", level.name) + options = Options(get_environment_variables()) comm = new_communicator( - MPI.COMM_WORLD, Options(get_environment_variables()), ProgressThread() + MPI.COMM_WORLD, options, ProgressThread(Statistics.from_options(options)) ) assert comm.logger.verbosity_level is level comm.logger.print("PRINT") @@ -50,8 +52,9 @@ def test_log_level(capfd: pytest.CaptureFixture[str], level: LOG_LEVEL) -> None: def test_mpi() -> None: + options = Options(get_environment_variables()) comm = new_communicator( - MPI.COMM_WORLD, Options(get_environment_variables()), ProgressThread() + MPI.COMM_WORLD, options, ProgressThread(Statistics.from_options(options)) ) assert comm.nranks == MPI.COMM_WORLD.size assert comm.rank == MPI.COMM_WORLD.rank @@ -59,8 +62,9 @@ def test_mpi() -> None: def test_ucxx() -> None: ucxx_worker = initialize_ucxx() + options = Options(get_environment_variables()) comm = ucxx_mpi_setup( - ucxx_worker, Options(get_environment_variables()), ProgressThread() + ucxx_worker, options, ProgressThread(Statistics.from_options(options)) ) assert comm.nranks == MPI.COMM_WORLD.size @@ -68,14 +72,20 @@ def test_ucxx() -> None: def test_single_process() -> None: - comm = single_process_comm(Options(get_environment_variables()), ProgressThread()) + options = Options(get_environment_variables()) + comm = single_process_comm( + options, ProgressThread(Statistics.from_options(options)) + ) assert comm.nranks == 1 assert comm.rank == 0 def test_logger_survives_communicator(capfd: pytest.CaptureFixture[str]) -> None: def get_logger() -> Logger: - comm = single_process_comm(Options(), ProgressThread()) + options = Options() + comm = single_process_comm( + options, ProgressThread(Statistics.from_options(options)) + ) return comm.logger get_logger().print("Logger should survive") diff --git a/python/rapidsmpf/rapidsmpf/tests/test_config.py b/python/rapidsmpf/rapidsmpf/tests/test_config.py index 07ba33610..aa26e72c9 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_config.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_config.py @@ -488,7 +488,7 @@ def test_buffer_resource_from_options_creates_instance_with_explicit_options() - } ) mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) - br = BufferResource.from_options(mr, opts) + br = BufferResource.from_options(Statistics.from_options(opts), mr, opts) assert br.statistics.enabled assert br.stream_pool_size() == 8 @@ -499,7 +499,7 @@ def test_buffer_resource_from_options_creates_instance_with_explicit_options() - def test_buffer_resource_from_options_uses_default_when_options_empty() -> None: opts = Options() mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) - br = BufferResource.from_options(mr, opts) + br = BufferResource.from_options(Statistics.from_options(opts), mr, opts) assert not br.statistics.enabled assert br.stream_pool_size() == 16 @@ -512,7 +512,7 @@ def test_buffer_resource_from_options_uses_default_when_options_empty() -> None: def test_buffer_resource_from_options_enables_statistics_when_requested() -> None: opts = Options({"statistics": "ON"}) mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) - br = BufferResource.from_options(mr, opts) + br = BufferResource.from_options(Statistics.from_options(opts), mr, opts) assert br.statistics.enabled @@ -520,7 +520,7 @@ def test_buffer_resource_from_options_enables_statistics_when_requested() -> Non def test_buffer_resource_from_options_accepts_percentage_for_device_limit() -> None: opts = Options({"spill_device_limit": "50%"}) mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) - br = BufferResource.from_options(mr, opts) + br = BufferResource.from_options(Statistics.from_options(opts), mr, opts) # Verify device memory limit is set (exact value depends on system memory) mem_avail = br.memory_available(MemoryType.DEVICE) @@ -533,7 +533,7 @@ def test_buffer_resource_from_options_enables_pinned_memory_when_supported() -> opts = Options({"pinned_memory": "True"}) mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) - br = BufferResource.from_options(mr, opts) + br = BufferResource.from_options(Statistics.from_options(opts), mr, opts) assert br.pinned_mr is not None @@ -546,9 +546,10 @@ def test_context_from_options_creates_instance_with_explicit_options() -> None: } ) mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) - comm = single_comm.new_communicator(opts, ProgressThread()) + stats = Statistics.from_options(opts) + comm = single_comm.new_communicator(opts, ProgressThread(stats)) - with Context.from_options(comm.logger, mr, opts) as ctx: + with Context.from_options(comm.logger, mr, opts, stats) as ctx: assert ctx is not None assert ctx.statistics().enabled assert ctx.stream_pool_size() == 8 @@ -558,9 +559,10 @@ def test_context_from_options_creates_instance_with_explicit_options() -> None: def test_context_from_options_uses_default_when_options_empty() -> None: opts = Options() mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) - comm = single_comm.new_communicator(opts, ProgressThread()) + stats = Statistics.from_options(opts) + comm = single_comm.new_communicator(opts, ProgressThread(stats)) - with Context.from_options(comm.logger, mr, opts) as ctx: + with Context.from_options(comm.logger, mr, opts, stats) as ctx: assert ctx is not None assert not ctx.statistics().enabled assert ctx.stream_pool_size() == 16 # Default @@ -570,9 +572,10 @@ def test_context_from_options_uses_default_when_options_empty() -> None: def test_context_from_options_enables_statistics_when_requested() -> None: opts = Options({"statistics": "on"}) mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) - comm = single_comm.new_communicator(opts, ProgressThread()) + stats = Statistics.from_options(opts) + comm = single_comm.new_communicator(opts, ProgressThread(stats)) - with Context.from_options(comm.logger, mr, opts) as ctx: + with Context.from_options(comm.logger, mr, opts, stats) as ctx: assert ctx is not None assert ctx.statistics().enabled @@ -580,9 +583,10 @@ def test_context_from_options_enables_statistics_when_requested() -> None: def test_context_from_options_creates_buffer_resource() -> None: opts = Options() mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) - comm = single_comm.new_communicator(opts, ProgressThread()) + stats = Statistics.from_options(opts) + comm = single_comm.new_communicator(opts, ProgressThread(stats)) - with Context.from_options(comm.logger, mr, opts) as ctx: + with Context.from_options(comm.logger, mr, opts, stats) as ctx: assert ctx is not None assert ctx.br() is not None @@ -590,9 +594,10 @@ def test_context_from_options_creates_buffer_resource() -> None: def test_context_from_options_can_create_channel() -> None: opts = Options() mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) - comm = single_comm.new_communicator(opts, ProgressThread()) + stats = Statistics.from_options(opts) + comm = single_comm.new_communicator(opts, ProgressThread(stats)) - with Context.from_options(comm.logger, mr, opts) as ctx: + with Context.from_options(comm.logger, mr, opts, stats) as ctx: assert ctx is not None channel: Channel[Any] = ctx.create_channel() assert channel is not None diff --git a/python/rapidsmpf/rapidsmpf/tests/test_examples.py b/python/rapidsmpf/rapidsmpf/tests/test_examples.py index 0e1e812d2..21bb75aad 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_examples.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_examples.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations @@ -11,6 +11,7 @@ import cudf from rapidsmpf.memory.buffer_resource import BufferResource +from rapidsmpf.statistics import Statistics from rapidsmpf.testing import assert_eq MPI = pytest.importorskip("mpi4py.MPI") @@ -64,7 +65,7 @@ def test_bulk_shuffle( output_dir = str(mpi_tmpdir.join("output")) # Use a default buffer resource. - br = BufferResource(device_mr) + br = BufferResource(Statistics.disabled(), device_mr) # Perform a the shuffle bulk_mpi_shuffle( diff --git a/python/rapidsmpf/rapidsmpf/tests/test_packed_data_host_bytes.py b/python/rapidsmpf/rapidsmpf/tests/test_packed_data_host_bytes.py index 374b5f727..bb8ba1537 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_packed_data_host_bytes.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_packed_data_host_bytes.py @@ -12,13 +12,14 @@ from rapidsmpf.memory.buffer_resource import BufferResource from rapidsmpf.memory.packed_data import PackedData +from rapidsmpf.statistics import Statistics def test_packed_data_host_bytes_roundtrip( device_mr: rmm.mr.CudaMemoryResource, ) -> None: """Test creating PackedData from host bytes and extracting them back.""" - br = BufferResource(device_mr) + br = BufferResource(Statistics.disabled(), device_mr) # Test with raw bytes original = b"hello world" @@ -38,7 +39,7 @@ def test_packed_data_empty_bytes( device_mr: rmm.mr.CudaMemoryResource, ) -> None: """Test creating PackedData from empty bytes.""" - br = BufferResource(device_mr) + br = BufferResource(Statistics.disabled(), device_mr) original = b"" packed = PackedData.from_host_bytes(original, br) result = packed.to_host_bytes() diff --git a/python/rapidsmpf/rapidsmpf/tests/test_partition.py b/python/rapidsmpf/rapidsmpf/tests/test_partition.py index 3ce74e595..f0cf83256 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_partition.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_partition.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations @@ -18,6 +18,7 @@ unspill_partitions, ) from rapidsmpf.memory.buffer_resource import BufferResource +from rapidsmpf.statistics import Statistics from rapidsmpf.testing import assert_eq from rapidsmpf.utils.cudf import ( cudf_to_pylibcudf_table, @@ -33,7 +34,7 @@ def test_partition_and_pack_unpack( device_mr: rmm.mr.CudaMemoryResource, df: dict[str, list[int]], num_partitions: int ) -> None: - br = BufferResource(device_mr) + br = BufferResource(Statistics.disabled(), device_mr) expect = cudf.DataFrame(df) partitions = partition_and_pack( cudf_to_pylibcudf_table(expect), @@ -65,7 +66,7 @@ def test_partition_and_pack_unpack( def test_split_and_pack_unpack( device_mr: rmm.mr.CudaMemoryResource, df: dict[str, list[int]], num_partitions: int ) -> None: - br = BufferResource(device_mr) + br = BufferResource(Statistics.disabled(), device_mr) expect = cudf.DataFrame(df) splits = np.linspace(0, len(expect), num_partitions, endpoint=False)[1:].astype(int) partitions = split_and_pack( @@ -90,7 +91,7 @@ def test_split_and_pack_unpack( def test_split_and_pack_unpack_out_of_range( device_mr: rmm.mr.CudaMemoryResource, df: dict[str, list[int]], num_partitions: int ) -> None: - br = BufferResource(device_mr) + br = BufferResource(Statistics.disabled(), device_mr) expect = cudf.DataFrame({"0": [], "1": []}) with pytest.raises(IndexError): split_and_pack( @@ -106,7 +107,7 @@ def test_split_and_pack_unpack_out_of_range( def test_spill_unspill_roundtrip( device_mr: rmm.mr.CudaMemoryResource, df: dict[str, list[int]], num_partitions: int ) -> None: - br = BufferResource(device_mr) + br = BufferResource(Statistics.disabled(), device_mr) expect = cudf.DataFrame(df) partitions = partition_and_pack( cudf_to_pylibcudf_table(expect), diff --git a/python/rapidsmpf/rapidsmpf/tests/test_shuffler.py b/python/rapidsmpf/rapidsmpf/tests/test_shuffler.py index cda3a72ed..9c01d0fe3 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_shuffler.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_shuffler.py @@ -20,6 +20,7 @@ from rapidsmpf.shuffler import ( Shuffler, ) +from rapidsmpf.statistics import Statistics from rapidsmpf.testing import assert_eq from rapidsmpf.utils.cudf import ( cudf_to_pylibcudf_table, @@ -38,7 +39,7 @@ def test_shuffler_single_nonempty_partition( device_mr: rmm.mr.CudaMemoryResource, total_num_partitions: int, ) -> None: - br = BufferResource(device_mr) + br = BufferResource(Statistics.disabled(), device_mr) shuffler = Shuffler( comm, @@ -94,7 +95,7 @@ def test_shuffler_uniform( batch_size: int | None, total_num_partitions: int, ) -> None: - br = BufferResource(device_mr) + br = BufferResource(Statistics.disabled(), device_mr) # Every rank creates the full input dataframe and all the expected partitions # (also partitions this rank might not get after the shuffle). diff --git a/python/rapidsmpf/rapidsmpf/tests/test_sparse_alltoall.py b/python/rapidsmpf/rapidsmpf/tests/test_sparse_alltoall.py index 3990751df..3a55a9719 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_sparse_alltoall.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_sparse_alltoall.py @@ -15,6 +15,7 @@ from rapidsmpf.integrations.cudf.partition import unpack_and_concat from rapidsmpf.memory.buffer_resource import BufferResource from rapidsmpf.memory.packed_data import PackedData +from rapidsmpf.statistics import Statistics from rapidsmpf.testing import assert_eq if TYPE_CHECKING: @@ -71,7 +72,7 @@ def test_basic( Each rank sends `n_inserts` messages to each adjacent rank and then checks that extraction by source returns the expected number of payloads. """ - br = BufferResource(device_mr) + br = BufferResource(Statistics.disabled(), device_mr) srcs, dsts = expected_peers(comm) sparse_alltoall = SparseAlltoall( comm=comm, @@ -123,7 +124,7 @@ def test_non_participating_ranks( if comm.nranks < 2: pytest.skip("Need at least two ranks") - br = BufferResource(device_mr) + br = BufferResource(Statistics.disabled(), device_mr) if comm.rank == 0: srcs = [] @@ -173,7 +174,7 @@ def test_invalid_peers_raise( ) -> None: rank = comm.rank size = comm.nranks - br = BufferResource(device_mr) + br = BufferResource(Statistics.disabled(), device_mr) for src, dst in [([], [rank]), ([rank], []), ([], [size]), ([size], [])]: with pytest.raises( IndexError, match=r"SparseAlltoall invalid (source|destination) rank" diff --git a/python/rapidsmpf/rapidsmpf/tests/test_spill_manager.py b/python/rapidsmpf/rapidsmpf/tests/test_spill_manager.py index ecca8be3d..ee491d0b0 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_spill_manager.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_spill_manager.py @@ -11,6 +11,7 @@ from rapidsmpf.memory.buffer import MemoryType from rapidsmpf.memory.buffer_resource import BufferResource, LimitAvailableMemory from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor +from rapidsmpf.statistics import Statistics if TYPE_CHECKING: import rmm.mr @@ -38,7 +39,7 @@ def test_error_handling( def spill(amount: int) -> int: raise error - br = BufferResource(device_mr, periodic_spill_check=None) + br = BufferResource(Statistics.disabled(), device_mr, periodic_spill_check=None) br.spill_manager.add_spill_function(spill, 0) with pytest.raises(error): br.spill_manager.spill(10) @@ -47,7 +48,7 @@ def spill(amount: int) -> int: def test_spill_function( device_mr: rmm.mr.CudaMemoryResource, ) -> None: - br = BufferResource(device_mr, periodic_spill_check=None) + br = BufferResource(Statistics.disabled(), device_mr, periodic_spill_check=None) track_spilled = [0] def spill_unlimited(amount: int) -> int: @@ -91,7 +92,9 @@ def spill_limited(amount: int) -> int: def test_spill_function_outlive_buffer_resource( device_mr: rmm.mr.CudaMemoryResource, ) -> None: - spill_manager = BufferResource(device_mr, periodic_spill_check=None).spill_manager + spill_manager = BufferResource( + Statistics.disabled(), device_mr, periodic_spill_check=None + ).spill_manager with pytest.raises(ValueError): spill_manager.add_spill_function(lambda x: x, 0) with pytest.raises(ValueError): @@ -108,6 +111,7 @@ def test_periodic_spill_check( mr = RmmResourceAdaptor(device_mr) mem_available = LimitAvailableMemory(mr, limit=-100) br = BufferResource( + Statistics.disabled(), mr, memory_available={MemoryType.DEVICE: mem_available}, periodic_spill_check=1e-3, @@ -132,6 +136,7 @@ def test_spill_to_make_headroom( mr = RmmResourceAdaptor(device_mr) mem_available = LimitAvailableMemory(mr, limit=100) br = BufferResource( + Statistics.disabled(), mr, memory_available={MemoryType.DEVICE: mem_available}, periodic_spill_check=None, @@ -158,6 +163,7 @@ def test_reserve_device_memory_and_spill( mr = RmmResourceAdaptor(device_mr) mem_available = LimitAvailableMemory(mr, limit=100) br = BufferResource( + Statistics.disabled(), mr, memory_available={MemoryType.DEVICE: mem_available}, periodic_spill_check=None,