Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge ring and ringDescriber #373

Merged
merged 7 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ func TestReconnection(t *testing.T) {
session := createSessionFromCluster(cluster, t)
defer session.Close()

h := session.ring.allHosts()[0]
h := session.hostSource.allHosts()[0]
session.handleNodeDown(h.ConnectAddress(), h.Port())

if h.State() != NodeDown {
Expand Down Expand Up @@ -1675,7 +1675,7 @@ func TestPrepare_PreparedCacheEviction(t *testing.T) {
}

// Walk through all the configured hosts and test cache retention and eviction
for _, host := range session.ring.hosts {
for _, host := range session.hostSource.hosts {
_, ok := session.stmtsLRU.lru.Get(session.stmtsLRU.keyFor(host.HostID(), session.cfg.Keyspace, "SELECT id,mod FROM prepcachetest WHERE id = 0"))
if ok {
t.Errorf("expected first select to be purged but was in cache for host=%q", host)
Expand Down Expand Up @@ -2273,7 +2273,7 @@ func TestTokenAwareConnPool(t *testing.T) {
session := createSessionFromCluster(cluster, t)
defer session.Close()

expectedPoolSize := cluster.NumConns * len(session.ring.allHosts())
expectedPoolSize := cluster.NumConns * len(session.hostSource.allHosts())

// wait for pool to fill
for i := 0; i < 50; i++ {
Expand Down
4 changes: 2 additions & 2 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (c *controlConn) setupConn(conn *Conn) error {
return err
}

host = c.session.ring.addOrUpdate(host)
host = c.session.hostSource.addOrUpdate(host)

if c.session.cfg.filterHost(host) {
return fmt.Errorf("host was filtered: %v", host.ConnectAddress())
Expand Down Expand Up @@ -385,7 +385,7 @@ func (c *controlConn) reconnect() {
}

func (c *controlConn) attemptReconnect() (*Conn, error) {
hosts := c.session.ring.allHosts()
hosts := c.session.hostSource.allHosts()
hosts = shuffleHosts(hosts)

// keep the old behavior of connecting to the old host first by moving it to
Expand Down
4 changes: 2 additions & 2 deletions control_ccm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
}()

assertNodeDown := func() error {
hosts := session.ring.currentHosts()
hosts := session.hostSource.currentHosts()
if len(hosts) != 1 {
return fmt.Errorf("expected 1 host in ring but there were %v", len(hosts))
}
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
}

assertNodeUp := func() error {
hosts := session.ring.currentHosts()
hosts := session.hostSource.currentHosts()
if len(hosts) != len(allCcmHosts) {
return fmt.Errorf("expected %v hosts in ring but there were %v", len(allCcmHosts), len(hosts))
}
Expand Down
4 changes: 2 additions & 2 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (s *Session) handleNodeUp(eventIp net.IP, eventPort int) {
s.logger.Printf("gocql: Session.handleNodeUp: %s:%d\n", eventIp.String(), eventPort)
}

host, ok := s.ring.getHostByIP(eventIp.String())
host, ok := s.hostSource.getHostByIP(eventIp.String())
if !ok {
s.debounceRingRefresh()
return
Expand Down Expand Up @@ -242,7 +242,7 @@ func (s *Session) handleNodeDown(ip net.IP, port int) {
s.logger.Printf("gocql: Session.handleNodeDown: %s:%d\n", ip.String(), port)
}

host, ok := s.ring.getHostByIP(ip.String())
host, ok := s.hostSource.getHostByIP(ip.String())
if ok {
host.setState(NodeDown)
if s.cfg.filterHost(host) {
Expand Down
6 changes: 3 additions & 3 deletions events_ccm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestEventNodeDownControl(t *testing.T) {
}
session.pool.mu.RUnlock()

host := session.ring.getHost(node.Addr)
host := session.hostSource.getHost(node.Addr)
if host == nil {
t.Fatal("node not in metadata ring")
} else if host.IsUp() {
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestEventNodeDown(t *testing.T) {
t.Fatal("node not removed after remove event")
}

host := session.ring.getHost(node.Addr)
host := session.hostSource.getHost(node.Addr)
if host == nil {
t.Fatal("node not in metadata ring")
} else if host.IsUp() {
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestEventNodeUp(t *testing.T) {
t.Fatal("node not added after node added event")
}

host := session.ring.getHost(node.Addr)
host := session.hostSource.getHost(node.Addr)
if host == nil {
t.Fatal("node not in metadata ring")
} else if !host.IsUp() {
Expand Down
2 changes: 1 addition & 1 deletion export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ var TestLogger = &testLogger{}
var WaitUntilPoolsStopFilling = waitUntilPoolsStopFilling

func GetRingAllHosts(sess *Session) []*HostInfo {
return sess.ring.allHosts()
return sess.hostSource.allHosts()
}
8 changes: 3 additions & 5 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,14 +679,14 @@ func (s *Session) refreshRing() error {
if err != nil {
return err
}
prevHosts := s.ring.currentHosts()
prevHosts := s.hostSource.currentHosts()

for _, h := range hosts {
if s.cfg.filterHost(h) {
continue
}

if host, ok := s.ring.addHostIfMissing(h); !ok {
if host, ok := s.hostSource.addHostIfMissing(h); !ok {
s.startPoolFill(h)
} else {
// host (by hostID) already exists; determine if IP has changed
Expand All @@ -702,7 +702,7 @@ func (s *Session) refreshRing() error {
// host IP has changed
// remove old HostInfo (w/old IP)
s.removeHost(existing)
if _, alreadyExists := s.ring.addHostIfMissing(h); alreadyExists {
if _, alreadyExists := s.hostSource.addHostIfMissing(h); alreadyExists {
return fmt.Errorf("add new host=%s after removal: %w", h, ErrHostAlreadyExists)
}
// add new HostInfo (same hostID, new IP)
Expand All @@ -716,8 +716,6 @@ func (s *Session) refreshRing() error {
s.metadataDescriber.removeTabletsWithHost(host)
s.removeHost(host)
}

s.metadata.setPartitioner(partitioner)
s.policy.SetPartitioner(partitioner)

return nil
Expand Down
10 changes: 2 additions & 8 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -982,10 +982,7 @@ func (d *dcAwareRR) IsOperational(session *Session) error {
return nil
}

hosts, _, err := session.hostSource.GetHosts()
if err != nil {
return fmt.Errorf("gocql: unable to check if session is operational: %v", err)
}
hosts := session.hostSource.allHosts()
for _, host := range hosts {
if !session.cfg.filterHost(host) && host.DataCenter() == d.local {
// Policy can work properly only if there is at least one host from target DC
Expand Down Expand Up @@ -1103,10 +1100,7 @@ func (d *rackAwareRR) IsOperational(session *Session) error {
if session.cfg.disableInit || session.cfg.disableControlConn {
return nil
}
hosts, _, err := session.hostSource.GetHosts()
if err != nil {
return fmt.Errorf("gocql: unable to check if session is operational: %v", err)
}
hosts := session.hostSource.allHosts()
for _, host := range hosts {
if !session.cfg.filterHost(host) && host.DataCenter() == d.localDC && host.Rack() == d.localRack {
// Policy can work properly only if there is at least one host from target DC+Rack
Expand Down
143 changes: 0 additions & 143 deletions ring.go

This file was deleted.

Loading
Loading