Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/archive client decouple #14736

Merged
merged 4 commits into from
Jan 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 51 additions & 43 deletions src/lib/mina_lib/archive_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -66,54 +66,19 @@ let dispatch_precomputed_block =
let dispatch_extensional_block =
make_dispatch_block Archive_lib.Rpc.extensional_block

let transfer ~logger ~precomputed_values ~archive_location
let transfer
(breadcrumb_reader :
Transition_frontier.Extensions.New_breadcrumbs.view
Broadcast_pipe.Reader.t ) =
Broadcast_pipe.Reader.t ) breadcrumb_writer =
Broadcast_pipe.Reader.iter breadcrumb_reader ~f:(fun breadcrumbs ->
Deferred.List.iter breadcrumbs ~f:(fun breadcrumb ->
let start = Time.now () in
let diff =
Archive_lib.Diff.Builder.breadcrumb_added ~precomputed_values
~logger breadcrumb
in
let diff_time = Time.now () in
[%log debug] "Archive data generation for $state_hash took $time ms"
~metadata:
[ ( "state_hash"
, Mina_base.State_hash.to_yojson
(Transition_frontier.Breadcrumb.state_hash breadcrumb) )
; ("time", `Float (Time.Span.to_ms (Time.diff diff_time start)))
] ;
match%map
dispatch archive_location ~logger (Transition_frontier diff)
with
| Ok () ->
[%log debug]
"Dispatched archive data for $state_hash, took $time ms"
~metadata:
[ ( "state_hash"
, Mina_base.State_hash.to_yojson
(Transition_frontier.Breadcrumb.state_hash breadcrumb)
)
; ( "time"
, `Float
(Time.Span.to_ms (Time.diff (Time.now ()) diff_time)) )
] ;
()
| Error e ->
[%log warn]
~metadata:
[ ("error", Error_json.error_to_yojson e)
; ( "breadcrumb"
, Transition_frontier.Breadcrumb.to_yojson breadcrumb )
]
"Could not send breadcrumb to archive: $error" ) )
Async.Pipe.write_without_pushback breadcrumb_writer breadcrumbs ;
Deferred.unit )

let run ~logger ~precomputed_values
~(frontier_broadcast_pipe :
Transition_frontier.t option Broadcast_pipe.Reader.t ) archive_location =
O1trace.background_thread "send_diffs_to_archiver" (fun () ->
let reader, writer = Async.Pipe.create () in
O1trace.background_thread "enqueue_diffs_for_archiver" (fun () ->
Broadcast_pipe.Reader.iter frontier_broadcast_pipe
~f:
(Option.value_map ~default:Deferred.unit
Expand All @@ -125,5 +90,48 @@ let run ~logger ~precomputed_values
Transition_frontier.Extensions.get_view_pipe extensions
Transition_frontier.Extensions.New_breadcrumbs
in
transfer ~logger ~precomputed_values ~archive_location
breadcrumb_reader ) ) )
transfer breadcrumb_reader writer ) ) ) ;
O1trace.background_thread "send_diffs_to_archiver" (fun () ->
Async.Pipe.iter reader ~f:(fun breadcrumbs ->
Deferred.List.iter breadcrumbs ~f:(fun breadcrumb ->
let start = Time.now () in
let diff =
Archive_lib.Diff.Builder.breadcrumb_added ~precomputed_values
~logger breadcrumb
in
let diff_time = Time.now () in
[%log debug]
"Archive data generation for $state_hash took $time ms"
~metadata:
[ ( "state_hash"
, Mina_base.State_hash.to_yojson
(Transition_frontier.Breadcrumb.state_hash breadcrumb)
)
; ( "time"
, `Float (Time.Span.to_ms (Time.diff diff_time start)) )
] ;
match%map
dispatch archive_location ~logger (Transition_frontier diff)
with
| Ok () ->
[%log debug]
"Dispatched archive data for $state_hash, took $time ms"
~metadata:
[ ( "state_hash"
, Mina_base.State_hash.to_yojson
(Transition_frontier.Breadcrumb.state_hash
breadcrumb ) )
; ( "time"
, `Float
(Time.Span.to_ms
(Time.diff (Time.now ()) diff_time) ) )
] ;
()
| Error e ->
[%log warn]
~metadata:
[ ("error", Error_json.error_to_yojson e)
; ( "breadcrumb"
, Transition_frontier.Breadcrumb.to_yojson breadcrumb )
]
"Could not send breadcrumb to archive: $error" ) ) )