Skip to content

Commit

Permalink
add sequence id for status update event
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey committed Jan 3, 2024
1 parent 04e951f commit dd0cff2
Show file tree
Hide file tree
Showing 12 changed files with 581 additions and 13 deletions.
14 changes: 14 additions & 0 deletions cloudevents/generic/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ const (
// ExtensionResourceVersion is the cloud event extension key of the resource version.
ExtensionResourceVersion = "resourceversion"

// ExtensionStatusUpdateSequenceID is the cloud event extension key of the status update event sequence ID.
// The status update event sequence id represents the order in which status update events occur on a single agent.
ExtensionStatusUpdateSequenceID = "sequenceid"

// ExtensionDeletionTimestamp is the cloud event extension key of the deletion timestamp.
ExtensionDeletionTimestamp = "deletiontimestamp"

Expand Down Expand Up @@ -159,6 +163,7 @@ type EventBuilder struct {
clusterName string
originalSource string
resourceID string
sequenceID string
resourceVersion *int64
eventType CloudEventsType
deletionTimestamp time.Time
Expand All @@ -181,6 +186,11 @@ func (b *EventBuilder) WithResourceVersion(resourceVersion int64) *EventBuilder
return b
}

func (b *EventBuilder) WithStatusUpdateSequenceID(sequenceID string) *EventBuilder {
b.sequenceID = sequenceID
return b
}

func (b *EventBuilder) WithClusterName(clusterName string) *EventBuilder {
b.clusterName = clusterName
return b
Expand Down Expand Up @@ -211,6 +221,10 @@ func (b *EventBuilder) NewEvent() cloudevents.Event {
evt.SetExtension(ExtensionResourceVersion, *b.resourceVersion)
}

if len(b.sequenceID) != 0 {
evt.SetExtension(ExtensionStatusUpdateSequenceID, b.sequenceID)
}

if len(b.clusterName) != 0 {
evt.SetExtension(ExtensionClusterName, b.clusterName)
}
Expand Down
13 changes: 13 additions & 0 deletions cloudevents/work/agent/codec/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strconv"

"github.com/bwmarrin/snowflake"
cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"

Expand All @@ -27,6 +28,17 @@ const (
CloudEventsOriginalSourceAnnotationKey = "cloudevents.open-cluster-management.io/originalsource"
)

var sequenceGenerator *snowflake.Node

func init() {
// init the snowflake id generator with node id 1 for each single agent. Each single agent has its own consumer id
// to be identified, and we can ensure the order of status update event from the same agent via sequence id. The
// events from different agents are independent, hence the ordering among them needs not to be guaranteed.
// The snowflake `NewNode` returns error only when the snowflake node id is less than 1 or great than 1024, so the
// error can be ignored here.
sequenceGenerator, _ = snowflake.NewNode(1)
}

// ManifestCodec is a codec to encode/decode a ManifestWork/cloudevent with ManifestBundle for an agent.
type ManifestCodec struct {
restMapper meta.RESTMapper
Expand Down Expand Up @@ -65,6 +77,7 @@ func (c *ManifestCodec) Encode(source string, eventType types.CloudEventsType, w

evt := types.NewEventBuilder(source, eventType).
WithResourceID(string(work.UID)).
WithStatusUpdateSequenceID(sequenceGenerator.Generate().String()).
WithResourceVersion(resourceVersion).
WithClusterName(work.Namespace).
WithOriginalSource(originalSource).
Expand Down
9 changes: 2 additions & 7 deletions cloudevents/work/agent/codec/manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

func TestManifestEventDataType(t *testing.T) {
codec := NewManifestCodec(nil)

if codec.EventDataType() != payload.ManifestEventDataType {
t.Errorf("unexpected event data type %s", codec.EventDataType())
}
Expand Down Expand Up @@ -143,9 +142,7 @@ func TestManifestEncode(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
codec := NewManifestCodec(nil)

_, err := codec.Encode("cluster1-work-agent", c.eventType, c.work)
_, err := NewManifestCodec(nil).Encode("cluster1-work-agent", c.eventType, c.work)
if c.expectedErr {
if err == nil {
t.Errorf("expected an error, but failed")
Expand Down Expand Up @@ -283,9 +280,7 @@ func TestManifestDecode(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
codec := NewManifestCodec(nil)

_, err := codec.Decode(c.event)
_, err := NewManifestCodec(nil).Decode(c.event)
if c.expectedErr {
if err == nil {
t.Errorf("expected an error, but failed")
Expand Down
1 change: 1 addition & 0 deletions cloudevents/work/agent/codec/manifestbundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (c *ManifestBundleCodec) Encode(source string, eventType types.CloudEventsT

evt := types.NewEventBuilder(source, eventType).
WithResourceID(string(work.UID)).
WithStatusUpdateSequenceID(sequenceGenerator.Generate().String()).
WithResourceVersion(resourceVersion).
WithClusterName(work.Namespace).
WithOriginalSource(originalSource).
Expand Down
8 changes: 2 additions & 6 deletions cloudevents/work/agent/codec/manifestbundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ func TestManifestBundleEncode(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
codec := NewManifestBundleCodec()

_, err := codec.Encode("cluster1-work-agent", c.eventType, c.work)
_, err := NewManifestBundleCodec().Encode("cluster1-work-agent", c.eventType, c.work)
if c.expectedErr {
if err == nil {
t.Errorf("expected an error, but failed")
Expand Down Expand Up @@ -231,9 +229,7 @@ func TestManifestBundleDecode(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
codec := NewManifestBundleCodec()

_, err := codec.Decode(c.event)
_, err := NewManifestBundleCodec().Decode(c.event)
if c.expectedErr {
if err == nil {
t.Errorf("expected an error, but failed")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module open-cluster-management.io/api
go 1.20

require (
github.com/bwmarrin/snowflake v0.3.0
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/eclipse/paho.golang v0.11.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995 h1:pXyRKZ0T5WoB6X9QnHS5cEyW0Got39bNQIECxGUKVO4=
Expand Down
12 changes: 12 additions & 0 deletions vendor/github.com/bwmarrin/snowflake/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions vendor/github.com/bwmarrin/snowflake/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

143 changes: 143 additions & 0 deletions vendor/github.com/bwmarrin/snowflake/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit dd0cff2

Please sign in to comment.