diff --git a/kafka/handler/msgDelegate.go b/kafka/handler/msgDelegate.go
index 2fc8b69..8b5ebb1 100644
--- a/kafka/handler/msgDelegate.go
+++ b/kafka/handler/msgDelegate.go
@@ -62,7 +62,7 @@ ConsumerLoop:
 			if err != nil {
 				return err
 			}
-			if configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize-m.Count < len(validators) {
+			if configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize-*m.Count < len(validators) {
 				logging.Error("Delegate transaction number is higher than slots available, probably increase to tendermint MaxBatchSize")
 				return nil
 			}
@@ -92,7 +92,7 @@ ConsumerLoop:
 				logging.Error("failed to produce message from: MsgDelegate to ToTendermint")
 				return err
 			}
-			m.Count++
+			*m.Count++
 		}
 	}
 	session.MarkMessage(kafkaMsg, "")
diff --git a/kafka/handler/msgSend.go b/kafka/handler/msgSend.go
index 70fedbe..f09739f 100644
--- a/kafka/handler/msgSend.go
+++ b/kafka/handler/msgSend.go
@@ -28,7 +28,7 @@ func (m MsgHandler) HandleMsgSend(session sarama.ConsumerGroupSession, claim sar
 		return err
 	}
 
-	loop := configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize - m.Count
+	loop := configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize - *m.Count
 	if loop <= len(validators) || m.WithdrawRewards {
 		return nil
 	}
