Skip to content

Commit

Permalink
Improve kluster to wait for DO cluster and gen events
Browse files Browse the repository at this point in the history
  • Loading branch information
viveksinghggits committed Sep 5, 2021
1 parent 35bb53b commit 5bebcb2
Show file tree
Hide file tree
Showing 7 changed files with 1,280 additions and 4 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ module github.com/viveksinghggits/kluster

go 1.13

replace github.com/graymeta/stow => github.com/graymeta/stow v0.1.0

require (
github.com/digitalocean/godo v1.65.0
github.com/imdario/mergo v0.3.12 // indirect
github.com/kanisterio/kanister v0.0.0-20210903215800-f8e63bf1364d
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
k8s.io/api v0.21.3
k8s.io/apimachinery v0.21.3
k8s.io/client-go v0.21.3
k8s.io/klog v1.0.0 // indirect
k8s.io/utils v0.0.0-20210722164352-7f3ee0f31471 // indirect
)
1,165 changes: 1,165 additions & 0 deletions go.sum

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions manifests/klusterfive.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: viveksingh.dev/v1alpha1
kind: Kluster
metadata:
name: kluster-4
spec:
name: kluster-4
region: "nyc1"
version: "1.21.2-do.2"
tokenSecret: "default/dosecret"
nodePools:
- count: 3
name: "dummy-nodepool"
size: "s-2vcpu-2gb"
13 changes: 13 additions & 0 deletions manifests/klusterfour.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: viveksingh.dev/v1alpha1
kind: Kluster
metadata:
name: kluster-3
spec:
name: kluster-3
region: "nyc1"
version: "1.21.2-do.2"
tokenSecret: "default/dosecret"
nodePools:
- count: 3
name: "dummy-nodepool"
size: "s-2vcpu-2gb"
13 changes: 13 additions & 0 deletions manifests/klusterthree.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: viveksingh.dev/v1alpha1
kind: Kluster
metadata:
name: kluster-2
spec:
name: kluster-2
region: "nyc1"
version: "1.21.2-do.2"
tokenSecret: "default/dosecret"
nodePools:
- count: 3
name: "dummy-nodepool"
size: "s-2vcpu-2gb"
63 changes: 60 additions & 3 deletions pkg/controller/kluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,21 @@ import (

"github.com/viveksinghggits/kluster/pkg/apis/viveksingh.dev/v1alpha1"
klientset "github.com/viveksinghggits/kluster/pkg/client/clientset/versioned"
customscheme "github.com/viveksinghggits/kluster/pkg/client/clientset/versioned/scheme"
kinf "github.com/viveksinghggits/kluster/pkg/client/informers/externalversions/viveksingh.dev/v1alpha1"
klister "github.com/viveksinghggits/kluster/pkg/client/listers/viveksingh.dev/v1alpha1"
"github.com/viveksinghggits/kluster/pkg/do"

"github.com/kanisterio/kanister/pkg/poll"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)

Expand All @@ -29,15 +36,27 @@ type Controller struct {
kLister klister.KlusterLister
// queue
wq workqueue.RateLimitingInterface

recorder record.EventRecorder
}

func NewController(client kubernetes.Interface, klient klientset.Interface, klusterInformer kinf.KlusterInformer) *Controller {
runtime.Must(customscheme.AddToScheme(scheme.Scheme))

eveBroadCaster := record.NewBroadcaster()
eveBroadCaster.StartStructuredLogging(0)
eveBroadCaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
Interface: client.CoreV1().Events(""),
})
recorder := eveBroadCaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "Kluster"})

c := &Controller{
client: client,
klient: klient,
klusterSynced: klusterInformer.Informer().HasSynced,
kLister: klusterInformer.Lister(),
wq: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kluster"),
recorder: recorder,
}

klusterInformer.Informer().AddEventHandler(
Expand Down Expand Up @@ -99,20 +118,58 @@ func (c *Controller) processNextItem() bool {
// do something
log.Printf("errro %s, creating the cluster", err.Error())
}

c.recorder.Event(kluster, corev1.EventTypeNormal, "ClusterCreation", "DO API was called to create the cluster")

log.Printf("cluster id that we have is %s\n", clusterID)

err = c.updateStatus(clusterID, "creating", kluster)
if err != nil {
log.Printf("error %s, updating status of the kluster %s\n", err.Error(), kluster.Name)
}

// query DO API to make sure clsuter' state is running
err = c.waitForCluster(kluster.Spec, clusterID)
if err != nil {
log.Printf("error %s, waiting for cluster to be running", err.Error())
}

err = c.updateStatus(clusterID, "running", kluster)
if err != nil {
log.Printf("error %s updaring cluster status after waiting for cluster", err.Error())
}

c.recorder.Event(kluster, corev1.EventTypeNormal, "ClusterCreationCompleted", "DO Cluster creation was completed")
return true
}

func (c *Controller) waitForCluster(spec v1alpha1.KlusterSpec, clusterID string) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()

return poll.Wait(ctx, func(ctx context.Context) (bool, error) {
state, err := do.ClusterState(c.client, spec, clusterID)
if err != nil {
return false, err
}
if state == "running" {
return true, nil
}

return false, nil
})
}

func (c *Controller) updateStatus(id, progress string, kluster *v1alpha1.Kluster) error {
kluster.Status.KlusterID = id
kluster.Status.Progress = progress
_, err := c.klient.ViveksinghV1alpha1().Klusters(kluster.Namespace).UpdateStatus(context.Background(), kluster, metav1.UpdateOptions{})
// get the latest version of kluster
k, err := c.klient.ViveksinghV1alpha1().Klusters(kluster.Namespace).Get(context.Background(), kluster.Name, metav1.GetOptions{})
if err != nil {
return err
}

k.Status.KlusterID = id
k.Status.Progress = progress
_, err = c.klient.ViveksinghV1alpha1().Klusters(kluster.Namespace).UpdateStatus(context.Background(), k, metav1.UpdateOptions{})
return err
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/do/do.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ func Create(c kubernetes.Interface, spec v1alpha1.KlusterSpec) (string, error) {
return cluster.ID, nil
}

func ClusterState(c kubernetes.Interface, spec v1alpha1.KlusterSpec, id string) (string, error) {
token, err := getToken(c, spec.TokenSecret)
if err != nil {
return "", err
}

client := godo.NewFromToken(token)

cluster, _, err := client.Kubernetes.Get(context.Background(), id)
return string(cluster.Status.State), err
}

func getToken(client kubernetes.Interface, sec string) (string, error) {
namespace := strings.Split(sec, "/")[0]
name := strings.Split(sec, "/")[1]
Expand Down

0 comments on commit 5bebcb2

Please sign in to comment.