Skip to content

Commit

Permalink
wen_restart: remove WAITING_FOR_SUPERMAJORITY and handle all states i… (
Browse files Browse the repository at this point in the history
#1785)

wen_restart: remove WAITING_FOR_SUPERMAJORITY and handle all states in proto.
  • Loading branch information
wen-coding authored Jul 6, 2024
1 parent 45a4792 commit cbc8320
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 34 deletions.
3 changes: 1 addition & 2 deletions wen-restart/proto/wen_restart.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ enum State {
LAST_VOTED_FORK_SLOTS = 1;
HEAVIEST_FORK = 2;
GENERATE_SNAPSHOT = 3;
WAITING_FOR_SUPERMAJORITY = 4;
DONE = 5;
DONE = 4;
}

message LastVotedForkSlotsRecord {
Expand Down
262 changes: 230 additions & 32 deletions wen-restart/src/wen_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub enum WenRestartError {
FutureSnapshotExists(Slot, Slot, String),
GenerateSnapshotWhenOneExists(Slot, String),
MalformedLastVotedForkSlotsProtobuf(Option<LastVotedForkSlotsRecord>),
MalformedProgress(RestartState, String),
MissingLastVotedForkSlots,
MissingFullSnapshot(String),
NotEnoughStakeAgreeingWithUs(Slot, Hash, HashMap<(Slot, Hash), u64>),
Expand Down Expand Up @@ -135,6 +136,9 @@ impl std::fmt::Display for WenRestartError {
WenRestartError::MalformedLastVotedForkSlotsProtobuf(record) => {
write!(f, "Malformed last voted fork slots protobuf: {:?}", record)
}
WenRestartError::MalformedProgress(state, missing) => {
write!(f, "Malformed progress: {:?} missing {}", state, missing)
}
WenRestartError::MissingLastVotedForkSlots => {
write!(f, "Missing last voted fork slots")
}
Expand Down Expand Up @@ -590,7 +594,11 @@ pub(crate) fn aggregate_restart_heaviest_fork(
let epoch_stakes = root_bank.epoch_stakes(root_bank.epoch()).unwrap();
let total_stake = epoch_stakes.total_stake();
if progress.my_heaviest_fork.is_none() {
return Err(WenRestartError::UnexpectedState(RestartState::HeaviestFork).into());
return Err(WenRestartError::MalformedProgress(
RestartState::HeaviestFork,
"my_heaviest_fork".to_string(),
)
.into());
}
let my_heaviest_fork = progress.my_heaviest_fork.clone().unwrap();
let heaviest_fork_slot = my_heaviest_fork.slot;
Expand Down Expand Up @@ -1086,12 +1094,28 @@ pub(crate) fn initialize(
total_active_stake: result.total_active_stake,
})
})
.unwrap(),
.ok_or(WenRestartError::MalformedProgress(
RestartState::HeaviestFork,
"final_result in last_voted_fork_slots_aggregate".to_string(),
))?,
my_heaviest_fork: progress.my_heaviest_fork.clone(),
},
progress,
)),
_ => Err(WenRestartError::UnexpectedState(progress.state()).into()),
RestartState::GenerateSnapshot => Ok((
WenRestartProgressInternalState::GenerateSnapshot {
new_root_slot: progress
.my_heaviest_fork
.as_ref()
.ok_or(WenRestartError::MalformedProgress(
RestartState::GenerateSnapshot,
"my_heaviest_fork".to_string(),
))?
.slot,
my_snapshot: progress.my_snapshot.clone(),
},
progress,
)),
}
}

