diff --git a/cpp/benchmarks/bench_shuffle.cpp b/cpp/benchmarks/bench_shuffle.cpp index 1744f488a..1f4719818 100644 --- a/cpp/benchmarks/bench_shuffle.cpp +++ b/cpp/benchmarks/bench_shuffle.cpp @@ -550,7 +550,7 @@ int main(int argc, char** argv) { }; } - auto stats = std::make_shared(/* enable = */ true); + auto stats = rapidsmpf::Statistics::create(); // We're only going to measure the last run, so disable initially. stats->disable(); diff --git a/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp b/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp index 63a6cd164..6002b5da5 100644 --- a/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp +++ b/cpp/benchmarks/streaming/bench_streaming_shuffle.cpp @@ -363,7 +363,7 @@ int main(int argc, char** argv) { }; } - auto stats = std::make_shared(/* enable = */ true); + auto stats = rapidsmpf::Statistics::create(); auto pinned_mr = args.pinned_mem_disable ? rapidsmpf::PinnedMemoryResource::Disabled diff --git a/cpp/benchmarks/streaming/ndsh/utils.cpp b/cpp/benchmarks/streaming/ndsh/utils.cpp index f1cb64b84..0ece65a08 100644 --- a/cpp/benchmarks/streaming/ndsh/utils.cpp +++ b/cpp/benchmarks/streaming/ndsh/utils.cpp @@ -143,7 +143,7 @@ create_context(ProgramOptions& arguments, RmmResourceAdaptor&& mr) { memory_available[MemoryType::DEVICE] = LimitAvailableMemory{mr, static_cast(limit_size)}; } - auto statistics = std::make_shared(/* enable = */ true); + auto statistics = Statistics::create(); RAPIDSMPF_EXPECTS( arguments.no_pinned_host_memory || is_pinned_memory_resources_supported(), diff --git a/cpp/examples/example_shuffle.cpp b/cpp/examples/example_shuffle.cpp index a6b95dd89..7ba73f0cf 100644 --- a/cpp/examples/example_shuffle.cpp +++ b/cpp/examples/example_shuffle.cpp @@ -27,7 +27,7 @@ int main(int argc, char** argv) { rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()}; // Create a statistics instance for the shuffler that tracks useful information. - auto stats = std::make_shared(); + auto stats = rapidsmpf::Statistics::create(); // The communicator has a progress thread where the shuffler event loop executes. A // single progress thread may be used by multiple shufflers simultaneously. diff --git a/cpp/include/rapidsmpf/coll/allgather.hpp b/cpp/include/rapidsmpf/coll/allgather.hpp index d0c3684c5..70f135f93 100644 --- a/cpp/include/rapidsmpf/coll/allgather.hpp +++ b/cpp/include/rapidsmpf/coll/allgather.hpp @@ -23,7 +23,6 @@ #include #include #include -#include /** * @namespace rapidsmpf::coll @@ -101,8 +100,6 @@ class AllGather { * @param comm The communicator for communication. * @param op_id Unique operation identifier for this allgather. * @param br Buffer resource for memory allocation. - * @param statistics Statistics collection instance (disabled by - * default). * @param finished_callback Optional callback run when partitions are locally * finished. The callback is guaranteed to be called by the progress thread exactly * once when the allgather is locally ready. @@ -118,7 +115,6 @@ class AllGather { std::shared_ptr comm, OpID op_id, BufferResource* br, - std::shared_ptr statistics = Statistics::disabled(), std::function&& finished_callback = nullptr ); @@ -195,7 +191,6 @@ class AllGather { std::shared_ptr comm_; ///< Communicator BufferResource* br_; ///< Buffer resource for memory allocation - std::shared_ptr statistics_; ///< Statistics collection instance std::function finished_callback_{ nullptr }; ///< Optional callback to run when allgather is finished and ready for extraction. diff --git a/cpp/include/rapidsmpf/memory/buffer_resource.hpp b/cpp/include/rapidsmpf/memory/buffer_resource.hpp index fe0216cfa..d2a52b5e1 100644 --- a/cpp/include/rapidsmpf/memory/buffer_resource.hpp +++ b/cpp/include/rapidsmpf/memory/buffer_resource.hpp @@ -105,12 +105,16 @@ class BufferResource { * * @param mr The RMM resource adaptor. * @param options Configuration options. + * @param statistics The statistics instance to use (disabled by default). The caller + * is responsible for creating and owning this object. * * @return A shared pointer to a BufferResource instance configured according to the * options. */ static std::shared_ptr from_options( - RmmResourceAdaptor mr, config::Options options + RmmResourceAdaptor mr, + config::Options options, + std::shared_ptr statistics = Statistics::disabled() ); ~BufferResource() noexcept = default; @@ -400,7 +404,7 @@ class BufferResource { * * @return Shared pointer the Statistics instance. */ - std::shared_ptr statistics(); + std::shared_ptr const& statistics() const noexcept; private: std::mutex mutex_; @@ -415,6 +419,8 @@ class BufferResource { std::shared_ptr statistics_; }; +static_assert(StatisticsProvider); + /** * @brief A functor for querying the remaining available memory within a defined limit * from an RMM statistics resource. diff --git a/cpp/include/rapidsmpf/progress_thread.hpp b/cpp/include/rapidsmpf/progress_thread.hpp index 3a566024a..0e8822025 100644 --- a/cpp/include/rapidsmpf/progress_thread.hpp +++ b/cpp/include/rapidsmpf/progress_thread.hpp @@ -181,7 +181,7 @@ class ProgressThread { /** * @brief @return The statistics instance on this progress thread. */ - std::shared_ptr statistics() const noexcept; + std::shared_ptr const& statistics() const noexcept; private: /** @@ -205,4 +205,6 @@ class ProgressThread { detail::PausableThreadLoop thread_; }; +static_assert(StatisticsProvider); + } // namespace rapidsmpf diff --git a/cpp/include/rapidsmpf/statistics.hpp b/cpp/include/rapidsmpf/statistics.hpp index 79485fc80..e6da53fe0 100644 --- a/cpp/include/rapidsmpf/statistics.hpp +++ b/cpp/include/rapidsmpf/statistics.hpp @@ -4,6 +4,7 @@ */ #pragma once #include +#include #include #include #include @@ -20,6 +21,7 @@ #include #include +#include #include #include #include @@ -69,7 +71,7 @@ class StreamOrderedTiming; * std::cout << stats.report(); * @endcode */ -class Statistics { +class Statistics : public std::enable_shared_from_this { public: /** * @brief Identifies a predefined formatter used by `report()`. @@ -107,12 +109,13 @@ class Statistics { }; /** - * @brief Constructs a Statistics object without memory profiling. + * @brief Creates a Statistics instance. * * @param enabled If true, enables tracking of statistics. If false, all operations * are no-ops. + * @return A shared pointer to a new Statistics instance. */ - Statistics(bool enabled = true); + static std::shared_ptr create(bool enabled = true); /** * @brief Construct from configuration options. @@ -125,8 +128,8 @@ class Statistics { ~Statistics() noexcept; - // `Statistics` is owned exclusively through `std::shared_ptr` (see `disabled()` and - // `from_options()`). + // `Statistics` is owned exclusively through `std::shared_ptr`. Use `create()`, + // `disabled()`, or `from_options()` to construct instances. Statistics(Statistics const&) = delete; Statistics& operator=(Statistics const&) = delete; Statistics(Statistics&&) = delete; @@ -566,11 +569,14 @@ class Statistics { /** * @brief Constructs an active MemoryRecorder. * - * @param stats Pointer to Statistics object that will store the result. + * @param stats Shared pointer to the Statistics object that will store + * the result. Must not be null. * @param mr The RMM resource adaptor providing scoped memory statistics. * @param name Name of the scope. */ - MemoryRecorder(Statistics* stats, RmmResourceAdaptor mr, std::string name); + MemoryRecorder( + std::shared_ptr stats, RmmResourceAdaptor mr, std::string name + ); /** * @brief Destructor. @@ -586,9 +592,10 @@ class Statistics { MemoryRecorder& operator=(MemoryRecorder&&) = delete; private: - Statistics* stats_{ - nullptr - }; // TODO: make this shared_ptr using make_shared_from_this + // Holds the owning Statistics so the recorder can safely outlive any + // raw reference the caller might have. `nullptr` indicates a no-op + // recorder. + std::shared_ptr stats_; std::optional mr_; // optional because RmmResourceAdaptor is not default constructible std::string name_; @@ -624,6 +631,8 @@ class Statistics { Formatter formatter; }; + explicit Statistics(bool enabled); + mutable std::mutex mutex_; std::atomic enabled_; std::map stats_; @@ -631,6 +640,28 @@ class Statistics { std::unordered_map memory_records_; }; +/** + * @brief Satisfied by any type that exposes a `statistics()` method returning + * `std::shared_ptr const&`. + * + * Classes satisfying this concept are *statistics providers* — secondary + * classes that receive a provider as a constructor argument should derive their + * `Statistics` instance by calling `.statistics()` on it rather than accepting + * a separate `std::shared_ptr` argument. + * + * Returning by const-reference avoids the per-call atomic refcount bump that a + * by-value `std::shared_ptr` return would incur on hot paths. + * + * Each provider asserts this concept via `static_assert` in its own header. + * Current providers: `BufferResource`, `ProgressThread`, `streaming::Context`. + */ +template +concept StatisticsProvider = requires(T const& t) { + { + t.statistics() + } noexcept -> std::same_as const&>; +}; + /** * @brief Macro for automatic memory profiling of a code scope. * @@ -644,13 +675,15 @@ class Statistics { * * Example usage: * @code - * void foo(Statistics& stats, RmmResourceAdaptor& mr) { + * void foo(std::shared_ptr stats, RmmResourceAdaptor& mr) { * RAPIDSMPF_MEMORY_PROFILE(stats, mr); * RAPIDSMPF_MEMORY_PROFILE(stats, mr, "custom_name"); * } * @endcode * - * The first argument is a reference or pointer to a Statistics object. + * The first argument is a non-null `std::shared_ptr`. Pass + * `Statistics::disabled()` to disable recording (`create_memory_recorder` returns + * a no-op recorder when the statistics instance is disabled). * The second argument is the RMM resource adaptor (or `std::nullopt` for no-op). * The third argument (optional) is a custom function name string to use instead of * __func__. @@ -669,18 +702,15 @@ class Statistics { RAPIDSMPF_MEMORY_PROFILE_3(stats, mr, __func__) // Version with custom function name -#define RAPIDSMPF_MEMORY_PROFILE_3(stats, mr, funcname) \ - auto&& RAPIDSMPF_CONCAT(_rapidsmpf_stats_, __LINE__) = (stats); \ - auto const RAPIDSMPF_CONCAT(_rapidsmpf_memory_recorder_, __LINE__) = \ - (rapidsmpf::detail::to_pointer(RAPIDSMPF_CONCAT(_rapidsmpf_stats_, __LINE__))) \ - ? rapidsmpf::detail::to_pointer( \ - RAPIDSMPF_CONCAT(_rapidsmpf_stats_, __LINE__) \ - ) \ - -> create_memory_recorder( \ - (mr), \ - std::string(__FILE__) + ":" + RAPIDSMPF_STRINGIFY(__LINE__) + "(" \ - + std::string(funcname) + ")" \ - ) \ - : rapidsmpf::Statistics::MemoryRecorder {} +#define RAPIDSMPF_MEMORY_PROFILE_3(stats, mr, funcname) \ + RAPIDSMPF_EXPECTS( \ + (stats) != nullptr, "RAPIDSMPF_MEMORY_PROFILE: stats must not be null" \ + ); \ + auto const RAPIDSMPF_CONCAT(_rapidsmpf_memory_recorder_, __LINE__) = \ + (stats) -> create_memory_recorder( \ + (mr), \ + std::string(__FILE__) + ":" + RAPIDSMPF_STRINGIFY(__LINE__) + "(" \ + + std::string(funcname) + ")" \ + ) } // namespace rapidsmpf diff --git a/cpp/include/rapidsmpf/streaming/core/context.hpp b/cpp/include/rapidsmpf/streaming/core/context.hpp index 6c9dc4091..5d6afe5d3 100644 --- a/cpp/include/rapidsmpf/streaming/core/context.hpp +++ b/cpp/include/rapidsmpf/streaming/core/context.hpp @@ -83,6 +83,8 @@ class Context { * @param logger The logger to use. * @param options Configuration options used to initialize the Context and its * components. + * @param statistics The statistics instance to use (disabled by default). The caller + * is responsible for creating and owning this object. * @return A fully initialized Context. * * @throws std::invalid_argument If an option value is invalid. @@ -102,7 +104,8 @@ class Context { static std::shared_ptr from_options( RmmResourceAdaptor mr, std::shared_ptr logger, - config::Options options + config::Options options, + std::shared_ptr statistics = Statistics::disabled() ); // No copy constructor and assignment operator. @@ -179,7 +182,7 @@ class Context { * * @return Shared pointer to the statistics instance. */ - [[nodiscard]] std::shared_ptr statistics() const noexcept; + [[nodiscard]] std::shared_ptr const& statistics() const noexcept; /** * @brief Create a new channel associated with this context. @@ -231,4 +234,6 @@ class Context { SpillManager::SpillFunctionID spill_function_id_{}; }; +static_assert(StatisticsProvider); + } // namespace rapidsmpf::streaming diff --git a/cpp/src/coll/allgather.cpp b/cpp/src/coll/allgather.cpp index 31b3b2fdb..058c9104e 100644 --- a/cpp/src/coll/allgather.cpp +++ b/cpp/src/coll/allgather.cpp @@ -122,12 +122,10 @@ AllGather::AllGather( std::shared_ptr comm, OpID op_id, BufferResource* br, - std::shared_ptr statistics, std::function&& finished_callback ) : comm_{std::move(comm)}, br_{br}, - statistics_{std::move(statistics)}, finished_callback_{std::move(finished_callback)}, finish_counter_{comm_->nranks()}, op_id_{op_id}, diff --git a/cpp/src/memory/buffer_resource.cpp b/cpp/src/memory/buffer_resource.cpp index 34d0f0500..a5f7ffb44 100644 --- a/cpp/src/memory/buffer_resource.cpp +++ b/cpp/src/memory/buffer_resource.cpp @@ -58,7 +58,7 @@ BufferResource::BufferResource( } std::shared_ptr BufferResource::from_options( - RmmResourceAdaptor mr, config::Options options + RmmResourceAdaptor mr, config::Options options, std::shared_ptr statistics ) { auto pinned_mr = PinnedMemoryResource::from_options(options); auto mem_available = memory_available_from_options(mr, options); @@ -67,7 +67,6 @@ std::shared_ptr BufferResource::from_options( mem_available[MemoryType::PINNED_HOST] = pinned_mr->get_memory_available_cb(); } - auto statistics = Statistics::from_options(options); return std::make_shared( std::move(mr), std::move(pinned_mr), @@ -267,7 +266,7 @@ SpillManager& BufferResource::spill_manager() { return spill_manager_; } -std::shared_ptr BufferResource::statistics() { +std::shared_ptr const& BufferResource::statistics() const noexcept { return statistics_; } diff --git a/cpp/src/progress_thread.cpp b/cpp/src/progress_thread.cpp index 42846b421..d4bbcbc02 100644 --- a/cpp/src/progress_thread.cpp +++ b/cpp/src/progress_thread.cpp @@ -115,7 +115,7 @@ bool ProgressThread::is_running() const { return active_; } -std::shared_ptr ProgressThread::statistics() const noexcept { +std::shared_ptr const& ProgressThread::statistics() const noexcept { return statistics_; } diff --git a/cpp/src/statistics.cpp b/cpp/src/statistics.cpp index df49a33a4..42156dc84 100644 --- a/cpp/src/statistics.cpp +++ b/cpp/src/statistics.cpp @@ -162,19 +162,24 @@ Statistics::~Statistics() noexcept { StreamOrderedTiming::cancel_inflight_timings(this); } -// TODO: remove this constructor and add a factory method, because -// make_shared(false) != Statistics::disabled() Statistics::Statistics(bool enabled) : enabled_{enabled} {} +std::shared_ptr Statistics::create(bool enabled) { + if (!enabled) { + return disabled(); + } + return std::shared_ptr(new Statistics(true)); +} + std::shared_ptr Statistics::from_options(config::Options options) { bool const statistics = options.get("statistics", [](auto const& s) { - return parse_string(s.empty() ? "False" : s); + return s.empty() ? false : parse_string(s); }); - return statistics ? std::make_shared(statistics) : Statistics::disabled(); + return statistics ? Statistics::create() : Statistics::disabled(); } std::shared_ptr Statistics::disabled() { - static std::shared_ptr ret = std::make_shared(false); + static std::shared_ptr ret{new Statistics(false)}; return ret; } @@ -251,15 +256,15 @@ void Statistics::clear() { } Statistics::MemoryRecorder::MemoryRecorder( - Statistics* stats, RmmResourceAdaptor mr, std::string name + std::shared_ptr stats, RmmResourceAdaptor mr, std::string name ) - : stats_{stats}, mr_{std::move(mr)}, name_{std::move(name)} { + : stats_{std::move(stats)}, mr_{std::move(mr)}, name_{std::move(name)} { RAPIDSMPF_EXPECTS(stats_ != nullptr, "the statistics cannot be null"); mr_->begin_scoped_memory_record(); } Statistics::MemoryRecorder::~MemoryRecorder() { - if (stats_ == nullptr) { + if (stats_ == nullptr || !stats_->enabled()) { return; } auto const scope = mr_->end_scoped_memory_record(); @@ -275,10 +280,10 @@ Statistics::MemoryRecorder Statistics::create_memory_recorder( std::optional mr, std::string name ) { auto* rma = get_optional_resource_as(mr); - if (!rma) { + if (!enabled() || !rma) { return MemoryRecorder{}; } - return MemoryRecorder{this, *rma, std::move(name)}; + return MemoryRecorder{shared_from_this(), *rma, std::move(name)}; } std::unordered_map const& @@ -485,7 +490,7 @@ void Statistics::write_json(std::filesystem::path const& filepath) const { std::shared_ptr Statistics::copy() const { std::lock_guard lock(mutex_); - auto ret = std::make_shared(enabled_.load(std::memory_order_acquire)); + auto ret = Statistics::create(enabled()); ret->stats_ = stats_; ret->report_entries_ = report_entries_; return ret; @@ -579,7 +584,7 @@ std::shared_ptr Statistics::deserialize(std::span(enabled != 0); + auto ret = Statistics::create(enabled != 0); std::uint64_t num_stats{}; data = read_pod(data, num_stats); @@ -656,7 +661,7 @@ std::shared_ptr Statistics::merge( bool const any_enabled = std::ranges::any_of(snapshots, [](auto const& s) { return s.enabled; }); - auto ret = std::make_shared(any_enabled); + auto ret = Statistics::create(any_enabled); for (auto const& snap : snapshots) { for (auto const& [name, stat] : snap.stats) { diff --git a/cpp/src/streaming/coll/allgather.cpp b/cpp/src/streaming/coll/allgather.cpp index 6d58bf40b..033e2ec0c 100644 --- a/cpp/src/streaming/coll/allgather.cpp +++ b/cpp/src/streaming/coll/allgather.cpp @@ -15,14 +15,12 @@ AllGather::AllGather( std::shared_ptr ctx, std::shared_ptr comm, OpID op_id ) : ctx_{std::move(ctx)}, - gatherer_{coll::AllGather( - std::move(comm), op_id, ctx_->br().get(), ctx_->statistics(), [this]() { - // Schedule waiters to resume on the executor. - // This doesn't resume the frame immediately so we don't have to track - // completion of this callback with a task_group. - event_.set(ctx_->executor()->get()); - } - )} {} + gatherer_{coll::AllGather(std::move(comm), op_id, ctx_->br().get(), [this]() { + // Schedule waiters to resume on the executor. + // This doesn't resume the frame immediately so we don't have to track + // completion of this callback with a task_group. + event_.set(ctx_->executor()->get()); + })} {} AllGather::~AllGather() noexcept { RAPIDSMPF_EXPECTS_FATAL( diff --git a/cpp/src/streaming/core/context.cpp b/cpp/src/streaming/core/context.cpp index ad8911eff..3fdd34262 100644 --- a/cpp/src/streaming/core/context.cpp +++ b/cpp/src/streaming/core/context.cpp @@ -106,10 +106,13 @@ Context::Context( std::shared_ptr Context::from_options( RmmResourceAdaptor mr, std::shared_ptr logger, - config::Options options + config::Options options, + std::shared_ptr statistics ) { return std::make_shared( - options, std::move(logger), BufferResource::from_options(std::move(mr), options) + options, + std::move(logger), + BufferResource::from_options(std::move(mr), options, std::move(statistics)) ); } @@ -154,7 +157,7 @@ std::shared_ptr const& Context::memory( return memory_[static_cast(mem_type)]; } -std::shared_ptr Context::statistics() const noexcept { +std::shared_ptr const& Context::statistics() const noexcept { return br_->statistics(); } diff --git a/cpp/tests/test_buffer_resource.cpp b/cpp/tests/test_buffer_resource.cpp index 2545497b8..d983f5ead 100644 --- a/cpp/tests/test_buffer_resource.cpp +++ b/cpp/tests/test_buffer_resource.cpp @@ -289,7 +289,7 @@ INSTANTIATE_TEST_SUITE_P( TEST(BufferResource, AllocStatistics) { rmm::mr::cuda_memory_resource mr_cuda; RmmResourceAdaptor mr{mr_cuda}; - auto stats = std::make_shared(/* enable = */ true); + auto stats = Statistics::create(); auto pinned_mr = PinnedMemoryResource::make_if_available(); BufferResource br{ mr, diff --git a/cpp/tests/test_config.cpp b/cpp/tests/test_config.cpp index 61d948172..817f05b60 100644 --- a/cpp/tests/test_config.cpp +++ b/cpp/tests/test_config.cpp @@ -653,7 +653,7 @@ TEST(OptionsTest, BufferResourceFromOptionsCreatesInstanceWithExplicitOptions) { rmm::mr::cuda_memory_resource cuda_mr; RmmResourceAdaptor mr{cuda_mr}; - auto br = BufferResource::from_options(mr, opts); + auto br = BufferResource::from_options(mr, opts, Statistics::from_options(opts)); EXPECT_TRUE(br->statistics()->enabled()); EXPECT_EQ(br->stream_pool().get_pool_size(), 8); @@ -681,7 +681,7 @@ TEST(OptionsTest, BufferResourceFromOptionsEnablesStatisticsWhenRequested) { rmm::mr::cuda_memory_resource cuda_mr; RmmResourceAdaptor mr{cuda_mr}; - auto br = BufferResource::from_options(mr, opts); + auto br = BufferResource::from_options(mr, opts, Statistics::from_options(opts)); EXPECT_TRUE(br->statistics()->enabled()); } @@ -731,7 +731,9 @@ TEST(OptionsTest, ContextFromOptionsCreatesInstanceWithExplicitOptions) { RmmResourceAdaptor mr{cuda_mr}; auto comm = std::make_shared(opts, std::make_shared()); - auto ctx = streaming::Context::from_options(mr, comm->logger(), opts); + auto ctx = streaming::Context::from_options( + mr, comm->logger(), opts, Statistics::from_options(opts) + ); ASSERT_NE(ctx, nullptr); EXPECT_TRUE(ctx->statistics()->enabled()); @@ -760,7 +762,9 @@ TEST(OptionsTest, ContextFromOptionsEnablesStatisticsWhenRequested) { RmmResourceAdaptor mr{cuda_mr}; auto comm = std::make_shared(opts, std::make_shared()); - auto ctx = streaming::Context::from_options(mr, comm->logger(), opts); + auto ctx = streaming::Context::from_options( + mr, comm->logger(), opts, Statistics::from_options(opts) + ); ASSERT_NE(ctx, nullptr); EXPECT_TRUE(ctx->statistics()->enabled()); diff --git a/cpp/tests/test_metadata_payload_exchange.cpp b/cpp/tests/test_metadata_payload_exchange.cpp index d6b166d0a..69f16cf31 100644 --- a/cpp/tests/test_metadata_payload_exchange.cpp +++ b/cpp/tests/test_metadata_payload_exchange.cpp @@ -29,7 +29,7 @@ class MetadataPayloadExchangeTest : public ::testing::Test { mr = std::make_unique(); br = std::make_unique(*mr); stream = rmm::cuda_stream_default; - statistics = std::make_shared(); + statistics = Statistics::create(); auto allocate_fn = [this](std::size_t size) { return allocate_receive_buffer(size); diff --git a/cpp/tests/test_progress_thread.cpp b/cpp/tests/test_progress_thread.cpp index 38d8e8e9c..5b7efe4ca 100644 --- a/cpp/tests/test_progress_thread.cpp +++ b/cpp/tests/test_progress_thread.cpp @@ -41,7 +41,7 @@ TEST_P(ProgressThreadEvents, events) { std::size_t const num_functions = std::get<1>(GetParam()); bool const enable_statistics = std::get<2>(GetParam()); - auto statistics = std::make_shared(enable_statistics); + auto statistics = rapidsmpf::Statistics::create(enable_statistics); std::vector> progress_threads; std::vector>> test_functions(num_threads); diff --git a/cpp/tests/test_statistics.cpp b/cpp/tests/test_statistics.cpp index 2d317fed2..d4211598e 100644 --- a/cpp/tests/test_statistics.cpp +++ b/cpp/tests/test_statistics.cpp @@ -37,43 +37,43 @@ class StatisticsTest : public ::testing::Test { }; TEST_F(StatisticsTest, Disabled) { - rapidsmpf::Statistics stats(false); - EXPECT_FALSE(stats.enabled()); + auto stats = rapidsmpf::Statistics::create(false); + EXPECT_FALSE(stats->enabled()); // Disabed statistics is a no-op. - stats.add_bytes_stat("name", 1); - EXPECT_THROW(stats.get_stat("name"), std::out_of_range); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("Statistics: disabled")); + stats->add_bytes_stat("name", 1); + EXPECT_THROW(stats->get_stat("name"), std::out_of_range); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("Statistics: disabled")); } TEST_F(StatisticsTest, Communication) { - rapidsmpf::Statistics stats; - EXPECT_TRUE(stats.enabled()); + auto stats = rapidsmpf::Statistics::create(); + EXPECT_TRUE(stats->enabled()); - EXPECT_THROW(stats.get_stat("unknown-name"), std::out_of_range); + EXPECT_THROW(stats->get_stat("unknown-name"), std::out_of_range); // Default-formatted stat (no report entry needed). - stats.add_stat("plain-stat", 10); - stats.add_stat("plain-stat", 1); - EXPECT_EQ(stats.get_stat("plain-stat").count(), 2); - EXPECT_EQ(stats.get_stat("plain-stat").value(), 11); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("plain-stat")); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("11 (count 2)")); + stats->add_stat("plain-stat", 10); + stats->add_stat("plain-stat", 1); + EXPECT_EQ(stats->get_stat("plain-stat").count(), 2); + EXPECT_EQ(stats->get_stat("plain-stat").value(), 11); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("plain-stat")); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("11 (count 2)")); - stats.add_bytes_stat("byte-statistics", 20); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("byte-statistics")); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("20 B")); + stats->add_bytes_stat("byte-statistics", 20); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("byte-statistics")); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("20 B")); } TEST_F(StatisticsTest, AddReportEntryArityMismatchThrowsOnRender) { - rapidsmpf::Statistics stats; + auto stats = rapidsmpf::Statistics::create(); // MemoryThroughput expects 3 stats; passing one is accepted at registration but // fails when report() tries to render the entry. - stats.add_report_entry( + stats->add_report_entry( "bad", {"only-one"}, rapidsmpf::Statistics::Formatter::MemoryThroughput ); - stats.add_stat("only-one", 1.0); - EXPECT_THROW(std::ignore = stats.report(), std::out_of_range); + stats->add_stat("only-one", 1.0); + EXPECT_THROW(std::ignore = stats->report(), std::out_of_range); } TEST_F(StatisticsTest, StatMax) { @@ -91,73 +91,73 @@ TEST_F(StatisticsTest, StatMax) { } TEST_F(StatisticsTest, AddReportEntryFirstWins) { - rapidsmpf::Statistics stats; + auto stats = rapidsmpf::Statistics::create(); // The first add_report_entry wins: a Default (count-aware) entry stays // in place even after add_bytes_stat tries to upgrade it to Bytes. - stats.add_report_entry( + stats->add_report_entry( "my-bytes", {"my-bytes"}, rapidsmpf::Statistics::Formatter::Default ); - stats.add_bytes_stat("my-bytes", 1024); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("my-bytes")); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("1024")); - EXPECT_THAT(stats.report(), ::testing::Not(::testing::HasSubstr("KiB"))); + stats->add_bytes_stat("my-bytes", 1024); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("my-bytes")); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("1024")); + EXPECT_THAT(stats->report(), ::testing::Not(::testing::HasSubstr("KiB"))); } TEST_F(StatisticsTest, MultiStatReportEntry) { - rapidsmpf::Statistics stats; + auto stats = rapidsmpf::Statistics::create(); // Build a MemoryThroughput-style 3-stat report entry. - stats.add_report_entry( + stats->add_report_entry( "copy-summary", {"copy-summary-bytes", "copy-summary-time", "copy-summary-stream-delay"}, rapidsmpf::Statistics::Formatter::MemoryThroughput ); - stats.add_stat("copy-summary-bytes", 1024 * 1024); - stats.add_stat("copy-summary-time", 0.001); - stats.add_stat("copy-summary-stream-delay", 0.0001); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("copy-summary")); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("1 MiB")); + stats->add_stat("copy-summary-bytes", 1024 * 1024); + stats->add_stat("copy-summary-time", 0.001); + stats->add_stat("copy-summary-stream-delay", 0.0001); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("copy-summary")); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("1 MiB")); // Component stats are consumed by the report entry and don't emit // their own lines. EXPECT_THAT( - stats.report(), ::testing::Not(::testing::HasSubstr("copy-summary-bytes")) + stats->report(), ::testing::Not(::testing::HasSubstr("copy-summary-bytes")) ); } TEST_F(StatisticsTest, ReportNoDataCollected) { - rapidsmpf::Statistics stats; - stats.add_report_entry( + auto stats = rapidsmpf::Statistics::create(); + stats->add_report_entry( "spill-summary", {"spill-bytes", "spill-time", "spill-delay"}, rapidsmpf::Statistics::Formatter::MemoryThroughput ); // No stats recorded — entry should still appear with "No data collected". - EXPECT_THAT(stats.report(), ::testing::HasSubstr("spill-summary")); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("No data collected")); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("spill-summary")); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("No data collected")); // Adding only one of the three required stats still yields "No data collected". - stats.add_stat("spill-bytes", 1024 * 1024); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("No data collected")); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("spill-bytes")); // uncovered + stats->add_stat("spill-bytes", 1024 * 1024); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("No data collected")); + EXPECT_THAT(stats->report(), ::testing::HasSubstr("spill-bytes")); // uncovered } TEST_F(StatisticsTest, ReportSorting) { - rapidsmpf::Statistics stats; + auto stats = rapidsmpf::Statistics::create(); - stats.add_stat("banana", 2); - stats.add_report_entry( + stats->add_stat("banana", 2); + stats->add_report_entry( "banana", {"banana"}, rapidsmpf::Statistics::Formatter::Default ); - stats.add_stat("cherry", 3); - stats.add_report_entry( + stats->add_stat("cherry", 3); + stats->add_report_entry( "cherry", {"cherry"}, rapidsmpf::Statistics::Formatter::Default ); // Uncovered raw stats for "apple" and "date". - stats.add_stat("apple", 1); - stats.add_stat("date", 4); + stats->add_stat("apple", 1); + stats->add_stat("date", 4); - auto const r = stats.report(); + auto const r = stats->report(); auto const pos_apple = r.find("apple"); auto const pos_banana = r.find("banana"); auto const pos_cherry = r.find("cherry"); @@ -176,12 +176,12 @@ TEST_F(StatisticsTest, ReportSorting) { TEST_F(StatisticsTest, MemoryProfiler) { rapidsmpf::RmmResourceAdaptor mr{cudf::get_current_device_resource_ref()}; auto pinned_mr = rapidsmpf::PinnedMemoryResource::make_if_available(); - rapidsmpf::Statistics stats; + auto stats = rapidsmpf::Statistics::create(); auto stream = cudf::get_default_stream(); // Outer scope { - auto outer = stats.create_memory_recorder(mr, "outer"); + auto outer = stats->create_memory_recorder(mr, "outer"); void* ptr1 = mr.allocate_sync(1_MiB); // +1 MiB void* ptr2 = mr.allocate_sync(1_MiB); // +2 MiB mr.deallocate_sync(ptr1, 1_MiB); @@ -189,7 +189,7 @@ TEST_F(StatisticsTest, MemoryProfiler) { // Nested scope { - auto inner = stats.create_memory_recorder(mr, "inner"); + auto inner = stats->create_memory_recorder(mr, "inner"); void* ptr3 = mr.allocate_sync(1_MiB); // +1 MiB mr.deallocate_sync(ptr3, 1_MiB); } @@ -205,7 +205,7 @@ TEST_F(StatisticsTest, MemoryProfiler) { } stream.synchronize(); } - auto const& records = stats.get_memory_records(); + auto const& records = stats->get_memory_records(); // Verify outer EXPECT_EQ(records.at("outer").num_calls, 1); @@ -221,7 +221,7 @@ TEST_F(StatisticsTest, MemoryProfiler) { // We can call the same name multiple times. { - auto outer = stats.create_memory_recorder(mr, "outer"); + auto outer = stats->create_memory_recorder(mr, "outer"); mr.deallocate_sync(mr.allocate_sync(1_MiB), 1_MiB); } EXPECT_EQ(records.at("outer").num_calls, 2); @@ -229,7 +229,7 @@ TEST_F(StatisticsTest, MemoryProfiler) { EXPECT_EQ(records.at("outer").scoped.peak(), 2_MiB); EXPECT_EQ(records.at("outer").scoped.total(), 4_MiB); - auto const report = stats.report({.mr = mr, .pinned_mr = pinned_mr}); + auto const report = stats->report({.mr = mr, .pinned_mr = pinned_mr}); // Split the report on newlines and find the "main" record line. std::string main_line, pinned_line; @@ -270,11 +270,14 @@ TEST_F(StatisticsTest, MemoryProfiler) { TEST_F(StatisticsTest, MemoryProfilerDisabled) { rapidsmpf::RmmResourceAdaptor mr{cudf::get_current_device_resource_ref()}; - rapidsmpf::Statistics stats(false); - + auto stats = rapidsmpf::Statistics::create(false); + { + auto const& records = stats->get_memory_records(); + EXPECT_TRUE(records.empty()); + } // Outer scope — pass nullopt so the recorder is a no-op. { - auto outer = stats.create_memory_recorder(std::nullopt, "outer"); + auto outer = stats->create_memory_recorder(std::nullopt, "outer"); void* ptr1 = mr.allocate_sync(1_MiB); // +1 MiB void* ptr2 = mr.allocate_sync(1_MiB); // +2 MiB mr.deallocate_sync(ptr1, 1_MiB); @@ -282,23 +285,23 @@ TEST_F(StatisticsTest, MemoryProfilerDisabled) { // Nested scope { - auto inner = stats.create_memory_recorder(std::nullopt, "inner"); + auto inner = stats->create_memory_recorder(std::nullopt, "inner"); void* ptr3 = mr.allocate_sync(1_MiB); // +1 MiB mr.deallocate_sync(ptr3, 1_MiB); } } - auto const& records = stats.get_memory_records(); + auto const& records = stats->get_memory_records(); EXPECT_TRUE(records.empty()); } TEST_F(StatisticsTest, MemoryProfilerMacro) { rapidsmpf::RmmResourceAdaptor mr{cudf::get_current_device_resource_ref()}; - rapidsmpf::Statistics stats; + auto stats = rapidsmpf::Statistics::create(); { RAPIDSMPF_MEMORY_PROFILE(stats, mr); mr.deallocate_sync(mr.allocate_sync(1_MiB), 1_MiB); } - auto const& records = stats.get_memory_records(); + auto const& records = stats->get_memory_records(); ASSERT_EQ(records.size(), 1); auto const& entry = *records.begin(); EXPECT_TRUE(entry.first.find("test_statistics.cpp") != std::string::npos); @@ -308,23 +311,23 @@ TEST_F(StatisticsTest, MemoryProfilerMacro) { TEST_F(StatisticsTest, MemoryProfilerMacroDisabled) { rapidsmpf::RmmResourceAdaptor mr{cudf::get_current_device_resource_ref()}; - rapidsmpf::Statistics stats(false); + auto stats = rapidsmpf::Statistics::create(false); { RAPIDSMPF_MEMORY_PROFILE(stats, std::nullopt); mr.deallocate_sync(mr.allocate_sync(1_MiB), 1_MiB); } - auto const& records = stats.get_memory_records(); + auto const& records = stats->get_memory_records(); EXPECT_TRUE(records.empty()); } TEST_F(StatisticsTest, JsonStream) { - rapidsmpf::Statistics stats; - stats.add_stat("foo", 10.0); - stats.add_stat("foo", 5.0); // count=2, value=15, max=10 - stats.add_bytes_stat("bar", 1024); + auto stats = rapidsmpf::Statistics::create(); + stats->add_stat("foo", 10.0); + stats->add_stat("foo", 5.0); // count=2, value=15, max=10 + stats->add_bytes_stat("bar", 1024); std::ostringstream ss; - stats.write_json(ss); + stats->write_json(ss); auto const& s = ss.str(); EXPECT_THAT(s, ::testing::HasSubstr(R"("foo")")); @@ -339,31 +342,31 @@ TEST_F(StatisticsTest, JsonStream) { } TEST_F(StatisticsTest, InvalidStatNames) { - rapidsmpf::Statistics stats; - stats.add_stat("has\"quote", 1.0); - stats.add_stat("has\\backslash", 2.0); + auto stats = rapidsmpf::Statistics::create(); + stats->add_stat("has\"quote", 1.0); + stats->add_stat("has\\backslash", 2.0); std::ostringstream ss; - EXPECT_THROW(stats.write_json(ss), std::invalid_argument); + EXPECT_THROW(stats->write_json(ss), std::invalid_argument); } TEST_F(StatisticsTest, InvalidMemoryRecordNames) { rapidsmpf::RmmResourceAdaptor mr{cudf::get_current_device_resource_ref()}; - rapidsmpf::Statistics stats; - std::ignore = stats.create_memory_recorder(mr, "bad\"name"); + auto stats = rapidsmpf::Statistics::create(); + std::ignore = stats->create_memory_recorder(mr, "bad\"name"); std::ostringstream ss; - EXPECT_THROW(stats.write_json(ss), std::invalid_argument); + EXPECT_THROW(stats->write_json(ss), std::invalid_argument); } TEST_F(StatisticsTest, JsonMemoryRecords) { rapidsmpf::RmmResourceAdaptor mr{cudf::get_current_device_resource_ref()}; - rapidsmpf::Statistics stats; + auto stats = rapidsmpf::Statistics::create(); { - auto rec = stats.create_memory_recorder(mr, "alloc"); + auto rec = stats->create_memory_recorder(mr, "alloc"); mr.deallocate_sync(mr.allocate_sync(1_MiB), 1_MiB); } std::ostringstream ss; - stats.write_json(ss); + stats->write_json(ss); auto const& s = ss.str(); EXPECT_THAT(s, ::testing::HasSubstr("memory_records")); @@ -375,14 +378,14 @@ TEST_F(StatisticsTest, JsonMemoryRecords) { } TEST_F(StatisticsTest, JsonReport) { - rapidsmpf::Statistics stats; - stats.add_stat("foo", 10.0); - stats.add_stat("foo", 5.0); // count=2, value=15, max=10 - stats.add_bytes_stat("bar", 1024); + auto stats = rapidsmpf::Statistics::create(); + stats->add_stat("foo", 10.0); + stats->add_stat("foo", 5.0); // count=2, value=15, max=10 + stats->add_bytes_stat("bar", 1024); TempDir tmp_dir; auto const path = tmp_dir.path() / "stats.json"; - stats.write_json(path); + stats->write_json(path); std::ifstream f(path); ASSERT_TRUE(f.is_open()); @@ -391,7 +394,7 @@ TEST_F(StatisticsTest, JsonReport) { ); std::ostringstream ss; - stats.write_json(ss); + stats->write_json(ss); EXPECT_EQ(file_contents, ss.str()); } @@ -403,23 +406,23 @@ TEST_F(StatisticsTest, StatConstructor) { } TEST_F(StatisticsTest, SerializeRoundTrip) { - rapidsmpf::Statistics stats; - stats.add_stat("alpha", 10.0); - stats.add_stat("alpha", 5.0); // count=2, value=15, max=10 - stats.add_stat("beta", 3.0); + auto stats = rapidsmpf::Statistics::create(); + stats->add_stat("alpha", 10.0); + stats->add_stat("alpha", 5.0); // count=2, value=15, max=10 + stats->add_stat("beta", 3.0); - auto const bytes = stats.serialize(); + auto const bytes = stats->serialize(); auto deserialized = rapidsmpf::Statistics::deserialize(bytes); EXPECT_TRUE(deserialized->enabled()); - EXPECT_EQ(deserialized->get_stat("alpha"), stats.get_stat("alpha")); - EXPECT_EQ(deserialized->get_stat("beta"), stats.get_stat("beta")); + EXPECT_EQ(deserialized->get_stat("alpha"), stats->get_stat("alpha")); + EXPECT_EQ(deserialized->get_stat("beta"), stats->get_stat("beta")); EXPECT_EQ(deserialized->list_stat_names().size(), 2); } TEST_F(StatisticsTest, SerializeEmpty) { - rapidsmpf::Statistics stats; - auto const bytes = stats.serialize(); + auto stats = rapidsmpf::Statistics::create(); + auto const bytes = stats->serialize(); auto deserialized = rapidsmpf::Statistics::deserialize(bytes); EXPECT_TRUE(deserialized->enabled()); @@ -467,38 +470,38 @@ TEST_F(StatisticsTest, DeserializeRejectsOutOfRangeFormatter) { TEST_F(StatisticsTest, SerializeRoundTripPreservesEnabledFlag) { // A disabled Statistics should come back disabled after a round-trip. - rapidsmpf::Statistics disabled(false); - auto const bytes = disabled.serialize(); + auto disabled = rapidsmpf::Statistics::create(false); + auto const bytes = disabled->serialize(); auto deserialized = rapidsmpf::Statistics::deserialize(bytes); EXPECT_FALSE(deserialized->enabled()); // And an enabled one comes back enabled. - rapidsmpf::Statistics enabled(true); - auto const bytes2 = enabled.serialize(); + auto enabled = rapidsmpf::Statistics::create(true); + auto const bytes2 = enabled->serialize(); auto deserialized2 = rapidsmpf::Statistics::deserialize(bytes2); EXPECT_TRUE(deserialized2->enabled()); } TEST_F(StatisticsTest, SerializeRoundTripWithReportEntries) { - rapidsmpf::Statistics stats; - stats.add_bytes_stat("alpha", 2048); // Bytes entry - stats.add_duration_stat("beta", rapidsmpf::Duration{0.005}); // Duration entry - stats.add_report_entry( + auto stats = rapidsmpf::Statistics::create(); + stats->add_bytes_stat("alpha", 2048); // Bytes entry + stats->add_duration_stat("beta", rapidsmpf::Duration{0.005}); // Duration entry + stats->add_report_entry( "copy", {"copy-bytes", "copy-time", "copy-delay"}, rapidsmpf::Statistics::Formatter::MemoryThroughput ); - stats.add_stat("copy-bytes", 1024.0 * 1024.0); - stats.add_stat("copy-time", 0.002); - stats.add_stat("copy-delay", 0.00001); + stats->add_stat("copy-bytes", 1024.0 * 1024.0); + stats->add_stat("copy-time", 0.002); + stats->add_stat("copy-delay", 0.00001); - auto const bytes = stats.serialize(); + auto const bytes = stats->serialize(); auto deserialized = rapidsmpf::Statistics::deserialize(bytes); // Stats round-trip numerically. - EXPECT_EQ(deserialized->get_stat("alpha"), stats.get_stat("alpha")); - EXPECT_EQ(deserialized->get_stat("beta"), stats.get_stat("beta")); - EXPECT_EQ(deserialized->get_stat("copy-bytes"), stats.get_stat("copy-bytes")); + EXPECT_EQ(deserialized->get_stat("alpha"), stats->get_stat("alpha")); + EXPECT_EQ(deserialized->get_stat("beta"), stats->get_stat("beta")); + EXPECT_EQ(deserialized->get_stat("copy-bytes"), stats->get_stat("copy-bytes")); // And crucially, formatter metadata is preserved: the deserialized // report renders formatted values, not raw numbers. @@ -508,22 +511,22 @@ TEST_F(StatisticsTest, SerializeRoundTripWithReportEntries) { } TEST_F(StatisticsTest, Copy) { - rapidsmpf::Statistics stats; - stats.add_bytes_stat("x", 2048); // registers a Bytes report entry + auto stats = rapidsmpf::Statistics::create(); + stats->add_bytes_stat("x", 2048); // registers a Bytes report entry - auto copied = stats.copy(); + auto copied = stats->copy(); EXPECT_TRUE(copied->enabled()); - EXPECT_EQ(copied->get_stat("x"), stats.get_stat("x")); + EXPECT_EQ(copied->get_stat("x"), stats->get_stat("x")); // The Bytes formatter carried over the copy. EXPECT_THAT(copied->report(), ::testing::HasSubstr("2 KiB")); } TEST_F(StatisticsTest, MergeOverlapping) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_stat("x", 10.0); a->add_stat("x", 3.0); // count=2, value=13, max=10 - auto b = std::make_shared(); + auto b = rapidsmpf::Statistics::create(); b->add_stat("x", 7.0); // count=1, value=7, max=7 std::vector> inputs{a, b}; @@ -535,10 +538,10 @@ TEST_F(StatisticsTest, MergeOverlapping) { } TEST_F(StatisticsTest, MergeDisjoint) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_stat("x", 1.0); - auto b = std::make_shared(); + auto b = rapidsmpf::Statistics::create(); b->add_stat("y", 2.0); std::vector> inputs{a, b}; @@ -549,10 +552,10 @@ TEST_F(StatisticsTest, MergeDisjoint) { } TEST_F(StatisticsTest, MergeWithEmpty) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_stat("x", 5.0); - auto empty = std::make_shared(); + auto empty = rapidsmpf::Statistics::create(); std::vector> inputs{a, empty}; auto merged = rapidsmpf::Statistics::merge(std::span{inputs}); @@ -566,10 +569,10 @@ TEST_F(StatisticsTest, MergeWithEmpty) { } TEST_F(StatisticsTest, MergeCombinesReportEntries) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_bytes_stat("x", 10); // Bytes report entry - auto b = std::make_shared(); + auto b = rapidsmpf::Statistics::create(); b->add_stat("x", 5.0); // no formatter on this side // Merging a (has Bytes entry) with b: result uses a's entry. @@ -584,13 +587,13 @@ TEST_F(StatisticsTest, MergeCombinesReportEntries) { } TEST_F(StatisticsTest, MergeMultiple) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_stat("x", 1.0); - auto b = std::make_shared(); + auto b = rapidsmpf::Statistics::create(); b->add_stat("x", 2.0); - auto c = std::make_shared(); + auto c = rapidsmpf::Statistics::create(); c->add_stat("y", 10.0); std::vector> inputs{a, b, c}; @@ -610,7 +613,7 @@ TEST_F(StatisticsTest, MergeRejectsEmptySpan) { } TEST_F(StatisticsTest, MergeRejectsNullElement) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); std::vector> inputs{a, nullptr}; EXPECT_THROW( std::ignore = rapidsmpf::Statistics::merge(std::span{inputs}), @@ -619,11 +622,11 @@ TEST_F(StatisticsTest, MergeRejectsNullElement) { } TEST_F(StatisticsTest, MergeRejectsConflictingFormatter) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_report_entry("x", {"x"}, rapidsmpf::Statistics::Formatter::Bytes); a->add_stat("x", 1.0); - auto b = std::make_shared(); + auto b = rapidsmpf::Statistics::create(); b->add_report_entry("x", {"x"}, rapidsmpf::Statistics::Formatter::Duration); b->add_stat("x", 2.0); @@ -637,10 +640,10 @@ TEST_F(StatisticsTest, MergeRejectsConflictingFormatter) { TEST_F(StatisticsTest, MergeIdenticalReportEntries) { // Two inputs with the same report entry (same formatter + stat_names) // must merge cleanly — no conflict. - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_bytes_stat("x", 10); - auto b = std::make_shared(); + auto b = rapidsmpf::Statistics::create(); b->add_bytes_stat("x", 20); std::vector> inputs{a, b}; @@ -649,8 +652,8 @@ TEST_F(StatisticsTest, MergeIdenticalReportEntries) { } TEST_F(StatisticsTest, MergeEnabledFlagPropagates) { - auto enabled = std::make_shared(true); - auto disabled = std::make_shared(false); + auto enabled = rapidsmpf::Statistics::create(true); + auto disabled = rapidsmpf::Statistics::create(false); // disabled + disabled -> disabled. std::vector> both_off{disabled, disabled}; @@ -662,12 +665,12 @@ TEST_F(StatisticsTest, MergeEnabledFlagPropagates) { } TEST_F(StatisticsTest, MergeRejectsConflictingStatNames) { - auto a = std::make_shared(); + auto a = rapidsmpf::Statistics::create(); a->add_report_entry( "copy", {"b1", "t1", "d1"}, rapidsmpf::Statistics::Formatter::MemoryThroughput ); - auto b = std::make_shared(); + auto b = rapidsmpf::Statistics::create(); b->add_report_entry( "copy", {"b2", "t2", "d2"}, rapidsmpf::Statistics::Formatter::MemoryThroughput ); diff --git a/cpp/tests/test_stream_ordered_timing.cpp b/cpp/tests/test_stream_ordered_timing.cpp index 9ad786ef1..7661b166d 100644 --- a/cpp/tests/test_stream_ordered_timing.cpp +++ b/cpp/tests/test_stream_ordered_timing.cpp @@ -32,7 +32,7 @@ TEST(StreamOrderedTiming, Disabled) { TEST(StreamOrderedTiming, RecordsDuration) { // A positive duration is recorded under the given name after stream sync. rmm::cuda_stream stream; - auto stats = std::make_shared(); + auto stats = Statistics::create(); { StreamOrderedTiming timing{stream.view(), stats}; // Do a small GPU operation so there is measurable work between start and stop. @@ -49,7 +49,7 @@ TEST(StreamOrderedTiming, RecordsDuration) { TEST(StreamOrderedTiming, MultipleTimings) { // Repeated timings for the same name accumulate: count equals the number of timings. rmm::cuda_stream stream; - auto stats = std::make_shared(); + auto stats = Statistics::create(); constexpr int n = 3; for (int i = 0; i < n; ++i) { StreamOrderedTiming timing{stream.view(), stats}; @@ -67,8 +67,8 @@ TEST(StreamOrderedTiming, Cancel) { // cancelled timings are not recorded, while timings for other Statistics objects // are still recorded. Calling cancel when no timings are pending is a no-op. rmm::cuda_stream stream; - auto stats_a = std::make_shared(); - auto stats_b = std::make_shared(); + auto stats_a = Statistics::create(); + auto stats_b = Statistics::create(); // Block stream: no callbacks will execute until gate.open() is called. struct Gate { @@ -109,7 +109,7 @@ TEST(StreamOrderedTiming, Cancel) { TEST(StreamOrderedTiming, StreamDelay) { rmm::cuda_stream stream; - auto stats = std::make_shared(); + auto stats = Statistics::create(); // With a stream_delay_name, the delay stat is recorded and is >= 0. { @@ -122,7 +122,7 @@ TEST(StreamOrderedTiming, StreamDelay) { EXPECT_GE(delay.value(), 0.0); // With std::nullopt (the default), no stream-delay stat is written. - auto stats2 = std::make_shared(); + auto stats2 = Statistics::create(); { StreamOrderedTiming timing{stream.view(), stats2}; timing.stop_and_record("my-timing"); @@ -136,7 +136,7 @@ TEST(StreamOrderedTiming, StatisticsDestroyedBeforeStreamSync) { // the callbacks detect the expired weak_ptr and return without crashing. rmm::cuda_stream stream; { - auto stats = std::make_shared(); + auto stats = Statistics::create(); StreamOrderedTiming timing{stream.view(), stats}; timing.stop_and_record("my-timing"); // Both `timing` and `stats` go out of scope here. The shared_ptr count diff --git a/python/rapidsmpf/rapidsmpf/coll/allgather.pxd b/python/rapidsmpf/rapidsmpf/coll/allgather.pxd index 09ba8bf2c..c3f9fd533 100644 --- a/python/rapidsmpf/rapidsmpf/coll/allgather.pxd +++ b/python/rapidsmpf/rapidsmpf/coll/allgather.pxd @@ -14,7 +14,6 @@ from rapidsmpf.memory.buffer_resource cimport (BufferResource, cpp_BufferResource) from rapidsmpf.memory.packed_data cimport cpp_PackedData from rapidsmpf.progress_thread cimport cpp_ProgressThread -from rapidsmpf.statistics cimport cpp_Statistics cdef extern from "" namespace \ @@ -31,7 +30,6 @@ cdef extern from "" nogil: shared_ptr[cpp_Communicator] comm, int32_t op_id, cpp_BufferResource *br, - shared_ptr[cpp_Statistics] statistics ) except +ex_handler void insert(uint64_t sequence_number, cpp_PackedData packed_data) \ except +ex_handler diff --git a/python/rapidsmpf/rapidsmpf/coll/allgather.pyi b/python/rapidsmpf/rapidsmpf/coll/allgather.pyi index e89d6858a..108f2d037 100644 --- a/python/rapidsmpf/rapidsmpf/coll/allgather.pyi +++ b/python/rapidsmpf/rapidsmpf/coll/allgather.pyi @@ -5,7 +5,6 @@ from __future__ import annotations from rapidsmpf.communicator.communicator import Communicator from rapidsmpf.memory.buffer_resource import BufferResource from rapidsmpf.memory.packed_data import PackedData -from rapidsmpf.statistics import Statistics class AllGather: def __init__( @@ -13,7 +12,6 @@ class AllGather: comm: Communicator, op_id: int, br: BufferResource, - statistics: Statistics | None = None, ) -> None: ... @property def comm(self) -> Communicator: ... diff --git a/python/rapidsmpf/rapidsmpf/coll/allgather.pyx b/python/rapidsmpf/rapidsmpf/coll/allgather.pyx index 01d345897..3c84634be 100644 --- a/python/rapidsmpf/rapidsmpf/coll/allgather.pyx +++ b/python/rapidsmpf/rapidsmpf/coll/allgather.pyx @@ -14,7 +14,6 @@ from rapidsmpf.memory.buffer_resource cimport (BufferResource, cpp_BufferResource) from rapidsmpf.memory.packed_data cimport (PackedData, cpp_PackedData, packed_data_vector_to_list) -from rapidsmpf.statistics cimport Statistics cdef class AllGather: @@ -37,8 +36,6 @@ cdef class AllGather: between 0 and 2^20 - 1. br Buffer resource for memory allocation. - statistics - Statistics collection instance. If None, statistics is disabled. Notes ----- @@ -52,19 +49,15 @@ cdef class AllGather: Communicator comm not None, int32_t op_id, BufferResource br not None, - Statistics statistics = None, ): self._br = br self._comm = comm cdef cpp_BufferResource* br_ = br.ptr() - if statistics is None: - statistics = Statistics(enable=False) # Disables statistics. with nogil: self._handle = make_unique[cpp_AllGather]( comm._handle, op_id, br_, - statistics._handle, ) def __dealloc__(self): diff --git a/python/rapidsmpf/rapidsmpf/integrations/core.py b/python/rapidsmpf/rapidsmpf/integrations/core.py index 48713cc70..b301d0079 100644 --- a/python/rapidsmpf/rapidsmpf/integrations/core.py +++ b/python/rapidsmpf/rapidsmpf/integrations/core.py @@ -24,12 +24,12 @@ from rapidsmpf.memory.spill_collection import SpillCollection from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor from rapidsmpf.shuffler import Shuffler +from rapidsmpf.statistics import Statistics if TYPE_CHECKING: from collections.abc import Callable, Sequence from rapidsmpf.communicator.communicator import Communicator - from rapidsmpf.statistics import Statistics DataFrameT = TypeVar("DataFrameT") @@ -748,8 +748,8 @@ def rmpf_worker_local_setup( options = Options(options_map) # use options to create the buffer resource - br = BufferResource.from_options(mr, options) - statistics = br.statistics + statistics = Statistics.from_options(options) + br = BufferResource.from_options(mr, options, statistics) # If enabled, create a staging device buffer for the spilling to reduce # device memory pressure. diff --git a/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyi b/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyi index b3577f8c4..e5e56db6b 100644 --- a/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyi +++ b/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyi @@ -30,7 +30,10 @@ class BufferResource: ) -> None: ... @classmethod def from_options( - cls: type[Self], mr: RmmResourceAdaptor, options: Options + cls: type[Self], + mr: RmmResourceAdaptor, + options: Options, + statistics: Statistics | None = None, ) -> Self: ... @property def device_mr(self) -> DeviceMemoryResource: ... diff --git a/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyx b/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyx index 0104e85be..b1795ab49 100644 --- a/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyx +++ b/python/rapidsmpf/rapidsmpf/memory/buffer_resource.pyx @@ -239,7 +239,12 @@ cdef class BufferResource: self.spill_manager = SpillManager._create(self) @classmethod - def from_options(cls, RmmResourceAdaptor mr not None, Options options not None): + def from_options( + cls, + RmmResourceAdaptor mr not None, + Options options not None, + statistics=None, + ): """ Construct a BufferResource from configuration options. @@ -252,11 +257,16 @@ cdef class BufferResource: RMM resource adaptor. The adaptor must outlive the returned BufferResource. options Configuration options. + statistics + The statistics instance to use. The caller is responsible for creating and + owning this object. Defaults to ``Statistics.disabled()``. Returns ------- A BufferResource instance configured according to the options. """ + if statistics is None: + statistics = Statistics.disabled() cdef PinnedMemoryResource pinned_mr = PinnedMemoryResource.from_options(options) return cls( device_mr=mr, @@ -264,7 +274,7 @@ cdef class BufferResource: memory_available=AvailableMemoryMap.from_options(mr, options), periodic_spill_check=periodic_spill_check_from_options(options), stream_pool=stream_pool_from_options(options), - statistics=Statistics.from_options(options), + statistics=statistics, ) def __dealloc__(self): diff --git a/python/rapidsmpf/rapidsmpf/statistics.pxd b/python/rapidsmpf/rapidsmpf/statistics.pxd index ef6b57b5b..134620f2a 100644 --- a/python/rapidsmpf/rapidsmpf/statistics.pxd +++ b/python/rapidsmpf/rapidsmpf/statistics.pxd @@ -51,7 +51,7 @@ cdef extern from "" nogil: cdef cppclass cpp_MemoryRecorder "rapidsmpf::Statistics::MemoryRecorder": cpp_MemoryRecorder( - cpp_Statistics* stats, + shared_ptr[cpp_Statistics] stats, cpp_RmmResourceAdaptor mr, string name ) except +ex_handler diff --git a/python/rapidsmpf/rapidsmpf/statistics.pyi b/python/rapidsmpf/rapidsmpf/statistics.pyi index cbd0f669d..e569c5824 100644 --- a/python/rapidsmpf/rapidsmpf/statistics.pyi +++ b/python/rapidsmpf/rapidsmpf/statistics.pyi @@ -24,6 +24,8 @@ class Statistics: def __init__(self, *, enable: bool) -> None: ... @classmethod def from_options(cls: type[Self], options: Options) -> Self: ... + @classmethod + def disabled(cls: type[Self]) -> Self: ... @property def enabled(self) -> bool: ... def report( diff --git a/python/rapidsmpf/rapidsmpf/statistics.pyx b/python/rapidsmpf/rapidsmpf/statistics.pyx index ce784fbf9..4e18d06ed 100644 --- a/python/rapidsmpf/rapidsmpf/statistics.pyx +++ b/python/rapidsmpf/rapidsmpf/statistics.pyx @@ -7,7 +7,7 @@ from cython.operator cimport preincrement from libc.stdint cimport uint8_t from libc.string cimport memcpy from libcpp cimport bool as bool_t -from libcpp.memory cimport make_shared, make_unique, shared_ptr +from libcpp.memory cimport make_unique, shared_ptr from libcpp.optional cimport optional from libcpp.string cimport string from libcpp.vector cimport vector @@ -27,6 +27,14 @@ import os cdef extern from "" nogil: + cdef shared_ptr[cpp_Statistics] cpp_create \ + "rapidsmpf::Statistics::create"( + bool_t enabled, + ) except +ex_handler + + cdef shared_ptr[cpp_Statistics] cpp_disabled \ + "rapidsmpf::Statistics::disabled"() except +ex_handler + cdef shared_ptr[cpp_Statistics] cpp_from_options \ "rapidsmpf::Statistics::from_options"( cpp_Options options, @@ -148,7 +156,7 @@ cdef class Statistics: """ def __init__(self, *, bool_t enable): with nogil: - self._handle = make_shared[cpp_Statistics](enable) + self._handle = cpp_create(enable) @classmethod def from_options(cls, Options options not None): @@ -169,6 +177,23 @@ cdef class Statistics: ret._handle = cpp_from_options(options._handle) return ret + @classmethod + def disabled(cls): + """ + Returns a disabled (no-op) Statistics instance. + + Useful when you need to pass a Statistics argument but do not want to + collect any data. + + Returns + ------- + A Statistics instance with tracking disabled. + """ + cdef Statistics ret = cls.__new__(cls) + with nogil: + ret._handle = cpp_disabled() + return ret + def __dealloc__(self): with nogil: self._handle.reset() @@ -673,7 +698,7 @@ cdef class MemoryRecorder: cdef cpp_RmmResourceAdaptor* mr = self._mr.get_handle() with nogil: self._handle = make_unique[cpp_MemoryRecorder]( - self._stats._handle.get(), deref(mr), self._name + self._stats._handle, deref(mr), self._name ) def __exit__(self, exc_type, exc_value, traceback): diff --git a/python/rapidsmpf/rapidsmpf/streaming/core/context.pyi b/python/rapidsmpf/rapidsmpf/streaming/core/context.pyi index d8aca830f..4ed3e4a65 100644 --- a/python/rapidsmpf/rapidsmpf/streaming/core/context.pyi +++ b/python/rapidsmpf/rapidsmpf/streaming/core/context.pyi @@ -27,7 +27,11 @@ class Context: ) -> None: ... @classmethod def from_options( - cls: type[Self], logger: Logger, mr: RmmResourceAdaptor, options: Options + cls: type[Self], + logger: Logger, + mr: RmmResourceAdaptor, + options: Options, + statistics: Statistics | None = None, ) -> Self: ... def __enter__(self) -> Context: ... def __exit__( diff --git a/python/rapidsmpf/rapidsmpf/streaming/core/context.pyx b/python/rapidsmpf/rapidsmpf/streaming/core/context.pyx index 0d7437eee..512bd6f22 100644 --- a/python/rapidsmpf/rapidsmpf/streaming/core/context.pyx +++ b/python/rapidsmpf/rapidsmpf/streaming/core/context.pyx @@ -20,6 +20,7 @@ from rapidsmpf.streaming.core.memory_reserve_or_wait cimport \ MemoryReserveOrWait from rapidsmpf.memory.buffer import MemoryType as py_MemoryType +from rapidsmpf.statistics import Statistics @no_gc_clear @@ -97,11 +98,14 @@ cdef class Context: cls, Logger logger not None, RmmResourceAdaptor mr not None, - Options options not None + Options options not None, + statistics=None, ): + if statistics is None: + statistics = Statistics.disabled() return cls( logger=logger, - br=BufferResource.from_options(mr, options), + br=BufferResource.from_options(mr, options, statistics), options=options, ) diff --git a/python/rapidsmpf/rapidsmpf/tests/test_allgather.py b/python/rapidsmpf/rapidsmpf/tests/test_allgather.py index 7493b8509..c30772d8e 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_allgather.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_allgather.py @@ -16,7 +16,6 @@ from rapidsmpf.integrations.cudf.partition import unpack_and_concat from rapidsmpf.memory.buffer_resource import BufferResource from rapidsmpf.memory.packed_data import PackedData -from rapidsmpf.statistics import Statistics from rapidsmpf.utils.cudf import ( cudf_to_pylibcudf_table, pylibcudf_to_cudf_dataframe, @@ -132,14 +131,12 @@ def test_basic_allgather( should receive all data from all ranks. """ br = BufferResource(device_mr) - statistics = Statistics(enable=False) # Create AllGather instance allgather = AllGather( comm=comm, op_id=0, # Use operation ID 0 br=br, - statistics=statistics, ) this_rank = comm.rank diff --git a/python/rapidsmpf/rapidsmpf/tests/test_config.py b/python/rapidsmpf/rapidsmpf/tests/test_config.py index 07ba33610..7d78a71a5 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_config.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_config.py @@ -488,7 +488,7 @@ def test_buffer_resource_from_options_creates_instance_with_explicit_options() - } ) mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) - br = BufferResource.from_options(mr, opts) + br = BufferResource.from_options(mr, opts, Statistics.from_options(opts)) assert br.statistics.enabled assert br.stream_pool_size() == 8 @@ -512,7 +512,7 @@ def test_buffer_resource_from_options_uses_default_when_options_empty() -> None: def test_buffer_resource_from_options_enables_statistics_when_requested() -> None: opts = Options({"statistics": "ON"}) mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) - br = BufferResource.from_options(mr, opts) + br = BufferResource.from_options(mr, opts, Statistics.from_options(opts)) assert br.statistics.enabled @@ -548,7 +548,9 @@ def test_context_from_options_creates_instance_with_explicit_options() -> None: mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) comm = single_comm.new_communicator(opts, ProgressThread()) - with Context.from_options(comm.logger, mr, opts) as ctx: + with Context.from_options( + comm.logger, mr, opts, Statistics.from_options(opts) + ) as ctx: assert ctx is not None assert ctx.statistics().enabled assert ctx.stream_pool_size() == 8 @@ -572,7 +574,9 @@ def test_context_from_options_enables_statistics_when_requested() -> None: mr = RmmResourceAdaptor(rmm.mr.CudaMemoryResource()) comm = single_comm.new_communicator(opts, ProgressThread()) - with Context.from_options(comm.logger, mr, opts) as ctx: + with Context.from_options( + comm.logger, mr, opts, Statistics.from_options(opts) + ) as ctx: assert ctx is not None assert ctx.statistics().enabled