From 96be35a290a651b946896a7eb166ddba0c42192c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Fri, 13 Dec 2024 14:22:43 +0100 Subject: [PATCH 1/3] chore(pkg): export config cache to other services This is the first general step toward fixing #3892. It allows for accessing config cache service from other services. Actually using config cache service will be implemented in separate PRs. Ref #3892 --- pkg/cmd/scylla-manager/server.go | 3 ++ pkg/service/backup/service.go | 8 +++- .../backup/service_backup_integration_test.go | 1 + pkg/service/repair/service.go | 6 ++- .../repair/service_repair_integration_test.go | 2 + .../restore/helper_integration_test.go | 5 +++ pkg/service/restore/service.go | 6 ++- .../service_restore_integration_test.go | 5 +++ pkg/testutils/testhelper/helper.go | 38 +++++++++++++++++-- 9 files changed, 67 insertions(+), 7 deletions(-) diff --git a/pkg/cmd/scylla-manager/server.go b/pkg/cmd/scylla-manager/server.go index 14d0e9b388..d7d6b82761 100644 --- a/pkg/cmd/scylla-manager/server.go +++ b/pkg/cmd/scylla-manager/server.go @@ -102,6 +102,7 @@ func (s *server) makeServices(ctx context.Context) error { metrics.NewRepairMetrics().MustRegister(), s.clusterSvc.Client, s.clusterSvc.GetSession, + s.configCacheSvc, s.logger.Named("repair"), ) if err != nil { @@ -115,6 +116,7 @@ func (s *server) makeServices(ctx context.Context) error { s.clusterSvc.GetClusterName, s.clusterSvc.Client, s.clusterSvc.GetSession, + s.configCacheSvc, s.logger.Named("backup"), ) if err != nil { @@ -128,6 +130,7 @@ func (s *server) makeServices(ctx context.Context) error { metrics.NewRestoreMetrics().MustRegister(), s.clusterSvc.Client, s.clusterSvc.GetSession, + s.configCacheSvc, s.logger.Named("restore"), ) if err != nil { diff --git a/pkg/service/backup/service.go b/pkg/service/backup/service.go index 4456517106..048fb09275 100644 --- a/pkg/service/backup/service.go +++ b/pkg/service/backup/service.go @@ -22,6 +22,7 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" "github.com/scylladb/scylla-manager/v3/pkg/service/cluster" + "github.com/scylladb/scylla-manager/v3/pkg/service/configcache" "github.com/scylladb/scylla-manager/v3/pkg/service/scheduler" "github.com/scylladb/scylla-manager/v3/pkg/util" "github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter" @@ -45,13 +46,15 @@ type Service struct { clusterName cluster.NameFunc scyllaClient scyllaclient.ProviderFunc clusterSession cluster.SessionFunc + configCache configcache.ConfigCacher logger log.Logger dth deduplicateTestHooks } -func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMetrics, - clusterName cluster.NameFunc, scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, logger log.Logger, +func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMetrics, clusterName cluster.NameFunc, + scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, configCache configcache.ConfigCacher, + logger log.Logger, ) (*Service, error) { if session.Session == nil || session.Closed() { return nil, errors.New("invalid session") @@ -76,6 +79,7 @@ func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMet clusterName: clusterName, scyllaClient: scyllaClient, clusterSession: clusterSession, + configCache: configCache, logger: logger, }, nil } diff --git a/pkg/service/backup/service_backup_integration_test.go b/pkg/service/backup/service_backup_integration_test.go index 0b25c39aa1..5ed5d091c7 100644 --- a/pkg/service/backup/service_backup_integration_test.go +++ b/pkg/service/backup/service_backup_integration_test.go @@ -127,6 +127,7 @@ func newTestServiceWithUser(t *testing.T, session gocqlx.Session, client *scylla } return CreateManagedClusterSession(t, false, client, user, pass), nil }, + NewTestConfigCacheSvc(t, client.Config().Hosts), logger.Named("backup"), ) if err != nil { diff --git a/pkg/service/repair/service.go b/pkg/service/repair/service.go index 2ecc1484ba..0dc0c95aea 100644 --- a/pkg/service/repair/service.go +++ b/pkg/service/repair/service.go @@ -18,6 +18,7 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/schema/table" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" "github.com/scylladb/scylla-manager/v3/pkg/service/cluster" + "github.com/scylladb/scylla-manager/v3/pkg/service/configcache" "github.com/scylladb/scylla-manager/v3/pkg/util" "github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter" "github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/ksfilter" @@ -38,6 +39,7 @@ type Service struct { scyllaClient scyllaclient.ProviderFunc clusterSession cluster.SessionFunc + configCache configcache.ConfigCacher logger log.Logger intensityHandlers map[uuid.UUID]*intensityParallelHandler @@ -45,7 +47,8 @@ type Service struct { } func NewService(session gocqlx.Session, config Config, metrics metrics.RepairMetrics, - scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, logger log.Logger, + scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, configCache configcache.ConfigCacher, + logger log.Logger, ) (*Service, error) { if err := config.Validate(); err != nil { return nil, errors.Wrap(err, "invalid config") @@ -61,6 +64,7 @@ func NewService(session gocqlx.Session, config Config, metrics metrics.RepairMet metrics: metrics, scyllaClient: scyllaClient, clusterSession: clusterSession, + configCache: configCache, logger: logger, intensityHandlers: make(map[uuid.UUID]*intensityParallelHandler), }, nil diff --git a/pkg/service/repair/service_repair_integration_test.go b/pkg/service/repair/service_repair_integration_test.go index 6c8d5c5493..51d20a55e3 100644 --- a/pkg/service/repair/service_repair_integration_test.go +++ b/pkg/service/repair/service_repair_integration_test.go @@ -402,6 +402,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) { return gocqlx.Session{}, errors.New("not implemented") }, + NewTestConfigCacheSvc(t, client.Config().Hosts), logger.Named("repair"), ) if err != nil { @@ -424,6 +425,7 @@ func newTestServiceWithClusterSession(t *testing.T, session gocqlx.Session, clie func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) { return CreateSession(t, client), nil }, + NewTestConfigCacheSvc(t, client.Config().Hosts), logger.Named("repair"), ) if err != nil { diff --git a/pkg/service/restore/helper_integration_test.go b/pkg/service/restore/helper_integration_test.go index 517884efcb..c4a7934780 100644 --- a/pkg/service/restore/helper_integration_test.go +++ b/pkg/service/restore/helper_integration_test.go @@ -135,6 +135,7 @@ func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient. func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) { return CreateSession(t, client), nil }, + NewTestConfigCacheSvc(t, client.Config().Hosts), log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("backup"), ) if err != nil { @@ -144,6 +145,8 @@ func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient. } func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.Client, user, pass string) *Service { + configCacheSvc := NewTestConfigCacheSvc(t, client.Config().Hosts) + repairSvc, err := repair.NewService( mgrSession, repair.DefaultConfig(), @@ -154,6 +157,7 @@ func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) { return CreateSession(t, client), nil }, + configCacheSvc, log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("repair"), ) if err != nil { @@ -171,6 +175,7 @@ func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) { return CreateManagedClusterSession(t, false, client, user, pass), nil }, + configCacheSvc, log.NewDevelopmentWithLevel(zapcore.InfoLevel).Named("restore"), ) if err != nil { diff --git a/pkg/service/restore/service.go b/pkg/service/restore/service.go index 5de6f14a2c..82215a6c4e 100644 --- a/pkg/service/restore/service.go +++ b/pkg/service/restore/service.go @@ -15,6 +15,7 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/schema/table" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" "github.com/scylladb/scylla-manager/v3/pkg/service/cluster" + "github.com/scylladb/scylla-manager/v3/pkg/service/configcache" "github.com/scylladb/scylla-manager/v3/pkg/service/repair" "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" ) @@ -29,11 +30,13 @@ type Service struct { scyllaClient scyllaclient.ProviderFunc clusterSession cluster.SessionFunc + configCache configcache.ConfigCacher logger log.Logger } func NewService(repairSvc *repair.Service, session gocqlx.Session, config Config, metrics metrics.RestoreMetrics, - scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, logger log.Logger, + scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, configCache configcache.ConfigCacher, + logger log.Logger, ) (*Service, error) { if session.Session == nil || session.Closed() { return nil, errors.New("invalid session") @@ -53,6 +56,7 @@ func NewService(repairSvc *repair.Service, session gocqlx.Session, config Config metrics: metrics, scyllaClient: scyllaClient, clusterSession: clusterSession, + configCache: configCache, logger: logger, }, nil } diff --git a/pkg/service/restore/service_restore_integration_test.go b/pkg/service/restore/service_restore_integration_test.go index 810edbe934..39f8c0449b 100644 --- a/pkg/service/restore/service_restore_integration_test.go +++ b/pkg/service/restore/service_restore_integration_test.go @@ -109,6 +109,8 @@ func newTestClient(t *testing.T, hrt *HackableRoundTripper, logger log.Logger, c func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c Config, logger log.Logger, user, pass string) (*Service, *backup.Service) { t.Helper() + configCacheSvc := NewTestConfigCacheSvc(t, client.Config().Hosts) + repairSvc, err := repair.NewService( session, repair.DefaultConfig(), @@ -119,6 +121,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) { return CreateManagedClusterSession(t, false, client, user, pass), nil }, + configCacheSvc, log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("repair"), ) if err != nil { @@ -138,6 +141,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) { return CreateManagedClusterSession(t, false, client, user, pass), nil }, + configCacheSvc, log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("backup"), ) if err != nil { @@ -155,6 +159,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) { return CreateManagedClusterSession(t, false, client, user, pass), nil }, + configCacheSvc, logger.Named("restore"), ) if err != nil { diff --git a/pkg/testutils/testhelper/helper.go b/pkg/testutils/testhelper/helper.go index b7188d7f5e..01a2b115d4 100644 --- a/pkg/testutils/testhelper/helper.go +++ b/pkg/testutils/testhelper/helper.go @@ -5,12 +5,20 @@ package testhelper import ( "context" "testing" + "time" "github.com/scylladb/go-log" "github.com/scylladb/gocqlx/v2" + "github.com/scylladb/scylla-manager/v3/pkg/config/server" + "github.com/scylladb/scylla-manager/v3/pkg/metrics" + "github.com/scylladb/scylla-manager/v3/pkg/schema/table" + "github.com/scylladb/scylla-manager/v3/pkg/service/cluster" + "github.com/scylladb/scylla-manager/v3/pkg/service/configcache" + "github.com/scylladb/scylla-manager/v3/pkg/store" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" - "github.com/scylladb/scylla-manager/v3/pkg/testutils" + . "github.com/scylladb/scylla-manager/v3/pkg/testutils" + . "github.com/scylladb/scylla-manager/v3/pkg/testutils/db" "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" ) @@ -18,7 +26,7 @@ import ( type CommonTestHelper struct { Logger log.Logger Session gocqlx.Session - Hrt *testutils.HackableRoundTripper + Hrt *HackableRoundTripper Client *scyllaclient.Client ClusterID uuid.UUID @@ -57,7 +65,7 @@ func (h *CommonTestHelper) RestartAgents() { func execOnAllHosts(h *CommonTestHelper, cmd string) { h.T.Helper() for _, host := range h.GetAllHosts() { - stdout, stderr, err := testutils.ExecOnHost(host, cmd) + stdout, stderr, err := ExecOnHost(host, cmd) if err != nil { h.T.Log("stdout", stdout) h.T.Log("stderr", stderr) @@ -65,3 +73,27 @@ func execOnAllHosts(h *CommonTestHelper, cmd string) { } } } + +// NewTestConfigCacheSvc creates default config cache service which can be used +// for testing other services relaying on it. +func NewTestConfigCacheSvc(t *testing.T, hosts []string) configcache.ConfigCacher { + t.Helper() + + session := CreateScyllaManagerDBSession(t) + secretsStore := store.NewTableStore(session, table.Secrets) + + clusterSvc, err := cluster.NewService(session, metrics.NewClusterMetrics(), secretsStore, + scyllaclient.DefaultTimeoutConfig(), server.DefaultConfig().ClientCacheTimeout, log.NewDevelopment()) + if err != nil { + t.Fatal(err) + } + + scyllaClientProvider := func(context.Context, uuid.UUID) (*scyllaclient.Client, error) { + sc := scyllaclient.TestConfig(hosts, AgentAuthToken()) + sc.Timeout = time.Second + sc.Transport = NewHackableRoundTripper(scyllaclient.DefaultTransport()) + return scyllaclient.NewClient(sc, log.NewDevelopment()) + } + + return configcache.NewService(configcache.DefaultConfig(), clusterSvc, scyllaClientProvider, secretsStore, log.NewDevelopment()) +} From c6a71a5034961bdec977c55c7320b1197006368d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Fri, 13 Dec 2024 15:02:43 +0100 Subject: [PATCH 2/3] feat(configcache): add ReadAll method It's going to be useful when fixing #3892. Ref #3892 --- pkg/service/configcache/service.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/pkg/service/configcache/service.go b/pkg/service/configcache/service.go index 25060fb906..f868417674 100644 --- a/pkg/service/configcache/service.go +++ b/pkg/service/configcache/service.go @@ -22,6 +22,9 @@ type ConfigCacher interface { // or ErrNoHostConfig if config of the particular host doesn't exist. Read(clusterID uuid.UUID, host string) (NodeConfig, error) + // ReadAll calls Read on all AvailableHosts. + ReadAll(clusterID uuid.UUID) (map[string]NodeConfig, error) + // AvailableHosts returns list of hosts of given cluster that keep their configuration in cache. AvailableHosts(ctx context.Context, clusterID uuid.UUID) ([]string, error) @@ -88,6 +91,23 @@ func (svc *Service) Read(clusterID uuid.UUID, host string) (NodeConfig, error) { return hostConfig, nil } +// ReadAll calls Read on AvailableHosts. +func (svc *Service) ReadAll(clusterID uuid.UUID) (map[string]NodeConfig, error) { + clusterConfig, err := svc.readClusterConfig(clusterID) + if err != nil { + return nil, err + } + + out := make(map[string]NodeConfig) + clusterConfig.Range(func(key, value any) bool { + host := key.(string) + cfg := value.(NodeConfig) + out[host] = cfg + return true + }) + return out, nil +} + // Run starts the infinity loop responsible for updating the clusters configuration periodically. func (svc *Service) Run(ctx context.Context) { freq := time.NewTicker(svc.svcConfig.UpdateFrequency) From b9a178e743b4c3497c3c1f3c31316c33e646b951 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Mon, 16 Dec 2024 12:43:17 +0100 Subject: [PATCH 3/3] fix(pkg): init config cache with cluster --- .../backup/service_backup_integration_test.go | 8 ++-- .../repair/service_repair_integration_test.go | 18 ++++---- .../restore/helper_integration_test.go | 12 ++--- .../service_restore_integration_test.go | 6 +-- pkg/testutils/testconfig/testconfig.go | 6 ++- pkg/testutils/testhelper/helper.go | 46 +++++++++++++++++-- 6 files changed, 70 insertions(+), 26 deletions(-) diff --git a/pkg/service/backup/service_backup_integration_test.go b/pkg/service/backup/service_backup_integration_test.go index 5ed5d091c7..e9146dca0a 100644 --- a/pkg/service/backup/service_backup_integration_test.go +++ b/pkg/service/backup/service_backup_integration_test.go @@ -63,11 +63,11 @@ func newBackupTestHelperWithUser(t *testing.T, session gocqlx.Session, config ba S3InitBucket(t, location.Path) clusterID := uuid.MustRandom() - logger := log.NewDevelopmentWithLevel(zapcore.InfoLevel) + hrt := NewHackableRoundTripper(scyllaclient.DefaultTransport()) client := newTestClient(t, hrt, logger.Named("client"), clientConf) - service := newTestServiceWithUser(t, session, client, config, logger, user, pass) + service := newTestServiceWithUser(t, session, client, config, logger, clusterID, user, pass) cHelper := &CommonTestHelper{ Session: session, Hrt: hrt, @@ -108,7 +108,7 @@ func newTestClient(t *testing.T, hrt *HackableRoundTripper, logger log.Logger, c return c } -func newTestServiceWithUser(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c backup.Config, logger log.Logger, user, pass string) *backup.Service { +func newTestServiceWithUser(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c backup.Config, logger log.Logger, clusterID uuid.UUID, user, pass string) *backup.Service { t.Helper() s, err := backup.NewService( @@ -127,7 +127,7 @@ func newTestServiceWithUser(t *testing.T, session gocqlx.Session, client *scylla } return CreateManagedClusterSession(t, false, client, user, pass), nil }, - NewTestConfigCacheSvc(t, client.Config().Hosts), + NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts), logger.Named("backup"), ) if err != nil { diff --git a/pkg/service/repair/service_repair_integration_test.go b/pkg/service/repair/service_repair_integration_test.go index 51d20a55e3..d382ed0803 100644 --- a/pkg/service/repair/service_repair_integration_test.go +++ b/pkg/service/repair/service_repair_integration_test.go @@ -60,12 +60,13 @@ type repairTestHelper struct { func newRepairTestHelper(t *testing.T, session gocqlx.Session, config repair.Config) *repairTestHelper { t.Helper() + clusterID := uuid.MustRandom() logger := log.NewDevelopmentWithLevel(zapcore.InfoLevel) hrt := NewHackableRoundTripper(scyllaclient.DefaultTransport()) hrt.SetInterceptor(repairInterceptor(scyllaclient.CommandSuccessful)) c := newTestClient(t, hrt, log.NopLogger) - s := newTestService(t, session, c, config, logger) + s := newTestService(t, session, c, config, logger, clusterID) return &repairTestHelper{ CommonTestHelper: &CommonTestHelper{ @@ -73,7 +74,7 @@ func newRepairTestHelper(t *testing.T, session gocqlx.Session, config repair.Con Session: session, Hrt: hrt, Client: c, - ClusterID: uuid.MustRandom(), + ClusterID: clusterID, TaskID: uuid.MustRandom(), RunID: uuid.NewTime(), T: t, @@ -86,8 +87,9 @@ func newRepairWithClusterSessionTestHelper(t *testing.T, session gocqlx.Session, hrt *HackableRoundTripper, c *scyllaclient.Client, config repair.Config) *repairTestHelper { t.Helper() + clusterID := uuid.MustRandom() logger := log.NewDevelopmentWithLevel(zapcore.InfoLevel) - s := newTestServiceWithClusterSession(t, session, c, config, logger) + s := newTestServiceWithClusterSession(t, session, c, config, logger, clusterID) return &repairTestHelper{ CommonTestHelper: &CommonTestHelper{ @@ -95,7 +97,7 @@ func newRepairWithClusterSessionTestHelper(t *testing.T, session gocqlx.Session, Session: session, Hrt: hrt, Client: c, - ClusterID: uuid.MustRandom(), + ClusterID: clusterID, TaskID: uuid.MustRandom(), RunID: uuid.NewTime(), T: t, @@ -389,7 +391,7 @@ func newTestClient(t *testing.T, hrt *HackableRoundTripper, logger log.Logger) * return c } -func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c repair.Config, logger log.Logger) *repair.Service { +func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c repair.Config, logger log.Logger, clusterID uuid.UUID) *repair.Service { t.Helper() s, err := repair.NewService( @@ -402,7 +404,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) { return gocqlx.Session{}, errors.New("not implemented") }, - NewTestConfigCacheSvc(t, client.Config().Hosts), + NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts), logger.Named("repair"), ) if err != nil { @@ -412,7 +414,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C return s } -func newTestServiceWithClusterSession(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c repair.Config, logger log.Logger) *repair.Service { +func newTestServiceWithClusterSession(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c repair.Config, logger log.Logger, clusterID uuid.UUID) *repair.Service { t.Helper() s, err := repair.NewService( @@ -425,7 +427,7 @@ func newTestServiceWithClusterSession(t *testing.T, session gocqlx.Session, clie func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) { return CreateSession(t, client), nil }, - NewTestConfigCacheSvc(t, client.Config().Hosts), + NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts), logger.Named("repair"), ) if err != nil { diff --git a/pkg/service/restore/helper_integration_test.go b/pkg/service/restore/helper_integration_test.go index c4a7934780..648e86933c 100644 --- a/pkg/service/restore/helper_integration_test.go +++ b/pkg/service/restore/helper_integration_test.go @@ -113,15 +113,15 @@ func newTestHelper(t *testing.T, srcHosts, dstHosts []string) *testHelper { return &testHelper{ srcCluster: srcCluster, - srcBackupSvc: newBackupSvc(t, srcCluster.Session, srcCluster.Client), + srcBackupSvc: newBackupSvc(t, srcCluster.Session, srcCluster.Client, srcCluster.ClusterID), dstCluster: dstCluster, - dstRestoreSvc: newRestoreSvc(t, dstCluster.Session, dstCluster.Client, user, pass), + dstRestoreSvc: newRestoreSvc(t, dstCluster.Session, dstCluster.Client, dstCluster.ClusterID, user, pass), dstUser: user, dstPass: pass, } } -func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.Client) *backup.Service { +func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.Client, clusterID uuid.UUID) *backup.Service { svc, err := backup.NewService( mgrSession, defaultBackupTestConfig(), @@ -135,7 +135,7 @@ func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient. func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) { return CreateSession(t, client), nil }, - NewTestConfigCacheSvc(t, client.Config().Hosts), + NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts), log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("backup"), ) if err != nil { @@ -144,8 +144,8 @@ func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient. return svc } -func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.Client, user, pass string) *Service { - configCacheSvc := NewTestConfigCacheSvc(t, client.Config().Hosts) +func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.Client, clusterID uuid.UUID, user, pass string) *Service { + configCacheSvc := NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts) repairSvc, err := repair.NewService( mgrSession, diff --git a/pkg/service/restore/service_restore_integration_test.go b/pkg/service/restore/service_restore_integration_test.go index 39f8c0449b..6b3b6b5c49 100644 --- a/pkg/service/restore/service_restore_integration_test.go +++ b/pkg/service/restore/service_restore_integration_test.go @@ -64,7 +64,7 @@ func newRestoreTestHelper(t *testing.T, session gocqlx.Session, config Config, l logger := log.NewDevelopmentWithLevel(zapcore.InfoLevel) hrt := NewHackableRoundTripper(scyllaclient.DefaultTransport()) client := newTestClient(t, hrt, logger.Named("client"), clientConf) - service, backupSvc := newTestService(t, session, client, config, logger, user, pass) + service, backupSvc := newTestService(t, session, client, config, logger, clusterID, user, pass) cHelper := &CommonTestHelper{ Session: session, Hrt: hrt, @@ -106,10 +106,10 @@ func newTestClient(t *testing.T, hrt *HackableRoundTripper, logger log.Logger, c return c } -func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c Config, logger log.Logger, user, pass string) (*Service, *backup.Service) { +func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c Config, logger log.Logger, clusterID uuid.UUID, user, pass string) (*Service, *backup.Service) { t.Helper() - configCacheSvc := NewTestConfigCacheSvc(t, client.Config().Hosts) + configCacheSvc := NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts) repairSvc, err := repair.NewService( session, diff --git a/pkg/testutils/testconfig/testconfig.go b/pkg/testutils/testconfig/testconfig.go index 040218fdd5..81542779e6 100644 --- a/pkg/testutils/testconfig/testconfig.go +++ b/pkg/testutils/testconfig/testconfig.go @@ -133,7 +133,11 @@ func ScyllaManagerDBCluster() string { // IsSSLEnabled is a helper function to parse SSL_ENABLED env var. // SSL_ENABLED env var indicates if scylla cluster is configured to use ssl or not. func IsSSLEnabled() bool { - sslEnabled, err := strconv.ParseBool(os.Getenv("SSL_ENABLED")) + raw := os.Getenv("SSL_ENABLED") + if raw == "" { + return false + } + sslEnabled, err := strconv.ParseBool(raw) if err != nil { panic("parse SSL_ENABLED env var:" + err.Error()) } diff --git a/pkg/testutils/testhelper/helper.go b/pkg/testutils/testhelper/helper.go index 01a2b115d4..80c7af0047 100644 --- a/pkg/testutils/testhelper/helper.go +++ b/pkg/testutils/testhelper/helper.go @@ -4,6 +4,7 @@ package testhelper import ( "context" + "os" "testing" "time" @@ -12,13 +13,13 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/config/server" "github.com/scylladb/scylla-manager/v3/pkg/metrics" "github.com/scylladb/scylla-manager/v3/pkg/schema/table" + "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" "github.com/scylladb/scylla-manager/v3/pkg/service/cluster" "github.com/scylladb/scylla-manager/v3/pkg/service/configcache" "github.com/scylladb/scylla-manager/v3/pkg/store" - - "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" . "github.com/scylladb/scylla-manager/v3/pkg/testutils" . "github.com/scylladb/scylla-manager/v3/pkg/testutils/db" + "github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig" "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" ) @@ -76,7 +77,7 @@ func execOnAllHosts(h *CommonTestHelper, cmd string) { // NewTestConfigCacheSvc creates default config cache service which can be used // for testing other services relaying on it. -func NewTestConfigCacheSvc(t *testing.T, hosts []string) configcache.ConfigCacher { +func NewTestConfigCacheSvc(t *testing.T, clusterID uuid.UUID, hosts []string) configcache.ConfigCacher { t.Helper() session := CreateScyllaManagerDBSession(t) @@ -87,6 +88,10 @@ func NewTestConfigCacheSvc(t *testing.T, hosts []string) configcache.ConfigCache if err != nil { t.Fatal(err) } + err = clusterSvc.PutCluster(context.Background(), ValidCluster(t, clusterID, hosts[0])) + if err != nil { + t.Fatal(err) + } scyllaClientProvider := func(context.Context, uuid.UUID) (*scyllaclient.Client, error) { sc := scyllaclient.TestConfig(hosts, AgentAuthToken()) @@ -95,5 +100,38 @@ func NewTestConfigCacheSvc(t *testing.T, hosts []string) configcache.ConfigCache return scyllaclient.NewClient(sc, log.NewDevelopment()) } - return configcache.NewService(configcache.DefaultConfig(), clusterSvc, scyllaClientProvider, secretsStore, log.NewDevelopment()) + svc := configcache.NewService(configcache.DefaultConfig(), clusterSvc, scyllaClientProvider, secretsStore, log.NewDevelopment()) + svc.Init(context.Background()) + return svc +} + +// ValidCluster return Cluster initialized according to test configuration. +func ValidCluster(t *testing.T, id uuid.UUID, host string) *cluster.Cluster { + t.Helper() + + c := &cluster.Cluster{ + ID: id, + Name: "name_" + id.String(), + Host: host, + Port: 10001, + AuthToken: AgentAuthToken(), + Username: testconfig.TestDBUsername(), + Password: testconfig.TestDBPassword(), + } + + if testconfig.IsSSLEnabled() { + sslOpts := testconfig.CQLSSLOptions() + userKey, err := os.ReadFile(sslOpts.KeyPath) + if err != nil { + t.Fatalf("read file (%s) err: %v", sslOpts.KeyPath, err) + } + userCrt, err := os.ReadFile(sslOpts.CertPath) + if err != nil { + t.Fatalf("read file (%s) err: %v", sslOpts.CertPath, err) + } + c.SSLUserKeyFile = userKey + c.SSLUserCertFile = userCrt + } + + return c }