Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let other services use config cache #4166

Merged
merged 3 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/cmd/scylla-manager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *server) makeServices(ctx context.Context) error {
metrics.NewRepairMetrics().MustRegister(),
s.clusterSvc.Client,
s.clusterSvc.GetSession,
s.configCacheSvc,
s.logger.Named("repair"),
)
if err != nil {
Expand All @@ -115,6 +116,7 @@ func (s *server) makeServices(ctx context.Context) error {
s.clusterSvc.GetClusterName,
s.clusterSvc.Client,
s.clusterSvc.GetSession,
s.configCacheSvc,
s.logger.Named("backup"),
)
if err != nil {
Expand All @@ -128,6 +130,7 @@ func (s *server) makeServices(ctx context.Context) error {
metrics.NewRestoreMetrics().MustRegister(),
s.clusterSvc.Client,
s.clusterSvc.GetSession,
s.configCacheSvc,
s.logger.Named("restore"),
)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions pkg/service/backup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/service/scheduler"
"github.com/scylladb/scylla-manager/v3/pkg/util"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter"
Expand All @@ -45,13 +46,15 @@ type Service struct {
clusterName cluster.NameFunc
scyllaClient scyllaclient.ProviderFunc
clusterSession cluster.SessionFunc
configCache configcache.ConfigCacher
logger log.Logger

dth deduplicateTestHooks
}

func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMetrics,
clusterName cluster.NameFunc, scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, logger log.Logger,
func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMetrics, clusterName cluster.NameFunc,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, configCache configcache.ConfigCacher,
logger log.Logger,
) (*Service, error) {
if session.Session == nil || session.Closed() {
return nil, errors.New("invalid session")
Expand All @@ -76,6 +79,7 @@ func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMet
clusterName: clusterName,
scyllaClient: scyllaClient,
clusterSession: clusterSession,
configCache: configCache,
logger: logger,
}, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/service/backup/service_backup_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func newTestServiceWithUser(t *testing.T, session gocqlx.Session, client *scylla
}
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
NewTestConfigCacheSvc(t, client.Config().Hosts),
logger.Named("backup"),
)
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions pkg/service/configcache/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type ConfigCacher interface {
// or ErrNoHostConfig if config of the particular host doesn't exist.
Read(clusterID uuid.UUID, host string) (NodeConfig, error)

// ReadAll calls Read on all AvailableHosts.
ReadAll(clusterID uuid.UUID) (map[string]NodeConfig, error)

// AvailableHosts returns list of hosts of given cluster that keep their configuration in cache.
AvailableHosts(ctx context.Context, clusterID uuid.UUID) ([]string, error)

Expand Down Expand Up @@ -88,6 +91,23 @@ func (svc *Service) Read(clusterID uuid.UUID, host string) (NodeConfig, error) {
return hostConfig, nil
}

// ReadAll calls Read on AvailableHosts.
func (svc *Service) ReadAll(clusterID uuid.UUID) (map[string]NodeConfig, error) {
clusterConfig, err := svc.readClusterConfig(clusterID)
if err != nil {
return nil, err
}

out := make(map[string]NodeConfig)
clusterConfig.Range(func(key, value any) bool {
host := key.(string)
cfg := value.(NodeConfig)
out[host] = cfg
return true
})
return out, nil
}

// Run starts the infinity loop responsible for updating the clusters configuration periodically.
func (svc *Service) Run(ctx context.Context) {
freq := time.NewTicker(svc.svcConfig.UpdateFrequency)
Expand Down
6 changes: 5 additions & 1 deletion pkg/service/repair/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/util"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/ksfilter"
Expand All @@ -38,14 +39,16 @@ type Service struct {

scyllaClient scyllaclient.ProviderFunc
clusterSession cluster.SessionFunc
configCache configcache.ConfigCacher
logger log.Logger

intensityHandlers map[uuid.UUID]*intensityParallelHandler
mu sync.Mutex
}

func NewService(session gocqlx.Session, config Config, metrics metrics.RepairMetrics,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, logger log.Logger,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, configCache configcache.ConfigCacher,
logger log.Logger,
) (*Service, error) {
if err := config.Validate(); err != nil {
return nil, errors.Wrap(err, "invalid config")
Expand All @@ -61,6 +64,7 @@ func NewService(session gocqlx.Session, config Config, metrics metrics.RepairMet
metrics: metrics,
scyllaClient: scyllaClient,
clusterSession: clusterSession,
configCache: configCache,
logger: logger,
intensityHandlers: make(map[uuid.UUID]*intensityParallelHandler),
}, nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/service/repair/service_repair_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return gocqlx.Session{}, errors.New("not implemented")
},
NewTestConfigCacheSvc(t, client.Config().Hosts),
logger.Named("repair"),
)
if err != nil {
Expand All @@ -424,6 +425,7 @@ func newTestServiceWithClusterSession(t *testing.T, session gocqlx.Session, clie
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateSession(t, client), nil
},
NewTestConfigCacheSvc(t, client.Config().Hosts),
logger.Named("repair"),
)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/service/restore/helper_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateSession(t, client), nil
},
NewTestConfigCacheSvc(t, client.Config().Hosts),
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("backup"),
)
if err != nil {
Expand All @@ -144,6 +145,8 @@ func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.
}

func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.Client, user, pass string) *Service {
configCacheSvc := NewTestConfigCacheSvc(t, client.Config().Hosts)

repairSvc, err := repair.NewService(
mgrSession,
repair.DefaultConfig(),
Expand All @@ -154,6 +157,7 @@ func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateSession(t, client), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("repair"),
)
if err != nil {
Expand All @@ -171,6 +175,7 @@ func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.InfoLevel).Named("restore"),
)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/service/restore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/service/repair"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)
Expand All @@ -29,11 +30,13 @@ type Service struct {

scyllaClient scyllaclient.ProviderFunc
clusterSession cluster.SessionFunc
configCache configcache.ConfigCacher
logger log.Logger
}

