From b46c5ec8ba811cf743644e394e8b613eebd2c74c Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Thu, 13 Jun 2024 19:20:33 -0700 Subject: [PATCH] test walk sync process --- go.mod | 2 + go.sum | 2 + net/process.go | 124 ------------------------------------------------- net/server.go | 42 +++++++---------- 4 files changed, 21 insertions(+), 149 deletions(-) delete mode 100644 net/process.go diff --git a/go.mod b/go.mod index fc838114a6..3bdbf60438 100644 --- a/go.mod +++ b/go.mod @@ -185,10 +185,12 @@ require ( github.com/ipfs/go-ipfs-delay v0.0.1 // indirect github.com/ipfs/go-ipfs-pq v0.0.3 // indirect github.com/ipfs/go-ipfs-util v0.0.3 // indirect + github.com/ipfs/go-ipld-legacy v0.2.1 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-metrics-interface v0.0.1 // indirect github.com/ipfs/go-peertaskqueue v0.8.1 // indirect github.com/ipfs/kubo v0.25.0 // indirect + github.com/ipld/go-codec-dagpb v1.6.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect github.com/jmhodges/levigo v1.0.0 // indirect diff --git a/go.sum b/go.sum index 23410e3394..9842b09146 100644 --- a/go.sum +++ b/go.sum @@ -1166,6 +1166,8 @@ github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49u github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= github.com/vito/go-sse v1.0.0 h1:e6/iTrrvy8BRrOwJwmQmlndlil+TLdxXvHi55ZDzH6M= github.com/vito/go-sse v1.0.0/go.mod h1:2wkcaQ+jtlZ94Uve8gYZjFpL68luAjssTINA2hpgcZs= +github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsXyEHU0s= +github.com/warpfork/go-testmark v0.12.1/go.mod h1:kHwy7wfvGSPh1rQJYKayD4AbtNaeyZdcGi9tNJTaa5Y= github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ= github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/wasmerio/wasmer-go v1.0.4 h1:MnqHoOGfiQ8MMq2RF6wyCeebKOe84G88h5yv+vmxJgs= diff --git a/net/process.go b/net/process.go deleted file mode 100644 index b4f85134fb..0000000000 --- a/net/process.go +++ /dev/null @@ -1,124 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package net - -import ( - "context" - "sync" - "time" - - "github.com/ipfs/boxo/blockservice" - "github.com/ipfs/go-cid" - "github.com/ipld/go-ipld-prime/linking" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/sourcenetwork/corelog" - - coreblock "github.com/sourcenetwork/defradb/internal/core/block" -) - -var ( - dagSyncTimeout = time.Second * 60 -) - -type blockProcessor struct { - *Peer - wg *sync.WaitGroup - bsSession *blockservice.Session - queuedChildren *sync.Map -} - -func newBlockProcessor( - ctx context.Context, - p *Peer, -) *blockProcessor { - return &blockProcessor{ - Peer: p, - wg: &sync.WaitGroup{}, - bsSession: blockservice.NewSession(ctx, p.bserv), - queuedChildren: &sync.Map{}, - } -} - -// processRemoteBlock stores the block in the DAG store and initiates a sync of the block's children. -func (bp *blockProcessor) processRemoteBlock( - ctx context.Context, - block *coreblock.Block, -) error { - // Store the block in the DAG store - lsys := cidlink.DefaultLinkSystem() - lsys.SetWriteStorage(bp.db.Blockstore().AsIPLDStorage()) - _, err := lsys.Store(linking.LinkContext{Ctx: ctx}, coreblock.GetLinkPrototype(), block.GenerateNode()) - if err != nil { - return err - } - // Initiate a sync of the block's children - bp.wg.Add(1) - bp.handleChildBlocks(ctx, block) - - return nil -} - -func (bp *blockProcessor) handleChildBlocks( - ctx context.Context, - block *coreblock.Block, -) { - defer bp.wg.Done() - - if len(block.Links) == 0 { - return - } - - links := make([]cid.Cid, 0, len(block.Links)) - for _, link := range block.Links { - exists, err := bp.db.Blockstore().Has(ctx, link.Cid) - if err != nil { - log.ErrorContextE( - ctx, - "Failed to check if block exists", - err, - corelog.Any("CID", link.Cid), - ) - continue - } - if exists { - continue - } - if _, loaded := bp.queuedChildren.LoadOrStore(link.Cid, struct{}{}); !loaded { - links = append(links, link.Cid) - } - } - - getCtx, cancel := context.WithTimeout(ctx, dagSyncTimeout) - defer cancel() - - childBlocks := bp.bsSession.GetBlocks(getCtx, links) - - for rawBlock := range childBlocks { - block, err := coreblock.GetFromBytes(rawBlock.RawData()) - if err != nil { - log.ErrorContextE( - ctx, - "Failed to get block from bytes", - err, - corelog.Any("CID", rawBlock.Cid()), - ) - continue - } - bp.wg.Add(1) - go bp.handleChildBlocks(ctx, block) - } - - for _, link := range links { - bp.queuedChildren.Delete(link) - } -} diff --git a/net/server.go b/net/server.go index fe2a3ae943..890f09aeea 100644 --- a/net/server.go +++ b/net/server.go @@ -17,7 +17,8 @@ import ( "fmt" "sync" - "github.com/ipfs/go-cid" + "github.com/ipfs/boxo/ipld/merkledag" + cid "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/event" libpeer "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/corelog" @@ -28,9 +29,9 @@ import ( "google.golang.org/protobuf/proto" "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore/badger/v4" "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/events" - coreblock "github.com/sourcenetwork/defradb/internal/core/block" pb "github.com/sourcenetwork/defradb/net/pb" ) @@ -150,7 +151,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL if err != nil { return nil, err } - cid, err := cid.Cast(req.Body.Cid) + headCID, err := cid.Cast(req.Body.Cid) if err != nil { return nil, err } @@ -177,38 +178,29 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL } }() - // check if we already have this block - exists, err := s.peer.db.Blockstore().Has(ctx, cid) - if err != nil { - return nil, NewErrCheckingForExistingBlock(err, cid.String()) - } - if exists { - return &pb.PushLogReply{}, nil + onError := merkledag.OnError(func(c cid.Cid, err error) error { + if errors.Is(err, badger.ErrTxnConflict) { + return nil // transaction conflicts are fine to ignore + } + return err + }) + visit := func(c cid.Cid) bool { + has, _ := s.peer.db.Blockstore().Has(ctx, c) + return !has } - - block, err := coreblock.GetFromBytes(req.Body.Log.Block) + dagServ := merkledag.NewDAGService(s.peer.bserv) + walkOpts := []merkledag.WalkOption{onError, merkledag.Concurrent()} + err = merkledag.Walk(ctx, dagServ.GetLinks, headCID, visit, walkOpts...) if err != nil { return nil, err } - bp := newBlockProcessor(ctx, s.peer) - err = bp.processRemoteBlock(ctx, block) - if err != nil { - log.ErrorContextE( - ctx, - "Failed to process remote block", - err, - corelog.String("DocID", docID.String()), - corelog.Any("CID", cid), - ) - } - bp.wg.Wait() if s.peer.db.Events().DAGMerges.HasValue() { wg := &sync.WaitGroup{} wg.Add(1) s.peer.db.Events().DAGMerges.Value().Publish(events.DAGMerge{ DocID: docID.String(), - Cid: cid, + Cid: headCID, SchemaRoot: string(req.Body.SchemaRoot), Wg: wg, })