diff --git a/internal/proxy/envoy/xds/ads/pubsub.go b/internal/proxy/envoy/xds/ads/pubsub.go index d455a3c23..92140351e 100644 --- a/internal/proxy/envoy/xds/ads/pubsub.go +++ b/internal/proxy/envoy/xds/ads/pubsub.go @@ -6,6 +6,7 @@ type Message struct { NodeID string VersionInfo string Nonce string + TypeUrl string } type MessageChan chan Message diff --git a/internal/proxy/envoy/xds/ads/service.go b/internal/proxy/envoy/xds/ads/service.go index 1fa9a8f12..c41cbdba4 100644 --- a/internal/proxy/envoy/xds/ads/service.go +++ b/internal/proxy/envoy/xds/ads/service.go @@ -77,7 +77,7 @@ func (s Service) getCluster(rule rule.Rule) *cluster.Cluster { ClusterDiscoveryType: &cluster.Cluster_Type{ Type: cluster.Cluster_LOGICAL_DNS, }, - DnsLookupFamily: cluster.Cluster_V4_ONLY, + DnsLookupFamily: cluster.Cluster_V4_PREFERRED, Name: rule.Backend.Namespace, ConnectTimeout: durationpb.New(1 * time.Second), LoadAssignment: s.getEndpoint(rule), diff --git a/internal/proxy/envoy/xds/ads/stream.go b/internal/proxy/envoy/xds/ads/stream.go index 9098e5706..4a2311ed4 100644 --- a/internal/proxy/envoy/xds/ads/stream.go +++ b/internal/proxy/envoy/xds/ads/stream.go @@ -20,12 +20,8 @@ type DiscoveryResource struct { } type Client struct { - NodeID string - LatestVersionSent string - LatestVersionACK string - LatestNonceSent string - LatestNonceACK string - LastUpdated time.Time + NodeID string + LastUpdated time.Time } type Stream struct { @@ -45,10 +41,10 @@ func NewStream(logger log.Logger, refreshInterval time.Duration, stream xds.Aggr ctx: ctx, cancel: cancel, logger: logger, - refreshInterval: refreshInterval, stream: stream, - messageChan: make(MessageChan), services: services, + messageChan: make(MessageChan), + refreshInterval: refreshInterval, } } @@ -65,44 +61,31 @@ func (s Stream) Stream() error { if err == io.EOF { return } + if err != nil { + s.logger.Error(err.Error()) return } if in.ResponseNonce == "" { - versionInfo := strconv.FormatInt(time.Now().UnixNano(), 10) - nonce := strconv.FormatInt(time.Now().UnixNano(), 10) + s.logger.Info("received request on stream", "typeurl", in.TypeUrl) message := Message{ NodeID: in.Node.Id, - VersionInfo: versionInfo, - Nonce: nonce, + VersionInfo: strconv.FormatInt(time.Now().UnixNano(), 10), + Nonce: strconv.FormatInt(time.Now().UnixNano(), 10), + TypeUrl: in.TypeUrl, } s.messageChan.Push(message) s.client.LastUpdated = time.Now() - s.client.LatestVersionSent = versionInfo - s.client.LatestNonceSent = nonce if s.client.NodeID == "" { s.client.NodeID = in.Node.Id - s.PushUpdatePeriodically() + go s.PushUpdatePeriodically() } + } else if in.ErrorDetail == nil { + s.logger.Info("received ACK on stream", "typeurl", in.TypeUrl, "version_info", in.VersionInfo) } else { - if in.ResponseNonce == s.client.LatestNonceSent { - s.client.LatestVersionACK = in.VersionInfo - s.client.LatestNonceACK = in.ResponseNonce - s.logger.Info("received ACK on stream", in) - } else { - s.logger.Info("received NACK on stream", in.ErrorDetail) - nonce := strconv.FormatInt(time.Now().UnixNano(), 10) - message := Message{ - NodeID: s.client.NodeID, - VersionInfo: s.client.LatestVersionSent, - Nonce: nonce, - } - s.client.LatestNonceSent = nonce - s.messageChan.Push(message) - s.client.LastUpdated = time.Now() - } + s.logger.Info("received NACK on stream", "typeurl", in.TypeUrl, "version_info", in.VersionInfo, "error", in.ErrorDetail) } } } @@ -111,7 +94,7 @@ func (s Stream) Stream() error { go func() { for e := range s.messageChan { if err := s.streamResponses(e); err != nil { - s.logger.Debug("error while streaming response", err) + s.logger.Debug("error while streaming response", "error", err) } } }() @@ -136,53 +119,60 @@ func (s Stream) streamResponses(message Message) error { } } - // When using ADS we need to order responses. - // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations responseStream := NewResponseStream(s.stream, message.VersionInfo, message.Nonce) - if err := responseStream.StreamCDS(cfg.Clusters); err != nil { - return err - } - if err := responseStream.StreamLDS(cfg.Listeners); err != nil { - return err - } - if err := responseStream.StreamRDS(cfg.Routes); err != nil { - return err + switch message.TypeUrl { + case CLUSTER_TYPE_URL: + if err := responseStream.StreamCDS(cfg.Clusters); err != nil { + return err + } + case LISTENER_TYPE_URL: + if err := responseStream.StreamLDS(cfg.Listeners); err != nil { + return err + } + case ROUTER_TYPE_URL: + if err := responseStream.StreamRDS(cfg.Routes); err != nil { + return err + } + default: + if err := responseStream.StreamCDS(cfg.Clusters); err != nil { + return err + } + if err := responseStream.StreamLDS(cfg.Listeners); err != nil { + return err + } + if err := responseStream.StreamRDS(cfg.Routes); err != nil { + return err + } } return nil } func (s Stream) PushUpdatePeriodically() { - ticker := time.NewTicker(s.refreshInterval) - defer ticker.Stop() - service, ok := s.services[s.client.NodeID] if !ok { - s.logger.Debug("service not found for node id", s.client.NodeID) + s.logger.Debug("service not found", "node_id", s.client.NodeID) return } for { select { - case <-ticker.C: + case <-s.ctx.Done(): + return + default: + time.Sleep(s.refreshInterval) if service.IsUpdated(s.ctx, s.client.LastUpdated) { - s.logger.Debug("discovery resource update found") - versionInfo := strconv.FormatInt(time.Now().UnixNano(), 10) - nonce := strconv.FormatInt(time.Now().UnixNano(), 10) + s.logger.Debug("discovery resource update found", "node_id", s.client.NodeID) message := Message{ NodeID: s.client.NodeID, - VersionInfo: versionInfo, - Nonce: nonce, + VersionInfo: strconv.FormatInt(time.Now().UnixNano(), 10), + Nonce: strconv.FormatInt(time.Now().UnixNano(), 10), } s.messageChan.Push(message) - s.client.LatestVersionSent = versionInfo - s.client.LatestNonceSent = nonce s.client.LastUpdated = time.Now() } else { - s.logger.Debug("no discovery resource update") + s.logger.Debug("no discovery resource update", "node_id", s.client.NodeID) } - case <-s.ctx.Done(): - return } } }