Skip to content

Commit

Permalink
Merge pull request #14736 from MinaProtocol/fix/archive-client-decouple
Browse files Browse the repository at this point in the history
Fix/archive client decouple
  • Loading branch information
georgeee authored Jan 10, 2024
2 parents 3b54253 + af73b42 commit c22a213
Showing 1 changed file with 51 additions and 43 deletions.
94 changes: 51 additions & 43 deletions src/lib/mina_lib/archive_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -66,54 +66,19 @@ let dispatch_precomputed_block =
let dispatch_extensional_block =
make_dispatch_block Archive_lib.Rpc.extensional_block

let transfer ~logger ~precomputed_values ~archive_location
let transfer
(breadcrumb_reader :
Transition_frontier.Extensions.New_breadcrumbs.view
Broadcast_pipe.Reader.t ) =
Broadcast_pipe.Reader.t ) breadcrumb_writer =
Broadcast_pipe.Reader.iter breadcrumb_reader ~f:(fun breadcrumbs ->
Deferred.List.iter breadcrumbs ~f:(fun breadcrumb ->
let start = Time.now () in
let diff =
Archive_lib.Diff.Builder.breadcrumb_added ~precomputed_values
~logger breadcrumb
in
let diff_time = Time.now () in
[%log debug] "Archive data generation for $state_hash took $time ms"
~metadata:
[ ( "state_hash"
, Mina_base.State_hash.to_yojson
(Transition_frontier.Breadcrumb.state_hash breadcrumb) )
; ("time", `Float (Time.Span.to_ms (Time.diff diff_time start)))
] ;
match%map
dispatch archive_location ~logger (Transition_frontier diff)
with
| Ok () ->
[%log debug]
"Dispatched archive data for $state_hash, took $time ms"
~metadata:
[ ( "state_hash"
, Mina_base.State_hash.to_yojson
(Transition_frontier.Breadcrumb.state_hash breadcrumb)
)
; ( "time"
, `Float
(Time.Span.to_ms (Time.diff (Time.now ()) diff_time)) )
] ;
()
| Error e ->
[%log warn]
~metadata:
[ ("error", Error_json.error_to_yojson e)
; ( "breadcrumb"
, Transition_frontier.Breadcrumb.to_yojson breadcrumb )
]
"Could not send breadcrumb to archive: $error" ) )
Async.Pipe.write_without_pushback breadcrumb_writer breadcrumbs ;
Deferred.unit )

let run ~logger ~precomputed_values
~(frontier_broadcast_pipe :
Transition_frontier.t option Broadcast_pipe.Reader.t ) archive_location =
O1trace.background_thread "send_diffs_to_archiver" (fun () ->
let reader, writer = Async.Pipe.create () in
O1trace.background_thread "enqueue_diffs_for_archiver" (fun () ->
Broadcast_pipe.Reader.iter frontier_broadcast_pipe
~f:
(Option.value_map ~default:Deferred.unit
Expand All @@ -125,5 +90,48 @@ let run ~logger ~precomputed_values
Transition_frontier.Extensions.get_view_pipe extensions
Transition_frontier.Extensions.New_breadcrumbs
in
transfer ~logger ~precomputed_values ~archive_location
breadcrumb_reader ) ) )
transfer breadcrumb_reader writer ) ) ) ;
O1trace.background_thread "send_diffs_to_archiver" (fun () ->
Async.Pipe.iter reader ~f:(fun breadcrumbs ->
Deferred.List.iter breadcrumbs ~f:(fun breadcrumb ->
let start = Time.now () in
let diff =
Archive_lib.Diff.Builder.breadcrumb_added ~precomputed_values
~logger breadcrumb
in
let diff_time = Time.now () in
[%log debug]
"Archive data generation for $state_hash took $time ms"
~metadata:
[ ( "state_hash"
, Mina_base.State_hash.to_yojson
(Transition_frontier.Breadcrumb.state_hash breadcrumb)
)
; ( "time"
, `Float (Time.Span.to_ms (Time.diff diff_time start)) )
] ;
match%map
dispatch archive_location ~logger (Transition_frontier diff)
with
| Ok () ->
[%log debug]
"Dispatched archive data for $state_hash, took $time ms"
~metadata:
[ ( "state_hash"
, Mina_base.State_hash.to_yojson
(Transition_frontier.Breadcrumb.state_hash
breadcrumb ) )
; ( "time"
, `Float
(Time.Span.to_ms
(Time.diff (Time.now ()) diff_time) ) )
] ;
()
| Error e ->
[%log warn]
~metadata:
[ ("error", Error_json.error_to_yojson e)
; ( "breadcrumb"
, Transition_frontier.Breadcrumb.to_yojson breadcrumb )
]
"Could not send breadcrumb to archive: $error" ) ) )

0 comments on commit c22a213

Please sign in to comment.