Skip to content

Commit

Permalink
use twoWayFetchDirector
Browse files Browse the repository at this point in the history
  • Loading branch information
islamaliev committed Sep 29, 2023
1 parent c26278a commit 8c94ca0
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 100 deletions.
4 changes: 4 additions & 0 deletions planner/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,10 @@ func (n *multiScanNode) Close() error {
return n.scanNode.Close()
}

func (n *multiScanNode) DocumentMap() *core.DocumentMapping {
return n.scanNode.DocumentMap()
}

func (n *multiScanNode) addReader() {
n.numReaders++
}
180 changes: 82 additions & 98 deletions planner/type_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ func (n *typeIndexJoin) simpleExplain() (map[string]any, error) {
switch joinType := n.joinPlan.(type) {
case *typeJoinOne:
// Add the direction attribute.
if joinType.primary {
simpleExplainMap[joinDirectionLabel] = joinDirectionPrimaryLabel
} else {
if joinType.isSecondary {
simpleExplainMap[joinDirectionLabel] = joinDirectionSecondaryLabel
} else {
simpleExplainMap[joinDirectionLabel] = joinDirectionPrimaryLabel
}

// Add the attribute(s).
Expand Down Expand Up @@ -214,7 +214,7 @@ func (n *typeIndexJoin) Explain(explainType request.ExplainType) (map[string]any
if err != nil {
return nil, err
}
result["subTypeScan"] = subScanExplain
result["subTypeScanNode"] = subScanExplain
}
return result, nil

Expand All @@ -230,10 +230,6 @@ func (n *typeIndexJoin) Merge() bool { return true }
// where the root type is the primary in a one-to-one relation request.
type typeJoinOne struct {
twoWayFetchDirector
p *Planner

primary bool
secondaryFieldIndex immutable.Option[int]
}

func (p *Planner) makeTypeJoinOne(
Expand Down Expand Up @@ -278,16 +274,17 @@ func (p *Planner) makeTypeJoinOne(

return &typeJoinOne{
twoWayFetchDirector: twoWayFetchDirector{
docMapper: docMapper{parent.documentMapping},
root: source,
subType: selectPlan,
subSelect: subType,
rootName: subTypeField.Name,
subTypeName: subType.Name,
docMapper: docMapper{parent.documentMapping},
root: source,
subType: selectPlan,
subSelect: subType,
rootName: subTypeField.Name,
subTypeName: subType.Name,
isSecondary: !isPrimary,
secondaryFieldIndex: secondaryFieldIndex,
secondaryFetchLimit: 1,
relatedFieldName: subTypeFieldDesc.Name + request.RelatedObjectID,
},
p: p,
primary: isPrimary,
secondaryFieldIndex: secondaryFieldIndex,
}, nil
}

Expand Down Expand Up @@ -323,68 +320,8 @@ func fetchDocsWithFieldValue(plan planNode, fieldName string, val any, limit uin
return docs, nil
}

func (n *typeJoinOne) valuesSecondary(doc core.Doc) (core.Doc, error) {
fieldName := n.rootName + request.RelatedObjectID
subDocs, err := fetchDocsWithFieldValue(n.subType, fieldName, doc.GetKey(), 1)
if err != nil {
return core.Doc{}, err
}

if len(subDocs) > 0 {
doc.Fields[n.subSelect.Index] = subDocs[0]
if n.secondaryFieldIndex.HasValue() {
doc.Fields[n.secondaryFieldIndex.Value()] = subDocs[0].GetKey()
}
}

return doc, nil
}

func (n *typeJoinOne) valuesPrimary(doc core.Doc) (core.Doc, error) {
// get the subtype doc key
subDocKey := n.docMapper.documentMapping.FirstOfName(doc, n.subTypeName+request.RelatedObjectID)

subDocKeyStr, ok := subDocKey.(string)
if !ok {
return doc, nil
}

// create the collection key for the sub doc
slct := n.subType.(*selectTopNode).selectNode
desc := slct.sourceInfo.collectionDescription
subKeyIndexKey := base.MakeDocKey(desc, subDocKeyStr)

// do a point lookup with the new span (index key)
n.subType.Spans(core.NewSpans(core.NewSpan(subKeyIndexKey, subKeyIndexKey.PrefixEnd())))

// re-initialize the sub type plan
if err := n.subType.Init(); err != nil {
return doc, NewErrSubTypeInit(err)
}

// if we don't find any docs from our point span lookup
// or if we encounter an error just return the base doc,
// with an empty map for the subDoc
next, err := n.subType.Next()

if err != nil {
return doc, err
}

if !next {
return doc, nil
}

subDoc := n.subType.Value()
doc.Fields[n.subSelect.Index] = subDoc

return doc, nil
}

type typeJoinMany struct {
twoWayFetchDirector

p *Planner
}

func prepareScanNodeFilterForTypeJoin(
Expand Down Expand Up @@ -451,14 +388,15 @@ func (p *Planner) makeTypeJoinMany(

return &typeJoinMany{
twoWayFetchDirector: twoWayFetchDirector{
docMapper: docMapper{parent.documentMapping},
root: source,
subType: selectPlan,
subSelect: subType,
rootName: rootField.Name,
subTypeName: subType.Name,
docMapper: docMapper{parent.documentMapping},
root: source,
subType: selectPlan,
subSelect: subType,
rootName: rootField.Name,
isSecondary: true,
subTypeName: subType.Name,
secondaryFetchLimit: 0,
},
p: p,
}, nil
}

Expand All @@ -470,7 +408,16 @@ func fetchPrimaryDoc(node, subNode planNode, parentProp string) (bool, error) {
subDoc := subNode.Value()
ind := subNode.DocumentMap().FirstIndexOfName(parentProp)

rootDocKey := base.MakeDocKey(node.(*scanNode).desc, subDoc.Fields[ind].(string))
docKeyStr, isStr := subDoc.Fields[ind].(string)
if !isStr {
return false, nil
}

scan := getScanNode(node)
if scan == nil {
return false, nil
}
rootDocKey := base.MakeDocKey(scan.desc, docKeyStr)

spans := core.NewSpans(core.NewSpan(rootDocKey, rootDocKey.PrefixEnd()))

Expand Down Expand Up @@ -500,6 +447,11 @@ type twoWayFetchDirector struct {

subSelect *mapper.Select

isSecondary bool
secondaryFieldIndex immutable.Option[int]
secondaryFetchLimit uint
relatedFieldName string

isInverted bool
}

Expand Down Expand Up @@ -533,6 +485,7 @@ func (n *twoWayFetchDirector) Source() planNode { return n.root }

func (d *twoWayFetchDirector) invert() {
d.isInverted = !d.isInverted
d.isSecondary = !d.isSecondary
}

func (d *twoWayFetchDirector) Next() (bool, error) {
Expand All @@ -551,13 +504,35 @@ func (d *twoWayFetchDirector) fetchDefault() (bool, error) {

d.currentValue = d.root.Value()

fieldName := d.rootName + request.RelatedObjectID
subDocs, err := fetchDocsWithFieldValue(d.subType, fieldName, d.currentValue.GetKey(), 0)
if err != nil {
return false, err
if d.isSecondary {
fieldName := d.rootName + request.RelatedObjectID
subDocs, err := fetchDocsWithFieldValue(d.subType, fieldName, d.currentValue.GetKey(), d.secondaryFetchLimit)
if err != nil {
return false, err
}
if d.secondaryFetchLimit == 1 {
if len(subDocs) != 0 {
d.currentValue.Fields[d.subSelect.Index] = subDocs[0]
if d.secondaryFieldIndex.HasValue() {
d.currentValue.Fields[d.secondaryFieldIndex.Value()] = subDocs[0].GetKey()
}
}
} else {
d.currentValue.Fields[d.subSelect.Index] = subDocs
}

Check failure on line 523 in planner/type_join.go

View workflow job for this annotation

GitHub Actions / Lint job

unnecessary trailing newline (whitespace)
} else {
hasRootDoc, err := fetchPrimaryDoc(d.subType, d.root, d.relatedFieldName)
if err != nil {
return false, err
}

if hasRootDoc {
d.currentValue = d.root.Value()
d.currentValue.Fields[d.subSelect.Index] = d.subType.Value()
}
}

d.currentValue.Fields[d.subSelect.Index] = subDocs
return true, nil
}

Expand All @@ -573,23 +548,32 @@ func (d *twoWayFetchDirector) fetchInverted() (bool, error) {
return false, nil
}

hasPrimaryDoc, err := fetchPrimaryDoc(d.root, d.subType, d.rootName+request.RelatedObjectID)
subDoc := d.subType.Value()

var hasRootDoc bool
if d.isSecondary {
var docs []core.Doc
docs, err = fetchDocsWithFieldValue(d.root, d.relatedFieldName, subDoc.GetKey(), d.secondaryFetchLimit)
hasRootDoc = len(docs) > 0
} else {
hasRootDoc, err = fetchPrimaryDoc(d.root, d.subType, d.rootName+request.RelatedObjectID)
}

if err != nil {
return false, err
}

if !hasPrimaryDoc {
if !hasRootDoc {
continue
}

d.currentValue = d.root.Value()

doc := d.root.Value()
subDoc := d.subType.Value()
doc.Fields[d.subSelect.Index] = subDoc
//if n.secondaryFieldIndex.HasValue() {
//doc.Fields[n.secondaryFieldIndex.Value()] = subDoc.GetKey()
//}
d.currentValue.Fields[d.subSelect.Index] = subDoc

if d.isSecondary && d.secondaryFieldIndex.HasValue() {
d.currentValue.Fields[d.secondaryFieldIndex.Value()] = subDoc.GetKey()
}

return true, nil
}
Expand Down
51 changes: 49 additions & 2 deletions tests/integration/index/query_with_relation_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
testUtils "github.com/sourcenetwork/defradb/tests/integration"
)

func TestQueryWithIndex_IfFilterOnIndexedOneToManyRelation_ShouldFilter(t *testing.T) {
func TestQueryWithIndexOnOneToManyRelation_IfFilterOnIndexedRelation_ShouldFilter(t *testing.T) {
test := testUtils.TestCase{
Description: "Filter on indexed relation field",
Actions: []any{
Expand Down Expand Up @@ -62,7 +62,7 @@ func TestQueryWithIndex_IfFilterOnIndexedOneToManyRelation_ShouldFilter(t *testi
testUtils.ExecuteTestCase(t, test)
}

func TestQueryWithIndex_IfFilterOnIndexedOneToOneRelation_ShouldFilter(t *testing.T) {
func TestQueryWithIndexOnOneToOneRelation_IfFilterOnIndexedRelation_ShouldFilter(t *testing.T) {
test := testUtils.TestCase{
Description: "Filter on indexed relation field",
Actions: []any{
Expand Down Expand Up @@ -107,3 +107,50 @@ func TestQueryWithIndex_IfFilterOnIndexedOneToOneRelation_ShouldFilter(t *testin

testUtils.ExecuteTestCase(t, test)
}

func TestQueryWithIndexOnOneToOneSecondaryRelation_IfFilterOnIndexedRelation_ShouldFilter(t *testing.T) {
test := testUtils.TestCase{
Description: "Filter on indexed relation field",
Actions: []any{
createSchemaWithDocs(`
type User {
name: String
age: Int
address: Address @primary
}
type Address {
user: User
city: String @index
street: String
}
`),
sendRequestAndExplain(`
User(filter: {
address: {city: {_eq: "Munich"}}
}) {
name
}`,
[]map[string]any{
{"name": "Islam"},
},
NewExplainAsserter().WithDocFetches(2).WithFieldFetches(3).WithIndexFetches(1),
),
sendRequestAndExplain(`
User(filter: {
address: {city: {_eq: "Montreal"}}
}) {
name
}`,
[]map[string]any{
{"name": "John"},
{"name": "Fred"},
{"name": "Shahzad"},
},
NewExplainAsserter().WithDocFetches(14).WithFieldFetches(17).WithIndexFetches(3),
),
},
}

testUtils.ExecuteTestCase(t, test)
}

0 comments on commit 8c94ca0

Please sign in to comment.