Skip to content

Commit

Permalink
fix: add the db information in the dml message
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Nov 25, 2024
1 parent fbb68ca commit 3acbd48
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 28 deletions.
2 changes: 2 additions & 0 deletions internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
Expand Down Expand Up @@ -350,6 +351,7 @@ func InitMetaCache(ctx context.Context, rootCoord types.RootCoordClient, queryCo
if err != nil {
return err
}
expr.Register("cache", globalMetaCache)

// The privilege info is a little more. And to get this info, the query operation of involving multiple table queries is required.
resp, err := rootCoord.ListPolicy(ctx, &internalpb.ListPolicyRequest{})
Expand Down
2 changes: 2 additions & 0 deletions internal/proxy/msg_pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func genInsertMsgsByPartition(ctx context.Context,
),
CollectionID: insertMsg.CollectionID,
PartitionID: partitionID,
DbName: insertMsg.DbName,
DbID: insertMsg.DbID,
CollectionName: insertMsg.CollectionName,
PartitionName: partitionName,
SegmentID: segmentID,
Expand Down
22 changes: 12 additions & 10 deletions internal/proxy/task_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type deleteTask struct {
primaryKeys *schemapb.IDs
collectionID UniqueID
partitionID UniqueID
dbID UniqueID
partitionKeyMode bool

// set by scheduler
Expand Down Expand Up @@ -146,16 +147,12 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
return err
}

result, numRows, err := repackDeleteMsgByHash(
ctx,
dt.primaryKeys,
dt.vChannels,
dt.idAllocator,
dt.ts,
dt.collectionID,
dt.req.GetCollectionName(),
dt.partitionID,
dt.req.GetPartitionName(),
result, numRows, err := repackDeleteMsgByHash(ctx,
dt.primaryKeys, dt.vChannels,
dt.idAllocator, dt.ts,
dt.collectionID, dt.req.GetCollectionName(),
dt.partitionID, dt.req.GetPartitionName(),
dt.dbID, dt.req.GetDbName(),
)
if err != nil {
return err
Expand Down Expand Up @@ -204,6 +201,8 @@ func repackDeleteMsgByHash(
collectionName string,
partitionID int64,
partitionName string,
dbID int64,
dbName string,
) (map[uint32][]*msgstream.DeleteMsg, int64, error) {
maxSize := Params.PulsarCfg.MaxMessageSize.GetAsInt()
hashValues := typeutil.HashPK2Channels(primaryKeys, vChannels)
Expand Down Expand Up @@ -233,6 +232,8 @@ func repackDeleteMsgByHash(
PartitionID: partitionID,
CollectionName: collectionName,
PartitionName: partitionName,
DbID: dbID,
DbName: dbName,
PrimaryKeys: &schemapb.IDs{},
ShardName: vchannel,
},
Expand Down Expand Up @@ -413,6 +414,7 @@ func (dr *deleteRunner) produce(ctx context.Context, primaryKeys *schemapb.IDs)
partitionKeyMode: dr.partitionKeyMode,
vChannels: dr.vChannels,
primaryKeys: primaryKeys,
dbID: dr.dbID,
}
var enqueuedTask task = dt
if streamingutil.IsStreamingServiceEnabled() {
Expand Down
17 changes: 6 additions & 11 deletions internal/proxy/task_delete_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,12 @@ func (dt *deleteTaskByStreamingService) Execute(ctx context.Context) (err error)
}

dt.tr = timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID()))
result, numRows, err := repackDeleteMsgByHash(
ctx,
dt.primaryKeys,
dt.vChannels,
dt.idAllocator,
dt.ts,
dt.collectionID,
dt.req.GetCollectionName(),
dt.partitionID,
dt.req.GetPartitionName(),
)
result, numRows, err := repackDeleteMsgByHash(ctx,
dt.primaryKeys, dt.vChannels,
dt.idAllocator, dt.ts,
dt.collectionID, dt.req.GetCollectionName(),
dt.partitionID, dt.req.GetPartitionName(),
dt.dbID, dt.req.GetDbName())
if err != nil {
return err
}
Expand Down
8 changes: 8 additions & 0 deletions internal/proxy/task_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type insertTask struct {
pChannels []pChan
schema *schemapb.CollectionSchema
partitionKeys *schemapb.FieldData
dbID UniqueID
}

// TraceCtx returns insertTask context
Expand Down Expand Up @@ -132,6 +133,13 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
}
it.schema = schema.CollectionSchema

