Skip to content

Commit

Permalink
Store keyspace metadata in copy-on-write map
Browse files Browse the repository at this point in the history
  • Loading branch information
sylwiaszunejko committed Nov 28, 2024
1 parent f5e1fca commit 1cf82d9
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 9 deletions.
71 changes: 63 additions & 8 deletions metadata_scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
)

// schema metadata for a keyspace
Expand Down Expand Up @@ -134,6 +135,59 @@ type IndexMetadata struct {
Options map[string]string
}

// cowTabletList implements a copy on write keyspace metadata map, its equivalent type is map[string]*KeyspaceMetadata
type cowKeyspaceMetadataMap struct {
keyspaceMap atomic.Value
mu sync.Mutex
}

func (c *cowKeyspaceMetadataMap) get() map[string]*KeyspaceMetadata {
l, ok := c.keyspaceMap.Load().(map[string]*KeyspaceMetadata)
if !ok {
return nil
}
return l
}

func (c *cowKeyspaceMetadataMap) getKeyspace(keyspaceName string) (*KeyspaceMetadata, bool) {
m, ok := c.keyspaceMap.Load().(map[string]*KeyspaceMetadata)
if !ok {
return nil, ok
}
val, ok := m[keyspaceName]
return val, ok
}

func (c *cowKeyspaceMetadataMap) set(keyspaceName string, keyspaceMetadata *KeyspaceMetadata) bool {
c.mu.Lock()
m := c.get()

newM := map[string]*KeyspaceMetadata{}
for name, metadata := range m {
newM[name] = metadata
}
newM[keyspaceName] = keyspaceMetadata

c.keyspaceMap.Store(newM)
c.mu.Unlock()
return true
}

func (c *cowKeyspaceMetadataMap) remove(keyspaceName string) {
c.mu.Lock()
m := c.get()

newM := map[string]*KeyspaceMetadata{}
for name, meta := range m {
if name != keyspaceName {
newM[name] = meta
}
}

c.keyspaceMap.Store(newM)
c.mu.Unlock()
}

const (
IndexKindCustom = "CUSTOM"
)
Expand Down Expand Up @@ -218,7 +272,8 @@ func columnKindFromSchema(kind string) (ColumnKind, error) {
}

type Metadata struct {
tabletsMetadata cowTabletList
tabletsMetadata cowTabletList
keyspaceMetadata cowKeyspaceMetadataMap
}

// queries the cluster for schema information for a specific keyspace and for tablets
Expand All @@ -227,8 +282,6 @@ type schemaDescriber struct {
mu sync.Mutex

metadata *Metadata

cache map[string]*KeyspaceMetadata
}

// creates a session bound schema describer which will query and cache
Expand All @@ -237,7 +290,6 @@ func newSchemaDescriber(session *Session) *schemaDescriber {
return &schemaDescriber{
session: session,
metadata: &Metadata{},
cache: map[string]*KeyspaceMetadata{},
}
}

Expand All @@ -247,15 +299,18 @@ func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, err
s.mu.Lock()
defer s.mu.Unlock()

metadata, found := s.cache[keyspaceName]
metadata, found := s.metadata.keyspaceMetadata.getKeyspace(keyspaceName)
if !found {
// refresh the cache for this keyspace
err := s.refreshSchema(keyspaceName)
if err != nil {
return nil, err
}

metadata = s.cache[keyspaceName]
metadata, found = s.metadata.keyspaceMetadata.getKeyspace(keyspaceName)
if !found {
return nil, fmt.Errorf("Metadata not found for keyspace: %s", keyspaceName)
}
}

return metadata, nil
Expand Down Expand Up @@ -316,7 +371,7 @@ func (s *schemaDescriber) clearSchema(keyspaceName string) {
s.mu.Lock()
defer s.mu.Unlock()

delete(s.cache, keyspaceName)
s.metadata.keyspaceMetadata.remove(keyspaceName)
}

// forcibly updates the current KeyspaceMetadata held by the schema describer
Expand Down Expand Up @@ -368,7 +423,7 @@ func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
compileMetadata(keyspace, tables, columns, functions, aggregates, types, indexes, views, createStmts)

// update the cache
s.cache[keyspaceName] = keyspace
s.metadata.keyspaceMetadata.set(keyspaceName, keyspace)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion schema_queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@ func TestSchemaQueries(t *testing.T) {
session := createSessionFromCluster(cluster, t)
defer session.Close()

assertTrue(t, "keyspace present in schemaDescriber", session.schemaDescriber.cache["gocql_test"].Name == "gocql_test")
keyspaceMetadata, ok := session.schemaDescriber.getSchema("gocql_test")
if !ok {
t.Fatalf("unable to get keyspace metadata for keyspace: %s", "gocql_test")
}
assertTrue(t, "keyspace present in schemaDescriber", keyspaceMetadata.Name == "gocql_test")
}

0 comments on commit 1cf82d9

Please sign in to comment.