Skip to content

Commit

Permalink
test walk sync process
Browse files Browse the repository at this point in the history
  • Loading branch information
nasdf committed Jun 14, 2024
1 parent 10e35c3 commit b46c5ec
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 149 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,12 @@ require (
github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
github.com/ipfs/go-ipfs-util v0.0.3 // indirect
github.com/ipfs/go-ipld-legacy v0.2.1 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipfs/go-peertaskqueue v0.8.1 // indirect
github.com/ipfs/kubo v0.25.0 // indirect
github.com/ipld/go-codec-dagpb v1.6.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jmhodges/levigo v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,8 @@ github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49u
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/vito/go-sse v1.0.0 h1:e6/iTrrvy8BRrOwJwmQmlndlil+TLdxXvHi55ZDzH6M=
github.com/vito/go-sse v1.0.0/go.mod h1:2wkcaQ+jtlZ94Uve8gYZjFpL68luAjssTINA2hpgcZs=
github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsXyEHU0s=
github.com/warpfork/go-testmark v0.12.1/go.mod h1:kHwy7wfvGSPh1rQJYKayD4AbtNaeyZdcGi9tNJTaa5Y=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/wasmerio/wasmer-go v1.0.4 h1:MnqHoOGfiQ8MMq2RF6wyCeebKOe84G88h5yv+vmxJgs=
Expand Down
124 changes: 0 additions & 124 deletions net/process.go

This file was deleted.

42 changes: 17 additions & 25 deletions net/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
"fmt"
"sync"

"github.com/ipfs/go-cid"
"github.com/ipfs/boxo/ipld/merkledag"
cid "github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/event"
libpeer "github.com/libp2p/go-libp2p/core/peer"
"github.com/sourcenetwork/corelog"
Expand All @@ -28,9 +29,9 @@ import (
"google.golang.org/protobuf/proto"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/datastore/badger/v4"
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/events"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
pb "github.com/sourcenetwork/defradb/net/pb"
)

Expand Down Expand Up @@ -150,7 +151,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
if err != nil {
return nil, err
}
cid, err := cid.Cast(req.Body.Cid)
headCID, err := cid.Cast(req.Body.Cid)
if err != nil {
return nil, err
}
Expand All @@ -177,38 +178,29 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
}
}()

// check if we already have this block
exists, err := s.peer.db.Blockstore().Has(ctx, cid)
if err != nil {
return nil, NewErrCheckingForExistingBlock(err, cid.String())
}
if exists {
return &pb.PushLogReply{}, nil
onError := merkledag.OnError(func(c cid.Cid, err error) error {
if errors.Is(err, badger.ErrTxnConflict) {
return nil // transaction conflicts are fine to ignore

Check warning on line 183 in net/server.go

View check run for this annotation

Codecov / codecov/patch

net/server.go#L182-L183

Added lines #L182 - L183 were not covered by tests
}
return err

Check warning on line 185 in net/server.go

View check run for this annotation

Codecov / codecov/patch

net/server.go#L185

Added line #L185 was not covered by tests
})
visit := func(c cid.Cid) bool {
has, _ := s.peer.db.Blockstore().Has(ctx, c)
return !has
}

block, err := coreblock.GetFromBytes(req.Body.Log.Block)
dagServ := merkledag.NewDAGService(s.peer.bserv)
walkOpts := []merkledag.WalkOption{onError, merkledag.Concurrent()}
err = merkledag.Walk(ctx, dagServ.GetLinks, headCID, visit, walkOpts...)
if err != nil {
return nil, err
}

bp := newBlockProcessor(ctx, s.peer)
err = bp.processRemoteBlock(ctx, block)
if err != nil {
log.ErrorContextE(
ctx,
"Failed to process remote block",
err,
corelog.String("DocID", docID.String()),
corelog.Any("CID", cid),
)
}
bp.wg.Wait()
if s.peer.db.Events().DAGMerges.HasValue() {
wg := &sync.WaitGroup{}
wg.Add(1)
s.peer.db.Events().DAGMerges.Value().Publish(events.DAGMerge{
DocID: docID.String(),
Cid: cid,
Cid: headCID,
SchemaRoot: string(req.Body.SchemaRoot),
Wg: wg,
})
Expand Down

0 comments on commit b46c5ec

Please sign in to comment.