Skip to content

Commit

Permalink
Added route table API locking (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
zuzzas authored Jan 18, 2023
1 parent 3ba56d8 commit 9f7e69a
Showing 1 changed file with 25 additions and 29 deletions.
54 changes: 25 additions & 29 deletions pkg/cloudprovider/yandex/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package yandex
import (
"context"
"fmt"
"sync"

"github.com/pkg/errors"
"github.com/yandex-cloud/go-genproto/yandex/cloud/operation"
"github.com/yandex-cloud/go-genproto/yandex/cloud/vpc/v1"
"google.golang.org/genproto/protobuf/field_mask"
Expand All @@ -18,7 +20,18 @@ const (
cpiNodeRoleLabel = cpiRouteLabelsPrefix + "node-role" // we store Node's name here. The reason for this is lost in time (like tears in rain).
)

// these may get called in parallel, but since we have to modify the whole Route Table, we'll synchronize operations
var routeAPILock sync.Mutex

func (yc *Cloud) ListRoutes(ctx context.Context, _ string) ([]*cloudprovider.Route, error) {
klog.Info("ListRoutes called")

if routeAPILock.TryLock() {
routeAPILock.Unlock()
} else {
return nil, errors.New("VPC route API locked")
}

req := &vpc.GetRouteTableRequest{
RouteTableId: yc.config.RouteTableID,
}
Expand All @@ -39,35 +52,6 @@ func (yc *Cloud) ListRoutes(ctx context.Context, _ string) ([]*cloudprovider.Rou
continue
}

// let's verify NextHop relevance
currentNextHop := staticRoute.NextHop.(*vpc.StaticRoute_NextHopAddress).NextHopAddress
internalIP, err := yc.getInternalIpByNodeName(nodeName)
if err != nil {
klog.Infof("Failed to verify NextHop relevance: %s", err)
} else if currentNextHop != internalIP {
klog.Warningf("Changing %q's NextHop from %s to %s", nodeName, currentNextHop, internalIP)

filteredStaticRoutes := filterStaticRoutes(routeTable.StaticRoutes, routeFilterTerm{
termType: routeFilterAddOrUpdate,
nodeName: nodeName,
destinationCIDR: staticRoute.Destination.(*vpc.StaticRoute_DestinationPrefix).DestinationPrefix,
nextHop: internalIP,
})

req := &vpc.UpdateRouteTableRequest{
RouteTableId: yc.config.RouteTableID,
UpdateMask: &field_mask.FieldMask{
Paths: []string{"static_routes"},
},
StaticRoutes: filteredStaticRoutes,
}

_, _, err := yc.yandexService.OperationWaiter(ctx, func() (*operation.Operation, error) { return yc.yandexService.VPCSvc.RouteTableSvc.Update(ctx, req) })
if err != nil {
return nil, err
}
}

cpiRoutes = append(cpiRoutes, &cloudprovider.Route{
Name: nodeName,
TargetNode: types.NodeName(nodeName),
Expand All @@ -81,6 +65,12 @@ func (yc *Cloud) ListRoutes(ctx context.Context, _ string) ([]*cloudprovider.Rou
func (yc *Cloud) CreateRoute(ctx context.Context, _ string, _ string, route *cloudprovider.Route) error {
klog.Infof("CreateRoute called with %+v", *route)

if routeAPILock.TryLock() {
defer routeAPILock.Unlock()
} else {
return errors.New("VPC route API locked")
}

rt, err := yc.yandexService.VPCSvc.RouteTableSvc.Get(ctx, &vpc.GetRouteTableRequest{RouteTableId: yc.config.RouteTableID})
if err != nil {
return err
Expand Down Expand Up @@ -114,6 +104,12 @@ func (yc *Cloud) CreateRoute(ctx context.Context, _ string, _ string, route *clo
func (yc *Cloud) DeleteRoute(ctx context.Context, _ string, route *cloudprovider.Route) error {
klog.Infof("DeleteRoute called with %+v", *route)

if routeAPILock.TryLock() {
defer routeAPILock.Unlock()
} else {
return errors.New("VPC route API locked")
}

rt, err := yc.yandexService.VPCSvc.RouteTableSvc.Get(ctx, &vpc.GetRouteTableRequest{RouteTableId: yc.config.RouteTableID})
if err != nil {
return err
Expand Down

0 comments on commit 9f7e69a

Please sign in to comment.