Skip to content

Commit

Permalink
Fix cowHostList can't have hosts with same ConnectAddress
Browse files Browse the repository at this point in the history
cowHostList uses HostInfo.Equal to confirm host uniqueness,
which relies on `ConnectAddress.Equal`, which does not allow to have
different hosts with same `ConnectAddress`
  • Loading branch information
dkropachev committed Jun 2, 2024
1 parent 7f7905d commit 74e3404
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 19 deletions.
10 changes: 5 additions & 5 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (h *HostInfo) Equal(host *HostInfo) bool {
return true
}

return h.ConnectAddress().Equal(host.ConnectAddress())
return h.HostID() == host.HostID() && h.ConnectAddressAndPort() == host.ConnectAddressAndPort()
}

func (h *HostInfo) Peer() net.IP {
Expand Down Expand Up @@ -435,10 +435,10 @@ func (h *HostInfo) Hostname() string {
}

func (h *HostInfo) ConnectAddressAndPort() string {
h.mu.Lock()
defer h.mu.Unlock()
addr, _ := h.connectAddressLocked()
return net.JoinHostPort(addr.String(), strconv.Itoa(h.port))
h.mu.Lock()
defer h.mu.Unlock()
addr, _ := h.connectAddressLocked()
return net.JoinHostPort(addr.String(), strconv.Itoa(h.port))
}

func (h *HostInfo) String() string {
Expand Down
15 changes: 7 additions & 8 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"fmt"
"math"
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -64,7 +63,7 @@ func (c *cowHostList) add(host *HostInfo) bool {
return true
}

func (c *cowHostList) remove(ip net.IP) bool {
func (c *cowHostList) remove(host *HostInfo) bool {
c.mu.Lock()
l := c.get()
size := len(l)
Expand All @@ -76,7 +75,7 @@ func (c *cowHostList) remove(ip net.IP) bool {
found := false
newL := make([]*HostInfo, 0, size)
for i := 0; i < len(l); i++ {
if !l[i].ConnectAddress().Equal(ip) {
if !l[i].Equal(host) {
newL = append(newL, l[i])
} else {
found = true
Expand Down Expand Up @@ -374,7 +373,7 @@ func (r *roundRobinHostPolicy) AddHost(host *HostInfo) {
}

func (r *roundRobinHostPolicy) RemoveHost(host *HostInfo) {
r.hosts.remove(host.ConnectAddress())
r.hosts.remove(host)
}

func (r *roundRobinHostPolicy) HostUp(host *HostInfo) {
Expand Down Expand Up @@ -566,7 +565,7 @@ func (t *tokenAwareHostPolicy) AddHosts(hosts []*HostInfo) {

func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) {
t.mu.Lock()
if t.hosts.remove(host.ConnectAddress()) {
if t.hosts.remove(host) {
meta := t.getMetadataForUpdate()
meta.resetTokenRing(t.partitioner, t.hosts.get(), t.logger)
t.updateReplicas(meta, t.getKeyspaceName())
Expand Down Expand Up @@ -981,9 +980,9 @@ func (d *dcAwareRR) AddHost(host *HostInfo) {

func (d *dcAwareRR) RemoveHost(host *HostInfo) {
if d.IsLocal(host) {
d.localHosts.remove(host.ConnectAddress())
d.localHosts.remove(host)
} else {
d.remoteHosts.remove(host.ConnectAddress())
d.remoteHosts.remove(host)
}
}

Expand Down Expand Up @@ -1090,7 +1089,7 @@ func (d *rackAwareRR) AddHost(host *HostInfo) {

func (d *rackAwareRR) RemoveHost(host *HostInfo) {
dist := d.HostTier(host)
d.hosts[dist].remove(host.ConnectAddress())
d.hosts[dist].remove(host)
}

func (d *rackAwareRR) HostUp(host *HostInfo) { d.AddHost(host) }
Expand Down
38 changes: 32 additions & 6 deletions policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,32 @@ func TestRoundRobbin(t *testing.T) {
}
}

func TestRoundRobbinSameConnectAddress(t *testing.T) {
policy := RoundRobinHostPolicy()

hosts := [...]*HostInfo{
{hostId: "0", connectAddress: net.IPv4(0, 0, 0, 1), port: 9042},
{hostId: "1", connectAddress: net.IPv4(0, 0, 0, 1), port: 9043},
}

for _, host := range hosts {
policy.AddHost(host)
}

got := make(map[string]bool)
it := policy.Pick(nil)
for h := it(); h != nil; h = it() {
id := h.Info().hostId
if got[id] {
t.Fatalf("got duplicate host: %v", id)
}
got[id] = true
}
if len(got) != len(hosts) {
t.Fatalf("expected %d hosts got %d", len(hosts), len(got))
}
}

// Tests of the token-aware host selection policy implementation with a
// round-robin host selection policy fallback.
func TestHostPolicy_TokenAware_SimpleStrategy(t *testing.T) {
Expand Down Expand Up @@ -132,47 +158,47 @@ func TestHostPolicy_TokenAware_LWT_DisablesHostShuffling(t *testing.T) {
{hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"25", "35", "45"}},
{hostId: "4", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50", "60", "70"}},
{hostId: "5", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"50", "60", "70"}},
}, routingKey: "8", lwt: true, shuffle: true, want: []string{"0", "2", "3", "1"}},
}, routingKey: "8", lwt: true, shuffle: true, want: []string{"0", "2", "3", "4", "5", "1"}},
"token 08 shuffling not configured": {hosts: []*HostInfo{
{hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00", "10", "20"}},
{hostId: "1", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"25", "35", "45"}},
{hostId: "2", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"00", "10", "20"}},
{hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"25", "35", "45"}},
{hostId: "4", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50", "60", "70"}},
{hostId: "5", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"50", "60", "70"}},
}, routingKey: "8", lwt: true, shuffle: false, want: []string{"0", "2", "3", "1"}},
}, routingKey: "8", lwt: true, shuffle: true, want: []string{"0", "2", "3", "4", "5", "1"}},
"token 30 shuffling configured": {hosts: []*HostInfo{
{hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00", "10", "20"}},
{hostId: "1", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"25", "35", "45"}},
{hostId: "2", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"00", "10", "20"}},
{hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"25", "35", "45"}},
{hostId: "4", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50", "60", "70"}},
{hostId: "5", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"50", "60", "70"}},
}, routingKey: "30", lwt: true, shuffle: true, want: []string{"1", "3", "2", "0"}},
}, routingKey: "30", lwt: true, shuffle: true, want: []string{"1", "3", "2", "4", "5", "0"}},
"token 30 shuffling not configured": {hosts: []*HostInfo{
{hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00", "10", "20"}},
{hostId: "1", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"25", "35", "45"}},
{hostId: "2", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"00", "10", "20"}},
{hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"25", "35", "45"}},
{hostId: "4", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50", "60", "70"}},
{hostId: "5", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"50", "60", "70"}},
}, routingKey: "30", lwt: true, shuffle: false, want: []string{"1", "3", "2", "0"}},
}, routingKey: "30", lwt: true, shuffle: false, want: []string{"1", "3", "2", "4", "5", "0"}},
"token 55 shuffling configured": {hosts: []*HostInfo{
{hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00", "10", "20"}},
{hostId: "1", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"25", "35", "45"}},
{hostId: "2", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"00", "10", "20"}},
{hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"25", "35", "45"}},
{hostId: "4", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50", "60", "70"}},
{hostId: "5", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"50", "60", "70"}},
}, routingKey: "55", lwt: true, shuffle: true, want: []string{"0", "2", "3", "1"}},
}, routingKey: "55", lwt: true, shuffle: true, want: []string{"4", "5", "2", "3", "0", "1"}},
"token 55 shuffling not configured": {hosts: []*HostInfo{
{hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00", "10", "20"}},
{hostId: "1", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"25", "35", "45"}},
{hostId: "2", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"00", "10", "20"}},
{hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"25", "35", "45"}},
{hostId: "4", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50", "60", "70"}},
{hostId: "5", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"50", "60", "70"}},
}, routingKey: "55", lwt: true, shuffle: false, want: []string{"0", "2", "3", "1"}},
}, routingKey: "55", lwt: true, shuffle: false, want: []string{"4", "5", "2", "3", "0", "1"}},
}
const keyspace = "myKeyspace"
for name, tc := range tests {
Expand Down

0 comments on commit 74e3404

Please sign in to comment.