diff --git a/cli/server/server.go b/cli/server/server.go index 8b29d8c..92d805e 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -117,6 +117,6 @@ func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap &gcs.Client{StorageClient: gcsClient}, cfg.Odin.Addr, cfg.StencilAddr, - dlqConfig, + *dlqConfig, ) } diff --git a/internal/server/v1/dlq/handler.go b/internal/server/v1/dlq/handler.go index b188b9d..884da97 100644 --- a/internal/server/v1/dlq/handler.go +++ b/internal/server/v1/dlq/handler.go @@ -58,10 +58,19 @@ func (h *Handler) ListFirehoseDLQ(w http.ResponseWriter, r *http.Request) { }) } -func (*Handler) listDlqJobs(w http.ResponseWriter, _ *http.Request) { - utils.WriteJSON(w, http.StatusOK, map[string]interface{}{ - "dlq_jobs": []interface{}{}, - }) +func (h *Handler) listDlqJobs(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + firehoseUrn := chi.URLParam(r, "firehoseURN") + // fetch py resource (kind = job) + // mapToDlqJob(entropyResource) -> DqlJob + dlqJob, err := h.service.listDlqJob(ctx, firehoseUrn) + if err != nil { + utils.WriteErr(w, err) + return + } + + utils.WriteJSON(w, http.StatusOK, dlqJob) } func (h *Handler) createDlqJob(w http.ResponseWriter, r *http.Request) { @@ -93,18 +102,11 @@ func (h *Handler) createDlqJob(w http.ResponseWriter, r *http.Request) { func (h *Handler) getDlqJob(w http.ResponseWriter, r *http.Request) { // sample to get job urn from route params - ctx := r.Context() + _ = h.jobURN(r) - firehoseUrn := chi.URLParam(r, "firehoseURN") - // fetch entropy resource (kind = job) - // mapToDlqJob(entropyResource) -> DqlJob - dlqJob, err := h.service.getDlqJob(ctx, firehoseUrn) - if err != nil { - utils.WriteErr(w, err) - return - } - - utils.WriteJSON(w, http.StatusOK, dlqJob) + utils.WriteJSON(w, http.StatusOK, map[string]interface{}{ + "dlq_job": nil, + }) } func (*Handler) firehoseURN(r *http.Request) string { diff --git a/internal/server/v1/dlq/handler_test.go b/internal/server/v1/dlq/handler_test.go index d1cdb49..8fc9f55 100644 --- a/internal/server/v1/dlq/handler_test.go +++ b/internal/server/v1/dlq/handler_test.go @@ -512,7 +512,7 @@ func TestCreateDlqJob(t *testing.T) { assert.Equal(t, http.StatusOK, response.Code) resultJSON := response.Body.Bytes() expectedJSON, err := json.Marshal(map[string]interface{}{ - "dlq_list": "test-urn", + "dlq_urn": "test-urn", }) require.NoError(t, err) assert.JSONEq(t, string(expectedJSON), string(resultJSON)) diff --git a/internal/server/v1/dlq/service.go b/internal/server/v1/dlq/service.go index 44519b5..fc87d7a 100644 --- a/internal/server/v1/dlq/service.go +++ b/internal/server/v1/dlq/service.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "github.com/goto/dex/entropy" "github.com/goto/dex/generated/models" "github.com/goto/dex/internal/server/gcs" ) @@ -67,15 +68,28 @@ func (s *Service) CreateDLQJob(ctx context.Context, userEmail string, dlqJob *mo return nil } -func (s *Service) getDlqJob(ctx context.Context, firehoseUrn string) (*models.DlqJob, error) { - dlqJob := &models.DlqJob{} +func (s *Service) listDlqJob(ctx context.Context, firehoseUrn string) ([]*models.DlqJob, error) { + dlqJob := []*models.DlqJob{} - def, err := s.client.GetResource(ctx, &entropyv1beta1.GetResourceRequest{Urn: firehoseUrn}) - if err != nil { - return nil, ErrFirehoseNotFound + rpcReq := &entropyv1beta1.ListResourcesRequest{ + Kind: entropy.ResourceKindJob, } - dlqJob, err = MapToDlqJob(def.GetResource()) + rpcResp, err := s.client.ListResources(ctx, rpcReq) + if err != nil { + st := status.Convert(err) + if st.Code() == codes.NotFound { + return nil, ErrFirehoseNotFound + } + return nil, err + } + for _, res := range rpcResp.GetResources() { + def, err := MapToDlqJob(res) + if err != nil { + return nil, err + } + dlqJob = append(dlqJob, def) + } return dlqJob, nil }