Skip to content

Commit

Permalink
Merge pull request #33 from codecrafters-io/alpha-fixes-8
Browse files Browse the repository at this point in the history
Refactor cluster metadata serialization and add custom byte matching logic
  • Loading branch information
ryan-gang authored Oct 24, 2024
2 parents 54440f0 + 2148566 commit 234e099
Show file tree
Hide file tree
Showing 9 changed files with 728 additions and 434 deletions.
7 changes: 1 addition & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ test_concurrent_requests_with_kafka: build
dist/main.out


test_current:
test_all:
make test_base_with_kafka
make test_concurrent_requests_with_kafka
make test_describe_topic_partitions_with_kafka
Expand All @@ -37,11 +37,6 @@ test_fetch_with_kafka: build
CODECRAFTERS_TEST_CASES_JSON="[{\"slug\":\"gs0\",\"tester_log_prefix\":\"stage-F1\",\"title\":\"Stage #F1: API Version with Fetch Key\"}, {\"slug\":\"dh6\",\"tester_log_prefix\":\"stage-F2\",\"title\":\"Stage #F2: Fetch with no topics\"}, {\"slug\":\"hn6\",\"tester_log_prefix\":\"stage-F3\",\"title\":\"Stage #F3: Fetch with unknown topic\"}, {\"slug\":\"cm4\",\"tester_log_prefix\":\"stage-F4\",\"title\":\"Stage #F4: Fetch with empty topic\"}, {\"slug\":\"eg2\",\"tester_log_prefix\":\"stage-F5\",\"title\":\"Stage #F5: Single Fetch from Disk\"}, {\"slug\":\"fd8\",\"tester_log_prefix\":\"stage-F6\",\"title\":\"Stage #F6: Multi Fetch from Disk\"}]" \
dist/main.out

test_75_with_kafka: build
CODECRAFTERS_REPOSITORY_DIR=./internal/test_helpers/pass_all \
CODECRAFTERS_TEST_CASES_JSON="[{\"slug\":\"fd8\",\"tester_log_prefix\":\"stage-F6\",\"title\":\"Stage #F6: Multi Fetch from Disk\"}]" \
dist/main.out

test:
TESTER_DIR=$(shell pwd) go test -v ./internal/ -failfast --count=1

Expand Down
54 changes: 54 additions & 0 deletions internal/assertions/fetch_response_assertion.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/codecrafters-io/kafka-tester/protocol"
kafkaapi "github.com/codecrafters-io/kafka-tester/protocol/api"
realencoder "github.com/codecrafters-io/kafka-tester/protocol/encoder"
"github.com/codecrafters-io/tester-utils/bytes_diff_visualizer"
"github.com/codecrafters-io/tester-utils/logger"
)

Expand Down Expand Up @@ -220,6 +222,46 @@ func (a *FetchResponseAssertion) assertRecords(expectedRecords []kafkaapi.Record
return a
}

func (a *FetchResponseAssertion) AssertRecordBatchBytes() *FetchResponseAssertion {
if a.err != nil {
return a
}

actualRecordBatches := []kafkaapi.RecordBatch{}
for _, topic := range a.ActualValue.TopicResponses {
for _, partition := range topic.PartitionResponses {
actualRecordBatches = append(actualRecordBatches, partition.RecordBatches...)
}
}

expectedRecordBatches := []kafkaapi.RecordBatch{}
for _, topic := range a.ExpectedValue.TopicResponses {
for _, partition := range topic.PartitionResponses {
expectedRecordBatches = append(expectedRecordBatches, partition.RecordBatches...)
}
}

expectedRecordBatchBytes := encodeRecordBatches(expectedRecordBatches)
actualRecordBatchBytes := encodeRecordBatches(actualRecordBatches)
// Byte Comparison for expected v actual RecordBatch bytes
// As we write them to disk, and expect users to not change the values
// we can use a simple byte comparison here.
if !bytes.Equal(expectedRecordBatchBytes, actualRecordBatchBytes) {
result := bytes_diff_visualizer.VisualizeByteDiff(expectedRecordBatchBytes, actualRecordBatchBytes)
a.logger.Errorf("")
for _, line := range result {
a.logger.Errorf(line)
}
a.logger.Errorf("")
a.err = fmt.Errorf("RecordBatch bytes do not match with the contents on disk")
return a
}

a.logger.Successf("✓ RecordBatch bytes match with the contents on disk")
return a

}

func (a FetchResponseAssertion) Run() error {
// firstLevelFields: ["ThrottleTimeMs", "ErrorCode", "SessionID"]
// secondLevelFields (Topics): ["Topic"]
Expand All @@ -228,3 +270,15 @@ func (a FetchResponseAssertion) Run() error {
// fifthLevelFields (Records): ["Value"]
return a.err
}

func encodeRecordBatches(recordBatches []kafkaapi.RecordBatch) []byte {
// Given an array of RecordBatch, encodes them using the encoder.RealEncoder
// and returns the resulting bytes.

encoder := realencoder.RealEncoder{}
encoder.Init(make([]byte, 4096))
for _, recordBatch := range recordBatches {
recordBatch.Encode(&encoder)
}
return encoder.Bytes()[:encoder.Offset()]
}
10 changes: 4 additions & 6 deletions internal/stagef5.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func testFetchWithSingleMessage(stageHarness *test_case_harness.TestCaseHarness)
Magic: 0,
Attributes: 0,
LastOffsetDelta: 0,
FirstTimestamp: 0,
MaxTimestamp: 0,
FirstTimestamp: 1726045973899,
MaxTimestamp: 1726045973899,
ProducerId: 0,
ProducerEpoch: 0,
BaseSequence: 0,
Expand All @@ -120,7 +120,7 @@ func testFetchWithSingleMessage(stageHarness *test_case_harness.TestCaseHarness)
Attributes: 0,
TimestampDelta: 0,
OffsetDelta: 0,
Key: []byte{},
Key: nil,
Value: []byte(common.MESSAGE1),
Headers: []kafkaapi.RecordHeader{},
},
Expand All @@ -134,11 +134,9 @@ func testFetchWithSingleMessage(stageHarness *test_case_harness.TestCaseHarness)
},
}

