Skip to content

Commit

Permalink
Merge pull request #7 from beclab/feat/kvrocks
Browse files Browse the repository at this point in the history
Feat/kvrocks
  • Loading branch information
aby913 authored Jun 13, 2024
2 parents caa2d11 + 3e75f58 commit 7b066f2
Show file tree
Hide file tree
Showing 83 changed files with 5,169 additions and 782 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@

# Go workspace file
go.work
bin
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ $(LOCALBIN):

## Tool Binaries
CONTROLLER_GEN ?= $(LOCALBIN)/controller-gen
CONTROLLER_TOOLS_VERSION ?= v0.9.2
CONTROLLER_TOOLS_VERSION ?= v0.14.0


.PHONY: build-uploader run-uploader build-vault run-vault fmt vet
Expand Down
10 changes: 10 additions & 0 deletions cmd/middleware/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (

"bytetrade.io/web3os/tapr/cmd/middleware/app"
"bytetrade.io/web3os/tapr/cmd/middleware/operator/backup"
kvrocksbakcup "bytetrade.io/web3os/tapr/cmd/middleware/operator/kvrocks-bakcup"
kvrocksrestore "bytetrade.io/web3os/tapr/cmd/middleware/operator/kvrocks-restore"
middlewarerequest "bytetrade.io/web3os/tapr/cmd/middleware/operator/middleware-request"
"bytetrade.io/web3os/tapr/cmd/middleware/operator/pgcluster"
pgclusterbackup "bytetrade.io/web3os/tapr/cmd/middleware/operator/pgcluster-backup"
pgclusterrestore "bytetrade.io/web3os/tapr/cmd/middleware/operator/pgcluster-restore"
"bytetrade.io/web3os/tapr/cmd/middleware/operator/redixcluster"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -44,13 +47,20 @@ func main() {
pgBackupController := pgclusterbackup.NewController(config, apiCtx, pgclusterLister)
pgRestoreController := pgclusterrestore.NewController(config, apiCtx, pgclusterLister)

redixClusterController := redixcluster.NewController(config, apiCtx, func(cluster *aprv1.RedixCluster) {})
kvrocksBackupController := kvrocksbakcup.NewController(config, apiCtx)
kvrocksRestoreController := kvrocksrestore.NewController(config, apiCtx)

backupWatcher := backup.NewWatcher(config, apiCtx)

runControllers := func() {
go func() { utilruntime.Must(pgclusterController.Run(1)) }()
go func() { utilruntime.Must(requestController.Run(1)) }()
go func() { utilruntime.Must(pgBackupController.Run(1)) }()
go func() { utilruntime.Must(pgRestoreController.Run(1)) }()
go func() { utilruntime.Must(redixClusterController.Run(1)) }()
go func() { utilruntime.Must(kvrocksBackupController.Run(1)) }()
go func() { utilruntime.Must(kvrocksRestoreController.Run(1)) }()
go func() { backupWatcher.Start() }()
}

Expand Down
105 changes: 105 additions & 0 deletions cmd/middleware/operator/backup/backup_kvrocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package backup

import (
"context"
"time"

"bytetrade.io/web3os/tapr/pkg/apis/apr/v1alpha1"
"bytetrade.io/web3os/tapr/pkg/workload/kvrocks"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)

func (w *Watcher) backupRedix() error {
clusters, err := w.aprClientSet.AprV1alpha1().RedixClusters("").List(w.ctx, metav1.ListOptions{})
if err != nil {
klog.Error("list redix clusters error, ", err)
return err
}

klog.Info("start to backup all users' redix clusters, of ", len(clusters.Items))
for _, cluster := range clusters.Items {
klog.Info("create crd to backup redix cluster, ", cluster.Name, ", ", cluster.Namespace, ", ", cluster.Spec.Type)
switch cluster.Spec.Type {
case v1alpha1.KVRocks:
backup := kvrocks.KVRocksBackup.DeepCopy()
backup.Namespace = cluster.Namespace
backup.Spec.ClusterName = cluster.Name

err = kvrocks.ForceCreateNewKVRocksBackup(w.ctx, w.aprClientSet, backup)
if err != nil {
return err
}

klog.Info("wait for kvrocks backup complete")
err = kvrocks.WaitForAllBackupComplete(w.ctx, w.aprClientSet)
if err != nil {
klog.Error("wait for kvrocks backup complete error, ", err)
}
case v1alpha1.RedisCluster:
err = w.backupRedis()
}
}

return err

}

func (w *Watcher) restoreRedix() error {
clusters, err := w.aprClientSet.AprV1alpha1().RedixClusters("").List(w.ctx, metav1.ListOptions{})
if err != nil {
klog.Error("list redix clusters error, ", err)
return err
}

klog.Info("start to restore all users' redix clusters, of ", len(clusters.Items))
for _, cluster := range clusters.Items {
klog.Info("create crd to restore redix cluster, ", cluster.Name, ", ", cluster.Namespace, ", ", cluster.Spec.Type)
switch cluster.Spec.Type {
case v1alpha1.KVRocks:
// wait for kvrocks sts deploy
err = wait.PollWithContext(w.ctx, time.Second, 30*time.Minute, func(ctx context.Context) (done bool, err error) {
_, err = w.k8sClientSet.AppsV1().
StatefulSets(cluster.Namespace).Get(w.ctx, cluster.Name, metav1.GetOptions{})

if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}

klog.Error("find kvrocks sts error, ", err)
return false, err
}

return true, nil
})

if err != nil {
return err
}

klog.Info("create kvrocks restore")
restore := kvrocks.KVRocksRestore.DeepCopy()
restore.Namespace = cluster.Namespace
restore.Spec.ClusterName = cluster.Name

err = kvrocks.ForceCreateNewKVRocksRestore(w.ctx, w.aprClientSet, restore)
if err != nil {
return err
}

klog.Info("wait for all kvrocks restore complete")
err = kvrocks.WaitForAllRestoreComplete(w.ctx, w.aprClientSet)
if err != nil {
klog.Error("wait for kvrocks restore complete error, ", err)
}

case v1alpha1.RedisCluster:
err = w.restoreRedis()
}
}

return err
}
4 changes: 2 additions & 2 deletions cmd/middleware/operator/backup/backup_mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ func (w *Watcher) restoreMongo() error {
return err
}

