ZDM-522 Add size limit to PS Cache #99

Open · wants to merge 7 commits into base: main
2 changes: 2 additions & 0 deletions go.mod
@@ -7,6 +7,7 @@ require (
github.com/datastax/go-cassandra-native-protocol v0.0.0-20220525125956-6158d9e218b8
github.com/gocql/gocql v0.0.0-20200624222514-34081eda590e
github.com/google/uuid v1.1.1
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/jpillora/backoff v1.0.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/prometheus/client_golang v1.3.0
@@ -24,6 +25,7 @@ require (
github.com/golang/snappy v0.0.3 // indirect
github.com/google/go-cmp v0.5.2 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -43,6 +43,8 @@ github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
1 change: 1 addition & 0 deletions integration-tests/setup/testcluster.go
@@ -437,6 +437,7 @@ func NewTestConfig(originHost string, targetHost string) *config.Config {

conf.ProxyMaxClientConnections = 1000
conf.ProxyMaxStreamIds = 2048
conf.ProxyMaxPreparedStatementCacheSize = 5000
Collaborator:
Do we know what this limit is on the server side? We don't have to match it, but it would probably be good to know before we decide which limit to set as the default on the proxy.

Collaborator (author):
The limit is based on size rather than the number of prepared statements: the default calculated value is 1/256th of the heap or 10 MB, whichever is greater.

When I set the default to 5000, my intention was to keep the PS cache memory footprint relatively small, given that the proxy usually runs on instances with limited resources. It is quite unlikely for applications to create a large number of prepared statements if they are using them correctly, so even if a user has multiple applications using the proxy I thought that this should be a reasonable value. On the other hand, the footprint of each statement in the proxy PS cache maps is small, so choosing a default value that is a bit higher should also be fine.

Collaborator:
So for a 16GB heap the limit would be about 60MB. I think it would be good to get an estimate of the average size of a prepared statement in our cache so we can come up with a good value for this limit instead of guessing blindly. 5000 sounds a bit too low, but without any data on the size each statement takes, I'm just guessing. Are there server metrics for the prepared cache size? We could use those metrics in a benchmark to get data that would help us come up with a good limit.


conf.RequestResponseMaxWorkers = -1
conf.WriteMaxWorkers = -1
11 changes: 6 additions & 5 deletions proxy/pkg/config/config.go
@@ -58,11 +58,12 @@ type Config struct {

// Proxy bucket

ProxyListenAddress string `default:"localhost" split_words:"true"`
ProxyListenPort int `default:"14002" split_words:"true"`
ProxyRequestTimeoutMs int `default:"10000" split_words:"true"`
ProxyMaxClientConnections int `default:"1000" split_words:"true"`
ProxyMaxStreamIds int `default:"2048" split_words:"true"`
ProxyListenAddress string `default:"localhost" split_words:"true"`
ProxyListenPort int `default:"14002" split_words:"true"`
ProxyRequestTimeoutMs int `default:"10000" split_words:"true"`
ProxyMaxClientConnections int `default:"1000" split_words:"true"`
ProxyMaxStreamIds int `default:"2048" split_words:"true"`
ProxyMaxPreparedStatementCacheSize int `default:"5000" split_words:"true"`

ProxyTlsCaPath string `split_words:"true"`
ProxyTlsCertPath string `split_words:"true"`
16 changes: 8 additions & 8 deletions proxy/pkg/zdmproxy/cqlparser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ func TestInspectFrame(t *testing.T) {
targetPreparedId: []byte("LOCAL"),
prepareRequestInfo: NewPrepareRequestInfo(NewInterceptedRequestInfo(local, newStarSelectClause()), nil, false, "SELECT * FROM system.local", ""),
}
psCache := NewPreparedStatementCache()
psCache.cache["BOTH"] = bothCacheEntry
psCache.cache["ORIGIN"] = originCacheEntry
psCache.cache["TARGET"] = targetCacheEntry
psCache.interceptedCache["PEERS"] = peersCacheEntry
psCache.interceptedCache["PEERS_KS"] = peersKsCacheEntry
psCache.interceptedCache["LOCAL"] = localCacheEntry
psCache.interceptedCache["LOCAL_KS"] = localKsCacheEntry
psCache := createPSCacheForTests(t)
psCache.cache.Add("BOTH", bothCacheEntry)
psCache.cache.Add("ORIGIN", originCacheEntry)
psCache.cache.Add("TARGET", targetCacheEntry)
psCache.interceptedCache.Add("PEERS", peersCacheEntry)
psCache.interceptedCache.Add("PEERS_KS", peersKsCacheEntry)
psCache.interceptedCache.Add("LOCAL", localCacheEntry)
psCache.interceptedCache.Add("LOCAL_KS", localKsCacheEntry)
mh := newFakeMetricHandler()
km := ""
primaryClusterTarget := common.ClusterTypeTarget
@@ -27,7 +27,7 @@ func getGeneralParamsForTests(t *testing.T) params {
require.Nil(t, err)

return params{
psCache: NewPreparedStatementCache(),
psCache: createPSCacheForTests(t),
mh: newFakeMetricHandler(),
kn: "",
primaryCluster: common.ClusterTypeOrigin,
@@ -38,6 +38,12 @@
}
}

func createPSCacheForTests(t *testing.T) *PreparedStatementCache {
psCache, err := NewPreparedStatementCache(1000)
require.Nil(t, err)
return psCache
}

func buildQueryMessageForTests(queryString string) *message.Query {
return &message.Query{
Query: queryString,
5 changes: 4 additions & 1 deletion proxy/pkg/zdmproxy/proxy.go
@@ -381,7 +381,10 @@ func (p *ZdmProxy) initializeGlobalStructures() error {
p.globalClientHandlersWg = &sync.WaitGroup{}
p.clientHandlersShutdownRequestCtx, p.clientHandlersShutdownRequestCancelFn = context.WithCancel(context.Background())

p.PreparedStatementCache = NewPreparedStatementCache()
p.PreparedStatementCache, err = NewPreparedStatementCache(p.Conf.ProxyMaxPreparedStatementCacheSize)
if err != nil {
return err
}

p.controlConnShutdownCtx, p.controlConnCancelFn = context.WithCancel(context.Background())
p.controlConnShutdownWg = &sync.WaitGroup{}
60 changes: 41 additions & 19 deletions proxy/pkg/zdmproxy/pscache.go
@@ -4,33 +4,49 @@ import (
"encoding/hex"
"fmt"
"github.com/datastax/go-cassandra-native-protocol/message"
lru "github.com/hashicorp/golang-lru"
log "github.com/sirupsen/logrus"
"sync"
)

type PreparedStatementCache struct {
cache map[string]PreparedData // Map containing the prepared queries (raw bytes) keyed on prepareId
index map[string]string // Map that can be used as an index to look up origin prepareIds by target prepareId
cache *lru.Cache // Map containing the prepared queries (raw bytes) keyed on prepareId
index *lru.Cache // Map that can be used as an index to look up origin prepareIds by target prepareId

interceptedCache map[string]PreparedData // Map containing the prepared queries for intercepted requests
interceptedCache *lru.Cache // Map containing the prepared queries for intercepted requests

lock *sync.RWMutex
}

func NewPreparedStatementCache() *PreparedStatementCache {
func NewPreparedStatementCache(maxSize int) (*PreparedStatementCache, error) {
cache, err := lru.New(maxSize)
if err != nil {
return nil, fmt.Errorf("error initializing the PreparedStatementCache cache map: %v", err)
}

index, err := lru.New(maxSize)
if err != nil {
return nil, fmt.Errorf("error initializing the PreparedStatementCache index map: %v", err)
}

interceptedCache, err := lru.New(maxSize)
if err != nil {
return nil, fmt.Errorf("error initializing the PreparedStatementCache interceptedCache map: %v", err)
}

return &PreparedStatementCache{
cache: make(map[string]PreparedData),
index: make(map[string]string),
interceptedCache: make(map[string]PreparedData),
cache: cache,
index: index,
interceptedCache: interceptedCache,
lock: &sync.RWMutex{},
}
}, nil
}

func (psc PreparedStatementCache) GetPreparedStatementCacheSize() float64 {
psc.lock.RLock()
defer psc.lock.RUnlock()

return float64(len(psc.cache) + len(psc.interceptedCache))
return float64(psc.cache.Len() + psc.interceptedCache.Len())
}

func (psc *PreparedStatementCache) Store(
@@ -42,8 +58,8 @@ func (psc *PreparedStatementCache) Store(
psc.lock.Lock()
defer psc.lock.Unlock()

psc.cache[originPrepareIdStr] = NewPreparedData(originPreparedResult, targetPreparedResult, prepareRequestInfo)
psc.index[targetPrepareIdStr] = originPrepareIdStr
psc.cache.Add(originPrepareIdStr, NewPreparedData(originPreparedResult, targetPreparedResult, prepareRequestInfo))
psc.index.Add(targetPrepareIdStr, originPrepareIdStr)
Collaborator:

Hmm, what happens if the limit is reached and the key selected for eviction on one cache is different from the key selected on the other cache?

I'm not sure whether bad behavior can happen if these two caches are out of sync, or if it's fine...

If we do need both caches to be in sync then a potential alternative would be to provide an eviction callback to the cache that removes that key from the index cache. This way we could revert the index cache to a normal map (with our own locks) and perform the eviction ourselves.


log.Debugf("Storing PS cache entry: {OriginPreparedId=%v, TargetPreparedId: %v, RequestInfo: %v}",
hex.EncodeToString(originPreparedResult.PreparedQueryId), hex.EncodeToString(targetPreparedResult.PreparedQueryId), prepareRequestInfo)
@@ -55,7 +71,7 @@ func (psc *PreparedStatementCache) StoreIntercepted(preparedResult *message.Prep
defer psc.lock.Unlock()

preparedData := NewPreparedData(preparedResult, preparedResult, prepareRequestInfo)
psc.interceptedCache[prepareIdStr] = preparedData
psc.interceptedCache.Add(prepareIdStr, preparedData)

log.Debugf("Storing intercepted PS cache entry: {PreparedId=%v, RequestInfo: %v}",
hex.EncodeToString(preparedResult.PreparedQueryId), prepareRequestInfo)
@@ -64,31 +80,37 @@ func (psc *PreparedStatementCache) StoreIntercepted(preparedResult *message.Prep
func (psc *PreparedStatementCache) Get(originPreparedId []byte) (PreparedData, bool) {
psc.lock.RLock()
defer psc.lock.RUnlock()
Collaborator:
We need to reevaluate these locks. This new cache implementation already does its own locking, so it would be great if we could remove ours.

data, ok := psc.cache[string(originPreparedId)]
if !ok {
data, ok = psc.interceptedCache[string(originPreparedId)]
data, ok := psc.cache.Get(string(originPreparedId))
if ok {
return data.(PreparedData), true
}
return data, ok

data, ok = psc.interceptedCache.Get(string(originPreparedId))
if ok {
return data.(PreparedData), true
}

return nil, false
}

func (psc *PreparedStatementCache) GetByTargetPreparedId(targetPreparedId []byte) (PreparedData, bool) {
psc.lock.RLock()
defer psc.lock.RUnlock()

originPreparedId, ok := psc.index[string(targetPreparedId)]
originPreparedId, ok := psc.index.Get(string(targetPreparedId))
if !ok {
// Don't bother attempting a lookup on the intercepted cache because this method should only be used to handle UNPREPARED responses
return nil, false
}

data, ok := psc.cache[originPreparedId]
data, ok := psc.cache.Get(originPreparedId)
if !ok {
log.Errorf("Could not get prepared data by target id even though there is an entry on the index map. "+
"This is most likely a bug. OriginPreparedId = %v, TargetPreparedId = %v", originPreparedId, targetPreparedId)
return nil, false
}

return data, true
return data.(PreparedData), true
}

type PreparedData interface {