Skip to content

Commit

Permalink
fix: stop-time validation issue (#31)
Browse files Browse the repository at this point in the history
* fix: stop-time validation issue

* fix: sink_type check
  • Loading branch information
spy16 authored Mar 1, 2023
1 parent 0f62fe6 commit f527729
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions internal/server/v1/firehose/mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type firehoseLabels struct {

type moduleConfig struct {
State string `json:"state"`
StopTime time.Time `json:"stop_time"`
StopTime *time.Time `json:"stop_time"`
Telegraf map[string]interface{} `json:"telegraf"`
Firehose moduleConfigFirehoseDef `json:"firehose"`
}
Expand Down Expand Up @@ -110,25 +110,33 @@ func makeLabelsMap(def models.Firehose) map[string]string {
}

func makeConfigStruct(cfg *models.FirehoseConfig, prj *shieldv1beta1.Project) (*structpb.Value, error) {
if cfg.BootstrapServers == nil {
switch {
case cfg.BootstrapServers == nil:
return nil, errors.ErrInvalid.WithMsgf("bootstrap_servers must be set")
} else if cfg.TopicName == nil {

case cfg.TopicName == nil:
return nil, errors.ErrInvalid.WithMsgf("topic_name must be set")
} else if cfg.ConsumerGroupID == nil {

case cfg.ConsumerGroupID == nil:
return nil, errors.ErrInvalid.WithMsgf("consumer_group_id must be set")

case cfg.SinkType == nil:
return nil, errors.ErrInvalid.WithMsgf("sink_type must be set")

case cfg.StreamName == nil:
return nil, errors.ErrInvalid.WithMsgf("stream_name must be set")

case cfg.InputSchemaProtoClass == nil:
return nil, errors.ErrInvalid.WithMsgf("input_schema_proto_class must be set")
}

var stopAt time.Time
var stopAt *time.Time
if cfg.StopDate != "" {
var err error
stopAt, err = time.Parse(time.RFC3339, cfg.StopDate)
t, err := time.Parse(time.RFC3339, cfg.StopDate)
if err != nil {
return nil, errors.ErrInvalid.WithMsgf("stop date must be valid RFC3339 timestamp")
}
} else {
// TODO: (hack) entropy has invalid check.
const day = 24 * time.Hour
stopAt = time.Now().Add(30 * day)
stopAt = &t
}

if cfg.Replicas == nil {
Expand All @@ -147,6 +155,10 @@ func makeConfigStruct(cfg *models.FirehoseConfig, prj *shieldv1beta1.Project) (*
}
}

cfg.EnvVars["SINK_TYPE"] = string(*cfg.SinkType)
cfg.EnvVars["STREAM_NAME"] = *cfg.StreamName
cfg.EnvVars["INPUT_SCHEMA_PROTO_CLASS"] = *cfg.InputSchemaProtoClass

return utils.GoValToProtoStruct(moduleConfig{
State: "RUNNING",
StopTime: stopAt,
Expand Down

0 comments on commit f527729

Please sign in to comment.