From dd6b748e3d0b82662f2960aee2f254041340d73b Mon Sep 17 00:00:00 2001 From: ilija42 <57732589+ilija42@users.noreply.github.com> Date: Mon, 1 Apr 2024 18:56:15 +0200 Subject: [PATCH] Fix InMemoryDataSourceCache cleanup (#12647) * Fix InMemoryDataSourceCache cleanup * Add changeset * Make linter happy * Handle data source cache unit test cleanup * Use services.StopChan in inMemoryDataSourceCache * Move ctxWithTimeout into updateCache Co-authored-by: Jordan Krage --------- Co-authored-by: Jordan Krage --- .changeset/fresh-oranges-brake.md | 5 +++ core/services/ocr2/plugins/median/services.go | 7 +++- core/services/ocrcommon/data_source.go | 42 ++++++++++++++++--- core/services/ocrcommon/data_source_test.go | 5 ++- 4 files changed, 51 insertions(+), 8 deletions(-) create mode 100644 .changeset/fresh-oranges-brake.md diff --git a/.changeset/fresh-oranges-brake.md b/.changeset/fresh-oranges-brake.md new file mode 100644 index 00000000000..52562ee7413 --- /dev/null +++ b/.changeset/fresh-oranges-brake.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +fix jfpc cache cleanup diff --git a/core/services/ocr2/plugins/median/services.go b/core/services/ocr2/plugins/median/services.go index a432045c867..4615f934511 100644 --- a/core/services/ocr2/plugins/median/services.go +++ b/core/services/ocr2/plugins/median/services.go @@ -129,9 +129,12 @@ func NewMedianServices(ctx context.Context, if !pluginConfig.JuelsPerFeeCoinCacheDisabled { lggr.Infof("juelsPerFeeCoin data source caching is enabled") - if juelsPerFeeCoinSource, err = ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration()); err != nil { - return nil, err + juelsPerFeeCoinSourceCache, err2 := ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration()) + if err2 != nil { + return nil, err2 } + juelsPerFeeCoinSource = juelsPerFeeCoinSourceCache + srvs = append(srvs, juelsPerFeeCoinSourceCache) } if cmdName := env.MedianPlugin.Cmd.Get(); cmdName != "" { diff --git a/core/services/ocrcommon/data_source.go b/core/services/ocrcommon/data_source.go index f810c8e044d..b80cf9ab00a 100644 --- a/core/services/ocrcommon/data_source.go +++ b/core/services/ocrcommon/data_source.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/bridges" serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -104,7 +105,13 @@ const defaultCacheFreshness = time.Minute * 5 const defaultCacheFreshnessAlert = time.Hour * 24 const dataSourceCacheKey = "dscache" -func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (median.DataSource, error) { +type DataSourceCacheService interface { + Start(context.Context) error + Close() error + median.DataSource +} + +func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (DataSourceCacheService, error) { inMemoryDS, ok := ds.(*inMemoryDataSource) if !ok { return nil, errors.Errorf("unsupported data source type: %T, only inMemoryDataSource supported", ds) @@ -118,8 +125,9 @@ func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cache kvStore: kvStore, cacheFreshness: cacheFreshness, inMemoryDataSource: inMemoryDS, + chStop: make(chan struct{}), + chDone: make(chan struct{}), } - go func() { dsCache.updater() }() return dsCache, nil } @@ -225,21 +233,45 @@ type inMemoryDataSourceCache struct { // Even if updates fail, previous values are returned. cacheFreshness time.Duration mu sync.RWMutex + chStop services.StopChan + chDone chan struct{} latestUpdateErr error latestTrrs pipeline.TaskRunResults latestResult pipeline.FinalResult kvStore job.KVStore } +func (ds *inMemoryDataSourceCache) Start(context.Context) error { + go func() { ds.updater() }() + return nil +} + +func (ds *inMemoryDataSourceCache) Close() error { + close(ds.chStop) + <-ds.chDone + return nil +} + // updater periodically updates data source cache. func (ds *inMemoryDataSourceCache) updater() { ticker := time.NewTicker(ds.cacheFreshness) - for ; true; <-ticker.C { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + updateCache := func() { + ctx, cancel := ds.chStop.CtxCancel(context.WithTimeout(context.Background(), time.Second*10)) + defer cancel() if err := ds.updateCache(ctx); err != nil { ds.lggr.Warnf("failed to update cache, err: %v", err) } - cancel() + } + + updateCache() + for { + select { + case <-ticker.C: + updateCache() + case <-ds.chStop: + close(ds.chDone) + return + } } } diff --git a/core/services/ocrcommon/data_source_test.go b/core/services/ocrcommon/data_source_test.go index 2e1b4f63df7..fa128ef447e 100644 --- a/core/services/ocrcommon/data_source_test.go +++ b/core/services/ocrcommon/data_source_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -78,6 +79,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) { mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil) dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2) require.NoError(t, err) + servicetest.Run(t, dsCache) mockVal := int64(1) // Test if Observe notices that cache updater failed and can refresh the cache on its own @@ -112,12 +114,12 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) { dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100) require.NoError(t, err) changeResultValue(runner, "-1", true, false) + servicetest.Run(t, dsCache) time.Sleep(time.Millisecond * 100) val, err := dsCache.Observe(testutils.Context(t), types.ReportTimestamp{}) require.NoError(t, err) assert.Equal(t, persistedVal.String(), val.String()) - }) t.Run("test total updater fail with no persisted value ", func(t *testing.T) { @@ -131,6 +133,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) { dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100) require.NoError(t, err) changeResultValue(runner, "-1", true, false) + servicetest.Run(t, dsCache) time.Sleep(time.Millisecond * 100) _, err = dsCache.Observe(testutils.Context(t), types.ReportTimestamp{})