Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft]add custom redis scaler #3

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/fossa.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
name: FOSSA
on:
push:
branches: [main]
workflow_dispatch:

jobs:
build:
Expand Down
81 changes: 40 additions & 41 deletions .github/workflows/main-build.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
name: main-build
on:
push:
branches:
- main
workflow_dispatch:

jobs:
build:
name: build
Expand Down Expand Up @@ -105,12 +104,12 @@ jobs:
AZURE_DEVOPS_PAT: ${{ secrets.AZURE_DEVOPS_PAT }}
AZURE_DEVOPS_POOL_NAME: ${{ secrets.AZURE_DEVOPS_POOL_NAME }}
AZURE_DEVOPS_PROJECT: ${{ secrets.AZURE_DEVOPS_PROJECT }}
AZURE_KEYVAULT_URI: ${{ secrets.AZURE_KEYVAULT_URI }}
AZURE_KEYVAULT_URI: ${{ secrets.AZURE_KEYVAULT_URI }}
AZURE_LOG_ANALYTICS_WORKSPACE_ID: ${{ secrets.AZURE_LOG_ANALYTICS_WORKSPACE_ID }}
AZURE_RESOURCE_GROUP: ${{ secrets.AZURE_RESOURCE_GROUP }}
AZURE_RUN_WORKLOAD_IDENTITY_TESTS: true
AZURE_SERVICE_BUS_CONNECTION_STRING: ${{ secrets.AZURE_SERVICE_BUS_CONNECTION_STRING }}
AZURE_SERVICE_BUS_ALTERNATIVE_CONNECTION_STRING: ${{ secrets.AZURE_SERVICE_BUS_ALTERNATIVE_CONNECTION_STRING }}
AZURE_SERVICE_BUS_CONNECTION_STRING: ${{ secrets.AZURE_SERVICE_BUS_CONNECTION_STRING }}
AZURE_SERVICE_BUS_ALTERNATIVE_CONNECTION_STRING: ${{ secrets.AZURE_SERVICE_BUS_ALTERNATIVE_CONNECTION_STRING }}
AZURE_SP_APP_ID: ${{ secrets.AZURE_SP_APP_ID }}
AZURE_SP_OBJECT_ID: ${{ secrets.AZURE_SP_OBJECT_ID }}
AZURE_SP_KEY: ${{ secrets.AZURE_SP_KEY }}
Expand Down Expand Up @@ -183,76 +182,76 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3

- name: Run Trivy vulnerability scanner in repo mode
uses: aquasecurity/[email protected]
with:
scan-type: 'fs'
ignore-unfixed: false
format: 'sarif'
output: 'code.sarif'
exit-code: 1
skip-dirs: tests # Remove this once the ts files are removed

- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v2
if: always()
with:
sarif_file: 'code.sarif'
- uses: actions/checkout@v3

- name: Run Trivy vulnerability scanner in repo mode
uses: aquasecurity/[email protected]
with:
scan-type: "fs"
ignore-unfixed: false
format: "sarif"
output: "code.sarif"
exit-code: 1
skip-dirs: tests # Remove this once the ts files are removed

- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v2
if: always()
with:
sarif_file: "code.sarif"

trivy-scan-metrics-server:
name: Trivy scan metrics server image - ${{ matrix.name }}
needs: build
runs-on: ${{ matrix.runner }}
strategy:
matrix:
matrix:
include:
- runner: ARM64
name: arm64
- runner: ubuntu-latest
name: amd64
- runner: ARM64
name: arm64
- runner: ubuntu-latest
name: amd64

steps:
- uses: actions/checkout@v3

- name: Run Trivy on metrics-server
uses: aquasecurity/[email protected]
with:
scan-type: 'image'
scan-type: "image"
image-ref: ghcr.io/kedacore/keda-metrics-apiserver:main
format: 'sarif'
output: 'metrics-server.sarif'
format: "sarif"
output: "metrics-server.sarif"

- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v2
with:
sarif_file: 'metrics-server.sarif'
sarif_file: "metrics-server.sarif"

trivy-scan-keda:
name: Trivy scan keda image - ${{ matrix.name }}
needs: build
runs-on: ${{ matrix.runner }}
strategy:
matrix:
matrix:
include:
- runner: ARM64
name: arm64
- runner: ubuntu-latest
name: amd64
- runner: ARM64
name: arm64
- runner: ubuntu-latest
name: amd64

steps:
- uses: actions/checkout@v3

