diff --git a/tsdb/store.go b/tsdb/store.go index 0391aef91fd..1a96016edd3 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -88,7 +88,12 @@ func (se *shardErrorMap) setShardOpenError(shardID uint64, err error) { if err == nil { delete(se.shardErrors, shardID) } else { - se.shardErrors[shardID] = &ErrPreviousShardFail{error: fmt.Errorf("opening shard previously failed with: %w", err)} + // Ignore incoming error if it is from a previous open failure. We don't want to keep + // re-wrapping the same error. For safety, make sure we have an ErrPreviousShardFail in + // case we hadn't recorded it. + if !errors.Is(err, ErrPreviousShardFail{}) || !errors.Is(se.shardErrors[shardID], ErrPreviousShardFail{}) { + se.shardErrors[shardID] = &ErrPreviousShardFail{error: fmt.Errorf("opening shard previously failed with: %w", err)} + } } } @@ -303,6 +308,142 @@ func (s *Store) Open(ctx context.Context) error { return nil } +// generateTrailingPath returns the last part of a shard path or WAL path +// based on the shardID, db, and rp. +func (s *Store) generateTrailingPath(shardID uint64, db, rp string) string { + return filepath.Join(db, rp, strconv.FormatUint(shardID, 10)) +} + +// generatePath returns the path to a shard based on its db, rp, and shardID. +func (s *Store) generatePath(shardID uint64, db, rp string) string { + return filepath.Join(s.path, s.generateTrailingPath(shardID, db, rp)) +} + +// generateWALPath returns the WAL path to a shard based on its db, rp, and shardID. +func (s *Store) generateWALPath(shardID uint64, db, rp string) string { + return filepath.Join(s.EngineOptions.Config.WALDir, s.generateTrailingPath(shardID, db, rp)) +} + +// shardLoader is an independent object that can load shards from disk in a thread-safe manner. +// It should be created with Store.newShardLoader. The result of shardLoader.Load should then +// be registered with Store.registerShard. +type shardLoader struct { + // NOTE: shardLoader should not directly reference the Store that creates it or any of its fields. + + shardID uint64 + db string + rp string + sfile *SeriesFile + engineOpts EngineOptions + enabled bool + logger *zap.Logger + + // Shard we are working with. Could be created by the loader or given by client code. + shard *Shard + + // Should be loaded even if loading failed previously? + force bool + + // Path to shard on disk + path string + + // Path to WAL on disk + walPath string + + // loadErr indicates if Load should fail immediately with an error. + loadErr error +} + +// Load loads a shard from disk in a thread-safe manner. After a call to Load, +// the result must be registered with Store.registerShard, whether or not an error +// occurred. The returned shard is guaranteed to not be nil and have the correct shard ID, +// although it will not be properly loaded if there was an error. +func (l *shardLoader) Load(ctx context.Context) *shardResponse { + // Open engine. + if l.shard == nil { + l.shard = NewShard(l.shardID, l.path, l.walPath, l.sfile, l.engineOpts) + + // Set options based on caller preferences. + l.shard.EnableOnOpen = l.enabled + l.shard.CompactionDisabled = l.engineOpts.CompactionDisabled + l.shard.WithLogger(l.logger) + } + + err := func() error { + // Stop and return error if previous open failed. + if l.loadErr != nil { + return l.loadErr + } + + // Open the shard. 
+ return l.shard.Open(ctx) + }() + + return &shardResponse{s: l.shard, err: err} +} + +type shardLoaderOption func(*shardLoader) + +// withForceLoad allows forcing shard opens even if a previous load failed with an error. +func withForceLoad(force bool) shardLoaderOption { + return func(l *shardLoader) { + l.force = force + } +} + +// withExistingShard uses an existing Shard already registered with Store instead +// of creating a new one. +func withExistingShard(shard *Shard) shardLoaderOption { + return func(l *shardLoader) { + l.shard = shard + } +} + +// newShardLoader generates a shardLoader that can be used to load a shard in a +// thread-safe manner. The result of the shardLoader.Load() must then be +// populated into s using Store.registerShard. +// s.mu must be held before calling newShardLoader. newShardLoader is not thread-safe. +// Note that any errors detected during newShardLoader will not be returned to caller until +// Load is called. This is to simplify error handling for client code. +func (s *Store) newShardLoader(shardID uint64, db, rp string, enabled bool, opts ...shardLoaderOption) *shardLoader { + l := &shardLoader{ + shardID: shardID, + db: db, + rp: rp, + engineOpts: s.EngineOptions, + enabled: enabled, + logger: s.baseLogger, + + path: s.generatePath(shardID, db, rp), + walPath: s.generateWALPath(shardID, db, rp), + } + + for _, o := range opts { + o(l) + } + + // Check for error from last load attempt. + lastErr, _ := s.badShards.shardError(shardID) + if lastErr != nil && !l.force { + l.loadErr = fmt.Errorf("not attempting to open shard %d; %w", shardID, lastErr) + return l + } + + // Provide an implementation of the ShardIDSets + l.engineOpts.SeriesIDSets = shardSet{store: s, db: db} + + // Retrieve cached series file or load it if not cached in s. + sfile, err := s.openSeriesFile(db) + if err != nil { + l.loadErr = fmt.Errorf("error loading series file for database %q: %w", db, err) + return l + } + l.sfile = sfile + + return l +} + +// loadShards loads all shards on disk. s.mu must be held before calling loadShards. func (s *Store) loadShards(ctx context.Context) error { // Limit the number of concurrent TSM files to be opened to the number of cores. s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0)) @@ -351,162 +492,90 @@ func (s *Store) loadShards(ctx context.Context) error { log, logEnd := logger.NewOperation(context.TODO(), s.Logger, "Open store", "tsdb_open") defer logEnd() - shardLoaderWg := new(sync.WaitGroup) t := limiter.NewFixed(runtime.GOMAXPROCS(0)) - // Determine how many shards we need to open by checking the store path. - dbDirs, err := os.ReadDir(s.path) + // Get list of shards and their db / rp. + shards, err := s.findShards(log) if err != nil { - return err + return fmt.Errorf("error while finding shards to load: %w", err) } - walkShardsAndProcess := func(fn func(sfile *SeriesFile, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error) error { - for _, db := range dbDirs { - rpDirs, err := s.getRetentionPolicyDirs(db, log) - if err != nil { - return err - } else if rpDirs == nil { - continue - } - - // Load series file. - sfile, err := s.openSeriesFile(db.Name()) - if err != nil { - return err - } - - for _, rp := range rpDirs { - shardDirs, err := s.getShards(rp, db, log) - if err != nil { - return err - } else if shardDirs == nil { - continue - } - - for _, sh := range shardDirs { - // Series file should not be in a retention policy but skip just in case. 
- if sh.Name() == SeriesFileDirectory { - log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name()))) - continue - } - - if err := fn(sfile, sh, db, rp); err != nil { - return err - } - } - } - } - - return nil - } - - // We use `rawShardCount` as a buffer size for channel creation below. - // If there is no startupProgressMetrics count then this will be 0 creating a - // zero buffer channel. - rawShardCount := 0 + // Setup progress metrics. if s.startupProgressMetrics != nil { - err := walkShardsAndProcess(func(sfile *SeriesFile, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error { - rawShardCount++ + for _, _ = range shards { s.startupProgressMetrics.AddShard() - return nil - }) - if err != nil { - return err } } - shardResC := make(chan *shardResponse, rawShardCount) - err = walkShardsAndProcess(func(sfile *SeriesFile, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error { - shardLoaderWg.Add(1) + // Do the actual work of loading shards. + shardResC := make(chan *shardResponse, len(shards)) + pendingShardCount := 0 + for _, sh := range shards { + pendingShardCount++ - go func(db, rp, sh string) { - defer shardLoaderWg.Done() - path := filepath.Join(s.path, db, rp, sh) - walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh) + // loader must be created serially for thread-safety, then they can be used in parallel manner. + loader := s.newShardLoader(sh.id, sh.db, sh.rp, false) - if err := t.Take(ctx); err != nil { - log.Error("failed to open shard at path", zap.String("path", path), zap.Error(err)) - shardResC <- &shardResponse{err: fmt.Errorf("failed to open shard at path %q: %w", path, err)} - return - } + // Now perform the actual loading in parallel in separate goroutines. + go func() { + t.Take(ctx) defer t.Release() + log := log.With(logger.Shard(sh.id), zap.String("path", loader.path)) start := time.Now() - - // Shard file names are numeric shardIDs - shardID, err := strconv.ParseUint(sh, 10, 64) - if err != nil { - log.Info("invalid shard ID found at path", zap.String("path", path)) - shardResC <- &shardResponse{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)} - if s.startupProgressMetrics != nil { - s.startupProgressMetrics.CompletedShard() - } - return - } - - if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) { - log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID)) - shardResC <- &shardResponse{} - if s.startupProgressMetrics != nil { - s.startupProgressMetrics.CompletedShard() - } - return - } - - // Copy options and assign shared index. - opt := s.EngineOptions - - // Provide an implementation of the ShardIDSets - opt.SeriesIDSets = shardSet{store: s, db: db} - - // Open engine. 
- shard := NewShard(shardID, path, walPath, sfile, opt)
-
- // Disable compactions, writes and queries until all shards are loaded
- shard.EnableOnOpen = false
- shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
- shard.WithLogger(s.baseLogger)
-
- err = s.OpenShard(ctx, shard, false)
- if err != nil {
- log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
- shardResC <- &shardResponse{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)}
- if s.startupProgressMetrics != nil {
- s.startupProgressMetrics.CompletedShard()
- }
- return
+ res := loader.Load(ctx)
+ if res.err == nil {
+ log.Info("Opened shard", zap.String("index_version", res.s.IndexType()), zap.Duration("duration", time.Since(start)))
+ } else {
+ log.Error("Failed to open shard", zap.Error(res.err))
}
- shardResC <- &shardResponse{s: shard}
- log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))
+ shardResC <- res
if s.startupProgressMetrics != nil {
s.startupProgressMetrics.CompletedShard()
}
- }(db.Name(), rp.Name(), sh.Name())
+ }()
+ }

- return nil
- })
- if err != nil {
- return err
+ // Register shards serially as the parallel goroutines finish opening them.
+ for finishedShardCount := 0; finishedShardCount < pendingShardCount; finishedShardCount++ {
+ res := <-shardResC
+ s.registerShard(res)
}

- if err := s.enableShards(shardLoaderWg, shardResC); err != nil {
- return err
+ // Check and log if any databases are running multiple index types.
+ s.warnMixedIndexTypes()
+
+ // Enable all shards
+ for _, sh := range s.shards {
+ sh.SetEnabled(true)
+ if isIdle, _ := sh.IsIdle(); isIdle {
+ if err := sh.Free(); err != nil {
+ return err
+ }
+ }
}

return nil
}

-func (s *Store) enableShards(wg *sync.WaitGroup, resC chan *shardResponse) error {
- go func() {
- wg.Wait()
- close(resC)
- }()
+// registerShard registers a shardResponse from a shardLoader.Load operation with s.
+// registerShard should always be called with the result of shardLoader.Load, even if
+// the shard loading failed. This makes sure errors opening shards are properly tracked.
+// s.mu should be held before calling registerShard. registerShard is not thread-safe
+// and should not be used in a parallel manner.
+func (s *Store) registerShard(res *shardResponse) {
+ if res.s == nil {
+ s.Logger.Error("registerShard called with nil shard")
+ return
+ }
+ if res.err != nil {
+ s.badShards.setShardOpenError(res.s.ID(), res.err)
+ return
+ }

- for res := range resC {
- if res.s == nil || res.err != nil {
- continue
- }
+ // Avoid registering an already registered shard.
+ if s.shards[res.s.id] != res.s {
s.shards[res.s.id] = res.s
s.epochs[res.s.id] = newEpochTracker()
if _, ok := s.databases[res.s.database]; !ok {
@@ -514,9 +583,25 @@ func (s *Store) enableShards(wg *sync.WaitGroup, resC chan *shardResponse) error
}
s.databases[res.s.database].addIndexType(res.s.IndexType())
}
+}
+
+// warnMixedIndexTypes checks the databases listed in dbList for mixed
+// index types and logs warnings if any are found. If no dbList is given, then
+// all databases in s are checked.
+func (s *Store) warnMixedIndexTypes(dbList ...string) {
+ var dbStates map[string]*databaseState
+ if len(dbList) == 0 {
+ dbStates = s.databases
+ } else {
+ dbStates = make(map[string]*databaseState)
+ for _, db := range dbList {
+ if state, ok := s.databases[db]; ok {
+ dbStates[db] = state
+ }
+ }

- // Check if any databases are running multiple index types.
- for db, state := range s.databases { + } + for db, state := range dbStates { if state.hasMultipleIndexTypes() { var fields []zapcore.Field for idx, cnt := range state.indexTypes { @@ -525,18 +610,6 @@ func (s *Store) enableShards(wg *sync.WaitGroup, resC chan *shardResponse) error s.Logger.Warn("Mixed shard index types", append(fields, logger.Database(db))...) } } - - // Enable all shards - for _, sh := range s.shards { - sh.SetEnabled(true) - if isIdle, _ := sh.IsIdle(); isIdle { - if err := sh.Free(); err != nil { - return err - } - } - } - - return nil } // Close closes the store and all associated shards. After calling Close accessing @@ -646,18 +719,20 @@ func (e ErrPreviousShardFail) Error() string { return e.error.Error() } -func (s *Store) OpenShard(ctx context.Context, sh *Shard, force bool) error { +func (s *Store) ReopenShard(ctx context.Context, shardID uint64, force bool) error { + sh := s.Shard(shardID) if sh == nil { - return errors.New("cannot open nil shard") - } - oldErr, bad := s.badShards.shardError(sh.ID()) - if force || !bad { - err := sh.Open(ctx) - s.badShards.setShardOpenError(sh.ID(), err) - return err - } else { - return fmt.Errorf("not attempting to open shard %d; %w", sh.ID(), oldErr) + return ErrShardNotFound } + + s.mu.Lock() + defer s.mu.Unlock() + + loader := s.newShardLoader(shardID, "", "", true, withExistingShard(sh), withForceLoad(force)) + res := loader.Load(ctx) + s.registerShard(res) + + return res.err } func (s *Store) SetShardOpenErrorForTest(shardID uint64, err error) { @@ -735,40 +810,11 @@ func (s *Store) CreateShard(ctx context.Context, database, retentionPolicy strin return err } - // Retrieve database series file. - sfile, err := s.openSeriesFile(database) - if err != nil { - return err - } - - // Copy index options and pass in shared index. - opt := s.EngineOptions - opt.SeriesIDSets = shardSet{store: s, db: database} - - path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)) - shard := NewShard(shardID, path, walPath, sfile, opt) - shard.WithLogger(s.baseLogger) - shard.EnableOnOpen = enabled - - if err := s.OpenShard(ctx, shard, false); err != nil { - return err - } - - s.shards[shardID] = shard - s.epochs[shardID] = newEpochTracker() - if _, ok := s.databases[database]; !ok { - s.databases[database] = new(databaseState) - } - s.databases[database].addIndexType(shard.IndexType()) - if state := s.databases[database]; state.hasMultipleIndexTypes() { - var fields []zapcore.Field - for idx, cnt := range state.indexTypes { - fields = append(fields, zap.Int(fmt.Sprintf("%s_count", idx), cnt)) - } - s.Logger.Warn("Mixed shard index types", append(fields, logger.Database(database))...) - } - - return nil + loader := s.newShardLoader(shardID, database, retentionPolicy, enabled) + res := loader.Load(ctx) + s.registerShard(res) + s.warnMixedIndexTypes(database) + return res.err } // CreateShardSnapShot will create a hard link to the underlying shard and return a path. @@ -2248,6 +2294,68 @@ func (s shardSet) ForEach(f func(ids *SeriesIDSet)) error { return nil } +type shardInfo struct { + id uint64 + db string + rp string +} + +// findShards returns a list of all shards and their db / rp that are found +// in s.path. +func (s *Store) findShards(log *zap.Logger) ([]shardInfo, error) { + var shards []shardInfo + + // Determine how many shards we need to open by checking the store path. 
+ dbDirs, err := os.ReadDir(s.path) + if err != nil { + return nil, err + } + + for _, db := range dbDirs { + rpDirs, err := s.getRetentionPolicyDirs(db, log) + if err != nil { + return nil, err + } else if rpDirs == nil { + continue + } + + for _, rp := range rpDirs { + shardDirs, err := s.getShards(rp, db, log) + if err != nil { + return nil, err + } else if shardDirs == nil { + continue + } + + for _, sh := range shardDirs { + fullPath := filepath.Join(s.path, db.Name(), rp.Name()) + + // Series file should not be in a retention policy but skip just in case. + if sh.Name() == SeriesFileDirectory { + log.Warn("Skipping series file in retention policy dir", zap.String("path", fullPath)) + continue + } + + // Shard file names are numeric shardIDs + shardID, err := strconv.ParseUint(sh.Name(), 10, 64) + if err != nil { + log.Warn("invalid shard ID found at path", zap.String("path", fullPath)) + continue + } + + if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db.Name(), rp.Name(), shardID) { + log.Info("skipping shard", zap.String("path", fullPath), logger.Shard(shardID)) + continue + } + + shards = append(shards, shardInfo{id: shardID, db: db.Name(), rp: rp.Name()}) + } + } + } + + return shards, nil +} + func (s *Store) getRetentionPolicyDirs(db os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) { dbPath := filepath.Join(s.path, db.Name()) if !db.IsDir() { diff --git a/tsdb/store_test.go b/tsdb/store_test.go index f0c4cc4515b..755a1e07324 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -19,14 +19,13 @@ import ( "testing" "time" - "github.com/influxdata/influxdb/v2/predicate" - "github.com/davecgh/go-spew/spew" "github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/internal" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/deep" "github.com/influxdata/influxdb/v2/pkg/slices" + "github.com/influxdata/influxdb/v2/predicate" "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxql" "github.com/stretchr/testify/require" @@ -171,7 +170,7 @@ func TestStore_StartupShardProgress(t *testing.T) { // Equality check to make sure shards are always added prior to // completion being called. This test opens 3 total shards - 1 shard // fails, but we still want to track that it was attempted to be opened. - require.Equal(t, msl.shardTracker, []string{ + require.Equal(t, msl.Tracked(), []string{ "shard-add", "shard-add", "shard-complete", @@ -209,7 +208,7 @@ func TestStore_BadShardLoading(t *testing.T) { require.NotNil(t, sh) s.SetShardOpenErrorForTest(sh.ID(), errors.New("a shard opening error occurred")) - err2 := s.OpenShard(context.Background(), s.Shard(sh.ID()), false) + err2 := s.ReopenShard(context.Background(), sh.ID(), false) require.Error(t, err2, "no error opening bad shard") msl := &mockStartupLogger{} @@ -220,7 +219,7 @@ func TestStore_BadShardLoading(t *testing.T) { // Equality check to make sure shards are always added prior to // completion being called. This test opens 3 total shards - 1 shard // fails, but we still want to track that it was attempted to be opened. 
- require.Equal(t, msl.shardTracker, []string{ + require.Equal(t, msl.Tracked(), []string{ "shard-add", "shard-add", "shard-add", @@ -241,24 +240,30 @@ func TestStore_BadShard(t *testing.T) { for _, idx := range indexes { func() { s := MustOpenStore(t, idx) - defer require.NoErrorf(t, s.Close(), "closing store with index type: %s", idx) + defer func() { + require.NoErrorf(t, s.Close(), "closing store with index type: %s", idx) + }() - sh := tsdb.NewTempShard(t, idx) - shId := sh.ID() - err := s.OpenShard(context.Background(), sh.Shard, false) + var shId uint64 = 1 + require.NoError(t, s.CreateShard(context.Background(), "db0", "rp0", shId, true)) + err := s.ReopenShard(context.Background(), shId, false) require.NoError(t, err, "opening temp shard") - require.NoError(t, sh.Close(), "closing temporary shard") expErr := errors.New(errStr) - s.SetShardOpenErrorForTest(sh.ID(), expErr) - err2 := s.OpenShard(context.Background(), sh.Shard, false) + s.SetShardOpenErrorForTest(shId, expErr) + err2 := s.ReopenShard(context.Background(), shId, false) + require.Error(t, err2, "no error opening bad shard") + require.True(t, errors.Is(err2, tsdb.ErrPreviousShardFail{}), "exp: ErrPreviousShardFail, got: %v", err2) + require.EqualError(t, err2, fmt.Errorf("not attempting to open shard %d; opening shard previously failed with: %w", shId, expErr).Error()) + + // make sure we didn't modify the shard open error when we tried to reopen it + err2 = s.ReopenShard(context.Background(), shId, false) require.Error(t, err2, "no error opening bad shard") require.True(t, errors.Is(err2, tsdb.ErrPreviousShardFail{}), "exp: ErrPreviousShardFail, got: %v", err2) require.EqualError(t, err2, fmt.Errorf("not attempting to open shard %d; opening shard previously failed with: %w", shId, expErr).Error()) // This should succeed with the force (and because opening an open shard automatically succeeds) - require.NoError(t, s.OpenShard(context.Background(), sh.Shard, true), "forced re-opening previously failing shard") - require.NoError(t, sh.Close()) + require.NoError(t, s.ReopenShard(context.Background(), shId, true), "forced re-opening previously failing shard") }() } } @@ -2195,7 +2200,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) { return } time.Sleep(500 * time.Microsecond) - if err := s.OpenShard(context.Background(), sh, false); err != nil { + if err := s.ReopenShard(context.Background(), sh.ID(), false); err != nil { errC <- err return } @@ -2280,7 +2285,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) { return } time.Sleep(500 * time.Microsecond) - if err := s.OpenShard(context.Background(), sh, false); err != nil { + if err := s.ReopenShard(context.Background(), sh.ID(), false); err != nil { errC <- err return } @@ -2371,7 +2376,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) { return } time.Sleep(500 * time.Microsecond) - if err := s.OpenShard(context.Background(), sh, false); err != nil { + if err := s.ReopenShard(context.Background(), sh.ID(), false); err != nil { errC <- err return } @@ -2866,18 +2871,29 @@ func dirExists(path string) bool { } type mockStartupLogger struct { - shardTracker []string - mu sync.Mutex + // mu protects all following members. 
+ mu sync.Mutex
+
+ _shardTracker []string
}

func (m *mockStartupLogger) AddShard() {
m.mu.Lock()
- m.shardTracker = append(m.shardTracker, "shard-add")
+ m._shardTracker = append(m._shardTracker, "shard-add")
m.mu.Unlock()
}

func (m *mockStartupLogger) CompletedShard() {
m.mu.Lock()
- m.shardTracker = append(m.shardTracker, "shard-complete")
+ m._shardTracker = append(m._shardTracker, "shard-complete")
m.mu.Unlock()
}
+
+func (m *mockStartupLogger) Tracked() []string {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ tracked := make([]string, len(m._shardTracker))
+ copy(tracked, m._shardTracker)
+ return tracked
+}
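
The loader API added above follows a strict pattern: loaders are built serially while s.mu is held, Load may run concurrently, and every result (including failures) is passed back through registerShard so badShards stays accurate. Below is a minimal sketch of that contract, assuming it sits in package tsdb next to the code in this patch; the helper name loadShardsSketch is illustrative only and is not part of this change.

func (s *Store) loadShardsSketch(ctx context.Context, shards []shardInfo) {
	// s.mu must already be held, mirroring loadShards.
	resC := make(chan *shardResponse, len(shards))

	// Loaders are created serially; newShardLoader is not thread-safe.
	loaders := make([]*shardLoader, 0, len(shards))
	for _, sh := range shards {
		loaders = append(loaders, s.newShardLoader(sh.id, sh.db, sh.rp, false))
	}

	// Load is safe to run in parallel; each loader is independent of the Store.
	for _, l := range loaders {
		go func(l *shardLoader) { resC <- l.Load(ctx) }(l)
	}

	// Register every result serially, even errors, so badShards records
	// shards that failed to open.
	for range loaders {
		s.registerShard(<-resC)
	}
}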