diff --git a/lib/store/cleanup.go b/lib/store/cleanup.go index c85668aa..788c98c3 100644 --- a/lib/store/cleanup.go +++ b/lib/store/cleanup.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -102,19 +102,11 @@ func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp ticker := m.clk.Ticker(config.Interval) - usageGauge := m.stats.Tagged(map[string]string{"job": tag}).Gauge("disk_usage") - go func() { for { select { case <-ticker.C: - log.Debugf("Performing cleanup of %s", op) - ttl := m.checkAggressiveCleanup(op, config, diskspaceutil.DiskSpaceUtil) - usage, err := m.scan(op, config.TTI, ttl) - if err != nil { - log.Errorf("Error scanning %s: %s", op, err) - } - usageGauge.Update(float64(usage)) + m.cleanup(tag, config, op) case <-m.stopc: ticker.Stop() return @@ -123,6 +115,24 @@ func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp }() } +func (m *cleanupManager) cleanup(tag string, config CleanupConfig, op base.FileOp) { + log.Debugf("Performing cleanup of %s", op) + stats := m.stats.Tagged(map[string]string{"job": tag}) + + usagePercent, err := diskspaceutil.DiskSpaceUtil() + if err != nil { + log.Errorf("Error checking disk space util %s: %s", op, err) + } else { + stats.Gauge("disk_usage_percent").Update(float64(usagePercent)) + } + ttl := m.calculateCleanupTTL(op, config, usagePercent) + usage, err := m.scan(op, config.TTI, ttl) + if err != nil { + log.Errorf("Error scanning %s: %s", op, err) + } + stats.Gauge("disk_usage").Update(float64(usage)) +} + func (m *cleanupManager) stop() { m.stopOnce.Do(func() { close(m.stopc) }) } @@ -174,17 +184,10 @@ func (m *cleanupManager) readyForDeletion( return m.clk.Now().Sub(lat.Time) > tti, nil } -func (m *cleanupManager) checkAggressiveCleanup(op base.FileOp, config CleanupConfig, util diskSpaceUtilFunc) time.Duration { - if config.AggressiveThreshold != 0 { - diskspaceutil, err := util() - if err != nil { - log.Errorf("Error checking disk space util %s: %s", op, err) - return config.TTL - } - if diskspaceutil >= config.AggressiveThreshold { - log.Debugf("Aggressive cleanup of %s triggers with disk space util %d", op, diskspaceutil) - return config.AggressiveTTL - } +func (m *cleanupManager) calculateCleanupTTL(op base.FileOp, config CleanupConfig, usagePercent int) time.Duration { + if usagePercent < config.AggressiveThreshold || config.AggressiveThreshold == 0 { + return config.TTL } - return config.TTL + log.Debugf("Aggressive cleanup of %s triggers with disk space util %d", op, usagePercent) + return config.AggressiveTTL } diff --git a/lib/store/cleanup_test.go b/lib/store/cleanup_test.go index e7f7c76b..beb2a445 100644 --- a/lib/store/cleanup_test.go +++ b/lib/store/cleanup_test.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -14,20 +14,19 @@ package store import ( - "errors" "io/ioutil" + "math/rand" "os" "testing" "time" + "github.com/andres-erbsen/clock" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" "github.com/uber/kraken/core" "github.com/uber/kraken/lib/store/base" "github.com/uber/kraken/lib/store/metadata" "github.com/uber/kraken/utils/testutil" - - "github.com/andres-erbsen/clock" - "github.com/stretchr/testify/require" - "github.com/uber-go/tally" ) func fileOpFixture(clk clock.Clock) (base.FileState, base.FileOp, func()) { @@ -231,31 +230,46 @@ func TestCleanupManageDiskUsage(t *testing.T) { } func TestCleanupManagerAggressive(t *testing.T) { - require := require.New(t) - - config := CleanupConfig{ + aggressiveOnConfig := CleanupConfig{ AggressiveThreshold: 80, TTL: 10 * time.Second, AggressiveTTL: 5 * time.Second, } + aggressiveOffConfig := CleanupConfig{ + TTL: 10 * time.Second, + } - clk := clock.NewMock() - m, err := newCleanupManager(clk, tally.NoopScope) - require.NoError(err) - defer m.stop() - - _, op, cleanup := fileOpFixture(clk) - defer cleanup() - - require.Equal(m.checkAggressiveCleanup(op, config, func() (int, error) { - return 90, nil - }), 5*time.Second) + for _, tc := range []struct { + config CleanupConfig + usagePercent int + expectedTTL time.Duration + }{ + { + config: aggressiveOnConfig, + usagePercent: aggressiveOnConfig.AggressiveThreshold, + expectedTTL: aggressiveOnConfig.AggressiveTTL, + }, + { + config: aggressiveOnConfig, + usagePercent: aggressiveOnConfig.AggressiveThreshold - 20, + expectedTTL: aggressiveOnConfig.TTL, + }, + { + config: aggressiveOffConfig, + usagePercent: rand.Intn(101), + expectedTTL: aggressiveOffConfig.TTL, + }, + } { + require := require.New(t) + + clk := clock.NewMock() + m, err := newCleanupManager(clk, tally.NoopScope) + require.NoError(err) + defer m.stop() - require.Equal(m.checkAggressiveCleanup(op, config, func() (int, error) { - return 60, nil - }), 10*time.Second) + _, op, cleanup := fileOpFixture(clk) + defer cleanup() - require.Equal(m.checkAggressiveCleanup(op, config, func() (int, error) { - return 0, errors.New("fake error") - }), 10*time.Second) + require.Equal(m.calculateCleanupTTL(op, tc.config, tc.usagePercent), tc.expectedTTL) + } }