diff --git a/core/capabilities/remote/executable/client.go b/core/capabilities/remote/executable/client.go index 08c773cdb86..9af32eb5f8e 100644 --- a/core/capabilities/remote/executable/client.go +++ b/core/capabilities/remote/executable/client.go @@ -140,40 +140,10 @@ func (c *client) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { } func (c *client) RegisterToWorkflow(ctx context.Context, registerRequest commoncap.RegisterToWorkflowRequest) error { - req, err := request.NewClientRegisterToWorkflowRequest(ctx, c.lggr, registerRequest, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher, - c.requestTimeout) - - if err != nil { - return fmt.Errorf("failed to create client request: %w", err) - } - - if err = c.sendRequest(req); err != nil { - return fmt.Errorf("failed to send request: %w", err) - } - - resp := <-req.ResponseChan() - if resp.Err != nil { - return fmt.Errorf("error executing request: %w", resp.Err) - } return nil } func (c *client) UnregisterFromWorkflow(ctx context.Context, unregisterRequest commoncap.UnregisterFromWorkflowRequest) error { - req, err := request.NewClientUnregisterFromWorkflowRequest(ctx, c.lggr, unregisterRequest, c.remoteCapabilityInfo, - c.localDONInfo, c.dispatcher, c.requestTimeout) - - if err != nil { - return fmt.Errorf("failed to create client request: %w", err) - } - - if err = c.sendRequest(req); err != nil { - return fmt.Errorf("failed to send request: %w", err) - } - - resp := <-req.ResponseChan() - if resp.Err != nil { - return fmt.Errorf("error executing request: %w", resp.Err) - } return nil } diff --git a/core/capabilities/remote/executable/endtoend_test.go b/core/capabilities/remote/executable/endtoend_test.go index 29f29ed9ee1..376b4d5852f 100644 --- a/core/capabilities/remote/executable/endtoend_test.go +++ b/core/capabilities/remote/executable/endtoend_test.go @@ -26,84 +26,6 @@ import ( p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) -func Test_RemoteExecutableCapability_InsufficientCapabilityResponses(t *testing.T) { - ctx := testutils.Context(t) - - responseTest := func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) { - assert.NotNil(t, responseError) - } - - capability := &TestCapability{} - - transmissionSchedule, err := values.NewMap(map[string]any{ - "schedule": transmission.Schedule_AllAtOnce, - "deltaStage": "10ms", - }) - require.NoError(t, err) - - var methods []func(ctx context.Context, caller commoncap.ExecutableCapability) - - methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { - executeCapability(ctx, t, caller, transmissionSchedule, responseTest) - }) - - methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { - registerWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) { - require.Error(t, responseError) - }) - }) - - methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { - unregisterWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) { - require.Error(t, responseError) - }) - }) - - for _, method := range methods { - testRemoteExecutableCapability(ctx, t, capability, 10, 9, 10*time.Millisecond, 10, 10, 10*time.Minute, method) - } -} - -func Test_RemoteExecutableCapability_InsufficientWorkflowRequests(t *testing.T) { - ctx := testutils.Context(t) - - responseTest := func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) { - assert.NotNil(t, responseError) - } - - timeOut := 10 * time.Minute - - capability := &TestCapability{} - - transmissionSchedule, err := values.NewMap(map[string]any{ - "schedule": transmission.Schedule_AllAtOnce, - "deltaStage": "10ms", - }) - require.NoError(t, err) - - var methods []func(ctx context.Context, caller commoncap.ExecutableCapability) - - methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { - executeCapability(ctx, t, caller, transmissionSchedule, responseTest) - }) - - methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { - registerWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) { - require.Error(t, responseError) - }) - }) - - methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { - unregisterWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) { - require.Error(t, responseError) - }) - }) - - for _, method := range methods { - testRemoteExecutableCapability(ctx, t, capability, 10, 10, 10*time.Millisecond, 10, 9, timeOut, method) - } -} - func Test_RemoteExecutableCapability_TransmissionSchedules(t *testing.T) { ctx := testutils.Context(t) @@ -214,18 +136,6 @@ func Test_RemoteExecutionCapability_CapabilityError(t *testing.T) { }) }) - methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { - registerWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) { - assert.Equal(t, "error executing request: failed to register to workflow: an error", responseError.Error()) - }) - }) - - methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { - unregisterWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) { - assert.Equal(t, "error executing request: failed to unregister from workflow: an error", responseError.Error()) - }) - }) - for _, method := range methods { testRemoteExecutableCapability(ctx, t, capability, 10, 9, 10*time.Minute, 10, 9, 10*time.Minute, method) } @@ -250,18 +160,6 @@ func Test_RemoteExecutableCapability_RandomCapabilityError(t *testing.T) { }) }) - methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { - registerWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) { - assert.Equal(t, "error executing request: request expired", responseError.Error()) - }) - }) - - methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { - unregisterWorkflow(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseError error) { - assert.Equal(t, "error executing request: request expired", responseError.Error()) - }) - }) - for _, method := range methods { testRemoteExecutableCapability(ctx, t, capability, 10, 9, 10*time.Millisecond, 10, 9, 10*time.Minute, method) diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index b958e171c0c..ecb3ce60510 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -27,7 +27,11 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) -const fifteenMinutesMs = 15 * 60 * 1000 +const ( + fifteenMinutesMs = 15 * 60 * 1000 + reservedFieldNameStepTimeout = "cre_step_timeout" + maxStepTimeoutOverrideSec = 10 * 60 // 10 minutes +) type stepRequest struct { stepRef string @@ -769,10 +773,7 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) { // TODO ks-462 inputs logCustMsg(ctx, cma, "executing step", l) - stepCtx, cancel := context.WithTimeout(ctx, e.stepTimeoutDuration) - defer cancel() - - inputs, outputs, err := e.executeStep(stepCtx, l, msg) + inputs, outputs, err := e.executeStep(ctx, l, msg) var stepStatus string switch { case errors.Is(capabilities.ErrStopExecution, err): @@ -919,6 +920,20 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe if err != nil { return nil, nil, err } + stepTimeoutDuration := e.stepTimeoutDuration + if timeoutOverride, ok := config.Underlying[reservedFieldNameStepTimeout]; ok { + var desiredTimeout int64 + err2 := timeoutOverride.UnwrapTo(&desiredTimeout) + if err2 != nil { + e.logger.Warnw("couldn't decode step timeout override, using default", "error", err2, "default", stepTimeoutDuration) + } else { + if desiredTimeout > maxStepTimeoutOverrideSec { + e.logger.Warnw("desired step timeout is too large, limiting to max value", "maxValue", maxStepTimeoutOverrideSec) + desiredTimeout = maxStepTimeoutOverrideSec + } + stepTimeoutDuration = time.Duration(desiredTimeout) * time.Second + } + } tr := capabilities.CapabilityRequest{ Inputs: inputsMap, @@ -934,8 +949,11 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe }, } - e.metrics.incrementCapabilityInvocationCounter(ctx) - output, err := step.capability.Execute(ctx, tr) + stepCtx, cancel := context.WithTimeout(ctx, stepTimeoutDuration) + defer cancel() + + e.metrics.incrementCapabilityInvocationCounter(stepCtx) + output, err := step.capability.Execute(stepCtx, tr) if err != nil { return inputsMap, nil, err } diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 3a2bc17bc36..df767b8f452 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -86,6 +86,7 @@ targets: address: "0x54e220867af6683aE6DcBF535B4f952cB5116510" params: ["$(report)"] abi: "receive(report bytes)" + cre_step_timeout: 610 ` type testHooks struct {