- name: Run Trivy on operator
uses: aquasecurity/[email protected]
with:
scan-type: 'image'
scan-type: "image"
image-ref: ghcr.io/kedacore/keda:main
format: 'sarif'
output: 'keda.sarif'
format: "sarif"
output: "keda.sarif"

- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v2
with:
sarif_file: 'keda.sarif'
sarif_file: "keda.sarif"
71 changes: 71 additions & 0 deletions .github/workflows/release-docker.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Manually-triggered workflow that builds and publishes the KEDA docker
# images to a private GCP registry. NOTE(review): env below hardcodes
# dev project/cluster identifiers — confirm these should not be secrets
# or per-environment inputs before promoting past QA.
name: Build docker
on:
  workflow_dispatch:

jobs:
  deploy:
    name: Build docker
    runs-on: ubuntu-latest

    permissions:
      contents: write
      packages: write
      id-token: write # needed for signing the images with GitHub OIDC Token **not production ready**

    env:
      ENV: qa
      VAULT_PREFIX: in2
      DEPLOYMENT_NAME: keda
      RELEASE_NAME: keda
      GCP_PROJECT_ID: dev-in2
      GCP_REGION: asia-south1
      GCP_NAMESPACE: ion2
      GCP_CLUSTER: dev-in2
      DOCKER_GIT_TOKEN: ${{ secrets.DOCKER_GIT_TOKEN }}

    container: ghcr.io/kedacore/build-tools:1.17.13

    steps:
      - name: Set up Repo Cloud SDK
        uses: google-github-actions/[email protected]
        with:
          project_id: ${{ env.GCP_PROJECT_ID }}
          service_account_key: ${{ secrets.GKE_SA_KEY }}
          export_default_credentials: true

      - name: Checkout
        uses: actions/checkout@v3
        with:
          # ref: ${{ github.event.pull_request.head.ref }}
          fetch-depth: 1
          token: ${{ secrets.DOCKER_GIT_TOKEN }}

      - name: Set up Dev Cloud SDK
        uses: google-github-actions/[email protected]
        with:
          service_account_key: ${{ secrets.GCP_DEV_SA_KEY }}
          export_default_credentials: true

      - name: Register workspace path
        run: git config --global --add safe.directory "/__w/keda/keda"

      - name: Go modules sync
        run: go mod tidy -compat=1.17

      # Use the GITHUB_OUTPUT environment file; the `::set-output` workflow
      # command is deprecated and disabled by GitHub Actions.
      - name: Get the version
        id: get_version
        run: echo "VERSION=$(./version.sh)" >> "$GITHUB_OUTPUT"

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2

      - name: Docker configuration
        run: |-
          gcloud --quiet auth configure-docker

      - name: Publish KEDA images
        run: make publish
        env:
          VERSION: ${{ steps.get_version.outputs.VERSION }}
          IMAGE_REGISTRY: gcr.io/dev-in-309805
          IMAGE_REPO: 100mslive
7 changes: 6 additions & 1 deletion pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -386,11 +387,15 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset
if block == nil {
errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID)
s.logger.Error(errMsg, "")
return 0, errMsg
s.logger.Error(fmt.Errorf("exiting to due to custom exit"), "")
os.Exit(1)
//return 0, errMsg
}
if block.Err > 0 {
errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d: %s", topic, partitionID, offsets.Err.Error())
s.logger.Error(errMsg, "")
s.logger.Error(fmt.Errorf("exiting to due to custom exit"), "")
os.Exit(1)
}

