diff --git a/cassandra_test.go b/cassandra_test.go
index 088993c50..6901f48fa 100644
--- a/cassandra_test.go
+++ b/cassandra_test.go
@@ -919,7 +919,7 @@ func TestReconnection(t *testing.T) {
 	session := createSessionFromCluster(cluster, t)
 	defer session.Close()
 
-	h := session.ring.allHosts()[0]
+	h := session.hostSource.getHostsList()[0]
 	session.handleNodeDown(h.ConnectAddress(), h.Port())
 
 	if h.State() != NodeDown {
@@ -1675,7 +1675,7 @@ func TestPrepare_PreparedCacheEviction(t *testing.T) {
 	}
 
 	// Walk through all the configured hosts and test cache retention and eviction
-	for _, host := range session.ring.hosts {
+	for _, host := range session.hostSource.hosts {
 		_, ok := session.stmtsLRU.lru.Get(session.stmtsLRU.keyFor(host.HostID(), session.cfg.Keyspace, "SELECT id,mod FROM prepcachetest WHERE id = 0"))
 		if ok {
 			t.Errorf("expected first select to be purged but was in cache for host=%q", host)
@@ -2273,7 +2273,7 @@ func TestTokenAwareConnPool(t *testing.T) {
 	session := createSessionFromCluster(cluster, t)
 	defer session.Close()
 
-	expectedPoolSize := cluster.NumConns * len(session.ring.allHosts())
+	expectedPoolSize := cluster.NumConns * len(session.hostSource.getHostsList())
 
 	// wait for pool to fill
 	for i := 0; i < 50; i++ {
diff --git a/control.go b/control.go
index 5b6d2732e..c88d7f4e1 100644
--- a/control.go
+++ b/control.go
@@ -294,7 +294,7 @@ func (c *controlConn) setupConn(conn *Conn) error {
 		return err
 	}
 
-	host = c.session.ring.addOrUpdate(host)
+	host = c.session.hostSource.addOrUpdate(host)
 
 	if c.session.cfg.filterHost(host) {
 		return fmt.Errorf("host was filtered: %v", host.ConnectAddress())
@@ -385,7 +385,7 @@ func (c *controlConn) reconnect() {
 }
 
 func (c *controlConn) attemptReconnect() (*Conn, error) {
-	hosts := c.session.ring.allHosts()
+	hosts := c.session.hostSource.getHostsList()
 	hosts = shuffleHosts(hosts)
 
 	// keep the old behavior of connecting to the old host first by moving it to
diff --git a/control_ccm_test.go b/control_ccm_test.go
index 0a2cb558c..21b3c77a9 100644
--- a/control_ccm_test.go
+++ b/control_ccm_test.go
@@ -108,7 +108,7 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
 	}()
 
 	assertNodeDown := func() error {
-		hosts := session.ring.currentHosts()
+		hosts := session.hostSource.getHostsMap()
 		if len(hosts) != 1 {
 			return fmt.Errorf("expected 1 host in ring but there were %v", len(hosts))
 		}
@@ -146,7 +146,7 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
 	}
 
 	assertNodeUp := func() error {
-		hosts := session.ring.currentHosts()
+		hosts := session.hostSource.getHostsMap()
 		if len(hosts) != len(allCcmHosts) {
 			return fmt.Errorf("expected %v hosts in ring but there were %v", len(allCcmHosts), len(hosts))
 		}
diff --git a/events.go b/events.go
index 54ca5f4c5..833af82de 100644
--- a/events.go
+++ b/events.go
@@ -203,7 +203,7 @@ func (s *Session) handleNodeUp(eventIp net.IP, eventPort int) {
 		s.logger.Printf("gocql: Session.handleNodeUp: %s:%d\n", eventIp.String(), eventPort)
 	}
 
-	host, ok := s.ring.getHostByIP(eventIp.String())
+	host, ok := s.hostSource.getHostByIP(eventIp.String())
 	if !ok {
 		s.debounceRingRefresh()
 		return
@@ -242,7 +242,7 @@ func (s *Session) handleNodeDown(ip net.IP, port int) {
 		s.logger.Printf("gocql: Session.handleNodeDown: %s:%d\n", ip.String(), port)
 	}
 
-	host, ok := s.ring.getHostByIP(ip.String())
+	host, ok := s.hostSource.getHostByIP(ip.String())
 	if ok {
 		host.setState(NodeDown)
 		if s.cfg.filterHost(host) {
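Note on the events.go hunks above: node up/down events now resolve hosts through the ringDescriber's IP index. One subtlety that carries over from the old ring code is that getHostByIP returns the ok flag of the hostIPToUUID lookup, while the *HostInfo itself comes from a second lookup in the hosts map, so the pointer could in principle be nil even when ok is true if the two maps ever disagreed. A defensive caller could guard both; a minimal sketch (illustrative only, not code from this patch, reusing the identifiers of the handleNodeUp hunk):

	host, ok := s.hostSource.getHostByIP(eventIp.String())
	if !ok || host == nil {
		// unknown (or inconsistent) address: fall back to a debounced ring refresh
		s.debounceRingRefresh()
		return
	}
	host.setState(NodeUp)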
diff --git a/events_ccm_test.go b/events_ccm_test.go
index 78a57094e..fbaa00866 100644
--- a/events_ccm_test.go
+++ b/events_ccm_test.go
@@ -80,7 +80,7 @@ func TestEventNodeDownControl(t *testing.T) {
 	}
 	session.pool.mu.RUnlock()
 
-	host := session.ring.getHost(node.Addr)
+	host := session.hostSource.getHost(node.Addr)
 	if host == nil {
 		t.Fatal("node not in metadata ring")
 	} else if host.IsUp() {
@@ -122,7 +122,7 @@ func TestEventNodeDown(t *testing.T) {
 		t.Fatal("node not removed after remove event")
 	}
 
-	host := session.ring.getHost(node.Addr)
+	host := session.hostSource.getHost(node.Addr)
 	if host == nil {
 		t.Fatal("node not in metadata ring")
 	} else if host.IsUp() {
@@ -179,7 +179,7 @@ func TestEventNodeUp(t *testing.T) {
 		t.Fatal("node not added after node added event")
 	}
 
-	host := session.ring.getHost(node.Addr)
+	host := session.hostSource.getHost(node.Addr)
 	if host == nil {
 		t.Fatal("node not in metadata ring")
 	} else if !host.IsUp() {
diff --git a/export_test.go b/export_test.go
index 830436303..058057514 100644
--- a/export_test.go
+++ b/export_test.go
@@ -9,5 +9,5 @@ var TestLogger = &testLogger{}
 var WaitUntilPoolsStopFilling = waitUntilPoolsStopFilling
 
 func GetRingAllHosts(sess *Session) []*HostInfo {
-	return sess.ring.allHosts()
+	return sess.hostSource.getHostsList()
 }
diff --git a/host_source.go b/host_source.go
index 7ee6169fa..8b340f8c6 100644
--- a/host_source.go
+++ b/host_source.go
@@ -675,18 +675,18 @@ func (s *Session) refreshRingNow() error {
 }
 
 func (s *Session) refreshRing() error {
-	hosts, partitioner, err := s.hostSource.GetHosts()
+	hosts, partitioner, err := s.hostSource.GetHostsFromSystem()
 	if err != nil {
 		return err
 	}
 
-	prevHosts := s.ring.currentHosts()
+	prevHosts := s.hostSource.getHostsMap()
 
 	for _, h := range hosts {
 		if s.cfg.filterHost(h) {
 			continue
 		}
-		if host, ok := s.ring.addHostIfMissing(h); !ok {
+		if host, ok := s.hostSource.addHostIfMissing(h); !ok {
 			s.startPoolFill(h)
 		} else {
 			// host (by hostID) already exists; determine if IP has changed
@@ -702,7 +702,7 @@ func (s *Session) refreshRing() error {
 				// host IP has changed
 				// remove old HostInfo (w/old IP)
 				s.removeHost(existing)
-				if _, alreadyExists := s.ring.addHostIfMissing(h); alreadyExists {
+				if _, alreadyExists := s.hostSource.addHostIfMissing(h); alreadyExists {
 					return fmt.Errorf("add new host=%s after removal: %w", h, ErrHostAlreadyExists)
 				}
 				// add new HostInfo (same hostID, new IP)
@@ -716,8 +716,6 @@ func (s *Session) refreshRing() error {
 		s.metadataDescriber.removeTabletsWithHost(host)
 		s.removeHost(host)
 	}
-
-	s.metadata.setPartitioner(partitioner)
 	s.policy.SetPartitioner(partitioner)
 
 	return nil
diff --git a/integration_test.go b/integration_test.go
index 9e01f068e..4eef23d48 100644
--- a/integration_test.go
+++ b/integration_test.go
@@ -39,12 +39,12 @@ func TestAuthentication(t *testing.T) {
 	session.Close()
 }
 
-func TestGetHosts(t *testing.T) {
+func TestGetHostsFromSystem(t *testing.T) {
 	clusterHosts := getClusterHosts()
 	cluster := createCluster()
 	session := createSessionFromCluster(cluster, t)
 
-	hosts, partitioner, err := session.hostSource.GetHosts()
+	hosts, partitioner, err := session.hostSource.GetHostsFromSystem()
 
 	assertTrue(t, "err == nil", err == nil)
 	assertEqual(t, "len(hosts)", len(clusterHosts), len(hosts))
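The host_source.go hunks are the heart of the patch: the rename makes the cost model explicit. GetHostsFromSystem performs queries against system.local and system.peers over the control connection (and, after this change, also records its result into prevHosts and prevPartitioner), while the lower-case accessors only read the in-memory registry under a read lock. A rough sketch of the two tiers, assuming code inside package gocql (illustrative, not part of the diff):

	// Fresh view: network round trip to the cluster's system tables; can fail.
	fresh, partitioner, err := s.hostSource.GetHostsFromSystem()
	if err != nil {
		return err
	}
	// Cached view: read-locked map access, no I/O, cannot fail, but may lag
	// the cluster until the next ring refresh folds new hosts back in.
	cached := s.hostSource.getHostsList()
	s.logger.Printf("fresh=%d cached=%d partitioner=%s", len(fresh), len(cached), partitioner)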
diff --git a/policies.go b/policies.go
index 0fec485b4..ca89aecba 100644
--- a/policies.go
+++ b/policies.go
@@ -982,10 +982,7 @@ func (d *dcAwareRR) IsOperational(session *Session) error {
 		return nil
 	}
 
-	hosts, _, err := session.hostSource.GetHosts()
-	if err != nil {
-		return fmt.Errorf("gocql: unable to check if session is operational: %v", err)
-	}
+	hosts := session.hostSource.getHostsList()
 	for _, host := range hosts {
 		if !session.cfg.filterHost(host) && host.DataCenter() == d.local {
 			// Policy can work properly only if there is at least one host from target DC
@@ -1103,10 +1100,7 @@ func (d *rackAwareRR) IsOperational(session *Session) error {
 	if session.cfg.disableInit || session.cfg.disableControlConn {
 		return nil
 	}
-	hosts, _, err := session.hostSource.GetHosts()
-	if err != nil {
-		return fmt.Errorf("gocql: unable to check if session is operational: %v", err)
-	}
+	hosts := session.hostSource.getHostsList()
 	for _, host := range hosts {
 		if !session.cfg.filterHost(host) && host.DataCenter() == d.localDC && host.Rack() == d.localRack {
 			// Policy can work properly only if there is at least one host from target DC+Rack
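Note the semantic shift in both IsOperational implementations: the old code re-queried the system tables via GetHosts and could fail with an I/O error, whereas the new code inspects only hosts that have already been discovered, so the error path disappears and the result depends on the registry having been populated first. The check now reduces to a scan of the cached list (sketch mirroring the dcAwareRR hunk; the final error text here is illustrative, not from the patch):

	for _, host := range session.hostSource.getHostsList() {
		if !session.cfg.filterHost(host) && host.DataCenter() == d.local {
			return nil // at least one usable host in the target DC
		}
	}
	return fmt.Errorf("gocql: no usable hosts in DC %q", d.local)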
diff --git a/ring.go b/ring.go
deleted file mode 100644
index 5b77370a1..000000000
--- a/ring.go
+++ /dev/null
@@ -1,143 +0,0 @@
-package gocql
-
-import (
-	"fmt"
-	"sync"
-	"sync/atomic"
-)
-
-type ring struct {
-	// endpoints are the set of endpoints which the driver will attempt to connect
-	// to in the case it can not reach any of its hosts. They are also used to boot
-	// strap the initial connection.
-	endpoints []*HostInfo
-
-	mu sync.RWMutex
-	// hosts are the set of all hosts in the cassandra ring that we know of.
-	// key of map is host_id.
-	hosts map[string]*HostInfo
-	// hostIPToUUID maps host native address to host_id.
-	hostIPToUUID map[string]string
-
-	hostList []*HostInfo
-	pos      uint32
-
-	// TODO: we should store the ring metadata here also.
-}
-
-func (r *ring) rrHost() *HostInfo {
-	r.mu.RLock()
-	defer r.mu.RUnlock()
-	if len(r.hostList) == 0 {
-		return nil
-	}
-
-	pos := int(atomic.AddUint32(&r.pos, 1) - 1)
-	return r.hostList[pos%len(r.hostList)]
-}
-
-func (r *ring) getHostByIP(ip string) (*HostInfo, bool) {
-	r.mu.RLock()
-	defer r.mu.RUnlock()
-	hi, ok := r.hostIPToUUID[ip]
-	return r.hosts[hi], ok
-}
-
-func (r *ring) getHost(hostID string) *HostInfo {
-	r.mu.RLock()
-	host := r.hosts[hostID]
-	r.mu.RUnlock()
-	return host
-}
-
-func (r *ring) allHosts() []*HostInfo {
-	r.mu.RLock()
-	hosts := make([]*HostInfo, 0, len(r.hosts))
-	for _, host := range r.hosts {
-		hosts = append(hosts, host)
-	}
-	r.mu.RUnlock()
-	return hosts
-}
-
-func (r *ring) currentHosts() map[string]*HostInfo {
-	r.mu.RLock()
-	hosts := make(map[string]*HostInfo, len(r.hosts))
-	for k, v := range r.hosts {
-		hosts[k] = v
-	}
-	r.mu.RUnlock()
-	return hosts
-}
-
-func (r *ring) addOrUpdate(host *HostInfo) *HostInfo {
-	if existingHost, ok := r.addHostIfMissing(host); ok {
-		existingHost.update(host)
-		host = existingHost
-	}
-	return host
-}
-
-func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
-	if host.invalidConnectAddr() {
-		panic(fmt.Sprintf("invalid host: %v", host))
-	}
-	hostID := host.HostID()
-
-	r.mu.Lock()
-	if r.hosts == nil {
-		r.hosts = make(map[string]*HostInfo)
-	}
-	if r.hostIPToUUID == nil {
-		r.hostIPToUUID = make(map[string]string)
-	}
-
-	existing, ok := r.hosts[hostID]
-	if !ok {
-		r.hosts[hostID] = host
-		r.hostIPToUUID[host.nodeToNodeAddress().String()] = hostID
-		existing = host
-		r.hostList = append(r.hostList, host)
-	}
-	r.mu.Unlock()
-	return existing, ok
-}
-
-func (r *ring) removeHost(hostID string) bool {
-	r.mu.Lock()
-	if r.hosts == nil {
-		r.hosts = make(map[string]*HostInfo)
-	}
-	if r.hostIPToUUID == nil {
-		r.hostIPToUUID = make(map[string]string)
-	}
-
-	h, ok := r.hosts[hostID]
-	if ok {
-		for i, host := range r.hostList {
-			if host.HostID() == hostID {
-				r.hostList = append(r.hostList[:i], r.hostList[i+1:]...)
-				break
-			}
-		}
-		delete(r.hostIPToUUID, h.nodeToNodeAddress().String())
-	}
-	delete(r.hosts, hostID)
-	r.mu.Unlock()
-	return ok
-}
-
-type clusterMetadata struct {
-	mu          sync.RWMutex
-	partitioner string
-}
-
-func (c *clusterMetadata) setPartitioner(partitioner string) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
-	if c.partitioner != partitioner {
-		// TODO: update other things now
-		c.partitioner = partitioner
-	}
-}
diff --git a/ring_describer.go b/ring_describer.go
index 8e65dd8cc..cad929dda 100644
--- a/ring_describer.go
+++ b/ring_describer.go
@@ -13,9 +13,15 @@ type ringDescriber struct {
 	control controlConnection
 	cfg     *ClusterConfig
 	logger  StdLogger
-	mu      sync.Mutex
+	mu      sync.RWMutex
 	prevHosts       []*HostInfo
 	prevPartitioner string
+
+	// hosts are the set of all hosts in the cassandra ring that we know of.
+	// key of map is host_id.
+	hosts map[string]*HostInfo
+	// hostIPToUUID maps host native address to host_id.
+	hostIPToUUID map[string]string
 }
 
 func (r *ringDescriber) setControlConn(c controlConnection) {
@@ -97,8 +103,8 @@ func isZeroToken(host *HostInfo) bool {
 	return len(host.tokens) == 0
 }
 
-// GetHosts returns a list of hosts found via queries to system.local and system.peers
-func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
+// GetHostsFromSystem returns a list of hosts found via queries to system.local and system.peers
+func (r *ringDescriber) GetHostsFromSystem() ([]*HostInfo, string, error) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 
@@ -128,6 +134,9 @@
 		partitioner = hosts[0].Partitioner()
 	}
 
+	r.prevHosts = hosts
+	r.prevPartitioner = partitioner
+
 	return hosts, partitioner, nil
 }
 
@@ -180,3 +189,87 @@ func (r *ringDescriber) getHostInfo(hostID UUID) (*HostInfo, error) {
 
 	return host, nil
 }
+
+func (r *ringDescriber) getHostByIP(ip string) (*HostInfo, bool) {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	hi, ok := r.hostIPToUUID[ip]
+	return r.hosts[hi], ok
+}
+
+func (r *ringDescriber) getHost(hostID string) *HostInfo {
+	r.mu.RLock()
+	host := r.hosts[hostID]
+	r.mu.RUnlock()
+	return host
+}
+
+func (r *ringDescriber) getHostsList() []*HostInfo {
+	r.mu.RLock()
+	hosts := make([]*HostInfo, 0, len(r.hosts))
+	for _, host := range r.hosts {
+		hosts = append(hosts, host)
+	}
+	r.mu.RUnlock()
+	return hosts
+}
+
+func (r *ringDescriber) getHostsMap() map[string]*HostInfo {
+	r.mu.RLock()
+	hosts := make(map[string]*HostInfo, len(r.hosts))
+	for k, v := range r.hosts {
+		hosts[k] = v
+	}
+	r.mu.RUnlock()
+	return hosts
+}
+
+func (r *ringDescriber) addOrUpdate(host *HostInfo) *HostInfo {
+	if existingHost, ok := r.addHostIfMissing(host); ok {
+		existingHost.update(host)
+		host = existingHost
+	}
+	return host
+}
+
+func (r *ringDescriber) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
+	if host.invalidConnectAddr() {
+		panic(fmt.Sprintf("invalid host: %v", host))
+	}
+	hostID := host.HostID()
+
+	r.mu.Lock()
+	if r.hosts == nil {
+		r.hosts = make(map[string]*HostInfo)
+	}
+	if r.hostIPToUUID == nil {
+		r.hostIPToUUID = make(map[string]string)
+	}
+
+	existing, ok := r.hosts[hostID]
+	if !ok {
+		r.hosts[hostID] = host
+		r.hostIPToUUID[host.nodeToNodeAddress().String()] = hostID
+		existing = host
+	}
+	r.mu.Unlock()
+	return existing, ok
+}
+
+func (r *ringDescriber) removeHost(hostID string) bool {
+	r.mu.Lock()
+	if r.hosts == nil {
+		r.hosts = make(map[string]*HostInfo)
+	}
+	if r.hostIPToUUID == nil {
+		r.hostIPToUUID = make(map[string]string)
+	}
+
+	h, ok := r.hosts[hostID]
+	if ok {
+		delete(r.hostIPToUUID, h.nodeToNodeAddress().String())
+	}
+	delete(r.hosts, hostID)
+	r.mu.Unlock()
+	return ok
+}
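With ring.go deleted, ringDescriber becomes the single owner of ring state: the hosts registry and the hostIPToUUID index move in wholesale, and mu widens from sync.Mutex to sync.RWMutex so the read-mostly accessors stop serializing behind each other. Two pieces of the old ring are dropped rather than moved: the hostList slice with its pos counter (the round-robin rrHost helper has no successor), and the clusterMetadata partitioner cache. A small sketch of the resulting locking split, assuming rd is any *ringDescriber and host any *HostInfo (illustrative only):

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = rd.getHostsList() // RLock: concurrent readers proceed in parallel
		}()
	}
	rd.addOrUpdate(host) // Lock: writers still exclude readers and each other
	wg.Wait()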
diff --git a/ring_describer_test.go b/ring_describer_test.go
index 4f02b5fb3..944b45a04 100644
--- a/ring_describer_test.go
+++ b/ring_describer_test.go
@@ -322,10 +322,10 @@ func marshalMetadataMust(metadata resultMetadata, data []interface{}) [][]byte {
 	return res
 }
 
-func TestGetHosts(t *testing.T) {
+func TestGetHostsFromSystem(t *testing.T) {
 	r := &ringDescriber{control: &mockControlConn{}, cfg: &ClusterConfig{}}
 
-	hosts, _, err := r.GetHosts()
+	hosts, _, err := r.GetHostsFromSystem()
 	if err != nil {
 		t.Fatalf("unable to get hosts: %v", err)
 	}
@@ -334,3 +334,35 @@
 	assertEqual(t, "hosts length", 1, len(hosts))
 	assertEqual(t, "host token length", 2, len(hosts[0].tokens))
 }
+
+func TestRing_AddHostIfMissing_Missing(t *testing.T) {
+	ring := &ringDescriber{}
+
+	host := &HostInfo{hostId: MustRandomUUID().String(), connectAddress: net.IPv4(1, 1, 1, 1)}
+	h1, ok := ring.addHostIfMissing(host)
+	if ok {
+		t.Fatal("host was reported as already existing")
+	} else if !h1.Equal(host) {
+		t.Fatalf("returned hosts are not equal: %v != %v", h1, host)
+	} else if h1 != host {
+		t.Fatalf("returned host is not the same pointer: %p != %p", h1, host)
+	}
+}
+
+func TestRing_AddHostIfMissing_Existing(t *testing.T) {
+	ring := &ringDescriber{}
+
+	host := &HostInfo{hostId: MustRandomUUID().String(), connectAddress: net.IPv4(1, 1, 1, 1)}
+	ring.addHostIfMissing(host)
+
+	h2 := &HostInfo{hostId: host.hostId, connectAddress: net.IPv4(2, 2, 2, 2)}
+
+	h1, ok := ring.addHostIfMissing(h2)
+	if !ok {
+		t.Fatal("host was not reported as already existing")
+	} else if !h1.Equal(host) {
+		t.Fatalf("returned hosts are not equal: %v != %v", h1, host)
+	} else if h1 != host {
+		t.Fatalf("returned host is not the same pointer: %p != %p", h1, host)
+	}
+}
diff --git a/ring_test.go b/ring_test.go
deleted file mode 100644
index 2a44b8198..000000000
--- a/ring_test.go
+++ /dev/null
@@ -1,41 +0,0 @@
-//go:build all || unit
-// +build all unit
-
-package gocql
-
-import (
-	"net"
-	"testing"
-)
-
-func TestRing_AddHostIfMissing_Missing(t *testing.T) {
-	ring := &ring{}
-
-	host := &HostInfo{hostId: MustRandomUUID().String(), connectAddress: net.IPv4(1, 1, 1, 1)}
-	h1, ok := ring.addHostIfMissing(host)
-	if ok {
-		t.Fatal("host was reported as already existing")
-	} else if !h1.Equal(host) {
-		t.Fatalf("hosts not equal that are returned %v != %v", h1, host)
-	} else if h1 != host {
-		t.Fatalf("returned host same pointer: %p != %p", h1, host)
-	}
-}
-
-func TestRing_AddHostIfMissing_Existing(t *testing.T) {
-	ring := &ring{}
-
-	host := &HostInfo{hostId: MustRandomUUID().String(), connectAddress: net.IPv4(1, 1, 1, 1)}
-	ring.addHostIfMissing(host)
-
-	h2 := &HostInfo{hostId: host.hostId, connectAddress: net.IPv4(2, 2, 2, 2)}
-
-	h1, ok := ring.addHostIfMissing(h2)
-	if !ok {
-		t.Fatal("host was not reported as already existing")
-	} else if !h1.Equal(host) {
-		t.Fatalf("hosts not equal that are returned %v != %v", h1, host)
-	} else if h1 != host {
-		t.Fatalf("returned host same pointer: %p != %p", h1, host)
-	}
-}
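The relocated tests pin down the addHostIfMissing contract, which survives the move unchanged: for an unknown host ID the candidate is stored and returned with ok=false; for a known ID the stored *HostInfo is returned with ok=true and the candidate is ignored, so callers must keep using the returned pointer rather than their argument. That is exactly what addOrUpdate builds on; a sketch with stand-in names rd, candidate and useHost (none of them from the patch):

	stored, existed := rd.addHostIfMissing(candidate)
	if existed {
		stored.update(candidate) // fold new metadata into the registry's copy
	}
	useHost(stored) // never the candidate: it may not be the registered instance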
diff --git a/scylla_shard_aware_port_common_test.go b/scylla_shard_aware_port_common_test.go
index d9d4e0eb3..538937162 100644
--- a/scylla_shard_aware_port_common_test.go
+++ b/scylla_shard_aware_port_common_test.go
@@ -51,7 +51,7 @@ func testShardAwarePortNoReconnections(t *testing.T, makeCluster makeClusterTest
 		return
 	}
 
-	hosts := sess.ring.allHosts()
+	hosts := sess.hostSource.getHostsList()
 	for _, host := range hosts {
 		t.Logf("checking host %q hostID: %q", host.hostname, host.hostId)
 		hostPool, ok := sess.pool.getPool(host)
@@ -191,7 +191,7 @@ func testShardAwarePortUnusedIfNotEnabled(t *testing.T, makeCluster makeClusterT
 		t.Fatal(err)
 	}
 
-	hosts := sess.ring.allHosts()
+	hosts := sess.hostSource.getHostsList()
 	for _, host := range hosts {
 		t.Logf("checking host %s", host.hostname)
 		hostPool, _ := sess.pool.getPool(host)
@@ -237,7 +237,7 @@ func getShardAwareAddress(pool *hostConnPool, useTLS bool) string {
 }
 
 func triggerPoolsRefill(sess *Session) {
-	hosts := sess.ring.allHosts()
+	hosts := sess.hostSource.getHostsList()
 	for _, host := range hosts {
 		hostPool, _ := sess.pool.getPool(host)
 		go hostPool.fill_debounce()
@@ -263,7 +263,7 @@ func waitUntilPoolsStopFilling(ctx context.Context, sess *Session, timeout time.
 }
 
 func checkIfPoolsStoppedFilling(sess *Session) bool {
-	hosts := sess.ring.allHosts()
+	hosts := sess.hostSource.getHostsList()
 	for _, host := range hosts {
 		hostPool, _ := sess.pool.getPool(host)
 
@@ -280,7 +280,7 @@ func checkIfPoolsStoppedFilling(sess *Session) bool {
 }
 
 func checkIfPoolsAreFull(sess *Session) bool {
-	hosts := sess.ring.allHosts()
+	hosts := sess.hostSource.getHostsList()
 	for _, host := range hosts {
 		hostPool, _ := sess.pool.getPool(host)
 
diff --git a/session.go b/session.go
index bbb61692e..643897a3a 100644
--- a/session.go
+++ b/session.go
@@ -52,9 +52,6 @@ type Session struct {
 	pool            *policyConnPool
 	policy          HostSelectionPolicy
 
-	ring     ring
-	metadata clusterMetadata
-
 	mu sync.RWMutex
 
 	control controlConnection
@@ -210,7 +207,6 @@ func (s *Session) init() error {
 	if err != nil {
 		return err
 	}
-	s.ring.endpoints = hosts
 
 	if !s.cfg.disableControlConn {
 		s.control = createControlConn(s)
@@ -265,7 +261,7 @@ func (s *Session) init() error {
 		if !s.cfg.DisableInitialHostLookup {
 			var partitioner string
-			newHosts, partitioner, err := s.hostSource.GetHosts()
+			newHosts, partitioner, err := s.hostSource.GetHostsFromSystem()
 			if err != nil {
 				return err
 			}
 
@@ -313,7 +309,7 @@ func (s *Session) init() error {
 	// again
 	atomic.AddInt64(&left, 1)
 	for _, host := range hostMap {
-		host := s.ring.addOrUpdate(host)
+		host := s.hostSource.addOrUpdate(host)
 		if s.cfg.filterHost(host) {
 			continue
 		}
@@ -375,7 +371,7 @@ func (s *Session) init() error {
 		newer, _ := checkSystemSchema(s.control)
 		s.useSystemSchema = newer
 	} else {
-		version := s.ring.rrHost().Version()
+		version := s.hostSource.getHostsList()[0].Version()
 		s.useSystemSchema = version.AtLeast(3, 0, 0)
 		s.hasAggregatesAndFunctions = version.AtLeast(2, 2, 0)
 	}
@@ -418,11 +414,11 @@ func (s *Session) reconnectDownedHosts(intv time.Duration) {
 	for {
 		select {
 		case <-reconnectTicker.C:
-			hosts := s.ring.allHosts()
+			hosts := s.hostSource.getHostsList()
 
-			// Print session.ring for debug.
+			// Print session.hostSource for debug.
 			if gocqlDebug {
-				buf := bytes.NewBufferString("Session.ring:")
+				buf := bytes.NewBufferString("Session.hostSource:")
 				for _, h := range hosts {
 					buf.WriteString("[" + h.ConnectAddress().String() + ":" + h.State().String() + "]")
 				}
@@ -594,7 +590,7 @@ func (s *Session) removeHost(h *HostInfo) {
 	s.policy.RemoveHost(h)
 	hostID := h.HostID()
 	s.pool.removeHost(hostID)
-	s.ring.removeHost(hostID)
+	s.hostSource.removeHost(hostID)
 }
 
 // KeyspaceMetadata returns the schema metadata for the keyspace specified. Returns an error if the keyspace does not exist.
@@ -622,7 +618,7 @@ func (s *Session) TabletsMetadata() (TabletInfoList, error) {
 }
 
 func (s *Session) getConn() *Conn {
-	hosts := s.ring.allHosts()
+	hosts := s.hostSource.getHostsList()
 	for _, host := range hosts {
 		if !host.IsUp() {
 			continue
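In session.go the Session struct loses both the embedded ring and the clusterMetadata; the partitioner now flows only into policy.SetPartitioner (the setPartitioner call went away together with clusterMetadata in the host_source.go hunk earlier). One behavioral consequence worth noting: the protocol-version probe used to round-robin via ring.rrHost(), whereas it now takes element 0 of getHostsList(); since that list is built by ranging over a map, the chosen host is arbitrary, and the expression panics if no hosts are known yet (much as the old path would have panicked once rrHost() returned nil). Illustrative shape of the new probe:

	version := s.hostSource.getHostsList()[0].Version() // arbitrary known host; panics if none
	s.useSystemSchema = version.AtLeast(3, 0, 0)
	s.hasAggregatesAndFunctions = version.AtLeast(2, 2, 0)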
diff --git a/tablet_integration_test.go b/tablet_integration_test.go
index 1419a3c00..d2b6ded3e 100644
--- a/tablet_integration_test.go
+++ b/tablet_integration_test.go
@@ -24,8 +24,7 @@ func TestTablets(t *testing.T) {
 		panic(fmt.Sprintf("unable to create table: %v", err))
 	}
 
-	hosts, _, err := session.hostSource.GetHosts()
-	assertTrue(t, "err == nil", err == nil)
+	hosts := session.hostSource.getHostsList()
 
 	hostAddresses := []string{}
 	for _, host := range hosts {
@@ -37,7 +36,7 @@ func TestTablets(t *testing.T) {
 	i := 0
 	for i < 50 {
 		i = i + 1
-		err = session.Query(`INSERT INTO test1.table1 (pk, ck, v) VALUES (?, ?, ?);`, i, i%5, i%2).WithContext(ctx).Exec()
+		err := session.Query(`INSERT INTO test1.table1 (pk, ck, v) VALUES (?, ?, ?);`, i, i%5, i%2).WithContext(ctx).Exec()
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -52,7 +51,7 @@ func TestTablets(t *testing.T) {
 		var ck int
 		var v int
 
-		err = session.Query(`SELECT pk, ck, v FROM test1.table1 WHERE pk = ?;`, i).WithContext(ctx).Consistency(One).Trace(trace).Scan(&pk, &ck, &v)
+		err := session.Query(`SELECT pk, ck, v FROM test1.table1 WHERE pk = ?;`, i).WithContext(ctx).Consistency(One).Trace(trace).Scan(&pk, &ck, &v)
 		if err != nil {
 			t.Fatal(err)
 		}
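Taken together, the mechanical translation this patch applies at every call site is:

	session.ring.allHosts()          ->  session.hostSource.getHostsList()
	session.ring.currentHosts()      ->  session.hostSource.getHostsMap()
	session.ring.getHost(id)         ->  session.hostSource.getHost(id)
	session.ring.getHostByIP(ip)     ->  session.hostSource.getHostByIP(ip)
	session.ring.addOrUpdate(h)      ->  session.hostSource.addOrUpdate(h)
	session.ring.addHostIfMissing(h) ->  session.hostSource.addHostIfMissing(h)
	session.ring.removeHost(id)      ->  session.hostSource.removeHost(id)
	session.ring.rrHost()            ->  session.hostSource.getHostsList()[0]
	hostSource.GetHosts()            ->  hostSource.GetHostsFromSystem()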