Skip to content

Commit

Permalink
[CLIENT-2862] Use default batch policies when the record level batch …
Browse files Browse the repository at this point in the history
…policy is nil
  • Loading branch information
khaf committed May 8, 2024
1 parent 2ebeb8c commit b8779e5
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 70 deletions.
33 changes: 0 additions & 33 deletions batch_attr.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,17 +166,6 @@ func (ba *batchAttr) adjustReadForAllBins(readAllBins bool) {
}
}

func (ba *batchAttr) setWrite(wp *BasePolicy) {
ba.filterExp = nil
ba.readAttr = 0
ba.writeAttr = _INFO2_WRITE | _INFO2_RESPOND_ALL_OPS
ba.infoAttr = 0
ba.expiration = 0
ba.generation = 0
ba.hasWrite = true
ba.sendKey = wp.SendKey
}

func (ba *batchAttr) setBatchWrite(wp *BatchWritePolicy) {
ba.filterExp = wp.FilterExpression
ba.readAttr = 0
Expand Down Expand Up @@ -254,17 +243,6 @@ func (ba *batchAttr) adjustWrite(ops []*Operation) {
}
}

func (ba *batchAttr) setUDF(up *BasePolicy) {
ba.filterExp = nil
ba.readAttr = 0
ba.writeAttr = _INFO2_WRITE
ba.infoAttr = 0
ba.expiration = 0
ba.generation = 0
ba.hasWrite = true
ba.sendKey = up.SendKey
}

func (ba *batchAttr) setBatchUDF(up *BatchUDFPolicy) {
ba.filterExp = up.FilterExpression
ba.readAttr = 0
Expand All @@ -284,17 +262,6 @@ func (ba *batchAttr) setBatchUDF(up *BatchUDFPolicy) {
}
}

func (ba *batchAttr) setDelete(dp *BasePolicy) {
ba.filterExp = nil
ba.readAttr = 0
ba.writeAttr = _INFO2_WRITE | _INFO2_RESPOND_ALL_OPS | _INFO2_DELETE
ba.infoAttr = 0
ba.expiration = 0
ba.generation = 0
ba.hasWrite = true
ba.sendKey = dp.SendKey
}

