diff --git a/internal/db/collection.go b/internal/db/collection.go index 1e143e56cd..ee372db469 100644 --- a/internal/db/collection.go +++ b/internal/db/collection.go @@ -71,7 +71,7 @@ func (c *collection) newFetcher() fetcher.Fetcher { if c.fetcherFactory != nil { innerFetcher = c.fetcherFactory() } else { - innerFetcher = new(fetcher.DocumentFetcher) + innerFetcher = fetcher.NewDocumentFetcher() } return lens.NewFetcher(innerFetcher, c.db.LensRegistry()) diff --git a/internal/db/fetcher/deleted.go b/internal/db/fetcher/deleted.go new file mode 100644 index 0000000000..eeb127f479 --- /dev/null +++ b/internal/db/fetcher/deleted.go @@ -0,0 +1,95 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "errors" + + "github.com/sourcenetwork/immutable" +) + +// deleted is a fetcher that orchastrates the fetching of deleted and active documents. +type deleted struct { + activeFetcher fetcher + activeDocID immutable.Option[string] + + deletedFetcher fetcher + deletedDocID immutable.Option[string] + + currentFetcher fetcher +} + +var _ fetcher = (*deleted)(nil) + +func newDeletedFetcher( + activeFetcher fetcher, + deletedFetcher fetcher, +) *deleted { + return &deleted{ + activeFetcher: activeFetcher, + deletedFetcher: deletedFetcher, + } +} + +func (f *deleted) NextDoc() (immutable.Option[string], error) { + if !f.activeDocID.HasValue() { + var err error + f.activeDocID, err = f.activeFetcher.NextDoc() + if err != nil { + return immutable.None[string](), err + } + } + + if !f.deletedDocID.HasValue() { + var err error + f.deletedDocID, err = f.deletedFetcher.NextDoc() + if err != nil { + return immutable.None[string](), err + } + } + + if !f.activeDocID.HasValue() || (f.deletedDocID.HasValue() && f.deletedDocID.Value() < f.activeDocID.Value()) { + f.currentFetcher = f.deletedFetcher + return f.deletedDocID, nil + } + + f.currentFetcher = f.activeFetcher + return f.activeDocID, nil +} + +func (f *deleted) GetFields() (immutable.Option[EncodedDocument], error) { + doc, err := f.currentFetcher.GetFields() + if err != nil { + return immutable.None[EncodedDocument](), err + } + + if f.activeFetcher == f.currentFetcher { + f.activeDocID = immutable.None[string]() + } else { + f.deletedDocID = immutable.None[string]() + } + + return doc, nil +} + +func (f *deleted) Close() error { + activeErr := f.activeFetcher.Close() + if activeErr != nil { + deletedErr := f.deletedFetcher.Close() + if deletedErr != nil { + return errors.Join(activeErr, deletedErr) + } + + return activeErr + } + + return f.deletedFetcher.Close() +} diff --git a/internal/db/fetcher/document.go b/internal/db/fetcher/document.go new file mode 100644 index 0000000000..8f4e69cf76 --- /dev/null +++ b/internal/db/fetcher/document.go @@ -0,0 +1,197 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "bytes" + "context" + + dsq "github.com/ipfs/go-datastore/query" + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore/iterable" + "github.com/sourcenetwork/defradb/internal/db/base" + "github.com/sourcenetwork/defradb/internal/keys" +) + +// document is the type responsible for fetching documents from the datastore. +// +// It does not filter the data in any way. +type document struct { + // The set of fields to fetch, mapped by field ID. + fieldsByID map[uint32]client.FieldDefinition + // The status to assign fetched documents. + status client.DocumentStatus + // Statistics on the actions of this instance. + execInfo *ExecInfo + // The iterable results that documents will be fetched from. + kvResultsIter dsq.Results + + // The most recently yielded item from kvResultsIter. + currentKV keyValue + // nextKV may hold a datastore key value retrieved from kvResultsIter + // that was not yet ready to be yielded from the instance. + // + // When the next document is requested, this value should be yielded + // before resuming iteration through the kvResultsIter. + nextKV immutable.Option[keyValue] +} + +var _ fetcher = (*document)(nil) + +func newDocumentFetcher( + ctx context.Context, + fieldsByID map[uint32]client.FieldDefinition, + kvIter iterable.Iterator, + prefix keys.DataStoreKey, + status client.DocumentStatus, + execInfo *ExecInfo, +) (*document, error) { + if status == client.Active { + prefix = prefix.WithValueFlag() + } else if status == client.Deleted { + prefix = prefix.WithDeletedFlag() + } + + kvResultsIter, err := kvIter.IteratePrefix(ctx, prefix.ToDS(), prefix.PrefixEnd().ToDS()) + if err != nil { + return nil, err + } + + return &document{ + fieldsByID: fieldsByID, + kvResultsIter: kvResultsIter, + status: status, + execInfo: execInfo, + }, nil +} + +// keyValue is a KV store response containing the resulting core.DataStoreKey and byte array value. +type keyValue struct { + Key keys.DataStoreKey + Value []byte +} + +func (f *document) NextDoc() (immutable.Option[string], error) { + if f.nextKV.HasValue() { + docID := f.nextKV.Value().Key.DocID + f.currentKV = f.nextKV.Value() + + f.nextKV = immutable.None[keyValue]() + f.execInfo.DocsFetched++ + + return immutable.Some(docID), nil + } + + for { + res, ok := f.kvResultsIter.NextSync() + if res.Error != nil { + return immutable.None[string](), res.Error + } + if !ok { + return immutable.None[string](), nil + } + + dsKey, err := keys.NewDataStoreKey(res.Key) + if err != nil { + return immutable.None[string](), err + } + + if dsKey.DocID != f.currentKV.Key.DocID { + f.currentKV = keyValue{ + Key: dsKey, + Value: res.Value, + } + break + } + } + + f.execInfo.DocsFetched++ + + return immutable.Some(f.currentKV.Key.DocID), nil +} + +func (f *document) GetFields() (immutable.Option[EncodedDocument], error) { + doc := encodedDocument{} + doc.id = []byte(f.currentKV.Key.DocID) + doc.status = f.status + doc.properties = map[client.FieldDefinition]*encProperty{} + + err := f.appendKv(&doc, f.currentKV) + if err != nil { + return immutable.None[EncodedDocument](), err + } + + for { + res, ok := f.kvResultsIter.NextSync() + if !ok { + break + } + + dsKey, err := keys.NewDataStoreKey(res.Key) + if err != nil { + return immutable.None[EncodedDocument](), err + } + + kv := keyValue{ + Key: dsKey, + Value: res.Value, + } + + if dsKey.DocID != f.currentKV.Key.DocID { + f.nextKV = immutable.Some(kv) + break + } + + err = f.appendKv(&doc, kv) + if err != nil { + return immutable.None[EncodedDocument](), err + } + } + + return immutable.Some[EncodedDocument](&doc), nil +} + +func (f *document) appendKv(doc *encodedDocument, kv keyValue) error { + if kv.Key.FieldID == keys.DATASTORE_DOC_VERSION_FIELD_ID { + doc.schemaVersionID = string(kv.Value) + return nil + } + + // we have to skip the object marker + if bytes.Equal(kv.Value, []byte{base.ObjectMarker}) { + return nil + } + + fieldID, err := kv.Key.FieldIDAsUint() + if err != nil { + return err + } + + fieldDesc, ok := f.fieldsByID[fieldID] + if !ok { + return nil + } + + f.execInfo.FieldsFetched++ + + doc.properties[fieldDesc] = &encProperty{ + Desc: fieldDesc, + Raw: kv.Value, + } + + return nil +} + +func (f *document) Close() error { + return f.kvResultsIter.Close() +} diff --git a/internal/db/fetcher/fetcher.go b/internal/db/fetcher/fetcher.go index 877cbfa7c8..61c4ef5166 100644 --- a/internal/db/fetcher/fetcher.go +++ b/internal/db/fetcher/fetcher.go @@ -11,13 +11,7 @@ package fetcher import ( - "bytes" "context" - "slices" - "strings" - - "github.com/bits-and-blooms/bitset" - dsq "github.com/ipfs/go-datastore/query" "github.com/sourcenetwork/immutable" @@ -25,13 +19,9 @@ import ( acpIdentity "github.com/sourcenetwork/defradb/acp/identity" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/datastore" - "github.com/sourcenetwork/defradb/datastore/iterable" "github.com/sourcenetwork/defradb/internal/core" - "github.com/sourcenetwork/defradb/internal/db/base" - "github.com/sourcenetwork/defradb/internal/db/permission" "github.com/sourcenetwork/defradb/internal/keys" "github.com/sourcenetwork/defradb/internal/planner/mapper" - "github.com/sourcenetwork/defradb/internal/request/graphql/parser" ) // ExecInfo contains statistics about the fetcher execution. @@ -77,633 +67,23 @@ type Fetcher interface { Close() error } -// keyValue is a KV store response containing the resulting core.Key and byte array value. -type keyValue struct { - Key keys.DataStoreKey - Value []byte -} - -var ( - _ Fetcher = (*DocumentFetcher)(nil) -) - -// DocumentFetcher is a utility to incrementally fetch all the documents. -type DocumentFetcher struct { - identity immutable.Option[acpIdentity.Identity] - acp immutable.Option[acp.ACP] - passedPermissionCheck bool // have valid permission to access - - col client.Collection - deletedDocs bool - - txn datastore.Txn - prefixes []keys.DataStoreKey - order []dsq.Order - curPrefixIndex int - - filter *mapper.Filter - ranFilter bool // did we run the filter - passedFilter bool // did we pass the filter - - filterFields map[uint32]client.FieldDefinition - selectFields map[uint32]client.FieldDefinition - - // static bitset to which stores the IDs of fields - // needed for filtering. - // - // This is compared against the encdoc.filterSet which - // is a dynamic bitset, that gets updated as fields are - // added to the encdoc, and cleared on reset. +// fetcher fetches documents from the store, performing low-level filtering +// when appropriate (e.g. ACP). +type fetcher interface { + // NextDoc progresses the internal iterator(s) to the next document, yielding + // its docID if found. // - // We compare the two bitsets to determine if we've collected - // all the necessary fields to run the filter. - // - // This is *much* more effecient for comparison then most (any?) - // other approach. - // - // When proper seek() is added, this will also be responsible - // for effectiently finding the next field to seek to. - filterSet *bitset.BitSet - - doc *encodedDocument - mapping *core.DocumentMapping - - initialized bool - - kv *keyValue - kvIter iterable.Iterator - kvResultsIter dsq.Results - kvEnd bool - isReadingDocument bool - - // Since deleted documents are stored under a different instance type than active documents, - // we use a parallel fetcher to be able to return the documents in the expected order. - // That being lexicographically ordered docIDs. - deletedDocFetcher *DocumentFetcher - - execInfo ExecInfo -} - -// Init implements DocumentFetcher. -func (df *DocumentFetcher) Init( - ctx context.Context, - identity immutable.Option[acpIdentity.Identity], - txn datastore.Txn, - acp immutable.Option[acp.ACP], - col client.Collection, - fields []client.FieldDefinition, - filter *mapper.Filter, - docmapper *core.DocumentMapping, - showDeleted bool, -) error { - df.txn = txn - - err := df.init(identity, acp, col, fields, filter, docmapper) - if err != nil { - return err - } - - if showDeleted { - if df.deletedDocFetcher == nil { - df.deletedDocFetcher = new(DocumentFetcher) - df.deletedDocFetcher.txn = txn - } - return df.deletedDocFetcher.init(identity, acp, col, fields, filter, docmapper) - } - - return nil -} - -func (df *DocumentFetcher) init( - identity immutable.Option[acpIdentity.Identity], - acp immutable.Option[acp.ACP], - col client.Collection, - fields []client.FieldDefinition, - filter *mapper.Filter, - docMapper *core.DocumentMapping, -) error { - df.identity = identity - df.acp = acp - df.col = col - df.initialized = true - df.filter = filter - df.isReadingDocument = false - df.doc = new(encodedDocument) - df.mapping = docMapper - - if df.filter != nil && docMapper == nil { - return ErrMissingMapper - } - - if df.kvResultsIter != nil { - if err := df.kvResultsIter.Close(); err != nil { - return err - } - } - df.kvResultsIter = nil - if df.kvIter != nil { - if err := df.kvIter.Close(); err != nil { - return err - } - } - df.kvIter = nil + // If None is returned, iteration is complete and there are no more documents left + // to fetch. + NextDoc() (immutable.Option[string], error) - df.selectFields = make(map[uint32]client.FieldDefinition, len(fields)) - // if we haven't been told to get specific fields - // get them all - var targetFields []client.FieldDefinition - if len(fields) == 0 { - targetFields = df.col.Definition().GetFields() - } else { - targetFields = fields - } - - for _, field := range targetFields { - df.selectFields[uint32(field.ID)] = field - } - - if df.filter != nil { - conditions := df.filter.ToMap(df.mapping) - parsedfilterFields, err := parser.ParseFilterFieldsForDescription(conditions, df.col.Definition()) - if err != nil { - return err - } - df.filterFields = make(map[uint32]client.FieldDefinition, len(parsedfilterFields)) - df.filterSet = bitset.New(uint(len(col.Schema().Fields))) - for _, field := range parsedfilterFields { - df.filterFields[uint32(field.ID)] = field - df.filterSet.Set(uint(field.ID)) - } - } - - return nil -} - -func (df *DocumentFetcher) Start(ctx context.Context, prefixes ...keys.Walkable) error { - err := df.start(ctx, prefixes, false) - if err != nil { - return err - } - - if df.deletedDocFetcher != nil { - return df.deletedDocFetcher.start(ctx, prefixes, true) - } - - return nil -} - -// Start implements DocumentFetcher. -func (df *DocumentFetcher) start(ctx context.Context, prefixes []keys.Walkable, withDeleted bool) error { - if df.col == nil { - return client.NewErrUninitializeProperty("DocumentFetcher", "CollectionDescription") - } - if df.doc == nil { - return client.NewErrUninitializeProperty("DocumentFetcher", "Document") - } - - df.deletedDocs = withDeleted - - if len(prefixes) == 0 { // no specified prefixes so create a prefix scan key for the entire collection - prefix := base.MakeDataStoreKeyWithCollectionDescription(df.col.Description()) - if withDeleted { - prefix = prefix.WithDeletedFlag() - } else { - prefix = prefix.WithValueFlag() - } - df.prefixes = []keys.DataStoreKey{prefix} - } else { - valuePrefixes := make([]keys.DataStoreKey, len(prefixes)) - prefixCache := make(map[string]struct{}) - for i, prefix := range prefixes { - // if we have a duplicate prefix, skip it - if _, exists := prefixCache[prefix.ToString()]; exists { - continue - } - prefixCache[prefix.ToString()] = struct{}{} - if withDeleted { - // DocumentFetcher only ever recieves document keys - //nolint:forcetypeassert - valuePrefixes[i] = prefix.(keys.DataStoreKey).WithDeletedFlag() - } else { - // DocumentFetcher only ever recieves document keys - //nolint:forcetypeassert - valuePrefixes[i] = prefix.(keys.DataStoreKey).WithValueFlag() - } - } - - slices.SortFunc(valuePrefixes, func(a, b keys.DataStoreKey) int { - return strings.Compare(a.ToString(), b.ToString()) - }) - - df.prefixes = valuePrefixes - } - df.curPrefixIndex = -1 - df.order = []dsq.Order{dsq.OrderByKey{}} - - _, err := df.startNextPrefix(ctx) - return err -} - -func (df *DocumentFetcher) startNextPrefix(ctx context.Context) (bool, error) { - nextPrefixIndex := df.curPrefixIndex + 1 - if nextPrefixIndex >= len(df.prefixes) { - return false, nil - } - - var err error - if df.kvIter == nil { - df.kvIter, err = df.txn.Datastore().GetIterator(dsq.Query{ - Orders: df.order, - }) - } - if err != nil { - return false, err - } - - if df.kvResultsIter != nil { - err = df.kvResultsIter.Close() - if err != nil { - return false, err - } - } - - prefix := df.prefixes[nextPrefixIndex] - df.kvResultsIter, err = df.kvIter.IteratePrefix(ctx, prefix.ToDS(), prefix.PrefixEnd().ToDS()) - if err != nil { - return false, err - } - df.curPrefixIndex = nextPrefixIndex - - _, _, err = df.nextKey(ctx, false) - return err == nil, err -} - -// nextKey gets the next kv. It sets both kv and kvEnd internally. -// It returns true if the current doc is completed. -// The first call to nextKey CANNOT have seekNext be true (ErrFailedToSeek) -func (df *DocumentFetcher) nextKey(ctx context.Context, seekNext bool) (prefixDone bool, docDone bool, err error) { - // safety against seekNext on first call - if seekNext && df.kv == nil { - return false, false, ErrFailedToSeek - } - - if seekNext { - curKey := df.kv.Key - curKey.FieldID = "" // clear field so prefixEnd applies to docID - seekKey := curKey.PrefixEnd().ToString() - prefixDone, df.kv, err = df.seekKV(seekKey) - // handle any internal errors - if err != nil { - return false, false, err - } - } else { - prefixDone, df.kv, err = df.nextKV() - // handle any internal errors - if err != nil { - return false, false, err - } - } - - if df.kv != nil && (df.kv.Key.InstanceType != keys.ValueKey && df.kv.Key.InstanceType != keys.DeletedKey) { - // We can only ready value values, if we escape the collection's value keys - // then we must be done and can stop reading - prefixDone = true - } - - df.kvEnd = prefixDone - if df.kvEnd { - err = df.kvResultsIter.Close() - if err != nil { - return false, false, err - } - morePrefixes, err := df.startNextPrefix(ctx) - if err != nil { - return false, false, err - } - df.isReadingDocument = false - return !morePrefixes, true, nil - } - - // check if we've crossed document boundries - if (df.doc.id != nil && df.kv.Key.DocID != string(df.doc.id)) || seekNext { - df.isReadingDocument = false - return false, true, nil - } - return false, false, nil -} - -// nextKV is a lower-level utility compared to nextKey. The differences are as follows: -// - It directly interacts with the KVIterator. -// - Returns true if the entire iterator/prefix is exhausted -// - Returns a kv pair instead of internally updating -func (df *DocumentFetcher) nextKV() (iterDone bool, kv *keyValue, err error) { - done, dsKey, res, err := df.nextKVRaw() - if done || err != nil { - return done, nil, err - } - - kv = &keyValue{ - Key: dsKey, - Value: res.Value, - } - return false, kv, nil -} - -// seekKV will seek through results/iterator until it reaches -// the target key, or if the target key doesn't exist, the -// next smallest key that is greater than the target. -func (df *DocumentFetcher) seekKV(key string) (bool, *keyValue, error) { - // make sure the current kv is *before* the target key - switch strings.Compare(df.kv.Key.ToString(), key) { - case 0: - // equal, we should just return the kv state - return df.kvEnd, df.kv, nil - case 1: - // greater, error - return false, nil, NewErrFailedToSeek(key, nil) - } - - for { - done, dsKey, res, err := df.nextKVRaw() - if done || err != nil { - return done, nil, err - } - - switch strings.Compare(dsKey.ToString(), key) { - case -1: - // before, so lets seek again - continue - case 0, 1: - // equal or greater (first), return a formatted kv - kv := &keyValue{ - Key: dsKey, - Value: res.Value, // @todo make lazy - } - return false, kv, nil - } - } -} - -// nextKV is a lower-level utility compared to nextKey. The differences are as follows: -// - It directly interacts with the KVIterator. -// - Returns true if the entire iterator/prefix is exhausted -// - Returns a kv pair instead of internally updating -func (df *DocumentFetcher) nextKVRaw() (bool, keys.DataStoreKey, dsq.Result, error) { - res, available := df.kvResultsIter.NextSync() - if !available { - return true, keys.DataStoreKey{}, res, nil - } - err := res.Error - if err != nil { - return true, keys.DataStoreKey{}, res, err - } - - dsKey, err := keys.NewDataStoreKey(res.Key) - if err != nil { - return true, keys.DataStoreKey{}, res, err - } - - return false, dsKey, res, nil -} - -// processKV continuously processes the key value pairs we've received -// and step by step constructs the current encoded document -func (df *DocumentFetcher) processKV(kv *keyValue) error { - // skip MerkleCRDT meta-data priority key-value pair - // implement here <-- - // instance := kv.Key.Name() - // if instance != "v" { - // return nil - // } - if df.doc == nil { - return client.NewErrUninitializeProperty("DocumentFetcher", "Document") - } - - if !df.isReadingDocument { - df.isReadingDocument = true - df.doc.Reset() - - // re-init doc state - if df.filterSet != nil { - df.doc.filterSet = bitset.New(df.filterSet.Len()) - if df.filterSet.Test(0) { - df.doc.filterSet.Set(0) // mark docID as set - } - } - df.doc.id = []byte(kv.Key.DocID) - df.passedPermissionCheck = false - df.passedFilter = false - df.ranFilter = false - - if df.deletedDocs { - df.doc.status = client.Deleted - } else { - df.doc.status = client.Active - } - } - - if kv.Key.FieldID == keys.DATASTORE_DOC_VERSION_FIELD_ID { - df.doc.schemaVersionID = string(kv.Value) - return nil - } - - // we have to skip the object marker - if bytes.Equal(df.kv.Value, []byte{base.ObjectMarker}) { - return nil - } - - // extract the FieldID and update the encoded doc properties map - fieldID, err := kv.Key.FieldIDAsUint() - if err != nil { - return err - } - fieldDesc, exists := df.selectFields[fieldID] - if !exists { - fieldDesc, exists = df.filterFields[fieldID] - if !exists { - return nil // if we can't find this field in our sets, just ignore it - } - } - - ufid := uint(fieldID) - - property := &encProperty{ - Desc: fieldDesc, - Raw: kv.Value, - } - - if df.filterSet != nil && df.filterSet.Test(ufid) { - df.doc.filterSet.Set(ufid) - property.IsFilter = true - } - - df.execInfo.FieldsFetched++ - - df.doc.properties[fieldDesc] = property - - return nil -} - -// FetchNext returns a raw binary encoded document. It iterates over all the relevant -// keypairs from the underlying store and constructs the document. -func (df *DocumentFetcher) FetchNext(ctx context.Context) (EncodedDocument, ExecInfo, error) { - var resultExecInfo ExecInfo - - // If the deletedDocFetcher isn't nil, this means that the user requested to include the deleted documents - // in the query. To keep the active and deleted docs in lexicographic order of docIDs, we use the two distinct - // fetchers and fetch the one that has the next lowest (or highest if requested in reverse order) docID value. - ddf := df.deletedDocFetcher - if ddf != nil { - // If we've reached the end of the deleted docs, we can skip to getting the next active docs. - if !ddf.kvEnd { - if df.kvEnd || ddf.kv.Key.DocID < df.kv.Key.DocID { - encdoc, execInfo, err := ddf.FetchNext(ctx) - - if err != nil { - return nil, ExecInfo{}, err - } - - resultExecInfo.Add(execInfo) - if encdoc != nil { - return encdoc, resultExecInfo, nil - } - } - } - } - - encdoc, execInfo, err := df.fetchNext(ctx) - - if err != nil { - return nil, ExecInfo{}, err - } - - resultExecInfo.Add(execInfo) - return encdoc, resultExecInfo, err -} - -func (df *DocumentFetcher) fetchNext(ctx context.Context) (EncodedDocument, ExecInfo, error) { - if df.kvEnd { - return nil, ExecInfo{}, nil - } - - if df.kv == nil { - return nil, ExecInfo{}, client.NewErrUninitializeProperty("DocumentFetcher", "kv") - } - - prevExecInfo := df.execInfo - defer func() { df.execInfo.Add(prevExecInfo) }() - df.execInfo.Reset() - // iterate until we have collected all the necessary kv pairs for the doc - // we'll know when were done when either - // A) Reach the end of the iterator - for { - if err := df.processKV(df.kv); err != nil { - return nil, ExecInfo{}, err - } - - if df.filter != nil { - // only run filter if we've collected all the fields - // required for filtering. This is tracked by the bitsets. - if df.filterSet.Equal(df.doc.filterSet) { - filterDoc, err := DecodeToDoc(df.doc, df.mapping, true) - if err != nil { - return nil, ExecInfo{}, err - } - - df.ranFilter = true - df.passedFilter, err = mapper.RunFilter(filterDoc, df.filter) - if err != nil { - return nil, ExecInfo{}, err - } - } - } - - // Check if we have read access, for document on this collection, with the given identity. - if !df.passedPermissionCheck { - if !df.acp.HasValue() { - // If no acp is available, then we have unrestricted access. - df.passedPermissionCheck = true - } else { - hasPermission, err := permission.CheckAccessOfDocOnCollectionWithACP( - ctx, - df.identity, - df.acp.Value(), - df.col, - acp.ReadPermission, - df.kv.Key.DocID, - ) - - if err != nil { - df.passedPermissionCheck = false - return nil, ExecInfo{}, err - } - - df.passedPermissionCheck = hasPermission - } - } - - // if we don't pass the filter (ran and pass) or if we don't have access to document then - // there is no point in collecting other select fields, so we seek to the next doc. - prefixsDone, docDone, err := df.nextKey(ctx, !df.passedPermissionCheck || !df.passedFilter && df.ranFilter) - - if err != nil { - return nil, ExecInfo{}, err - } - - if !docDone { - continue - } - - df.execInfo.DocsFetched++ - - if df.passedPermissionCheck { - if df.filter != nil { - // if we passed, return - if df.passedFilter { - return df.doc, df.execInfo, nil - } else if !df.ranFilter { // if we didn't run, run it - decodedDoc, err := DecodeToDoc(df.doc, df.mapping, false) - if err != nil { - return nil, ExecInfo{}, err - } - df.passedFilter, err = mapper.RunFilter(decodedDoc, df.filter) - if err != nil { - return nil, ExecInfo{}, err - } - if df.passedFilter { - return df.doc, df.execInfo, nil - } - } - } else { - return df.doc, df.execInfo, nil - } - } - - if prefixsDone { - return nil, df.execInfo, nil - } - } -} - -// Close closes the DocumentFetcher. -func (df *DocumentFetcher) Close() error { - if df.kvIter != nil { - err := df.kvIter.Close() - if err != nil { - return err - } - } - - if df.kvResultsIter != nil { - err := df.kvResultsIter.Close() - if err != nil { - return err - } - } - - if df.deletedDocFetcher != nil { - return df.deletedDocFetcher.Close() - } + // GetFields returns the EncodedDocument for the last docID yielded from [NextDoc()]. + // + // If the field values for that document do not pass all filters, None will be returned - + // this does not indicate that iteration has been completed, new documents may still be yielded + // by [NextDoc()]. + GetFields() (immutable.Option[EncodedDocument], error) - return nil + // Close disposes of all resources used by this fetcher and its children. + Close() error } diff --git a/internal/db/fetcher/filtered.go b/internal/db/fetcher/filtered.go new file mode 100644 index 0000000000..511e93ca14 --- /dev/null +++ b/internal/db/fetcher/filtered.go @@ -0,0 +1,76 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/internal/core" + "github.com/sourcenetwork/defradb/internal/planner/mapper" +) + +// filtered fetcher is responsible for the filtering documents based on the provided +// conditions. +type filtered struct { + filter *mapper.Filter + mapping *core.DocumentMapping + + fetcher fetcher +} + +var _ fetcher = (*filtered)(nil) + +func newFilteredFetcher( + filter *mapper.Filter, + mapping *core.DocumentMapping, + fetcher fetcher, +) *filtered { + return &filtered{ + filter: filter, + mapping: mapping, + fetcher: fetcher, + } +} + +func (f *filtered) NextDoc() (immutable.Option[string], error) { + return f.fetcher.NextDoc() +} + +func (f *filtered) GetFields() (immutable.Option[EncodedDocument], error) { + doc, err := f.fetcher.GetFields() + if err != nil { + return immutable.None[EncodedDocument](), err + } + + if !doc.HasValue() { + return immutable.None[EncodedDocument](), nil + } + + decodedDoc, err := DecodeToDoc(doc.Value(), f.mapping, false) + if err != nil { + return immutable.None[EncodedDocument](), err + } + + passedFilter, err := mapper.RunFilter(decodedDoc, f.filter) + if err != nil { + return immutable.None[EncodedDocument](), err + } + + if !passedFilter { + return immutable.None[EncodedDocument](), nil + } + + return doc, nil +} + +func (f *filtered) Close() error { + return f.fetcher.Close() +} diff --git a/internal/db/fetcher/permissioned.go b/internal/db/fetcher/permissioned.go new file mode 100644 index 0000000000..16165e45db --- /dev/null +++ b/internal/db/fetcher/permissioned.go @@ -0,0 +1,88 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "context" + + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/acp" + acpIdentity "github.com/sourcenetwork/defradb/acp/identity" + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/internal/db/permission" +) + +// permissioned fetcher applies access control based filtering to documents fetched. +type permissioned struct { + ctx context.Context + + identity immutable.Option[acpIdentity.Identity] + acp acp.ACP + col client.Collection + + fetcher fetcher +} + +var _ fetcher = (*permissioned)(nil) + +func newPermissionedFetcher( + ctx context.Context, + identity immutable.Option[acpIdentity.Identity], + acp acp.ACP, + col client.Collection, + fetcher fetcher, +) *permissioned { + return &permissioned{ + ctx: ctx, + identity: identity, + acp: acp, + col: col, + fetcher: fetcher, + } +} + +func (f *permissioned) NextDoc() (immutable.Option[string], error) { + docID, err := f.fetcher.NextDoc() + if err != nil { + return immutable.None[string](), err + } + + if !docID.HasValue() { + return immutable.None[string](), nil + } + + hasPermission, err := permission.CheckAccessOfDocOnCollectionWithACP( + f.ctx, + f.identity, + f.acp, + f.col, + acp.ReadPermission, + docID.Value(), + ) + if err != nil { + return immutable.None[string](), err + } + + if !hasPermission { + return f.NextDoc() + } + + return docID, nil +} + +func (f *permissioned) GetFields() (immutable.Option[EncodedDocument], error) { + return f.fetcher.GetFields() +} + +func (f *permissioned) Close() error { + return f.fetcher.Close() +} diff --git a/internal/db/fetcher/prefix.go b/internal/db/fetcher/prefix.go new file mode 100644 index 0000000000..d494384670 --- /dev/null +++ b/internal/db/fetcher/prefix.go @@ -0,0 +1,142 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "context" + "slices" + "strings" + + dsq "github.com/ipfs/go-datastore/query" + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/datastore/iterable" + "github.com/sourcenetwork/defradb/internal/keys" +) + +// prefix is a fetcher type responsible for iterating through multiple prefixes. +// +// It manages the document fetcher instances that will do the actual scanning. +type prefix struct { + // The prefixes that this prefix fetcher must fetch from. + prefixes []keys.DataStoreKey + // The Iterator this prefix fetcher will use to scan. + kvIter iterable.Iterator + + // The index of the current prefix being fetched. + currentPrefix int + // The child document fetcher, specific to the current prefix. + fetcher *document + + // The below properties are only held here in order to pass them on to the next + // child fetcher instance. + ctx context.Context + fieldsByID map[uint32]client.FieldDefinition + status client.DocumentStatus + execInfo *ExecInfo +} + +var _ fetcher = (*prefix)(nil) + +func newPrefixFetcher( + ctx context.Context, + txn datastore.Txn, + prefixes []keys.DataStoreKey, + col client.Collection, + fieldsByID map[uint32]client.FieldDefinition, + status client.DocumentStatus, + execInfo *ExecInfo, +) (*prefix, error) { + kvIter, err := txn.Datastore().GetIterator(dsq.Query{}) + if err != nil { + return nil, err + } + + if len(prefixes) == 0 { + // If no prefixes are provided, scan the entire collection. + prefixes = append(prefixes, keys.DataStoreKey{ + CollectionRootID: col.Description().RootID, + }) + } else { + uniquePrefixes := make(map[keys.DataStoreKey]struct{}, len(prefixes)) + for _, prefix := range prefixes { + // Deduplicate the prefixes to make sure that any given document is only yielded + // once. + uniquePrefixes[prefix] = struct{}{} + } + + prefixes = make([]keys.DataStoreKey, 0, len(uniquePrefixes)) + for prefix := range uniquePrefixes { + prefixes = append(prefixes, prefix) + } + + // Sort the prefixes, so that documents are returned in the order they would be if the + // whole store was scanned. + slices.SortFunc(prefixes, func(a, b keys.DataStoreKey) int { + return strings.Compare(a.ToString(), b.ToString()) + }) + } + + fetcher, err := newDocumentFetcher(ctx, fieldsByID, kvIter, prefixes[0], status, execInfo) + if err != nil { + return nil, err + } + + return &prefix{ + kvIter: kvIter, + prefixes: prefixes, + ctx: ctx, + fieldsByID: fieldsByID, + status: status, + fetcher: fetcher, + execInfo: execInfo, + }, nil +} + +func (f *prefix) NextDoc() (immutable.Option[string], error) { + docID, err := f.fetcher.NextDoc() + if err != nil { + return immutable.None[string](), err + } + + if !docID.HasValue() { + f.currentPrefix++ + if f.fetcher != nil { + err := f.fetcher.Close() + if err != nil { + return immutable.None[string](), err + } + } + + if len(f.prefixes) > f.currentPrefix { + prefix := f.prefixes[f.currentPrefix] + + f.fetcher, err = newDocumentFetcher(f.ctx, f.fieldsByID, f.kvIter, prefix, f.status, f.execInfo) + if err != nil { + return immutable.None[string](), err + } + + return f.NextDoc() + } + } + + return docID, nil +} + +func (f *prefix) GetFields() (immutable.Option[EncodedDocument], error) { + return f.fetcher.GetFields() +} + +func (f *prefix) Close() error { + return f.kvIter.Close() +} diff --git a/internal/db/fetcher/versioned.go b/internal/db/fetcher/versioned.go index 6abb42aab2..de829514ba 100644 --- a/internal/db/fetcher/versioned.go +++ b/internal/db/fetcher/versioned.go @@ -80,7 +80,7 @@ var ( // within a new fetcher? type VersionedFetcher struct { // embed the regular doc fetcher - *DocumentFetcher + Fetcher txn datastore.Txn ctx context.Context @@ -130,8 +130,8 @@ func (vf *VersionedFetcher) Init( } // run the DF init, VersionedFetchers only supports the Primary (0) index - vf.DocumentFetcher = new(DocumentFetcher) - return vf.DocumentFetcher.Init( + vf.Fetcher = NewDocumentFetcher() + return vf.Fetcher.Init( ctx, identity, vf.store, @@ -156,7 +156,7 @@ func (vf *VersionedFetcher) Start(ctx context.Context, prefixes ...keys.Walkable return NewErrFailedToSeek(prefix.Cid, err) } - return vf.DocumentFetcher.Start(ctx) + return vf.Fetcher.Start(ctx) } // Start a fetcher with the needed info (cid embedded in a prefix) @@ -180,7 +180,7 @@ func (vf *VersionedFetcher) SeekTo(ctx context.Context, c cid.Cid) error { return err } - return vf.DocumentFetcher.Start(ctx) + return vf.Fetcher.Start(ctx) } // seekTo seeks to the given CID version by stepping through the CRDT state graph from the beginning @@ -389,5 +389,5 @@ func (vf *VersionedFetcher) Close() error { return err } - return vf.DocumentFetcher.Close() + return vf.Fetcher.Close() } diff --git a/internal/db/fetcher/wrapper.go b/internal/db/fetcher/wrapper.go new file mode 100644 index 0000000000..1272b303d5 --- /dev/null +++ b/internal/db/fetcher/wrapper.go @@ -0,0 +1,183 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "context" + + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/acp" + acpIdentity "github.com/sourcenetwork/defradb/acp/identity" + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/internal/core" + "github.com/sourcenetwork/defradb/internal/keys" + "github.com/sourcenetwork/defradb/internal/planner/mapper" + "github.com/sourcenetwork/defradb/internal/request/graphql/parser" +) + +// wrapper is a fetcher type that bridges between the existing [Fetcher] interface +// and the newer [fetcher] interface. +type wrapper struct { + fetcher fetcher + execInfo *ExecInfo + + // The below properties are only held in state in order to temporarily adhear to the [Fetcher] + // interface. They can be remove from state once the [Fetcher] interface is cleaned up. + identity immutable.Option[acpIdentity.Identity] + txn datastore.Txn + acp immutable.Option[acp.ACP] + col client.Collection + fields []client.FieldDefinition + filter *mapper.Filter + docMapper *core.DocumentMapping + showDeleted bool +} + +var _ Fetcher = (*wrapper)(nil) + +func NewDocumentFetcher() Fetcher { + return &wrapper{} +} + +func (f *wrapper) Init( + ctx context.Context, + identity immutable.Option[acpIdentity.Identity], + txn datastore.Txn, + acp immutable.Option[acp.ACP], + col client.Collection, + fields []client.FieldDefinition, + filter *mapper.Filter, + docMapper *core.DocumentMapping, + showDeleted bool, +) error { + f.identity = identity + f.txn = txn + f.acp = acp + f.col = col + f.fields = fields + f.filter = filter + f.docMapper = docMapper + f.showDeleted = showDeleted + + return nil +} + +func (f *wrapper) Start(ctx context.Context, prefixes ...keys.Walkable) error { + err := f.Close() + if err != nil { + return err + } + + dsPrefixes := make([]keys.DataStoreKey, 0, len(prefixes)) + for _, prefix := range prefixes { + dsPrefix, ok := prefix.(keys.DataStoreKey) + if !ok { + continue + } + + dsPrefixes = append(dsPrefixes, dsPrefix) + } + + if f.filter != nil && len(f.fields) > 0 { + conditions := f.filter.ToMap(f.docMapper) + parsedfilterFields, err := parser.ParseFilterFieldsForDescription(conditions, f.col.Definition()) + if err != nil { + return err + } + + existingFields := make(map[client.FieldID]struct{}, len(f.fields)) + for _, field := range f.fields { + existingFields[field.ID] = struct{}{} + } + + for _, field := range parsedfilterFields { + if _, ok := existingFields[field.ID]; !ok { + f.fields = append(f.fields, field) + } + existingFields[field.ID] = struct{}{} + } + } + + if len(f.fields) == 0 { + f.fields = f.col.Definition().GetFields() + } + + fieldsByID := make(map[uint32]client.FieldDefinition, len(f.fields)) + for _, field := range f.fields { + fieldsByID[uint32(field.ID)] = field + } + + var execInfo ExecInfo + f.execInfo = &execInfo + + var fetcher fetcher + fetcher, err = newPrefixFetcher(ctx, f.txn, dsPrefixes, f.col, fieldsByID, client.Active, &execInfo) + if err != nil { + return nil + } + + if f.showDeleted { + deletedFetcher, err := newPrefixFetcher(ctx, f.txn, dsPrefixes, f.col, fieldsByID, client.Deleted, &execInfo) + if err != nil { + return nil + } + + fetcher = newDeletedFetcher(fetcher, deletedFetcher) + } + + if f.acp.HasValue() { + fetcher = newPermissionedFetcher(ctx, f.identity, f.acp.Value(), f.col, fetcher) + } + + if f.filter != nil { + fetcher = newFilteredFetcher(f.filter, f.docMapper, fetcher) + } + + f.fetcher = fetcher + return nil +} + +func (f *wrapper) FetchNext(ctx context.Context) (EncodedDocument, ExecInfo, error) { + docID, err := f.fetcher.NextDoc() + if err != nil { + return nil, ExecInfo{}, err + } + + if !docID.HasValue() { + execInfo := *f.execInfo + f.execInfo.Reset() + + return nil, execInfo, nil + } + + doc, err := f.fetcher.GetFields() + if err != nil { + return nil, ExecInfo{}, err + } + + if !doc.HasValue() { + return f.FetchNext(ctx) + } + + execInfo := *f.execInfo + f.execInfo.Reset() + + return doc.Value(), execInfo, nil +} + +func (f *wrapper) Close() error { + if f.fetcher != nil { + return f.fetcher.Close() + } + return nil +} diff --git a/internal/db/fetcher_test.go b/internal/db/fetcher_test.go deleted file mode 100644 index 48c159d3c0..0000000000 --- a/internal/db/fetcher_test.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2022 Democratized Data Foundation -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package db - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/sourcenetwork/defradb/internal/db/fetcher" -) - -func TestFetcherStartWithoutInit(t *testing.T) { - ctx := context.Background() - df := new(fetcher.DocumentFetcher) - err := df.Start(ctx) - assert.Error(t, err) -} diff --git a/internal/planner/scan.go b/internal/planner/scan.go index 4f577519a3..4e03e22fee 100644 --- a/internal/planner/scan.go +++ b/internal/planner/scan.go @@ -160,7 +160,7 @@ func (scan *scanNode) initFetcher( if cid.HasValue() { f = new(fetcher.VersionedFetcher) } else { - f = new(fetcher.DocumentFetcher) + f = fetcher.NewDocumentFetcher() if index.HasValue() { fieldsToMove := make([]mapper.Field, 0, len(index.Value().Fields)) diff --git a/tests/integration/explain/execute/update_test.go b/tests/integration/explain/execute/update_test.go index 4a525b2cd5..50cddaf4d2 100644 --- a/tests/integration/explain/execute/update_test.go +++ b/tests/integration/explain/execute/update_test.go @@ -119,7 +119,7 @@ func TestExecuteExplainMutationRequestWithUpdateUsingFilter(t *testing.T) { "scanNode": dataMap{ "iterations": uint64(4), "docFetches": uint64(4), - "fieldFetches": uint64(6), + "fieldFetches": uint64(8), "indexFetches": uint64(0), }, }, diff --git a/tests/integration/explain/execute/upsert_test.go b/tests/integration/explain/execute/upsert_test.go index 65b2abeed1..e492a932ba 100644 --- a/tests/integration/explain/execute/upsert_test.go +++ b/tests/integration/explain/execute/upsert_test.go @@ -55,7 +55,7 @@ func TestExecuteExplainMutationRequest_WithUpsertAndMatchingFilter_Succeeds(t *t "scanNode": dataMap{ "iterations": uint64(4), "docFetches": uint64(4), - "fieldFetches": uint64(6), + "fieldFetches": uint64(8), "indexFetches": uint64(0), }, }, diff --git a/tests/integration/explain/execute/with_limit_test.go b/tests/integration/explain/execute/with_limit_test.go index 6a930067df..ff2ca0b437 100644 --- a/tests/integration/explain/execute/with_limit_test.go +++ b/tests/integration/explain/execute/with_limit_test.go @@ -117,7 +117,7 @@ func TestExecuteExplainRequestWithBothLimitAndOffsetOnParentAndLimitOnChild(t *t "subTypeScanNode": dataMap{ "iterations": uint64(2), "docFetches": uint64(3), - "fieldFetches": uint64(5), + "fieldFetches": uint64(6), "indexFetches": uint64(0), }, }, diff --git a/tests/integration/explain/execute/with_max_test.go b/tests/integration/explain/execute/with_max_test.go index 139c86e210..6fa390be7c 100644 --- a/tests/integration/explain/execute/with_max_test.go +++ b/tests/integration/explain/execute/with_max_test.go @@ -119,7 +119,7 @@ func TestExecuteExplainRequest_MaxOfRelatedOneToManyField_Succeeds(t *testing.T) "subTypeScanNode": dataMap{ "iterations": uint64(5), "docFetches": uint64(6), - "fieldFetches": uint64(9), + "fieldFetches": uint64(12), "indexFetches": uint64(0), }, }, diff --git a/tests/integration/explain/execute/with_min_test.go b/tests/integration/explain/execute/with_min_test.go index 204a055f32..4626071d32 100644 --- a/tests/integration/explain/execute/with_min_test.go +++ b/tests/integration/explain/execute/with_min_test.go @@ -119,7 +119,7 @@ func TestExecuteExplainRequest_MinOfRelatedOneToManyField_Succeeds(t *testing.T) "subTypeScanNode": dataMap{ "iterations": uint64(5), "docFetches": uint64(6), - "fieldFetches": uint64(9), + "fieldFetches": uint64(12), "indexFetches": uint64(0), }, }, diff --git a/tests/integration/explain/execute/with_order_test.go b/tests/integration/explain/execute/with_order_test.go index cc6e0c68d1..bbb2312c38 100644 --- a/tests/integration/explain/execute/with_order_test.go +++ b/tests/integration/explain/execute/with_order_test.go @@ -201,7 +201,7 @@ func TestExecuteExplainRequestWithOrderFieldOnChild(t *testing.T) { "subTypeScanNode": dataMap{ "iterations": uint64(5), "docFetches": uint64(6), - "fieldFetches": uint64(9), + "fieldFetches": uint64(12), "indexFetches": uint64(0), }, }, @@ -266,7 +266,7 @@ func TestExecuteExplainRequestWithOrderFieldOnBothParentAndChild(t *testing.T) { "subTypeScanNode": dataMap{ "iterations": uint64(5), "docFetches": uint64(6), - "fieldFetches": uint64(9), + "fieldFetches": uint64(12), "indexFetches": uint64(0), }, }, diff --git a/tests/integration/explain/execute/with_sum_test.go b/tests/integration/explain/execute/with_sum_test.go index 23dc85ce38..c7d614bf2f 100644 --- a/tests/integration/explain/execute/with_sum_test.go +++ b/tests/integration/explain/execute/with_sum_test.go @@ -119,7 +119,7 @@ func TestExecuteExplainRequestSumOfRelatedOneToManyField(t *testing.T) { "subTypeScanNode": dataMap{ "iterations": uint64(5), "docFetches": uint64(6), - "fieldFetches": uint64(9), + "fieldFetches": uint64(12), "indexFetches": uint64(0), }, }, diff --git a/tests/integration/index/query_with_composite_index_only_filter_test.go b/tests/integration/index/query_with_composite_index_only_filter_test.go index eb4871ecfc..63e704cc78 100644 --- a/tests/integration/index/query_with_composite_index_only_filter_test.go +++ b/tests/integration/index/query_with_composite_index_only_filter_test.go @@ -705,7 +705,7 @@ func TestQueryWithCompositeIndex_IfFirstFieldIsNotInFilter_ShouldNotUseIndex(t * name } }`, - Asserter: testUtils.NewExplainAsserter().WithFieldFetches(11).WithIndexFetches(0), + Asserter: testUtils.NewExplainAsserter().WithFieldFetches(20).WithIndexFetches(0), }, }, } diff --git a/tests/integration/index/query_with_relation_filter_test.go b/tests/integration/index/query_with_relation_filter_test.go index deba6946e2..fa7fb6c443 100644 --- a/tests/integration/index/query_with_relation_filter_test.go +++ b/tests/integration/index/query_with_relation_filter_test.go @@ -345,7 +345,7 @@ func TestQueryWithIndexOnOneToOnePrimaryRelation_IfFilterOnIndexedFieldOfRelatio // we make 1 index fetch to get the only address with city == "London" // then we scan all 10 users to find one with matching "address_id" // after this we fetch the name of the user - Asserter: testUtils.NewExplainAsserter().WithFieldFetches(11).WithIndexFetches(1), + Asserter: testUtils.NewExplainAsserter().WithFieldFetches(20).WithIndexFetches(1), }, testUtils.Request{ Request: req2, @@ -362,7 +362,7 @@ func TestQueryWithIndexOnOneToOnePrimaryRelation_IfFilterOnIndexedFieldOfRelatio // we make 3 index fetch to get the 3 address with city == "Montreal" // then we scan all 10 users to find one with matching "address_id" for each address // after this we fetch the name of each user - Asserter: testUtils.NewExplainAsserter().WithFieldFetches(33).WithIndexFetches(3), + Asserter: testUtils.NewExplainAsserter().WithFieldFetches(60).WithIndexFetches(3), }, }, } diff --git a/tests/integration/index/query_with_unique_composite_index_filter_test.go b/tests/integration/index/query_with_unique_composite_index_filter_test.go index 6c1222e069..9928cb684b 100644 --- a/tests/integration/index/query_with_unique_composite_index_filter_test.go +++ b/tests/integration/index/query_with_unique_composite_index_filter_test.go @@ -857,7 +857,7 @@ func TestQueryWithUniqueCompositeIndex_IfFirstFieldIsNotInFilter_ShouldNotUseInd name } }`, - Asserter: testUtils.NewExplainAsserter().WithFieldFetches(11).WithIndexFetches(0), + Asserter: testUtils.NewExplainAsserter().WithFieldFetches(20).WithIndexFetches(0), }, }, }