Skip to content
This repository has been archived by the owner on Oct 25, 2023. It is now read-only.

Commit

Permalink
feat(dlq): get dlq job details api
Browse files Browse the repository at this point in the history
  • Loading branch information
abhishekv24 committed Oct 18, 2023
1 parent 89f65a0 commit b1463cd
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 22 deletions.
3 changes: 3 additions & 0 deletions generated/models/dlq_job.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 9 additions & 5 deletions internal/server/v1/dlq/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,16 @@ func (*Handler) createDlqJob(w http.ResponseWriter, _ *http.Request) {
}

func (h *Handler) getDlqJob(w http.ResponseWriter, r *http.Request) {
// sample to get job urn from route params
_ = h.jobURN(r)
ctx := r.Context()
jobURN := h.jobURN(r)

utils.WriteJSON(w, http.StatusOK, map[string]interface{}{
"dlq_job": nil,
})
dlqJob, err := h.service.getDlqJob(ctx, jobURN)
if err != nil {
utils.WriteErr(w, err)
return
}

utils.WriteJSON(w, http.StatusOK, dlqJob)
}

func (*Handler) firehoseURN(r *http.Request) string {
Expand Down
39 changes: 22 additions & 17 deletions internal/server/v1/dlq/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,23 +221,28 @@ func MapToDlqJob(r *entropyv1beta1.Resource) (*models.DlqJob, error) {
errorTypes := envVars["DLQ_ERROR_TYPES"]

job := models.DlqJob{
Urn: r.Urn,
ResourceID: labels["resource_id"],
ResourceType: labels["resource_type"],
Date: labels["date"],
Topic: labels["topic"],
Namespace: modConf.Namespace,
ErrorTypes: errorTypes,
BatchSize: batchSize,
NumThreads: numThreads,
Replicas: int64(modConf.Replicas),
KubeCluster: kubeCluster,
Project: r.Project,
CreatedBy: r.CreatedBy,
UpdatedBy: r.UpdatedBy,
Status: string(r.GetState().Status),
CreatedAt: strfmt.DateTime(r.CreatedAt.AsTime()),
UpdatedAt: strfmt.DateTime(r.UpdatedAt.AsTime()),
Urn: r.Urn,
Name: r.Name,
ResourceID: labels["resource_id"],
ResourceType: labels["resource_type"],
Date: labels["date"],
Topic: labels["topic"],
PrometheusHost: labels["prometheus_host"],
Namespace: modConf.Namespace,
ContainerImage: modConf.Containers[0].Image,
ErrorTypes: errorTypes,
BatchSize: batchSize,
NumThreads: numThreads,
Replicas: int64(modConf.Replicas),
KubeCluster: kubeCluster,
Project: r.Project,
CreatedBy: r.CreatedBy,
UpdatedBy: r.UpdatedBy,
Status: string(r.GetState().Status),
CreatedAt: strfmt.DateTime(r.CreatedAt.AsTime()),
UpdatedAt: strfmt.DateTime(r.UpdatedAt.AsTime()),
EnvVars: envVars,
DlqGcsCredentialPath: envVars["DLQ_GCS_CREDENTIAL_PATH"],
}

return &job, nil
Expand Down
17 changes: 17 additions & 0 deletions internal/server/v1/dlq/service.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package dlq

import (
"context"

entropyv1beta1rpc "buf.build/gen/go/gotocompany/proton/grpc/go/gotocompany/entropy/v1beta1/entropyv1beta1grpc"
entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1"

"github.com/goto/dex/generated/models"
"github.com/goto/dex/internal/server/gcs"
)

Expand All @@ -24,3 +28,16 @@ func NewService(client entropyv1beta1rpc.ResourceServiceClient, gcsClient gcs.Bl
cfg: cfg,
}
}

func (s *Service) getDlqJob(ctx context.Context, jobURN string) (*models.DlqJob, error) {
res, err := s.client.GetResource(ctx, &entropyv1beta1.GetResourceRequest{Urn: jobURN})
if err != nil {
return nil, err
}

dlqJob, err := MapToDlqJob(res.GetResource())
if err != nil {
return nil, err
}
return dlqJob, nil
}

0 comments on commit b1463cd

Please sign in to comment.