Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: DAG sync and move merge outside of net package #2658

Merged
1 change: 1 addition & 0 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
node.WithPeers(peers...),
// db options
db.WithUpdateEvents(),
db.WithDAGMergeEvents(),

Check warning on line 119 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L119

Added line #L119 was not covered by tests
db.WithMaxRetries(cfg.GetInt("datastore.MaxTxnRetries")),
// net node options
net.WithListenAddresses(cfg.GetStringSlice("net.p2pAddresses")...),
Expand Down
15 changes: 0 additions & 15 deletions client/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,6 @@ type Collection interface {

// GetIndexes returns all the indexes that exist on the collection.
GetIndexes(ctx context.Context) ([]IndexDescription, error)

// CreateDocIndex creates an index for the given document.
// WARNING: This method is only for internal use and is not supposed to be called by the client
// as it might compromise the integrity of the database. This method will be removed in the future
CreateDocIndex(context.Context, *Document) error

// UpdateDocIndex updates the index for the given document.
// WARNING: This method is only for internal use and is not supposed to be called by the client
// as it might compromise the integrity of the database. This method will be removed in the future
UpdateDocIndex(ctx context.Context, oldDoc, newDoc *Document) error

// DeleteDocIndex deletes the index for the given document.
// WARNING: This method is only for internal use and is not supposed to be called by the client
// as it might compromise the integrity of the database. This method will be removed in the future
DeleteDocIndex(context.Context, *Document) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise: nice to see these removed from the client interface

}

// DocIDResult wraps the result of an attempt at a DocID retrieval operation.
Expand Down
130 changes: 0 additions & 130 deletions client/mocks/collection.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions datastore/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,14 @@ func (d *Datastore) executePurge(ctx context.Context) {
}

func (d *Datastore) handleContextDone(ctx context.Context) {
<-ctx.Done()
// It is safe to ignore the error since the only error that could occur is if the
// datastore is already closed, in which case the purpose of the `Close` call is already covered.
_ = d.Close()
select {
case <-d.closing:
return
case <-ctx.Done():
// It is safe to ignore the error since the only error that could occur is if the
// datastore is already closed, in which case the purpose of the `Close` call is already covered.
_ = d.Close()
}
}

// commit commits the given transaction to the datastore.
Expand Down
33 changes: 33 additions & 0 deletions events/dag_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package events

import (
"sync"

"github.com/ipfs/go-cid"

"github.com/sourcenetwork/immutable"
)

// DAGMergeChannel is the bus onto which dag merge are published.
type DAGMergeChannel = immutable.Option[Channel[DAGMerge]]

// DAGMerge is a notification that a merge can be performed up to the provided CID.
type DAGMerge struct {
// Cid is the id of the composite commit that formed this update in the DAG.
Cid cid.Cid
// SchemaRoot is the root identifier of the schema that defined the shape of the document that was updated.
SchemaRoot string
// Wg is a wait group that can be used to synchronize the merge,
// allowing the caller to optionnaly block until the merge is complete.
Wg *sync.WaitGroup
}
3 changes: 3 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,7 @@ func New[T any](commandBufferSize int, eventBufferSize int) Channel[T] {
type Events struct {
// Updates publishes an `Update` for each document written to in the database.
Updates UpdateChannel

// DAGMerges publishes a `DAGMerge` for each completed DAG sync process over P2P.
DAGMerges DAGMergeChannel
}
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1166,8 +1166,6 @@ github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49u
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/vito/go-sse v1.0.0 h1:e6/iTrrvy8BRrOwJwmQmlndlil+TLdxXvHi55ZDzH6M=
github.com/vito/go-sse v1.0.0/go.mod h1:2wkcaQ+jtlZ94Uve8gYZjFpL68luAjssTINA2hpgcZs=
github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsXyEHU0s=
github.com/warpfork/go-testmark v0.12.1/go.mod h1:kHwy7wfvGSPh1rQJYKayD4AbtNaeyZdcGi9tNJTaa5Y=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/wasmerio/wasmer-go v1.0.4 h1:MnqHoOGfiQ8MMq2RF6wyCeebKOe84G88h5yv+vmxJgs=
Expand Down
12 changes: 0 additions & 12 deletions http/client_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,15 +413,3 @@ func (c *Collection) GetIndexes(ctx context.Context) ([]client.IndexDescription,
}
return indexes, nil
}

func (c *Collection) CreateDocIndex(context.Context, *client.Document) error {
return ErrMethodIsNotImplemented
}

func (c *Collection) UpdateDocIndex(ctx context.Context, oldDoc, newDoc *client.Document) error {
return ErrMethodIsNotImplemented
}

