Skip to content

Commit

Permalink
optimize code (#4049)
Browse files Browse the repository at this point in the history
until external_gw
---------

Signed-off-by: bobz965 <[email protected]>
Co-authored-by: 张祖建 <[email protected]>
  • Loading branch information
bobz965 and zhangzujian authored May 21, 2024
1 parent d30c1e7 commit bf1cb76
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 96 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/client_go_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

// this file contains setup logic to initialize the myriad of places
// that client-go registers metrics. We copy the names and formats
// that client-go registers metrics. we copy the names and formats
// from Kubernetes so that we match the core controllers.

var (
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func ParseFlags() (*Configuration, error) {
var (
argOvnNbAddr = pflag.String("ovn-nb-addr", "", "ovn-nb address")
argOvnSbAddr = pflag.String("ovn-sb-addr", "", "ovn-sb address")
argOvnTimeout = pflag.Int("ovn-timeout", 60, "")
argOvnTimeout = pflag.Int("ovn-timeout", 60, "The seconds to wait ovn command timeout")
argCustCrdRetryMinDelay = pflag.Int("cust-crd-retry-min-delay", 1, "The min delay seconds between custom crd two retries")
argCustCrdRetryMaxDelay = pflag.Int("cust-crd-retry-max-delay", 20, "The max delay seconds between custom crd two retries")
argKubeConfigFile = pflag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information. If not set use the inCluster token.")
Expand Down Expand Up @@ -163,7 +163,7 @@ func ParseFlags() (*Configuration, error) {
argExternalGatewaySwitch = pflag.String("external-gateway-switch", "external", "The name of the external gateway switch which is a ovs bridge to provide external network, default: external")
argExternalGatewayNet = pflag.String("external-gateway-net", "external", "The name of the external network which mappings with an ovs bridge, default: external")
argExternalGatewayVlanID = pflag.Int("external-gateway-vlanid", 0, "The vlanId of port ln-ovn-external, default: 0")
argNodeLocalDNSIP = pflag.String("node-local-dns-ip", "", "The node local dns ip , this feature is using the local dns cache in k8s")
argNodeLocalDNSIP = pflag.String("node-local-dns-ip", "", "The node local dns ip, this feature is using the local dns cache in k8s")

argGCInterval = pflag.Int("gc-interval", 360, "The interval between GC processes, default 360 seconds")
argInspectInterval = pflag.Int("inspect-interval", 20, "The interval between inspect processes, default 20 seconds")
Expand All @@ -176,7 +176,7 @@ func ParseFlags() (*Configuration, error) {
klogFlags := flag.NewFlagSet("klog", flag.ExitOnError)
klog.InitFlags(klogFlags)

// Sync the glog and klog flags.
// sync the glog and klog flags.
pflag.CommandLine.VisitAll(func(f1 *pflag.Flag) {
f2 := klogFlags.Lookup(f1.Name)
if f2 != nil {
Expand Down
24 changes: 12 additions & 12 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,10 @@ type Controller struct {
updateServiceQueue workqueue.RateLimitingInterface
svcKeyMutex keymutex.KeyMutex

endpointsLister v1.EndpointsLister
endpointsSynced cache.InformerSynced
updateEndpointQueue workqueue.RateLimitingInterface
epKeyMutex keymutex.KeyMutex
endpointsLister v1.EndpointsLister
endpointsSynced cache.InformerSynced
addOrUpdateEndpointQueue workqueue.RateLimitingInterface
epKeyMutex keymutex.KeyMutex

npsLister netv1.NetworkPolicyLister
npsSynced cache.InformerSynced
Expand Down Expand Up @@ -419,10 +419,10 @@ func Run(ctx context.Context, config *Configuration) {
updateServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateService"),
svcKeyMutex: keymutex.NewHashed(numKeyLocks),

endpointsLister: endpointInformer.Lister(),
endpointsSynced: endpointInformer.Informer().HasSynced,
updateEndpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateEndpoint"),
epKeyMutex: keymutex.NewHashed(numKeyLocks),
endpointsLister: endpointInformer.Lister(),
endpointsSynced: endpointInformer.Informer().HasSynced,
addOrUpdateEndpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateEndpoint"),
epKeyMutex: keymutex.NewHashed(numKeyLocks),

qosPoliciesLister: qosPolicyInformer.Lister(),
qosPolicySynced: qosPolicyInformer.Informer().HasSynced,
Expand Down Expand Up @@ -667,7 +667,7 @@ func Run(ctx context.Context, config *Configuration) {
UpdateFunc: controller.enqueueUpdateOvnEip,
DeleteFunc: controller.enqueueDelOvnEip,
}); err != nil {
util.LogFatalAndExit(err, "failed to add eip event handler")
util.LogFatalAndExit(err, "failed to add ovn eip event handler")
}

if _, err = ovnFipInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -822,7 +822,7 @@ func (c *Controller) shutdown() {
c.addServiceQueue.ShutDown()
c.deleteServiceQueue.ShutDown()
c.updateServiceQueue.ShutDown()
c.updateEndpointQueue.ShutDown()
c.addOrUpdateEndpointQueue.ShutDown()

c.addVlanQueue.ShutDown()
c.delVlanQueue.ShutDown()
Expand Down Expand Up @@ -920,7 +920,7 @@ func (c *Controller) startWorkers(ctx context.Context) {
go wait.Until(c.runUpdateVpcSnatWorker, time.Second, ctx.Done())
go wait.Until(c.runUpdateVpcSubnetWorker, time.Second, ctx.Done())

// add default/join subnet and wait them ready
// add default and join subnet and wait them ready
go wait.Until(c.runAddSubnetWorker, time.Second, ctx.Done())
go wait.Until(c.runAddIPPoolWorker, time.Second, ctx.Done())
go wait.Until(c.runAddVlanWorker, time.Second, ctx.Done())
Expand All @@ -932,7 +932,7 @@ func (c *Controller) startWorkers(ctx context.Context) {
return c.allSubnetReady(subnets...)
})
if err != nil {
klog.Fatalf("wait default/join subnet ready error: %v", err)
klog.Fatalf("wait default and join subnet ready, error: %v", err)
}

go wait.Until(c.runAddSgWorker, time.Second, ctx.Done())
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (

type fakeControllerInformers struct {
vpcInformer kubeovninformer.VpcInformer
sbunetInformer kubeovninformer.SubnetInformer
subnetInformer kubeovninformer.SubnetInformer
serviceInformer coreinformers.ServiceInformer
}

type fakeController struct {
fakeController *Controller
fakeinformers *fakeControllerInformers
fakeInformers *fakeControllerInformers
mockOvnClient *mockovs.MockNbClient
}

Expand All @@ -41,11 +41,11 @@ func newFakeController(t *testing.T) *fakeController {
kubeovnClient := kubeovnfake.NewSimpleClientset()
kubeovnInformerFactory := kubeovninformerfactory.NewSharedInformerFactory(kubeovnClient, 0)
vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
sbunetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()
subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()

fakeInformers := &fakeControllerInformers{
vpcInformer: vpcInformer,
sbunetInformer: sbunetInformer,
subnetInformer: subnetInformer,
serviceInformer: serviceInformer,
}

Expand All @@ -56,15 +56,15 @@ func newFakeController(t *testing.T) *fakeController {
servicesLister: serviceInformer.Lister(),
vpcsLister: vpcInformer.Lister(),
vpcSynced: alwaysReady,
subnetsLister: sbunetInformer.Lister(),
subnetsLister: subnetInformer.Lister(),
subnetSynced: alwaysReady,
OVNNbClient: mockOvnClient,
syncVirtualPortsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ""),
}

return &fakeController{
fakeController: ctrl,
fakeinformers: fakeInformers,
fakeInformers: fakeInformers,
mockOvnClient: mockOvnClient,
}
}
Expand Down
45 changes: 22 additions & 23 deletions pkg/controller/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (c *Controller) enqueueAddEndpoint(obj interface{}) {
return
}
klog.V(3).Infof("enqueue add endpoint %s", key)
c.updateEndpointQueue.Add(key)
c.addOrUpdateEndpointQueue.Add(key)
}

func (c *Controller) enqueueUpdateEndpoint(oldObj, newObj interface{}) {
Expand All @@ -52,7 +52,7 @@ func (c *Controller) enqueueUpdateEndpoint(oldObj, newObj interface{}) {
return
}
klog.V(3).Infof("enqueue update endpoint %s", key)
c.updateEndpointQueue.Add(key)
c.addOrUpdateEndpointQueue.Add(key)
}

func (c *Controller) runUpdateEndpointWorker() {
Expand All @@ -61,14 +61,14 @@ func (c *Controller) runUpdateEndpointWorker() {
}

func (c *Controller) processNextUpdateEndpointWorkItem() bool {
obj, shutdown := c.updateEndpointQueue.Get()
obj, shutdown := c.addOrUpdateEndpointQueue.Get()

if shutdown {
return false
}

if err := func(obj interface{}) error {
defer c.updateEndpointQueue.Done(obj)
defer c.addOrUpdateEndpointQueue.Done(obj)

var (
key string
Expand All @@ -77,16 +77,16 @@ func (c *Controller) processNextUpdateEndpointWorkItem() bool {
)

if key, ok = obj.(string); !ok {
c.updateEndpointQueue.Forget(obj)
c.addOrUpdateEndpointQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}

if err = c.handleUpdateEndpoint(key); err != nil {
c.updateEndpointQueue.AddRateLimited(key)
c.addOrUpdateEndpointQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.updateEndpointQueue.Forget(obj)
c.addOrUpdateEndpointQueue.Forget(obj)
return nil
}(obj); err != nil {
utilruntime.HandleError(err)
Expand All @@ -104,7 +104,7 @@ func (c *Controller) handleUpdateEndpoint(key string) error {

c.epKeyMutex.LockKey(key)
defer func() { _ = c.epKeyMutex.UnlockKey(key) }()
klog.Infof("update add/update endpoint %s/%s", namespace, name)
klog.Infof("handle update endpoint %s", key)

ep, err := c.endpointsLister.Endpoints(namespace).Get(name)
if err != nil {
Expand Down Expand Up @@ -162,7 +162,7 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
)

if vpc, err = c.vpcsLister.Get(vpcName); err != nil {
klog.Errorf("failed to get vpc %s of lb, %v", vpcName, err)
klog.Errorf("failed to get vpc %s, %v", vpcName, err)
return err
}

Expand All @@ -172,7 +172,7 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
}
svc.Annotations[util.VpcAnnotation] = vpcName
if _, err = c.config.KubeClient.CoreV1().Services(namespace).Update(context.Background(), svc, metav1.UpdateOptions{}); err != nil {
klog.Errorf("failed to update service %s/%s: %v", namespace, svc.Name, err)
klog.Errorf("failed to update service %s: %v", key, err)
return err
}
}
Expand Down Expand Up @@ -200,14 +200,12 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
backends []string
ipPortMapping, externals map[string]string
)
vip = util.JoinHostPort(lbVip, port.Port)

if !ignoreHealthCheck {
if checkIP, err = c.getHealthCheckVip(subnetName, lbVip); err != nil {
klog.Error(err)
return err
}

externals = map[string]string{
util.SwitchLBRuleSubnet: subnetName,
}
Expand All @@ -217,6 +215,7 @@ func (c *Controller) handleUpdateEndpoint(key string) error {

// for performance reason delete lb with no backends
if len(backends) != 0 {
vip = util.JoinHostPort(lbVip, port.Port)
klog.Infof("add vip endpoint %s, backends %v to LB %s", vip, backends, lb)
if err = c.OVNNbClient.LoadBalancerAddVip(lb, vip, backends...); err != nil {
klog.Errorf("failed to add vip %s with backends %s to LB %s: %v", lbVip, backends, lb, err)
Expand Down Expand Up @@ -257,7 +256,6 @@ func (c *Controller) getVpcSubnetName(pods []*v1.Pod, endpoints *v1.Endpoints, s
if len(pod.Annotations) == 0 {
continue
}

if subnetName == "" {
subnetName = pod.Annotations[util.LogicalSwitchAnnotation]
}
Expand All @@ -269,7 +267,6 @@ func (c *Controller) getVpcSubnetName(pods []*v1.Pod, endpoints *v1.Endpoints, s
if vpcName == "" {
vpcName = pod.Annotations[util.LogicalRouterAnnotation]
}

if vpcName != "" {
break LOOP
}
Expand All @@ -295,48 +292,50 @@ func (c *Controller) getVpcSubnetName(pods []*v1.Pod, endpoints *v1.Endpoints, s
return vpcName, subnetName
}

// getHealthCheckVip get health check vip for load balancer, the vip name is the subnet name
// the vip is used to check the health of the backend pod
func (c *Controller) getHealthCheckVip(subnetName, lbVip string) (string, error) {
var (
needCreateHealthCheckVip bool
checkVip *kubeovnv1.Vip
checkIP string
err error
)

checkVip, err = c.virtualIpsLister.Get(subnetName)
vipName := subnetName
checkVip, err = c.virtualIpsLister.Get(vipName)
if err != nil {
if errors.IsNotFound(err) {
needCreateHealthCheckVip = true
} else {
klog.Errorf("failed to get health check vip %s, %v", subnetName, err)
klog.Errorf("failed to get health check vip %s, %v", vipName, err)
return "", err
}
}
if needCreateHealthCheckVip {
vip := &kubeovnv1.Vip{
ObjectMeta: metav1.ObjectMeta{
Name: subnetName,
Name: vipName,
},
Spec: kubeovnv1.VipSpec{
Subnet: subnetName,
},
}
if _, err = c.config.KubeOvnClient.KubeovnV1().Vips().Create(context.Background(), vip, metav1.CreateOptions{}); err != nil {
klog.Errorf("failed to create health check vip %s, %v", subnetName, err)
klog.Errorf("failed to create health check vip %s, %v", vipName, err)
return "", err
}

// wait for vip created
time.Sleep(1 * time.Second)
checkVip, err = c.virtualIpsLister.Get(subnetName)
checkVip, err = c.virtualIpsLister.Get(vipName)
if err != nil {
klog.Errorf("failed to get health check vip %s, %v", subnetName, err)
klog.Errorf("failed to get health check vip %s, %v", vipName, err)
return "", err
}
}

if checkVip.Status.V4ip == "" && checkVip.Status.V6ip == "" {
err = fmt.Errorf("failed to get health check vip %s address", subnetName)
err = fmt.Errorf("vip %s is not ready", vipName)
klog.Error(err)
return "", err
}
Expand All @@ -348,7 +347,7 @@ func (c *Controller) getHealthCheckVip(subnetName, lbVip string) (string, error)
checkIP = checkVip.Status.V6ip
}
if checkIP == "" {
err = fmt.Errorf("failed to get health check vip subnet %s", subnetName)
err = fmt.Errorf("failed to get health check vip subnet %s", vipName)
klog.Error(err)
return "", err
}
Expand Down
Loading

0 comments on commit bf1cb76

Please sign in to comment.