diff --git a/CHANGELOG.md b/CHANGELOG.md index 5749a4d..8056c00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Fixed +- CASMTRIAGE-7594 - clean up resilience, rebalance nodes, and accept other worker nodes +- CASMCMS-9126 - watch permissions on log files to insure they can be written to ## [2.6.0] - 2024-11-22 ### Fixed diff --git a/src/console_node/consoleNodeMain.go b/src/console_node/consoleNodeMain.go index b061452..18a8b47 100644 --- a/src/console_node/consoleNodeMain.go +++ b/src/console_node/consoleNodeMain.go @@ -55,6 +55,9 @@ var httpListen string = ":26776" // global to signify service is shutting down var inShutdown bool = false +// global pointer to the active OperatorService +var opService OperatorService = nil + // identify what the name of this pod is func setPodName() { // The pod name is set as an env variable by the k8s system on pod @@ -142,10 +145,10 @@ func main() { setPodName() // Construct services - operatorService := NewOperatorService() + opService = NewOperatorService() // Find pod location in k8s, this must block and retry - setPodLocation(operatorService) + setPodLocation(opService) // start the aggregation log respinAggLog() diff --git a/src/console_node/data.go b/src/console_node/data.go index 60f39dd..9b6ca07 100644 --- a/src/console_node/data.go +++ b/src/console_node/data.go @@ -62,6 +62,36 @@ type NodeConsoleInfo struct { NodeConsoleName string `json:"nodeconsolename"` // the pod console } +// Struct to hold information about currently active node pods +type NodePodInfo struct { + NumActivePods int `json:"numactivepods"` +} + +// Query the console-data pod to get the number of currently active console-node pods +func getNumActiveNodePods() (int, error) { + retVal := 1 + // make the call to console-data + url := fmt.Sprintf("%s/activepods", dataAddrBase) + rb, _, err := getURL(url, nil) + if err != nil { + log.Printf("Error in console-data active pods query: %s", err) + return retVal, err + } + + // process the return + var numPodsInfo NodePodInfo + if rb != nil { + // should be an array of nodeConsoleInfo structs + err := json.Unmarshal(rb, &numPodsInfo) + if err != nil { + log.Printf("Error unmarshalling active pods return data: %s", err) + return retVal, err + } + retVal = numPodsInfo.NumActivePods + } + return retVal, nil +} + // Function to acquire new consoles to monitor func acquireNewNodes(numMtn, numRvr int, podLocation *PodLocationDataResponse) []nodeConsoleInfo { // NOTE: in doGetNewNodes thread diff --git a/src/console_node/monitor.go b/src/console_node/monitor.go index 2a477da..39d539b 100644 --- a/src/console_node/monitor.go +++ b/src/console_node/monitor.go @@ -29,6 +29,7 @@ package main import ( "bytes" "crypto/sha256" + "fmt" "io" "log" "os" @@ -60,12 +61,35 @@ func checkForChanges() { restartConman = true } + // make sure that the log files still have the correct permissions + checkLogFiles() + //restart conman if necessary if restartConman { signalConmanTERM() } } +// function to check the permissions on the log files +func checkLogFiles() { + // gather the names of the current nodes + nodes := getCurrNodeXnames() + + // check the write permissions of the log files + for _, nn := range nodes { + filename := fmt.Sprintf("/var/log/conman/console.%s", nn) + fs, err := os.Stat(filename) + if err != nil { + continue + } + if fs.Mode()&0600 == 0 { + log.Printf("Log file %s not user read/write - changing permissions", nn) + newMod := fs.Mode() | 0600 + os.Chmod(filename, newMod) + } + } +} + // function to continuously monitor for changes that require conman to restart func doMonitor() { // NOTE: this is intended to be constantly running in its own thread diff --git a/src/console_node/nodes.go b/src/console_node/nodes.go index ad38458..969c846 100644 --- a/src/console_node/nodes.go +++ b/src/console_node/nodes.go @@ -80,6 +80,7 @@ var currentRvrNodes map[string]*nodeConsoleInfo = make(map[string]*nodeConsoleIn var currentPdsNodes map[string]*nodeConsoleInfo = make(map[string]*nodeConsoleInfo) // [xname,*consoleInfo] // Number of nodes this pod should be watching +// NOTE: deprecated, prefer to call OperatorService.getCurrentTargets() var targetRvrNodes int = -1 var targetMtnNodes int = -1 @@ -94,6 +95,27 @@ var newNodeLookupSec int = 30 // a shared file system so console-node pods can read what is set here const targetNodeFile string = "/var/log/console/TargetNodes.txt" +// function to safely get the current node xnames +func getCurrNodeXnames() []string { + // put a lock on the current nodes while looking for new ones + currNodesMutex.Lock() + defer currNodesMutex.Unlock() + + // gather the names of all the current nodes being watched + var retVal []string + for key := range currentMtnNodes { + retVal = append(retVal, key) + } + for key := range currentRvrNodes { + retVal = append(retVal, key) + } + for key := range currentPdsNodes { + retVal = append(retVal, key) + } + + return retVal +} + // small helper function to insure correct number of nodes asked for func pinNumNodes(numAsk, numMax int) int { // insure the input number ends in range [0,numMax] @@ -107,7 +129,101 @@ func pinNumNodes(numAsk, numMax int) int { return numAsk } +// local max function for ints +func locMax(a, b int) int { + // NOTE: this may be removed in favor of the standard 'max' function when we upgrade past 1.18 + if a > b { + return a + } + return b +} + +// helper function to calculate how many nodes to ask for +func calcChangeInNodes() (deltaMtn, deltaRvr int) { + // The change to the number of nodes being monitored - positive + // means to acquire more, negative to release some. + deltaMtn = 0 + deltaRvr = 0 + + // Update the target number of nodes being monitored + updateNodesPerPod() + + // NOTE: the 'target' values should be the total number of nodes divided + // by the number of console-node pods. Where this gets tricky is if + // one or more console-node pods has failed. Then we want to take over + // the unmonitored nodes until that pod is back up, then give them back. + // We also always want to have extra space to allow a node to shift so + // a console-node pod isn't monitoring the worker it is running on. + + // Get the current number of nodes being monitored here + currNumRvr := len(currentRvrNodes) + currNumMtn := len(currentMtnNodes) + + // get the number of currently active nodes from the data service + numPods, errNumPods := getNumActiveNodePods() + if errNumPods != nil { + log.Print("Unable to find current number of active nodes, defaulting to 1") + numPods = 1 + } + + // get the current targets from the operator service, default to something + // reasonable if we can't contact the operator service + currTargets, errCurrTargets := opService.getCurrentTargets() + if errCurrTargets != nil { + log.Print("Unable to find current targets from operator service - defaulting to something reasonable") + } + + // Figure out how to adjust the number of nodes managed by this pod + if errCurrTargets == nil && errNumPods == nil { + // we have good current information from data and operator services, so use it + // Try to balance the number of nodes with the total of node pods + safeNumPods := locMax(numPods, 1) + idealNumRvr := (currTargets.TotalRvrNodes / safeNumPods) + 1 + idealNumMtn := (currTargets.TotalMtnNodes / safeNumPods) + 1 + + log.Printf("calcChangeInNodes") + log.Printf(" Active pods: %d", numPods) + log.Printf(" Current river nodes: %d", currNumRvr) + log.Printf(" Op: total river nodes: %d", currTargets.TotalRvrNodes) + log.Printf(" Op: target river nodes: %d", currTargets.TargetNumRvrNodes) + log.Printf(" Ideal river nodes: %d", idealNumRvr) + + if currNumRvr < idealNumRvr { + log.Printf(" ** (currNumRvr < idealNumRvr) = true") + } + if float64(currNumRvr) > (1.1 * float64(idealNumRvr)) { + log.Printf(" ** (float64(currNumRvr) > (1.1 * float64(idealNumRvr))) = true") + log.Printf(" float64(currNumRvr)=%f, (1.1 * float64(idealNumRvr))=%f", float64(currNumRvr), 1.1*float64(idealNumRvr)) + } + // if we are under the ideal number, try to match + // if we are more than 10% over the ideal, give some back + // NOTE: there is a gap where we don't change the number to try and avoid bouncing + if (currNumRvr < idealNumRvr) || (float64(currNumRvr) > (1.1 * float64(idealNumRvr))) { + // get more to bring us up to the ideal number of nodes + deltaRvr = idealNumRvr - currNumRvr + log.Printf(" ** Setting deltaRvr=%d", deltaRvr) + } + if (currNumMtn < idealNumMtn) || (float64(currNumMtn) > (1.1 * float64(idealNumMtn))) { + // get more to bring us up to the ideal number of nodes + deltaMtn = idealNumMtn - currNumMtn + } + } else { + log.Printf("calcChangeInNodes: unable to do detailed calculation") + // we have having problems contacting the data and operator services, so guess + deltaRvr = pinNumNodes(targetRvrNodes-currNumRvr, maxAcquireRvr) + deltaMtn = pinNumNodes(targetMtnNodes-currNumMtn, maxAcquireMtn) + } + + return deltaMtn, deltaRvr +} + func doGetNewNodes() { + // if the pod is shutting down, don't touch the current nodes + if inShutdown { + log.Print("In pod shutdown, skipping doGetNewNodes") + return + } + // put a lock on the current nodes while looking for new ones currNodesMutex.Lock() defer currNodesMutex.Unlock() @@ -115,46 +231,49 @@ func doGetNewNodes() { // keep track of if we need to redo the configuration changed := false - // Update the target number of nodes being monitored - updateNodesPerPod() + // Figure out how to adjust the number of nodes being monitored + deltaMtn, deltaRvr := calcChangeInNodes() - // Check if we need to gather more nodes - don't take more - // if the service is shutting down - if !inShutdown && (len(currentRvrNodes) < targetRvrNodes || len(currentMtnNodes) < targetMtnNodes) { - // figure out how many of each to ask for - numRvr := pinNumNodes(targetRvrNodes-len(currentRvrNodes), maxAcquireRvr) - numMtn := pinNumNodes(targetMtnNodes-len(currentMtnNodes), maxAcquireMtn) - - // attempt to acquire more nodes - if numRvr > 0 || numMtn > 0 { - // NOTE: this should be the ONLY place where the maps of - // current nodes is updated!!! - // NOTE: paradise nodes are included in mountain count - newNodes := acquireNewNodes(numMtn, numRvr, podLocData) - // process the new nodes - for i, node := range newNodes { - //log.Printf(" Processing node: %s", node.String()) - if node.isRiver() { - currentRvrNodes[node.NodeName] = &newNodes[i] - changed = true - } else if node.isMountain() { - currentMtnNodes[node.NodeName] = &newNodes[i] - changed = true - } else if node.isParadise() { - currentPdsNodes[node.NodeName] = &newNodes[i] - changed = true - } + // Make sure we can always take one river node if we need to move a worker node + if deltaRvr == 0 { + deltaRvr = 1 + } + + log.Printf("doGetNewNodes - deltaRvr: %d, deltaMtn: %d", deltaRvr, deltaMtn) + + // From the change numbers, pull out how many to add (if any) + // NOTE: paradise nodes are included in mountain count + numAcqRvr := pinNumNodes(deltaRvr, maxAcquireRvr) + numAcqMtn := pinNumNodes(deltaMtn, maxAcquireMtn) + + if numAcqRvr > 0 || numAcqMtn > 0 { + newNodes := acquireNewNodes(numAcqMtn, numAcqRvr, podLocData) + // process the new nodes + // NOTE: this should be the ONLY place where the maps of + // current nodes is updated!!! + newRvr := 0 + newMtn := 0 + newPds := 0 + for i, node := range newNodes { + if node.isRiver() { + currentRvrNodes[node.NodeName] = &newNodes[i] + changed = true + newRvr++ + } else if node.isMountain() { + currentMtnNodes[node.NodeName] = &newNodes[i] + changed = true + newMtn++ + } else if node.isParadise() { + currentPdsNodes[node.NodeName] = &newNodes[i] + changed = true + newPds++ } - } else { - log.Printf("Nothing to acquire after pin...") } - } else { - log.Printf("Skipping acquire - at capacity. CurRvr:%d, TarRvr:%d, CurMtn:%d, TarMtn:%d", - len(currentRvrNodes), targetRvrNodes, len(currentMtnNodes)+len(currentPdsNodes), targetMtnNodes) + log.Printf(" Added River:%d, Mountain:%d, Paradise:%d", newRvr, newMtn, newPds) } // See if we have too many nodes - if rebalanceNodes() { + if rebalanceNodes(deltaRvr, deltaMtn) { changed = true } @@ -182,68 +301,69 @@ func watchForNodes() { } // If we have too many nodes, release some -func rebalanceNodes() bool { +func rebalanceNodes(deltaRvr, deltaMtn int) bool { // NOTE: this function just modifies currentNodes lists and stops // tailing operation. The configuration files will be triggered to be // regenerated outside of this operation. // NOTE: in doGetNewNodes thread - // see if we need to release any nodes - if len(currentRvrNodes) <= targetRvrNodes && len(currentMtnNodes) <= targetMtnNodes { - log.Printf("Current number of nodes within target range - no rebalance needed") - return false - } - // gather nodes to give back var rn []nodeConsoleInfo // release river nodes until match target number // NOTE: map iteration is random - for key, ni := range currentRvrNodes { - if len(currentRvrNodes) > targetRvrNodes { - // remove another one - rn = append(rn, *ni) - delete(currentRvrNodes, key) - - // stop tailing this file - stopTailing(key) - } else { - // done so break - break + if deltaRvr < 0 { + endNumRvr := len(currentRvrNodes) + deltaRvr + for key, ni := range currentRvrNodes { + if len(currentRvrNodes) > endNumRvr { + // remove another one + rn = append(rn, *ni) + delete(currentRvrNodes, key) + + // stop tailing this file + stopTailing(key) + } else { + // done so break + break + } } } // release mtn nodes until match target number // NOTE: paradise nodes count towards mountain limits, remove from both - for len(currentMtnNodes)+len(currentPdsNodes) > targetMtnNodes { - // balance removal so take from whichever pool is larger, one at a time - targetPool := ¤tPdsNodes - if len(currentMtnNodes) > len(currentPdsNodes) { - targetPool = ¤tMtnNodes - } + if deltaMtn < 0 { + endNumMtn := len(currentMtnNodes) + len(currentPdsNodes) + deltaMtn + for len(currentMtnNodes)+len(currentPdsNodes) > endNumMtn { + // balance removal so take from whichever pool is larger, one at a time + targetPool := ¤tPdsNodes + if len(currentMtnNodes) > len(currentPdsNodes) { + targetPool = ¤tMtnNodes + } - // make sure we didn't hit some weird condition where both lists are empty - if len(*targetPool) == 0 { - break - } + // make sure we didn't hit some weird condition where both lists are empty + if len(*targetPool) == 0 { + break + } - // remove a node from the target pool - // NOTE: map iteration is random - use it to grab a random node to remove - for key, ni := range *targetPool { - // remove node - rn = append(rn, *ni) - delete(*targetPool, key) + // remove a node from the target pool + // NOTE: map iteration is random - use it to grab a random node to remove + for key, ni := range *targetPool { + // remove node + rn = append(rn, *ni) + delete(*targetPool, key) - // stop tailing this file - stopTailing(key) + // stop tailing this file + stopTailing(key) - // only want to remove one at a time - break + // only want to remove one at a time + break + } } } if len(rn) > 0 { + log.Printf("Rebalance operation is releasing %d nodes", len(rn)) // notify console-data that we are no longer tracking these nodes releaseNodes(rn) @@ -281,13 +401,12 @@ func releaseNode(xname string) bool { // Update the number of target consoles per node pod func updateNodesPerPod() { - // NOTE: for the time being we will just put this information - // into a simple text file on a pvc shared with console-operator - // and console-node pods. The console-operator will write changes - // and the console-node pods will read periodically for changes. - // This mechanism can be made more elegant later if needed but it - // needs to be something that can be picked up by all console-node - // pods without restarting them. + + // NOTE: this is in the process of being deprecated - now the number of + // targeted nodes should be retrieved through the console-operator + // http api via the OperatorService.getCurrentTargets() function call. + // Ths is being left in for a backup mechanism in case the http function + // fails. // NOTE: in doGetNewNodes thread diff --git a/src/console_node/operator.go b/src/console_node/operator.go index 6e95d23..82d678d 100644 --- a/src/console_node/operator.go +++ b/src/console_node/operator.go @@ -36,6 +36,7 @@ import ( type OperatorService interface { getPodLocation(podId string) (podLoc *PodLocationDataResponse, err error) OperatorRetryInterval() time.Duration + getCurrentTargets() (*CurrentTargets, error) } type OperatorManager struct { @@ -55,6 +56,14 @@ func (om OperatorManager) OperatorRetryInterval() time.Duration { return om.operatorRetryInterval } +type CurrentTargets struct { + TargetNumRvrNodes int `json:"targetnumrvrnodes"` + TargetNumMtnNodes int `json:"targetnummtnnodes"` + TotalRvrNodes int `json:"totalrvrnodes"` + TotalMtnNodes int `json:"totalmtnnodes"` + TargetNumNodePods int `json:"targetnumnodepods"` +} + type PodLocationDataResponse struct { PodName string `json:"podname"` Alias string `json:"alias"` @@ -85,3 +94,30 @@ func (om OperatorManager) getPodLocation(podID string) (data *PodLocationDataRes return resp, nil } + +// Function to get the current target and node count information from console-operator +func (om OperatorManager) getCurrentTargets() (*CurrentTargets, error) { + // make the http call + url := fmt.Sprintf("%s/currentTargets", om.operatorAddrBase) + rb, sc, err := getURL(url, nil) + if err != nil { + log.Printf("Error making GET to %s\n", url) + return nil, err + } + + if sc != 200 && err == nil { + log.Printf("Failed to get current targets, sc=%d", sc) + return nil, errors.New("failed to get current targets") + } + + var resp = new(CurrentTargets) + if rb != nil { + err := json.Unmarshal(rb, &resp) + if err != nil { + log.Printf("Error unmarshalling return data: %s\n", err) + return nil, err + } + } + + return resp, nil +}