Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let other services use config cache #4166

Merged
merged 3 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/cmd/scylla-manager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *server) makeServices(ctx context.Context) error {
metrics.NewRepairMetrics().MustRegister(),
s.clusterSvc.Client,
s.clusterSvc.GetSession,
s.configCacheSvc,
s.logger.Named("repair"),
)
if err != nil {
Expand All @@ -115,6 +116,7 @@ func (s *server) makeServices(ctx context.Context) error {
s.clusterSvc.GetClusterName,
s.clusterSvc.Client,
s.clusterSvc.GetSession,
s.configCacheSvc,
s.logger.Named("backup"),
)
if err != nil {
Expand All @@ -128,6 +130,7 @@ func (s *server) makeServices(ctx context.Context) error {
metrics.NewRestoreMetrics().MustRegister(),
s.clusterSvc.Client,
s.clusterSvc.GetSession,
s.configCacheSvc,
s.logger.Named("restore"),
)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions pkg/service/backup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/service/scheduler"
"github.com/scylladb/scylla-manager/v3/pkg/util"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter"
Expand All @@ -45,13 +46,15 @@ type Service struct {
clusterName cluster.NameFunc
scyllaClient scyllaclient.ProviderFunc
clusterSession cluster.SessionFunc
configCache configcache.ConfigCacher
logger log.Logger

dth deduplicateTestHooks
}

func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMetrics,
clusterName cluster.NameFunc, scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, logger log.Logger,
func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMetrics, clusterName cluster.NameFunc,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, configCache configcache.ConfigCacher,
logger log.Logger,
) (*Service, error) {
if session.Session == nil || session.Closed() {
return nil, errors.New("invalid session")
Expand All @@ -76,6 +79,7 @@ func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMet
clusterName: clusterName,
scyllaClient: scyllaClient,
clusterSession: clusterSession,
configCache: configCache,
logger: logger,
}, nil
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/service/backup/service_backup_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ func newBackupTestHelperWithUser(t *testing.T, session gocqlx.Session, config ba
S3InitBucket(t, location.Path)

clusterID := uuid.MustRandom()

logger := log.NewDevelopmentWithLevel(zapcore.InfoLevel)

hrt := NewHackableRoundTripper(scyllaclient.DefaultTransport())
client := newTestClient(t, hrt, logger.Named("client"), clientConf)
service := newTestServiceWithUser(t, session, client, config, logger, user, pass)
service := newTestServiceWithUser(t, session, client, config, logger, clusterID, user, pass)
cHelper := &CommonTestHelper{
Session: session,
Hrt: hrt,
Expand Down Expand Up @@ -108,7 +108,7 @@ func newTestClient(t *testing.T, hrt *HackableRoundTripper, logger log.Logger, c
return c
}

func newTestServiceWithUser(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c backup.Config, logger log.Logger, user, pass string) *backup.Service {
func newTestServiceWithUser(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c backup.Config, logger log.Logger, clusterID uuid.UUID, user, pass string) *backup.Service {
t.Helper()

s, err := backup.NewService(
Expand All @@ -127,6 +127,7 @@ func newTestServiceWithUser(t *testing.T, session gocqlx.Session, client *scylla
}
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts),
logger.Named("backup"),
)
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions pkg/service/configcache/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type ConfigCacher interface {
// or ErrNoHostConfig if config of the particular host doesn't exist.
Read(clusterID uuid.UUID, host string) (NodeConfig, error)

// ReadAll calls Read on all AvailableHosts.
ReadAll(clusterID uuid.UUID) (map[string]NodeConfig, error)

// AvailableHosts returns list of hosts of given cluster that keep their configuration in cache.
AvailableHosts(ctx context.Context, clusterID uuid.UUID) ([]string, error)

Expand Down Expand Up @@ -88,6 +91,23 @@ func (svc *Service) Read(clusterID uuid.UUID, host string) (NodeConfig, error) {
return hostConfig, nil
}

// ReadAll calls Read on AvailableHosts.
func (svc *Service) ReadAll(clusterID uuid.UUID) (map[string]NodeConfig, error) {
clusterConfig, err := svc.readClusterConfig(clusterID)
if err != nil {
return nil, err
}

out := make(map[string]NodeConfig)
clusterConfig.Range(func(key, value any) bool {
host := key.(string)
cfg := value.(NodeConfig)
out[host] = cfg
return true
})
return out, nil
}

// Run starts the infinity loop responsible for updating the clusters configuration periodically.
func (svc *Service) Run(ctx context.Context) {
freq := time.NewTicker(svc.svcConfig.UpdateFrequency)
Expand Down
6 changes: 5 additions & 1 deletion pkg/service/repair/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/util"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/ksfilter"
Expand All @@ -38,14 +39,16 @@ type Service struct {

scyllaClient scyllaclient.ProviderFunc
clusterSession cluster.SessionFunc
configCache configcache.ConfigCacher
logger log.Logger

intensityHandlers map[uuid.UUID]*intensityParallelHandler
mu sync.Mutex
}

func NewService(session gocqlx.Session, config Config, metrics metrics.RepairMetrics,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, logger log.Logger,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, configCache configcache.ConfigCacher,
logger log.Logger,
) (*Service, error) {
if err := config.Validate(); err != nil {
return nil, errors.Wrap(err, "invalid config")
Expand All @@ -61,6 +64,7 @@ func NewService(session gocqlx.Session, config Config, metrics metrics.RepairMet
metrics: metrics,
scyllaClient: scyllaClient,
clusterSession: clusterSession,
configCache: configCache,
logger: logger,
intensityHandlers: make(map[uuid.UUID]*intensityParallelHandler),
}, nil
Expand Down
16 changes: 10 additions & 6 deletions pkg/service/repair/service_repair_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,21 @@ type repairTestHelper struct {
func newRepairTestHelper(t *testing.T, session gocqlx.Session, config repair.Config) *repairTestHelper {
t.Helper()

clusterID := uuid.MustRandom()
logger := log.NewDevelopmentWithLevel(zapcore.InfoLevel)

hrt := NewHackableRoundTripper(scyllaclient.DefaultTransport())
hrt.SetInterceptor(repairInterceptor(scyllaclient.CommandSuccessful))
c := newTestClient(t, hrt, log.NopLogger)
s := newTestService(t, session, c, config, logger)
s := newTestService(t, session, c, config, logger, clusterID)

return &repairTestHelper{
CommonTestHelper: &CommonTestHelper{
Logger: logger,
Session: session,
Hrt: hrt,
Client: c,
ClusterID: uuid.MustRandom(),
ClusterID: clusterID,
TaskID: uuid.MustRandom(),
RunID: uuid.NewTime(),
T: t,
Expand All @@ -86,16 +87,17 @@ func newRepairWithClusterSessionTestHelper(t *testing.T, session gocqlx.Session,
hrt *HackableRoundTripper, c *scyllaclient.Client, config repair.Config) *repairTestHelper {
t.Helper()

clusterID := uuid.MustRandom()
logger := log.NewDevelopmentWithLevel(zapcore.InfoLevel)
s := newTestServiceWithClusterSession(t, session, c, config, logger)
s := newTestServiceWithClusterSession(t, session, c, config, logger, clusterID)

return &repairTestHelper{
CommonTestHelper: &CommonTestHelper{
Logger: logger,
Session: session,
Hrt: hrt,
Client: c,
ClusterID: uuid.MustRandom(),
ClusterID: clusterID,
TaskID: uuid.MustRandom(),
RunID: uuid.NewTime(),
T: t,
Expand Down Expand Up @@ -389,7 +391,7 @@ func newTestClient(t *testing.T, hrt *HackableRoundTripper, logger log.Logger) *
return c
}

func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c repair.Config, logger log.Logger) *repair.Service {
func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c repair.Config, logger log.Logger, clusterID uuid.UUID) *repair.Service {
t.Helper()

s, err := repair.NewService(
Expand All @@ -402,6 +404,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return gocqlx.Session{}, errors.New("not implemented")
},
NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts),
logger.Named("repair"),
)
if err != nil {
Expand All @@ -411,7 +414,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
return s
}

func newTestServiceWithClusterSession(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c repair.Config, logger log.Logger) *repair.Service {
func newTestServiceWithClusterSession(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c repair.Config, logger log.Logger, clusterID uuid.UUID) *repair.Service {
t.Helper()

s, err := repair.NewService(
Expand All @@ -424,6 +427,7 @@ func newTestServiceWithClusterSession(t *testing.T, session gocqlx.Session, clie
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateSession(t, client), nil
},
NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts),
logger.Named("repair"),
)
if err != nil {
Expand Down
13 changes: 9 additions & 4 deletions pkg/service/restore/helper_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ func newTestHelper(t *testing.T, srcHosts, dstHosts []string) *testHelper {

return &testHelper{
srcCluster: srcCluster,
srcBackupSvc: newBackupSvc(t, srcCluster.Session, srcCluster.Client),
srcBackupSvc: newBackupSvc(t, srcCluster.Session, srcCluster.Client, srcCluster.ClusterID),
dstCluster: dstCluster,
dstRestoreSvc: newRestoreSvc(t, dstCluster.Session, dstCluster.Client, user, pass),
dstRestoreSvc: newRestoreSvc(t, dstCluster.Session, dstCluster.Client, dstCluster.ClusterID, user, pass),
dstUser: user,
dstPass: pass,
}
}

func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.Client) *backup.Service {
func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.Client, clusterID uuid.UUID) *backup.Service {
svc, err := backup.NewService(
mgrSession,
defaultBackupTestConfig(),
Expand All @@ -135,6 +135,7 @@ func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateSession(t, client), nil
},
NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts),
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("backup"),
)
if err != nil {
Expand All @@ -143,7 +144,9 @@ func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.
return svc
}

func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.Client, user, pass string) *Service {
func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.Client, clusterID uuid.UUID, user, pass string) *Service {
configCacheSvc := NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts)

repairSvc, err := repair.NewService(
mgrSession,
repair.DefaultConfig(),
Expand All @@ -154,6 +157,7 @@ func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateSession(t, client), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("repair"),
)
if err != nil {
Expand All @@ -171,6 +175,7 @@ func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.InfoLevel).Named("restore"),
)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/service/restore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/service/repair"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)
Expand All @@ -29,11 +30,13 @@ type Service struct {

scyllaClient scyllaclient.ProviderFunc
clusterSession cluster.SessionFunc
configCache configcache.ConfigCacher
logger log.Logger
}

func NewService(repairSvc *repair.Service, session gocqlx.Session, config Config, metrics metrics.RestoreMetrics,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, logger log.Logger,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, configCache configcache.ConfigCacher,
logger log.Logger,
) (*Service, error) {
if session.Session == nil || session.Closed() {
return nil, errors.New("invalid session")
Expand All @@ -53,6 +56,7 @@ func NewService(repairSvc *repair.Service, session gocqlx.Session, config Config
metrics: metrics,
scyllaClient: scyllaClient,
clusterSession: clusterSession,
configCache: configCache,
logger: logger,
}, nil
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/service/restore/service_restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newRestoreTestHelper(t *testing.T, session gocqlx.Session, config Config, l
logger := log.NewDevelopmentWithLevel(zapcore.InfoLevel)
hrt := NewHackableRoundTripper(scyllaclient.DefaultTransport())
client := newTestClient(t, hrt, logger.Named("client"), clientConf)
service, backupSvc := newTestService(t, session, client, config, logger, user, pass)
service, backupSvc := newTestService(t, session, client, config, logger, clusterID, user, pass)
cHelper := &CommonTestHelper{
Session: session,
Hrt: hrt,
Expand Down Expand Up @@ -106,9 +106,11 @@ func newTestClient(t *testing.T, hrt *HackableRoundTripper, logger log.Logger, c
return c
}

func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c Config, logger log.Logger, user, pass string) (*Service, *backup.Service) {
func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c Config, logger log.Logger, clusterID uuid.UUID, user, pass string) (*Service, *backup.Service) {
t.Helper()

configCacheSvc := NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts)

repairSvc, err := repair.NewService(
session,
repair.DefaultConfig(),
Expand All @@ -119,6 +121,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("repair"),
)
if err != nil {
Expand All @@ -138,6 +141,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("backup"),
)
if err != nil {
Expand All @@ -155,6 +159,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
logger.Named("restore"),
)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/testutils/testconfig/testconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ func ScyllaManagerDBCluster() string {
// IsSSLEnabled is a helper function to parse SSL_ENABLED env var.
// SSL_ENABLED env var indicates if scylla cluster is configured to use ssl or not.
func IsSSLEnabled() bool {
sslEnabled, err := strconv.ParseBool(os.Getenv("SSL_ENABLED"))
raw := os.Getenv("SSL_ENABLED")
if raw == "" {
return false
}
sslEnabled, err := strconv.ParseBool(raw)
if err != nil {
panic("parse SSL_ENABLED env var:" + err.Error())
}
Expand Down
Loading
Loading