Skip to content

Commit

Permalink
Merge pull request #1346 from msimberg/transform-mpi-cleanup
Browse files Browse the repository at this point in the history
Minor `transform_mpi` refactoring
  • Loading branch information
msimberg authored Nov 22, 2024
2 parents a06ccec + c6ee2c8 commit c2ac8b2
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 39 deletions.
14 changes: 2 additions & 12 deletions libs/pika/async_mpi/include/pika/async_mpi/dispatch_mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,8 @@ namespace pika::mpi::experimental::detail {
mpi::exception(status, "dispatch mpi")));
return;
}
// early poll just in case the request completed immediately
if (poll_request(request))
{
#ifdef PIKA_HAVE_APEX
apex::scoped_timer apex_invoke("pika::mpi::trigger");
#endif
PIKA_DETAIL_DP(mpi_tran<7>,
debug(
str<>("dispatch_mpi_recv"), "eager poll ok", ptr(request)));
ex::set_value(PIKA_MOVE(r.op_state.receiver), MPI_REQUEST_NULL);
}
else { ex::set_value(PIKA_MOVE(r.op_state.receiver), request); }

ex::set_value(PIKA_MOVE(r.op_state.receiver), request);
},
[&](std::exception_ptr ep) {
ex::set_error(PIKA_MOVE(r.op_state.receiver), PIKA_MOVE(ep));
Expand Down
54 changes: 29 additions & 25 deletions libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ namespace pika::mpi::experimental {
using pika::execution::experimental::continues_on;
using pika::execution::experimental::just;
using pika::execution::experimental::let_value;
using pika::execution::experimental::unique_any_sender;
using pika::execution::experimental::unpack;

// get mpi completion mode settings
Expand All @@ -64,35 +63,40 @@ namespace pika::mpi::experimental {
execution::thread_priority::boost :
execution::thread_priority::normal;

auto completion_snd = [=](MPI_Request request) -> unique_any_sender<> {
if (!completions_inline) // not inline : a transfer is required
if (completions_inline)
{
auto f_completion = [mode, f = std::forward<F>(f)](auto&... args) mutable {
return just(std::forward_as_tuple(args...)) | unpack() |
dispatch_mpi(std::move(f)) | trigger_mpi(mode);
};

if (requests_inline)
{
if (request == MPI_REQUEST_NULL)
{
return ex::schedule(default_pool_scheduler(p));
}
return just(request) | trigger_mpi(mode) |
ex::continues_on(default_pool_scheduler(p));
return std::forward<Sender>(sender) | let_value(std::move(f_completion));
}
else
{
return std::forward<Sender>(sender) | continues_on(mpi_pool_scheduler(p)) |
let_value(std::move(f_completion));
}
if (request == MPI_REQUEST_NULL) { return just(); }
return just(request) | trigger_mpi(mode);
};

if (requests_inline)
{
return std::forward<Sender>(sender) |
let_value([=, f = std::forward<F>(f)](auto&... args) mutable {
return just(std::forward_as_tuple(args...)) | ex::unpack() |
dispatch_mpi(std::move(f)) | let_value(completion_snd);
});
}
else
{
return std::forward<Sender>(sender) | continues_on(mpi_pool_scheduler(p)) |
let_value([=, f = std::forward<F>(f)](auto&... args) mutable {
return just(std::forward_as_tuple(args...)) | ex::unpack() |
dispatch_mpi(std::move(f)) | let_value(completion_snd);
});
auto f_completion = [mode, p, f = std::forward<F>(f)](auto&... args) mutable {
return just(std::forward_as_tuple(args...)) | unpack() |
dispatch_mpi(std::move(f)) | trigger_mpi(mode) |
continues_on(default_pool_scheduler(p));
};

if (requests_inline)
{
return std::forward<Sender>(sender) | let_value(std::move(f_completion));
}
else
{
return std::forward<Sender>(sender) | continues_on(mpi_pool_scheduler(p)) |
let_value(std::move(f_completion));
}
}
}

Expand Down
9 changes: 7 additions & 2 deletions libs/pika/async_mpi/include/pika/async_mpi/trigger_mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,14 @@ namespace pika::mpi::experimental::detail {
{
auto r = PIKA_MOVE(*this);

// early exit check
if (request == MPI_REQUEST_NULL)
// early poll just in case the request completed immediately
if (poll_request(request))
{
#ifdef PIKA_HAVE_APEX
apex::scoped_timer apex_invoke("pika::mpi::trigger");
#endif
PIKA_DETAIL_DP(mpi_tran<7>,
debug(str<>("trigger_mpi_recv"), "eager poll ok", ptr(request)));
ex::set_value(PIKA_MOVE(r.op_state.receiver));
return;
}
Expand Down

0 comments on commit c2ac8b2

Please sign in to comment.