Skip to content
This repository has been archived by the owner on Oct 25, 2023. It is now read-only.

Commit

Permalink
fix: fixed checks for list firehose handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Ayushi Sharma committed Oct 23, 2023
1 parent 89f65a0 commit 07249aa
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 17 deletions.
27 changes: 10 additions & 17 deletions internal/server/v1/firehose/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ func (api *firehoseAPI) handleCreate(w http.ResponseWriter, r *http.Request) {
labelTeam: groupSlug,
labelStream: *def.Configs.StreamName,
labelDescription: def.Description,
labelSinkType: def.Configs.EnvVars["SINK_TYPE"],
labelSinkType: def.Configs.EnvVars[configSinkType],
labelTopic: def.Configs.EnvVars[configSourceKafkaTopic],
labelKubeCluster: *def.Configs.KubeCluster,
})

prj, err := project.GetProject(ctx, def.Project, api.Shield)
Expand Down Expand Up @@ -180,12 +182,6 @@ func (api *firehoseAPI) handleList(w http.ResponseWriter, r *http.Request) {
return
}

includeEnv := []string{
configSinkType,
configSourceKafkaTopic,
configSourceKafkaConsumerGroup,
}

topicName := q.Get("topic_name")
kubeCluster := q.Get("kube_cluster")
sinkTypes := sinkTypeSet(q.Get("sink_type"))
Expand All @@ -197,26 +193,19 @@ func (api *firehoseAPI) handleList(w http.ResponseWriter, r *http.Request) {
return
}

if kubeCluster != "" && *def.Configs.KubeCluster != kubeCluster {
if kubeCluster != "" && def.Labels[labelKubeCluster] != kubeCluster {
continue
}

if topicName != "" && def.Configs.EnvVars[configSourceKafkaTopic] != topicName {
if topicName != "" && def.Labels[labelTopic] != topicName {
continue
}

_, include := sinkTypes[def.Configs.EnvVars[configSinkType]]
_, include := sinkTypes[def.Labels[labelSinkType]]
if len(sinkTypes) > 0 && !include {
continue
}

// only return selected keys to reduce list response-size.
returnEnv := map[string]string{}
for _, key := range includeEnv {
returnEnv[key] = def.Configs.EnvVars[key]
}
def.Configs.EnvVars = returnEnv

arr = append(arr, def)
}

Expand Down Expand Up @@ -387,6 +376,10 @@ func (api *firehoseAPI) handlePartialUpdate(w http.ResponseWriter, r *http.Reque
labels[labelSinkType] = req.Configs.EnvVars["SINK_TYPE"]
}

if existing.Configs.EnvVars[configSourceKafkaTopic] != req.Configs.EnvVars[configSourceKafkaTopic] {
labels[labelTopic] = req.Configs.EnvVars[configSourceKafkaTopic]
}

existing.Configs.EnvVars = cloneAndMergeMaps(
existing.Configs.EnvVars,
req.Configs.EnvVars,
Expand Down
2 changes: 2 additions & 0 deletions internal/server/v1/firehose/mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
labelStream = "stream_name"
labelDescription = "description"
labelSinkType = "sink_type"
labelKubeCluster = "kube_cluster"
labelTopic = "topic"
)

var nonAlphaNumPattern = regexp.MustCompile("[^a-zA-Z0-9]+")
Expand Down

0 comments on commit 07249aa

Please sign in to comment.