Skip to content

Commit

Permalink
feat: add ReduceConcurrent
Browse files Browse the repository at this point in the history
  • Loading branch information
duke-git committed Aug 15, 2024
1 parent 3058479 commit c0b200f
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 4 deletions.
41 changes: 37 additions & 4 deletions docs/api/packages/slice.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import (
- [Merge](#Merge)
- [Reverse](#Reverse)
- [Reduce<sup>deprecated</sup>](#Reduce)
- [ReduceConcurrent](#ReduceConcurrent)
- [ReduceBy](#ReduceBy)
- [ReduceRight](#ReduceRight)
- [Replace](#Replace)
Expand Down Expand Up @@ -1578,15 +1579,15 @@ import (

func main() {
nums := []int{1, 2, 3, 4, 5, 6}
result := slice.MapConcurrent(nums, func(_, n int) int {
return n * n
}, 4)

fmt.Println(result)
fmt.Println(result)

// Output:
// [1 4 9 16 25 36]
// Output:
// [1 4 9 16 25 36]
}
```

Expand Down Expand Up @@ -1759,6 +1760,38 @@ func main() {
}
```

### <span id="ReduceConcurrent">ReduceConcurrent</span>

<p>对切片元素执行并发reduce操作。</p>

<b>函数签名:</b>

```go
func ReduceConcurrent[T any](slice []T, initial T, reducer func(index int, item T, agg T) T, numThreads int) T
```

<b>示例:<span style="float:right;display:inline-block;">[运行]()</span></b>

```go
import (
"fmt"
"github.com/duke-git/lancet/v2/slice"
)

func main() {
nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

result := slice.ReduceConcurrent(nums, 0, func(_ int, item, agg int) int {
return agg + item
}, 1)

fmt.Println(result)

// Output:
// 55
}
```

### <span id="ReduceBy">ReduceBy</span>

<p>对切片元素执行reduce操作。</p>
Expand Down
34 changes: 34 additions & 0 deletions docs/en/api/packages/slice.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import (
- [Merge](#Merge)
- [Reverse](#Reverse)
- [Reduce<sup>deprecated</sup>](#Reduce)
- [ReduceConcurrent](#ReduceConcurrent)
- [ReduceBy](#ReduceBy)
- [ReduceRight](#ReduceRight)
- [Replace](#Replace)
Expand Down Expand Up @@ -1754,6 +1755,39 @@ func main() {
}
```

### <span id="ReduceConcurrent">ReduceConcurrent</span>

<p>Reduces the slice to a single value by applying the reducer function to each item in the slice concurrently.</p>

<b>Signature:</b>

```go
func ReduceConcurrent[T any](slice []T, initial T, reducer func(index int, item T, agg T) T, numThreads int) T
```

<b>Example:<span style="float:right;display:inline-block;">[运行]()</span></b>

```go
import (
"fmt"
"github.com/duke-git/lancet/v2/slice"
)

func main() {
nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

result := slice.ReduceConcurrent(nums, 0, func(_ int, item, agg int) int {
return agg + item
}, 1)

fmt.Println(result)

// Output:
// 55
}
```


### <span id="ReduceBy">ReduceBy</span>

<p>Produces a value from slice by accumulating the result of each element as passed through the reducer function.</p>
Expand Down
44 changes: 44 additions & 0 deletions slice/slice_concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,50 @@ func MapConcurrent[T any, U any](slice []T, iteratee func(index int, item T) U,
return result
}

// ReduceConcurrent reduces the slice to a single value by applying the reducer function to each item in the slice concurrently.
// Play: todo
func ReduceConcurrent[T any](slice []T, initial T, reducer func(index int, item T, agg T) T, numThreads int) T {
if numThreads <= 0 {
numThreads = 1
}

var wg sync.WaitGroup
var mu sync.Mutex

sliceLen := len(slice)
chunkSize := (sliceLen + numThreads - 1) / numThreads
results := make([]T, numThreads)

for i := 0; i < numThreads; i++ {
start := i * chunkSize
end := start + chunkSize
if end > sliceLen {
end = sliceLen
}

wg.Add(1)
go func(i, start, end int) {
defer wg.Done()
tempResult := initial
for j := start; j < end; j++ {
tempResult = reducer(j, slice[j], tempResult)
}
mu.Lock()
results[i] = tempResult
mu.Unlock()
}(i, start, end)
}

wg.Wait()

result := initial
for i, r := range results {
result = reducer(i, result, r)
}

return result
}

// FilterConcurrent applies the provided filter function `predicate` to each element of the input slice concurrently.
// Play: todo
func FilterConcurrent[T any](slice []T, predicate func(index int, item T) bool, numThreads int) []T {
Expand Down
12 changes: 12 additions & 0 deletions slice/slice_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,18 @@ func ExampleReduce() {
// 6
}

func ExampleReduceConcurrent() {
nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
result := ReduceConcurrent(nums, 0, func(_ int, item, agg int) int {
return agg + item
}, 1)

fmt.Println(result)

// Output:
// 55
}

func ExampleReduceBy() {
result1 := ReduceBy([]int{1, 2, 3, 4}, 0, func(_ int, item int, agg int) int {
return agg + item
Expand Down
38 changes: 38 additions & 0 deletions slice/slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,44 @@ func TestReduce(t *testing.T) {
}
}

func TestReduceConcurrent(t *testing.T) {
t.Parallel()

assert := internal.NewAssert(t, "TestReduceConcurrent")

t.Run("basic", func(t *testing.T) {
nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
result := ReduceConcurrent(nums, 0, func(_ int, item, agg int) int {
return agg + item
}, 4)
assert.Equal(55, result)
})

t.Run("empty slice", func(t *testing.T) {
nums := []int{}
result := ReduceConcurrent(nums, 0, func(_ int, item, agg int) int {
return agg + item
}, 4)
assert.Equal(0, result)
})

t.Run("single thread", func(t *testing.T) {
nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
result := ReduceConcurrent(nums, 0, func(_ int, item, agg int) int {
return agg + item
}, 1)
assert.Equal(55, result)
})

t.Run("negative threads", func(t *testing.T) {
nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
result := ReduceConcurrent(nums, 0, func(_ int, item, agg int) int {
return agg + item
}, -1)
assert.Equal(55, result)
})
}

func TestReduceBy(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit c0b200f

Please sign in to comment.