Skip to content

Commit

Permalink
Wrap tablets introduction in experimental setting
Browse files Browse the repository at this point in the history
Tablets are still experimental in ScyllaDB and
changes in the tablets format are possible.
  • Loading branch information
sylwiaszunejko committed Aug 16, 2023
1 parent 4e51b4b commit d2f016f
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 39 deletions.
8 changes: 8 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ type ClusterConfig struct {
// internal config for testing
disableControlConn bool
disableInit bool

// If true tablet feature is enabled
experimentalTabletsEnabled bool
}

type Dialer interface {
Expand Down Expand Up @@ -277,6 +280,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
ConvictionPolicy: &SimpleConvictionPolicy{},
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
WriteCoalesceWaitTime: 200 * time.Microsecond,
experimentalTabletsEnabled: false,
}

return cfg
Expand Down Expand Up @@ -314,6 +318,10 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
}

func (cfg *ClusterConfig) EnableExperimentalTablets() {
cfg.experimentalTabletsEnabled = true
}

var (
ErrNoHosts = errors.New("no hosts provided")
ErrNoConnectionsStarted = errors.New("no connections were made when creating the session")
Expand Down
1 change: 1 addition & 0 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type SetPartitioner interface {
}

// interface to implement to receive the tablets value
// Experimental, this interface and use may change
type SetTablets interface {
SetTablets(tablets []*TabletInfo)
}
Expand Down
12 changes: 11 additions & 1 deletion host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,11 +472,13 @@ func (h *HostInfo) ScyllaShardAwarePortTLS() uint16 {
return h.scyllaShardAwarePortTLS
}

// Experimental, this interface and use may change
type ReplicaInfo struct {
hostId UUID
shardId int
}

// Experimental, this interface and use may change
type TabletInfo struct {
mu sync.RWMutex
keyspaceName string
Expand Down Expand Up @@ -576,7 +578,8 @@ type ringDescriber struct {
mu sync.Mutex
prevHosts []*HostInfo
prevPartitioner string
prevTablets []*TabletInfo
// Experimental, this interface and use may change
prevTablets []*TabletInfo
}

// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
Expand Down Expand Up @@ -719,6 +722,7 @@ func (s *Session) hostInfoFromMap(row map[string]interface{}, host *HostInfo) (*

// Given a map that represents a row from system.tablets
// return as much information as we can in *TabletInfo
// Experimental, this interface and use may change
func (s *Session) tabletInfoFromMap(row map[string]interface{}, tablet *TabletInfo) (*TabletInfo, error) {
const assertErrorMsg = "Missing column %s in tablet description"
var ok bool
Expand Down Expand Up @@ -884,6 +888,7 @@ func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, er
}

// Ask the control node for info about tablets
// Experimental, this interface and use may change
func (r *ringDescriber) getSystemTabletsInfo() ([]*TabletInfo, error) {
if r.session.control == nil {
return nil, errNoControl
Expand Down Expand Up @@ -921,6 +926,7 @@ func (r *ringDescriber) getSystemTabletsInfo() ([]*TabletInfo, error) {
}

// Return true if the tablet is valid
// Experimental, this interface and use may change
func isValidTablet(tablet *TabletInfo) bool {
return tablet.replicas != nil && len(tablet.replicas) != 0 && tablet.tableName != ""
}
Expand Down Expand Up @@ -959,6 +965,7 @@ func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
}

// GetTablets returns a list of tablets found via queries to system.tablets
// Experimental, this interface and use may change
func (r *ringDescriber) GetTablets() ([]*TabletInfo, error) {
r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -973,6 +980,7 @@ func (r *ringDescriber) GetTablets() ([]*TabletInfo, error) {
}

// True if experimental feature "tablets" is enabled and it is possible to query system.tablets
// Experimental, this interface and use may change
func (r *ringDescriber) UsesTablets() bool {
_, err := r.GetTablets()
if err != nil {
Expand Down Expand Up @@ -1091,6 +1099,7 @@ func refreshRing(r *ringDescriber) error {
return nil
}

// Experimental, this interface and use may change
func refreshTablets(r *ringDescriber) error {
tablets, err := r.GetTablets()
if err != nil {
Expand Down Expand Up @@ -1216,6 +1225,7 @@ func (d *refreshDebouncer) stop() {
}

// used to refresh tablets every x seconds
// Experimental, this interface and use may change
type tabletRefresher struct {
mu sync.Mutex
ticker *time.Ticker
Expand Down
7 changes: 7 additions & 0 deletions metadata_scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,13 @@ type IndexMetadata struct {
}

// TabletsMetadata holds metadata for tablet list
// Experimental, this interface and use may change
type TabletsMetadata struct {
Tablets []*TabletMetadata
}

// TabletMetadata holds metadata for single tablet
// Experimental, this interface and use may change
type TabletMetadata struct {
KeyspaceName string
TableId UUID
Expand All @@ -151,6 +153,7 @@ type TabletMetadata struct {
}

// TabletMetadata holds metadata for single replica
// Experimental, this interface and use may change
type ReplicaMetadata struct {
HostId UUID
ShardId int
Expand Down Expand Up @@ -245,6 +248,8 @@ type schemaDescriber struct {
mu sync.Mutex

cache map[string]*KeyspaceMetadata

// Experimental, this interface and use may change
tabletsCache *TabletsMetadata
}

Expand Down Expand Up @@ -278,6 +283,7 @@ func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, err
return metadata, nil
}

// Experimental, this interface and use may change
func (s *schemaDescriber) getTabletsSchema() *TabletsMetadata {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -287,6 +293,7 @@ func (s *schemaDescriber) getTabletsSchema() *TabletsMetadata {
return metadata
}

// Experimental, this interface and use may change
func (s *schemaDescriber) refreshTabletsSchema() {
tablets := s.session.getTablets()
s.tabletsCache.Tablets = []*TabletMetadata{}
Expand Down
76 changes: 50 additions & 26 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (c *cowHostList) remove(ip net.IP) bool {
}

// cowTabletList implements a copy on write tablet list, its equivalent type is []*TabletInfo
// Experimental, this interface and use may change
type cowTabletList struct {
list atomic.Value
mu sync.Mutex
Expand Down Expand Up @@ -306,6 +307,7 @@ type HostTierer interface {
type HostSelectionPolicy interface {
HostStateNotifier
SetPartitioner
// Experimental, this interface and use may change
SetTablets
KeyspaceChanged(KeyspaceUpdateEvent)
Init(*Session)
Expand Down Expand Up @@ -357,9 +359,11 @@ type roundRobinHostPolicy struct {
func (r *roundRobinHostPolicy) IsLocal(*HostInfo) bool { return true }
func (r *roundRobinHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {}
func (r *roundRobinHostPolicy) SetPartitioner(partitioner string) {}
func (r *roundRobinHostPolicy) SetTablets(tablets []*TabletInfo) {}
func (r *roundRobinHostPolicy) Init(*Session) {}

// Experimental, this interface and use may change
func (r *roundRobinHostPolicy) SetTablets(tablets []*TabletInfo) {}

func (r *roundRobinHostPolicy) Pick(qry ExecutableQuery) NextHost {
nextStartOffset := atomic.AddUint64(&r.lastUsedHostIdx, 1)
return roundRobbin(int(nextStartOffset), r.hosts.get())
Expand Down Expand Up @@ -437,6 +441,7 @@ type tokenAwareHostPolicy struct {

logger StdLogger

// Experimental, this interface and use may change
tablets cowTabletList
}

Expand Down Expand Up @@ -504,6 +509,7 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) {
}
}

// Experimental, this interface and use may change
func (t *tokenAwareHostPolicy) SetTablets(tablets []*TabletInfo) {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down Expand Up @@ -630,27 +636,44 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {

var replicas []*HostInfo

t.tablets.mu.Lock()
tablets := t.tablets.get()
if qry.GetSession() != nil && qry.GetSession().cfg.experimentalTabletsEnabled {
t.tablets.mu.Lock()
tablets := t.tablets.get()

// Search for tablets with Keyspace and Table from the Query
l := findTablets(tablets, qry.Keyspace(), qry.Table())

if l != -1 {
tablet := findTabletForToken(tablets, token, l)

replicas = []*HostInfo{}
for _, replica := range tablet.Replicas() {
t.hosts.mu.Lock()
hosts := t.hosts.get()
for _, host := range hosts {
if host.hostId == replica.hostId.String() {
replicas = append(replicas, host)
break
if l != -1 {
tablet := findTabletForToken(tablets, token, l)

replicas = []*HostInfo{}
for _, replica := range tablet.Replicas() {
t.hosts.mu.Lock()
hosts := t.hosts.get()
for _, host := range hosts {
if host.hostId == replica.hostId.String() {
replicas = append(replicas, host)
break
}
}
t.hosts.mu.Unlock()
}
} else {
ht := meta.replicas[qry.Keyspace()].replicasFor(token)

if ht == nil {
host, _ := meta.tokenRing.GetHostForToken(token)
replicas = []*HostInfo{host}
} else {
replicas = ht.hosts
}
t.hosts.mu.Unlock()
}

if t.shuffleReplicas && !qry.IsLWT() {
replicas = shuffleHosts(replicas)
}

t.tablets.mu.Unlock()
} else {
ht := meta.replicas[qry.Keyspace()].replicasFor(token)

Expand All @@ -659,15 +682,12 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
replicas = []*HostInfo{host}
} else {
replicas = ht.hosts
if t.shuffleReplicas && !qry.IsLWT() {
replicas = shuffleHosts(replicas)
}
}
}

if t.shuffleReplicas && !qry.IsLWT() {
replicas = shuffleHosts(replicas)
}

t.tablets.mu.Unlock()

var (
fallbackIter NextHost
i, j, k int
Expand Down Expand Up @@ -775,9 +795,11 @@ type hostPoolHostPolicy struct {
func (r *hostPoolHostPolicy) Init(*Session) {}
func (r *hostPoolHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {}
func (r *hostPoolHostPolicy) SetPartitioner(string) {}
func (r *hostPoolHostPolicy) SetTablets(tablets []*TabletInfo) {}
func (r *hostPoolHostPolicy) IsLocal(*HostInfo) bool { return true }

// Experimental, this interface and use may change
func (r *hostPoolHostPolicy) SetTablets(tablets []*TabletInfo) {}

func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) {
peers := make([]string, len(hosts))
hostMap := make(map[string]*HostInfo, len(hosts))
Expand Down Expand Up @@ -912,12 +934,13 @@ func DCAwareRoundRobinPolicy(localDC string) HostSelectionPolicy {
func (d *dcAwareRR) Init(*Session) {}
func (d *dcAwareRR) KeyspaceChanged(KeyspaceUpdateEvent) {}
func (d *dcAwareRR) SetPartitioner(p string) {}
func (d *dcAwareRR) SetTablets(tablets []*TabletInfo) {}

func (d *dcAwareRR) IsLocal(host *HostInfo) bool {
return host.DataCenter() == d.local
}

// Experimental, this interface and use may change
func (d *dcAwareRR) SetTablets(tablets []*TabletInfo) {}

func (d *dcAwareRR) AddHost(host *HostInfo) {
if d.IsLocal(host) {
d.localHosts.add(host)
Expand Down Expand Up @@ -1006,12 +1029,13 @@ func RackAwareRoundRobinPolicy(localDC string, localRack string) HostSelectionPo
func (d *rackAwareRR) Init(*Session) {}
func (d *rackAwareRR) KeyspaceChanged(KeyspaceUpdateEvent) {}
func (d *rackAwareRR) SetPartitioner(p string) {}
func (d *rackAwareRR) SetTablets(tablets []*TabletInfo) {}

func (d *rackAwareRR) MaxHostTier() uint {
return 2
}

// Experimental, this interface and use may change
func (d *rackAwareRR) SetTablets(tablets []*TabletInfo) {}

func (d *rackAwareRR) HostTier(host *HostInfo) uint {
if host.DataCenter() == d.localDC {
if host.Rack() == d.localRack {
Expand Down
2 changes: 2 additions & 0 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type ExecutableQuery interface {
withContext(context.Context) ExecutableQuery

RetryableQuery

GetSession() *Session
}

type queryExecutor struct {
Expand Down
2 changes: 2 additions & 0 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type ring struct {
hostList []*HostInfo
pos uint32

// Experimental, this interface and use may change
tabletList []*TabletInfo

// TODO: we should store the ring metadata here also.
Expand Down Expand Up @@ -144,6 +145,7 @@ func (c *clusterMetadata) setPartitioner(partitioner string) {
}
}

// Experimental, this interface and use may change
func (r *ring) setTablets(newTablets []*TabletInfo) {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down
22 changes: 12 additions & 10 deletions scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,22 +333,24 @@ func (p *scyllaConnPicker) Pick(t token, keyspace string, table string) *Conn {
return nil
}

tablets := p.conns[0].session.getTablets()

idx := -1

// Search for tablets with Keyspace and Table from the Query
l := findTablets(tablets, keyspace, table)
if p.conns[0].session.cfg.experimentalTabletsEnabled {
tablets := p.conns[0].session.getTablets()

// Search for tablets with Keyspace and Table from the Query
l := findTablets(tablets, keyspace, table)

if l != -1 {
tablet := findTabletForToken(tablets, mmt, l)
if l != -1 {
tablet := findTabletForToken(tablets, mmt, l)

for _, replica := range tablet.replicas {
if replica.hostId.String() == p.hostId {
idx = replica.shardId
for _, replica := range tablet.replicas {
if replica.hostId.String() == p.hostId {
idx = replica.shardId
}
}
}
}
}
if idx == -1 {
idx = p.shardOf(mmt)
}
Expand Down
Loading

0 comments on commit d2f016f

Please sign in to comment.