Skip to content

Commit

Permalink
[CLIENT-3049] Use a specialized pool for grpc conns to prevent premat…
Browse files Browse the repository at this point in the history
…ure reaping

Also fixes namespace parameter for a few code examples.
  • Loading branch information
khaf committed Jul 19, 2024
1 parent dc37254 commit a9a45ac
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- name: Test pkg tests
run: go run github.com/onsi/ginkgo/v2/ginkgo -cover -race -r -keep-going -succinct -randomize-suites pkg
- name: Build Benchmark tool
run: cd tools/benchmark | go build -o benchmark .
run: cd tools/benchmark | go build -tags as_proxy -o benchmark .
- name: Build asinfo tool
run: cd tools/asinfo | go build -o asinfo .
- name: Build cli tool
Expand Down
4 changes: 2 additions & 2 deletions example_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

func ExampleClient_Add() {
key, err := as.NewKey("test", "test", "addkey")
key, err := as.NewKey(*namespace, "test", "addkey")
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -66,7 +66,7 @@ func ExampleClient_Add() {
}

func ExampleClient_Append() {
key, err := as.NewKey("test", "test", "appendkey")
key, err := as.NewKey(*namespace, "test", "appendkey")
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion example_listiter_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func ExampleListIter_int() {
// }

var v as.Value = as.NewValue(myListInt([]int{1, 2, 3}))
key, err := as.NewKey("test", "test", 1)
key, err := as.NewKey(*namespace, "test", 1)
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion example_listiter_string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func ExampleListIter_string() {
// }

var v as.Value = as.NewValue(myListString([]string{"a", "b", "c"}))
key, err := as.NewKey("test", "test", 1)
key, err := as.NewKey(*namespace, "test", 1)
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion example_listiter_time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func ExampleListIter_time() {
now2 := time.Unix(123123124, 0)
now3 := time.Unix(123123125, 0)
var v as.Value = as.NewValue(myListTime([]time.Time{now1, now2, now3}))
key, err := as.NewKey("test", "test", 1)
key, err := as.NewKey(*namespace, "test", 1)
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion example_mapiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func ExampleMapIter() {

now := time.Unix(123123123, 0)
var v as.Value = as.NewValue(myMapStringTime(map[string]time.Time{"now": now}))
key, err := as.NewKey("test", "test", 1)
key, err := as.NewKey(*namespace, "test", 1)
if err != nil {
log.Fatal(err)
}
Expand Down
115 changes: 112 additions & 3 deletions proxy_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const notSupportedInProxyClient = "NOT SUPPORTED IN THE PROXY CLIENT"
type ProxyClient struct {
// only for GRPC
clientPolicy ClientPolicy
grpcConnPool *sync.Pool
grpcConnPool *grpcConnectionHeap
grpcHost *Host
dialOptions []grpc.DialOption

Expand Down Expand Up @@ -96,7 +96,7 @@ func NewProxyClientWithPolicyAndHost(policy *ClientPolicy, host *Host, dialOptio

grpcClient := &ProxyClient{
clientPolicy: *policy,
grpcConnPool: new(sync.Pool),
grpcConnPool: newGrpcConnectionHeap(policy.ConnectionQueueSize),
grpcHost: host,
dialOptions: dialOptions,

Expand Down Expand Up @@ -265,7 +265,7 @@ func (clnt *ProxyClient) setAuthToken(token string) {
func (clnt *ProxyClient) grpcConn() (*grpc.ClientConn, Error) {
pconn := clnt.grpcConnPool.Get()
if pconn != nil {
return pconn.(*grpc.ClientConn), nil
return pconn, nil
}

return clnt.createGrpcConn(!clnt.clientPolicy.RequiresAuthentication())
Expand Down Expand Up @@ -309,6 +309,7 @@ func (clnt *ProxyClient) createGrpcConn(noInterceptor bool) (*grpc.ClientConn, E
// Close closes all Grpcclient connections to database server nodes.
func (clnt *ProxyClient) Close() {
clnt.active.Set(false)
clnt.grpcConnPool.cleanup()
if clnt.authInterceptor != nil {
clnt.authInterceptor.close()
}
Expand Down Expand Up @@ -1410,3 +1411,111 @@ func (clnt *ProxyClient) getUsableInfoPolicy(policy *InfoPolicy) *InfoPolicy {
//-------------------------------------------------------
// Utility Functions
//-------------------------------------------------------

// grpcConnectionHeap is a non-blocking LIFO heap.
// If the heap is empty, nil is returned.
// if the heap is full, offer will return false
type grpcConnectionHeap struct {
head, tail uint32
data []*grpc.ClientConn
size uint32
full bool
mutex sync.Mutex
}

// newGrpcConnectionHeap creates a new heap with initial size.
func newGrpcConnectionHeap(size int) *grpcConnectionHeap {
if size <= 0 {
panic("Heap size cannot be less than 1")
}

return &grpcConnectionHeap{
full: false,
data: make([]*grpc.ClientConn, uint32(size)),
size: uint32(size),
}
}

func (h *grpcConnectionHeap) cleanup() {
h.mutex.Lock()
defer h.mutex.Unlock()

for i := range h.data {
if h.data[i] != nil {
h.data[i].Close()
}

h.data[i] = nil
}

// make sure offer and poll both fail
h.data = nil
h.full = true
h.head = 0
h.tail = 0
}

// Put adds an item to the heap unless the heap is full.
// In case the heap is full, the item will not be added to the heap
// and false will be returned
func (h *grpcConnectionHeap) Put(conn *grpc.ClientConn) bool {
h.mutex.Lock()

// make sure heap is not full or cleaned up
if h.full || len(h.data) == 0 {
h.mutex.Unlock()
return false
}

h.head = (h.head + 1) % h.size
h.full = (h.head == h.tail)
h.data[h.head] = conn
h.mutex.Unlock()
return true
}

// Poll removes and returns an item from the heap.
// If the heap is empty, nil will be returned.
func (h *grpcConnectionHeap) Get() (res *grpc.ClientConn) {
h.mutex.Lock()

// the heap has been cleaned up
if len(h.data) == 0 {
h.mutex.Unlock()
return nil
}

// if heap is not empty
if (h.tail != h.head) || h.full {
res = h.data[h.head]
h.data[h.head] = nil

h.full = false
if h.head == 0 {
h.head = h.size - 1
} else {
h.head--
}
}

h.mutex.Unlock()
return res
}

// Len returns the number of connections in the heap
func (h *grpcConnectionHeap) Len() int {
cnt := 0
h.mutex.Lock()

if !h.full {
if h.head >= h.tail {
cnt = int(h.head) - int(h.tail)
} else {
cnt = int(h.size) - (int(h.tail) - int(h.head))
}
} else {
cnt = int(h.size)
}
h.mutex.Unlock()
return cnt
}
2 changes: 1 addition & 1 deletion tools/benchmark/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ To build this tool:

```
cd $GOPATH/src/github.com/aerospike/aerospike-client-go/tools/benchmark
go build .
go build -tags as_proxy .
```

To see available switches:
Expand Down
2 changes: 1 addition & 1 deletion types/pool/tiered_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type TieredBufferPool struct {
// Min is Minimum the minimum buffer size.
Min int

// Max is the maximum buffer is. The pool will allocate buffers of that size,
// Max is the maximum buffer size. The pool will allocate buffers of that size,
// But will not store them back.
Max int

Expand Down

0 comments on commit a9a45ac

Please sign in to comment.