Skip to content

Commit

Permalink
proxy-agent: support new SyncForever mode.
Browse files Browse the repository at this point in the history
This is a minimal proxy-agent half of
kubernetes-sigs#273

It is already useful, in the scenario where proxy-server are restarted.
  • Loading branch information
jkh52 committed Nov 22, 2021
1 parent fc56f4b commit 78b65d9
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 10 deletions.
6 changes: 6 additions & 0 deletions cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type GrpcProxyAgentOptions struct {
// blocking call has its own problems, so it cannot easily be made race condition safe.
// The check is an "unlocked" read but is still use at your own peril.
WarnOnChannelLimit bool

SyncForever bool
}

func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig {
Expand All @@ -66,6 +68,7 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption)
DialOptions: dialOptions,
ServiceAccountTokenPath: o.ServiceAccountTokenPath,
WarnOnChannelLimit: o.WarnOnChannelLimit,
SyncForever: o.SyncForever,
}
}

Expand All @@ -89,6 +92,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
flags.StringVar(&o.ServiceAccountTokenPath, "service-account-token-path", o.ServiceAccountTokenPath, "If non-empty proxy agent uses this token to prove its identity to the proxy server.")
flags.StringVar(&o.AgentIdentifiers, "agent-identifiers", o.AgentIdentifiers, "Identifiers of the agent that will be used by the server when choosing agent. N.B. the list of identifiers must be in URL encoded format. e.g.,host=localhost&host=node1.mydomain.com&cidr=127.0.0.1/16&ipv4=1.2.3.4&ipv4=5.6.7.8&ipv6=:::::&default-route=true")
flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.")
flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.")
return flags
}

Expand All @@ -111,6 +115,7 @@ func (o *GrpcProxyAgentOptions) Print() {
klog.V(1).Infof("ServiceAccountTokenPath set to %q.\n", o.ServiceAccountTokenPath)
klog.V(1).Infof("AgentIdentifiers set to %s.\n", util.PrettyPrintURL(o.AgentIdentifiers))
klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit)
klog.V(1).Infof("SyncForever set to %v.\n", o.SyncForever)
}

func (o *GrpcProxyAgentOptions) Validate() error {
Expand Down Expand Up @@ -199,6 +204,7 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
KeepaliveTime: 1 * time.Hour,
ServiceAccountTokenPath: "",
WarnOnChannelLimit: false,
SyncForever: false,
}
return &o
}
42 changes: 32 additions & 10 deletions pkg/agent/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package agent

import (
"fmt"
"math"
"sync"
"time"
Expand All @@ -38,7 +37,7 @@ type ClientSet struct {
address string // proxy server address. Assuming HA proxy server
serverCount int // number of proxy server instances, should be 1
// unless it is an HA server. Initialized when the ClientSet creates
// the first client.
// the first client. When syncForever is set, it will be the most recently seen.
syncInterval time.Duration // The interval by which the agent
// periodically checks that it has connections to all instances of the
// proxy server.
Expand All @@ -57,6 +56,8 @@ type ClientSet struct {
// by the server when choosing agent

warnOnChannelLimit bool

syncForever bool // Continue syncing (support dynamic server count).
}

func (cs *ClientSet) ClientsCount() int {
Expand Down Expand Up @@ -89,9 +90,17 @@ func (cs *ClientSet) HasID(serverID string) bool {
return cs.hasIDLocked(serverID)
}

type DuplicateServerError struct {
ServerID string
}

func (dse *DuplicateServerError) Error() string {
return "duplicate server: " + dse.ServerID
}

func (cs *ClientSet) addClientLocked(serverID string, c *Client) error {
if cs.hasIDLocked(serverID) {
return fmt.Errorf("client for proxy server %s already exists", serverID)
return &DuplicateServerError{ServerID: serverID}
}
cs.clients[serverID] = c
return nil
Expand Down Expand Up @@ -124,6 +133,7 @@ type ClientSetConfig struct {
DialOptions []grpc.DialOption
ServiceAccountTokenPath string
WarnOnChannelLimit bool
SyncForever bool
}

func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet {
Expand All @@ -138,6 +148,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet
dialOptions: cc.DialOptions,
serviceAccountTokenPath: cc.ServiceAccountTokenPath,
warnOnChannelLimit: cc.WarnOnChannelLimit,
syncForever: cc.SyncForever,
stopCh: stopCh,
}
}
Expand All @@ -162,9 +173,16 @@ func (cs *ClientSet) sync() {
backoff := cs.resetBackoff()
var duration time.Duration
for {
if err := cs.syncOnce(); err != nil {
klog.ErrorS(err, "cannot sync once")
duration = backoff.Step()
if err := cs.connectOnce(); err != nil {
if dse, ok := err.(*DuplicateServerError); ok {
klog.V(5).InfoS("duplicate server", "serverID", dse.ServerID, "serverCount", cs.serverCount, "clientsCount", cs.ClientsCount())
if cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount {
duration = backoff.Step()
}
} else {
klog.ErrorS(err, "cannot sync once")
duration = backoff.Step()
}
} else {
backoff = cs.resetBackoff()
duration = wait.Jitter(backoff.Duration, backoff.Jitter)
Expand All @@ -178,8 +196,8 @@ func (cs *ClientSet) sync() {
}
}

func (cs *ClientSet) syncOnce() error {
if cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount {
func (cs *ClientSet) connectOnce() error {
if !cs.syncForever && cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount {
return nil
}
c, serverCount, err := cs.newAgentClient()
Expand All @@ -193,9 +211,13 @@ func (cs *ClientSet) syncOnce() error {
}
cs.serverCount = serverCount
if err := cs.AddClient(c.serverID, c); err != nil {
klog.ErrorS(err, "closing connection failure when adding a client")
if dse, ok := err.(*DuplicateServerError); ok {
klog.V(2).InfoS("closing connection to duplicate server", "serverID", dse.ServerID)
} else {
klog.ErrorS(err, "closing connection failure when adding a client")
}
c.Close()
return nil
return err
}
klog.V(2).InfoS("sync added client connecting to proxy server", "serverID", c.serverID)
go c.Serve()
Expand Down

0 comments on commit 78b65d9

Please sign in to comment.