forked from lompy/tclientpool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tclientpool.go
134 lines (110 loc) · 3.52 KB
/
tclientpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package tclientpool
import (
"context"
"fmt"
"github.com/apache/thrift/lib/go/thrift"
pool "github.com/jolestar/go-commons-pool"
)
const defaultMaxUsageCount = 100
type wrappedClient struct {
transport thrift.TTransport
client thrift.TClient
usageCount int
}
func (c *wrappedClient) Open() error {
return c.transport.Open()
}
func (c *wrappedClient) Close() error {
return c.transport.Close()
}
func (c *wrappedClient) IsOpen() bool {
return c.transport.IsOpen()
}
func (c *wrappedClient) Call(ctx context.Context, method string, args, result thrift.TStruct) error {
return c.client.Call(ctx, method, args, result)
}
// TClientFactory is a function which is used to populate pool with objects.
type TClientFactory func() (thrift.TTransport, thrift.TClient, error)
// pooledObjectFactory implements pool.PoolObjectFactory interface.
type pooledObjectFactory struct {
tClientFactory TClientFactory
maxUsageCount int
}
func (f *pooledObjectFactory) MakeObject(_ctx context.Context) (*pool.PooledObject, error) {
t, c, err := f.tClientFactory()
if err != nil {
return nil, err
}
return pool.NewPooledObject(&wrappedClient{transport: t, client: c}), nil
}
func (f *pooledObjectFactory) DestroyObject(_ctx context.Context, po *pool.PooledObject) error {
return po.Object.(*wrappedClient).Close()
}
func (f *pooledObjectFactory) ValidateObject(_ctx context.Context, po *pool.PooledObject) bool {
client := po.Object.(*wrappedClient)
if f.maxUsageCount < 0 || client.usageCount < f.maxUsageCount {
return client.IsOpen()
}
return false
}
func (f *pooledObjectFactory) ActivateObject(_ctx context.Context, po *pool.PooledObject) error {
return po.Object.(*wrappedClient).Open()
}
func (f *pooledObjectFactory) PassivateObject(_ctx context.Context, po *pool.PooledObject) error {
client := po.Object.(*wrappedClient)
client.usageCount++
return nil
}
// TClientPool implements thrift.TClient interface.
type TClientPool struct {
pool *pool.ObjectPool
}
// Call implements method from thrift.TClient interface.
func (p *TClientPool) Call(ctx context.Context, method string, args, result thrift.TStruct) (err error) {
obj, err := p.pool.BorrowObject(ctx)
if err != nil {
return err
}
defer func() {
if e := p.pool.ReturnObject(ctx, obj); e != nil {
if err == nil {
err = e
} else {
err = fmt.Errorf("%s; %s", err.Error(), e.Error())
}
}
}()
err = obj.(*wrappedClient).Call(ctx, method, args, result)
return
}
// Close destroys all objects in pool, closing all thrift.TTransports.
func (p *TClientPool) Close() {
p.pool.Close(context.Background())
return
}
// TClientPoolOptions contains options for TClientPool
type TClientPoolOptions struct {
Factory TClientFactory
MaxTotal int
MaxUsageCount int
}
// NewTClientPoolWithOptions initializes new TClientPool by TClientFactory and maxTotal of object in pool.
func NewTClientPoolWithOptions(options TClientPoolOptions) *TClientPool {
ctx := context.Background()
maxUsageCount := options.MaxUsageCount
if maxUsageCount == 0 {
maxUsageCount = defaultMaxUsageCount
}
p := pool.NewObjectPoolWithDefaultConfig(ctx, &pooledObjectFactory{options.Factory, maxUsageCount})
p.Config.MaxTotal = options.MaxTotal
p.Config.MaxIdle = options.MaxTotal
p.Config.TestOnBorrow = true
return &TClientPool{p}
}
// NewTClientPool initializes new TClientPool by TClientFactory and maxTotal of object in pool.
func NewTClientPool(f TClientFactory, maxTotal int) *TClientPool {
return NewTClientPoolWithOptions(TClientPoolOptions{
Factory: f,
MaxTotal: maxTotal,
})
}