diff --git a/cmd/daemon/cniserver.go b/cmd/daemon/cniserver.go index 1708df65cfbd..58d6a105df08 100644 --- a/cmd/daemon/cniserver.go +++ b/cmd/daemon/cniserver.go @@ -38,6 +38,8 @@ func CmdMain() { util.LogFatalAndExit(err, "failed to do the OS initialization") } + ovs.UpdateOVSVsctlLimiter(config.OVSVsctlConcurrency) + nicBridgeMappings, err := daemon.InitOVSBridges() if err != nil { util.LogFatalAndExit(err, "failed to initialize OVS bridges") @@ -84,8 +86,6 @@ func CmdMain() { util.LogFatalAndExit(err, "failed to mv cni config file") } - ovs.InitializeOVSVsctlLimiter(config.OVSVsctlConcurrency) - mux := http.NewServeMux() if config.EnableMetrics { mux.Handle("/metrics", promhttp.Handler()) diff --git a/pkg/daemon/config.go b/pkg/daemon/config.go index 0c4c3fbd7542..bfcfb8f689b0 100644 --- a/pkg/daemon/config.go +++ b/pkg/daemon/config.go @@ -64,7 +64,7 @@ type Configuration struct { TCPConnCheckPort int UDPConnCheckPort int EnableTProxy bool - OVSVsctlConcurrency int + OVSVsctlConcurrency int32 } // ParseFlags will parse cmd args then init kubeClient and configuration @@ -103,7 +103,7 @@ func ParseFlags() *Configuration { argTCPConnectivityCheckPort = pflag.Int("tcp-conn-check-port", 8100, "TCP connectivity Check Port") argUDPConnectivityCheckPort = pflag.Int("udp-conn-check-port", 8101, "UDP connectivity Check Port") argEnableTProxy = pflag.Bool("enable-tproxy", false, "enable tproxy for vpc pod liveness or readiness probe") - argOVSVsctlConcurrency = pflag.Int("ovs-vsctl-concurrency ", 100, "concurrency limit of ovs-vsctl") + argOVSVsctlConcurrency = pflag.Int32("ovs-vsctl-concurrency", 100, "concurrency limit of ovs-vsctl") ) // mute info log for ipset lib diff --git a/pkg/ovs/ovs-vsctl.go b/pkg/ovs/ovs-vsctl.go index d6bf101d5098..0528694be9eb 100644 --- a/pkg/ovs/ovs-vsctl.go +++ b/pkg/ovs/ovs-vsctl.go @@ -1,6 +1,7 @@ package ovs import ( + "context" "fmt" "os/exec" "regexp" @@ -13,22 +14,17 @@ import ( "github.com/kubeovn/kube-ovn/pkg/util" ) -var ( - limiter *Limiter - concurrency = 100 -) +var limiter *Limiter func init() { limiter = new(Limiter) } -func InitializeOVSVsctlLimiter(c int) { +func UpdateOVSVsctlLimiter(c int32) { if c > 0 { - concurrency = c + limiter.Update(c) + klog.V(4).Infof("update ovs-vsctl concurrency limit to %d", limiter.Limit()) } - - limiter.Initialize(concurrency, time.Second) - klog.V(4).Infof("update ovs-vsctl concurrency limite to %d", limiter.Limit()) } // Glory belongs to openvswitch/ovn-kubernetes @@ -37,6 +33,9 @@ func InitializeOVSVsctlLimiter(c int) { var podNetNsRegexp = regexp.MustCompile(`pod_netns="([^"]+)"`) func Exec(args ...string) (string, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + var ( start time.Time elapsed float64 @@ -45,11 +44,12 @@ func Exec(args ...string) (string, error) { err error ) - if err = limiter.Wait(); err != nil { - klog.V(4).Infof("command %s %s waiting for execution timeout by concurrency limiter of %v", OvsVsCtl, strings.Join(args, " "), limiter.Limit()) + if err = limiter.Wait(ctx); err != nil { + klog.V(4).Infof("command %s %s waiting for execution timeout by concurrency limit of %d", OvsVsCtl, strings.Join(args, " "), limiter.Limit()) return "", err } defer limiter.Done() + klog.V(4).Infof("command %s %s waiting for execution concurrency %d/%d", OvsVsCtl, strings.Join(args, " "), limiter.Current(), limiter.Limit()) start = time.Now() args = append([]string{"--timeout=30"}, args...) diff --git a/pkg/ovs/util.go b/pkg/ovs/util.go index 96a322df9c1d..0bc9f98dd402 100644 --- a/pkg/ovs/util.go +++ b/pkg/ovs/util.go @@ -1,9 +1,11 @@ package ovs import ( + "context" "fmt" "regexp" "strings" + "sync/atomic" "time" kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" @@ -238,46 +240,42 @@ func (m aclMatch) String() string { } type Limiter struct { - limit int - timeout time.Duration - tokens chan struct{} + limit int32 + current int32 } -func (l *Limiter) Initialize(limit int, timeout time.Duration) { - l.limit = limit - l.timeout = timeout - l.tokens = make(chan struct{}, l.limit) - - for i := 0; i < l.limit; i++ { - l.tokens <- struct{}{} - } -} - -func (l *Limiter) Limit() int { +func (l *Limiter) Limit() int32 { return l.limit } -func (l *Limiter) Timeout() time.Duration { - return l.timeout +func (l *Limiter) Current() int32 { + return l.current } -func (l *Limiter) Wait() error { - if l.tokens == nil { - return nil - } +func (l *Limiter) Update(limit int32) { + l.limit = limit +} - select { - case <-l.tokens: - return nil - case <-time.After(l.timeout): - return fmt.Errorf("waiting for token timeout %d second", l.timeout) +func (l *Limiter) Wait(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return fmt.Errorf("context canceled by timeout") + default: + if l.limit == 0 { + atomic.AddInt32(&l.current, 1) + return nil + } + + if atomic.LoadInt32(&l.current) < l.limit { + atomic.AddInt32(&l.current, 1) + return nil + } + time.Sleep(10 * time.Millisecond) + } } } func (l *Limiter) Done() { - if l.tokens == nil { - return - } - - l.tokens <- struct{}{} + atomic.AddInt32(&l.current, -1) } diff --git a/pkg/ovs/util_test.go b/pkg/ovs/util_test.go index 0fd0c57e9e26..9a54a9fd8013 100644 --- a/pkg/ovs/util_test.go +++ b/pkg/ovs/util_test.go @@ -1,7 +1,9 @@ package ovs import ( + "context" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -229,3 +231,92 @@ func Test_OrAclMatch_Match(t *testing.T) { require.ErrorContains(t, err, "acl rule key is required") }) } + +func Test_Limiter(t *testing.T) { + t.Parallel() + + t.Run("without limit", func(t *testing.T) { + t.Parallel() + + var ( + limiter *Limiter + err error + ) + + limiter = new(Limiter) + + err = limiter.Wait(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(1), limiter.Current()) + + err = limiter.Wait(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(2), limiter.Current()) + + limiter.Done() + require.Equal(t, int32(1), limiter.Current()) + + limiter.Done() + require.Equal(t, int32(0), limiter.Current()) + }) + + t.Run("with limit", func(t *testing.T) { + t.Parallel() + + var ( + limiter *Limiter + err error + ) + + limiter = new(Limiter) + limiter.Update(2) + + err = limiter.Wait(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(1), limiter.Current()) + + err = limiter.Wait(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(2), limiter.Current()) + + time.AfterFunc(10*time.Second, func() { + limiter.Done() + require.Equal(t, int32(1), limiter.Current()) + }) + + err = limiter.Wait(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(2), limiter.Current()) + }) + + t.Run("with timeout", func(t *testing.T) { + t.Parallel() + + var ( + limiter *Limiter + err error + ) + + limiter = new(Limiter) + limiter.Update(2) + + err = limiter.Wait(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(1), limiter.Current()) + + err = limiter.Wait(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(2), limiter.Current()) + + time.AfterFunc(10*time.Second, func() { + limiter.Done() + }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + err = limiter.Wait(ctx) + require.ErrorContains(t, err, "context canceled by timeout") + require.Equal(t, int32(2), limiter.Current()) + }) +}