Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pubsub #34

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions context.go

This file was deleted.

27 changes: 11 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,18 @@ go 1.13
require (
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/iotexproject/go-pkgs v0.1.12
github.com/ipfs/go-cid v0.0.7
github.com/libp2p/go-libp2p v0.14.3
github.com/libp2p/go-libp2p-circuit v0.4.0
github.com/ipfs/go-cid v0.2.0
github.com/libp2p/go-libp2p v0.20.3
github.com/libp2p/go-libp2p-connmgr v0.2.4
github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-libp2p-discovery v0.5.0
github.com/libp2p/go-libp2p-kad-dht v0.11.1
github.com/libp2p/go-libp2p-pubsub v0.4.1
github.com/libp2p/go-libp2p-secio v0.2.2
github.com/libp2p/go-libp2p-transport-upgrader v0.4.2
github.com/libp2p/go-libp2p-yamux v0.5.4
github.com/libp2p/go-tcp-transport v0.2.3
github.com/multiformats/go-multiaddr v0.3.3
github.com/multiformats/go-multihash v0.0.15
github.com/libp2p/go-libp2p-core v0.16.1
github.com/libp2p/go-libp2p-kad-dht v0.16.0
github.com/libp2p/go-libp2p-pubsub v0.7.1
github.com/libp2p/go-libp2p-yamux v0.8.2
github.com/multiformats/go-multiaddr v0.5.0
github.com/multiformats/go-multihash v0.1.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.10.0
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.16.0
github.com/prometheus/client_golang v1.12.1
github.com/stretchr/testify v1.7.1
go.uber.org/zap v1.21.0
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
)
732 changes: 460 additions & 272 deletions go.sum

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,15 @@ func main() {
receiveCounter.WithLabelValues(id, host.HostIdentity()).Inc()
return nil
}
HandleUnicastMsg := func(ctx context.Context, _ peer.AddrInfo, data []byte) error {
HandleBroadcastMsg := func(ctx context.Context, _ peer.ID, data []byte) error {
return HandleMsg(ctx, data)
}
if err := host.AddBroadcastPubSub(ctx, "measurement", HandleMsg); err != nil {
if err := host.AddBroadcastPubSub(ctx, "measurement", HandleBroadcastMsg); err != nil {
p2p.Logger().Panic("Error when adding broadcast pubsub.", zap.Error(err))
}
HandleUnicastMsg := func(ctx context.Context, _ peer.AddrInfo, data []byte) error {
return HandleMsg(ctx, data)
}
if err := host.AddUnicastPubSub("measurement", HandleUnicastMsg); err != nil {
p2p.Logger().Panic("Error when adding unicast pubsub", zap.Error(err))
}
Expand Down
70 changes: 42 additions & 28 deletions p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,22 @@ import (
"golang.org/x/time/rate"

"github.com/libp2p/go-libp2p"
relay "github.com/libp2p/go-libp2p-circuit"
connmgr "github.com/libp2p/go-libp2p-connmgr"
core "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/pnet"
"github.com/libp2p/go-libp2p-core/protocol"
discovery "github.com/libp2p/go-libp2p-discovery"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
secio "github.com/libp2p/go-libp2p-secio"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
yamux "github.com/libp2p/go-libp2p-yamux"
"github.com/libp2p/go-tcp-transport"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
)

type (
// HandleBroadcast defines the callback function triggered when a broadcast message reaches a host
HandleBroadcast func(ctx context.Context, data []byte) error
HandleBroadcast func(context.Context, peer.ID, []byte) error

// HandleUnicast defines the callback function triggered when a unicast message reaches a host
HandleUnicast func(context.Context, peer.AddrInfo, []byte) error
Expand Down Expand Up @@ -69,6 +66,10 @@ type (
GroupID string `yaml:"groupID"`
MaxPeer int `yaml:"maxPeer"`
BlacklistTolerance int `yaml:"blacklistTolerance"`
// BufferSize in go-libp2p-pubsub
PeerOutboundQueueSize int `yaml:"peerOutboundQueueSize"`
ValidateQueueSize int `yaml:"validateQueueSize"`
SubscribeBufferSize int `yaml:"subscribeBufferSize"`
}

// RateLimitConfig all numbers are per second value.
Expand Down Expand Up @@ -107,6 +108,10 @@ var (
GroupID: "iotex",
MaxPeer: 30,
BlacklistTolerance: 3,
// TODO: experimental value
PeerOutboundQueueSize: 30000,
ValidateQueueSize: 30000,
SubscribeBufferSize: 10000,
}

// DefaultRatelimitConfig is the default rate limit config
Expand Down Expand Up @@ -317,23 +322,21 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) {
return addrs
}),
libp2p.Identity(prikey),
libp2p.DefaultSecurity,
libp2p.Transport(func(upgrader *tptu.Upgrader) *tcp.TcpTransport {
return &tcp.TcpTransport{Upgrader: upgrader, ConnectTimeout: cfg.ConnectTimeout}
}),
libp2p.Transport(tcp.NewTCPTransport, tcp.WithConnectionTimeout(cfg.ConnectTimeout)),
libp2p.Muxer("/yamux/2.0.0", yamux.DefaultTransport),
libp2p.ConnectionManager(connmgr.NewConnManager(cfg.ConnLowWater, cfg.ConnHighWater, cfg.ConnGracePeriod)),
}

