Skip to content

Commit

Permalink
fix(i): Aggregate filter returns one result (#3316)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #3313

## Description

This PR fixes an issue where aggregate filters would only return one
result when the first result did not match.

## Tasks

- [x] I made sure the code is well commented, particularly
hard-to-understand areas.
- [x] I made sure the repository-held documentation is changed
accordingly.
- [x] I made sure the pull request title adheres to the conventional
commit style (the subset used in the project can be found in
[tools/configs/chglog/config.yml](tools/configs/chglog/config.yml)).
- [x] I made sure to discuss its limitations such as threats to
validity, vulnerability to mistake and misuse, robustness to
invalidation of assumptions, resource requirements, ...

## How has this been tested?

Added integration tests.

Specify the platform(s) on which this was tested:
- MacOS
  • Loading branch information
nasdf authored Dec 11, 2024
1 parent 648924e commit e1502c5
Show file tree
Hide file tree
Showing 10 changed files with 1,123 additions and 398 deletions.
61 changes: 35 additions & 26 deletions internal/planner/average.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,38 +75,47 @@ func (n *averageNode) Close() error { return n.plan.Close()
func (n *averageNode) Source() planNode { return n.plan }

func (n *averageNode) Next() (bool, error) {
n.execInfo.iterations++
for {
n.execInfo.iterations++

hasNext, err := n.plan.Next()
if err != nil || !hasNext {
return hasNext, err
}
hasNext, err := n.plan.Next()
if err != nil || !hasNext {
return hasNext, err
}

n.currentValue = n.plan.Value()
n.currentValue = n.plan.Value()

countProp := n.currentValue.Fields[n.countFieldIndex]
typedCount, isInt := countProp.(int)
if !isInt {
return false, client.NewErrUnexpectedType[int]("count", countProp)
}
count := typedCount
countProp := n.currentValue.Fields[n.countFieldIndex]
typedCount, isInt := countProp.(int)
if !isInt {
return false, client.NewErrUnexpectedType[int]("count", countProp)
}
count := typedCount

if count == 0 {
n.currentValue.Fields[n.virtualFieldIndex] = float64(0)
return true, nil
}
if count == 0 {
n.currentValue.Fields[n.virtualFieldIndex] = float64(0)
return true, nil
}

sumProp := n.currentValue.Fields[n.sumFieldIndex]
switch sum := sumProp.(type) {
case float64:
n.currentValue.Fields[n.virtualFieldIndex] = sum / float64(count)
case int64:
n.currentValue.Fields[n.virtualFieldIndex] = float64(sum) / float64(count)
default:
return false, client.NewErrUnhandledType("sum", sumProp)
}
sumProp := n.currentValue.Fields[n.sumFieldIndex]
switch sum := sumProp.(type) {
case float64:
n.currentValue.Fields[n.virtualFieldIndex] = sum / float64(count)
case int64:
n.currentValue.Fields[n.virtualFieldIndex] = float64(sum) / float64(count)
default:
return false, client.NewErrUnhandledType("sum", sumProp)
}

return mapper.RunFilter(n.currentValue, n.aggregateFilter)
passes, err := mapper.RunFilter(n.currentValue, n.aggregateFilter)
if err != nil {
return false, err
}
if !passes {
continue
}
return true, nil
}
}

func (n *averageNode) SetPlan(p planNode) { n.plan = p }
Expand Down
115 changes: 62 additions & 53 deletions internal/planner/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,65 +125,74 @@ func (n *countNode) Explain(explainType request.ExplainType) (map[string]any, er
}

func (n *countNode) Next() (bool, error) {
n.execInfo.iterations++
for {
n.execInfo.iterations++

hasValue, err := n.plan.Next()
if err != nil || !hasValue {
return hasValue, err
}

n.currentValue = n.plan.Value()
// Can just scan for now, can be replaced later by something fancier if needed
var count int
for _, source := range n.aggregateMapping {
property := n.currentValue.Fields[source.Index]
v := reflect.ValueOf(property)
switch v.Kind() {
// v.Len will panic if v is not one of these types, we don't want it to panic
case reflect.Array, reflect.Chan, reflect.Map, reflect.Slice, reflect.String:
if source.Filter == nil && source.Limit == nil {
count = count + v.Len()
} else {
var arrayCount int
var err error
switch array := property.(type) {
case []core.Doc:
arrayCount = countDocs(array)

case []bool:
arrayCount, err = countItems(array, source.Filter, source.Limit)

case []immutable.Option[bool]:
arrayCount, err = countItems(array, source.Filter, source.Limit)

case []int64:
arrayCount, err = countItems(array, source.Filter, source.Limit)

case []immutable.Option[int64]:
arrayCount, err = countItems(array, source.Filter, source.Limit)

case []float64:
arrayCount, err = countItems(array, source.Filter, source.Limit)

case []immutable.Option[float64]:
arrayCount, err = countItems(array, source.Filter, source.Limit)

case []string:
arrayCount, err = countItems(array, source.Filter, source.Limit)
hasValue, err := n.plan.Next()
if err != nil || !hasValue {
return hasValue, err
}

case []immutable.Option[string]:
arrayCount, err = countItems(array, source.Filter, source.Limit)
}
if err != nil {
return false, err
n.currentValue = n.plan.Value()
// Can just scan for now, can be replaced later by something fancier if needed
var count int
for _, source := range n.aggregateMapping {
property := n.currentValue.Fields[source.Index]
v := reflect.ValueOf(property)
switch v.Kind() {
// v.Len will panic if v is not one of these types, we don't want it to panic
case reflect.Array, reflect.Chan, reflect.Map, reflect.Slice, reflect.String:
if source.Filter == nil && source.Limit == nil {
count = count + v.Len()
} else {
var arrayCount int
var err error
switch array := property.(type) {
case []core.Doc:
arrayCount = countDocs(array)

case []bool:
arrayCount, err = countItems(array, source.Filter, source.Limit)

case []immutable.Option[bool]:
arrayCount, err = countItems(array, source.Filter, source.Limit)

case []int64:
arrayCount, err = countItems(array, source.Filter, source.Limit)

case []immutable.Option[int64]:
arrayCount, err = countItems(array, source.Filter, source.Limit)

case []float64:
arrayCount, err = countItems(array, source.Filter, source.Limit)

case []immutable.Option[float64]:
arrayCount, err = countItems(array, source.Filter, source.Limit)

case []string:
arrayCount, err = countItems(array, source.Filter, source.Limit)

case []immutable.Option[string]:
arrayCount, err = countItems(array, source.Filter, source.Limit)
}
if err != nil {
return false, err
}
count += arrayCount
}
count += arrayCount
}
}
}
n.currentValue.Fields[n.virtualFieldIndex] = count

n.currentValue.Fields[n.virtualFieldIndex] = count
return mapper.RunFilter(n.currentValue, n.aggregateFilter)
passes, err := mapper.RunFilter(n.currentValue, n.aggregateFilter)
if err != nil {
return false, err
}
if !passes {
continue
}
return true, nil
}
}

// countDocs counts the number of documents in a slice, skipping over hidden items
Expand Down
Loading

0 comments on commit e1502c5

Please sign in to comment.