Skip to content
This repository has been archived by the owner on Oct 25, 2023. It is now read-only.

Commit

Permalink
feat: Create & List DLQ job api (#70)
Browse files Browse the repository at this point in the history
* feat: create DLQ job

* feat: create DLQ job

* Merge

* feat: Create Dlq job api witht testing

* feat: swagger

* feat: add dlqJob name

* fix: mapper

* test: created testing

* feat: list dlq Job

* fix: response createdlqjob

* feat: ListDlqJob api

* feat(dlq): missing project in Create job api

* feat: filter for listDLQ

* test: add error testing for create dlq job

* fix: lint error

* feat(dlq): store dlq information in entropy labels

* fix: lint warning

* feat: should not create dlq job with empty image and prom host

---------

Co-authored-by: Lifosmin Simon <[email protected]>
Co-authored-by: Stewart Jingga <[email protected]>
  • Loading branch information
3 people authored Oct 24, 2023
1 parent cf39840 commit cee9ad8
Show file tree
Hide file tree
Showing 16 changed files with 1,646 additions and 60 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# dex

Data Experience

## Setting up GCloud Credentials
1. login to cloud - `gcloud auth login`
2. setup ADC - `gcloud auth application-default login`
3 changes: 1 addition & 2 deletions cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap
}

wardenClient := warden.NewClient(cfg.Warden.Addr)
dlqConfig := &dlq.DlqJobConfig{
// TODO: map cfg.Dlq\
dlqConfig := dlq.DlqJobConfig{
DlqJobImage: cfg.Dlq.DlqJobImage,
PrometheusHost: cfg.Dlq.PrometheusHost,
}
Expand Down
1 change: 0 additions & 1 deletion dex
Submodule dex deleted from 786037
3 changes: 3 additions & 0 deletions generated/models/dlq_job.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

208 changes: 208 additions & 0 deletions generated/models/dlq_job_form.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func Serve(ctx context.Context, addr string,
odinAddr string,
stencilAddr string,
wardenClient *warden.Client,
dlqConfig *dlqv1.DlqJobConfig,
dlqConfig dlqv1.DlqJobConfig,
) error {
alertSvc := alertsv1.NewService(sirenClient)

Expand Down
3 changes: 3 additions & 0 deletions internal/server/v1/dlq/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ import "errors"
var (
ErrFirehoseNamespaceNotFound = errors.New("could not find firehose namespace from resource output")
ErrFirehoseNamespaceInvalid = errors.New("invalid firehose namespace from resource output")
ErrFirehoseNotFound = errors.New("firehose not found")
ErrEmptyConfigImage = errors.New("empty dlq job image")
ErrEmptyConfigPrometheusHost = errors.New("empty prometheus host")
)
38 changes: 38 additions & 0 deletions internal/server/v1/dlq/fixtures/list_dlq_jobs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"dlq_jobs": [
{
"resource_id": "test-resource-id",
"resource_type": "test-resource-type",
"topic": "test-topic",
"date": "2022-10-21",
"name": "test1-firehose-test-topic-2022-10-21",
"group": "test-group",
"kube_cluster": "test-kube-cluster",
"project": "test-project-1",
"prometheus_host": "prom_host",
"urn": "test-urn-1",
"status": "STATUS_UNSPECIFIED",
"created_at": "2022-12-10T00:00:00.000Z",
"created_by": "[email protected]",
"updated_at": "2023-12-10T02:00:00.000Z",
"updated_by": "[email protected]"
},
{
"resource_id": "test-resource-id",
"resource_type": "test-resource-type",
"topic": "test-topic",
"date": "2022-10-21",
"name": "test2-firehose-test-topic-2022-10-21",
"group": "test-group",
"kube_cluster": "test-kube-cluster",
"project": "test-project-1",
"prometheus_host": "prom_host2",
"urn": "test-urn-2",
"status": "STATUS_UNSPECIFIED",
"created_at": "2012-10-10T04:00:00.000Z",
"created_by": "[email protected]",
"updated_at": "2013-02-12T02:04:00.000Z",
"updated_by": "[email protected]"
}
]
}
75 changes: 68 additions & 7 deletions internal/server/v1/dlq/handler.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package dlq

import (
"errors"
"log"
"net/http"

entropyv1beta1 "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/entropy/v1beta1"
"github.com/go-chi/chi/v5"

"github.com/goto/dex/entropy"
"github.com/goto/dex/generated/models"
"github.com/goto/dex/internal/server/gcs"
"github.com/goto/dex/internal/server/reqctx"
"github.com/goto/dex/internal/server/utils"
"github.com/goto/dex/internal/server/v1/firehose"
)
Expand Down Expand Up @@ -55,18 +58,76 @@ func (h *Handler) ListFirehoseDLQ(w http.ResponseWriter, r *http.Request) {
})
}

func (*Handler) listDlqJobs(w http.ResponseWriter, _ *http.Request) {
func (h *Handler) listDlqJobs(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

labelFilter := map[string]string{}
if resourceID := r.URL.Query().Get("resource_id"); resourceID != "" {
labelFilter["resource_id"] = resourceID
}
if resourceType := r.URL.Query().Get("resource_type"); resourceType != "" {
labelFilter["resource_type"] = resourceType
}
if date := r.URL.Query().Get("date"); date != "" {
labelFilter["date"] = date
}

dlqJob, err := h.service.ListDlqJob(ctx, labelFilter)
if err != nil {
if errors.Is(err, ErrFirehoseNotFound) {
utils.WriteErrMsg(w, http.StatusNotFound, err.Error())
return
}
utils.WriteErr(w, err)
return
}

utils.WriteJSON(w, http.StatusOK, map[string]interface{}{
"dlq_jobs": []interface{}{},
})
"dlq_jobs": dlqJob,
},
)
}

func (*Handler) createDlqJob(w http.ResponseWriter, _ *http.Request) {
// transform request body into DlqJob (validation?)
// call service.CreateDLQJob
func (h *Handler) createDlqJob(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
reqCtx := reqctx.From(ctx)
if reqCtx.UserEmail == "" {
utils.WriteErrMsg(w, http.StatusUnauthorized, "user header is required")
return
}

var body models.DlqJobForm
if err := utils.ReadJSON(r, &body); err != nil {
utils.WriteErr(w, err)
return
}
if err := body.Validate(nil); err != nil {
utils.WriteErrMsg(w, http.StatusBadRequest, err.Error())
return
}

dlqJob := models.DlqJob{
BatchSize: *body.BatchSize,
Date: *body.Date,
ErrorTypes: body.ErrorTypes,
NumThreads: *body.NumThreads,
ResourceID: *body.ResourceID,
ResourceType: *body.ResourceType,
Topic: *body.Topic,
}

updatedDlqJob, err := h.service.CreateDLQJob(ctx, reqCtx.UserEmail, dlqJob)
if err != nil {
if errors.Is(err, ErrFirehoseNotFound) {
utils.WriteErrMsg(w, http.StatusNotFound, err.Error())
return
}
utils.WriteErr(w, err)
return
}

utils.WriteJSON(w, http.StatusOK, map[string]interface{}{
"dlq_job": nil,
"dlq_job": updatedDlqJob,
})
}

Expand Down
Loading

0 comments on commit cee9ad8

Please sign in to comment.