Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make schemaDescriber solely owner of tablets metadata #356

Merged
merged 8 commits into from
Nov 29, 2024
2 changes: 1 addition & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,7 +1522,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
tablet.keyspaceName = qry.routingInfo.keyspace
tablet.tableName = qry.routingInfo.table

c.session.addTablet(&tablet)
c.session.metadataDescriber.addTablet(&tablet)
}
}

Expand Down
14 changes: 7 additions & 7 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,32 +110,32 @@ func (s *Session) handleSchemaEvent(frames []frame) {
for _, frame := range frames {
switch f := frame.(type) {
case *schemaChangeKeyspace:
s.schemaDescriber.clearSchema(f.keyspace)
s.metadataDescriber.clearSchema(f.keyspace)
s.handleKeyspaceChange(f.keyspace, f.change)
case *schemaChangeTable:
s.schemaDescriber.clearSchema(f.keyspace)
s.metadataDescriber.clearSchema(f.keyspace)
s.handleTableChange(f.keyspace, f.object, f.change)
case *schemaChangeAggregate:
s.schemaDescriber.clearSchema(f.keyspace)
s.metadataDescriber.clearSchema(f.keyspace)
case *schemaChangeFunction:
s.schemaDescriber.clearSchema(f.keyspace)
s.metadataDescriber.clearSchema(f.keyspace)
case *schemaChangeType:
s.schemaDescriber.clearSchema(f.keyspace)
s.metadataDescriber.clearSchema(f.keyspace)
}
}
}

func (s *Session) handleKeyspaceChange(keyspace, change string) {
s.control.awaitSchemaAgreement()
if change == "DROPPED" || change == "UPDATED" {
s.removeTabletsWithKeyspace(keyspace)
s.metadataDescriber.removeTabletsWithKeyspace(keyspace)
}
s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace, Change: change})
}

func (s *Session) handleTableChange(keyspace, table, change string) {
if change == "DROPPED" || change == "UPDATED" {
s.removeTabletsWithTable(keyspace, table)
s.metadataDescriber.removeTabletsWithTable(keyspace, table)
}
}

Expand Down
224 changes: 1 addition & 223 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,180 +504,6 @@ func (h *HostInfo) ScyllaShardAwarePortTLS() uint16 {
return h.scyllaShardAwarePortTLS
}

type ReplicaInfo struct {
hostId UUID
shardId int
}

type TabletInfo struct {
keyspaceName string
tableName string
firstToken int64
lastToken int64
replicas []ReplicaInfo
}

func (t *TabletInfo) KeyspaceName() string {
return t.keyspaceName
}

func (t *TabletInfo) FirstToken() int64 {
return t.firstToken
}

func (t *TabletInfo) LastToken() int64 {
return t.lastToken
}

func (t *TabletInfo) TableName() string {
return t.tableName
}

func (t *TabletInfo) Replicas() []ReplicaInfo {
return t.replicas
}

type TabletInfoList []*TabletInfo

// Search for place in tablets table with specific Keyspace and Table name
func (t TabletInfoList) findTablets(keyspace string, table string) (int, int) {
l := -1
r := -1
for i, tablet := range t {
if tablet.KeyspaceName() == keyspace && tablet.TableName() == table {
if l == -1 {
l = i
}
r = i
} else if l != -1 {
break
}
}

return l, r
}

func (t TabletInfoList) addTabletToTabletsList(tablet *TabletInfo) TabletInfoList {
l, r := t.findTablets(tablet.keyspaceName, tablet.tableName)
if l == -1 && r == -1 {
l = 0
r = 0
} else {
r = r + 1
}

l1, r1 := l, r
l2, r2 := l1, r1

// find first overlaping range
for l1 < r1 {
mid := (l1 + r1) / 2
if t[mid].FirstToken() < tablet.FirstToken() {
l1 = mid + 1
} else {
r1 = mid
}
}
start := l1

if start > l && t[start-1].LastToken() > tablet.FirstToken() {
start = start - 1
}

// find last overlaping range
for l2 < r2 {
mid := (l2 + r2) / 2
if t[mid].LastToken() < tablet.LastToken() {
l2 = mid + 1
} else {
r2 = mid
}
}
end := l2
if end < r && t[end].FirstToken() >= tablet.LastToken() {
end = end - 1
}
if end == len(t) {
end = end - 1
}

updated_tablets := t
if start <= end {
// Delete elements from index start to end
updated_tablets = append(t[:start], t[end+1:]...)
}
// Insert tablet element at index start
t = append(updated_tablets[:start], append([]*TabletInfo{tablet}, updated_tablets[start:]...)...)
return t
}