@@ -53,8 +53,8 @@ func (m MsgHandler) HandleMsgSend(session sarama.ConsumerGroupSession, claim sar
 			if err != nil {
 				return err
 			}
-			m.Count = m.Count + len(validators)
-			if !checkCount(m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
+			*m.Count = *m.Count + len(validators)
+			if !checkCount(*m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
 				break ConsumerLoop
 			}
 			m.WithdrawRewards = true
@@ -65,8 +65,8 @@ func (m MsgHandler) HandleMsgSend(session sarama.ConsumerGroupSession, claim sar
 				break ConsumerLoop
 			}
 			session.MarkMessage(kafkaMsg, "")
-			m.Count++
-			if !checkCount(m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
+			*m.Count++
+			if !checkCount(*m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
 				break ConsumerLoop
 			}
 		default:
diff --git a/kafka/handler/msgUnbond.go b/kafka/handler/msgUnbond.go
index edc47a9..c360738 100644
--- a/kafka/handler/msgUnbond.go
+++ b/kafka/handler/msgUnbond.go
@@ -23,7 +23,7 @@ func (m MsgHandler) HandleMsgUnbond(session sarama.ConsumerGroupSession, claim s
 		}
 	}()
 
-	if !checkCount(m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
+	if !checkCount(*m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
 		return nil
 	}
 
@@ -46,8 +46,8 @@ ConsumerLoop:
 				break ConsumerLoop
 			}
 			session.MarkMessage(kafkaMsg, "")
-			m.Count++
-			if !checkCount(m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
+			*m.Count++
+			if !checkCount(*m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
 				break ConsumerLoop
 			}
 		default:
diff --git a/kafka/handler/redelegate.go b/kafka/handler/redelegate.go
index d02a7f9..5bfaf66 100644
--- a/kafka/handler/redelegate.go
+++ b/kafka/handler/redelegate.go
@@ -48,7 +48,7 @@ func (m MsgHandler) HandleRedelegate(session sarama.ConsumerGroupSession, claim 
 		if err != nil {
 			return err
 		}
-		if configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize-m.Count < len(validatorSet) {
+		if configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize-*m.Count < len(validatorSet) {
 			logging.Error("ReDelegate transaction number is higher than slots available, probably increase to tendermint MaxBatchSize")
 			return nil
 		}
@@ -96,7 +96,7 @@ func (m MsgHandler) HandleRedelegate(session sarama.ConsumerGroupSession, claim 
 				logging.Error("failed to produce message from topic Redelegate to ToTendermint")
 				return err
 			}
-			m.Count++
+			*m.Count++
 		}
 	}
 	session.MarkMessage(kafkaMsg, "")
diff --git a/kafka/handler/retryTendermint.go b/kafka/handler/retryTendermint.go
index cb82ea8..64fe1dc 100644
--- a/kafka/handler/retryTendermint.go
+++ b/kafka/handler/retryTendermint.go
@@ -25,7 +25,7 @@ func (m MsgHandler) HandleRetryTendermint(session sarama.ConsumerGroupSession, c
 			logging.Error("failed to close producer in topic RetryTendermint, error:", err)
 		}
 	}()
-	if m.Count <= 0 {
+	if *m.Count < 0 {
 		return nil
 	}
 	claimMsgChan := claim.Messages()
@@ -51,7 +51,7 @@ ConsumerLoop:
 			if err != nil {
 				return err
 			}
-			loop := configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize - m.Count
+			loop := configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize - *m.Count
 			if loop <= len(validators) {
 				return nil
 			}
@@ -60,20 +60,21 @@ ConsumerLoop:
 				return err
 			}
 			m.WithdrawRewards = true
-			m.Count = configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize - loop
-			if !checkCount(m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
+			*m.Count = configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize - loop
+			if !checkCount(*m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
 				break ConsumerLoop
 			}
 		}
 
+		logging.Info("Producing to kafka From RetryTendermint to ToTendermint: ", msg)
 		err = utils.ProducerDeliverMessage(kafkaMsg.Value, utils.ToTendermint, producer)
 		if err != nil {
 			logging.Error("failed to produce from: RetryTendermint to: ToTendermint, error:", err)
			break ConsumerLoop
 		}
 		session.MarkMessage(kafkaMsg, "")
-		m.Count++
-		if !checkCount(m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
+		*m.Count++
+		if !checkCount(*m.Count, configuration.GetAppConfig().Kafka.ToTendermint.MaxBatchSize) {
 			break ConsumerLoop
 		}
 	default:
diff --git a/kafka/handler/type.go b/kafka/handler/type.go
index 8d5e601..9410260 100644
--- a/kafka/handler/type.go
+++ b/kafka/handler/type.go
@@ -18,7 +18,7 @@ type MsgHandler struct {
 	ProtoCodec      *codec.ProtoCodec
 	Chain           *relayer.Chain
 	EthClient       *ethclient.Client
-	Count           int
+	Count           *int
 	WithdrawRewards bool
 }
 
diff --git a/kafka/routines.go b/kafka/routines.go
index 18542cf..260125a 100644
--- a/kafka/routines.go
+++ b/kafka/routines.go
@@ -62,8 +62,9 @@ func consumeToEthMsgs(ctx context.Context, state utils.KafkaState,
 	protoCodec *codec.ProtoCodec, chain *relayer.Chain, ethereumClient *ethclient.Client, end, ended chan bool) {
 	consumerGroup := state.ConsumerGroup[utils.GroupToEth]
 	for {
+		count := 0
 		msgHandler := handler.MsgHandler{ProtoCodec: protoCodec,
-			Chain: chain, EthClient: ethereumClient, Count: 0}
+			Chain: chain, EthClient: ethereumClient, Count: &count}
 		err := consumerGroup.Consume(ctx, []string{utils.ToEth}, msgHandler)
 		if err != nil {
 			logging.Error("Consumer group.Consume:", err)
@@ -89,8 +90,9 @@ func consumeToTendermintMessages(ctx context.Context, state utils.KafkaState,
 
 	groupMsgToTendermint := state.ConsumerGroup[utils.GroupToTendermint]
 	for {
+		count := 0
 		msgHandler := handler.MsgHandler{ProtoCodec: protoCodec,
-			Chain: chain, EthClient: ethereumClient, Count: 0, WithdrawRewards: false}
+			Chain: chain, EthClient: ethereumClient, Count: &count, WithdrawRewards: false}
 		err := groupRedelegate.Consume(ctx, []string{utils.Redelegate}, msgHandler)
 		if err != nil {
 			logging.Error("Consumer groupRedelegate.Consume:", err)
@@ -138,8 +140,9 @@ func consumeUnbondings(ctx context.Context, state utils.KafkaState,
 			logging.Fatal(err)
 		}
 		if time.Now().Unix() > nextEpochTime.Epoch {
+			count := 0
 			msgHandler := handler.MsgHandler{ProtoCodec: protoCodec,
-				Chain: chain, EthClient: ethereumClient, Count: 0}
+				Chain: chain, EthClient: ethereumClient, Count: &count}
 			err := ethUnbondConsumerGroup.Consume(ctx, []string{utils.EthUnbond}, msgHandler)
 			if err != nil {
 				logging.Error("Consumer group.Consume for EthUnbond:", err)