Skip to content

Commit

Permalink
apply feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
fredcarle committed Jun 3, 2024
1 parent 307bcb1 commit 6cf9e4f
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 42 deletions.
8 changes: 4 additions & 4 deletions internal/db/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (db *db) newMergeProcessor(

type mergeTarget struct {
heads map[cid.Cid]*coreblock.Block
headHeigth uint64
headHeight uint64
}

func newMergeTarget() mergeTarget {
Expand Down Expand Up @@ -189,7 +189,7 @@ func (mp *mergeProcessor) loadComposites(
return nil
}

if block.Delta.GetPriority() >= mt.headHeigth {
if block.Delta.GetPriority() >= mt.headHeight {
mp.composites.PushFront(block)
for _, link := range block.Links {
if link.Name == core.HEAD {
Expand All @@ -214,7 +214,7 @@ func (mp *mergeProcessor) loadComposites(
}

newMT.heads[link.Cid] = childBlock
newMT.headHeigth = childBlock.Delta.GetPriority()
newMT.headHeight = childBlock.Delta.GetPriority()
}
}
return mp.loadComposites(ctx, blockCid, newMT)
Expand Down Expand Up @@ -385,7 +385,7 @@ func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, dsKey core.Da

mt.heads[cid] = block
// All heads have the same height so overwriting is ok.
mt.headHeigth = block.Delta.GetPriority()
mt.headHeight = block.Delta.GetPriority()
}
return mt, nil
}
Expand Down
42 changes: 4 additions & 38 deletions net/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type blockProcessor struct {
*Peer
wg *sync.WaitGroup
bsSession *blockservice.Session
queuedChildren *cidSafeSet
queuedChildren *sync.Map
}

func newBlockProcessor(
Expand All @@ -45,7 +45,7 @@ func newBlockProcessor(
Peer: p,
wg: &sync.WaitGroup{},
bsSession: blockservice.NewSession(ctx, p.bserv),
queuedChildren: newCidSafeSet(),
queuedChildren: &sync.Map{},
}
}

Expand Down Expand Up @@ -93,7 +93,7 @@ func (bp *blockProcessor) handleChildBlocks(
if exists {
continue
}
if bp.queuedChildren.Visit(link.Cid) {
if _, loaded := bp.queuedChildren.LoadOrStore(link.Cid, struct{}{}); !loaded {
links = append(links, link.Cid)
}
}
Expand All @@ -119,40 +119,6 @@ func (bp *blockProcessor) handleChildBlocks(
}

for _, link := range links {
bp.queuedChildren.Remove(link)
bp.queuedChildren.Delete(link)
}
}

type cidSafeSet struct {
set map[cid.Cid]struct{}
mux sync.Mutex
}

func newCidSafeSet() *cidSafeSet {
return &cidSafeSet{
set: make(map[cid.Cid]struct{}),
}
}

// Visit checks if we can visit this node, or
// if its already being visited
func (s *cidSafeSet) Visit(c cid.Cid) bool {
var b bool
s.mux.Lock()
{
if _, ok := s.set[c]; !ok {
s.set[c] = struct{}{}
b = true
}
}
s.mux.Unlock()
return b
}

func (s *cidSafeSet) Remove(c cid.Cid) {
s.mux.Lock()
{
delete(s.set, c)
}
s.mux.Unlock()
}

0 comments on commit 6cf9e4f

Please sign in to comment.