Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backend manager re-design #342

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 152 additions & 28 deletions pkg/server/backend_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func newBackend(conn agent.AgentService_ConnectServer) *backend {
// connections, i.e., get, add and remove
type BackendStorage interface {
// AddBackend adds a backend.
AddBackend(identifier string, idType pkgagent.IdentifierType, conn agent.AgentService_ConnectServer) Backend
AddBackend(identifier string, identifiers pkgagent.Identifiers, conn agent.AgentService_ConnectServer) Backend
// RemoveBackend removes a backend.
RemoveBackend(identifier string, idType pkgagent.IdentifierType, conn agent.AgentService_ConnectServer)
// NumBackends returns the number of backends.
Expand All @@ -128,9 +128,33 @@ type DefaultBackendManager struct {
*DefaultBackendStorage
}

func (dbm *DefaultBackendManager) Backend(_ context.Context) (Backend, error) {
klog.V(5).InfoS("Get a random backend through the DefaultBackendManager")
return dbm.DefaultBackendStorage.GetRandomBackend()
func (dbm *DefaultBackendManager) Backend(ctx context.Context) (Backend, error) {
var backend Backend
var err error
out:
for _, strategy := range dbm.proxyStrategies {
switch strategy {
case ProxyStrategyDestHost:
klog.V(5).InfoS("Get a backend based on DestHost strategy")
backend, err = dbm.GetBackendDestHost(ctx)
if err == nil {
break out
}
case ProxyStrategyDefaultRoute:
klog.V(5).InfoS("Get a backend based on DefaultRoute strategy")
backend, err = dbm.DefaultBackendStorage.GetBackendDefaultRoute()
if err == nil {
break out
}
default:
klog.V(5).InfoS("Get a random backend")
backend, err = dbm.DefaultBackendStorage.GetRandomBackend()
if err == nil {
break out
}
}
}
return backend, err
}

// DefaultBackendStorage is the default backend storage.
Expand All @@ -154,22 +178,37 @@ type DefaultBackendStorage struct {
// types of identifiers when associating to a specific BackendManager,
// e.g., when associating to the DestHostBackendManager, it can only use the
// identifiers of types, IPv4, IPv6 and Host.
idTypes []pkgagent.IdentifierType
idTypes []pkgagent.IdentifierType
proxyStrategies []ProxyStrategy
}

// NewDefaultBackendManager returns a DefaultBackendManager.
func NewDefaultBackendManager() *DefaultBackendManager {
func NewDefaultBackendManager(proxyStrategies []ProxyStrategy) *DefaultBackendManager {
var idTypes []pkgagent.IdentifierType
for _, ps := range proxyStrategies {
switch ps {
case ProxyStrategyDestHost:
idTypes = append(idTypes, pkgagent.IPv4, pkgagent.IPv6, pkgagent.Host)
case ProxyStrategyDefault:
idTypes = append(idTypes, pkgagent.UID)
case ProxyStrategyDefaultRoute:
idTypes = append(idTypes, pkgagent.DefaultRoute)
default:
klog.V(4).InfoS("Unknonw proxy strategy", "strategy", ps)
}
}
return &DefaultBackendManager{
DefaultBackendStorage: NewDefaultBackendStorage(
[]pkgagent.IdentifierType{pkgagent.UID})}
DefaultBackendStorage: NewDefaultBackendStorage(idTypes, proxyStrategies),
}
}

// NewDefaultBackendStorage returns a DefaultBackendStorage
func NewDefaultBackendStorage(idTypes []pkgagent.IdentifierType) *DefaultBackendStorage {
func NewDefaultBackendStorage(idTypes []pkgagent.IdentifierType, proxyStrategies []ProxyStrategy) *DefaultBackendStorage {
return &DefaultBackendStorage{
backends: make(map[string][]*backend),
random: rand.New(rand.NewSource(time.Now().UnixNano())),
idTypes: idTypes,
backends: make(map[string][]*backend),
random: rand.New(rand.NewSource(time.Now().UnixNano())),
idTypes: idTypes,
proxyStrategies: proxyStrategies,
} /* #nosec G404 */
}

Expand All @@ -179,36 +218,88 @@ func containIDType(idTypes []pkgagent.IdentifierType, idType pkgagent.Identifier
return true
}
}
klog.V(4).InfoS("Identifier type is not supported", "error", "error", &ErrWrongIDType{idType, idTypes})
return false
}

// AddBackend adds a backend.
func (s *DefaultBackendStorage) AddBackend(identifier string, idType pkgagent.IdentifierType, conn agent.AgentService_ConnectServer) Backend {
if !containIDType(s.idTypes, idType) {
klog.V(4).InfoS("fail to add backend", "backend", identifier, "error", &ErrWrongIDType{idType, s.idTypes})
return nil
}
klog.V(2).InfoS("Register backend for agent", "connection", conn, "agentID", identifier)
func (s *DefaultBackendStorage) AddBackend(agentID string, agentIdentifiers pkgagent.Identifiers, conn agent.AgentService_ConnectServer) Backend {
klog.V(2).InfoS("Register backend for agent", "connection", conn, "agentID", agentID)
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.backends[identifier]
addedBackend := newBackend(conn)

// Default behaviour which shall run always as DefaultRoute strategy builds on it
_, ok := s.backends[agentID]
if ok {
for _, v := range s.backends[identifier] {
for _, v := range s.backends[agentID] {
if v.conn == conn {
klog.V(1).InfoS("This should not happen. Adding existing backend for agent", "connection", conn, "agentID", identifier)
klog.V(1).InfoS("This should not happen. Adding existing backend for agent", "connection", conn, "agentID", agentID)
return v
}
}
s.backends[identifier] = append(s.backends[identifier], addedBackend)
return addedBackend
s.backends[agentID] = append(s.backends[agentID], addedBackend)
} else {
s.backends[agentID] = []*backend{addedBackend}
s.agentIDs = append(s.agentIDs, agentID)
if agentIdentifiers.DefaultRoute && containIDType(s.idTypes, pkgagent.DefaultRoute) {
s.defaultRouteAgentIDs = append(s.defaultRouteAgentIDs, agentID)
}
}
s.backends[identifier] = []*backend{addedBackend}
metrics.Metrics.SetBackendCount(len(s.backends))
s.agentIDs = append(s.agentIDs, identifier)
if idType == pkgagent.DefaultRoute {
s.defaultRouteAgentIDs = append(s.defaultRouteAgentIDs, identifier)

// populate the backends according for DestHost proxy strategy
if len(agentIdentifiers.IPv4) > 0 && containIDType(s.idTypes, pkgagent.IPv4) {
for _, ipv4 := range agentIdentifiers.IPv4 {
klog.V(5).InfoS("Add the agent to BackendManager", "agentIdentifier", ipv4, "agentID", agentID)
_, ok := s.backends[ipv4]
if ok {
for _, v := range s.backends[ipv4] {
if v.conn == conn {
klog.V(1).InfoS("This should not happen. Adding existing backend for agent", "connection", conn, "agentID", ipv4)
return v
}
}
s.backends[ipv4] = append(s.backends[ipv4], addedBackend)
} else {
s.backends[ipv4] = []*backend{addedBackend}
}
}
}
if len(agentIdentifiers.IPv6) > 0 && containIDType(s.idTypes, pkgagent.IPv6) {
for _, ipv6 := range agentIdentifiers.IPv6 {
klog.V(5).InfoS("Add the agent to BackendManager", "agentIdentifier", ipv6, "agentID", agentID)
_, ok := s.backends[ipv6]
if ok {
for _, v := range s.backends[ipv6] {
if v.conn == conn {
klog.V(1).InfoS("This should not happen. Adding existing backend for agent", "connection", conn, "agentID", ipv6)
return v
}
}
s.backends[ipv6] = append(s.backends[ipv6], addedBackend)
} else {
s.backends[ipv6] = []*backend{addedBackend}
}
}
}
if len(agentIdentifiers.Host) > 0 && containIDType(s.idTypes, pkgagent.Host) {
for _, host := range agentIdentifiers.Host {
klog.V(5).InfoS("Add the agent to BackendManager", "agentIdentifier", host, "agentID", agentID)
_, ok := s.backends[host]
if ok {
for _, v := range s.backends[host] {
if v.conn == conn {
klog.V(1).InfoS("This should not happen. Adding existing backend for agent", "connection", conn, "agentID", host)
return v
}
}
s.backends[host] = append(s.backends[host], addedBackend)
} else {
s.backends[host] = []*backend{addedBackend}
}
}
}
metrics.Metrics.SetBackendCount(len(s.backends))
return addedBackend
}

Expand Down Expand Up @@ -291,6 +382,39 @@ func ignoreNotFound(err error) error {
return err
}

// GetBackendDestHost tries to get a backend associating to the request destination host/address.
func (s *DefaultBackendStorage) GetBackendDestHost(ctx context.Context) (Backend, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if len(s.backends) == 0 {
return nil, &ErrNotFound{}
}
destHost := ctx.Value(destHost).(string)
if destHost != "" {
bes, exist := s.backends[destHost]
if exist && len(bes) > 0 {
klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost)
return s.backends[destHost][0], nil
}
}
return nil, &ErrNotFound{}
}

// Backend tries to get a backend associating to the request destination host.
func (s *DefaultBackendStorage) GetBackendDefaultRoute() (Backend, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if len(s.backends) == 0 {
return nil, &ErrNotFound{}
}
if len(s.defaultRouteAgentIDs) == 0 {
return nil, &ErrNotFound{}
}
agentID := s.defaultRouteAgentIDs[s.random.Intn(len(s.defaultRouteAgentIDs))]
klog.V(4).InfoS("Picked agent as backend", "agentID", agentID)
return s.backends[agentID][0], nil
}

// GetRandomBackend returns a random backend connection from all connected agents.
func (s *DefaultBackendStorage) GetRandomBackend() (Backend, error) {
s.mu.Lock()
Expand Down
Loading