3 changes: 3 additions & 0 deletions cpp/include/rapidsmpf/coll/allreduce.hpp
@@ -76,6 +76,7 @@ class AllReduce {
* already in the buffer are ignored).
* @param op_id Unique operation identifier for this allreduce.
* @param reduce_operator Type-erased reduction operator to use. See `ReduceOperator`.
* @param statistics Statistics collection instance (disabled by default).
* @param finished_callback Optional callback run once locally when the allreduce
* is finished and results are ready for extraction.
*
@@ -91,6 +92,7 @@
std::unique_ptr<Buffer> output,
OpID op_id,
ReduceOperator reduce_operator,
std::shared_ptr<Statistics> statistics = Statistics::disabled(),
std::function<void(void)> finished_callback = nullptr
);

@@ -185,6 +187,7 @@ class AllReduce {
[[nodiscard]] ProgressThread::ProgressState event_loop();

std::shared_ptr<Communicator> comm_{};
std::shared_ptr<Statistics> statistics_; ///< Statistics collection instance
ReduceOperator reduce_operator_; ///< Reduction operator
std::unique_ptr<Buffer> in_buffer_{};
std::unique_ptr<Buffer> out_buffer_{};
4 changes: 4 additions & 0 deletions cpp/include/rapidsmpf/coll/sparse_alltoall.hpp
@@ -19,6 +19,7 @@
#include <rapidsmpf/memory/buffer_resource.hpp>
#include <rapidsmpf/memory/packed_data.hpp>
#include <rapidsmpf/progress_thread.hpp>
#include <rapidsmpf/statistics.hpp>

namespace rapidsmpf::coll {

@@ -42,6 +43,7 @@ class SparseAlltoall {
* @param br Buffer resource used for allocations.
* @param srcs Ranks this rank will receive from.
* @param dsts Ranks this rank will send to.
* @param statistics Statistics collection instance (disabled by default).
* @param finished_callback Optional callback invoked exactly once when the collective
* is locally complete. The callback should be fast and non-blocking. Ideally it
* should only be used to signal a thread to do the actual work of extraction. Note in
@@ -74,6 +76,7 @@
BufferResource* br,
std::vector<Rank> srcs,
std::vector<Rank> dsts,
std::shared_ptr<Statistics> statistics = Statistics::disabled(),
std::function<void()>&& finished_callback = nullptr
);

@@ -171,6 +174,7 @@

std::shared_ptr<Communicator> comm_;
BufferResource* br_;
std::shared_ptr<Statistics> statistics_;
std::vector<Rank> srcs_;
std::vector<Rank> dsts_;
std::unordered_map<Rank, std::atomic<std::uint64_t>> next_ordinal_per_dst_;
2 changes: 1 addition & 1 deletion cpp/include/rapidsmpf/coll/utils.hpp
@@ -92,7 +92,7 @@ class Chunk {
* @brief Return the memory type of the chunk.
*
* @return The memory type of the chunk.
* @note a finish chunk has memory type host.
* @note A finish/metadata-only chunk (i.e. data_size() == 0) has memory type HOST.
Review comment (Contributor): No.
*/
[[nodiscard]] MemoryType memory_type() const noexcept;

38 changes: 38 additions & 0 deletions cpp/include/rapidsmpf/communicator/communicator.hpp
@@ -4,6 +4,8 @@
*/
#pragma once

#include <concepts>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <mutex>
@@ -17,6 +19,7 @@
#include <rapidsmpf/error.hpp>
#include <rapidsmpf/memory/buffer.hpp>
#include <rapidsmpf/progress_thread.hpp>
#include <rapidsmpf/statistics.hpp>

/**
* @namespace rapidsmpf
@@ -640,6 +643,41 @@ constexpr bool COMM_HAVE_MPI = true;
constexpr bool COMM_HAVE_MPI = false;
#endif

/**
* @brief Records a send statistic and then sends a message to a specific rank.
*
* Equivalent to calling `statistics.record_send(...)` followed by
* `comm.send(msg, rank, tag)`. The memory type and byte count are inferred from
* `T`: `std::vector<std::uint8_t>` is treated as `MemoryType::HOST` with size given by
* `.size()`, while `Buffer` uses `Buffer::mem_type()` and `Buffer::size`.
*
* @tparam T Message type: `std::vector<std::uint8_t>` or `Buffer`.
* @param comm The communicator to send through.
* @param msg Unique pointer to the message to send.
* @param rank The destination rank.
* @param tag Message tag for identification.
* @param statistics Statistics instance to record the send into.
* @param stat_suffix Optional suffix appended to the stat key name.
* @return A unique pointer to a `Future` representing the asynchronous operation.
*/
template <typename T>
requires std::same_as<T, std::vector<std::uint8_t>> || std::same_as<T, Buffer>
[[nodiscard]] inline std::unique_ptr<Communicator::Future> send_with_stats(
Communicator& comm,
std::unique_ptr<T> msg,
Rank rank,
Tag tag,
Statistics& statistics,
std::string_view stat_suffix = ""
) {
if constexpr (std::same_as<T, std::vector<std::uint8_t>>) {
statistics.record_send(MemoryType::HOST, msg->size(), stat_suffix);
} else {
statistics.record_send(msg->mem_type(), msg->size, stat_suffix);
}
return comm.send(std::move(msg), rank, tag);
}

Review comment (Member): I'm not quite sure I like this for a few reasons:

  1. It only covers send, but none of the recv* methods are covered;
  2. Adding all recv* methods as well will easily bloat this interface substantially;
  3. We now have to specify the Communicator as argument.

IMO, we would either:

  1. Make Statistics an integral part of Communicator, so that it can be passed to the constructor and all send/recv* calls will automatically do that; or
  2. Move these methods into a separate file to avoid bloating the interface.

I think 1 is a better choice and probably what I'll need to do for #996, so we should probably go that direction nevertheless, but perhaps in a separate PR?

/**
* @brief Overloads the stream insertion operator for the Communicator class.
*
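To illustrate the helper above, here is a minimal usage sketch; the wrapper function `send_metadata_with_stats`, its parameters, and the surrounding setup are hypothetical and not part of this PR.

```cpp
#include <cstdint>
#include <memory>
#include <vector>

#include <rapidsmpf/communicator/communicator.hpp>
#include <rapidsmpf/statistics.hpp>

namespace rapidsmpf {

// Hypothetical wrapper: send a host-side metadata message while recording a
// "send-from-host" statistic, mirroring how the collectives in this PR call
// send_with_stats().
inline std::unique_ptr<Communicator::Future> send_metadata_with_stats(
    Communicator& comm,
    std::unique_ptr<std::vector<std::uint8_t>> metadata,
    Rank dst,
    Tag tag,
    Statistics& stats
) {
    // For std::vector<std::uint8_t>, send_with_stats records MemoryType::HOST
    // with metadata->size() bytes, then posts the send on the communicator.
    return send_with_stats(comm, std::move(metadata), dst, tag, stats);
}

}  // namespace rapidsmpf
```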
18 changes: 18 additions & 0 deletions cpp/include/rapidsmpf/statistics.hpp
@@ -517,6 +517,24 @@ class Statistics {
MemoryType mem_type, std::size_t nbytes, StreamOrderedTiming&& timing
);

/**
* @brief Record byte count for a send operation.
*
* @param src Source memory type.
* @param nbytes Number of bytes sent.
* @param suffix Suffix to add to the statistic name.
*/
void record_send(MemoryType src, std::size_t nbytes, std::string_view suffix = "");

/**
* @brief Record byte count for a receive operation.
*
* @param dst Destination memory type.
* @param nbytes Number of bytes received.
* @param suffix Suffix to add to the statistic name.
*/
void record_recv(MemoryType dst, std::size_t nbytes, std::string_view suffix = "");

/**
* @brief Get the names of all statistics.
*
25 changes: 16 additions & 9 deletions cpp/src/coll/allgather.cpp
@@ -217,19 +217,21 @@ ProgressThread::ProgressState AllGather::event_loop() {
// metadata_tag so the no-overtaking guarantee on a single (src, tag) pair
// ensures current-collective messages arrive before any new-collective
// messages that reuse the same op_id.
fire_and_forget_.push_back(
comm_->send(chunk->serialize(), dst, metadata_tag)
);
auto metadata = chunk->serialize();
fire_and_forget_.push_back(send_with_stats(
*comm_, std::move(metadata), dst, metadata_tag, *statistics_
));
if (chunk->is_finish()) {
// Finish chunk contains as sequence number the number
// of insertions from that rank.
mark_finish(chunk->sequence());
} else {
auto buf = chunk->release_data_buffer();
sent_posted_.emplace_back(std::move(chunk));
sent_futures_.emplace_back(
comm_->send(std::move(buf), dst, gpu_data_tag)
);

sent_futures_.emplace_back(send_with_stats(
*comm_, std::move(buf), dst, gpu_data_tag, *statistics_
));
}
}
// Receive metadata messages. All messages (data + finish) share metadata_tag, so
@@ -244,13 +246,18 @@
break;
}
auto chunk = detail::Chunk::deserialize(*msg, br_);
// The recv buffer was allocated during deserialization, so we record the
// receive stats here.
statistics_->record_recv(MemoryType::HOST, msg->size());
statistics_->record_recv(chunk->memory_type(), chunk->data_size());
if (chunk->is_finish()) {
remote_finish_counter_--;
num_expected_messages_ += chunk->sequence();
if (chunk->origin() != dst) {
fire_and_forget_.push_back(
comm_->send(chunk->serialize(), dst, metadata_tag)
);
auto finished_msg = chunk->serialize();
fire_and_forget_.push_back(send_with_stats(
*comm_, std::move(finished_msg), dst, metadata_tag, *statistics_
));
}
mark_finish(chunk->sequence());
} else {
27 changes: 20 additions & 7 deletions cpp/src/coll/allreduce.cpp
@@ -23,9 +23,11 @@ AllReduce::AllReduce(
std::unique_ptr<Buffer> output,
OpID op_id,
ReduceOperator reduce_operator,
std::shared_ptr<Statistics> statistics,
std::function<void(void)> finished_callback
)
: comm_{std::move(comm)},
statistics_{std::move(statistics)},
reduce_operator_{std::move(reduce_operator)},
in_buffer_{std::move(input)},
out_buffer_{std::move(output)},
@@ -59,7 +61,7 @@ AllReduce::AllReduce(
out_buffer_->rebind_stream(in_buffer_->stream());
// Note: after this copy, we must check out_buffer's write event before receiving into
// in_buffer. See StartPreRemainder in the event loop.
buffer_copy(Statistics::disabled(), *out_buffer_, *in_buffer_, in_buffer_->size);
buffer_copy(statistics_, *out_buffer_, *in_buffer_, in_buffer_->size);

auto const rank = comm_->rank();
if (rank < 2 * non_pow2_remainder_) {
@@ -156,9 +158,9 @@ ProgressThread::ProgressState AllReduce::event_loop() {
// If we don't have a power of two number of ranks, then we have `power_of_two +
// remainder` ranks. We first take `2 * remainder` ranks, and the even ranks send
// their contribution to their odd pair. The even ranks then jump to receive a final
// contribution, while the rest form a power of two and exchange via the above loop.
// Once that is complete, the paired odd ranks send the final answer to their even
// counterpart.
// contribution, while the rest (ranks >= 2 * remainder or odd) form a power of two
// and exchange via the above loop. Once that is complete, the paired odd ranks send
// the final answer to their even counterpart.
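// For example (an illustrative walk-through of the description above, not
// part of the source): with 6 ranks, power_of_two = 4 and remainder = 2, so
// ranks 0-3 are the 2 * remainder ranks. Rank 0 sends to rank 1 and rank 2
// sends to rank 3 (PreRemainder); ranks 1, 3, 4 and 5 then form the
// power-of-two group and run the butterfly exchange; finally rank 1 sends
// the reduced result back to rank 0 and rank 3 back to rank 2 (PostRemainder).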
Review comment (Contributor), on lines 160-163: Why?

Reply (Contributor, author): Because this wasn't apparent when I read the comment initially. I thought "rest" meant just the odd ranks in the initial 2 * remainder ranks.

//
// So even ranks in the remainder do:
// PreRemainder -> PostRemainder -> Done
@@ -172,7 +174,9 @@
if (!out_buffer_->is_latest_write_done()) {
break;
}
send_future_ = comm_->send(std::move(out_buffer_), rank + 1, tag);
send_future_ = send_with_stats(
*comm_, std::move(out_buffer_), rank + 1, tag, *statistics_
);
} else {
// The constructor copies in_buffer_ to out_buffer_ on in_buffer's
// stream. The copy must be complete before we can receive into
Expand All @@ -185,6 +189,7 @@ ProgressThread::ProgressState AllReduce::event_loop() {
{
break;
}
statistics_->record_recv(in_buffer_->mem_type(), in_buffer_->size);
recv_future_ = comm_->recv(rank - 1, tag, std::move(in_buffer_));
}
phase_.store(Phase::CompletePreRemainder, std::memory_order_release);
@@ -236,8 +241,13 @@
{
break;
}
statistics_->record_recv(in_buffer_->mem_type(), in_buffer_->size);
recv_future_ = comm_->recv(stage_partner_, tag, std::move(in_buffer_));
send_future_ = comm_->send(std::move(out_buffer_), stage_partner_, tag);

send_future_ = send_with_stats(
*comm_, std::move(out_buffer_), stage_partner_, tag, *statistics_
);

phase_.store(Phase::CompleteButterfly, std::memory_order_release);
break;
}
@@ -268,12 +278,15 @@
if (!out_buffer_->is_latest_write_done()) {
break;
}
statistics_->record_recv(out_buffer_->mem_type(), out_buffer_->size);
recv_future_ = comm_->recv(rank + 1, tag, std::move(out_buffer_));
} else {
if (!out_buffer_->is_latest_write_done()) {
break;
}
send_future_ = comm_->send(std::move(out_buffer_), rank - 1, tag);
send_future_ = send_with_stats(
*comm_, std::move(out_buffer_), rank - 1, tag, *statistics_
);
}
phase_.store(Phase::CompletePostRemainder, std::memory_order_release);
break;
15 changes: 13 additions & 2 deletions cpp/src/coll/sparse_alltoall.cpp
@@ -15,6 +15,7 @@
#include <rapidsmpf/coll/sparse_alltoall.hpp>
#include <rapidsmpf/error.hpp>
#include <rapidsmpf/memory/buffer_resource.hpp>
#include <rapidsmpf/statistics.hpp>

namespace rapidsmpf::coll {

@@ -24,10 +25,12 @@ SparseAlltoall::SparseAlltoall(
BufferResource* br,
std::vector<Rank> srcs,
std::vector<Rank> dsts,
std::shared_ptr<Statistics> statistics,
std::function<void()>&& finished_callback
)
: comm_{std::move(comm)},
br_{br},
statistics_{std::move(statistics)},
srcs_{std::move(srcs)},
dsts_{std::move(dsts)},
op_id_{op_id},
@@ -159,10 +162,14 @@ void SparseAlltoall::send_ready_messages() {
Tag const payload_tag{op_id_, 1};
for (auto& chunk : outgoing_.extract_ready()) {
auto const dst = chunk->destination();
fire_and_forget_.push_back(comm_->send(chunk->serialize(), dst, metadata_tag));
auto metadata = chunk->serialize();
fire_and_forget_.push_back(
send_with_stats(*comm_, std::move(metadata), dst, metadata_tag, *statistics_)
);
if (chunk->data_size() > 0) {
auto buf = chunk->release_data_buffer();
fire_and_forget_.push_back(
comm_->send(chunk->release_data_buffer(), dst, payload_tag)
send_with_stats(*comm_, std::move(buf), dst, payload_tag, *statistics_)
Review comment (Member): Why did you decide to create temporary variables (metadata and buf) that are only used once, instead of keeping it as before, where the serialization/release were done in-place without the additional variable?
);
}
}
@@ -178,6 +185,10 @@ void SparseAlltoall::receive_metadata_messages() {
}
state.received_count++;
auto chunk = detail::Chunk::deserialize(*msg, br_);
// The recv buffer was allocated during deserialization, so we record the
// receive stats here.
statistics_->record_recv(MemoryType::HOST, msg->size());
statistics_->record_recv(chunk->memory_type(), chunk->data_size());
Review comment (Member): Thinking more about this, your explanation about when to post the receive from https://github.com/rapidsai/rapidsmpf/pull/999/files#r3148360252 was:

"Data receive stats are recorded at the point of posting the receive, not at the point of actually receiving the data."

This is not actually true, or perhaps not phrased accurately; I think what you meant to say is that they are recorded at the point of posting the metadata receive, which is what is happening here. However, is there a specific reason we need to do that? This seems a bit fragile for future modifications: if we eventually change the actual data recv*() call, we're not guaranteed to change this code as well without careful analysis. Therefore, unless there is a good technical reason to do it this way, it would probably be best to have the record_recv next to the actual recv*() call.
RAPIDSMPF_EXPECTS(
chunk->origin() == src,
"SparseAlltoall received metadata with unexpected origin"
6 changes: 6 additions & 0 deletions cpp/src/communicator/metadata_payload_exchange/tag.cpp
@@ -90,12 +90,14 @@ void TagMetadataPayloadExchange::send(
sizeof(std::size_t)
);

statistics_->record_send(MemoryType::HOST, combined_metadata->size());
fire_and_forget_.push_back(
comm_->send(std::move(combined_metadata), dst, metadata_tag_)
);

// Send data immediately after metadata (if any)
if (payload_size > 0) {
statistics_->record_send(message->data()->mem_type(), payload_size);
fire_and_forget_.push_back(
comm_->send(message->release_data(), dst, gpu_data_tag_)
);
@@ -245,9 +247,13 @@ void TagMetadataPayloadExchange::receive_metadata() {
msg->begin() + safe_cast<std::ptrdiff_t>(original_metadata_size)
);

// Record stats. The data recv may not have completed yet, but the buffer is
// already allocated.
statistics_->record_recv(MemoryType::HOST, msg->size());
std::unique_ptr<Buffer> buffer = nullptr;
if (payload_size > 0) {
buffer = allocate_buffer_fn_(payload_size);
statistics_->record_recv(buffer->mem_type(), payload_size);
}

auto message = std::make_unique<MetadataPayloadExchange::Message>(
17 changes: 17 additions & 0 deletions cpp/src/statistics.cpp
@@ -6,6 +6,7 @@
#include <algorithm>
#include <array>
#include <cstring>
#include <format>
#include <fstream>
#include <iomanip>
#include <ranges>
@@ -680,6 +681,22 @@ std::shared_ptr<Statistics> Statistics::merge(
return ret;
}

void Statistics::record_send(
MemoryType src, std::size_t nbytes, std::string_view suffix
) {
add_stat(
std::format("send-from-{}{}", to_string(src), suffix), static_cast<double>(nbytes)
);
}

void Statistics::record_recv(
MemoryType dst, std::size_t nbytes, std::string_view suffix
) {
add_stat(
std::format("recv-to-{}{}", to_string(dst), suffix), static_cast<double>(nbytes)
);
}

void Statistics::record_copy(
MemoryType src, MemoryType dst, std::size_t nbytes, StreamOrderedTiming&& timing
) {
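As a rough sketch of the statistic keys these helpers produce: the format strings above yield names like `send-from-<type><suffix>` and `recv-to-<type><suffix>`; the exact memory-type spelling depends on rapidsmpf's `to_string(MemoryType)` overload, which is assumed below to return "host" and "device".

```cpp
#include <rapidsmpf/statistics.hpp>

// Hypothetical usage of the new recording helpers; the key names in the
// comments assume to_string(MemoryType::HOST) == "host" and
// to_string(MemoryType::DEVICE) == "device".
void record_example(rapidsmpf::Statistics& stats) {
    stats.record_send(rapidsmpf::MemoryType::DEVICE, 1024);
    // Adds 1024 bytes to the "send-from-device" statistic.

    stats.record_send(rapidsmpf::MemoryType::HOST, 256, "-metadata");
    // Adds 256 bytes to the "send-from-host-metadata" statistic.

    stats.record_recv(rapidsmpf::MemoryType::HOST, 256);
    // Adds 256 bytes to the "recv-to-host" statistic.
}
```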