Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/benchmarks/bench_shuffle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ int main(int argc, char** argv) {
};
}

auto stats = std::make_shared<rapidsmpf::Statistics>(/* enable = */ true);
auto stats = rapidsmpf::Statistics::create();

// We're only going to measure the last run, so disable initially.
stats->disable();
Expand Down
2 changes: 1 addition & 1 deletion cpp/benchmarks/streaming/bench_streaming_shuffle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ int main(int argc, char** argv) {
};
}

auto stats = std::make_shared<rapidsmpf::Statistics>(/* enable = */ true);
auto stats = rapidsmpf::Statistics::create();

auto pinned_mr = args.pinned_mem_disable
? rapidsmpf::PinnedMemoryResource::Disabled
Expand Down
2 changes: 1 addition & 1 deletion cpp/benchmarks/streaming/ndsh/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ create_context(ProgramOptions& arguments, RmmResourceAdaptor&& mr) {
memory_available[MemoryType::DEVICE] =
LimitAvailableMemory{mr, static_cast<std::int64_t>(limit_size)};
}
auto statistics = std::make_shared<Statistics>(/* enable = */ true);
auto statistics = Statistics::create();

RAPIDSMPF_EXPECTS(
arguments.no_pinned_host_memory || is_pinned_memory_resources_supported(),
Expand Down
2 changes: 1 addition & 1 deletion cpp/examples/example_shuffle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ int main(int argc, char** argv) {
rapidsmpf::config::Options options{rapidsmpf::config::get_environment_variables()};

// Create a statistics instance for the shuffler that tracks useful information.
auto stats = std::make_shared<rapidsmpf::Statistics>();
auto stats = rapidsmpf::Statistics::create();

// The communicator has a progress thread where the shuffler event loop executes. A
// single progress thread may be used by multiple shufflers simultaneously.
Expand Down
5 changes: 0 additions & 5 deletions cpp/include/rapidsmpf/coll/allgather.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <rapidsmpf/memory/packed_data.hpp>
#include <rapidsmpf/memory/spill_manager.hpp>
#include <rapidsmpf/progress_thread.hpp>
#include <rapidsmpf/statistics.hpp>

/**
* @namespace rapidsmpf::coll
Expand Down Expand Up @@ -101,8 +100,6 @@ class AllGather {
* @param comm The communicator for communication.
* @param op_id Unique operation identifier for this allgather.
* @param br Buffer resource for memory allocation.
* @param statistics Statistics collection instance (disabled by
* default).
* @param finished_callback Optional callback run when partitions are locally
* finished. The callback is guaranteed to be called by the progress thread exactly
* once when the allgather is locally ready.
Expand All @@ -118,7 +115,6 @@ class AllGather {
std::shared_ptr<Communicator> comm,
OpID op_id,
BufferResource* br,
std::shared_ptr<Statistics> statistics = Statistics::disabled(),
std::function<void(void)>&& finished_callback = nullptr
);

Expand Down Expand Up @@ -195,7 +191,6 @@ class AllGather {

std::shared_ptr<Communicator> comm_; ///< Communicator
BufferResource* br_; ///< Buffer resource for memory allocation
std::shared_ptr<Statistics> statistics_; ///< Statistics collection instance
std::function<void(void)> finished_callback_{
nullptr
}; ///< Optional callback to run when allgather is finished and ready for extraction.
Expand Down
10 changes: 8 additions & 2 deletions cpp/include/rapidsmpf/memory/buffer_resource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,16 @@ class BufferResource {
*
* @param mr The RMM resource adaptor.
* @param options Configuration options.
* @param statistics The statistics instance to use (disabled by default). The caller
* is responsible for creating and owning this object.
*
* @return A shared pointer to a BufferResource instance configured according to the
* options.
*/
static std::shared_ptr<BufferResource> from_options(
RmmResourceAdaptor mr, config::Options options
RmmResourceAdaptor mr,
config::Options options,
std::shared_ptr<Statistics> statistics = Statistics::disabled()
);

~BufferResource() noexcept = default;
Expand Down Expand Up @@ -400,7 +404,7 @@ class BufferResource {
*
* @return Shared pointer to the Statistics instance.
*/
std::shared_ptr<Statistics> statistics();
std::shared_ptr<Statistics> const& statistics() const noexcept;

private:
std::mutex mutex_;
Expand All @@ -415,6 +419,8 @@ class BufferResource {
std::shared_ptr<Statistics> statistics_;
};

static_assert(StatisticsProvider<BufferResource>);

/**
* @brief A functor for querying the remaining available memory within a defined limit
* from an RMM statistics resource.
Expand Down
4 changes: 3 additions & 1 deletion cpp/include/rapidsmpf/progress_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class ProgressThread {
/**
* @brief @return The statistics instance on this progress thread.
*/
std::shared_ptr<Statistics> statistics() const noexcept;
std::shared_ptr<Statistics> const& statistics() const noexcept;

private:
/**
Expand All @@ -205,4 +205,6 @@ class ProgressThread {
detail::PausableThreadLoop thread_;
};

static_assert(StatisticsProvider<ProgressThread>);

} // namespace rapidsmpf
50 changes: 40 additions & 10 deletions cpp/include/rapidsmpf/statistics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
#pragma once
#include <atomic>
#include <concepts>
#include <cstddef>
#include <filesystem>
#include <initializer_list>
Expand Down Expand Up @@ -69,7 +70,7 @@ class StreamOrderedTiming;
* std::cout << stats.report();
* @endcode
*/
class Statistics {
class Statistics : public std::enable_shared_from_this<Statistics> {
public:
/**
* @brief Identifies a predefined formatter used by `report()`.
Expand Down Expand Up @@ -107,12 +108,13 @@ class Statistics {
};

/**
* @brief Constructs a Statistics object without memory profiling.
* @brief Creates a Statistics instance.
*
* @param enabled If true, enables tracking of statistics. If false, all operations
* are no-ops.
* @return A shared pointer to a new Statistics instance.
*/
Statistics(bool enabled = true);
static std::shared_ptr<Statistics> create(bool enabled = true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: What does having a create static method buy us over a constructor?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A factory method lets us safely use std::enable_shared_from_this (which requires the object to already be owned by a shared_ptr), so we can pass a shared_ptr to the memory recorder rather than a raw pointer.


/**
* @brief Construct from configuration options.
Expand All @@ -125,8 +127,8 @@ class Statistics {

~Statistics() noexcept;

// `Statistics` is owned exclusively through `std::shared_ptr` (see `disabled()` and
// `from_options()`).
// `Statistics` is owned exclusively through `std::shared_ptr`. Use `create()`,
// `disabled()`, or `from_options()` to construct instances.
Statistics(Statistics const&) = delete;
Statistics& operator=(Statistics const&) = delete;
Statistics(Statistics&&) = delete;
Expand Down Expand Up @@ -566,11 +568,14 @@ class Statistics {
/**
* @brief Constructs an active MemoryRecorder.
*
* @param stats Pointer to Statistics object that will store the result.
* @param stats Shared pointer to the Statistics object that will store
* the result. Must not be null.
* @param mr The RMM resource adaptor providing scoped memory statistics.
* @param name Name of the scope.
*/
MemoryRecorder(Statistics* stats, RmmResourceAdaptor mr, std::string name);
MemoryRecorder(
std::shared_ptr<Statistics> stats, RmmResourceAdaptor mr, std::string name
);

/**
* @brief Destructor.
Expand All @@ -586,9 +591,10 @@ class Statistics {
MemoryRecorder& operator=(MemoryRecorder&&) = delete;

private:
Statistics* stats_{
nullptr
}; // TODO: make this shared_ptr using make_shared_from_this
// Holds the owning Statistics so the recorder can safely outlive any
// raw reference the caller might have. `nullptr` indicates a no-op
// recorder.
std::shared_ptr<Statistics> stats_;
std::optional<RmmResourceAdaptor>
mr_; // optional because RmmResourceAdaptor is not default constructible
std::string name_;
Expand Down Expand Up @@ -624,13 +630,37 @@ class Statistics {
Formatter formatter;
};

explicit Statistics(bool enabled);

mutable std::mutex mutex_;
std::atomic<bool> enabled_;
std::map<std::string, Stat> stats_;
std::map<std::string, ReportEntry> report_entries_;
std::unordered_map<std::string, MemoryRecord> memory_records_;
};

/**
 * @brief Concept modelling a *statistics provider*.
 *
 * A type `T` models this concept when calling `statistics()` on a
 * `T const&` is `noexcept` and yields exactly
 * `std::shared_ptr<Statistics> const&`.
 *
 * Secondary classes that are handed a provider at construction should obtain
 * their `Statistics` instance through `.statistics()` on that provider instead
 * of taking an additional `std::shared_ptr<Statistics>` parameter.
 *
 * The accessor returns a const-reference so that hot paths do not pay the
 * atomic reference-count increment a by-value `std::shared_ptr` return would
 * cost on every call.
 *
 * Every provider verifies conformance with a `static_assert` in its own
 * header. Current providers: `BufferResource`, `ProgressThread`,
 * `streaming::Context`.
 */
template <typename T>
concept StatisticsProvider = requires(T const& provider) {
    { provider.statistics() } noexcept -> std::same_as<std::shared_ptr<Statistics> const&>;
};

/**
* @brief Macro for automatic memory profiling of a code scope.
*
Expand Down
9 changes: 7 additions & 2 deletions cpp/include/rapidsmpf/streaming/core/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class Context {
* @param logger The logger to use.
* @param options Configuration options used to initialize the Context and its
* components.
* @param statistics The statistics instance to use (disabled by default). The caller
* is responsible for creating and owning this object.
* @return A fully initialized Context.
*
* @throws std::invalid_argument If an option value is invalid.
Expand All @@ -102,7 +104,8 @@ class Context {
static std::shared_ptr<Context> from_options(
RmmResourceAdaptor mr,
std::shared_ptr<Communicator::Logger> logger,
config::Options options
config::Options options,
std::shared_ptr<Statistics> statistics = Statistics::disabled()
);

// No copy constructor and assignment operator.
Expand Down Expand Up @@ -179,7 +182,7 @@ class Context {
*
* @return Shared pointer to the statistics instance.
*/
[[nodiscard]] std::shared_ptr<Statistics> statistics() const noexcept;
[[nodiscard]] std::shared_ptr<Statistics> const& statistics() const noexcept;

/**
* @brief Create a new channel associated with this context.
Expand Down Expand Up @@ -231,4 +234,6 @@ class Context {
SpillManager::SpillFunctionID spill_function_id_{};
};

static_assert(StatisticsProvider<Context>);

} // namespace rapidsmpf::streaming
2 changes: 0 additions & 2 deletions cpp/src/coll/allgather.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,10 @@ AllGather::AllGather(
std::shared_ptr<Communicator> comm,
OpID op_id,
BufferResource* br,
std::shared_ptr<Statistics> statistics,
std::function<void(void)>&& finished_callback
)
: comm_{std::move(comm)},
br_{br},
statistics_{std::move(statistics)},
finished_callback_{std::move(finished_callback)},
finish_counter_{comm_->nranks()},
op_id_{op_id},
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/memory/buffer_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ BufferResource::BufferResource(
}

std::shared_ptr<BufferResource> BufferResource::from_options(
RmmResourceAdaptor mr, config::Options options
RmmResourceAdaptor mr, config::Options options, std::shared_ptr<Statistics> statistics
) {
auto pinned_mr = PinnedMemoryResource::from_options(options);
auto mem_available = memory_available_from_options(mr, options);
Expand All @@ -67,7 +67,6 @@ std::shared_ptr<BufferResource> BufferResource::from_options(
mem_available[MemoryType::PINNED_HOST] = pinned_mr->get_memory_available_cb();
}

auto statistics = Statistics::from_options(options);
return std::make_shared<BufferResource>(
std::move(mr),
std::move(pinned_mr),
Expand Down Expand Up @@ -267,7 +266,7 @@ SpillManager& BufferResource::spill_manager() {
return spill_manager_;
}

std::shared_ptr<Statistics> BufferResource::statistics() {
std::shared_ptr<Statistics> const& BufferResource::statistics() const noexcept {
return statistics_;
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/progress_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ bool ProgressThread::is_running() const {
return active_;
}

std::shared_ptr<Statistics> ProgressThread::statistics() const noexcept {
std::shared_ptr<Statistics> const& ProgressThread::statistics() const noexcept {
return statistics_;
}

Expand Down
Loading
Loading