From 975c01f233ff9c81e777935d902f439d6b9ed02e Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Wed, 20 Nov 2024 10:32:38 +0100 Subject: [PATCH 1/8] Only do check for MPI_REQUEST_NULL in trigger_mpi receiver --- libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp index 7472b6925..7807ec775 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp @@ -67,14 +67,9 @@ namespace pika::mpi::experimental { auto completion_snd = [=](MPI_Request request) -> unique_any_sender<> { if (!completions_inline) // not inline : a transfer is required { - if (request == MPI_REQUEST_NULL) - { - return ex::schedule(default_pool_scheduler(p)); - } return just(request) | trigger_mpi(mode) | ex::continues_on(default_pool_scheduler(p)); } - if (request == MPI_REQUEST_NULL) { return just(); } return just(request) | trigger_mpi(mode); }; From 838caaffb3f9a79d5c87bfa05028cdf26a83681e Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Wed, 20 Nov 2024 10:42:31 +0100 Subject: [PATCH 2/8] Do early poll of MPI request only in trigger_mpi receiver --- .../include/pika/async_mpi/dispatch_mpi.hpp | 14 ++------------ .../include/pika/async_mpi/trigger_mpi.hpp | 9 +++++++-- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/libs/pika/async_mpi/include/pika/async_mpi/dispatch_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/dispatch_mpi.hpp index a1bc641e1..52a4c2eca 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/dispatch_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/dispatch_mpi.hpp @@ -158,18 +158,8 @@ namespace pika::mpi::experimental::detail { mpi::exception(status, "dispatch mpi"))); return; } - // early poll just in case the request completed immediately - if (poll_request(request)) - { -#ifdef PIKA_HAVE_APEX - apex::scoped_timer apex_invoke("pika::mpi::trigger"); -#endif - PIKA_DETAIL_DP(mpi_tran<7>, - debug( - str<>("dispatch_mpi_recv"), "eager poll ok", ptr(request))); - ex::set_value(PIKA_MOVE(r.op_state.receiver), MPI_REQUEST_NULL); - } - else { ex::set_value(PIKA_MOVE(r.op_state.receiver), request); } + + ex::set_value(PIKA_MOVE(r.op_state.receiver), request); }, [&](std::exception_ptr ep) { ex::set_error(PIKA_MOVE(r.op_state.receiver), PIKA_MOVE(ep)); diff --git a/libs/pika/async_mpi/include/pika/async_mpi/trigger_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/trigger_mpi.hpp index b466073b7..d80e237bd 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/trigger_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/trigger_mpi.hpp @@ -119,9 +119,14 @@ namespace pika::mpi::experimental::detail { { auto r = PIKA_MOVE(*this); - // early exit check - if (request == MPI_REQUEST_NULL) + // early poll just in case the request completed immediately + if (poll_request(request)) { +#ifdef PIKA_HAVE_APEX + apex::scoped_timer apex_invoke("pika::mpi::trigger"); +#endif + PIKA_DETAIL_DP(mpi_tran<7>, + debug(str<>("trigger_mpi_recv"), "eager poll ok", ptr(request))); ex::set_value(PIKA_MOVE(r.op_state.receiver)); return; } From f507aff84708ca0889a476328a7d0c5e560253ac Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Wed, 20 Nov 2024 11:23:07 +0100 Subject: [PATCH 3/8] Remove one level of let_value nesting in transform_mpi --- .../include/pika/async_mpi/transform_mpi.hpp | 23 ++++++------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp index 7807ec775..f662b0cef 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp @@ -64,30 +64,21 @@ namespace pika::mpi::experimental { execution::thread_priority::boost : execution::thread_priority::normal; - auto completion_snd = [=](MPI_Request request) -> unique_any_sender<> { - if (!completions_inline) // not inline : a transfer is required - { - return just(request) | trigger_mpi(mode) | - ex::continues_on(default_pool_scheduler(p)); - } - return just(request) | trigger_mpi(mode); + auto f_mpi = [=, f = std::forward(f)](auto&... args) mutable -> unique_any_sender<> { + auto s = just(std::forward_as_tuple(args...)) | ex::unpack() | + dispatch_mpi(std::move(f)) | trigger_mpi(mode); + if (completions_inline) { return s; } + else { return std::move(s) | ex::continues_on(default_pool_scheduler(p)); } }; if (requests_inline) { - return std::forward(sender) | - let_value([=, f = std::forward(f)](auto&... args) mutable { - return just(std::forward_as_tuple(args...)) | ex::unpack() | - dispatch_mpi(std::move(f)) | let_value(completion_snd); - }); + return std::forward(sender) | let_value(std::move(f_mpi)); } else { return std::forward(sender) | continues_on(mpi_pool_scheduler(p)) | - let_value([=, f = std::forward(f)](auto&... args) mutable { - return just(std::forward_as_tuple(args...)) | ex::unpack() | - dispatch_mpi(std::move(f)) | let_value(completion_snd); - }); + let_value(std::move(f_mpi)); } } From ec08151d96438b34c03dd704dc55df7dc230a42f Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Wed, 20 Nov 2024 11:40:29 +0100 Subject: [PATCH 4/8] Remove internal use of unique_any_sender in transform_mpi adaptor Only use unique_any_sender for return type of adaptor itself. --- .../include/pika/async_mpi/transform_mpi.hpp | 44 ++++++++++++++----- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp index f662b0cef..34c996850 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp @@ -64,21 +64,45 @@ namespace pika::mpi::experimental { execution::thread_priority::boost : execution::thread_priority::normal; - auto f_mpi = [=, f = std::forward(f)](auto&... args) mutable -> unique_any_sender<> { - auto s = just(std::forward_as_tuple(args...)) | ex::unpack() | - dispatch_mpi(std::move(f)) | trigger_mpi(mode); - if (completions_inline) { return s; } - else { return std::move(s) | ex::continues_on(default_pool_scheduler(p)); } - }; - if (requests_inline) { - return std::forward(sender) | let_value(std::move(f_mpi)); + if (completions_inline) + { + return std::forward(sender) | + let_value([=, f = std::forward(f)](auto&... args) mutable { + return just(std::forward_as_tuple(args...)) | ex::unpack() | + dispatch_mpi(std::move(f)) | trigger_mpi(mode); + }); + } + else + { + return std::forward(sender) | + let_value([=, f = std::forward(f)](auto&... args) mutable { + return just(std::forward_as_tuple(args...)) | ex::unpack() | + dispatch_mpi(std::move(f)) | trigger_mpi(mode) | + ex::continues_on(default_pool_scheduler(p)); + }); + } } else { - return std::forward(sender) | continues_on(mpi_pool_scheduler(p)) | - let_value(std::move(f_mpi)); + if (completions_inline) + { + return std::forward(sender) | continues_on(mpi_pool_scheduler(p)) | + let_value([=, f = std::forward(f)](auto&... args) mutable { + return just(std::forward_as_tuple(args...)) | ex::unpack() | + dispatch_mpi(std::move(f)) | trigger_mpi(mode); + }); + } + else + { + return std::forward(sender) | continues_on(mpi_pool_scheduler(p)) | + let_value([=, f = std::forward(f)](auto&... args) mutable { + return just(std::forward_as_tuple(args...)) | ex::unpack() | + dispatch_mpi(std::move(f)) | trigger_mpi(mode) | + ex::continues_on(default_pool_scheduler(p)); + }); + } } } From 8181d6d790d14bc3dab61cc4cdc66ac83bb1482a Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Wed, 20 Nov 2024 11:44:57 +0100 Subject: [PATCH 5/8] Slightly reduce duplication in transform_mpi adaptor --- .../include/pika/async_mpi/transform_mpi.hpp | 43 ++++++++----------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp index 34c996850..a3437ce39 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp @@ -64,44 +64,39 @@ namespace pika::mpi::experimental { execution::thread_priority::boost : execution::thread_priority::normal; - if (requests_inline) + if (completions_inline) { - if (completions_inline) + auto f_completion = [=, f = std::forward(f)](auto&... args) mutable { + return just(std::forward_as_tuple(args...)) | ex::unpack() | + dispatch_mpi(std::move(f)) | trigger_mpi(mode); + }; + + if (requests_inline) { - return std::forward(sender) | - let_value([=, f = std::forward(f)](auto&... args) mutable { - return just(std::forward_as_tuple(args...)) | ex::unpack() | - dispatch_mpi(std::move(f)) | trigger_mpi(mode); - }); + return std::forward(sender) | let_value(std::move(f_completion)); } else { - return std::forward(sender) | - let_value([=, f = std::forward(f)](auto&... args) mutable { - return just(std::forward_as_tuple(args...)) | ex::unpack() | - dispatch_mpi(std::move(f)) | trigger_mpi(mode) | - ex::continues_on(default_pool_scheduler(p)); - }); + return std::forward(sender) | continues_on(mpi_pool_scheduler(p)) | + let_value(std::move(f_completion)); } } else { - if (completions_inline) + auto f_completion = [=, f = std::forward(f)](auto&... args) mutable { + return just(std::forward_as_tuple(args...)) | ex::unpack() | + dispatch_mpi(std::move(f)) | trigger_mpi(mode) | + ex::continues_on(default_pool_scheduler(p)); + }; + + if (requests_inline) { - return std::forward(sender) | continues_on(mpi_pool_scheduler(p)) | - let_value([=, f = std::forward(f)](auto&... args) mutable { - return just(std::forward_as_tuple(args...)) | ex::unpack() | - dispatch_mpi(std::move(f)) | trigger_mpi(mode); - }); + return std::forward(sender) | let_value(std::move(f_completion)); } else { return std::forward(sender) | continues_on(mpi_pool_scheduler(p)) | - let_value([=, f = std::forward(f)](auto&... args) mutable { - return just(std::forward_as_tuple(args...)) | ex::unpack() | - dispatch_mpi(std::move(f)) | trigger_mpi(mode) | - ex::continues_on(default_pool_scheduler(p)); - }); + let_value(std::move(f_completion)); } } } From 4688f865ee0d0bdb4eddbf357d58fa59fae26765 Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Wed, 20 Nov 2024 11:45:43 +0100 Subject: [PATCH 6/8] Remove unused typedefs and namespace qualification in transform_mpi adaptor --- .../async_mpi/include/pika/async_mpi/transform_mpi.hpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp index a3437ce39..b8bce7e2e 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp @@ -52,7 +52,6 @@ namespace pika::mpi::experimental { using pika::execution::experimental::continues_on; using pika::execution::experimental::just; using pika::execution::experimental::let_value; - using pika::execution::experimental::unique_any_sender; using pika::execution::experimental::unpack; // get mpi completion mode settings @@ -67,7 +66,7 @@ namespace pika::mpi::experimental { if (completions_inline) { auto f_completion = [=, f = std::forward(f)](auto&... args) mutable { - return just(std::forward_as_tuple(args...)) | ex::unpack() | + return just(std::forward_as_tuple(args...)) | unpack() | dispatch_mpi(std::move(f)) | trigger_mpi(mode); }; @@ -84,9 +83,9 @@ namespace pika::mpi::experimental { else { auto f_completion = [=, f = std::forward(f)](auto&... args) mutable { - return just(std::forward_as_tuple(args...)) | ex::unpack() | + return just(std::forward_as_tuple(args...)) | unpack() | dispatch_mpi(std::move(f)) | trigger_mpi(mode) | - ex::continues_on(default_pool_scheduler(p)); + continues_on(default_pool_scheduler(p)); }; if (requests_inline) From 633ed2a1630ad39f254205a251f35448ae3047f8 Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Wed, 20 Nov 2024 11:48:47 +0100 Subject: [PATCH 7/8] Explicitly capture mode and priority in transform_mpi --- libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp index b8bce7e2e..6d4c596df 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp @@ -65,7 +65,7 @@ namespace pika::mpi::experimental { if (completions_inline) { - auto f_completion = [=, f = std::forward(f)](auto&... args) mutable { + auto f_completion = [mode, p, f = std::forward(f)](auto&... args) mutable { return just(std::forward_as_tuple(args...)) | unpack() | dispatch_mpi(std::move(f)) | trigger_mpi(mode); }; @@ -82,7 +82,7 @@ namespace pika::mpi::experimental { } else { - auto f_completion = [=, f = std::forward(f)](auto&... args) mutable { + auto f_completion = [mode, p, f = std::forward(f)](auto&... args) mutable { return just(std::forward_as_tuple(args...)) | unpack() | dispatch_mpi(std::move(f)) | trigger_mpi(mode) | continues_on(default_pool_scheduler(p)); From c6ee2c8684eabc49ca2dabafcd787e20679faa1b Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Thu, 21 Nov 2024 18:52:35 +0100 Subject: [PATCH 8/8] Remove unused capture from transform_mpi --- libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp index 6d4c596df..cf7a516f9 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp @@ -65,7 +65,7 @@ namespace pika::mpi::experimental { if (completions_inline) { - auto f_completion = [mode, p, f = std::forward(f)](auto&... args) mutable { + auto f_completion = [mode, f = std::forward(f)](auto&... args) mutable { return just(std::forward_as_tuple(args...)) | unpack() | dispatch_mpi(std::move(f)) | trigger_mpi(mode); };