Skip to content

Commit

Permalink
Merge branch 'main' of github.com:duke-git/lancet
Browse files Browse the repository at this point in the history
  • Loading branch information
duke-git committed Feb 29, 2024
2 parents 5db1d07 + 606d887 commit 7a98c43
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 35 deletions.
88 changes: 88 additions & 0 deletions fileutil/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import (
"fmt"
"io"
"io/fs"
"log"
"net/http"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"

"github.com/duke-git/lancet/v2/validator"
)
Expand Down Expand Up @@ -866,3 +868,89 @@ func isCsvSupportedType(v interface{}) bool {
return false
}
}

// ChunkRead reads up to size bytes from f starting at byte offset and returns
// the lines found in that block.
//
// Lines are split on '\n' and the newline itself is not included in the
// returned strings. A trailing fragment that is not terminated by a newline is
// returned as the last element. The block is treated as raw bytes: if offset or
// offset+size falls in the middle of a line, the first and/or last element is
// only the part of that line that lies inside the block.
//
// bufPool supplies the scratch buffer ([]byte values). If the pool is nil,
// yields nothing usable, or yields a buffer whose capacity is smaller than
// size, a fresh buffer is allocated instead. The buffer that was used is put
// back into the pool before returning.
//
// A size <= 0 yields nil. Reaching end of file is not an error; any other read
// error terminates the program via log.Fatal, because the signature has no
// error result.
func ChunkRead(f *os.File, offset int64, size int, bufPool *sync.Pool) []string {
	if size <= 0 {
		return nil
	}

	// Take a buffer from the pool, but never trust it to be large enough:
	// reslicing beyond the capacity would panic.
	var buf []byte
	if bufPool != nil {
		buf, _ = bufPool.Get().([]byte)
	}
	if cap(buf) < size {
		buf = make([]byte, size)
	}
	buf = buf[:size]

	n, err := f.ReadAt(buf, offset)
	if err != nil && err != io.EOF {
		log.Fatal(err)
	}
	buf = buf[:n] // keep only the bytes that were actually read

	var lines []string
	lineStart := 0
	for i, b := range buf {
		if b == '\n' {
			lines = append(lines, string(buf[lineStart:i])) // newline excluded
			lineStart = i + 1
		}
	}
	if lineStart < len(buf) { // unterminated data at the end of the block
		lines = append(lines, string(buf[lineStart:]))
	}

	// Every line was copied into its own string, so the buffer can be reused.
	if bufPool != nil {
		bufPool.Put(buf)
	}
	return lines
}

// ParallelChunkRead reads the file at filePath chunk by chunk using several
// goroutines and sends the lines of each chunk to linesCh.
//
// Parameters:
//   - filePath: path of the file to read.
//   - linesCh: channel that receives one []string per chunk. It is closed once
//     every chunk has been delivered. The caller must receive from it
//     concurrently, otherwise this function blocks.
//   - ChunkSizeMB: upper bound of a chunk in MiB. Values <= 0 select the
//     default of 100. Very large chunks are counter-productive; tune as needed.
//   - MaxGoroutine: number of chunks read concurrently. Values <= 0 select
//     runtime.NumCPU().
//
// Chunk boundaries are moved back to the last newline inside the chunk, so a
// line is never split across two chunks. The only exception is a single line
// longer than the chunk size, which is still cut at the chunk size. Chunks are
// delivered in no particular order.
//
// Failing to open or stat the file terminates the program via log.Fatalf,
// because the signature has no error result.
func ParallelChunkRead(filePath string, linesCh chan<- []string, ChunkSizeMB, MaxGoroutine int) {
	if ChunkSizeMB <= 0 {
		ChunkSizeMB = 100
	}
	ChunkSize := ChunkSizeMB * 1024 * 1024

	// Chunk buffers are recycled between reads to avoid re-allocating them.
	bufPool := sync.Pool{
		New: func() interface{} {
			return make([]byte, 0, ChunkSize)
		},
	}

	if MaxGoroutine <= 0 {
		MaxGoroutine = runtime.NumCPU()
	}

	f, err := os.Open(filePath)
	if err != nil {
		log.Fatalf("failed to open file: %v", err)
	}
	defer f.Close()

	info, err := f.Stat()
	if err != nil {
		log.Fatalf("failed to get file info: %v", err)
	}
	fileSize := info.Size()

	type chunk struct {
		offset int64
		size   int
	}
	chunkCh := make(chan chunk, MaxGoroutine)

	// Producer: walk the file once and hand out line-aligned chunks.
	go func() {
		defer close(chunkCh)
		scratch := make([]byte, 64*1024)
		for start := int64(0); start < fileSize; {
			end := start + int64(ChunkSize)
			if end >= fileSize {
				end = fileSize
			} else {
				end = lineAlignedChunkEnd(f, start, end, scratch)
			}
			chunkCh <- chunk{offset: start, size: int(end - start)}
			start = end
		}
	}()

	// Workers: read chunks and forward their lines.
	var wg sync.WaitGroup
	for i := 0; i < MaxGoroutine; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for c := range chunkCh {
				linesCh <- ChunkRead(f, c.offset, c.size, &bufPool)
			}
		}()
	}

	// Close the output channel only after every worker has finished sending.
	wg.Wait()
	close(linesCh)
}

