Skip to content

Commit

Permalink
Merge pull request #7 from butaneprotocol/connect-debugging-round-2
Browse files Browse the repository at this point in the history
Connect debugging round 2
  • Loading branch information
Quantumplation authored May 19, 2024
2 parents d1217b7 + 20fddf9 commit 48acfc2
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 18 deletions.
2 changes: 1 addition & 1 deletion config.base.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
port: 8000 # Port to handle traffic from other nodes
port: 31415 # Port to handle traffic from other nodes
health_port: 18000 # Port to report our health status
heartbeat_ms: 100 # How long should the leader wait before sending a heartbeat
timeout_ms: 500 # How long should we wait before picking a new leader
Expand Down
2 changes: 2 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ keygen:
min_signers: 2 # How many signers do we need to publish a package? Every node must agree on this.
peers:
- address: 127.0.0.1:8001
label: Descriptive label
public_key: |
-----BEGIN PUBLIC KEY-----
MCowBQYDK2VwAyEAtYoVpsW+gRlzXM9Iu3OBoopNj6y5OB8oeOJIWD10G/U=
-----END PUBLIC KEY-----
- address: 127.0.0.1:8002
label: Who's this
public_key: |
-----BEGIN PUBLIC KEY-----
MCowBQYDK2VwAyEAg1sXWpm7CUeBGj9pdCgAVjKtJhEwL9s3BH1Ua9y6Nm0=
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ services:
timeout: 10s
retries: 1
ports:
- 8000:8000
- 31415:31415
- 18000:18000
restart: unless-stopped
secrets:
Expand Down
1 change: 1 addition & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl OracleConfig {

#[derive(Debug, Deserialize, Clone)]
pub struct PeerConfig {
pub label: Option<String>,
pub address: String,
pub public_key: String,
}
Expand Down
2 changes: 1 addition & 1 deletion src/keygen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub async fn run(config: &OracleConfig) -> Result<()> {
fs::write(&key_path, key_package.serialize()?)?;
info!("Frost private key saved to {}", key_path);
fs::write(&public_key_path, public_key_package.serialize()?)?;
info!("Frost public key saved to {}", key_path);
info!("Frost public key saved to {}", public_key_path);
}
}
}
Expand Down
40 changes: 25 additions & 15 deletions src/network/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ enum Message {
struct Peer {
id: NodeId,
public_key: PublicKey,
label: String,
address: String,
}

Expand Down Expand Up @@ -138,6 +139,8 @@ impl Core {
) -> Result<Self> {
let private_key = Arc::new(read_private_key()?);
let id = compute_node_id(&private_key.verifying_key());
info!("This node has ID {}", id);

let peers = {
let peers: Result<Vec<Peer>> = config.peers.iter().map(parse_peer).collect();
Arc::new(peers?)
Expand Down Expand Up @@ -292,19 +295,24 @@ impl Core {

// Figure out who they are based on the public key they sent us
let id_public_key = message.id_public_key;
let them = compute_node_id(&id_public_key);
let peer_id = compute_node_id(&id_public_key);
let Some(peer) = self.peers.iter().find(|p| p.id == peer_id) else {
warn!("Unrecognized peer {}", peer_id);
return;
};
let them = peer.label.clone();

// Confirm that they are who they say they are; they should have signed the ecdh nonce with their private key
let signature = message.signature;
if let Err(e) = id_public_key.verify(ecdh_public_key.as_bytes(), &signature) {
warn!(them = %them, "Signature does not match public key: {}", e);
warn!(them, "Signature does not match public key: {}", e);
return;
}

// Notify whoever's listening that we have a new connection
// (We look up the "incoming connection" sender, so if we don't recognize them we fail here)
let Some(connection_tx) = txs.get(&them) else {
warn!(them = %them, "Other node not recognized");
let Some(connection_tx) = txs.get(&peer_id) else {
warn!(them, "Other node not recognized");
return;
};
if let Err(e) = connection_tx
Expand All @@ -314,7 +322,7 @@ impl Core {
})
.await
{
warn!(them = %them, "Could not send incoming connection: {:?}", e);
warn!(them, "Could not send incoming connection: {:?}", e);
return;
}

Expand Down Expand Up @@ -350,7 +358,7 @@ impl Core {
let outgoing_connection = match self.connect_to_peer(&peer).await {
Ok(conn) => conn,
Err(e) => {
warn!("error connecting to {}: {:#}", peer.id, e);
warn!("error connecting to {}: {:#}", peer.label, e);
sleep(Duration::from_secs(sleep_secs)).await;
sleep_secs = if sleep_secs >= 8 { 8 } else { sleep_secs * 2 };
continue;
Expand Down Expand Up @@ -386,7 +394,7 @@ impl Core {
}

async fn connect_to_peer(&self, peer: &Peer) -> Result<OutgoingConnection> {
trace!("Attempting to connect to {} ({})", peer.id, peer.address);
trace!("Attempting to connect to {} ({})", peer.id, peer.label);
let stream = TcpStream::connect(&peer.address)
.await
.context("error opening connection")?;
Expand Down Expand Up @@ -453,7 +461,7 @@ impl Core {
outgoing_connection: OutgoingConnection,
outgoing_message_rx: &mut mpsc::Receiver<AppMessage>,
) {
info!("Connected to {} ({})", peer.id, peer.address);
info!("Connected to {} ({})", peer.id, peer.label);
// try to tell Raft that we are definitely connected
let _ = self
.incoming_tx
Expand All @@ -480,36 +488,36 @@ impl Core {

let mut stream = incoming_connection.stream;
let incoming_message_tx = self.incoming_tx.clone();
let them = peer.id.clone();
let them = peer.label.clone();
let recv_task = async move {
while let Some(msg) = stream.next().await {
match msg {
Ok(Message::Application(message)) => {
let message = match message.decrypt(&chacha) {
Ok(message) => message,
Err(e) => {
warn!(them = %them, "Failed to decrypt incoming message: {:#}", e);
warn!(them, "Failed to decrypt incoming message: {:#}", e);
break;
}
};
if let Err(e) = incoming_message_tx.send((them.clone(), message)).await {
warn!(them = %them, "Failed to send message: {:?}", e);
if let Err(e) = incoming_message_tx.send((peer.id.clone(), message)).await {
warn!(them, "Failed to send message: {:?}", e);
break;
}
}
// If someone tries to Hello us again, break it off
Ok(other) => {
warn!(them = %them, "Unexpected message: {:?}", other);
warn!(them, "Unexpected message: {:?}", other);
break;
}
// Someone is sending us messages that we can't parse
Err(e) => {
warn!(them = %them, "Failed to parse message: {:?}", e);
warn!(them, "Failed to parse message: {:?}", e);
break;
}
}
}
warn!(them = %them, "Incoming connection Disconnected");
warn!(them, "Incoming connection Disconnected");
};

// Run until either the sender or receiver task stops running, then return so we can reconnect
Expand Down Expand Up @@ -545,9 +553,11 @@ fn parse_peer(config: &PeerConfig) -> Result<Peer> {
PublicKey::from_bytes(&key_bytes.0)?
};
let id = compute_node_id(&public_key);
let label = config.label.as_ref().unwrap_or(&config.address).clone();
Ok(Peer {
id,
public_key,
label,
address: config.address.clone(),
})
}
Expand Down

0 comments on commit 48acfc2

Please sign in to comment.