Use NodeSystemInfo rather than node labels (#152)
* Use NodeSystemInfo rather than node labels

Signed-off-by: Raul Sevilla <[email protected]>

* Add missing err check

Signed-off-by: Raul Sevilla <[email protected]>

---------

Signed-off-by: Raul Sevilla <[email protected]>
rsevilla87 authored Oct 1, 2024
1 parent: 22f69b9 · commit: e0563da
Showing 5 changed files with 95 additions and 169 deletions.
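For context on the change itself: NodeSystemInfo is carried in every Node object's status, so the runtime details previously derived from node labels can be read directly from the node. A rough, standalone sketch of that lookup (not part of this diff; assumes an already configured clientset):

// Standalone sketch, not part of this commit: read NodeSystemInfo straight
// from a node's status instead of inspecting its labels.
package sketch

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

func printNodeSystemInfo(c *kubernetes.Clientset, nodeName string) error {
	node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	// NodeSystemInfo includes kernel version, OS image, container runtime and
	// kubelet versions, architecture, and related system identifiers.
	info := node.Status.NodeInfo
	fmt.Printf("%s: kernel=%s os=%s runtime=%s kubelet=%s arch=%s\n",
		nodeName, info.KernelVersion, info.OSImage,
		info.ContainerRuntimeVersion, info.KubeletVersion, info.Architecture)
	return nil
}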
4 changes: 1 addition & 3 deletions cmd/k8s-netperf/k8s-netperf.go
@@ -254,7 +254,7 @@ var rootCmd = &cobra.Command{

if pavail {
for i, npr := range sr.Results {
if len(npr.ClientNodeInfo.Hostname) > 0 && len(npr.ServerNodeInfo.Hostname) > 0 {
if len(npr.ClientNodeInfo.NodeName) > 0 && len(npr.ServerNodeInfo.NodeName) > 0 {
sr.Results[i].ClientMetrics, _ = metrics.QueryNodeCPU(npr.ClientNodeInfo, pcon, npr.StartTime, npr.EndTime)
sr.Results[i].ServerMetrics, _ = metrics.QueryNodeCPU(npr.ServerNodeInfo, pcon, npr.StartTime, npr.EndTime)
sr.Results[i].ClientPodCPU, _ = metrics.TopPodCPU(npr.ClientNodeInfo, pcon, npr.StartTime, npr.EndTime)
@@ -465,8 +465,6 @@ func executeWorkload(nc config.Config,
npr.EndTime = time.Now()
npr.ClientNodeInfo = s.ClientNodeInfo
npr.ServerNodeInfo = s.ServerNodeInfo
npr.ServerNodeLabels, _ = k8s.GetNodeLabels(s.ClientSet, s.ServerNodeInfo.Hostname)
npr.ClientNodeLabels, _ = k8s.GetNodeLabels(s.ClientSet, s.ClientNodeInfo.Hostname)

return npr
}
108 changes: 55 additions & 53 deletions pkg/archive/archive.go
@@ -19,36 +19,36 @@ const ltcyMetric = "usec"

// Doc struct of the JSON document to be indexed
type Doc struct {
UUID string `json:"uuid"`
Timestamp time.Time `json:"timestamp"`
HostNetwork bool `json:"hostNetwork"`
Driver string `json:"driver"`
Parallelism int `json:"parallelism"`
Profile string `json:"profile"`
Duration int `json:"duration"`
Service bool `json:"service"`
Local bool `json:"local"`
Virt bool `json:"virt"`
AcrossAZ bool `json:"acrossAZ"`
Samples int `json:"samples"`
Messagesize int `json:"messageSize"`
Burst int `json:"burst"`
Throughput float64 `json:"throughput"`
Latency float64 `json:"latency"`
TputMetric string `json:"tputMetric"`
LtcyMetric string `json:"ltcyMetric"`
TCPRetransmit float64 `json:"tcpRetransmits"`
UDPLossPercent float64 `json:"udpLossPercent"`
ToolVersion string `json:"toolVersion"`
ToolGitCommit string `json:"toolGitCommit"`
Metadata result.Metadata `json:"metadata"`
ServerNodeCPU metrics.NodeCPU `json:"serverCPU"`
ServerPodCPU []metrics.PodCPU `json:"serverPods"`
ClientNodeCPU metrics.NodeCPU `json:"clientCPU"`
ClientPodCPU []metrics.PodCPU `json:"clientPods"`
ClientNodeLabels map[string]string `json:"clientNodeLabels"`
ServerNodeLabels map[string]string `json:"serverNodeLabels"`
Confidence []float64 `json:"confidence"`
UUID string `json:"uuid"`
Timestamp time.Time `json:"timestamp"`
HostNetwork bool `json:"hostNetwork"`
Driver string `json:"driver"`
Parallelism int `json:"parallelism"`
Profile string `json:"profile"`
Duration int `json:"duration"`
Service bool `json:"service"`
Local bool `json:"local"`
Virt bool `json:"virt"`
AcrossAZ bool `json:"acrossAZ"`
Samples int `json:"samples"`
Messagesize int `json:"messageSize"`
Burst int `json:"burst"`
Throughput float64 `json:"throughput"`
Latency float64 `json:"latency"`
TputMetric string `json:"tputMetric"`
LtcyMetric string `json:"ltcyMetric"`
TCPRetransmit float64 `json:"tcpRetransmits"`
UDPLossPercent float64 `json:"udpLossPercent"`
ToolVersion string `json:"toolVersion"`
ToolGitCommit string `json:"toolGitCommit"`
Metadata result.Metadata `json:"metadata"`
ServerNodeCPU metrics.NodeCPU `json:"serverCPU"`
ServerPodCPU []metrics.PodCPU `json:"serverPods"`
ClientNodeCPU metrics.NodeCPU `json:"clientCPU"`
ClientPodCPU []metrics.PodCPU `json:"clientPods"`
Confidence []float64 `json:"confidence"`
ServerNodeInfo metrics.NodeInfo `json:"serverNodeInfo"`
ClientNodeInfo metrics.NodeInfo `json:"clientNodeInfo"`
}

// Connect returns a client connected to the desired cluster.
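The two new fields above (ServerNodeInfo, ClientNodeInfo) replace the label maps with the node information the tool already collects at runtime. A plausible shape for metrics.NodeInfo, inferred only from how it is used elsewhere in this commit (the real definition lives in pkg/metrics and is not shown in this diff):

// Inferred sketch only — not the actual pkg/metrics source. Field names come
// from how the struct is used in this commit (info.IP, info.NodeName,
// info.NodeSystemInfo); the JSON tags are guesses.
package sketch

import corev1 "k8s.io/api/core/v1"

// NodeInfo mirrors what metrics.NodeInfo plausibly looks like after this change.
type NodeInfo struct {
	IP             string                `json:"ip"`
	NodeName       string                `json:"nodeName"`
	NodeSystemInfo corev1.NodeSystemInfo `json:"nodeSystemInfo"`
}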
@@ -89,29 +89,31 @@ func BuildDocs(sr result.ScenarioResults, uuid string) ([]interface{}, error) {
}
c := []float64{lo, hi}
d := Doc{
UUID: uuid,
Timestamp: time,
ToolVersion: sr.Version,
ToolGitCommit: sr.GitCommit,
Driver: r.Driver,
HostNetwork: r.HostNetwork,
Parallelism: r.Parallelism,
Profile: r.Profile,
Duration: r.Duration,
Virt: sr.Virt,
Samples: r.Samples,
Service: r.Service,
Messagesize: r.MessageSize,
Burst: r.Burst,
TputMetric: r.Metric,
LtcyMetric: ltcyMetric,
ServerNodeCPU: r.ServerMetrics,
ClientNodeCPU: r.ClientMetrics,
ServerPodCPU: r.ServerPodCPU.Results,
ClientPodCPU: r.ClientPodCPU.Results,
Metadata: sr.Metadata,
AcrossAZ: r.AcrossAZ,
Confidence: c,
UUID: uuid,
Timestamp: time,
ToolVersion: sr.Version,
ToolGitCommit: sr.GitCommit,
Driver: r.Driver,
HostNetwork: r.HostNetwork,
Parallelism: r.Parallelism,
Profile: r.Profile,
Duration: r.Duration,
Virt: sr.Virt,
Samples: r.Samples,
Service: r.Service,
Messagesize: r.MessageSize,
Burst: r.Burst,
TputMetric: r.Metric,
LtcyMetric: ltcyMetric,
ServerNodeCPU: r.ServerMetrics,
ClientNodeCPU: r.ClientMetrics,
ServerPodCPU: r.ServerPodCPU.Results,
ClientPodCPU: r.ClientPodCPU.Results,
Metadata: sr.Metadata,
AcrossAZ: r.AcrossAZ,
Confidence: c,
ClientNodeInfo: r.ClientNodeInfo,
ServerNodeInfo: r.ServerNodeInfo,
}
UDPLossPercent, e := result.Average(r.LossSummary)
if e != nil {
141 changes: 34 additions & 107 deletions pkg/k8s/kubernetes.go
@@ -12,6 +12,7 @@ import (
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
@@ -153,16 +154,11 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
}

// Get node count
nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/worker="})
nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/worker=,node-role.kubernetes.io/infra!="})
if err != nil {
return err
}
ncount := 0
for _, node := range nodes.Items {
if _, ok := node.Labels["node-role.kubernetes.io/infra"]; !ok {
ncount++
}
}
ncount := len(nodes.Items)
log.Debugf("Number of nodes with role worker: %d", ncount)
if (s.HostNetwork || !s.NodeLocal) && ncount < 2 {
return fmt.Errorf(" not enough nodes with label worker= to execute test (current number of nodes: %d).", ncount)
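The node-count logic above now leans on the API server's selector instead of filtering in Go: node-role.kubernetes.io/worker= matches nodes carrying the worker role label (whose value is empty), while node-role.kubernetes.io/infra!= excludes nodes whose infra role label equals the empty value, i.e. dedicated infra nodes. A minimal sketch of the same idea in isolation (assumes a configured clientset named client):

// Sketch (assumption: client is a configured *kubernetes.Clientset): count
// worker nodes while letting the API server drop infra-labelled nodes.
nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
	LabelSelector: "node-role.kubernetes.io/worker=,node-role.kubernetes.io/infra!=",
})
if err != nil {
	return err
}
workerCount := len(nodes.Items) // previously computed by skipping infra nodes in a loop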
@@ -218,7 +214,10 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
return err
}
}
s.ClientNodeInfo, _ = GetPodNodeInfo(client, cdp)
s.ClientNodeInfo, err = GetPodNodeInfo(client, labels.Set(cdp.Labels).String())
if err != nil {
return err
}
}

// Create iperf service
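With the call-site change above, the node lookup is keyed by the deployment's labels converted to a selector string rather than by the DeploymentParams struct. A hypothetical call (not in this diff), assuming a client deployment labelled app=client, against the rewritten GetPodNodeInfo shown further down in pkg/k8s/kubernetes.go:

// Hypothetical usage sketch; "app=client" and the variable names are assumptions.
selector := labels.Set(map[string]string{"app": "client"}).String() // -> "app=client"
nodeInfo, err := GetPodNodeInfo(client, selector)
if err != nil {
	return err
}
log.Debugf("client pod runs on %s (%s), kernel %s",
	nodeInfo.NodeName, nodeInfo.IP, nodeInfo.NodeSystemInfo.KernelVersion)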
@@ -410,7 +409,6 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
}
sdpHost.PodAntiAffinity = antiAffinity
}

if ncount > 1 {
if s.HostNetwork {
if !s.VM {
@@ -431,9 +429,12 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
if err != nil {
return err
}
s.ServerNodeInfo, _ = GetPodNodeInfo(client, sdp)
s.ServerNodeInfo, err = GetPodNodeInfo(client, labels.Set(sdp.Labels).String())
if err != nil {
return err
}
if !s.NodeLocal {
s.ClientNodeInfo, _ = GetPodNodeInfo(client, cdpAcross)
s.ClientNodeInfo, err = GetPodNodeInfo(client, labels.Set(cdpAcross.Labels).String())
}
if err != nil {
return err
@@ -459,17 +460,17 @@ func launchServerVM(perf *config.PerfScenarios, name string, podAff *corev1.PodA
return err
}
if strings.Contains(name, "host") {
perf.ServerHost, err = GetNakedPods(perf.ClientSet, fmt.Sprintf("app=%s", serverRole))
perf.ServerHost, err = GetPods(perf.ClientSet, fmt.Sprintf("app=%s", serverRole))
if err != nil {
return err
}
} else {
perf.Server, err = GetNakedPods(perf.ClientSet, fmt.Sprintf("app=%s", serverRole))
perf.Server, err = GetPods(perf.ClientSet, fmt.Sprintf("app=%s", serverRole))
if err != nil {
return err
}
}
perf.ServerNodeInfo, _ = GetNakedPodNodeInfo(perf.ClientSet, fmt.Sprintf("app=%s", serverRole))
perf.ServerNodeInfo, _ = GetPodNodeInfo(perf.ClientSet, fmt.Sprintf("app=%s", serverRole))
return nil
}

@@ -485,17 +486,17 @@ func launchClientVM(perf *config.PerfScenarios, name string, podAff *corev1.PodA
return err
}
if strings.Contains(name, "host") {
perf.ClientHost, err = GetNakedPods(perf.ClientSet, fmt.Sprintf("app=%s", name))
perf.ClientHost, err = GetPods(perf.ClientSet, fmt.Sprintf("app=%s", name))
if err != nil {
return err
}
} else {
perf.ClientAcross, err = GetNakedPods(perf.ClientSet, fmt.Sprintf("app=%s", name))
perf.ClientAcross, err = GetPods(perf.ClientSet, fmt.Sprintf("app=%s", name))
if err != nil {
return err
}
}
perf.ClientNodeInfo, _ = GetNakedPodNodeInfo(perf.ClientSet, fmt.Sprintf("app=%s", name))
perf.ClientNodeInfo, _ = GetPodNodeInfo(perf.ClientSet, fmt.Sprintf("app=%s", name))
return nil
}

@@ -525,7 +526,7 @@ func deployDeployment(client *kubernetes.Clientset, dp DeploymentParams) (corev1
return pods, err
}
// Retrieve pods which match the server/client role labels
pods, err = GetPods(client, dp)
pods, err = GetPods(client, labels.Set(dp.Labels).String())
if err != nil {
return pods, err
}
@@ -544,7 +545,7 @@ func WaitForReady(c *kubernetes.Clientset, dp DeploymentParams) (bool, error) {
for event := range dw.ResultChan() {
d, ok := event.Object.(*appsv1.Deployment)
if !ok {
fmt.Println("❌ Issue with the Deployment")
log.Error("❌ Issue with the Deployment")
}
if d.Name == dp.Name {
if d.Status.ReadyReplicas == 1 {
@@ -660,46 +661,8 @@ func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv
return dc.Create(context.TODO(), deployment, metav1.CreateOptions{})
}

// GetNodeLabels Return Labels for a specific node
func GetNodeLabels(c *kubernetes.Clientset, node string) (map[string]string, error) {
log.Debugf("Looking for Node labels for node - %s", node)
nodeInfo, err := c.CoreV1().Nodes().Get(context.TODO(), node, metav1.GetOptions{})
if err != nil {
return nil, err
}
return nodeInfo.GetLabels(), nil
}

// GetPodNodeInfo collects the node information for a specific pod
func GetPodNodeInfo(c *kubernetes.Clientset, dp DeploymentParams) (metrics.NodeInfo, error) {
var info metrics.NodeInfo
d, err := c.AppsV1().Deployments(dp.Namespace).Get(context.TODO(), dp.Name, metav1.GetOptions{})
if err != nil {
return info, fmt.Errorf("❌ Failure to capture deployment: %v", err)
}
selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
if err != nil {
return info, fmt.Errorf("❌ Failure to capture deployment label: %v", err)
}
pods, err := c.CoreV1().Pods(dp.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String(), FieldSelector: "status.phase=Running"})
if err != nil {
return info, fmt.Errorf("❌ Failure to capture pods: %v", err)
}
for pod := range pods.Items {
p := pods.Items[pod]
if pods.Items[pod].DeletionTimestamp != nil {
continue
} else {
info.IP = p.Status.HostIP
info.Hostname = p.Spec.NodeName
}
}
log.Debugf("%s Running on %s with IP %s", d.Name, info.Hostname, info.IP)
return info, nil
}

// GetNakedPodNodeInfo collects the node information for a specific pod
func GetNakedPodNodeInfo(c *kubernetes.Clientset, label string) (metrics.NodeInfo, error) {
// GetPodNodeInfo collects the node information for a node running a pod with a specific label
func GetPodNodeInfo(c *kubernetes.Clientset, label string) (metrics.NodeInfo, error) {
var info metrics.NodeInfo
listOpt := metav1.ListOptions{
LabelSelector: label,
@@ -709,65 +672,29 @@ func GetNakedPodNodeInfo(c *kubernetes.Clientset, label string) (metrics.NodeInf
if err != nil {
return info, fmt.Errorf("❌ Failure to capture pods: %v", err)
}
for pod := range pods.Items {
p := pods.Items[pod]
if pods.Items[pod].DeletionTimestamp != nil {
continue
} else {
info.IP = p.Status.HostIP
info.Hostname = p.Spec.NodeName
}
}
log.Debugf("Machine with lablel %s is Running on %s with IP %s", label, info.Hostname, info.IP)
return info, nil
}

// GetPods searches for a specific set of pods from DeploymentParms
// It returns a PodList if the deployment is found.
// NOTE : Since we can update the replicas to be > 1, is why I return a PodList.
func GetPods(c *kubernetes.Clientset, dp DeploymentParams) (corev1.PodList, error) {
d, err := c.AppsV1().Deployments(dp.Namespace).Get(context.TODO(), dp.Name, metav1.GetOptions{})
npl := corev1.PodList{}
info.NodeName = pods.Items[0].Spec.NodeName
info.IP = pods.Items[0].Status.HostIP
node, err := c.CoreV1().Nodes().Get(context.TODO(), info.NodeName, metav1.GetOptions{})
if err != nil {
return npl, fmt.Errorf("❌ Failure to capture deployment: %v", err)
return info, err
}
selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
if err != nil {
return npl, fmt.Errorf("❌ Failure to capture deployment label: %v", err)
}
pods, err := c.CoreV1().Pods(dp.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String(), FieldSelector: "status.phase=Running"})
if err != nil {
return npl, fmt.Errorf("❌ Failure to capture pods: %v", err)
}
for pod := range pods.Items {
if pods.Items[pod].DeletionTimestamp != nil {
continue
} else {
npl.Items = append(npl.Items, pods.Items[pod])
}
}
return npl, nil
info.NodeSystemInfo = node.Status.NodeInfo
log.Debugf("Machine with label %s is Running on %s with IP %s", label, info.NodeName, info.IP)
return info, nil
}

// GetNakedPods when we deploy pods without a higher-level controller like deployment
func GetNakedPods(c *kubernetes.Clientset, label string) (corev1.PodList, error) {
npl := corev1.PodList{}
// GetPods returns pods with a specific label
func GetPods(c *kubernetes.Clientset, label string) (corev1.PodList, error) {
listOpt := metav1.ListOptions{
LabelSelector: label,
FieldSelector: "status.phase=Running",
}
log.Infof("Looking for pods with label %s", fmt.Sprint(label))
pods, err := c.CoreV1().Pods(namespace).List(context.TODO(), listOpt)
if err != nil {
return npl, fmt.Errorf("❌ Failure to capture pods: %v", err)
}
for pod := range pods.Items {
if pods.Items[pod].DeletionTimestamp != nil {
continue
} else {
npl.Items = append(npl.Items, pods.Items[pod])
}
return *pods, fmt.Errorf("❌ Failure to capture pods: %v", err)
}
return npl, nil
return *pods, nil

}
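GetPods (renamed from GetNakedPods, and also replacing the deployment-based lookup) now returns whatever matches the label plus the status.phase=Running field selector; unlike the helpers it replaces, it no longer skips pods that have a DeletionTimestamp set. A hypothetical call, with variable names assumed:

// Hypothetical usage sketch of the new helper; perf and serverRole are assumed names.
pods, err := GetPods(perf.ClientSet, fmt.Sprintf("app=%s", serverRole))
if err != nil {
	return err
}
for _, p := range pods.Items {
	log.Infof("server pod %s is running on node %s", p.Name, p.Spec.NodeName)
}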

(The remaining 2 changed files in this commit are not shown here.)
