Skip to content

Commit

Permalink
cadence draft impl
Browse files Browse the repository at this point in the history
  • Loading branch information
worryg0d committed Mar 7, 2024
1 parent 98c80a4 commit 30260ea
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 70 deletions.
62 changes: 62 additions & 0 deletions apis/clusters/v1beta1/cadence_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,15 @@ func (cs *CadenceSpec) FromInstAPI(instaModel *models.CadenceCluster) {
cs.ResizeSettings.FromInstAPI(instaModel.ResizeSettings)

cs.DCsFromInstAPI(instaModel.DataCentres)
cs.StandardProvisioningFromInstAPI(instaModel.StandardProvisioning)
cs.SharedProvisioningFromInstAPI(instaModel.SharedProvisioning)
cs.TargetPrimaryCadenceFromInstAPI(instaModel.TargetPrimaryCadence)

// TODO AWS ARCHIVAL

cs.UseHTTPAPI = instaModel.UseHTTPAPI
cs.UseCadenceWebAuth = instaModel.UseCadenceWebAuth
cs.PCICompliance = instaModel.PCIComplianceMode
}

func (cs *CadenceSpec) DCsFromInstAPI(instaModels []*models.CadenceDataCentre) {
Expand Down Expand Up @@ -590,3 +599,56 @@ func (c *CadenceSpec) CalculateNodeSize(cloudProvider, solution, app string) str
}
return ""
}

func (cs *CadenceSpec) StandardProvisioningFromInstAPI(instaModels []*models.CadenceStandardProvisioning) {
provisioning := make([]*StandardProvisioning, len(instaModels))
for i, instaModel := range instaModels {
p := &StandardProvisioning{}
p.FromInstAPI(instaModel)
provisioning[i] = p
}
cs.StandardProvisioning = provisioning
}

func (s *StandardProvisioning) FromInstAPI(instaModel *models.CadenceStandardProvisioning) {
s.AdvancedVisibility = make([]*AdvancedVisibility, len(instaModel.AdvancedVisibility))
for i, visibility := range instaModel.AdvancedVisibility {
vis := &AdvancedVisibility{
TargetKafka: &CadenceDependencyTarget{
DependencyCDCID: visibility.TargetKafka.DependencyCDCID,
DependencyVPCType: visibility.TargetKafka.DependencyVPCType,
},
TargetOpenSearch: &CadenceDependencyTarget{
DependencyCDCID: visibility.TargetOpenSearch.DependencyCDCID,
DependencyVPCType: visibility.TargetOpenSearch.DependencyVPCType,
},
}
s.AdvancedVisibility[i] = vis
}

if instaModel.TargetCassandra != nil {
s.TargetCassandra = &CadenceDependencyTarget{
DependencyCDCID: instaModel.TargetCassandra.DependencyCDCID,
DependencyVPCType: instaModel.TargetCassandra.DependencyVPCType,
}
}
}

func (cs *CadenceSpec) SharedProvisioningFromInstAPI(instaModels []*models.CadenceSharedProvisioning) {
cs.SharedProvisioning = make([]*SharedProvisioning, len(instaModels))
for i, instaModel := range instaModels {
cs.SharedProvisioning[i] = &SharedProvisioning{
UseAdvancedVisibility: instaModel.UseAdvancedVisibility,
}
}
}

func (cs *CadenceSpec) TargetPrimaryCadenceFromInstAPI(instaModels []*models.CadenceDependencyTarget) {
cs.TargetPrimaryCadence = make([]*CadenceDependencyTarget, len(instaModels))
for i, instaModel := range instaModels {
cs.TargetPrimaryCadence[i] = &CadenceDependencyTarget{
DependencyCDCID: instaModel.DependencyCDCID,
DependencyVPCType: instaModel.DependencyVPCType,
}
}
}
2 changes: 1 addition & 1 deletion config/samples/clusters_v1beta1_cadence.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ kind: Cadence
metadata:
name: cadence-sample
spec:
name: "username-cadence-test"
name: "bohdan-cadence-test"
version: "1.2.2"
# standardProvisioning:
# - targetCassandra:
Expand Down
173 changes: 104 additions & 69 deletions controllers/clusters/cadence_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package clusters

import (
"context"
"encoding/json"
"errors"
"fmt"

Expand Down Expand Up @@ -117,13 +118,108 @@ func (r *CadenceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
}

func (r *CadenceReconciler) createCadence(ctx context.Context, c *v1beta1.Cadence, l logr.Logger) (*models.CadenceCluster, error) {
l.Info(
"Creating Cadence cluster",
"cluster name", c.Spec.Name,
"data centres", c.Spec.DataCentres,
)

cadenceAPISpec, err := c.Spec.ToInstAPI(ctx, r.Client)
if err != nil {
return nil, fmt.Errorf("failed to convert k8s manifest to Instaclustr model, err: %w", err)
}

b, err := r.API.CreateClusterRaw(instaclustr.CadenceEndpoint, cadenceAPISpec)
if err != nil {
return nil, fmt.Errorf("failed to create cluster, err: %w", err)
}

instaModel := &models.CadenceCluster{}
err = json.Unmarshal(b, instaModel)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal body to cadence model, err: %w", err)
}

r.EventRecorder.Eventf(c, models.Normal, models.Created,
"Cluster creation request is sent. Cluster ID: %s", instaModel.ID,
)

return instaModel, nil
}

