diff --git a/Makefile b/Makefile index 126841ab..d5da9d03 100644 --- a/Makefile +++ b/Makefile @@ -39,3 +39,4 @@ genmocks: mockgen -source=chains/evm/transactor/transact.go -destination=./mock/transact.go -package mock mockgen -source=chains/evm/transactor/signAndSend/signAndSend.go -destination=./mock/signAndSend.go -package mock mockgen -source=./store/store.go -destination=./mock/store.go -package mock + mockgen -source=./relayer/message/handler.go -destination=./mock/message.go -package mock diff --git a/chains/evm/chain.go b/chains/evm/chain.go index 121ba39b..f8ac774e 100644 --- a/chains/evm/chain.go +++ b/chains/evm/chain.go @@ -7,8 +7,8 @@ import ( "context" "math/big" - "github.com/ChainSafe/sygma-core/store" - "github.com/ChainSafe/sygma-core/types" + "github.com/ChainSafe/sygma-core/relayer/message" + "github.com/ChainSafe/sygma-core/relayer/proposal" "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -18,14 +18,18 @@ type EventListener interface { } type ProposalExecutor interface { - Execute(messages []*types.Message) error + Execute(props []*proposal.Proposal) error +} + +type MessageHandler interface { + HandleMessage(m *message.Message) (*proposal.Proposal, error) } // EVMChain is struct that aggregates all data required for type EVMChain struct { - listener EventListener - executor ProposalExecutor - blockstore *store.BlockStore + listener EventListener + executor ProposalExecutor + messageHandler MessageHandler domainID uint8 startBlock *big.Int @@ -33,14 +37,14 @@ type EVMChain struct { logger zerolog.Logger } -func NewEVMChain(listener EventListener, executor ProposalExecutor, blockstore *store.BlockStore, domainID uint8, startBlock *big.Int) *EVMChain { +func NewEVMChain(listener EventListener, messageHandler MessageHandler, executor ProposalExecutor, domainID uint8, startBlock *big.Int) *EVMChain { return &EVMChain{ - listener: listener, - executor: executor, - blockstore: blockstore, - domainID: domainID, - startBlock: startBlock, - logger: log.With().Uint8("domainID", domainID).Logger(), + listener: listener, + executor: executor, + domainID: domainID, + startBlock: startBlock, + messageHandler: messageHandler, + logger: log.With().Uint8("domainID", domainID).Logger(), } } @@ -51,10 +55,14 @@ func (c *EVMChain) PollEvents(ctx context.Context) { go c.listener.ListenToEvents(ctx, c.startBlock) } -func (c *EVMChain) Write(msgs []*types.Message) error { - err := c.executor.Execute(msgs) +func (c *EVMChain) ReceiveMessage(m *message.Message) (*proposal.Proposal, error) { + return c.messageHandler.HandleMessage(m) +} + +func (c *EVMChain) Write(props []*proposal.Proposal) error { + err := c.executor.Execute(props) if err != nil { - c.logger.Err(err).Msgf("error writing messages %+v on network %d", msgs, c.DomainID()) + c.logger.Err(err).Msgf("error writing proposals %+v on network %d", props, c.DomainID()) return err } diff --git a/chains/evm/executor/message-handler.go b/chains/evm/executor/message-handler.go deleted file mode 100644 index 6803a945..00000000 --- a/chains/evm/executor/message-handler.go +++ /dev/null @@ -1,71 +0,0 @@ -package executor - -import ( - "fmt" - - "github.com/ChainSafe/sygma-core/types" - "github.com/ethereum/go-ethereum/common" - "github.com/rs/zerolog/log" -) - -type HandlerMatcher interface { - GetHandlerAddressForResourceID(resourceID types.ResourceID) (common.Address, error) - ContractAddress() *common.Address -} - -type MessageHandlerFunc func(m *types.Message, handlerAddr, bridgeAddress common.Address) (*types.Proposal, error) - -// NewEVMMessageHandler creates an instance of EVMMessageHandler that contains -// message handler functions for converting deposit message into a chain specific -// proposal -func NewEVMMessageHandler(handlerMatcher HandlerMatcher) *EVMMessageHandler { - return &EVMMessageHandler{ - handlerMatcher: handlerMatcher, - } -} - -type EVMMessageHandler struct { - handlerMatcher HandlerMatcher - handlers map[common.Address]MessageHandlerFunc -} - -func (mh *EVMMessageHandler) HandleMessage(m *types.Message) (*types.Proposal, error) { - // Matching resource ID with handler. - addr, err := mh.handlerMatcher.GetHandlerAddressForResourceID(m.ResourceId) - if err != nil { - return nil, err - } - // Based on handler that registered on BridgeContract - handleMessage, err := mh.MatchAddressWithHandlerFunc(addr) - if err != nil { - return nil, err - } - log.Info().Str("type", string(m.Type)).Uint8("src", m.Source).Uint8("dst", m.Destination).Uint64("nonce", m.DepositNonce).Str("resourceID", fmt.Sprintf("%x", m.ResourceId)).Msg("Handling new message") - prop, err := handleMessage(m, addr, *mh.handlerMatcher.ContractAddress()) - if err != nil { - return nil, err - } - return prop, nil -} - -func (mh *EVMMessageHandler) MatchAddressWithHandlerFunc(addr common.Address) (MessageHandlerFunc, error) { - h, ok := mh.handlers[addr] - if !ok { - return nil, fmt.Errorf("no corresponding message handler for this address %s exists", addr.Hex()) - } - return h, nil -} - -// RegisterEventHandler registers an message handler by associating a handler function to a specified address -func (mh *EVMMessageHandler) RegisterMessageHandler(address string, handler MessageHandlerFunc) { - if address == "" { - return - } - if mh.handlers == nil { - mh.handlers = make(map[common.Address]MessageHandlerFunc) - } - - log.Debug().Msgf("Registered message handler for address %s", address) - - mh.handlers[common.HexToAddress(address)] = handler -} diff --git a/chains/substrate/chain.go b/chains/substrate/chain.go index acd3358d..2b030f66 100644 --- a/chains/substrate/chain.go +++ b/chains/substrate/chain.go @@ -4,24 +4,28 @@ import ( "context" "math/big" - "github.com/ChainSafe/sygma-core/chains/substrate/client" - "github.com/ChainSafe/sygma-core/store" - "github.com/ChainSafe/sygma-core/types" + "github.com/ChainSafe/sygma-core/relayer/message" + "github.com/ChainSafe/sygma-core/relayer/proposal" "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) -type BatchProposalExecutor interface { - Execute(msgs []*types.Message) error +type ProposalExecutor interface { + Execute(props []*proposal.Proposal) error } -type SubstrateChain struct { - client *client.SubstrateClient +type MessageHandler interface { + HandleMessage(m *message.Message) (*proposal.Proposal, error) +} - listener EventListener - executor BatchProposalExecutor +type EventListener interface { + ListenToEvents(ctx context.Context, startBlock *big.Int) +} - blockstore *store.BlockStore +type SubstrateChain struct { + listener EventListener + messageHandler MessageHandler + executor ProposalExecutor domainID uint8 startBlock *big.Int @@ -29,16 +33,12 @@ type SubstrateChain struct { logger zerolog.Logger } -type EventListener interface { - ListenToEvents(ctx context.Context, startBlock *big.Int) -} - -func NewSubstrateChain(client *client.SubstrateClient, listener EventListener, blockstore *store.BlockStore, executor BatchProposalExecutor, domainID uint8, startBlock *big.Int) *SubstrateChain { +func NewSubstrateChain(listener EventListener, messageHandler MessageHandler, executor ProposalExecutor, domainID uint8, startBlock *big.Int) *SubstrateChain { return &SubstrateChain{ - client: client, listener: listener, - blockstore: blockstore, executor: executor, + domainID: domainID, + startBlock: startBlock, logger: log.With().Uint8("domainID", domainID).Logger()} } @@ -49,10 +49,14 @@ func (c *SubstrateChain) PollEvents(ctx context.Context) { go c.listener.ListenToEvents(ctx, c.startBlock) } -func (c *SubstrateChain) Write(msgs []*types.Message) error { - err := c.executor.Execute(msgs) +func (c *SubstrateChain) ReceiveMessage(m *message.Message) (*proposal.Proposal, error) { + return c.messageHandler.HandleMessage(m) +} + +func (c *SubstrateChain) Write(props []*proposal.Proposal) error { + err := c.executor.Execute(props) if err != nil { - c.logger.Err(err).Msgf("error writing messages %+v on network %d", msgs, c.DomainID()) + c.logger.Err(err).Msgf("error writing proposals %+v on network %d", props, c.DomainID()) return err } diff --git a/chains/substrate/executor/message-handler.go b/chains/substrate/executor/message-handler.go deleted file mode 100644 index ea8ef225..00000000 --- a/chains/substrate/executor/message-handler.go +++ /dev/null @@ -1,61 +0,0 @@ -// The Licensed Work is (c) 2022 Sygma -// SPDX-License-Identifier: LGPL-3.0-only - -package executor - -import ( - "fmt" - - "github.com/ChainSafe/sygma-core/types" - - "github.com/rs/zerolog/log" -) - -type Handlers map[types.TransferType]MessageHandlerFunc -type MessageHandlerFunc func(m *types.Message) (*types.Proposal, error) - -type SubstrateMessageHandler struct { - handlers Handlers -} - -// NewSubstrateMessageHandler creates an instance of SubstrateMessageHandler that contains -// message handler functions for converting deposit message into a chain specific -// proposal -func NewSubstrateMessageHandler() *SubstrateMessageHandler { - return &SubstrateMessageHandler{ - handlers: make(map[types.TransferType]MessageHandlerFunc), - } -} - -func (mh *SubstrateMessageHandler) HandleMessage(m *types.Message) (*types.Proposal, error) { - // Based on handler that was registered on BridgeContract - handleMessage, err := mh.matchTransferTypeHandlerFunc(m.Type) - if err != nil { - return nil, err - } - log.Info().Str("type", string(m.Type)).Uint8("src", m.Source).Uint8("dst", m.Destination).Uint64("nonce", m.DepositNonce).Str("resourceID", fmt.Sprintf("%x", m.ResourceId)).Msg("Handling new message") - prop, err := handleMessage(m) - if err != nil { - return nil, err - } - return prop, nil -} - -func (mh *SubstrateMessageHandler) matchTransferTypeHandlerFunc(transferType types.TransferType) (MessageHandlerFunc, error) { - h, ok := mh.handlers[transferType] - if !ok { - return nil, fmt.Errorf("no corresponding message handler for this transfer type %s exists", transferType) - } - return h, nil -} - -// RegisterEventHandler registers an message handler by associating a handler function to a specified transfer type -func (mh *SubstrateMessageHandler) RegisterMessageHandler(transferType types.TransferType, handler MessageHandlerFunc) { - if transferType == "" { - return - } - - log.Info().Msgf("Registered message handler for transfer type %s", transferType) - - mh.handlers[transferType] = handler -} diff --git a/chains/substrate/executor/message-handler_test.go b/chains/substrate/executor/message-handler_test.go deleted file mode 100644 index 8bed70a3..00000000 --- a/chains/substrate/executor/message-handler_test.go +++ /dev/null @@ -1,130 +0,0 @@ -// The Licensed Work is (c) 2022 Sygma -// SPDX-License-Identifier: LGPL-3.0-only - -package executor_test - -import ( - "bytes" - "errors" - "math/big" - "testing" - "unsafe" - - "github.com/ChainSafe/sygma-core/chains/substrate/executor" - "github.com/ChainSafe/sygma-core/types" - "github.com/centrifuge/go-substrate-rpc-client/v4/scale" - "github.com/centrifuge/go-substrate-rpc-client/v4/signature" - substrateTypes "github.com/centrifuge/go-substrate-rpc-client/v4/types" - "github.com/ethereum/go-ethereum/common" - - "github.com/stretchr/testify/suite" -) - -var SubstratePK = signature.KeyringPair{ - URI: "//Alice", - PublicKey: []byte{0xd4, 0x35, 0x93, 0xc7, 0x15, 0xfd, 0xd3, 0x1c, 0x61, 0x14, 0x1a, 0xbd, 0x4, 0xa9, 0x9f, 0xd6, 0x82, 0x2c, 0x85, 0x58, 0x85, 0x4c, 0xcd, 0xe3, 0x9a, 0x56, 0x84, 0xe7, 0xa5, 0x6d, 0xa2, 0x7d}, - Address: "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY", -} - -type MessageHandlerTestSuite struct { - suite.Suite -} - -func TestRunFungibleTransferHandlerTestSuite(t *testing.T) { - suite.Run(t, new(MessageHandlerTestSuite)) -} - -func (s *MessageHandlerTestSuite) TestSuccesfullyRegisterFungibleTransferMessageHandler() { - recipientAddr := *(*[]substrateTypes.U8)(unsafe.Pointer(&SubstratePK.PublicKey)) - recipient := ConstructRecipientData(recipientAddr) - - messageData := &types.Message{ - Source: 1, - Destination: 0, - DepositNonce: 1, - ResourceId: [32]byte{0}, - Type: "fungible", - Payload: []interface{}{ - []byte{2}, // amount - recipient, - }, - Metadata: types.Metadata{}, - } - - invalidMessageData := &types.Message{ - Source: 1, - Destination: 0, - DepositNonce: 1, - ResourceId: [32]byte{0}, - Type: "nonFungible", - Payload: []interface{}{ - []byte{2}, // amount - recipient, - }, - Metadata: types.Metadata{}, - } - - depositMessageHandler := executor.NewSubstrateMessageHandler() - // Register FungibleTransferMessageHandler function - depositMessageHandler.RegisterMessageHandler("fungible", FungibleMessageHandler) - prop1, err1 := depositMessageHandler.HandleMessage(messageData) - s.Nil(err1) - s.NotNil(prop1) - - // Use unregistered transfer type - prop2, err2 := depositMessageHandler.HandleMessage(invalidMessageData) - s.Nil(prop2) - s.NotNil(err2) -} - -func FungibleMessageHandler(m *types.Message) (*types.Proposal, error) { - if len(m.Payload) != 2 { - return nil, errors.New("malformed payload. Len of payload should be 2") - } - amount, ok := m.Payload[0].([]byte) - if !ok { - return nil, errors.New("wrong payload amount format") - } - recipient, ok := m.Payload[1].([]byte) - if !ok { - return nil, errors.New("wrong payload recipient format") - } - var data []byte - data = append(data, common.LeftPadBytes(amount, 32)...) // amount (uint256) - - recipientLen := big.NewInt(int64(len(recipient))).Bytes() - data = append(data, common.LeftPadBytes(recipientLen, 32)...) - data = append(data, recipient...) - return types.NewProposal(m.Source, m.Destination, m.DepositNonce, m.ResourceId, data, m.Metadata), nil -} - -func ConstructRecipientData(recipient []substrateTypes.U8) []byte { - rec := substrateTypes.MultiLocationV1{ - Parents: 0, - Interior: substrateTypes.JunctionsV1{ - IsX1: true, - X1: substrateTypes.JunctionV1{ - IsAccountID32: true, - AccountID32NetworkID: substrateTypes.NetworkID{ - IsAny: true, - }, - AccountID: recipient, - }, - }, - } - - encodedRecipient := bytes.NewBuffer([]byte{}) - encoder := scale.NewEncoder(encodedRecipient) - _ = rec.Encode(*encoder) - - recipientBytes := encodedRecipient.Bytes() - var finalRecipient []byte - - // remove accountID size data - // this is a fix because the substrate decoder is not able to parse the data with extra data - // that represents size of the recipient byte array - finalRecipient = append(finalRecipient, recipientBytes[:4]...) - finalRecipient = append(finalRecipient, recipientBytes[5:]...) - - return finalRecipient -} diff --git a/mock/message.go b/mock/message.go new file mode 100644 index 00000000..d31c4112 --- /dev/null +++ b/mock/message.go @@ -0,0 +1,55 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./relayer/message/handler.go +// +// Generated by this command: +// +// mockgen -source=./relayer/message/handler.go -destination=./mock/message.go -package mock +// +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + message "github.com/ChainSafe/sygma-core/relayer/message" + proposal "github.com/ChainSafe/sygma-core/relayer/proposal" + gomock "go.uber.org/mock/gomock" +) + +// MockHandler is a mock of Handler interface. +type MockHandler struct { + ctrl *gomock.Controller + recorder *MockHandlerMockRecorder +} + +// MockHandlerMockRecorder is the mock recorder for MockHandler. +type MockHandlerMockRecorder struct { + mock *MockHandler +} + +// NewMockHandler creates a new mock instance. +func NewMockHandler(ctrl *gomock.Controller) *MockHandler { + mock := &MockHandler{ctrl: ctrl} + mock.recorder = &MockHandlerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockHandler) EXPECT() *MockHandlerMockRecorder { + return m.recorder +} + +// HandleMessage mocks base method. +func (m_2 *MockHandler) HandleMessage(m *message.Message) (*proposal.Proposal, error) { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "HandleMessage", m) + ret0, _ := ret[0].(*proposal.Proposal) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HandleMessage indicates an expected call of HandleMessage. +func (mr *MockHandlerMockRecorder) HandleMessage(m any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleMessage", reflect.TypeOf((*MockHandler)(nil).HandleMessage), m) +} diff --git a/mock/relayer.go b/mock/relayer.go index 3e202412..7d95f411 100644 --- a/mock/relayer.go +++ b/mock/relayer.go @@ -12,69 +12,11 @@ import ( context "context" reflect "reflect" - types "github.com/ChainSafe/sygma-core/types" + message "github.com/ChainSafe/sygma-core/relayer/message" + proposal "github.com/ChainSafe/sygma-core/relayer/proposal" gomock "go.uber.org/mock/gomock" ) -// MockDepositMeter is a mock of DepositMeter interface. -type MockDepositMeter struct { - ctrl *gomock.Controller - recorder *MockDepositMeterMockRecorder -} - -// MockDepositMeterMockRecorder is the mock recorder for MockDepositMeter. -type MockDepositMeterMockRecorder struct { - mock *MockDepositMeter -} - -// NewMockDepositMeter creates a new mock instance. -func NewMockDepositMeter(ctrl *gomock.Controller) *MockDepositMeter { - mock := &MockDepositMeter{ctrl: ctrl} - mock.recorder = &MockDepositMeterMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockDepositMeter) EXPECT() *MockDepositMeterMockRecorder { - return m.recorder -} - -// TrackDepositMessage mocks base method. -func (m_2 *MockDepositMeter) TrackDepositMessage(m *types.Message) { - m_2.ctrl.T.Helper() - m_2.ctrl.Call(m_2, "TrackDepositMessage", m) -} - -// TrackDepositMessage indicates an expected call of TrackDepositMessage. -func (mr *MockDepositMeterMockRecorder) TrackDepositMessage(m any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TrackDepositMessage", reflect.TypeOf((*MockDepositMeter)(nil).TrackDepositMessage), m) -} - -// TrackExecutionError mocks base method. -func (m_2 *MockDepositMeter) TrackExecutionError(m *types.Message) { - m_2.ctrl.T.Helper() - m_2.ctrl.Call(m_2, "TrackExecutionError", m) -} - -// TrackExecutionError indicates an expected call of TrackExecutionError. -func (mr *MockDepositMeterMockRecorder) TrackExecutionError(m any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TrackExecutionError", reflect.TypeOf((*MockDepositMeter)(nil).TrackExecutionError), m) -} - -// TrackSuccessfulExecutionLatency mocks base method. -func (m_2 *MockDepositMeter) TrackSuccessfulExecutionLatency(m *types.Message) { - m_2.ctrl.T.Helper() - m_2.ctrl.Call(m_2, "TrackSuccessfulExecutionLatency", m) -} - -// TrackSuccessfulExecutionLatency indicates an expected call of TrackSuccessfulExecutionLatency. -func (mr *MockDepositMeterMockRecorder) TrackSuccessfulExecutionLatency(m any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TrackSuccessfulExecutionLatency", reflect.TypeOf((*MockDepositMeter)(nil).TrackSuccessfulExecutionLatency), m) -} - // MockRelayedChain is a mock of RelayedChain interface. type MockRelayedChain struct { ctrl *gomock.Controller @@ -124,16 +66,31 @@ func (mr *MockRelayedChainMockRecorder) PollEvents(ctx any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PollEvents", reflect.TypeOf((*MockRelayedChain)(nil).PollEvents), ctx) } +// ReceiveMessage mocks base method. +func (m_2 *MockRelayedChain) ReceiveMessage(m *message.Message) (*proposal.Proposal, error) { + m_2.ctrl.T.Helper() + ret := m_2.ctrl.Call(m_2, "ReceiveMessage", m) + ret0, _ := ret[0].(*proposal.Proposal) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReceiveMessage indicates an expected call of ReceiveMessage. +func (mr *MockRelayedChainMockRecorder) ReceiveMessage(m any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceiveMessage", reflect.TypeOf((*MockRelayedChain)(nil).ReceiveMessage), m) +} + // Write mocks base method. -func (m *MockRelayedChain) Write(messages []*types.Message) error { +func (m *MockRelayedChain) Write(proposals []*proposal.Proposal) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Write", messages) + ret := m.ctrl.Call(m, "Write", proposals) ret0, _ := ret[0].(error) return ret0 } // Write indicates an expected call of Write. -func (mr *MockRelayedChainMockRecorder) Write(messages any) *gomock.Call { +func (mr *MockRelayedChainMockRecorder) Write(proposals any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockRelayedChain)(nil).Write), messages) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockRelayedChain)(nil).Write), proposals) } diff --git a/observability/metrics.go b/observability/metrics.go index 0d3d4397..1b6e25c2 100644 --- a/observability/metrics.go +++ b/observability/metrics.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/ChainSafe/sygma-core/types" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" "go.opentelemetry.io/otel/metric" @@ -76,35 +75,8 @@ type RelayerMetrics struct { // NewRelayerMetrics initializes OpenTelemetry metrics func NewRelayerMetrics(meter metric.Meter, attributes ...attribute.KeyValue) (*RelayerMetrics, error) { opts := api.WithAttributes(attributes...) - depositEventCounter, err := meter.Int64Counter( - "relayer.DepositEventCount", - metric.WithDescription("Number of deposit events per domain")) - if err != nil { - return nil, err - - } - executionErrorCount, err := meter.Int64Counter( - "relayer.ExecutionErrorCount", - metric.WithDescription("Number of executions that failed")) - if err != nil { - return nil, err - } - executionLatencyPerRoute, err := meter.Int64Histogram( - "relayer.ExecutionLatencyPerRoute", - metric.WithDescription("Execution time histogram between indexing event and executing it per route")) - if err != nil { - return nil, err - } - executionLatency, err := meter.Int64Histogram( - "relayer.ExecutionLatency", - metric.WithDescription("Execution time histogram between indexing even`t and executing it"), - metric.WithUnit("ms")) - if err != nil { - return nil, err - } blockDeltaMap := make(map[uint8]*big.Int) - blockDeltaGauge, err := meter.Int64ObservableGauge( "relayer.BlockDelta", metric.WithInt64Callback(func(context context.Context, result metric.Int64Observer) error { @@ -119,68 +91,14 @@ func NewRelayerMetrics(meter metric.Meter, attributes ...attribute.KeyValue) (*R metric.WithDescription("Difference between chain head and current indexed block per domain"), ) return &RelayerMetrics{ - meter: meter, - MessageEventTime: make(map[string]time.Time), - Opts: opts, - DepositEventCount: depositEventCounter, - ExecutionErrorCount: executionErrorCount, - ExecutionLatencyPerRoute: executionLatencyPerRoute, - ExecutionLatency: executionLatency, - BlockDelta: blockDeltaGauge, - BlockDeltaMap: blockDeltaMap, + meter: meter, + MessageEventTime: make(map[string]time.Time), + Opts: opts, + BlockDelta: blockDeltaGauge, + BlockDeltaMap: blockDeltaMap, }, err } -// TrackDepositMessage extracts metrics from deposit message and sends -// them to OpenTelemetry collector -func (t *RelayerMetrics) TrackDepositMessage(m *types.Message) { - t.DepositEventCount.Add(context.Background(), 1, t.Opts, api.WithAttributes(attribute.Int64("source", int64(m.Source)))) - - t.lock.Lock() - defer t.lock.Unlock() - t.MessageEventTime[m.ID()] = time.Now() -} - -func (t *RelayerMetrics) TrackExecutionError(m *types.Message) { - t.ExecutionErrorCount.Add(context.Background(), 1, t.Opts, api.WithAttributes(attribute.Int64("destination", int64(m.Source)))) - - t.lock.Lock() - defer t.lock.Unlock() - delete(t.MessageEventTime, m.ID()) -} - -func (t *RelayerMetrics) TrackSuccessfulExecutionLatency(m *types.Message) { - executionLatency := time.Since(t.MessageEventTime[m.ID()]).Milliseconds() / 1000 - t.ExecutionLatency.Record(context.Background(), executionLatency) - t.ExecutionLatencyPerRoute.Record( - context.Background(), - executionLatency, - t.Opts, - api.WithAttributes(attribute.Int64("source", int64(m.Source))), - api.WithAttributes(attribute.Int64("destination", int64(m.Destination))), - ) - - t.lock.Lock() - defer t.lock.Unlock() - delete(t.MessageEventTime, m.ID()) -} - -func (t *RelayerMetrics) TrackSuccessfulExecution(m *types.Message) { - executionLatency := time.Since(t.MessageEventTime[m.ID()]).Milliseconds() / 1000 - t.ExecutionLatency.Record(context.Background(), executionLatency) - t.ExecutionLatencyPerRoute.Record( - context.Background(), - executionLatency, - t.Opts, - api.WithAttributes(attribute.Int64("source", int64(m.Source))), - api.WithAttributes(attribute.Int64("destination", int64(m.Destination))), - ) - - t.lock.Lock() - defer t.lock.Unlock() - delete(t.MessageEventTime, m.ID()) -} - func (t *RelayerMetrics) TrackBlockDelta(domainID uint8, head *big.Int, current *big.Int) { t.lock.Lock() defer t.lock.Unlock() diff --git a/relayer/message/handler.go b/relayer/message/handler.go new file mode 100644 index 00000000..2d685763 --- /dev/null +++ b/relayer/message/handler.go @@ -0,0 +1,35 @@ +package message + +import ( + "fmt" + + "github.com/ChainSafe/sygma-core/relayer/proposal" +) + +type Handler interface { + HandleMessage(m *Message) (*proposal.Proposal, error) +} + +type MessageHandler struct { + handlers map[MessageType]Handler +} + +func NewMessageHandler() *MessageHandler { + return &MessageHandler{ + handlers: make(map[MessageType]Handler), + } +} + +// HandlerMessage calls associated handler for that message type and returns a proposal to be submitted on-chain +func (h *MessageHandler) HandleMessage(m *Message) (*proposal.Proposal, error) { + mh, ok := h.handlers[m.Type] + if !ok { + return nil, fmt.Errorf("no handler found for type %s", m.Type) + } + return mh.HandleMessage(m) +} + +// RegisterMessageHandler registers a message handler by associating a handler to a message type +func (mh *MessageHandler) RegisterMessageHandler(t MessageType, h Handler) { + mh.handlers[t] = h +} diff --git a/relayer/message/handler_test.go b/relayer/message/handler_test.go new file mode 100644 index 00000000..1ed4b5d5 --- /dev/null +++ b/relayer/message/handler_test.go @@ -0,0 +1,71 @@ +package message_test + +import ( + "fmt" + "testing" + + "github.com/ChainSafe/sygma-core/mock" + "github.com/ChainSafe/sygma-core/relayer/message" + "github.com/ChainSafe/sygma-core/relayer/proposal" + "github.com/stretchr/testify/suite" + "go.uber.org/mock/gomock" +) + +type MessageHandlerTestSuite struct { + suite.Suite + + mockHandler *mock.MockHandler +} + +func TestRunMessageHandlerTestSuite(t *testing.T) { + suite.Run(t, new(MessageHandlerTestSuite)) +} + +func (s *MessageHandlerTestSuite) SetupTest() { + gomockController := gomock.NewController(s.T()) + s.mockHandler = mock.NewMockHandler(gomockController) +} + +func (s *MessageHandlerTestSuite) TestHandleMessageWithoutRegisteredHandler() { + mh := message.NewMessageHandler() + + _, err := mh.HandleMessage(&message.Message{Type: "invalid"}) + + s.NotNil(err) +} + +func (s *MessageHandlerTestSuite) TestHandleMessageWithInvalidType() { + mh := message.NewMessageHandler() + mh.RegisterMessageHandler("invalid", s.mockHandler) + + _, err := mh.HandleMessage(&message.Message{Type: "valid"}) + + s.NotNil(err) +} + +func (s *MessageHandlerTestSuite) TestHandleMessageHandlerReturnsError() { + s.mockHandler.EXPECT().HandleMessage(gomock.Any()).Return(nil, fmt.Errorf("error")) + + mh := message.NewMessageHandler() + mh.RegisterMessageHandler("valid", s.mockHandler) + + _, err := mh.HandleMessage(&message.Message{Type: "valid"}) + + s.NotNil(err) +} + +func (s *MessageHandlerTestSuite) TestHandleMessageWithValidType() { + expectedProp := &proposal.Proposal{ + Type: "prop", + } + s.mockHandler.EXPECT().HandleMessage(gomock.Any()).Return(expectedProp, nil) + + mh := message.NewMessageHandler() + mh.RegisterMessageHandler("valid", s.mockHandler) + + msg := message.NewMessage(1, 2, nil, "valid") + prop, err := mh.HandleMessage(msg) + + s.Nil(err) + s.Equal(prop, expectedProp) +} diff --git a/relayer/message/message.go b/relayer/message/message.go new file mode 100644 index 00000000..7be00fc0 --- /dev/null +++ b/relayer/message/message.go @@ -0,0 +1,23 @@ +package message + +type MessageType string +type Message struct { + Source uint8 // Source where message was initiated + Destination uint8 // Destination chain of message + Data interface{} // Data associated with the message + Type MessageType // Message type +} + +func NewMessage( + source uint8, + destination uint8, + data interface{}, + msgType MessageType, +) *Message { + return &Message{ + source, + destination, + data, + msgType, + } +} diff --git a/relayer/proposal/proposal.go b/relayer/proposal/proposal.go new file mode 100644 index 00000000..f20cefd7 --- /dev/null +++ b/relayer/proposal/proposal.go @@ -0,0 +1,18 @@ +package proposal + +type ProposalType string +type Proposal struct { + Source uint8 + Destination uint8 + Data interface{} + Type ProposalType +} + +func NewProposal(source, destination uint8, data []byte, propType ProposalType) *Proposal { + return &Proposal{ + Source: source, + Destination: destination, + Data: data, + Type: propType, + } +} diff --git a/relayer/relayer.go b/relayer/relayer.go index 7cd3defa..4c8a948d 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -6,40 +6,39 @@ package relayer import ( "context" - "github.com/ChainSafe/sygma-core/types" + "github.com/ChainSafe/sygma-core/relayer/message" + "github.com/ChainSafe/sygma-core/relayer/proposal" "github.com/rs/zerolog/log" ) -type DepositMeter interface { - TrackDepositMessage(m *types.Message) - TrackExecutionError(m *types.Message) - TrackSuccessfulExecutionLatency(m *types.Message) -} - type RelayedChain interface { + // PollEvents starts listening for on-chain events PollEvents(ctx context.Context) - Write(messages []*types.Message) error + // ReceiveMessage accepts the message from the source chain and converts it into + // a Proposal to be submitted on-chain + ReceiveMessage(m *message.Message) (*proposal.Proposal, error) + // Write submits proposals on-chain. + // If multiple proposals submitted they are expected to be able to be batched. + Write(proposals []*proposal.Proposal) error DomainID() uint8 } -func NewRelayer(chains []RelayedChain, metrics DepositMeter) *Relayer { - return &Relayer{relayedChains: chains, metrics: metrics} +func NewRelayer(chains map[uint8]RelayedChain) *Relayer { + return &Relayer{relayedChains: chains} } type Relayer struct { - metrics DepositMeter - relayedChains []RelayedChain - registry map[uint8]RelayedChain + relayedChains map[uint8]RelayedChain } -// Start function starts the relayer. Relayer routine is starting all the chains -// and passing them with a channel that accepts unified cross chain message format -func (r *Relayer) Start(ctx context.Context, msgChan chan []*types.Message) { - log.Debug().Msgf("Starting relayer") +// Start function starts polling events for each chain and listens to cross-chain messages. +// If an array of messages is sent to the channel they are expected to be to the same destination and +// able to be handled in batches. +func (r *Relayer) Start(ctx context.Context, msgChan chan []*message.Message) { + log.Info().Msgf("Starting relayer") for _, c := range r.relayedChains { log.Debug().Msgf("Starting chain %v", c.DomainID()) - r.addRelayedChain(c) go c.PollEvents(ctx) } @@ -54,33 +53,32 @@ func (r *Relayer) Start(ctx context.Context, msgChan chan []*types.Message) { } } -// Route function runs destination writer by mapping DestinationID from message to registered writer. -func (r *Relayer) route(msgs []*types.Message) { - destChain, ok := r.registry[msgs[0].Destination] +// Route function routes the messages to the destination chain. +func (r *Relayer) route(msgs []*message.Message) { + destChain, ok := r.relayedChains[msgs[0].Destination] if !ok { - log.Error().Msgf("no resolver for destID %v to send message registered", msgs[0].Destination) + log.Error().Uint8("domainID", destChain.DomainID()).Msgf("No chain registered for destination domain") return } - log.Debug().Msgf("Sending messages %+v to destination %v", msgs, destChain.DomainID()) - err := destChain.Write(msgs) - if err != nil { - for _, m := range msgs { - log.Err(err).Msgf("Failed sending messages %+v to destination %v", m, destChain.DomainID()) - r.metrics.TrackExecutionError(m) + props := make([]*proposal.Proposal, 0) + for _, m := range msgs { + prop, err := destChain.ReceiveMessage(m) + if err != nil { + log.Err(err).Uint8("domainID", destChain.DomainID()).Msgf("Failed receiving message %+v", m) + continue + } + if prop != nil { + props = append(props, prop) } - return } - - for _, m := range msgs { - r.metrics.TrackSuccessfulExecutionLatency(m) + if len(props) == 0 { + return } -} -func (r *Relayer) addRelayedChain(c RelayedChain) { - if r.registry == nil { - r.registry = make(map[uint8]RelayedChain) + err := destChain.Write(props) + if err != nil { + log.Err(err).Uint8("domainID", destChain.DomainID()).Msgf("Failed writing message") + return } - domainID := c.DomainID() - r.registry[domainID] = c } diff --git a/relayer/relayer_test.go b/relayer/relayer_test.go index f18bf4b0..679d7df7 100644 --- a/relayer/relayer_test.go +++ b/relayer/relayer_test.go @@ -1,11 +1,14 @@ package relayer import ( + "context" "fmt" "testing" + "time" "github.com/ChainSafe/sygma-core/mock" - "github.com/ChainSafe/sygma-core/types" + "github.com/ChainSafe/sygma-core/relayer/message" + "github.com/ChainSafe/sygma-core/relayer/proposal" "github.com/stretchr/testify/suite" "go.uber.org/mock/gomock" ) @@ -13,7 +16,6 @@ import ( type RouteTestSuite struct { suite.Suite mockRelayedChain *mock.MockRelayedChain - mockMetrics *mock.MockDepositMeter } func TestRunRouteTestSuite(t *testing.T) { @@ -25,46 +27,91 @@ func (s *RouteTestSuite) TearDownSuite() {} func (s *RouteTestSuite) SetupTest() { gomockController := gomock.NewController(s.T()) s.mockRelayedChain = mock.NewMockRelayedChain(gomockController) - s.mockMetrics = mock.NewMockDepositMeter(gomockController) } func (s *RouteTestSuite) TearDownTest() {} -func (s *RouteTestSuite) TestLogsErrorIfDestinationDoesNotExist() { - relayer := Relayer{ - metrics: s.mockMetrics, +func (s *RouteTestSuite) TestStartListensOnChannel() { + ctx, cancel := context.WithCancel(context.TODO()) + + s.mockRelayedChain.EXPECT().DomainID().Return(uint8(1)) + s.mockRelayedChain.EXPECT().PollEvents(gomock.Any()) + s.mockRelayedChain.EXPECT().DomainID().DoAndReturn(func() uint8 { + cancel() + return 1 + }) + s.mockRelayedChain.EXPECT().ReceiveMessage(gomock.Any()).Return(nil, fmt.Errorf("error")) + chains := make(map[uint8]RelayedChain) + chains[1] = s.mockRelayedChain + relayer := NewRelayer( + chains, + ) + + msgChan := make(chan []*message.Message, 1) + msgChan <- []*message.Message{ + {Destination: 1}, } + relayer.Start(ctx, msgChan) + time.Sleep(time.Millisecond * 100) +} + +func (s *RouteTestSuite) TestReceiveMessageFails() { + s.mockRelayedChain.EXPECT().DomainID().Return(uint8(1)).Times(1) + s.mockRelayedChain.EXPECT().ReceiveMessage(gomock.Any()).Return(nil, fmt.Errorf("error")) + chains := make(map[uint8]RelayedChain) + chains[1] = s.mockRelayedChain + relayer := NewRelayer( + chains, + ) + + relayer.route([]*message.Message{ + {Destination: 1}, + }) +} - relayer.route([]*types.Message{ - {}, +func (s *RouteTestSuite) TestAvoidWriteWithoutProposals() { + s.mockRelayedChain.EXPECT().ReceiveMessage(gomock.Any()).Return(nil, nil) + chains := make(map[uint8]RelayedChain) + chains[1] = s.mockRelayedChain + relayer := NewRelayer( + chains, + ) + + relayer.route([]*message.Message{ + {Destination: 1}, }) } -func (s *RouteTestSuite) TestWriteFail() { - s.mockMetrics.EXPECT().TrackExecutionError(gomock.Any()) - s.mockRelayedChain.EXPECT().DomainID().Return(uint8(1)).Times(3) - s.mockRelayedChain.EXPECT().Write(gomock.Any()).Return(fmt.Errorf("error")) +func (s *RouteTestSuite) TestWriteFails() { + props := make([]*proposal.Proposal, 1) + prop := &proposal.Proposal{} + props[0] = prop + s.mockRelayedChain.EXPECT().ReceiveMessage(gomock.Any()).Return(prop, nil) + s.mockRelayedChain.EXPECT().Write(props).Return(fmt.Errorf("error")) + s.mockRelayedChain.EXPECT().DomainID().Return(uint8(1)).Times(1) + chains := make(map[uint8]RelayedChain) + chains[1] = s.mockRelayedChain relayer := NewRelayer( - []RelayedChain{}, - s.mockMetrics, + chains, ) - relayer.addRelayedChain(s.mockRelayedChain) - relayer.route([]*types.Message{ + relayer.route([]*message.Message{ {Destination: 1}, }) } -func (s *RouteTestSuite) TestWritesToDestChainIfMessageValid() { - s.mockMetrics.EXPECT().TrackSuccessfulExecutionLatency(gomock.Any()) - s.mockRelayedChain.EXPECT().DomainID().Return(uint8(1)).Times(2) - s.mockRelayedChain.EXPECT().Write(gomock.Any()) +func (s *RouteTestSuite) TestWritesToChain() { + props := make([]*proposal.Proposal, 1) + prop := &proposal.Proposal{} + props[0] = prop + s.mockRelayedChain.EXPECT().ReceiveMessage(gomock.Any()).Return(prop, nil) + s.mockRelayedChain.EXPECT().Write(props).Return(nil) + chains := make(map[uint8]RelayedChain) + chains[1] = s.mockRelayedChain relayer := NewRelayer( - []RelayedChain{}, - s.mockMetrics, + chains, ) - relayer.addRelayedChain(s.mockRelayedChain) - relayer.route([]*types.Message{ + relayer.route([]*message.Message{ {Destination: 1}, }) } diff --git a/types/message.go b/types/message.go deleted file mode 100644 index e305f4ed..00000000 --- a/types/message.go +++ /dev/null @@ -1,43 +0,0 @@ -package types - -import ( - "strconv" -) - -type Metadata struct { - Data map[string]interface{} -} -type TransferType string -type Message struct { - Source uint8 // Source where message was initiated - Destination uint8 // Destination chain of message - DepositNonce uint64 // Nonce for the deposit - ResourceId ResourceID - Payload []interface{} // data associated with event sequence - Metadata Metadata // Arbitrary data that will be most likely be used by the relayer - Type TransferType -} - -func NewMessage( - source uint8, - destination uint8, - depositNonce uint64, - resourceId ResourceID, - transferType TransferType, - payload []interface{}, - metadata Metadata, -) *Message { - return &Message{ - source, - destination, - depositNonce, - resourceId, - payload, - metadata, - transferType, - } -} - -func (m Message) ID() string { - return strconv.FormatInt(int64(m.Source), 10) + "-" + strconv.FormatInt(int64(m.Destination), 10) + "-" + strconv.FormatInt(int64(m.DepositNonce), 10) -} diff --git a/types/proposal.go b/types/proposal.go deleted file mode 100644 index 47673ee3..00000000 --- a/types/proposal.go +++ /dev/null @@ -1,21 +0,0 @@ -package types - -func NewProposal(source, destination uint8, depositNonce uint64, resourceId ResourceID, data []byte, metadata Metadata) *Proposal { - return &Proposal{ - OriginDomainID: source, - DepositNonce: depositNonce, - ResourceID: resourceId, - Destination: destination, - Data: data, - Metadata: metadata, - } -} - -type Proposal struct { - OriginDomainID uint8 - DepositNonce uint64 - ResourceID ResourceID - Data []byte - Destination uint8 - Metadata Metadata -} diff --git a/types/types.go b/types/types.go deleted file mode 100644 index a1b2abf2..00000000 --- a/types/types.go +++ /dev/null @@ -1,3 +0,0 @@ -package types - -type ResourceID [32]byte