Skip to content

Commit

Permalink
K8OP-295 Expose Medusa's gRPC server port configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
rzvoncek committed Nov 25, 2024
1 parent b053bf8 commit a9b2d07
Show file tree
Hide file tree
Showing 16 changed files with 149 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG/CHANGELOG-1.21.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ When cutting a new release, update the `unreleased` heading to the tag being gen
* [CHANGE] [#1441](https://github.com/k8ssandra/k8ssandra-operator/issues/1441) Use k8ssandra-client instead of k8ssandra-tools for CRD upgrades
* [BUGFIX] [#1383](https://github.com/k8ssandra/k8ssandra-operator/issues/1383) Do not create MedusaBackup if MadusaBakupJob did not fully succeed
* [ENHANCEMENT] [#1667](https://github.com/k8ssahttps://github.com/k8ssandra/k8ssandra/issues/1667) Add `skipSchemaMigration` option to `K8ssandraCluster.spec.reaper`
* [ENHANCEMENT] [#1455](https://github.com/k8ssandra/k8ssandra-operator/issues/1455) Expose configuration of Medusa's gRPC server port
10 changes: 10 additions & 0 deletions apis/medusa/v1alpha1/medusa_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ type Storage struct {
PodStorage *PodStorageSettings `json:"podStorage,omitempty"`
}

type Service struct {
// GrpcPort to listen on when running as gRPC service
// Included grpc in the field name to avoid misunderstanding with storage.port
// +optional
GrpcPort int `json:"grpcPort,omitempty"`
}

type PodStorageSettings struct {
// Settings for the pod's storage when backups use the local storage provider.

Expand Down Expand Up @@ -160,6 +167,9 @@ type MedusaClusterTemplate struct {
// Provides all storage backend related properties for backups.
StorageProperties Storage `json:"storageProperties,omitempty"`

// Provides all service related properties for Medusa.
ServiceProperties Service `json:"serviceProperties,omitempty"`

// Certificates for Medusa if client encryption is enabled in Cassandra.
// The secret must be in the same namespace as Cassandra and must contain three keys: "rootca.crt", "client.crt_signed" and "client.key".
// See https://docs.datastax.com/en/developer/python-driver/latest/security/ for more information on the required files.
Expand Down
9 changes: 9 additions & 0 deletions charts/k8ssandra-operator/crds/k8ssandra-operator-crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26443,6 +26443,15 @@ spec:
type: string
type: object
type: object
serviceProperties:
description: Provides all service related properties for Medusa.
properties:
grpcPort:
description: |-
GrpcPort to listen on when running as gRPC service
Included grpc in the field name to avoid misunderstanding with storage.port
type: integer
type: object
storageProperties:
description: Provides all storage backend related properties for
backups.
Expand Down
9 changes: 9 additions & 0 deletions config/crd/bases/k8ssandra.io_k8ssandraclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26381,6 +26381,15 @@ spec:
type: string
type: object
type: object
serviceProperties:
description: Provides all service related properties for Medusa.
properties:
grpcPort:
description: |-
GrpcPort to listen on when running as gRPC service
Included grpc in the field name to avoid misunderstanding with storage.port
type: integer
type: object
storageProperties:
description: Provides all storage backend related properties for
backups.
Expand Down
15 changes: 13 additions & 2 deletions controllers/medusa/medusabackupjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package medusa
import (
"context"
"fmt"
"github.com/k8ssandra/k8ssandra-operator/pkg/cassandra"
"net"
"strings"

Expand Down Expand Up @@ -305,7 +306,12 @@ func (r *MedusaBackupJobReconciler) createMedusaBackup(ctx context.Context, back
}

func doMedusaBackup(ctx context.Context, name string, backupType shared.BackupType, pod *corev1.Pod, clientFactory medusa.ClientFactory, logger logr.Logger) (string, error) {
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort))
medusaPort := shared.BackupSidecarPort
explicitPort, found := cassandra.FindContainerPort(pod, "medusa", "grpc")
if found {
medusaPort = explicitPort
}
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(medusaPort))
logger.Info("connecting to backup sidecar", "Pod", pod.Name, "Address", addr)
if medusaClient, err := clientFactory.NewClient(ctx, addr); err != nil {
return "", err
Expand All @@ -322,7 +328,12 @@ func doMedusaBackup(ctx context.Context, name string, backupType shared.BackupTy
}

func backupStatus(ctx context.Context, name string, pod *corev1.Pod, clientFactory medusa.ClientFactory, logger logr.Logger) (medusa.StatusType, error) {
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort))
medusaPort := shared.BackupSidecarPort
explicitPort, found := cassandra.FindContainerPort(pod, "medusa", "grpc")
if found {
medusaPort = explicitPort
}
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(medusaPort))
logger.Info("connecting to backup sidecar", "Pod", pod.Name, "Address", addr)
if medusaClient, err := clientFactory.NewClient(ctx, addr); err != nil {
logger.Error(err, "Could not make a new medusa client")
Expand Down
4 changes: 4 additions & 0 deletions controllers/medusa/medusabackupjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func testMedusaBackupDatacenter(t *testing.T, ctx context.Context, f *framework.
Name: cassandraUserSecret,
},
},
// adding this did not actually break any assertions
ServiceProperties: api.Service{
GrpcPort: 1234,
},
CassandraUserSecretRef: corev1.LocalObjectReference{
Name: cassandraUserSecret,
},
Expand Down
8 changes: 7 additions & 1 deletion controllers/medusa/medusarestorejob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"github.com/go-logr/logr"
"github.com/k8ssandra/k8ssandra-operator/pkg/shared"
"k8s.io/utils/ptr"
"net"
"time"

Expand Down Expand Up @@ -273,7 +274,12 @@ func (r *MedusaRestoreJobReconciler) prepareRestore(ctx context.Context, request
}

for _, pod := range pods {
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort))
medusaPort := shared.BackupSidecarPort
explicitPort, found := cassandra.FindContainerPort(ptr.To(pod), "medusa", "grpc")
if found {
medusaPort = explicitPort
}
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(medusaPort))
if medusaClient, err := r.ClientFactory.NewClient(ctx, addr); err != nil {
logger.Error(err, "Failed to create Medusa client", "address", addr)
} else {
Expand Down
3 changes: 3 additions & 0 deletions controllers/medusa/medusarestorejob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func testMedusaRestoreDatacenter(t *testing.T, ctx context.Context, f *framework
Name: cassandraUserSecret,
},
},
ServiceProperties: api.Service{
GrpcPort: 4567,
},
CassandraUserSecretRef: corev1.LocalObjectReference{
Name: cassandraUserSecret,
},
Expand Down
22 changes: 19 additions & 3 deletions controllers/medusa/medusatask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package medusa
import (
"context"
"fmt"
"github.com/k8ssandra/k8ssandra-operator/pkg/cassandra"
"net"
"sync"

Expand Down Expand Up @@ -413,7 +414,12 @@ func (r *MedusaTaskReconciler) scheduleSyncForPurge(task *medusav1alpha1.MedusaT
}

func doPurge(ctx context.Context, task *medusav1alpha1.MedusaTask, pod *corev1.Pod, clientFactory medusa.ClientFactory) (*medusa.PurgeBackupsResponse, error) {
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort))
medusaPort := shared.BackupSidecarPort
explicitPort, found := cassandra.FindContainerPort(pod, "medusa", "grpc")
if found {
medusaPort = explicitPort
}
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(medusaPort))
if medusaClient, err := clientFactory.NewClient(ctx, addr); err != nil {
return nil, err
} else {
Expand All @@ -423,7 +429,12 @@ func doPurge(ctx context.Context, task *medusav1alpha1.MedusaTask, pod *corev1.P
}

func prepareRestore(ctx context.Context, task *medusav1alpha1.MedusaTask, pod *corev1.Pod, clientFactory medusa.ClientFactory) (*medusa.PurgeBackupsResponse, error) {
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort))
medusaPort := shared.BackupSidecarPort
explicitPort, found := cassandra.FindContainerPort(pod, "medusa", "grpc")
if found {
medusaPort = explicitPort
}
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(medusaPort))
if medusaClient, err := clientFactory.NewClient(ctx, addr); err != nil {
return nil, err
} else {
Expand All @@ -434,7 +445,12 @@ func prepareRestore(ctx context.Context, task *medusav1alpha1.MedusaTask, pod *c
}

func GetBackups(ctx context.Context, pod *corev1.Pod, clientFactory medusa.ClientFactory) ([]*medusa.BackupSummary, error) {
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort))
medusaPort := shared.BackupSidecarPort
explicitPort, found := cassandra.FindContainerPort(pod, "medusa", "grpc")
if found {
medusaPort = explicitPort
}
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(medusaPort))
if medusaClient, err := clientFactory.NewClient(ctx, addr); err != nil {
return nil, err
} else {
Expand Down
3 changes: 3 additions & 0 deletions controllers/medusa/medusatask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func testMedusaTasks(t *testing.T, ctx context.Context, f *framework.Framework,
},
MaxBackupCount: 1,
},
ServiceProperties: api.Service{
GrpcPort: 7890,
},
CassandraUserSecretRef: corev1.LocalObjectReference{
Name: cassandraUserSecret,
},
Expand Down
3 changes: 3 additions & 0 deletions docs/content/en/tasks/backup-restore/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ spec:
# accessModes:
# - ReadWriteOnce
# size: 100Mi
serviceProperties:
# which port will Medusa's gRPC server listen on
grpcPort: 50051
```
The definition above requires a secret named `medusa-bucket-key` to be present in the target namespace before the `K8ssandraCluster` object gets created. Use the following format for this secret:
Expand Down
27 changes: 27 additions & 0 deletions pkg/cassandra/datacenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,33 @@ func FindInitContainer(dcPodTemplateSpec *corev1.PodTemplateSpec, containerName
return -1, false
}

func FindPort(container *corev1.Container, portName string) (int32, bool) {
if container.Ports != nil {
for _, port := range container.Ports {
if port.Name == portName {
return port.ContainerPort, true
}
}
}
return -1, false
}
func FindContainerPort(pod *corev1.Pod, containerName, podName string) (int, bool) {
if pod.Spec.Containers != nil {
for _, container := range pod.Spec.Containers {
if container.Name == containerName {
if container.Ports != nil {
for _, port := range container.Ports {
if port.Name == podName {
return int(port.ContainerPort), true
}
}
}
}
}
}
return -1, false
}

func FindVolume(dcPodTemplateSpec *corev1.PodTemplateSpec, volumeName string) (int, bool) {
if dcPodTemplateSpec != nil {
for i, volume := range dcPodTemplateSpec.Spec.Volumes {
Expand Down
23 changes: 15 additions & 8 deletions pkg/medusa/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
const (
DefaultMedusaImageRepository = "k8ssandra"
DefaultMedusaImageName = "medusa"
DefaultMedusaVersion = "0.22.3"
DefaultMedusaVersion = "a1f7647-tmp"
DefaultMedusaPort = 50051
DefaultProbeInitialDelay = 10
DefaultProbeTimeout = 1
Expand Down Expand Up @@ -114,6 +114,9 @@ func CreateMedusaIni(kc *k8ss.K8ssandraCluster, dcConfig *cassandra.DatacenterCo
[grpc]
enabled = 1
{{- if .Spec.Medusa.ServiceProperties.GrpcPort }}
port = {{ .Spec.Medusa.ServiceProperties.GrpcPort }}
{{- end }}
[logging]
level = DEBUG
Expand Down Expand Up @@ -226,19 +229,23 @@ func CreateMedusaMainContainer(dcConfig *cassandra.DatacenterConfig, medusaSpec
setImage(medusaSpec.ContainerImage, medusaContainer)
medusaContainer.SecurityContext = medusaSpec.SecurityContext
medusaContainer.Env = medusaEnvVars(medusaSpec, k8cName, useExternalSecrets, "GRPC")
var grpcPort = DefaultMedusaPort
if medusaSpec.ServiceProperties.GrpcPort != 0 {
grpcPort = medusaSpec.ServiceProperties.GrpcPort
}
medusaContainer.Ports = []corev1.ContainerPort{
{
Name: "grpc",
ContainerPort: DefaultMedusaPort,
ContainerPort: int32(grpcPort),
Protocol: "TCP",
},
}

readinessProbe, err := generateMedusaProbe(medusaSpec.ReadinessProbe)
readinessProbe, err := generateMedusaProbe(medusaSpec.ReadinessProbe, grpcPort)
if err != nil {
return nil, err
}
livenessProbe, err := generateMedusaProbe(medusaSpec.LivenessProbe)
livenessProbe, err := generateMedusaProbe(medusaSpec.LivenessProbe, grpcPort)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -545,9 +552,9 @@ func PurgeCronJob(dcConfig *cassandra.DatacenterConfig, clusterName, namespace s
return purgeCronJob, nil
}

func generateMedusaProbe(configuredProbe *corev1.Probe) (*corev1.Probe, error) {
func generateMedusaProbe(configuredProbe *corev1.Probe, grpcPort int) (*corev1.Probe, error) {
// Goalesce the custom probe with the default probe,
defaultProbe := defaultMedusaProbe()
defaultProbe := defaultMedusaProbe(grpcPort)
if configuredProbe == nil {
return defaultProbe, nil
}
Expand All @@ -561,12 +568,12 @@ func generateMedusaProbe(configuredProbe *corev1.Probe) (*corev1.Probe, error) {
return &mergedProbe, nil
}

func defaultMedusaProbe() *corev1.Probe {
func defaultMedusaProbe(grpcPort int) *corev1.Probe {
// Goalesce the custom probe with the default probe,
probe := &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"/bin/grpc_health_probe", fmt.Sprintf("--addr=:%d", DefaultMedusaPort)},
Command: []string{"/bin/grpc_health_probe", fmt.Sprintf("--addr=:%d", grpcPort)},
},
},
InitialDelaySeconds: DefaultProbeInitialDelay,
Expand Down
Loading

0 comments on commit a9b2d07

Please sign in to comment.