From 07249aa543fb37724a28a7c2bddd707051b892c1 Mon Sep 17 00:00:00 2001 From: Ayushi Sharma Date: Mon, 23 Oct 2023 14:36:57 +0530 Subject: [PATCH] fix: fixed checks for list firehose handler --- internal/server/v1/firehose/crud.go | 27 +++++++++---------------- internal/server/v1/firehose/mappings.go | 2 ++ 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/internal/server/v1/firehose/crud.go b/internal/server/v1/firehose/crud.go index d4419f9..2878b27 100644 --- a/internal/server/v1/firehose/crud.go +++ b/internal/server/v1/firehose/crud.go @@ -70,7 +70,9 @@ func (api *firehoseAPI) handleCreate(w http.ResponseWriter, r *http.Request) { labelTeam: groupSlug, labelStream: *def.Configs.StreamName, labelDescription: def.Description, - labelSinkType: def.Configs.EnvVars["SINK_TYPE"], + labelSinkType: def.Configs.EnvVars[configSinkType], + labelTopic: def.Configs.EnvVars[configSourceKafkaTopic], + labelKubeCluster: *def.Configs.KubeCluster, }) prj, err := project.GetProject(ctx, def.Project, api.Shield) @@ -180,12 +182,6 @@ func (api *firehoseAPI) handleList(w http.ResponseWriter, r *http.Request) { return } - includeEnv := []string{ - configSinkType, - configSourceKafkaTopic, - configSourceKafkaConsumerGroup, - } - topicName := q.Get("topic_name") kubeCluster := q.Get("kube_cluster") sinkTypes := sinkTypeSet(q.Get("sink_type")) @@ -197,26 +193,19 @@ func (api *firehoseAPI) handleList(w http.ResponseWriter, r *http.Request) { return } - if kubeCluster != "" && *def.Configs.KubeCluster != kubeCluster { + if kubeCluster != "" && def.Labels[labelKubeCluster] != kubeCluster { continue } - if topicName != "" && def.Configs.EnvVars[configSourceKafkaTopic] != topicName { + if topicName != "" && def.Labels[labelTopic] != topicName { continue } - _, include := sinkTypes[def.Configs.EnvVars[configSinkType]] + _, include := sinkTypes[def.Labels[labelSinkType]] if len(sinkTypes) > 0 && !include { continue } - // only return selected keys to reduce list response-size. - returnEnv := map[string]string{} - for _, key := range includeEnv { - returnEnv[key] = def.Configs.EnvVars[key] - } - def.Configs.EnvVars = returnEnv - arr = append(arr, def) } @@ -387,6 +376,10 @@ func (api *firehoseAPI) handlePartialUpdate(w http.ResponseWriter, r *http.Reque labels[labelSinkType] = req.Configs.EnvVars["SINK_TYPE"] } + if existing.Configs.EnvVars[configSourceKafkaTopic] != req.Configs.EnvVars[configSourceKafkaTopic] { + labels[labelTopic] = req.Configs.EnvVars[configSourceKafkaTopic] + } + existing.Configs.EnvVars = cloneAndMergeMaps( existing.Configs.EnvVars, req.Configs.EnvVars, diff --git a/internal/server/v1/firehose/mappings.go b/internal/server/v1/firehose/mappings.go index cd922e2..1f05960 100644 --- a/internal/server/v1/firehose/mappings.go +++ b/internal/server/v1/firehose/mappings.go @@ -28,6 +28,8 @@ const ( labelStream = "stream_name" labelDescription = "description" labelSinkType = "sink_type" + labelKubeCluster = "kube_cluster" + labelTopic = "topic" ) var nonAlphaNumPattern = regexp.MustCompile("[^a-zA-Z0-9]+")