Skip to content

Commit

Permalink
Change advertise and findpeers to async mode (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
Liuhaai authored Aug 2, 2022
1 parent 5fe315f commit 7634175
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 31 deletions.
2 changes: 1 addition & 1 deletion main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func main() {
p2p.Logger().Panic("Error when connecting to the bootstrap node", zap.Error(err))
}
host.JoinOverlay()
host.Advertise()
host.AdvertiseAsync()
}

tick := time.Tick(time.Duration(frequency) * time.Millisecond)
Expand Down
14 changes: 5 additions & 9 deletions p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,18 +411,14 @@ func (h *Host) JoinOverlay() {
h.peerManager.JoinOverlay()
}

// Advertise publish the groupID of host to the dht network
func (h *Host) Advertise() error {
// AdvertiseAsync publish the groupID of host to the dht network
func (h *Host) AdvertiseAsync() error {
return h.peerManager.Advertise()
}

// FindPeers connect host to the peers with the same groupID
func (h *Host) FindPeers(ctx context.Context) error {
if err := h.peerManager.ConnectPeers(ctx); err != nil {
Logger().Error("error when finding peers", zap.Error(err))
return err
}
return nil
// FindPeersAsync connect host to the peers with the same groupID
func (h *Host) FindPeersAsync() error {
return h.peerManager.ConnectPeers()
}

// AddUnicastPubSub adds a unicast topic that the host will pay attention to
Expand Down
39 changes: 28 additions & 11 deletions p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ func TestBroadcast(t *testing.T) {
require.NoError(t, hosts[i].Connect(ctx, bootstrapInfo))
}
hosts[i].JoinOverlay()
hosts[i].Advertise()
err := hosts[i].AdvertiseAsync()
require.NoError(t, err)
}

for i := 0; i < n; i++ {
Expand Down Expand Up @@ -95,7 +96,8 @@ func TestUnicast(t *testing.T) {
require.NoError(t, hosts[i].Connect(ctx, bootstrapInfo))
}
hosts[i].JoinOverlay()
hosts[i].Advertise()
err := hosts[i].AdvertiseAsync()
require.NoError(t, err)
}

for i, host := range hosts {
Expand Down Expand Up @@ -155,10 +157,20 @@ func TestPeerManager(t *testing.T) {

for _, host := range hosts {
host.JoinOverlay()
require.NoError(host.Advertise())
require.NoError(host.FindPeers(ctx))
require.NoError(host.AdvertiseAsync())
require.NoError(host.FindPeersAsync())
}

err = waitUntil(100*time.Millisecond, 3*time.Second, func() bool {
for _, host := range hosts {
if len(hosts) != len(host.ConnectedPeers()) {
return false
}
}
return true
})
require.NoError(err)

for _, host := range hosts {
for _, peer := range host.ConnectedPeers() {
// skip bootnode
Expand Down Expand Up @@ -212,18 +224,23 @@ func TestAddBootNode(t *testing.T) {

for _, host := range hosts {
host.JoinOverlay()
host.Advertise()
err := host.FindPeers(ctx)
require.NoError(err)
require.NoError(host.AdvertiseAsync())
require.NoError(host.FindPeersAsync())

bAddr, err := peer.AddrInfoToP2pAddrs(&bootstrapInfo)
require.NoError(err)
require.NoError(host.AddBootstrap(bAddr))
}

for _, host := range hosts {
require.Equal(n-2, len(host.ConnectedPeers()))
}
err = waitUntil(100*time.Millisecond, 3*time.Second, func() bool {
for _, host := range hosts {
if n-2 != len(host.ConnectedPeers()) {
return false
}
}
return true
})
require.NoError(err)

for i := range hosts {
require.NoError(hosts[i].Close())
Expand All @@ -246,7 +263,7 @@ func TestBlacklist(t *testing.T) {

for _, host := range hosts {
host.JoinOverlay()
host.Advertise()
require.NoError(host.AdvertiseAsync())
}

id1 := hosts[1].host.ID()
Expand Down
36 changes: 26 additions & 10 deletions peerManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type peerManager struct {
bootstrap map[core.PeerID]bool
blacklist *ttl.Cache
advertiseQueue chan string
findPeersQueue chan int

blacklistTolerance int
blacklistTimeout time.Duration
Expand Down Expand Up @@ -89,7 +90,7 @@ func newPeerManager(host core.Host, routing *discovery.RoutingDiscovery, ns stri

func (pm *peerManager) JoinOverlay() {
pm.once.Do(func() {
pm.advertiseQueue = make(chan string)
pm.advertiseQueue = make(chan string, 1)
ticker := time.NewTicker(pm.advertiseInterval)
go func(pm *peerManager) {
for {
Expand All @@ -107,14 +108,26 @@ func (pm *peerManager) JoinOverlay() {
}
}
}(pm)

pm.findPeersQueue = make(chan int, 1)
go func(pm *peerManager) {
for limit := range pm.findPeersQueue {
if err := pm.connectPeers(context.Background(), limit); err != nil {
Logger().Error("error when finding peers", zap.Error(err))
}
}
}(pm)
})
}

func (pm *peerManager) Advertise() error {
if pm.advertiseQueue == nil {
return errors.New("the host doesn't join the overlay")
}
pm.advertiseQueue <- pm.ns
select {
case pm.advertiseQueue <- pm.ns:
default:
}
return nil
}

Expand All @@ -124,19 +137,25 @@ func (pm *peerManager) AddBootstrap(ids ...core.PeerID) {
}
}

func (pm *peerManager) ConnectPeers(ctx context.Context) error {
return pm.connectPeers(ctx)
func (pm *peerManager) ConnectPeers() error {
if pm.findPeersQueue == nil {
return errors.New("the host doesn't join the overlay")
}
select {
case pm.findPeersQueue <- pm.peerLimit():
default:
}
return nil
}

func (pm *peerManager) connectPeers(ctx context.Context) error {
func (pm *peerManager) connectPeers(ctx context.Context, limit int) error {
// skip connecting when 80% peers are connected
if len(pm.host.Network().Conns()) >= pm.threshold() {
return nil
}
if pm.routing == nil {
panic(errNoDiscoverer)
}
limit := pm.peerLimit()
for retries := 0; retries < _connectRetry; retries++ {
reachLimit, err := pm.findPeers(ctx, limit)
if err != nil {
Expand All @@ -151,7 +170,7 @@ func (pm *peerManager) connectPeers(ctx context.Context) error {
break
}
// try to find peers by increasing peerlimit in the next round
limit += pm.peerLimit()
limit += limit
}
return errNoPeers
}
Expand Down Expand Up @@ -219,9 +238,6 @@ func (pm *peerManager) blocked(pr peer.ID) bool {
}

func (pm *peerManager) ConnectedPeers() []peer.AddrInfo {
if !pm.hasPeers() {
pm.connectPeers(context.Background())
}
ret := make([]core.PeerAddrInfo, 0)
conns := pm.host.Network().Conns()
// There might be multiple connections for one peerID,
Expand Down

0 comments on commit 7634175

Please sign in to comment.