Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
joao-r-reis committed Jun 25, 2024
1 parent a6a5fc1 commit 7560b7d
Show file tree
Hide file tree
Showing 7 changed files with 615 additions and 226 deletions.
2 changes: 1 addition & 1 deletion integration-tests/asyncreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestAsyncReadsRequestTypes(t *testing.T) {
},
expectedOpCode: primitive.OpCodeResult,
sentPrimary: true,
sentSecondary: true,
sentSecondary: false,
sentAsync: true,
primedQuery: nil,
prepared: true,
Expand Down
407 changes: 261 additions & 146 deletions proxy/pkg/zdmproxy/clienthandler.go

Large diffs are not rendered by default.

30 changes: 24 additions & 6 deletions proxy/pkg/zdmproxy/clusterconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,28 +350,28 @@ func (cc *ClusterConnector) handleAsyncResponse(response *frame.RawFrame) *frame
}
switch msg := errMsg.(type) {
case *message.Unprepared:
var preparedData PreparedData
var preparedEntry PreparedEntry
var ok bool
if cc.clusterType == common.ClusterTypeTarget {
preparedData, ok = cc.psCache.GetByTargetPreparedId(msg.Id)
preparedEntry, ok = cc.psCache.GetByTargetPreparedId(msg.Id)
} else {
preparedData, ok = cc.psCache.Get(msg.Id)
preparedEntry, ok = cc.psCache.GetByOriginPreparedId(msg.Id)
}
if !ok {
log.Warnf("Received UNPREPARED for async request with prepare ID %v "+
"but could not find prepared data.", hex.EncodeToString(msg.Id))
} else {
prepare := &message.Prepare{
Query: preparedData.GetPrepareRequestInfo().GetQuery(),
Keyspace: preparedData.GetPrepareRequestInfo().GetKeyspace(),
Query: preparedEntry.GetPrepareRequestInfo().GetQuery(),
Keyspace: preparedEntry.GetPrepareRequestInfo().GetKeyspace(),
}
prepareFrame := frame.NewFrame(response.Header.Version, response.Header.StreamId, prepare)
prepareRawFrame, err := defaultCodec.ConvertToRawFrame(prepareFrame)
if err != nil {
log.Errorf("Could not send async PREPARE because convert raw frame failed: %v.", err.Error())
} else {
sent := cc.sendAsyncRequestToCluster(
preparedData.GetPrepareRequestInfo(), prepareRawFrame, false, time.Now(),
preparedEntry.GetPrepareRequestInfo(), prepareRawFrame, false, time.Now(),
time.Duration(cc.conf.ProxyRequestTimeoutMs)*time.Millisecond,
func() {
cc.clientHandlerRequestWg.Done()
Expand All @@ -384,6 +384,24 @@ func (cc *ClusterConnector) handleAsyncResponse(response *frame.RawFrame) *frame
default:
log.Warnf("Async Request failed with error code %v. Error message: %v", errMsg.GetErrorCode(), errMsg.GetErrorMessage())
}
} else {
decodedMsg, err := defaultCodec.ConvertFromRawFrame(response)
if err != nil {
log.Warnf("Could not decode async result: %v", err)
}
switch msg := decodedMsg.Body.Message.(type) {
case *message.PreparedResult:
prepareRequestInfo, ok := reqCtx.GetRequestInfo().(*PrepareRequestInfo)
if !ok {
log.Errorf("Received prepared result on async connector but request info is not prepared: %T.", reqCtx.GetRequestInfo())
}
switch cc.clusterType {
case common.ClusterTypeTarget:
_ = cc.psCache.StorePreparedOnTarget(msg, prepareRequestInfo)
case common.ClusterTypeOrigin:
_ = cc.psCache.StorePreparedOnOrigin(msg, prepareRequestInfo)
}
}
}

if callDone {
Expand Down
31 changes: 19 additions & 12 deletions proxy/pkg/zdmproxy/cqlparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ func buildRequestInfo(
} else if len(stmtsReplacedTerms) == 1 {
replacedTerms = stmtsReplacedTerms[0].replacedTerms
}
return NewPrepareRequestInfo(baseRequestInfo, replacedTerms, stmtQueryData.queryData.hasPositionalBindMarkers(), prepareMsg.Query, prepareMsg.Keyspace), nil
return NewPrepareRequestInfo(baseRequestInfo, stmtQueryData.queryData.isFullyQualified(), replacedTerms,
stmtQueryData.queryData.hasPositionalBindMarkers(), currentKeyspaceName, prepareMsg.Query, prepareMsg.Keyspace), nil
case primitive.OpCodeBatch:
decodedFrame, err := frameContext.GetOrDecodeFrame()
if err != nil {
Expand All @@ -113,11 +114,11 @@ func buildRequestInfo(
if !ok {
return nil, fmt.Errorf("could not convert message with batch op code to batch type, got %v instead", decodedFrame.Body.Message)
}
preparedDataByStmtIdxMap := make(map[int]PreparedData)
preparedDataByStmtIdxMap := make(map[int]PreparedEntry)
for childIdx, child := range batchMsg.Children {
switch queryOrId := child.QueryOrId.(type) {
case []byte:
preparedData, err := getPreparedData(psCache, mh, queryOrId, primitive.OpCodeBatch, decodedFrame)
preparedData, err := getPreparedEntry(psCache, mh, queryOrId, primitive.OpCodeBatch, decodedFrame)
if err != nil {
return nil, err
} else {
Expand All @@ -136,11 +137,11 @@ func buildRequestInfo(
if !ok {
return nil, fmt.Errorf("expected Execute but got %v instead", decodedFrame.Body.Message.GetOpCode())
}
preparedData, err := getPreparedData(psCache, mh, executeMsg.QueryId, primitive.OpCodeExecute, decodedFrame)
preparedEntry, err := getPreparedEntry(psCache, mh, executeMsg.QueryId, primitive.OpCodeExecute, decodedFrame)
if err != nil {
return nil, err
} else {
return NewExecuteRequestInfo(preparedData), nil
return NewExecuteRequestInfo(preparedEntry), nil
}
case primitive.OpCodeAuthResponse:
if forwardAuthToTarget {
Expand All @@ -155,21 +156,27 @@ func buildRequestInfo(
}
}

func getPreparedData(
func getPreparedEntry(
psCache *PreparedStatementCache,
mh *metrics.MetricHandler,
preparedId []byte,
clientPreparedId []byte,
code primitive.OpCode,
decodedFrame *frame.Frame) (PreparedData, error) {
if preparedData, ok := psCache.Get(preparedId); ok {
log.Tracef("%v with prepared-id = '%s' has prepared-data = %v", code.String(), hex.EncodeToString(preparedId), preparedData)
decodedFrame *frame.Frame) (PreparedEntry, error) {
if len(clientPreparedId) != 16 {
log.Warnf("Unexpected length of prepared id %v for %v, expected md5 digest of length 16. Returning UNPREPARED",
hex.EncodeToString(clientPreparedId), code.String())
return nil, &UnpreparedExecuteError{Header: decodedFrame.Header, Body: decodedFrame.Body, preparedId: clientPreparedId}
}
md5Id := ConvertToMd5Digest(clientPreparedId)
if preparedData, ok := psCache.GetByClientPreparedId(md5Id); ok {
log.Tracef("%v with prepared-id = '%s' has prepared-data = %v", code.String(), hex.EncodeToString(clientPreparedId), preparedData)
// The forward decision was set in the cache when handling the corresponding PREPARE request
return preparedData, nil
} else {
log.Warnf("No cached entry for prepared-id = '%s' for %v.", hex.EncodeToString(preparedId), code.String())
log.Warnf("No cached entry for prepared-id = '%s' for %v.", hex.EncodeToString(clientPreparedId), code.String())
mh.GetProxyMetrics().PSCacheMissCount.Add(1)
// return meaningful error to caller so it can generate an unprepared response
return nil, &UnpreparedExecuteError{Header: decodedFrame.Header, Body: decodedFrame.Body, preparedId: preparedId}
return nil, &UnpreparedExecuteError{Header: decodedFrame.Header, Body: decodedFrame.Body, preparedId: clientPreparedId}
}
}

Expand Down
Loading

0 comments on commit 7560b7d

Please sign in to comment.