Skip to content

Commit

Permalink
issue-131, expose services creating to resolve public IP addresses we…
Browse files Browse the repository at this point in the history
…re implemented
  • Loading branch information
DoodgeMatvey authored and taaraora committed Apr 26, 2023
1 parent d5fe8d7 commit 6e8a007
Show file tree
Hide file tree
Showing 20 changed files with 437 additions and 2 deletions.
24 changes: 24 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,30 @@ rules:
- patch
- update
- watch
- apiGroups:
- '*'
resources:
- endpoints
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- '*'
resources:
- services
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- clusterresources.instaclustr.com
resources:
Expand Down
32 changes: 31 additions & 1 deletion controllers/clusters/cadence_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

clustersv1alpha1 "github.com/instaclustr/operator/apis/clusters/v1alpha1"
"github.com/instaclustr/operator/pkg/exposeservice"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/scheduler"
Expand Down Expand Up @@ -484,6 +485,16 @@ func (r *CadenceReconciler) HandleDeleteCluster(
return models.ReconcileRequeue
}

err = exposeservice.Delete(r.Client, cadence.Name, cadence.Namespace)
if err != nil {
logger.Error(err, "Cannot delete Cadence cluster expose service",
"cluster ID", cadence.Status.ID,
"cluster name", cadence.Spec.Name,
)

return models.ReconcileRequeue
}

logger.Info("Cadence cluster was deleted",
"cluster name", cadence.Spec.Name,
"cluster ID", cadence.Status.ID,
Expand Down Expand Up @@ -790,6 +801,8 @@ func (r *CadenceReconciler) newWatchStatusJob(cadence *clustersv1alpha1.Cadence)
"old status", cadence.Status.ClusterStatus,
)

areDCsEqual := areDataCentresEqual(iCadence.Status.ClusterStatus.DataCentres, cadence.Status.ClusterStatus.DataCentres)

patch := cadence.NewPatch()
cadence.Status.ClusterStatus = iCadence.Status.ClusterStatus
err = r.Status().Patch(context.Background(), cadence, patch)
Expand All @@ -800,6 +813,23 @@ func (r *CadenceReconciler) newWatchStatusJob(cadence *clustersv1alpha1.Cadence)
)
return err
}

if !areDCsEqual {
var nodes []*clustersv1alpha1.Node

for _, dc := range iCadence.Status.ClusterStatus.DataCentres {
nodes = append(nodes, dc.Nodes...)
}

err = exposeservice.Create(r.Client,
cadence.Name,
cadence.Namespace,
nodes,
models.CadenceConnectionPort)
if err != nil {
return err
}
}
}