func (ba *batchAttr) setBatchDelete(dp *BatchDeletePolicy) {
ba.filterExp = dp.FilterExpression
ba.readAttr = 0
Expand Down
15 changes: 9 additions & 6 deletions batch_command_operate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

type batchCommandOperate struct {
batchCommand
client ClientIfc

attr *batchAttr
records []BatchRecordIfc
Expand All @@ -35,6 +36,7 @@ type batchCommandOperate struct {
}

func newBatchCommandOperate(
client ClientIfc,
node *Node,
batch *batchNode,
policy *BatchPolicy,
Expand All @@ -46,6 +48,7 @@ func newBatchCommandOperate(
policy: policy,
batch: batch,
},
client: client,
records: records,
}
return res
Expand All @@ -68,7 +71,7 @@ func (cmd *batchCommandOperate) cloneBatchCommand(batch *batchNode) batcher {
}

func (cmd *batchCommandOperate) writeBuffer(ifc command) Error {
attr, err := cmd.setBatchOperateIfc(cmd.policy, cmd.records, cmd.batch)
attr, err := cmd.setBatchOperateIfc(cmd.client, cmd.policy, cmd.records, cmd.batch)
cmd.attr = attr
return err
}
Expand Down Expand Up @@ -242,18 +245,18 @@ func (cmd *batchCommandOperate) executeSingle(client *Client) Error {
} else if len(ops) == 0 {
ops = append(ops, GetOp())
}
res, err = client.Operate(br.Policy.toWritePolicy(cmd.policy), br.Key, ops...)
res, err = client.Operate(cmd.client.getUsableBatchReadPolicy(br.Policy).toWritePolicy(cmd.policy), br.Key, ops...)
case *BatchWrite:
policy := br.Policy.toWritePolicy(cmd.policy)
policy := cmd.client.getUsableBatchWritePolicy(br.Policy).toWritePolicy(cmd.policy)
policy.RespondPerEachOp = true
res, err = client.Operate(policy, br.Key, br.Ops...)
br.setRecord(res)
case *BatchDelete:
policy := br.Policy.toWritePolicy(cmd.policy)
policy := cmd.client.getUsableBatchDeletePolicy(br.Policy).toWritePolicy(cmd.policy)
res, err = client.Operate(policy, br.Key, DeleteOp())
br.setRecord(res)
case *BatchUDF:
policy := br.Policy.toWritePolicy(cmd.policy)
policy := cmd.client.getUsableBatchUDFPolicy(br.Policy).toWritePolicy(cmd.policy)
policy.RespondPerEachOp = true
res, err = client.execute(policy, br.Key, br.PackageName, br.FunctionName, br.FunctionArgs...)
}
Expand Down Expand Up @@ -283,7 +286,7 @@ func (cmd *batchCommandOperate) executeSingle(client *Client) Error {
}

func (cmd *batchCommandOperate) Execute() Error {
if len(cmd.records) == 1 {
if cmd.objects == nil && len(cmd.records) == 1 {
return cmd.executeSingle(cmd.node.cluster.client)
}
return cmd.execute(cmd)
Expand Down
15 changes: 14 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Client struct {
// DefaultBatchPolicy is the default parent policy used in batch read commands. Base policy fields
// include socketTimeout, totalTimeout, maxRetries, etc...
DefaultBatchPolicy *BatchPolicy
// DefaultBatchReadPolicy is the default read policy used in batch operate commands.
DefaultBatchReadPolicy *BatchReadPolicy
// DefaultBatchWritePolicy is the default write policy used in batch operate commands.
// Write policy fields include generation, expiration, durableDelete, etc...
DefaultBatchWritePolicy *BatchWritePolicy
Expand Down Expand Up @@ -97,6 +99,7 @@ func NewClientWithPolicyAndHost(policy *ClientPolicy, hosts ...*Host) (*Client,
cluster: cluster,
DefaultPolicy: NewPolicy(),
DefaultBatchPolicy: NewBatchPolicy(),
DefaultBatchReadPolicy: NewBatchReadPolicy(),
DefaultBatchWritePolicy: NewBatchWritePolicy(),
DefaultBatchDeletePolicy: NewBatchDeletePolicy(),
DefaultBatchUDFPolicy: NewBatchUDFPolicy(),
Expand Down Expand Up @@ -656,7 +659,7 @@ func (clnt *Client) BatchOperate(policy *BatchPolicy, records []BatchRecordIfc)
return err
}

cmd := newBatchCommandOperate(nil, nil, policy, records)
cmd := newBatchCommandOperate(clnt, nil, nil, policy, records)
_, err = clnt.batchExecute(policy, batchNodes, cmd)
return err
}
Expand Down Expand Up @@ -1857,6 +1860,16 @@ func (clnt *Client) getUsableBaseBatchWritePolicy(policy *BatchPolicy) *BatchPol
return policy
}

func (clnt *Client) getUsableBatchReadPolicy(policy *BatchReadPolicy) *BatchReadPolicy {
if policy == nil {
if clnt.DefaultBatchReadPolicy != nil {
return clnt.DefaultBatchReadPolicy
}
return NewBatchReadPolicy()
}
return policy
}

func (clnt *Client) getUsableBatchWritePolicy(policy *BatchWritePolicy) *BatchWritePolicy {
if policy == nil {
if clnt.DefaultBatchWritePolicy != nil {
Expand Down
13 changes: 13 additions & 0 deletions client_ifc.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ type ClientIfc interface {

// TODO: Synchronization here for the sake of dynamic config in the future

getUsablePolicy(*BasePolicy) *BasePolicy
getUsableWritePolicy(*WritePolicy) *WritePolicy
getUsableScanPolicy(*ScanPolicy) *ScanPolicy
getUsableQueryPolicy(*QueryPolicy) *QueryPolicy
getUsableAdminPolicy(*AdminPolicy) *AdminPolicy
getUsableInfoPolicy(*InfoPolicy) *InfoPolicy

getUsableBatchPolicy(*BatchPolicy) *BatchPolicy
getUsableBatchReadPolicy(*BatchReadPolicy) *BatchReadPolicy
getUsableBatchWritePolicy(*BatchWritePolicy) *BatchWritePolicy
getUsableBatchDeletePolicy(*BatchDeletePolicy) *BatchDeletePolicy
getUsableBatchUDFPolicy(*BatchUDFPolicy) *BatchUDFPolicy

GetDefaultPolicy() *BasePolicy
GetDefaultBatchPolicy() *BatchPolicy
GetDefaultBatchWritePolicy() *BatchWritePolicy
Expand Down
27 changes: 5 additions & 22 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ func (cmd *baseCommand) setUdf(policy *WritePolicy, key *Key, packageName string
return nil
}

func (cmd *baseCommand) setBatchOperateIfc(policy *BatchPolicy, records []BatchRecordIfc, batch *batchNode) (*batchAttr, Error) {
func (cmd *baseCommand) setBatchOperateIfc(client ClientIfc, policy *BatchPolicy, records []BatchRecordIfc, batch *batchNode) (*batchAttr, Error) {
offsets := batch.offsets
max := len(batch.offsets)

Expand Down Expand Up @@ -706,12 +706,7 @@ func (cmd *baseCommand) setBatchOperateIfc(policy *BatchPolicy, records []BatchR
case _BRT_BATCH_READ:
br := record.(*BatchRead)

if br.Policy != nil {
attr.setBatchRead(br.Policy)
} else {
attr.setRead(policy)
}

attr.setBatchRead(client.getUsableBatchReadPolicy(br.Policy))
if len(br.BinNames) > 0 {
cmd.writeBatchBinNames(key, br.BinNames, attr, attr.filterExp)
} else if br.Ops != nil {
Expand All @@ -725,22 +720,14 @@ func (cmd *baseCommand) setBatchOperateIfc(policy *BatchPolicy, records []BatchR
case _BRT_BATCH_WRITE:
bw := record.(*BatchWrite)

if bw.Policy != nil {
attr.setBatchWrite(bw.Policy)
} else {
attr.setWrite(&policy.BasePolicy)
}
attr.setBatchWrite(client.getUsableBatchWritePolicy(bw.Policy))
attr.adjustWrite(bw.Ops)
cmd.writeBatchOperations(key, bw.Ops, attr, attr.filterExp)

case _BRT_BATCH_UDF:
bu := record.(*BatchUDF)

if bu.Policy != nil {
attr.setBatchUDF(bu.Policy)
} else {
attr.setUDF(&policy.BasePolicy)
}
attr.setBatchUDF(client.getUsableBatchUDFPolicy(bu.Policy))
cmd.writeBatchWrite(key, attr, attr.filterExp, 3, 0)
cmd.writeFieldString(bu.PackageName, UDF_PACKAGE_NAME)
cmd.writeFieldString(bu.FunctionName, UDF_FUNCTION)
Expand All @@ -749,11 +736,7 @@ func (cmd *baseCommand) setBatchOperateIfc(policy *BatchPolicy, records []BatchR
case _BRT_BATCH_DELETE:
bd := record.(*BatchDelete)

if bd.Policy != nil {
attr.setBatchDelete(bd.Policy)
} else {
attr.setDelete(&policy.BasePolicy)
}
attr.setBatchDelete(client.getUsableBatchDeletePolicy(bd.Policy))
cmd.writeBatchWrite(key, attr, attr.filterExp, 0, 0)
}
prev = record
Expand Down
29 changes: 22 additions & 7 deletions proxy_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type ProxyClient struct {
// DefaultBatchPolicy is the default parent policy used in batch read commands. Base policy fields
// include socketTimeout, totalTimeout, maxRetries, etc...
DefaultBatchPolicy *BatchPolicy
// DefaultBatchReadPolicy is the default read policy used in batch operate commands.
DefaultBatchReadPolicy *BatchReadPolicy
// DefaultBatchWritePolicy is the default write policy used in batch operate commands.
// Write policy fields include generation, expiration, durableDelete, etc...
DefaultBatchWritePolicy *BatchWritePolicy
Expand Down Expand Up @@ -98,13 +100,15 @@ func NewProxyClientWithPolicyAndHost(policy *ClientPolicy, host *Host, dialOptio

DefaultPolicy: NewPolicy(),
DefaultBatchPolicy: NewBatchPolicy(),
DefaultBatchReadPolicy: NewBatchReadPolicy(),
DefaultBatchWritePolicy: NewBatchWritePolicy(),
DefaultBatchDeletePolicy: NewBatchDeletePolicy(),
DefaultBatchUDFPolicy: NewBatchUDFPolicy(),
DefaultWritePolicy: NewWritePolicy(0, 0),
DefaultScanPolicy: NewScanPolicy(),
DefaultQueryPolicy: NewQueryPolicy(),
DefaultAdminPolicy: NewAdminPolicy(),
DefaultInfoPolicy: NewInfoPolicy(),
}

if policy.RequiresAuthentication() {
Expand Down Expand Up @@ -576,10 +580,11 @@ func (clnt *ProxyClient) GetHeader(policy *BasePolicy, key *Key) (*Record, Error
// The policy can be used to specify timeouts.
// If the policy is nil, the default relevant policy will be used.
func (clnt *ProxyClient) BatchGet(policy *BatchPolicy, keys []*Key, binNames ...string) ([]*Record, Error) {
policy = clnt.getUsableBatchPolicy(policy)
batchRecordsIfc := make([]BatchRecordIfc, 0, len(keys))
batchRecords := make([]*BatchRecord, 0, len(keys))
for _, key := range keys {
batchRead, batchRecord := newBatchRead(nil, key, binNames)
batchRead, batchRecord := newBatchRead(clnt.DefaultBatchReadPolicy, key, binNames)
batchRecordsIfc = append(batchRecordsIfc, batchRead)
batchRecords = append(batchRecords, batchRecord)
}
Expand All @@ -603,10 +608,11 @@ func (clnt *ProxyClient) BatchGet(policy *BatchPolicy, keys []*Key, binNames ...
//
// If a batch request to a node fails, the entire batch is cancelled.
func (clnt *ProxyClient) BatchGetOperate(policy *BatchPolicy, keys []*Key, ops ...*Operation) ([]*Record, Error) {
policy = clnt.getUsableBatchPolicy(policy)
batchRecordsIfc := make([]BatchRecordIfc, 0, len(keys))
batchRecords := make([]*BatchRecord, 0, len(keys))
for _, key := range keys {
batchRead, batchRecord := newBatchReadOps(nil, key, ops...)
batchRead, batchRecord := newBatchReadOps(clnt.DefaultBatchReadPolicy, key, ops...)
batchRecordsIfc = append(batchRecordsIfc, batchRead)
batchRecords = append(batchRecords, batchRecord)
}
Expand All @@ -631,6 +637,7 @@ func (clnt *ProxyClient) BatchGetOperate(policy *BatchPolicy, keys []*Key, ops .
// The policy can be used to specify timeouts and maximum concurrent goroutines.
// This method requires Aerospike Server version >= 3.6.0.
func (clnt *ProxyClient) BatchGetComplex(policy *BatchPolicy, records []*BatchRead) Error {
policy = clnt.getUsableBatchPolicy(policy)
batchRecordsIfc := make([]BatchRecordIfc, 0, len(records))
for _, record := range records {
batchRecordsIfc = append(batchRecordsIfc, record)
Expand All @@ -650,18 +657,16 @@ func (clnt *ProxyClient) BatchGetComplex(policy *BatchPolicy, records []*BatchRe
// The policy can be used to specify timeouts.
// If the policy is nil, the default relevant policy will be used.
func (clnt *ProxyClient) BatchGetHeader(policy *BatchPolicy, keys []*Key) ([]*Record, Error) {
policy = clnt.getUsableBatchPolicy(policy)
batchRecordsIfc := make([]BatchRecordIfc, 0, len(keys))
for _, key := range keys {
batchRecordsIfc = append(batchRecordsIfc, NewBatchReadHeader(nil, key))
batchRecordsIfc = append(batchRecordsIfc, NewBatchReadHeader(clnt.DefaultBatchReadPolicy, key))
}

filteredOut, err := clnt.batchOperate(policy, batchRecordsIfc)
records := make([]*Record, 0, len(keys))
for i := range batchRecordsIfc {
records = append(records, batchRecordsIfc[i].BatchRec().Record)
// if nerr := batchRecordsIfc[i].BatchRec().Err; nerr != nil {
// err = chainErrors(err, nerr)
// }
}

if filteredOut > 0 {
Expand Down Expand Up @@ -702,7 +707,7 @@ func (clnt *ProxyClient) batchOperate(policy *BatchPolicy, records []BatchRecord
return 0, err
}

cmd := newBatchCommandOperate(nil, batchNode, policy, records)
cmd := newBatchCommandOperate(clnt, nil, batchNode, policy, records)
return cmd.filteredOutCnt, cmd.ExecuteGRPC(clnt)
}

Expand Down Expand Up @@ -1240,6 +1245,16 @@ func (clnt *ProxyClient) getUsableBaseBatchWritePolicy(policy *BatchPolicy) *Bat
return policy
}

func (clnt *ProxyClient) getUsableBatchReadPolicy(policy *BatchReadPolicy) *BatchReadPolicy {
if policy == nil {
if clnt.DefaultBatchReadPolicy != nil {
return clnt.DefaultBatchReadPolicy
}
return NewBatchReadPolicy()
}
return policy
}

func (clnt *ProxyClient) getUsableBatchWritePolicy(policy *BatchWritePolicy) *BatchWritePolicy {
if policy == nil {
if clnt.DefaultBatchWritePolicy != nil {
Expand Down
2 changes: 1 addition & 1 deletion proxy_client_reflect.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (clnt *ProxyClient) BatchGetObjects(policy *BatchPolicy, keys []*Key, objec
return nil, err
}

cmd := newBatchCommandOperate(nil, batchNode, policy, batchRecordsIfc)
cmd := newBatchCommandOperate(clnt, nil, batchNode, policy, batchRecordsIfc)

objectsFound := make([]bool, len(keys))
cmd.objects = objectsVal
Expand Down

0 comments on commit b8779e5

Please sign in to comment.