diff --git a/pkg/config/groups.go b/pkg/config/groups.go index 99fdd38e4..310478c27 100644 --- a/pkg/config/groups.go +++ b/pkg/config/groups.go @@ -67,7 +67,9 @@ type clientConfiguration struct { } type kadcastConfiguration struct { - Enabled bool + Enabled bool + Limit string + Address string BootstrapAddr []string diff --git a/pkg/config/samples/default.dusk.toml b/pkg/config/samples/default.dusk.toml index 63f0a9cd5..e20e4fc55 100644 --- a/pkg/config/samples/default.dusk.toml +++ b/pkg/config/samples/default.dusk.toml @@ -86,8 +86,9 @@ address="monitor.dusk.network:1337" # Kadcast peer settings [kadcast] -# if disabled, gossip protocol is active -enabled=false +enabled=true +# Back pressure on one-to-one messaging +limit = "50ms" # grpc client connection config [kadcast.grpc] diff --git a/pkg/p2p/kadcast/writer/base.go b/pkg/p2p/kadcast/writer/base.go index 2a05c7595..61b644261 100644 --- a/pkg/p2p/kadcast/writer/base.go +++ b/pkg/p2p/kadcast/writer/base.go @@ -10,12 +10,14 @@ import ( "bytes" "context" "encoding/binary" + "time" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/protocol" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics" "github.com/dusk-network/dusk-blockchain/pkg/util/nativeutils/eventbus" "github.com/dusk-network/dusk-protobuf/autogen/go/rusk" "github.com/sirupsen/logrus" + "golang.org/x/time/rate" crypto "github.com/dusk-network/dusk-crypto/hash" ) @@ -36,6 +38,34 @@ type Base struct { ctx context.Context topic topics.Topic + + limiter *rate.Limiter +} + +func newBase(ctx context.Context, s eventbus.Subscriber, g *protocol.Gossip, rusk rusk.NetworkClient, t topics.Topic) Base { + return Base{ + subscriber: s, + gossip: g, + client: rusk, + ctx: ctx, + topic: t, + limiter: nil, + } +} + +func newBaseWithLimiter(ctx context.Context, s eventbus.Subscriber, g *protocol.Gossip, rusk rusk.NetworkClient, t topics.Topic, limit string) Base { + b := newBase(ctx, s, g, rusk, t) + + if len(limit) > 0 { + timeout, err := time.ParseDuration(limit) + if err != nil { + log.WithError(err).Error("could not parse kadcast limit") + } + + b.limiter = rate.NewLimiter(rate.Every(timeout), 1) + } + + return b } // Send is a wrapper of rusk.NetworkClient Send method. @@ -58,6 +88,12 @@ func (b *Base) Send(data []byte, addr string) error { Message: blob.Bytes(), } + if b.limiter != nil { + if err := b.limiter.WaitN(b.ctx, 1); err != nil { + return err + } + } + // send message if _, err := b.client.Send(b.ctx, m); err != nil { log.WithError(err).Warn("failed to send message") diff --git a/pkg/p2p/kadcast/writer/broadcast.go b/pkg/p2p/kadcast/writer/broadcast.go index 410e01db7..90ca7402a 100644 --- a/pkg/p2p/kadcast/writer/broadcast.go +++ b/pkg/p2p/kadcast/writer/broadcast.go @@ -28,13 +28,7 @@ type Broadcast struct { // NewBroadcast ... func NewBroadcast(ctx context.Context, s eventbus.Subscriber, g *protocol.Gossip, rusk rusk.NetworkClient) ring.Writer { b := &Broadcast{ - Base: Base{ - subscriber: s, - gossip: g, - client: rusk, - ctx: ctx, - topic: topics.Kadcast, - }, + Base: newBase(ctx, s, g, rusk, topics.Kadcast), } b.Subscribe() diff --git a/pkg/p2p/kadcast/writer/sendmany.go b/pkg/p2p/kadcast/writer/sendmany.go index 1ef1186ac..3a1d15f91 100644 --- a/pkg/p2p/kadcast/writer/sendmany.go +++ b/pkg/p2p/kadcast/writer/sendmany.go @@ -10,6 +10,7 @@ import ( "context" "errors" + "github.com/dusk-network/dusk-blockchain/pkg/config" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/protocol" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics" "github.com/dusk-network/dusk-blockchain/pkg/util/container/ring" @@ -25,14 +26,10 @@ type SendToMany struct { // NewSendToMany ... func NewSendToMany(ctx context.Context, s eventbus.Subscriber, g *protocol.Gossip, rusk rusk.NetworkClient) ring.Writer { + l := config.Get().Kadcast.Limit + w := &SendToMany{ - Base: Base{ - subscriber: s, - gossip: g, - client: rusk, - ctx: ctx, - topic: topics.KadcastSendToMany, - }, + Base: newBaseWithLimiter(ctx, s, g, rusk, topics.KadcastSendToMany, l), } w.Subscribe() diff --git a/pkg/p2p/kadcast/writer/sendone.go b/pkg/p2p/kadcast/writer/sendone.go index 3ca40c6e7..ce497b0f1 100644 --- a/pkg/p2p/kadcast/writer/sendone.go +++ b/pkg/p2p/kadcast/writer/sendone.go @@ -10,6 +10,7 @@ import ( "context" "errors" + "github.com/dusk-network/dusk-blockchain/pkg/config" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/protocol" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics" "github.com/dusk-network/dusk-blockchain/pkg/util/container/ring" @@ -25,14 +26,10 @@ type SendToOne struct { // NewSendToOne ... func NewSendToOne(ctx context.Context, s eventbus.Subscriber, g *protocol.Gossip, rusk rusk.NetworkClient) ring.Writer { + l := config.Get().Kadcast.Limit + w := &SendToOne{ - Base: Base{ - subscriber: s, - gossip: g, - client: rusk, - ctx: ctx, - topic: topics.KadcastSendToOne, - }, + Base: newBaseWithLimiter(ctx, s, g, rusk, topics.KadcastSendToOne, l), } w.Subscribe()