Skip to content

Commit

Permalink
feat: implement FetchFunc
Browse files Browse the repository at this point in the history
  • Loading branch information
agparadiso committed Nov 26, 2024
1 parent 1231f14 commit fc97fac
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 3 deletions.
7 changes: 4 additions & 3 deletions core/services/gateway/handlers/capabilities/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (

const (
// NOTE: more methods will go here. HTTP trigger/action/target; etc.
MethodWebAPITarget = "web_api_target"
MethodWebAPITrigger = "web_api_trigger"
MethodComputeAction = "compute_action"
MethodWebAPITarget = "web_api_target"
MethodWebAPITrigger = "web_api_trigger"
MethodComputeAction = "compute_action"
MethodWorkflowSyncer = "workflow_syncer"
)

type handler struct {
Expand Down
59 changes: 59 additions & 0 deletions core/services/workflows/syncer/fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package syncer

import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
"github.com/smartcontractkit/chainlink/v2/core/logger"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
)

func NewFetcherFunc(
ctx context.Context,
lggr logger.Logger,
och *webapi.OutgoingConnectorHandler,
workflowID, workflowExecutionID string,
idGenerator func() string) FetcherFunc {
return func(ctx context.Context, url string) ([]byte, error) {
if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil {
return nil, fmt.Errorf("workflow ID %q is invalid: %w", workflowID, err)
}
if err := validation.ValidateWorkflowOrExecutionID(workflowExecutionID); err != nil {
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", workflowExecutionID, err)
}

messageID := strings.Join([]string{
workflowID,
workflowExecutionID,
ghcapabilities.MethodWorkflowSyncer,
idGenerator(),
}, "/")

payloadBytes, err := json.Marshal(ghcapabilities.Request{
URL: url,
Method: http.MethodGet,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal fetch request: %w", err)
}

resp, err := och.HandleSingleNodeRequest(ctx, messageID, payloadBytes)
if err != nil {
return nil, err
}

lggr.Debugw("received gateway response", "resp", resp)
var payload ghcapabilities.Response
err = json.Unmarshal(resp.Body.Payload, &payload)
if err != nil {
return nil, err
}

return payload.Body, nil
}
}
101 changes: 101 additions & 0 deletions core/services/workflows/syncer/fetcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package syncer

import (
context "context"
"encoding/json"
"strings"
"testing"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
mock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestNewFetcherFunc(t *testing.T) {
ctx := context.Background()
lggr := logger.TestLogger(t)

config := webapi.ServiceConfig{
RateLimiter: common.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
},
}

connector := gcmocks.NewGatewayConnector(t)
och, err := webapi.NewOutgoingConnectorHandler(connector, config, ghcapabilities.MethodComputeAction, lggr)
require.NoError(t, err)

workflowID := "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0"
workflowExecutionID := "25c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce1"
idGenerator := func() string { return "uniqueID" }
url := "http://example.com"

msgID := strings.Join([]string{
workflowID,
workflowExecutionID,
ghcapabilities.MethodWorkflowSyncer,
idGenerator(),
}, "/")

t.Run("OK-valid_request", func(t *testing.T) {
gatewayResp := gatewayResponse(t, msgID)
connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", mock.Anything).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) {
och.HandleGatewayMessage(ctx, "gateway1", gatewayResp)
}).Return(nil).Times(1)
connector.EXPECT().DonID().Return("don-id")
connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"})

fetcher := NewFetcherFunc(ctx, lggr, och, workflowID, workflowExecutionID, idGenerator)

payload, err := fetcher(ctx, url)
require.NoError(t, err)

expectedPayload := []byte("response body")
require.Equal(t, expectedPayload, payload)
})

t.Run("NOK-invalid_workflow_id", func(t *testing.T) {
invalidWorkflowID := "invalidWorkflowID"
fetcher := NewFetcherFunc(ctx, lggr, och, invalidWorkflowID, workflowExecutionID, idGenerator)

_, err := fetcher(ctx, url)
require.Error(t, err)
require.Contains(t, err.Error(), "workflow ID")
})

t.Run("NOK-invalid_workflow_execution_id", func(t *testing.T) {
invalidWorkflowExecutionID := "invalidWorkflowExecutionID"
fetcher := NewFetcherFunc(ctx, lggr, och, workflowID, invalidWorkflowExecutionID, idGenerator)

_, err := fetcher(ctx, url)
require.Error(t, err)
require.Contains(t, err.Error(), "workflow execution ID")
})
}

func gatewayResponse(t *testing.T, msgID string) *api.Message {
headers := map[string]string{"Content-Type": "application/json"}
body := []byte("response body")
responsePayload, err := json.Marshal(ghcapabilities.Response{
StatusCode: 200,
Headers: headers,
Body: body,
ExecutionError: false,
})
require.NoError(t, err)
return &api.Message{
Body: api.MessageBody{
MessageId: msgID,
Method: ghcapabilities.MethodWebAPITarget,
Payload: responsePayload,
},
}
}

0 comments on commit fc97fac

Please sign in to comment.