func NewService(repairSvc *repair.Service, session gocqlx.Session, config Config, metrics metrics.RestoreMetrics,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, logger log.Logger,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, configCache configcache.ConfigCacher,
logger log.Logger,
) (*Service, error) {
if session.Session == nil || session.Closed() {
return nil, errors.New("invalid session")
Expand All @@ -53,6 +56,7 @@ func NewService(repairSvc *repair.Service, session gocqlx.Session, config Config
metrics: metrics,
scyllaClient: scyllaClient,
clusterSession: clusterSession,
configCache: configCache,
logger: logger,
}, nil
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/service/restore/service_restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ func newTestClient(t *testing.T, hrt *HackableRoundTripper, logger log.Logger, c
func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c Config, logger log.Logger, user, pass string) (*Service, *backup.Service) {
t.Helper()

configCacheSvc := NewTestConfigCacheSvc(t, client.Config().Hosts)

repairSvc, err := repair.NewService(
session,
repair.DefaultConfig(),
Expand All @@ -119,6 +121,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("repair"),
)
if err != nil {
Expand All @@ -138,6 +141,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("backup"),
)
if err != nil {
Expand All @@ -155,6 +159,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
logger.Named("restore"),
)
if err != nil {
Expand Down
38 changes: 35 additions & 3 deletions pkg/testutils/testhelper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,28 @@ package testhelper
import (
"context"
"testing"
"time"

"github.com/scylladb/go-log"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/scylla-manager/v3/pkg/config/server"
"github.com/scylladb/scylla-manager/v3/pkg/metrics"
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/store"

"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/testutils"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils/db"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)

// CommonTestHelper common tester object for backups and repairs.
type CommonTestHelper struct {
Logger log.Logger
Session gocqlx.Session
Hrt *testutils.HackableRoundTripper
Hrt *HackableRoundTripper
Client *scyllaclient.Client

ClusterID uuid.UUID
Expand Down Expand Up @@ -57,11 +65,35 @@ func (h *CommonTestHelper) RestartAgents() {
func execOnAllHosts(h *CommonTestHelper, cmd string) {
h.T.Helper()
for _, host := range h.GetAllHosts() {
stdout, stderr, err := testutils.ExecOnHost(host, cmd)
stdout, stderr, err := ExecOnHost(host, cmd)
if err != nil {
h.T.Log("stdout", stdout)
h.T.Log("stderr", stderr)
h.T.Fatal("Command failed on host", host, err)
}
}
}

// NewTestConfigCacheSvc creates default config cache service which can be used
// for testing other services relaying on it.
func NewTestConfigCacheSvc(t *testing.T, hosts []string) configcache.ConfigCacher {
t.Helper()

session := CreateScyllaManagerDBSession(t)
secretsStore := store.NewTableStore(session, table.Secrets)

clusterSvc, err := cluster.NewService(session, metrics.NewClusterMetrics(), secretsStore,
scyllaclient.DefaultTimeoutConfig(), server.DefaultConfig().ClientCacheTimeout, log.NewDevelopment())
if err != nil {
t.Fatal(err)
}

scyllaClientProvider := func(context.Context, uuid.UUID) (*scyllaclient.Client, error) {
sc := scyllaclient.TestConfig(hosts, AgentAuthToken())
sc.Timeout = time.Second
sc.Transport = NewHackableRoundTripper(scyllaclient.DefaultTransport())
return scyllaclient.NewClient(sc, log.NewDevelopment())
}

return configcache.NewService(configcache.DefaultConfig(), clusterSvc, scyllaClientProvider, secretsStore, log.NewDevelopment())
}
Loading