Skip to content

Commit

Permalink
instancetype implement UpdateInstanceTypes and UpdateInstanceTypeOffe…
Browse files Browse the repository at this point in the history
…rings
  • Loading branch information
helen-frank committed Sep 29, 2024
1 parent 0d7d12c commit 9eb3c59
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 17 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/alibabacloud-go/darabonba-openapi/v2 v2.0.10
github.com/alibabacloud-go/ecs-20140526/v4 v4.25.1
github.com/alibabacloud-go/tea v1.2.2
github.com/alibabacloud-go/tea-utils/v2 v2.0.6
github.com/aliyun/aliyun-cli v0.0.0-20240925084117-158a70e275f0
github.com/awslabs/operatorpkg v0.0.0-20240805231134-67d0acfb6306
github.com/patrickmn/go-cache v2.1.0+incompatible
Expand All @@ -28,7 +29,6 @@ require (
github.com/alibabacloud-go/endpoint-util v1.1.0 // indirect
github.com/alibabacloud-go/openapi-util v0.1.0 // indirect
github.com/alibabacloud-go/tea-utils v1.3.1 // indirect
github.com/alibabacloud-go/tea-utils/v2 v2.0.6 // indirect
github.com/alibabacloud-go/tea-xml v1.1.3 // indirect
github.com/aliyun/credentials-go v1.3.10 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
151 changes: 135 additions & 16 deletions pkg/providers/instancetype/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@ package instancetype

import (
"context"
"errors"
"net/http"
"sync"
"sync/atomic"

ecsclient "github.com/alibabacloud-go/ecs-20140526/v4/client"
util "github.com/alibabacloud-go/tea-utils/v2/service"
"github.com/alibabacloud-go/tea/tea"
"github.com/patrickmn/go-cache"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/utils/pretty"

"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/apis/v1alpha1"
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/pricing"
)

type Provider interface {
Expand All @@ -37,18 +44,18 @@ type Provider interface {
}

type DefaultProvider struct {
region string
// ec2api ec2iface.ECSAPI
region string
ecsClient *ecsclient.Client
// subnetProvider subnet.Provider
// pricingProvider pricing.Provider
pricingProvider pricing.Provider

// Values stored *before* considering insufficient capacity errors from the unavailableOfferings cache.
// Fully initialized Instance Types are also cached based on the set of all instance types, zones, unavailableOfferings cache,
// ECSNodeClass, and kubelet configuration from the NodePool

muInstanceTypeInfo sync.RWMutex
// TODO @engedaam: Look into only storing the needed ECSInstanceTypeInfo
// instanceTypesInfo []*ec2.InstanceTypeInfo

instanceTypesInfo []*ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType

muInstanceTypeOfferings sync.RWMutex
instanceTypeOfferings map[string]sets.Set[string]
Expand All @@ -63,13 +70,13 @@ type DefaultProvider struct {
instanceTypeOfferingsSeqNum uint64
}

func NewDefaultProvider(region string, instanceTypesCache *cache.Cache) *DefaultProvider {
func NewDefaultProvider(region string, ecsClient *ecsclient.Client, instanceTypesCache *cache.Cache, pricingProvider pricing.Provider) *DefaultProvider {
return &DefaultProvider{
// ec2api: ec2api,
region: region,
ecsClient: ecsClient,
region: region,
// subnetProvider: subnetProvider,
// pricingProvider: pricingProvider,
// instanceTypesInfo: []*ec2.InstanceTypeInfo{},
pricingProvider: pricingProvider,
instanceTypesInfo: []*ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType{},
instanceTypeOfferings: map[string]sets.Set[string]{},
instanceTypesCache: instanceTypesCache,
// unavailableOfferings: unavailableOfferingsCache,
Expand All @@ -78,12 +85,9 @@ func NewDefaultProvider(region string, instanceTypesCache *cache.Cache) *Default
}
}

// func (p *DefaultProvider) List(ctx context.Context, kc *v1.KubeletConfiguration, nodeClass *v1.ECSNodeClass) ([]*cloudprovider.InstanceType, error) {

// LivenessProbe reports the health of the instance type provider by delegating
// to the pricing provider's liveness probe and returning its result unchanged.
//
// The earlier placeholder `return nil` is removed: it made the delegation below
// unreachable, so the probe always reported healthy.
func (p *DefaultProvider) LivenessProbe(req *http.Request) error {
	return p.pricingProvider.LivenessProbe(req)
}

func (p *DefaultProvider) List(ctx context.Context, kc *v1alpha1.KubeletConfiguration, nodeClass *v1alpha1.ECSNodeClass) ([]*cloudprovider.InstanceType, error) {
Expand All @@ -93,13 +97,128 @@ func (p *DefaultProvider) List(ctx context.Context, kc *v1alpha1.KubeletConfigur
}

// UpdateInstanceTypes refreshes the provider's instance type list from ECS.
//
// It fetches every instance type through getAllInstanceTypes, bumps
// instanceTypesSeqNum when p.cm reports that the "instance-types" value has
// changed, and then stores the fresh list in p.instanceTypesInfo. Any error
// from the fetch is returned as-is and the stored list is left untouched.
func (p *DefaultProvider) UpdateInstanceTypes(ctx context.Context) error {
	// DO NOT REMOVE THIS LOCK ----------------------------------------------------------------------------
	// The write lock is held for the whole refresh (including the ECS call) so
	// that concurrent callers wait for a single refresh instead of each
	// issuing their own calls to ECS.
	p.muInstanceTypeInfo.Lock()
	defer p.muInstanceTypeInfo.Unlock()

	discovered, err := getAllInstanceTypes(p.ecsClient)
	if err != nil {
		return err
	}

	// Only advance the sequence number when the discovered set actually
	// differs, so unchanged data does not produce new keys for identical
	// instance type options.
	if changed := p.cm.HasChanged("instance-types", discovered); changed {
		atomic.AddUint64(&p.instanceTypesSeqNum, 1)
		logger := log.FromContext(ctx).WithValues("count", len(discovered))
		logger.V(1).Info("discovered instance types")
	}

	p.instanceTypesInfo = discovered
	return nil
}

// UpdateInstanceTypeOfferings refreshes the instance-type -> zones offering map
// from the ECS DescribeAvailableResource API for the provider's region.
//
// Only zones whose StatusCategory is "WithStock" are considered; the per-zone
// resources are folded into the map by processAvailableResources. When p.cm
// reports that the "instance-type-offering" value has changed,
// instanceTypeOfferingsSeqNum is incremented. The new map always replaces
// p.instanceTypeOfferings on success.
//
// It returns the API error unchanged, or an error when the response contains
// no available zones.
func (p *DefaultProvider) UpdateInstanceTypeOfferings(ctx context.Context) error {
	// DO NOT REMOVE THIS LOCK ----------------------------------------------------------------------------
	// The write lock is held for the whole refresh (including the ECS call) so
	// that concurrent callers wait for a single refresh instead of each
	// issuing their own calls to ECS.
	p.muInstanceTypeOfferings.Lock()
	defer p.muInstanceTypeOfferings.Unlock()

	// Get offerings from ECS
	instanceTypeOfferings := map[string]sets.Set[string]{}
	describeAvailableResourceRequest := &ecsclient.DescribeAvailableResourceRequest{
		RegionId:            tea.String(p.region),
		DestinationResource: tea.String("InstanceType"),
	}
	runtime := &util.RuntimeOptions{}

	// TODO: we may use other better API in the future.
	resp, err := p.ecsClient.DescribeAvailableResourceWithOptions(describeAvailableResourceRequest, runtime)
	if err != nil {
		return err
	}

	if resp == nil || resp.Body == nil || resp.Body.AvailableZones == nil || len(resp.Body.AvailableZones.AvailableZone) == 0 {
		return errors.New("DescribeAvailableResourceWithOptions failed to return any instance types")
	}

	for _, az := range resp.Body.AvailableZones.AvailableZone {
		// Zone entries and their StatusCategory are pointers in the SDK model;
		// skip entries that are missing instead of panicking on a nil dereference.
		if az == nil {
			continue
		}
		// TODO: Later, `ClosedWithStock` will be tested to determine if `ClosedWithStock` should be added.
		// Possible values: WithStock, ClosedWithStock, WithoutStock, ClosedWithoutStock
		if tea.StringValue(az.StatusCategory) == "WithStock" {
			processAvailableResources(az, instanceTypeOfferings)
		}
	}

	if p.cm.HasChanged("instance-type-offering", instanceTypeOfferings) {
		// Only update instanceTypeOfferingsSeqNum when the instance type offerings have changed.
		// This is to not create new keys with duplicate instance type offerings option
		atomic.AddUint64(&p.instanceTypeOfferingsSeqNum, 1)
		log.FromContext(ctx).WithValues("instance-type-count", len(instanceTypeOfferings)).V(1).Info("discovered offerings for instance types")
	}
	p.instanceTypeOfferings = instanceTypeOfferings
	return nil
}

// processAvailableResources records, for the given availability zone, every
// supported resource whose StatusCategory is "WithStock" into
// instanceTypeOfferings, keyed by the resource value (the instance type) with
// the zone ID added to that key's zone set.
//
// instanceTypeOfferings is mutated in place and must be non-nil. Entries with
// a missing zone ID, resource value or status category are skipped rather than
// dereferenced, because these SDK fields are pointers and may be absent.
func processAvailableResources(az *ecsclient.DescribeAvailableResourceResponseBodyAvailableZonesAvailableZone, instanceTypeOfferings map[string]sets.Set[string]) {
	if az == nil || az.ZoneId == nil || az.AvailableResources == nil {
		return
	}
	zoneID := *az.ZoneId

	for _, ar := range az.AvailableResources.AvailableResource {
		if ar == nil || ar.SupportedResources == nil {
			continue
		}

		for _, sr := range ar.SupportedResources.SupportedResource {
			if sr == nil || sr.Value == nil {
				continue
			}
			// TODO: Later, `ClosedWithStock` will be tested to determine if `ClosedWithStock` should be added.
			// Possible values: WithStock, ClosedWithStock, WithoutStock, ClosedWithoutStock
			if tea.StringValue(sr.StatusCategory) != "WithStock" {
				continue
			}

			// Single lookup; the set is a map type, so Insert updates the
			// stored entry in place.
			zones, ok := instanceTypeOfferings[*sr.Value]
			if !ok {
				zones = sets.New[string]()
				instanceTypeOfferings[*sr.Value] = zones
			}
			zones.Insert(zoneID)
		}
	}
}

// getAllInstanceTypes pages through the ECS DescribeInstanceTypes API and
// returns the instance types from every page.
//
// Each page is appended before the stop condition is evaluated. The previous
// ordering broke out of the loop first, which silently dropped the final page
// (and therefore returned nothing at all when the result fit in one page).
//
// It returns the API error unchanged, or an error when a response carries no
// InstanceTypes section.
func getAllInstanceTypes(client *ecsclient.Client) ([]*ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType, error) {
	var instanceTypes []*ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType

	describeInstanceTypesRequest := &ecsclient.DescribeInstanceTypesRequest{
		/*
			Reference: https://api.aliyun.com/api/Ecs/2014-05-26/DescribeInstanceTypes caveat:
			The maximum value of the MaxResults (maximum number of entries per page) parameter is 100.
			Users who called this API in 2022 could still use a maximum of 1600; on and after
			November 15, 2023 the maximum is reduced to 100 for all users and 1600 is no longer
			supported. If the NextToken parameter is not passed for paging, only the first page
			(no more than 100 items) is returned by default.
		*/
		MaxResults: tea.Int64(100),
	}
	runtime := &util.RuntimeOptions{}

	for {
		resp, err := client.DescribeInstanceTypesWithOptions(describeInstanceTypesRequest, runtime)
		if err != nil {
			return nil, err
		}

		if resp == nil || resp.Body == nil || resp.Body.InstanceTypes == nil {
			return nil, errors.New("DescribeInstanceTypesWithOptions failed to return any instance types")
		}

		// Keep this page's results before deciding whether another page exists.
		page := resp.Body.InstanceTypes.InstanceType
		instanceTypes = append(instanceTypes, page...)

		// Stop when there is no continuation token, or when an empty page is
		// returned (guards against looping forever on a misbehaving token).
		if resp.Body.NextToken == nil || *resp.Body.NextToken == "" || len(page) == 0 {
			break
		}

		describeInstanceTypesRequest.NextToken = resp.Body.NextToken
	}

	return instanceTypes, nil
}

0 comments on commit 9eb3c59

Please sign in to comment.