Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions gloo/test/send_recv_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@

#include <algorithm>
#include <array>
#include <chrono>
#include <string>
#include <unordered_set>

#include "gloo/common/error.h"

namespace gloo {
namespace test {
namespace {
Expand Down Expand Up @@ -517,6 +521,88 @@ INSTANTIATE_TEST_CASE_P(
::testing::Values(2, 3, 4, 5, 6, 7, 8),
::testing::Values(1)));

// Tests that TCP unbound-buffer timeout errors include the local rank,
// remote peer or candidate peers, and slot. The peer ranks intentionally
// do not post matching operations, so rank 0's operation times out.
class SendRecvTimeoutTest : public BaseTest {};

TEST_F(SendRecvTimeoutTest, RecvTimeoutReportsRanks) {
constexpr uint64_t slot = 0x1337;
spawn(Transport::TCP, 2, [&](std::shared_ptr<Context> context) {
// Rank 1 stays idle so it never sends; rank 0's recv must time out.
if (context->rank != 0) {
return;
}
int tmp = 0;
auto buf = context->createUnboundBuffer(&tmp, sizeof(tmp));
buf->recv(1, slot);
try {
buf->waitRecv(std::chrono::milliseconds(500));
ADD_FAILURE() << "Expected waitRecv to time out";
} catch (const ::gloo::IoException& e) {
const std::string msg = e.what();
EXPECT_NE(msg.find("Rank 0"), std::string::npos) << msg;
EXPECT_NE(msg.find("recv from rank 1"), std::string::npos) << msg;
EXPECT_NE(
msg.find(std::string("(slot ") + std::to_string(slot) + ")"),
std::string::npos)
<< msg;
}
});
}

TEST_F(SendRecvTimeoutTest, RecvFromAnyTimeoutReportsRanks) {
constexpr uint64_t slot = 0x1337;
spawn(Transport::TCP, 3, [&](std::shared_ptr<Context> context) {
// Ranks 1 and 2 stay idle so rank 0's recv-from-any must time out.
if (context->rank != 0) {
return;
}
int tmp = 0;
auto buf = context->createUnboundBuffer(&tmp, sizeof(tmp));
std::vector<int> ranks = {1, 2};
buf->recv(ranks, slot);
try {
buf->waitRecv(std::chrono::milliseconds(500));
ADD_FAILURE() << "Expected waitRecv to time out";
} catch (const ::gloo::IoException& e) {
const std::string msg = e.what();
EXPECT_NE(msg.find("Rank 0"), std::string::npos) << msg;
EXPECT_NE(msg.find("recv from any of ranks [1, 2]"), std::string::npos)
<< msg;
EXPECT_NE(
msg.find(std::string("(slot ") + std::to_string(slot) + ")"),
std::string::npos)
<< msg;
}
});
}

TEST_F(SendRecvTimeoutTest, SendTimeoutReportsRanks) {
constexpr uint64_t slot = 0x1337;
spawn(Transport::TCP, 2, [&](std::shared_ptr<Context> context) {
// Rank 1 stays idle so it never posts a matching recv
if (context->rank != 0) {
return;
}
int tmp = 0;
auto buf = context->createUnboundBuffer(&tmp, sizeof(tmp));
buf->send(1, slot);
try {
buf->waitSend(std::chrono::milliseconds(500));
ADD_FAILURE() << "Expected waitSend to time out";
} catch (const ::gloo::IoException& e) {
const std::string msg = e.what();
EXPECT_NE(msg.find("Rank 0"), std::string::npos) << msg;
EXPECT_NE(msg.find("send to rank 1"), std::string::npos) << msg;
EXPECT_NE(
msg.find(std::string("(slot ") + std::to_string(slot) + ")"),
std::string::npos)
<< msg;
}
});
}

} // namespace
} // namespace test
} // namespace gloo
39 changes: 35 additions & 4 deletions gloo/transport/tcp/unbound_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@

#include "gloo/transport/tcp/unbound_buffer.h"

#include <string>

#include "gloo/common/error.h"
#include "gloo/common/logging.h"
#include "gloo/common/string.h"
#include "gloo/transport/tcp/context.h"

namespace gloo {
Expand Down Expand Up @@ -75,10 +78,26 @@ bool UnboundBuffer::waitRecv(int* rank, std::chrono::milliseconds timeout) {
// be sure to look for the actual cause (seen below).
context_->signalException("Application timeout caused pair closure");

std::string peers;
if (recvSrcRanks_.size() == 1) {
peers = ::gloo::MakeString("rank ", recvSrcRanks_[0]);
} else {
peers = ::gloo::MakeString(
"any of ranks [",
::gloo::MakeString<int>(recvSrcRanks_, /*delim=*/", "),
"]");
}

throw ::gloo::IoException(GLOO_ERROR_MSG(
"Timed out waiting ",
"Timed out after ",
timeout.count(),
"ms for recv operation to complete"));
"ms: Rank ",
context_->rank,
" waiting for recv from ",
peers,
" (slot ",
recvSlot_,
")"));
}
}
if (abortWaitRecv_) {
Expand Down Expand Up @@ -127,9 +146,15 @@ bool UnboundBuffer::waitSend(int* rank, std::chrono::milliseconds timeout) {
context_->signalException("Application timeout caused pair closure");

throw ::gloo::IoException(GLOO_ERROR_MSG(
"Timed out waiting ",
"Timed out after ",
timeout.count(),
"ms for send operation to complete"));
"ms: Rank ",
context_->rank,
" waiting for send to rank ",
sendDstRank_,
" (slot ",
sendSlot_,
")"));
}
}

Expand All @@ -150,6 +175,8 @@ void UnboundBuffer::send(
uint64_t slot,
size_t offset,
size_t nbytes) {
sendDstRank_ = dstRank;
sendSlot_ = slot;
// Default the number of bytes to be equal to the number
// of bytes remaining in the buffer w.r.t. the offset.
if (nbytes == kUnspecifiedByteCount) {
Expand All @@ -164,6 +191,8 @@ void UnboundBuffer::recv(
uint64_t slot,
size_t offset,
size_t nbytes) {
recvSrcRanks_ = {srcRank};
recvSlot_ = slot;
// Default the number of bytes to be equal to the number
// of bytes remaining in the buffer w.r.t. the offset.
if (nbytes == kUnspecifiedByteCount) {
Expand All @@ -178,6 +207,8 @@ void UnboundBuffer::recv(
uint64_t slot,
size_t offset,
size_t nbytes) {
recvSrcRanks_ = srcRanks;
recvSlot_ = slot;
// Default the number of bytes to be equal to the number
// of bytes remaining in the buffer w.r.t. the offset.
if (nbytes == kUnspecifiedByteCount) {
Expand Down
8 changes: 8 additions & 0 deletions gloo/transport/tcp/unbound_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
#include "gloo/transport/unbound_buffer.h"

#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <vector>

namespace gloo {
namespace transport {
Expand Down Expand Up @@ -73,6 +75,12 @@ class UnboundBuffer : public ::gloo::transport::UnboundBuffer {
int sendCompletions_;
int sendRank_;

// Issued operation metadata for timeout diagnostics.
std::vector<int> recvSrcRanks_;
uint64_t recvSlot_{0};
int sendDstRank_{-1};
uint64_t sendSlot_{0};

std::exception_ptr ex_;

// Throws if an exception if set.
Expand Down
Loading