-
Notifications
You must be signed in to change notification settings - Fork 9
#2500: Implement MPI_Improbe/MPI_Imrecv for active and data messages #2501
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
#2500: Implement MPI_Improbe/MPI_Imrecv for active and data messages #2501
Conversation
|
diff --git a/src/vt/group/group_manager.cc b/src/vt/group/group_manager.cc
index c09581b30..3a06735ba 100644
--- a/src/vt/group/group_manager.cc
+++ b/src/vt/group/group_manager.cc
@@ -311,8 +311,7 @@ void GroupManager::initializeLocalGroup(
EventType GroupManager::sendGroupCollective(
MsgSharedPtr<BaseMsgType> const& base, NodeType const from,
- bool const is_root,
- bool* const deliver
+ bool const is_root, bool* const deliver
) {
auto const& msg = base.get();
auto const& group = envelopeGetGroup(msg->env);
@@ -418,9 +417,7 @@ EventType GroupManager::sendGroupCollective(
* that are part of the group can be in the spanning tree. Thus, this node
* must forward.
*/
- auto const put_event = theMsg()->sendMsgBytesWithPut(
- root_node, base
- );
+ auto const put_event = theMsg()->sendMsgBytesWithPut(root_node, base);
/*
* Do not deliver on this node since it is not part of the group and will
* just forward to the root node.
diff --git a/src/vt/messaging/active.cc b/src/vt/messaging/active.cc
index fede49896..101c89056 100644
--- a/src/vt/messaging/active.cc
+++ b/src/vt/messaging/active.cc
@@ -212,10 +212,14 @@ MsgSizeType ActiveMessenger::packMsg(
// Choose active message tag based on final size
/*static*/ MPI_TagType ActiveMessenger::selectActiveTag(MsgSizeType size) {
- if (size <= ActiveRecvBroker::caps_[0]) return static_cast<MPI_TagType>(MPITag::ActiveMsgS);
- if (size <= ActiveRecvBroker::caps_[1]) return static_cast<MPI_TagType>(MPITag::ActiveMsgM);
- if (size <= ActiveRecvBroker::caps_[2]) return static_cast<MPI_TagType>(MPITag::ActiveMsgL);
- if (size <= ActiveRecvBroker::caps_[3]) return static_cast<MPI_TagType>(MPITag::ActiveMsgXL);
+ if (size <= ActiveRecvBroker::caps_[0])
+ return static_cast<MPI_TagType>(MPITag::ActiveMsgS);
+ if (size <= ActiveRecvBroker::caps_[1])
+ return static_cast<MPI_TagType>(MPITag::ActiveMsgM);
+ if (size <= ActiveRecvBroker::caps_[2])
+ return static_cast<MPI_TagType>(MPITag::ActiveMsgL);
+ if (size <= ActiveRecvBroker::caps_[3])
+ return static_cast<MPI_TagType>(MPITag::ActiveMsgXL);
return static_cast<MPI_TagType>(MPITag::ActiveMsgTag); // fallback
}
@@ -450,9 +454,7 @@ EventType ActiveMessenger::sendMsgBytes(
return event_id;
}
-EventType ActiveMessenger::doMessageSend(
- MsgSharedPtr<BaseMsgType>& base
-) {
+EventType ActiveMessenger::doMessageSend(MsgSharedPtr<BaseMsgType>& base) {
auto msg = base.get();
auto const dest = envelopeGetDest(msg->env);
@@ -711,8 +713,8 @@ bool ActiveMessenger::recvDataMsgBuffer(
{
VT_ALLOW_MPI_CALLS;
const int probe_ret = MPI_Improbe(
- node == uninitialized_destination ? MPI_ANY_SOURCE : node,
- tag, comm_, &flag, &first_msg_handle, &stat
+ node == uninitialized_destination ? MPI_ANY_SOURCE : node, tag, comm_,
+ &flag, &first_msg_handle, &stat
);
vtAssertMPISuccess(probe_ret, "MPI_Improbe");
}
@@ -720,9 +722,8 @@ bool ActiveMessenger::recvDataMsgBuffer(
if (flag == 1) {
MPI_Get_count(&stat, MPI_BYTE, &num_probe_bytes);
- std::byte* buf = user_buf == nullptr ?
- thePool()->alloc(num_probe_bytes) :
- user_buf;
+ std::byte* buf =
+ user_buf == nullptr ? thePool()->alloc(num_probe_bytes) : user_buf;
NodeType const sender = stat.MPI_SOURCE;
@@ -809,7 +810,8 @@ void ActiveMessenger::recvDataDirect(
MsgSizeType len, PriorityType prio, ActionType dealloc,
ContinuationDeleterType next, bool is_user_buf
#if MPI_VERSION >= 3
- , MPI_Message first_msg, MsgSizeType first_chunk_bytes
+ ,
+ MPI_Message first_msg, MsgSizeType first_chunk_bytes
#endif
) {
vtAssert(nchunks > 0, "Must have at least one chunk");
@@ -825,34 +827,31 @@ void ActiveMessenger::recvDataDirect(
// If we have a pre-matched first message, use MPI_Imrecv for the first chunk
int start_chunk = 0;
if (first_msg != MPI_MESSAGE_NULL && first_chunk_bytes > 0) {
- #if vt_check_enabled(trace_enabled)
- std::unique_ptr<trace::TraceScopedNote> trace_note;
- if (theConfig()->vt_trace_mpi) {
- trace_note = std::make_unique<trace::TraceScopedNote>(trace_irecv);
- }
- #endif
+#if vt_check_enabled(trace_enabled)
+ std::unique_ptr<trace::TraceScopedNote> trace_note;
+ if (theConfig()->vt_trace_mpi) {
+ trace_note = std::make_unique<trace::TraceScopedNote>(trace_irecv);
+ }
+#endif
{
VT_ALLOW_MPI_CALLS;
- int const ret = MPI_Imrecv(
- cbuf, first_chunk_bytes, MPI_BYTE,
- &first_msg, &reqs[0]
- );
+ int const ret =
+ MPI_Imrecv(cbuf, first_chunk_bytes, MPI_BYTE, &first_msg, &reqs[0]);
vtAssertMPISuccess(ret, "MPI_Imrecv");
}
dmPostedCounterGauge.incrementUpdate(first_chunk_bytes, 1);
- #if vt_check_enabled(trace_enabled)
- if (theConfig()->vt_trace_mpi) {
- auto tr_note = fmt::format(
- "Imrecv(Data, first chunk): from={}, bytes={}",
- from, first_chunk_bytes
- );
- trace_note->setNote(tr_note);
- trace_note->end();
- }
- #endif
+#if vt_check_enabled(trace_enabled)
+ if (theConfig()->vt_trace_mpi) {
+ auto tr_note = fmt::format(
+ "Imrecv(Data, first chunk): from={}, bytes={}", from, first_chunk_bytes
+ );
+ trace_note->setNote(tr_note);
+ trace_note->end();
+ }
+#endif
remainder -= first_chunk_bytes;
start_chunk = 1;
@@ -885,7 +884,7 @@ void ActiveMessenger::recvDataDirect(
dmPostedCounterGauge.incrementUpdate(sublen, 1);
- #if vt_check_enabled(trace_enabled)
+#if vt_check_enabled(trace_enabled)
if (theConfig()->vt_trace_mpi) {
auto tr_note = fmt::format(
"Irecv(Data): from={}, bytes={}",
@@ -1028,13 +1027,15 @@ void ActiveMessenger::processActiveMsg(
// Try to process one incoming active message via Improbe/Iprobe for any size tag
bool ActiveMessenger::tryProcessIncomingActiveMsg() {
- struct TagEntry { MPI_TagType tag; };
+ struct TagEntry {
+ MPI_TagType tag;
+ };
static constexpr TagEntry active_tags[] = {
- { static_cast<MPI_TagType>(MPITag::ActiveMsgS) },
- { static_cast<MPI_TagType>(MPITag::ActiveMsgM) },
- { static_cast<MPI_TagType>(MPITag::ActiveMsgL) },
- { static_cast<MPI_TagType>(MPITag::ActiveMsgXL) },
- { static_cast<MPI_TagType>(MPITag::ActiveMsgTag) }
+ {static_cast<MPI_TagType>(MPITag::ActiveMsgS)},
+ {static_cast<MPI_TagType>(MPITag::ActiveMsgM)},
+ {static_cast<MPI_TagType>(MPITag::ActiveMsgL)},
+ {static_cast<MPI_TagType>(MPITag::ActiveMsgXL)},
+ {static_cast<MPI_TagType>(MPITag::ActiveMsgTag)}
};
for (auto const& te : active_tags) {
@@ -1046,9 +1047,7 @@ bool ActiveMessenger::tryProcessIncomingActiveMsg() {
MPI_Message msg_handle = MPI_MESSAGE_NULL;
{
VT_ALLOW_MPI_CALLS;
- MPI_Improbe(
- MPI_ANY_SOURCE, te.tag, comm_, &flag, &msg_handle, &stat
- );
+ MPI_Improbe(MPI_ANY_SOURCE, te.tag, comm_, &flag, &msg_handle, &stat);
}
if (flag == 1) {
@@ -1060,28 +1059,28 @@ bool ActiveMessenger::tryProcessIncomingActiveMsg() {
MPI_Request req = MPI_REQUEST_NULL;
{
- #if vt_check_enabled(trace_enabled)
- std::unique_ptr<trace::TraceScopedNote> trace_note;
- if (theConfig()->vt_trace_mpi) {
- trace_note = std::make_unique<trace::TraceScopedNote>(trace_irecv);
- }
- #endif
+#if vt_check_enabled(trace_enabled)
+ std::unique_ptr<trace::TraceScopedNote> trace_note;
+ if (theConfig()->vt_trace_mpi) {
+ trace_note = std::make_unique<trace::TraceScopedNote>(trace_irecv);
+ }
+#endif
VT_ALLOW_MPI_CALLS;
MPI_Imrecv(buf, num_probe_bytes, MPI_BYTE, &msg_handle, &req);
amPostedCounterGauge.incrementUpdate(num_probe_bytes, 1);
- #if vt_check_enabled(trace_enabled)
- if (theConfig()->vt_trace_mpi) {
- auto tr_note = fmt::format(
- "Imrecv(AM): from={}, bytes={}, tag={}",
- stat.MPI_SOURCE, num_probe_bytes, te.tag
- );
- trace_note->setNote(tr_note);
- trace_note->end();
- }
- #endif
+#if vt_check_enabled(trace_enabled)
+ if (theConfig()->vt_trace_mpi) {
+ auto tr_note = fmt::format(
+ "Imrecv(AM): from={}, bytes={}, tag={}", stat.MPI_SOURCE,
+ num_probe_bytes, te.tag
+ );
+ trace_note->setNote(tr_note);
+ trace_note->end();
+ }
+#endif
}
InProgressIRecv recv_holder{buf, num_probe_bytes, sender, req};
@@ -1101,9 +1100,7 @@ bool ActiveMessenger::tryProcessIncomingActiveMsg() {
#else
{
VT_ALLOW_MPI_CALLS;
- MPI_Iprobe(
- MPI_ANY_SOURCE, te.tag, comm_, &flag, &stat
- );
+ MPI_Iprobe(MPI_ANY_SOURCE, te.tag, comm_, &flag, &stat);
}
if (flag == 1) {
@@ -1115,31 +1112,30 @@ bool ActiveMessenger::tryProcessIncomingActiveMsg() {
MPI_Request req;
{
- #if vt_check_enabled(trace_enabled)
- std::unique_ptr<trace::TraceScopedNote> trace_note;
- if (theConfig()->vt_trace_mpi) {
- trace_note = std::make_unique<trace::TraceScopedNote>(trace_irecv);
- }
- #endif
+#if vt_check_enabled(trace_enabled)
+ std::unique_ptr<trace::TraceScopedNote> trace_note;
+ if (theConfig()->vt_trace_mpi) {
+ trace_note = std::make_unique<trace::TraceScopedNote>(trace_irecv);
+ }
+#endif
VT_ALLOW_MPI_CALLS;
MPI_Irecv(
- buf, num_probe_bytes, MPI_BYTE, sender, stat.MPI_TAG,
- comm_, &req
+ buf, num_probe_bytes, MPI_BYTE, sender, stat.MPI_TAG, comm_, &req
);
amPostedCounterGauge.incrementUpdate(num_probe_bytes, 1);
- #if vt_check_enabled(trace_enabled)
- if (theConfig()->vt_trace_mpi) {
- auto tr_note = fmt::format(
- "Irecv(AM): from={}, bytes={}, tag={}",
- stat.MPI_SOURCE, num_probe_bytes, stat.MPI_TAG
- );
- trace_note->setNote(tr_note);
- trace_note->end();
- }
- #endif
+#if vt_check_enabled(trace_enabled)
+ if (theConfig()->vt_trace_mpi) {
+ auto tr_note = fmt::format(
+ "Irecv(AM): from={}, bytes={}, tag={}", stat.MPI_SOURCE,
+ num_probe_bytes, stat.MPI_TAG
+ );
+ trace_note->setNote(tr_note);
+ trace_note->end();
+ }
+#endif
}
InProgressIRecv recv_holder{buf, num_probe_bytes, sender, req};
@@ -1318,35 +1314,37 @@ bool ActiveMessenger::testPendingAsyncOps() {
);
}
-void ActiveMessenger::ActiveRecvBroker::postSlot(ActiveMessenger* self, Slot& s) {
+void ActiveMessenger::ActiveRecvBroker::postSlot(
+ ActiveMessenger* self, Slot& s
+) {
// Allocate a fresh buffer for this slot and post a Irecv for the corresponding tag
s.buf = thePool()->alloc(s.cap);
- #if vt_check_enabled(trace_enabled)
- std::unique_ptr<trace::TraceScopedNote> trace_note;
- if (theConfig()->vt_trace_mpi) {
- trace_note = std::make_unique<trace::TraceScopedNote>(self->trace_irecv);
- }
- #endif
+#if vt_check_enabled(trace_enabled)
+ std::unique_ptr<trace::TraceScopedNote> trace_note;
+ if (theConfig()->vt_trace_mpi) {
+ trace_note = std::make_unique<trace::TraceScopedNote>(self->trace_irecv);
+ }
+#endif
{
VT_ALLOW_MPI_CALLS;
int const ret = MPI_Irecv(
- s.buf, s.cap, MPI_BYTE, MPI_ANY_SOURCE,
- s.tag, self->comm_, &s.req
+ s.buf, s.cap, MPI_BYTE, MPI_ANY_SOURCE, s.tag, self->comm_, &s.req
);
vtAssertMPISuccess(ret, "Broker MPI_Irecv");
}
self->amPostedCounterGauge.incrementUpdate(s.cap, 1);
- #if vt_check_enabled(trace_enabled)
- if (theConfig()->vt_trace_mpi) {
- auto tr_note = fmt::format("Irecv(AM Broker): tag={}, cap={}", s.tag, s.cap);
- trace_note->setNote(tr_note);
- trace_note->end();
- }
- #endif
+#if vt_check_enabled(trace_enabled)
+ if (theConfig()->vt_trace_mpi) {
+ auto tr_note =
+ fmt::format("Irecv(AM Broker): tag={}, cap={}", s.tag, s.cap);
+ trace_note->setNote(tr_note);
+ trace_note->end();
+ }
+#endif
s.posted = true;
}
@@ -1357,7 +1355,9 @@ void ActiveMessenger::ActiveRecvBroker::setup(ActiveMessenger* self) {
for (int ci = 0; ci < num_caps_; ++ci) {
for (int k = 0; k < slots_per_class_; ++k) {
- slots_.emplace_back(Slot{caps_[ci], tags_[ci], nullptr, MPI_REQUEST_NULL, false});
+ slots_.emplace_back(
+ Slot{caps_[ci], tags_[ci], nullptr, MPI_REQUEST_NULL, false}
+ );
}
}
@@ -1377,7 +1377,8 @@ bool ActiveMessenger::ActiveRecvBroker::progress(ActiveMessenger* self) {
int tests = 0;
for (auto& s : slots_) {
- if (!s.posted) continue;
+ if (!s.posted)
+ continue;
int flag = 0;
MPI_Status stat{};
@@ -1427,8 +1428,8 @@ int ActiveMessenger::progress([[maybe_unused]] TimeType current_time) {
bool const broker_progress = active_broker_.progress(this);
return started_irecv_active_msg or started_irecv_data_msg or
- received_active_msg or received_data_msg or general_async or
- broker_progress;
+ received_active_msg or received_data_msg or general_async or
+ broker_progress;
}
void ActiveMessenger::registerAsyncOp(std::unique_ptr<AsyncOp> in) {
diff --git a/src/vt/messaging/active.h b/src/vt/messaging/active.h
index 5d079aa4a..60bc1e5af 100644
--- a/src/vt/messaging/active.h
+++ b/src/vt/messaging/active.h
@@ -105,13 +105,13 @@ static constexpr TagType const PutPackedTag =
enum class MPITag : MPI_TagType {
ActiveMsgTag = 1,
- DataMsgTag = 2,
+ DataMsgTag = 2,
// Size-class active message tags
- ActiveMsgS = 11, // <= 512 bytes
- ActiveMsgM = 12, // <= 2048 bytes
- ActiveMsgL = 13, // <= 8192 bytes
- ActiveMsgXL = 14 // <= 32768 bytes
+ ActiveMsgS = 11, // <= 512 bytes
+ ActiveMsgM = 12, // <= 2048 bytes
+ ActiveMsgL = 13, // <= 8192 bytes
+ ActiveMsgXL = 14 // <= 32768 bytes
};
static constexpr TagType const starting_direct_buffer_tag = 1000;
@@ -1389,7 +1389,8 @@ struct ActiveMessenger : runtime::component::PollableComponent<ActiveMessenger>
MsgSizeType len, PriorityType prio, ActionType dealloc = nullptr,
ContinuationDeleterType next = nullptr, bool is_user_buf = false
#if MPI_VERSION >= 3
- , MPI_Message first_msg = MPI_MESSAGE_NULL, MsgSizeType first_chunk_bytes = 0
+ ,
+ MPI_Message first_msg = MPI_MESSAGE_NULL, MsgSizeType first_chunk_bytes = 0
#endif
);
@@ -1759,6 +1760,7 @@ private:
static constexpr int num_caps_ = 4;
static constexpr int caps_[num_caps_] = {512, 2048, 8192, 32768};
+
private:
std::vector<Slot> slots_;
static constexpr MPI_TagType tags_[num_caps_] = {
|
…essages
- Updated tryProcessIncomingActiveMsg to use MPI_Improbe+MPI_Imrecv on MPI 3+
- Updated recvDataMsgBuffer to use MPI_Improbe for data messages (enqueue=false)
- Extended recvDataDirect signature with optional first_msg and first_chunk_bytes parameters
- Implemented matched-first-chunk path in recvDataDirect using MPI_Imrecv
- Added MPI_VERSION >= 3 guards for all new code paths
- Maintained backward compatibility with MPI < 3 using existing Iprobe/Irecv

Co-authored-by: lifflander <3324465+lifflander@users.noreply.github.com>
c991481 to
4bace1d
Compare
Pipelines results
- vt-build-amd64-ubuntu-20-04-gcc-9-ldms-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-20-04-gcc-10-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-24-04-gcc-14-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-20-04-gcc-10-openmpi-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-20-04-gcc-9-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-24-04-clang-18-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-20-04-gcc-10-openmpi-cpp-spack Build for 1a8f1dc (2026-01-09 01:14:19 UTC)
- vt-build-amd64-ubuntu-24-04-clang-16-vtk-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-22-04-gcc-12-vtk-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-22-04-clang-14-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-22-04-clang-15-cpp-cxx20 Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-20-04-clang-9-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-22-04-gcc-11-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-alpine-3-16-clang-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-22-04-gcc-12-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-20-04-clang-10-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-22-04-clang-11-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-24-04-gcc-13-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
- vt-build-amd64-ubuntu-20-04-gcc-9-cuda-12-2-0-cpp Build for 9876c80 (2026-01-08 19:15:39 UTC)
|
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## develop #2501 +/- ##
===========================================
- Coverage 88.47% 88.47% -0.01%
===========================================
Files 728 728
Lines 30909 30925 +16
===========================================
+ Hits 27347 27360 +13
- Misses 3562 3565 +3
🚀 New features to boost your workflow:
|
|
The len -> sublen change in the diagnostic call is a distinct bug fix of the existing code, right? If so, I think it ought to be in its own commit, rather than mixed into the broader feature addition of the first commit in this PR. |
Fixes #2500
Large-scale runs on HPE Cray Slingshot (libfabric CXI provider) hit non-deterministic aborts under heavy unexpected-message pressure due to hardware match-table exhaustion and Iprobe→Irecv race conditions. This PR switches to MPI_Improbe/MPI_Imrecv (MPI 3+) to claim messages from the unexpected queue immediately.