Skip to content

Commit

Permalink
detect fargate node size
Browse files Browse the repository at this point in the history
  • Loading branch information
alexei-led committed May 18, 2023
1 parent dcba20e commit a2968e7
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 19 deletions.
4 changes: 2 additions & 2 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *scanner) Run(ctx context.Context, log *logrus.Entry, nodeInformer Nodes
// convert PodInfo to usage record
endTime := time.Now()
beginTime := endTime.Add(-60 * time.Minute)
record := usage.NewPodInfo(pod, beginTime, endTime, node)
record := usage.GetPodInfo(log, pod, beginTime, endTime, node)
// upload the record to EKS Lens
log.WithField("pod", record.Name).Debug("uploading one pod record to EKS Lens")
err := s.uploader.UploadOne(ctx, record)
Expand Down Expand Up @@ -113,7 +113,7 @@ func (s *scanner) Run(ctx context.Context, log *logrus.Entry, nodeInformer Nodes
if !ok {
log.Warnf("getting node %s from cache", pod.Spec.NodeName)
}
record := usage.NewPodInfo(pod, beginTime, now, node)
record := usage.GetPodInfo(log, pod, beginTime, now, node)
records = append(records, record)
}
// upload the records to EKS Lens
Expand Down
73 changes: 67 additions & 6 deletions internal/usage/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@ package usage
import (
"fmt"
"math"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

const (
fargateType = "fargate"
)

type Allocation struct {
// CPU fraction of total CPU
CPU float64 `json:"cpu"`
Expand Down Expand Up @@ -117,7 +124,7 @@ func NodeInfoFromNode(cluster string, node *v1.Node) NodeInfo {
// assume fargate and get fargate profile name from node label
nodegroup = node.GetLabels()["eks.amazonaws.com/fargate-profile"]
if nodegroup == "" {
nodegroup = "fargate"
nodegroup = fargateType
}
}

Expand All @@ -133,10 +140,10 @@ func NodeInfoFromNode(cluster string, node *v1.Node) NodeInfo {
instanceType = node.GetLabels()["node.kubernetes.io/instance-type"]
// if empty, assume fargate and build instance type based on pattern "fargate-vCPU-memoryGB" where memory is rounded to GiB
if instanceType == "" {
// get memory in rounded GB
memory := float64(node.Status.Capacity.Memory().Value())
memoryGB := math.Round(memory / 1024 / 1024 / 1024) //nolint:gomnd
instanceType = fmt.Sprintf("fargate-%dvCPU-%dGB", node.Status.Capacity.Cpu().Value(), int(memoryGB))
// get memory in GiB
memory := float64(node.Status.Capacity.Memory().ScaledValue(resource.Giga))
// construct instance type based on pattern "fargate-vCPU-memoryGB"
instanceType = fmt.Sprintf("fargate-%dvCPU-%dGB", node.Status.Capacity.Cpu().Value(), int(memory))
}
}

Expand Down Expand Up @@ -177,7 +184,7 @@ func NodeInfoFromNode(cluster string, node *v1.Node) NodeInfo {
return result
}

func NewPodInfo(pod *v1.Pod, beginTime, endTime time.Time, node *NodeInfo) *PodInfo {
func GetPodInfo(log *logrus.Entry, pod *v1.Pod, beginTime, endTime time.Time, node *NodeInfo) *PodInfo {
record := &PodInfo{}
record.Name = pod.GetName()
record.Namespace = pod.GetNamespace()
Expand Down Expand Up @@ -214,6 +221,11 @@ func NewPodInfo(pod *v1.Pod, beginTime, endTime time.Time, node *NodeInfo) *PodI
record.BeginTime = record.StartTime
}
if node != nil {
// patch fargate node info from pod annotations, if needed
err := patchFargateNodeInfo(pod, node)
if err != nil {
log.WithError(err).WithField("node", node.Name).Warn("failed to patch fargate node info")
}
record.Node = *node
// calculate pod's allocation requests as a percentage of node's allocatable resources
record.Allocations.Requests.CPU = float64(record.Resources.Requests.CPU) / float64(node.Allocatable.CPU) * 100 //nolint:gomnd
Expand Down Expand Up @@ -242,3 +254,52 @@ func NewPodInfo(pod *v1.Pod, beginTime, endTime time.Time, node *NodeInfo) *PodI
}
return record
}

func patchFargateNodeInfo(pod *v1.Pod, node *NodeInfo) error {
if node.ComputeType != fargateType {
return nil
}
// get CPU and memory from pod annotation "CapacityProvisioned": "0.25vCPU 0.5GB",
// and patch node allocatable CPU and memory
if capacityProvisioned, ok := pod.Annotations["CapacityProvisioned"]; ok {
cpu, memory, err := parseCapacityProvisioned(capacityProvisioned)
if err != nil {
return errors.Wrap(err, "failed to parse capacity provisioned")
}
node.Allocatable.CPU = cpu
// 256MB is reserved for Kubernetes components on Fargate, so we need to subtract it from allocatable memory
node.Allocatable.Memory = memory - 256*int64(math.Pow10(6)) //nolint:gomnd
// update node instance type: "fargate-{CapacityProvisioned}", e.g. "fargate-0.25vCPU-0.5GB"
node.InstanceType = fmt.Sprintf("%s-%s", fargateType, strings.ReplaceAll(capacityProvisioned, " ", "-"))
}
return nil
}

func parseCapacityProvisioned(capacityProvisioned string) (int64, int64, error) {
// split capacity provisioned string by space
capacity := strings.Split(capacityProvisioned, " ")
if len(capacity) != 2 { //nolint:gomnd
return 0, 0, errors.Errorf("invalid capacity provisioned string: %s", capacityProvisioned)
}
// parse CPU
// remove "vCPU" suffix
capacity[0] = strings.TrimSuffix(capacity[0], "vCPU")
// convert CPU value to float and if it is less than 1 multiplies it by 1000 add "m" suffix
if cpu, err := strconv.ParseFloat(capacity[0], 64); err == nil && cpu < 1 {
capacity[0] = strconv.Itoa(int(cpu * 1000)) //nolint:gomnd
capacity[0] += "m"
}

cpu, err := resource.ParseQuantity(capacity[0])
if err != nil {
return 0, 0, errors.Wrap(err, "invalid CPU capacity provisioned")
}
// parse memory
// remove "B" suffix (GB, MB, KB)
capacity[1] = strings.TrimSuffix(capacity[1], "B")
memory, err := resource.ParseQuantity(capacity[1])
if err != nil {
return 0, 0, errors.Wrap(err, "invalid memory capacity provisioned")
}
return cpu.MilliValue(), memory.Value(), nil
}
194 changes: 183 additions & 11 deletions internal/usage/record_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package usage

import (
"math"
"reflect"
"testing"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestNewPodInfo(t *testing.T) {
func TestGetPodInfo(t *testing.T) {
type args struct {
log *logrus.Entry
pod *v1.Pod
beginTime time.Time
endTime time.Time
Expand All @@ -25,6 +29,7 @@ func TestNewPodInfo(t *testing.T) {
{
name: "full record test",
args: args{
log: logrus.NewEntry(logrus.New()),
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Expand Down Expand Up @@ -149,43 +154,210 @@ func TestNewPodInfo(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := NewPodInfo(tt.args.pod, tt.args.beginTime, tt.args.endTime, tt.args.node)
got := GetPodInfo(tt.args.log, tt.args.pod, tt.args.beginTime, tt.args.endTime, tt.args.node)

if got.Name != tt.want.Name {
t.Errorf("NewPodInfo().Name = %v, want %v", got.Name, tt.want.Name)
t.Errorf("GetPodInfo().Name = %v, want %v", got.Name, tt.want.Name)
}

if got.Namespace != tt.want.Namespace {
t.Errorf("NewPodInfo().Namespace = %v, want %v", got.Namespace, tt.want.Namespace)
t.Errorf("GetPodInfo().Namespace = %v, want %v", got.Namespace, tt.want.Namespace)
}

if got.QosClass != tt.want.QosClass {
t.Errorf("NewPodInfo().QosClass = %v, want %v", got.QosClass, tt.want.QosClass)
t.Errorf("GetPodInfo().QosClass = %v, want %v", got.QosClass, tt.want.QosClass)
}

if !reflect.DeepEqual(got.Allocations, tt.want.Allocations) {
t.Errorf("NewPodInfo().Allocations = %v, want %v", got.Allocations, tt.want.Allocations)
t.Errorf("GetPodInfo().Allocations = %v, want %v", got.Allocations, tt.want.Allocations)
}

if !reflect.DeepEqual(got.Resources, tt.want.Resources) {
t.Errorf("NewPodInfo().Resources = %v, want %v", got.Resources, tt.want.Resources)
t.Errorf("GetPodInfo().Resources = %v, want %v", got.Resources, tt.want.Resources)
}

if got.BeginTime != tt.want.BeginTime {
t.Errorf("NewPodInfo().BegibTime = %v, want %v", got.BeginTime, tt.want.BeginTime)
t.Errorf("GetPodInfo().BegibTime = %v, want %v", got.BeginTime, tt.want.BeginTime)
}

if got.EndTime != tt.want.EndTime {
t.Errorf("NewPodInfo().EndTime = %v, want %v", got.EndTime, tt.want.EndTime)
t.Errorf("GetPodInfo().EndTime = %v, want %v", got.EndTime, tt.want.EndTime)
}

if !reflect.DeepEqual(got.Labels, tt.want.Labels) {
t.Errorf("NewPodInfo().Labels = %v, want %v", got.Labels, tt.want.Labels)
t.Errorf("GetPodInfo().Labels = %v, want %v", got.Labels, tt.want.Labels)
}

if !reflect.DeepEqual(got.Node, tt.want.Node) {
t.Errorf("NewPodInfo().Node = %v, want %v", got.Node, tt.want.Node)
t.Errorf("GetPodInfo().Node = %v, want %v", got.Node, tt.want.Node)
}
})
}
}

func TestParseCapacityProvisioned(t *testing.T) {
tests := []struct {
input string
expectedCPUMilliValue int64
expectedMemoryValue int64
expectedError error
}{
{
input: "0.25 200MB",
expectedCPUMilliValue: 250,
expectedMemoryValue: 200 * int64(math.Pow10(6)), // 200MB
expectedError: nil,
},
{
input: "100m 200Mi",
expectedCPUMilliValue: 100,
expectedMemoryValue: 200 * (1 << 20), // 200Mi
expectedError: nil,
},
{
input: "2vCPU 400MB",
expectedCPUMilliValue: 2000,
expectedMemoryValue: 400 * int64(math.Pow10(6)), // 400MB
expectedError: nil,
},
{
input: "0.25vCPU 0.5GB",
expectedCPUMilliValue: 250,
expectedMemoryValue: 5 * int64(math.Pow10(8)), // 0.5GB
expectedError: nil,
},
{
input: "1 1Gi",
expectedCPUMilliValue: 1000,
expectedMemoryValue: 1 << 30, // 1Gi
expectedError: nil,
},
{
input: "500m",
expectedCPUMilliValue: 0,
expectedMemoryValue: 0,
expectedError: errors.Errorf("invalid capacity provisioned string: 500m"),
},
{
input: "1.5 2Gi",
expectedCPUMilliValue: 1500,
expectedMemoryValue: 2 * (1 << 30), // 2Gi
expectedError: nil,
},
}

for _, test := range tests {
cpu, memory, err := parseCapacityProvisioned(test.input)

if err != nil && test.expectedError == nil {
t.Errorf("Unexpected error. Input: %s, Error: %v", test.input, err)
} else if err == nil && test.expectedError != nil {
t.Errorf("Expected error not returned. Input: %s, Expected Error: %v", test.input, test.expectedError)
} else if err != nil && test.expectedError != nil && err.Error() != test.expectedError.Error() {
t.Errorf("Error mismatch. Input: %s, Expected Error: %v, Got Error: %v", test.input, test.expectedError, err)
}

if cpu != test.expectedCPUMilliValue {
t.Errorf("CPU value mismatch. Input: %s, Expected: %d, Got: %d", test.input, test.expectedCPUMilliValue, cpu)
}

if memory != test.expectedMemoryValue {
t.Errorf("Memory value mismatch. Input: %s, Expected: %d, Got: %d", test.input, test.expectedMemoryValue, memory)
}
}
}

func TestPatchFargateNodeInfo(t *testing.T) {
tests := []struct {
pod *v1.Pod
node *NodeInfo
expectedNodeAllocatable NodeInfo
expectedError error
}{
{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"CapacityProvisioned": "0.25vCPU 0.5GB",
},
},
},
node: &NodeInfo{
ComputeType: fargateType,
InstanceType: "fargate-1.0",
Allocatable: Capacity{
CPU: 0,
Memory: 0,
},
},
expectedNodeAllocatable: NodeInfo{
ComputeType: fargateType,
InstanceType: "fargate-0.25vCPU-0.5GB",
Allocatable: Capacity{
CPU: 250,
Memory: 244 * int64(math.Pow10(6)), // 0.244GB
},
},
expectedError: nil,
},
{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"CapacityProvisioned": "invalid",
},
},
},
node: &NodeInfo{
ComputeType: fargateType,
Allocatable: Capacity{
CPU: 0,
Memory: 0,
},
},
expectedNodeAllocatable: NodeInfo{
ComputeType: fargateType,
Allocatable: Capacity{
CPU: 0,
Memory: 0,
},
},
expectedError: errors.Wrap(errors.New("invalid CPU capacity provisioned"), "failed to parse capacity provisioned"),
},
{
pod: &v1.Pod{},
node: &NodeInfo{
ComputeType: "non-fargate",
Allocatable: Capacity{
CPU: 0,
Memory: 0,
},
},
expectedNodeAllocatable: NodeInfo{
ComputeType: "non-fargate",
Allocatable: Capacity{
CPU: 0,
Memory: 0,
},
},
expectedError: nil,
},
}

for _, test := range tests {
err := patchFargateNodeInfo(test.pod, test.node)

if err != nil && test.expectedError == nil {
t.Errorf("Unexpected error. Pod: %+v, Node: %+v, Error: %v", test.pod, test.node, err)
} else if err == nil && test.expectedError != nil {
t.Errorf("Expected error not returned. Pod: %+v, Node: %+v, Expected Error: %v", test.pod, test.node, test.expectedError)
}

if test.node.Allocatable.CPU != test.expectedNodeAllocatable.Allocatable.CPU {
t.Errorf("CPU value mismatch. Pod: %+v, Node: %+v, Expected CPU: %d, Got CPU: %d", test.pod, test.node, test.expectedNodeAllocatable.Allocatable.CPU, test.node.Allocatable.CPU)
}
if test.node.Allocatable.Memory != test.expectedNodeAllocatable.Allocatable.Memory {
t.Errorf("Memory value mismatch. Pod: %+v, Node: %+v, Expected Memory: %d, Got Memory: %d", test.pod, test.node, test.expectedNodeAllocatable.Allocatable.Memory, test.node.Allocatable.Memory)
}
}
}

0 comments on commit a2968e7

Please sign in to comment.