// lineAlignedChunkEnd returns the offset just past the last '\n' found in
// [start, end), scanning backwards through f in windows of len(scratch) bytes.
// It returns end unchanged when the range contains no newline or when the file
// cannot be read; in the latter case the subsequent chunk read reports the
// error. The result is always in (start, end].
func lineAlignedChunkEnd(f *os.File, start, end int64, scratch []byte) int64 {
	window := int64(len(scratch))
	if window == 0 {
		return end
	}
	for hi := end; hi > start; {
		lo := hi - window
		if lo < start {
			lo = start
		}
		n, err := f.ReadAt(scratch[:hi-lo], lo)
		if err != nil && err != io.EOF {
			return end
		}
		for i := n - 1; i >= 0; i-- {
			if scratch[i] == '\n' {
				return lo + int64(i) + 1
			}
		}
		hi = lo
	}
	return end
}
66 changes: 41 additions & 25 deletions iterator/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,15 @@ type Iterator[T any] interface {
Next() (item T, ok bool)
}

// StopIterator is an interface for stopping Iterator.
// ResettableIterator is an Iterator that can additionally be rewound to its
// starting position.
type ResettableIterator[T any] interface {
Iterator[T]
// Reset allows for the iteration process over a sequence to be restarted from the beginning.
// It enables reusing the iterator for multiple traversals without needing to recreate it.
Reset()
}

