diff --git a/pkg/server/backend_manager.go b/pkg/server/backend_manager.go index 62954acf3..5d46147d3 100644 --- a/pkg/server/backend_manager.go +++ b/pkg/server/backend_manager.go @@ -101,7 +101,7 @@ func newBackend(conn agent.AgentService_ConnectServer) *backend { // connections, i.e., get, add and remove type BackendStorage interface { // AddBackend adds a backend. - AddBackend(identifier string, idType pkgagent.IdentifierType, conn agent.AgentService_ConnectServer) Backend + AddBackend(identifier string, identifiers pkgagent.Identifiers, conn agent.AgentService_ConnectServer) Backend // RemoveBackend removes a backend. RemoveBackend(identifier string, idType pkgagent.IdentifierType, conn agent.AgentService_ConnectServer) // NumBackends returns the number of backends. @@ -128,9 +128,33 @@ type DefaultBackendManager struct { *DefaultBackendStorage } -func (dbm *DefaultBackendManager) Backend(_ context.Context) (Backend, error) { - klog.V(5).InfoS("Get a random backend through the DefaultBackendManager") - return dbm.DefaultBackendStorage.GetRandomBackend() +func (dbm *DefaultBackendManager) Backend(ctx context.Context) (Backend, error) { + var backend Backend + var err error +out: + for _, strategy := range dbm.proxyStrategies { + switch strategy { + case ProxyStrategyDestHost: + klog.V(5).InfoS("Get a backend based on DestHost strategy") + backend, err = dbm.GetBackendDestHost(ctx) + if err == nil { + break out + } + case ProxyStrategyDefaultRoute: + klog.V(5).InfoS("Get a backend based on DefaultRoute strategy") + backend, err = dbm.DefaultBackendStorage.GetBackendDefaultRoute() + if err == nil { + break out + } + default: + klog.V(5).InfoS("Get a random backend") + backend, err = dbm.DefaultBackendStorage.GetRandomBackend() + if err == nil { + break out + } + } + } + return backend, err } // DefaultBackendStorage is the default backend storage. @@ -154,22 +178,37 @@ type DefaultBackendStorage struct { // types of identifiers when associating to a specific BackendManager, // e.g., when associating to the DestHostBackendManager, it can only use the // identifiers of types, IPv4, IPv6 and Host. - idTypes []pkgagent.IdentifierType + idTypes []pkgagent.IdentifierType + proxyStrategies []ProxyStrategy } // NewDefaultBackendManager returns a DefaultBackendManager. -func NewDefaultBackendManager() *DefaultBackendManager { +func NewDefaultBackendManager(proxyStrategies []ProxyStrategy) *DefaultBackendManager { + var idTypes []pkgagent.IdentifierType + for _, ps := range proxyStrategies { + switch ps { + case ProxyStrategyDestHost: + idTypes = append(idTypes, pkgagent.IPv4, pkgagent.IPv6, pkgagent.Host) + case ProxyStrategyDefault: + idTypes = append(idTypes, pkgagent.UID) + case ProxyStrategyDefaultRoute: + idTypes = append(idTypes, pkgagent.DefaultRoute) + default: + klog.V(4).InfoS("Unknonw proxy strategy", "strategy", ps) + } + } return &DefaultBackendManager{ - DefaultBackendStorage: NewDefaultBackendStorage( - []pkgagent.IdentifierType{pkgagent.UID})} + DefaultBackendStorage: NewDefaultBackendStorage(idTypes, proxyStrategies), + } } // NewDefaultBackendStorage returns a DefaultBackendStorage -func NewDefaultBackendStorage(idTypes []pkgagent.IdentifierType) *DefaultBackendStorage { +func NewDefaultBackendStorage(idTypes []pkgagent.IdentifierType, proxyStrategies []ProxyStrategy) *DefaultBackendStorage { return &DefaultBackendStorage{ - backends: make(map[string][]*backend), - random: rand.New(rand.NewSource(time.Now().UnixNano())), - idTypes: idTypes, + backends: make(map[string][]*backend), + random: rand.New(rand.NewSource(time.Now().UnixNano())), + idTypes: idTypes, + proxyStrategies: proxyStrategies, } /* #nosec G404 */ } @@ -179,36 +218,88 @@ func containIDType(idTypes []pkgagent.IdentifierType, idType pkgagent.Identifier return true } } + klog.V(4).InfoS("Identifier type is not supported", "error", "error", &ErrWrongIDType{idType, idTypes}) return false } // AddBackend adds a backend. -func (s *DefaultBackendStorage) AddBackend(identifier string, idType pkgagent.IdentifierType, conn agent.AgentService_ConnectServer) Backend { - if !containIDType(s.idTypes, idType) { - klog.V(4).InfoS("fail to add backend", "backend", identifier, "error", &ErrWrongIDType{idType, s.idTypes}) - return nil - } - klog.V(2).InfoS("Register backend for agent", "connection", conn, "agentID", identifier) +func (s *DefaultBackendStorage) AddBackend(agentID string, agentIdentifiers pkgagent.Identifiers, conn agent.AgentService_ConnectServer) Backend { + klog.V(2).InfoS("Register backend for agent", "connection", conn, "agentID", agentID) s.mu.Lock() defer s.mu.Unlock() - _, ok := s.backends[identifier] addedBackend := newBackend(conn) + + // Default behaviour which shall run always as DefaultRoute strategy builds on it + _, ok := s.backends[agentID] if ok { - for _, v := range s.backends[identifier] { + for _, v := range s.backends[agentID] { if v.conn == conn { - klog.V(1).InfoS("This should not happen. Adding existing backend for agent", "connection", conn, "agentID", identifier) + klog.V(1).InfoS("This should not happen. Adding existing backend for agent", "connection", conn, "agentID", agentID) return v } } - s.backends[identifier] = append(s.backends[identifier], addedBackend) - return addedBackend + s.backends[agentID] = append(s.backends[agentID], addedBackend) + } else { + s.backends[agentID] = []*backend{addedBackend} + s.agentIDs = append(s.agentIDs, agentID) + if agentIdentifiers.DefaultRoute && containIDType(s.idTypes, pkgagent.DefaultRoute) { + s.defaultRouteAgentIDs = append(s.defaultRouteAgentIDs, agentID) + } } - s.backends[identifier] = []*backend{addedBackend} - metrics.Metrics.SetBackendCount(len(s.backends)) - s.agentIDs = append(s.agentIDs, identifier) - if idType == pkgagent.DefaultRoute { - s.defaultRouteAgentIDs = append(s.defaultRouteAgentIDs, identifier) + + // populate the backends according for DestHost proxy strategy + if len(agentIdentifiers.IPv4) > 0 && containIDType(s.idTypes, pkgagent.IPv4) { + for _, ipv4 := range agentIdentifiers.IPv4 { + klog.V(5).InfoS("Add the agent to BackendManager", "agentIdentifier", ipv4, "agentID", agentID) + _, ok := s.backends[ipv4] + if ok { + for _, v := range s.backends[ipv4] { + if v.conn == conn { + klog.V(1).InfoS("This should not happen. Adding existing backend for agent", "connection", conn, "agentID", ipv4) + return v + } + } + s.backends[ipv4] = append(s.backends[ipv4], addedBackend) + } else { + s.backends[ipv4] = []*backend{addedBackend} + } + } + } + if len(agentIdentifiers.IPv6) > 0 && containIDType(s.idTypes, pkgagent.IPv6) { + for _, ipv6 := range agentIdentifiers.IPv6 { + klog.V(5).InfoS("Add the agent to BackendManager", "agentIdentifier", ipv6, "agentID", agentID) + _, ok := s.backends[ipv6] + if ok { + for _, v := range s.backends[ipv6] { + if v.conn == conn { + klog.V(1).InfoS("This should not happen. Adding existing backend for agent", "connection", conn, "agentID", ipv6) + return v + } + } + s.backends[ipv6] = append(s.backends[ipv6], addedBackend) + } else { + s.backends[ipv6] = []*backend{addedBackend} + } + } } + if len(agentIdentifiers.Host) > 0 && containIDType(s.idTypes, pkgagent.Host) { + for _, host := range agentIdentifiers.Host { + klog.V(5).InfoS("Add the agent to BackendManager", "agentIdentifier", host, "agentID", agentID) + _, ok := s.backends[host] + if ok { + for _, v := range s.backends[host] { + if v.conn == conn { + klog.V(1).InfoS("This should not happen. Adding existing backend for agent", "connection", conn, "agentID", host) + return v + } + } + s.backends[host] = append(s.backends[host], addedBackend) + } else { + s.backends[host] = []*backend{addedBackend} + } + } + } + metrics.Metrics.SetBackendCount(len(s.backends)) return addedBackend } @@ -291,6 +382,39 @@ func ignoreNotFound(err error) error { return err } +// GetBackendDestHost tries to get a backend associating to the request destination host/address. +func (s *DefaultBackendStorage) GetBackendDestHost(ctx context.Context) (Backend, error) { + s.mu.RLock() + defer s.mu.RUnlock() + if len(s.backends) == 0 { + return nil, &ErrNotFound{} + } + destHost := ctx.Value(destHost).(string) + if destHost != "" { + bes, exist := s.backends[destHost] + if exist && len(bes) > 0 { + klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost) + return s.backends[destHost][0], nil + } + } + return nil, &ErrNotFound{} +} + +// Backend tries to get a backend associating to the request destination host. +func (s *DefaultBackendStorage) GetBackendDefaultRoute() (Backend, error) { + s.mu.RLock() + defer s.mu.RUnlock() + if len(s.backends) == 0 { + return nil, &ErrNotFound{} + } + if len(s.defaultRouteAgentIDs) == 0 { + return nil, &ErrNotFound{} + } + agentID := s.defaultRouteAgentIDs[s.random.Intn(len(s.defaultRouteAgentIDs))] + klog.V(4).InfoS("Picked agent as backend", "agentID", agentID) + return s.backends[agentID][0], nil +} + // GetRandomBackend returns a random backend connection from all connected agents. func (s *DefaultBackendStorage) GetRandomBackend() (Backend, error) { s.mu.Lock() diff --git a/pkg/server/backend_manager_test.go b/pkg/server/backend_manager_test.go index ad0952514..4a8ea4f7d 100644 --- a/pkg/server/backend_manager_test.go +++ b/pkg/server/backend_manager_test.go @@ -17,6 +17,8 @@ limitations under the License. package server import ( + "context" + "errors" "reflect" "testing" @@ -26,6 +28,11 @@ import ( type fakeAgentServiceConnectServer struct { agent.AgentService_ConnectServer + name string +} + +func fakeNewBackend(conn fakeAgentServiceConnectServer) *backend { + return &backend{conn: conn} } func TestAddRemoveBackends(t *testing.T) { @@ -35,9 +42,9 @@ func TestAddRemoveBackends(t *testing.T) { conn22 := new(fakeAgentServiceConnectServer) conn3 := new(fakeAgentServiceConnectServer) - p := NewDefaultBackendManager() + p := NewDefaultBackendManager([]ProxyStrategy{ProxyStrategyDefault}) - p.AddBackend("agent1", pkgagent.UID, conn1) + p.AddBackend("agent1", pkgagent.Identifiers{}, conn1) p.RemoveBackend("agent1", pkgagent.UID, conn1) expectedBackends := make(map[string][]*backend) expectedAgentIDs := []string{} @@ -48,14 +55,14 @@ func TestAddRemoveBackends(t *testing.T) { t.Errorf("expected %v, got %v", e, a) } - p = NewDefaultBackendManager() - p.AddBackend("agent1", pkgagent.UID, conn1) - p.AddBackend("agent1", pkgagent.UID, conn12) + p = NewDefaultBackendManager([]ProxyStrategy{ProxyStrategyDefault}) + p.AddBackend("agent1", pkgagent.Identifiers{}, conn1) + p.AddBackend("agent1", pkgagent.Identifiers{}, conn12) // Adding the same connection again should be a no-op. - p.AddBackend("agent1", pkgagent.UID, conn12) - p.AddBackend("agent2", pkgagent.UID, conn2) - p.AddBackend("agent2", pkgagent.UID, conn22) - p.AddBackend("agent3", pkgagent.UID, conn3) + p.AddBackend("agent1", pkgagent.Identifiers{}, conn12) + p.AddBackend("agent2", pkgagent.Identifiers{}, conn2) + p.AddBackend("agent2", pkgagent.Identifiers{}, conn22) + p.AddBackend("agent3", pkgagent.Identifiers{}, conn3) p.RemoveBackend("agent2", pkgagent.UID, conn22) p.RemoveBackend("agent2", pkgagent.UID, conn2) p.RemoveBackend("agent1", pkgagent.UID, conn1) @@ -81,9 +88,9 @@ func TestAddRemoveBackendsWithDefaultRoute(t *testing.T) { conn22 := new(fakeAgentServiceConnectServer) conn3 := new(fakeAgentServiceConnectServer) - p := NewDefaultRouteBackendManager() + p := NewDefaultBackendManager([]ProxyStrategy{ProxyStrategyDefaultRoute}) - p.AddBackend("agent1", pkgagent.DefaultRoute, conn1) + p.AddBackend("agent1", pkgagent.Identifiers{DefaultRoute: true}, conn1) p.RemoveBackend("agent1", pkgagent.DefaultRoute, conn1) expectedBackends := make(map[string][]*backend) expectedAgentIDs := []string{} @@ -97,14 +104,14 @@ func TestAddRemoveBackendsWithDefaultRoute(t *testing.T) { t.Errorf("expected %v, got %v", e, a) } - p = NewDefaultRouteBackendManager() - p.AddBackend("agent1", pkgagent.DefaultRoute, conn1) - p.AddBackend("agent1", pkgagent.DefaultRoute, conn12) + p = NewDefaultBackendManager([]ProxyStrategy{ProxyStrategyDefaultRoute}) + p.AddBackend("agent1", pkgagent.Identifiers{DefaultRoute: true}, conn1) + p.AddBackend("agent1", pkgagent.Identifiers{DefaultRoute: true}, conn12) // Adding the same connection again should be a no-op. - p.AddBackend("agent1", pkgagent.DefaultRoute, conn12) - p.AddBackend("agent2", pkgagent.DefaultRoute, conn2) - p.AddBackend("agent2", pkgagent.DefaultRoute, conn22) - p.AddBackend("agent3", pkgagent.DefaultRoute, conn3) + p.AddBackend("agent1", pkgagent.Identifiers{DefaultRoute: true}, conn12) + p.AddBackend("agent2", pkgagent.Identifiers{DefaultRoute: true}, conn2) + p.AddBackend("agent2", pkgagent.Identifiers{DefaultRoute: true}, conn22) + p.AddBackend("agent3", pkgagent.Identifiers{DefaultRoute: true}, conn3) p.RemoveBackend("agent2", pkgagent.DefaultRoute, conn22) p.RemoveBackend("agent2", pkgagent.DefaultRoute, conn2) p.RemoveBackend("agent1", pkgagent.DefaultRoute, conn1) @@ -124,3 +131,216 @@ func TestAddRemoveBackendsWithDefaultRoute(t *testing.T) { t.Errorf("expected %v, got %v", e, a) } } + +func TestStrategiesDestHost(t *testing.T) { + conn1 := fakeAgentServiceConnectServer{name: "conn1"} + conn2 := fakeAgentServiceConnectServer{name: "conn2"} + conn3 := fakeAgentServiceConnectServer{name: "conn3"} + conn4 := fakeAgentServiceConnectServer{name: "conn4"} + + p := NewDefaultBackendManager([]ProxyStrategy{ProxyStrategyDestHost}) + + p.AddBackend("agent1", pkgagent.Identifiers{DefaultRoute: true}, conn1) + p.AddBackend("agent2", pkgagent.Identifiers{DefaultRoute: true}, conn2) + p.AddBackend("agent3", pkgagent.Identifiers{DefaultRoute: true, IPv4: []string{"192.168.1.103"}}, conn3) + p.AddBackend("agent4", pkgagent.Identifiers{IPv4: []string{"192.168.1.104"}}, conn4) + expectedBackends := map[string][]*backend{ + "agent1": {fakeNewBackend(conn1)}, + "agent2": {fakeNewBackend(conn2)}, + "agent3": {fakeNewBackend(conn3)}, + "192.168.1.103": {fakeNewBackend(conn3)}, + "agent4": {fakeNewBackend(conn4)}, + "192.168.1.104": {fakeNewBackend(conn4)}, + } + expectedAgentIDs := []string{"agent1", "agent2", "agent3", "agent4"} + if e, a := expectedBackends, p.backends; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := expectedAgentIDs, p.agentIDs; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + // All agents are added, but not for defaultRoute strategy + // as the server is not configured to with DefaultRoute strategy + if len(p.defaultRouteAgentIDs) > 0 { + t.Errorf("expected 0, got %v", len(p.defaultRouteAgentIDs)) + } + ctx := context.Background() + // Get agent for host 192.168.1.103 (backend3) + ctx = context.WithValue(ctx, destHost, "192.168.1.103") + res, _ := p.Backend(ctx) + v := res.(*backend) + if e, a := v.conn, conn3; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + // Get agent for host 192.168.1.104 (backend4) + ctx = context.WithValue(ctx, destHost, "192.168.1.104") + res, _ = p.Backend(ctx) + v = res.(*backend) + if e, a := v.conn, conn4; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + // No agent for host 192.168.1.105 + ctx = context.WithValue(ctx, destHost, "192.168.1.105") + _, err := p.Backend(ctx) + if e, a := err.Error(), errors.New("No backend available").Error(); e != a { + t.Errorf("expected %v, got %v", e, a) + } +} + +func TestStrategiesDestHostAndDefault(t *testing.T) { + conn1 := fakeAgentServiceConnectServer{name: "conn1"} + conn2 := fakeAgentServiceConnectServer{name: "conn2"} + conn3 := fakeAgentServiceConnectServer{name: "conn3"} + conn4 := fakeAgentServiceConnectServer{name: "conn4"} + + p := NewDefaultBackendManager([]ProxyStrategy{ProxyStrategyDestHost, ProxyStrategyDefault}) + + p.AddBackend("agent1", pkgagent.Identifiers{}, conn1) + p.AddBackend("agent2", pkgagent.Identifiers{}, conn2) + p.AddBackend("agent3", pkgagent.Identifiers{DefaultRoute: true, IPv4: []string{"192.168.1.103"}}, conn3) + p.AddBackend("agent4", pkgagent.Identifiers{IPv4: []string{"192.168.1.104"}}, conn4) + expectedBackends := map[string][]*backend{ + "agent1": {fakeNewBackend(conn1)}, + "agent2": {fakeNewBackend(conn2)}, + "agent3": {fakeNewBackend(conn3)}, + "192.168.1.103": {fakeNewBackend(conn3)}, + "agent4": {fakeNewBackend(conn4)}, + "192.168.1.104": {fakeNewBackend(conn4)}, + } + expectedAgentIDs := []string{"agent1", "agent2", "agent3", "agent4"} + if e, a := expectedBackends, p.backends; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := expectedAgentIDs, p.agentIDs; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + // All agents are added, but not for defaultRoute strategy + // as the server is not configured to with DefaultRoute strategy + if len(p.defaultRouteAgentIDs) > 0 { + t.Errorf("expected 0, got %v", len(p.defaultRouteAgentIDs)) + } + ctx := context.Background() + // Get agent for host 192.168.1.103 (backend3) + ctx = context.WithValue(ctx, destHost, "192.168.1.103") + res, _ := p.Backend(ctx) + v := res.(*backend) + if e, a := v.conn, conn3; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + // Get agent for host 192.168.1.104 (backend4) + ctx = context.WithValue(ctx, destHost, "192.168.1.104") + res, _ = p.Backend(ctx) + v = res.(*backend) + if e, a := v.conn, conn4; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + // Get random agent for host 192.168.1.105 with success + ctx = context.WithValue(ctx, destHost, "192.168.1.105") + _, err := p.Backend(ctx) + if err != nil { + t.Errorf("expected nil, got %v", err) + } +} + +func TestStrategiesDefaultRoute(t *testing.T) { + conn1 := fakeAgentServiceConnectServer{name: "conn1"} + conn2 := fakeAgentServiceConnectServer{name: "conn2"} + ctx := context.Background() + + p := NewDefaultBackendManager([]ProxyStrategy{ProxyStrategyDefaultRoute}) + p.AddBackend("agent1", pkgagent.Identifiers{DefaultRoute: true}, conn1) + p.AddBackend("agent2", pkgagent.Identifiers{DefaultRoute: true}, conn2) + expectedBackends := map[string][]*backend{ + "agent1": {fakeNewBackend(conn1)}, + "agent2": {fakeNewBackend(conn2)}, + } + expectedAgentIDs := []string{"agent1", "agent2"} + if e, a := expectedBackends, p.backends; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := expectedAgentIDs, p.agentIDs; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + expectedDefaultRouteAgentIDs := []string{"agent1", "agent2"} + if e, a := expectedDefaultRouteAgentIDs, p.defaultRouteAgentIDs; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + // Get backend successfully + res, err := p.Backend(ctx) + if err != nil { + t.Errorf("expected nil, got %v", err) + } + v := res.(*backend) + if e1, e2, a := conn1, conn2, v.conn; !reflect.DeepEqual(e1, a) && !reflect.DeepEqual(e2, a) { + t.Errorf("expected %v or %v, got %v", e1, e2, a) + } + + // Get backend successfully second time + res, err = p.Backend(ctx) + if err != nil { + t.Errorf("expected nil, got %v", err) + } + v = res.(*backend) + if e1, e2, a := conn1, conn2, v.conn; !reflect.DeepEqual(e1, a) && !reflect.DeepEqual(e2, a) { + t.Errorf("expected %v or %v, got %v", e1, e2, a) + } +} + +func TestStrategiesDestHostAndDefaultRoute(t *testing.T) { + conn1 := fakeAgentServiceConnectServer{name: "conn1"} + conn2 := fakeAgentServiceConnectServer{name: "conn2"} + conn3 := fakeAgentServiceConnectServer{name: "conn3"} + conn4 := fakeAgentServiceConnectServer{name: "conn4"} + ctx := context.Background() + + p := NewDefaultBackendManager([]ProxyStrategy{ProxyStrategyDestHost, ProxyStrategyDefaultRoute}) + p.AddBackend("agent1", pkgagent.Identifiers{DefaultRoute: true}, conn1) + p.AddBackend("agent2", pkgagent.Identifiers{DefaultRoute: true}, conn2) + p.AddBackend("agent3", pkgagent.Identifiers{IPv4: []string{"192.168.1.103"}}, conn3) + p.AddBackend("agent4", pkgagent.Identifiers{IPv4: []string{"192.168.1.104"}}, conn4) + expectedBackends := map[string][]*backend{ + "agent1": {fakeNewBackend(conn1)}, + "agent2": {fakeNewBackend(conn2)}, + "agent3": {fakeNewBackend(conn3)}, + "192.168.1.103": {fakeNewBackend(conn3)}, + "agent4": {fakeNewBackend(conn4)}, + "192.168.1.104": {fakeNewBackend(conn4)}, + } + expectedAgentIDs := []string{"agent1", "agent2", "agent3", "agent4"} + if e, a := expectedBackends, p.backends; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := expectedAgentIDs, p.agentIDs; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + expectedDefaultRouteAgentIDs := []string{"agent1", "agent2"} + if e, a := expectedDefaultRouteAgentIDs, p.defaultRouteAgentIDs; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + // Get agent for host 192.168.1.103 (backend3) + ctx = context.WithValue(ctx, destHost, "192.168.1.103") + res, _ := p.Backend(ctx) + v := res.(*backend) + if e, a := v.conn, conn3; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + // Get agent for host 192.168.1.104 (backend4) + ctx = context.WithValue(ctx, destHost, "192.168.1.104") + res, _ = p.Backend(ctx) + v = res.(*backend) + if e, a := v.conn, conn4; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + // Get random agent for host 172.31.11.35 with success + ctx = context.WithValue(ctx, destHost, "172.31.11.35") + res, err := p.Backend(ctx) + if err != nil { + t.Errorf("expected nil, got %v", err) + } + v = res.(*backend) + // it must be conn1 or conn2 + // conn3 and conn4 are not having the default-route identifier + if e1, e2, a := conn1, conn2, v.conn; !reflect.DeepEqual(e1, a) && !reflect.DeepEqual(e2, a) { + t.Errorf("expected %v or %v, got %v", e1, e2, a) + } +} diff --git a/pkg/server/default_route_backend_manager.go b/pkg/server/default_route_backend_manager.go deleted file mode 100644 index d958cff21..000000000 --- a/pkg/server/default_route_backend_manager.go +++ /dev/null @@ -1,51 +0,0 @@ -/* -Copyright 2021 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package server - -import ( - "context" - - "k8s.io/klog/v2" - "sigs.k8s.io/apiserver-network-proxy/pkg/agent" -) - -type DefaultRouteBackendManager struct { - *DefaultBackendStorage -} - -var _ BackendManager = &DefaultRouteBackendManager{} - -func NewDefaultRouteBackendManager() *DefaultRouteBackendManager { - return &DefaultRouteBackendManager{ - DefaultBackendStorage: NewDefaultBackendStorage( - []agent.IdentifierType{agent.DefaultRoute})} -} - -// Backend tries to get a backend associating to the request destination host. -func (dibm *DefaultRouteBackendManager) Backend(ctx context.Context) (Backend, error) { - dibm.mu.RLock() - defer dibm.mu.RUnlock() - if len(dibm.backends) == 0 { - return nil, &ErrNotFound{} - } - if len(dibm.defaultRouteAgentIDs) == 0 { - return nil, &ErrNotFound{} - } - agentID := dibm.defaultRouteAgentIDs[dibm.random.Intn(len(dibm.defaultRouteAgentIDs))] - klog.V(4).InfoS("Picked agent as backend", "agentID", agentID) - return dibm.backends[agentID][0], nil -} diff --git a/pkg/server/desthost_backend_manager.go b/pkg/server/desthost_backend_manager.go deleted file mode 100644 index 1d7dd89cd..000000000 --- a/pkg/server/desthost_backend_manager.go +++ /dev/null @@ -1,54 +0,0 @@ -/* -Copyright 2020 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package server - -import ( - "context" - - "k8s.io/klog/v2" - "sigs.k8s.io/apiserver-network-proxy/pkg/agent" -) - -type DestHostBackendManager struct { - *DefaultBackendStorage -} - -var _ BackendManager = &DestHostBackendManager{} - -func NewDestHostBackendManager() *DestHostBackendManager { - return &DestHostBackendManager{ - DefaultBackendStorage: NewDefaultBackendStorage( - []agent.IdentifierType{agent.IPv4, agent.IPv6, agent.Host})} -} - -// Backend tries to get a backend associating to the request destination host. -func (dibm *DestHostBackendManager) Backend(ctx context.Context) (Backend, error) { - dibm.mu.RLock() - defer dibm.mu.RUnlock() - if len(dibm.backends) == 0 { - return nil, &ErrNotFound{} - } - destHost := ctx.Value(destHost).(string) - if destHost != "" { - bes, exist := dibm.backends[destHost] - if exist && len(bes) > 0 { - klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost) - return dibm.backends[destHost][0], nil - } - } - return nil, &ErrNotFound{} -} diff --git a/pkg/server/server.go b/pkg/server/server.go index 48a0f683b..076c4593e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -118,8 +118,8 @@ func (pm *PendingDialManager) Remove(random int64) { // ProxyServer type ProxyServer struct { - // BackendManagers contains a list of BackendManagers - BackendManagers []BackendManager + // BackendManager + BackendManager BackendManager // Readiness reports if the proxy server is ready, i.e., if the proxy // server has connections to proxy agents (backends). Note that the @@ -174,93 +174,61 @@ func genContext(proxyStrategies []ProxyStrategy, reqHost string) context.Context func (s *ProxyServer) getBackend(reqHost string) (Backend, error) { ctx := genContext(s.proxyStrategies, reqHost) - for _, bm := range s.BackendManagers { - be, err := bm.Backend(ctx) - if err == nil { - return be, nil - } - if ignoreNotFound(err) != nil { - // if can't find a backend through current BackendManager, move on - // to the next one - return nil, err - } + be, err := s.BackendManager.Backend(ctx) + if err == nil { + return be, nil + } + if ignoreNotFound(err) != nil { + // if can't find a backend through current BackendManager, move on + // to the next one + return nil, err } return nil, &ErrNotFound{} } func (s *ProxyServer) addBackend(agentID string, conn agent.AgentService_ConnectServer) (backend Backend) { - for i := 0; i < len(s.BackendManagers); i++ { - switch s.BackendManagers[i].(type) { - case *DestHostBackendManager: - agentIdentifiers, err := getAgentIdentifiers(conn) - if err != nil { - klog.ErrorS(err, "fail to get the agent identifiers", "agentID", agentID) - break - } - for _, ipv4 := range agentIdentifiers.IPv4 { - klog.V(5).InfoS("Add the agent to DestHostBackendManager", "agent address", ipv4) - s.BackendManagers[i].AddBackend(ipv4, pkgagent.IPv4, conn) - } - for _, ipv6 := range agentIdentifiers.IPv6 { - klog.V(5).InfoS("Add the agent to DestHostBackendManager", "agent address", ipv6) - s.BackendManagers[i].AddBackend(ipv6, pkgagent.IPv6, conn) - } - for _, host := range agentIdentifiers.Host { - klog.V(5).InfoS("Add the agent to DestHostBackendManager", "agent address", host) - s.BackendManagers[i].AddBackend(host, pkgagent.Host, conn) - } - case *DefaultRouteBackendManager: - agentIdentifiers, err := getAgentIdentifiers(conn) - if err != nil { - klog.ErrorS(err, "fail to get the agent identifiers", "agentID", agentID) - break - } - if agentIdentifiers.DefaultRoute { - klog.V(5).InfoS("Add the agent to DefaultRouteBackendManager", "agentID", agentID) - backend = s.BackendManagers[i].AddBackend(agentID, pkgagent.DefaultRoute, conn) - } - default: - klog.V(5).InfoS("Add the agent to DefaultBackendManager", "agentID", agentID) - backend = s.BackendManagers[i].AddBackend(agentID, pkgagent.UID, conn) - } + agentIdentifiers, err := getAgentIdentifiers(conn) + if err != nil { + klog.ErrorS(err, "fail to get the agent identifiers", "agentID", agentID) + return } - return + return s.BackendManager.AddBackend(agentID, agentIdentifiers, conn) } func (s *ProxyServer) removeBackend(agentID string, conn agent.AgentService_ConnectServer) { - for _, bm := range s.BackendManagers { - switch bm.(type) { - case *DestHostBackendManager: + for _, strategy := range s.proxyStrategies { + switch strategy { + case ProxyStrategyDestHost: agentIdentifiers, err := getAgentIdentifiers(conn) if err != nil { klog.ErrorS(err, "fail to get the agent identifiers", "agentID", agentID) - break + return } for _, ipv4 := range agentIdentifiers.IPv4 { - klog.V(5).InfoS("Remove the agent from the DestHostBackendManager", "agentHost", ipv4) - bm.RemoveBackend(ipv4, pkgagent.IPv4, conn) + klog.V(5).InfoS("Remove the agent from the BackendManager", "agentIdentifier", ipv4, "agentID", agentID) + s.BackendManager.RemoveBackend(ipv4, pkgagent.IPv4, conn) } for _, ipv6 := range agentIdentifiers.IPv6 { - klog.V(5).InfoS("Remove the agent from the DestHostBackendManager", "agentHost", ipv6) - bm.RemoveBackend(ipv6, pkgagent.IPv6, conn) + klog.V(5).InfoS("Remove the agent from the BackendManager", "agentIdentifier", ipv6, "agentID", agentID) + s.BackendManager.RemoveBackend(ipv6, pkgagent.IPv6, conn) } for _, host := range agentIdentifiers.Host { - klog.V(5).InfoS("Remove the agent from the DestHostBackendManager", "agentHost", host) - bm.RemoveBackend(host, pkgagent.Host, conn) + klog.V(5).InfoS("Remove the agent from the BackendManager", "agentIdentifier", host, "agentID", agentID) + s.BackendManager.RemoveBackend(host, pkgagent.Host, conn) } - case *DefaultRouteBackendManager: + case ProxyStrategyDefaultRoute: agentIdentifiers, err := getAgentIdentifiers(conn) if err != nil { klog.ErrorS(err, "fail to get the agent identifiers", "agentID", agentID) - break + return } if agentIdentifiers.DefaultRoute { - klog.V(5).InfoS("Remove the agent from the DefaultRouteBackendManager", "agentID", agentID) - bm.RemoveBackend(agentID, pkgagent.DefaultRoute, conn) + klog.V(5).InfoS("Remove the agent from the BackendManager", "agentID", agentID) + s.BackendManager.RemoveBackend(agentID, pkgagent.DefaultRoute, conn) } default: - klog.V(5).InfoS("Remove the agent from the DefaultBackendManager", "agentID", agentID) - bm.RemoveBackend(agentID, pkgagent.UID, conn) + klog.V(5).InfoS("Remove the agent from the BackendManager's default strategy", "agentID", agentID) + s.BackendManager.RemoveBackend(agentID, pkgagent.UID, conn) } } } @@ -327,29 +295,16 @@ func (s *ProxyServer) getFrontendsForBackendConn(agentID string, backend Backend // NewProxyServer creates a new ProxyServer instance func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions, warnOnChannelLimit bool) *ProxyServer { - var bms []BackendManager - for _, ps := range proxyStrategies { - switch ps { - case ProxyStrategyDestHost: - bms = append(bms, NewDestHostBackendManager()) - case ProxyStrategyDefault: - bms = append(bms, NewDefaultBackendManager()) - case ProxyStrategyDefaultRoute: - bms = append(bms, NewDefaultRouteBackendManager()) - default: - klog.V(4).InfoS("Unknonw proxy strategy", "strategy", ps) - } - } - + bm := NewDefaultBackendManager(proxyStrategies) return &ProxyServer{ frontends: make(map[string](map[int64]*ProxyClientConnection)), PendingDial: NewPendingDialManager(), serverID: serverID, serverCount: serverCount, - BackendManagers: bms, + BackendManager: bm, AgentAuthenticationOptions: agentAuthenticationOptions, // use the first backend-manager as the Readiness Manager - Readiness: bms[0], + Readiness: bm, proxyStrategies: proxyStrategies, warnOnChannelLimit: warnOnChannelLimit, } diff --git a/tests/concurrent_client_request_test.go b/tests/concurrent_client_request_test.go index 786e61e8b..681c33a38 100644 --- a/tests/concurrent_client_request_test.go +++ b/tests/concurrent_client_request_test.go @@ -62,7 +62,7 @@ type singleTimeManager struct { used map[string]struct{} } -func (s *singleTimeManager) AddBackend(agentID string, _ pkgagent.IdentifierType, conn agent.AgentService_ConnectServer) server.Backend { +func (s *singleTimeManager) AddBackend(agentID string, _ pkgagent.Identifiers, conn agent.AgentService_ConnectServer) server.Backend { s.mu.Lock() defer s.mu.Unlock() s.backends[agentID] = conn @@ -124,7 +124,7 @@ func TestConcurrentClientRequest(t *testing.T) { t.Fatal(err) } defer cleanup() - ps.BackendManagers = []server.BackendManager{newSingleTimeGetter(server.NewDefaultBackendManager())} + ps.BackendManager = newSingleTimeGetter(server.NewDefaultBackendManager([]server.ProxyStrategy{server.ProxyStrategyDefault})) stopCh := make(chan struct{}) defer close(stopCh)