diff --git a/internal/server/v1/dlq/handler.go b/internal/server/v1/dlq/handler.go
index a0bc58b..34195e5 100644
--- a/internal/server/v1/dlq/handler.go
+++ b/internal/server/v1/dlq/handler.go
@@ -10,6 +10,7 @@ import (
 	"github.com/goto/dex/entropy"
 	"github.com/goto/dex/generated/models"
 	"github.com/goto/dex/internal/server/gcs"
+	"github.com/goto/dex/internal/server/reqctx"
 	"github.com/goto/dex/internal/server/utils"
 	"github.com/goto/dex/internal/server/v1/firehose"
 )
@@ -65,6 +66,7 @@ func (*Handler) listDlqJobs(w http.ResponseWriter, _ *http.Request) {
 func (h *Handler) createDlqJob(w http.ResponseWriter, r *http.Request) {
 	// transform request body into DlqJob (validation?)
 	ctx := r.Context()
+	reqCtx := reqctx.From(ctx)
 
 	var dlqJob models.DlqJob
 	if err := utils.ReadJSON(r, &dlqJob); err != nil {
@@ -73,13 +75,15 @@ func (h *Handler) createDlqJob(w http.ResponseWriter, r *http.Request) {
 	}
 
 	// call service.CreateDLQJob
-	resp, err := h.service.CreateDLQJob(ctx, &dlqJob)
+	err := h.service.CreateDLQJob(ctx, reqCtx.UserEmail, &dlqJob)
 	if err != nil {
 		utils.WriteErr(w, err)
 		return
 	}
 
 	// return
-	utils.WriteJSON(w, http.StatusOK, resp)
+	utils.WriteJSON(w, http.StatusOK, map[string]interface{}{
+		"dlq_list": dlqJob.Urn,
+	})
 }
 
 func (h *Handler) getDlqJob(w http.ResponseWriter, r *http.Request) {
diff --git a/internal/server/v1/dlq/handler_test.go b/internal/server/v1/dlq/handler_test.go
index ff77ae3..22acc7d 100644
--- a/internal/server/v1/dlq/handler_test.go
+++ b/internal/server/v1/dlq/handler_test.go
@@ -1,26 +1,36 @@
 package dlq_test
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"net/http/httptest"
 	"testing"
 
 	entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1"
+	"github.com/go-chi/chi/v5"
+	"github.com/go-openapi/strfmt"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/types/known/structpb"
 
 	"github.com/goto/dex/entropy"
 	"github.com/goto/dex/generated/models"
 	"github.com/goto/dex/internal/server/gcs"
+	"github.com/goto/dex/internal/server/reqctx"
 	"github.com/goto/dex/internal/server/utils"
 	"github.com/goto/dex/internal/server/v1/dlq"
 	"github.com/goto/dex/internal/server/v1/firehose"
 	"github.com/goto/dex/mocks"
 )
 
+const (
+	emailHeaderKey = "X-Auth-Email"
+)
+
 type testHTTPWriter struct {
 	messages []string
 }
@@ -40,7 +50,7 @@ func (*testHTTPWriter) WriteHeader(int) {
 func TestListTopicDates(t *testing.T) {
 	eService := &mocks.ResourceServiceClient{}
 	gClient := &mocks.BlobStorageClient{}
-	handler := dlq.NewHandler(dlq.NewService(eService, gClient, &dlq.DlqJobConfig{}))
+	handler := dlq.NewHandler(dlq.NewService(eService, gClient, dlq.DlqJobConfig{}))
 	httpWriter := &testHTTPWriter{}
 	httpRequest := &http.Request{}
 	config := &entropy.FirehoseConfig{
@@ -115,7 +125,7 @@ func TestListTopicDates(t *testing.T) {
 func TestErrorFromGCSClient(t *testing.T) {
 	eService := &mocks.ResourceServiceClient{}
 	gClient := &mocks.BlobStorageClient{}
-	handler := dlq.NewHandler(dlq.NewService(eService, gClient, &dlq.DlqJobConfig{}))
+	handler := dlq.NewHandler(dlq.NewService(eService, gClient, dlq.DlqJobConfig{}))
 	httpWriter := &testHTTPWriter{}
 	httpRequest := &http.Request{}
 	config := &entropy.FirehoseConfig{
@@ -173,7 +183,7 @@ func TestErrorFromGCSClient(t *testing.T) {
 func TestErrorFromFirehoseResource(t *testing.T) {
 	eService := &mocks.ResourceServiceClient{}
 	gClient := &mocks.BlobStorageClient{}
-	handler := dlq.NewHandler(dlq.NewService(eService, gClient, &dlq.DlqJobConfig{}))
+	handler := dlq.NewHandler(dlq.NewService(eService, gClient, dlq.DlqJobConfig{}))
 	httpWriter := &testHTTPWriter{}
 	httpRequest := &http.Request{}
 	eService.On(
@@ -186,3 +196,319 @@ func TestErrorFromFirehoseResource(t *testing.T) {
 	require.NoError(t, err)
 	assert.Equal(t, "test-error", expectedMap["cause"])
 }
+
+func TestCreateDlqJob(t *testing.T) {
+	var (
+		method        = http.MethodPost
+		path          = "/jobs"
+		resource_id   = "test-resource-id"
+		resource_type = "test-resource-type"
+		error_types   = "DESERIALIZATION_ERROR"
+		date          = "21-10-2022"
+		batch_size    = 0
+		num_threads   = 0
+		topic         = "test-topic"
+		jsonPayload   = fmt.Sprintf(`{
+			"resource_id": "%s",
+			"resource_type": "%s",
+			"error_types": "%s",
+			"batch_size": %d,
+			"num_threads": %d,
+			"topic": "%s",
+			"date": "%s"
+		}`, resource_id, resource_type, error_types, batch_size, num_threads, topic, date)
+	)
+
+	t.Run("Should return resource urn", func(t *testing.T) {
+		// init input
+		namespace := "test-namespace"
+		kubeCluster := "test-kube-cluster"
+		userEmail := "test@example.com"
+		config := dlq.DlqJobConfig{
+			PrometheusHost: "http://sample-prom-host",
+			DlqJobImage:    "test-image",
+		}
+		envVars := map[string]string{
+			"SINK_TYPE":                               "bigquery",
+			"DLQ_BATCH_SIZE":                          "34",
+			"DLQ_NUM_THREADS":                         "10",
+			"DLQ_PREFIX_DIR":                          "test-firehose",
+			"DLQ_FINISHED_STATUS_FILE":                "/shared/job-finished",
+			"DLQ_GCS_BUCKET_NAME":                     "g-pilotdata-gl-dlq",
+			"DLQ_ERROR_TYPES":                         "DEFAULT_ERROR",
+			"DLQ_GCS_CREDENTIAL_PATH":                 "/etc/secret/gcp/token",
+			"DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID":         "pilotdata-integration",
+			"DLQ_INPUT_DATE":                          "2023-04-10",
+			"JAVA_TOOL_OPTIONS":                       "-javaagent:jolokia-jvm-agent.jar=port=8778,host=localhost",
+			"_JAVA_OPTIONS":                           "-Xmx1800m -Xms1800m",
+			"DLQ_TOPIC_NAME":                          "gofood-booking-log",
+			"INPUT_SCHEMA_PROTO_CLASS":                "gojek.esb.booking.GoFoodBookingLogMessage",
+			"METRIC_STATSD_TAGS":                      "a=b",
+			"SCHEMA_REGISTRY_STENCIL_ENABLE":          "true",
+			"SCHEMA_REGISTRY_STENCIL_URLS":            "http://p-godata-systems-stencil-v1beta1-ingress.golabs.io/v1beta1/namespaces/gojek/schemas/esb-log-entities",
+			"SINK_BIGQUERY_ADD_METADATA_ENABLED":      "true",
+			"SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS": "-1",
+			"SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS":    "-1",
+			"SINK_BIGQUERY_CREDENTIAL_PATH":           "/etc/secret/gcp/token",
+			"SINK_BIGQUERY_DATASET_LABELS":            "shangchi=legend,lord=voldemort",
+			"SINK_BIGQUERY_DATASET_LOCATION":          "US",
+			"SINK_BIGQUERY_DATASET_NAME":              "bq_test",
+			"SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID":   "pilotdata-integration",
+			"SINK_BIGQUERY_ROW_INSERT_ID_ENABLE":      "false",
+			"SINK_BIGQUERY_STORAGE_API_ENABLE":        "true",
+			"SINK_BIGQUERY_TABLE_LABELS":              "hello=world,john=doe",
+			"SINK_BIGQUERY_TABLE_NAME":                "bq_dlq_test1",
+			"SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS": "2629800000",
+			"SINK_BIGQUERY_TABLE_PARTITION_KEY":       "event_timestamp",
+			"SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": "true",
+		}
+
+		outputStruct, err := structpb.NewStruct(map[string]interface{}{
+			"namespace": namespace,
+		})
+		require.NoError(t, err)
+
+		firehoseConfig, err := utils.GoValToProtoStruct(entropy.FirehoseConfig{
+			EnvVariables: envVars,
+		})
+		require.NoError(t, err)
+
+		firehoseResource := &entropyv1beta1.Resource{
+			Spec: &entropyv1beta1.ResourceSpec{
+				Dependencies: []*entropyv1beta1.ResourceDependency{
+					{
+						Key:   "kube_cluster",
+						Value: kubeCluster,
+					},
+				},
+				Configs: firehoseConfig,
+			},
+			State: &entropyv1beta1.ResourceState{
+				Output: structpb.NewStructValue(outputStruct),
+			},
+		}
+
+		jobResource := &entropyv1beta1.Resource{
+			Urn: "test-urn",
+			State: &entropyv1beta1.ResourceState{
+				Output: structpb.NewStructValue(outputStruct),
+			},
+		}
+
+		expectedEnvVars := map[string]string{
+			"DLQ_BATCH_SIZE":                          fmt.Sprintf("%d", batch_size),
+			"DLQ_NUM_THREADS":                         fmt.Sprintf("%d", num_threads),
+			"DLQ_ERROR_TYPES":                         error_types,
+			"DLQ_INPUT_DATE":                          date,
+			"DLQ_TOPIC_NAME":                          topic,
+			"METRIC_STATSD_TAGS":                      "a=b", // TBA
+			"SINK_TYPE":                               envVars["SINK_TYPE"],
+			"DLQ_PREFIX_DIR":                          "test-firehose",
+			"DLQ_FINISHED_STATUS_FILE":                "/shared/job-finished",
+			"DLQ_GCS_BUCKET_NAME":                     envVars["DLQ_GCS_BUCKET_NAME"],
+			"DLQ_GCS_CREDENTIAL_PATH":                 envVars["DLQ_GCS_CREDENTIAL_PATH"],
+			"DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID":         envVars["DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID"],
+			"JAVA_TOOL_OPTIONS":                       envVars["JAVA_TOOL_OPTIONS"],
+			"_JAVA_OPTIONS":                           envVars["_JAVA_OPTIONS"],
+			"INPUT_SCHEMA_PROTO_CLASS":                envVars["INPUT_SCHEMA_PROTO_CLASS"],
+			"SCHEMA_REGISTRY_STENCIL_ENABLE":          envVars["SCHEMA_REGISTRY_STENCIL_ENABLE"],
+			"SCHEMA_REGISTRY_STENCIL_URLS":            envVars["SCHEMA_REGISTRY_STENCIL_URLS"],
+			"SINK_BIGQUERY_ADD_METADATA_ENABLED":      envVars["SINK_BIGQUERY_ADD_METADATA_ENABLED"],
+			"SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS": envVars["SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS"],
+			"SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS":    envVars["SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS"],
+			"SINK_BIGQUERY_CREDENTIAL_PATH":           envVars["SINK_BIGQUERY_CREDENTIAL_PATH"],
+			"SINK_BIGQUERY_DATASET_LABELS":            envVars["SINK_BIGQUERY_DATASET_LABELS"],
+			"SINK_BIGQUERY_DATASET_LOCATION":          envVars["SINK_BIGQUERY_DATASET_LOCATION"],
+			"SINK_BIGQUERY_DATASET_NAME":              envVars["SINK_BIGQUERY_DATASET_NAME"],
+			"SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID":   envVars["SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID"],
+			"SINK_BIGQUERY_ROW_INSERT_ID_ENABLE":      envVars["SINK_BIGQUERY_ROW_INSERT_ID_ENABLE"],
+			"SINK_BIGQUERY_STORAGE_API_ENABLE":        envVars["SINK_BIGQUERY_STORAGE_API_ENABLE"],
+			"SINK_BIGQUERY_TABLE_LABELS":              envVars["SINK_BIGQUERY_TABLE_LABELS"],
+			"SINK_BIGQUERY_TABLE_NAME":                envVars["SINK_BIGQUERY_TABLE_NAME"],
+			"SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS": envVars["SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS"],
+			"SINK_BIGQUERY_TABLE_PARTITION_KEY":       envVars["SINK_BIGQUERY_TABLE_PARTITION_KEY"],
+			"SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": envVars["SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"],
+		}
+
+		jobConfig, err := utils.GoValToProtoStruct(entropy.JobConfig{
+			Replicas:  0,
+			Namespace: namespace,
+			Containers: []entropy.JobContainer{
+				{
+					Name:            "dlq-job",
+					Image:           config.DlqJobImage,
+					ImagePullPolicy: "Always",
+					SecretsVolumes: []entropy.JobSecret{
+						{
+							Name:  "firehose-bigquery-sink-credential",
+							Mount: envVars["DLQ_GCS_CREDENTIAL_PATH"],
+						},
+					},
+					Limits: entropy.UsageSpec{
+						CPU:    "0.5", // user
+						Memory: "2gb", // user
+					},
+					Requests: entropy.UsageSpec{
+						CPU:    "0.5", // user
+						Memory: "2gb", // user
+					},
+					EnvVariables: expectedEnvVars,
+				},
+				{
+					Name:  "telegraf",
+					Image: "telegraf:1.18.0-alpine",
+					ConfigMapsVolumes: []entropy.JobConfigMap{
+						{
+							Name:  "dlq-processor-telegraf",
+							Mount: "/etc/telegraf",
+						},
+					},
+					EnvVariables: map[string]string{
+						// To be updated by streaming
+						"APP_NAME":        "", // TBA
+						"PROMETHEUS_HOST": config.PrometheusHost,
+						"DEPLOYMENT_NAME": "deployment-name",
+						"TEAM":            "",
+						"TOPIC":           topic,
+						"environment":     "production", // TBA
+						"organization":    "de", // TBA
+						"projectID":       "",
+					},
+					Command: []string{
+						"/bin/bash",
+					},
+					Args: []string{
+						"-c",
+						"telegraf & while [ ! -f /shared/job-finished ]; do sleep 5; done; sleep 20 && exit 0",
+					},
+					Limits: entropy.UsageSpec{
+						CPU:    "100m",  // user
+						Memory: "300Mi", // user
+					},
+					Requests: entropy.UsageSpec{
+						CPU:    "100m",  // user
+						Memory: "300Mi", // user
+					},
+				},
+			},
+			JobLabels: map[string]string{
+				"firehose": resource_id,
+				"topic":    topic,
+				"date":     date,
+			},
+			Volumes: []entropy.JobVolume{
+				{
+					Name: "firehose-bigquery-sink-credential",
+					Kind: "secret",
+				},
+				{
+					Name: "dlq-processor-telegraf",
+					Kind: "configMap",
+				},
+			},
+		})
+		require.NoError(t, err)
+
+		newJobResourcePayload := &entropyv1beta1.Resource{
+			Urn:  "",
+			Kind: entropy.ResourceKindJob,
+			Name: fmt.Sprintf(
+				"%s-%s-%s-%s",
+				resource_id,   // firehose urn
+				resource_type, // firehose / dagger
+				topic,         //
+				date,          //
+			),
+			Project: firehoseResource.Project,
+			Labels: map[string]string{
+				"resource_id": resource_id,
+				"type":        resource_type,
+				"date":        date,
+				"topic":       topic,
+				"job_type":    "dlq",
+			},
+			CreatedBy: jobResource.CreatedBy,
+			UpdatedBy: jobResource.UpdatedBy,
+			Spec: &entropyv1beta1.ResourceSpec{
+				Configs: jobConfig,
+				Dependencies: []*entropyv1beta1.ResourceDependency{
+					{
+						Key:   "kube_cluster",
+						Value: kubeCluster, // from firehose configs.kube_cluster
+					},
+				},
+			},
+		}
+
+		// set conditions
+		entropyClient := new(mocks.ResourceServiceClient)
+		entropyClient.On(
+			"GetResource", mock.Anything, &entropyv1beta1.GetResourceRequest{Urn: resource_id},
+		).Return(&entropyv1beta1.GetResourceResponse{
+			Resource: firehoseResource,
+		}, nil)
+		entropyClient.On("CreateResource", mock.Anything, &entropyv1beta1.CreateResourceRequest{
+			Resource: newJobResourcePayload,
+		}).Return(&entropyv1beta1.CreateResourceResponse{
+			Resource: jobResource,
+		}, nil)
+		defer entropyClient.AssertExpectations(t)
+
+		// expected DlqJob state after creation (documented here, not asserted directly)
+		_ = models.DlqJob{
+			// from input
+			BatchSize:    int64(batch_size),
+			ResourceID:   resource_id,
+			ResourceType: resource_type,
+			Topic:        topic,
+			NumThreads:   int64(num_threads),
+			Date:         date,
+			ErrorTypes:   error_types,
+
+			// firehose resource
+			ContainerImage:       config.DlqJobImage,
+			DlqGcsCredentialPath: envVars["DLQ_GCS_CREDENTIAL_PATH"],
+			EnvVars:              expectedEnvVars,
+			Group:                "", //
+			KubeCluster:          kubeCluster,
+			Namespace:            namespace,
+			Project:              firehoseResource.Project,
+			PrometheusHost:       config.PrometheusHost,
+
+			// hardcoded
+			Replicas: 0,
+
+			// job resource
+			Urn:       jobResource.Urn,
+			Status:    jobResource.GetState().GetStatus().String(),
+			CreatedAt: strfmt.DateTime(jobResource.CreatedAt.AsTime()),
+			CreatedBy: jobResource.CreatedBy,
+			UpdatedAt: strfmt.DateTime(jobResource.UpdatedAt.AsTime()),
+			UpdatedBy: jobResource.UpdatedBy,
+		}
+
+		requestBody := bytes.NewReader([]byte(jsonPayload))
+
+		response := httptest.NewRecorder()
+		request := httptest.NewRequest(method, path, requestBody)
+		request.Header.Set(emailHeaderKey, userEmail)
+		router := getRouter()
+		dlq.Routes(entropyClient, nil, config)(router)
+		router.ServeHTTP(response, request)
+
+		assert.Equal(t, http.StatusOK, response.Code)
+		resultJSON := response.Body.Bytes()
+		expectedJSON, err := json.Marshal(map[string]interface{}{
+			"dlq_list": "test-urn",
+		})
+		require.NoError(t, err)
+		assert.JSONEq(t, string(expectedJSON), string(resultJSON))
+	})
+}
+
+func getRouter() *chi.Mux {
+	router := chi.NewRouter()
+	router.Use(reqctx.WithRequestCtx())
+
+	return router
+}
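
For readers tracing the new flow end to end, the wire contract exercised by TestCreateDlqJob can be sketched as a plain HTTP client call. The base URL is an illustrative assumption (the test mounts the routes at the router root); only the path, header key, and response shape come from this diff:

	package main

	import (
		"fmt"
		"net/http"
		"strings"
	)

	func main() {
		// POST /jobs with the same JSON shape the handler decodes into models.DlqJob.
		body := strings.NewReader(`{"resource_id": "test-resource-id", "resource_type": "firehose", "error_types": "DESERIALIZATION_ERROR", "batch_size": 0, "num_threads": 0, "topic": "test-topic", "date": "21-10-2022"}`)
		req, err := http.NewRequest(http.MethodPost, "http://localhost:8080/jobs", body) // base URL is an assumption
		if err != nil {
			panic(err)
		}
		// X-Auth-Email is read by the reqctx middleware into reqCtx.UserEmail.
		req.Header.Set("X-Auth-Email", "test@example.com")
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()
		fmt.Println(resp.Status) // expect 200 OK with body {"dlq_list": "<created job urn>"}
	}
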
diff --git a/internal/server/v1/dlq/mapper.go b/internal/server/v1/dlq/mapper.go
index c0c5533..7ce4b50 100644
--- a/internal/server/v1/dlq/mapper.go
+++ b/internal/server/v1/dlq/mapper.go
@@ -20,7 +20,7 @@ const (
 	dlqTelegrafConfigName = "dlq-processor-telegraf"
 )
 
-func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg *DlqJobConfig) error {
+func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg DlqJobConfig) error {
 	var kubeCluster string
 	for _, dep := range res.Spec.GetDependencies() {
 		if dep.GetKey() == kubeClusterDependenciesKey {
@@ -41,10 +41,14 @@ func enrichDlqJob(job *models.DlqJob, res *entropyv1beta1.Resource, cfg *DlqJobC
 	if !ok {
 		return ErrFirehoseNamespaceInvalid
 	}
+	status := res.GetState().GetStatus().String()
 	envs := modConf.EnvVariables
 
-	job.ResourceID = res.GetUrn()
 	job.Namespace = namespace
+	job.Status = status
+	job.CreatedAt = strfmt.DateTime(res.CreatedAt.AsTime())
+	job.UpdatedAt = strfmt.DateTime(res.UpdatedAt.AsTime())
+
 	job.KubeCluster = kubeCluster
 	job.ContainerImage = cfg.DlqJobImage
 	job.PrometheusHost = cfg.PrometheusHost
diff --git a/internal/server/v1/dlq/routes.go b/internal/server/v1/dlq/routes.go
index 4d95fea..328f138 100644
--- a/internal/server/v1/dlq/routes.go
+++ b/internal/server/v1/dlq/routes.go
@@ -10,7 +10,7 @@ import (
 func Routes(
 	entropyClient entropyv1beta1rpc.ResourceServiceClient,
 	gcsClient gcs.BlobStorageClient,
-	cfg *DlqJobConfig,
+	cfg DlqJobConfig,
 ) func(r chi.Router) {
 	service := NewService(entropyClient, gcsClient, cfg)
 	handler := NewHandler(service)
@@ -19,6 +19,6 @@ func Routes(
 		r.Get("/firehose/{firehose_urn}", handler.ListFirehoseDLQ)
 		r.Get("/jobs", handler.listDlqJobs)
 		r.Get("/jobs/{job_urn}", handler.getDlqJob)
-		r.Post("/firehose/{firehose_urn}/dlq_jobs", handler.createDlqJob)
+		r.Post("/jobs", handler.createDlqJob)
 	}
 }
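
The relocated POST /jobs route only works if the reqctx middleware has populated the request context, mirroring getRouter in the handler test. A minimal wiring sketch, assuming a /dlq mount prefix and placeholder nil clients (both assumptions, not part of this change):

	package main

	import (
		"net/http"

		"github.com/go-chi/chi/v5"

		"github.com/goto/dex/internal/server/reqctx"
		"github.com/goto/dex/internal/server/v1/dlq"
	)

	func main() {
		router := chi.NewRouter()
		// WithRequestCtx extracts caller identity so createDlqJob can
		// read reqctx.From(ctx).UserEmail.
		router.Use(reqctx.WithRequestCtx())

		cfg := dlq.DlqJobConfig{PrometheusHost: "http://sample-prom-host", DlqJobImage: "test-image"}
		router.Route("/dlq", dlq.Routes(nil, nil, cfg)) // pass real entropy/GCS clients in practice

		_ = http.ListenAndServe(":8080", router)
	}
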
diff --git a/internal/server/v1/dlq/service.go b/internal/server/v1/dlq/service.go
index fcae326..391416b 100644
--- a/internal/server/v1/dlq/service.go
+++ b/internal/server/v1/dlq/service.go
@@ -9,7 +9,6 @@ import (
 
 	"github.com/goto/dex/generated/models"
 	"github.com/goto/dex/internal/server/gcs"
-	"github.com/goto/dex/internal/server/reqctx"
 )
 
 type DlqJobConfig struct {
@@ -20,11 +19,10 @@ type DlqJobConfig struct {
 type Service struct {
 	client    entropyv1beta1rpc.ResourceServiceClient
 	gcsClient gcs.BlobStorageClient
-	cfg       *DlqJobConfig
-	Entropy   entropyv1beta1rpc.ResourceServiceClient
+	cfg       DlqJobConfig
 }
 
-func NewService(client entropyv1beta1rpc.ResourceServiceClient, gcsClient gcs.BlobStorageClient, cfg *DlqJobConfig) *Service {
+func NewService(client entropyv1beta1rpc.ResourceServiceClient, gcsClient gcs.BlobStorageClient, cfg DlqJobConfig) *Service {
 	return &Service{
 		client:    client,
 		gcsClient: gcsClient,
@@ -33,32 +31,32 @@ func NewService(client entropyv1beta1rpc.ResourceServiceClient, gcsClient gcs.Bl
 }
 
 // TODO: replace *DlqJob with a generated models.DlqJob
-func (s *Service) CreateDLQJob(ctx context.Context, dlqJob *models.DlqJob) (*entropyv1beta1.Resource, error) {
+func (s *Service) CreateDLQJob(ctx context.Context, userEmail string, dlqJob *models.DlqJob) error {
 	// validate dlqJob for creation
 
 	// fetch firehose details
-	def, err := s.Entropy.GetResource(ctx, &entropyv1beta1.GetResourceRequest{Urn: dlqJob.ResourceID})
+	def, err := s.client.GetResource(ctx, &entropyv1beta1.GetResourceRequest{Urn: dlqJob.ResourceID})
 	if err != nil {
-		return nil, ErrFirehoseNotFound
+		return ErrFirehoseNotFound
 	}
 	// enrich DlqJob with firehose details
 	if err := enrichDlqJob(dlqJob, def.GetResource(), s.cfg); err != nil {
-		return nil, ErrFirehoseNotFound
+		return err
 	}
 
 	// map DlqJob to entropy resource -> return entropy.Resource (kind = job)
 	res, err := mapToEntropyResource(*dlqJob)
 	if err != nil {
-		return nil, err
+		return err
 	}
 
 	// entropy create resource
-	reqCtx := reqctx.From(ctx)
-	entropyCtx := metadata.AppendToOutgoingContext(ctx, "user-id", reqCtx.UserEmail)
+	entropyCtx := metadata.AppendToOutgoingContext(ctx, "user-id", userEmail)
 	rpcReq := &entropyv1beta1.CreateResourceRequest{Resource: res}
-	rpcResp, err := s.Entropy.CreateResource(entropyCtx, rpcReq)
+	rpcResp, err := s.client.CreateResource(entropyCtx, rpcReq)
 	if err != nil {
 		outErr := ErrInternal
-		return nil, outErr
+		return outErr
 	}
+	dlqJob.Urn = rpcResp.Resource.Urn
 
-	return rpcResp.Resource, nil
+	return nil
 }
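
With the reworked signature, the created resource URN is reported back through the DlqJob itself rather than a returned *entropyv1beta1.Resource. A caller-side sketch; the helper name and field values are hypothetical:

	package dlq_test

	import (
		"context"

		"github.com/goto/dex/generated/models"
		"github.com/goto/dex/internal/server/v1/dlq"
	)

	// createExampleJob is a hypothetical caller showing the new call shape.
	func createExampleJob(ctx context.Context, svc *dlq.Service) (string, error) {
		job := models.DlqJob{
			ResourceID:   "test-resource-id", // firehose URN whose DLQ should be processed
			ResourceType: "firehose",
			Topic:        "test-topic",
			Date:         "2023-04-10",
		}
		if err := svc.CreateDLQJob(ctx, "test@example.com", &job); err != nil {
			return "", err
		}
		return job.Urn, nil // populated from the CreateResource response
	}
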
Date: "2012-10-30", + ErrorTypes: "DESERILIAZATION_ERROR", + // Group: "", + NumThreads: 2, + ResourceID: "test-resource-id", + ResourceType: "firehose", + Topic: "test-create-topic", + } + + outputStruct, err := structpb.NewStruct(map[string]interface{}{ + "namespace": namespace, + }) + require.NoError(t, err) + + firehoseConfig, err := utils.GoValToProtoStruct(entropy.FirehoseConfig{ + EnvVariables: envVars, + }) + require.NoError(t, err) + + firehoseResource := &entropyv1beta1.Resource{ + Spec: &entropyv1beta1.ResourceSpec{ + Dependencies: []*entropyv1beta1.ResourceDependency{ + { + Key: "kube_cluster", + Value: kubeCluster, + }, + }, + Configs: firehoseConfig, + }, + State: &entropyv1beta1.ResourceState{ + Output: structpb.NewStructValue(outputStruct), + }, + } + + jobResource := &entropyv1beta1.Resource{ + Urn: "test-urn", + State: &entropyv1beta1.ResourceState{ + Output: structpb.NewStructValue(outputStruct), + }, + } + + expectedEnvVars := map[string]string{ + "DLQ_BATCH_SIZE": fmt.Sprintf("%d", dlqJob.BatchSize), + "DLQ_NUM_THREADS": fmt.Sprintf("%d", dlqJob.NumThreads), + "DLQ_ERROR_TYPES": dlqJob.ErrorTypes, + "DLQ_INPUT_DATE": dlqJob.Date, + "DLQ_TOPIC_NAME": dlqJob.Topic, + "METRIC_STATSD_TAGS": "a=b", // TBA + "SINK_TYPE": envVars["SINK_TYPE"], + "DLQ_PREFIX_DIR": "test-firehose", + "DLQ_FINISHED_STATUS_FILE": "/shared/job-finished", + "DLQ_GCS_BUCKET_NAME": envVars["DLQ_GCS_BUCKET_NAME"], + "DLQ_GCS_CREDENTIAL_PATH": envVars["DLQ_GCS_CREDENTIAL_PATH"], + "DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID": envVars["DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID"], + "JAVA_TOOL_OPTIONS": envVars["JAVA_TOOL_OPTIONS"], + "_JAVA_OPTIONS": envVars["_JAVA_OPTIONS"], + "INPUT_SCHEMA_PROTO_CLASS": envVars["INPUT_SCHEMA_PROTO_CLASS"], + "SCHEMA_REGISTRY_STENCIL_ENABLE": envVars["SCHEMA_REGISTRY_STENCIL_ENABLE"], + "SCHEMA_REGISTRY_STENCIL_URLS": envVars["SCHEMA_REGISTRY_STENCIL_URLS"], + "SINK_BIGQUERY_ADD_METADATA_ENABLED": envVars["SINK_BIGQUERY_ADD_METADATA_ENABLED"], + "SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS": envVars["SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS"], + "SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS": envVars["SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS"], + "SINK_BIGQUERY_CREDENTIAL_PATH": envVars["SINK_BIGQUERY_CREDENTIAL_PATH"], + "SINK_BIGQUERY_DATASET_LABELS": envVars["SINK_BIGQUERY_DATASET_LABELS"], + "SINK_BIGQUERY_DATASET_LOCATION": envVars["SINK_BIGQUERY_DATASET_LOCATION"], + "SINK_BIGQUERY_DATASET_NAME": envVars["SINK_BIGQUERY_DATASET_NAME"], + "SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID": envVars["SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID"], + "SINK_BIGQUERY_ROW_INSERT_ID_ENABLE": envVars["SINK_BIGQUERY_ROW_INSERT_ID_ENABLE"], + "SINK_BIGQUERY_STORAGE_API_ENABLE": envVars["SINK_BIGQUERY_STORAGE_API_ENABLE"], + "SINK_BIGQUERY_TABLE_LABELS": envVars["SINK_BIGQUERY_TABLE_LABELS"], + "SINK_BIGQUERY_TABLE_NAME": envVars["SINK_BIGQUERY_TABLE_NAME"], + "SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS": envVars["SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS"], + "SINK_BIGQUERY_TABLE_PARTITION_KEY": envVars["SINK_BIGQUERY_TABLE_PARTITION_KEY"], + "SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE": envVars["SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"], + } + + jobConfig, err := utils.GoValToProtoStruct(entropy.JobConfig{ + Replicas: 0, + Namespace: namespace, + Containers: []entropy.JobContainer{ + { + Name: "dlq-job", + Image: config.DlqJobImage, + ImagePullPolicy: "Always", + SecretsVolumes: []entropy.JobSecret{ + { + Name: "firehose-bigquery-sink-credential", + Mount: envVars["DLQ_GCS_CREDENTIAL_PATH"], + }, + }, + Limits: entropy.UsageSpec{ + CPU: 
"0.5", // user + Memory: "2gb", // user + }, + Requests: entropy.UsageSpec{ + CPU: "0.5", // user + Memory: "2gb", // user + }, + EnvVariables: expectedEnvVars, + }, + { + Name: "telegraf", + Image: "telegraf:1.18.0-alpine", + ConfigMapsVolumes: []entropy.JobConfigMap{ + { + Name: "dlq-processor-telegraf", + Mount: "/etc/telegraf", + }, + }, + EnvVariables: map[string]string{ + // To be updated by streaming + "APP_NAME": "", // TBA + "PROMETHEUS_HOST": config.PrometheusHost, + "DEPLOYMENT_NAME": "deployment-name", + "TEAM": dlqJob.Group, + "TOPIC": dlqJob.Topic, + "environment": "production", // TBA + "organization": "de", // TBA + "projectID": dlqJob.Project, + }, + Command: []string{ + "/bin/bash", + }, + Args: []string{ + "-c", + "telegraf & while [ ! -f /shared/job-finished ]; do sleep 5; done; sleep 20 && exit 0", + }, + Limits: entropy.UsageSpec{ + CPU: "100m", // user + Memory: "300Mi", // user + }, + Requests: entropy.UsageSpec{ + CPU: "100m", // user + Memory: "300Mi", // user + }, + }, + }, + JobLabels: map[string]string{ + "firehose": dlqJob.ResourceID, + "topic": dlqJob.Topic, + "date": dlqJob.Date, + }, + Volumes: []entropy.JobVolume{ + { + Name: "firehose-bigquery-sink-credential", + Kind: "secret", + }, + { + Name: "dlq-processor-telegraf", + Kind: "configMap", + }, + }, + }) + require.NoError(t, err) + entropyCtx := metadata.AppendToOutgoingContext(ctx, "user-id", userEmail) + newJobResourcePayload := &entropyv1beta1.Resource{ + Urn: dlqJob.Urn, + Kind: entropy.ResourceKindJob, + Name: fmt.Sprintf( + "%s-%s-%s-%s", + dlqJob.ResourceID, // firehose urn + dlqJob.ResourceType, // firehose / dagger + dlqJob.Topic, // + dlqJob.Date, // + ), + Project: firehoseResource.Project, + Labels: map[string]string{ + "resource_id": dlqJob.ResourceID, + "type": dlqJob.ResourceType, + "date": dlqJob.Date, + "topic": dlqJob.Topic, + "job_type": "dlq", + }, + CreatedBy: jobResource.CreatedBy, + UpdatedBy: jobResource.UpdatedBy, + Spec: &entropyv1beta1.ResourceSpec{ + Configs: jobConfig, + Dependencies: []*entropyv1beta1.ResourceDependency{ + { + Key: "kube_cluster", + Value: kubeCluster, // from firehose configs.kube_cluster + }, + }, + }, + } + + // set conditions + entropyClient := new(mocks.ResourceServiceClient) + entropyClient.On( + "GetResource", ctx, &entropyv1beta1.GetResourceRequest{Urn: dlqJob.ResourceID}, + ).Return(&entropyv1beta1.GetResourceResponse{ + Resource: firehoseResource, + }, nil) + entropyClient.On("CreateResource", entropyCtx, &entropyv1beta1.CreateResourceRequest{ + Resource: newJobResourcePayload, + }).Return(&entropyv1beta1.CreateResourceResponse{ + Resource: jobResource, + }, nil) + defer entropyClient.AssertExpectations(t) + service := dlq.NewService(entropyClient, nil, config) + + err = service.CreateDLQJob(ctx, userEmail, &dlqJob) + + // assertions + expectedDlqJob := models.DlqJob{ + // from input + BatchSize: dlqJob.BatchSize, + ResourceID: dlqJob.ResourceID, + ResourceType: dlqJob.ResourceType, + Topic: dlqJob.Topic, + NumThreads: dlqJob.NumThreads, + Date: dlqJob.Date, + ErrorTypes: dlqJob.ErrorTypes, + + // firehose resource + ContainerImage: config.DlqJobImage, + DlqGcsCredentialPath: envVars["DLQ_GCS_CREDENTIAL_PATH"], + EnvVars: expectedEnvVars, + Group: "", // + KubeCluster: kubeCluster, + Namespace: namespace, + Project: firehoseResource.Project, + PrometheusHost: config.PrometheusHost, + + // hardcoded + Replicas: 0, + + // job resource + Urn: jobResource.Urn, + Status: jobResource.GetState().GetStatus().String(), + CreatedAt: 
+			CreatedAt: strfmt.DateTime(jobResource.CreatedAt.AsTime()),
+			CreatedBy: jobResource.CreatedBy,
+			UpdatedAt: strfmt.DateTime(jobResource.UpdatedAt.AsTime()),
+			UpdatedBy: jobResource.UpdatedBy,
+		}
+
+		assert.NoError(t, err)
+		assert.Equal(t, expectedDlqJob, dlqJob)
+	})
+}
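
For completeness, the attribution pattern CreateDLQJob relies on, and which the service test reproduces when matching the CreateResource call, can be shown in isolation. This is a standalone sketch of the gRPC metadata API, not code from this change:

	package main

	import (
		"context"
		"fmt"

		"google.golang.org/grpc/metadata"
	)

	func main() {
		// CreateDLQJob forwards the resolved user email to entropy as
		// outgoing gRPC metadata under the "user-id" key.
		ctx := metadata.AppendToOutgoingContext(context.Background(), "user-id", "test@example.com")

		md, ok := metadata.FromOutgoingContext(ctx)
		fmt.Println(ok, md.Get("user-id")) // true [test@example.com]
	}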