dbInfo, err := globalMetaCache.GetDatabaseInfo(it.ctx, it.insertMsg.GetDbName())
if err != nil {
log.Warn("get database info failed", zap.String("databaseName", it.insertMsg.GetDbName()), zap.Error(err))
return merr.WrapErrAsInputErrorWhen(err, merr.ErrDatabaseNotFound)
}

Check warning on line 140 in internal/proxy/task_insert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_insert.go#L138-L140

Added lines #L138 - L140 were not covered by tests
it.insertMsg.DbID = dbInfo.dbID

rowNums := uint32(it.insertMsg.NRows())
// set insertTask.rowIDs
var rowIDBegin UniqueID
Expand Down
12 changes: 12 additions & 0 deletions internal/proxy/task_upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type upsertTask struct {
idAllocator *allocator.IDAllocator
segIDAssigner *segIDAssigner
collectionID UniqueID
dbID UniqueID
chMgr channelsMgr
chTicker channelsTimeTicker
vChannels []vChan
Expand Down Expand Up @@ -301,6 +302,15 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
}
it.schema = schema

dbInfo, err := globalMetaCache.GetDatabaseInfo(ctx, it.req.GetDbName())
if err != nil {
log.Warn("Failed to get database info",
zap.String("dbName", it.req.GetDbName()),
zap.Error(err))
return err
}

Check warning on line 311 in internal/proxy/task_upsert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_upsert.go#L307-L311

Added lines #L307 - L311 were not covered by tests
it.dbID = dbInfo.dbID

it.partitionKeyMode, err = isPartitionKeyMode(ctx, it.req.GetDbName(), collectionName)
if err != nil {
log.Warn("check partition key mode failed",
Expand Down Expand Up @@ -335,6 +345,7 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
NumRows: uint64(it.req.NumRows),
Version: msgpb.InsertDataVersion_ColumnBased,
DbName: it.req.DbName,
DbID: it.dbID,
},
},
DeleteMsg: &msgstream.DeleteMsg{
Expand All @@ -344,6 +355,7 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
DbName: it.req.DbName,
DbID: it.dbID,
CollectionName: it.req.CollectionName,
NumRows: int64(it.req.NumRows),
PartitionName: it.req.PartitionName,
Expand Down
11 changes: 4 additions & 7 deletions internal/proxy/task_upsert_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,11 @@ func (it *upsertTaskByStreamingService) packDeleteMessage(ctx context.Context) (
result, numRows, err := repackDeleteMsgByHash(
ctx,
it.upsertMsg.DeleteMsg.PrimaryKeys,
vChannels,
it.idAllocator,
it.BeginTs(),
it.upsertMsg.DeleteMsg.CollectionID,
vChannels, it.idAllocator,
it.BeginTs(), it.upsertMsg.DeleteMsg.CollectionID,

Check warning on line 113 in internal/proxy/task_upsert_streaming.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_upsert_streaming.go#L112-L113

Added lines #L112 - L113 were not covered by tests
it.upsertMsg.DeleteMsg.CollectionName,
it.upsertMsg.DeleteMsg.PartitionID,
it.upsertMsg.DeleteMsg.PartitionName,
)
it.upsertMsg.DeleteMsg.PartitionID, it.upsertMsg.DeleteMsg.PartitionName,
it.dbID, it.req.GetDbName())

Check warning on line 116 in internal/proxy/task_upsert_streaming.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_upsert_streaming.go#L115-L116

Added lines #L115 - L116 were not covered by tests
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 3acbd48

Please sign in to comment.