-
Notifications
You must be signed in to change notification settings - Fork 288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
codec(ticdc): kafka simple encoding protocol support encoding checkpoint event. #9911
Merged
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
9cc8dc8
add simple encoder and decoder interface declaration.
3AceShowHand 9d40cd1
add basic about support encoding checkpoint event.
3AceShowHand 2f0501f
support simple encoding the checkpoint event by using json as the mar…
3AceShowHand 23b4523
add encode and decode checkpoint
3AceShowHand 974514b
add more comment.
3AceShowHand 1a1c9ae
add version to simple message.
3AceShowHand File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
// Copyright 2023 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package simple | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/pingcap/tiflow/cdc/model" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestEncodeCheckpoint(t *testing.T) { | ||
t.Parallel() | ||
|
||
enc := NewBuilder().Build() | ||
|
||
checkpoint := 23 | ||
m, err := enc.EncodeCheckpointEvent(uint64(checkpoint)) | ||
require.NoError(t, err) | ||
|
||
dec := NewDecoder() | ||
err = dec.AddKeyValue(m.Key, m.Value) | ||
require.NoError(t, err) | ||
|
||
messageType, hasNext, err := dec.HasNext() | ||
require.NoError(t, err) | ||
require.True(t, hasNext) | ||
require.Equal(t, model.MessageTypeResolved, messageType) | ||
|
||
ts, err := dec.NextResolvedEvent() | ||
require.NoError(t, err) | ||
require.Equal(t, uint64(checkpoint), ts) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
// Copyright 2023 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package simple | ||
|
||
const ( | ||
defaultVersion = 0 | ||
) | ||
|
||
// EventType describes the type of the event. | ||
type EventType string | ||
|
||
const ( | ||
// WatermarkType is the type of the watermark event. | ||
WatermarkType EventType = "WATERMARK" | ||
) | ||
|
||
type message struct { | ||
Version int `json:"version"` | ||
// Scheme and Table is empty for the resolved ts event. | ||
Schema string `json:"schema,omitempty"` | ||
Table string `json:"table,omitempty"` | ||
Type EventType `json:"type"` | ||
CommitTs uint64 `json:"commitTs"` | ||
// Data is available for the row changed event. | ||
Data map[string]interface{} `json:"data,omitempty"` | ||
Old map[string]interface{} `json:"old,omitempty"` | ||
} | ||
|
||
func newResolvedMessage(ts uint64) *message { | ||
return &message{ | ||
Version: defaultVersion, | ||
Type: WatermarkType, | ||
CommitTs: ts, | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why use "Simple" as the new protocol name? We will add claim-check large message feature for this protocol, we will add water mark, checkpoint, schema tracking & bootstrap and more other features, can we use a proper name that can represent its functionalities? How about SufficientJson since it is json, but not each message contains the full schema information and is more sufficient than CanalJson?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or ExtensibleJson which means we added many customized feature for this new protocol?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name is not decided yet, use
simple
temporarily.Json
should not be part of the protocol, since this new protocol does not only support JSON format, we may make it support other encoding formats such as avro, cbor.Sufficient sounds good to me. Extensible is also a good name, but it's not easy to explain the
customized
, since this new protocol belongs to the TiCDC itself, all things are customized.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the concrete name, further discussion is required.