Skip to content

Commit

Permalink
fix kafka retry and count update.
Browse files Browse the repository at this point in the history
puneet2019 committed May 2, 2022
1 parent 23a5f49 commit 5c4f117
Showing 7 changed files with 26 additions and 22 deletions.
4 changes: 2 additions & 2 deletions kafka/handler/msgDelegate.go
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ ConsumerLoop:
if err != nil {
return err
}
if configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize-m.Count < len(validators) {
if configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize-*m.Count < len(validators) {
logging.Error("Delegate transaction number is higher than slots available, probably increase to tendermint MaxBatchSize")
return nil
}
@@ -92,7 +92,7 @@ ConsumerLoop:
logging.Error("failed to produce message from: MsgDelegate to ToTendermint")
return err
}
m.Count++
*m.Count++
}
}
session.MarkMessage(kafkaMsg, "")
10 changes: 5 additions & 5 deletions kafka/handler/msgSend.go
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ func (m MsgHandler) HandleMsgSend(session sarama.ConsumerGroupSession, claim sar
return err
}

loop := configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize - m.Count
loop := configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize - *m.Count
if loop <= len(validators) || m.WithdrawRewards {
return nil
}
@@ -53,8 +53,8 @@ func (m MsgHandler) HandleMsgSend(session sarama.ConsumerGroupSession, claim sar
if err != nil {
return err
}
m.Count = m.Count + len(validators)
if !checkCount(m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
*m.Count = *m.Count + len(validators)
if !checkCount(*m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
break ConsumerLoop
}
m.WithdrawRewards = true
@@ -65,8 +65,8 @@ func (m MsgHandler) HandleMsgSend(session sarama.ConsumerGroupSession, claim sar
break ConsumerLoop
}
session.MarkMessage(kafkaMsg, "")
m.Count++
if !checkCount(m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
*m.Count++
if !checkCount(*m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
break ConsumerLoop
}
default:
6 changes: 3 additions & 3 deletions kafka/handler/msgUnbond.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ func (m MsgHandler) HandleMsgUnbond(session sarama.ConsumerGroupSession, claim s
}
}()

if !checkCount(m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
if !checkCount(*m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
return nil
}

@@ -46,8 +46,8 @@ ConsumerLoop:
break ConsumerLoop
}
session.MarkMessage(kafkaMsg, "")
m.Count++
if !checkCount(m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
*m.Count++
if !checkCount(*m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
break ConsumerLoop
}
default:
4 changes: 2 additions & 2 deletions kafka/handler/redelegate.go
Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ func (m MsgHandler) HandleRedelegate(session sarama.ConsumerGroupSession, claim
if err != nil {
return err
}
if configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize-m.Count < len(validatorSet) {
if configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize-*m.Count < len(validatorSet) {
logging.Error("ReDelegate transaction number is higher than slots available, probably increase to tendermint MaxBatchSize")
return nil
}
@@ -96,7 +96,7 @@ func (m MsgHandler) HandleRedelegate(session sarama.ConsumerGroupSession, claim
logging.Error("failed to produce message from topic Redelegate to ToTendermint")
return err
}
m.Count++
*m.Count++
}
}
session.MarkMessage(kafkaMsg, "")
13 changes: 7 additions & 6 deletions kafka/handler/retryTendermint.go
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ func (m MsgHandler) HandleRetryTendermint(session sarama.ConsumerGroupSession, c
logging.Error("failed to close producer in topic RetryTendermint, error:", err)
}
}()
if m.Count <= 0 {
if *m.Count < 0 {
return nil
}
claimMsgChan := claim.Messages()
@@ -51,7 +51,7 @@ ConsumerLoop:
if err != nil {
return err
}
loop := configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize - m.Count
loop := configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize - *m.Count
if loop <= len(validators) {
return nil
}
@@ -60,20 +60,21 @@ ConsumerLoop:
return err
}
m.WithdrawRewards = true
m.Count = configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize - loop
if !checkCount(m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
*m.Count = configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize - loop
if !checkCount(*m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
break ConsumerLoop
}
}

logging.Info("Producing to kafka From RetryTendermint to ToTendermint: ", msg)
err = utils.ProducerDeliverMessage(kafkaMsg.Value, utils.ToTendermint, producer)
if err != nil {
logging.Error("failed to produce from: RetryTendermint to: ToTendermint, error:", err)
break ConsumerLoop
}
session.MarkMessage(kafkaMsg, "")
m.Count++
if !checkCount(m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
*m.Count++
if !checkCount(*m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
break ConsumerLoop
}
default:
2 changes: 1 addition & 1 deletion kafka/handler/type.go
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ type MsgHandler struct {
ProtoCodec *codec.ProtoCodec
Chain *relayer.Chain
EthClient *ethclient.Client
Count int
Count *int
WithdrawRewards bool
}

9 changes: 6 additions & 3 deletions kafka/routines.go
Original file line number Diff line number Diff line change
@@ -62,8 +62,9 @@ func consumeToEthMsgs(ctx context.Context, state utils.KafkaState,
protoCodec *codec.ProtoCodec, chain *relayer.Chain, ethereumClient *ethclient.Client, end, ended chan bool) {
consumerGroup := state.ConsumerGroup[utils.GroupToEth]
for {
count := 0
msgHandler := handler.MsgHandler{ProtoCodec: protoCodec,
Chain: chain, EthClient: ethereumClient, Count: 0}
Chain: chain, EthClient: ethereumClient, Count: &count}
err := consumerGroup.Consume(ctx, []string{utils.ToEth}, msgHandler)
if err != nil {
logging.Error("Consumer group.Consume:", err)
@@ -89,8 +90,9 @@ func consumeToTendermintMessages(ctx context.Context, state utils.KafkaState,
groupMsgToTendermint := state.ConsumerGroup[utils.GroupToTendermint]

for {
count := 0
msgHandler := handler.MsgHandler{ProtoCodec: protoCodec,
Chain: chain, EthClient: ethereumClient, Count: 0, WithdrawRewards: false}
Chain: chain, EthClient: ethereumClient, Count: &count, WithdrawRewards: false}
err := groupRedelegate.Consume(ctx, []string{utils.Redelegate}, msgHandler)
if err != nil {
logging.Error("Consumer groupRedelegate.Consume:", err)
@@ -138,8 +140,9 @@ func consumeUnbondings(ctx context.Context, state utils.KafkaState,
logging.Fatal(err)
}
if time.Now().Unix() > nextEpochTime.Epoch {
count := 0
msgHandler := handler.MsgHandler{ProtoCodec: protoCodec,
Chain: chain, EthClient: ethereumClient, Count: 0}
Chain: chain, EthClient: ethereumClient, Count: &count}
err := ethUnbondConsumerGroup.Consume(ctx, []string{utils.EthUnbond}, msgHandler)
if err != nil {
logging.Error("Consumer group.Consume for EthUnbond:", err)

0 comments on commit 5c4f117

Please sign in to comment.