diff --git a/dkg-gadget/src/async_protocols/remote.rs b/dkg-gadget/src/async_protocols/remote.rs
index f9b974107..8da204078 100644
--- a/dkg-gadget/src/async_protocols/remote.rs
+++ b/dkg-gadget/src/async_protocols/remote.rs
@@ -46,6 +46,16 @@ type MessageReceiverHandle =
 impl Clone for AsyncProtocolRemote {
 	fn clone(&self) -> Self {
+		let strong_count_next = Arc::strong_count(&self.status) + 1;
+		let status = self.get_status();
+		debug_log_cloning_and_dropping(
+			&self.logger,
+			"CLONE",
+			strong_count_next,
+			self.session_id,
+			status,
+		);
+
 		Self {
 			status: self.status.clone(),
 			tx_keygen_signing: self.tx_keygen_signing.clone(),
@@ -300,6 +310,16 @@ impl AsyncProtocolRemote {
 impl Drop for AsyncProtocolRemote {
 	fn drop(&mut self) {
+		let strong_count_next = Arc::strong_count(&self.status).saturating_sub(1);
+		let status = self.get_status();
+		debug_log_cloning_and_dropping(
+			&self.logger,
+			"DROP",
+			strong_count_next,
+			self.session_id,
+			status,
+		);
+
 		if Arc::strong_count(&self.status) == 1 || self.is_primary_remote {
 			if self.get_status() != MetaHandlerStatus::Complete {
 				self.logger.info(format!(
@@ -318,3 +338,15 @@ impl Drop for AsyncProtocolRemote {
 		}
 	}
 }
+
+fn debug_log_cloning_and_dropping(
+	logger: &DebugLogger,
+	prepend_tag: &str,
+	next_arc_strong_count: usize,
+	session_id: SessionId,
+	status: MetaHandlerStatus,
+) {
+	logger.debug(format!(
+		"{prepend_tag}: next_arc_strong_count: {next_arc_strong_count}, session_id: {session_id} | status: {status:?}",
+	));
+}
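The hooks above follow a common diagnostic pattern: on every clone and drop, log the strong count the shared Arc is *about to* have, so leaked or lingering handles show up in the logs. A minimal self-contained sketch of the same idea (the Tracked type and plain println! logging are illustrative stand-ins, not the crate's DebugLogger API):

use std::sync::Arc;

struct Tracked {
	status: Arc<u8>, // stands in for the shared protocol status
	session_id: u64,
}

impl Clone for Tracked {
	fn clone(&self) -> Self {
		// +1 because the clone below has not happened yet
		let next = Arc::strong_count(&self.status) + 1;
		println!("CLONE: next_arc_strong_count: {next}, session_id: {}", self.session_id);
		Self { status: self.status.clone(), session_id: self.session_id }
	}
}

impl Drop for Tracked {
	fn drop(&mut self) {
		// saturating_sub because this handle still holds its reference
		let next = Arc::strong_count(&self.status).saturating_sub(1);
		println!("DROP: next_arc_strong_count: {next}, session_id: {}", self.session_id);
	}
}

fn main() {
	let a = Tracked { status: Arc::new(0), session_id: 7 };
	let b = a.clone(); // CLONE: next_arc_strong_count: 2
	drop(b); // DROP: next_arc_strong_count: 1
	drop(a); // DROP: next_arc_strong_count: 0
}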
diff --git a/dkg-gadget/src/keygen_manager/mod.rs b/dkg-gadget/src/keygen_manager/mod.rs
index d35fb8efd..953eb8599 100644
--- a/dkg-gadget/src/keygen_manager/mod.rs
+++ b/dkg-gadget/src/keygen_manager/mod.rs
@@ -69,6 +69,8 @@ pub enum KeygenState {
 /// only 1 task at a time may run for keygen
 const MAX_RUNNING_TASKS: usize = 1;
+/// There should never be any jobs enqueued for keygen
+const MAX_ENQUEUED_TASKS: usize = 0;
 
 impl KeygenManager
 where
@@ -84,6 +86,7 @@ where
 			logger,
 			clock,
 			MAX_RUNNING_TASKS,
+			MAX_ENQUEUED_TASKS,
 			PollMethod::Manual,
 		),
 		active_keygen_retry_id: Arc::new(AtomicUsize::new(0)),
@@ -479,7 +482,7 @@ where
 			handle.session_id,
 			self.active_keygen_retry_id.load(Ordering::Relaxed),
 		);
-		self.work_manager.push_task(task_hash, handle, task)?;
+		self.work_manager.push_task(task_hash, false, handle, task)?;
 		// poll to start the task
 		self.work_manager.poll();
 		Ok(())
diff --git a/dkg-gadget/src/signing_manager/mod.rs b/dkg-gadget/src/signing_manager/mod.rs
index 7d85a6f7c..0da908b98 100644
--- a/dkg-gadget/src/signing_manager/mod.rs
+++ b/dkg-gadget/src/signing_manager/mod.rs
@@ -23,6 +23,7 @@ use std::{
 	pin::Pin,
 	sync::atomic::{AtomicBool, Ordering},
 };
+use webb_proposals::TypedChainId;
 
 /// For balancing the amount of work done by each node
 pub mod work_manager;
@@ -55,6 +56,7 @@ impl Clone for SigningManager {
 // the maximum number of tasks that the work manager tries to assign
 const MAX_RUNNING_TASKS: usize = 4;
+const MAX_ENQUEUED_TASKS: usize = 20;
 
 // How often to poll the jobs to check completion status
 const JOB_POLL_INTERVAL_IN_MILLISECONDS: u64 = 500;
@@ -72,6 +74,7 @@ where
 			logger,
 			clock,
 			MAX_RUNNING_TASKS,
+			MAX_ENQUEUED_TASKS,
 			PollMethod::Interval { millis: JOB_POLL_INTERVAL_IN_MILLISECONDS },
 		),
 		lock: Arc::new(AtomicBool::new(false)),
@@ -182,6 +185,14 @@ where
 		let authority_public_key = dkg_worker.get_authority_public_key();
 
 		for unsigned_proposal in unsigned_proposals {
+			let typed_chain_id = unsigned_proposal.0.typed_chain_id;
+			if !self.work_manager.can_submit_more_tasks() {
+				dkg_worker.logger.info(
+					"Will not submit more unsigned proposals because the work manager is full",
+				);
+				break
+			}
+
 			/*
 			create a seed s where s is keccak256(pk, fN=at, unsignedProposal)
 			you take this seed and use it as a seed to random number generator.
@@ -226,8 +237,16 @@ where
 				*header.number(),
 			) {
 				Ok((handle, task)) => {
-					// send task to the work manager
-					self.work_manager.push_task(unsigned_proposal_hash, handle, task)?;
+					// Send task to the work manager. Force start if the typed chain ID is
+					// None, implying this is a proposal needed for rotating sessions and
+					// thus a priority
+					let force_start = typed_chain_id == TypedChainId::None;
+					self.work_manager.push_task(
+						unsigned_proposal_hash,
+						force_start,
+						handle,
+						task,
+					)?;
 				},
 				Err(err) => {
 					dkg_worker
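The block comment in the loop above describes the selection scheme: hash the authority public key, the block, and the unsigned proposal into a seed, then drive a deterministic RNG from it so every honest node derives the same signing set without communicating. A minimal sketch of that scheme, assuming keccak_256 from sp_core and the rand/rand_chacha crates (select_signing_set and the byte-concatenation order are illustrative, not the actual selection code):

use rand::{seq::SliceRandom, SeedableRng};
use rand_chacha::ChaCha20Rng;
use sp_core::hashing::keccak_256;

/// Deterministically pick `t + 1` signers out of `n` parties from a shared seed.
fn select_signing_set(pk: &[u8], at: u64, unsigned_proposal: &[u8], n: usize, t: usize) -> Vec<usize> {
	// seed = keccak256(pk || at || unsignedProposal), as the comment describes
	let mut input = pk.to_vec();
	input.extend_from_slice(&at.to_be_bytes());
	input.extend_from_slice(unsigned_proposal);
	let seed = keccak_256(&input);

	// Use the seed to drive a deterministic RNG and shuffle the party indices
	let mut rng = ChaCha20Rng::from_seed(seed);
	let mut indices: Vec<usize> = (0..n).collect();
	indices.shuffle(&mut rng);
	indices.truncate(t + 1);
	indices
}

Because the seed is a pure function of public data, any node can recompute the set and check whether it belongs to it.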
diff --git a/dkg-gadget/src/signing_manager/work_manager.rs b/dkg-gadget/src/signing_manager/work_manager.rs
index dfe790696..ea70b6c04 100644
--- a/dkg-gadget/src/signing_manager/work_manager.rs
+++ b/dkg-gadget/src/signing_manager/work_manager.rs
@@ -12,6 +12,7 @@ use dkg_primitives::{
 use dkg_runtime_primitives::{associated_block_id_acceptable, SessionId};
 use parking_lot::RwLock;
 use sp_api::BlockT;
+use sp_arithmetic::traits::SaturatedConversion;
 use std::{
 	collections::{HashMap, HashSet, VecDeque},
 	hash::{Hash, Hasher},
@@ -31,7 +32,8 @@ pub struct WorkManager {
 	inner: Arc>>,
 	clock: Arc>,
 	// for now, use a hard-coded value for the number of tasks
-	max_tasks: usize,
+	max_tasks: Arc,
+	max_enqueued_tasks: Arc,
 	logger: DebugLogger,
 	poll_method: Arc,
 	to_handler: tokio::sync::mpsc::UnboundedSender<[u8; 32]>,
@@ -57,6 +59,7 @@ impl WorkManager {
 		logger: DebugLogger,
 		clock: impl HasLatestHeader,
 		max_tasks: usize,
+		max_enqueued_tasks: usize,
 		poll_method: PollMethod,
 	) -> Self {
 		let (to_handler, mut rx) = tokio::sync::mpsc::unbounded_channel();
@@ -67,7 +70,8 @@ impl WorkManager {
 				enqueued_messages: HashMap::new(),
 			})),
 			clock: Arc::new(clock),
-			max_tasks,
+			max_tasks: Arc::new(max_tasks),
+			max_enqueued_tasks: Arc::new(max_enqueued_tasks),
 			logger,
 			to_handler,
 			poll_method: Arc::new(poll_method),
@@ -83,7 +87,7 @@ impl WorkManager {
 			while let Some(task_hash) = rx.recv().await {
 				job_receiver_worker
 					.logger
-					.info_signing(format!("[worker] Received job {task_hash:?}",));
+					.info(format!("[worker] Received job {task_hash:?}",));
 				job_receiver_worker.poll();
 			}
 		};
@@ -99,10 +103,10 @@ impl WorkManager {
 			tokio::select! {
 				_ = job_receiver => {
-					logger.error_signing("[worker] job_receiver exited");
+					logger.error("[worker] job_receiver exited");
 				},
 				_ = periodic_poller => {
-					logger.error_signing("[worker] periodic_poller exited");
+					logger.error("[worker] periodic_poller exited");
 				}
 			}
 		};
@@ -117,6 +121,7 @@ impl WorkManager {
 	pub fn push_task(
 		&self,
 		task_hash: [u8; 32],
+		force_start: bool,
 		mut handle: AsyncProtocolRemote>,
 		task: Pin>>,
 	) -> Result<(), DKGError> {
@@ -129,6 +134,15 @@
 			task_hash,
 			logger: self.logger.clone(),
 		};
+
+		if force_start {
+			// This job has priority over the max_tasks limit
+			self.logger
+				.debug(format!("[FORCE START] Force starting task {}", hex::encode(task_hash)));
+			self.start_job_unconditional(job, &mut *lock);
+			return Ok(())
+		}
+
 		lock.enqueued_tasks.push_back(job);
 
 		if *self.poll_method != PollMethod::Manual {
@@ -140,6 +154,11 @@
 		}
 	}
 
+	pub fn can_submit_more_tasks(&self) -> bool {
+		let lock = self.inner.read();
+		lock.enqueued_tasks.len() < *self.max_enqueued_tasks
+	}
+
 	// Only relevant for keygen
 	pub fn get_active_sessions_metadata(&self, now: NumberFor) -> Vec {
 		self.inner.read().active_tasks.iter().map(|r| r.metadata(now)).collect()
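Taken together, push_task and can_submit_more_tasks give the manager two bounds: at most max_tasks jobs run, at most max_enqueued_tasks wait, and a force-started job bypasses both the queue and the running limit. A stripped-down sketch of those semantics (the WorkQueue type is an illustrative toy, not the actual WorkManager):

use std::collections::VecDeque;

// Illustrative stand-in for the manager's bookkeeping
struct WorkQueue {
	running: Vec<&'static str>,
	enqueued: VecDeque<&'static str>,
	max_tasks: usize,
	max_enqueued_tasks: usize,
}

impl WorkQueue {
	fn can_submit_more_tasks(&self) -> bool {
		self.enqueued.len() < self.max_enqueued_tasks
	}

	fn push_task(&mut self, job: &'static str, force_start: bool) {
		if force_start {
			// Priority jobs (e.g. session-rotation proposals) skip the
			// queue and the max_tasks limit entirely
			self.running.push(job);
			return
		}
		self.enqueued.push_back(job);
		self.poll();
	}

	fn poll(&mut self) {
		// Promote queued jobs only while there is running capacity
		while self.running.len() < self.max_tasks {
			match self.enqueued.pop_front() {
				Some(job) => self.running.push(job),
				None => break,
			}
		}
	}
}

fn main() {
	let mut q = WorkQueue {
		running: vec![],
		enqueued: VecDeque::new(),
		max_tasks: 1,
		max_enqueued_tasks: 0, // the keygen manager's configuration
	};
	q.push_task("keygen-0", false);
	assert!(!q.can_submit_more_tasks()); // a producer must wait or force-start
	q.push_task("rotation-keygen", true); // runs immediately regardless
	assert_eq!(q.running.len(), 2);
}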
"[worker] Removed {} outdated enqueued messages from the queue for {:?}", + before - after, + hex::encode(*hash) + )); + } + + if queue.is_empty() { + to_remove.push(*hash); + } + } + + // Finally, to prevent the existence of piling-up empty queues, remove them + for hash in to_remove { + lock.enqueued_messages.remove(&hash); + } + } + + fn start_job_unconditional(&self, job: Job, lock: &mut WorkManagerInner) { + self.logger + .info(format!("[worker] Starting job {:?}", hex::encode(job.task_hash))); + if let Err(err) = job.handle.start() { + self.logger + .error(format!("Failed to start job {:?}: {err:?}", hex::encode(job.task_hash))); + } else { + // deliver all the enqueued messages to the protocol now + if let Some(mut enqueued_messages) = lock.enqueued_messages.remove(&job.task_hash) { + self.logger.info(format!( + "Will now deliver {} enqueued message(s) to the async protocol for {:?}", + enqueued_messages.len(), hex::encode(job.task_hash) )); - if let Err(err) = job.handle.start() { - self.logger.error_signing(format!( - "Failed to start job {:?}: {err:?}", - hex::encode(job.task_hash) - )); - } else { - // deliver all the enqueued messages to the protocol now - if let Some(mut enqueued_messages) = - lock.enqueued_messages.remove(&job.task_hash) - { - self.logger.info_signing(format!( - "Will now deliver {} enqueued message(s) to the async protocol for {:?}", - enqueued_messages.len(), - hex::encode(job.task_hash) - )); - while let Some(message) = enqueued_messages.pop_front() { - if should_deliver(&job, &message, job.task_hash) { - if let Err(err) = job.handle.deliver_message(message) { - self.logger.error_signing(format!( - "Unable to deliver message for job {:?}: {err:?}", - hex::encode(job.task_hash) - )); - } - } else { - self.logger.warn("Will not deliver enqueued message to async protocol since the message is no longer acceptable") - } + while let Some(message) = enqueued_messages.pop_front() { + if should_deliver(&job, &message, job.task_hash) { + if let Err(err) = job.handle.deliver_message(message) { + self.logger.error(format!( + "Unable to deliver message for job {:?}: {err:?}", + hex::encode(job.task_hash) + )); } + } else { + self.logger.warn("Will not deliver enqueued message to async protocol since the message is no longer acceptable") } } - let task = job.task.clone(); - // Put the job inside here, that way the drop code does not get called right away, - // killing the process - lock.active_tasks.insert(job); - // run the task - let task = async move { - let task = task.write().take().expect("Should not happen"); - task.into_inner().await - }; - - // Spawn the task. When it finishes, it will clean itself up - tokio::task::spawn(task); } } + let task = job.task.clone(); + // Put the job inside here, that way the drop code does not get called right away, + // killing the process + lock.active_tasks.insert(job); + // run the task + let task = async move { + let task = task.write().take().expect("Should not happen"); + task.into_inner().await + }; + + // Spawn the task. 
@@ -247,7 +292,7 @@ impl WorkManager {
 	}
 
 	pub fn deliver_message(&self, msg: SignedDKGMessage, message_task_hash: [u8; 32]) {
-		self.logger.debug_signing(format!(
+		self.logger.debug(format!(
 			"Delivered message is intended for session_id = {}",
 			msg.msg.session_id
 		));
@@ -261,7 +306,7 @@ impl WorkManager {
 					task.handle.session_id
 				));
 				if let Err(_err) = task.handle.deliver_message(msg) {
-					self.logger.warn_signing("Failed to deliver message to signing task");
+					self.logger.warn("Failed to deliver message to signing task");
 				}
 
 				return
@@ -276,7 +321,7 @@ impl WorkManager {
 					task.handle.session_id
 				));
 				if let Err(_err) = task.handle.deliver_message(msg) {
-					self.logger.warn_signing("Failed to deliver message to signing task");
+					self.logger.warn("Failed to deliver message to signing task");
 				}
 
 				return
@@ -290,7 +335,7 @@ impl WorkManager {
 		let enqueued_session_ids: Vec =
 			lock.enqueued_tasks.iter().map(|job| job.handle.session_id).collect();
 		self.logger
-			.info_signing(format!("Enqueuing message for {:?} | current_running_session_ids: {current_running_session_ids:?} | enqueued_session_ids: {enqueued_session_ids:?}", hex::encode(message_task_hash)));
+			.info(format!("Enqueuing message for {:?} | current_running_session_ids: {current_running_session_ids:?} | enqueued_session_ids: {enqueued_session_ids:?}", hex::encode(message_task_hash)));
 		lock.enqueued_messages.entry(message_task_hash).or_default().push_back(msg)
 	}
 }
@@ -339,7 +384,7 @@ impl Hash for Job {
 impl Drop for Job {
 	fn drop(&mut self) {
-		self.logger.info_signing(format!(
+		self.logger.info(format!(
 			"Will remove job {:?} from currently_signing_proposals",
 			hex::encode(self.task_hash)
 		));
diff --git a/dkg-test-suite/yarn.lock b/dkg-test-suite/yarn.lock
index 7b6417a14..ab6d4be24 100644
--- a/dkg-test-suite/yarn.lock
+++ b/dkg-test-suite/yarn.lock
@@ -3692,8 +3692,10 @@
     "@webb-tools/contracts" "^0.5.40"
     ethers "5.7.0"
 
-"@webb-tools/dkg-substrate-types@0.0.5", "@webb-tools/dkg-substrate-types@file:../types":
-  version "0.0.6"
+"@webb-tools/dkg-substrate-types@0.0.5", "@webb-tools/dkg-substrate-types@^0.0.6":
+  version "0.0.5"
+  resolved "https://registry.yarnpkg.com/@webb-tools/dkg-substrate-types/-/dkg-substrate-types-0.0.5.tgz#d6178d5d9feca39a1f0b56f42295bcd88d343088"
+  integrity sha512-WMVqLZ7OPkpoel7jSqBXFdijl15+D3tSs+OG98a5jf5DrvgwEFaC3QrWmD7OWfdfaRR3gpoy6WmEesBpozprsQ==
   dependencies:
     "@babel/cli" "^7.20.7"
     "@babel/core" "^7.20.12"
diff --git a/scripts/insert_keys.sh b/scripts/insert_keys.sh
index 219568dff..b7b7b185b 100755
--- a/scripts/insert_keys.sh
+++ b/scripts/insert_keys.sh
@@ -1,4 +1,4 @@
-#!/bin/sh
+#!/bin/bash
 
 # The following line ensure we run from the project root
 PROJECT_ROOT=$(git rev-parse --show-toplevel)
@@ -203,4 +203,4 @@ echo "\n ****************** NODE-5 KEY INSERTION ******************"
   --suri "gown surprise mirror hotel cash alarm raccoon you frog rose midnight enter//webb//4//dkg" \
   --key-type wdkg
 
-echo "node-5 keys inserted into path: ./tmp/standalone5 \n"
\ No newline at end of file
+echo "node-5 keys inserted into path: ./tmp/standalone5 \n"
diff --git a/scripts/run-local-testnet.sh b/scripts/run-local-testnet.sh
index 06f5d51c3..e3db497b2 100755
--- a/scripts/run-local-testnet.sh
+++ b/scripts/run-local-testnet.sh
@@ -1,4 +1,4 @@
-#!/usr/bin/env bash
+#!/bin/bash
 set -e
 # ensure we kill all child processes when we exit
 trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT
diff --git a/standalone/node/src/chain_spec.rs b/standalone/node/src/chain_spec.rs
index 8df840460..60e007656 100644
--- a/standalone/node/src/chain_spec.rs
+++ b/standalone/node/src/chain_spec.rs
@@ -168,10 +168,10 @@ pub fn local_testnet_config() -> Result {
 	let wasm_binary = WASM_BINARY.ok_or_else(|| "Development wasm not available".to_string())?;
 	// This maybe weird at first, but now we can control how many authorities we want to have in the
 	// local testnet. during the build process, using the environment variable below.
-	// it defaults to 3 if not set.
+	// it defaults to 5 if not set.
 	let initial_authorities_count = option_env!("INITIAL_AUTHORITIES_COUNT")
 		.and_then(|v| v.parse::().ok())
-		.unwrap_or(3);
+		.unwrap_or(5);
 	const ALL_DEV_AUTHORITIES: [&str; 6] = ["Alice", "Bob", "Charlie", "Dave", "Eve", "Ferdie"];
 	let initial_authorities_count =
 		core::cmp::min(initial_authorities_count, ALL_DEV_AUTHORITIES.len());
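The pattern above bakes the authority count in at compile time: option_env! is evaluated during the build, and the result is clamped to the number of known dev accounts. A minimal standalone illustration of the same option_env! + clamp idiom (the initial_authorities helper is hypothetical; a static is used instead of a const so the runtime-indexed slice can be returned with a 'static lifetime):

static ALL_DEV_AUTHORITIES: [&str; 6] = ["Alice", "Bob", "Charlie", "Dave", "Eve", "Ferdie"];

fn initial_authorities() -> &'static [&'static str] {
	// Read at *compile* time: rebuild with
	//   INITIAL_AUTHORITIES_COUNT=4 cargo build
	// to change the value; it defaults to 5 if unset or unparsable.
	let count = option_env!("INITIAL_AUTHORITIES_COUNT")
		.and_then(|v| v.parse::<usize>().ok())
		.unwrap_or(5);
	// Clamp so a too-large value cannot index past the known dev accounts
	&ALL_DEV_AUTHORITIES[..core::cmp::min(count, ALL_DEV_AUTHORITIES.len())]
}

fn main() {
	println!("dev authorities: {:?}", initial_authorities());
}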