From 43e888f13c735e4f0e8acaa6cc62b3f1f0e23005 Mon Sep 17 00:00:00 2001 From: Shivaprasad Bhat Date: Thu, 12 Jan 2023 15:02:23 +0530 Subject: [PATCH] fix: retain previous output during intermediate states (#106) * fix: retain previous output during intermediate states * feat: add new resource labels to resource revision * fix: firehose replicas should always be greater than 0 * fix: create resource revision only for user triggered updates * feat: add revision reason to resource revisions Co-authored-by: Rohil Surana --- core/mocks/resource_store.go | 24 ++++++++++++----------- core/module/driver.go | 1 + core/resource/resource.go | 3 ++- core/sync.go | 2 +- core/write.go | 6 +++--- core/write_test.go | 14 ++++++------- go.mod | 2 +- go.sum | 4 ++-- internal/server/v1/resources/mappers.go | 1 + internal/store/postgres/resource_store.go | 24 ++++++++++++----------- internal/store/postgres/revision_model.go | 3 ++- internal/store/postgres/revision_store.go | 5 +++-- internal/store/postgres/schema.sql | 3 ++- modules/firehose/plan.go | 9 ++++++++- modules/firehose/plan_test.go | 4 ++++ modules/firehose/schema/config.json | 3 ++- modules/firehose/schema/scale.json | 3 ++- modules/kubernetes/kubernetes.go | 2 +- 18 files changed, 68 insertions(+), 45 deletions(-) diff --git a/core/mocks/resource_store.go b/core/mocks/resource_store.go index 67f26cf6..b8d88731 100644 --- a/core/mocks/resource_store.go +++ b/core/mocks/resource_store.go @@ -269,20 +269,20 @@ func (_c *ResourceStore_Revisions_Call) Return(_a0 []resource.Revision, _a1 erro return _c } -// Update provides a mock function with given fields: ctx, r, hooks -func (_m *ResourceStore) Update(ctx context.Context, r resource.Resource, hooks ...resource.MutationHook) error { +// Update provides a mock function with given fields: ctx, r, saveRevision, reason, hooks +func (_m *ResourceStore) Update(ctx context.Context, r resource.Resource, saveRevision bool, reason string, hooks ...resource.MutationHook) error { _va := make([]interface{}, len(hooks)) for _i := range hooks { _va[_i] = hooks[_i] } var _ca []interface{} - _ca = append(_ca, ctx, r) + _ca = append(_ca, ctx, r, saveRevision, reason) _ca = append(_ca, _va...) ret := _m.Called(_ca...) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, resource.Resource, ...resource.MutationHook) error); ok { - r0 = rf(ctx, r, hooks...) + if rf, ok := ret.Get(0).(func(context.Context, resource.Resource, bool, string, ...resource.MutationHook) error); ok { + r0 = rf(ctx, r, saveRevision, reason, hooks...) } else { r0 = ret.Error(0) } @@ -298,21 +298,23 @@ type ResourceStore_Update_Call struct { // Update is a helper method to define mock.On call // - ctx context.Context // - r resource.Resource +// - saveRevision bool +// - reason string // - hooks ...resource.MutationHook -func (_e *ResourceStore_Expecter) Update(ctx interface{}, r interface{}, hooks ...interface{}) *ResourceStore_Update_Call { +func (_e *ResourceStore_Expecter) Update(ctx interface{}, r interface{}, saveRevision interface{}, reason interface{}, hooks ...interface{}) *ResourceStore_Update_Call { return &ResourceStore_Update_Call{Call: _e.mock.On("Update", - append([]interface{}{ctx, r}, hooks...)...)} + append([]interface{}{ctx, r, saveRevision, reason}, hooks...)...)} } -func (_c *ResourceStore_Update_Call) Run(run func(ctx context.Context, r resource.Resource, hooks ...resource.MutationHook)) *ResourceStore_Update_Call { +func (_c *ResourceStore_Update_Call) Run(run func(ctx context.Context, r resource.Resource, saveRevision bool, reason string, hooks ...resource.MutationHook)) *ResourceStore_Update_Call { _c.Call.Run(func(args mock.Arguments) { - variadicArgs := make([]resource.MutationHook, len(args)-2) - for i, a := range args[2:] { + variadicArgs := make([]resource.MutationHook, len(args)-4) + for i, a := range args[4:] { if a != nil { variadicArgs[i] = a.(resource.MutationHook) } } - run(args[0].(context.Context), args[1].(resource.Resource), variadicArgs...) + run(args[0].(context.Context), args[1].(resource.Resource), args[2].(bool), args[3].(string), variadicArgs...) }) return _c } diff --git a/core/module/driver.go b/core/module/driver.go index 9dc30f57..f52be7e7 100644 --- a/core/module/driver.go +++ b/core/module/driver.go @@ -36,6 +36,7 @@ type Driver interface { type Plan struct { Resource resource.Resource ScheduleRunAt time.Time + Reason string } // Loggable extension of driver allows streaming log data for a resource. diff --git a/core/resource/resource.go b/core/resource/resource.go index a5d406bb..fc2b0a3a 100644 --- a/core/resource/resource.go +++ b/core/resource/resource.go @@ -21,7 +21,7 @@ type Store interface { List(ctx context.Context, filter Filter) ([]Resource, error) Create(ctx context.Context, r Resource, hooks ...MutationHook) error - Update(ctx context.Context, r Resource, hooks ...MutationHook) error + Update(ctx context.Context, r Resource, saveRevision bool, reason string, hooks ...MutationHook) error Delete(ctx context.Context, urn string, hooks ...MutationHook) error Revisions(ctx context.Context, selector RevisionsSelector) ([]Revision, error) @@ -68,6 +68,7 @@ type RevisionsSelector struct { type Revision struct { ID int64 `json:"id"` URN string `json:"urn"` + Reason string `json:"reason"` Labels map[string]string `json:"labels"` CreatedAt time.Time `json:"created_at"` diff --git a/core/sync.go b/core/sync.go index 0c98acee..b1f4ee27 100644 --- a/core/sync.go +++ b/core/sync.go @@ -103,7 +103,7 @@ func (s *Service) syncChange(ctx context.Context, urn string) (*resource.Resourc return nil, err } } else { - if err := s.upsert(ctx, module.Plan{Resource: *res}, false); err != nil { + if err := s.upsert(ctx, module.Plan{Resource: *res}, false, false, ""); err != nil { return nil, err } } diff --git a/core/write.go b/core/write.go index f18bc97a..7ed57253 100644 --- a/core/write.go +++ b/core/write.go @@ -69,7 +69,7 @@ func (s *Service) execAction(ctx context.Context, res resource.Resource, act mod planned.Resource.UpdatedAt = s.clock() } - if err := s.upsert(ctx, *planned, isCreate(act.Name)); err != nil { + if err := s.upsert(ctx, *planned, isCreate(act.Name), true, planned.Reason); err != nil { return nil, err } return &planned.Resource, nil @@ -101,7 +101,7 @@ func (s *Service) planChange(ctx context.Context, res resource.Resource, act mod return planned, nil } -func (s *Service) upsert(ctx context.Context, plan module.Plan, isCreate bool) error { +func (s *Service) upsert(ctx context.Context, plan module.Plan, isCreate bool, saveRevision bool, reason string) error { var hooks []resource.MutationHook hooks = append(hooks, func(ctx context.Context) error { if plan.Resource.State.IsTerminal() { @@ -122,7 +122,7 @@ func (s *Service) upsert(ctx context.Context, plan module.Plan, isCreate bool) e if isCreate { err = s.store.Create(ctx, plan.Resource, hooks...) } else { - err = s.store.Update(ctx, plan.Resource, hooks...) + err = s.store.Update(ctx, plan.Resource, saveRevision, reason, hooks...) } if err != nil { diff --git a/core/write_test.go b/core/write_test.go index fee6b135..5ab387cf 100644 --- a/core/write_test.go +++ b/core/write_test.go @@ -394,8 +394,8 @@ func TestService_UpdateResource(t *testing.T) { Once() resourceRepo.EXPECT(). - Update(mock.Anything, mock.Anything, mock.Anything). - Run(func(ctx context.Context, r resource.Resource, hooks ...resource.MutationHook) { + Update(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Run(func(ctx context.Context, r resource.Resource, saveRevision bool, reason string, hooks ...resource.MutationHook) { assert.Len(t, hooks, 1) assert.NoError(t, hooks[0](ctx)) }). @@ -453,9 +453,9 @@ func TestService_UpdateResource(t *testing.T) { Return(&testResource, nil).Once() resourceRepo.EXPECT(). - Update(mock.Anything, mock.Anything, mock.Anything). + Update(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil). - Run(func(ctx context.Context, r resource.Resource, hooks ...resource.MutationHook) { + Run(func(ctx context.Context, r resource.Resource, saveRevision bool, reason string, hooks ...resource.MutationHook) { assert.Len(t, hooks, 1) assert.NoError(t, hooks[0](ctx)) }). @@ -578,7 +578,7 @@ func TestService_DeleteResource(t *testing.T) { Once() resourceRepo.EXPECT(). - Update(mock.Anything, mock.Anything, mock.Anything). + Update(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(testErr). Once() @@ -625,7 +625,7 @@ func TestService_DeleteResource(t *testing.T) { Once() resourceRepo.EXPECT(). - Update(mock.Anything, mock.Anything, mock.Anything). + Update(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil). Once() @@ -780,7 +780,7 @@ func TestService_ApplyAction(t *testing.T) { }, nil). Once() resourceRepo.EXPECT(). - Update(mock.Anything, mock.Anything, mock.Anything). + Update(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil). Once() diff --git a/go.mod b/go.mod index 9377fd63..f9c597da 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/stretchr/testify v1.7.1 github.com/xeipuuv/gojsonschema v1.2.0 go.buf.build/odpf/gw/odpf/proton v1.1.122 - go.buf.build/odpf/gwv/odpf/proton v1.1.133 + go.buf.build/odpf/gwv/odpf/proton v1.1.172 go.opencensus.io v0.23.0 go.uber.org/zap v1.21.0 google.golang.org/grpc v1.46.2 diff --git a/go.sum b/go.sum index edaaba84..ee01b518 100644 --- a/go.sum +++ b/go.sum @@ -1409,8 +1409,8 @@ go.buf.build/odpf/gw/odpf/proton v1.1.122 h1:6NM4D8VwKIdq6F0A5nXnmxPp7LnzuwsGCeV go.buf.build/odpf/gw/odpf/proton v1.1.122/go.mod h1:FySqyI0YPPldpzXULKDcIC/bMJIdGaO6j36i1ZKJSvE= go.buf.build/odpf/gwv/envoyproxy/protoc-gen-validate v1.1.7/go.mod h1:2Tg6rYIoDhpl39Zd2+WBOF9uG4XxAOs0bK2Z2/bwTOc= go.buf.build/odpf/gwv/grpc-ecosystem/grpc-gateway v1.1.46/go.mod h1:UrBCdmHgaY/pLapYUMOq01c1yuzwT8AEBTsgpmzq2zo= -go.buf.build/odpf/gwv/odpf/proton v1.1.133 h1:lYGtd7HoAA/KtEOi9ncfx44Pdi9RTr7HyyqdqCTytRI= -go.buf.build/odpf/gwv/odpf/proton v1.1.133/go.mod h1:V6NNZKrRPHjMkIPiSXvwUHks0D8bUGPXAjXUaujG/90= +go.buf.build/odpf/gwv/odpf/proton v1.1.172 h1:cGk4ctsVhBK4d6mV+QVrJD0rWkXtDO+ogCA8l3BCkhk= +go.buf.build/odpf/gwv/odpf/proton v1.1.172/go.mod h1:V6NNZKrRPHjMkIPiSXvwUHks0D8bUGPXAjXUaujG/90= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= diff --git a/internal/server/v1/resources/mappers.go b/internal/server/v1/resources/mappers.go index 04d399d9..8bbcccd9 100644 --- a/internal/server/v1/resources/mappers.go +++ b/internal/server/v1/resources/mappers.go @@ -138,6 +138,7 @@ func revisionToProto(revision resource.Revision) (*entropyv1beta1.ResourceRevisi return &entropyv1beta1.ResourceRevision{ Id: strconv.FormatInt(revision.ID, decimalBase), Urn: revision.URN, + Reason: revision.Reason, Labels: revision.Labels, CreatedAt: timestamppb.New(revision.CreatedAt), Spec: spec, diff --git a/internal/store/postgres/resource_store.go b/internal/store/postgres/resource_store.go index 7fe41f0e..04bb6e2f 100644 --- a/internal/store/postgres/resource_store.go +++ b/internal/store/postgres/resource_store.go @@ -114,11 +114,11 @@ func (st *Store) Create(ctx context.Context, r resource.Resource, hooks ...resou return translateErr(err) } - // TODO: Add labels for revisions rev := resource.Revision{ URN: r.URN, Spec: r.Spec, - Labels: map[string]string{}, + Labels: r.Labels, + Reason: "resource created", } if err := insertRevision(ctx, tx, rev); err != nil { @@ -135,7 +135,7 @@ func (st *Store) Create(ctx context.Context, r resource.Resource, hooks ...resou return nil } -func (st *Store) Update(ctx context.Context, r resource.Resource, hooks ...resource.MutationHook) error { +func (st *Store) Update(ctx context.Context, r resource.Resource, saveRevision bool, reason string, hooks ...resource.MutationHook) error { updateResource := func(ctx context.Context, tx *sqlx.Tx) error { id, err := translateURNToID(ctx, tx, r.URN) if err != nil { @@ -165,15 +165,17 @@ func (st *Store) Update(ctx context.Context, r resource.Resource, hooks ...resou return err } - // TODO: Add labels for revisions - rev := resource.Revision{ - URN: r.URN, - Spec: r.Spec, - Labels: map[string]string{}, - } + if saveRevision { + rev := resource.Revision{ + URN: r.URN, + Spec: r.Spec, + Labels: r.Labels, + Reason: reason, + } - if err := insertRevision(ctx, tx, rev); err != nil { - return translateErr(err) + if err := insertRevision(ctx, tx, rev); err != nil { + return translateErr(err) + } } return runAllHooks(ctx, hooks) diff --git a/internal/store/postgres/revision_model.go b/internal/store/postgres/revision_model.go index f9a51c9b..785727ae 100644 --- a/internal/store/postgres/revision_model.go +++ b/internal/store/postgres/revision_model.go @@ -14,12 +14,13 @@ import ( type revisionModel struct { ID int64 `db:"id"` URN string `db:"urn"` + Reason string `db:"reason"` CreatedAt time.Time `db:"created_at"` SpecConfigs []byte `db:"spec_configs"` } func readRevisionRecord(ctx context.Context, r sqlx.QueryerContext, id int64, into *revisionModel) error { - cols := []string{"id", "urn", "created_at", "spec_configs"} + cols := []string{"id", "urn", "reason", "created_at", "spec_configs"} builder := sq.Select(cols...).From(tableRevisions).Where(sq.Eq{"id": id}) query, args, err := builder.PlaceholderFormat(sq.Dollar).ToSql() diff --git a/internal/store/postgres/revision_store.go b/internal/store/postgres/revision_store.go index cd0ace42..7b08ba1e 100644 --- a/internal/store/postgres/revision_store.go +++ b/internal/store/postgres/revision_store.go @@ -75,6 +75,7 @@ func (st *Store) getRevisionByID(ctx context.Context, id int64) (*resource.Revis return &resource.Revision{ ID: rec.ID, URN: rec.URN, + Reason: rec.Reason, Labels: tagsToLabelMap(tags), CreatedAt: rec.CreatedAt, Spec: resource.Spec{ @@ -98,8 +99,8 @@ func insertRevision(ctx context.Context, tx *sqlx.Tx, rev resource.Revision) err func insertRevisionRecord(ctx context.Context, runner sq.BaseRunner, r resource.Revision) (int64, error) { q := sq.Insert(tableRevisions). - Columns("urn", "spec_configs"). - Values(r.URN, r.Spec.Configs). + Columns("urn", "reason", "spec_configs"). + Values(r.URN, r.Reason, r.Spec.Configs). Suffix(`RETURNING "id"`). PlaceholderFormat(sq.Dollar) diff --git a/internal/store/postgres/schema.sql b/internal/store/postgres/schema.sql index 6995a1d3..6f7c6f06 100644 --- a/internal/store/postgres/schema.sql +++ b/internal/store/postgres/schema.sql @@ -65,4 +65,5 @@ CREATE TABLE IF NOT EXISTS modules ( updated_at timestamp with time zone NOT NULL DEFAULT current_timestamp ); -CREATE INDEX IF NOT EXISTS idx_modules_project ON modules (project); \ No newline at end of file +CREATE INDEX IF NOT EXISTS idx_modules_project ON modules (project); +ALTER TABLE revisions ADD COLUMN IF NOT EXISTS reason TEXT DEFAULT '' NOT NULL; diff --git a/modules/firehose/plan.go b/modules/firehose/plan.go index 400e478c..20a2b7e6 100644 --- a/modules/firehose/plan.go +++ b/modules/firehose/plan.go @@ -44,6 +44,7 @@ func (*firehoseModule) planCreate(res module.ExpandedResource, act module.Action if reqConf.StopTime != nil { plan.ScheduleRunAt = *reqConf.StopTime } + plan.Reason = "firehose created" return &plan, nil } @@ -70,6 +71,7 @@ func (*firehoseModule) planChange(res module.ExpandedResource, act module.Action if conf.StopTime != nil { plan.ScheduleRunAt = *conf.StopTime } + plan.Reason = "firehose config updated" case ScaleAction: var scaleParams struct { @@ -79,17 +81,21 @@ func (*firehoseModule) planChange(res module.ExpandedResource, act module.Action return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err) } conf.Firehose.Replicas = scaleParams.Replicas + plan.Reason = "firehose scaled" case StartAction: conf.State = stateRunning + plan.Reason = "firehose started" case StopAction: conf.State = stateStopped + plan.Reason = "firehose stopped" } r.Spec.Configs = conf.JSON() r.State = resource.State{ Status: resource.StatusPending, + Output: res.State.Output, ModuleData: moduleData{ PendingSteps: []string{releaseUpdate}, }.JSON(), @@ -125,6 +131,7 @@ func (*firehoseModule) planReset(res module.ExpandedResource, act module.ActionR r.Spec.Configs = conf.JSON() r.State = resource.State{ Status: resource.StatusPending, + Output: res.State.Output, ModuleData: moduleData{ PendingSteps: []string{releaseUpdate, consumerReset, releaseUpdate}, ResetTo: resetTo, @@ -132,5 +139,5 @@ func (*firehoseModule) planReset(res module.ExpandedResource, act module.ActionR }.JSON(), } - return &module.Plan{Resource: r}, nil + return &module.Plan{Resource: r, Reason: "firehose consumer reset"}, nil } diff --git a/modules/firehose/plan_test.go b/modules/firehose/plan_test.go index 04e4d5be..e730b83f 100644 --- a/modules/firehose/plan_test.go +++ b/modules/firehose/plan_test.go @@ -64,6 +64,7 @@ func TestFirehoseModule_Plan(t *testing.T) { ModuleData: []byte(`{"pending_steps":["release_create"]}`), }, }, + Reason: "firehose created", }, }, { @@ -96,6 +97,7 @@ func TestFirehoseModule_Plan(t *testing.T) { ModuleData: []byte(`{"pending_steps":["release_update"]}`), }, }, + Reason: "firehose scaled", }, }, { @@ -119,6 +121,7 @@ func TestFirehoseModule_Plan(t *testing.T) { ModuleData: []byte(`{"pending_steps":["release_update","consumer_reset","release_update"],"reset_to":"2022-06-22T00:00:00+00:00","state_override":"STOPPED"}`), }, }, + Reason: "firehose consumer reset", }, }, { @@ -143,6 +146,7 @@ func TestFirehoseModule_Plan(t *testing.T) { }, }, ScheduleRunAt: parseTime("3022-07-13T00:40:14.028016Z"), + Reason: "firehose created", }, }, } diff --git a/modules/firehose/schema/config.json b/modules/firehose/schema/config.json index 23765598..98613e97 100644 --- a/modules/firehose/schema/config.json +++ b/modules/firehose/schema/config.json @@ -23,7 +23,8 @@ "properties": { "replicas": { "type": "number", - "default": 1 + "default": 1, + "minimum": 1 }, "kafka_broker_address": { "type": "string" diff --git a/modules/firehose/schema/scale.json b/modules/firehose/schema/scale.json index 5c684f64..f1ee90ff 100644 --- a/modules/firehose/schema/scale.json +++ b/modules/firehose/schema/scale.json @@ -4,7 +4,8 @@ "type": "object", "properties": { "replicas": { - "type": "number" + "type": "number", + "minimum": 1 } }, "required": ["replicas"] diff --git a/modules/kubernetes/kubernetes.go b/modules/kubernetes/kubernetes.go index 4cc1f8c8..733899b9 100644 --- a/modules/kubernetes/kubernetes.go +++ b/modules/kubernetes/kubernetes.go @@ -56,7 +56,7 @@ func (m *kubeModule) Plan(ctx context.Context, res module.ExpandedResource, act Status: resource.StatusCompleted, Output: output, } - return &module.Plan{Resource: res.Resource}, nil + return &module.Plan{Resource: res.Resource, Reason: "kubernetes cluster details updated"}, nil } func (*kubeModule) Sync(_ context.Context, res module.ExpandedResource) (*resource.State, error) {