Skip to content
This repository has been archived by the owner on Oct 25, 2023. It is now read-only.

Commit

Permalink
chore: fix mapping env vars
Browse files Browse the repository at this point in the history
  • Loading branch information
Lifosmin Simon authored and Lifosmin Simon committed Oct 12, 2023
1 parent f9e1a7c commit b1c87f4
Showing 1 changed file with 66 additions and 18 deletions.
84 changes: 66 additions & 18 deletions internal/server/v1/dlq/mapper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dlq

import (
"errors"
"fmt"
"strconv"

Expand All @@ -13,14 +14,71 @@ import (
"github.com/goto/dex/internal/server/utils"
)

func enrichDlqJob(job *models.DlqJob, f models.Firehose, cfg *DlqJobConfig) {
job.ResourceID = f.Urn
job.Namespace = "" // TBA
job.KubeCluster = *f.Configs.KubeCluster
func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg *DlqJobConfig) error {

Check failure on line 17 in internal/server/v1/dlq/mapper.go

View workflow job for this annotation

GitHub Actions / golangci-lint

`enrichDlqJob` is unused (deadcode)
var kubeCluster string
for _, dep := range res.Spec.GetDependencies() {
if dep.GetKey() == "kube_cluster" {

Check failure on line 20 in internal/server/v1/dlq/mapper.go

View workflow job for this annotation

GitHub Actions / golangci-lint

string `kube_cluster` has 2 occurrences, make it a constant (goconst)
kubeCluster = dep.GetValue()
}
}
var modConf entropy.FirehoseConfig
if err := utils.ProtoStructToGoVal(res.Spec.GetConfigs(), &modConf); err != nil {
return err
}

outputMap := res.GetState().GetOutput().GetStructValue().AsMap()
namespaceAny, exists := outputMap["namespace"]
if !exists {
return errors.New("could not find firehose namespace from resource output")
}
namespace, ok := namespaceAny.(string)
if !ok {
return errors.New("firehose namespace format is invalid")
}

envs := modConf.EnvVariables
job.ResourceID = res.GetUrn()
job.Namespace = namespace
job.KubeCluster = kubeCluster
job.ContainerImage = cfg.DlqJobImage
job.PrometheusHost = cfg.PrometheusHost
job.EnvVars = f.Configs.EnvVars
job.DlqGcsCredentialPath = f.Configs.EnvVars["DLQ_GCS_CREDENTIAL_PATH"]
job.EnvVars = map[string]string{
"DLQ_BATCH_SIZE": fmt.Sprintf("%d", job.BatchSize),
"DLQ_NUM_THREADS": fmt.Sprintf("%d", job.NumThreads),
"DLQ_ERROR_TYPES": job.ErrorTypes,
"DLQ_INPUT_DATE": job.Date,
"DLQ_TOPIC_NAME": job.Topic,
"METRIC_STATSD_TAGS": "a=b", // TBA
"SINK_TYPE": envs["SINK_TYPE"],
"DLQ_PREFIX_DIR": "test-firehose",
"DLQ_FINISHED_STATUS_FILE": "/shared/job-finished",
"DLQ_GCS_BUCKET_NAME": envs["DLQ_GCS_BUCKET_NAME"],
"DLQ_GCS_CREDENTIAL_PATH": envs["DLQ_GCS_CREDENTIAL_PATH"],
"DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": envs["DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID"],
"JAVA_TOOL_OPTIONS": envs["JAVA_TOOL_OPTIONS"],
"_JAVA_OPTIONS": envs["_JAVA_OPTIONS"],
"INPUT_SCHEMA_PROTO_CLASS": envs["INPUT_SCHEMA_PROTO_CLASS"],
"SCHEMA_REGISTRY_STENCIL_ENABLE": envs["SCHEMA_REGISTRY_STENCIL_ENABLE"],
"SCHEMA_REGISTRY_STENCIL_URLS": envs["SCHEMA_REGISTRY_STENCIL_URLS"],
"SINK_BIGQUERY_ADD_METADATA_ENABLED": envs["SINK_BIGQUERY_ADD_METADATA_ENABLED"],
"SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS": envs["SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS"],
"SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS": envs["SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS"],
"SINK_BIGQUERY_CREDENTIAL_PATH": envs["SINK_BIGQUERY_CREDENTIAL_PATH"],
"SINK_BIGQUERY_DATASET_LABELS": envs["SINK_BIGQUERY_DATASET_LABELS"],
"SINK_BIGQUERY_DATASET_LOCATION": envs["SINK_BIGQUERY_DATASET_LOCATION"],
"SINK_BIGQUERY_DATASET_NAME": envs["SINK_BIGQUERY_DATASET_NAME"],
"SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID": envs["SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID"],
"SINK_BIGQUERY_ROW_INSERT_ID_ENABLE": envs["SINK_BIGQUERY_ROW_INSERT_ID_ENABLE"],
"SINK_BIGQUERY_STORAGE_API_ENABLE": envs["SINK_BIGQUERY_STORAGE_API_ENABLE"],
"SINK_BIGQUERY_TABLE_LABELS": envs["SINK_BIGQUERY_TABLE_LABELS"],
"SINK_BIGQUERY_TABLE_NAME": envs["SINK_BIGQUERY_TABLE_NAME"],
"SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS": envs["SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS"],
"SINK_BIGQUERY_TABLE_PARTITION_KEY": envs["SINK_BIGQUERY_TABLE_PARTITION_KEY"],
"SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": envs["SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"],
}
job.DlqGcsCredentialPath = modConf.EnvVariables["DLQ_GCS_CREDENTIAL_PATH"]

return nil
}

// DlqJob param here is expected to have been enriched with firehose config
Expand Down Expand Up @@ -51,23 +109,13 @@ func mapToEntropyResource(job models.DlqJob) (*entropyv1beta1.Resource, error) {
}

func makeConfigStruct(job models.DlqJob) (*structpb.Value, error) {

Check failure on line 111 in internal/server/v1/dlq/mapper.go

View workflow job for this annotation

GitHub Actions / golangci-lint

func `makeConfigStruct` is unused (unused)
envVars := map[string]string{}
for key, value := range job.EnvVars {
envVars[key] = value
}
envVars["DLQ_BATCH_SIZE"] = fmt.Sprintf("%d", job.BatchSize)
envVars["DLQ_NUM_THREADS"] = fmt.Sprintf("%d", job.NumThreads)
envVars["DLQ_ERROR_TYPES"] = job.ErrorTypes
envVars["DLQ_INPUT_DATE"] = job.Date
envVars["DLQ_TOPIC_NAME"] = job.Topic
envVars["METRIC_STATSD_TAGS"] = "" // TBA

dlqCredentialSecretName := "firehose-bigquery-sink-credential"

Check failure on line 113 in internal/server/v1/dlq/mapper.go

View workflow job for this annotation

GitHub Actions / golangci-lint

G101: Potential hardcoded credentials (gosec)
dlqTelegrafConfigName := "dlq-processor-telegraf"
return utils.GoValToProtoStruct(entropy.JobConfig{
Replicas: int32(job.Replicas),
Namespace: job.Namespace,
Name: "", // TBA
Name: buildEntropyResourceName(job),
Containers: []entropy.JobContainer{
{
Name: "dlq-job",
Expand All @@ -87,7 +135,7 @@ func makeConfigStruct(job models.DlqJob) (*structpb.Value, error) {
CPU: "0.5", // user
Memory: "2gb", // user
},
EnvVariables: envVars,
EnvVariables: job.EnvVars,
},
{
Name: "telegraf",
Expand Down

0 comments on commit b1c87f4

Please sign in to comment.