func (r *CadenceReconciler) createCluster(ctx context.Context, c *v1beta1.Cadence, l logr.Logger) error {
if !c.Spec.Inherits() {
id, err := getClusterIDByName(r.API, models.CassandraAppType, c.Spec.Name)
if err != nil {
return err
}

if id != "" && c.Spec.Inherits() {
l.Info("Cluster with provided name already exists", "name", c.Spec.Name, "clusterID", id)
return fmt.Errorf("cluster %s already exists, please change name property", c.Spec.Name)
}
}

var instaModel *models.CadenceCluster
var err error

switch {
case c.Spec.Inherits():
l.Info("Inheriting from the cluster", "clusterID", c.Spec.InheritsFrom)
instaModel, err = r.API.GetCadence(c.Spec.InheritsFrom)
default:
instaModel, err = r.createCadence(ctx, c, l)
}
if err != nil {
return err
}

patch := c.NewPatch()

c.Spec.FromInstAPI(instaModel)
c.Annotations[models.ResourceStateAnnotation] = models.SyncingEvent
err = r.Patch(ctx, c, patch)
if err != nil {
return fmt.Errorf("failed to patch resource spec, err: %w", err)
}

// TODO check on creation req
//if c.Spec.Description != "" {
// err = r.API.UpdateDescriptionAndTwoFactorDelete(instaclustr.ClustersEndpointV1, id, c.Spec.Description, nil)
// if err != nil {
// l.Error(err, "Cannot update Cadence cluster description and TwoFactorDelete",
// "cluster name", c.Spec.Name,
// "description", c.Spec.Description,
// "twoFactorDelete", c.Spec.TwoFactorDelete,
// )
//
// r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed,
// "Cluster description and TwoFactoDelete update is failed. Reason: %v", err)
// }
//}

c.Status.FromInstAPI(instaModel)
err = r.Status().Patch(ctx, c, patch)
if err != nil {
return fmt.Errorf("failed to patch resource status, err: %w", err)
}

l.Info(
"Cadence resource has been created",
"cluster name", c.Name,
"clusterID", c.Status.ID,
)

return nil
}

func (r *CadenceReconciler) handleCreateCluster(
ctx context.Context,
c *v1beta1.Cadence,
l logr.Logger,
) (ctrl.Result, error) {
if c.Status.ID == "" {
patch := c.NewPatch()
for _, pp := range c.Spec.PackagedProvisioning {
requeueNeeded, err := r.reconcilePackagedProvisioning(ctx, c, pp.UseAdvancedVisibility)
if err != nil {
Expand All @@ -148,67 +244,20 @@ func (r *CadenceReconciler) handleCreateCluster(
}
}

l.Info(
"Creating Cadence cluster",
"cluster name", c.Spec.Name,
"data centres", c.Spec.DataCentres,
)

cadenceAPISpec, err := c.Spec.ToInstAPI(ctx, r.Client)
if err != nil {
l.Error(err, "Cannot convert Cadence cluster manifest to API spec",
"cluster manifest", c.Spec)

r.EventRecorder.Eventf(c, models.Warning, models.ConversionFailed,
"Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", err)

return ctrl.Result{}, err
}

id, err := r.API.CreateCluster(instaclustr.CadenceEndpoint, cadenceAPISpec)
err := r.createCluster(ctx, c, l)
if err != nil {
l.Error(
err, "Cannot create Cadence cluster",
"c manifest", c.Spec,
)
r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed,
"Cluster creation on the Instaclustr is failed. Reason: %v", err)

return ctrl.Result{}, err
}

c.Status.ID = id
err = r.Status().Patch(ctx, c, patch)
if err != nil {
l.Error(err, "Cannot update Cadence cluster status",
"cluster name", c.Spec.Name,
"cluster status", c.Status,
"Failed to create cluster. Reason: %v", err,
)

r.EventRecorder.Eventf(c, models.Warning, models.PatchFailed,
"Cluster resource status patch is failed. Reason: %v", err)

return ctrl.Result{}, err
}
}

if c.Spec.Description != "" {
err = r.API.UpdateDescriptionAndTwoFactorDelete(instaclustr.ClustersEndpointV1, id, c.Spec.Description, nil)
if err != nil {
l.Error(err, "Cannot update Cadence cluster description and TwoFactorDelete",
"cluster name", c.Spec.Name,
"description", c.Spec.Description,
"twoFactorDelete", c.Spec.TwoFactorDelete,
)

r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed,
"Cluster description and TwoFactoDelete update is failed. Reason: %v", err)
}
}

if c.Status.State != models.DeletedStatus {
patch := c.NewPatch()
c.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent
controllerutil.AddFinalizer(c, models.DeletionFinalizer)

err = r.Patch(ctx, c, patch)
err := r.Patch(ctx, c, patch)
if err != nil {
l.Error(err, "Cannot patch Cadence cluster",
"cluster name", c.Spec.Name, "patch", patch)
Expand All @@ -219,21 +268,7 @@ func (r *CadenceReconciler) handleCreateCluster(
return ctrl.Result{}, err
}

l.Info(
"Cadence resource has been created",
"cluster name", c.Name,
"cluster ID", c.Status.ID,
"kind", c.Kind,
"api version", c.APIVersion,
"namespace", c.Namespace,
)

r.EventRecorder.Eventf(c, models.Normal, models.Created,
"Cluster creation request is sent. Cluster ID: %s", id)
}

if c.Status.State != models.DeletedStatus {
err := r.startSyncJob(c)
err = r.startSyncJob(c)
if err != nil {
l.Error(err, "Cannot start cluster status job",
"c cluster ID", c.Status.ID,
Expand Down

0 comments on commit 30260ea

Please sign in to comment.