diff --git a/internal/server/v1/firehose/mappings.go b/internal/server/v1/firehose/mappings.go index e61c8c3..f8fa0b6 100644 --- a/internal/server/v1/firehose/mappings.go +++ b/internal/server/v1/firehose/mappings.go @@ -33,7 +33,7 @@ type firehoseLabels struct { type moduleConfig struct { State string `json:"state"` - StopTime time.Time `json:"stop_time"` + StopTime *time.Time `json:"stop_time"` Telegraf map[string]interface{} `json:"telegraf"` Firehose moduleConfigFirehoseDef `json:"firehose"` } @@ -110,25 +110,33 @@ func makeLabelsMap(def models.Firehose) map[string]string { } func makeConfigStruct(cfg *models.FirehoseConfig, prj *shieldv1beta1.Project) (*structpb.Value, error) { - if cfg.BootstrapServers == nil { + switch { + case cfg.BootstrapServers == nil: return nil, errors.ErrInvalid.WithMsgf("bootstrap_servers must be set") - } else if cfg.TopicName == nil { + + case cfg.TopicName == nil: return nil, errors.ErrInvalid.WithMsgf("topic_name must be set") - } else if cfg.ConsumerGroupID == nil { + + case cfg.ConsumerGroupID == nil: return nil, errors.ErrInvalid.WithMsgf("consumer_group_id must be set") + + case cfg.SinkType == nil: + return nil, errors.ErrInvalid.WithMsgf("sink_type must be set") + + case cfg.StreamName == nil: + return nil, errors.ErrInvalid.WithMsgf("stream_name must be set") + + case cfg.InputSchemaProtoClass == nil: + return nil, errors.ErrInvalid.WithMsgf("input_schema_proto_class must be set") } - var stopAt time.Time + var stopAt *time.Time if cfg.StopDate != "" { - var err error - stopAt, err = time.Parse(time.RFC3339, cfg.StopDate) + t, err := time.Parse(time.RFC3339, cfg.StopDate) if err != nil { return nil, errors.ErrInvalid.WithMsgf("stop date must be valid RFC3339 timestamp") } - } else { - // TODO: (hack) entropy has invalid check. - const day = 24 * time.Hour - stopAt = time.Now().Add(30 * day) + stopAt = &t } if cfg.Replicas == nil { @@ -147,6 +155,10 @@ func makeConfigStruct(cfg *models.FirehoseConfig, prj *shieldv1beta1.Project) (* } } + cfg.EnvVars["SINK_TYPE"] = string(*cfg.SinkType) + cfg.EnvVars["STREAM_NAME"] = *cfg.StreamName + cfg.EnvVars["INPUT_SCHEMA_PROTO_CLASS"] = *cfg.InputSchemaProtoClass + return utils.GoValToProtoStruct(moduleConfig{ State: "RUNNING", StopTime: stopAt,