Skip to content

Commit

Permalink
Fix cowHostList can't have hosts with same ConnectAddress
Browse files Browse the repository at this point in the history
cowHostList uses HostInfo.Equal to confirm host uniqueness,
which relies on `ConnectAddress.Equal`, which does not allow to have
different hosts with same `ConnectAddress`
  • Loading branch information
dkropachev committed May 30, 2024
1 parent 34fdeeb commit cc58718
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 13 deletions.
10 changes: 5 additions & 5 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (h *HostInfo) Equal(host *HostInfo) bool {
return true
}

return h.ConnectAddress().Equal(host.ConnectAddress())
return h.HostID() == host.HostID() && h.ConnectAddressAndPort() == host.ConnectAddressAndPort()
}

func (h *HostInfo) Peer() net.IP {
Expand Down Expand Up @@ -402,10 +402,10 @@ func (h *HostInfo) HostnameAndPort() string {
}

func (h *HostInfo) ConnectAddressAndPort() string {
h.mu.Lock()
defer h.mu.Unlock()
addr, _ := h.connectAddressLocked()
return net.JoinHostPort(addr.String(), strconv.Itoa(h.port))
h.mu.Lock()
defer h.mu.Unlock()
addr, _ := h.connectAddressLocked()
return net.JoinHostPort(addr.String(), strconv.Itoa(h.port))
}

func (h *HostInfo) String() string {
Expand Down
15 changes: 7 additions & 8 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"fmt"
"math"
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -64,7 +63,7 @@ func (c *cowHostList) add(host *HostInfo) bool {
return true
}

func (c *cowHostList) remove(ip net.IP) bool {
func (c *cowHostList) remove(host *HostInfo) bool {
c.mu.Lock()
l := c.get()
size := len(l)
Expand All @@ -76,7 +75,7 @@ func (c *cowHostList) remove(ip net.IP) bool {
found := false
newL := make([]*HostInfo, 0, size)
for i := 0; i < len(l); i++ {
if !l[i].ConnectAddress().Equal(ip) {
if !l[i].Equal(host) {
newL = append(newL, l[i])
} else {
found = true
Expand Down Expand Up @@ -333,7 +332,7 @@ func (r *roundRobinHostPolicy) AddHost(host *HostInfo) {
}

func (r *roundRobinHostPolicy) RemoveHost(host *HostInfo) {
r.hosts.remove(host.ConnectAddress())
r.hosts.remove(host)
}

func (r *roundRobinHostPolicy) HostUp(host *HostInfo) {
Expand Down Expand Up @@ -499,7 +498,7 @@ func (t *tokenAwareHostPolicy) AddHosts(hosts []*HostInfo) {

func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) {
t.mu.Lock()
if t.hosts.remove(host.ConnectAddress()) {
if t.hosts.remove(host) {
meta := t.getMetadataForUpdate()
meta.resetTokenRing(t.partitioner, t.hosts.get(), t.logger)
t.updateReplicas(meta, t.getKeyspaceName())
Expand Down Expand Up @@ -843,9 +842,9 @@ func (d *dcAwareRR) AddHost(host *HostInfo) {

func (d *dcAwareRR) RemoveHost(host *HostInfo) {
if d.IsLocal(host) {
d.localHosts.remove(host.ConnectAddress())
d.localHosts.remove(host)
} else {
d.remoteHosts.remove(host.ConnectAddress())
d.remoteHosts.remove(host)
}
}

Expand Down Expand Up @@ -950,7 +949,7 @@ func (d *rackAwareRR) AddHost(host *HostInfo) {

func (d *rackAwareRR) RemoveHost(host *HostInfo) {
dist := d.HostTier(host)
d.hosts[dist].remove(host.ConnectAddress())
d.hosts[dist].remove(host)
}

func (d *rackAwareRR) HostUp(host *HostInfo) { d.AddHost(host) }
Expand Down
26 changes: 26 additions & 0 deletions policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,32 @@ func TestRoundRobbin(t *testing.T) {
}
}

func TestRoundRobbinSameConnectAddress(t *testing.T) {
policy := RoundRobinHostPolicy()

hosts := [...]*HostInfo{
{hostId: "0", connectAddress: net.IPv4(0, 0, 0, 1), port: 9042},
{hostId: "1", connectAddress: net.IPv4(0, 0, 0, 1), port: 9043},
}

for _, host := range hosts {
policy.AddHost(host)
}

got := make(map[string]bool)
it := policy.Pick(nil)
for h := it(); h != nil; h = it() {
id := h.Info().hostId
if got[id] {
t.Fatalf("got duplicate host: %v", id)
}
got[id] = true
}
if len(got) != len(hosts) {
t.Fatalf("expected %d hosts got %d", len(hosts), len(got))
}
}

// Tests of the token-aware host selection policy implementation with a
// round-robin host selection policy fallback.
func TestHostPolicy_TokenAware_SimpleStrategy(t *testing.T) {
Expand Down

0 comments on commit cc58718

Please sign in to comment.