// Remove all tablets that have given host as a replica
func (t TabletInfoList) removeTabletsWithHostFromTabletsList(host *HostInfo) TabletInfoList {
filteredTablets := make([]*TabletInfo, 0, len(t)) // Preallocate for efficiency

for _, tablet := range t {
// Check if any replica matches the given host ID
shouldExclude := false
for _, replica := range tablet.replicas {
if replica.hostId.String() == host.HostID() {
shouldExclude = true
break
}
}
if !shouldExclude {
filteredTablets = append(filteredTablets, tablet)
}
}

t = filteredTablets
return t
}

func (t TabletInfoList) removeTabletsWithKeyspaceFromTabletsList(keyspace string) TabletInfoList {
filteredTablets := make([]*TabletInfo, 0, len(t))

for _, tablet := range t {
if tablet.keyspaceName != keyspace {
filteredTablets = append(filteredTablets, tablet)
}
}

t = filteredTablets
return t
}

func (t TabletInfoList) removeTabletsWithTableFromTabletsList(keyspace string, table string) TabletInfoList {
filteredTablets := make([]*TabletInfo, 0, len(t))

for _, tablet := range t {
if !(tablet.keyspaceName == keyspace && tablet.tableName == table) {
filteredTablets = append(filteredTablets, tablet)
}
}

t = filteredTablets
return t
}

// Search for place in tablets table for token starting from index l to index r
func (t TabletInfoList) findTabletForToken(token Token, l int, r int) *TabletInfo {
for l < r {
var m int
if r*l > 0 {
m = l + (r-l)/2
} else {
m = (r + l) / 2
}
if int64Token(t[m].LastToken()).Less(token) {
l = m + 1
} else {
r = m
}
}

return t[l]
}

// Polls system.peers at a specific interval to find new hosts
type ringDescriber struct {
session *Session
Expand Down Expand Up @@ -1051,7 +877,7 @@ func refreshRing(r *ringDescriber) error {
}

for _, host := range prevHosts {
r.session.removeTabletsWithHost(host)
r.session.metadataDescriber.removeTabletsWithHost(host)
r.session.removeHost(host)
}

Expand All @@ -1061,54 +887,6 @@ func refreshRing(r *ringDescriber) error {
return nil
}

func (s *Session) addTablet(tablet *TabletInfo) error {
tablets := s.getTablets()
tablets = tablets.addTabletToTabletsList(tablet)

s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

s.schemaDescriber.refreshTabletsSchema()

return nil
}

func (s *Session) removeTabletsWithHost(host *HostInfo) error {
tablets := s.getTablets()
tablets = tablets.removeTabletsWithHostFromTabletsList(host)

s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

s.schemaDescriber.refreshTabletsSchema()

return nil
}

func (s *Session) removeTabletsWithKeyspace(keyspace string) error {
tablets := s.getTablets()
tablets = tablets.removeTabletsWithKeyspaceFromTabletsList(keyspace)

s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

s.schemaDescriber.refreshTabletsSchema()

return nil
}

func (s *Session) removeTabletsWithTable(keyspace string, table string) error {
tablets := s.getTablets()
tablets = tablets.removeTabletsWithTableFromTabletsList(keyspace, table)

s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

s.schemaDescriber.refreshTabletsSchema()

return nil
}

const (
ringRefreshDebounceTime = 1 * time.Second
)
Expand Down
Loading
Loading