Skip to content

Commit

Permalink
updates and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm committed Sep 18, 2023
1 parent e122178 commit 922f2ec
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 65 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@
# Go workspace file
go.work

bin
bin

cover.out
cover.html
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ BUILD_TARGET?=${APP_NAME}
BUILD_IMG?=${APP_NAME}
APP_VERSION?=$(git describe --tags $(git rev-list --tags --max-count=1) 2> /dev/null || echo "nightly")
CFG_PATH?=/route-p2p/router.json
TEST_PKG?=./...
TEST_PKG?=./core/...
TEST_TIMEOUT?=2m

lint:
Expand All @@ -13,13 +13,13 @@ fmt:
@go fmt ./...

test:
@go test -v -race -timeout=${TEST_TIMEOUT} `go list ./... | grep -v -E "cmd|scripts"`
@go test -v -race -timeout=${TEST_TIMEOUT} `go list ./... | grep -v -E "cmd|scripts|resources"`

test-pkg:
@go test -v -race -timeout=${TEST_TIMEOUT} ${TEST_PKG}

test-cov:
@go test -v -race -timeout=${TEST_TIMEOUT} -coverprofile cover.out `go list ./... | grep -v -E "cmd|scripts"`
@go test -v -race -timeout=${TEST_TIMEOUT} -coverprofile cover.out `go list ./... | grep -v -E "cmd|scripts|resources"`

