Skip to content

Commit

Permalink
feat: add parsing for 3 types of metadata payloads inside cluster met…
Browse files Browse the repository at this point in the history
…adata
  • Loading branch information
ryan-gang committed Sep 17, 2024
1 parent b9d925e commit c6c6caf
Showing 1 changed file with 139 additions and 0 deletions.
139 changes: 139 additions & 0 deletions protocol/api/fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,3 +630,142 @@ func (rh *RecordHeader) Decode(pd *decoder.RealDecoder, logger *logger.Logger, i

return nil
}

type payload struct {
FrameVersion int8
Type int8
Version int8
Data json.RawMessage
}

func (p *payload) Decode(data []byte) (err error) {
partialDecoder := decoder.RealDecoder{}
partialDecoder.Init(data)

p.FrameVersion, err = partialDecoder.GetInt8() // Frame Version: 0
if err != nil {
return err
}

p.Type, err = partialDecoder.GetInt8()
if err != nil {
return err
}

p.Version, err = partialDecoder.GetInt8()
if err != nil {
return err
}

jsonObject := map[string]interface{}{}

switch p.Type {
case 5:
jsonObject["type"] = "PARTITION_CHANGE_RECORD"
jsonObject["version"] = p.Version
partitionId, err := partialDecoder.GetInt32()
if err != nil {
return err
}
jsonObject["partitionId"] = partitionId

topicId, err := getUUID(&partialDecoder)
if err != nil {
return err
}
jsonObject["topicId"] = topicId

// skip 3 bytes (not sure why) ToDo
_, err = partialDecoder.GetRawBytes(3)
if err != nil {
return err
}

leader, err := partialDecoder.GetInt32()
if err != nil {
return err
}
jsonObject["leader"] = leader

if partialDecoder.Remaining() > 0 {
return errors.NewPacketDecodingError(fmt.Sprintf("Remaining bytes after decoding: %d", partialDecoder.Remaining()), "PARTITION_CHANGE_RECORD")
}
case 20:
jsonObject["type"] = "NO_OP_RECORD"
jsonObject["version"] = p.Version

// COMPACT_NULLABLE_BYTES
// 0x00 - NULL
dataLength, err := partialDecoder.GetUnsignedVarint()
if err != nil {
return err
}
if dataLength == 0 {
jsonObject["data"] = nil
} else {
jsonObject["data"] = make([]interface{}, dataLength)
}

if partialDecoder.Remaining() > 0 {
return errors.NewPacketDecodingError(fmt.Sprintf("Remaining bytes after decoding: %d", partialDecoder.Remaining()), "PARTITION_CHANGE_RECORD")
}
case 17:
jsonObject["type"] = "NO_OP_RECORD"
jsonObject["version"] = p.Version

brokerId, err := partialDecoder.GetInt32()
if err != nil {
return err
}
jsonObject["brokerId"] = brokerId

brokerEpoch, err := partialDecoder.GetInt64()
if err != nil {
return err
}
jsonObject["brokerEpoch"] = brokerEpoch

fenced, err := partialDecoder.GetInt8()
if err != nil {
return err
}
jsonObject["fenced"] = fenced

// if p.Version == 1 { // seems to be present always
inControlledShutdown, err := partialDecoder.GetInt8()
if err != nil {
return err
}
jsonObject["inControlledShutdown"] = inControlledShutdown
// }

// ToDo: Not sure why we need this
_, err = partialDecoder.GetRawBytes(2)
if err != nil {
return err
}

if partialDecoder.Remaining() > 0 {
return errors.NewPacketDecodingError(fmt.Sprintf("Remaining bytes after decoding: %d", partialDecoder.Remaining()), "PARTITION_CHANGE_RECORD")
}
}
jsonData, err := json.Marshal(jsonObject)
if err != nil {
return err
}
p.Data = json.RawMessage(jsonData)

return nil
}

func getUUID(pd *decoder.RealDecoder) (string, error) {
topicUUIDBytes, err := pd.GetRawBytes(16)
if err != nil {
return "", err
}
topicUUID, err := encoder.DecodeUUID(topicUUIDBytes)
if err != nil {
return "", err
}
return topicUUID, nil
}

0 comments on commit c6c6caf

Please sign in to comment.