if iCadence.Status.CurrentClusterOperationStatus == models.NoOperation &&
Expand Down Expand Up @@ -1009,7 +1039,7 @@ func (r *CadenceReconciler) newOpenSearchSpec(cadence *clustersv1alpha1.Cadence)
spec := clustersv1alpha1.OpenSearchSpec{
Cluster: clustersv1alpha1.Cluster{
Name: models.OpenSearchChildPrefix + cadence.Name,
Version: models.OpensearchV1_3_7,
Version: models.OpenSearchV1_3_7,
SLATier: slaTier,
PrivateNetworkCluster: privateClusterNetwork,
TwoFactorDelete: twoFactorDelete,
Expand Down
32 changes: 32 additions & 0 deletions controllers/clusters/cassandra_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

clusterresourcesv1alpha1 "github.com/instaclustr/operator/apis/clusterresources/v1alpha1"
clustersv1alpha1 "github.com/instaclustr/operator/apis/clusters/v1alpha1"
"github.com/instaclustr/operator/pkg/exposeservice"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/scheduler"
Expand All @@ -59,6 +60,8 @@ type CassandraReconciler struct {
//+kubebuilder:rbac:groups=clusters.instaclustr.com,resources=cassandras/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=clusters.instaclustr.com,resources=cassandras/finalizers,verbs=update
//+kubebuilder:rbac:groups="",resources=events,verbs=create;patch
//+kubebuilder:rbac:groups=*,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=*,resources=endpoints,verbs=get;list;watch;create;update;patch;delete

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
Expand Down Expand Up @@ -518,6 +521,16 @@ func (r *CassandraReconciler) handleDeleteCluster(
return models.ReconcileRequeue
}

err = exposeservice.Delete(r.Client, cassandra.Name, cassandra.Namespace)
if err != nil {
l.Error(err, "Cannot delete Cassandra cluster expose service",
"cluster ID", cassandra.Status.ID,
"cluster name", cassandra.Spec.Name,
)

return models.ReconcileRequeue
}

l.Info("Cluster has been deleted",
"cluster name", cassandra.Spec.Name,
"cluster ID", cassandra.Status.ID,
Expand Down Expand Up @@ -632,12 +645,31 @@ func (r *CassandraReconciler) newWatchStatusJob(cassandra *clustersv1alpha1.Cass
"status from Instaclustr", iCassandra.Status.ClusterStatus,
"status from k8s", cassandra.Status.ClusterStatus)

areDCsEqual := areDataCentresEqual(iCassandra.Status.ClusterStatus.DataCentres, cassandra.Status.ClusterStatus.DataCentres)

patch := cassandra.NewPatch()
cassandra.Status.ClusterStatus = iCassandra.Status.ClusterStatus
err = r.Status().Patch(context.Background(), cassandra, patch)
if err != nil {
return err
}

if !areDCsEqual {
var nodes []*clustersv1alpha1.Node

for _, dc := range iCassandra.Status.ClusterStatus.DataCentres {
nodes = append(nodes, dc.Nodes...)
}

err = exposeservice.Create(r.Client,
cassandra.Name,
cassandra.Namespace,
nodes,
models.CassandraConnectionPort)
if err != nil {
return err
}
}
}

if iCassandra.Status.CurrentClusterOperationStatus == models.NoOperation &&
Expand Down
30 changes: 30 additions & 0 deletions controllers/clusters/kafka_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

clustersv1alpha1 "github.com/instaclustr/operator/apis/clusters/v1alpha1"
"github.com/instaclustr/operator/pkg/exposeservice"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/scheduler"
Expand Down Expand Up @@ -359,6 +360,16 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *cluste
return models.ReconcileRequeue
}

err = exposeservice.Delete(r.Client, kafka.Name, kafka.Namespace)
if err != nil {
l.Error(err, "Cannot delete Kafka cluster expose service",
"cluster ID", kafka.Status.ID,
"cluster name", kafka.Spec.Name,
)

return models.ReconcileRequeue
}

l.Info("Cluster was deleted",
"cluster name", kafka.Spec.Name,
"cluster ID", kafka.Status.ID)
Expand Down Expand Up @@ -422,6 +433,8 @@ func (r *KafkaReconciler) newWatchStatusJob(kafka *clustersv1alpha1.Kafka) sched
"instacluster status", iKafka.Status,
"k8s status", kafka.Status.ClusterStatus)

areDCsEqual := areDataCentresEqual(iKafka.Status.ClusterStatus.DataCentres, kafka.Status.ClusterStatus.DataCentres)

patch := kafka.NewPatch()
kafka.Status.ClusterStatus = iKafka.Status.ClusterStatus
err = r.Status().Patch(context.Background(), kafka, patch)
Expand All @@ -430,6 +443,23 @@ func (r *KafkaReconciler) newWatchStatusJob(kafka *clustersv1alpha1.Kafka) sched
"cluster name", kafka.Spec.Name, "cluster state", kafka.Status.State)
return err
}

if !areDCsEqual {
var nodes []*clustersv1alpha1.Node

for _, dc := range iKafka.Status.ClusterStatus.DataCentres {
nodes = append(nodes, dc.Nodes...)
}

err = exposeservice.Create(r.Client,
kafka.Name,
kafka.Namespace,
nodes,
models.KafkaConnectionPort)
if err != nil {
return err
}
}
}

if iKafka.Status.CurrentClusterOperationStatus == models.NoOperation &&
Expand Down
30 changes: 30 additions & 0 deletions controllers/clusters/kafkaconnect_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

clustersv1alpha1 "github.com/instaclustr/operator/apis/clusters/v1alpha1"
"github.com/instaclustr/operator/pkg/exposeservice"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/scheduler"
Expand Down Expand Up @@ -352,6 +353,16 @@ func (r *KafkaConnectReconciler) handleDeleteCluster(ctx context.Context, kc *cl
return models.ReconcileRequeue
}

err = exposeservice.Delete(r.Client, kc.Name, kc.Namespace)
if err != nil {
l.Error(err, "Cannot delete Kafka Connect cluster expose service",
"cluster ID", kc.Status.ID,
"cluster name", kc.Spec.Name,
)

return models.ReconcileRequeue
}

l.Info("Kafka Connect cluster was deleted",
"cluster name", kc.Spec.Name,
"cluster ID", kc.Status.ID)
Expand Down Expand Up @@ -453,6 +464,8 @@ func (r *KafkaConnectReconciler) newWatchStatusJob(kc *clustersv1alpha1.KafkaCon
"instaclustr status", iKC.Status,
"status", kc.Status.ClusterStatus)

areDCsEqual := areDataCentresEqual(iKC.Status.ClusterStatus.DataCentres, kc.Status.ClusterStatus.DataCentres)

patch := kc.NewPatch()
kc.Status.ClusterStatus = iKC.Status.ClusterStatus
err = r.Status().Patch(context.Background(), kc, patch)
Expand All @@ -461,6 +474,23 @@ func (r *KafkaConnectReconciler) newWatchStatusJob(kc *clustersv1alpha1.KafkaCon
"cluster name", kc.Spec.Name, "cluster state", kc.Status.State)
return err
}

