Skip to content

Commit

Permalink
feat(config-cache): initial stub for cluster config cache service
Browse files Browse the repository at this point in the history
  • Loading branch information
karol-kokoszka committed Apr 15, 2024
1 parent 1634cd1 commit 41e3c5b
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 0 deletions.
8 changes: 8 additions & 0 deletions pkg/service/configcache/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package configcache

import "errors"

var (
ErrNoClusterConfig = errors.New("no cluster config available")
ErrNoHostConfig = errors.New("no host config available")
)
225 changes: 225 additions & 0 deletions pkg/service/configcache/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package configcache

import (
"context"
"crypto/tls"
"sync"
"time"

"github.com/pkg/errors"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/secrets"
"github.com/scylladb/scylla-manager/v3/pkg/service"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/store"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)

type ConnectionType int

const (
CQL ConnectionType = iota
Alternator

updateFrequency = 30 * time.Minute
)

type tlsConfigWithAddress struct {
*tls.Config
Address string
}

type NodeConfig struct {
*scyllaclient.NodeInfo
TLSConfig map[ConnectionType]*tlsConfigWithAddress
}

type ConfigCacher interface {
Read(clusterID uuid.UUID, host string)
}

// Service is responsible for handling all cluster configuration cache related operations.
// Use svc.Read(clusterID, host) to read the configuration of particular host in given cluster.
// Use svc.Run() to let the cache update itself periodically with the current configuration.
//
// Current implementation doesn't care about changing IP, what means that the configuration for given cluster and host
// will stay there forever.
//
// TODO: add constructor
// TODO: pass logger and use it to log errors
// TODO: add logic saying if the cache is ready after the first run
// TODO: extend integration tests of healtcheck to simulate scenario with unresponsive agent API
// TODO: implement tests assuring that the configuration is continuously being updated
// TODO: start the cache service on manager start
// TODO: add panic recovery
// TODO: redirect cache hits in healtcheck to use new service
// TODO: remove old references in healtcheck service
// TODO: update cache hitting across whole codebase
type Service struct {
clusterSvc cluster.Service
scyllaClient scyllaclient.ProviderFunc
secretsStore store.Store

// maps clusterID (uuid.UUID) to configcache.ClusterConfig
configs sync.Map
}

// Read returns either the host configuration that is currently stored in the cache,
// ErrNoClusterConfig if config for the whole cluster doesn't exist,
// or ErrNoHostConfig if config of the particular host doesn't exist.
func (svc *Service) Read(clusterID uuid.UUID, host string) (NodeConfig, error) {
emptyConfig := NodeConfig{}

clusterConfig, err := svc.readClusterConfig(clusterID)
if err != nil {
return emptyConfig, err
}

rawHostConfig, ok := clusterConfig.Load(host)
if !ok {
return emptyConfig, ErrNoHostConfig
}
hostConfig, ok := rawHostConfig.(NodeConfig)
if !ok {
panic("cluster host emptyConfig cache stores unexpected type")
}
return hostConfig, nil
}

func (svc *Service) readClusterConfig(clusterID uuid.UUID) (*sync.Map, error) {
emptyConfig := &sync.Map{}

rawClusterConfig, ok := svc.configs.Load(clusterID)
if !ok {
return emptyConfig, ErrNoClusterConfig
}
clusterConfig, ok := rawClusterConfig.(*sync.Map)
if !ok {
panic("cluster cache emptyConfig stores unexpected type")
}
return clusterConfig, nil
}

func (svc *Service) Run(ctx context.Context) {
freq := time.NewTicker(updateFrequency)

for {
// make sure to shut down when the context is cancelled
select {
case <-ctx.Done():
return
default:
}

select {
case <-freq.C:
svc.update(ctx)
}
}
}

func (svc *Service) update(ctx context.Context) {
clusters, err := svc.clusterSvc.ListClusters(ctx, nil)
if err != nil {
// TODO: log it
// return errors.Wrapf(err, "cannot update config cache clusters list")
}

for _, c := range clusters {
c := c
currentClusterConfig, err := svc.readClusterConfig(c.ID)
if err != nil && err != ErrNoClusterConfig {
// TODO: log it
}

go func() {
client, err := svc.scyllaClient(ctx, c.ID)
if err != nil {
// TODO: log it
}

// hosts that are going to be asked about the configuration are exactly the same as
// the ones used by the scylla client
hostsWg := sync.WaitGroup{}
for _, host := range client.Config().Hosts {
hostsWg.Add(1)
host := host
go func() {
defer hostsWg.Done()

config, err := svc.retrieveClusterHostConfig(ctx, host, client, c)
if err != nil {
// TODO: log it
}
currentClusterConfig.Store(host, config)
}()
}
hostsWg.Wait()
}()
}
}

func (svc *Service) retrieveClusterHostConfig(ctx context.Context, host string, client *scyllaclient.Client,
c *cluster.Cluster) (*NodeConfig, error) {

config := &NodeConfig{}

nodeInfoResp, err := client.NodeInfo(ctx, host)
if err != nil {
return config, errors.Wrap(err, "fetch node info")
}

config.TLSConfig = make(map[ConnectionType]*tlsConfigWithAddress, 2)
for _, p := range []ConnectionType{CQL, Alternator} {
var tlsEnabled, clientCertAuth bool
var address string
if p == CQL {
address = nodeInfoResp.CQLAddr(host)
tlsEnabled, clientCertAuth = nodeInfoResp.CQLTLSEnabled()
tlsEnabled = tlsEnabled && !c.ForceTLSDisabled
if tlsEnabled && !c.ForceNonSSLSessionPort {
address = nodeInfoResp.CQLSSLAddr(host)
}
} else if p == Alternator {
tlsEnabled, clientCertAuth = nodeInfoResp.AlternatorTLSEnabled()
address = nodeInfoResp.AlternatorAddr(host)
}
if tlsEnabled {
tlsConfig, err := svc.tlsConfig(c.ID, clientCertAuth)
if err != nil && !errors.Is(err, service.ErrNotFound) {
return config, errors.Wrap(err, "fetch TLS config")
}
if clientCertAuth && errors.Is(err, service.ErrNotFound) {
return config, errors.Wrap(err, "client encryption is enabled, but certificate is missing")
}
config.TLSConfig[p] = &tlsConfigWithAddress{
Config: tlsConfig,
Address: address,
}
}
}

return config, nil
}

func (svc *Service) tlsConfig(clusterID uuid.UUID, clientCertAuth bool) (*tls.Config, error) {
cfg := tls.Config{
InsecureSkipVerify: true,
}

if clientCertAuth {
id := &secrets.TLSIdentity{
ClusterID: clusterID,
}
if err := svc.secretsStore.Get(id); err != nil {
return nil, errors.Wrap(err, "get SSL user cert from secrets store")
}
keyPair, err := tls.X509KeyPair(id.Cert, id.PrivateKey)
if err != nil {
return nil, errors.Wrap(err, "invalid SSL user key pair")
}
cfg.Certificates = []tls.Certificate{keyPair}
}

return &cfg, nil
}

0 comments on commit 41e3c5b

Please sign in to comment.