diff --git a/.github/workflows/go-check.yml b/.github/workflows/go-check.yml new file mode 100644 index 00000000..724ef8e3 --- /dev/null +++ b/.github/workflows/go-check.yml @@ -0,0 +1,19 @@ +name: Go Checks + +on: + pull_request: + push: + branches: ["master"] + workflow_dispatch: + merge_group: + +permissions: + contents: read + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event_name == 'push' && github.sha || github.ref }} + cancel-in-progress: true + +jobs: + go-check: + uses: ipdxco/unified-github-workflows/.github/workflows/go-check.yml@v1.0 diff --git a/.github/workflows/go-test-config.json b/.github/workflows/go-test-config.json new file mode 100644 index 00000000..b0642fbe --- /dev/null +++ b/.github/workflows/go-test-config.json @@ -0,0 +1,4 @@ +{ + "skipOSes": ["windows", "macos"], + "skipRace": true +} diff --git a/.github/workflows/go-test.yml b/.github/workflows/go-test.yml new file mode 100644 index 00000000..505ece58 --- /dev/null +++ b/.github/workflows/go-test.yml @@ -0,0 +1,21 @@ +name: Go Test + +on: + pull_request: + push: + branches: ["master"] + workflow_dispatch: + merge_group: + +permissions: + contents: read + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event_name == 'push' && github.sha || github.ref }} + cancel-in-progress: true + +jobs: + go-test: + uses: ipdxco/unified-github-workflows/.github/workflows/go-test.yml@v1.0 + secrets: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/.github/workflows/release-check.yml b/.github/workflows/release-check.yml new file mode 100644 index 00000000..681b5ef1 --- /dev/null +++ b/.github/workflows/release-check.yml @@ -0,0 +1,21 @@ +name: Release Checker + +on: + pull_request_target: + paths: ["version.json"] + types: [ opened, synchronize, reopened, labeled, unlabeled ] + workflow_dispatch: + +permissions: + contents: write + pull-requests: write + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + release-check: + uses: ipdxco/unified-github-workflows/.github/workflows/release-check.yml@v1.0 + with: + sources: '["version.json"]' diff --git a/.github/workflows/releaser.yml b/.github/workflows/releaser.yml new file mode 100644 index 00000000..a2b2a044 --- /dev/null +++ b/.github/workflows/releaser.yml @@ -0,0 +1,21 @@ +name: Releaser + +on: + push: + paths: ["version.json"] + workflow_dispatch: + +permissions: + contents: write + +concurrency: + group: ${{ github.workflow }}-${{ github.sha }} + cancel-in-progress: true + +jobs: + releaser: + uses: ipdxco/unified-github-workflows/.github/workflows/releaser.yml@v1.0 + with: + sources: '["version.json"]' + secrets: + UCI_GITHUB_TOKEN: ${{ secrets.UCI_GITHUB_TOKEN }} diff --git a/.github/workflows/tagpush.yml b/.github/workflows/tagpush.yml new file mode 100644 index 00000000..5ef3fb9e --- /dev/null +++ b/.github/workflows/tagpush.yml @@ -0,0 +1,18 @@ +name: Tag Push Checker + +on: + push: + tags: + - v* + +permissions: + contents: read + issues: write + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + releaser: + uses: ipdxco/unified-github-workflows/.github/workflows/tagpush.yml@v1.0 diff --git a/backoff.go b/backoff.go index 4909e153..99ca7fd0 100644 --- a/backoff.go +++ b/backoff.go @@ -43,7 +43,6 @@ func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Dur info: make(map[peer.ID]*backoffHistory), } - rand.Seed(time.Now().UnixNano()) // used for jitter go b.cleanupLoop(ctx) return b diff --git a/blacklist_test.go b/blacklist_test.go index 045a9c85..a19c46e4 100644 --- a/blacklist_test.go +++ b/blacklist_test.go @@ -38,7 +38,7 @@ func TestBlacklist(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -66,7 +66,7 @@ func TestBlacklist2(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -99,7 +99,7 @@ func TestBlacklist3(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) psubs[1].BlacklistPeer(hosts[0].ID()) diff --git a/discovery_test.go b/discovery_test.go index 66c9c80e..f539e69d 100644 --- a/discovery_test.go +++ b/discovery_test.go @@ -134,7 +134,7 @@ func TestSimpleDiscovery(t *testing.T) { server := newDiscoveryServer() discOpts := []discovery.Option{discovery.Limit(numHosts), discovery.TTL(1 * time.Minute)} - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) psubs := make([]*PubSub, numHosts) topicHandlers := make([]*Topic, numHosts) @@ -234,7 +234,7 @@ func TestGossipSubDiscoveryAfterBootstrap(t *testing.T) { discOpts := []discovery.Option{discovery.Limit(numHosts), discovery.TTL(ttl)} // Put the pubsub clients into two partitions - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) psubs := make([]*PubSub, numHosts) topicHandlers := make([]*Topic, numHosts) diff --git a/floodsub_test.go b/floodsub_test.go index 35dd0d53..e7bf379f 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -3,11 +3,12 @@ package pubsub import ( "bytes" "context" + crand "crypto/rand" "crypto/sha256" "encoding/base64" "fmt" "io" - "math/rand" + mrand "math/rand" "sort" "sync" "testing" @@ -20,15 +21,13 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" - bhost "github.com/libp2p/go-libp2p/p2p/host/blank" - swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" - + //lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated "github.com/libp2p/go-msgio/protoio" ) func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []*Subscription) { data := make([]byte, 16) - rand.Read(data) + crand.Read(data) for _, p := range pubs { err := p.Publish(topic, data) @@ -42,19 +41,6 @@ func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []*Sub } } -func getNetHosts(t *testing.T, ctx context.Context, n int) []host.Host { - var out []host.Host - - for i := 0; i < n; i++ { - netw := swarmt.GenSwarm(t) - h := bhost.NewBlankHost(netw) - t.Cleanup(func() { h.Close() }) - out = append(out, h) - } - - return out -} - func connect(t *testing.T, a, b host.Host) { pinfo := a.Peerstore().PeerInfo(a.ID()) err := b.Connect(context.Background(), pinfo) @@ -74,7 +60,7 @@ func denseConnect(t *testing.T, hosts []host.Host) { func connectSome(t *testing.T, hosts []host.Host, d int) { for i, a := range hosts { for j := 0; j < d; j++ { - n := rand.Intn(len(hosts)) + n := mrand.Intn(len(hosts)) if n == i { j-- continue @@ -151,7 +137,7 @@ func assertNeverReceives(t *testing.T, ch *Subscription, timeout time.Duration) func TestBasicFloodsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getPubsubs(ctx, hosts) @@ -173,7 +159,7 @@ func TestBasicFloodsub(t *testing.T) { for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d the flooooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) @@ -193,7 +179,7 @@ func TestMultihops(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 6) + hosts := getDefaultHosts(t, 6) psubs := getPubsubs(ctx, hosts) @@ -235,7 +221,7 @@ func TestReconnects(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 3) + hosts := getDefaultHosts(t, 3) psubs := getPubsubs(ctx, hosts) @@ -309,7 +295,7 @@ func TestNoConnection(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getPubsubs(ctx, hosts) @@ -334,7 +320,7 @@ func TestSelfReceive(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - host := getNetHosts(t, ctx, 1)[0] + host := getDefaultHosts(t, 1)[0] psub, err := NewFloodSub(ctx, host) if err != nil { @@ -368,7 +354,7 @@ func TestOneToOne(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -401,7 +387,7 @@ func TestTreeTopology(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -464,7 +450,7 @@ func TestFloodSubPluggableProtocol(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 3) + hosts := getDefaultHosts(t, 3) psubA := mustCreatePubSub(ctx, t, hosts[0], "/esh/floodsub", "/lsr/floodsub") psubB := mustCreatePubSub(ctx, t, hosts[1], "/esh/floodsub") @@ -496,7 +482,7 @@ func TestFloodSubPluggableProtocol(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubA := mustCreatePubSub(ctx, t, hosts[0], "/esh/floodsub") psubB := mustCreatePubSub(ctx, t, hosts[1], "/lsr/floodsub") @@ -551,7 +537,7 @@ func TestSubReporting(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - host := getNetHosts(t, ctx, 1)[0] + host := getDefaultHosts(t, 1)[0] psub, err := NewFloodSub(ctx, host) if err != nil { t.Fatal(err) @@ -593,7 +579,7 @@ func TestPeerTopicReporting(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 4) + hosts := getDefaultHosts(t, 4) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -650,7 +636,7 @@ func TestSubscribeMultipleTimes(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -695,7 +681,7 @@ func TestPeerDisconnect(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -743,7 +729,7 @@ func TestWithNoSigning(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts, WithNoAuthor(), WithMessageIdFn(func(pmsg *pb.Message) string { // silly content-based test message-ID: just use the data as whole return base64.URLEncoding.EncodeToString(pmsg.Data) @@ -788,7 +774,7 @@ func TestWithSigning(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts, WithStrictSignatureVerification(true)) connect(t, hosts[0], hosts[1]) @@ -830,7 +816,7 @@ func TestImproperlySignedMessageRejected(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) adversary := hosts[0] honestPeer := hosts[1] @@ -948,7 +934,7 @@ func TestMessageSender(t *testing.T) { const topic = "foobar" - hosts := getNetHosts(t, ctx, 3) + hosts := getDefaultHosts(t, 3) psubs := getPubsubs(ctx, hosts) var msgs []*Subscription @@ -1002,7 +988,7 @@ func TestConfigurableMaxMessageSize(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) // use a 4mb limit; default is 1mb; we'll test with a 2mb payload. psubs := getPubsubs(ctx, hosts, WithMaxMessageSize(1<<22)) @@ -1022,7 +1008,7 @@ func TestConfigurableMaxMessageSize(t *testing.T) { // 2mb payload. msg := make([]byte, 1<<21) - rand.Read(msg) + crand.Read(msg) err := psubs[0].Publish(topic, msg) if err != nil { t.Fatal(err) @@ -1045,7 +1031,7 @@ func TestAnnounceRetry(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) ps := getPubsub(ctx, hosts[0]) watcher := &announceWatcher{} hosts[1].SetStreamHandler(FloodSubID, watcher.handleStream) @@ -1117,7 +1103,7 @@ func TestPubsubWithAssortedOptions(t *testing.T) { return string(hash[:]) } - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts, WithMessageIdFn(hashMsgID), WithPeerOutboundQueueSize(10), @@ -1152,8 +1138,7 @@ func TestWithInvalidMessageAuthor(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := bhost.NewBlankHost(swarmt.GenSwarm(t)) - defer h.Close() + h := getDefaultHosts(t, 1)[0] _, err := NewFloodSub(ctx, h, WithMessageAuthor("bogotr0n")) if err == nil { t.Fatal("expected error") @@ -1168,10 +1153,9 @@ func TestPreconnectedNodes(t *testing.T) { defer cancel() // Create hosts - h1 := bhost.NewBlankHost(swarmt.GenSwarm(t)) - h2 := bhost.NewBlankHost(swarmt.GenSwarm(t)) - defer h1.Close() - defer h2.Close() + hosts := getDefaultHosts(t, 2) + h1 := hosts[0] + h2 := hosts[1] opts := []Option{WithDiscovery(&dummyDiscovery{})} // Setup first PubSub @@ -1229,10 +1213,9 @@ func TestDedupInboundStreams(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h1 := bhost.NewBlankHost(swarmt.GenSwarm(t)) - h2 := bhost.NewBlankHost(swarmt.GenSwarm(t)) - defer h1.Close() - defer h2.Close() + hosts := getDefaultHosts(t, 2) + h1 := hosts[0] + h2 := hosts[1] _, err := NewFloodSub(ctx, h1) if err != nil { @@ -1247,18 +1230,30 @@ func TestDedupInboundStreams(t *testing.T) { if err != nil { t.Fatal(err) } + _, err = s1.Read(nil) // force protocol negotiation to complete + if err != nil { + t.Fatal(err) + } time.Sleep(100 * time.Millisecond) s2, err := h2.NewStream(ctx, h1.ID(), FloodSubID) if err != nil { t.Fatal(err) } + _, err = s2.Read(nil) // force protocol negotiation to complete + if err != nil { + t.Fatal(err) + } time.Sleep(100 * time.Millisecond) s3, err := h2.NewStream(ctx, h1.ID(), FloodSubID) if err != nil { t.Fatal(err) } + _, err = s3.Read(nil) // force protocol negotiation to complete + if err != nil { + t.Fatal(err) + } time.Sleep(100 * time.Millisecond) // check that s1 and s2 have been reset diff --git a/gossipsub.go b/gossipsub.go index 21f34bec..da841615 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -3,6 +3,7 @@ package pubsub import ( "context" "fmt" + "io" "math/rand" "sort" "time" @@ -543,6 +544,13 @@ func (gs *GossipSubRouter) manageAddrBook() { for { select { case <-gs.p.ctx.Done(): + cabCloser, ok := gs.cab.(io.Closer) + if ok { + errClose := cabCloser.Close() + if errClose != nil { + log.Warnf("failed to close addr book: %v", errClose) + } + } return case ev := <-sub.Out(): switch ev := ev.(type) { diff --git a/gossipsub_connmgr_test.go b/gossipsub_connmgr_test.go index 0a97312c..a5477026 100644 --- a/gossipsub_connmgr_test.go +++ b/gossipsub_connmgr_test.go @@ -7,15 +7,14 @@ import ( "github.com/benbjohnson/clock" "github.com/libp2p/go-libp2p/core/host" - swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - bhost "github.com/libp2p/go-libp2p/p2p/host/blank" "github.com/libp2p/go-libp2p/p2p/net/connmgr" ) func TestGossipsubConnTagMessageDeliveries(t *testing.T) { - t.Skip("Test disabled with go-libp2p v0.22.0") // TODO: reenable test when updating to v0.23.0 ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -70,9 +69,14 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) { t.Fatal(err) } - netw := swarmt.GenSwarm(t) - defer netw.Close() - h := bhost.NewBlankHost(netw, bhost.WithConnectionManager(connmgrs[i])) + h, err := libp2p.New( + libp2p.ResourceManager(&network.NullResourceManager{}), + libp2p.ConnectionManager(connmgrs[i]), + ) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { h.Close() }) honestHosts[i] = h honestPeers[h.ID()] = struct{}{} } @@ -83,9 +87,9 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) { WithFloodPublish(true)) // sybil squatters to be connected later - sybilHosts := getNetHosts(t, ctx, nSquatter) + sybilHosts := getDefaultHosts(t, nSquatter) for _, h := range sybilHosts { - squatter := &sybilSquatter{h: h} + squatter := &sybilSquatter{h: h, ignoreErrors: true} h.SetStreamHandler(GossipSubID_v10, squatter.handleStream) } @@ -139,18 +143,6 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) { allHosts := append(honestHosts, sybilHosts...) connectAll(t, allHosts) - // verify that we have a bunch of connections - for _, h := range honestHosts { - if len(h.Network().Conns()) != nHonest+nSquatter-1 { - t.Errorf("expected to have conns to all peers, have %d", len(h.Network().Conns())) - } - } - - // force the connection managers to trim, so we don't need to muck about with timing as much - for _, cm := range connmgrs { - cm.TrimOpenConns(ctx) - } - // we should still have conns to all the honest peers, but not the sybils for _, h := range honestHosts { nHonestConns := 0 diff --git a/gossipsub_feat_test.go b/gossipsub_feat_test.go index 712f16df..93cfb4c3 100644 --- a/gossipsub_feat_test.go +++ b/gossipsub_feat_test.go @@ -42,7 +42,7 @@ func TestGossipSubCustomProtocols(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 3) + hosts := getDefaultHosts(t, 3) gsubs := getGossipsubs(ctx, hosts[:2], WithGossipSubProtocols(protos, features)) fsub := getPubsub(ctx, hosts[2]) diff --git a/gossipsub_matchfn_test.go b/gossipsub_matchfn_test.go index 279f0d34..4d688d25 100644 --- a/gossipsub_matchfn_test.go +++ b/gossipsub_matchfn_test.go @@ -17,7 +17,7 @@ func TestGossipSubMatchingFn(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := getNetHosts(t, ctx, 4) + h := getDefaultHosts(t, 4) psubs := []*PubSub{ getGossipsub(ctx, h[0], WithProtocolMatchFn(protocolNameMatch), WithGossipSubProtocols([]protocol.ID{customsubA100, GossipSubID_v11}, GossipSubDefaultFeatures)), getGossipsub(ctx, h[1], WithProtocolMatchFn(protocolNameMatch), WithGossipSubProtocols([]protocol.ID{customsubA101Beta}, GossipSubDefaultFeatures)), diff --git a/gossipsub_spam_test.go b/gossipsub_spam_test.go index f31daaab..ab22e7a9 100644 --- a/gossipsub_spam_test.go +++ b/gossipsub_spam_test.go @@ -2,7 +2,7 @@ package pubsub import ( "context" - "math/rand" + "crypto/rand" "strconv" "sync" "testing" @@ -15,6 +15,7 @@ import ( pb "github.com/libp2p/go-libp2p-pubsub/pb" + //lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated "github.com/libp2p/go-msgio/protoio" ) @@ -25,7 +26,7 @@ func TestGossipsubAttackSpamIWANT(t *testing.T) { defer cancel() // Create legitimate and attacker hosts - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) legit := hosts[0] attacker := hosts[1] @@ -142,7 +143,7 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) { defer cancel() // Create legitimate and attacker hosts - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) legit := hosts[0] attacker := hosts[1] @@ -195,6 +196,7 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) { Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}}, }) + sub := sub go func() { defer cancel() @@ -292,7 +294,7 @@ func TestGossipsubAttackGRAFTNonExistentTopic(t *testing.T) { defer cancel() // Create legitimate and attacker hosts - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) legit := hosts[0] attacker := hosts[1] @@ -376,7 +378,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) { defer cancel() // Create legitimate and attacker hosts - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) legit := hosts[0] attacker := hosts[1] @@ -430,6 +432,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) { Control: &pb.ControlMessage{Graft: graft}, }) + sub := sub go func() { defer cancel() @@ -617,7 +620,7 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) { defer cancel() // Create legitimate and attacker hosts - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) legit := hosts[0] attacker := hosts[1] diff --git a/gossipsub_test.go b/gossipsub_test.go index 5933f4b5..4481be9e 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -3,9 +3,10 @@ package pubsub import ( "bytes" "context" + crand "crypto/rand" "fmt" "io" - "math/rand" + mrand "math/rand" "sync" "sync/atomic" "testing" @@ -13,16 +14,13 @@ import ( pb "github.com/libp2p/go-libp2p-pubsub/pb" - "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/record" - bhost "github.com/libp2p/go-libp2p/p2p/host/blank" - swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" - + //lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated "github.com/libp2p/go-msgio/protoio" ) @@ -45,7 +43,7 @@ func getGossipsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSu func TestSparseGossipsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -67,7 +65,7 @@ func TestSparseGossipsub(t *testing.T) { for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) @@ -86,7 +84,7 @@ func TestSparseGossipsub(t *testing.T) { func TestDenseGossipsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -108,7 +106,7 @@ func TestDenseGossipsub(t *testing.T) { for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) @@ -127,7 +125,7 @@ func TestDenseGossipsub(t *testing.T) { func TestGossipsubFanout(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -196,7 +194,7 @@ func TestGossipsubFanout(t *testing.T) { func TestGossipsubFanoutMaintenance(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -281,7 +279,7 @@ func TestGossipsubFanoutExpiry(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getGossipsubs(ctx, hosts) @@ -340,7 +338,7 @@ func TestGossipsubFanoutExpiry(t *testing.T) { func TestGossipsubGossip(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -362,7 +360,7 @@ func TestGossipsubGossip(t *testing.T) { for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) @@ -388,7 +386,7 @@ func TestGossipsubGossipPiggyback(t *testing.T) { t.Skip("test no longer relevant; gossip propagation has become eager") ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -420,7 +418,7 @@ func TestGossipsubGossipPiggyback(t *testing.T) { for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) psubs[owner].Publish("bazcrux", msg) @@ -457,7 +455,7 @@ func TestGossipsubGossipPropagation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) hosts1 := hosts[:GossipSubD+1] @@ -537,7 +535,7 @@ func TestGossipsubGossipPropagation(t *testing.T) { func TestGossipsubPrune(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -567,7 +565,7 @@ func TestGossipsubPrune(t *testing.T) { for i := 0; i < 10; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) @@ -586,7 +584,7 @@ func TestGossipsubPrune(t *testing.T) { func TestGossipsubPruneBackoffTime(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) // App specific score that we'll change later. currentScoreForHost0 := int32(0) @@ -665,7 +663,7 @@ func TestGossipsubPruneBackoffTime(t *testing.T) { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) // Don't publish from host 0, since everyone should have pruned it. - owner := rand.Intn(len(psubs)-1) + 1 + owner := mrand.Intn(len(psubs)-1) + 1 psubs[owner].Publish("foobar", msg) @@ -684,7 +682,7 @@ func TestGossipsubPruneBackoffTime(t *testing.T) { func TestGossipsubGraft(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -710,7 +708,7 @@ func TestGossipsubGraft(t *testing.T) { for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) @@ -729,7 +727,7 @@ func TestGossipsubGraft(t *testing.T) { func TestGossipsubRemovePeer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -759,7 +757,7 @@ func TestGossipsubRemovePeer(t *testing.T) { for i := 0; i < 10; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := 5 + rand.Intn(len(psubs)-5) + owner := 5 + mrand.Intn(len(psubs)-5) psubs[owner].Publish("foobar", msg) @@ -779,7 +777,7 @@ func TestGossipsubGraftPruneRetry(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getGossipsubs(ctx, hosts) denseConnect(t, hosts) @@ -807,7 +805,7 @@ func TestGossipsubGraftPruneRetry(t *testing.T) { for i, topic := range topics { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish(topic, msg) @@ -829,7 +827,7 @@ func TestGossipsubControlPiggyback(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getGossipsubs(ctx, hosts) denseConnect(t, hosts) @@ -853,7 +851,7 @@ func TestGossipsubControlPiggyback(t *testing.T) { // create a background flood of messages that overloads the queues done := make(chan struct{}) go func() { - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) for i := 0; i < 10000; i++ { msg := []byte("background flooooood") psubs[owner].Publish("flood", msg) @@ -891,7 +889,7 @@ func TestGossipsubControlPiggyback(t *testing.T) { for i, topic := range topics { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish(topic, msg) @@ -910,7 +908,7 @@ func TestGossipsubControlPiggyback(t *testing.T) { func TestMixedGossipsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 30) + hosts := getDefaultHosts(t, 30) gsubs := getGossipsubs(ctx, hosts[:20]) fsubs := getPubsubs(ctx, hosts[20:]) @@ -934,7 +932,7 @@ func TestMixedGossipsub(t *testing.T) { for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) @@ -954,7 +952,7 @@ func TestGossipsubMultihops(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 6) + hosts := getDefaultHosts(t, 6) psubs := getGossipsubs(ctx, hosts) @@ -997,7 +995,7 @@ func TestGossipsubTreeTopology(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getGossipsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -1061,7 +1059,7 @@ func TestGossipsubStarTopology(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts, WithPeerExchange(true), WithFloodPublish(true)) // configure the center of the star with a very low D @@ -1223,7 +1221,7 @@ func TestGossipsubDirectPeers(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := getNetHosts(t, ctx, 3) + h := getDefaultHosts(t, 3) psubs := []*PubSub{ getGossipsub(ctx, h[0], WithDirectConnectTicks(2)), getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{{ID: h[2].ID(), Addrs: h[2].Addrs()}}), WithDirectConnectTicks(2)), @@ -1287,7 +1285,7 @@ func TestGossipSubPeerFilter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := getNetHosts(t, ctx, 3) + h := getDefaultHosts(t, 3) psubs := []*PubSub{ getGossipsub(ctx, h[0], WithPeerFilter(func(pid peer.ID, topic string) bool { return pid == h[1].ID() @@ -1329,7 +1327,7 @@ func TestGossipsubDirectPeersFanout(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := getNetHosts(t, ctx, 3) + h := getDefaultHosts(t, 3) psubs := []*PubSub{ getGossipsub(ctx, h[0]), getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{{ID: h[2].ID(), Addrs: h[2].Addrs()}})), @@ -1416,7 +1414,7 @@ func TestGossipsubFloodPublish(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts, WithFloodPublish(true)) // build the star @@ -1451,7 +1449,7 @@ func TestGossipsubEnoughPeers(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) for _, ps := range psubs { @@ -1500,7 +1498,7 @@ func TestGossipsubCustomParams(t *testing.T) { wantedMaxPendingConns := 23 params.MaxPendingConnections = wantedMaxPendingConns - hosts := getNetHosts(t, ctx, 1) + hosts := getDefaultHosts(t, 1) psubs := getGossipsubs(ctx, hosts, WithGossipSubParams(params)) @@ -1529,7 +1527,7 @@ func TestGossipsubNegativeScore(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts, WithPeerScore( &PeerScoreParams{ @@ -1613,7 +1611,7 @@ func TestGossipsubScoreValidatorEx(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 3) + hosts := getDefaultHosts(t, 3) psubs := getGossipsubs(ctx, hosts, WithPeerScore( &PeerScoreParams{ @@ -1701,8 +1699,7 @@ func TestGossipsubPiggybackControl(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := bhost.NewBlankHost(swarmt.GenSwarm(t)) - defer h.Close() + h := getDefaultHosts(t, 1)[0] ps := getGossipsub(ctx, h) blah := peer.ID("bogotr0n") @@ -1750,7 +1747,7 @@ func TestGossipsubMultipleGraftTopics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getGossipsubs(ctx, hosts) sparseConnect(t, hosts) @@ -1818,7 +1815,7 @@ func TestGossipsubOpportunisticGrafting(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 50) + hosts := getDefaultHosts(t, 50) // pubsubs for the first 10 hosts psubs := getGossipsubs(ctx, hosts[:10], WithFloodPublish(true), @@ -1919,7 +1916,7 @@ func TestGossipSubLeaveTopic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := getNetHosts(t, ctx, 2) + h := getDefaultHosts(t, 2) psubs := []*PubSub{ getGossipsub(ctx, h[0]), getGossipsub(ctx, h[1]), @@ -1928,13 +1925,11 @@ func TestGossipSubLeaveTopic(t *testing.T) { connect(t, h[0], h[1]) // Join all peers - var subs []*Subscription for _, ps := range psubs { - sub, err := ps.Subscribe("test") + _, err := ps.Subscribe("test") if err != nil { t.Fatal(err) } - subs = append(subs, sub) } time.Sleep(time.Second) @@ -1990,7 +1985,7 @@ func TestGossipSubJoinTopic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := getNetHosts(t, ctx, 3) + h := getDefaultHosts(t, 3) psubs := []*PubSub{ getGossipsub(ctx, h[0]), getGossipsub(ctx, h[1]), @@ -2009,13 +2004,11 @@ func TestGossipSubJoinTopic(t *testing.T) { router0.backoff["test"] = peerMap // Join all peers - var subs []*Subscription for _, ps := range psubs { - sub, err := ps.Subscribe("test") + _, err := ps.Subscribe("test") if err != nil { t.Fatal(err) } - subs = append(subs, sub) } time.Sleep(time.Second) @@ -2032,7 +2025,8 @@ func TestGossipSubJoinTopic(t *testing.T) { } type sybilSquatter struct { - h host.Host + h host.Host + ignoreErrors bool // set to false to ignore connection/stream errors. } func (sq *sybilSquatter) handleStream(s network.Stream) { @@ -2040,7 +2034,10 @@ func (sq *sybilSquatter) handleStream(s network.Stream) { os, err := sq.h.NewStream(context.Background(), s.Conn().RemotePeer(), GossipSubID_v10) if err != nil { - panic(err) + if !sq.ignoreErrors { + panic(err) + } + return } // send a subscription for test in the output stream to become candidate for GRAFT @@ -2051,7 +2048,10 @@ func (sq *sybilSquatter) handleStream(s network.Stream) { topic := "test" err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: &truth, Topicid: &topic}}}) if err != nil { - panic(err) + if !sq.ignoreErrors { + panic(err) + } + return } var rpc pb.RPC @@ -2072,7 +2072,7 @@ func TestGossipsubPeerScoreInspect(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) inspector := &mockPeerScoreInspector{} psub1 := getGossipsub(ctx, hosts[0], @@ -2132,7 +2132,7 @@ func TestGossipsubPeerScoreResetTopicParams(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 1) + hosts := getDefaultHosts(t, 1) ps := getGossipsub(ctx, hosts[0], WithPeerScore( @@ -2199,7 +2199,7 @@ func TestGossipsubRPCFragmentation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) ps := getGossipsub(ctx, hosts[0]) // make a fake peer that requests everything through IWANT gossip @@ -2222,7 +2222,7 @@ func TestGossipsubRPCFragmentation(t *testing.T) { msgSize := 20000 for i := 0; i < nMessages; i++ { msg := make([]byte, msgSize) - rand.Read(msg) + crand.Read(msg) ps.Publish("test", msg) time.Sleep(20 * time.Millisecond) } @@ -2362,7 +2362,7 @@ func TestFragmentRPCFunction(t *testing.T) { mkMsg := func(size int) *pb.Message { msg := &pb.Message{} msg.Data = make([]byte, size-4) // subtract the protobuf overhead, so msg.Size() returns requested size - rand.Read(msg.Data) + crand.Read(msg.Data) return msg } @@ -2476,7 +2476,7 @@ func TestFragmentRPCFunction(t *testing.T) { messageIds := make([]string, msgsPerTopic) for m := 0; m < msgsPerTopic; m++ { mid := make([]byte, messageIdSize) - rand.Read(mid) + crand.Read(mid) messageIds[m] = string(mid) } rpc.Control.Ihave[i] = &pb.ControlIHave{MessageIDs: messageIds} @@ -2497,7 +2497,7 @@ func TestFragmentRPCFunction(t *testing.T) { // It should not be present in the fragmented messages, but smaller IDs should be rpc.Reset() giantIdBytes := make([]byte, limit*2) - rand.Read(giantIdBytes) + crand.Read(giantIdBytes) rpc.Control = &pb.ControlMessage{ Iwant: []*pb.ControlIWant{ {MessageIDs: []string{"hello", string(giantIdBytes)}}, @@ -2553,21 +2553,6 @@ func FuzzAppendOrMergeRPC(f *testing.F) { }) } -func getDefaultHosts(t *testing.T, n int) []host.Host { - var out []host.Host - - for i := 0; i < n; i++ { - h, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{})) - if err != nil { - t.Fatal(err) - } - t.Cleanup(func() { h.Close() }) - out = append(out, h) - } - - return out -} - func TestGossipsubManagesAnAddressBook(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/notify.go b/notify.go deleted file mode 100644 index f560d398..00000000 --- a/notify.go +++ /dev/null @@ -1,75 +0,0 @@ -package pubsub - -import ( - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - ma "github.com/multiformats/go-multiaddr" -) - -var _ network.Notifiee = (*PubSubNotif)(nil) - -type PubSubNotif PubSub - -func (p *PubSubNotif) OpenedStream(n network.Network, s network.Stream) { -} - -func (p *PubSubNotif) ClosedStream(n network.Network, s network.Stream) { -} - -func (p *PubSubNotif) Connected(n network.Network, c network.Conn) { - // ignore transient connections - if c.Stat().Limited { - return - } - - go func() { - p.newPeersPrioLk.RLock() - p.newPeersMx.Lock() - p.newPeersPend[c.RemotePeer()] = struct{}{} - p.newPeersMx.Unlock() - p.newPeersPrioLk.RUnlock() - - select { - case p.newPeers <- struct{}{}: - default: - } - }() -} - -func (p *PubSubNotif) Disconnected(n network.Network, c network.Conn) { -} - -func (p *PubSubNotif) Listen(n network.Network, _ ma.Multiaddr) { -} - -func (p *PubSubNotif) ListenClose(n network.Network, _ ma.Multiaddr) { -} - -func (p *PubSubNotif) Initialize() { - isTransient := func(pid peer.ID) bool { - for _, c := range p.host.Network().ConnsToPeer(pid) { - if !c.Stat().Limited { - return false - } - } - - return true - } - - p.newPeersPrioLk.RLock() - p.newPeersMx.Lock() - for _, pid := range p.host.Network().Peers() { - if isTransient(pid) { - continue - } - - p.newPeersPend[pid] = struct{}{} - } - p.newPeersMx.Unlock() - p.newPeersPrioLk.RUnlock() - - select { - case p.newPeers <- struct{}{}: - default: - } -} diff --git a/notify_test.go b/notify_test.go new file mode 100644 index 00000000..fa5b755a --- /dev/null +++ b/notify_test.go @@ -0,0 +1,76 @@ +package pubsub + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p/p2p/protocol/identify" +) + +func TestNotifyPeerProtocolsUpdated(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getDefaultHosts(t, 2) + + // Initialize id services. + { + ids1, err := identify.NewIDService(hosts[0]) + if err != nil { + t.Fatal(err) + } + ids1.Start() + defer ids1.Close() + + ids2, err := identify.NewIDService(hosts[1]) + if err != nil { + t.Fatal(err) + } + ids2.Start() + defer ids2.Close() + } + + psubs0 := getPubsub(ctx, hosts[0]) + connect(t, hosts[0], hosts[1]) + // Delay to make sure that peers are connected. + <-time.After(time.Millisecond * 100) + psubs1 := getPubsub(ctx, hosts[1]) + + // Pubsub 0 joins topic "test". + topic0, err := psubs0.Join("test") + if err != nil { + t.Fatal(err) + } + defer topic0.Close() + + sub0, err := topic0.Subscribe() + if err != nil { + t.Fatal(err) + } + defer sub0.Cancel() + + // Pubsub 1 joins topic "test". + topic1, err := psubs1.Join("test") + if err != nil { + t.Fatal(err) + } + defer topic1.Close() + + sub1, err := topic1.Subscribe() + if err != nil { + t.Fatal(err) + } + defer sub1.Cancel() + + // Delay before checking results (similar to most tests). + <-time.After(time.Millisecond * 100) + + if len(topic0.ListPeers()) == 0 { + t.Fatalf("topic0 should at least have 1 peer") + } + + if len(topic1.ListPeers()) == 0 { + t.Fatalf("topic1 should at least have 1 peer") + } +} diff --git a/peer_notify.go b/peer_notify.go new file mode 100644 index 00000000..44aceeef --- /dev/null +++ b/peer_notify.go @@ -0,0 +1,112 @@ +package pubsub + +import ( + "context" + + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" +) + +func (ps *PubSub) watchForNewPeers(ctx context.Context) { + // We don't bother subscribing to "connectivity" events because we always run identify after + // every new connection. + sub, err := ps.host.EventBus().Subscribe([]interface{}{ + &event.EvtPeerIdentificationCompleted{}, + &event.EvtPeerProtocolsUpdated{}, + }) + if err != nil { + log.Errorf("failed to subscribe to peer identification events: %v", err) + return + } + defer sub.Close() + + ps.newPeersPrioLk.RLock() + ps.newPeersMx.Lock() + for _, pid := range ps.host.Network().Peers() { + if ps.host.Network().Connectedness(pid) != network.Connected { + continue + } + ps.newPeersPend[pid] = struct{}{} + } + ps.newPeersMx.Unlock() + ps.newPeersPrioLk.RUnlock() + + select { + case ps.newPeers <- struct{}{}: + default: + } + + var supportsProtocol func(protocol.ID) bool + if ps.protoMatchFunc != nil { + var supportedProtocols []func(protocol.ID) bool + for _, proto := range ps.rt.Protocols() { + + supportedProtocols = append(supportedProtocols, ps.protoMatchFunc(proto)) + } + supportsProtocol = func(proto protocol.ID) bool { + for _, fn := range supportedProtocols { + if (fn)(proto) { + return true + } + } + return false + } + } else { + supportedProtocols := make(map[protocol.ID]struct{}) + for _, proto := range ps.rt.Protocols() { + supportedProtocols[proto] = struct{}{} + } + supportsProtocol = func(proto protocol.ID) bool { + _, ok := supportedProtocols[proto] + return ok + } + } + + for ctx.Err() == nil { + var ev any + select { + case <-ctx.Done(): + return + case ev = <-sub.Out(): + } + + var protos []protocol.ID + var peer peer.ID + switch ev := ev.(type) { + case event.EvtPeerIdentificationCompleted: + peer = ev.Peer + protos = ev.Protocols + case event.EvtPeerProtocolsUpdated: + peer = ev.Peer + protos = ev.Added + default: + continue + } + + // We don't bother checking connectivity (connected and non-"limited") here because + // we'll check when actually handling the new peer. + + for _, p := range protos { + if supportsProtocol(p) { + ps.notifyNewPeer(peer) + break + } + } + } + +} + +func (ps *PubSub) notifyNewPeer(peer peer.ID) { + ps.newPeersPrioLk.RLock() + ps.newPeersMx.Lock() + ps.newPeersPend[peer] = struct{}{} + ps.newPeersMx.Unlock() + ps.newPeersPrioLk.RUnlock() + + select { + case ps.newPeers <- struct{}{}: + default: + } +} diff --git a/pubsub.go b/pubsub.go index a9d646e8..330b6f11 100644 --- a/pubsub.go +++ b/pubsub.go @@ -331,14 +331,12 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option h.SetStreamHandler(id, ps.handleNewStream) } } - h.Network().Notify((*PubSubNotif)(ps)) + go ps.watchForNewPeers(ctx) ps.val.Start(ps) go ps.processLoop(ctx) - (*PubSubNotif)(ps).Initialize() - return ps, nil } @@ -698,6 +696,8 @@ func (p *PubSub) handlePendingPeers() { p.newPeersPrioLk.Unlock() for pid := range newPeers { + // Make sure we have a non-limited connection. We do this late because we may have + // disconnected in the meantime. if p.host.Network().Connectedness(pid) != network.Connected { continue } diff --git a/pubsub_test.go b/pubsub_test.go index 4a033159..245a69df 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -4,13 +4,32 @@ import ( "context" "testing" "time" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" ) +func getDefaultHosts(t *testing.T, n int) []host.Host { + var out []host.Host + + for i := 0; i < n; i++ { + h, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{})) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { h.Close() }) + out = append(out, h) + } + + return out +} + // See https://github.com/libp2p/go-libp2p-pubsub/issues/426 func TestPubSubRemovesBlacklistedPeer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) bl := NewMapBlacklist() diff --git a/randomsub_test.go b/randomsub_test.go index 8eb640ea..5c817b7c 100644 --- a/randomsub_test.go +++ b/randomsub_test.go @@ -40,7 +40,7 @@ func TestRandomsubSmall(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getRandomsubs(ctx, hosts, 10) connectAll(t, hosts) @@ -77,7 +77,7 @@ func TestRandomsubBig(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 50) + hosts := getDefaultHosts(t, 50) psubs := getRandomsubs(ctx, hosts, 50) connectSome(t, hosts, 12) @@ -114,7 +114,7 @@ func TestRandomsubMixed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 40) + hosts := getDefaultHosts(t, 40) fsubs := getPubsubs(ctx, hosts[:10]) rsubs := getRandomsubs(ctx, hosts[10:], 30) psubs := append(fsubs, rsubs...) @@ -153,7 +153,7 @@ func TestRandomsubEnoughPeers(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 40) + hosts := getDefaultHosts(t, 40) fsubs := getPubsubs(ctx, hosts[:10]) rsubs := getRandomsubs(ctx, hosts[10:], 30) psubs := append(fsubs, rsubs...) diff --git a/subscription_filter_test.go b/subscription_filter_test.go index 8a4fe4db..0057cdcf 100644 --- a/subscription_filter_test.go +++ b/subscription_filter_test.go @@ -19,15 +19,15 @@ func TestBasicSubscriptionFilter(t *testing.T) { topic3 := "test3" yes := true subs := []*pb.RPC_SubOpts{ - &pb.RPC_SubOpts{ + { Topicid: &topic1, Subscribe: &yes, }, - &pb.RPC_SubOpts{ + { Topicid: &topic2, Subscribe: &yes, }, - &pb.RPC_SubOpts{ + { Topicid: &topic3, Subscribe: &yes, }, @@ -108,24 +108,24 @@ func TestSubscriptionFilterDeduplication(t *testing.T) { yes := true no := false subs := []*pb.RPC_SubOpts{ - &pb.RPC_SubOpts{ + { Topicid: &topic1, Subscribe: &yes, }, - &pb.RPC_SubOpts{ + { Topicid: &topic1, Subscribe: &yes, }, - &pb.RPC_SubOpts{ + { Topicid: &topic2, Subscribe: &yes, }, - &pb.RPC_SubOpts{ + { Topicid: &topic2, Subscribe: &no, }, - &pb.RPC_SubOpts{ + { Topicid: &topic3, Subscribe: &yes, }, @@ -150,7 +150,7 @@ func TestSubscriptionFilterRPC(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) ps1 := getPubsub(ctx, hosts[0], WithSubscriptionFilter(NewAllowlistSubscriptionFilter("test1", "test2"))) ps2 := getPubsub(ctx, hosts[1], WithSubscriptionFilter(NewAllowlistSubscriptionFilter("test2", "test3"))) diff --git a/timecache/time_cache.go b/timecache/time_cache.go index e33bc354..ee34fd5b 100644 --- a/timecache/time_cache.go +++ b/timecache/time_cache.go @@ -2,12 +2,8 @@ package timecache import ( "time" - - logger "github.com/ipfs/go-log/v2" ) -var log = logger.Logger("pubsub/timecache") - // Stategy is the TimeCache expiration strategy to use. type Strategy uint8 diff --git a/topic_test.go b/topic_test.go index 9ad3146d..ef05feb4 100644 --- a/topic_test.go +++ b/topic_test.go @@ -99,7 +99,7 @@ func testTopicCloseWithOpenResource(t *testing.T, openResource func(topic *Topic const numHosts = 1 topicID := "foobar" - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) ps := getPubsub(ctx, hosts[0]) // Try create and cancel topic @@ -139,7 +139,7 @@ func TestTopicReuse(t *testing.T) { const numHosts = 2 topicID := "foobar" - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) sender := getPubsub(ctx, hosts[0], WithDiscovery(&dummyDiscovery{})) receiver := getPubsub(ctx, hosts[1]) @@ -233,7 +233,7 @@ func TestTopicEventHandlerCancel(t *testing.T) { const numHosts = 5 topicID := "foobar" - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) ps := getPubsub(ctx, hosts[0]) // Try create and cancel topic @@ -265,7 +265,7 @@ func TestSubscriptionJoinNotification(t *testing.T) { const numLateSubscribers = 10 const numHosts = 20 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) topics := getTopics(getPubsubs(ctx, hosts), "foobar") evts := getTopicEvts(topics) @@ -331,7 +331,7 @@ func TestSubscriptionLeaveNotification(t *testing.T) { defer cancel() const numHosts = 20 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) psubs := getPubsubs(ctx, hosts) topics := getTopics(psubs, "foobar") evts := getTopicEvts(topics) @@ -416,7 +416,7 @@ func TestSubscriptionManyNotifications(t *testing.T) { const topic = "foobar" const numHosts = 33 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) topics := getTopics(getPubsubs(ctx, hosts), topic) evts := getTopicEvts(topics) @@ -521,7 +521,7 @@ func TestSubscriptionNotificationSubUnSub(t *testing.T) { const topic = "foobar" const numHosts = 35 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) topics := getTopics(getPubsubs(ctx, hosts), topic) for i := 1; i < numHosts; i++ { @@ -539,7 +539,7 @@ func TestTopicRelay(t *testing.T) { const topic = "foobar" const numHosts = 5 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) topics := getTopics(getPubsubs(ctx, hosts), topic) // [0.Rel] - [1.Rel] - [2.Sub] @@ -603,7 +603,7 @@ func TestTopicRelayReuse(t *testing.T) { const topic = "foobar" const numHosts = 1 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) pubsubs := getPubsubs(ctx, hosts) topics := getTopics(pubsubs, topic) @@ -670,7 +670,7 @@ func TestTopicRelayOnClosedTopic(t *testing.T) { const topic = "foobar" const numHosts = 1 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) topics := getTopics(getPubsubs(ctx, hosts), topic) err := topics[0].Close() @@ -690,7 +690,7 @@ func TestProducePanic(t *testing.T) { const numHosts = 5 topicID := "foobar" - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) ps := getPubsub(ctx, hosts[0]) // Create topic @@ -743,7 +743,7 @@ func notifSubThenUnSub(ctx context.Context, t *testing.T, topics []*Topic) { } // Wait for the unsubscribe messages to reach the primary peer - for len(primaryTopic.ListPeers()) < 0 { + for len(primaryTopic.ListPeers()) > 0 { time.Sleep(time.Millisecond * 100) } @@ -792,7 +792,7 @@ func TestMinTopicSizeNoDiscovery(t *testing.T) { const numHosts = 3 topicID := "foobar" - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) sender := getPubsub(ctx, hosts[0]) receiver1 := getPubsub(ctx, hosts[1]) @@ -872,7 +872,7 @@ func TestWithTopicMsgIdFunction(t *testing.T) { const topicA, topicB = "foobarA", "foobarB" const numHosts = 2 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) pubsubs := getPubsubs(ctx, hosts, WithMessageIdFn(func(pmsg *pb.Message) string { hash := sha256.Sum256(pmsg.Data) return string(hash[:]) @@ -932,7 +932,7 @@ func TestTopicPublishWithKeyInvalidParameters(t *testing.T) { const numHosts = 5 virtualPeer := tnet.RandPeerNetParamsOrFatal(t) - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) topics := getTopics(getPubsubs(ctx, hosts), topic) t.Run("nil sign private key should error", func(t *testing.T) { @@ -959,7 +959,7 @@ func TestTopicRelayPublishWithKey(t *testing.T) { const numHosts = 5 virtualPeer := tnet.RandPeerNetParamsOrFatal(t) - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) topics := getTopics(getPubsubs(ctx, hosts), topic) // [0.Rel] - [1.Rel] - [2.Sub] @@ -1026,7 +1026,7 @@ func TestWithLocalPublication(t *testing.T) { const topic = "test" - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) pubsubs := getPubsubs(ctx, hosts) topics := getTopics(pubsubs, topic) connectAll(t, hosts) diff --git a/trace_test.go b/trace_test.go index fb8cb56d..287216f1 100644 --- a/trace_test.go +++ b/trace_test.go @@ -17,9 +17,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" - bhost "github.com/libp2p/go-libp2p/p2p/host/blank" - swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" - + //lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated "github.com/libp2p/go-msgio/protoio" ) @@ -27,7 +25,7 @@ func testWithTracer(t *testing.T, tracer EventTracer) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts, WithEventTracer(tracer), // to bootstrap from star topology @@ -302,10 +300,9 @@ func TestRemoteTracer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h1 := bhost.NewBlankHost(swarmt.GenSwarm(t)) - h2 := bhost.NewBlankHost(swarmt.GenSwarm(t)) - defer h1.Close() - defer h2.Close() + hosts := getDefaultHosts(t, 2) + h1 := hosts[0] + h2 := hosts[1] mrt := &mockRemoteTracer{} h1.SetStreamHandler(RemoteTracerProtoID, mrt.handleStream) diff --git a/tracer.go b/tracer.go index 8e744c91..cbb92ad7 100644 --- a/tracer.go +++ b/tracer.go @@ -17,6 +17,7 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" + //lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated "github.com/libp2p/go-msgio/protoio" ) diff --git a/validation_builtin_test.go b/validation_builtin_test.go index df406f26..bca8774c 100644 --- a/validation_builtin_test.go +++ b/validation_builtin_test.go @@ -38,7 +38,7 @@ func testBasicSeqnoValidator(t *testing.T, ttl time.Duration) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getPubsubsWithOptionC(ctx, hosts, func(i int) Option { return WithDefaultValidator(NewBasicSeqnoValidator(newMockPeerMetadataStore())) @@ -86,7 +86,7 @@ func TestBasicSeqnoValidatorReplay(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getPubsubsWithOptionC(ctx, hosts[:19], func(i int) Option { return WithDefaultValidator(NewBasicSeqnoValidator(newMockPeerMetadataStore())) @@ -246,7 +246,7 @@ func (r *replayActor) replay(msg *pb.Message) { var peers []peer.ID r.mx.Lock() - for p, _ := range r.out { + for p := range r.out { if rng.Intn(2) > 0 { peers = append(peers, p) } diff --git a/validation_test.go b/validation_test.go index b56e7677..0a09f70b 100644 --- a/validation_test.go +++ b/validation_test.go @@ -15,7 +15,7 @@ func TestRegisterUnregisterValidator(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 1) + hosts := getDefaultHosts(t, 1) psubs := getPubsubs(ctx, hosts) err := psubs[0].RegisterTopicValidator("foo", func(context.Context, peer.ID, *Message) bool { @@ -40,7 +40,7 @@ func TestRegisterValidatorEx(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 3) + hosts := getDefaultHosts(t, 3) psubs := getPubsubs(ctx, hosts) err := psubs[0].RegisterTopicValidator("test", @@ -69,7 +69,7 @@ func TestValidate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -123,7 +123,7 @@ func TestValidate2(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 1) + hosts := getDefaultHosts(t, 1) psubs := getPubsubs(ctx, hosts) topic := "foobar" @@ -201,7 +201,7 @@ func TestValidateOverload(t *testing.T) { for tci, tc := range tcs { t.Run(fmt.Sprintf("%d", tci), func(t *testing.T) { - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -273,7 +273,7 @@ func TestValidateAssortedOptions(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getPubsubs(ctx, hosts, WithValidateQueueSize(10), WithValidateThrottle(10), diff --git a/version.json b/version.json new file mode 100644 index 00000000..ea22ea59 --- /dev/null +++ b/version.json @@ -0,0 +1,3 @@ +{ + "version": "v0.11.0" +}