if !areDCsEqual {
var nodes []*clustersv1alpha1.Node

for _, dc := range iKC.Status.ClusterStatus.DataCentres {
nodes = append(nodes, dc.Nodes...)
}

err = exposeservice.Create(r.Client,
kc.Name,
kc.Namespace,
nodes,
models.KafkaConnectConnectionPort)
if err != nil {
return err
}
}
}

if iKC.Status.CurrentClusterOperationStatus == models.NoOperation &&
Expand Down
30 changes: 30 additions & 0 deletions controllers/clusters/opensearch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

clusterresourcesv1alpha1 "github.com/instaclustr/operator/apis/clusterresources/v1alpha1"
clustersv1alpha1 "github.com/instaclustr/operator/apis/clusters/v1alpha1"
"github.com/instaclustr/operator/pkg/exposeservice"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/scheduler"
Expand Down Expand Up @@ -512,6 +513,16 @@ func (r *OpenSearchReconciler) HandleDeleteCluster(
return models.ReconcileRequeue
}

err = exposeservice.Delete(r.Client, o.Name, o.Namespace)
if err != nil {
logger.Error(err, "Cannot delete OpenSearch cluster expose service",
"cluster ID", o.Status.ID,
"cluster name", o.Spec.Name,
)

return models.ReconcileRequeue
}

logger.Info("OpenSearch cluster was deleted",
"cluster name", o.Spec.Name,
"cluster ID", o.Status.ID,
Expand Down Expand Up @@ -628,6 +639,8 @@ func (r *OpenSearchReconciler) newWatchStatusJob(o *clustersv1alpha1.OpenSearch)
"old status", o.Status.ClusterStatus,
)

areDCsEqual := areDataCentresEqual(iO.Status.ClusterStatus.DataCentres, o.Status.ClusterStatus.DataCentres)

patch := o.NewPatch()
o.Status.ClusterStatus = iO.Status.ClusterStatus
err = r.Status().Patch(context.Background(), o, patch)
Expand All @@ -639,6 +652,23 @@ func (r *OpenSearchReconciler) newWatchStatusJob(o *clustersv1alpha1.OpenSearch)

return err
}

if !areDCsEqual {
var nodes []*clustersv1alpha1.Node

for _, dc := range iO.Status.ClusterStatus.DataCentres {
nodes = append(nodes, dc.Nodes...)
}

err = exposeservice.Create(r.Client,
o.Name,
o.Namespace,
nodes,
models.OpenSearchConnectionPort)
if err != nil {
return err
}
}
}

maintEvents, err := r.API.GetMaintenanceEvents(o.Status.ID)
Expand Down
30 changes: 30 additions & 0 deletions controllers/clusters/postgresql_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (

clusterresourcesv1alpha1 "github.com/instaclustr/operator/apis/clusterresources/v1alpha1"
clustersv1alpha1 "github.com/instaclustr/operator/apis/clusters/v1alpha1"
"github.com/instaclustr/operator/pkg/exposeservice"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/scheduler"
Expand Down Expand Up @@ -681,6 +682,16 @@ func (r *PostgreSQLReconciler) HandleDeleteCluster(
return models.ReconcileRequeue
}

err = exposeservice.Delete(r.Client, pg.Name, pg.Namespace)
if err != nil {
logger.Error(err, "Cannot delete PostgreSQL cluster expose service",
"cluster ID", pg.Status.ID,
"cluster name", pg.Spec.Name,
)

return models.ReconcileRequeue
}

logger.Info("PostgreSQL cluster was deleted",
"cluster name", pg.Spec.Name,
"cluster ID", pg.Status.ID,
Expand Down Expand Up @@ -832,6 +843,8 @@ func (r *PostgreSQLReconciler) newWatchStatusJob(pg *clustersv1alpha1.PostgreSQL
"old cluster status", pg.Status,
)

areDCsEqual := areDataCentresEqual(iPg.Status.ClusterStatus.DataCentres, pg.Status.ClusterStatus.DataCentres)

patch := pg.NewPatch()
pg.Status.ClusterStatus = iPg.Status.ClusterStatus
err = r.Status().Patch(context.Background(), pg, patch)
Expand All @@ -843,6 +856,23 @@ func (r *PostgreSQLReconciler) newWatchStatusJob(pg *clustersv1alpha1.PostgreSQL
)
return err
}

if !areDCsEqual {
var nodes []*clustersv1alpha1.Node

for _, dc := range iPg.Status.ClusterStatus.DataCentres {
nodes = append(nodes, dc.Nodes...)
}

err = exposeservice.Create(r.Client,
pg.Name,
pg.Namespace,
nodes,
models.PgConnectionPort)
if err != nil {
return err
}
}
}

maintEvents, err := r.API.GetMaintenanceEvents(pg.Status.ID)
Expand Down
Loading

0 comments on commit 6e8a007

Please sign in to comment.