diff --git a/cmd/influxd/run/startup_logger.go b/cmd/influxd/run/startup_logger.go
new file mode 100644
index 00000000000..e471deff07f
--- /dev/null
+++ b/cmd/influxd/run/startup_logger.go
@@ -0,0 +1,32 @@
+package run
+
+import (
+    "fmt"
+    "sync/atomic"
+
+    "go.uber.org/zap"
+)
+
+type StartupProgressLogger struct {
+    shardsCompleted atomic.Uint64
+    shardsTotal     atomic.Uint64
+    logger          *zap.Logger
+}
+
+func NewStartupProgressLogger(logger *zap.Logger) *StartupProgressLogger {
+    return &StartupProgressLogger{
+        logger: logger,
+    }
+}
+
+func (s *StartupProgressLogger) AddShard() {
+    s.shardsTotal.Add(1)
+}
+
+func (s *StartupProgressLogger) CompletedShard() {
+    shardsCompleted := s.shardsCompleted.Add(1)
+    totalShards := s.shardsTotal.Load()
+
+    percentShards := float64(shardsCompleted) / float64(totalShards) * 100
+    s.logger.Info(fmt.Sprintf("Finished loading shard, current progress %.1f%% shards (%d / %d).", percentShards, shardsCompleted, totalShards))
+}
diff --git a/storage/engine.go b/storage/engine.go
index 340a96af9d6..59f8aec7370 100644
--- a/storage/engine.go
+++ b/storage/engine.go
@@ -9,6 +9,7 @@ import (
     "time"
 
     "github.com/influxdata/influxdb/v2"
+    "github.com/influxdata/influxdb/v2/cmd/influxd/run"
     "github.com/influxdata/influxdb/v2/influxql/query"
     "github.com/influxdata/influxdb/v2/kit/platform"
     errors2 "github.com/influxdata/influxdb/v2/kit/platform/errors"
@@ -167,6 +168,9 @@ func (e *Engine) WithLogger(log *zap.Logger) {
     if e.precreatorService != nil {
         e.precreatorService.WithLogger(log)
     }
+
+    sl := run.NewStartupProgressLogger(e.logger)
+    e.tsdbStore.WithStartupMetrics(sl)
 }
 
 // PrometheusCollectors returns all the prometheus collectors associated with
diff --git a/tsdb/store.go b/tsdb/store.go
index ca04fce6bae..0391aef91fd 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -50,6 +50,12 @@ const SeriesFileDirectory = "_series"
 // databaseState keeps track of the state of a database.
 type databaseState struct{ indexTypes map[string]int }
 
+// shardResponse holds the result of opening each shard in a goroutine.
+type shardResponse struct {
+    s   *Shard
+    err error
+}
+
 // addIndexType records that the database has a shard with the given index type.
 func (d *databaseState) addIndexType(indexType string) {
     if d.indexTypes == nil {
@@ -118,6 +124,11 @@ type Store struct {
     baseLogger *zap.Logger
     Logger     *zap.Logger
 
+    startupProgressMetrics interface {
+        AddShard()
+        CompletedShard()
+    }
+
     closing chan struct{}
     wg      sync.WaitGroup
     opened  bool
@@ -148,6 +159,13 @@ func (s *Store) WithLogger(log *zap.Logger) {
     }
 }
 
+func (s *Store) WithStartupMetrics(sp interface {
+    AddShard()
+    CompletedShard()
+}) {
+    s.startupProgressMetrics = sp
+}
+
 // CollectBucketMetrics sets prometheus metrics for each bucket
 func (s *Store) CollectBucketMetrics() {
     // Collect all the bucket cardinality estimations
@@ -286,12 +304,6 @@ func (s *Store) Open(ctx context.Context) error {
 }
 
 func (s *Store) loadShards(ctx context.Context) error {
-    // res holds the result from opening each shard in a goroutine
-    type res struct {
-        s   *Shard
-        err error
-    }
-
     // Limit the number of concurrent TSM files to be opened to the number of cores.
     s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0))
@@ -339,9 +351,8 @@ func (s *Store) loadShards(ctx context.Context) error {
     log, logEnd := logger.NewOperation(context.TODO(), s.Logger, "Open store", "tsdb_open")
     defer logEnd()
 
+    shardLoaderWg := new(sync.WaitGroup)
     t := limiter.NewFixed(runtime.GOMAXPROCS(0))
-    resC := make(chan *res)
-    var n int
 
     // Determine how many shards we need to open by checking the store path.
     dbDirs, err := os.ReadDir(s.path)
@@ -349,119 +360,150 @@ func (s *Store) loadShards(ctx context.Context) error {
         return err
     }
 
-    for _, db := range dbDirs {
-        dbPath := filepath.Join(s.path, db.Name())
-        if !db.IsDir() {
-            log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
-            continue
-        }
+    walkShardsAndProcess := func(fn func(sfile *SeriesFile, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error) error {
+        for _, db := range dbDirs {
+            rpDirs, err := s.getRetentionPolicyDirs(db, log)
+            if err != nil {
+                return err
+            } else if rpDirs == nil {
+                continue
+            }
 
-        if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
-            log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
-            continue
-        }
+            // Load series file.
+            sfile, err := s.openSeriesFile(db.Name())
+            if err != nil {
+                return err
+            }
 
-        // Load series file.
-        sfile, err := s.openSeriesFile(db.Name())
-        if err != nil {
-            return err
+            for _, rp := range rpDirs {
+                shardDirs, err := s.getShards(rp, db, log)
+                if err != nil {
+                    return err
+                } else if shardDirs == nil {
+                    continue
+                }
+
+                for _, sh := range shardDirs {
+                    // Series file should not be in a retention policy but skip just in case.
+                    if sh.Name() == SeriesFileDirectory {
+                        log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name())))
+                        continue
+                    }
+
+                    if err := fn(sfile, sh, db, rp); err != nil {
+                        return err
+                    }
+                }
+            }
         }
 
-        // Load each retention policy within the database directory.
-        rpDirs, err := os.ReadDir(dbPath)
+        return nil
+    }
+
+    // We use `rawShardCount` as the buffer size for the shard-response channel created below.
+    // If startupProgressMetrics is nil, the counting walk is skipped, the count stays at zero,
+    // and the channel is unbuffered.
+    rawShardCount := 0
+    if s.startupProgressMetrics != nil {
+        err := walkShardsAndProcess(func(sfile *SeriesFile, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error {
+            rawShardCount++
+            s.startupProgressMetrics.AddShard()
+            return nil
+        })
         if err != nil {
             return err
         }
+    }
 
-        for _, rp := range rpDirs {
-            rpPath := filepath.Join(s.path, db.Name(), rp.Name())
-            if !rp.IsDir() {
-                log.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory"))
-                continue
-            }
+    shardResC := make(chan *shardResponse, rawShardCount)
+    err = walkShardsAndProcess(func(sfile *SeriesFile, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error {
+        shardLoaderWg.Add(1)
 
-            // The .series directory is not a retention policy.
-            if rp.Name() == SeriesFileDirectory {
-                continue
-            }
+        go func(db, rp, sh string) {
+            defer shardLoaderWg.Done()
+            path := filepath.Join(s.path, db, rp, sh)
+            walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)
 
-            if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(db.Name(), rp.Name()) {
-                log.Info("Skipping retention policy dir", logger.RetentionPolicy(rp.Name()), zap.String("reason", "failed retention policy filter"))
-                continue
+            if err := t.Take(ctx); err != nil {
+                log.Error("failed to open shard at path", zap.String("path", path), zap.Error(err))
+                shardResC <- &shardResponse{err: fmt.Errorf("failed to open shard at path %q: %w", path, err)}
+                return
             }
+            defer t.Release()
 
-            shardDirs, err := os.ReadDir(rpPath)
+            start := time.Now()
+
+            // Shard file names are numeric shardIDs
+            shardID, err := strconv.ParseUint(sh, 10, 64)
             if err != nil {
-                return err
+                log.Info("invalid shard ID found at path", zap.String("path", path))
+                shardResC <- &shardResponse{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}
+                if s.startupProgressMetrics != nil {
+                    s.startupProgressMetrics.CompletedShard()
+                }
+                return
             }
 
-            for _, sh := range shardDirs {
-                // Series file should not be in a retention policy but skip just in case.
-                if sh.Name() == SeriesFileDirectory {
-                    log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name())))
-                    continue
+            if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) {
+                log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID))
+                shardResC <- &shardResponse{}
+                if s.startupProgressMetrics != nil {
+                    s.startupProgressMetrics.CompletedShard()
                 }
+                return
+            }
 
-                n++
-                go func(db, rp, sh string) {
-                    path := filepath.Join(s.path, db, rp, sh)
-                    walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)
+            // Copy options and assign shared index.
+            opt := s.EngineOptions
 
-                    if err := t.Take(ctx); err != nil {
-                        log.Error("failed to open shard at path", zap.String("path", path), zap.Error(err))
-                        resC <- &res{err: fmt.Errorf("failed to open shard at path %q: %w", path, err)}
-                        return
-                    }
-                    defer t.Release()
+            // Provide an implementation of the ShardIDSets
+            opt.SeriesIDSets = shardSet{store: s, db: db}
 
-                    start := time.Now()
+            // Open engine.
+            shard := NewShard(shardID, path, walPath, sfile, opt)
 
-                    // Shard file names are numeric shardIDs
-                    shardID, err := strconv.ParseUint(sh, 10, 64)
-                    if err != nil {
-                        log.Error("invalid shard ID found at path", zap.String("path", path))
-                        resC <- &res{err: fmt.Errorf("%s is not a valid ID. Skipping shard", sh)}
-                        return
-                    }
+            // Disable compactions, writes and queries until all shards are loaded
+            shard.EnableOnOpen = false
+            shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
+            shard.WithLogger(s.baseLogger)
 
-                    if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) {
-                        log.Warn("skipping shard", zap.String("path", path), logger.Shard(shardID))
-                        resC <- &res{}
-                        return
-                    }
-
-                    // Copy options and assign shared index.
-                    opt := s.EngineOptions
+            err = s.OpenShard(ctx, shard, false)
+            if err != nil {
+                log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
+                shardResC <- &shardResponse{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)}
+                if s.startupProgressMetrics != nil {
+                    s.startupProgressMetrics.CompletedShard()
+                }
+                return
+            }
 
-                    // Provide an implementation of the ShardIDSets
-                    opt.SeriesIDSets = shardSet{store: s, db: db}
+            shardResC <- &shardResponse{s: shard}
+            log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))
+            if s.startupProgressMetrics != nil {
+                s.startupProgressMetrics.CompletedShard()
+            }
+        }(db.Name(), rp.Name(), sh.Name())
 
-                    // Open engine.
-                    shard := NewShard(shardID, path, walPath, sfile, opt)
+        return nil
+    })
+    if err != nil {
+        return err
+    }
 
-                    // Disable compactions, writes and queries until all shards are loaded
-                    shard.EnableOnOpen = false
-                    shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
-                    shard.WithLogger(s.baseLogger)
+    if err := s.enableShards(shardLoaderWg, shardResC); err != nil {
+        return err
+    }
 
-                    err = s.OpenShard(ctx, shard, false)
-                    if err != nil {
-                        log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
-                        resC <- &res{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)}
-                        return
-                    }
+    return nil
+}
 
-                    resC <- &res{s: shard}
-                    log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))
-                }(db.Name(), rp.Name(), sh.Name())
-            }
-        }
-    }
+func (s *Store) enableShards(wg *sync.WaitGroup, resC chan *shardResponse) error {
+    go func() {
+        wg.Wait()
+        close(resC)
+    }()
 
-    // Gather results of opening shards concurrently, keeping track of how
-    // many databases we are managing.
-    for i := 0; i < n; i++ {
-        res := <-resC
+    for res := range resC {
         if res.s == nil || res.err != nil {
             continue
         }
@@ -472,7 +514,6 @@ func (s *Store) loadShards(ctx context.Context) error {
         }
         s.databases[res.s.database].addIndexType(res.s.IndexType())
     }
-    close(resC)
 
     // Check if any databases are running multiple index types.
     for db, state := range s.databases {
@@ -892,9 +933,7 @@ func (s *Store) DeleteDatabase(name string) error {
         // no files locally, so nothing to do
         return nil
     }
-    shards := s.filterShards(func(sh *Shard) bool {
-        return sh.database == name
-    })
+    shards := s.filterShards(byDatabase(name))
     s.mu.RUnlock()
 
     if err := s.walkShards(shards, func(sh *Shard) error {
@@ -956,9 +995,7 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error {
         // unknown database, nothing to do
        return nil
     }
-    shards := s.filterShards(func(sh *Shard) bool {
-        return sh.database == database && sh.retentionPolicy == name
-    })
+    shards := s.filterShards(ComposeShardFilter(byDatabase(database), byRetentionPolicy(name)))
     s.mu.RUnlock()
 
     // Close and delete all shards under the retention policy on the
@@ -1052,6 +1089,20 @@ func (s *Store) filterShards(fn func(sh *Shard) bool) []*Shard {
     return shards
 }
 
+type ShardPredicate = func(sh *Shard) bool
+
+func ComposeShardFilter(fns ...ShardPredicate) ShardPredicate {
+    return func(sh *Shard) bool {
+        for _, fn := range fns {
+            if !fn(sh) {
+                return false
+            }
+        }
+
+        return true
+    }
+}
+
 // byDatabase provides a predicate for filterShards that matches on the name of
 // the database passed in.
 func byDatabase(name string) func(sh *Shard) bool {
@@ -1060,16 +1111,20 @@ func byDatabase(name string) func(sh *Shard) bool {
     }
 }
 
+// byRetentionPolicy provides a predicate for filterShards that matches on the name of
+// the retention policy passed in.
+func byRetentionPolicy(name string) ShardPredicate {
+    return func(sh *Shard) bool {
+        return sh.retentionPolicy == name
+    }
+}
+
 // walkShards apply a function to each shard in parallel. fn must be safe for
 // concurrent use. If any of the functions return an error, the first error is
 // returned.
 func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
-    // struct to hold the result of opening each reader in a goroutine
-    type res struct {
-        err error
-    }
-
-    resC := make(chan res)
+    resC := make(chan shardResponse, len(shards))
     var n int
 
     for _, sh := range shards {
@@ -1077,11 +1132,11 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
 
         go func(sh *Shard) {
             if err := fn(sh); err != nil {
-                resC <- res{err: fmt.Errorf("shard %d: %s", sh.id, err)}
+                resC <- shardResponse{err: fmt.Errorf("shard %d: %s", sh.id, err)}
                 return
             }
 
-            resC <- res{}
+            resC <- shardResponse{}
         }(sh)
     }
 
@@ -2192,3 +2247,49 @@ func (s shardSet) ForEach(f func(ids *SeriesIDSet)) error {
     }
     return nil
 }
+
+func (s *Store) getRetentionPolicyDirs(db os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
+    dbPath := filepath.Join(s.path, db.Name())
+    if !db.IsDir() {
+        log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
+        return nil, nil
+    }
+
+    if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
+        log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
+        return nil, nil
+    }
+
+    // Load each retention policy within the database directory.
+    rpDirs, err := os.ReadDir(dbPath)
+    if err != nil {
+        return nil, err
+    }
+
+    return rpDirs, nil
+}
+
+func (s *Store) getShards(rpDir os.DirEntry, dbDir os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) {
+    rpPath := filepath.Join(s.path, dbDir.Name(), rpDir.Name())
+    if !rpDir.IsDir() {
+        log.Info("Skipping retention policy dir", zap.String("name", rpDir.Name()), zap.String("reason", "not a directory"))
+        return nil, nil
+    }
+
+    // The .series directory is not a retention policy.
+    if rpDir.Name() == SeriesFileDirectory {
+        return nil, nil
+    }
+
+    if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(dbDir.Name(), rpDir.Name()) {
+        log.Info("Skipping retention policy dir", logger.RetentionPolicy(rpDir.Name()), zap.String("reason", "failed retention policy filter"))
+        return nil, nil
+    }
+
+    shardDirs, err := os.ReadDir(rpPath)
+    if err != nil {
+        return nil, err
+    }
+
+    return shardDirs, nil
+}
diff --git a/tsdb/store_test.go b/tsdb/store_test.go
index 88e2373e87e..f0c4cc4515b 100644
--- a/tsdb/store_test.go
+++ b/tsdb/store_test.go
@@ -145,6 +145,96 @@ func TestStore_CreateShard(t *testing.T) {
     }
 }
 
+// Ensure the store reports shard-loading progress as shards are opened at startup.
+func TestStore_StartupShardProgress(t *testing.T) {
+    t.Parallel()
+
+    test := func(index string) {
+        s := MustOpenStore(t, index)
+        defer s.Close()
+
+        // Create a new shard and verify that it exists.
+        require.NoError(t, s.CreateShard(context.Background(), "db0", "rp0", 1, true))
+        sh := s.Shard(1)
+        require.NotNil(t, sh)
+
+        // Create another shard and verify that it exists.
+        require.NoError(t, s.CreateShard(context.Background(), "db0", "rp0", 2, true))
+        sh = s.Shard(2)
+        require.NotNil(t, sh)
+
+        msl := &mockStartupLogger{}
+
+        // Reopen the store and recheck.
+        require.NoError(t, s.Reopen(t, WithStartupMetrics(msl)))
+
+        // Equality check to make sure shards are always added prior to
+        // completion being called. This test opens 2 shards, both of which
+        // load successfully.
+        require.Equal(t, msl.shardTracker, []string{
+            "shard-add",
+            "shard-add",
+            "shard-complete",
+            "shard-complete",
+        })
+    }
+
+    for _, index := range tsdb.RegisteredIndexes() {
+        t.Run(index, func(t *testing.T) { test(index) })
+    }
+}
+
+// Ensure that shard loading still accounts for bad shards. A bad shard should still show up
+// in the startup progress even though it is in an error state.
+func TestStore_BadShardLoading(t *testing.T) {
+    t.Parallel()
+
+    test := func(index string) {
+        s := MustOpenStore(t, index)
+        defer s.Close()
+
+        // Create a new shard and verify that it exists.
+        require.NoError(t, s.CreateShard(context.Background(), "db0", "rp0", 1, true))
+        sh := s.Shard(1)
+        require.NotNil(t, sh)
+
+        // Create another shard and verify that it exists.
+        require.NoError(t, s.CreateShard(context.Background(), "db0", "rp0", 2, true))
+        sh = s.Shard(2)
+        require.NotNil(t, sh)
+
+        // Create another shard and verify that it exists.
+        require.NoError(t, s.CreateShard(context.Background(), "db0", "rp0", 3, true))
+        sh = s.Shard(3)
+        require.NotNil(t, sh)
+
+        s.SetShardOpenErrorForTest(sh.ID(), errors.New("a shard opening error occurred"))
+        err2 := s.OpenShard(context.Background(), s.Shard(sh.ID()), false)
+        require.Error(t, err2, "no error opening bad shard")
+
+        msl := &mockStartupLogger{}
+
+        // Reopen the store and recheck.
+        require.NoError(t, s.Reopen(t, WithStartupMetrics(msl)))
+
+        // Equality check to make sure shards are always added prior to
+        // completion being called. This test opens 3 total shards - 1 shard
+        // fails, but we still want to track that it was attempted to be opened.
+        require.Equal(t, msl.shardTracker, []string{
+            "shard-add",
+            "shard-add",
+            "shard-add",
+            "shard-complete",
+            "shard-complete",
+            "shard-complete",
+        })
+    }
+
+    for _, index := range tsdb.RegisteredIndexes() {
+        t.Run(index, func(t *testing.T) { test(index) })
+    }
+}
+
 func TestStore_BadShard(t *testing.T) {
     const errStr = "a shard open error"
     indexes := tsdb.RegisteredIndexes()
@@ -2623,6 +2713,13 @@ func WithWALFlushOnShutdown(flush bool) StoreOption {
     }
 }
 
+func WithStartupMetrics(sm *mockStartupLogger) StoreOption {
+    return func(s *Store) error {
+        s.WithStartupMetrics(sm)
+        return nil
+    }
+}
+
 // NewStore returns a new instance of Store with a temporary path.
 func NewStore(tb testing.TB, index string, opts ...StoreOption) *Store {
     tb.Helper()
@@ -2767,3 +2864,20 @@ func dirExists(path string) bool {
     }
     return !os.IsNotExist(err)
 }
+
+type mockStartupLogger struct {
+    shardTracker []string
+    mu           sync.Mutex
+}
+
+func (m *mockStartupLogger) AddShard() {
+    m.mu.Lock()
+    m.shardTracker = append(m.shardTracker, "shard-add")
+    m.mu.Unlock()
+}
+
+func (m *mockStartupLogger) CompletedShard() {
+    m.mu.Lock()
+    m.shardTracker = append(m.shardTracker, "shard-complete")
+    m.mu.Unlock()
+}
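
The sketch below shows how a caller outside the storage engine could satisfy the two-method progress interface that `WithStartupMetrics` accepts. It is illustrative only: `countingProgress` and its `log.Printf` output are hypothetical and not part of this change; the percentage math simply mirrors `StartupProgressLogger`, and in the real code path the wiring happens in `storage/engine.go` via `e.tsdbStore.WithStartupMetrics(sl)`.

```go
package main

import (
	"log"
	"sync/atomic"
)

// countingProgress is a hypothetical tracker with the same two-method shape
// (AddShard / CompletedShard) that Store.loadShards calls during startup.
type countingProgress struct {
	total     atomic.Uint64
	completed atomic.Uint64
}

// AddShard is called once per shard discovered before loading begins.
func (c *countingProgress) AddShard() { c.total.Add(1) }

// CompletedShard is called as each shard finishes loading, successfully or not.
func (c *countingProgress) CompletedShard() {
	done := c.completed.Add(1)
	total := c.total.Load()
	log.Printf("loaded %d/%d shards (%.1f%%)", done, total, float64(done)/float64(total)*100)
}

func main() {
	p := &countingProgress{}

	// In InfluxDB this would be store.WithStartupMetrics(p) followed by
	// store.Open(ctx); here we just simulate three shards being discovered
	// and then loaded.
	for i := 0; i < 3; i++ {
		p.AddShard()
	}
	for i := 0; i < 3; i++ {
		p.CompletedShard()
	}
}
```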