Skip to content

Commit

Permalink
feat: integrate storagev2 into index build process (milvus-io#28995)
Browse files Browse the repository at this point in the history
issue: milvus-io#28994

---------

Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby authored Dec 13, 2023
1 parent ed79505 commit ad866d2
Show file tree
Hide file tree
Showing 22 changed files with 1,110 additions and 439 deletions.
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@ require (

require github.com/apache/arrow/go/v12 v12.0.1

require github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092

require (
github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/quasilyte/go-ruleguard/dsl v0.3.22
golang.org/x/net v0.17.0
)

require (
Expand Down Expand Up @@ -169,7 +172,6 @@ require (
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a // indirect
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
Expand Down Expand Up @@ -218,7 +220,6 @@ require (
go.uber.org/automaxprocs v1.5.2 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
Expand Down
79 changes: 68 additions & 11 deletions internal/datacoord/index_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ package datacoord

import (
"context"
"fmt"
"path"
"sync"
"time"

"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
Expand Down Expand Up @@ -81,13 +83,15 @@ type indexBuilder struct {
nodeManager *IndexNodeManager
chunkManager storage.ChunkManager
indexEngineVersionManager IndexEngineVersionManager
handler Handler
}

func newIndexBuilder(
ctx context.Context,
metaTable *meta, nodeManager *IndexNodeManager,
chunkManager storage.ChunkManager,
indexEngineVersionManager IndexEngineVersionManager,
handler Handler,
) *indexBuilder {
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -101,6 +105,7 @@ func newIndexBuilder(
policy: defaultBuildIndexPolicy,
nodeManager: nodeManager,
chunkManager: chunkManager,
handler: handler,
indexEngineVersionManager: indexEngineVersionManager,
}
ib.reloadFromKV()
Expand Down Expand Up @@ -299,18 +304,70 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
RequestTimeoutMs: Params.MinioCfg.RequestTimeoutMs.GetAsInt64(),
}
}
req := &indexpb.CreateJobRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath),
BuildID: buildID,
DataPaths: binLogs,
IndexVersion: meta.IndexVersion + 1,
StorageConfig: storageConfig,
IndexParams: indexParams,
TypeParams: typeParams,
NumRows: meta.NumRows,
CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),

var req *indexpb.CreateJobRequest
if Params.CommonCfg.EnableStorageV2.GetAsBool() {
collectionInfo, err := ib.handler.GetCollection(ib.ctx, segment.GetCollectionID())
if err != nil {
log.Info("index builder get collection info failed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Error(err))
return false
}

schema := collectionInfo.Schema
var field *schemapb.FieldSchema

for _, f := range schema.Fields {
if f.FieldID == fieldID {
field = f
break
}
}

dim, _ := storage.GetDimFromParams(field.TypeParams)
var scheme string
if Params.MinioCfg.UseSSL.GetAsBool() {
scheme = "https"
} else {
scheme = "http"
}

req = &indexpb.CreateJobRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath),
BuildID: buildID,
DataPaths: binLogs,
IndexVersion: meta.IndexVersion + 1,
StorageConfig: storageConfig,
IndexParams: indexParams,
TypeParams: typeParams,
NumRows: meta.NumRows,
CollectionID: segment.GetCollectionID(),
PartitionID: segment.GetPartitionID(),
SegmentID: segment.GetID(),
FieldID: fieldID,
FieldName: field.Name,
FieldType: field.DataType,
StorePath: fmt.Sprintf("s3://%s:%s@%s/%d?scheme=%s&endpoint_override=%s&allow_bucket_creation=true", Params.MinioCfg.AccessKeyID.GetValue(), Params.MinioCfg.SecretAccessKey.GetValue(), Params.MinioCfg.BucketName.GetValue(), segment.GetID(), scheme, Params.MinioCfg.Address.GetValue()),
StoreVersion: segment.GetStorageVersion(),
IndexStorePath: fmt.Sprintf("s3://%s:%s@%s/index/%d?scheme=%s&endpoint_override=%s&allow_bucket_creation=true", Params.MinioCfg.AccessKeyID.GetValue(), Params.MinioCfg.SecretAccessKey.GetValue(), Params.MinioCfg.BucketName.GetValue(), segment.GetID(), scheme, Params.MinioCfg.Address.GetValue()),
Dim: int64(dim),
CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
}
} else {
req = &indexpb.CreateJobRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath),
BuildID: buildID,
DataPaths: binLogs,
IndexVersion: meta.IndexVersion + 1,
StorageConfig: storageConfig,
IndexParams: indexParams,
TypeParams: typeParams,
NumRows: meta.NumRows,
CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
}
}

if err := ib.assignTask(client, req); err != nil {
// need to release lock then reassign, so set task state to retry
log.Ctx(ib.ctx).Warn("index builder assign task to IndexNode failed", zap.Int64("buildID", buildID),
Expand Down
137 changes: 136 additions & 1 deletion internal/datacoord/index_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore"
catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/metastore/model"
Expand Down Expand Up @@ -666,7 +667,7 @@ func TestIndexBuilder(t *testing.T) {
chunkManager := &mocks.ChunkManager{}
chunkManager.EXPECT().RootPath().Return("root")

ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager, newIndexEngineVersionManager())
ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager, newIndexEngineVersionManager(), nil)

assert.Equal(t, 6, len(ib.tasks))
assert.Equal(t, indexTaskInit, ib.tasks[buildID])
Expand Down Expand Up @@ -1061,3 +1062,137 @@ func TestIndexBuilder_Error(t *testing.T) {
assert.Equal(t, indexTaskRetry, state)
})
}

// TestIndexBuilderV2 runs the index builder with CommonCfg.EnableStorageV2
// switched on: it loads the tasks from the mocked meta table, enqueues one
// extra segment index, reports a node as down, and then waits until the
// builder has drained every task before stopping it.
func TestIndexBuilderV2(t *testing.T) {
	var (
		collID  = UniqueID(100)
		partID  = UniqueID(200)
		indexID = UniqueID(300)
		segID   = UniqueID(500)
		buildID = UniqueID(600)
		nodeID  = UniqueID(700)
	)

	paramtable.Init()
	// Enable storage v2 for this test only; the deferred swap sets it back to "false".
	paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("true")
	defer paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false")
	ctx := context.Background()
	catalog := catalogmocks.NewDataCoordCatalog(t)
	catalog.On("CreateSegmentIndex",
		mock.Anything,
		mock.Anything,
	).Return(nil)
	catalog.On("AlterSegmentIndexes",
		mock.Anything,
		mock.Anything,
	).Return(nil)

	ic := mocks.NewMockIndexNodeClient(t)
	ic.EXPECT().GetJobStats(mock.Anything, mock.Anything, mock.Anything).
		Return(&indexpb.GetJobStatsResponse{
			Status:           merr.Success(),
			TotalJobNum:      1,
			EnqueueJobNum:    0,
			InProgressJobNum: 1,
			TaskSlots:        1,
			JobInfos: []*indexpb.JobInfo{
				{
					NumRows:   1024,
					Dim:       128,
					StartTime: 1,
					EndTime:   10,
					PodID:     1,
				},
			},
		}, nil)
	// Every queried build is reported as finished so that tasks can complete.
	ic.EXPECT().QueryJobs(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
		func(ctx context.Context, in *indexpb.QueryJobsRequest, option ...grpc.CallOption) (*indexpb.QueryJobsResponse, error) {
			indexInfos := make([]*indexpb.IndexTaskInfo, 0)
			for _, buildID := range in.BuildIDs {
				indexInfos = append(indexInfos, &indexpb.IndexTaskInfo{
					BuildID:       buildID,
					State:         commonpb.IndexState_Finished,
					IndexFileKeys: []string{"file1", "file2"},
				})
			}
			return &indexpb.QueryJobsResponse{
				Status:     merr.Success(),
				ClusterID:  in.ClusterID,
				IndexInfos: indexInfos,
			}, nil
		})

	ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Return(merr.Success(), nil)

	ic.EXPECT().DropJobs(mock.Anything, mock.Anything, mock.Anything).
		Return(merr.Success(), nil)
	mt := createMetaTable(catalog)
	nodeManager := &IndexNodeManager{
		ctx: ctx,
		nodeClients: map[UniqueID]types.IndexNodeClient{
			4: ic,
		},
	}
	chunkManager := &mocks.ChunkManager{}
	chunkManager.EXPECT().RootPath().Return("root")

	// The storage v2 path looks the indexed field up in the collection schema,
	// so the mocked handler returns a schema containing that field with a dim.
	handler := NewNMockHandler(t)
	handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
		ID: collID,
		Schema: &schemapb.CollectionSchema{
			Fields: []*schemapb.FieldSchema{
				{FieldID: fieldID, Name: "vec", TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "10"}}},
			},
		},
	}, nil)

	ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager, newIndexEngineVersionManager(), handler)

	assert.Equal(t, 6, len(ib.tasks))
	assert.Equal(t, indexTaskInit, ib.tasks[buildID])
	assert.Equal(t, indexTaskInProgress, ib.tasks[buildID+1])
	// buildID+2 will be filtered out by isDeleted
	assert.Equal(t, indexTaskInit, ib.tasks[buildID+3])
	assert.Equal(t, indexTaskInProgress, ib.tasks[buildID+8])
	assert.Equal(t, indexTaskInit, ib.tasks[buildID+9])
	assert.Equal(t, indexTaskInit, ib.tasks[buildID+10])

	ib.scheduleDuration = time.Millisecond * 500
	ib.Start()

	t.Run("enqueue", func(t *testing.T) {
		segIdx := &model.SegmentIndex{
			SegmentID:     segID + 10,
			CollectionID:  collID,
			PartitionID:   partID,
			NumRows:       1026,
			IndexID:       indexID,
			BuildID:       buildID + 10,
			NodeID:        0,
			IndexVersion:  0,
			IndexState:    0,
			FailReason:    "",
			IsDeleted:     false,
			CreateTime:    0,
			IndexFileKeys: nil,
			IndexSize:     0,
		}
		err := ib.meta.AddSegmentIndex(segIdx)
		assert.NoError(t, err)
		ib.enqueue(buildID + 10)
	})

	t.Run("node down", func(t *testing.T) {
		ib.nodeDown(nodeID)
	})

	// Wait for the builder to drain all tasks. The length is read under the
	// read lock and the lock is released on every iteration, including the
	// last one, so no lock is left held when Stop runs. A short sleep keeps
	// the poll from spinning on the mutex.
	for {
		ib.taskMutex.RLock()
		remaining := len(ib.tasks)
		ib.taskMutex.RUnlock()
		if remaining == 0 {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	ib.Stop()
}
2 changes: 1 addition & 1 deletion internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ func (s *Server) initMeta(chunkManager storage.ChunkManager) error {

// initIndexBuilder creates the server's index builder from the given chunk
// manager when s.indexBuilder is still nil; if it is already set, the call
// does nothing.
// NOTE(review): the two assignments below look like the removed and added
// sides of a diff (the second one also passes s.handler) — confirm which
// call matches newIndexBuilder's current signature.
func (s *Server) initIndexBuilder(manager storage.ChunkManager) {
if s.indexBuilder == nil {
s.indexBuilder = newIndexBuilder(s.ctx, s.meta, s.indexNodeManager, manager, s.indexEngineVersionManager)
s.indexBuilder = newIndexBuilder(s.ctx, s.meta, s.indexNodeManager, manager, s.indexEngineVersionManager, s.handler)
}
}

Expand Down
11 changes: 11 additions & 0 deletions internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,17 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
continue
}

if Params.CommonCfg.EnableStorageV2.GetAsBool() {
segmentInfos = append(segmentInfos, &datapb.SegmentInfo{
ID: segment.ID,
PartitionID: segment.PartitionID,
CollectionID: segment.CollectionID,
InsertChannel: segment.InsertChannel,
NumOfRows: segment.NumOfRows,
})
continue
}

binlogs := segment.GetBinlogs()
if len(binlogs) == 0 && segment.GetLevel() != datapb.SegmentLevel_L0 {
continue
Expand Down
Loading

0 comments on commit ad866d2

Please sign in to comment.