Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve memory leak caused by circular reference in client finalizer #441

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion batch_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,18 @@ type batcher interface {
generateBatchNodes(*Cluster) ([]*batchNode, Error)
setSequence(int, int)

executeSingle(*Client) Error
executeSingle(clientIfc) Error
}

type clientIfc interface {
ClientIfc
execute(policy *WritePolicy, key *Key, packageName string, functionName string, args ...Value) (*Record, Error)
}

type batchCommand struct {
baseMultiCommand

client clientIfc
batch *batchNode
policy *BatchPolicy
sequenceAP int
Expand Down
12 changes: 9 additions & 3 deletions batch_command_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,22 @@ type batchCommandDelete struct {
}

func newBatchCommandDelete(
node *Node,
client clientIfc,
batch *batchNode,
policy *BatchPolicy,
batchDeletePolicy *BatchDeletePolicy,
keys []*Key,
records []*BatchRecord,
attr *batchAttr,
) *batchCommandDelete {
var node *Node
if batch != nil {
node = batch.Node
}

res := &batchCommandDelete{
batchCommand: batchCommand{
client: client,
baseMultiCommand: *newMultiCommand(node, nil, false),
policy: policy,
batch: batch,
Expand Down Expand Up @@ -168,7 +174,7 @@ func (cmd *batchCommandDelete) transactionType() transactionType {
return ttBatchWrite
}

func (cmd *batchCommandDelete) executeSingle(client *Client) Error {
func (cmd *batchCommandDelete) executeSingle(client clientIfc) Error {
policy := cmd.batchDeletePolicy.toWritePolicy(cmd.policy)
for i, key := range cmd.keys {
res, err := client.Operate(policy, key, DeleteOp())
Expand Down Expand Up @@ -197,7 +203,7 @@ func (cmd *batchCommandDelete) executeSingle(client *Client) Error {

func (cmd *batchCommandDelete) Execute() Error {
if len(cmd.keys) == 1 {
return cmd.executeSingle(cmd.node.cluster.client)
return cmd.executeSingle(cmd.client)
}
return cmd.execute(cmd)
}
Expand Down
12 changes: 9 additions & 3 deletions batch_command_exists.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,20 @@ type batchCommandExists struct {
}

func newBatchCommandExists(
node *Node,
client clientIfc,
batch *batchNode,
policy *BatchPolicy,
keys []*Key,
existsArray []bool,
) *batchCommandExists {
var node *Node
if batch != nil {
node = batch.Node
}

res := &batchCommandExists{
batchCommand: batchCommand{
client: client,
baseMultiCommand: *newMultiCommand(node, nil, false),
policy: policy,
batch: batch,
Expand Down Expand Up @@ -110,7 +116,7 @@ func (cmd *batchCommandExists) transactionType() transactionType {
return ttBatchRead
}

func (cmd *batchCommandExists) executeSingle(client *Client) Error {
func (cmd *batchCommandExists) executeSingle(client clientIfc) Error {
var err Error
for _, offset := range cmd.batch.offsets {
cmd.existsArray[offset], err = client.Exists(&cmd.policy.BasePolicy, cmd.keys[offset])
Expand All @@ -136,7 +142,7 @@ func (cmd *batchCommandExists) executeSingle(client *Client) Error {

func (cmd *batchCommandExists) Execute() Error {
if len(cmd.batch.offsets) == 1 {
return cmd.executeSingle(cmd.node.cluster.client)
return cmd.executeSingle(cmd.client)
}
return cmd.execute(cmd)
}
Expand Down
13 changes: 9 additions & 4 deletions batch_command_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var batchObjectParser func(
) Error

func newBatchCommandGet(
node *Node,
client clientIfc,
batch *batchNode,
policy *BatchPolicy,
keys []*Key,
Expand All @@ -65,8 +65,14 @@ func newBatchCommandGet(
readAttr int,
isOperation bool,
) *batchCommandGet {
var node *Node
if batch != nil {
node = batch.Node
}

res := &batchCommandGet{
batchCommand: batchCommand{
client: client,
baseMultiCommand: *newMultiCommand(node, nil, isOperation),
policy: policy,
batch: batch,
Expand Down Expand Up @@ -158,7 +164,6 @@ func (cmd *batchCommandGet) parseRecordResults(ifc command, receiveSize int) (bo
cmd.objectsFound[batchIndex] = true
if err := batchObjectParser(cmd, batchIndex, opCount, fieldCount, generation, expiration); err != nil {
return false, err

}
}
}
Expand Down Expand Up @@ -216,7 +221,7 @@ func (cmd *batchCommandGet) transactionType() transactionType {
return ttBatchRead
}

func (cmd *batchCommandGet) executeSingle(client *Client) Error {
func (cmd *batchCommandGet) executeSingle(client clientIfc) Error {
for _, offset := range cmd.batch.offsets {
var err Error
if len(cmd.ops) > 0 {
Expand Down Expand Up @@ -254,7 +259,7 @@ func (cmd *batchCommandGet) executeSingle(client *Client) Error {

func (cmd *batchCommandGet) Execute() Error {
if cmd.objects == nil && len(cmd.batch.offsets) == 1 {
return cmd.executeSingle(cmd.node.cluster.client)
return cmd.executeSingle(cmd.client)
}
return cmd.execute(cmd)
}
Expand Down
17 changes: 9 additions & 8 deletions batch_command_operate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

type batchCommandOperate struct {
batchCommand
client ClientIfc

attr *batchAttr
records []BatchRecordIfc
Expand All @@ -36,19 +35,23 @@ type batchCommandOperate struct {
}

func newBatchCommandOperate(
client ClientIfc,
node *Node,
client clientIfc,
batch *batchNode,
policy *BatchPolicy,
records []BatchRecordIfc,
) *batchCommandOperate {
var node *Node
if batch != nil {
node = batch.Node
}

res := &batchCommandOperate{
batchCommand: batchCommand{
client: client,
baseMultiCommand: *newMultiCommand(node, nil, true),
policy: policy,
batch: batch,
},
client: client,
records: records,
}
return res
Expand Down Expand Up @@ -228,7 +231,7 @@ func (cmd *batchCommandOperate) parseRecord(key *Key, opCount int, generation, e
return newRecord(cmd.node, key, bins, generation, expiration), nil
}

func (cmd *batchCommandOperate) executeSingle(client *Client) Error {
func (cmd *batchCommandOperate) executeSingle(client clientIfc) Error {
var res *Record
var err Error
for _, br := range cmd.records {
Expand All @@ -250,11 +253,9 @@ func (cmd *batchCommandOperate) executeSingle(client *Client) Error {
policy := cmd.client.getUsableBatchWritePolicy(br.Policy).toWritePolicy(cmd.policy)
policy.RespondPerEachOp = true
res, err = client.Operate(policy, br.Key, br.Ops...)
br.setRecord(res)
case *BatchDelete:
policy := cmd.client.getUsableBatchDeletePolicy(br.Policy).toWritePolicy(cmd.policy)
res, err = client.Operate(policy, br.Key, DeleteOp())
br.setRecord(res)
case *BatchUDF:
policy := cmd.client.getUsableBatchUDFPolicy(br.Policy).toWritePolicy(cmd.policy)
policy.RespondPerEachOp = true
Expand Down Expand Up @@ -287,7 +288,7 @@ func (cmd *batchCommandOperate) executeSingle(client *Client) Error {

func (cmd *batchCommandOperate) Execute() Error {
if cmd.objects == nil && len(cmd.records) == 1 {
return cmd.executeSingle(cmd.node.cluster.client)
return cmd.executeSingle(cmd.client)
}
return cmd.execute(cmd)
}
Expand Down
25 changes: 16 additions & 9 deletions batch_command_udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type batchCommandUDF struct {
}

func newBatchCommandUDF(
node *Node,
client clientIfc,
batch *batchNode,
policy *BatchPolicy,
batchUDFPolicy *BatchUDFPolicy,
Expand All @@ -43,18 +43,25 @@ func newBatchCommandUDF(
records []*BatchRecord,
attr *batchAttr,
) *batchCommandUDF {
var node *Node
if batch != nil {
node = batch.Node
}

res := &batchCommandUDF{
batchCommand: batchCommand{
client: client,
baseMultiCommand: *newMultiCommand(node, nil, false),
policy: policy,
batch: batch,
},
keys: keys,
records: records,
packageName: packageName,
functionName: functionName,
args: args,
attr: attr,
batchUDFPolicy: batchUDFPolicy,
keys: keys,
records: records,
packageName: packageName,
functionName: functionName,
args: args,
attr: attr,
}
return res
}
Expand Down Expand Up @@ -176,7 +183,7 @@ func (cmd *batchCommandUDF) isRead() bool {
return !cmd.attr.hasWrite
}

func (cmd *batchCommandUDF) executeSingle(client *Client) Error {
func (cmd *batchCommandUDF) executeSingle(client clientIfc) Error {
for i, key := range cmd.keys {
policy := cmd.batchUDFPolicy.toWritePolicy(cmd.policy)
policy.RespondPerEachOp = true
Expand Down Expand Up @@ -206,7 +213,7 @@ func (cmd *batchCommandUDF) executeSingle(client *Client) Error {

func (cmd *batchCommandUDF) Execute() Error {
if len(cmd.keys) == 1 {
return cmd.executeSingle(cmd.node.cluster.client)
return cmd.executeSingle(cmd.client)
}
return cmd.execute(cmd)
}
Expand Down
6 changes: 4 additions & 2 deletions batch_index_command_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type batchIndexCommandGet struct {
}

func newBatchIndexCommandGet(
client clientIfc,
batch *batchNode,
policy *BatchPolicy,
records []*BatchRead,
Expand All @@ -34,6 +35,7 @@ func newBatchIndexCommandGet(
res := &batchIndexCommandGet{
batchCommandGet{
batchCommand: batchCommand{
client: client,
baseMultiCommand: *newMultiCommand(node, nil, isOperation),
policy: policy,
batch: batch,
Expand All @@ -59,12 +61,12 @@ func (cmd *batchIndexCommandGet) writeBuffer(ifc command) Error {

func (cmd *batchIndexCommandGet) Execute() Error {
if len(cmd.batch.offsets) == 1 {
return cmd.executeSingle(cmd.node.cluster.client)
return cmd.executeSingle(cmd.client)
}
return cmd.execute(cmd)
}

func (cmd *batchIndexCommandGet) executeSingle(client *Client) Error {
func (cmd *batchIndexCommandGet) executeSingle(client clientIfc) Error {
for i, br := range cmd.indexRecords {
var ops []*Operation
if br.headerOnly() {
Expand Down
20 changes: 8 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,8 @@ func NewClientWithPolicyAndHost(policy *ClientPolicy, hosts ...*Host) (*Client,
DefaultInfoPolicy: NewInfoPolicy(),
}

// back reference especially used in batch commands
cluster.client = client

runtime.SetFinalizer(client, clientFinalizer)
return client, err

}

//-------------------------------------------------------
Expand Down Expand Up @@ -453,7 +449,7 @@ func (clnt *Client) BatchExists(policy *BatchPolicy, keys []*Key) ([]bool, Error
}

// pass nil to make sure it will be cloned and prepared
cmd := newBatchCommandExists(nil, nil, policy, keys, existsArray)
cmd := newBatchCommandExists(clnt, nil, policy, keys, existsArray)
filteredOut, err := clnt.batchExecute(policy, batchNodes, cmd)
if filteredOut > 0 {
err = chainErrors(ErrFilteredOut.err(), err)
Expand Down Expand Up @@ -526,7 +522,7 @@ func (clnt *Client) BatchGet(policy *BatchPolicy, keys []*Key, binNames ...strin
return nil, err
}

cmd := newBatchCommandGet(nil, nil, policy, keys, binNames, nil, records, _INFO1_READ, false)
cmd := newBatchCommandGet(clnt, nil, policy, keys, binNames, nil, records, _INFO1_READ, false)
filteredOut, err := clnt.batchExecute(policy, batchNodes, cmd)
if err != nil && !policy.AllowPartialResults {
return nil, err
Expand Down Expand Up @@ -556,7 +552,7 @@ func (clnt *Client) BatchGetOperate(policy *BatchPolicy, keys []*Key, ops ...*Op
return nil, err
}

cmd := newBatchCommandGet(nil, nil, policy, keys, nil, ops, records, _INFO1_READ, true)
cmd := newBatchCommandGet(clnt, nil, policy, keys, nil, ops, records, _INFO1_READ, true)
filteredOut, err := clnt.batchExecute(policy, batchNodes, cmd)
if err != nil && !policy.AllowPartialResults {
return nil, err
Expand All @@ -578,7 +574,7 @@ func (clnt *Client) BatchGetOperate(policy *BatchPolicy, keys []*Key, ops ...*Op
func (clnt *Client) BatchGetComplex(policy *BatchPolicy, records []*BatchRead) Error {
policy = clnt.getUsableBatchPolicy(policy)

cmd := newBatchIndexCommandGet(nil, policy, records, true)
cmd := newBatchIndexCommandGet(clnt, nil, policy, records, true)

batchNodes, err := newBatchIndexNodeList(clnt.cluster, policy, records)
if err != nil {
Expand Down Expand Up @@ -614,7 +610,7 @@ func (clnt *Client) BatchGetHeader(policy *BatchPolicy, keys []*Key) ([]*Record,
return nil, err
}

cmd := newBatchCommandGet(nil, nil, policy, keys, nil, nil, records, _INFO1_READ|_INFO1_NOBINDATA, false)
cmd := newBatchCommandGet(clnt, nil, policy, keys, nil, nil, records, _INFO1_READ|_INFO1_NOBINDATA, false)
filteredOut, err := clnt.batchExecute(policy, batchNodes, cmd)
if err != nil && !policy.AllowPartialResults {
return nil, err
Expand Down Expand Up @@ -650,7 +646,7 @@ func (clnt *Client) BatchDelete(policy *BatchPolicy, deletePolicy *BatchDeletePo
return nil, err
}

cmd := newBatchCommandDelete(nil, nil, policy, deletePolicy, keys, records, attr)
cmd := newBatchCommandDelete(clnt, nil, policy, deletePolicy, keys, records, attr)
_, err = clnt.batchExecute(policy, batchNodes, cmd)
return records, err
}
Expand All @@ -670,7 +666,7 @@ func (clnt *Client) BatchOperate(policy *BatchPolicy, records []BatchRecordIfc)
return err
}

cmd := newBatchCommandOperate(clnt, nil, nil, policy, records)
cmd := newBatchCommandOperate(clnt, nil, policy, records)
_, err = clnt.batchExecute(policy, batchNodes, cmd)
return err
}
Expand Down Expand Up @@ -701,7 +697,7 @@ func (clnt *Client) BatchExecute(policy *BatchPolicy, udfPolicy *BatchUDFPolicy,
return nil, err
}

cmd := newBatchCommandUDF(nil, nil, policy, udfPolicy, keys, packageName, functionName, args, records, attr)
cmd := newBatchCommandUDF(clnt, nil, policy, udfPolicy, keys, packageName, functionName, args, records, attr)
_, err = clnt.batchExecute(policy, batchNodes, cmd)
return records, err
}
Expand Down
Loading
Loading