if err = percona.WaitForInitializeComplete(w.ctx, *&w.dynamicClient, *&w.k8sClientSet); err != nil {
if err = percona.WaitForInitializeComplete(w.ctx, w.dynamicClient, w.k8sClientSet); err != nil {
klog.Error("mongo cluster initialize error, ", err)
return err
}

// It is possible for the MongoDB restoration process to occur repeatedly,
// leading to the MongoDB cluster service becoming abnormally unavailable.
ok, err := percona.CheckMongoRestoreStatus(w.ctx, *&w.dynamicClient)
ok, err := percona.CheckMongoRestoreStatus(w.ctx, w.dynamicClient)
if !ok && err != nil {
klog.Error("mongo restore status check, ", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/middleware/operator/backup/backup_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (w *Watcher) restoreRedis() error {
klog.Info("start to restore all users' redis clusters, of ", len(clusters))
for _, cluster := range clusters {
updateCluster := cluster
if &cluster.Status != nil && &cluster.Status.Restore != nil && cluster.Status.Restore.Backup != nil {
if cluster.Status.Restore.Backup != nil {
cluster.Status.Restore.Backup = nil
updateCluster, err = rediscluster.UpdataClusterStatus(w.ctx, w.dynamicClient, updateCluster)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/middleware/operator/backup/tools.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package backup

func (w *Watcher) getMiddlewareBackupPath(clusterNamespace string) (string, error) {
func (w *Watcher) getMiddlewareBackupPath(_ string) (string, error) {
backupPath := "/terminus/rootfs/"
backupPath += middleware_backup_path

Expand Down
4 changes: 2 additions & 2 deletions cmd/middleware/operator/backup/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (w *Watcher) backup() error {
}()
}

runBackup(w.backupRedis)
runBackup(w.backupRedix)
runBackup(w.backupMongo)
runBackup(w.backupPostgres)

Expand Down Expand Up @@ -273,7 +273,7 @@ func (w *Watcher) restore() error {
}

runRestore(w.restorePostgres)
runRestore(w.restoreRedis)
runRestore(w.restoreRedix)

wg.Wait()

Expand Down
Loading

0 comments on commit 7b066f2

Please sign in to comment.