Skip to content

Commit

Permalink
Merge pull request #121 from helen-frank/feat/instance_cache
Browse files Browse the repository at this point in the history
feat: instance use cache
  • Loading branch information
jwcesign authored Nov 18, 2024
2 parents 407d77b + c856ba3 commit b85865c
Showing 1 changed file with 44 additions and 29 deletions.
73 changes: 44 additions & 29 deletions pkg/providers/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
"math"
"net/http"
"strings"
"time"

ecsclient "github.com/alibabacloud-go/ecs-20140526/v4/client"
util "github.com/alibabacloud-go/tea-utils/v2/service"
"github.com/alibabacloud-go/tea/tea"
"github.com/patrickmn/go-cache"
"github.com/samber/lo"
"go.uber.org/multierr"
corev1 "k8s.io/api/core/v1"
Expand All @@ -48,6 +50,7 @@ const (
// TODO: After that open up the configuration options
instanceTypeFlexibilityThreshold = 5 // falling back to on-demand without flexibility risks insufficient capacity errors
maxInstanceTypes = 20
instanceCacheExpiration = 15 * time.Second
)

type Provider interface {
Expand All @@ -59,8 +62,9 @@ type Provider interface {
}

type DefaultProvider struct {
ecsClient *ecsclient.Client
region string
ecsClient *ecsclient.Client
region string
instanceCache *cache.Cache

imageFamilyResolver imagefamily.Resolver
vSwitchProvider vswitch.Provider
Expand All @@ -70,14 +74,17 @@ type DefaultProvider struct {
func NewDefaultProvider(region string, ecsClient *ecsclient.Client,
imageFamilyResolver imagefamily.Resolver, vSwitchProvider vswitch.Provider,
ackProvider ack.Provider) *DefaultProvider {
return &DefaultProvider{
ecsClient: ecsClient,
region: region,
p := &DefaultProvider{
ecsClient: ecsClient,
region: region,
instanceCache: cache.New(instanceCacheExpiration, instanceCacheExpiration),

imageFamilyResolver: imageFamilyResolver,
vSwitchProvider: vSwitchProvider,
ackProvider: ackProvider,
}

return p
}

func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1alpha1.ECSNodeClass, nodeClaim *karpv1.NodeClaim,
Expand All @@ -102,34 +109,26 @@ func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1alpha1.ECSNod
}

func (p *DefaultProvider) Get(ctx context.Context, id string) (*Instance, error) {
describeInstancesRequest := &ecsclient.DescribeInstancesRequest{
RegionId: tea.String(p.region),
InstanceIds: tea.String("[\"" + id + "\"]"),
if instance, ok := p.instanceCache.Get(id); ok {
return instance.(*Instance), nil
}
runtime := &util.RuntimeOptions{}

resp, err := p.ecsClient.DescribeInstancesWithOptions(describeInstancesRequest, runtime)
// List all instances to update the cache
instances, err := p.List(ctx)
if err != nil {
return nil, err
}
p.syncAllInstances(instances)

if resp == nil || resp.Body == nil || resp.Body.Instances == nil {
return nil, fmt.Errorf("failed to get instance %s, %s", id, tea.Prettify(resp))
currentInstance, ok := p.instanceCache.Get(id)
if ok {
return currentInstance.(*Instance), nil
}

// If the instance size is 0, which means it's deleted, return notfound error
if len(resp.Body.Instances.Instance) == 0 {
return nil, cloudprovider.NewNodeClaimNotFoundError(alierrors.WithRequestID(tea.StringValue(resp.Body.RequestId), fmt.Errorf("expected a single instance with id %s", id)))
}

if len(resp.Body.Instances.Instance) != 1 {
return nil, alierrors.WithRequestID(tea.StringValue(resp.Body.RequestId), fmt.Errorf("expected a single instance with id %s, got %d", id, len(resp.Body.Instances.Instance)))
}

return NewInstance(resp.Body.Instances.Instance[0]), nil
return nil, cloudprovider.NewNodeClaimNotFoundError(fmt.Errorf("instance not found"))
}

func (p *DefaultProvider) List(ctx context.Context) ([]*Instance, error) {
func (p *DefaultProvider) list(ctx context.Context) ([]*Instance, error) {
var instances []*Instance

describeInstancesRequest := &ecsclient.DescribeInstancesRequest{
Expand Down Expand Up @@ -174,6 +173,16 @@ func (p *DefaultProvider) List(ctx context.Context) ([]*Instance, error) {
return instances, nil
}

func (p *DefaultProvider) List(ctx context.Context) ([]*Instance, error) {
instances, err := p.list(ctx)
if err != nil {
return nil, err
}
p.syncAllInstances(instances)

return instances, nil
}

func (p *DefaultProvider) Delete(ctx context.Context, id string) error {
instance, err := p.Get(ctx, id)
if err != nil {
Expand Down Expand Up @@ -207,6 +216,7 @@ func (p *DefaultProvider) Delete(ctx context.Context, id string) error {
return fmt.Errorf("terminating instance id: %s, %w", id, err)
}

p.instanceCache.Delete(id)
return nil
}

Expand Down Expand Up @@ -234,7 +244,13 @@ func (p *DefaultProvider) CreateTags(ctx context.Context, id string, tags map[st
return fmt.Errorf("tagging instance, %w", err)
}

return nil
instances, err := p.list(ctx)
if err != nil {
return err
}
p.syncAllInstances(instances)

return err
}

// filterInstanceTypes is used to provide filtering on the list of potential instance types to further limit it to those
Expand Down Expand Up @@ -580,9 +596,8 @@ func (p *DefaultProvider) getVSwitchID(instanceType *cloudprovider.InstanceType,
return cheapestVSwitchID
}

type LaunchTemplate struct {
InstanceTypes []*cloudprovider.InstanceType
ImageID string
SecurityGroupIds []*string
SystemDisk *v1alpha1.SystemDisk
func (p *DefaultProvider) syncAllInstances(instances []*Instance) {
for _, instance := range instances {
p.instanceCache.Set(instance.ID, instance, cache.DefaultExpiration)
}
}

0 comments on commit b85865c

Please sign in to comment.