Skip to content

Commit

Permalink
feat(config-cache): initial stub for cluster config cache service
Browse files Browse the repository at this point in the history
Fixes #3802
  • Loading branch information
karol-kokoszka committed Apr 18, 2024
1 parent 80775bd commit 3247350
Show file tree
Hide file tree
Showing 3 changed files with 464 additions and 0 deletions.
12 changes: 12 additions & 0 deletions pkg/service/configcache/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (C) 2024 ScyllaDB

package configcache

import "errors"

var (
// ErrNoClusterConfig is thrown when there is no config for given cluster in cache.
ErrNoClusterConfig = errors.New("no cluster config available")
// ErrNoHostConfig is thrown when there is no config for given host in cache.
ErrNoHostConfig = errors.New("no host config available")
)
223 changes: 223 additions & 0 deletions pkg/service/configcache/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright (C) 2024 ScyllaDB

package configcache

import (
"context"
"crypto/tls"
"fmt"
"sync"
"time"

"github.com/pkg/errors"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/secrets"
"github.com/scylladb/scylla-manager/v3/pkg/service"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/store"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)

// ConnectionType defines an enum for different types of configuration.
type ConnectionType int

const (
// CQL defines cql connection type.
CQL ConnectionType = iota
// Alternator defines alternator connection type.
Alternator

updateFrequency = 30 * time.Minute
)

type tlsConfigWithAddress struct {
*tls.Config
Address string
}

// NodeConfig keeps the node current node configuration together with the TLS details per different type of connection.
type NodeConfig struct {
*scyllaclient.NodeInfo
TLSConfig map[ConnectionType]*tlsConfigWithAddress
}

// ConfigCacher is the interface defining the cache behavior.
type ConfigCacher interface {
Read(clusterID uuid.UUID, host string) (NodeConfig, error)
}

// Service is responsible for handling all cluster configuration cache related operations.
// Use svc.Read(clusterID, host) to read the configuration of particular host in given cluster.
// Use svc.Run() to let the cache update itself periodically with the current configuration.
//
// Current implementation doesn't care about changing IP, what means that the configuration for given cluster and host
// will stay there forever.
type Service struct {
clusterSvc cluster.Servicer
scyllaClient scyllaclient.ProviderFunc
secretsStore store.Store

configs sync.Map
}

// Read returns either the host configuration that is currently stored in the cache,
// ErrNoClusterConfig if config for the whole cluster doesn't exist,
// or ErrNoHostConfig if config of the particular host doesn't exist.
func (svc *Service) Read(clusterID uuid.UUID, host string) (NodeConfig, error) {
emptyConfig := NodeConfig{}

clusterConfig, err := svc.readClusterConfig(clusterID)
if err != nil {
return emptyConfig, err
}

rawHostConfig, ok := clusterConfig.Load(host)
if !ok {
return emptyConfig, ErrNoHostConfig
}
hostConfig, ok := rawHostConfig.(NodeConfig)
if !ok {
panic("cluster host emptyConfig cache stores unexpected type")
}
return hostConfig, nil
}

func (svc *Service) readClusterConfig(clusterID uuid.UUID) (*sync.Map, error) {
emptyConfig := &sync.Map{}

rawClusterConfig, ok := svc.configs.Load(clusterID)
if !ok {
return emptyConfig, ErrNoClusterConfig
}
clusterConfig, ok := rawClusterConfig.(*sync.Map)
if !ok {
panic("cluster cache emptyConfig stores unexpected type")
}
return clusterConfig, nil
}

// Run starts the infinity loop responsible for updating the clusters configuration periodically.
func (svc *Service) Run(ctx context.Context) {
freq := time.NewTicker(updateFrequency)

for {
// make sure to shut down when the context is cancelled
select {
case <-ctx.Done():
return
default:
}

select {
case <-freq.C:
svc.update(ctx)
case <-ctx.Done():
return
}
}
}

func (svc *Service) update(ctx context.Context) {
clusters, err := svc.clusterSvc.ListClusters(ctx, nil)
if err != nil {
fmt.Println(err)
}

for _, c := range clusters {
c := c
currentClusterConfig, err := svc.readClusterConfig(c.ID)
if err != nil && errors.Is(err, ErrNoClusterConfig) {
fmt.Println(err)
}

go func() {
client, err := svc.scyllaClient(ctx, c.ID)
if err != nil {
fmt.Println(err)
}

// Hosts that are going to be asked about the configuration are exactly the same as
// the ones used by the scylla client.
hostsWg := sync.WaitGroup{}
for _, host := range client.Config().Hosts {
hostsWg.Add(1)
host := host
go func() {
defer hostsWg.Done()

config, err := svc.retrieveClusterHostConfig(ctx, host, client, c)
if err != nil {
fmt.Println(err)
}
currentClusterConfig.Store(host, config)
}()
}
hostsWg.Wait()
}()
}
}

func (svc *Service) retrieveClusterHostConfig(ctx context.Context, host string, client *scyllaclient.Client,
c *cluster.Cluster,
) (*NodeConfig, error) {
config := &NodeConfig{}

nodeInfoResp, err := client.NodeInfo(ctx, host)
if err != nil {
return config, errors.Wrap(err, "fetch node info")
}

config.TLSConfig = make(map[ConnectionType]*tlsConfigWithAddress, 2)
for _, p := range []ConnectionType{CQL, Alternator} {
var tlsEnabled, clientCertAuth bool
var address string
if p == CQL {
address = nodeInfoResp.CQLAddr(host)
tlsEnabled, clientCertAuth = nodeInfoResp.CQLTLSEnabled()
tlsEnabled = tlsEnabled && !c.ForceTLSDisabled
if tlsEnabled && !c.ForceNonSSLSessionPort {
address = nodeInfoResp.CQLSSLAddr(host)
}
} else if p == Alternator {
tlsEnabled, clientCertAuth = nodeInfoResp.AlternatorTLSEnabled()
address = nodeInfoResp.AlternatorAddr(host)
}
if tlsEnabled {
tlsConfig, err := svc.tlsConfig(c.ID, clientCertAuth)
if err != nil && !errors.Is(err, service.ErrNotFound) {
return config, errors.Wrap(err, "fetch TLS config")
}
if clientCertAuth && errors.Is(err, service.ErrNotFound) {
return config, errors.Wrap(err, "client encryption is enabled, but certificate is missing")
}
config.TLSConfig[p] = &tlsConfigWithAddress{
Config: tlsConfig,
Address: address,
}
}
}

return config, nil
}

func (svc *Service) tlsConfig(clusterID uuid.UUID, clientCertAuth bool) (*tls.Config, error) {
cfg := tls.Config{
InsecureSkipVerify: true,
}

if clientCertAuth {
id := &secrets.TLSIdentity{
ClusterID: clusterID,
}
if err := svc.secretsStore.Get(id); err != nil {
return nil, errors.Wrap(err, "get SSL user cert from secrets store")
}
keyPair, err := tls.X509KeyPair(id.Cert, id.PrivateKey)
if err != nil {
return nil, errors.Wrap(err, "invalid SSL user key pair")
}
cfg.Certificates = []tls.Certificate{keyPair}
}

return &cfg, nil
}
Loading

0 comments on commit 3247350

Please sign in to comment.