Skip to content

Commit

Permalink
Sync from server repo (eb1b26b32f6)
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Spilchen committed Apr 9, 2024
1 parent 650f5e7 commit 0e3c670
Show file tree
Hide file tree
Showing 6 changed files with 526 additions and 0 deletions.
1 change: 1 addition & 0 deletions vclusterops/cluster_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ type ClusterCommands interface {
VFetchCoordinationDatabase(options *VFetchCoordinationDatabaseOptions) (VCoordinationDatabase, error)
VUnsandbox(options *VUnsandboxOptions) error
VStopSubcluster(options *VStopSubclusterOptions) error
VFetchNodesDetails(options *VFetchNodesDetailsOptions) (NodesDetails, error)
}

type VClusterCommandsLogger struct {
Expand Down
185 changes: 185 additions & 0 deletions vclusterops/fetch_nodes_details.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
(c) Copyright [2023-2024] Open Text.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vclusterops

import (
"fmt"

"github.com/vertica/vcluster/vclusterops/util"
"github.com/vertica/vcluster/vclusterops/vlog"
)

type NodeState struct {
Name string `json:"name"`
ID uint64 `json:"node_id"`
Address string `json:"address"`
State string `json:"state"`
Database string `json:"database"`
IsPrimary bool `json:"is_primary"`
IsReadOnly bool `json:"is_readonly"`
CatalogPath string `json:"catalog_path"`
DataPath []string `json:"data_path"`
DepotPath string `json:"depot_path"`
SubclusterName string `json:"subcluster_name"`
SubclusterID uint64 `json:"subcluster_id"`
LastMsgFromNodeAt string `json:"last_msg_from_node_at"`
DownSince string `json:"down_since"`
Version string `json:"build_info"`
SandboxName string `json:"sandbox_name"`
NumberShardSubscriptions uint `json:"number_shard_subscriptions"`
}

type StorageLocation struct {
Name string `json:"name"`
ID uint64 `json:"location_id"`
Label string `json:"label"`
UsageType string `json:"location_usage_type"`
Path string `json:"location_path"`
SharingType string `json:"location_sharing_type"`
MaxSize uint64 `json:"max_size"`
DiskPercent string `json:"disk_percent"`
HasCatalog bool `json:"has_catalog"`
Retired bool `json:"retired"`
}

type StorageLocations struct {
StorageLocList []StorageLocation `json:"storage_location_list"`
}

type NodeDetails struct {
NodeState
StorageLocations
}

type NodesDetails []NodeDetails

type hostNodeDetailsMap map[string]*NodeDetails

type VFetchNodesDetailsOptions struct {
DatabaseOptions
}

func VFetchNodesDetailsOptionsFactory() VFetchNodesDetailsOptions {
opt := VFetchNodesDetailsOptions{}
// set default values to the params
opt.setDefaultValues()

return opt
}

func (options *VFetchNodesDetailsOptions) setDefaultValues() {
options.DatabaseOptions.setDefaultValues()
}

func (options *VFetchNodesDetailsOptions) validateOptions(log vlog.Printer) error {
err := options.validateBaseOptions(commandFetchNodesDetails, log)
if err != nil {
return err
}

return nil
}

func (options *VFetchNodesDetailsOptions) analyzeOptions() (err error) {
// resolve RawHosts to be IP addresses
if len(options.RawHosts) > 0 {
options.Hosts, err = util.ResolveRawHostsToAddresses(options.RawHosts, options.IPv6)
if err != nil {
return err
}
}

return nil
}

func (options *VFetchNodesDetailsOptions) validateAnalyzeOptions(log vlog.Printer) error {
if err := options.validateOptions(log); err != nil {
return err
}
return options.analyzeOptions()
}

// VFetchNodesDetails can return nodes' details including node state and storage locations for the provided hosts
func (vcc VClusterCommands) VFetchNodesDetails(options *VFetchNodesDetailsOptions) (nodesDetails NodesDetails, err error) {
/*
* - Validate Options
* - Produce Instructions
* - Create a VClusterOpEngine
* - Give the instructions to the VClusterOpEngine to run
*/

err = options.validateAnalyzeOptions(vcc.Log)
if err != nil {
return nodesDetails, err
}

hostsWithNodeDetails := make(hostNodeDetailsMap, len(options.Hosts))

instructions, err := vcc.produceFetchNodesDetailsInstructions(options, hostsWithNodeDetails)
if err != nil {
return nodesDetails, fmt.Errorf("fail to produce instructions: %w", err)
}

certs := httpsCerts{key: options.Key, cert: options.Cert, caCert: options.CaCert}
clusterOpEngine := makeClusterOpEngine(instructions, &certs)

err = clusterOpEngine.run(vcc.Log)
if err != nil {
return nodesDetails, fmt.Errorf("failed to fetch node details on hosts %v: %w", options.Hosts, err)
}

for _, nodeDetails := range hostsWithNodeDetails {
nodesDetails = append(nodesDetails, *nodeDetails)
}

return nodesDetails, nil
}

// produceFetchNodesDetails will build a list of instructions to execute for
// the fetch node details operation.
//
// The generated instructions will later perform the following operations:
// - Get nodes' state by calling /v1/node
// - Get nodes' storage locations by calling /v1/node/storage-locations
func (vcc *VClusterCommands) produceFetchNodesDetailsInstructions(options *VFetchNodesDetailsOptions,
hostsWithNodeDetails hostNodeDetailsMap) ([]clusterOp, error) {
var instructions []clusterOp

// when password is specified, we will use username/password to call https endpoints
err := options.setUsePassword(vcc.Log)
if err != nil {
return instructions, err
}

httpsGetNodeStateOp, err := makeHTTPSGetLocalNodeStateOp(*options.DBName, options.Hosts,
options.usePassword, *options.UserName, options.Password, hostsWithNodeDetails)
if err != nil {
return instructions, err
}

httpsGetStorageLocationsOp, err := makeHTTPSGetStorageLocsOp(options.Hosts, options.usePassword,
*options.UserName, options.Password, hostsWithNodeDetails)
if err != nil {
return instructions, err
}

instructions = append(instructions,
&httpsGetNodeStateOp,
&httpsGetStorageLocationsOp,
)

return instructions, nil
}
38 changes: 38 additions & 0 deletions vclusterops/fetch_nodes_details_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
(c) Copyright [2023-2024] Open Text.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vclusterops

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestRequiredOptions(t *testing.T) {
options := VFetchNodesDetailsOptionsFactory()
vcc := VClusterCommands{}

// dbName is required
nodesDetails, err := vcc.VFetchNodesDetails(&options)
assert.Empty(t, nodesDetails)
assert.ErrorContains(t, err, `must specify a database name`)

// hosts are required
*options.DBName = "testDB"
nodesDetails, err = vcc.VFetchNodesDetails(&options)
assert.Empty(t, nodesDetails)
assert.ErrorContains(t, err, `must specify a host or host list`)
}
156 changes: 156 additions & 0 deletions vclusterops/https_get_local_node_state_op.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
(c) Copyright [2023-2024] Open Text.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vclusterops

import (
"fmt"

"github.com/vertica/vcluster/vclusterops/util"
)

type httpsGetLocalNodeStateOp struct {
opBase
opHTTPSBase
dbName string
hostsWithNodeDetails hostNodeDetailsMap
}

func makeHTTPSGetLocalNodeStateOp(dbName string, hosts []string, useHTTPPassword bool, userName string,
httpsPassword *string, hostsWithNodeDetails hostNodeDetailsMap) (httpsGetLocalNodeStateOp, error) {
op := httpsGetLocalNodeStateOp{}
op.name = "HTTPSGetLocalNodeStateOp"
op.description = "Get local node state"
op.useHTTPPassword = useHTTPPassword
if useHTTPPassword {
err := util.ValidateUsernameAndPassword(op.name, useHTTPPassword, userName)
if err != nil {
return op, err
}
op.userName = userName
op.httpsPassword = httpsPassword
}
op.dbName = dbName
op.hosts = hosts
op.hostsWithNodeDetails = hostsWithNodeDetails
return op, nil
}

func (op *httpsGetLocalNodeStateOp) setupClusterHTTPRequest(hosts []string) error {
for _, host := range hosts {
httpRequest := hostHTTPRequest{}
httpRequest.Method = GetMethod
httpRequest.buildHTTPSEndpoint("node")
if op.useHTTPPassword {
httpRequest.Password = op.httpsPassword
httpRequest.Username = op.userName
}
op.clusterHTTPRequest.RequestCollection[host] = httpRequest
}

return nil
}

func (op *httpsGetLocalNodeStateOp) prepare(execContext *opEngineExecContext) error {
execContext.dispatcher.setup(op.hosts)

return op.setupClusterHTTPRequest(op.hosts)
}

func (op *httpsGetLocalNodeStateOp) execute(execContext *opEngineExecContext) error {
if err := op.runExecute(execContext); err != nil {
return err
}

return op.processResult(execContext)
}

type nodeStateResp struct {
NodeStates []NodeState `json:"node_list"`
}

func (op *httpsGetLocalNodeStateOp) processResult(_ *opEngineExecContext) error {
for host, result := range op.clusterHTTPRequest.ResultCollection {
op.logResponse(host, result)

if !result.isPassing() {
// we need to collect all nodes info, if one host failed to collect the info,
// we consider the operation failed.
return result.err
}

// decode the json-format response
// The successful response will contain one node's info:
/*
{
"detail": null,
"node_list": [
{
"name": "v_test_db_node0001",
"node_id": 45035996273704992,
"address": "192.168.1.101",
"state": "UP",
"database": "test_db",
"is_primary": true,
"is_readonly": false,
"catalog_path": "\/data\/test_db\/v_test_db_node0001_catalog\/Catalog",
"data_path": [
"\/data\/test_db\/v_test_db_node0001_data"
],
"depot_path": "\/data\/test_db\/v_test_db_node0001_depot",
"subcluster_id": 45035996273704988,
"subcluster_name": "default_subcluster",
"last_msg_from_node_at": "2024-04-05T12:33:19.975952-04",
"down_since": null,
"build_info": "v24.3.0-a0efe9ba3abb08d9e6472ffc29c8e0949b5998d2",
"sandbox_name": "",
"number_shard_subscriptions": 3
}
]
}
*/
resp := nodeStateResp{}
err := op.parseAndCheckResponse(host, result.content, &resp)
if err != nil {
return fmt.Errorf(`[%s] failed to parse result on host %s, details: %w`, op.name, host, err)
}

// verify if the endpoint returns correct node info
if len(resp.NodeStates) != 1 {
return fmt.Errorf(`[%s] response from host %s should contain state for one node rather than %d node(s)`,
op.name, host, len(resp.NodeStates))
}
nodeState := resp.NodeStates[0]
if nodeState.Database != op.dbName {
return fmt.Errorf(`[%s] node is running in database %s rather than database %s on host %s`,
op.name, nodeState.Database, op.dbName, host)
}
if nodeState.Address != host {
return fmt.Errorf(`[%s] node state is for host %s rather than host %s`,
op.name, nodeState.Address, host)
}

// collect node state
nodeDetails := NodeDetails{}
nodeDetails.NodeState = nodeState
op.hostsWithNodeDetails[host] = &nodeDetails
}

return nil
}

func (op *httpsGetLocalNodeStateOp) finalize(_ *opEngineExecContext) error {
return nil
}
Loading

0 comments on commit 0e3c670

Please sign in to comment.