diff --git a/cluster.go b/cluster.go index 452d138cf..a17dc4408 100644 --- a/cluster.go +++ b/cluster.go @@ -244,6 +244,9 @@ type ClusterConfig struct { // internal config for testing disableControlConn bool disableInit bool + + // If true tablet feature is enabled + experimentalTabletsEnabled bool } type Dialer interface { @@ -277,6 +280,7 @@ func NewCluster(hosts ...string) *ClusterConfig { ConvictionPolicy: &SimpleConvictionPolicy{}, ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second}, WriteCoalesceWaitTime: 200 * time.Microsecond, + experimentalTabletsEnabled: false, } return cfg @@ -314,6 +318,10 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool { return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host)) } +func (cfg *ClusterConfig) enableExperimentalTablets() { + cfg.experimentalTabletsEnabled = true +} + var ( ErrNoHosts = errors.New("no hosts provided") ErrNoConnectionsStarted = errors.New("no connections were made when creating the session") diff --git a/connectionpool.go b/connectionpool.go index 52a4f0fe4..4e61f3062 100644 --- a/connectionpool.go +++ b/connectionpool.go @@ -27,6 +27,7 @@ type SetPartitioner interface { } // interface to implement to receive the tablets value +// Experimental, this interface and use may change type SetTablets interface { SetTablets(tablets []*TabletInfo) } diff --git a/host_source.go b/host_source.go index 96d0de23e..1a549d1a2 100644 --- a/host_source.go +++ b/host_source.go @@ -472,11 +472,13 @@ func (h *HostInfo) ScyllaShardAwarePortTLS() uint16 { return h.scyllaShardAwarePortTLS } +// Experimental, this interface and use may change type ReplicaInfo struct { hostId UUID shardId int } +// Experimental, this interface and use may change type TabletInfo struct { mu sync.RWMutex keyspaceName string @@ -571,6 +573,7 @@ type ringDescriber struct { mu sync.Mutex prevHosts []*HostInfo prevPartitioner string + // Experimental, this interface and use may change prevTablets []*TabletInfo } @@ -714,6 +717,7 @@ func (s *Session) hostInfoFromMap(row map[string]interface{}, host *HostInfo) (* // Given a map that represents a row from system.tablets // return as much information as we can in *TabletInfo +// Experimental, this interface and use may change func (s *Session) tabletInfoFromMap(row map[string]interface{}, tablet *TabletInfo) (*TabletInfo, error) { const assertErrorMsg = "Assertion failed for %s" var ok bool @@ -879,6 +883,7 @@ func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, er } // Ask the control node for info about tablets +// Experimental, this interface and use may change func (r *ringDescriber) getSystemTabletsInfo() ([]*TabletInfo, error) { if r.session.control == nil { return nil, errNoControl @@ -916,6 +921,7 @@ func (r *ringDescriber) getSystemTabletsInfo() ([]*TabletInfo, error) { } // Return true if the tablet is valid +// Experimental, this interface and use may change func isValidTablet(tablet *TabletInfo) bool { return tablet.replicas != nil && len(tablet.replicas) != 0 && tablet.tableName != "" } @@ -954,6 +960,7 @@ func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) { } // GetTablets returns a list of tablets found via queries to system.tablets +// Experimental, this interface and use may change func (r *ringDescriber) GetTablets() ([]*TabletInfo, error) { r.mu.Lock() defer r.mu.Unlock() @@ -968,6 +975,7 @@ func (r *ringDescriber) GetTablets() ([]*TabletInfo, error) { } // True if experimental feature "tablets" is enabled and it is possible to query system.tablets +// Experimental, this interface and use may change func (r *ringDescriber) UsesTablets() bool { _, err := r.GetTablets() if err != nil { @@ -1086,6 +1094,7 @@ func refreshRing(r *ringDescriber) error { return nil } +// Experimental, this interface and use may change func refreshTablets(r *ringDescriber) error { tablets, err := r.GetTablets() if err != nil { @@ -1211,6 +1220,7 @@ func (d *refreshDebouncer) stop() { } // used to refresh tablets every x seconds +// Experimental, this interface and use may change type tabletRefresher struct { mu sync.Mutex ticker *time.Ticker diff --git a/metadata_scylla.go b/metadata_scylla.go index 66f05961c..222c2b11e 100644 --- a/metadata_scylla.go +++ b/metadata_scylla.go @@ -134,11 +134,13 @@ type IndexMetadata struct { } // TabletsMetadata holds metadata for tablet list +// Experimental, this interface and use may change type TabletsMetadata struct { Tablets []*TabletMetadata } // TabletMetadata holds metadata for single tablet +// Experimental, this interface and use may change type TabletMetadata struct { KeyspaceName string TableId UUID @@ -151,6 +153,7 @@ type TabletMetadata struct { } // TabletMetadata holds metadata for single replica +// Experimental, this interface and use may change type ReplicaMetadata struct { HostId UUID ShardId int @@ -245,6 +248,8 @@ type schemaDescriber struct { mu sync.Mutex cache map[string]*KeyspaceMetadata + + // Experimental, this interface and use may change tabletsCache *TabletsMetadata } @@ -278,6 +283,7 @@ func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, err return metadata, nil } +// Experimental, this interface and use may change func (s *schemaDescriber) getTabletsSchema() *TabletsMetadata { s.mu.Lock() defer s.mu.Unlock() @@ -287,6 +293,7 @@ func (s *schemaDescriber) getTabletsSchema() *TabletsMetadata { return metadata } +// Experimental, this interface and use may change func (s *schemaDescriber) refreshTabletsSchema() { tablets := s.session.getTablets() s.tabletsCache.Tablets = []*TabletMetadata{} diff --git a/policies.go b/policies.go index b082c033b..678bea027 100644 --- a/policies.go +++ b/policies.go @@ -96,6 +96,7 @@ func (c *cowHostList) remove(ip net.IP) bool { } // cowTabletList implements a copy on write tablet list, its equivalent type is []*TabletInfo +// Experimental, this interface and use may change type cowTabletList struct { list atomic.Value mu sync.Mutex @@ -306,6 +307,7 @@ type HostTierer interface { type HostSelectionPolicy interface { HostStateNotifier SetPartitioner + // Experimental, this interface and use may change SetTablets KeyspaceChanged(KeyspaceUpdateEvent) Init(*Session) @@ -357,9 +359,11 @@ type roundRobinHostPolicy struct { func (r *roundRobinHostPolicy) IsLocal(*HostInfo) bool { return true } func (r *roundRobinHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {} func (r *roundRobinHostPolicy) SetPartitioner(partitioner string) {} -func (r *roundRobinHostPolicy) SetTablets(tablets []*TabletInfo) {} func (r *roundRobinHostPolicy) Init(*Session) {} +// Experimental, this interface and use may change +func (r *roundRobinHostPolicy) SetTablets(tablets []*TabletInfo) {} + func (r *roundRobinHostPolicy) Pick(qry ExecutableQuery) NextHost { nextStartOffset := atomic.AddUint64(&r.lastUsedHostIdx, 1) return roundRobbin(int(nextStartOffset), r.hosts.get()) @@ -437,6 +441,7 @@ type tokenAwareHostPolicy struct { logger StdLogger + // Experimental, this interface and use may change tablets cowTabletList } @@ -504,6 +509,7 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) { } } +// Experimental, this interface and use may change func (t *tokenAwareHostPolicy) SetTablets(tablets []*TabletInfo) { t.mu.Lock() defer t.mu.Unlock() @@ -630,27 +636,44 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { var replicas []*HostInfo - t.tablets.mu.Lock() - tablets := t.tablets.get() - - // Search for tablets with Keyspce and Table from the Query - l := findTablets(tablets, qry.Keyspace(), qry.Table()) - - if l != -1 { - tablet := findTabletForToken(tablets, token, l) - - replicas = []*HostInfo{} - for _, replica := range tablet.Replicas() { - t.hosts.mu.Lock() - hosts := t.hosts.get() - for _, host := range hosts { - if host.hostId == replica.hostId.String() { - replicas = append(replicas, host) - break + if qry.GetSession() != nil && qry.GetSession().cfg.experimentalTabletsEnabled { + t.tablets.mu.Lock() + tablets := t.tablets.get() + + // Search for tablets with Keyspce and Table from the Query + l := findTablets(tablets, qry.Keyspace(), qry.Table()) + + if l != -1 { + tablet := findTabletForToken(tablets, token, l) + + replicas = []*HostInfo{} + for _, replica := range tablet.Replicas() { + t.hosts.mu.Lock() + hosts := t.hosts.get() + for _, host := range hosts { + if host.hostId == replica.hostId.String() { + replicas = append(replicas, host) + break + } } + t.hosts.mu.Unlock() } - t.hosts.mu.Unlock() + } else { + ht := meta.replicas[qry.Keyspace()].replicasFor(token) + + if ht == nil { + host, _ := meta.tokenRing.GetHostForToken(token) + replicas = []*HostInfo{host} + } else { + replicas = ht.hosts + } + } + + if t.shuffleReplicas && !qry.IsLWT() { + replicas = shuffleHosts(replicas) } + + t.tablets.mu.Unlock() } else { ht := meta.replicas[qry.Keyspace()].replicasFor(token) @@ -659,15 +682,12 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { replicas = []*HostInfo{host} } else { replicas = ht.hosts + if t.shuffleReplicas && !qry.IsLWT() { + replicas = shuffleHosts(replicas) + } } } - if t.shuffleReplicas && !qry.IsLWT() { - replicas = shuffleHosts(replicas) - } - - t.tablets.mu.Unlock() - var ( fallbackIter NextHost i, j, k int @@ -775,9 +795,11 @@ type hostPoolHostPolicy struct { func (r *hostPoolHostPolicy) Init(*Session) {} func (r *hostPoolHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {} func (r *hostPoolHostPolicy) SetPartitioner(string) {} -func (r *hostPoolHostPolicy) SetTablets(tablets []*TabletInfo) {} func (r *hostPoolHostPolicy) IsLocal(*HostInfo) bool { return true } +// Experimental, this interface and use may change +func (r *hostPoolHostPolicy) SetTablets(tablets []*TabletInfo) {} + func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) { peers := make([]string, len(hosts)) hostMap := make(map[string]*HostInfo, len(hosts)) @@ -912,12 +934,13 @@ func DCAwareRoundRobinPolicy(localDC string) HostSelectionPolicy { func (d *dcAwareRR) Init(*Session) {} func (d *dcAwareRR) KeyspaceChanged(KeyspaceUpdateEvent) {} func (d *dcAwareRR) SetPartitioner(p string) {} -func (d *dcAwareRR) SetTablets(tablets []*TabletInfo) {} - func (d *dcAwareRR) IsLocal(host *HostInfo) bool { return host.DataCenter() == d.local } +// Experimental, this interface and use may change +func (d *dcAwareRR) SetTablets(tablets []*TabletInfo) {} + func (d *dcAwareRR) AddHost(host *HostInfo) { if d.IsLocal(host) { d.localHosts.add(host) @@ -1006,12 +1029,13 @@ func RackAwareRoundRobinPolicy(localDC string, localRack string) HostSelectionPo func (d *rackAwareRR) Init(*Session) {} func (d *rackAwareRR) KeyspaceChanged(KeyspaceUpdateEvent) {} func (d *rackAwareRR) SetPartitioner(p string) {} -func (d *rackAwareRR) SetTablets(tablets []*TabletInfo) {} - func (d *rackAwareRR) MaxHostTier() uint { return 2 } +// Experimental, this interface and use may change +func (d *rackAwareRR) SetTablets(tablets []*TabletInfo) {} + func (d *rackAwareRR) HostTier(host *HostInfo) uint { if host.DataCenter() == d.localDC { if host.Rack() == d.localRack { diff --git a/query_executor.go b/query_executor.go index 6bd7361ea..f0d4e761f 100644 --- a/query_executor.go +++ b/query_executor.go @@ -23,6 +23,8 @@ type ExecutableQuery interface { withContext(context.Context) ExecutableQuery RetryableQuery + + GetSession() *Session } type queryExecutor struct { diff --git a/ring.go b/ring.go index 58c76124d..86970a766 100644 --- a/ring.go +++ b/ring.go @@ -22,6 +22,7 @@ type ring struct { hostList []*HostInfo pos uint32 + // Experimental, this interface and use may change tabletList []*TabletInfo // TODO: we should store the ring metadata here also. @@ -144,6 +145,7 @@ func (c *clusterMetadata) setPartitioner(partitioner string) { } } +// Experimental, this interface and use may change func (r *ring) setTablets(newTablets []*TabletInfo) { r.mu.Lock() defer r.mu.Unlock() diff --git a/scylla.go b/scylla.go index 8cb416655..07bd9c99e 100644 --- a/scylla.go +++ b/scylla.go @@ -330,17 +330,21 @@ func (p *scyllaConnPicker) Pick(t token, keyspace string, table string) *Conn { return nil } - tablets := p.conns[0].session.getTablets() - var idx int - // Search for tablets with Keyspace and Table from the Query - l := findTablets(tablets, keyspace, table) + if p.conns[0].session.cfg.experimentalTabletsEnabled { + tablets := p.conns[0].session.getTablets() + + // Search for tablets with Keyspace and Table from the Query + l := findTablets(tablets, keyspace, table) - if l != -1 { - tablet := findTabletForToken(tablets, mmt, l) + if l != -1 { + tablet := findTabletForToken(tablets, mmt, l) - idx = tablet.replicas[0].shardId + idx = tablet.replicas[0].shardId + } else { + idx = p.shardOf(mmt) + } } else { idx = p.shardOf(mmt) } diff --git a/session.go b/session.go index 3a7020df3..6776f2e83 100644 --- a/session.go +++ b/session.go @@ -197,7 +197,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) { } } - if s.hostSource.UsesTablets() { + if s.cfg.experimentalTabletsEnabled && s.hostSource.UsesTablets() { s.tabletTicker = newTabletRefresher(tabletRefreshTime, func() error { return refreshTablets(s.hostSource) }) } @@ -250,7 +250,7 @@ func (s *Session) init() error { hosts = filteredHosts - if s.hostSource.UsesTablets() { + if s.cfg.experimentalTabletsEnabled && s.hostSource.UsesTablets() { tablets, err := s.hostSource.GetTablets() if err != nil { return err @@ -583,6 +583,7 @@ func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) { } // TabletsMetadata returns the metadata about tablets +// Experimental, this interface and use may change func (s *Session) TabletsMetadata() (*TabletsMetadata, error) { // fail fast if s.Closed() { @@ -612,6 +613,7 @@ func (s *Session) getConn() *Conn { return nil } +// Experimental, this interface and use may change func (s *Session) getTablets() []*TabletInfo { s.ring.mu.Lock() defer s.ring.mu.Unlock() @@ -1218,6 +1220,10 @@ func (q *Query) Table() string { return q.routingInfo.table } +func (q *Query) GetSession() *Session { + return q.session +} + // GetRoutingKey gets the routing key to use for routing this query. If // a routing key has not been explicitly set, then the routing key will // be constructed if possible using the keyspace's schema and the query @@ -1877,6 +1883,10 @@ func (b *Batch) Table() string { return b.routingInfo.table } +func (b *Batch) GetSession() *Session { + return b.session +} + // Attempts returns the number of attempts made to execute the batch. func (b *Batch) Attempts() int { return b.metrics.attempts() diff --git a/tablet_integration_test.go b/tablet_integration_test.go index d26d271ec..8fd39e2b7 100644 --- a/tablet_integration_test.go +++ b/tablet_integration_test.go @@ -16,6 +16,7 @@ import ( // Check if TokenAwareHostPolicy works correctly when using tablets func TestTablets(t *testing.T) { cluster := createMultiNodeCluster() + cluster.enableExperimentalTablets() fallback := RoundRobinHostPolicy() cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(fallback) @@ -75,6 +76,7 @@ func TestTablets(t *testing.T) { // Check if shard awareness works correctly when using tablets func TestTabletsShardAwareness(t *testing.T) { cluster := createMultiNodeCluster() + cluster.enableExperimentalTablets() fallback := RoundRobinHostPolicy() cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(fallback) @@ -133,6 +135,7 @@ func TestTabletsShardAwareness(t *testing.T) { // Check if adding new table changes tablets table func TestTabletsRefresh(t *testing.T) { cluster := createMultiNodeCluster() + cluster.enableExperimentalTablets() fallback := RoundRobinHostPolicy() cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(fallback)