diff --git a/cli/utils.go b/cli/utils.go index 2240dec8..67025e10 100644 --- a/cli/utils.go +++ b/cli/utils.go @@ -3,7 +3,6 @@ package cli import ( "errors" "fmt" - "io/ioutil" "os" "path/filepath" @@ -22,7 +21,7 @@ const ( type RunEFunc func(cmd *cobra.Command, args []string) error func parseFile(filePath string, v protoreflect.ProtoMessage) error { - b, err := ioutil.ReadFile(filePath) + b, err := os.ReadFile(filePath) if err != nil { return err } diff --git a/docs/modules/firehose.md b/docs/modules/firehose.md index ee4f1b52..4e981e8a 100644 --- a/docs/modules/firehose.md +++ b/docs/modules/firehose.md @@ -18,7 +18,6 @@ The configuration struct for Firehose module looks like: ``` type moduleConfig struct { State string `json:"state"` - ChartVersion string `json:"chart_version"` Firehose struct { Replicas int `json:"replicas"` KafkaBrokerAddress string `json:"kafka_broker_address"` diff --git a/modules/firehose/config.go b/modules/firehose/config.go index aa05d474..0dc09ca3 100644 --- a/modules/firehose/config.go +++ b/modules/firehose/config.go @@ -11,16 +11,6 @@ import ( "github.com/odpf/entropy/pkg/helm" ) -const ( - defaultNamespace = "firehose" - defaultChartString = "firehose" - defaultVersionString = "0.1.1" - defaultRepositoryString = "https://odpf.github.io/charts/" - defaultImagePullPolicy = "IfNotPresent" - defaultImageRepository = "odpf/firehose" - defaultImageTag = "latest" -) - var ( //go:embed schema/config.json completeConfigSchema string @@ -33,11 +23,10 @@ var ( ) type moduleConfig struct { - State string `json:"state"` - ChartVersion string `json:"chart_version"` - StopTime *time.Time `json:"stop_time"` - Telegraf map[string]interface{} `json:"telegraf"` - Firehose struct { + State string `json:"state"` + StopTime *time.Time `json:"stop_time"` + Telegraf map[string]interface{} `json:"telegraf"` + Firehose struct { Replicas int `json:"replicas"` KafkaBrokerAddress string `json:"kafka_broker_address"` KafkaTopic string `json:"kafka_topic"` @@ -46,25 +35,29 @@ type moduleConfig struct { } `json:"firehose"` } -func (mc *moduleConfig) sanitiseAndValidate() error { +func (mc *moduleConfig) validate() error { if mc.StopTime != nil && mc.StopTime.Before(time.Now()) { return errors.ErrInvalid. WithMsgf("value for stop_time must be greater than current time") } - if mc.ChartVersion == "" { - mc.ChartVersion = defaultVersionString - } return nil } -func (mc moduleConfig) GetHelmReleaseConfig(r resource.Resource) *helm.ReleaseConfig { +func (mc moduleConfig) GetHelmReleaseConfig(r resource.Resource) (*helm.ReleaseConfig, error) { + var output Output + err := json.Unmarshal(r.State.Output, &output) + if err != nil { + return nil, errors.ErrInvalid.WithMsgf("invalid output json: %v", err) + } + defaults := output.Defaults + rc := helm.DefaultReleaseConfig() rc.Name = fmt.Sprintf("%s-%s-firehose", r.Project, r.Name) - rc.Repository = defaultRepositoryString - rc.Chart = defaultChartString - rc.Namespace = defaultNamespace + rc.Repository = defaults.ChartRepository + rc.Chart = defaults.ChartName + rc.Namespace = defaults.Namespace rc.ForceUpdate = true - rc.Version = mc.ChartVersion + rc.Version = defaults.ChartVersion fc := mc.Firehose fc.EnvVariables["SOURCE_KAFKA_BROKERS"] = fc.KafkaBrokerAddress @@ -75,9 +68,9 @@ func (mc moduleConfig) GetHelmReleaseConfig(r resource.Resource) *helm.ReleaseCo "replicaCount": mc.Firehose.Replicas, "firehose": map[string]interface{}{ "image": map[string]interface{}{ - "repository": defaultImageRepository, - "pullPolicy": defaultImagePullPolicy, - "tag": defaultImageTag, + "repository": defaults.ImageRepository, + "pullPolicy": defaults.ImagePullPolicy, + "tag": defaults.ImageTag, }, "config": fc.EnvVariables, }, @@ -87,7 +80,7 @@ func (mc moduleConfig) GetHelmReleaseConfig(r resource.Resource) *helm.ReleaseCo } rc.Values = hv - return rc + return rc, nil } func (mc moduleConfig) JSON() []byte { diff --git a/modules/firehose/log.go b/modules/firehose/log.go index a96627c6..921158ae 100644 --- a/modules/firehose/log.go +++ b/modules/firehose/log.go @@ -26,10 +26,16 @@ func (*firehoseModule) Log(ctx context.Context, res module.ExpandedResource, fil if filter == nil { filter = make(map[string]string) } - filter["app"] = conf.GetHelmReleaseConfig(r).Name + + hc, err := conf.GetHelmReleaseConfig(r) + if err != nil { + return nil, err + } + + filter["app"] = hc.Name kubeCl := kube.NewClient(kubeOut.Configs) - logs, err := kubeCl.StreamLogs(ctx, defaultNamespace, filter) + logs, err := kubeCl.StreamLogs(ctx, hc.Namespace, filter) if err != nil { return nil, err } diff --git a/modules/firehose/module.go b/modules/firehose/module.go index 4b8915d8..7c9a8385 100644 --- a/modules/firehose/module.go +++ b/modules/firehose/module.go @@ -9,10 +9,11 @@ import ( ) const ( - StopAction = "stop" - StartAction = "start" - ScaleAction = "scale" - ResetAction = "reset" + StopAction = "stop" + StartAction = "start" + ScaleAction = "scale" + ResetAction = "reset" + UpgradeAction = "upgrade" ) const ( @@ -68,10 +69,47 @@ var Module = module.Descriptor{ Description: "Reset firehose kafka consumer group to given timestamp", ParamSchema: resetActionSchema, }, + { + Name: UpgradeAction, + Description: "Upgrade firehose to current stable version", + }, }, DriverFactory: func(conf json.RawMessage) (module.Driver, error) { - return &firehoseModule{}, nil + fm := firehoseModuleWithDefaultConfigs() + err := json.Unmarshal(conf, fm) + if err != nil { + return nil, err + } + return fm, nil }, } -type firehoseModule struct{} +type firehoseModule struct { + Config config `json:"config"` +} + +type config struct { + ChartRepository string `json:"chart_repository,omitempty"` + ChartName string `json:"chart_name,omitempty"` + ChartVersion string `json:"chart_version,omitempty"` + ImageRepository string `json:"image_repository,omitempty"` + ImageName string `json:"image_name,omitempty"` + ImageTag string `json:"image_tag,omitempty"` + Namespace string `json:"namespace,omitempty"` + ImagePullPolicy string `json:"image_pull_policy,omitempty"` +} + +func firehoseModuleWithDefaultConfigs() *firehoseModule { + return &firehoseModule{ + config{ + ChartRepository: "https://odpf.github.io/charts/", + ChartName: "firehose", + ChartVersion: "0.1.3", + ImageRepository: "odpf/firehose", + ImageName: "firehose", + ImageTag: "latest", + Namespace: "firehose", + ImagePullPolicy: "IfNotPresent", + }, + } +} diff --git a/modules/firehose/output.go b/modules/firehose/output.go index 773c39fc..2dcb5f37 100644 --- a/modules/firehose/output.go +++ b/modules/firehose/output.go @@ -11,9 +11,10 @@ import ( ) type Output struct { - Namespace string `json:"namespace"` - ReleaseName string `json:"release_name"` - Pods []kube.Pod `json:"pods"` + Namespace string `json:"namespace,omitempty"` + ReleaseName string `json:"release_name,omitempty"` + Pods []kube.Pod `json:"pods,omitempty"` + Defaults config `json:"defaults,omitempty"` } func (out Output) JSON() []byte { @@ -30,15 +31,26 @@ func (m *firehoseModule) Output(ctx context.Context, res module.ExpandedResource return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err) } + var output Output + if err := json.Unmarshal(res.Resource.State.Output, &output); err != nil { + return nil, errors.ErrInvalid.WithMsgf("invalid output json: %v", err) + } + pods, err := m.podDetails(ctx, res) if err != nil { return nil, err } + hc, err := conf.GetHelmReleaseConfig(res.Resource) + if err != nil { + return nil, err + } + return Output{ - Namespace: conf.GetHelmReleaseConfig(res.Resource).Namespace, - ReleaseName: conf.GetHelmReleaseConfig(res.Resource).Name, + Namespace: hc.Namespace, + ReleaseName: hc.Name, Pods: pods, + Defaults: output.Defaults, }.JSON(), nil } @@ -55,6 +67,11 @@ func (*firehoseModule) podDetails(ctx context.Context, res module.ExpandedResour return nil, err } + hc, err := conf.GetHelmReleaseConfig(r) + if err != nil { + return nil, err + } + kubeCl := kube.NewClient(kubeOut.Configs) - return kubeCl.GetPodDetails(ctx, defaultNamespace, map[string]string{"app": conf.GetHelmReleaseConfig(r).Name}) + return kubeCl.GetPodDetails(ctx, hc.Namespace, map[string]string{"app": hc.Name}) } diff --git a/modules/firehose/plan.go b/modules/firehose/plan.go index 20a2b7e6..49fc74a3 100644 --- a/modules/firehose/plan.go +++ b/modules/firehose/plan.go @@ -20,7 +20,7 @@ func (m *firehoseModule) Plan(_ context.Context, res module.ExpandedResource, ac } } -func (*firehoseModule) planCreate(res module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) { +func (m *firehoseModule) planCreate(res module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) { var plan module.Plan r := res.Resource @@ -28,16 +28,21 @@ func (*firehoseModule) planCreate(res module.ExpandedResource, act module.Action if err := json.Unmarshal(act.Params, &reqConf); err != nil { return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err) } - if err := reqConf.sanitiseAndValidate(); err != nil { + if err := reqConf.validate(); err != nil { return nil, err } + output := Output{ + Defaults: m.Config, + }.JSON() + r.Spec.Configs = reqConf.JSON() r.State = resource.State{ Status: resource.StatusPending, ModuleData: moduleData{ PendingSteps: []string{releaseCreate}, }.JSON(), + Output: output, } plan.Resource = r @@ -48,7 +53,7 @@ func (*firehoseModule) planCreate(res module.ExpandedResource, act module.Action return &plan, nil } -func (*firehoseModule) planChange(res module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) { +func (m *firehoseModule) planChange(res module.ExpandedResource, act module.ActionRequest) (*module.Plan, error) { var plan module.Plan r := res.Resource @@ -63,7 +68,7 @@ func (*firehoseModule) planChange(res module.ExpandedResource, act module.Action if err := json.Unmarshal(act.Params, &reqConf); err != nil { return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err) } - if err := reqConf.sanitiseAndValidate(); err != nil { + if err := reqConf.validate(); err != nil { return nil, err } conf = reqConf @@ -90,6 +95,18 @@ func (*firehoseModule) planChange(res module.ExpandedResource, act module.Action case StopAction: conf.State = stateStopped plan.Reason = "firehose stopped" + + case UpgradeAction: + var output Output + err := json.Unmarshal(res.State.Output, &output) + if err != nil { + return nil, errors.ErrInvalid.WithMsgf("invalid output json: %v", err) + } + + output.Defaults = m.Config + res.State.Output = output.JSON() + + plan.Reason = "firehose upgraded" } r.Spec.Configs = conf.JSON() diff --git a/modules/firehose/plan_test.go b/modules/firehose/plan_test.go index e730b83f..0f215ac1 100644 --- a/modules/firehose/plan_test.go +++ b/modules/firehose/plan_test.go @@ -22,7 +22,7 @@ func TestFirehoseModule_Plan(t *testing.T) { Name: "test", Project: "demo", Spec: resource.Spec{ - Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), + Configs: []byte(`{"state":"RUNNING","firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), }, State: resource.State{}, } @@ -57,11 +57,12 @@ func TestFirehoseModule_Plan(t *testing.T) { Name: "test", Project: "demo", Spec: resource.Spec{ - Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","stop_time":null,"telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), + Configs: []byte(`{"state":"RUNNING","stop_time":null,"telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), }, State: resource.State{ Status: resource.StatusPending, ModuleData: []byte(`{"pending_steps":["release_create"]}`), + Output: []byte(`{"defaults":{}}`), }, }, Reason: "firehose created", @@ -90,7 +91,7 @@ func TestFirehoseModule_Plan(t *testing.T) { Name: "test", Project: "demo", Spec: resource.Spec{ - Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","stop_time":null,"telegraf":null,"firehose":{"replicas":5,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), + Configs: []byte(`{"state":"RUNNING","stop_time":null,"telegraf":null,"firehose":{"replicas":5,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), }, State: resource.State{ Status: resource.StatusPending, @@ -114,7 +115,7 @@ func TestFirehoseModule_Plan(t *testing.T) { Name: "test", Project: "demo", Spec: resource.Spec{ - Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","stop_time":null,"telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), + Configs: []byte(`{"state":"RUNNING","stop_time":null,"telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), }, State: resource.State{ Status: resource.StatusPending, @@ -138,11 +139,12 @@ func TestFirehoseModule_Plan(t *testing.T) { Name: "test", Project: "demo", Spec: resource.Spec{ - Configs: []byte(`{"state":"RUNNING","chart_version":"0.1.1","stop_time":"3022-07-13T00:40:14.028016Z","telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), + Configs: []byte(`{"state":"RUNNING","stop_time":"3022-07-13T00:40:14.028016Z","telegraf":null,"firehose":{"replicas":1,"kafka_broker_address":"localhost:9092","kafka_topic":"test-topic","kafka_consumer_id":"test-consumer-id","env_variables":{}}}`), }, State: resource.State{ Status: resource.StatusPending, ModuleData: []byte(`{"pending_steps":["release_create"]}`), + Output: []byte(`{"defaults":{}}`), }, }, ScheduleRunAt: parseTime("3022-07-13T00:40:14.028016Z"), diff --git a/modules/firehose/schema/config.json b/modules/firehose/schema/config.json index 98613e97..9b11611c 100644 --- a/modules/firehose/schema/config.json +++ b/modules/firehose/schema/config.json @@ -11,9 +11,6 @@ ], "default": "RUNNING" }, - "chart_version": { - "type": "string" - }, "stop_time": { "type": "string", "format": "date-time" diff --git a/modules/firehose/sync.go b/modules/firehose/sync.go index 5abd220f..dd2178e7 100644 --- a/modules/firehose/sync.go +++ b/modules/firehose/sync.go @@ -59,10 +59,9 @@ func (m *firehoseModule) Sync(ctx context.Context, res module.ExpandedResource) } case consumerReset: if err := m.consumerReset(ctx, - conf.Firehose.KafkaBrokerAddress, - conf.Firehose.KafkaConsumerID, + conf, + r, data.ResetTo, - conf.GetHelmReleaseConfig(r).Namespace, kubeOut); err != nil { return nil, err } @@ -97,27 +96,36 @@ func (*firehoseModule) releaseSync(isCreate bool, conf moduleConfig, r resource. conf.Firehose.Replicas = 0 } + hc, err := conf.GetHelmReleaseConfig(r) + if err != nil { + return err + } + var helmErr error if isCreate { - _, helmErr = helmCl.Create(conf.GetHelmReleaseConfig(r)) + _, helmErr = helmCl.Create(hc) } else { - _, helmErr = helmCl.Update(conf.GetHelmReleaseConfig(r)) + _, helmErr = helmCl.Update(hc) } return helmErr } -func (*firehoseModule) consumerReset(ctx context.Context, brokers string, consumerID string, resetTo string, namespace string, out kubernetes.Output) error { - cgm := kafka.NewConsumerGroupManager(brokers, kube.NewClient(out.Configs), namespace) +func (*firehoseModule) consumerReset(ctx context.Context, conf moduleConfig, r resource.Resource, resetTo string, out kubernetes.Output) error { + releaseConfig, err := conf.GetHelmReleaseConfig(r) + if err != nil { + return err + } + + cgm := kafka.NewConsumerGroupManager(conf.Firehose.KafkaBrokerAddress, kube.NewClient(out.Configs), releaseConfig.Namespace) - var err error switch resetTo { case ResetToEarliest: - err = cgm.ResetOffsetToEarliest(ctx, consumerID) + err = cgm.ResetOffsetToEarliest(ctx, conf.Firehose.KafkaConsumerID) case ResetToLatest: - err = cgm.ResetOffsetToLatest(ctx, consumerID) + err = cgm.ResetOffsetToLatest(ctx, conf.Firehose.KafkaConsumerID) default: - err = cgm.ResetOffsetToDatetime(ctx, consumerID, resetTo) + err = cgm.ResetOffsetToDatetime(ctx, conf.Firehose.KafkaConsumerID, resetTo) } return handleErr(err)