From ff92b1cd73d35d2b27bde1d2d198d9a8edb4b482 Mon Sep 17 00:00:00 2001 From: ember arlynx Date: Mon, 11 Dec 2023 16:51:22 -0500 Subject: [PATCH] Child_processes: ensure stderr/stdout get flushed --- src/lib/child_processes/child_processes.ml | 6 +-- src/lib/mina_net2/libp2p_helper.ml | 57 +++++++++++++--------- 2 files changed, 35 insertions(+), 28 deletions(-) diff --git a/src/lib/child_processes/child_processes.ml b/src/lib/child_processes/child_processes.ml index 6e1deaccd18..63839e20b0a 100644 --- a/src/lib/child_processes/child_processes.ml +++ b/src/lib/child_processes/child_processes.ml @@ -270,11 +270,7 @@ let start_custom : Deferred.Or_error.try_with ~here:[%here] (fun () -> Process.wait process) in [%log trace] "child process %s died" name ; - don't_wait_for - (let%bind () = after (Time.Span.of_sec 1.) in - let%bind () = Writer.close @@ Process.stdin process in - let%bind () = Reader.close @@ Process.stdout process in - Reader.close @@ Process.stderr process ) ; + don't_wait_for (Writer.close @@ Process.stdin process) ; let%bind () = Sys.remove lock_path in Ivar.fill terminated_ivar termination_status ; let log_bad_termination () = diff --git a/src/lib/mina_net2/libp2p_helper.ml b/src/lib/mina_net2/libp2p_helper.ml index ee91c2a3ff1..2ef08749016 100644 --- a/src/lib/mina_net2/libp2p_helper.ml +++ b/src/lib/mina_net2/libp2p_helper.ml @@ -122,6 +122,7 @@ type t = { process : Child_processes.t ; logger : Logger.t ; mutable finished : bool + ; stderr_finished : unit Ivar.t ; outstanding_requests : Libp2p_ipc.rpc_response_body Or_error.t Ivar.t Libp2p_ipc.Sequence_number.Table.t @@ -141,13 +142,15 @@ let handle_libp2p_helper_termination t ~pids ~killed result = ~metadata: [ ("exit_status", `String (Unix.Exit_or_signal.to_string_hum e)) ] ; t.finished <- true ; + let%map () = Ivar.read t.stderr_finished in raise Libp2p_helper_died_unexpectedly | Error err -> [%log' fatal t.logger] !"Child processes library could not track libp2p_helper process: $err" ~metadata:[ ("err", Error_json.error_to_yojson err) ] ; t.finished <- true ; - let%map () = Deferred.ignore_m (Child_processes.kill t.process) in + let%bind () = Deferred.ignore_m (Child_processes.kill t.process) in + let%map () = Ivar.read t.stderr_finished in raise Libp2p_helper_died_unexpectedly | Ok (Ok ()) -> [%log' error t.logger] @@ -249,33 +252,39 @@ let spawn ?(allow_multiple_instances = false) ~logger ~pids ~conf_dir { process ; logger ; finished = false + ; stderr_finished = Ivar.create () ; outstanding_requests = Libp2p_ipc.Sequence_number.Table.create () } in termination_handler := handle_libp2p_helper_termination t ~pids ; O1trace.background_thread "handle_libp2p_helper_subprocess_logs" (fun () -> - Child_processes.stderr process - |> Strict_pipe.Reader.iter ~f:(fun line -> - Mina_metrics.( - Counter.inc_one Mina_metrics.Network.ipc_logs_received_total) ; - let record_result = - try - Some - (Go_log.record_of_yojson @@ Yojson.Safe.from_string line) - with Yojson.Json_error _error -> None - in - ( match record_result with - | Some (Ok record) -> - record |> Go_log.record_to_message |> Logger.raw logger - | Some (Error error) -> - [%log error] - "failed to parse record over libp2p_helper stderr: \ - $error" - ~metadata:[ ("error", `String error) ] - | None -> - Core.print_endline line ) ; - Deferred.unit ) ) ; + let%map () = + Child_processes.stderr process + |> Strict_pipe.Reader.iter ~f:(fun line -> + Mina_metrics.( + Counter.inc_one + Mina_metrics.Network.ipc_logs_received_total) ; + let record_result = + try + Some + ( Go_log.record_of_yojson + @@ Yojson.Safe.from_string line ) + with Yojson.Json_error _error -> None + in + ( match record_result with + | Some (Ok record) -> + record |> Go_log.record_to_message |> Logger.raw logger + | Some (Error error) -> + [%log error] + "failed to parse record over libp2p_helper stderr: \ + $error" + ~metadata:[ ("error", `String error) ] + | None -> + Core.print_endline line ) ; + Deferred.unit ) + in + Ivar.fill t.stderr_finished () ) ; O1trace.background_thread "handle_libp2p_ipc_incoming" (fun () -> Child_processes.stdout process |> Libp2p_ipc.read_incoming_messages @@ -284,7 +293,9 @@ let spawn ?(allow_multiple_instances = false) ~logger ~pids ~conf_dir let msg = Libp2p_ipc.Reader.DaemonInterface.Message.get msg in - handle_incoming_message t msg ~handle_push_message + if not t.finished then + handle_incoming_message t msg ~handle_push_message + else Deferred.unit | Error error -> [%log error] "failed to parse IPC message over libp2p_helper stdout: \