Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do not fire false ackMessage in timeout. check timeout in caller #304

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 12 additions & 25 deletions state.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
}
ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
nackCh := make(chan struct{}, m.config.IndirectChecks+1)
start := time.Now()
m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)

// Mark the sent time here, which should be after any pre-processing but
Expand Down Expand Up @@ -391,19 +392,11 @@ func (m *Memberlist) probeNode(node *nodeState) {
// Wait for response or round-trip-time.
select {
case v := <-ackCh:
if v.Complete == true {
if m.config.Ping != nil {
rtt := v.Timestamp.Sub(sent)
m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload)
}
return
}

// As an edge case, if we get a timeout, we need to re-enqueue it
// here to break out of the select below.
if v.Complete == false {
ackCh <- v
if m.config.Ping != nil {
rtt := v.Timestamp.Sub(sent)
m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload)
}
return
case <-time.After(m.config.ProbeTimeout):
// Note that we don't scale this timeout based on awareness and
// the health score. That's because we don't really expect waiting
Expand Down Expand Up @@ -484,11 +477,12 @@ HANDLE_REMOTE_FAILURE:
// channel here because we want to issue a warning below if that's the
// *only* way we hear back from the peer, so we have to let this time
// out first to allow the normal UDP-based acks to come in.
remaining := time.Until(start.Add(probeInterval))
select {
case v := <-ackCh:
if v.Complete == true {
return
}
case <-ackCh:
return
case <-time.After(remaining):
m.logger.Printf("[DEBUG] memberlist: ackHandler timeout for node %s", node.Name)
}

// Finally, poll the fallback channel. The timeouts are set such that
Expand Down Expand Up @@ -552,9 +546,7 @@ func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) {
// Wait for response or timeout.
select {
case v := <-ackCh:
if v.Complete == true {
return v.Timestamp.Sub(sent), nil
}
return v.Timestamp.Sub(sent), nil
case <-time.After(m.config.ProbeTimeout):
// Timeout, return an error below.
}
Expand Down Expand Up @@ -825,7 +817,6 @@ func (m *Memberlist) estNumNodes() int {
}

type ackMessage struct {
Complete bool
Payload []byte
Timestamp time.Time
}
Expand All @@ -838,7 +829,7 @@ func (m *Memberlist) setProbeChannels(seqNo uint32, ackCh chan ackMessage, nackC
// Create handler functions for acks and nacks
ackFn := func(payload []byte, timestamp time.Time) {
select {
case ackCh <- ackMessage{true, payload, timestamp}:
case ackCh <- ackMessage{payload, timestamp}:
default:
}
}
Expand All @@ -860,10 +851,6 @@ func (m *Memberlist) setProbeChannels(seqNo uint32, ackCh chan ackMessage, nackC
m.ackLock.Lock()
delete(m.ackHandlers, seqNo)
m.ackLock.Unlock()
select {
case ackCh <- ackMessage{false, nil, time.Now()}:
default:
}
})
}

Expand Down
18 changes: 8 additions & 10 deletions state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,23 +1130,22 @@ func TestMemberList_invokeAckHandler_Channel_Ack(t *testing.T) {

ackCh := make(chan ackMessage, 1)
nackCh := make(chan struct{}, 1)
m.setProbeChannels(0, ackCh, nackCh, 10*time.Millisecond)
timeout := 10 * time.Millisecond
m.setProbeChannels(0, ackCh, nackCh, timeout)

// Should send message
m.invokeAckHandler(ack, time.Now())

select {
case v := <-ackCh:
if v.Complete != true {
t.Fatalf("Bad value")
}
if bytes.Compare(v.Payload, ack.Payload) != 0 {
t.Fatalf("wrong payload. expected: %v; actual: %v", ack.Payload, v.Payload)
}

case <-nackCh:
t.Fatalf("should not get a nack")

case <-time.After(timeout):
t.Fatalf("timeout")
default:
t.Fatalf("message not sent")
}
Expand All @@ -1164,7 +1163,8 @@ func TestMemberList_invokeAckHandler_Channel_Nack(t *testing.T) {

ackCh := make(chan ackMessage, 1)
nackCh := make(chan struct{}, 1)
m.setProbeChannels(0, ackCh, nackCh, 10*time.Millisecond)
timeout := 10 * time.Millisecond
m.setProbeChannels(0, ackCh, nackCh, timeout)

// Should send message.
m.invokeNackHandler(nack)
Expand All @@ -1189,16 +1189,14 @@ func TestMemberList_invokeAckHandler_Channel_Nack(t *testing.T) {

select {
case v := <-ackCh:
if v.Complete != true {
t.Fatalf("Bad value")
}
if bytes.Compare(v.Payload, ack.Payload) != 0 {
t.Fatalf("wrong payload. expected: %v; actual: %v", ack.Payload, v.Payload)
}

case <-nackCh:
t.Fatalf("should not get a nack")

case <-time.After(timeout):
t.Fatalf("timeout")
default:
t.Fatalf("message not sent")
}
Expand Down