diff --git a/pkg/controller/zookeepercluster/zookeepercluster_controller.go b/pkg/controller/zookeepercluster/zookeepercluster_controller.go index 0d684e535..3609c042e 100644 --- a/pkg/controller/zookeepercluster/zookeepercluster_controller.go +++ b/pkg/controller/zookeepercluster/zookeepercluster_controller.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "strconv" + "strings" "time" "github.com/operator-framework/operator-sdk/pkg/predicate" @@ -279,6 +280,8 @@ func (r *ReconcileZookeeperCluster) reconcileStatefulSet(instance *zookeeperv1be foundSTSSize := *foundSts.Spec.Replicas newSTSSize := *sts.Spec.Replicas if newSTSSize != foundSTSSize { + // If zookeeper is not running, it must stop update replicas. + // Until zookeeper is running and the client connect it successfully, decreasing Replicas will take effect. zkUri := utils.GetZkServiceUri(instance) err = r.zkClient.Connect(zkUri) if err != nil { @@ -295,7 +298,31 @@ func (r *ReconcileZookeeperCluster) reconcileStatefulSet(instance *zookeeperv1be data := "CLUSTER_SIZE=" + strconv.Itoa(int(newSTSSize)) r.log.Info("Updating Cluster Size.", "New Data:", data, "Version", version) - r.zkClient.UpdateNode(path, data, version) + err = r.zkClient.UpdateNode(path, data, version) + if err != nil { + return fmt.Errorf("Error updating cluster size %s: %v", path, err) + } + // #398 if decrease node, remove node immediately after updating node successfully. + if newSTSSize < foundSTSSize { + var removes []string + config, _, err := r.zkClient.GetConfig() + if err != nil { + return fmt.Errorf("Error GetConfig %v", err) + } + r.log.Info("Get zookeeper config.", "Config: ", config) + for myid := newSTSSize + 1; myid <= foundSTSSize; myid++ { + if strings.Contains(config, "server."+strconv.Itoa(int(myid))+"=") { + removes = append(removes, strconv.Itoa(int(myid))) + } + } + // The node that have been removed with reconfig also can still provide services for all online clients. + // So We can remove it firstly, it will avoid to error that client can't connect to server on preStop. + r.log.Info("Do reconfig to remove node.", "Remove ids", strings.Join(removes, ",")) + err = r.zkClient.IncReconfig(nil, removes, -1) + if err != nil { + return fmt.Errorf("Error reconfig remove id:%s, %v", strings.Join(removes, ","), err) + } + } } err = r.updateStatefulSet(instance, foundSts, sts) if err != nil { @@ -615,7 +642,8 @@ func (r *ReconcileZookeeperCluster) reconcileClusterStatus(instance *zookeeperv1 instance.Status.Members.Ready = readyMembers instance.Status.Members.Unready = unreadyMembers - //If Cluster is in a ready state... + // If Cluster is in a ready state... + // instance.Spec.Replicas is just an expected value that we set it, but it maybe not take effect by k8s. if instance.Spec.Replicas == instance.Status.ReadyReplicas && (!instance.Status.MetaRootCreated) { r.log.Info("Cluster is Ready, Creating ZK Metadata...") zkUri := utils.GetZkServiceUri(instance) @@ -635,7 +663,7 @@ func (r *ReconcileZookeeperCluster) reconcileClusterStatus(instance *zookeeperv1 r.log.Info("Updating zookeeper status", "StatefulSet.Namespace", instance.Namespace, "StatefulSet.Name", instance.Name) - if instance.Status.ReadyReplicas == instance.Spec.Replicas { + if instance.Status.ReadyReplicas == instance.Spec.Replicas && instance.Status.Replicas == instance.Spec.Replicas { instance.Status.SetPodsReadyConditionTrue() } else { instance.Status.SetPodsReadyConditionFalse() @@ -765,7 +793,9 @@ func (r *ReconcileZookeeperCluster) getPVCCount(instance *zookeeperv1beta1.Zooke func (r *ReconcileZookeeperCluster) cleanupOrphanPVCs(instance *zookeeperv1beta1.ZookeeperCluster) (err error) { // this check should make sure we do not delete the PVCs before the STS has scaled down - if instance.Status.ReadyReplicas == instance.Spec.Replicas { + // instance.Spec.Replicas is just an expected value that we set it, but it maybe not take effect by k8s if update state failly. + // So we should check that instance.Status.Replicas is equal to ReadyReplicas and Spec.Replicas, which means cluster already. + if instance.Status.ReadyReplicas == instance.Spec.Replicas && instance.Status.Replicas == instance.Spec.Replicas { pvcCount, err := r.getPVCCount(instance) if err != nil { return err diff --git a/pkg/controller/zookeepercluster/zookeepercluster_controller_test.go b/pkg/controller/zookeepercluster/zookeepercluster_controller_test.go index 048be79eb..3ec151320 100644 --- a/pkg/controller/zookeepercluster/zookeepercluster_controller_test.go +++ b/pkg/controller/zookeepercluster/zookeepercluster_controller_test.go @@ -12,6 +12,7 @@ package zookeepercluster import ( "context" + "fmt" "os" "testing" "time" @@ -60,10 +61,117 @@ func (client *MockZookeeperClient) NodeExists(zNodePath string) (version int32, return 0, nil } +func (client *MockZookeeperClient) IncReconfig(joining []string, leaving []string, version int64) (err error) { + return nil +} + +func (client *MockZookeeperClient) GetConfig() (config string, version int32, err error) { + return "server.1=xxx,server.2=xxxx,server.3=xxxx", 0, nil +} + func (client *MockZookeeperClient) Close() { return } +type TestZookeeperClient struct { + // dummy struct +} + +func (client *TestZookeeperClient) Connect(zkUri string) (err error) { + // do nothing + return nil +} + +func (client *TestZookeeperClient) CreateNode(zoo *v1beta1.ZookeeperCluster, zNodePath string) (err error) { + return nil +} + +func (client *TestZookeeperClient) UpdateNode(path string, data string, version int32) (err error) { + return fmt.Errorf("Error") +} + +func (client *TestZookeeperClient) NodeExists(zNodePath string) (version int32, err error) { + return 0, nil +} + +func (client *TestZookeeperClient) IncReconfig(joining []string, leaving []string, version int64) (err error) { + return fmt.Errorf("Error") +} + +func (client *TestZookeeperClient) GetConfig() (config string, version int32, err error) { + return "server.1=xxx,server.2=xxxx,server.3=xxxx", 0, fmt.Errorf("Error") +} + +func (client *TestZookeeperClient) Close() { + return +} + +type GetConfigFailZookeeperClient struct { + // dummy struct +} + +func (client *GetConfigFailZookeeperClient) Connect(zkUri string) (err error) { + // do nothing + return nil +} + +func (client *GetConfigFailZookeeperClient) CreateNode(zoo *v1beta1.ZookeeperCluster, zNodePath string) (err error) { + return nil +} + +func (client *GetConfigFailZookeeperClient) UpdateNode(path string, data string, version int32) (err error) { + return nil +} + +func (client *GetConfigFailZookeeperClient) NodeExists(zNodePath string) (version int32, err error) { + return 0, nil +} + +func (client *GetConfigFailZookeeperClient) IncReconfig(joining []string, leaving []string, version int64) (err error) { + return nil +} + +func (client *GetConfigFailZookeeperClient) GetConfig() (config string, version int32, err error) { + return "server.1=xxx,server.2=xxxx,server.3=xxxx", 0, fmt.Errorf("Error") +} + +func (client *GetConfigFailZookeeperClient) Close() { + return +} + +type IncReconfigFailZookeeperClient struct { + // dummy struct +} + +func (client *IncReconfigFailZookeeperClient) Connect(zkUri string) (err error) { + // do nothing + return nil +} + +func (client *IncReconfigFailZookeeperClient) CreateNode(zoo *v1beta1.ZookeeperCluster, zNodePath string) (err error) { + return nil +} + +func (client *IncReconfigFailZookeeperClient) UpdateNode(path string, data string, version int32) (err error) { + return nil +} + +func (client *IncReconfigFailZookeeperClient) NodeExists(zNodePath string) (version int32, err error) { + return 0, nil +} + +func (client *IncReconfigFailZookeeperClient) IncReconfig(joining []string, leaving []string, version int64) (err error) { + return fmt.Errorf("Error") +} + +func (client *IncReconfigFailZookeeperClient) GetConfig() (config string, version int32, err error) { + return "server.1=xxx,server.2=xxxx,server.3=xxxx", 0, nil +} + +func (client *IncReconfigFailZookeeperClient) Close() { + return +} + var _ = Describe("ZookeeperCluster Controller", func() { const ( Name = "example" @@ -71,9 +179,12 @@ var _ = Describe("ZookeeperCluster Controller", func() { ) var ( - s = scheme.Scheme - mockZkClient = new(MockZookeeperClient) - r *ReconcileZookeeperCluster + s = scheme.Scheme + mockZkClient = new(MockZookeeperClient) + testZkClient = new(TestZookeeperClient) + getConfigZkClient = new(GetConfigFailZookeeperClient) + incReconfigZkClient = new(IncReconfigFailZookeeperClient) + r *ReconcileZookeeperCluster ) Context("Reconcile", func() { @@ -223,6 +334,80 @@ var _ = Describe("ZookeeperCluster Controller", func() { }) }) + Context("With scale down Replicas", func() { + var ( + cl client.Client + err error + ) + + BeforeEach(func() { + z.WithDefaults() + z.Spec.Pod.ServiceAccountName = "zookeeper" + z.Status.Init() + next := z.DeepCopy() + st := zk.MakeStatefulSet(z) + next.Spec.Replicas = 1 + cl = fake.NewFakeClient([]runtime.Object{next, st}...) + r = &ReconcileZookeeperCluster{client: cl, scheme: s, zkClient: mockZkClient} + res, err = r.Reconcile(req) + }) + + It("should not raise an error", func() { + Ω(err).To(BeNil()) + }) + + It("should update the sts", func() { + foundSts := &appsv1.StatefulSet{} + err = cl.Get(context.TODO(), req.NamespacedName, foundSts) + Ω(err).To(BeNil()) + Ω(*foundSts.Spec.Replicas).To(BeEquivalentTo(1)) + }) + }) + + Context("With scale down Replicas but fail", func() { + var ( + cl client.Client + err error + count int + new_count int + ) + + BeforeEach(func() { + z.WithDefaults() + z.Spec.Pod.ServiceAccountName = "zookeeper" + z.Status.Init() + next := z.DeepCopy() + st := zk.MakeStatefulSet(z) + next.Spec.Replicas = 1 + cl = fake.NewFakeClient([]runtime.Object{next, st}...) + r = &ReconcileZookeeperCluster{client: cl, scheme: s, zkClient: testZkClient} + count, _ = r.getPVCCount(z) + res, err = r.Reconcile(req) + }) + + It("should raise an error in case of zookeeper not running", func() { + Ω(err).NotTo(BeNil()) + r = &ReconcileZookeeperCluster{client: cl, scheme: s, zkClient: getConfigZkClient} + _, err = r.Reconcile(req) + Ω(err).NotTo(BeNil()) + r = &ReconcileZookeeperCluster{client: cl, scheme: s, zkClient: incReconfigZkClient} + _, err = r.Reconcile(req) + Ω(err).NotTo(BeNil()) + }) + + It("should not update the sts in case of zookeeper not running", func() { + foundSts := &appsv1.StatefulSet{} + err = cl.Get(context.TODO(), req.NamespacedName, foundSts) + Ω(err).To(BeNil()) + Ω(*foundSts.Spec.Replicas).To(BeEquivalentTo(3)) + }) + It("should not delete pvc in case of zookeeper not running", func() { + new_count, err = r.getPVCCount(z) + Ω(err).To(BeNil()) + Ω(new_count).To(BeEquivalentTo(count)) + }) + }) + Context("With no update to sts", func() { var ( cl client.Client diff --git a/pkg/zk/zookeeper_client.go b/pkg/zk/zookeeper_client.go index 81692f023..da1eeea9e 100644 --- a/pkg/zk/zookeeper_client.go +++ b/pkg/zk/zookeeper_client.go @@ -24,6 +24,8 @@ type ZookeeperClient interface { CreateNode(*v1beta1.ZookeeperCluster, string) error NodeExists(string) (int32, error) UpdateNode(string, string, int32) error + IncReconfig([]string, []string, int64) error + GetConfig() (string, int32, error) Close() } @@ -31,6 +33,9 @@ type DefaultZookeeperClient struct { conn *zk.Conn } +// zookeeper configure path +const ZOO_CONFIG_PATH = "/zookeeper/config" + func (client *DefaultZookeeperClient) Connect(zkUri string) (err error) { host := []string{zkUri} conn, _, err := zk.Connect(host, time.Second*5) @@ -74,6 +79,21 @@ func (client *DefaultZookeeperClient) NodeExists(zNodePath string) (version int3 return zNodeStat.Version, err } +func (client *DefaultZookeeperClient) IncReconfig(joining []string, leaving []string, version int64) (err error) { + if _, err := client.conn.IncrementalReconfig(joining, leaving, version); err != nil { + return fmt.Errorf("Failed to reconfig node:%s, err:%v", strings.Join(leaving, ","), err) + } + return nil +} + +func (client *DefaultZookeeperClient) GetConfig() (config string, version int32, err error) { + data, stat, err := client.conn.Get(ZOO_CONFIG_PATH) + if err != nil { + return "", -1, fmt.Errorf("Get config %s error, err:%v", ZOO_CONFIG_PATH, err) + } + return string(data), stat.Version, nil +} + func (client *DefaultZookeeperClient) Close() { client.conn.Close() } diff --git a/pkg/zk/zookeeper_client_test.go b/pkg/zk/zookeeper_client_test.go index 334bc9a18..6e9b16c90 100644 --- a/pkg/zk/zookeeper_client_test.go +++ b/pkg/zk/zookeeper_client_test.go @@ -21,7 +21,7 @@ import ( var _ = Describe("Zookeeper Client", func() { Context("with a valid update of Service port", func() { - var err1, err2, err3, err4, err5 error + var err1, err2, err3, err4, err5, err6, err7 error BeforeEach(func() { z := &v1beta1.ZookeeperCluster{ ObjectMeta: metav1.ObjectMeta{ @@ -36,6 +36,8 @@ var _ = Describe("Zookeeper Client", func() { err5 = zkclient.CreateNode(z, "temp/tmp") err3 = zkclient.UpdateNode("temp/tem/temp", "dasd", 2) _, err4 = zkclient.NodeExists("temp") + _, _, err6 = zkclient.GetConfig() + err7 = zkclient.IncReconfig(nil, nil, -1) zkclient.Close() }) It("err1 should be nil", func() { @@ -53,5 +55,11 @@ var _ = Describe("Zookeeper Client", func() { It("err5 should be not nil", func() { Ω(err5).ShouldNot(BeNil()) }) + It("err6 should be not nil", func() { + Ω(err6).ShouldNot(BeNil()) + }) + It("err7 should be not nil", func() { + Ω(err7).ShouldNot(BeNil()) + }) }) })