-
Notifications
You must be signed in to change notification settings - Fork 34
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(config-cache): initial stub for cluster config cache service
- Loading branch information
1 parent
1634cd1
commit e14498f
Showing
2 changed files
with
228 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
package configcache | ||
|
||
import "errors" | ||
|
||
var ( | ||
ErrNoClusterConfig = errors.New("no cluster config available") | ||
ErrNoHostConfig = errors.New("no host config available") | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,220 @@ | ||
package configcache | ||
|
||
import ( | ||
"context" | ||
"crypto/tls" | ||
"sync" | ||
"time" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" | ||
"github.com/scylladb/scylla-manager/v3/pkg/secrets" | ||
"github.com/scylladb/scylla-manager/v3/pkg/service" | ||
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster" | ||
"github.com/scylladb/scylla-manager/v3/pkg/store" | ||
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid" | ||
) | ||
|
||
// ConnectionType defines an enum for different types of configuration | ||
type ConnectionType int | ||
|
||
const ( | ||
CQL ConnectionType = iota | ||
Alternator | ||
|
||
updateFrequency = 30 * time.Minute | ||
) | ||
|
||
type tlsConfigWithAddress struct { | ||
*tls.Config | ||
Address string | ||
} | ||
|
||
// NodeConfig keeps the node current node configuration together with the TLS details per different type of connection. | ||
type NodeConfig struct { | ||
*scyllaclient.NodeInfo | ||
TLSConfig map[ConnectionType]*tlsConfigWithAddress | ||
} | ||
|
||
// ConfigCacher is the interface defining the cache behavior. | ||
type ConfigCacher interface { | ||
Read(clusterID uuid.UUID, host string) (NodeConfig, error) | ||
} | ||
|
||
// Service is responsible for handling all cluster configuration cache related operations. | ||
// Use svc.Read(clusterID, host) to read the configuration of particular host in given cluster. | ||
// Use svc.Run() to let the cache update itself periodically with the current configuration. | ||
// | ||
// Current implementation doesn't care about changing IP, what means that the configuration for given cluster and host | ||
// will stay there forever. | ||
type Service struct { | ||
clusterSvc cluster.Service | ||
scyllaClient scyllaclient.ProviderFunc | ||
secretsStore store.Store | ||
|
||
// maps clusterID (uuid.UUID) to configcache.ClusterConfig | ||
configs sync.Map | ||
} | ||
|
||
// Read returns either the host configuration that is currently stored in the cache, | ||
// ErrNoClusterConfig if config for the whole cluster doesn't exist, | ||
// or ErrNoHostConfig if config of the particular host doesn't exist. | ||
func (svc *Service) Read(clusterID uuid.UUID, host string) (NodeConfig, error) { | ||
emptyConfig := NodeConfig{} | ||
|
||
clusterConfig, err := svc.readClusterConfig(clusterID) | ||
if err != nil { | ||
return emptyConfig, err | ||
} | ||
|
||
rawHostConfig, ok := clusterConfig.Load(host) | ||
if !ok { | ||
return emptyConfig, ErrNoHostConfig | ||
} | ||
hostConfig, ok := rawHostConfig.(NodeConfig) | ||
if !ok { | ||
panic("cluster host emptyConfig cache stores unexpected type") | ||
} | ||
return hostConfig, nil | ||
} | ||
|
||
func (svc *Service) readClusterConfig(clusterID uuid.UUID) (*sync.Map, error) { | ||
emptyConfig := &sync.Map{} | ||
|
||
rawClusterConfig, ok := svc.configs.Load(clusterID) | ||
if !ok { | ||
return emptyConfig, ErrNoClusterConfig | ||
} | ||
clusterConfig, ok := rawClusterConfig.(*sync.Map) | ||
if !ok { | ||
panic("cluster cache emptyConfig stores unexpected type") | ||
} | ||
return clusterConfig, nil | ||
} | ||
|
||
// Run starts the infinity loop responsible for updating the clusters configuration periodically. | ||
func (svc *Service) Run(ctx context.Context) { | ||
freq := time.NewTicker(updateFrequency) | ||
|
||
for { | ||
// make sure to shut down when the context is cancelled | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
default: | ||
} | ||
|
||
select { | ||
case <-freq.C: | ||
svc.update(ctx) | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (svc *Service) update(ctx context.Context) { | ||
clusters, err := svc.clusterSvc.ListClusters(ctx, nil) | ||
if err != nil { | ||
// TODO: log it | ||
// return errors.Wrapf(err, "cannot update config cache clusters list") | ||
} | ||
|
||
for _, c := range clusters { | ||
c := c | ||
currentClusterConfig, err := svc.readClusterConfig(c.ID) | ||
if err != nil && err != ErrNoClusterConfig { | ||
// TODO: log it | ||
} | ||
|
||
go func() { | ||
client, err := svc.scyllaClient(ctx, c.ID) | ||
if err != nil { | ||
// TODO: log it | ||
} | ||
|
||
// hosts that are going to be asked about the configuration are exactly the same as | ||
// the ones used by the scylla client | ||
hostsWg := sync.WaitGroup{} | ||
for _, host := range client.Config().Hosts { | ||
hostsWg.Add(1) | ||
host := host | ||
go func() { | ||
defer hostsWg.Done() | ||
|
||
config, err := svc.retrieveClusterHostConfig(ctx, host, client, c) | ||
if err != nil { | ||
// TODO: log it | ||
} | ||
currentClusterConfig.Store(host, config) | ||
}() | ||
} | ||
hostsWg.Wait() | ||
}() | ||
} | ||
} | ||
|
||
func (svc *Service) retrieveClusterHostConfig(ctx context.Context, host string, client *scyllaclient.Client, | ||
c *cluster.Cluster) (*NodeConfig, error) { | ||
|
||
config := &NodeConfig{} | ||
|
||
nodeInfoResp, err := client.NodeInfo(ctx, host) | ||
if err != nil { | ||
return config, errors.Wrap(err, "fetch node info") | ||
} | ||
|
||
config.TLSConfig = make(map[ConnectionType]*tlsConfigWithAddress, 2) | ||
for _, p := range []ConnectionType{CQL, Alternator} { | ||
var tlsEnabled, clientCertAuth bool | ||
var address string | ||
if p == CQL { | ||
address = nodeInfoResp.CQLAddr(host) | ||
tlsEnabled, clientCertAuth = nodeInfoResp.CQLTLSEnabled() | ||
tlsEnabled = tlsEnabled && !c.ForceTLSDisabled | ||
if tlsEnabled && !c.ForceNonSSLSessionPort { | ||
address = nodeInfoResp.CQLSSLAddr(host) | ||
} | ||
} else if p == Alternator { | ||
tlsEnabled, clientCertAuth = nodeInfoResp.AlternatorTLSEnabled() | ||
address = nodeInfoResp.AlternatorAddr(host) | ||
} | ||
if tlsEnabled { | ||
tlsConfig, err := svc.tlsConfig(c.ID, clientCertAuth) | ||
if err != nil && !errors.Is(err, service.ErrNotFound) { | ||
return config, errors.Wrap(err, "fetch TLS config") | ||
} | ||
if clientCertAuth && errors.Is(err, service.ErrNotFound) { | ||
return config, errors.Wrap(err, "client encryption is enabled, but certificate is missing") | ||
} | ||
config.TLSConfig[p] = &tlsConfigWithAddress{ | ||
Config: tlsConfig, | ||
Address: address, | ||
} | ||
} | ||
} | ||
|
||
return config, nil | ||
} | ||
|
||
func (svc *Service) tlsConfig(clusterID uuid.UUID, clientCertAuth bool) (*tls.Config, error) { | ||
cfg := tls.Config{ | ||
InsecureSkipVerify: true, | ||
} | ||
|
||
if clientCertAuth { | ||
id := &secrets.TLSIdentity{ | ||
ClusterID: clusterID, | ||
} | ||
if err := svc.secretsStore.Get(id); err != nil { | ||
return nil, errors.Wrap(err, "get SSL user cert from secrets store") | ||
} | ||
keyPair, err := tls.X509KeyPair(id.Cert, id.PrivateKey) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "invalid SSL user key pair") | ||
} | ||
cfg.Certificates = []tls.Certificate{keyPair} | ||
} | ||
|
||
return &cfg, nil | ||
} |