Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make zdm-proxy generate its own prepared ids instead of relying on always preparing queries on both clusters #122

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration-tests/asyncreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestAsyncReadsRequestTypes(t *testing.T) {
},
expectedOpCode: primitive.OpCodeResult,
sentPrimary: true,
sentSecondary: true,
sentSecondary: false,
sentAsync: true,
primedQuery: nil,
prepared: true,
Expand Down
407 changes: 261 additions & 146 deletions proxy/pkg/zdmproxy/clienthandler.go

Large diffs are not rendered by default.

30 changes: 24 additions & 6 deletions proxy/pkg/zdmproxy/clusterconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,28 +350,28 @@ func (cc *ClusterConnector) handleAsyncResponse(response *frame.RawFrame) *frame
}
switch msg := errMsg.(type) {
case *message.Unprepared:
var preparedData PreparedData
var preparedEntry PreparedEntry
var ok bool
if cc.clusterType == common.ClusterTypeTarget {
preparedData, ok = cc.psCache.GetByTargetPreparedId(msg.Id)
preparedEntry, ok = cc.psCache.GetByTargetPreparedId(msg.Id)
} else {
preparedData, ok = cc.psCache.Get(msg.Id)
preparedEntry, ok = cc.psCache.GetByOriginPreparedId(msg.Id)
}
if !ok {
log.Warnf("Received UNPREPARED for async request with prepare ID %v "+
"but could not find prepared data.", hex.EncodeToString(msg.Id))
} else {
prepare := &message.Prepare{
Query: preparedData.GetPrepareRequestInfo().GetQuery(),
Keyspace: preparedData.GetPrepareRequestInfo().GetKeyspace(),
Query: preparedEntry.GetPrepareRequestInfo().GetQuery(),
Keyspace: preparedEntry.GetPrepareRequestInfo().GetKeyspace(),
}
prepareFrame := frame.NewFrame(response.Header.Version, response.Header.StreamId, prepare)
prepareRawFrame, err := defaultCodec.ConvertToRawFrame(prepareFrame)
if err != nil {
log.Errorf("Could not send async PREPARE because convert raw frame failed: %v.", err.Error())
} else {
sent := cc.sendAsyncRequestToCluster(
preparedData.GetPrepareRequestInfo(), prepareRawFrame, false, time.Now(),
preparedEntry.GetPrepareRequestInfo(), prepareRawFrame, false, time.Now(),
time.Duration(cc.conf.ProxyRequestTimeoutMs)*time.Millisecond,
func() {
cc.clientHandlerRequestWg.Done()
Expand All @@ -384,6 +384,24 @@ func (cc *ClusterConnector) handleAsyncResponse(response *frame.RawFrame) *frame
default:
log.Warnf("Async Request failed with error code %v. Error message: %v", errMsg.GetErrorCode(), errMsg.GetErrorMessage())
}
} else {
decodedMsg, err := defaultCodec.ConvertFromRawFrame(response)
if err != nil {
log.Warnf("Could not decode async result: %v", err)
}
switch msg := decodedMsg.Body.Message.(type) {
case *message.PreparedResult:
prepareRequestInfo, ok := reqCtx.GetRequestInfo().(*PrepareRequestInfo)
if !ok {
log.Errorf("Received prepared result on async connector but request info is not prepared: %T.", reqCtx.GetRequestInfo())
}
switch cc.clusterType {
case common.ClusterTypeTarget:
_ = cc.psCache.StorePreparedOnTarget(msg, prepareRequestInfo)
case common.ClusterTypeOrigin:
_ = cc.psCache.StorePreparedOnOrigin(msg, prepareRequestInfo)
}
}
}

if callDone {
Expand Down
31 changes: 19 additions & 12 deletions proxy/pkg/zdmproxy/cqlparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ func buildRequestInfo(
} else if len(stmtsReplacedTerms) == 1 {
replacedTerms = stmtsReplacedTerms[0].replacedTerms
}
return NewPrepareRequestInfo(baseRequestInfo, replacedTerms, stmtQueryData.queryData.hasPositionalBindMarkers(), prepareMsg.Query, prepareMsg.Keyspace), nil
return NewPrepareRequestInfo(baseRequestInfo, stmtQueryData.queryData.isFullyQualified(), replacedTerms,
stmtQueryData.queryData.hasPositionalBindMarkers(), currentKeyspaceName, prepareMsg.Query, prepareMsg.Keyspace), nil
case primitive.OpCodeBatch:
decodedFrame, err := frameContext.GetOrDecodeFrame()
if err != nil {
Expand All @@ -113,11 +114,11 @@ func buildRequestInfo(
if !ok {
return nil, fmt.Errorf("could not convert message with batch op code to batch type, got %v instead", decodedFrame.Body.Message)
}
preparedDataByStmtIdxMap := make(map[int]PreparedData)
preparedDataByStmtIdxMap := make(map[int]PreparedEntry)
for childIdx, child := range batchMsg.Children {
switch queryOrId := child.QueryOrId.(type) {
case []byte:
preparedData, err := getPreparedData(psCache, mh, queryOrId, primitive.OpCodeBatch, decodedFrame)
preparedData, err := getPreparedEntry(psCache, mh, queryOrId, primitive.OpCodeBatch, decodedFrame)
if err != nil {
return nil, err
} else {
Expand All @@ -136,11 +137,11 @@ func buildRequestInfo(
if !ok {
return nil, fmt.Errorf("expected Execute but got %v instead", decodedFrame.Body.Message.GetOpCode())
}
preparedData, err := getPreparedData(psCache, mh, executeMsg.QueryId, primitive.OpCodeExecute, decodedFrame)
preparedEntry, err := getPreparedEntry(psCache, mh, executeMsg.QueryId, primitive.OpCodeExecute, decodedFrame)
if err != nil {
return nil, err
} else {
return NewExecuteRequestInfo(preparedData), nil
return NewExecuteRequestInfo(preparedEntry), nil
}
case primitive.OpCodeAuthResponse:
if forwardAuthToTarget {
Expand All @@ -155,21 +156,27 @@ func buildRequestInfo(
}
}

func getPreparedData(
func getPreparedEntry(
psCache *PreparedStatementCache,
mh *metrics.MetricHandler,
preparedId []byte,
clientPreparedId []byte,
code primitive.OpCode,
decodedFrame *frame.Frame) (PreparedData, error) {
if preparedData, ok := psCache.Get(preparedId); ok {
log.Tracef("%v with prepared-id = '%s' has prepared-data = %v", code.String(), hex.EncodeToString(preparedId), preparedData)
decodedFrame *frame.Frame) (PreparedEntry, error) {
if len(clientPreparedId) != 16 {
log.Warnf("Unexpected length of prepared id %v for %v, expected md5 digest of length 16. Returning UNPREPARED",
hex.EncodeToString(clientPreparedId), code.String())
return nil, &UnpreparedExecuteError{Header: decodedFrame.Header, Body: decodedFrame.Body, preparedId: clientPreparedId}
}
md5Id := ConvertToMd5Digest(clientPreparedId)
if preparedData, ok := psCache.GetByClientPreparedId(md5Id); ok {
log.Tracef("%v with prepared-id = '%s' has prepared-data = %v", code.String(), hex.EncodeToString(clientPreparedId), preparedData)
// The forward decision was set in the cache when handling the corresponding PREPARE request
return preparedData, nil
} else {
log.Warnf("No cached entry for prepared-id = '%s' for %v.", hex.EncodeToString(preparedId), code.String())
log.Warnf("No cached entry for prepared-id = '%s' for %v.", hex.EncodeToString(clientPreparedId), code.String())
mh.GetProxyMetrics().PSCacheMissCount.Add(1)
// return meaningful error to caller so it can generate an unprepared response
return nil, &UnpreparedExecuteError{Header: decodedFrame.Header, Body: decodedFrame.Body, preparedId: preparedId}
return nil, &UnpreparedExecuteError{Header: decodedFrame.Header, Body: decodedFrame.Body, preparedId: clientPreparedId}
}
}

Expand Down
Loading
Loading