From 00c128c5aacb2b14ec7bb685b69c7fb8c980092c Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Tue, 26 Nov 2024 13:33:38 +0100 Subject: [PATCH 1/8] Move tablets metadata from LBP and ring to schemaDescriber --- host_source.go | 12 ++++-------- metadata_scylla.go | 21 +++++++++++++++++++++ policies.go | 22 ++-------------------- ring.go | 9 --------- session.go | 8 ++------ 5 files changed, 29 insertions(+), 43 deletions(-) diff --git a/host_source.go b/host_source.go index e6639c55d..f7684fa9a 100644 --- a/host_source.go +++ b/host_source.go @@ -1065,8 +1065,7 @@ func (s *Session) addTablet(tablet *TabletInfo) error { tablets := s.getTablets() tablets = tablets.addTabletToTabletsList(tablet) - s.ring.setTablets(tablets) - s.policy.SetTablets(tablets) + s.schemaDescriber.setTablets(tablets) s.schemaDescriber.refreshTabletsSchema() @@ -1077,8 +1076,7 @@ func (s *Session) removeTabletsWithHost(host *HostInfo) error { tablets := s.getTablets() tablets = tablets.removeTabletsWithHostFromTabletsList(host) - s.ring.setTablets(tablets) - s.policy.SetTablets(tablets) + s.schemaDescriber.setTablets(tablets) s.schemaDescriber.refreshTabletsSchema() @@ -1089,8 +1087,7 @@ func (s *Session) removeTabletsWithKeyspace(keyspace string) error { tablets := s.getTablets() tablets = tablets.removeTabletsWithKeyspaceFromTabletsList(keyspace) - s.ring.setTablets(tablets) - s.policy.SetTablets(tablets) + s.schemaDescriber.setTablets(tablets) s.schemaDescriber.refreshTabletsSchema() @@ -1101,8 +1098,7 @@ func (s *Session) removeTabletsWithTable(keyspace string, table string) error { tablets := s.getTablets() tablets = tablets.removeTabletsWithTableFromTabletsList(keyspace, table) - s.ring.setTablets(tablets) - s.policy.SetTablets(tablets) + s.schemaDescriber.setTablets(tablets) s.schemaDescriber.refreshTabletsSchema() diff --git a/metadata_scylla.go b/metadata_scylla.go index a62a8809e..74205a4d3 100644 --- a/metadata_scylla.go +++ b/metadata_scylla.go @@ -237,11 +237,17 @@ func columnKindFromSchema(kind string) (ColumnKind, error) { } } +type Metadata struct { + tabletsMetadata cowTabletList +} + // queries the cluster for schema information for a specific keyspace and for tablets type schemaDescriber struct { session *Session mu sync.Mutex + metadata *Metadata + cache map[string]*KeyspaceMetadata tabletsCache *TabletsMetadata @@ -252,6 +258,7 @@ type schemaDescriber struct { func newSchemaDescriber(session *Session) *schemaDescriber { return &schemaDescriber{ session: session, + metadata: &Metadata{}, cache: map[string]*KeyspaceMetadata{}, tabletsCache: &TabletsMetadata{}, } @@ -305,6 +312,20 @@ func (s *schemaDescriber) refreshTabletsSchema() { } } +func (s *schemaDescriber) setTablets(tablets TabletInfoList) { + s.mu.Lock() + defer s.mu.Unlock() + + s.metadata.tabletsMetadata.set(tablets) +} + +func (s *schemaDescriber) getTablets() TabletInfoList { + s.mu.Lock() + defer s.mu.Unlock() + + return s.metadata.tabletsMetadata.get() +} + // clears the already cached keyspace metadata func (s *schemaDescriber) clearSchema(keyspaceName string) { s.mu.Lock() diff --git a/policies.go b/policies.go index 0d4d9393b..1a5595480 100644 --- a/policies.go +++ b/policies.go @@ -337,7 +337,6 @@ type HostTierer interface { type HostSelectionPolicy interface { HostStateNotifier SetPartitioner - SetTablets KeyspaceChanged(KeyspaceUpdateEvent) Init(*Session) // Reset is opprotunity to reset HostSelectionPolicy if Session initilization failed and we want to @@ -397,8 +396,6 @@ func (r *roundRobinHostPolicy) Init(*Session) {} func (r *roundRobinHostPolicy) Reset() {} func (r *roundRobinHostPolicy) IsOperational(*Session) error { return nil } -func (r *roundRobinHostPolicy) SetTablets(tablets TabletInfoList) {} - func (r *roundRobinHostPolicy) Pick(qry ExecutableQuery) NextHost { nextStartOffset := atomic.AddUint64(&r.lastUsedHostIdx, 1) return roundRobbin(int(nextStartOffset), r.hosts.get()) @@ -489,8 +486,6 @@ type tokenAwareHostPolicy struct { logger StdLogger - tablets cowTabletList - avoidSlowReplicas bool } @@ -575,13 +570,6 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) { } } -func (t *tokenAwareHostPolicy) SetTablets(tablets TabletInfoList) { - t.mu.Lock() - defer t.mu.Unlock() - - t.tablets.set(tablets) -} - func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) { t.mu.Lock() if t.hosts.add(host) { @@ -701,8 +689,8 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { var replicas []*HostInfo - if qry.GetSession() != nil && qry.GetSession().tabletsRoutingV1 { - tablets := t.tablets.get() + if session := qry.GetSession(); session != nil && session.tabletsRoutingV1 { + tablets := session.schemaDescriber.getTablets() // Search for tablets with Keyspace and Table from the Query l, r := tablets.findTablets(qry.Keyspace(), qry.Table()) @@ -862,8 +850,6 @@ func (r *hostPoolHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {} func (r *hostPoolHostPolicy) SetPartitioner(string) {} func (r *hostPoolHostPolicy) IsLocal(*HostInfo) bool { return true } -func (r *hostPoolHostPolicy) SetTablets(tablets TabletInfoList) {} - func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) { peers := make([]string, len(hosts)) hostMap := make(map[string]*HostInfo, len(hosts)) @@ -1043,8 +1029,6 @@ func (d *dcAwareRR) IsLocal(host *HostInfo) bool { return host.DataCenter() == d.local } -func (d *dcAwareRR) SetTablets(tablets TabletInfoList) {} - func (d *dcAwareRR) AddHost(host *HostInfo) { if d.IsLocal(host) { d.localHosts.add(host) @@ -1169,8 +1153,6 @@ func (d *rackAwareRR) setDCFailoverDisabled() { d.disableDCFailover = true } -func (d *rackAwareRR) SetTablets(tablets TabletInfoList) {} - func (d *rackAwareRR) HostTier(host *HostInfo) uint { if host.DataCenter() == d.localDC { if host.Rack() == d.localRack { diff --git a/ring.go b/ring.go index 0a02bdefb..5b77370a1 100644 --- a/ring.go +++ b/ring.go @@ -22,8 +22,6 @@ type ring struct { hostList []*HostInfo pos uint32 - tabletList TabletInfoList - // TODO: we should store the ring metadata here also. } @@ -143,10 +141,3 @@ func (c *clusterMetadata) setPartitioner(partitioner string) { c.partitioner = partitioner } } - -func (r *ring) setTablets(newTablets TabletInfoList) { - r.mu.Lock() - defer r.mu.Unlock() - - r.tabletList = newTablets -} diff --git a/session.go b/session.go index a60a1206c..ec7ec2a8a 100644 --- a/session.go +++ b/session.go @@ -277,8 +277,7 @@ func (s *Session) init() error { if s.tabletsRoutingV1 { tablets := TabletInfoList{} - s.ring.setTablets(tablets) - s.policy.SetTablets(tablets) + s.schemaDescriber.setTablets(tablets) } } } @@ -638,10 +637,7 @@ func (s *Session) getConn() *Conn { } func (s *Session) getTablets() TabletInfoList { - s.ring.mu.Lock() - defer s.ring.mu.Unlock() - - return s.ring.tabletList + return s.schemaDescriber.getTablets() } // returns routing key indexes and type info From 81616fec7350093012e4cd669fab599b4a902e5e Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Tue, 26 Nov 2024 13:42:27 +0100 Subject: [PATCH 2/8] Remove duplication in tablet metadata representation --- host_source.go | 8 ------- metadata_scylla.go | 57 +++------------------------------------------- session.go | 4 ++-- 3 files changed, 5 insertions(+), 64 deletions(-) diff --git a/host_source.go b/host_source.go index f7684fa9a..eb2e23d5f 100644 --- a/host_source.go +++ b/host_source.go @@ -1067,8 +1067,6 @@ func (s *Session) addTablet(tablet *TabletInfo) error { s.schemaDescriber.setTablets(tablets) - s.schemaDescriber.refreshTabletsSchema() - return nil } @@ -1078,8 +1076,6 @@ func (s *Session) removeTabletsWithHost(host *HostInfo) error { s.schemaDescriber.setTablets(tablets) - s.schemaDescriber.refreshTabletsSchema() - return nil } @@ -1089,8 +1085,6 @@ func (s *Session) removeTabletsWithKeyspace(keyspace string) error { s.schemaDescriber.setTablets(tablets) - s.schemaDescriber.refreshTabletsSchema() - return nil } @@ -1100,8 +1094,6 @@ func (s *Session) removeTabletsWithTable(keyspace string, table string) error { s.schemaDescriber.setTablets(tablets) - s.schemaDescriber.refreshTabletsSchema() - return nil } diff --git a/metadata_scylla.go b/metadata_scylla.go index 74205a4d3..b1f8349a1 100644 --- a/metadata_scylla.go +++ b/metadata_scylla.go @@ -134,26 +134,6 @@ type IndexMetadata struct { Options map[string]string } -// TabletsMetadata holds metadata for tablet list -type TabletsMetadata struct { - Tablets []*TabletMetadata -} - -// TabletMetadata holds metadata for single tablet -type TabletMetadata struct { - KeyspaceName string - TableName string - FirstToken int64 - LastToken int64 - Replicas []ReplicaMetadata -} - -// TabletMetadata holds metadata for single replica -type ReplicaMetadata struct { - HostId UUID - ShardId int -} - const ( IndexKindCustom = "CUSTOM" ) @@ -249,18 +229,15 @@ type schemaDescriber struct { metadata *Metadata cache map[string]*KeyspaceMetadata - - tabletsCache *TabletsMetadata } // creates a session bound schema describer which will query and cache // keyspace metadata and tablets metadata func newSchemaDescriber(session *Session) *schemaDescriber { return &schemaDescriber{ - session: session, - metadata: &Metadata{}, - cache: map[string]*KeyspaceMetadata{}, - tabletsCache: &TabletsMetadata{}, + session: session, + metadata: &Metadata{}, + cache: map[string]*KeyspaceMetadata{}, } } @@ -284,34 +261,6 @@ func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, err return metadata, nil } -func (s *schemaDescriber) getTabletsSchema() *TabletsMetadata { - s.mu.Lock() - defer s.mu.Unlock() - - metadata := s.tabletsCache - - return metadata -} - -func (s *schemaDescriber) refreshTabletsSchema() { - tablets := s.session.getTablets() - s.tabletsCache.Tablets = []*TabletMetadata{} - - for _, tablet := range tablets { - t := &TabletMetadata{} - t.KeyspaceName = tablet.KeyspaceName() - t.TableName = tablet.TableName() - t.FirstToken = tablet.FirstToken() - t.LastToken = tablet.LastToken() - t.Replicas = []ReplicaMetadata{} - for _, replica := range tablet.Replicas() { - t.Replicas = append(t.Replicas, ReplicaMetadata{replica.hostId, replica.shardId}) - } - - s.tabletsCache.Tablets = append(s.tabletsCache.Tablets, t) - } -} - func (s *schemaDescriber) setTablets(tablets TabletInfoList) { s.mu.Lock() defer s.mu.Unlock() diff --git a/session.go b/session.go index ec7ec2a8a..bc09fa4f6 100644 --- a/session.go +++ b/session.go @@ -607,7 +607,7 @@ func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) { } // TabletsMetadata returns the metadata about tablets -func (s *Session) TabletsMetadata() (*TabletsMetadata, error) { +func (s *Session) TabletsMetadata() (TabletInfoList, error) { // fail fast if s.Closed() { return nil, ErrSessionClosed @@ -615,7 +615,7 @@ func (s *Session) TabletsMetadata() (*TabletsMetadata, error) { return nil, ErrTabletsNotUsed } - return s.schemaDescriber.getTabletsSchema(), nil + return s.schemaDescriber.getTablets(), nil } func (s *Session) getConn() *Conn { From d14cf92d85a7356b305adcf2dffb298eee9d1563 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Tue, 26 Nov 2024 13:44:37 +0100 Subject: [PATCH 3/8] Move cowTabletList implementation to tablets.go file --- policies.go | 27 --------------------------- tablets.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 27 deletions(-) create mode 100644 tablets.go diff --git a/policies.go b/policies.go index 1a5595480..955d2ecc8 100644 --- a/policies.go +++ b/policies.go @@ -94,33 +94,6 @@ func (c *cowHostList) remove(host *HostInfo) bool { return true } -// cowTabletList implements a copy on write tablet list, its equivalent type is TabletInfoList -type cowTabletList struct { - list atomic.Value - mu sync.Mutex -} - -func (c *cowTabletList) get() TabletInfoList { - l, ok := c.list.Load().(TabletInfoList) - if !ok { - return nil - } - return l -} - -func (c *cowTabletList) set(tablets TabletInfoList) { - c.mu.Lock() - defer c.mu.Unlock() - - n := len(tablets) - t := make(TabletInfoList, n) - for i := 0; i < n; i++ { - t[i] = tablets[i] - } - - c.list.Store(t) -} - // RetryableQuery is an interface that represents a query or batch statement that // exposes the correct functions for the retry policy logic to evaluate correctly. type RetryableQuery interface { diff --git a/tablets.go b/tablets.go new file mode 100644 index 000000000..31d4a2e0a --- /dev/null +++ b/tablets.go @@ -0,0 +1,33 @@ +package gocql + +import ( + "sync" + "sync/atomic" +) + +// cowTabletList implements a copy on write tablet list, its equivalent type is TabletInfoList +type cowTabletList struct { + list atomic.Value + mu sync.Mutex +} + +func (c *cowTabletList) get() TabletInfoList { + l, ok := c.list.Load().(TabletInfoList) + if !ok { + return nil + } + return l +} + +func (c *cowTabletList) set(tablets TabletInfoList) { + c.mu.Lock() + defer c.mu.Unlock() + + n := len(tablets) + t := make(TabletInfoList, n) + for i := 0; i < n; i++ { + t[i] = tablets[i] + } + + c.list.Store(t) +} From bd21a9be1954537c9cd01801c58edf88dd43ec91 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Tue, 26 Nov 2024 13:46:39 +0100 Subject: [PATCH 4/8] Move TabletInfoList implementation to tablets.go file --- host_source.go | 174 ------------------------------------------------- tablets.go | 174 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 174 insertions(+), 174 deletions(-) diff --git a/host_source.go b/host_source.go index eb2e23d5f..887351963 100644 --- a/host_source.go +++ b/host_source.go @@ -504,180 +504,6 @@ func (h *HostInfo) ScyllaShardAwarePortTLS() uint16 { return h.scyllaShardAwarePortTLS } -type ReplicaInfo struct { - hostId UUID - shardId int -} - -type TabletInfo struct { - keyspaceName string - tableName string - firstToken int64 - lastToken int64 - replicas []ReplicaInfo -} - -func (t *TabletInfo) KeyspaceName() string { - return t.keyspaceName -} - -func (t *TabletInfo) FirstToken() int64 { - return t.firstToken -} - -func (t *TabletInfo) LastToken() int64 { - return t.lastToken -} - -func (t *TabletInfo) TableName() string { - return t.tableName -} - -func (t *TabletInfo) Replicas() []ReplicaInfo { - return t.replicas -} - -type TabletInfoList []*TabletInfo - -// Search for place in tablets table with specific Keyspace and Table name -func (t TabletInfoList) findTablets(keyspace string, table string) (int, int) { - l := -1 - r := -1 - for i, tablet := range t { - if tablet.KeyspaceName() == keyspace && tablet.TableName() == table { - if l == -1 { - l = i - } - r = i - } else if l != -1 { - break - } - } - - return l, r -} - -func (t TabletInfoList) addTabletToTabletsList(tablet *TabletInfo) TabletInfoList { - l, r := t.findTablets(tablet.keyspaceName, tablet.tableName) - if l == -1 && r == -1 { - l = 0 - r = 0 - } else { - r = r + 1 - } - - l1, r1 := l, r - l2, r2 := l1, r1 - - // find first overlaping range - for l1 < r1 { - mid := (l1 + r1) / 2 - if t[mid].FirstToken() < tablet.FirstToken() { - l1 = mid + 1 - } else { - r1 = mid - } - } - start := l1 - - if start > l && t[start-1].LastToken() > tablet.FirstToken() { - start = start - 1 - } - - // find last overlaping range - for l2 < r2 { - mid := (l2 + r2) / 2 - if t[mid].LastToken() < tablet.LastToken() { - l2 = mid + 1 - } else { - r2 = mid - } - } - end := l2 - if end < r && t[end].FirstToken() >= tablet.LastToken() { - end = end - 1 - } - if end == len(t) { - end = end - 1 - } - - updated_tablets := t - if start <= end { - // Delete elements from index start to end - updated_tablets = append(t[:start], t[end+1:]...) - } - // Insert tablet element at index start - t = append(updated_tablets[:start], append([]*TabletInfo{tablet}, updated_tablets[start:]...)...) - return t -} - -// Remove all tablets that have given host as a replica -func (t TabletInfoList) removeTabletsWithHostFromTabletsList(host *HostInfo) TabletInfoList { - filteredTablets := make([]*TabletInfo, 0, len(t)) // Preallocate for efficiency - - for _, tablet := range t { - // Check if any replica matches the given host ID - shouldExclude := false - for _, replica := range tablet.replicas { - if replica.hostId.String() == host.HostID() { - shouldExclude = true - break - } - } - if !shouldExclude { - filteredTablets = append(filteredTablets, tablet) - } - } - - t = filteredTablets - return t -} - -func (t TabletInfoList) removeTabletsWithKeyspaceFromTabletsList(keyspace string) TabletInfoList { - filteredTablets := make([]*TabletInfo, 0, len(t)) - - for _, tablet := range t { - if tablet.keyspaceName != keyspace { - filteredTablets = append(filteredTablets, tablet) - } - } - - t = filteredTablets - return t -} - -func (t TabletInfoList) removeTabletsWithTableFromTabletsList(keyspace string, table string) TabletInfoList { - filteredTablets := make([]*TabletInfo, 0, len(t)) - - for _, tablet := range t { - if !(tablet.keyspaceName == keyspace && tablet.tableName == table) { - filteredTablets = append(filteredTablets, tablet) - } - } - - t = filteredTablets - return t -} - -// Search for place in tablets table for token starting from index l to index r -func (t TabletInfoList) findTabletForToken(token Token, l int, r int) *TabletInfo { - for l < r { - var m int - if r*l > 0 { - m = l + (r-l)/2 - } else { - m = (r + l) / 2 - } - if int64Token(t[m].LastToken()).Less(token) { - l = m + 1 - } else { - r = m - } - } - - return t[l] -} - // Polls system.peers at a specific interval to find new hosts type ringDescriber struct { session *Session diff --git a/tablets.go b/tablets.go index 31d4a2e0a..a83313104 100644 --- a/tablets.go +++ b/tablets.go @@ -5,6 +5,180 @@ import ( "sync/atomic" ) +type ReplicaInfo struct { + hostId UUID + shardId int +} + +type TabletInfo struct { + keyspaceName string + tableName string + firstToken int64 + lastToken int64 + replicas []ReplicaInfo +} + +func (t *TabletInfo) KeyspaceName() string { + return t.keyspaceName +} + +func (t *TabletInfo) FirstToken() int64 { + return t.firstToken +} + +func (t *TabletInfo) LastToken() int64 { + return t.lastToken +} + +func (t *TabletInfo) TableName() string { + return t.tableName +} + +func (t *TabletInfo) Replicas() []ReplicaInfo { + return t.replicas +} + +type TabletInfoList []*TabletInfo + +// Search for place in tablets table with specific Keyspace and Table name +func (t TabletInfoList) findTablets(keyspace string, table string) (int, int) { + l := -1 + r := -1 + for i, tablet := range t { + if tablet.KeyspaceName() == keyspace && tablet.TableName() == table { + if l == -1 { + l = i + } + r = i + } else if l != -1 { + break + } + } + + return l, r +} + +func (t TabletInfoList) addTabletToTabletsList(tablet *TabletInfo) TabletInfoList { + l, r := t.findTablets(tablet.keyspaceName, tablet.tableName) + if l == -1 && r == -1 { + l = 0 + r = 0 + } else { + r = r + 1 + } + + l1, r1 := l, r + l2, r2 := l1, r1 + + // find first overlaping range + for l1 < r1 { + mid := (l1 + r1) / 2 + if t[mid].FirstToken() < tablet.FirstToken() { + l1 = mid + 1 + } else { + r1 = mid + } + } + start := l1 + + if start > l && t[start-1].LastToken() > tablet.FirstToken() { + start = start - 1 + } + + // find last overlaping range + for l2 < r2 { + mid := (l2 + r2) / 2 + if t[mid].LastToken() < tablet.LastToken() { + l2 = mid + 1 + } else { + r2 = mid + } + } + end := l2 + if end < r && t[end].FirstToken() >= tablet.LastToken() { + end = end - 1 + } + if end == len(t) { + end = end - 1 + } + + updated_tablets := t + if start <= end { + // Delete elements from index start to end + updated_tablets = append(t[:start], t[end+1:]...) + } + // Insert tablet element at index start + t = append(updated_tablets[:start], append([]*TabletInfo{tablet}, updated_tablets[start:]...)...) + return t +} + +// Remove all tablets that have given host as a replica +func (t TabletInfoList) removeTabletsWithHostFromTabletsList(host *HostInfo) TabletInfoList { + filteredTablets := make([]*TabletInfo, 0, len(t)) // Preallocate for efficiency + + for _, tablet := range t { + // Check if any replica matches the given host ID + shouldExclude := false + for _, replica := range tablet.replicas { + if replica.hostId.String() == host.HostID() { + shouldExclude = true + break + } + } + if !shouldExclude { + filteredTablets = append(filteredTablets, tablet) + } + } + + t = filteredTablets + return t +} + +func (t TabletInfoList) removeTabletsWithKeyspaceFromTabletsList(keyspace string) TabletInfoList { + filteredTablets := make([]*TabletInfo, 0, len(t)) + + for _, tablet := range t { + if tablet.keyspaceName != keyspace { + filteredTablets = append(filteredTablets, tablet) + } + } + + t = filteredTablets + return t +} + +func (t TabletInfoList) removeTabletsWithTableFromTabletsList(keyspace string, table string) TabletInfoList { + filteredTablets := make([]*TabletInfo, 0, len(t)) + + for _, tablet := range t { + if !(tablet.keyspaceName == keyspace && tablet.tableName == table) { + filteredTablets = append(filteredTablets, tablet) + } + } + + t = filteredTablets + return t +} + +// Search for place in tablets table for token starting from index l to index r +func (t TabletInfoList) findTabletForToken(token Token, l int, r int) *TabletInfo { + for l < r { + var m int + if r*l > 0 { + m = l + (r-l)/2 + } else { + m = (r + l) / 2 + } + if int64Token(t[m].LastToken()).Less(token) { + l = m + 1 + } else { + r = m + } + } + + return t[l] +} + // cowTabletList implements a copy on write tablet list, its equivalent type is TabletInfoList type cowTabletList struct { list atomic.Value From f5e1fcae051337b384e3968a2779b4a601138727 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Tue, 26 Nov 2024 14:16:51 +0100 Subject: [PATCH 5/8] Make tablets addition/removal methods of schemaDescriber --- conn.go | 2 +- events.go | 4 ++-- host_source.go | 38 +------------------------------------- metadata_scylla.go | 36 ++++++++++++++++++++++++++++++++++++ 4 files changed, 40 insertions(+), 40 deletions(-) diff --git a/conn.go b/conn.go index 3cfad8f06..ec0846da1 100644 --- a/conn.go +++ b/conn.go @@ -1522,7 +1522,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { tablet.keyspaceName = qry.routingInfo.keyspace tablet.tableName = qry.routingInfo.table - c.session.addTablet(&tablet) + c.session.schemaDescriber.addTablet(&tablet) } } diff --git a/events.go b/events.go index ef0d3a325..200c003df 100644 --- a/events.go +++ b/events.go @@ -128,14 +128,14 @@ func (s *Session) handleSchemaEvent(frames []frame) { func (s *Session) handleKeyspaceChange(keyspace, change string) { s.control.awaitSchemaAgreement() if change == "DROPPED" || change == "UPDATED" { - s.removeTabletsWithKeyspace(keyspace) + s.schemaDescriber.removeTabletsWithKeyspace(keyspace) } s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace, Change: change}) } func (s *Session) handleTableChange(keyspace, table, change string) { if change == "DROPPED" || change == "UPDATED" { - s.removeTabletsWithTable(keyspace, table) + s.schemaDescriber.removeTabletsWithTable(keyspace, table) } } diff --git a/host_source.go b/host_source.go index 887351963..b71ac3139 100644 --- a/host_source.go +++ b/host_source.go @@ -877,7 +877,7 @@ func refreshRing(r *ringDescriber) error { } for _, host := range prevHosts { - r.session.removeTabletsWithHost(host) + r.session.schemaDescriber.removeTabletsWithHost(host) r.session.removeHost(host) } @@ -887,42 +887,6 @@ func refreshRing(r *ringDescriber) error { return nil } -func (s *Session) addTablet(tablet *TabletInfo) error { - tablets := s.getTablets() - tablets = tablets.addTabletToTabletsList(tablet) - - s.schemaDescriber.setTablets(tablets) - - return nil -} - -func (s *Session) removeTabletsWithHost(host *HostInfo) error { - tablets := s.getTablets() - tablets = tablets.removeTabletsWithHostFromTabletsList(host) - - s.schemaDescriber.setTablets(tablets) - - return nil -} - -func (s *Session) removeTabletsWithKeyspace(keyspace string) error { - tablets := s.getTablets() - tablets = tablets.removeTabletsWithKeyspaceFromTabletsList(keyspace) - - s.schemaDescriber.setTablets(tablets) - - return nil -} - -func (s *Session) removeTabletsWithTable(keyspace string, table string) error { - tablets := s.getTablets() - tablets = tablets.removeTabletsWithTableFromTabletsList(keyspace, table) - - s.schemaDescriber.setTablets(tablets) - - return nil -} - const ( ringRefreshDebounceTime = 1 * time.Second ) diff --git a/metadata_scylla.go b/metadata_scylla.go index b1f8349a1..3dabe87cf 100644 --- a/metadata_scylla.go +++ b/metadata_scylla.go @@ -275,6 +275,42 @@ func (s *schemaDescriber) getTablets() TabletInfoList { return s.metadata.tabletsMetadata.get() } +func (s *schemaDescriber) addTablet(tablet *TabletInfo) error { + tablets := s.getTablets() + tablets = tablets.addTabletToTabletsList(tablet) + + s.setTablets(tablets) + + return nil +} + +func (s *schemaDescriber) removeTabletsWithHost(host *HostInfo) error { + tablets := s.getTablets() + tablets = tablets.removeTabletsWithHostFromTabletsList(host) + + s.setTablets(tablets) + + return nil +} + +func (s *schemaDescriber) removeTabletsWithKeyspace(keyspace string) error { + tablets := s.getTablets() + tablets = tablets.removeTabletsWithKeyspaceFromTabletsList(keyspace) + + s.setTablets(tablets) + + return nil +} + +func (s *schemaDescriber) removeTabletsWithTable(keyspace string, table string) error { + tablets := s.getTablets() + tablets = tablets.removeTabletsWithTableFromTabletsList(keyspace, table) + + s.setTablets(tablets) + + return nil +} + // clears the already cached keyspace metadata func (s *schemaDescriber) clearSchema(keyspaceName string) { s.mu.Lock() From c2c1be40b839cca961fe0c21f3fe430d617c01ae Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Tue, 26 Nov 2024 16:06:44 +0100 Subject: [PATCH 6/8] Store keyspace metadata in copy-on-write map --- metadata_scylla.go | 71 +++++++++++++++++++++++++++++++++++++----- schema_queries_test.go | 6 +++- 2 files changed, 68 insertions(+), 9 deletions(-) diff --git a/metadata_scylla.go b/metadata_scylla.go index 3dabe87cf..d2d95472a 100644 --- a/metadata_scylla.go +++ b/metadata_scylla.go @@ -11,6 +11,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" ) // schema metadata for a keyspace @@ -134,6 +135,59 @@ type IndexMetadata struct { Options map[string]string } +// cowTabletList implements a copy on write keyspace metadata map, its equivalent type is map[string]*KeyspaceMetadata +type cowKeyspaceMetadataMap struct { + keyspaceMap atomic.Value + mu sync.Mutex +} + +func (c *cowKeyspaceMetadataMap) get() map[string]*KeyspaceMetadata { + l, ok := c.keyspaceMap.Load().(map[string]*KeyspaceMetadata) + if !ok { + return nil + } + return l +} + +func (c *cowKeyspaceMetadataMap) getKeyspace(keyspaceName string) (*KeyspaceMetadata, bool) { + m, ok := c.keyspaceMap.Load().(map[string]*KeyspaceMetadata) + if !ok { + return nil, ok + } + val, ok := m[keyspaceName] + return val, ok +} + +func (c *cowKeyspaceMetadataMap) set(keyspaceName string, keyspaceMetadata *KeyspaceMetadata) bool { + c.mu.Lock() + m := c.get() + + newM := map[string]*KeyspaceMetadata{} + for name, metadata := range m { + newM[name] = metadata + } + newM[keyspaceName] = keyspaceMetadata + + c.keyspaceMap.Store(newM) + c.mu.Unlock() + return true +} + +func (c *cowKeyspaceMetadataMap) remove(keyspaceName string) { + c.mu.Lock() + m := c.get() + + newM := map[string]*KeyspaceMetadata{} + for name, meta := range m { + if name != keyspaceName { + newM[name] = meta + } + } + + c.keyspaceMap.Store(newM) + c.mu.Unlock() +} + const ( IndexKindCustom = "CUSTOM" ) @@ -218,7 +272,8 @@ func columnKindFromSchema(kind string) (ColumnKind, error) { } type Metadata struct { - tabletsMetadata cowTabletList + tabletsMetadata cowTabletList + keyspaceMetadata cowKeyspaceMetadataMap } // queries the cluster for schema information for a specific keyspace and for tablets @@ -227,8 +282,6 @@ type schemaDescriber struct { mu sync.Mutex metadata *Metadata - - cache map[string]*KeyspaceMetadata } // creates a session bound schema describer which will query and cache @@ -237,7 +290,6 @@ func newSchemaDescriber(session *Session) *schemaDescriber { return &schemaDescriber{ session: session, metadata: &Metadata{}, - cache: map[string]*KeyspaceMetadata{}, } } @@ -247,7 +299,7 @@ func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, err s.mu.Lock() defer s.mu.Unlock() - metadata, found := s.cache[keyspaceName] + metadata, found := s.metadata.keyspaceMetadata.getKeyspace(keyspaceName) if !found { // refresh the cache for this keyspace err := s.refreshSchema(keyspaceName) @@ -255,7 +307,10 @@ func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, err return nil, err } - metadata = s.cache[keyspaceName] + metadata, found = s.metadata.keyspaceMetadata.getKeyspace(keyspaceName) + if !found { + return nil, fmt.Errorf("Metadata not found for keyspace: %s", keyspaceName) + } } return metadata, nil @@ -316,7 +371,7 @@ func (s *schemaDescriber) clearSchema(keyspaceName string) { s.mu.Lock() defer s.mu.Unlock() - delete(s.cache, keyspaceName) + s.metadata.keyspaceMetadata.remove(keyspaceName) } // forcibly updates the current KeyspaceMetadata held by the schema describer @@ -368,7 +423,7 @@ func (s *schemaDescriber) refreshSchema(keyspaceName string) error { compileMetadata(keyspace, tables, columns, functions, aggregates, types, indexes, views, createStmts) // update the cache - s.cache[keyspaceName] = keyspace + s.metadata.keyspaceMetadata.set(keyspaceName, keyspace) return nil } diff --git a/schema_queries_test.go b/schema_queries_test.go index 2b0c88171..62f10f02d 100644 --- a/schema_queries_test.go +++ b/schema_queries_test.go @@ -16,5 +16,9 @@ func TestSchemaQueries(t *testing.T) { session := createSessionFromCluster(cluster, t) defer session.Close() - assertTrue(t, "keyspace present in schemaDescriber", session.schemaDescriber.cache["gocql_test"].Name == "gocql_test") + keyspaceMetadata, err := session.schemaDescriber.getSchema("gocql_test") + if err != nil { + t.Fatal("unable to get keyspace metadata for keyspace: ", err) + } + assertTrue(t, "keyspace present in schemaDescriber", keyspaceMetadata.Name == "gocql_test") } From 4693b692ac17e0fb5464026623d7b2f7da42b13b Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Thu, 28 Nov 2024 11:28:19 +0100 Subject: [PATCH 7/8] Use RWMutex in copy-on-write tablet list instead of atomic.Value and Mutex --- tablets.go | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/tablets.go b/tablets.go index a83313104..fd29cb655 100644 --- a/tablets.go +++ b/tablets.go @@ -2,7 +2,6 @@ package gocql import ( "sync" - "sync/atomic" ) type ReplicaInfo struct { @@ -181,27 +180,19 @@ func (t TabletInfoList) findTabletForToken(token Token, l int, r int) *TabletInf // cowTabletList implements a copy on write tablet list, its equivalent type is TabletInfoList type cowTabletList struct { - list atomic.Value - mu sync.Mutex + list TabletInfoList + mu sync.RWMutex } func (c *cowTabletList) get() TabletInfoList { - l, ok := c.list.Load().(TabletInfoList) - if !ok { - return nil - } - return l + c.mu.RLock() + defer c.mu.RUnlock() + return c.list } func (c *cowTabletList) set(tablets TabletInfoList) { c.mu.Lock() defer c.mu.Unlock() - n := len(tablets) - t := make(TabletInfoList, n) - for i := 0; i < n; i++ { - t[i] = tablets[i] - } - - c.list.Store(t) + c.list = tablets } From 6b25456998248024d3f426754a18bdd4d7a0cdcf Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Fri, 29 Nov 2024 14:49:42 +0100 Subject: [PATCH 8/8] Rename schemaDescriber to metadataDescriber --- conn.go | 2 +- events.go | 14 +++++++------- host_source.go | 2 +- metadata_scylla.go | 24 ++++++++++++------------ policies.go | 2 +- recreate_test.go | 2 +- schema_queries_test.go | 4 ++-- session.go | 12 ++++++------ 8 files changed, 31 insertions(+), 31 deletions(-) diff --git a/conn.go b/conn.go index ec0846da1..26757f509 100644 --- a/conn.go +++ b/conn.go @@ -1522,7 +1522,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { tablet.keyspaceName = qry.routingInfo.keyspace tablet.tableName = qry.routingInfo.table - c.session.schemaDescriber.addTablet(&tablet) + c.session.metadataDescriber.addTablet(&tablet) } } diff --git a/events.go b/events.go index 200c003df..54ca5f4c5 100644 --- a/events.go +++ b/events.go @@ -110,17 +110,17 @@ func (s *Session) handleSchemaEvent(frames []frame) { for _, frame := range frames { switch f := frame.(type) { case *schemaChangeKeyspace: - s.schemaDescriber.clearSchema(f.keyspace) + s.metadataDescriber.clearSchema(f.keyspace) s.handleKeyspaceChange(f.keyspace, f.change) case *schemaChangeTable: - s.schemaDescriber.clearSchema(f.keyspace) + s.metadataDescriber.clearSchema(f.keyspace) s.handleTableChange(f.keyspace, f.object, f.change) case *schemaChangeAggregate: - s.schemaDescriber.clearSchema(f.keyspace) + s.metadataDescriber.clearSchema(f.keyspace) case *schemaChangeFunction: - s.schemaDescriber.clearSchema(f.keyspace) + s.metadataDescriber.clearSchema(f.keyspace) case *schemaChangeType: - s.schemaDescriber.clearSchema(f.keyspace) + s.metadataDescriber.clearSchema(f.keyspace) } } } @@ -128,14 +128,14 @@ func (s *Session) handleSchemaEvent(frames []frame) { func (s *Session) handleKeyspaceChange(keyspace, change string) { s.control.awaitSchemaAgreement() if change == "DROPPED" || change == "UPDATED" { - s.schemaDescriber.removeTabletsWithKeyspace(keyspace) + s.metadataDescriber.removeTabletsWithKeyspace(keyspace) } s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace, Change: change}) } func (s *Session) handleTableChange(keyspace, table, change string) { if change == "DROPPED" || change == "UPDATED" { - s.schemaDescriber.removeTabletsWithTable(keyspace, table) + s.metadataDescriber.removeTabletsWithTable(keyspace, table) } } diff --git a/host_source.go b/host_source.go index b71ac3139..9b835e3a2 100644 --- a/host_source.go +++ b/host_source.go @@ -877,7 +877,7 @@ func refreshRing(r *ringDescriber) error { } for _, host := range prevHosts { - r.session.schemaDescriber.removeTabletsWithHost(host) + r.session.metadataDescriber.removeTabletsWithHost(host) r.session.removeHost(host) } diff --git a/metadata_scylla.go b/metadata_scylla.go index d2d95472a..13b5e60e3 100644 --- a/metadata_scylla.go +++ b/metadata_scylla.go @@ -277,7 +277,7 @@ type Metadata struct { } // queries the cluster for schema information for a specific keyspace and for tablets -type schemaDescriber struct { +type metadataDescriber struct { session *Session mu sync.Mutex @@ -286,8 +286,8 @@ type schemaDescriber struct { // creates a session bound schema describer which will query and cache // keyspace metadata and tablets metadata -func newSchemaDescriber(session *Session) *schemaDescriber { - return &schemaDescriber{ +func newMetadataDescriber(session *Session) *metadataDescriber { + return &metadataDescriber{ session: session, metadata: &Metadata{}, } @@ -295,7 +295,7 @@ func newSchemaDescriber(session *Session) *schemaDescriber { // returns the cached KeyspaceMetadata held by the describer for the named // keyspace. -func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, error) { +func (s *metadataDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, error) { s.mu.Lock() defer s.mu.Unlock() @@ -316,21 +316,21 @@ func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, err return metadata, nil } -func (s *schemaDescriber) setTablets(tablets TabletInfoList) { +func (s *metadataDescriber) setTablets(tablets TabletInfoList) { s.mu.Lock() defer s.mu.Unlock() s.metadata.tabletsMetadata.set(tablets) } -func (s *schemaDescriber) getTablets() TabletInfoList { +func (s *metadataDescriber) getTablets() TabletInfoList { s.mu.Lock() defer s.mu.Unlock() return s.metadata.tabletsMetadata.get() } -func (s *schemaDescriber) addTablet(tablet *TabletInfo) error { +func (s *metadataDescriber) addTablet(tablet *TabletInfo) error { tablets := s.getTablets() tablets = tablets.addTabletToTabletsList(tablet) @@ -339,7 +339,7 @@ func (s *schemaDescriber) addTablet(tablet *TabletInfo) error { return nil } -func (s *schemaDescriber) removeTabletsWithHost(host *HostInfo) error { +func (s *metadataDescriber) removeTabletsWithHost(host *HostInfo) error { tablets := s.getTablets() tablets = tablets.removeTabletsWithHostFromTabletsList(host) @@ -348,7 +348,7 @@ func (s *schemaDescriber) removeTabletsWithHost(host *HostInfo) error { return nil } -func (s *schemaDescriber) removeTabletsWithKeyspace(keyspace string) error { +func (s *metadataDescriber) removeTabletsWithKeyspace(keyspace string) error { tablets := s.getTablets() tablets = tablets.removeTabletsWithKeyspaceFromTabletsList(keyspace) @@ -357,7 +357,7 @@ func (s *schemaDescriber) removeTabletsWithKeyspace(keyspace string) error { return nil } -func (s *schemaDescriber) removeTabletsWithTable(keyspace string, table string) error { +func (s *metadataDescriber) removeTabletsWithTable(keyspace string, table string) error { tablets := s.getTablets() tablets = tablets.removeTabletsWithTableFromTabletsList(keyspace, table) @@ -367,7 +367,7 @@ func (s *schemaDescriber) removeTabletsWithTable(keyspace string, table string) } // clears the already cached keyspace metadata -func (s *schemaDescriber) clearSchema(keyspaceName string) { +func (s *metadataDescriber) clearSchema(keyspaceName string) { s.mu.Lock() defer s.mu.Unlock() @@ -376,7 +376,7 @@ func (s *schemaDescriber) clearSchema(keyspaceName string) { // forcibly updates the current KeyspaceMetadata held by the schema describer // for a given named keyspace. -func (s *schemaDescriber) refreshSchema(keyspaceName string) error { +func (s *metadataDescriber) refreshSchema(keyspaceName string) error { var err error // query the system keyspace for schema data diff --git a/policies.go b/policies.go index 955d2ecc8..0fec485b4 100644 --- a/policies.go +++ b/policies.go @@ -663,7 +663,7 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { var replicas []*HostInfo if session := qry.GetSession(); session != nil && session.tabletsRoutingV1 { - tablets := session.schemaDescriber.getTablets() + tablets := session.metadataDescriber.getTablets() // Search for tablets with Keyspace and Table from the Query l, r := tablets.findTablets(qry.Keyspace(), qry.Table()) diff --git a/recreate_test.go b/recreate_test.go index 88bbe0f84..e21ce5bad 100644 --- a/recreate_test.go +++ b/recreate_test.go @@ -143,7 +143,7 @@ func TestRecreateSchema(t *testing.T) { // Exec dumped queries to check if they are CQL-correct cleanup(t, session, test.Keyspace) - session.schemaDescriber.clearSchema(test.Keyspace) + session.metadataDescriber.clearSchema(test.Keyspace) for _, q := range trimQueries(strings.Split(dump, ";")) { qr := session.Query(q, nil) diff --git a/schema_queries_test.go b/schema_queries_test.go index 62f10f02d..9281d2328 100644 --- a/schema_queries_test.go +++ b/schema_queries_test.go @@ -16,9 +16,9 @@ func TestSchemaQueries(t *testing.T) { session := createSessionFromCluster(cluster, t) defer session.Close() - keyspaceMetadata, err := session.schemaDescriber.getSchema("gocql_test") + keyspaceMetadata, err := session.metadataDescriber.getSchema("gocql_test") if err != nil { t.Fatal("unable to get keyspace metadata for keyspace: ", err) } - assertTrue(t, "keyspace present in schemaDescriber", keyspaceMetadata.Name == "gocql_test") + assertTrue(t, "keyspace present in metadataDescriber", keyspaceMetadata.Name == "gocql_test") } diff --git a/session.go b/session.go index bc09fa4f6..9830e736f 100644 --- a/session.go +++ b/session.go @@ -35,7 +35,7 @@ type Session struct { pageSize int prefetch float64 routingKeyInfoCache routingKeyInfoLRU - schemaDescriber *schemaDescriber + metadataDescriber *metadataDescriber trace Tracer queryObserver QueryObserver batchObserver BatchObserver @@ -144,7 +144,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) { } }() - s.schemaDescriber = newSchemaDescriber(s) + s.metadataDescriber = newMetadataDescriber(s) s.nodeEvents = newEventDebouncer("NodeEvents", s.handleNodeEvent, s.logger) s.schemaEvents = newEventDebouncer("SchemaEvents", s.handleSchemaEvent, s.logger) @@ -277,7 +277,7 @@ func (s *Session) init() error { if s.tabletsRoutingV1 { tablets := TabletInfoList{} - s.schemaDescriber.setTablets(tablets) + s.metadataDescriber.setTablets(tablets) } } } @@ -603,7 +603,7 @@ func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) { return nil, ErrNoKeyspace } - return s.schemaDescriber.getSchema(keyspace) + return s.metadataDescriber.getSchema(keyspace) } // TabletsMetadata returns the metadata about tablets @@ -615,7 +615,7 @@ func (s *Session) TabletsMetadata() (TabletInfoList, error) { return nil, ErrTabletsNotUsed } - return s.schemaDescriber.getTablets(), nil + return s.metadataDescriber.getTablets(), nil } func (s *Session) getConn() *Conn { @@ -637,7 +637,7 @@ func (s *Session) getConn() *Conn { } func (s *Session) getTablets() TabletInfoList { - return s.schemaDescriber.getTablets() + return s.metadataDescriber.getTablets() } // returns routing key indexes and type info