Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions cpp/include/rapidsmpf/statistics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,24 @@ class Statistics {
MemoryType mem_type, std::size_t nbytes, StreamOrderedTiming&& timing
);

/**
* @brief Record byte count for a send operation.
*
* @param src Source memory type.
* @param nbytes Number of bytes sent.
* @param suffix Suffix to add to the statistic name.
*/
void record_send(MemoryType src, std::size_t nbytes, std::string_view suffix = "");

/**
* @brief Record byte count for a receive operation.
*
* @param dst Destination memory type.
* @param nbytes Number of bytes received.
* @param suffix Suffix to add to the statistic name.
*/
void record_recv(MemoryType dst, std::size_t nbytes, std::string_view suffix = "");

/**
* @brief Get the names of all statistics.
*
Expand Down
10 changes: 9 additions & 1 deletion cpp/src/coll/allgather.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,10 @@ ProgressThread::ProgressState AllGather::event_loop() {
// metadata_tag so the no-overtaking guarantee on a single (src, tag) pair
// ensures current-collective messages arrive before any new-collective
// messages that reuse the same op_id.
auto metadata = chunk->serialize();
statistics_->record_send(MemoryType::HOST, metadata->size());
fire_and_forget_.push_back(
comm_->send(chunk->serialize(), dst, metadata_tag)
comm_->send(std::move(metadata), dst, metadata_tag)
);
if (chunk->is_finish()) {
// Finish chunk contains as sequence number the number
Expand All @@ -227,6 +229,8 @@ ProgressThread::ProgressState AllGather::event_loop() {
} else {
auto buf = chunk->release_data_buffer();
sent_posted_.emplace_back(std::move(chunk));

statistics_->record_send(buf->mem_type(), buf->size);
sent_futures_.emplace_back(
comm_->send(std::move(buf), dst, gpu_data_tag)
);
Expand All @@ -244,6 +248,10 @@ ProgressThread::ProgressState AllGather::event_loop() {
break;
}
auto chunk = detail::Chunk::deserialize(*msg, br_);
// During deserialization the data buffer may already have been allocated,
// so record the receive statistics here.
statistics_->record_recv(MemoryType::HOST, msg->size());
statistics_->record_recv(chunk->memory_type(), chunk->data_size());
Comment thread
nirandaperera marked this conversation as resolved.
Outdated
if (chunk->is_finish()) {
remote_finish_counter_--;
num_expected_messages_ += chunk->sequence();
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/communicator/metadata_payload_exchange/tag.cpp
Comment thread
pentschev marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,14 @@ void TagMetadataPayloadExchange::send(
sizeof(std::size_t)
);

statistics_->record_send(MemoryType::HOST, combined_metadata->size());
fire_and_forget_.push_back(
comm_->send(std::move(combined_metadata), dst, metadata_tag_)
);

// Send data immediately after metadata (if any)
if (payload_size > 0) {
statistics_->record_send(message->data()->mem_type(), payload_size);
fire_and_forget_.push_back(
comm_->send(message->release_data(), dst, gpu_data_tag_)
);
Expand Down Expand Up @@ -245,9 +247,13 @@ void TagMetadataPayloadExchange::receive_metadata() {
msg->begin() + safe_cast<std::ptrdiff_t>(original_metadata_size)
);

// Record stats. The data receive may not have completed yet, but the
// buffer has already been allocated.
statistics_->record_recv(MemoryType::HOST, msg->size());
std::unique_ptr<Buffer> buffer = nullptr;
if (payload_size > 0) {
buffer = allocate_buffer_fn_(payload_size);
statistics_->record_recv(buffer->mem_type(), payload_size);
}

auto message = std::make_unique<MetadataPayloadExchange::Message>(
Expand Down
17 changes: 17 additions & 0 deletions cpp/src/statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <algorithm>
#include <array>
#include <cstring>
#include <format>
#include <fstream>
#include <iomanip>
#include <ranges>
Expand Down Expand Up @@ -680,6 +681,22 @@ std::shared_ptr<Statistics> Statistics::merge(
return ret;
}

void Statistics::record_send(
    MemoryType src, std::size_t nbytes, std::string_view suffix
) {
    // Accumulate outgoing byte counts under a per-source-memory-type key,
    // e.g. "send-from-DEVICE" (plus an optional caller-supplied suffix).
    auto const name = std::format("send-from-{}{}", to_string(src), suffix);
    add_stat(name, static_cast<double>(nbytes));
}

void Statistics::record_recv(
    MemoryType dst, std::size_t nbytes, std::string_view suffix
) {
    // Accumulate incoming byte counts under a per-destination-memory-type key,
    // e.g. "recv-to-HOST" (plus an optional caller-supplied suffix).
    auto const name = std::format("recv-to-{}{}", to_string(dst), suffix);
    add_stat(name, static_cast<double>(nbytes));
}

void Statistics::record_copy(
MemoryType src, MemoryType dst, std::size_t nbytes, StreamOrderedTiming&& timing
) {
Expand Down
49 changes: 49 additions & 0 deletions cpp/tests/test_allgather.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,55 @@ TEST_P(AllGatherOrderedTest, non_uniform_inserts) {
}
}

// Test that send/recv statistics record correct byte counts when allgather is run with an
// enabled Statistics object.
//
// In the ring allgather each data chunk travels (nranks-1) hops. Every forwarding rank
// records one DEVICE send and the receiving rank records one DEVICE recv, so the
// aggregate per-rank totals are both (nranks-1) * data_bytes.
TEST_F(BaseAllGatherTest, stats_egress_ingress) {
    auto const& comm = GlobalEnvironment->comm_;
    auto const n_ranks = comm->nranks();
    if (n_ranks == 1) {
        GTEST_SKIP()
            << "Stats test requires multiple ranks (no network traffic on 1 rank)";
    }

    constexpr int n_elements = 10;
    constexpr std::size_t data_bytes = n_elements * sizeof(int);

    auto stats = std::make_shared<rapidsmpf::Statistics>();
    AllGather allgather{comm, 0, br.get(), stats};

    // Each rank contributes one chunk of `data_bytes` bytes.
    allgather.insert(
        0, generate_packed_data(n_elements, comm->rank() * n_elements, stream, *br)
    );
    allgather.insert_finished();

    std::vector<rapidsmpf::PackedData> results;
    EXPECT_NO_THROW(
        results =
            allgather.wait_and_extract(AllGather::Ordered::NO, std::chrono::seconds{30})
    );

    ASSERT_EQ(results.size(), static_cast<std::size_t>(n_ranks));
    for (std::size_t i = 0; i < results.size(); ++i) {
        EXPECT_EQ(results[i].data->size, data_bytes);
    }
    auto const metadata_bytes = results[0].metadata->size();

    // DEVICE counters are exact: in the ring algorithm every chunk travels
    // exactly (n_ranks - 1) hops, each hop recording one send and one recv.
    auto const device_bytes = static_cast<double>((n_ranks - 1) * data_bytes);
    EXPECT_DOUBLE_EQ(stats->get_stat("send-from-DEVICE").value(), device_bytes);
    EXPECT_DOUBLE_EQ(stats->get_stat("recv-to-DEVICE").value(), device_bytes);

    // HOST counters are a strict lower bound: at minimum (n_ranks - 1)
    // data-chunk messages each carry metadata_bytes of user content, and the
    // serialized ChunkID/data_size fields add more on top.
    auto const host_floor = static_cast<double>((n_ranks - 1) * metadata_bytes);
    EXPECT_GT(stats->get_stat("send-from-HOST").value(), host_floor);
    EXPECT_GT(stats->get_stat("recv-to-HOST").value(), host_floor);
}

// Test that reusing an OpID after a completed allgather doesn't cause cross-matching of
// messages between the old and new collective.
//
Expand Down
72 changes: 72 additions & 0 deletions cpp/tests/test_shuffler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <rapidsmpf/memory/packed_data.hpp>
#include <rapidsmpf/shuffler/finish_counter.hpp>
#include <rapidsmpf/shuffler/shuffler.hpp>
#include <rapidsmpf/statistics.hpp>
#include <rapidsmpf/utils/misc.hpp>

#include "environment.hpp"
Expand Down Expand Up @@ -1126,3 +1127,74 @@ TEST(Shuffler, opid_reuse_with_empty_partitions) {
validate_results(shuffle1, 42);
validate_results(shuffle2, 123);
}

// Test that send-from-DEVICE and recv-to-DEVICE statistics record the correct number of
// bytes for a remote-only shuffle with deterministic sizes.
//
// Design: nranks partitions with round-robin ownership. Each rank r inserts one chunk of
// N bytes for partition (r+1) % nranks, which is always owned by a different rank. This
// produces exactly one remote DEVICE send and one remote DEVICE recv per rank with no
// self-transfers going through the communicator.
TEST(ShufflerStats, stats_egress_ingress) {
    auto const& comm = GlobalEnvironment->comm_;
    auto const n_ranks = comm->nranks();
    if (n_ranks == 1) {
        GTEST_SKIP()
            << "Stats test requires multiple ranks (no network traffic on 1 rank)";
    }

    auto statistics = std::make_shared<rapidsmpf::Statistics>();
    auto pool = std::make_shared<rmm::cuda_stream_pool>(
        16, rmm::cuda_stream::flags::non_blocking
    );
    auto buffer_resource = std::make_unique<rapidsmpf::BufferResource>(
        cudf::get_current_device_resource_ref(),
        rapidsmpf::PinnedMemoryResource::Disabled,
        std::unordered_map<
            rapidsmpf::MemoryType,
            rapidsmpf::BufferResource::MemoryAvailable>{},
        std::chrono::milliseconds{1},
        pool,
        statistics
    );
    auto stream = pool->get_stream();

    constexpr int n_elements = 10;
    auto const num_partitions = static_cast<rapidsmpf::shuffler::PartID>(n_ranks);

    rapidsmpf::shuffler::Shuffler shuffler{
        comm, 0, num_partitions, buffer_resource.get()
    };

    // Round-robin ownership: partition (r+1) % n_ranks is never owned by rank r,
    // so every insert below produces exactly one remote transfer and no
    // self-transfers through the communicator.
    auto const dst_part =
        static_cast<rapidsmpf::shuffler::PartID>((comm->rank() + 1) % n_ranks);
    auto packed =
        generate_packed_data(n_elements, comm->rank() * n_elements, stream, *buffer_resource);
    auto const data_bytes = packed.data->size;
    std::unordered_map<rapidsmpf::shuffler::PartID, rapidsmpf::PackedData> input;
    input.emplace(dst_part, std::move(packed));
    shuffler.insert(std::move(input));
    shuffler.insert_finished();
    EXPECT_NO_THROW(shuffler.wait(std::chrono::seconds{30}));

    auto received = shuffler.extract(comm->rank());
    EXPECT_EQ(received.size(), 1);
    EXPECT_EQ(received[0].data->size, data_bytes);
    auto const metadata_bytes = received[0].metadata->size();
    // Metadata channel traffic: one data message (carrying metadata_bytes of
    // payload metadata) plus (n_ranks - 1) finish messages — n_ranks messages
    // in total, each with a fixed serialized header.
    auto const expected_metadata_bytes =
        metadata_bytes
        + n_ranks
              * (rapidsmpf::shuffler::detail::Chunk::metadata_message_header_size()
                 + 2 * sizeof(size_t));

    // Coarse shuffler-level counters: payload bytes handed to / taken from the MPE.
    EXPECT_DOUBLE_EQ(statistics->get_stat("shuffle-payload-send").value(), data_bytes);
    EXPECT_DOUBLE_EQ(statistics->get_stat("shuffle-payload-recv").value(), data_bytes);

    // Fine-grained communicator-level counters recorded by TagMetadataPayloadExchange.
    EXPECT_DOUBLE_EQ(statistics->get_stat("send-from-DEVICE").value(), data_bytes);
    EXPECT_DOUBLE_EQ(statistics->get_stat("recv-to-DEVICE").value(), data_bytes);
    EXPECT_DOUBLE_EQ(
        statistics->get_stat("send-from-HOST").value(), expected_metadata_bytes
    );
    EXPECT_DOUBLE_EQ(
        statistics->get_stat("recv-to-HOST").value(), expected_metadata_bytes
    );
}
Loading