if !cfg.SecureIO {
opts = append(opts, libp2p.NoSecurity)
} else {
opts = append(opts, libp2p.Security(secio.ID, secio.New))
opts = append(opts, libp2p.DefaultSecurity)
}

// relay option
if cfg.Relay == "active" {
opts = append(opts, libp2p.EnableRelay(relay.OptActive, relay.OptHop))
Logger().Panic("v1 relay is deprecated in the latest libp2p")
// opts = append(opts, libp2p.EnableRelay(relay.OptActive, relay.OptHop))
} else if cfg.Relay == "nat" {
opts = append(opts, libp2p.EnableRelay(), libp2p.NATPortMap())
} else {
Expand All @@ -353,7 +356,7 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) {
opts = append(opts, libp2p.PrivateNetwork(p))
}

host, err := libp2p.New(ctx, opts...)
host, err := libp2p.New(opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -391,7 +394,7 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) {
ctx: ctx,
peersLimiters: limiters,
unicastLimiter: rate.NewLimiter(rate.Limit(cfg.RateLimit.GlobalUnicastAvg), cfg.RateLimit.GlobalUnicastBurst),
peerManager: newPeerManager(host, discovery.NewRoutingDiscovery(kad), cfg.GroupID,
peerManager: newPeerManager(host, routing.NewRoutingDiscovery(kad), cfg.GroupID,
withMaxPeers(cfg.MaxPeer), withBlacklistTolerance(cfg.BlacklistTolerance), withBlacklistTimeout(cfg.BlackListTimeout)),
}

Expand Down Expand Up @@ -485,6 +488,8 @@ func (h *Host) AddBroadcastPubSub(ctx context.Context, topic string, callback Ha
h.ctx,
h.host,
pubsub.WithBlacklist(blacklist),
pubsub.WithPeerOutboundQueueSize(h.cfg.PeerOutboundQueueSize),
pubsub.WithValidateQueueSize(h.cfg.ValidateQueueSize),
)
if err != nil {
return err
Expand All @@ -493,7 +498,7 @@ func (h *Host) AddBroadcastPubSub(ctx context.Context, topic string, callback Ha
if err != nil {
return err
}
sub, err := top.Subscribe()
sub, err := top.Subscribe(pubsub.WithBufferSize(h.cfg.SubscribeBufferSize))
if err != nil {
return err
}
Expand All @@ -513,20 +518,13 @@ func (h *Host) AddBroadcastPubSub(ctx context.Context, topic string, callback Ha
Logger().Error("Error when subscribing a broadcast message.", zap.Error(err))
continue
}
src := msg.GetFrom()
allowed, err := h.allowSource(src)
if err != nil {
Logger().Error("Error when checking if the source is allowed.", zap.Error(err))
continue
if h.cfg.EnableRateLimit {
err := h.handleRateLimit(topic, msg)
if err != nil {
continue
}
}
if !allowed {
h.blacklists[topic].Add(src)
Logger().Warn("Blacklist a peer", zap.Any("id", src))
continue
}
h.blacklists[topic].Remove(src)
bctx := context.WithValue(ctx, broadcastCtxKey{}, msg)
if err := callback(bctx, msg.Data); err != nil {
if err := callback(ctx, msg.GetFrom(), msg.Data); err != nil {
Logger().Error("Error when processing a broadcast message.", zap.Error(err))
}
}
Expand All @@ -548,6 +546,22 @@ func (h *Host) AddBroadcastPubSub(ctx context.Context, topic string, callback Ha
return nil
}

func (h *Host) handleRateLimit(topic string, msg *pubsub.Message) error {
src := msg.GetFrom()
allowed, err := h.allowSource(src)
if err != nil {
Logger().Error("Error when checking if the source is allowed.", zap.Error(err))
return err
}
if !allowed {
h.blacklists[topic].Add(src)
Logger().Warn("Blacklist a peer", zap.Any("id", src))
return err
}
h.blacklists[topic].Remove(src)
return nil
}

// ConnectWithMultiaddr connects a peer given the multi address
func (h *Host) ConnectWithMultiaddr(ctx context.Context, ma multiaddr.Multiaddr) error {
target, err := peer.AddrInfoFromP2pAddr(ma)
Expand Down
20 changes: 15 additions & 5 deletions p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ import (

func TestBroadcast(t *testing.T) {
runP2P := func(t *testing.T, options ...Option) {
ctx := context.Background()
n := 10
hosts := make([]*Host, n)
var (
ctx = context.Background()
n = 10
hosts = make([]*Host, n)
count int32 = 0
broadcastCount int32 = 0
)
for i := 0; i < n; i++ {
opts := []Option{
Port(30000 + i),
Expand All @@ -30,9 +34,10 @@ func TestBroadcast(t *testing.T) {
opts = append(opts, options...)
host, err := NewHost(ctx, opts...)
require.NoError(t, err)
require.NoError(t, host.AddBroadcastPubSub(ctx, "test", func(ctx context.Context, data []byte) error {
require.NoError(t, host.AddBroadcastPubSub(ctx, "test", func(_ context.Context, _ peer.ID, data []byte) error {
fmt.Print(string(data))
fmt.Printf(", received by %s\n", host.HostIdentity())
atomic.AddInt32(&count, 1)
return nil
}))
hosts[i] = host
Expand All @@ -52,8 +57,13 @@ func TestBroadcast(t *testing.T) {
t,
hosts[i].Broadcast(ctx, "test", []byte(fmt.Sprintf("msg sent from %s", hosts[i].HostIdentity()))),
)
broadcastCount++
}

err := waitUntil(100*time.Millisecond, 3*time.Second, func() bool {
return atomic.LoadInt32(&count) >= broadcastCount*(broadcastCount-1)
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
for i := 0; i < n; i++ {
require.NoError(t, hosts[i].Close())
Expand All @@ -80,7 +90,7 @@ func TestUnicast(t *testing.T) {
for i := 0; i < n; i++ {
host, err := NewHost(ctx, Port(30000+i), SecureIO(), MasterKey(strconv.Itoa(i)))
require.NoError(t, err)
require.NoError(t, host.AddUnicastPubSub("test", func(ctx context.Context, _ peer.AddrInfo, data []byte) error {
require.NoError(t, host.AddUnicastPubSub("test", func(_ context.Context, _ peer.AddrInfo, data []byte) error {
fmt.Print(string(data))
fmt.Printf(", received by %s\n", host.HostIdentity())
atomic.AddInt32(&count, 1)
Expand Down
7 changes: 4 additions & 3 deletions peerManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (

"github.com/iotexproject/go-pkgs/cache/ttl"
core "github.com/libp2p/go-libp2p-core"
discovery "github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/peer"
discovery "github.com/libp2p/go-libp2p-discovery"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -49,7 +50,7 @@ func withAdvertiseInterval(t time.Duration) peerManagerOpt {
}

type peerManager struct {
routing *discovery.RoutingDiscovery
routing *routing.RoutingDiscovery
host core.Host
ns string
maxPeers int // unlimited peers when maxPeers = 0
Expand All @@ -65,7 +66,7 @@ type peerManager struct {
once sync.Once
}

func newPeerManager(host core.Host, routing *discovery.RoutingDiscovery, ns string, opts ...peerManagerOpt) *peerManager {
func newPeerManager(host core.Host, routing *routing.RoutingDiscovery, ns string, opts ...peerManagerOpt) *peerManager {
rand.Seed(time.Now().UnixNano())
pm := &peerManager{
host: host,
Expand Down