From 473f9c9f3e7b98fdd6eed188f1700eacec8b12da Mon Sep 17 00:00:00 2001 From: donutloop Date: Thu, 29 Feb 2024 04:30:12 +0100 Subject: [PATCH 1/2] Iterator: general refactoring and reset method (#193) Feature Reset allows for the iteration process over a sequence to be restarted from the beginning. It enables reusing the iterator for multiple traversals without needing to recreate it. Refactoring It is a idiomatic practice to design functions and methods to return concrete struct types. This approach promotes flexibility and decoupling, allowing the calling code to work with any implementation that satisfies the interface --- iterator/iterator.go | 66 ++++++++++++++++++++------------- iterator/iterator_test.go | 75 ++++++++++++++++++++++++++++++++++++-- iterator/operation_test.go | 14 +++---- 3 files changed, 120 insertions(+), 35 deletions(-) diff --git a/iterator/iterator.go b/iterator/iterator.go index 1acda7ee..949405e3 100644 --- a/iterator/iterator.go +++ b/iterator/iterator.go @@ -27,7 +27,15 @@ type Iterator[T any] interface { Next() (item T, ok bool) } -// StopIterator is an interface for stopping Iterator. +// ResettableIterator supports to reset the iterator +type ResettableIterator[T any] interface { + Iterator[T] + // Reset allows for the iteration process over a sequence to be restarted from the beginning. + // It enables reusing the iterator for multiple traversals without needing to recreate it. + Reset() +} + +// StopIterator is an interface for stopping Iterator. type StopIterator[T any] interface { Iterator[T] @@ -81,8 +89,8 @@ type PrevIterator[T any] interface { //////////////////////////////////////////////////////////////////////////////////////////////////// // FromSlice returns an iterator over a slice of data. -func FromSlice[T any](slice []T) Iterator[T] { - return &sliceIterator[T]{slice: slice, index: -1} +func FromSlice[T any](slice []T) *SliceIterator[T] { + return &SliceIterator[T]{slice: slice, index: -1} } func ToSlice[T any](iter Iterator[T]) []T { @@ -93,16 +101,16 @@ func ToSlice[T any](iter Iterator[T]) []T { return result } -type sliceIterator[T any] struct { +type SliceIterator[T any] struct { slice []T index int } -func (iter *sliceIterator[T]) HasNext() bool { +func (iter *SliceIterator[T]) HasNext() bool { return iter.index < len(iter.slice)-1 } -func (iter *sliceIterator[T]) Next() (T, bool) { +func (iter *SliceIterator[T]) Next() (T, bool) { iter.index++ ok := iter.index >= 0 && iter.index < len(iter.slice) @@ -116,7 +124,7 @@ func (iter *sliceIterator[T]) Next() (T, bool) { } // Prev implements PrevIterator. -func (iter *sliceIterator[T]) Prev() { +func (iter *SliceIterator[T]) Prev() { if iter.index == -1 { panic("Next function should be called Prev") } @@ -128,7 +136,7 @@ func (iter *sliceIterator[T]) Prev() { } // Set implements SetIterator. -func (iter *sliceIterator[T]) Set(value T) { +func (iter *SliceIterator[T]) Set(value T) { if iter.index == -1 { panic("Next function should be called Set") } @@ -138,52 +146,60 @@ func (iter *sliceIterator[T]) Set(value T) { iter.slice[iter.index] = value } +func (iter *SliceIterator[T]) Reset() { + iter.index = -1 +} + // FromRange creates a iterator which returns the numeric range between start inclusive and end // exclusive by the step size. start should be less than end, step shoud be positive. -func FromRange[T constraints.Integer | constraints.Float](start, end, step T) Iterator[T] { +func FromRange[T constraints.Integer | constraints.Float](start, end, step T) *RangeIterator[T] { if end < start { panic("RangeIterator: start should be before end") } else if step <= 0 { panic("RangeIterator: step should be positive") } - return &rangeIterator[T]{start: start, end: end, step: step} + return &RangeIterator[T]{start: start, end: end, step: step, current: start} } -type rangeIterator[T constraints.Integer | constraints.Float] struct { - start, end, step T +type RangeIterator[T constraints.Integer | constraints.Float] struct { + start, end, step, current T } -func (iter *rangeIterator[T]) HasNext() bool { - return iter.start < iter.end +func (iter *RangeIterator[T]) HasNext() bool { + return iter.current < iter.end } -func (iter *rangeIterator[T]) Next() (T, bool) { - if iter.start >= iter.end { +func (iter *RangeIterator[T]) Next() (T, bool) { + if iter.current >= iter.end { var zero T return zero, false } - num := iter.start - iter.start += iter.step + num := iter.current + iter.current += iter.step return num, true } -// FromRange creates a iterator which returns the numeric range between start inclusive and end -// exclusive by the step size. start should be less than end, step shoud be positive. -func FromChannel[T any](channel <-chan T) Iterator[T] { - return &channelIterator[T]{channel: channel} +func (iter *RangeIterator[T]) Reset() { + iter.current = iter.start +} + +// FromChannel creates an iterator which returns items received from the provided channel. +// The iteration continues until the channel is closed. +func FromChannel[T any](channel <-chan T) *ChannelIterator[T] { + return &ChannelIterator[T]{channel: channel} } -type channelIterator[T any] struct { +type ChannelIterator[T any] struct { channel <-chan T } -func (iter *channelIterator[T]) Next() (T, bool) { +func (iter *ChannelIterator[T]) Next() (T, bool) { item, ok := <-iter.channel return item, ok } -func (iter *channelIterator[T]) HasNext() bool { +func (iter *ChannelIterator[T]) HasNext() bool { return len(iter.channel) == 0 } diff --git a/iterator/iterator_test.go b/iterator/iterator_test.go index 726175b3..3a6e606f 100644 --- a/iterator/iterator_test.go +++ b/iterator/iterator_test.go @@ -50,15 +50,36 @@ func TestSliceIterator(t *testing.T) { assert.Equal(false, ok) }) + // Reset + t.Run("slice iterator Reset: ", func(t *testing.T) { + iter1 := FromSlice([]int{1, 2, 3, 4}) + for i := 0; i < 4; i++ { + item, ok := iter1.Next() + if !ok { + break + } + assert.Equal(i+1, item) + } + + iter1.Reset() + + for i := 0; i < 4; i++ { + item, ok := iter1.Next() + if !ok { + break + } + assert.Equal(i+1, item) + } + }) + t.Run("slice iterator ToSlice: ", func(t *testing.T) { iter := FromSlice([]int{1, 2, 3, 4}) item, _ := iter.Next() assert.Equal(1, item) - data := ToSlice(iter) + data := ToSlice[int](iter) assert.Equal([]int{2, 3, 4}, data) }) - } func TestRangeIterator(t *testing.T) { @@ -84,6 +105,54 @@ func TestRangeIterator(t *testing.T) { _, ok = iter.Next() assert.Equal(false, ok) assert.Equal(false, iter.HasNext()) + + iter.Reset() + + item, ok = iter.Next() + assert.Equal(1, item) + assert.Equal(true, ok) + + item, ok = iter.Next() + assert.Equal(2, item) + assert.Equal(true, ok) + + item, ok = iter.Next() + assert.Equal(3, item) + assert.Equal(true, ok) + + _, ok = iter.Next() + assert.Equal(false, ok) + assert.Equal(false, iter.HasNext()) + }) + + t.Run("range iterator reset: ", func(t *testing.T) { + iter := FromRange(1, 4, 1) + + item, ok := iter.Next() + assert.Equal(1, item) + assert.Equal(true, ok) + + item, ok = iter.Next() + assert.Equal(2, item) + assert.Equal(true, ok) + + iter.Reset() + + item, ok = iter.Next() + assert.Equal(1, item) + assert.Equal(true, ok) + + item, ok = iter.Next() + assert.Equal(2, item) + assert.Equal(true, ok) + + item, ok = iter.Next() + assert.Equal(3, item) + assert.Equal(true, ok) + + _, ok = iter.Next() + assert.Equal(false, ok) + assert.Equal(false, iter.HasNext()) }) } @@ -93,7 +162,7 @@ func TestChannelIterator(t *testing.T) { assert := internal.NewAssert(t, "TestRangeIterator") - iter := FromSlice([]int{1, 2, 3, 4}) + var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4}) ctx, cancel := context.WithCancel(context.Background()) iter = FromChannel(ToChannel(ctx, iter, 0)) diff --git a/iterator/operation_test.go b/iterator/operation_test.go index 4c2517ea..e0889e9a 100644 --- a/iterator/operation_test.go +++ b/iterator/operation_test.go @@ -21,7 +21,7 @@ func TestMapIterator(t *testing.T) { assert := internal.NewAssert(t, "TestMapIterator") - iter := FromSlice([]int{1, 2, 3, 4}) + var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4}) iter = Map(iter, func(n int) int { return n / 2 }) @@ -34,7 +34,7 @@ func TestFilterIterator(t *testing.T) { assert := internal.NewAssert(t, "TestFilterIterator") - iter := FromSlice([]int{1, 2, 3, 4}) + var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4}) iter = Filter(iter, func(n int) bool { return n < 3 }) @@ -47,10 +47,10 @@ func TestJoinIterator(t *testing.T) { assert := internal.NewAssert(t, "TestJoinIterator") - iter1 := FromSlice([]int{1, 2}) - iter2 := FromSlice([]int{3, 4}) + var iter1 Iterator[int] = FromSlice([]int{1, 2}) + var iter2 Iterator[int] = FromSlice([]int{3, 4}) - iter := Join(iter1, iter2) + var iter Iterator[int] = Join(iter1, iter2) item, ok := iter.Next() assert.Equal(1, item) @@ -64,7 +64,7 @@ func TestReduce(t *testing.T) { assert := internal.NewAssert(t, "TestReduce") - iter := FromSlice([]int{1, 2, 3, 4}) + var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4}) sum := Reduce(iter, 0, func(a, b int) int { return a + b }) assert.Equal(10, sum) } @@ -74,7 +74,7 @@ func TestTakeIterator(t *testing.T) { assert := internal.NewAssert(t, "TestTakeIterator") - iter := FromSlice([]int{1, 2, 3, 4, 5}) + var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4, 5}) iter = Take(iter, 3) From 606d8872306d0757240dd80c21084ae01cc1ab6a Mon Sep 17 00:00:00 2001 From: colorcrow Date: Thu, 29 Feb 2024 11:32:10 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=A2=9E=E5=8A=A0ParallelChunkRead?= =?UTF-8?q?=E6=96=B9=E6=B3=95=EF=BC=8C=E5=88=86=E5=9D=97=E5=B9=B6=E5=8F=91?= =?UTF-8?q?=E8=AF=BB=E5=8F=96=E8=B6=85=E5=A4=A7=E6=96=87=E6=9C=AC=20(#192)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fileutil/file.go | 88 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/fileutil/file.go b/fileutil/file.go index 878cdcd4..a5631b02 100644 --- a/fileutil/file.go +++ b/fileutil/file.go @@ -16,12 +16,14 @@ import ( "fmt" "io" "io/fs" + "log" "net/http" "os" "path/filepath" "runtime" "sort" "strings" + "sync" "github.com/duke-git/lancet/v2/validator" ) @@ -866,3 +868,89 @@ func isCsvSupportedType(v interface{}) bool { return false } } + +// ChunkRead 从文件的指定偏移读取块并返回块内所有行 +func ChunkRead(f *os.File, offset int64, size int, bufPool *sync.Pool) []string { + buf := bufPool.Get().([]byte)[:size] // 从Pool获取缓冲区并调整大小 + n, err := f.ReadAt(buf, offset) // 从指定偏移读取数据到缓冲区 + if err != nil && err != io.EOF { + log.Fatal(err) + } + buf = buf[:n] // 调整切片以匹配实际读取的字节数 + + var lines []string + var lineStart int + for i, b := range buf { + if b == '\n' { + line := string(buf[lineStart:i]) // 不包括换行符 + lines = append(lines, line) + lineStart = i + 1 // 设置下一行的开始 + } + } + + if lineStart < len(buf) { // 处理块末尾的行 + line := string(buf[lineStart:]) + lines = append(lines, line) + } + bufPool.Put(buf) // 读取完成后,将缓冲区放回Pool + return lines +} + +// 并行读取文件并将每个块的行发送到指定通道 +// filePath 文件路径 +// ChunkSizeMB 分块的大小(单位MB,设置为0时使用默认100MB),设置过大反而不利,视情调整 +// MaxGoroutine 并发读取分块的数量,设置为0时使用CPU核心数 +// linesCh用于接收返回结果的通道。 +func ParallelChunkRead(filePath string, linesCh chan<- []string, ChunkSizeMB, MaxGoroutine int) { + if ChunkSizeMB == 0 { + ChunkSizeMB = 100 + } + ChunkSize := ChunkSizeMB * 1024 * 1024 + // 内存复用 + bufPool := sync.Pool{ + New: func() interface{} { + return make([]byte, 0, ChunkSize) + }, + } + + if MaxGoroutine == 0 { + MaxGoroutine = runtime.NumCPU() // 设置为0时使用CPU核心数 + } + + f, err := os.Open(filePath) + if err != nil { + log.Fatalf("failed to open file: %v", err) + } + defer f.Close() + + info, err := f.Stat() + if err != nil { + log.Fatalf("failed to get file info: %v", err) + } + + wg := sync.WaitGroup{} + chunkOffsetCh := make(chan int64, MaxGoroutine) + + // 分配工作 + go func() { + for i := int64(0); i < info.Size(); i += int64(ChunkSize) { + chunkOffsetCh <- i + } + close(chunkOffsetCh) + }() + + // 启动工作协程 + for i := 0; i < MaxGoroutine; i++ { + wg.Add(1) + go func() { + for chunkOffset := range chunkOffsetCh { + linesCh <- ChunkRead(f, chunkOffset, ChunkSize, &bufPool) + } + wg.Done() + }() + } + + // 等待所有解析完成后关闭行通道 + wg.Wait() + close(linesCh) +}