Skip to content

Commit

Permalink
Add additional idempotence check to cover Kafka server restart, while…
Browse files Browse the repository at this point in the history
… EthConnect stays running (#227)

* Idempotency check on the way into the inflight pool

Signed-off-by: Peter Broadhurst <[email protected]>

* Update initialization for idempotency check receipt store, and txprocessor impl

Signed-off-by: Peter Broadhurst <[email protected]>

* Add unit tests for idempotence

Signed-off-by: Peter Broadhurst <[email protected]>

* Check on TX Hash, in case redelivery happens after full receipt has been persisted

Signed-off-by: Peter Broadhurst <[email protected]>

* Provide more detailed error when idempotency check fails on receipt store

Signed-off-by: Peter Broadhurst <[email protected]>

* Better logging and handle redelivery with extra receipt

Signed-off-by: Peter Broadhurst <[email protected]>

* Do not store receipts when we get a redelivery notification

Signed-off-by: Peter Broadhurst <[email protected]>

* Store special record if we lose the reply

Signed-off-by: Peter Broadhurst <[email protected]>

Signed-off-by: Peter Broadhurst <[email protected]>
  • Loading branch information
peterbroadhurst authored Aug 31, 2022
1 parent 5ee45fd commit 0e6f5b0
Show file tree
Hide file tree
Showing 26 changed files with 750 additions and 229 deletions.
2 changes: 2 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
"ffcmocks",
"fftypes",
"hashicorp",
"idempotency",
"Infof",
"kvstore",
"receiptsmocks",
"smconf",
"stretchr",
"Tracef",
Expand Down
17 changes: 9 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ mocks-$(strip $(1))-$(strip $(2)): ${MOCKERY} sarama
${MOCKERY} --case underscore --dir $(1) --name $(2) --outpkg $(3) --output mocks/$(strip $(3))
endef

$(eval $(call makemock, internal/contractregistry, ContractStore, contractregistrymocks))
$(eval $(call makemock, internal/contractregistry, RemoteRegistry, contractregistrymocks))
$(eval $(call makemock, internal/eth, RPCClient, ethmocks))
$(eval $(call makemock, internal/ffcapiconnector, FFCServer, ffcapiconnectormocks))
$(eval $(call makemock, $$(SARAMA_PATH), Client, saramamocks))
$(eval $(call makemock, $$(SARAMA_PATH), ConsumerGroup, saramamocks))
$(eval $(call makemock, $$(SARAMA_PATH), ConsumerGroupSession, saramamocks))
$(eval $(call makemock, $$(SARAMA_PATH), ConsumerGroupClaim, saramamocks))
$(eval $(call makemock, internal/contractregistry, ContractStore, contractregistrymocks))
$(eval $(call makemock, internal/contractregistry, RemoteRegistry, contractregistrymocks))
$(eval $(call makemock, internal/eth, RPCClient, ethmocks))
$(eval $(call makemock, internal/ffcapiconnector, FFCServer, ffcapiconnectormocks))
$(eval $(call makemock, internal/receipts, ReceiptStorePersistence, receiptsmocks))
$(eval $(call makemock, $$(SARAMA_PATH), Client, saramamocks))
$(eval $(call makemock, $$(SARAMA_PATH), ConsumerGroup, saramamocks))
$(eval $(call makemock, $$(SARAMA_PATH), ConsumerGroupSession, saramamocks))
$(eval $(call makemock, $$(SARAMA_PATH), ConsumerGroupClaim, saramamocks))
58 changes: 39 additions & 19 deletions cmd/ethconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/hyperledger/firefly-ethconnect/internal/errors"
"github.com/hyperledger/firefly-ethconnect/internal/kafka"
"github.com/hyperledger/firefly-ethconnect/internal/receipts"
"github.com/hyperledger/firefly-ethconnect/internal/rest"
"github.com/hyperledger/firefly-ethconnect/internal/utils"
"github.com/icza/dyno"
Expand Down Expand Up @@ -56,19 +57,14 @@ func initLogging(debugLevel int) {
switch debugLevel {
case 0:
log.SetLevel(log.ErrorLevel)
break
case 1:
log.SetLevel(log.InfoLevel)
break
case 2:
log.SetLevel(log.DebugLevel)
break
case 3:
log.SetLevel(log.TraceLevel)
break
default:
log.SetLevel(log.DebugLevel)
break
}
log.Debugf("Log level set to %d", debugLevel)
}
Expand Down Expand Up @@ -168,33 +164,57 @@ func startServer() (err error) {

anyRoutineFinished := make(chan bool)
var dontPrintYaml = false
for name, conf := range serverConfig.KafkaBridges {
kafkaBridge := kafka.NewKafkaBridge(&dontPrintYaml)
kafkaBridge.SetConf(conf)
if err := kafkaBridge.ValidateConf(); err != nil {
return err
}
go func(name string, anyRoutineFinished chan bool) {
log.Infof("Starting Kafka->Ethereum bridge '%s'", name)
if err := kafkaBridge.Start(); err != nil {
log.Errorf("Kafka->Ethereum bridge failed: %s", err)
}
anyRoutineFinished <- true
}(name, anyRoutineFinished)
}

// Merge in legacy named 'webbhooks' configs
if serverConfig.RESTGateways == nil {
serverConfig.RESTGateways = make(map[string]*rest.RESTGatewayConf)
}
for name, conf := range serverConfig.Webhooks {
serverConfig.RESTGateways[name] = conf
}
var idempotencyCheckReceiptStore receipts.ReceiptStorePersistence
restGateways := make(map[string]*rest.RESTGateway)
for name, conf := range serverConfig.RESTGateways {
restGateway := rest.NewRESTGateway(&dontPrintYaml)
restGateways[name] = restGateway
restGateway.SetConf(conf)
if err := restGateway.ValidateConf(); err != nil {
return err
}
// This is a slightly awkward cross-component call, to account for the most popular pattern of usage:
// - Run in server mode
// - Single REST API Gateway
// - Single Kafka bridge co-located in the same process
// In this scenario, we can pass the receipt store to the Kafka bridge for it to do
// additional idempotency checks that prevent res-submission of transactions.
if idempotencyCheckReceiptStore == nil {
idempotencyCheckReceiptStore, err = restGateway.InitReceiptStore()
if err != nil {
return err
}
}

}

// Start the kafka bridges, passing in the receipt store if we have one
for name, conf := range serverConfig.KafkaBridges {
kafkaBridge := kafka.NewKafkaBridge(&dontPrintYaml)
kafkaBridge.SetConf(conf)
if err := kafkaBridge.ValidateConf(); err != nil {
return err
}
go func(name string, anyRoutineFinished chan bool) {
log.Infof("Starting Kafka->Ethereum bridge '%s'", name)
if err := kafkaBridge.Start(idempotencyCheckReceiptStore); err != nil {
log.Errorf("Kafka->Ethereum bridge failed: %s", err)
}
anyRoutineFinished <- true
}(name, anyRoutineFinished)
}

// Start the rest gateways
for name, rgw := range restGateways {
restGateway := rgw
go func(name string, anyRoutineFinished chan bool) {
log.Infof("Starting REST gateway '%s'", name)
if err := restGateway.Start(); err != nil {
Expand Down
8 changes: 5 additions & 3 deletions internal/contractgateway/rest2eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,13 +582,14 @@ func (r *rest2eth) deployContract(res http.ResponseWriter, req *http.Request, fr
}
} else {
ack := !getFlyParamBool("noack", req) // turn on ack's by default
immediateReceipt := strings.EqualFold(getFlyParam("acktype", req), "receipt")
deployMsg.AckType = strings.ToLower(getFlyParam("acktype", req))
immediateReceipt := deployMsg.AckType == "receipt"

// Async messages are dispatched as generic map payloads.
// We are confident in the re-serialization here as we've deserialized from JSON then built our own structure
msgBytes, _ := json.Marshal(deployMsg)
var mapMsg map[string]interface{}
json.Unmarshal(msgBytes, &mapMsg)
_ = json.Unmarshal(msgBytes, &mapMsg)
if asyncResponse, status, err := r.asyncDispatcher.DispatchMsgAsync(req.Context(), mapMsg, ack, immediateReceipt); err != nil {
r.restErrReply(res, req, err, status)
} else {
Expand Down Expand Up @@ -630,7 +631,8 @@ func (r *rest2eth) sendTransaction(res http.ResponseWriter, req *http.Request, f
}
} else {
ack := !getFlyParamBool("noack", req) // turn on ack's by default
immediateReceipt := strings.EqualFold(getFlyParam("acktype", req), "receipt")
msg.AckType = strings.ToLower(getFlyParam("acktype", req))
immediateReceipt := msg.AckType == "receipt"

// Async messages are dispatched as generic map payloads.
// We are confident in the re-serialization here as we've deserialized from JSON then built our own structure
Expand Down
3 changes: 3 additions & 0 deletions internal/contractgateway/syncdispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/hyperledger/firefly-ethconnect/internal/eth"
"github.com/hyperledger/firefly-ethconnect/internal/messages"
"github.com/hyperledger/firefly-ethconnect/internal/receipts"
"github.com/hyperledger/firefly-ethconnect/internal/tx"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -58,6 +59,8 @@ func (p *mockProcessor) OnMessage(c tx.TxnContext) {
}
}
func (p *mockProcessor) Init(eth.RPCClient) {}
func (p *mockProcessor) SetReceiptStoreForIdempotencyCheck(receiptStore receipts.ReceiptStorePersistence) {
}

type mockReplyProcessor struct {
err error
Expand Down
4 changes: 4 additions & 0 deletions internal/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,10 @@ var (

// ReceiptStoreKeyNotUnique non-unique request ID
ReceiptStoreKeyNotUnique = e(100219, "Request ID is not unique")
// ReceiptErrorIdempotencyCheck failed to query receipt during idempotency check
ReceiptErrorIdempotencyCheck = e(100220, "Failed querying the receipt store, performing duplicate message check on ackmode=receipt for id %s: %s")
// ResubmissionPreventedCheckTransactionHash redelivery was prevented by the processor
ResubmissionPreventedCheckTransactionHash = e(100221, "Resubmission of this transaction was prevented by the REST API Gateway. Check the status of the transaction by the transaction hash")
)

type EthconnectError interface {
Expand Down
30 changes: 18 additions & 12 deletions internal/kafka/kafkabridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/hyperledger/firefly-ethconnect/internal/errors"
"github.com/hyperledger/firefly-ethconnect/internal/eth"
"github.com/hyperledger/firefly-ethconnect/internal/messages"
"github.com/hyperledger/firefly-ethconnect/internal/receipts"
"github.com/hyperledger/firefly-ethconnect/internal/tx"
"github.com/hyperledger/firefly-ethconnect/internal/utils"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -87,7 +88,7 @@ func (k *KafkaBridge) CobraInit() (cmd *cobra.Command) {
Short: "Kafka->Ethereum (JSON/RPC) Bridge",
RunE: func(cmd *cobra.Command, args []string) (err error) {
log.Infof("Starting Kafka bridge")
err = k.Start()
err = k.Start(nil /* only available in full server mode with co-located REST API Gateway */)
return
},
PreRunE: func(cmd *cobra.Command, args []string) (err error) {
Expand Down Expand Up @@ -145,7 +146,7 @@ func (k *KafkaBridge) addInflightMsg(msg *sarama.ConsumerMessage, producer Kafka
// is very important that the consumer of the wrapped context object calls Reply
pCtx = &ctx
k.inFlight[ctx.reqOffset] = pCtx
log.Infof("Message now in-flight: %s", pCtx)
log.Debugf("Message now in-flight: %s", pCtx)
// Attempt to process the headers from the original message,
// which could fail. In which case we still have a msgContext inflight
// that needs Reply (and offset commit). So our caller must
Expand Down Expand Up @@ -200,19 +201,19 @@ func (k *KafkaBridge) setInFlightComplete(ctx *msgContext, consumer KafkaConsume

// Build an offset sorted list of the inflight
ctx.complete = true
var completeInParition []*msgContext
var completeInPartition []*msgContext
for _, inflight := range k.inFlight {
if inflight.saramaMsg.Partition == ctx.saramaMsg.Partition {
completeInParition = append(completeInParition, inflight)
completeInPartition = append(completeInPartition, inflight)
}
}
sort.Sort(ctxByOffset(completeInParition))
sort.Sort(ctxByOffset(completeInPartition))

// Go forwards until the first that isn't complete
var readyToAck []*msgContext
for i := 0; i < len(completeInParition); i++ {
if completeInParition[i].complete {
readyToAck = append(readyToAck, completeInParition[i])
for i := 0; i < len(completeInPartition); i++ {
if completeInPartition[i].complete {
readyToAck = append(readyToAck, completeInPartition[i])
} else {
break
}
Expand All @@ -221,7 +222,7 @@ func (k *KafkaBridge) setInFlightComplete(ctx *msgContext, consumer KafkaConsume
canMark := len(readyToAck) > 0
log.Debugf("Ready=%d:%d CanMark=%t Infight=%d InflightSamePartition=%d ReadyToAck=%d",
ctx.saramaMsg.Partition, ctx.saramaMsg.Offset, canMark,
len(k.inFlight), len(completeInParition), len(readyToAck))
len(k.inFlight), len(completeInPartition), len(readyToAck))
if canMark {
// Remove all the ready-to-acks from the in-flight list
for i := 0; i < len(readyToAck); i++ {
Expand Down Expand Up @@ -345,7 +346,7 @@ func (k *KafkaBridge) ConsumerMessagesLoop(consumer KafkaConsumer, producer Kafk
log.Debugf("Kafka consumer loop started")
for msg := range consumer.Messages() {
k.inFlightCond.L.Lock()
log.Infof("Kafka consumer received message: Partition=%d Offset=%d", msg.Partition, msg.Offset)
log.Debugf("Kafka consumer received message: Partition=%d Offset=%d", msg.Partition, msg.Offset)

// We cannot build up an infinite number of messages in memory
for len(k.inFlight) >= k.conf.MaxInFlight {
Expand All @@ -361,6 +362,8 @@ func (k *KafkaBridge) ConsumerMessagesLoop(consumer KafkaConsumer, producer Kafk
// This was a dup
} else if err == nil {
// Dispatch for processing if we parsed the message successfully
headers := msgCtx.Headers()
log.Infof("Kafka consumer dispatching message '%s' Type=%s ID=%s", msgCtx.reqOffset, headers.MsgType, headers.ID)
k.processor.OnMessage(msgCtx)
} else {
// Dispatch a generic 'bad data' reply
Expand Down Expand Up @@ -399,7 +402,8 @@ func (k *KafkaBridge) ProducerSuccessLoop(consumer KafkaConsumer, producer Kafka
k.inFlightCond.L.Lock()
reqOffset := msg.Metadata.(string)
if ctx, ok := k.inFlight[reqOffset]; ok {
log.Infof("Reply sent: %s", ctx)
headers := ctx.Headers()
log.Infof("Kafka consumer replying to message '%s' Type=%s ID=%s", ctx.reqOffset, headers.MsgType, headers.ID)
// While still holding the lock, add this to the completed list
_ = k.setInFlightComplete(ctx, consumer)
// We've reduced the in-flight count - wake any waiting consumer go func
Expand All @@ -423,7 +427,9 @@ func (k *KafkaBridge) connect() (err error) {
}

// Start kicks off the bridge
func (k *KafkaBridge) Start() (err error) {
func (k *KafkaBridge) Start(receiptStore receipts.ReceiptStorePersistence) (err error) {

k.processor.SetReceiptStoreForIdempotencyCheck(receiptStore)

if *k.printYAML {
b, err := utils.MarshalToYAML(&k.conf)
Expand Down
5 changes: 5 additions & 0 deletions internal/kafka/kafkabridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/hyperledger/firefly-ethconnect/internal/errors"
"github.com/hyperledger/firefly-ethconnect/internal/eth"
"github.com/hyperledger/firefly-ethconnect/internal/messages"
"github.com/hyperledger/firefly-ethconnect/internal/receipts"
"github.com/hyperledger/firefly-ethconnect/internal/tx"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -93,6 +94,10 @@ func (p *testKafkaMsgProcessor) OnMessage(msg tx.TxnContext) {
p.messages <- msg
return
}

func (p *testKafkaMsgProcessor) SetReceiptStoreForIdempotencyCheck(receiptStore receipts.ReceiptStorePersistence) {
}

func TestNewKafkaBridge(t *testing.T) {
assert := assert.New(t)

Expand Down
10 changes: 10 additions & 0 deletions internal/messages/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
MsgTypeTransactionSuccess = "TransactionSuccess"
// MsgTypeTransactionFailure - a transaction receipt where status is 0
MsgTypeTransactionFailure = "TransactionFailure"
// MsgTypeTransactionRedeliveryPrevented - idempotency check caught a redelivery of the message
MsgTypeTransactionRedeliveryPrevented = "TransactionRedeliveryPrevented"
// RecordHeaderAccessToken - record header name for passing JWT token over messaging
RecordHeaderAccessToken = "fly-accesstoken"
)
Expand Down Expand Up @@ -194,6 +196,14 @@ type TransactionReceipt struct {
RegisterAs string `json:"registerAs,omitempty"`
}

// TransactionRedeliveryNotification is sent on redelivery of a message, when the ackmode=receipt
// idempotency check is enabled. The REST API Gateway (or other consumer), should avoid overwriting
// any received receipt when it gets this.
type TransactionRedeliveryNotification struct {
ReplyCommon
TransactionHash string `json:"transactionHash"`
}

// TransactionInfo is the detailed transaction info returned by eth_getTransactionByXXXXX
// For the big numbers, we pass a simple string as well as a full
// ethereum hex encoding version
Expand Down
Loading

0 comments on commit 0e6f5b0

Please sign in to comment.