diff --git a/libs/pika/async_mpi/include/pika/async_mpi/dispatch_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/dispatch_mpi.hpp index a1bc641e1..52a4c2eca 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/dispatch_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/dispatch_mpi.hpp @@ -158,18 +158,8 @@ namespace pika::mpi::experimental::detail { mpi::exception(status, "dispatch mpi"))); return; } - // early poll just in case the request completed immediately - if (poll_request(request)) - { -#ifdef PIKA_HAVE_APEX - apex::scoped_timer apex_invoke("pika::mpi::trigger"); -#endif - PIKA_DETAIL_DP(mpi_tran<7>, - debug( - str<>("dispatch_mpi_recv"), "eager poll ok", ptr(request))); - ex::set_value(PIKA_MOVE(r.op_state.receiver), MPI_REQUEST_NULL); - } - else { ex::set_value(PIKA_MOVE(r.op_state.receiver), request); } + + ex::set_value(PIKA_MOVE(r.op_state.receiver), request); }, [&](std::exception_ptr ep) { ex::set_error(PIKA_MOVE(r.op_state.receiver), PIKA_MOVE(ep)); diff --git a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp index 7472b6925..cf7a516f9 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp @@ -52,7 +52,6 @@ namespace pika::mpi::experimental { using pika::execution::experimental::continues_on; using pika::execution::experimental::just; using pika::execution::experimental::let_value; - using pika::execution::experimental::unique_any_sender; using pika::execution::experimental::unpack; // get mpi completion mode settings @@ -64,35 +63,40 @@ namespace pika::mpi::experimental { execution::thread_priority::boost : execution::thread_priority::normal; - auto completion_snd = [=](MPI_Request request) -> unique_any_sender<> { - if (!completions_inline) // not inline : a transfer is required + if (completions_inline) + { + auto f_completion = [mode, f = std::forward(f)](auto&... args) mutable { + return just(std::forward_as_tuple(args...)) | unpack() | + dispatch_mpi(std::move(f)) | trigger_mpi(mode); + }; + + if (requests_inline) { - if (request == MPI_REQUEST_NULL) - { - return ex::schedule(default_pool_scheduler(p)); - } - return just(request) | trigger_mpi(mode) | - ex::continues_on(default_pool_scheduler(p)); + return std::forward(sender) | let_value(std::move(f_completion)); + } + else + { + return std::forward(sender) | continues_on(mpi_pool_scheduler(p)) | + let_value(std::move(f_completion)); } - if (request == MPI_REQUEST_NULL) { return just(); } - return just(request) | trigger_mpi(mode); - }; - - if (requests_inline) - { - return std::forward(sender) | - let_value([=, f = std::forward(f)](auto&... args) mutable { - return just(std::forward_as_tuple(args...)) | ex::unpack() | - dispatch_mpi(std::move(f)) | let_value(completion_snd); - }); } else { - return std::forward(sender) | continues_on(mpi_pool_scheduler(p)) | - let_value([=, f = std::forward(f)](auto&... args) mutable { - return just(std::forward_as_tuple(args...)) | ex::unpack() | - dispatch_mpi(std::move(f)) | let_value(completion_snd); - }); + auto f_completion = [mode, p, f = std::forward(f)](auto&... args) mutable { + return just(std::forward_as_tuple(args...)) | unpack() | + dispatch_mpi(std::move(f)) | trigger_mpi(mode) | + continues_on(default_pool_scheduler(p)); + }; + + if (requests_inline) + { + return std::forward(sender) | let_value(std::move(f_completion)); + } + else + { + return std::forward(sender) | continues_on(mpi_pool_scheduler(p)) | + let_value(std::move(f_completion)); + } } } diff --git a/libs/pika/async_mpi/include/pika/async_mpi/trigger_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/trigger_mpi.hpp index b466073b7..d80e237bd 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/trigger_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/trigger_mpi.hpp @@ -119,9 +119,14 @@ namespace pika::mpi::experimental::detail { { auto r = PIKA_MOVE(*this); - // early exit check - if (request == MPI_REQUEST_NULL) + // early poll just in case the request completed immediately + if (poll_request(request)) { +#ifdef PIKA_HAVE_APEX + apex::scoped_timer apex_invoke("pika::mpi::trigger"); +#endif + PIKA_DETAIL_DP(mpi_tran<7>, + debug(str<>("trigger_mpi_recv"), "eager poll ok", ptr(request))); ex::set_value(PIKA_MOVE(r.op_state.receiver)); return; }