diff --git a/host_source.go b/host_source.go index 8dcf371ae..ce34eabbf 100644 --- a/host_source.go +++ b/host_source.go @@ -145,7 +145,7 @@ func (h *HostInfo) Equal(host *HostInfo) bool { return true } - return h.ConnectAddress().Equal(host.ConnectAddress()) + return h.HostID() == host.HostID() && h.ConnectAddressAndPort() == host.ConnectAddressAndPort() } func (h *HostInfo) Peer() net.IP { diff --git a/policies.go b/policies.go index 98867d2fa..17d2541ca 100644 --- a/policies.go +++ b/policies.go @@ -12,7 +12,6 @@ import ( "fmt" "math" "math/rand" - "net" "sync" "sync/atomic" "time" @@ -64,7 +63,7 @@ func (c *cowHostList) add(host *HostInfo) bool { return true } -func (c *cowHostList) remove(ip net.IP) bool { +func (c *cowHostList) remove(host *HostInfo) bool { c.mu.Lock() l := c.get() size := len(l) @@ -76,7 +75,7 @@ func (c *cowHostList) remove(ip net.IP) bool { found := false newL := make([]*HostInfo, 0, size) for i := 0; i < len(l); i++ { - if !l[i].ConnectAddress().Equal(ip) { + if !l[i].Equal(host) { newL = append(newL, l[i]) } else { found = true @@ -374,7 +373,7 @@ func (r *roundRobinHostPolicy) AddHost(host *HostInfo) { } func (r *roundRobinHostPolicy) RemoveHost(host *HostInfo) { - r.hosts.remove(host.ConnectAddress()) + r.hosts.remove(host) } func (r *roundRobinHostPolicy) HostUp(host *HostInfo) { @@ -566,7 +565,7 @@ func (t *tokenAwareHostPolicy) AddHosts(hosts []*HostInfo) { func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) { t.mu.Lock() - if t.hosts.remove(host.ConnectAddress()) { + if t.hosts.remove(host) { meta := t.getMetadataForUpdate() meta.resetTokenRing(t.partitioner, t.hosts.get(), t.logger) t.updateReplicas(meta, t.getKeyspaceName()) @@ -981,9 +980,9 @@ func (d *dcAwareRR) AddHost(host *HostInfo) { func (d *dcAwareRR) RemoveHost(host *HostInfo) { if d.IsLocal(host) { - d.localHosts.remove(host.ConnectAddress()) + d.localHosts.remove(host) } else { - d.remoteHosts.remove(host.ConnectAddress()) + d.remoteHosts.remove(host) } } @@ -1090,7 +1089,7 @@ func (d *rackAwareRR) AddHost(host *HostInfo) { func (d *rackAwareRR) RemoveHost(host *HostInfo) { dist := d.HostTier(host) - d.hosts[dist].remove(host.ConnectAddress()) + d.hosts[dist].remove(host) } func (d *rackAwareRR) HostUp(host *HostInfo) { d.AddHost(host) } diff --git a/policies_test.go b/policies_test.go index 0b0420596..6afb839b6 100644 --- a/policies_test.go +++ b/policies_test.go @@ -44,6 +44,32 @@ func TestRoundRobbin(t *testing.T) { } } +func TestRoundRobbinSameConnectAddress(t *testing.T) { + policy := RoundRobinHostPolicy() + + hosts := [...]*HostInfo{ + {hostId: "0", connectAddress: net.IPv4(0, 0, 0, 1), port: 9042}, + {hostId: "1", connectAddress: net.IPv4(0, 0, 0, 1), port: 9043}, + } + + for _, host := range hosts { + policy.AddHost(host) + } + + got := make(map[string]bool) + it := policy.Pick(nil) + for h := it(); h != nil; h = it() { + id := h.Info().hostId + if got[id] { + t.Fatalf("got duplicate host: %v", id) + } + got[id] = true + } + if len(got) != len(hosts) { + t.Fatalf("expected %d hosts got %d", len(hosts), len(got)) + } +} + // Tests of the token-aware host selection policy implementation with a // round-robin host selection policy fallback. func TestHostPolicy_TokenAware_SimpleStrategy(t *testing.T) { @@ -132,7 +158,7 @@ func TestHostPolicy_TokenAware_LWT_DisablesHostShuffling(t *testing.T) { {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"25", "35", "45"}}, {hostId: "4", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50", "60", "70"}}, {hostId: "5", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"50", "60", "70"}}, - }, routingKey: "8", lwt: true, shuffle: true, want: []string{"0", "2", "3", "1"}}, + }, routingKey: "8", lwt: true, shuffle: true, want: []string{"0", "2", "3", "4", "5", "1"}}, "token 08 shuffling not configured": {hosts: []*HostInfo{ {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00", "10", "20"}}, {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"25", "35", "45"}}, @@ -140,7 +166,7 @@ func TestHostPolicy_TokenAware_LWT_DisablesHostShuffling(t *testing.T) { {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"25", "35", "45"}}, {hostId: "4", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50", "60", "70"}}, {hostId: "5", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"50", "60", "70"}}, - }, routingKey: "8", lwt: true, shuffle: false, want: []string{"0", "2", "3", "1"}}, + }, routingKey: "8", lwt: true, shuffle: false, want: []string{"0", "2", "3", "4", "5", "1"}}, "token 30 shuffling configured": {hosts: []*HostInfo{ {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00", "10", "20"}}, {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"25", "35", "45"}}, @@ -148,7 +174,7 @@ func TestHostPolicy_TokenAware_LWT_DisablesHostShuffling(t *testing.T) { {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"25", "35", "45"}}, {hostId: "4", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50", "60", "70"}}, {hostId: "5", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"50", "60", "70"}}, - }, routingKey: "30", lwt: true, shuffle: true, want: []string{"1", "3", "2", "0"}}, + }, routingKey: "30", lwt: true, shuffle: true, want: []string{"1", "3", "2", "4", "5", "0"}}, "token 30 shuffling not configured": {hosts: []*HostInfo{ {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00", "10", "20"}}, {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"25", "35", "45"}}, @@ -156,7 +182,7 @@ func TestHostPolicy_TokenAware_LWT_DisablesHostShuffling(t *testing.T) { {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"25", "35", "45"}}, {hostId: "4", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50", "60", "70"}}, {hostId: "5", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"50", "60", "70"}}, - }, routingKey: "30", lwt: true, shuffle: false, want: []string{"1", "3", "2", "0"}}, + }, routingKey: "30", lwt: true, shuffle: false, want: []string{"1", "3", "2", "4", "5", "0"}}, "token 55 shuffling configured": {hosts: []*HostInfo{ {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00", "10", "20"}}, {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"25", "35", "45"}}, @@ -164,7 +190,7 @@ func TestHostPolicy_TokenAware_LWT_DisablesHostShuffling(t *testing.T) { {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"25", "35", "45"}}, {hostId: "4", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50", "60", "70"}}, {hostId: "5", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"50", "60", "70"}}, - }, routingKey: "55", lwt: true, shuffle: true, want: []string{"0", "2", "3", "1"}}, + }, routingKey: "55", lwt: true, shuffle: true, want: []string{"4", "5", "2", "3", "0", "1"}}, "token 55 shuffling not configured": {hosts: []*HostInfo{ {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00", "10", "20"}}, {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"25", "35", "45"}}, @@ -172,7 +198,7 @@ func TestHostPolicy_TokenAware_LWT_DisablesHostShuffling(t *testing.T) { {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"25", "35", "45"}}, {hostId: "4", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50", "60", "70"}}, {hostId: "5", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"50", "60", "70"}}, - }, routingKey: "55", lwt: true, shuffle: false, want: []string{"0", "2", "3", "1"}}, + }, routingKey: "55", lwt: true, shuffle: false, want: []string{"4", "5", "2", "3", "0", "1"}}, } const keyspace = "myKeyspace" for name, tc := range tests {