diff --git a/core/services/gateway/handlers/capabilities/handler.go b/core/services/gateway/handlers/capabilities/handler.go index 904a64c8896..90bc2065edd 100644 --- a/core/services/gateway/handlers/capabilities/handler.go +++ b/core/services/gateway/handlers/capabilities/handler.go @@ -20,9 +20,10 @@ import ( const ( // NOTE: more methods will go here. HTTP trigger/action/target; etc. - MethodWebAPITarget = "web_api_target" - MethodWebAPITrigger = "web_api_trigger" - MethodComputeAction = "compute_action" + MethodWebAPITarget = "web_api_target" + MethodWebAPITrigger = "web_api_trigger" + MethodComputeAction = "compute_action" + MethodWorkflowSyncer = "workflow_syncer" ) type handler struct { diff --git a/core/services/workflows/syncer/fetcher.go b/core/services/workflows/syncer/fetcher.go new file mode 100644 index 00000000000..c1ddd3530da --- /dev/null +++ b/core/services/workflows/syncer/fetcher.go @@ -0,0 +1,59 @@ +package syncer + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/smartcontractkit/chainlink/v2/core/capabilities/validation" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" + "github.com/smartcontractkit/chainlink/v2/core/logger" + ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" +) + +func NewFetcherFunc( + ctx context.Context, + lggr logger.Logger, + och *webapi.OutgoingConnectorHandler, + workflowID, workflowExecutionID string, + idGenerator func() string) FetcherFunc { + return func(ctx context.Context, url string) ([]byte, error) { + if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil { + return nil, fmt.Errorf("workflow ID %q is invalid: %w", workflowID, err) + } + if err := validation.ValidateWorkflowOrExecutionID(workflowExecutionID); err != nil { + return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", workflowExecutionID, err) + } + + messageID := strings.Join([]string{ + workflowID, + workflowExecutionID, + ghcapabilities.MethodWorkflowSyncer, + idGenerator(), + }, "/") + + payloadBytes, err := json.Marshal(ghcapabilities.Request{ + URL: url, + Method: http.MethodGet, + }) + if err != nil { + return nil, fmt.Errorf("failed to marshal fetch request: %w", err) + } + + resp, err := och.HandleSingleNodeRequest(ctx, messageID, payloadBytes) + if err != nil { + return nil, err + } + + lggr.Debugw("received gateway response", "resp", resp) + var payload ghcapabilities.Response + err = json.Unmarshal(resp.Body.Payload, &payload) + if err != nil { + return nil, err + } + + return payload.Body, nil + } +} diff --git a/core/services/workflows/syncer/fetcher_test.go b/core/services/workflows/syncer/fetcher_test.go new file mode 100644 index 00000000000..808284d67ac --- /dev/null +++ b/core/services/workflows/syncer/fetcher_test.go @@ -0,0 +1,81 @@ +package syncer + +import ( + context "context" + "strings" + "testing" + + "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" + gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks" + ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" + mock "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestNewFetcherFunc(t *testing.T) { + ctx := context.Background() + lggr := logger.TestLogger(t) + + config := webapi.ServiceConfig{ + RateLimiter: common.RateLimiterConfig{ + GlobalRPS: 100.0, + GlobalBurst: 100, + PerSenderRPS: 100.0, + PerSenderBurst: 100, + }, + } + + connector := gcmocks.NewGatewayConnector(t) + och, err := webapi.NewOutgoingConnectorHandler(connector, config, ghcapabilities.MethodComputeAction, lggr) + require.NoError(t, err) + + workflowID := "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0" + workflowExecutionID := "25c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce1" + idGenerator := func() string { return "uniqueID" } + url := "http://example.com" + + msgID := strings.Join([]string{ + workflowID, + workflowExecutionID, + ghcapabilities.MethodWorkflowSyncer, + idGenerator(), + }, "/") + + t.Run("OK-valid_request", func(t *testing.T) { + gatewayResp := gatewayResponse(t, msgID) + connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", mock.Anything).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) { + och.HandleGatewayMessage(ctx, "gateway1", gatewayResp) + }).Return(nil).Times(1) + connector.EXPECT().DonID().Return("don-id") + connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"}) + + fetcher := NewFetcherFunc(ctx, lggr, och, workflowID, workflowExecutionID, idGenerator) + + payload, err := fetcher(ctx, url) + require.NoError(t, err) + + expectedPayload := []byte("response body") + require.Equal(t, expectedPayload, payload) + }) + + t.Run("NOK-invalid_workflow_id", func(t *testing.T) { + invalidWorkflowID := "invalidWorkflowID" + fetcher := NewFetcherFunc(ctx, lggr, och, invalidWorkflowID, workflowExecutionID, idGenerator) + + _, err := fetcher(ctx, url) + require.Error(t, err) + require.Contains(t, err.Error(), "workflow ID") + }) + + t.Run("NOK-invalid_workflow_execution_id", func(t *testing.T) { + invalidWorkflowExecutionID := "invalidWorkflowExecutionID" + fetcher := NewFetcherFunc(ctx, lggr, och, workflowID, invalidWorkflowExecutionID, idGenerator) + + _, err := fetcher(ctx, url) + require.Error(t, err) + require.Contains(t, err.Error(), "workflow execution ID") + }) +} diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index 652b20deea1..fd8d23890ff 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -3,6 +3,7 @@ package syncer import ( "context" "encoding/hex" + "encoding/json" "strconv" "testing" "time" @@ -15,6 +16,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" + ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" "github.com/smartcontractkit/chainlink/v2/core/utils/matches" @@ -99,3 +102,22 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { return secrets == wantContents }, 5*time.Second, time.Second) } + +func gatewayResponse(t *testing.T, msgID string) *api.Message { + headers := map[string]string{"Content-Type": "application/json"} + body := []byte("response body") + responsePayload, err := json.Marshal(ghcapabilities.Response{ + StatusCode: 200, + Headers: headers, + Body: body, + ExecutionError: false, + }) + require.NoError(t, err) + return &api.Message{ + Body: api.MessageBody{ + MessageId: msgID, + Method: ghcapabilities.MethodWebAPITarget, + Payload: responsePayload, + }, + } +}