
Process coordinator messages to duplicate state between multiple coor… #4186

Merged — 3 commits, Jan 10, 2024
23 changes: 13 additions & 10 deletions stacks-signer/src/runloop.rs
@@ -170,7 +170,7 @@ impl<C: Coordinator> RunLoop<C> {
event: &StackerDBChunksEvent,
) -> (Vec<Packet>, Vec<OperationResult>) {
// Determine the current coordinator id and public key for verification
let (coordinator_id, coordinator_public_key) =
let (_coordinator_id, coordinator_public_key) =
calculate_coordinator(&self.signing_round.public_keys);
// Filter out invalid messages
let inbound_messages: Vec<Packet> = event
@@ -189,15 +189,18 @@ impl<C: Coordinator> RunLoop<C> {
let mut outbound_messages = self
.signing_round
.process_inbound_messages(&inbound_messages)
.unwrap_or_default();
// If the signer is the coordinator, then next process the message as the coordinator
let (messages, results) = if self.signing_round.signer_id == coordinator_id {
self.coordinator
.process_inbound_messages(&inbound_messages)
.unwrap_or_default()
} else {
(vec![], vec![])
};
.unwrap_or_else(|e| {
error!("Failed to process inbound messages as a signer: {e}");
vec![]
});
// Next process the message as the coordinator
let (messages, results) = self
.coordinator
.process_inbound_messages(&inbound_messages)
.unwrap_or_else(|e| {
Collaborator:

Is this the best behavior? I.e., when processing inbound messages fails, should we log and continue?

Collaborator (Author):

I think it's currently the best behaviour. For now we don't want messages received out of order, for example, to crash the system. I.e., if a rogue signer starts sending valid but inappropriate responses, the coordinator state could error. I think we might need to introduce some sort of recovery, but for now it's better than crashing outright.
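For illustration, here is a minimal, self-contained sketch of the log-and-continue pattern discussed in this thread, extended with one possible recovery step. The Coordinator, CoordinatorError, and process_as_coordinator names and the reset() method are stand-ins invented for the sketch, not this repository's API; the actual change below only logs the error and returns empty results.

// A minimal, self-contained sketch (not code from this PR): process inbound messages
// as the coordinator, log on failure instead of panicking, and optionally reset the
// coordinator state as a hypothetical recovery step. All types here are stand-ins.

#[derive(Debug)]
struct CoordinatorError(String);

struct Coordinator;

impl Coordinator {
    fn process_inbound_messages(
        &mut self,
        messages: &[String],
    ) -> Result<(Vec<String>, Vec<String>), CoordinatorError> {
        // Stand-in: the real coordinator would advance its DKG/signing state machine here.
        if messages.iter().any(|m| m == "rogue") {
            return Err(CoordinatorError("unexpected message for current state".into()));
        }
        Ok((messages.to_vec(), vec![]))
    }

    fn reset(&mut self) {
        // Hypothetical recovery: drop any partially built round state.
    }
}

fn process_as_coordinator(
    coordinator: &mut Coordinator,
    inbound: &[String],
) -> (Vec<String>, Vec<String>) {
    match coordinator.process_inbound_messages(inbound) {
        Ok((messages, results)) => (messages, results),
        Err(e) => {
            // Log and continue rather than crash; a rogue signer should not be able
            // to take the whole signer process down with an out-of-order message.
            eprintln!("Failed to process inbound messages as a coordinator: {e:?}");
            coordinator.reset();
            (vec![], vec![])
        }
    }
}

fn main() {
    let mut coordinator = Coordinator;
    let (outbound, results) = process_as_coordinator(&mut coordinator, &["rogue".to_string()]);
    assert!(outbound.is_empty() && results.is_empty());
}

Whether the real coordinator should also reset its state on such errors is the open question raised above.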

error!("Failed to process inbound messages as a coordinator: {e}");
(vec![], vec![])
});
outbound_messages.extend(messages);
(outbound_messages, results)
}
68 changes: 36 additions & 32 deletions testnet/stacks-node/src/tests/signer.rs
@@ -240,15 +240,15 @@ fn test_stackerdb_dkg() {
let mut running_signers = vec![];
// Spawn all the signers first to listen to the coordinator request for dkg
let mut signer_cmd_senders = Vec::new();
let mut signer_res_receivers = Vec::new();
let mut res_receivers = Vec::new();
for i in (1..num_signers).rev() {
let (cmd_send, cmd_recv) = channel();
let (res_send, res_recv) = channel();
info!("spawn signer");
let running_signer = spawn_signer(&signer_configs[i as usize], cmd_recv, res_send);
running_signers.push(running_signer);
signer_cmd_senders.push(cmd_send);
signer_res_receivers.push(res_recv);
res_receivers.push(res_recv);
}
// Spawn coordinator second
let (coordinator_cmd_send, coordinator_cmd_recv) = channel();
@@ -260,6 +260,8 @@ fn test_stackerdb_dkg() {
coordinator_res_send,
);

res_receivers.push(coordinator_res_recv);

// Let's wrap the node in a lifetime to ensure stopping the signers doesn't cause issues.
{
// Setup the nodes and deploy the contract to it
Expand Down Expand Up @@ -291,38 +293,40 @@ fn test_stackerdb_dkg() {
merkle_root: None,
})
.expect("failed to send Sign command");

let mut aggregate_group_key = None;
let mut frost_signature = None;
let mut schnorr_proof = None;

loop {
let results = coordinator_res_recv.recv().expect("failed to recv results");
for result in results {
match result {
OperationResult::Dkg(point) => {
info!("Received aggregate_group_key {point}");
aggregate_group_key = Some(point);
}
OperationResult::Sign(sig) => {
info!("Received Signature ({},{})", &sig.R, &sig.z);
frost_signature = Some(sig);
}
OperationResult::SignTaproot(proof) => {
info!("Received SchnorrProof ({},{})", &proof.r, &proof.s);
schnorr_proof = Some(proof);
}
OperationResult::DkgError(dkg_error) => {
panic!("Received DkgError {}", dkg_error);
}
OperationResult::SignError(sign_error) => {
panic!("Received SignError {}", sign_error);
for recv in res_receivers.iter() {
let mut aggregate_group_key = None;
let mut frost_signature = None;
let mut schnorr_proof = None;
loop {
let results = recv.recv().expect("failed to recv results");
for result in results {
match result {
OperationResult::Dkg(point) => {
info!("Received aggregate_group_key {point}");
aggregate_group_key = Some(point);
}
OperationResult::Sign(sig) => {
info!("Received Signature ({},{})", &sig.R, &sig.z);
frost_signature = Some(sig);
}
OperationResult::SignTaproot(proof) => {
info!("Received SchnorrProof ({},{})", &proof.r, &proof.s);
schnorr_proof = Some(proof);
}
OperationResult::DkgError(dkg_error) => {
panic!("Received DkgError {:?}", dkg_error);
}
OperationResult::SignError(sign_error) => {
panic!("Received SignError {}", sign_error);
}
}
}
}
if aggregate_group_key.is_some() && frost_signature.is_some() && schnorr_proof.is_some()
{
break;
if aggregate_group_key.is_some()
&& frost_signature.is_some()
&& schnorr_proof.is_some()
{
break;
}
}
}
let elapsed = now.elapsed();