Skip to content

Commit

Permalink
Sync from server repo (84c91ff45f4)
Browse files Browse the repository at this point in the history
  • Loading branch information
releng committed Oct 17, 2024
1 parent caac7f6 commit 025a4ce
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 59 deletions.
88 changes: 66 additions & 22 deletions vclusterops/https_check_subcluster_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,19 @@ type httpsCheckSubclusterOp struct {
scName string
isSecondary bool
ctlSetSize int
cmdType CmdType
}

func makeHTTPSCheckSubclusterOp(useHTTPPassword bool, userName string, httpsPassword *string,
scName string, isPrimary bool, ctlSetSize int) (httpsCheckSubclusterOp, error) {
func makeHTTPSGetSubclusterInfoOp(useHTTPPassword bool, userName string, httpsPassword *string,
scName string, cmdType CmdType) (httpsCheckSubclusterOp, error) {
op := httpsCheckSubclusterOp{}
op.name = "HTTPSCheckSubclusterOp"
op.description = "Collect information for the specified subcluster"
op.scName = scName
op.isSecondary = !isPrimary
op.ctlSetSize = ctlSetSize
op.cmdType = cmdType

op.useHTTPPassword = useHTTPPassword

if useHTTPPassword {
err := util.ValidateUsernameAndPassword(op.name, useHTTPPassword, userName)
if err != nil {
Expand All @@ -49,6 +50,16 @@ func makeHTTPSCheckSubclusterOp(useHTTPPassword bool, userName string, httpsPass
}
return op, nil
}
func makeHTTPSCheckSubclusterOp(useHTTPPassword bool, userName string, httpsPassword *string,
scName string, isPrimary bool, ctlSetSize int) (httpsCheckSubclusterOp, error) {
op, err := makeHTTPSGetSubclusterInfoOp(useHTTPPassword, userName, httpsPassword, scName, AddSubclusterCmd)
if err != nil {
return op, err
}
op.isSecondary = !isPrimary
op.ctlSetSize = ctlSetSize
return op, nil
}

func (op *httpsCheckSubclusterOp) setupClusterHTTPRequest(hosts []string) error {
for _, host := range hosts {
Expand Down Expand Up @@ -87,10 +98,19 @@ type scInfo struct {
SCName string `json:"subcluster_name"`
IsSecondary bool `json:"is_secondary"`
CtlSetSize int `json:"control_set_size"`
Sandbox string `json:"sandbox"`
IsCritical bool `json:"is_critical"`
}

// Return true if all the results need to be scanned to figure out
// correct subcluster details
func completeScanRequired(cmdType CmdType) bool {
return cmdType == StopSubclusterCmd
}

func (op *httpsCheckSubclusterOp) processResult(_ *opEngineExecContext) error {
var err error
isSubclusterCritical := false

for host, result := range op.clusterHTTPRequest.ResultCollection {
op.logResponse(host, result)
Expand All @@ -107,40 +127,64 @@ func (op *httpsCheckSubclusterOp) processResult(_ *opEngineExecContext) error {

// decode the json-format response
// A successful response object will be like below:

/*
{
"subcluster_name": "sc1",
"control_set_size": 2,
"is_secondary": true,
"is_default": false,
"sandbox": ""
}
{
"subcluster_name": "sc1",
"control_set_size": 2,
"is_secondary": true,
"is_default": false,
"sandbox": "",
"is_critical": false
}
*/
subclusterInfo := scInfo{}
err = op.parseAndCheckResponse(host, result.content, &subclusterInfo)
if err != nil {
return fmt.Errorf(`[%s] fail to parse result on host %s, details: %w`, op.name, host, err)
}

if subclusterInfo.SCName != op.scName {
return fmt.Errorf(`[%s] new subcluster name should be '%s' but got '%s'`, op.name, op.scName, subclusterInfo.SCName)
}
if subclusterInfo.IsSecondary != op.isSecondary {
if op.isSecondary {
return fmt.Errorf(`[%s] new subcluster should be a secondary subcluster but got a primary subcluster`, op.name)
if op.cmdType == AddSubclusterCmd {
err = op.verifySubclusterDetails(&subclusterInfo)
if err != nil {
return fmt.Errorf(`[%s] fail to verify subcluster info on host %s, details: %w`, op.name, host, err)
}
return fmt.Errorf(`[%s] new subcluster should be a primary subcluster but got a secondary subcluster`, op.name)
}
if subclusterInfo.CtlSetSize != op.ctlSetSize {
return fmt.Errorf(`[%s] new subcluster should have control set size as %d but got %d`, op.name, op.ctlSetSize, subclusterInfo.CtlSetSize)

// cache subcluster critical info for stop subcluster command
if subclusterInfo.IsCritical {
isSubclusterCritical = true
}

return nil
// early return if the command only needs response from one host
if !completeScanRequired(op.cmdType) {
return nil
}
}
if op.cmdType == StopSubclusterCmd {
if isSubclusterCritical {
return fmt.Errorf(`[%s] subcluster %s is critical, shutting the subcluster down will cause the whole database/sandbox shutdown`,
op.name, op.scName)
}
}

return err
}

func (op *httpsCheckSubclusterOp) verifySubclusterDetails(subclusterInfo *scInfo) error {
if subclusterInfo.SCName != op.scName {
return fmt.Errorf(`[%s] new subcluster name should be '%s' but got '%s'`, op.name, op.scName, subclusterInfo.SCName)
}
if subclusterInfo.IsSecondary != op.isSecondary {
if op.isSecondary {
return fmt.Errorf(`[%s] new subcluster should be a secondary subcluster but got a primary subcluster`, op.name)
}
return fmt.Errorf(`[%s] new subcluster should be a primary subcluster but got a secondary subcluster`, op.name)
}
if subclusterInfo.CtlSetSize != op.ctlSetSize {
return fmt.Errorf(`[%s] new subcluster should have control set size as %d but got %d`, op.name, op.ctlSetSize, subclusterInfo.CtlSetSize)
}
return nil
}
func (op *httpsCheckSubclusterOp) finalize(_ *opEngineExecContext) error {
return nil
}
23 changes: 0 additions & 23 deletions vclusterops/https_get_up_nodes_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,25 +298,6 @@ func (op *httpsGetUpNodesOp) validateHosts(nodesStates nodesStateInfo) error {
return nil
}

// Confirm shutting down the subcluster doesn't crash the database
func (op *httpsGetUpNodesOp) isSubclusterCritical(nodesStates nodesStateInfo, upScNodes mapset.Set[NodeInfo]) error {
allUpPrimaries := mapset.NewSet[string]()
upScHosts := mapset.NewSet[string]()
for _, n := range upScNodes.ToSlice() {
upScHosts.Add(n.Address)
}
for _, node := range nodesStates.NodeList {
if node.Sandbox == op.sandbox && node.State == util.NodeUpState && node.IsPrimary {
allUpPrimaries.Add(node.Address)
}
}
remainingPrimaries := allUpPrimaries.Difference(upScHosts)
if remainingPrimaries.Cardinality() == 0 {
return fmt.Errorf("subcluster %s is critical, shutting the subcluster down will cause the whole database shutdown", op.scName)
}
return nil
}

// Check if host is eligible to add to the UP hostlist
func (op *httpsGetUpNodesOp) checkUpHostEligible(node *nodeStateInfo) bool {
// Add subcluster needs to get an UP node from main cluster as initiator
Expand Down Expand Up @@ -362,10 +343,6 @@ func (op *httpsGetUpNodesOp) collectUpHosts(nodesStates nodesStateInfo, host str
if !foundSC {
return fmt.Errorf(`[%s] cannot find subcluster %s in database %s`, op.name, op.scName, op.DBName)
}
if op.isScPrimary {
err = op.isSubclusterCritical(nodesStates, upScNodes)
return err
}
}
return nil
}
Expand Down
37 changes: 31 additions & 6 deletions vclusterops/https_poll_subscription_state_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,21 @@ import (
"github.com/vertica/vcluster/vclusterops/util"
)

const (
ACTIVE = "ACTIVE"
REMOVING = "REMOVING"
)

type httpsPollSubscriptionStateOp struct {
opBase
opHTTPSBase
timeout int
nodesToPoll *[]string
timeout int
nodesToPoll *[]string
nodesToPollForRemoval *[]string
}

func makeHTTPSPollSubscriptionStateOp(hosts []string,
useHTTPPassword bool, userName string, httpsPassword *string, nodesToPoll *[]string) (httpsPollSubscriptionStateOp, error) {
func makeHTTPSPollSubscriptionStateOp(hosts []string, useHTTPPassword bool, userName string,
httpsPassword *string, nodesToPoll *[]string, nodesToPollForRemoval *[]string) (httpsPollSubscriptionStateOp, error) {
op := httpsPollSubscriptionStateOp{}
op.name = "HTTPSPollSubscriptionStateOp"
op.description = "Wait for subcluster shard rebalance"
Expand All @@ -41,6 +47,7 @@ func makeHTTPSPollSubscriptionStateOp(hosts []string,
}

op.nodesToPoll = nodesToPoll
op.nodesToPollForRemoval = nodesToPollForRemoval

err := util.ValidateUsernameAndPassword(op.name, useHTTPPassword, userName)
if err != nil {
Expand Down Expand Up @@ -151,8 +158,11 @@ func (op *httpsPollSubscriptionStateOp) shouldStopPolling() (bool, error) {
if containsInactiveSub(&subscriptList, op.nodesToPoll) {
return false, nil
}

op.logger.PrintInfo("All subscriptions are ACTIVE")
if containsRemovingSub(&subscriptList, op.nodesToPollForRemoval) {
return false, nil
}
op.logger.Info("All subscriptions dropped on the removing nodes")
return true, nil
}
}
Expand All @@ -165,11 +175,26 @@ func (op *httpsPollSubscriptionStateOp) shouldStopPolling() (bool, error) {
func containsInactiveSub(subscriptList *subscriptionList, nodesToPoll *[]string) bool {
var allNodesWithInactiveSubs []string
for _, s := range subscriptList.SubscriptionList {
if s.SubscriptionState != "ACTIVE" {
if s.SubscriptionState != ACTIVE {
allNodesWithInactiveSubs = append(allNodesWithInactiveSubs, s.Nodename)
}
}
nodesToPollWithActiveSubs := util.SliceDiff(*nodesToPoll, allNodesWithInactiveSubs)
// all subs of all nodes in nodesToPoll are active
return len(*nodesToPoll) != len(nodesToPollWithActiveSubs)
}

// immediately after calling rebalance_shards() before actually drop a node from catalog
// the subscriptions of the node will become REMOVING status
// we need to wait until shard remove actually remove those REMOVING subscriptions in order to drop a node
func containsRemovingSub(subscriptList *subscriptionList, nodesToPollForRemoval *[]string) bool {
var allNodesWithRemovingSubs []string
for _, s := range subscriptList.SubscriptionList {
if s.SubscriptionState == REMOVING {
allNodesWithRemovingSubs = append(allNodesWithRemovingSubs, s.Nodename)
}
}

nodesToPollWithRemovingSubs := util.SliceCommon(*nodesToPollForRemoval, allNodesWithRemovingSubs)
return len(nodesToPollWithRemovingSubs) != 0
}
21 changes: 14 additions & 7 deletions vclusterops/remove_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,21 @@ func (options *VRemoveNodeOptions) completeVDBSetting(vdb *VCoordinationDatabase
return nil
}

func getMainClusterNodes(vdb *VCoordinationDatabase, options *VRemoveNodeOptions, mainClusterNodes *[]string) {
// this finds all main cluster UP nodes, regardless if they will be removed or not
func getMainClusterNodes(vdb *VCoordinationDatabase, options *VRemoveNodeOptions, mainClusterNodes, nodesToRemove *[]string) {
// get nodes that will survive after removal, need to poll for ACTIVE subscriptions for those nodes
hostsAfterRemoval := util.SliceDiff(vdb.HostList, options.HostsToRemove)
for _, host := range hostsAfterRemoval {
vnode := vdb.HostNodeMap[host]
if vnode.Sandbox == "" && vnode.State == util.NodeUpState {
if vnode.Sandbox == util.MainClusterSandbox && vnode.State == util.NodeUpState {
*mainClusterNodes = append(*mainClusterNodes, vnode.Name)
}
}
// get nodes that will be removed to poll for REMOVING subscriptions
for _, host := range options.HostsToRemove {
vnode := vdb.HostNodeMap[host]
*nodesToRemove = append(*nodesToRemove, vnode.Name)
}
}

func getSortedHosts(hostsToRemove []string, hostNodeMap vHostNodeMap) []string {
Expand Down Expand Up @@ -414,18 +421,19 @@ func (vcc VClusterCommands) produceRemoveNodeInstructions(vdb *VCoordinationData
return instructions, err
}

// for Eon DB, we check whether all subscriptions are ACTIVE after rebalance shards
// for Eon DB, we check whether all UP nodes (nodesToPollSubs) have subscriptions being ACTIVE after rebalance shards
// also wait for all REMOVING subscriptions are gone for the nodes to remove (nodesToRemove)
// Sandboxed nodes cannot be removed, so even if the database has sandboxes,
// polling subscriptions for the main cluster is enough
var nodesToPollSubs []string
var nodesToPollSubs, nodesToRemove []string
if len(options.NodesToPullSubs) > 0 {
nodesToPollSubs = options.NodesToPullSubs
} else {
getMainClusterNodes(vdb, options, &nodesToPollSubs)
getMainClusterNodes(vdb, options, &nodesToPollSubs, &nodesToRemove)
}

httpsPollSubscriptionStateOp, e := makeHTTPSPollSubscriptionStateOp(initiatorHost,
usePassword, username, password, &nodesToPollSubs)
usePassword, username, password, &nodesToPollSubs, &nodesToRemove)
if e != nil {
return instructions, e
}
Expand All @@ -439,7 +447,6 @@ func (vcc VClusterCommands) produceRemoveNodeInstructions(vdb *VCoordinationData
}
instructions = append(instructions, &httpsRBCOp)
}

// only remove secondary nodes from spread
err = vcc.produceSpreadRemoveNodeOp(&instructions, options.HostsToRemove,
usePassword, username, password,
Expand Down
7 changes: 7 additions & 0 deletions vclusterops/stop_subcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ func (vcc *VClusterCommands) produceStopSCInstructions(options *VStopSubclusterO
}
}

httpsGetSubclusterInfoOp, err := makeHTTPSGetSubclusterInfoOp(usePassword, options.UserName, options.Password,
options.SCName, StopSubclusterCmd)
if err != nil {
return instructions, err
}

httpsGetUpNodesOp, err := makeHTTPSGetUpScNodesOp(options.DBName, options.Hosts,
usePassword, options.UserName, options.Password, StopSubclusterCmd, options.SCName)
if err != nil {
Expand All @@ -194,6 +200,7 @@ func (vcc *VClusterCommands) produceStopSCInstructions(options *VStopSubclusterO

instructions = append(instructions,
&httpsGetUpNodesOp,
&httpsGetSubclusterInfoOp,
&httpsSyncCatalogOp,
&httpsStopSCOp,
&httpsCheckDBRunningOp,
Expand Down
2 changes: 1 addition & 1 deletion vclusterops/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ func IsK8sEnvironment() bool {
// GetClusterName can return the correct cluster name based on the sandbox name.
// It can help people to log the cluster name.
func GetClusterName(sandbox string) string {
if sandbox == "" {
if sandbox == MainClusterSandbox {
return "main cluster"
}
return "sandbox " + sandbox
Expand Down

0 comments on commit 025a4ce

Please sign in to comment.