feat(config-cache): initial stub for cluster config cache service #3803
@@ -0,0 +1,12 @@
// Copyright (C) 2024 ScyllaDB

package configcache

import "errors"

var (
    // ErrNoClusterConfig is returned when there is no config for the given cluster in the cache.
    ErrNoClusterConfig = errors.New("no cluster config available")
    // ErrNoHostConfig is returned when there is no config for the given host in the cache.
    ErrNoHostConfig = errors.New("no host config available")
)
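A usage note, not part of this diff: since these are package-level sentinel errors, callers are expected to match them with errors.Is. Below is a minimal, hypothetical sketch; the import path of the new package is an assumption, and the ConfigCacher interface and NodeConfig type are introduced in the next file of this PR.

package example // any consumer package

import (
    "errors"
    "log"

    "github.com/scylladb/scylla-manager/v3/pkg/service/configcache" // assumed location of the new package
    "github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)

// describeRead sketches how a caller can tell the two cache misses apart.
func describeRead(cache configcache.ConfigCacher, clusterID uuid.UUID, host string) {
    cfg, err := cache.Read(clusterID, host)
    switch {
    case errors.Is(err, configcache.ErrNoClusterConfig):
        log.Printf("cluster %v is not cached yet (no update cycle has run)", clusterID)
    case errors.Is(err, configcache.ErrNoHostConfig):
        log.Printf("cluster %v is cached, but host %s is unknown", clusterID, host)
    case err != nil:
        log.Printf("read config for %s: %v", host, err)
    default:
        _ = cfg // use the cached NodeConfig (node info plus TLS settings)
    }
}

Keeping these as plain sentinels rather than wrapped error types makes the caller-side checks simple and unambiguous.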
@@ -0,0 +1,223 @@
// Copyright (C) 2024 ScyllaDB

package configcache

import (
    "context"
    "crypto/tls"
    "fmt"
    "sync"
    "time"

    "github.com/pkg/errors"
    "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
    "github.com/scylladb/scylla-manager/v3/pkg/secrets"
    "github.com/scylladb/scylla-manager/v3/pkg/service"
    "github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
    "github.com/scylladb/scylla-manager/v3/pkg/store"
    "github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)
// ConnectionType defines an enum for different types of configuration.
type ConnectionType int

const (
    // CQL defines the CQL connection type.
    CQL ConnectionType = iota
    // Alternator defines the Alternator connection type.
    Alternator

    updateFrequency = 5 * time.Minute
)
type tlsConfigWithAddress struct {
    *tls.Config
    Address string
}

// NodeConfig keeps the current node configuration together with the TLS details per type of connection.
type NodeConfig struct {
    *scyllaclient.NodeInfo
    TLSConfig map[ConnectionType]*tlsConfigWithAddress
}
// ConfigCacher is the interface defining the cache behavior.
type ConfigCacher interface {
    Read(clusterID uuid.UUID, host string) (NodeConfig, error)
}
// Service is responsible for handling all cluster configuration cache related operations.
// Use svc.Read(clusterID, host) to read the configuration of a particular host in the given cluster.
// Use svc.Run() to let the cache update itself periodically with the current configuration.
//
// The current implementation doesn't track IP changes, which means that the configuration for a given cluster
// and host stays in the cache forever.
type Service struct {
    clusterSvc   cluster.Servicer
    scyllaClient scyllaclient.ProviderFunc
    secretsStore store.Store

    configs sync.Map
}
// Read returns either the host configuration that is currently stored in the cache,
// ErrNoClusterConfig if config for the whole cluster doesn't exist,
// or ErrNoHostConfig if config of the particular host doesn't exist.
func (svc *Service) Read(clusterID uuid.UUID, host string) (NodeConfig, error) {
    emptyConfig := NodeConfig{}

    clusterConfig, err := svc.readClusterConfig(clusterID)
    if err != nil {
        return emptyConfig, err
    }

    rawHostConfig, ok := clusterConfig.Load(host)
    if !ok {
        return emptyConfig, ErrNoHostConfig
    }
    hostConfig, ok := rawHostConfig.(NodeConfig)
    if !ok {
        panic("cluster host config cache stores unexpected type")
    }
    return hostConfig, nil
}
func (svc *Service) readClusterConfig(clusterID uuid.UUID) (*sync.Map, error) {
    emptyConfig := &sync.Map{}

    rawClusterConfig, ok := svc.configs.Load(clusterID)
    if !ok {
        return emptyConfig, ErrNoClusterConfig
    }
    clusterConfig, ok := rawClusterConfig.(*sync.Map)
    if !ok {
        panic("cluster config cache stores unexpected type")
    }
    return clusterConfig, nil
}
// Run starts the infinite loop responsible for updating the clusters' configuration periodically.
func (svc *Service) Run(ctx context.Context) {
    freq := time.NewTicker(updateFrequency)
    // Release the ticker's resources when Run returns.
    defer freq.Stop()

    for {
        // Make sure to shut down when the context is cancelled.
        select {
        case <-ctx.Done():
            return
        default:
        }

        select {
        case <-freq.C:
            svc.update(ctx)
        case <-ctx.Done():
            return
        }
    }
}
func (svc *Service) update(ctx context.Context) {
    clusters, err := svc.clusterSvc.ListClusters(ctx, nil)
    if err != nil {
        fmt.Println(err)
    }

    for _, c := range clusters {
        c := c
        currentClusterConfig, err := svc.readClusterConfig(c.ID)
        if err != nil && errors.Is(err, ErrNoClusterConfig) {
            // First time this cluster is seen: register the freshly created per-cluster map,
            // so that the host configs stored below become visible to Read.
            svc.configs.Store(c.ID, currentClusterConfig)
        }

        go func() {
            client, err := svc.scyllaClient(ctx, c.ID)
            if err != nil {
                fmt.Println(err)
            }

            // Hosts that are going to be asked about the configuration are exactly the same as
            // the ones used by the scylla client.
            hostsWg := sync.WaitGroup{}
Comment on lines +133 to +141

Reviewer: I think it would be safer to wait for all updates before returning from this function. Otherwise it's possible that a single cluster will be updated in parallel by different calls to the update function.

Author: Yes, you are right, and it's addressed in the PR fixing the next issue. Let me merge this PR to the feature branch and you can continue with the review of the next PR.
            for _, host := range client.Config().Hosts {
                hostsWg.Add(1)
                host := host
                go func() {
                    defer hostsWg.Done()

                    config, err := svc.retrieveClusterHostConfig(ctx, host, client, c)
                    if err != nil {
                        fmt.Println(err)
                    }
                    // Store the value, not the pointer, so that the type assertion in Read succeeds.
                    currentClusterConfig.Store(host, *config)
                }()
            }
            hostsWg.Wait()
        }()
    }
}
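Following up on the review comment above, here is a rough sketch, not code from this PR, of how update could block until every cluster has been processed, so that two ticks never update the same cluster in parallel. It lives in package configcache, reuses the names and imports of the stub above, and elides the per-host logic.

// Sketch of the reviewer's suggestion: make update synchronous at the cluster level.
func (svc *Service) update(ctx context.Context) {
    clusters, err := svc.clusterSvc.ListClusters(ctx, nil)
    if err != nil {
        fmt.Println(err)
        return
    }

    var clustersWg sync.WaitGroup
    for _, c := range clusters {
        c := c
        clustersWg.Add(1)
        go func() {
            defer clustersWg.Done()
            _ = c // stands in for the per-cluster host update from the stub above
        }()
    }
    // Returning only after every cluster finished means the next tick in Run
    // cannot start a second, overlapping update of the same cluster.
    clustersWg.Wait()
}

Because Run calls update synchronously from its ticker loop, blocking here is enough; no extra per-cluster locking is needed.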
func (svc *Service) retrieveClusterHostConfig(ctx context.Context, host string, client *scyllaclient.Client,
    c *cluster.Cluster,
) (*NodeConfig, error) {
    config := &NodeConfig{}

    nodeInfoResp, err := client.NodeInfo(ctx, host)
    if err != nil {
        return config, errors.Wrap(err, "fetch node info")
    }

    config.TLSConfig = make(map[ConnectionType]*tlsConfigWithAddress, 2)
    for _, p := range []ConnectionType{CQL, Alternator} {
        var tlsEnabled, clientCertAuth bool
        var address string
        if p == CQL {
            address = nodeInfoResp.CQLAddr(host)
            tlsEnabled, clientCertAuth = nodeInfoResp.CQLTLSEnabled()
            tlsEnabled = tlsEnabled && !c.ForceTLSDisabled
            if tlsEnabled && !c.ForceNonSSLSessionPort {
                address = nodeInfoResp.CQLSSLAddr(host)
            }
        } else if p == Alternator {
            tlsEnabled, clientCertAuth = nodeInfoResp.AlternatorTLSEnabled()
            address = nodeInfoResp.AlternatorAddr(host)
        }
        if tlsEnabled {
            tlsConfig, err := svc.tlsConfig(c.ID, clientCertAuth)
            if err != nil && !errors.Is(err, service.ErrNotFound) {
                return config, errors.Wrap(err, "fetch TLS config")
            }
            if clientCertAuth && errors.Is(err, service.ErrNotFound) {
                return config, errors.Wrap(err, "client encryption is enabled, but certificate is missing")
            }
            config.TLSConfig[p] = &tlsConfigWithAddress{
                Config:  tlsConfig,
                Address: address,
            }
        }
    }
Comment on lines +171 to +198

Reviewer: I know that's how it was done before, but I don't see the reason for keeping a map consisting of only two entries. Also, the for loop which fills this map is cloudy, as CQL and Alternator are still if-ed in the first half of the loop. And getting configs from the map is less clear (are the configs always there? Should we validate that every time we want to get them?). Wouldn't it be better to directly store the two TLS configs in NodeConfig?

Author: You are right.

    return config, nil
}
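For illustration of the review suggestion above, a sketch of what NodeConfig could look like with the two TLS configurations stored as explicit fields instead of a map keyed by ConnectionType; the field names are hypothetical and not part of this PR.

// Hypothetical alternative to the map-based NodeConfig from this PR.
type NodeConfig struct {
    *scyllaclient.NodeInfo

    // CQLTLS holds the CQL TLS settings; nil when TLS is disabled for CQL.
    CQLTLS *tlsConfigWithAddress
    // AlternatorTLS holds the Alternator TLS settings; nil when TLS is disabled for Alternator.
    AlternatorTLS *tlsConfigWithAddress
}

With explicit fields the set of possible configurations is visible in the type itself, and a nil check replaces the map-lookup question the reviewer raises (is the key always there?).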
func (svc *Service) tlsConfig(clusterID uuid.UUID, clientCertAuth bool) (*tls.Config, error) {
    cfg := tls.Config{
        InsecureSkipVerify: true,
    }

    if clientCertAuth {
        id := &secrets.TLSIdentity{
            ClusterID: clusterID,
        }
        if err := svc.secretsStore.Get(id); err != nil {
            return nil, errors.Wrap(err, "get SSL user cert from secrets store")
        }
        keyPair, err := tls.X509KeyPair(id.Cert, id.PrivateKey)
        if err != nil {
            return nil, errors.Wrap(err, "invalid SSL user key pair")
        }
        cfg.Certificates = []tls.Certificate{keyPair}
    }

    return &cfg, nil
}
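To round off the stub, a hedged sketch of how a caller might drive the service once it is wired up. This PR does not include a constructor, so svc is assumed to be an already-initialized *configcache.Service, and the import path is likewise an assumption.

package example // any consumer package

import (
    "context"
    "fmt"

    "github.com/scylladb/scylla-manager/v3/pkg/service/configcache" // assumed location of the new package
    "github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)

// useCache shows the intended flow: Run refreshes the cache in the background, Read serves lookups.
func useCache(ctx context.Context, svc *configcache.Service, clusterID uuid.UUID, host string) error {
    // Typically started once at service startup; it ticks every updateFrequency (5 minutes)
    // and stops when ctx is cancelled.
    go svc.Run(ctx)

    cfg, err := svc.Read(clusterID, host)
    if err != nil {
        // Expect ErrNoClusterConfig / ErrNoHostConfig until the first update cycle has populated the cache.
        return err
    }
    if cqlTLS, ok := cfg.TLSConfig[configcache.CQL]; ok {
        fmt.Println("CQL with TLS is served at", cqlTLS.Address)
    }
    return nil
}

Note that right after startup Read misses until the first ticker fire, up to five minutes later, so callers need to tolerate the sentinel errors until then.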
Reviewer: Just a note that this opens a client without closing it. We probably can't close it because of the cache, but then maybe it makes sense to create a client from scratch so that it can be closed right after?

Author: It actually doesn't lead to a TCP connection leak. Here is why. Keep-alive is enabled by default at the HTTP level in Go, which means the client reuses a TCP connection for multiple requests to the same server. That is good. Scylla Manager also explicitly sets keep-alive at the TCP level (scylla-manager/pkg/scyllaclient/client.go, line 52 at 701b8cb), which means that connections whose peer has silently gone away are eventually detected and closed by the operating system. The scylla client cache is OK as is; we don't need to explicitly close the client.
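As background to the thread above, a small illustration, not the code at client.go line 52, of the standard-library mechanism being described: connection reuse via HTTP keep-alive plus TCP keep-alive probes configured through net.Dialer; the timeout values are illustrative only.

package example

import (
    "net"
    "net/http"
    "time"
)

// newKeepAliveClient builds an HTTP client with pooled, reused connections (HTTP keep-alive)
// and TCP-level keep-alive probes on the underlying sockets.
func newKeepAliveClient() *http.Client {
    return &http.Client{
        Transport: &http.Transport{
            DialContext: (&net.Dialer{
                Timeout:   30 * time.Second,
                KeepAlive: 30 * time.Second, // period of TCP keep-alive probes
            }).DialContext,
            MaxIdleConns:    100,
            IdleConnTimeout: 90 * time.Second, // idle pooled connections are closed after this
        },
    }
}

The combination means a cached client keeps reusing a small pool of connections instead of opening new ones per request, which is the reason the author gives for not closing the client explicitly.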