Skip to content

Commit

Permalink
lint + small updates
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm committed Sep 18, 2023
1 parent 922f2ec commit 851d8f0
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ TEST_PKG?=./core/...
TEST_TIMEOUT?=2m

lint:
@docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:latest golangci-lint run -v --timeout=5m ./...
@docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.54 golangci-lint run -v --timeout=5m ./...

fmt:
@go fmt ./...
Expand Down
8 changes: 5 additions & 3 deletions commons/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ func (dc *DiscoveryConfig) Defaults() {

// PubsubConfig contains config for the pubsub router
type PubsubConfig struct {
Topics []TopicConfig `json:"topics" yaml:"topics"`
Overlay *OverlayParams `json:"overlay,omitempty" yaml:"overlay,omitempty"`
SubFilter *SubscriptionFilter `json:"subFilter,omitempty" yaml:"subFilter,omitempty"`
Topics []TopicConfig `json:"topics" yaml:"topics"`
Overlay *OverlayParams `json:"overlay,omitempty" yaml:"overlay,omitempty"`
SubFilter *SubscriptionFilter `json:"subFilter,omitempty" yaml:"subFilter,omitempty"`
MaxMessageSize int `json:"maxMessageSize,omitempty" yaml:"maxMessageSize,omitempty"`
Trace bool `json:"trace,omitempty" yaml:"trace,omitempty"`
}

// TopicConfig contains configuration of a pubsub topic
Expand Down
6 changes: 3 additions & 3 deletions core/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ type Daemon struct {
mdnsSvc mdns.Service
pubsub *pubsub.PubSub

pubsubState pubsubManager
denylist pubsub.Blacklist
router *Router[*pubsub.Message]
manager pubsubManager
denylist pubsub.Blacklist
router *Router[*pubsub.Message]
}

func NewDaemon(ctx context.Context, cfg commons.Config, router *Router[*pubsub.Message], lggrNS string) (*Daemon, error) {
Expand Down
4 changes: 3 additions & 1 deletion core/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ func setupDaemons(ctx context.Context, t *testing.T, n int, routingFn func(*pubs
fmt.Sprintf("%s/p2p/%s", bootAddr, boot.host.ID()),
},
},
Pubsub: &commons.PubsubConfig{},
Pubsub: &commons.PubsubConfig{
Trace: true,
},
}

d, err := NewDaemon(ctx, cfg, NewRouter(1024, 4, routingFn), fmt.Sprintf("peer-%d", i+1))
Expand Down
27 changes: 17 additions & 10 deletions core/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func (d *Daemon) setupPubsubRouter(ctx context.Context, cfg commons.Config) erro
pubsub.WithMessageIdFn(msgIDSha256(20)),
}

if cfg.Pubsub.MaxMessageSize > 0 {
opts = append(opts, pubsub.WithMaxMessageSize(cfg.Pubsub.MaxMessageSize))
}

if overlay := cfg.Pubsub.Overlay; overlay != nil && overlay.SeenTtl.Milliseconds() > 0 {
opts = append(opts, pubsub.WithSeenMessagesTTL(cfg.Pubsub.Overlay.SeenTtl))
}
Expand All @@ -45,13 +49,17 @@ func (d *Daemon) setupPubsubRouter(ctx context.Context, cfg commons.Config) erro
opts = append(opts, pubsub.WithSubscriptionFilter(sf))
}

if cfg.Pubsub.Trace {
opts = append(opts, pubsub.WithEventTracer(newPubsubTracer(d.lggr.Named("PubsubTracer"))))
}

ps, err := pubsub.NewGossipSub(ctx, d.host, opts...)
if err != nil {
return err
}
d.pubsub = ps
d.denylist = denylist
d.pubsubState.topics = make(map[string]*topicWrapper)
d.manager.topics = make(map[string]*topicWrapper)

return nil
}
Expand All @@ -66,7 +74,7 @@ func (d *Daemon) Publish(ctx context.Context, topicName string, data []byte) err
}

func (d *Daemon) Leave(topicName string) error {
tw := d.pubsubState.getTopicWrapper(topicName)
tw := d.manager.getTopicWrapper(topicName)
state := tw.state.Load()
switch state {
case topicStateJoined, topicStateErr:
Expand All @@ -82,7 +90,7 @@ func (d *Daemon) Leave(topicName string) error {
}

func (d *Daemon) Unsubscribe(topicName string) {
tw := d.pubsubState.getTopicWrapper(topicName)
tw := d.manager.getTopicWrapper(topicName)
if tw.state.Load() != topicStateUnknown {
return
}
Expand Down Expand Up @@ -134,34 +142,35 @@ func (d *Daemon) listenSubscription(sub *pubsub.Subscription) {
}

func (d *Daemon) tryJoin(topicName string) (*pubsub.Topic, error) {
topicW := d.pubsubState.getTopicWrapper(topicName)
topicW := d.manager.getTopicWrapper(topicName)
if topicW != nil {
if topicW.state.Load() == topicStateJoining {
return nil, fmt.Errorf("already tring to join topic %s", topicName)
}
return topicW.topic, nil
}
d.pubsubState.joiningTopic(topicName)
d.manager.joiningTopic(topicName)
topic, err := d.pubsub.Join(topicName)
if err != nil {
return nil, err
}
d.pubsubState.upgradeTopic(topicName, topic)
d.manager.upgradeTopic(topicName, topic)

return topic, nil
}

func (d *Daemon) trySubscribe(topic *pubsub.Topic) (sub *pubsub.Subscription, err error) {
topicName := topic.String()
sub = d.pubsubState.getSub(topicName)
sub = d.manager.getSub(topicName)
if sub != nil {
return nil, nil
}
// TODO: create []pubsub.SubOpt
sub, err = topic.Subscribe()
if err != nil {
return nil, err
}
d.pubsubState.addSub(topicName, sub)
d.manager.addSub(topicName, sub)
return sub, nil
}

Expand Down Expand Up @@ -233,8 +242,6 @@ func (pm *pubsubManager) joiningTopic(name string) {
pm.lock.Lock()
defer pm.lock.Unlock()

// TODO: clear previous topic if exist

tw := &topicWrapper{}
tw.state.Store(topicStateJoining)
pm.topics[name] = tw
Expand Down
1 change: 0 additions & 1 deletion core/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ func (d *Daemon) setupMdnsDiscovery(ctx context.Context, host host.Host, service
d.connect(p)
case <-ctx.Done():
return
default:
}
}
}()
Expand Down

0 comments on commit 851d8f0

Please sign in to comment.