diff --git a/.changeset/lemon-ladybugs-doubt.md b/.changeset/lemon-ladybugs-doubt.md
new file mode 100644
index 00000000000..d7d1c7a8492
--- /dev/null
+++ b/.changeset/lemon-ladybugs-doubt.md
@@ -0,0 +1,5 @@
+---
+"chainlink": patch
+---
+
+Add a KV store tied to jobs and use it in the juels-per-fee-coin cache to persist values as a backup
diff --git a/core/services/job/kv_orm.go b/core/services/job/kv_orm.go
new file mode 100644
index 00000000000..890336b4ec7
--- /dev/null
+++ b/core/services/job/kv_orm.go
@@ -0,0 +1,68 @@
+package job
+
+import (
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"github.com/jmoiron/sqlx"
+	"github.com/jmoiron/sqlx/types"
+
+	"github.com/smartcontractkit/chainlink/v2/core/logger"
+	"github.com/smartcontractkit/chainlink/v2/core/services/pg"
+)
+
+// KVStore is a simple KV store that can store and retrieve serializable data.
+
+//go:generate mockery --quiet --name KVStore --output ./mocks/ --case=underscore
+type KVStore interface {
+	Store(key string, val interface{}) error
+	Get(key string, dest interface{}) error
+}
+
+type kVStore struct {
+	jobID int32
+	q     pg.Q
+	lggr  logger.SugaredLogger
+}
+
+var _ KVStore = (*kVStore)(nil)
+
+func NewKVStore(jobID int32, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) kVStore {
+	namedLogger := logger.Sugared(lggr.Named("KVStore"))
+	return kVStore{
+		jobID: jobID,
+		q:     pg.NewQ(db, namedLogger, cfg),
+		lggr:  namedLogger,
+	}
+}
+
+// Store saves a serializable value by key.
+func (kv kVStore) Store(key string, val interface{}) error {
+	jsonVal, err := json.Marshal(val)
+	if err != nil {
+		return err
+	}
+
+	sql := `INSERT INTO job_kv_store (job_id, key, val)
+		VALUES ($1, $2, $3)
+		ON CONFLICT (job_id, key) DO UPDATE SET
+			val = EXCLUDED.val,
+			updated_at = $4;`
+
+	if err = kv.q.ExecQ(sql, kv.jobID, key, types.JSONText(jsonVal), time.Now()); err != nil {
+		return fmt.Errorf("failed to store value: %s for key: %s for jobID: %d: %w", string(jsonVal), key, kv.jobID, err)
+	}
+	return nil
+}
+
+// Get retrieves a serializable value by key.
+func (kv kVStore) Get(key string, dest interface{}) error {
+	var ret json.RawMessage
+	sql := "SELECT val FROM job_kv_store WHERE job_id = $1 AND key = $2"
+	if err := kv.q.Get(&ret, sql, kv.jobID, key); err != nil {
+		return fmt.Errorf("failed to get value by key: %s for jobID: %d: %w", key, kv.jobID, err)
+	}
+
+	return json.Unmarshal(ret, dest)
+}
diff --git a/core/services/job/kv_orm_test.go b/core/services/job/kv_orm_test.go
new file mode 100644
index 00000000000..794e27b3c9f
--- /dev/null
+++ b/core/services/job/kv_orm_test.go
@@ -0,0 +1,85 @@
+package job_test
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/smartcontractkit/chainlink/v2/core/bridges"
+	"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
+	"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
+	"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
+	"github.com/smartcontractkit/chainlink/v2/core/logger"
+	"github.com/smartcontractkit/chainlink/v2/core/services/directrequest"
+	"github.com/smartcontractkit/chainlink/v2/core/services/job"
+	"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
+	"github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs"
+)
+
+func TestJobKVStore(t *testing.T) {
+	config := configtest.NewTestGeneralConfig(t)
+	db := pgtest.NewSqlxDB(t)
+
+	lggr := logger.TestLogger(t)
+
+	pipelineORM := pipeline.NewORM(db, logger.TestLogger(t), config.Database(), config.JobPipeline().MaxSuccessfulRuns())
+	bridgesORM := bridges.NewORM(db, logger.TestLogger(t), config.Database())
+
+	jobID := int32(1337)
+	kvStore := job.NewKVStore(jobID, db, config.Database(), lggr)
+	jobORM := NewTestORM(t, db, pipelineORM, bridgesORM, cltest.NewKeyStore(t, db, config.Database()), config.Database())
+
+	jb, err := directrequest.ValidatedDirectRequestSpec(testspecs.GetDirectRequestSpec())
+	require.NoError(t, err)
+	jb.ID = jobID
+	require.NoError(t, jobORM.CreateJob(&jb))
+
+	type testData struct {
+		Test string
+	}
+
+	type nested struct {
+		Contact testData // Nested struct
+	}
+
+	values := []interface{}{
+		42,                             // int
+		"hello",                        // string
+		3.14,                           // float64
+		true,                           // bool
+		[]int{1, 2, 3},                 // slice of ints
+		map[string]int{"a": 1, "b": 2}, // map of string to int
+		testData{Test: "value1"},       // regular struct
+		nested{testData{"value2"}},     // nested struct
+	}
+
+	for i, value := range values {
+		testKey := "test_key_" + fmt.Sprint(i)
+		require.NoError(t, kvStore.Store(testKey, value))
+
+		// Get the type of the current value
+		valueType := reflect.TypeOf(value)
+		// Create a new instance of the value's type
+		temp := reflect.New(valueType).Interface()
+
+		require.NoError(t, kvStore.Get(testKey, &temp))
+
+		tempValue := reflect.ValueOf(temp).Elem().Interface()
+		require.Equal(t, value, tempValue)
+	}
+
+	key := "test_key_updating"
+	td1 := testData{Test: "value1"}
+	td2 := testData{Test: "value2"}
+
+	var retData testData
+	require.NoError(t, kvStore.Store(key, td1))
+	require.NoError(t, kvStore.Get(key, &retData))
+	require.Equal(t, td1, retData)
+
+	require.NoError(t, kvStore.Store(key, td2))
+	require.NoError(t, kvStore.Get(key, &retData))
+	require.Equal(t, td2, retData)
+}
diff --git a/core/services/job/mocks/kv_store.go b/core/services/job/mocks/kv_store.go
new file mode 100644
index 00000000000..48e4538f606
--- /dev/null
+++ b/core/services/job/mocks/kv_store.go
@@ -0,0 +1,60 @@
+// Code generated by mockery v2.38.0. DO NOT EDIT.
+
+package mocks
+
+import mock "github.com/stretchr/testify/mock"
+
+// KVStore is an autogenerated mock type for the KVStore type
+type KVStore struct {
+	mock.Mock
+}
+
+// Get provides a mock function with given fields: key, dest
+func (_m *KVStore) Get(key string, dest interface{}) error {
+	ret := _m.Called(key, dest)
+
+	if len(ret) == 0 {
+		panic("no return value specified for Get")
+	}
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(string, interface{}) error); ok {
+		r0 = rf(key, dest)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// Store provides a mock function with given fields: key, val
+func (_m *KVStore) Store(key string, val interface{}) error {
+	ret := _m.Called(key, val)
+
+	if len(ret) == 0 {
+		panic("no return value specified for Store")
+	}
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(string, interface{}) error); ok {
+		r0 = rf(key, val)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// NewKVStore creates a new instance of KVStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewKVStore(t interface {
+	mock.TestingT
+	Cleanup(func())
+}) *KVStore {
+	mock := &KVStore{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go
index 198b6fc0553..33f0af6db10 100644
--- a/core/services/ocr2/delegate.go
+++ b/core/services/ocr2/delegate.go
@@ -375,6 +375,8 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) ([]job.Servi
 	}
 	lggr := logger.Sugared(d.lggr.Named(jb.ExternalJobID.String()).With(lggrCtx.Args()...))
 
+	kvStore := job.NewKVStore(jb.ID, d.db, d.cfg.Database(), lggr)
+
 	rid, err := spec.RelayID()
 	if err != nil {
 		return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)}
@@ -448,7 +450,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) ([]job.Servi
 		return d.newServicesLLO(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)
 
 	case types.Median:
-		return d.newServicesMedian(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)
+		return d.newServicesMedian(ctx, lggr, jb, bootstrapPeers, kb, kvStore, ocrDB, lc, ocrLogger)
 
 	case types.DKG:
 		return d.newServicesDKG(lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)
@@ -927,6 +929,7 @@ func (d *Delegate) newServicesMedian(
 	jb job.Job,
 	bootstrapPeers []commontypes.BootstrapperLocator,
 	kb ocr2key.KeyBundle,
+	kvStore job.KVStore,
 	ocrDB *db,
 	lc ocrtypes.LocalConfig,
 	ocrLogger commontypes.Logger,
@@ -962,7 +965,7 @@ func (d *Delegate) newServicesMedian(
 		return nil, ErrRelayNotEnabled{Err: err, PluginName: "median", Relay: spec.Relay}
 	}
 
-	medianServices, err2 := median.NewMedianServices(ctx, jb, d.isNewlyCreatedJob, relayer, d.pipelineRunner, lggr, oracleArgsNoPlugin, mConfig, enhancedTelemChan, errorLog)
+	medianServices, err2 := median.NewMedianServices(ctx, jb, d.isNewlyCreatedJob, relayer, kvStore, d.pipelineRunner, lggr, oracleArgsNoPlugin, mConfig, enhancedTelemChan, errorLog)
 
 	if ocrcommon.ShouldCollectEnhancedTelemetry(&jb) {
 		enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, enhancedTelemChan, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.ContractID, synchronization.EnhancedEA), lggr.Named("EnhancedTelemetry"))
diff --git a/core/services/ocr2/plugins/median/services.go b/core/services/ocr2/plugins/median/services.go
index 74690565e94..a432045c867 100644
--- a/core/services/ocr2/plugins/median/services.go
+++ b/core/services/ocr2/plugins/median/services.go
@@ -56,6 +56,7 @@ func NewMedianServices(ctx context.Context,
 	jb job.Job,
 	isNewlyCreatedJob bool,
 	relayer loop.Relayer,
+	kvStore job.KVStore,
 	pipelineRunner pipeline.Runner,
 	lggr logger.Logger,
 	argsNoPlugin libocr.OCR2OracleArgs,
@@ -128,7 +129,7 @@
 	if !pluginConfig.JuelsPerFeeCoinCacheDisabled {
 		lggr.Infof("juelsPerFeeCoin data source caching is enabled")
-		if juelsPerFeeCoinSource, err = ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration()); err != nil {
+		if juelsPerFeeCoinSource, err = ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration()); err != nil {
 			return nil, err
 		}
 	}
diff --git a/core/services/ocrcommon/data_source.go b/core/services/ocrcommon/data_source.go
index 3dd7e4eebc0..011b8d0644d 100644
--- a/core/services/ocrcommon/data_source.go
+++ b/core/services/ocrcommon/data_source.go
@@ -13,6 +13,7 @@ import (
 	ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
 
 	"github.com/smartcontractkit/chainlink/v2/core/bridges"
+	serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
 	"github.com/smartcontractkit/chainlink/v2/core/logger"
 	"github.com/smartcontractkit/chainlink/v2/core/services/job"
 	"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
@@ -99,20 +100,22 @@ func NewInMemoryDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, l
 	}
 }
 
-const defaultInMemoryCacheDuration = time.Minute * 5
+const defaultCacheFreshness = time.Minute * 5
+const dataSourceCacheKey = "dscache"
 
-func NewInMemoryDataSourceCache(ds median.DataSource, cacheExpiryDuration time.Duration) (median.DataSource, error) {
+func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (median.DataSource, error) {
 	inMemoryDS, ok := ds.(*inMemoryDataSource)
 	if !ok {
 		return nil, errors.Errorf("unsupported data source type: %T, only inMemoryDataSource supported", ds)
 	}
 
-	if cacheExpiryDuration == 0 {
-		cacheExpiryDuration = defaultInMemoryCacheDuration
+	if cacheFreshness == 0 {
+		cacheFreshness = defaultCacheFreshness
 	}
 
 	dsCache := &inMemoryDataSourceCache{
-		cacheExpiration: cacheExpiryDuration,
+		kvStore:            kvStore,
+		cacheFreshness:     cacheFreshness,
 		inMemoryDataSource: inMemoryDS,
 	}
 	go func() { dsCache.updater() }()
@@ -217,20 +220,23 @@ func (ds *inMemoryDataSource) Observe(ctx context.Context, timestamp ocr2types.R
 // If cache update is overdue Observe defaults to standard inMemoryDataSource behaviour.
 type inMemoryDataSourceCache struct {
 	*inMemoryDataSource
-	cacheExpiration time.Duration
+	// cacheFreshness indicates duration between cache updates.
+	// Even if updates fail, previous values are returned.
+	cacheFreshness time.Duration
 	mu              sync.RWMutex
 	latestUpdateErr error
 	latestTrrs      pipeline.TaskRunResults
 	latestResult    pipeline.FinalResult
+	kvStore         job.KVStore
 }
 
 // updater periodically updates data source cache.
 func (ds *inMemoryDataSourceCache) updater() {
-	ticker := time.NewTicker(ds.cacheExpiration)
+	ticker := time.NewTicker(ds.cacheFreshness)
 	for ; true; <-ticker.C {
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 		if err := ds.updateCache(ctx); err != nil {
-			ds.lggr.Infow("failed to update cache", "err", err)
+			ds.lggr.Warnw("failed to update cache", "err", err)
 		}
 		cancel()
 	}
@@ -239,15 +245,35 @@
 
 func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
 	ds.mu.Lock()
 	defer ds.mu.Unlock()
-	_, ds.latestTrrs, ds.latestUpdateErr = ds.executeRun(ctx)
-	if ds.latestUpdateErr != nil {
-		return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID)
-	} else if ds.latestTrrs.FinalResult(ds.lggr).HasErrors() {
-		ds.latestUpdateErr = errjoin.Join(ds.latestTrrs.FinalResult(ds.lggr).AllErrors...)
+
+	// execute the run and collect any errors, from the run itself or from its final result
+	_, latestTrrs, latestUpdateErr := ds.executeRun(ctx)
+	if latestTrrs.FinalResult(ds.lggr).HasErrors() {
+		latestUpdateErr = errjoin.Join(append(latestTrrs.FinalResult(ds.lggr).AllErrors, latestUpdateErr)...)
+	}
+
+	if latestUpdateErr != nil {
+		previousUpdateErr := ds.latestUpdateErr
+		ds.latestUpdateErr = latestUpdateErr
+		// raise log severity on consecutive failures
+		if previousUpdateErr != nil {
+			ds.lggr.Errorf("consecutive cache updates errored: previous err: %v new err: %v", previousUpdateErr, ds.latestUpdateErr)
+		}
 		return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID)
 	}
+	ds.latestTrrs = latestTrrs
 	ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr)
+	value, err := ds.inMemoryDataSource.parse(ds.latestResult)
+	if err != nil {
+		return errors.Wrap(err, "invalid result")
+	}
+
+	// backup in case the data source fails continuously and the node gets rebooted
+	if err = ds.kvStore.Store(dataSourceCacheKey, serializablebig.New(value)); err != nil {
+		ds.lggr.Errorw("failed to persist latest task run value", "err", err)
+	}
+
 	return nil
 }
@@ -261,7 +287,7 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul
 	ds.mu.RUnlock()
 
 	if err := ds.updateCache(ctx); err != nil {
-		ds.lggr.Errorw("failed to update cache, returning stale result now", "err", err)
+		ds.lggr.Warnw("failed to update cache, returning stale result now", "err", err)
 	}
 
 	ds.mu.RLock()
@@ -270,7 +296,13 @@
 func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2types.ReportTimestamp) (*big.Int, error) {
+	var val serializablebig.Big
 	latestResult, latestTrrs := ds.get(ctx)
+	if latestTrrs == nil {
+		ds.lggr.Error("cache is empty, returning persisted value now")
+		return val.ToInt(), ds.kvStore.Get(dataSourceCacheKey, &val)
+	}
+
 	setEATelemetry(ds.inMemoryDataSource, latestResult, latestTrrs, ObservationTimestamp{
 		Round: timestamp.Round,
 		Epoch: timestamp.Epoch,
diff --git a/core/services/ocrcommon/data_source_test.go b/core/services/ocrcommon/data_source_test.go
index 2e4e07973a0..a921bc060ff 100644
--- a/core/services/ocrcommon/data_source_test.go
+++ b/core/services/ocrcommon/data_source_test.go
@@ -14,9 +14,11 @@ import (
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 
+	serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
 	"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
 	"github.com/smartcontractkit/chainlink/v2/core/logger"
 	"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/job/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" pipelinemocks "github.com/smartcontractkit/chainlink/v2/core/services/pipeline/mocks" @@ -47,13 +49,7 @@ func Test_InMemoryDataSource(t *testing.T) { } func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) { - runner := pipelinemocks.NewRunner(t) - - ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t)) - dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, time.Second*2) - require.NoError(t, err) - - changeResultValue := func(value string, returnErr, once bool) { + changeResultValue := func(runner *pipelinemocks.Runner, value string, returnErr, once bool) { result := pipeline.Result{ Value: value, Error: nil, @@ -74,23 +70,72 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) { call.Once() } } - - mockVal := int64(1) - // Test if Observe notices that cache updater failed and can refresh the cache on its own - // 1. Set initial value - changeResultValue(fmt.Sprint(mockVal), false, true) - time.Sleep(time.Millisecond * 100) - val, err := dsCache.Observe(testutils.Context(t), types.ReportTimestamp{}) - require.NoError(t, err) - assert.Equal(t, mockVal, val.Int64()) - // 2. Set values again, but make it error in updater - changeResultValue(fmt.Sprint(mockVal+1), true, true) - time.Sleep(time.Second*2 + time.Millisecond*100) - // 3. Set value in between updates and call Observe (shouldn't flake because of huge wait time) - changeResultValue(fmt.Sprint(mockVal+2), false, false) - val, err = dsCache.Observe(testutils.Context(t), types.ReportTimestamp{}) - require.NoError(t, err) - assert.Equal(t, mockVal+2, val.Int64()) + t.Run("test normal cache updater fail recovery", func(t *testing.T) { + runner := pipelinemocks.NewRunner(t) + ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t)) + mockKVStore := mocks.KVStore{} + mockKVStore.On("Store", mock.Anything, mock.Anything).Return(nil) + mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil) + dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2) + require.NoError(t, err) + + mockVal := int64(1) + // Test if Observe notices that cache updater failed and can refresh the cache on its own + // 1. Set initial value + changeResultValue(runner, fmt.Sprint(mockVal), false, true) + time.Sleep(time.Millisecond * 100) + val, err := dsCache.Observe(testutils.Context(t), types.ReportTimestamp{}) + require.NoError(t, err) + assert.Equal(t, mockVal, val.Int64()) + // 2. Set values again, but make it error in updater + changeResultValue(runner, fmt.Sprint(mockVal+1), true, true) + time.Sleep(time.Second*2 + time.Millisecond*100) + // 3. 
+		// 3. Set value in between updates and call Observe (shouldn't flake because of huge wait time)
+		changeResultValue(runner, fmt.Sprint(mockVal+2), false, false)
+		val, err = dsCache.Observe(testutils.Context(t), types.ReportTimestamp{})
+		require.NoError(t, err)
+		assert.Equal(t, mockVal+2, val.Int64())
+	})
+
+	t.Run("test total updater fail with persisted value recovery", func(t *testing.T) {
+		persistedVal := big.NewInt(1337)
+		runner := pipelinemocks.NewRunner(t)
+		ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))
+
+		mockKVStore := mocks.KVStore{}
+		mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil).Run(func(args mock.Arguments) {
+			arg := args.Get(1).(*serializablebig.Big)
+			arg.ToInt().Set(persistedVal)
+		})
+
+		// set a long update interval so the updater doesn't log errors after the test is done
+		dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
+		require.NoError(t, err)
+		changeResultValue(runner, "-1", true, false)
+
+		time.Sleep(time.Millisecond * 100)
+		val, err := dsCache.Observe(testutils.Context(t), types.ReportTimestamp{})
+		require.NoError(t, err)
+		assert.Equal(t, persistedVal.String(), val.String())
+
+	})
+
+	t.Run("test total updater fail with no persisted value", func(t *testing.T) {
+		runner := pipelinemocks.NewRunner(t)
+		ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))
+
+		mockKVStore := mocks.KVStore{}
+		mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(assert.AnError)
+
+		// set a long update interval so the updater doesn't log errors after the test is done
+		dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
+		require.NoError(t, err)
+		changeResultValue(runner, "-1", true, false)
+
+		time.Sleep(time.Millisecond * 100)
+		_, err = dsCache.Observe(testutils.Context(t), types.ReportTimestamp{})
+		require.Error(t, err)
+	})
 }
 
 func Test_InMemoryDataSourceWithProm(t *testing.T) {
diff --git a/core/store/migrate/migrations/0227_kv_store_table.sql b/core/store/migrate/migrations/0227_kv_store_table.sql
new file mode 100644
index 00000000000..403e157af44
--- /dev/null
+++ b/core/store/migrate/migrations/0227_kv_store_table.sql
@@ -0,0 +1,13 @@
+-- +goose Up
+
+CREATE TABLE job_kv_store (
+    job_id INTEGER NOT NULL REFERENCES jobs(id),
+    key TEXT NOT NULL,
+    val JSONB NOT NULL,
+    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    PRIMARY KEY (job_id, key)
+);
+
+-- +goose Down
+DROP TABLE job_kv_store;
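
Reviewer note: a minimal usage sketch (not part of the diff) of how the new job-scoped KV store round-trips a persisted value, mirroring the cache flow above — updateCache persists via Store, and Observe falls back to Get when the in-memory cache is empty. It assumes db, cfg, lggr and jobID are wired up as in TestJobKVStore; the helper name backupAndRestore and the sample value are illustrative.

package example

import (
	"math/big"

	"github.com/jmoiron/sqlx"

	serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
	"github.com/smartcontractkit/chainlink/v2/core/logger"
	"github.com/smartcontractkit/chainlink/v2/core/services/job"
	"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

// backupAndRestore round-trips a value through the job-scoped KV store the
// same way the juels-per-fee-coin cache backs itself up: Store upserts the
// latest parsed value under the fixed "dscache" key, and Get recovers it
// after a restart.
func backupAndRestore(db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger, jobID int32) (*big.Int, error) {
	kvStore := job.NewKVStore(jobID, db, cfg, lggr)

	// persist the latest observation (what updateCache does on every successful run)
	if err := kvStore.Store("dscache", serializablebig.New(big.NewInt(1337))); err != nil {
		return nil, err
	}

	// recover the persisted copy (what Observe does when the cache is empty, e.g. after a reboot)
	var val serializablebig.Big
	if err := kvStore.Get("dscache", &val); err != nil {
		return nil, err
	}
	return val.ToInt(), nil
}

Because the table's primary key is (job_id, key), each job gets its own namespace and repeated Store calls for the same key update val and updated_at in place rather than accumulating rows.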