Skip to content

Commit

Permalink
update concurrency limiter to ovs-vsctl
Browse files Browse the repository at this point in the history
Signed-off-by: 夜微澜 <[email protected]>
  • Loading branch information
qiutingjun committed Oct 8, 2023
1 parent 430a6b1 commit fe80853
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 45 deletions.
4 changes: 2 additions & 2 deletions cmd/daemon/cniserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func CmdMain() {
util.LogFatalAndExit(err, "failed to do the OS initialization")
}

ovs.UpdateOVSVsctlLimiter(config.OVSVsctlConcurrency)

nicBridgeMappings, err := daemon.InitOVSBridges()
if err != nil {
util.LogFatalAndExit(err, "failed to initialize OVS bridges")
Expand Down Expand Up @@ -84,8 +86,6 @@ func CmdMain() {
util.LogFatalAndExit(err, "failed to mv cni config file")
}

ovs.InitializeOVSVsctlLimiter(config.OVSVsctlConcurrency)

mux := http.NewServeMux()
if config.EnableMetrics {
mux.Handle("/metrics", promhttp.Handler())
Expand Down
4 changes: 2 additions & 2 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Configuration struct {
TCPConnCheckPort int
UDPConnCheckPort int
EnableTProxy bool
OVSVsctlConcurrency int
OVSVsctlConcurrency int32
}

// ParseFlags will parse cmd args then init kubeClient and configuration
Expand Down Expand Up @@ -103,7 +103,7 @@ func ParseFlags() *Configuration {
argTCPConnectivityCheckPort = pflag.Int("tcp-conn-check-port", 8100, "TCP connectivity Check Port")
argUDPConnectivityCheckPort = pflag.Int("udp-conn-check-port", 8101, "UDP connectivity Check Port")
argEnableTProxy = pflag.Bool("enable-tproxy", false, "enable tproxy for vpc pod liveness or readiness probe")
argOVSVsctlConcurrency = pflag.Int("ovs-vsctl-concurrency ", 100, "concurrency limit of ovs-vsctl")
argOVSVsctlConcurrency = pflag.Int32("ovs-vsctl-concurrency", 100, "concurrency limit of ovs-vsctl")
)

// mute info log for ipset lib
Expand Down
22 changes: 11 additions & 11 deletions pkg/ovs/ovs-vsctl.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ovs

import (
"context"
"fmt"
"os/exec"
"regexp"
Expand All @@ -13,22 +14,17 @@ import (
"github.com/kubeovn/kube-ovn/pkg/util"
)

var (
limiter *Limiter
concurrency = 100
)
var limiter *Limiter

func init() {
limiter = new(Limiter)
}

func InitializeOVSVsctlLimiter(c int) {
func UpdateOVSVsctlLimiter(c int32) {
if c > 0 {
concurrency = c
limiter.Update(c)
klog.V(4).Infof("update ovs-vsctl concurrency limit to %d", limiter.Limit())
}

limiter.Initialize(concurrency, time.Second)
klog.V(4).Infof("update ovs-vsctl concurrency limite to %d", limiter.Limit())
}

// Glory belongs to openvswitch/ovn-kubernetes
Expand All @@ -37,6 +33,9 @@ func InitializeOVSVsctlLimiter(c int) {
var podNetNsRegexp = regexp.MustCompile(`pod_netns="([^"]+)"`)

func Exec(args ...string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

var (
start time.Time
elapsed float64
Expand All @@ -45,11 +44,12 @@ func Exec(args ...string) (string, error) {
err error
)

if err = limiter.Wait(); err != nil {
klog.V(4).Infof("command %s %s waiting for execution timeout by concurrency limiter of %v", OvsVsCtl, strings.Join(args, " "), limiter.Limit())
if err = limiter.Wait(ctx); err != nil {
klog.V(4).Infof("command %s %s waiting for execution timeout by concurrency limit of %d", OvsVsCtl, strings.Join(args, " "), limiter.Limit())
return "", err
}
defer limiter.Done()
klog.V(4).Infof("command %s %s waiting for execution concurrency %d/%d", OvsVsCtl, strings.Join(args, " "), limiter.Current(), limiter.Limit())

start = time.Now()
args = append([]string{"--timeout=30"}, args...)
Expand Down
58 changes: 28 additions & 30 deletions pkg/ovs/util.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package ovs

import (
"context"
"fmt"
"regexp"
"strings"
"sync/atomic"
"time"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
Expand Down Expand Up @@ -238,46 +240,42 @@ func (m aclMatch) String() string {
}

type Limiter struct {
limit int
timeout time.Duration
tokens chan struct{}
limit int32
current int32
}

func (l *Limiter) Initialize(limit int, timeout time.Duration) {
l.limit = limit
l.timeout = timeout
l.tokens = make(chan struct{}, l.limit)

for i := 0; i < l.limit; i++ {
l.tokens <- struct{}{}
}
}

func (l *Limiter) Limit() int {
func (l *Limiter) Limit() int32 {
return l.limit
}

func (l *Limiter) Timeout() time.Duration {
return l.timeout
func (l *Limiter) Current() int32 {
return l.current
}

func (l *Limiter) Wait() error {
if l.tokens == nil {
return nil
}
func (l *Limiter) Update(limit int32) {
l.limit = limit
}

select {
case <-l.tokens:
return nil
case <-time.After(l.timeout):
return fmt.Errorf("waiting for token timeout %d second", l.timeout)
func (l *Limiter) Wait(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return fmt.Errorf("context canceled by timeout")
default:
if l.limit == 0 {
atomic.AddInt32(&l.current, 1)
return nil
}

if atomic.LoadInt32(&l.current) < l.limit {
atomic.AddInt32(&l.current, 1)
return nil
}
time.Sleep(10 * time.Millisecond)
}
}
}

func (l *Limiter) Done() {
if l.tokens == nil {
return
}

l.tokens <- struct{}{}
atomic.AddInt32(&l.current, -1)
}
91 changes: 91 additions & 0 deletions pkg/ovs/util_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package ovs

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -229,3 +231,92 @@ func Test_OrAclMatch_Match(t *testing.T) {
require.ErrorContains(t, err, "acl rule key is required")
})
}

func Test_Limiter(t *testing.T) {
t.Parallel()

t.Run("without limit", func(t *testing.T) {
t.Parallel()

var (
limiter *Limiter
err error
)

limiter = new(Limiter)

err = limiter.Wait(context.Background())
require.NoError(t, err)
require.Equal(t, int32(1), limiter.Current())

err = limiter.Wait(context.Background())
require.NoError(t, err)
require.Equal(t, int32(2), limiter.Current())

limiter.Done()
require.Equal(t, int32(1), limiter.Current())

limiter.Done()
require.Equal(t, int32(0), limiter.Current())
})

t.Run("with limit", func(t *testing.T) {
t.Parallel()

var (
limiter *Limiter
err error
)

limiter = new(Limiter)
limiter.Update(2)

err = limiter.Wait(context.Background())
require.NoError(t, err)
require.Equal(t, int32(1), limiter.Current())

err = limiter.Wait(context.Background())
require.NoError(t, err)
require.Equal(t, int32(2), limiter.Current())

time.AfterFunc(10*time.Second, func() {
limiter.Done()
require.Equal(t, int32(1), limiter.Current())
})

err = limiter.Wait(context.Background())
require.NoError(t, err)
require.Equal(t, int32(2), limiter.Current())
})

t.Run("with timeout", func(t *testing.T) {
t.Parallel()

var (
limiter *Limiter
err error
)

limiter = new(Limiter)
limiter.Update(2)

err = limiter.Wait(context.Background())
require.NoError(t, err)
require.Equal(t, int32(1), limiter.Current())

err = limiter.Wait(context.Background())
require.NoError(t, err)
require.Equal(t, int32(2), limiter.Current())

time.AfterFunc(10*time.Second, func() {
limiter.Done()
})

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

err = limiter.Wait(ctx)
require.ErrorContains(t, err, "context canceled by timeout")
require.Equal(t, int32(2), limiter.Current())
})
}

0 comments on commit fe80853

Please sign in to comment.