// StopIterator is an interface for stopping Iterator.
type StopIterator[T any] interface {
Iterator[T]

Expand Down Expand Up @@ -81,8 +89,8 @@ type PrevIterator[T any] interface {
////////////////////////////////////////////////////////////////////////////////////////////////////

// FromSlice returns an iterator over a slice of data.
func FromSlice[T any](slice []T) Iterator[T] {
return &sliceIterator[T]{slice: slice, index: -1}
func FromSlice[T any](slice []T) *SliceIterator[T] {
return &SliceIterator[T]{slice: slice, index: -1}
}

func ToSlice[T any](iter Iterator[T]) []T {
Expand All @@ -93,16 +101,16 @@ func ToSlice[T any](iter Iterator[T]) []T {
return result
}

type sliceIterator[T any] struct {
// SliceIterator iterates over the elements of a slice.
type SliceIterator[T any] struct {
// slice holds the elements being iterated.
slice []T
// index is the position of the element most recently returned by Next;
// FromSlice initializes it to -1, i.e. before the first element.
index int
}

func (iter *sliceIterator[T]) HasNext() bool {
// HasNext reports whether at least one element remains after the current index.
func (iter *SliceIterator[T]) HasNext() bool {
return iter.index < len(iter.slice)-1
}

func (iter *sliceIterator[T]) Next() (T, bool) {
func (iter *SliceIterator[T]) Next() (T, bool) {
iter.index++

ok := iter.index >= 0 && iter.index < len(iter.slice)
Expand All @@ -116,7 +124,7 @@ func (iter *sliceIterator[T]) Next() (T, bool) {
}

// Prev implements PrevIterator.
func (iter *sliceIterator[T]) Prev() {
func (iter *SliceIterator[T]) Prev() {
if iter.index == -1 {
panic("Next function should be called Prev")
}
Expand All @@ -128,7 +136,7 @@ func (iter *sliceIterator[T]) Prev() {
}

// Set implements SetIterator.
func (iter *sliceIterator[T]) Set(value T) {
func (iter *SliceIterator[T]) Set(value T) {
if iter.index == -1 {
panic("Next function should be called Set")
}
Expand All @@ -138,52 +146,60 @@ func (iter *sliceIterator[T]) Set(value T) {
iter.slice[iter.index] = value
}

// Reset implements ResettableIterator. It moves the index back to -1, the
// position before the first element, so the next call to Next returns the
// first element of the slice again.
func (iter *SliceIterator[T]) Reset() {
iter.index = -1
}

// FromRange creates an iterator which returns the numeric range between start inclusive and end
// exclusive by the step size. It panics if end is less than start or if step is not positive.
func FromRange[T constraints.Integer | constraints.Float](start, end, step T) Iterator[T] {
func FromRange[T constraints.Integer | constraints.Float](start, end, step T) *RangeIterator[T] {
if end < start {
panic("RangeIterator: start should be before end")
} else if step <= 0 {
panic("RangeIterator: step should be positive")
}

return &rangeIterator[T]{start: start, end: end, step: step}
return &RangeIterator[T]{start: start, end: end, step: step, current: start}
}

type rangeIterator[T constraints.Integer | constraints.Float] struct {
start, end, step T
// RangeIterator iterates over the numbers from start (inclusive) up to end
// (exclusive), advancing by step.
type RangeIterator[T constraints.Integer | constraints.Float] struct {
// start and end bound the range, step is the increment applied after each
// returned value, and current is the next value Next will return.
start, end, step, current T
}

func (iter *rangeIterator[T]) HasNext() bool {
return iter.start < iter.end
// HasNext reports whether the current value is still below end, i.e. whether
// the range has not been exhausted yet.
func (iter *RangeIterator[T]) HasNext() bool {
return iter.current < iter.end
}

func (iter *rangeIterator[T]) Next() (T, bool) {
if iter.start >= iter.end {
func (iter *RangeIterator[T]) Next() (T, bool) {
if iter.current >= iter.end {
var zero T
return zero, false
}
num := iter.start
iter.start += iter.step
num := iter.current
iter.current += iter.step
return num, true
}

// FromRange creates a iterator which returns the numeric range between start inclusive and end
// exclusive by the step size. start should be less than end, step shoud be positive.
func FromChannel[T any](channel <-chan T) Iterator[T] {
return &channelIterator[T]{channel: channel}
// Reset implements ResettableIterator. It sets the current value back to
// start so the range can be traversed again from the beginning.
func (iter *RangeIterator[T]) Reset() {
iter.current = iter.start
}

// FromChannel creates an iterator which returns items received from the provided channel.
// The iteration continues until the channel is closed. The iterator only
// receives from the channel; it never closes it.
func FromChannel[T any](channel <-chan T) *ChannelIterator[T] {
return &ChannelIterator[T]{channel: channel}
}

type channelIterator[T any] struct {
// ChannelIterator iterates over the items received from a channel.
type ChannelIterator[T any] struct {
// channel is the receive-only source of items.
channel <-chan T
}

func (iter *channelIterator[T]) Next() (T, bool) {
// Next receives the next item from the underlying channel, blocking until one
// is available. The boolean result is false once the channel has been closed
// and drained, in which case the returned item is the zero value of T.
func (iter *ChannelIterator[T]) Next() (T, bool) {
	value, open := <-iter.channel
	return value, open
}

func (iter *channelIterator[T]) HasNext() bool {
// HasNext reports whether the channel currently has no buffered items
// (len(iter.channel) == 0).
//
// NOTE(review): this does not look like a reliable "more items available"
// check. For an unbuffered channel len is always 0, so the result is always
// true, even after the channel is closed; for a buffered channel with items
// waiting it returns false. Confirm the intended semantics. Until then,
// callers should rely on the ok result of Next to detect the end of iteration.
func (iter *ChannelIterator[T]) HasNext() bool {
return len(iter.channel) == 0
}

Expand Down
75 changes: 72 additions & 3 deletions iterator/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,36 @@ func TestSliceIterator(t *testing.T) {
assert.Equal(false, ok)
})

// Reset
t.Run("slice iterator Reset: ", func(t *testing.T) {
iter1 := FromSlice([]int{1, 2, 3, 4})
for i := 0; i < 4; i++ {
item, ok := iter1.Next()
if !ok {
break
}
assert.Equal(i+1, item)
}

iter1.Reset()

for i := 0; i < 4; i++ {
item, ok := iter1.Next()
if !ok {
break
}
assert.Equal(i+1, item)
}
})

t.Run("slice iterator ToSlice: ", func(t *testing.T) {
iter := FromSlice([]int{1, 2, 3, 4})
item, _ := iter.Next()
assert.Equal(1, item)

data := ToSlice(iter)
data := ToSlice[int](iter)
assert.Equal([]int{2, 3, 4}, data)
})

}

func TestRangeIterator(t *testing.T) {
Expand All @@ -84,6 +105,54 @@ func TestRangeIterator(t *testing.T) {
_, ok = iter.Next()
assert.Equal(false, ok)
assert.Equal(false, iter.HasNext())

iter.Reset()

item, ok = iter.Next()
assert.Equal(1, item)
assert.Equal(true, ok)

item, ok = iter.Next()
assert.Equal(2, item)
assert.Equal(true, ok)

item, ok = iter.Next()
assert.Equal(3, item)
assert.Equal(true, ok)

_, ok = iter.Next()
assert.Equal(false, ok)
assert.Equal(false, iter.HasNext())
})

t.Run("range iterator reset: ", func(t *testing.T) {
iter := FromRange(1, 4, 1)

item, ok := iter.Next()
assert.Equal(1, item)
assert.Equal(true, ok)

item, ok = iter.Next()
assert.Equal(2, item)
assert.Equal(true, ok)

iter.Reset()

item, ok = iter.Next()
assert.Equal(1, item)
assert.Equal(true, ok)

item, ok = iter.Next()
assert.Equal(2, item)
assert.Equal(true, ok)

item, ok = iter.Next()
assert.Equal(3, item)
assert.Equal(true, ok)

_, ok = iter.Next()
assert.Equal(false, ok)
assert.Equal(false, iter.HasNext())
})

}
Expand All @@ -93,7 +162,7 @@ func TestChannelIterator(t *testing.T) {

assert := internal.NewAssert(t, "TestRangeIterator")

iter := FromSlice([]int{1, 2, 3, 4})
var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4})

ctx, cancel := context.WithCancel(context.Background())
iter = FromChannel(ToChannel(ctx, iter, 0))
Expand Down
Loading

0 comments on commit 7a98c43

Please sign in to comment.