From b1c87f48264f26c5b21f49705de88edbc68d379b Mon Sep 17 00:00:00 2001 From: Lifosmin Simon Date: Thu, 12 Oct 2023 14:40:23 +0700 Subject: [PATCH] chore: fix mapping env vars --- internal/server/v1/dlq/mapper.go | 84 +++++++++++++++++++++++++------- 1 file changed, 66 insertions(+), 18 deletions(-) diff --git a/internal/server/v1/dlq/mapper.go b/internal/server/v1/dlq/mapper.go index f73577b..65e28b6 100644 --- a/internal/server/v1/dlq/mapper.go +++ b/internal/server/v1/dlq/mapper.go @@ -1,6 +1,7 @@ package dlq import ( + "errors" "fmt" "strconv" @@ -13,14 +14,71 @@ import ( "github.com/goto/dex/internal/server/utils" ) -func enrichDlqJob(job *models.DlqJob, f models.Firehose, cfg *DlqJobConfig) { - job.ResourceID = f.Urn - job.Namespace = "" // TBA - job.KubeCluster = *f.Configs.KubeCluster +func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg *DlqJobConfig) error { + var kubeCluster string + for _, dep := range res.Spec.GetDependencies() { + if dep.GetKey() == "kube_cluster" { + kubeCluster = dep.GetValue() + } + } + var modConf entropy.FirehoseConfig + if err := utils.ProtoStructToGoVal(res.Spec.GetConfigs(), &modConf); err != nil { + return err + } + + outputMap := res.GetState().GetOutput().GetStructValue().AsMap() + namespaceAny, exists := outputMap["namespace"] + if !exists { + return errors.New("could not find firehose namespace from resource output") + } + namespace, ok := namespaceAny.(string) + if !ok { + return errors.New("firehose namespace format is invalid") + } + + envs := modConf.EnvVariables + job.ResourceID = res.GetUrn() + job.Namespace = namespace + job.KubeCluster = kubeCluster job.ContainerImage = cfg.DlqJobImage job.PrometheusHost = cfg.PrometheusHost - job.EnvVars = f.Configs.EnvVars - job.DlqGcsCredentialPath = f.Configs.EnvVars["DLQ_GCS_CREDENTIAL_PATH"] + job.EnvVars = map[string]string{ + "DLQ_BATCH_SIZE": fmt.Sprintf("%d", job.BatchSize), + "DLQ_NUM_THREADS": fmt.Sprintf("%d", job.NumThreads), + "DLQ_ERROR_TYPES": job.ErrorTypes, + "DLQ_INPUT_DATE": job.Date, + "DLQ_TOPIC_NAME": job.Topic, + "METRIC_STATSD_TAGS": "a=b", // TBA + "SINK_TYPE": envs["SINK_TYPE"], + "DLQ_PREFIX_DIR": "test-firehose", + "DLQ_FINISHED_STATUS_FILE": "/shared/job-finished", + "DLQ_GCS_BUCKET_NAME": envs["DLQ_GCS_BUCKET_NAME"], + "DLQ_GCS_CREDENTIAL_PATH": envs["DLQ_GCS_CREDENTIAL_PATH"], + "DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": envs["DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID"], + "JAVA_TOOL_OPTIONS": envs["JAVA_TOOL_OPTIONS"], + "_JAVA_OPTIONS": envs["_JAVA_OPTIONS"], + "INPUT_SCHEMA_PROTO_CLASS": envs["INPUT_SCHEMA_PROTO_CLASS"], + "SCHEMA_REGISTRY_STENCIL_ENABLE": envs["SCHEMA_REGISTRY_STENCIL_ENABLE"], + "SCHEMA_REGISTRY_STENCIL_URLS": envs["SCHEMA_REGISTRY_STENCIL_URLS"], + "SINK_BIGQUERY_ADD_METADATA_ENABLED": envs["SINK_BIGQUERY_ADD_METADATA_ENABLED"], + "SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS": envs["SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS"], + "SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS": envs["SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS"], + "SINK_BIGQUERY_CREDENTIAL_PATH": envs["SINK_BIGQUERY_CREDENTIAL_PATH"], + "SINK_BIGQUERY_DATASET_LABELS": envs["SINK_BIGQUERY_DATASET_LABELS"], + "SINK_BIGQUERY_DATASET_LOCATION": envs["SINK_BIGQUERY_DATASET_LOCATION"], + "SINK_BIGQUERY_DATASET_NAME": envs["SINK_BIGQUERY_DATASET_NAME"], + "SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID": envs["SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID"], + "SINK_BIGQUERY_ROW_INSERT_ID_ENABLE": envs["SINK_BIGQUERY_ROW_INSERT_ID_ENABLE"], + "SINK_BIGQUERY_STORAGE_API_ENABLE": envs["SINK_BIGQUERY_STORAGE_API_ENABLE"], + "SINK_BIGQUERY_TABLE_LABELS": envs["SINK_BIGQUERY_TABLE_LABELS"], + "SINK_BIGQUERY_TABLE_NAME": envs["SINK_BIGQUERY_TABLE_NAME"], + "SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS": envs["SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS"], + "SINK_BIGQUERY_TABLE_PARTITION_KEY": envs["SINK_BIGQUERY_TABLE_PARTITION_KEY"], + "SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": envs["SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"], + } + job.DlqGcsCredentialPath = modConf.EnvVariables["DLQ_GCS_CREDENTIAL_PATH"] + + return nil } // DlqJob param here is expected to have been enriched with firehose config @@ -51,23 +109,13 @@ func mapToEntropyResource(job models.DlqJob) (*entropyv1beta1.Resource, error) { } func makeConfigStruct(job models.DlqJob) (*structpb.Value, error) { - envVars := map[string]string{} - for key, value := range job.EnvVars { - envVars[key] = value - } - envVars["DLQ_BATCH_SIZE"] = fmt.Sprintf("%d", job.BatchSize) - envVars["DLQ_NUM_THREADS"] = fmt.Sprintf("%d", job.NumThreads) - envVars["DLQ_ERROR_TYPES"] = job.ErrorTypes - envVars["DLQ_INPUT_DATE"] = job.Date - envVars["DLQ_TOPIC_NAME"] = job.Topic - envVars["METRIC_STATSD_TAGS"] = "" // TBA dlqCredentialSecretName := "firehose-bigquery-sink-credential" dlqTelegrafConfigName := "dlq-processor-telegraf" return utils.GoValToProtoStruct(entropy.JobConfig{ Replicas: int32(job.Replicas), Namespace: job.Namespace, - Name: "", // TBA + Name: buildEntropyResourceName(job), Containers: []entropy.JobContainer{ { Name: "dlq-job", @@ -87,7 +135,7 @@ func makeConfigStruct(job models.DlqJob) (*structpb.Value, error) { CPU: "0.5", // user Memory: "2gb", // user }, - EnvVariables: envVars, + EnvVariables: job.EnvVars, }, { Name: "telegraf",