diff --git a/.gitignore b/.gitignore index 82faf1e..31f9c8d 100644 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,7 @@ # Go workspace file go.work -bin \ No newline at end of file +bin + +cover.out +cover.html \ No newline at end of file diff --git a/Makefile b/Makefile index 30434b6..3c9e160 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ BUILD_TARGET?=${APP_NAME} BUILD_IMG?=${APP_NAME} APP_VERSION?=$(git describe --tags $(git rev-list --tags --max-count=1) 2> /dev/null || echo "nightly") CFG_PATH?=/route-p2p/router.json -TEST_PKG?=./... +TEST_PKG?=./core/... TEST_TIMEOUT?=2m lint: @@ -13,13 +13,13 @@ fmt: @go fmt ./... test: - @go test -v -race -timeout=${TEST_TIMEOUT} `go list ./... | grep -v -E "cmd|scripts"` + @go test -v -race -timeout=${TEST_TIMEOUT} `go list ./... | grep -v -E "cmd|scripts|resources"` test-pkg: @go test -v -race -timeout=${TEST_TIMEOUT} ${TEST_PKG} test-cov: - @go test -v -race -timeout=${TEST_TIMEOUT} -coverprofile cover.out `go list ./... | grep -v -E "cmd|scripts"` + @go test -v -race -timeout=${TEST_TIMEOUT} -coverprofile cover.out `go list ./... | grep -v -E "cmd|scripts|resources"` test-open-cov: @make test-cov diff --git a/core/daemon.go b/core/daemon.go index e7653d0..196b641 100644 --- a/core/daemon.go +++ b/core/daemon.go @@ -56,7 +56,11 @@ func (d *Daemon) Start(ctx context.Context) error { go d.router.Start(ctx) if d.cfg.Discovery != nil { - _, bootstrappers := parseDiscoveryConfig(*d.cfg.Discovery) + _, bootstrappers, err := parseDiscoveryConfig(*d.cfg.Discovery) + if err != nil { + return err + } + d.lggr.Debugw("connecting to bootstrappers", "bootstrappers", bootstrappers, "raw", d.cfg.Discovery.Bootstrappers) for _, b := range bootstrappers { d.connect(b) } diff --git a/core/daemon_test.go b/core/daemon_test.go index 52facc3..d2aedf9 100644 --- a/core/daemon_test.go +++ b/core/daemon_test.go @@ -26,7 +26,58 @@ func TestDaemon_Sanity(t *testing.T) { require.NoError(t, logging.SetLogLevelRegex("p2pmq", "debug")) - bootAddr := "/ip4/0.0.0.0/tcp/5001" + hitMap := map[string]*atomic.Int32{} + for i := 0; i < n; i++ { + hitMap[fmt.Sprintf("test-%d", i+1)] = &atomic.Int32{} + } + + daemons, done := setupDaemons(ctx, t, n, func(m *pubsub.Message) { + hitMap[m.GetTopic()].Add(1) + // lggr.Infow("got pubsub message", "topic", m.GetTopic(), "from", m.GetFrom(), "data", string(m.GetData())) + }) + defer done() + + <-time.After(time.Second * 5) // TODO: avoid timeout + + t.Log("peers connected") + + var wg sync.WaitGroup + for _, d := range daemons { + wg.Add(1) + go func(d *Daemon) { + defer wg.Done() + for i := 0; i < n; i++ { + require.NoError(t, d.Subscribe(ctx, fmt.Sprintf("test-%d", i+1))) + } + }(d) + } + + wg.Wait() + + <-time.After(time.Second * 2) // TODO: avoid timeout + + for r := 0; r < rounds; r++ { + for i, d := range daemons { + wg.Add(1) + go func(d *Daemon, r, i int) { + defer wg.Done() + require.NoError(t, d.Publish(ctx, fmt.Sprintf("test-%d", i+1), []byte(fmt.Sprintf("round-%d-test-data-%d", r+1, i+1)))) + }(d, r, i) + } + } + + wg.Wait() + + <-time.After(time.Second * 2) // TODO: avoid timeout + + for topic, counter := range hitMap { + count := int(counter.Load()) / n // per node + require.Equal(t, rounds, count, "should get %d messages on topic %s", rounds, topic) + } +} + +func setupDaemons(ctx context.Context, t *testing.T, n int, routingFn func(*pubsub.Message)) ([]*Daemon, func()) { + bootAddr := "/ip4/127.0.0.1/tcp/5001" boot, err := NewDaemon(ctx, commons.Config{ ListenAddrs: []string{ bootAddr, @@ -42,9 +93,6 @@ func TestDaemon_Sanity(t *testing.T) { go func() { _ = boot.Start(ctx) }() - defer func() { - _ = boot.Close() - }() t.Logf("started bootstrapper %s", boot.host.ID()) @@ -59,24 +107,20 @@ func TestDaemon_Sanity(t *testing.T) { for i := 0; i < n; i++ { cfg := commons.Config{ ListenAddrs: []string{ - "/ip4/0.0.0.0/tcp/0", + "/ip4/127.0.0.1/tcp/0", }, - MdnsTag: "p2pmq/mdns/test", + // MdnsTag: "p2pmq/mdns/test", Discovery: &commons.DiscoveryConfig{ Mode: commons.ModeServer, ProtocolPrefix: "p2pmq/kad/test", Bootstrappers: []string{ - bootAddr, + fmt.Sprintf("%s/p2p/%s", bootAddr, boot.host.ID()), }, }, Pubsub: &commons.PubsubConfig{}, } - d, err := NewDaemon(ctx, cfg, NewRouter(1024, 4, func(m *pubsub.Message) { - counter := hitMap[m.GetTopic()] - counter.Add(1) - // lggr.Infow("got pubsub message", "topic", m.GetTopic(), "from", m.GetFrom(), "data", string(m.GetData())) - }), fmt.Sprintf("peer-%d", i+1)) + d, err := NewDaemon(ctx, cfg, NewRouter(1024, 4, routingFn), fmt.Sprintf("peer-%d", i+1)) require.NoError(t, err) daemons[i] = d t.Logf("created daemon %d: %s", i+1, d.host.ID()) @@ -86,51 +130,15 @@ func TestDaemon_Sanity(t *testing.T) { require.NoError(t, d.Start(ctx)) t.Logf("started daemon %d: %s", i+1, d.host.ID()) } - defer func() { - for _, d := range daemons { - _ = d.Close() - } - }() waitDaemonsConnected(n) - <-time.After(time.Second * 5) // TODO: avoid timeout - - t.Log("peers connected") - - var wg sync.WaitGroup - for _, d := range daemons { - wg.Add(1) - go func(d *Daemon) { - defer wg.Done() - for i := 0; i < n; i++ { - require.NoError(t, d.Subscribe(ctx, fmt.Sprintf("test-%d", i+1))) - } - }(d) - } - - wg.Wait() - - <-time.After(time.Second * 2) // TODO: avoid timeout - - for r := 0; r < rounds; r++ { - for i, d := range daemons { - wg.Add(1) - go func(d *Daemon, r, i int) { - defer wg.Done() - require.NoError(t, d.Publish(ctx, fmt.Sprintf("test-%d", i+1), []byte(fmt.Sprintf("round-%d-test-data-%d", r+1, i+1)))) - }(d, r, i) + return daemons, func() { + _ = boot.Close() + for _, d := range daemons { + _ = d.Close() } } - - wg.Wait() - - <-time.After(time.Second * 2) // TODO: avoid timeout - - for topic, counter := range hitMap { - count := int(counter.Load()) / n // per node - require.Equal(t, rounds, count, "should get %d messages on topic %s", rounds, topic) - } } func waitDaemonsConnected(n int, daemons ...*Daemon) { diff --git a/core/dht.go b/core/dht.go index 3c13e2d..d32d745 100644 --- a/core/dht.go +++ b/core/dht.go @@ -2,6 +2,7 @@ package core import ( "context" + "fmt" "github.com/amirylm/p2pmq/commons" dht "github.com/libp2p/go-libp2p-kad-dht" @@ -22,7 +23,7 @@ func (d *Daemon) dhtRoutingFactory(ctx context.Context, opts ...dhtopts.Option) } } -func parseDiscoveryConfig(opts commons.DiscoveryConfig) (dht.ModeOpt, []peer.AddrInfo) { +func parseDiscoveryConfig(opts commons.DiscoveryConfig) (dht.ModeOpt, []peer.AddrInfo, error) { var dmode dht.ModeOpt switch opts.Mode { case commons.ModeClient: @@ -38,11 +39,10 @@ func parseDiscoveryConfig(opts commons.DiscoveryConfig) (dht.ModeOpt, []peer.Add for _, bnstr := range opts.Bootstrappers { bn, err := peer.AddrInfoFromString(bnstr) if err != nil { - // TODO: handle err - continue + return dmode, nil, fmt.Errorf("failed to parse bootstrapper addr %s: %w", bnstr, err) } bootstrappers = append(bootstrappers, *bn) } - return dmode, bootstrappers + return dmode, bootstrappers, nil } diff --git a/core/setup.go b/core/setup.go index 911aaf6..0b20ef5 100644 --- a/core/setup.go +++ b/core/setup.go @@ -57,7 +57,10 @@ func (d *Daemon) setup(ctx context.Context, cfg commons.Config) (err error) { if cfg.Discovery != nil { cfg.Discovery.Defaults() - dmode, bootstrappers := parseDiscoveryConfig(*cfg.Discovery) + dmode, bootstrappers, err := parseDiscoveryConfig(*cfg.Discovery) + if err != nil { + return err + } dhtOpts := []dht.Option{ dht.ProtocolPrefix(protocol.ID(cfg.Discovery.ProtocolPrefix)), dht.Mode(dmode), diff --git a/go.mod b/go.mod index 6e1509d..39899e2 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/libp2p/go-libp2p v0.31.0 github.com/libp2p/go-libp2p-kad-dht v0.25.1 github.com/libp2p/go-libp2p-pubsub v0.9.3 - github.com/multiformats/go-multiaddr v0.11.0 go.uber.org/zap v1.25.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -15,6 +14,7 @@ require ( require ( github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/multiformats/go-multiaddr v0.11.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect ) diff --git a/resources/config/default.p2pmq.yaml b/resources/config/default.p2pmq.yaml index 6ac27eb..023f671 100644 --- a/resources/config/default.p2pmq.yaml +++ b/resources/config/default.p2pmq.yaml @@ -4,5 +4,5 @@ discovery: serviceTag: p2pmq/kad mode: server bootstrappers: - - /ip4/0.0.0.0/tcp/5001 -pubsub: \ No newline at end of file + - /ip4/0.0.0.0/tcp/5001/p2p/12D3KooWH9DU2whKMyL4WXExj2iZNHmEfg5kDtBKWfmw4QGjJ1eV # change peer id +pubsub: