From 3acbd48e9a0c66a7ea1551fb4cb2ae15771b97ee Mon Sep 17 00:00:00 2001 From: SimFG Date: Mon, 25 Nov 2024 10:19:19 +0800 Subject: [PATCH] fix: add the db information in the dml message Signed-off-by: SimFG --- internal/proxy/meta_cache.go | 2 ++ internal/proxy/msg_pack.go | 2 ++ internal/proxy/task_delete.go | 22 ++++++++++++---------- internal/proxy/task_delete_streaming.go | 17 ++++++----------- internal/proxy/task_insert.go | 8 ++++++++ internal/proxy/task_upsert.go | 12 ++++++++++++ internal/proxy/task_upsert_streaming.go | 11 ++++------- 7 files changed, 46 insertions(+), 28 deletions(-) diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 75e2578606b45..daea35b137909 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/expr" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -350,6 +351,7 @@ func InitMetaCache(ctx context.Context, rootCoord types.RootCoordClient, queryCo if err != nil { return err } + expr.Register("cache", globalMetaCache) // The privilege info is a little more. And to get this info, the query operation of involving multiple table queries is required. resp, err := rootCoord.ListPolicy(ctx, &internalpb.ListPolicyRequest{}) diff --git a/internal/proxy/msg_pack.go b/internal/proxy/msg_pack.go index 426cc0a9d1f7b..848aee1a890a0 100644 --- a/internal/proxy/msg_pack.go +++ b/internal/proxy/msg_pack.go @@ -58,6 +58,8 @@ func genInsertMsgsByPartition(ctx context.Context, ), CollectionID: insertMsg.CollectionID, PartitionID: partitionID, + DbName: insertMsg.DbName, + DbID: insertMsg.DbID, CollectionName: insertMsg.CollectionName, PartitionName: partitionName, SegmentID: segmentID, diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index ff585ff643f84..47f1464b30e89 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -56,6 +56,7 @@ type deleteTask struct { primaryKeys *schemapb.IDs collectionID UniqueID partitionID UniqueID + dbID UniqueID partitionKeyMode bool // set by scheduler @@ -146,16 +147,12 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) { return err } - result, numRows, err := repackDeleteMsgByHash( - ctx, - dt.primaryKeys, - dt.vChannels, - dt.idAllocator, - dt.ts, - dt.collectionID, - dt.req.GetCollectionName(), - dt.partitionID, - dt.req.GetPartitionName(), + result, numRows, err := repackDeleteMsgByHash(ctx, + dt.primaryKeys, dt.vChannels, + dt.idAllocator, dt.ts, + dt.collectionID, dt.req.GetCollectionName(), + dt.partitionID, dt.req.GetPartitionName(), + dt.dbID, dt.req.GetDbName(), ) if err != nil { return err @@ -204,6 +201,8 @@ func repackDeleteMsgByHash( collectionName string, partitionID int64, partitionName string, + dbID int64, + dbName string, ) (map[uint32][]*msgstream.DeleteMsg, int64, error) { maxSize := Params.PulsarCfg.MaxMessageSize.GetAsInt() hashValues := typeutil.HashPK2Channels(primaryKeys, vChannels) @@ -233,6 +232,8 @@ func repackDeleteMsgByHash( PartitionID: partitionID, CollectionName: collectionName, PartitionName: partitionName, + DbID: dbID, + DbName: dbName, PrimaryKeys: &schemapb.IDs{}, ShardName: vchannel, }, @@ -413,6 +414,7 @@ func (dr *deleteRunner) produce(ctx context.Context, primaryKeys *schemapb.IDs) partitionKeyMode: dr.partitionKeyMode, vChannels: dr.vChannels, primaryKeys: primaryKeys, + dbID: dr.dbID, } var enqueuedTask task = dt if streamingutil.IsStreamingServiceEnabled() { diff --git a/internal/proxy/task_delete_streaming.go b/internal/proxy/task_delete_streaming.go index 5e9e107d72d86..8e3d2f272af68 100644 --- a/internal/proxy/task_delete_streaming.go +++ b/internal/proxy/task_delete_streaming.go @@ -30,17 +30,12 @@ func (dt *deleteTaskByStreamingService) Execute(ctx context.Context) (err error) } dt.tr = timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID())) - result, numRows, err := repackDeleteMsgByHash( - ctx, - dt.primaryKeys, - dt.vChannels, - dt.idAllocator, - dt.ts, - dt.collectionID, - dt.req.GetCollectionName(), - dt.partitionID, - dt.req.GetPartitionName(), - ) + result, numRows, err := repackDeleteMsgByHash(ctx, + dt.primaryKeys, dt.vChannels, + dt.idAllocator, dt.ts, + dt.collectionID, dt.req.GetCollectionName(), + dt.partitionID, dt.req.GetPartitionName(), + dt.dbID, dt.req.GetDbName()) if err != nil { return err } diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index fd86fc9d3c343..192e061e19eed 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -38,6 +38,7 @@ type insertTask struct { pChannels []pChan schema *schemapb.CollectionSchema partitionKeys *schemapb.FieldData + dbID UniqueID } // TraceCtx returns insertTask context @@ -132,6 +133,13 @@ func (it *insertTask) PreExecute(ctx context.Context) error { } it.schema = schema.CollectionSchema + dbInfo, err := globalMetaCache.GetDatabaseInfo(it.ctx, it.insertMsg.GetDbName()) + if err != nil { + log.Warn("get database info failed", zap.String("databaseName", it.insertMsg.GetDbName()), zap.Error(err)) + return merr.WrapErrAsInputErrorWhen(err, merr.ErrDatabaseNotFound) + } + it.insertMsg.DbID = dbInfo.dbID + rowNums := uint32(it.insertMsg.NRows()) // set insertTask.rowIDs var rowIDBegin UniqueID diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index 154bbba8753b7..8b20078387726 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -56,6 +56,7 @@ type upsertTask struct { idAllocator *allocator.IDAllocator segIDAssigner *segIDAssigner collectionID UniqueID + dbID UniqueID chMgr channelsMgr chTicker channelsTimeTicker vChannels []vChan @@ -301,6 +302,15 @@ func (it *upsertTask) PreExecute(ctx context.Context) error { } it.schema = schema + dbInfo, err := globalMetaCache.GetDatabaseInfo(ctx, it.req.GetDbName()) + if err != nil { + log.Warn("Failed to get database info", + zap.String("dbName", it.req.GetDbName()), + zap.Error(err)) + return err + } + it.dbID = dbInfo.dbID + it.partitionKeyMode, err = isPartitionKeyMode(ctx, it.req.GetDbName(), collectionName) if err != nil { log.Warn("check partition key mode failed", @@ -335,6 +345,7 @@ func (it *upsertTask) PreExecute(ctx context.Context) error { NumRows: uint64(it.req.NumRows), Version: msgpb.InsertDataVersion_ColumnBased, DbName: it.req.DbName, + DbID: it.dbID, }, }, DeleteMsg: &msgstream.DeleteMsg{ @@ -344,6 +355,7 @@ func (it *upsertTask) PreExecute(ctx context.Context) error { commonpbutil.WithSourceID(paramtable.GetNodeID()), ), DbName: it.req.DbName, + DbID: it.dbID, CollectionName: it.req.CollectionName, NumRows: int64(it.req.NumRows), PartitionName: it.req.PartitionName, diff --git a/internal/proxy/task_upsert_streaming.go b/internal/proxy/task_upsert_streaming.go index 12cb78ba9329b..d80ced83fdbab 100644 --- a/internal/proxy/task_upsert_streaming.go +++ b/internal/proxy/task_upsert_streaming.go @@ -109,14 +109,11 @@ func (it *upsertTaskByStreamingService) packDeleteMessage(ctx context.Context) ( result, numRows, err := repackDeleteMsgByHash( ctx, it.upsertMsg.DeleteMsg.PrimaryKeys, - vChannels, - it.idAllocator, - it.BeginTs(), - it.upsertMsg.DeleteMsg.CollectionID, + vChannels, it.idAllocator, + it.BeginTs(), it.upsertMsg.DeleteMsg.CollectionID, it.upsertMsg.DeleteMsg.CollectionName, - it.upsertMsg.DeleteMsg.PartitionID, - it.upsertMsg.DeleteMsg.PartitionName, - ) + it.upsertMsg.DeleteMsg.PartitionID, it.upsertMsg.DeleteMsg.PartitionName, + it.dbID, it.req.GetDbName()) if err != nil { return nil, err }