test-open-cov:
@make test-cov
Expand Down
6 changes: 5 additions & 1 deletion core/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ func (d *Daemon) Start(ctx context.Context) error {
go d.router.Start(ctx)

if d.cfg.Discovery != nil {
_, bootstrappers := parseDiscoveryConfig(*d.cfg.Discovery)
_, bootstrappers, err := parseDiscoveryConfig(*d.cfg.Discovery)
if err != nil {
return err
}
d.lggr.Debugw("connecting to bootstrappers", "bootstrappers", bootstrappers, "raw", d.cfg.Discovery.Bootstrappers)
for _, b := range bootstrappers {
d.connect(b)
}
Expand Down
112 changes: 60 additions & 52 deletions core/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,58 @@ func TestDaemon_Sanity(t *testing.T) {

require.NoError(t, logging.SetLogLevelRegex("p2pmq", "debug"))

bootAddr := "/ip4/0.0.0.0/tcp/5001"
hitMap := map[string]*atomic.Int32{}
for i := 0; i < n; i++ {
hitMap[fmt.Sprintf("test-%d", i+1)] = &atomic.Int32{}
}

daemons, done := setupDaemons(ctx, t, n, func(m *pubsub.Message) {
hitMap[m.GetTopic()].Add(1)
// lggr.Infow("got pubsub message", "topic", m.GetTopic(), "from", m.GetFrom(), "data", string(m.GetData()))
})
defer done()

<-time.After(time.Second * 5) // TODO: avoid timeout

t.Log("peers connected")

var wg sync.WaitGroup
for _, d := range daemons {
wg.Add(1)
go func(d *Daemon) {
defer wg.Done()
for i := 0; i < n; i++ {
require.NoError(t, d.Subscribe(ctx, fmt.Sprintf("test-%d", i+1)))
}
}(d)
}

wg.Wait()

<-time.After(time.Second * 2) // TODO: avoid timeout

for r := 0; r < rounds; r++ {
for i, d := range daemons {
wg.Add(1)
go func(d *Daemon, r, i int) {
defer wg.Done()
require.NoError(t, d.Publish(ctx, fmt.Sprintf("test-%d", i+1), []byte(fmt.Sprintf("round-%d-test-data-%d", r+1, i+1))))
}(d, r, i)
}
}

wg.Wait()

<-time.After(time.Second * 2) // TODO: avoid timeout

for topic, counter := range hitMap {
count := int(counter.Load()) / n // per node
require.Equal(t, rounds, count, "should get %d messages on topic %s", rounds, topic)
}
}

func setupDaemons(ctx context.Context, t *testing.T, n int, routingFn func(*pubsub.Message)) ([]*Daemon, func()) {
bootAddr := "/ip4/127.0.0.1/tcp/5001"
boot, err := NewDaemon(ctx, commons.Config{
ListenAddrs: []string{
bootAddr,
Expand All @@ -42,9 +93,6 @@ func TestDaemon_Sanity(t *testing.T) {
go func() {
_ = boot.Start(ctx)
}()
defer func() {
_ = boot.Close()
}()

t.Logf("started bootstrapper %s", boot.host.ID())

Expand All @@ -59,24 +107,20 @@ func TestDaemon_Sanity(t *testing.T) {
for i := 0; i < n; i++ {
cfg := commons.Config{
ListenAddrs: []string{
"/ip4/0.0.0.0/tcp/0",
"/ip4/127.0.0.1/tcp/0",
},
MdnsTag: "p2pmq/mdns/test",
// MdnsTag: "p2pmq/mdns/test",
Discovery: &commons.DiscoveryConfig{
Mode: commons.ModeServer,
ProtocolPrefix: "p2pmq/kad/test",
Bootstrappers: []string{
bootAddr,
fmt.Sprintf("%s/p2p/%s", bootAddr, boot.host.ID()),
},
},
Pubsub: &commons.PubsubConfig{},
}

d, err := NewDaemon(ctx, cfg, NewRouter(1024, 4, func(m *pubsub.Message) {
counter := hitMap[m.GetTopic()]
counter.Add(1)
// lggr.Infow("got pubsub message", "topic", m.GetTopic(), "from", m.GetFrom(), "data", string(m.GetData()))
}), fmt.Sprintf("peer-%d", i+1))
d, err := NewDaemon(ctx, cfg, NewRouter(1024, 4, routingFn), fmt.Sprintf("peer-%d", i+1))
require.NoError(t, err)
daemons[i] = d
t.Logf("created daemon %d: %s", i+1, d.host.ID())
Expand All @@ -86,51 +130,15 @@ func TestDaemon_Sanity(t *testing.T) {
require.NoError(t, d.Start(ctx))
t.Logf("started daemon %d: %s", i+1, d.host.ID())
}
defer func() {
for _, d := range daemons {
_ = d.Close()
}
}()

waitDaemonsConnected(n)

<-time.After(time.Second * 5) // TODO: avoid timeout

t.Log("peers connected")

var wg sync.WaitGroup
for _, d := range daemons {
wg.Add(1)
go func(d *Daemon) {
defer wg.Done()
for i := 0; i < n; i++ {
require.NoError(t, d.Subscribe(ctx, fmt.Sprintf("test-%d", i+1)))
}
}(d)
}

wg.Wait()

<-time.After(time.Second * 2) // TODO: avoid timeout

for r := 0; r < rounds; r++ {
for i, d := range daemons {
wg.Add(1)
go func(d *Daemon, r, i int) {
defer wg.Done()
require.NoError(t, d.Publish(ctx, fmt.Sprintf("test-%d", i+1), []byte(fmt.Sprintf("round-%d-test-data-%d", r+1, i+1))))
}(d, r, i)
return daemons, func() {
_ = boot.Close()
for _, d := range daemons {
_ = d.Close()
}
}

wg.Wait()

<-time.After(time.Second * 2) // TODO: avoid timeout

for topic, counter := range hitMap {
count := int(counter.Load()) / n // per node
require.Equal(t, rounds, count, "should get %d messages on topic %s", rounds, topic)
}
}

func waitDaemonsConnected(n int, daemons ...*Daemon) {
Expand Down
8 changes: 4 additions & 4 deletions core/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"context"
"fmt"

"github.com/amirylm/p2pmq/commons"
dht "github.com/libp2p/go-libp2p-kad-dht"
Expand All @@ -22,7 +23,7 @@ func (d *Daemon) dhtRoutingFactory(ctx context.Context, opts ...dhtopts.Option)
}
}

func parseDiscoveryConfig(opts commons.DiscoveryConfig) (dht.ModeOpt, []peer.AddrInfo) {
func parseDiscoveryConfig(opts commons.DiscoveryConfig) (dht.ModeOpt, []peer.AddrInfo, error) {
var dmode dht.ModeOpt
switch opts.Mode {
case commons.ModeClient:
Expand All @@ -38,11 +39,10 @@ func parseDiscoveryConfig(opts commons.DiscoveryConfig) (dht.ModeOpt, []peer.Add
for _, bnstr := range opts.Bootstrappers {
bn, err := peer.AddrInfoFromString(bnstr)
if err != nil {
// TODO: handle err
continue
return dmode, nil, fmt.Errorf("failed to parse bootstrapper addr %s: %w", bnstr, err)
}
bootstrappers = append(bootstrappers, *bn)
}

return dmode, bootstrappers
return dmode, bootstrappers, nil
}
5 changes: 4 additions & 1 deletion core/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ func (d *Daemon) setup(ctx context.Context, cfg commons.Config) (err error) {

if cfg.Discovery != nil {
cfg.Discovery.Defaults()
dmode, bootstrappers := parseDiscoveryConfig(*cfg.Discovery)
dmode, bootstrappers, err := parseDiscoveryConfig(*cfg.Discovery)
if err != nil {
return err
}
dhtOpts := []dht.Option{
dht.ProtocolPrefix(protocol.ID(cfg.Discovery.ProtocolPrefix)),
dht.Mode(dmode),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ require (
github.com/libp2p/go-libp2p v0.31.0
github.com/libp2p/go-libp2p-kad-dht v0.25.1
github.com/libp2p/go-libp2p-pubsub v0.9.3
github.com/multiformats/go-multiaddr v0.11.0
go.uber.org/zap v1.25.0
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/multiformats/go-multiaddr v0.11.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
)
Expand Down
4 changes: 2 additions & 2 deletions resources/config/default.p2pmq.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ discovery:
serviceTag: p2pmq/kad
mode: server
bootstrappers:
- /ip4/0.0.0.0/tcp/5001
pubsub:
- /ip4/0.0.0.0/tcp/5001/p2p/12D3KooWH9DU2whKMyL4WXExj2iZNHmEfg5kDtBKWfmw4QGjJ1eV # change peer id
pubsub:

0 comments on commit 922f2ec

Please sign in to comment.