From cbc8320d35358da14d79ebcada4dfb6756ffac79 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Sat, 6 Jul 2024 07:35:27 -0700 Subject: [PATCH] =?UTF-8?q?wen=5Frestart:=20remove=20WAITING=5FFOR=5FSUPER?= =?UTF-8?q?MAJORITY=20and=20handle=20all=20states=20i=E2=80=A6=20(#1785)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit wen_restart: remove WAITING_FOR_SUPERMAJORITY and handle all states in proto. --- wen-restart/proto/wen_restart.proto | 3 +- wen-restart/src/wen_restart.rs | 262 ++++++++++++++++++++++++---- 2 files changed, 231 insertions(+), 34 deletions(-) diff --git a/wen-restart/proto/wen_restart.proto b/wen-restart/proto/wen_restart.proto index ef2b7e7346ed5f..e3fc0743ef5dc8 100644 --- a/wen-restart/proto/wen_restart.proto +++ b/wen-restart/proto/wen_restart.proto @@ -6,8 +6,7 @@ enum State { LAST_VOTED_FORK_SLOTS = 1; HEAVIEST_FORK = 2; GENERATE_SNAPSHOT = 3; - WAITING_FOR_SUPERMAJORITY = 4; - DONE = 5; + DONE = 4; } message LastVotedForkSlotsRecord { diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index 71b8ea027b80e9..f7ed4742a5b809 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -82,6 +82,7 @@ pub enum WenRestartError { FutureSnapshotExists(Slot, Slot, String), GenerateSnapshotWhenOneExists(Slot, String), MalformedLastVotedForkSlotsProtobuf(Option), + MalformedProgress(RestartState, String), MissingLastVotedForkSlots, MissingFullSnapshot(String), NotEnoughStakeAgreeingWithUs(Slot, Hash, HashMap<(Slot, Hash), u64>), @@ -135,6 +136,9 @@ impl std::fmt::Display for WenRestartError { WenRestartError::MalformedLastVotedForkSlotsProtobuf(record) => { write!(f, "Malformed last voted fork slots protobuf: {:?}", record) } + WenRestartError::MalformedProgress(state, missing) => { + write!(f, "Malformed progress: {:?} missing {}", state, missing) + } WenRestartError::MissingLastVotedForkSlots => { write!(f, "Missing last voted fork slots") } @@ -590,7 +594,11 @@ pub(crate) fn aggregate_restart_heaviest_fork( let epoch_stakes = root_bank.epoch_stakes(root_bank.epoch()).unwrap(); let total_stake = epoch_stakes.total_stake(); if progress.my_heaviest_fork.is_none() { - return Err(WenRestartError::UnexpectedState(RestartState::HeaviestFork).into()); + return Err(WenRestartError::MalformedProgress( + RestartState::HeaviestFork, + "my_heaviest_fork".to_string(), + ) + .into()); } let my_heaviest_fork = progress.my_heaviest_fork.clone().unwrap(); let heaviest_fork_slot = my_heaviest_fork.slot; @@ -1086,12 +1094,28 @@ pub(crate) fn initialize( total_active_stake: result.total_active_stake, }) }) - .unwrap(), + .ok_or(WenRestartError::MalformedProgress( + RestartState::HeaviestFork, + "final_result in last_voted_fork_slots_aggregate".to_string(), + ))?, my_heaviest_fork: progress.my_heaviest_fork.clone(), }, progress, )), - _ => Err(WenRestartError::UnexpectedState(progress.state()).into()), + RestartState::GenerateSnapshot => Ok(( + WenRestartProgressInternalState::GenerateSnapshot { + new_root_slot: progress + .my_heaviest_fork + .as_ref() + .ok_or(WenRestartError::MalformedProgress( + RestartState::GenerateSnapshot, + "my_heaviest_fork".to_string(), + ))? + .slot, + my_snapshot: progress.my_snapshot.clone(), + }, + progress, + )), } } @@ -1119,7 +1143,6 @@ pub(crate) fn write_wen_restart_records( mod tests { use { crate::wen_restart::{tests::wen_restart_proto::LastVotedForkSlotsAggregateFinal, *}, - assert_matches::assert_matches, solana_accounts_db::hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, solana_entry::entry::create_ticks, solana_gossip::{ @@ -1618,7 +1641,7 @@ mod tests { } #[test] - fn test_wen_restart_initialize_failures() { + fn test_wen_restart_initialize() { solana_logger::setup(); let ledger_path = get_tmp_ledger_path_auto_delete!(); let test_state = wen_restart_test_init(&ledger_path); @@ -1638,14 +1661,8 @@ mod tests { .unwrap(), prost::DecodeError::new("invalid wire type value: 7") ); - remove_file(&test_state.wen_restart_proto_path).unwrap(); - let last_vote = VoteTransaction::from(TowerSync::from(vec![(0, 8), (1, 1)])); - assert!(initialize( - &test_state.wen_restart_proto_path, - last_vote.clone(), - test_state.blockstore.clone() - ) - .is_ok()); + assert!(remove_file(&test_state.wen_restart_proto_path).is_ok()); + let last_vote_bankhash = Hash::new_unique(); let empty_last_vote = VoteTransaction::from(Vote::new(vec![], last_vote_bankhash)); assert_eq!( initialize( @@ -1658,19 +1675,6 @@ mod tests { .unwrap(), WenRestartError::MissingLastVotedForkSlots, ); - // Test the case where the file is not found. - let _ = remove_file(&test_state.wen_restart_proto_path); - assert_matches!( - initialize(&test_state.wen_restart_proto_path, VoteTransaction::from(Vote::new(last_voted_fork_slots.clone(), last_vote_bankhash)), test_state.blockstore.clone()), - Ok((WenRestartProgressInternalState::Init { last_voted_fork_slots, last_vote_bankhash: bankhash }, progress)) => { - assert_eq!(last_voted_fork_slots, test_state.last_voted_fork_slots); - assert_eq!(bankhash, last_vote_bankhash); - assert_eq!(progress, WenRestartProgress { - state: RestartState::Init.into(), - ..Default::default() - }); - } - ); assert!(write_wen_restart_records( &test_state.wen_restart_proto_path, &WenRestartProgress { @@ -1690,24 +1694,218 @@ mod tests { .to_string(), "Malformed last voted fork slots protobuf: None" ); + let progress_missing_heaviest_fork_aggregate = WenRestartProgress { + state: RestartState::HeaviestFork.into(), + my_heaviest_fork: Some(HeaviestForkRecord { + slot: 0, + bankhash: Hash::new_unique().to_string(), + total_active_stake: 0, + shred_version: SHRED_VERSION as u32, + wallclock: 0, + }), + ..Default::default() + }; assert!(write_wen_restart_records( &test_state.wen_restart_proto_path, - &WenRestartProgress { - state: RestartState::WaitingForSupermajority.into(), - ..Default::default() - }, + &progress_missing_heaviest_fork_aggregate, ) .is_ok()); assert_eq!( initialize( &test_state.wen_restart_proto_path, - VoteTransaction::from(Vote::new(last_voted_fork_slots, last_vote_bankhash)), + VoteTransaction::from(Vote::new(last_voted_fork_slots.clone(), last_vote_bankhash)), + test_state.blockstore.clone() + ).err() + .unwrap() + .to_string(), + "Malformed progress: HeaviestFork missing final_result in last_voted_fork_slots_aggregate", + ); + let progress_missing_my_heaviestfork = WenRestartProgress { + state: RestartState::GenerateSnapshot.into(), + my_snapshot: Some(GenerateSnapshotRecord { + slot: 0, + bankhash: Hash::new_unique().to_string(), + shred_version: SHRED_VERSION as u32, + path: "/path/to/snapshot".to_string(), + }), + ..Default::default() + }; + assert!(write_wen_restart_records( + &test_state.wen_restart_proto_path, + &progress_missing_my_heaviestfork, + ) + .is_ok()); + assert_eq!( + initialize( + &test_state.wen_restart_proto_path, + VoteTransaction::from(Vote::new(last_voted_fork_slots.clone(), last_vote_bankhash)), test_state.blockstore.clone() ) .err() .unwrap() .to_string(), - "Unexpected state: WaitingForSupermajority" + "Malformed progress: GenerateSnapshot missing my_heaviest_fork", + ); + + // Now test successful initialization. + assert!(remove_file(&test_state.wen_restart_proto_path).is_ok()); + // Test the case where the file is not found. + let mut vote = TowerSync::from(vec![(test_state.last_voted_fork_slots[0], 1)]); + vote.hash = last_vote_bankhash; + let last_vote = VoteTransaction::from(vote); + assert_eq!( + initialize( + &test_state.wen_restart_proto_path, + last_vote.clone(), + test_state.blockstore.clone() + ) + .unwrap(), + ( + WenRestartProgressInternalState::Init { + last_voted_fork_slots: test_state.last_voted_fork_slots.clone(), + last_vote_bankhash + }, + WenRestartProgress { + state: RestartState::Init.into(), + ..Default::default() + } + ) + ); + let progress = WenRestartProgress { + state: RestartState::Init.into(), + my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord { + last_voted_fork_slots: test_state.last_voted_fork_slots.clone(), + last_vote_bankhash: last_vote_bankhash.to_string(), + shred_version: SHRED_VERSION as u32, + wallclock: 0, + }), + ..Default::default() + }; + assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress,).is_ok()); + assert_eq!( + initialize( + &test_state.wen_restart_proto_path, + last_vote.clone(), + test_state.blockstore.clone() + ) + .unwrap(), + ( + WenRestartProgressInternalState::Init { + last_voted_fork_slots: test_state.last_voted_fork_slots.clone(), + last_vote_bankhash, + }, + progress + ) + ); + let progress = WenRestartProgress { + state: RestartState::LastVotedForkSlots.into(), + my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord { + last_voted_fork_slots: test_state.last_voted_fork_slots.clone(), + last_vote_bankhash: last_vote_bankhash.to_string(), + shred_version: SHRED_VERSION as u32, + wallclock: 0, + }), + ..Default::default() + }; + assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress,).is_ok()); + assert_eq!( + initialize( + &test_state.wen_restart_proto_path, + last_vote.clone(), + test_state.blockstore.clone() + ) + .unwrap(), + ( + WenRestartProgressInternalState::LastVotedForkSlots { + last_voted_fork_slots: test_state.last_voted_fork_slots.clone(), + aggregate_final_result: None, + }, + progress + ) + ); + let progress = WenRestartProgress { + state: RestartState::HeaviestFork.into(), + my_heaviest_fork: Some(HeaviestForkRecord { + slot: 0, + bankhash: Hash::new_unique().to_string(), + total_active_stake: 0, + shred_version: SHRED_VERSION as u32, + wallclock: 0, + }), + last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord { + received: HashMap::new(), + final_result: Some(LastVotedForkSlotsAggregateFinal { + slots_stake_map: HashMap::new(), + total_active_stake: 1000, + }), + }), + ..Default::default() + }; + assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress,).is_ok()); + assert_eq!( + initialize( + &test_state.wen_restart_proto_path, + last_vote.clone(), + test_state.blockstore.clone() + ) + .unwrap(), + ( + WenRestartProgressInternalState::FindHeaviestFork { + aggregate_final_result: LastVotedForkSlotsFinalResult { + slots_stake_map: HashMap::new(), + total_active_stake: 1000, + }, + my_heaviest_fork: progress.my_heaviest_fork.clone(), + }, + progress + ) + ); + let progress = WenRestartProgress { + state: RestartState::GenerateSnapshot.into(), + my_heaviest_fork: Some(HeaviestForkRecord { + slot: 0, + bankhash: Hash::new_unique().to_string(), + total_active_stake: 0, + shred_version: SHRED_VERSION as u32, + wallclock: 0, + }), + my_snapshot: Some(GenerateSnapshotRecord { + slot: 0, + bankhash: Hash::new_unique().to_string(), + shred_version: SHRED_VERSION as u32, + path: "/path/to/snapshot".to_string(), + }), + ..Default::default() + }; + assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress,).is_ok()); + assert_eq!( + initialize( + &test_state.wen_restart_proto_path, + VoteTransaction::from(Vote::new(last_voted_fork_slots.clone(), last_vote_bankhash)), + test_state.blockstore.clone() + ) + .unwrap(), + ( + WenRestartProgressInternalState::GenerateSnapshot { + new_root_slot: 0, + my_snapshot: progress.my_snapshot.clone(), + }, + progress, + ) + ); + let progress = WenRestartProgress { + state: RestartState::Done.into(), + ..Default::default() + }; + assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress,).is_ok()); + assert_eq!( + initialize( + &test_state.wen_restart_proto_path, + VoteTransaction::from(Vote::new(last_voted_fork_slots, last_vote_bankhash)), + test_state.blockstore.clone() + ) + .unwrap(), + (WenRestartProgressInternalState::Done, progress) ); }