func (c *Collection) DeleteDocIndex(context.Context, *client.Document) error {
return ErrMethodIsNotImplemented
}
4 changes: 2 additions & 2 deletions internal/core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
return &CompositeDAGDelta{
DocID: []byte(c.key.DocID),
FieldName: c.fieldName,
SchemaVersionID: c.schemaVersionKey.SchemaVersionId,
SchemaVersionID: c.schemaVersionKey.SchemaVersionID,
Status: status,
}
}
Expand Down Expand Up @@ -130,7 +130,7 @@
// been migrated yet locally.
schemaVersionId = dagDelta.SchemaVersionID
} else {
schemaVersionId = c.schemaVersionKey.SchemaVersionId
schemaVersionId = c.schemaVersionKey.SchemaVersionID

Check warning on line 133 in internal/core/crdt/composite.go

View check run for this annotation

Codecov / codecov/patch

internal/core/crdt/composite.go#L133

Added line #L133 was not covered by tests
}

err = c.store.Put(ctx, versionKey.ToDS(), []byte(schemaVersionId))
Expand Down
2 changes: 1 addition & 1 deletion internal/core/crdt/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c Counter[T]) Increment(ctx context.Context, value T) (*CounterDelta[T], e
DocID: []byte(c.key.DocID),
FieldName: c.fieldName,
Data: value,
SchemaVersionID: c.schemaVersionKey.SchemaVersionId,
SchemaVersionID: c.schemaVersionKey.SchemaVersionID,
Nonce: nonce,
}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (reg LWWRegister) Set(value []byte) *LWWRegDelta {
Data: value,
DocID: []byte(reg.key.DocID),
FieldName: reg.fieldName,
SchemaVersionID: reg.schemaVersionKey.SchemaVersionId,
SchemaVersionID: reg.schemaVersionKey.SchemaVersionID,
}
}

Expand Down
10 changes: 5 additions & 5 deletions internal/core/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ var _ Key = (*CollectionNameKey)(nil)
//
// This key should be removed in https://github.com/sourcenetwork/defradb/issues/1085
type CollectionSchemaVersionKey struct {
SchemaVersionId string
SchemaVersionID string
CollectionID uint32
}

Expand Down Expand Up @@ -296,7 +296,7 @@ func NewCollectionNameKey(name string) CollectionNameKey {

func NewCollectionSchemaVersionKey(schemaVersionId string, collectionID uint32) CollectionSchemaVersionKey {
return CollectionSchemaVersionKey{
SchemaVersionId: schemaVersionId,
SchemaVersionID: schemaVersionId,
CollectionID: collectionID,
}
}
Expand All @@ -309,7 +309,7 @@ func NewCollectionSchemaVersionKeyFromString(key string) (CollectionSchemaVersio
}

return CollectionSchemaVersionKey{
SchemaVersionId: elements[len(elements)-2],
SchemaVersionID: elements[len(elements)-2],
CollectionID: uint32(colID),
}, nil
}
Expand Down Expand Up @@ -591,8 +591,8 @@ func (k CollectionNameKey) ToDS() ds.Key {
func (k CollectionSchemaVersionKey) ToString() string {
result := COLLECTION_SCHEMA_VERSION

if k.SchemaVersionId != "" {
result = result + "/" + k.SchemaVersionId
if k.SchemaVersionID != "" {
result = result + "/" + k.SchemaVersionID
}

if k.CollectionID != 0 {
Expand Down
46 changes: 3 additions & 43 deletions internal/db/collection_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,53 +109,13 @@ func (db *db) fetchCollectionIndexDescriptions(
return indexDescriptions, nil
}

func (c *collection) CreateDocIndex(ctx context.Context, doc *client.Document) error {
ctx, txn, err := ensureContextTxn(ctx, c.db, false)
if err != nil {
return err
}
defer txn.Discard(ctx)

err = c.indexNewDoc(ctx, doc)
if err != nil {
return err
}

return txn.Commit(ctx)
}

func (c *collection) UpdateDocIndex(ctx context.Context, oldDoc, newDoc *client.Document) error {
ctx, txn, err := ensureContextTxn(ctx, c.db, false)
if err != nil {
return err
}
defer txn.Discard(ctx)

err = c.deleteIndexedDoc(ctx, oldDoc)
if err != nil {
return err
}
err = c.indexNewDoc(ctx, newDoc)
if err != nil {
return err
}

return txn.Commit(ctx)
}

func (c *collection) DeleteDocIndex(ctx context.Context, doc *client.Document) error {
ctx, txn, err := ensureContextTxn(ctx, c.db, false)
func (c *collection) updateDocIndex(ctx context.Context, oldDoc, newDoc *client.Document) error {
err := c.deleteIndexedDoc(ctx, oldDoc)
if err != nil {
return err
}
defer txn.Discard(ctx)

err = c.deleteIndexedDoc(ctx, doc)
if err != nil {
return err
}

return txn.Commit(ctx)
return c.indexNewDoc(ctx, newDoc)
}

func (c *collection) indexNewDoc(ctx context.Context, doc *client.Document) error {
Expand Down
Loading
Loading