Expand Down Expand Up @@ -1119,7 +1143,6 @@ pub(crate) fn write_wen_restart_records(
mod tests {
use {
crate::wen_restart::{tests::wen_restart_proto::LastVotedForkSlotsAggregateFinal, *},
assert_matches::assert_matches,
solana_accounts_db::hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
solana_entry::entry::create_ticks,
solana_gossip::{
Expand Down Expand Up @@ -1618,7 +1641,7 @@ mod tests {
}

#[test]
fn test_wen_restart_initialize_failures() {
fn test_wen_restart_initialize() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let test_state = wen_restart_test_init(&ledger_path);
Expand All @@ -1638,14 +1661,8 @@ mod tests {
.unwrap(),
prost::DecodeError::new("invalid wire type value: 7")
);
remove_file(&test_state.wen_restart_proto_path).unwrap();
let last_vote = VoteTransaction::from(TowerSync::from(vec![(0, 8), (1, 1)]));
assert!(initialize(
&test_state.wen_restart_proto_path,
last_vote.clone(),
test_state.blockstore.clone()
)
.is_ok());
assert!(remove_file(&test_state.wen_restart_proto_path).is_ok());
let last_vote_bankhash = Hash::new_unique();
let empty_last_vote = VoteTransaction::from(Vote::new(vec![], last_vote_bankhash));
assert_eq!(
initialize(
Expand All @@ -1658,19 +1675,6 @@ mod tests {
.unwrap(),
WenRestartError::MissingLastVotedForkSlots,
);
// Test the case where the file is not found.
let _ = remove_file(&test_state.wen_restart_proto_path);
assert_matches!(
initialize(&test_state.wen_restart_proto_path, VoteTransaction::from(Vote::new(last_voted_fork_slots.clone(), last_vote_bankhash)), test_state.blockstore.clone()),
Ok((WenRestartProgressInternalState::Init { last_voted_fork_slots, last_vote_bankhash: bankhash }, progress)) => {
assert_eq!(last_voted_fork_slots, test_state.last_voted_fork_slots);
assert_eq!(bankhash, last_vote_bankhash);
assert_eq!(progress, WenRestartProgress {
state: RestartState::Init.into(),
..Default::default()
});
}
);
assert!(write_wen_restart_records(
&test_state.wen_restart_proto_path,
&WenRestartProgress {
Expand All @@ -1690,24 +1694,218 @@ mod tests {
.to_string(),
"Malformed last voted fork slots protobuf: None"
);
let progress_missing_heaviest_fork_aggregate = WenRestartProgress {
state: RestartState::HeaviestFork.into(),
my_heaviest_fork: Some(HeaviestForkRecord {
slot: 0,
bankhash: Hash::new_unique().to_string(),
total_active_stake: 0,
shred_version: SHRED_VERSION as u32,
wallclock: 0,
}),
..Default::default()
};
assert!(write_wen_restart_records(
&test_state.wen_restart_proto_path,
&WenRestartProgress {
state: RestartState::WaitingForSupermajority.into(),
..Default::default()
},
&progress_missing_heaviest_fork_aggregate,
)
.is_ok());
assert_eq!(
initialize(
&test_state.wen_restart_proto_path,
VoteTransaction::from(Vote::new(last_voted_fork_slots, last_vote_bankhash)),
VoteTransaction::from(Vote::new(last_voted_fork_slots.clone(), last_vote_bankhash)),
test_state.blockstore.clone()
).err()
.unwrap()
.to_string(),
"Malformed progress: HeaviestFork missing final_result in last_voted_fork_slots_aggregate",
);
let progress_missing_my_heaviestfork = WenRestartProgress {
state: RestartState::GenerateSnapshot.into(),
my_snapshot: Some(GenerateSnapshotRecord {
slot: 0,
bankhash: Hash::new_unique().to_string(),
shred_version: SHRED_VERSION as u32,
path: "/path/to/snapshot".to_string(),
}),
..Default::default()
};
assert!(write_wen_restart_records(
&test_state.wen_restart_proto_path,
&progress_missing_my_heaviestfork,
)
.is_ok());
assert_eq!(
initialize(
&test_state.wen_restart_proto_path,
VoteTransaction::from(Vote::new(last_voted_fork_slots.clone(), last_vote_bankhash)),
test_state.blockstore.clone()
)
.err()
.unwrap()
.to_string(),
"Unexpected state: WaitingForSupermajority"
"Malformed progress: GenerateSnapshot missing my_heaviest_fork",
);

// Now test successful initialization.
assert!(remove_file(&test_state.wen_restart_proto_path).is_ok());
// Test the case where the file is not found.
let mut vote = TowerSync::from(vec![(test_state.last_voted_fork_slots[0], 1)]);
vote.hash = last_vote_bankhash;
let last_vote = VoteTransaction::from(vote);
assert_eq!(
initialize(
&test_state.wen_restart_proto_path,
last_vote.clone(),
test_state.blockstore.clone()
)
.unwrap(),
(
WenRestartProgressInternalState::Init {
last_voted_fork_slots: test_state.last_voted_fork_slots.clone(),
last_vote_bankhash
},
WenRestartProgress {
state: RestartState::Init.into(),
..Default::default()
}
)
);
let progress = WenRestartProgress {
state: RestartState::Init.into(),
my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord {
last_voted_fork_slots: test_state.last_voted_fork_slots.clone(),
last_vote_bankhash: last_vote_bankhash.to_string(),
shred_version: SHRED_VERSION as u32,
wallclock: 0,
}),
..Default::default()
};
assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress,).is_ok());
assert_eq!(
initialize(
&test_state.wen_restart_proto_path,
last_vote.clone(),
test_state.blockstore.clone()
)
.unwrap(),
(
WenRestartProgressInternalState::Init {
last_voted_fork_slots: test_state.last_voted_fork_slots.clone(),
last_vote_bankhash,
},
progress
)
);
let progress = WenRestartProgress {
state: RestartState::LastVotedForkSlots.into(),
my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord {
last_voted_fork_slots: test_state.last_voted_fork_slots.clone(),
last_vote_bankhash: last_vote_bankhash.to_string(),
shred_version: SHRED_VERSION as u32,
wallclock: 0,
}),
..Default::default()
};
assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress,).is_ok());
assert_eq!(
initialize(
&test_state.wen_restart_proto_path,
last_vote.clone(),
test_state.blockstore.clone()
)
.unwrap(),
(
WenRestartProgressInternalState::LastVotedForkSlots {
last_voted_fork_slots: test_state.last_voted_fork_slots.clone(),
aggregate_final_result: None,
},
progress
)
);
let progress = WenRestartProgress {
state: RestartState::HeaviestFork.into(),
my_heaviest_fork: Some(HeaviestForkRecord {
slot: 0,
bankhash: Hash::new_unique().to_string(),
total_active_stake: 0,
shred_version: SHRED_VERSION as u32,
wallclock: 0,
}),
last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord {
received: HashMap::new(),
final_result: Some(LastVotedForkSlotsAggregateFinal {
slots_stake_map: HashMap::new(),
total_active_stake: 1000,
}),
}),
..Default::default()
};
assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress,).is_ok());
assert_eq!(
initialize(
&test_state.wen_restart_proto_path,
last_vote.clone(),
test_state.blockstore.clone()
)
.unwrap(),
(
WenRestartProgressInternalState::FindHeaviestFork {
aggregate_final_result: LastVotedForkSlotsFinalResult {
slots_stake_map: HashMap::new(),
total_active_stake: 1000,
},
my_heaviest_fork: progress.my_heaviest_fork.clone(),
},
progress
)
);
let progress = WenRestartProgress {
state: RestartState::GenerateSnapshot.into(),
my_heaviest_fork: Some(HeaviestForkRecord {
slot: 0,
bankhash: Hash::new_unique().to_string(),
total_active_stake: 0,
shred_version: SHRED_VERSION as u32,
wallclock: 0,
}),
my_snapshot: Some(GenerateSnapshotRecord {
slot: 0,
bankhash: Hash::new_unique().to_string(),
shred_version: SHRED_VERSION as u32,
path: "/path/to/snapshot".to_string(),
}),
..Default::default()
};
assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress,).is_ok());
assert_eq!(
initialize(
&test_state.wen_restart_proto_path,
VoteTransaction::from(Vote::new(last_voted_fork_slots.clone(), last_vote_bankhash)),
test_state.blockstore.clone()
)
.unwrap(),
(
WenRestartProgressInternalState::GenerateSnapshot {
new_root_slot: 0,
my_snapshot: progress.my_snapshot.clone(),
},
progress,
)
);
let progress = WenRestartProgress {
state: RestartState::Done.into(),
..Default::default()
};
assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress,).is_ok());
assert_eq!(
initialize(
&test_state.wen_restart_proto_path,
VoteTransaction::from(Vote::new(last_voted_fork_slots, last_vote_bankhash)),
test_state.blockstore.clone()
)
.unwrap(),
(WenRestartProgressInternalState::Done, progress)
);
}

Expand Down

0 comments on commit cbc8320

Please sign in to comment.