consumerOffset := block.Offset
Expand Down
45 changes: 43 additions & 2 deletions pkg/scalers/kubernetes_workload_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type kubernetesWorkloadScaler struct {
const (
kubernetesWorkloadMetricType = "External"
podSelectorKey = "podSelector"
allPodsSelectorKey = "allPodsSelector"
valueKey = "value"
activationValueKey = "activationValue"
)
Expand All @@ -36,6 +37,7 @@ var phasesCountedAsTerminated = []corev1.PodPhase{

type kubernetesWorkloadMetadata struct {
podSelector labels.Selector
allPodsSelector labels.Selector
namespace string
value float64
activationValue float64
Expand Down Expand Up @@ -70,6 +72,10 @@ func parseWorkloadMetadata(config *ScalerConfig) (*kubernetesWorkloadMetadata, e
if err != nil || meta.podSelector.String() == "" {
return nil, fmt.Errorf("invalid pod selector")
}
meta.allPodsSelector, err = labels.Parse(config.TriggerMetadata[allPodsSelectorKey])
if err != nil || meta.allPodsSelector.String() == "" {
return nil, fmt.Errorf("invalid all pods selector")
}
meta.value, err = strconv.ParseFloat(config.TriggerMetadata[valueKey], 64)
if err != nil || meta.value == 0 {
return nil, fmt.Errorf("value must be a float greater than 0")
Expand All @@ -94,8 +100,14 @@ func (s *kubernetesWorkloadScaler) IsActive(ctx context.Context) (bool, error) {
if err != nil {
return false, err
}
totalPods, err := s.getTotalValue(ctx)
if err != nil {
return false, err
}
logger := s.logger.WithValues("scaledjob.AllPodsSelector", s.metadata.allPodsSelector)
logger.Info("Workload", "Value", fmt.Sprintf("%v,%v", pods, totalPods))

return float64(pods) > s.metadata.activationValue, nil
return float64(pods)/float64(totalPods) > s.metadata.activationValue, nil
}

// Close no need for kubernetes workload scaler
Expand All @@ -121,8 +133,12 @@ func (s *kubernetesWorkloadScaler) GetMetrics(ctx context.Context, metricName st
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting kubernetes workload: %s", err)
}
totalPods, err := s.getTotalValue(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting kubernetes workload: %s", err)
}

metric := GenerateMetricInMili(metricName, float64(pods))
metric := GenerateMetricInMili(metricName, float64(pods)/float64(totalPods))

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
Expand All @@ -149,6 +165,31 @@ func (s *kubernetesWorkloadScaler) getMetricValue(ctx context.Context) (int64, e
return count, nil
}

// getTotalValue lists the pods in the scaler's namespace that match
// allPodsSelector and sums their per-pod contribution via getCountValue.
// The result is clamped to a minimum of 1 so that callers dividing by it
// (the metric is pods/totalPods) never hit a zero denominator.
func (s *kubernetesWorkloadScaler) getTotalValue(ctx context.Context) (int64, error) {
	pods := &corev1.PodList{}
	listOpts := client.ListOptions{
		LabelSelector: s.metadata.allPodsSelector,
		Namespace:     s.metadata.namespace,
	}

	if err := s.kubeClient.List(ctx, pods, &listOpts); err != nil {
		return 0, err
	}

	var total int64
	for i := range pods.Items {
		total += getCountValue(pods.Items[i])
	}

	// Never report zero — downstream code divides by this value.
	if total == 0 {
		total = 1
	}
	return total, nil
}

func getCountValue(pod corev1.Pod) int64 {
for _, ignore := range phasesCountedAsTerminated {
if pod.Status.Phase == ignore {
Expand Down
19 changes: 16 additions & 3 deletions pkg/scalers/mongo_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ func (s *mongoDBScaler) Close(ctx context.Context) error {
return nil
}

// resultVal is the shape of the single document decoded by getQueryResult
// (FindOne(...).Decode(&result)). The scale metric is derived from
// Desired - Current, clamped to [0, Desired].
// NOTE(review): presumably Desired/Current track target vs. running worker
// counts — confirm the schema against whatever writes this collection.
type resultVal struct {
	Desired int64 `json:"desired" bson:"desired"`
	Current int64 `json:"current" bson:"current"`
}

// getQueryResult query mongoDB by meta.query
func (s *mongoDBScaler) getQueryResult(ctx context.Context) (int64, error) {
ctx, cancel := context.WithTimeout(ctx, mongoDBDefaultTimeOut)
Expand All @@ -235,13 +240,21 @@ func (s *mongoDBScaler) getQueryResult(ctx context.Context) (int64, error) {
return 0, err
}

docsNum, err := s.client.Database(s.metadata.dbName).Collection(s.metadata.collection).CountDocuments(ctx, filter)
var result resultVal

err = s.client.Database(s.metadata.dbName).Collection(s.metadata.collection).FindOne(ctx, filter).Decode(&result)
if err != nil {
s.logger.Error(err, fmt.Sprintf("failed to query %v in %v, because of %v", s.metadata.dbName, s.metadata.collection, err))
return 0, err
}

return docsNum, nil
res := result.Desired - result.Current
if res > result.Desired {
res = result.Desired
}
if res < 0 {
res = 0
}
return res, nil
}

// GetMetrics query from mongoDB,and return to external metrics
Expand Down
Loading
Loading