Skip to content

Commit

Permalink
Fix round notification handling for CLI commands. (#1826)
Browse files: browse the repository at this point in the history
* Fix some comments and style in end-to-end tests.

* Wait for the round timeout on a `finalize_block` communication error.

* Wait for timeout if there's a conflicting proposal in the current round.

* Return a notification stream from `listen`; use it in `apply_client_command`.

* Synchronize from validators in `listen`.

* Listen to notifications and save the wallet in `process_inbox`.

* Save the wallet in the `watch` command whenever we learn about new blocks.

* Add an end-to-end test.

* First try without subscribing to the validators; only start listening for notifications if that optimistic attempt does not complete.
  • Loading branch information
afck committed Mar 27, 2024
1 parent 206563f commit 950a223
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 76 deletions.
70 changes: 48 additions & 22 deletions linera-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1502,10 +1502,24 @@ where
if let Some(certificate) = &manager.requested_locked {
if certificate.round == manager.current_round {
let committee = self.local_committee().await?;
let certificate = self
.finalize_block(&committee, *certificate.clone())
.await?;
return Ok(ClientOutcome::Committed(Some(certificate)));
match self.finalize_block(&committee, *certificate.clone()).await {
Ok(certificate) => return Ok(ClientOutcome::Committed(Some(certificate))),
Err(ChainClientError::CommunicationError(_)) => {
// Communication errors in this case often mean that someone else already
// finalized the block.
let timestamp = manager.round_timeout.ok_or_else(|| {
ChainClientError::BlockProposalError(
"Cannot propose in the current round.",
)
})?;
return Ok(ClientOutcome::WaitForTimeout(RoundTimeout {
timestamp,
current_round: manager.current_round,
next_block_height: info.next_block_height,
}));
}
Err(error) => return Err(error),
}
}
}
// The block we want to propose is either the highest validated, or our pending one.
Expand All @@ -1529,18 +1543,24 @@ where
.map_or(false, |proposal| {
proposal.content.round == manager.current_round && proposal.content.block != *block
});
let round = if conflicting_proposal {
manager
.ownership
.next_round(manager.current_round)
.filter(|_| manager.current_round.is_multi_leader())
.ok_or_else(|| {
ChainClientError::BlockProposalError(
"Conflicting proposal in the current round",
)
})?
} else {
let round = if !conflicting_proposal {
manager.current_round
} else if let Some(round) = manager
.ownership
.next_round(manager.current_round)
.filter(|_| manager.current_round.is_multi_leader())
{
round
} else if let Some(timestamp) = manager.round_timeout {
return Ok(ClientOutcome::WaitForTimeout(RoundTimeout {
timestamp,
current_round: manager.current_round,
next_block_height: info.next_block_height,
}));
} else {
return Err(ChainClientError::BlockProposalError(
"Conflicting proposal in the current round.",
));
};
let can_propose = match round {
Round::Fast => manager.ownership.super_owners.contains_key(&identity),
Expand Down Expand Up @@ -2125,30 +2145,36 @@ where

/// Spawns a task that listens to notifications about the current chain from all validators,
/// and synchronizes the local state accordingly.
pub async fn listen(&self) -> Result<AbortOnDrop, ChainClientError>
pub async fn listen(&self) -> Result<(AbortOnDrop, NotificationStream), ChainClientError>
where
P: Send + 'static,
{
let mut senders = HashMap::new(); // Senders to cancel notification streams.
let notifications = self.lock().await.subscribe().await?;
let mut guard = self.lock().await;
let notifications = guard.subscribe().await?;
let notifications2 = guard.subscribe().await?;
if let Err(error) = guard.synchronize_from_validators().await {
error!("Failed to synchronize from validators: {}", error);
}
drop(guard);
let (mut notifications, abort) = stream::abortable(notifications);
if let Err(err) = self.update_streams(&mut senders).await {
error!("Failed to update committee: {}", err);
if let Err(error) = self.update_streams(&mut senders).await {
error!("Failed to update committee: {}", error);
}
let this = self.clone();
tokio::spawn(async move {
while let Some(notification) = notifications.next().await {
if matches!(notification.reason, Reason::NewBlock { .. }) {
if let Err(err) = this.update_streams(&mut senders).await {
error!("Failed to update committee: {}", err);
if let Err(error) = this.update_streams(&mut senders).await {
error!("Failed to update committee: {}", error);
}
}
}
for abort in senders.into_values() {
abort.abort();
}
});
Ok(AbortOnDrop(abort))
Ok((AbortOnDrop(abort), notifications2))
}

async fn update_streams(
Expand Down
2 changes: 1 addition & 1 deletion linera-core/src/unit_tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ where
let sender = ArcChainClient::new(sender);
// Listen to the notifications on the sender chain.
let mut notifications = sender.lock().await.subscribe().await?;
let _listen_handle = sender.listen().await?;
let (_listen_handle, _) = sender.listen().await?;
{
let mut sender = sender.lock().await;
let certificate = sender
Expand Down
16 changes: 4 additions & 12 deletions linera-service/src/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,10 @@ where
entry.insert(client.clone());
client
};
let _listen_handle = client.listen().await?;
let mut local_stream = {
let mut guard = client.lock().await;
let stream = guard.subscribe().await?;
// Process the inbox: For messages that are already there we won't receive a
// notification.
guard.synchronize_from_validators().await?;
if let Err(error) = guard.process_inbox_if_owned().await {
warn!(%error, "Failed to process inbox after starting stream.");
}
stream
};
let (_listen_handle, mut local_stream) = client.listen().await?;
if let Err(error) = client.lock().await.process_inbox_if_owned().await {
warn!(%error, "Failed to process inbox after starting stream.");
}
while let Some(notification) = local_stream.next().await {
info!("Received new notification: {:?}", notification);
if config.delay_before_ms > 0 {
Expand Down
4 changes: 3 additions & 1 deletion linera-service/src/cli_wrappers/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,13 +603,15 @@ impl ClientWrapper {
weights: Vec<u64>,
multi_leader_rounds: u32,
balance: Amount,
base_timeout: Duration,
) -> Result<(MessageId, ChainId)> {
let mut command = self.command().await?;
command
.arg("open-multi-owner-chain")
.args(["--from", &from.to_string()])
.arg("--owner-public-keys")
.args(to_public_keys.iter().map(PublicKey::to_string));
.args(to_public_keys.iter().map(PublicKey::to_string))
.args(["--base-timeout-ms", &base_timeout.as_millis().to_string()]);
if !weights.is_empty() {
command
.arg("--owner-weights")
Expand Down
64 changes: 48 additions & 16 deletions linera-service/src/linera/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,31 +308,57 @@ impl ClientContext {
.receive_certificate(certificate.clone())
.await
.unwrap();
self.process_inbox(&mut chain_client).await.unwrap();
let epochs = chain_client.epochs().await.unwrap();
let chain_client = chain_client.into_arc();
self.process_inbox(&chain_client).await.unwrap();
let epochs = chain_client.lock().await.epochs().await.unwrap();
debug!("{:?} accepts epochs {:?}", chain_id, epochs);
}
}

pub async fn process_inbox<S>(
&mut self,
chain_client: &mut ChainClient<impl ValidatorNodeProvider + Sync + 'static, S>,
chain_client: &ArcChainClient<NodeProvider, S>,
) -> anyhow::Result<Vec<Certificate>>
where
S: Storage + Clone + Send + Sync + 'static,
ViewError: From<S::ContextError>,
{
let mut certificates = Vec::new();
// Try processing the inbox optimistically without waiting for validator notifications.
let (new_certificates, maybe_timeout) = {
let mut guard = chain_client.0.lock().await;
guard.synchronize_from_validators().await?;
let result = guard.process_inbox().await;
self.update_wallet_from_client(&mut *guard).await;
if result.is_err() {
self.save_wallet();
}
result?
};
certificates.extend(new_certificates);
if maybe_timeout.is_none() {
self.save_wallet();
return Ok(certificates);
}
// Start listening for notifications, so we learn about new rounds and blocks.
let (_listen_handle, mut notification_stream) = chain_client.listen().await?;
loop {
chain_client.synchronize_from_validators().await?;
let mut stream = chain_client.subscribe().await?;
let result = chain_client.process_inbox().await;
self.update_wallet_from_client(chain_client).await;
let (new_certificates, maybe_timeout) = result.unwrap();
let (new_certificates, maybe_timeout) = {
let mut guard = chain_client.0.lock().await;
let result = guard.process_inbox().await;
self.update_wallet_from_client(&mut *guard).await;
if result.is_err() {
self.save_wallet();
}
result?
};
certificates.extend(new_certificates);
match maybe_timeout {
None => return Ok(certificates),
Some(timestamp) => wait_for_next_round(&mut stream, timestamp).await,
None => {
self.save_wallet();
return Ok(certificates);
}
Some(timestamp) => wait_for_next_round(&mut notification_stream, timestamp).await,
}
}
}
Expand Down Expand Up @@ -379,7 +405,7 @@ impl ClientContext {
.await
.synchronize_from_validators()
.await?;
self.process_inbox(&mut *chain_client.lock().await).await?;
self.process_inbox(chain_client).await?;
Ok(bytecode_id)
}

Expand All @@ -403,9 +429,15 @@ impl ClientContext {
Fut: Future<Output = Result<ClientOutcome<T>, E>>,
anyhow::Error: From<E>,
{
// Try applying f optimistically without validator notifications. Return if committed.
let result = f(client.0.clone().lock_owned().await).await;
self.update_and_save_wallet(&mut *client.lock().await).await;
if let ClientOutcome::Committed(t) = result? {
return Ok(t);
}
// Start listening for notifications, so we learn about new rounds and blocks.
let (_listen_handle, mut notification_stream) = client.listen().await?;
loop {
// Subscribe to the local node, so we learn about new rounds.
let mut notification_stream = client.lock().await.subscribe().await?;
// Try applying f. Return if committed.
let result = f(client.0.clone().lock_owned().await).await;
self.update_and_save_wallet(&mut *client.lock().await).await;
Expand Down Expand Up @@ -460,9 +492,9 @@ impl ClientContext {
ViewError: From<S::ContextError>,
{
for chain_id in self.wallet_state.own_chain_ids() {
let mut chain_client = self.make_chain_client(storage.clone(), chain_id);
self.process_inbox(&mut chain_client).await.unwrap();
chain_client.update_validators().await.unwrap();
let chain_client = self.make_chain_client(storage.clone(), chain_id).into_arc();
self.process_inbox(&chain_client).await.unwrap();
chain_client.lock().await.update_validators().await.unwrap();
}
}

Expand Down
34 changes: 18 additions & 16 deletions linera-service/src/linera/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use linera_core::{
local_node::LocalNodeClient,
node::LocalValidatorNodeProvider,
notifier::Notifier,
worker::WorkerState,
worker::{Reason, WorkerState},
};
use linera_execution::{
committee::{Committee, ValidatorName, ValidatorState},
Expand Down Expand Up @@ -403,11 +403,10 @@ impl Runnable for Job {

ProcessInbox { chain_id } => {
let chain_id = chain_id.unwrap_or_else(|| context.default_chain());
let mut chain_client = context.make_chain_client(storage, chain_id);
let chain_client = context.make_chain_client(storage, chain_id).into_arc();
info!("Processing the inbox of chain {}", chain_id);
let time_start = Instant::now();
let certificates = context.process_inbox(&mut chain_client).await?;
context.update_and_save_wallet(&mut chain_client).await;
let certificates = context.process_inbox(&chain_client).await?;
let time_total = time_start.elapsed();
info!(
"Processed incoming messages with {} blocks in {} ms",
Expand Down Expand Up @@ -461,18 +460,19 @@ impl Runnable for Job {
// Make sure genesis chains are subscribed to the admin chain.
let context = Arc::new(Mutex::new(context));
let mut context = context.lock().await;
let mut chain_client = context.make_chain_client(
storage.clone(),
context.wallet_state().genesis_admin_chain(),
);
let chain_client = context
.make_chain_client(
storage.clone(),
context.wallet_state().genesis_admin_chain(),
)
.into_arc();
let n = context
.process_inbox(&mut chain_client)
.process_inbox(&chain_client)
.await
.unwrap()
.into_iter()
.filter_map(|c| c.value().executed_block().map(|e| e.messages.len()))
.sum::<usize>();
let chain_client = chain_client.into_arc();
info!("Subscribed {} chains to new committees", n);
let maybe_certificate = context
.apply_client_command(&chain_client, |mut chain_client| {
Expand Down Expand Up @@ -748,17 +748,19 @@ impl Runnable for Job {

Watch { chain_id, raw } => {
let chain_id = chain_id.unwrap_or_else(|| context.default_chain());
let mut chain_client = context.make_chain_client(storage, chain_id);
let chain_client = context.make_chain_client(storage, chain_id).into_arc();
info!("Watching for notifications for chain {:?}", chain_id);
let mut notification_stream = chain_client.subscribe().await?;
let _listen_handle = chain_client.into_arc().listen().await?;
while let Some(notification) = notification_stream.next().await {
let (_listen_handle, mut notifications) = chain_client.listen().await?;
while let Some(notification) = notifications.next().await {
if let Reason::NewBlock { .. } = notification.reason {
let mut guard = chain_client.lock().await;
context.update_and_save_wallet(&mut *guard).await;
}
if raw {
println!("{}", serde_json::to_string(&notification)?);
}
}
info!("Notification stream ended.");
// Not saving the wallet because `listen()` does not create blocks.
}

Service { config, port } => {
Expand Down Expand Up @@ -831,8 +833,8 @@ impl Runnable for Job {

info!("Synchronizing");
chain_client.synchronize_from_validators().await?;
context.process_inbox(&mut chain_client).await?;
let chain_client = chain_client.into_arc();
context.process_inbox(&chain_client).await?;

let (application_id, _) = context
.apply_client_command(&chain_client, move |mut chain_client| {
Expand Down
Loading

0 comments on commit 950a223

Please sign in to comment.