From 80775bd3505d972699482d2ab6a15c38572f399c Mon Sep 17 00:00:00 2001 From: Karol Kokoszka Date: Thu, 18 Apr 2024 11:23:20 +0200 Subject: [PATCH 1/2] fix(cluster-svc): introduce interface defining functionality --- pkg/service/cluster/service.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/service/cluster/service.go b/pkg/service/cluster/service.go index b353f0a648..5ff9810d88 100644 --- a/pkg/service/cluster/service.go +++ b/pkg/service/cluster/service.go @@ -47,6 +47,20 @@ type Change struct { WithoutRepair bool } +// Servicer interface defines the responsibilities of the cluster service. +// It's a duplicate of the restapi.ClusterService, but I want to avoid doing bigger refactor +// and removing the interface from restapi package (although nothing prevents us from doing so). +type Servicer interface { + ListClusters(ctx context.Context, f *Filter) ([]*Cluster, error) + GetCluster(ctx context.Context, idOrName string) (*Cluster, error) + PutCluster(ctx context.Context, c *Cluster) error + DeleteCluster(ctx context.Context, id uuid.UUID) error + CheckCQLCredentials(id uuid.UUID) (bool, error) + DeleteCQLCredentials(ctx context.Context, id uuid.UUID) error + DeleteSSLUserCert(ctx context.Context, id uuid.UUID) error + ListNodes(ctx context.Context, id uuid.UUID) ([]Node, error) +} + // Service manages cluster configurations. type Service struct { session gocqlx.Session From b0bc2d12ebd93d76c3c08669e6dd24b7bf176fc1 Mon Sep 17 00:00:00 2001 From: Karol Kokoszka Date: Thu, 18 Apr 2024 11:23:55 +0200 Subject: [PATCH 2/2] feat(config-cache): initial stub for cluster config cache service Fixes #3802 --- pkg/service/configcache/errors.go | 12 ++ pkg/service/configcache/service.go | 223 +++++++++++++++++++++++ pkg/service/configcache/service_test.go | 229 ++++++++++++++++++++++++ 3 files changed, 464 insertions(+) create mode 100644 pkg/service/configcache/errors.go create mode 100644 pkg/service/configcache/service.go create mode 100644 pkg/service/configcache/service_test.go diff --git a/pkg/service/configcache/errors.go b/pkg/service/configcache/errors.go new file mode 100644 index 0000000000..989dcb1e2c --- /dev/null +++ b/pkg/service/configcache/errors.go @@ -0,0 +1,12 @@ +// Copyright (C) 2024 ScyllaDB + +package configcache + +import "errors" + +var ( + // ErrNoClusterConfig is thrown when there is no config for given cluster in cache. + ErrNoClusterConfig = errors.New("no cluster config available") + // ErrNoHostConfig is thrown when there is no config for given host in cache. + ErrNoHostConfig = errors.New("no host config available") +) diff --git a/pkg/service/configcache/service.go b/pkg/service/configcache/service.go new file mode 100644 index 0000000000..948d022c2c --- /dev/null +++ b/pkg/service/configcache/service.go @@ -0,0 +1,223 @@ +// Copyright (C) 2024 ScyllaDB + +package configcache + +import ( + "context" + "crypto/tls" + "fmt" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" + "github.com/scylladb/scylla-manager/v3/pkg/secrets" + "github.com/scylladb/scylla-manager/v3/pkg/service" + "github.com/scylladb/scylla-manager/v3/pkg/service/cluster" + "github.com/scylladb/scylla-manager/v3/pkg/store" + "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" +) + +// ConnectionType defines an enum for different types of configuration. +type ConnectionType int + +const ( + // CQL defines cql connection type. + CQL ConnectionType = iota + // Alternator defines alternator connection type. + Alternator + + updateFrequency = 5 * time.Minute +) + +type tlsConfigWithAddress struct { + *tls.Config + Address string +} + +// NodeConfig keeps the node current node configuration together with the TLS details per different type of connection. +type NodeConfig struct { + *scyllaclient.NodeInfo + TLSConfig map[ConnectionType]*tlsConfigWithAddress +} + +// ConfigCacher is the interface defining the cache behavior. +type ConfigCacher interface { + Read(clusterID uuid.UUID, host string) (NodeConfig, error) +} + +// Service is responsible for handling all cluster configuration cache related operations. +// Use svc.Read(clusterID, host) to read the configuration of particular host in given cluster. +// Use svc.Run() to let the cache update itself periodically with the current configuration. +// +// Current implementation doesn't care about changing IP, what means that the configuration for given cluster and host +// will stay there forever. +type Service struct { + clusterSvc cluster.Servicer + scyllaClient scyllaclient.ProviderFunc + secretsStore store.Store + + configs sync.Map +} + +// Read returns either the host configuration that is currently stored in the cache, +// ErrNoClusterConfig if config for the whole cluster doesn't exist, +// or ErrNoHostConfig if config of the particular host doesn't exist. +func (svc *Service) Read(clusterID uuid.UUID, host string) (NodeConfig, error) { + emptyConfig := NodeConfig{} + + clusterConfig, err := svc.readClusterConfig(clusterID) + if err != nil { + return emptyConfig, err + } + + rawHostConfig, ok := clusterConfig.Load(host) + if !ok { + return emptyConfig, ErrNoHostConfig + } + hostConfig, ok := rawHostConfig.(NodeConfig) + if !ok { + panic("cluster host emptyConfig cache stores unexpected type") + } + return hostConfig, nil +} + +func (svc *Service) readClusterConfig(clusterID uuid.UUID) (*sync.Map, error) { + emptyConfig := &sync.Map{} + + rawClusterConfig, ok := svc.configs.Load(clusterID) + if !ok { + return emptyConfig, ErrNoClusterConfig + } + clusterConfig, ok := rawClusterConfig.(*sync.Map) + if !ok { + panic("cluster cache emptyConfig stores unexpected type") + } + return clusterConfig, nil +} + +// Run starts the infinity loop responsible for updating the clusters configuration periodically. +func (svc *Service) Run(ctx context.Context) { + freq := time.NewTicker(updateFrequency) + + for { + // make sure to shut down when the context is cancelled + select { + case <-ctx.Done(): + return + default: + } + + select { + case <-freq.C: + svc.update(ctx) + case <-ctx.Done(): + return + } + } +} + +func (svc *Service) update(ctx context.Context) { + clusters, err := svc.clusterSvc.ListClusters(ctx, nil) + if err != nil { + fmt.Println(err) + } + + for _, c := range clusters { + c := c + currentClusterConfig, err := svc.readClusterConfig(c.ID) + if err != nil && errors.Is(err, ErrNoClusterConfig) { + fmt.Println(err) + } + + go func() { + client, err := svc.scyllaClient(ctx, c.ID) + if err != nil { + fmt.Println(err) + } + + // Hosts that are going to be asked about the configuration are exactly the same as + // the ones used by the scylla client. + hostsWg := sync.WaitGroup{} + for _, host := range client.Config().Hosts { + hostsWg.Add(1) + host := host + go func() { + defer hostsWg.Done() + + config, err := svc.retrieveClusterHostConfig(ctx, host, client, c) + if err != nil { + fmt.Println(err) + } + currentClusterConfig.Store(host, config) + }() + } + hostsWg.Wait() + }() + } +} + +func (svc *Service) retrieveClusterHostConfig(ctx context.Context, host string, client *scyllaclient.Client, + c *cluster.Cluster, +) (*NodeConfig, error) { + config := &NodeConfig{} + + nodeInfoResp, err := client.NodeInfo(ctx, host) + if err != nil { + return config, errors.Wrap(err, "fetch node info") + } + + config.TLSConfig = make(map[ConnectionType]*tlsConfigWithAddress, 2) + for _, p := range []ConnectionType{CQL, Alternator} { + var tlsEnabled, clientCertAuth bool + var address string + if p == CQL { + address = nodeInfoResp.CQLAddr(host) + tlsEnabled, clientCertAuth = nodeInfoResp.CQLTLSEnabled() + tlsEnabled = tlsEnabled && !c.ForceTLSDisabled + if tlsEnabled && !c.ForceNonSSLSessionPort { + address = nodeInfoResp.CQLSSLAddr(host) + } + } else if p == Alternator { + tlsEnabled, clientCertAuth = nodeInfoResp.AlternatorTLSEnabled() + address = nodeInfoResp.AlternatorAddr(host) + } + if tlsEnabled { + tlsConfig, err := svc.tlsConfig(c.ID, clientCertAuth) + if err != nil && !errors.Is(err, service.ErrNotFound) { + return config, errors.Wrap(err, "fetch TLS config") + } + if clientCertAuth && errors.Is(err, service.ErrNotFound) { + return config, errors.Wrap(err, "client encryption is enabled, but certificate is missing") + } + config.TLSConfig[p] = &tlsConfigWithAddress{ + Config: tlsConfig, + Address: address, + } + } + } + + return config, nil +} + +func (svc *Service) tlsConfig(clusterID uuid.UUID, clientCertAuth bool) (*tls.Config, error) { + cfg := tls.Config{ + InsecureSkipVerify: true, + } + + if clientCertAuth { + id := &secrets.TLSIdentity{ + ClusterID: clusterID, + } + if err := svc.secretsStore.Get(id); err != nil { + return nil, errors.Wrap(err, "get SSL user cert from secrets store") + } + keyPair, err := tls.X509KeyPair(id.Cert, id.PrivateKey) + if err != nil { + return nil, errors.Wrap(err, "invalid SSL user key pair") + } + cfg.Certificates = []tls.Certificate{keyPair} + } + + return &cfg, nil +} diff --git a/pkg/service/configcache/service_test.go b/pkg/service/configcache/service_test.go new file mode 100644 index 0000000000..a199a6480c --- /dev/null +++ b/pkg/service/configcache/service_test.go @@ -0,0 +1,229 @@ +// Copyright (C) 2024 ScyllaDB + +package configcache + +import ( + "context" + "crypto/sha256" + "encoding/json" + "sync" + "testing" + "time" + + "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" + "github.com/scylladb/scylla-manager/v3/pkg/service/cluster" + "github.com/scylladb/scylla-manager/v3/pkg/store" + "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" +) + +func TestService_Read(t *testing.T) { + emptyConfigHash, err := NodeConfig{}.sha256hash() + if err != nil { + t.Fatalf("unable to create sha256 hash out of empty NodeConfig, err = {%v}", err) + } + host1NodeConfig := NodeConfig{ + NodeInfo: &scyllaclient.NodeInfo{ + AgentVersion: "expectedVersion", + }, + } + host1ConfigHash, err := host1NodeConfig.sha256hash() + if err != nil { + t.Fatal(err) + } + + cluster1UUID, err := uuid.NewRandom() + host1ID := "host1" + if err != nil { + t.Fatalf("unable to generate random UUID for test purpose") + } + initialState := convertMapToSyncMap( + map[any]any{ + cluster1UUID: convertMapToSyncMap( + map[any]any{ + host1ID: host1NodeConfig, + }, + ), + }, + ) + + for _, tc := range []struct { + name string + cluster uuid.UUID + host string + state *sync.Map + resultErr error + resultConfigHash [32]byte + }{ + { + name: "host configuration doesn't exist", + host: "host_that_does_not_exist", + cluster: cluster1UUID, + state: initialState, + resultErr: ErrNoHostConfig, + resultConfigHash: emptyConfigHash, + }, + { + name: "cluster configuration doesn't exist", + host: host1ID, + cluster: uuid.Nil, + state: initialState, + resultErr: ErrNoClusterConfig, + resultConfigHash: emptyConfigHash, + }, + { + name: "retrieves the config", + host: host1ID, + cluster: cluster1UUID, + state: initialState, + resultErr: nil, + resultConfigHash: host1ConfigHash, + }, + } { + t.Run(tc.name, func(t *testing.T) { + // Given + svc := Service{ + clusterSvc: &mockClusterServicer{}, + scyllaClient: mockProviderFunc, + secretsStore: &mockStore{}, + configs: *tc.state, + } + + // When + conf, err := svc.Read(tc.cluster, tc.host) + + // Then + if err != tc.resultErr { + t.Fatalf("expected error = {%v}, but got {%v}", tc.resultErr, err) + } + confHash, err := conf.sha256hash() + if err != nil { + t.Fatalf("unable to create hash out of NodeConf, err = {%v}", err) + } + if confHash != tc.resultConfigHash { + t.Fatalf("expected hash = {%s}, but got {%s}", tc.resultConfigHash, confHash) + } + }) + } +} + +func TestService_Run(t *testing.T) { + t.Run("validate context cancellation handling", func(t *testing.T) { + svc := Service{ + clusterSvc: &mockClusterServicer{}, + scyllaClient: mockProviderFunc, + secretsStore: &mockStore{}, + configs: sync.Map{}, + } + + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + + svc.Run(ctx) + }() + + time.Sleep(3 * time.Second) + cancel() + + wg.Wait() + }) +} + +func (nc NodeConfig) sha256hash() (hash [32]byte, err error) { + data, err := json.Marshal(nc) + if err != nil { + return hash, err + } + + return sha256.Sum256(data), nil +} + +// utility functions +func convertMapToSyncMap(m map[any]any) *sync.Map { + syncM := sync.Map{} + for k, v := range m { + syncM.Store(k, v) + } + return &syncM +} + +// internal mock implementation of interfaces + +// mockClusterServicer implements the Servicer interface as a mock for testing purposes. +type mockClusterServicer struct{} + +// ListClusters mocks the ListClusters method of Servicer. +func (s *mockClusterServicer) ListClusters(ctx context.Context, f *cluster.Filter) ([]*cluster.Cluster, error) { + return nil, nil +} + +// GetCluster mocks the GetCluster method of Servicer. +func (s *mockClusterServicer) GetCluster(ctx context.Context, idOrName string) (*cluster.Cluster, error) { + return nil, nil +} + +// PutCluster mocks the PutCluster method of Servicer. +func (s *mockClusterServicer) PutCluster(ctx context.Context, c *cluster.Cluster) error { + return nil +} + +// DeleteCluster mocks the DeleteCluster method of Servicer. +func (s *mockClusterServicer) DeleteCluster(ctx context.Context, id uuid.UUID) error { + return nil +} + +// CheckCQLCredentials mocks the CheckCQLCredentials method of Servicer. +func (s *mockClusterServicer) CheckCQLCredentials(id uuid.UUID) (bool, error) { + return false, nil +} + +// DeleteCQLCredentials mocks the DeleteCQLCredentials method of Servicer. +func (s *mockClusterServicer) DeleteCQLCredentials(ctx context.Context, id uuid.UUID) error { + return nil +} + +// DeleteSSLUserCert mocks the DeleteSSLUserCert method of Servicer. +func (s *mockClusterServicer) DeleteSSLUserCert(ctx context.Context, id uuid.UUID) error { + return nil +} + +// ListNodes mocks the ListNodes method of Servicer. +func (s *mockClusterServicer) ListNodes(ctx context.Context, id uuid.UUID) ([]cluster.Node, error) { + return nil, nil +} + +// mockStore implements the Store interface as a mock for testing purposes. +type mockStore struct{} + +// Put mocks the Put method of Store. +func (s *mockStore) Put(v store.Entry) error { + return nil +} + +// Get mocks the Get method of Store. +func (s *mockStore) Get(v store.Entry) error { + return nil +} + +// Check mocks the Check method of Store. +func (s *mockStore) Check(v store.Entry) (bool, error) { + return false, nil +} + +// Delete mocks the Delete method of Store. +func (s *mockStore) Delete(v store.Entry) error { + return nil +} + +// DeleteAll mocks the DeleteAll method of Store. +func (s *mockStore) DeleteAll(clusterID uuid.UUID) error { + return nil +} + +var ( + mockProviderFunc = func(ctx context.Context, clusterID uuid.UUID) (*scyllaclient.Client, error) { + return nil, nil + } +)