// Byte to Byte comparison for RecordBatches
// Wire up ByteDiffVisualizer

return assertions.NewFetchResponseAssertion(*responseBody, expectedFetchResponse, logger).
AssertBody([]string{"ThrottleTimeMs", "ErrorCode"}).
AssertTopics([]string{"Topic"}, []string{"ErrorCode", "PartitionIndex"}, []string{"BaseOffset"}, []string{"Value"}).
AssertRecordBatchBytes().
Run()
}
9 changes: 5 additions & 4 deletions internal/stagef6.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func testFetchMultipleMessages(stageHarness *test_case_harness.TestCaseHarness)
Magic: 0,
Attributes: 0,
LastOffsetDelta: 0,
FirstTimestamp: 0,
MaxTimestamp: 0,
FirstTimestamp: 1726045973899,
MaxTimestamp: 1726045973899,
ProducerId: 0,
ProducerEpoch: 0,
BaseSequence: 0,
Expand All @@ -133,8 +133,8 @@ func testFetchMultipleMessages(stageHarness *test_case_harness.TestCaseHarness)
Magic: 0,
Attributes: 0,
LastOffsetDelta: 0,
FirstTimestamp: 0,
MaxTimestamp: 0,
FirstTimestamp: 1726045973899,
MaxTimestamp: 1726045973899,
ProducerId: 0,
ProducerEpoch: 0,
BaseSequence: 0,
Expand All @@ -161,5 +161,6 @@ func testFetchMultipleMessages(stageHarness *test_case_harness.TestCaseHarness)
return assertions.NewFetchResponseAssertion(*responseBody, expectedFetchResponse, logger).
AssertBody([]string{"ThrottleTimeMs", "ErrorCode"}).
AssertTopics([]string{"Topic"}, []string{"ErrorCode", "PartitionIndex"}, []string{"BaseOffset"}, []string{"Value"}).
AssertRecordBatchBytes().
Run()
}
6 changes: 4 additions & 2 deletions internal/stages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestStages(t *testing.T) {
NormalizeOutputFunc: normalizeTesterOutput,
},
"fetch_pass": {
StageSlugs: []string{"gs0", "dh6", "hn6", "cm4"},
StageSlugs: []string{"gs0", "dh6", "hn6", "cm4", "eg2", "fd8"},
CodePath: "./test_helpers/pass_all",
ExpectedExitCode: 0,
StdoutFixturePath: "./test_helpers/fixtures/fetch/pass",
Expand All @@ -60,13 +60,15 @@ func normalizeTesterOutput(testerOutput []byte) []byte {
"Name": {regexp.MustCompile(`✓ Topic Name: [0-9A-Za-z]{3}`)},
"UUID": {regexp.MustCompile(`✓ Topic UUID: [0-9]{8}-[0-9]{4}-[0-9]{4}-[0-9]{4}-[0-9]{12}`)},
"value_length": {regexp.MustCompile(`- .value_length \([0-9]{1,}\)`)},
"value": {regexp.MustCompile(`- .value \("[A-Za-z0-9 !]{1,}"\)`)},
"value": {regexp.MustCompile(`- .[vV]alue \("[A-Za-z0-9 !]{1,}"\)`)},
"name": {regexp.MustCompile(`- .name \([A-Za-z -]{1,}\)`)},
"topic_name": {regexp.MustCompile(`- .topic_name \([A-Za-z0-9 ]{1,}\)`)},
"next_cursor": {regexp.MustCompile(`- .next_cursor \(\{[A-Za-z0-9 ]{1,}\}\)`)},
"Messages": {regexp.MustCompile(`✓ Messages: \["[A-Za-z !]{1,}"\]`)},
"Topic Name": {regexp.MustCompile(`✓ TopicResponse\[[0-9]{1,}\] Topic Name: [A-Za-z -]{3,}`)},
"Topic UUID": {regexp.MustCompile(`✓ TopicResponse\[[0-9]{1,}\] Topic UUID: [0-9 -]{1,}`)},
"Record Value": {regexp.MustCompile(`✓ Record\[[0-9]{1,}\] Value: [A-Za-z0-9 !]{1,}`)},
"RecordBatch BaseOffset": {regexp.MustCompile(`✓ RecordBatch\[[0-9]{1,}\] BaseOffset: [0-9]{1,}`)},
}

for replacement, regexes := range replacements {
Expand Down
Loading

0 comments on commit 234e099

Please sign in to comment.