Skip to content

Commit

Permalink
Merge branch 'main' into oauth_proxy_image
Browse files Browse the repository at this point in the history
  • Loading branch information
coleenquadros authored Aug 22, 2024
2 parents ba86953 + c088a5a commit 83510aa
Show file tree
Hide file tree
Showing 14 changed files with 1,244 additions and 738 deletions.
57 changes: 29 additions & 28 deletions collectors/metrics/pkg/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/metricsclient"
"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/simulator"
"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/status"
statuslib "github.com/stolostron/multicluster-observability-operator/operators/pkg/status"
)

const (
failedStatusReportMsg = "Failed to report status"
uwlPromURL = "https://prometheus-user-workload.openshift-user-workload-monitoring.svc:9092"
)

type RuleMatcher interface {
Expand Down Expand Up @@ -98,7 +100,8 @@ type Worker struct {

status status.StatusReport

metrics *workerMetrics
metrics *workerMetrics
forwardFailures int
}

func CreateFromClient(cfg Config, metrics *workerMetrics, interval time.Duration, name string,
Expand Down Expand Up @@ -297,7 +300,9 @@ func New(cfg Config) (*Worker, error) {
}
w.recordingRules = recordingRules

s, err := status.New(logger)
standalone := os.Getenv("STANDALONE") == "true"
isUwl := strings.Contains(os.Getenv("FROM"), uwlPromURL)
s, err := status.New(logger, standalone, isUwl)
if err != nil {
return nil, fmt.Errorf("unable to create StatusReport: %w", err)
}
Expand Down Expand Up @@ -366,6 +371,21 @@ func (w *Worker) forward(ctx context.Context) error {
w.lock.Lock()
defer w.lock.Unlock()

updateStatus := func(reason statuslib.Reason, message string) {
if reason == statuslib.ForwardFailed {
w.forwardFailures += 1
if w.forwardFailures < 3 {
return
}
}

w.forwardFailures = 0

if err := w.status.UpdateStatus(ctx, reason, message); err != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", err)
}
}

var families []*clientmodel.MetricFamily
var err error
if w.simulatedTimeseriesFile != "" {
Expand All @@ -378,19 +398,13 @@ func (w *Worker) forward(ctx context.Context) error {
} else {
families, err = w.getFederateMetrics(ctx)
if err != nil {
statusErr := w.status.UpdateStatus(ctx, "Degraded", "Failed to retrieve metrics")
if statusErr != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", statusErr)
}
updateStatus(statuslib.ForwardFailed, "Failed to retrieve metrics")
return err
}

rfamilies, err := w.getRecordingMetrics(ctx)
if err != nil && len(rfamilies) == 0 {
statusErr := w.status.UpdateStatus(ctx, "Degraded", "Failed to retrieve recording metrics")
if statusErr != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", statusErr)
}
updateStatus(statuslib.ForwardFailed, "Failed to retrieve recording metrics")
return err
} else {
families = append(families, rfamilies...)
Expand All @@ -399,10 +413,7 @@ func (w *Worker) forward(ctx context.Context) error {

before := metricfamily.MetricsCount(families)
if err := metricfamily.Filter(families, w.transformer); err != nil {
statusErr := w.status.UpdateStatus(ctx, "Degraded", "Failed to filter metrics")
if statusErr != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", statusErr)
}
updateStatus(statuslib.ForwardFailed, "Failed to filter metrics")
return err
}

Expand All @@ -416,34 +427,24 @@ func (w *Worker) forward(ctx context.Context) error {

if len(families) == 0 {
rlogger.Log(w.logger, rlogger.Warn, "msg", "no metrics to send, doing nothing")
statusErr := w.status.UpdateStatus(ctx, "Available", "No metrics to send")
if statusErr != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", statusErr)
}
updateStatus(statuslib.ForwardSuccessful, "No metrics to send")
return nil
}

if w.to == nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", "to is nil, doing nothing")
statusErr := w.status.UpdateStatus(ctx, "Available", "Metrics is not required to send")
if statusErr != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", statusErr)
}
updateStatus(statuslib.ForwardSuccessful, "Metrics is not required to send")
return nil
}

req := &http.Request{Method: "POST", URL: w.to}
if err := w.toClient.RemoteWrite(ctx, req, families, w.interval); err != nil {
if err := w.status.UpdateStatus(ctx, "Degraded", "Failed to send metrics"); err != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", err)
}
updateStatus(statuslib.ForwardFailed, "Failed to send metrics")
return err
}

if w.simulatedTimeseriesFile == "" {
if err := w.status.UpdateStatus(ctx, "Available", "Cluster metrics sent successfully"); err != nil {
rlogger.Log(w.logger, rlogger.Warn, "msg", failedStatusReportMsg, "err", err)
}
updateStatus(statuslib.ForwardSuccessful, "Cluster metrics sent successfully")
} else {
rlogger.Log(w.logger, rlogger.Warn, "msg", "Simulated metrics sent successfully")
}
Expand Down
147 changes: 31 additions & 116 deletions collectors/metrics/pkg/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,35 @@ package status
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"slices"
"sort"
"strings"
"time"

"github.com/go-kit/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"github.com/go-logr/logr"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

oav1beta1 "github.com/stolostron/multicluster-observability-operator/operators/multiclusterobservability/api/v1beta1"
"github.com/stolostron/multicluster-observability-operator/operators/pkg/status"
)

const (
name = "observability-addon"
namespace = "open-cluster-management-addon-observability"
uwlPromURL = "https://prometheus-user-workload.openshift-user-workload-monitoring.svc:9092"
addonName = "observability-addon"
addonNamespace = "open-cluster-management-addon-observability"
)

type StatusReport struct {
statusClient client.Client
logger log.Logger
statusClient client.Client
standalone bool
isUwl bool
statusReporter status.Status
logger log.Logger
}

func New(logger log.Logger) (*StatusReport, error) {
func New(logger log.Logger, standalone, isUwl bool) (*StatusReport, error) {
testMode := os.Getenv("UNIT_TEST") != ""
standaloneMode := os.Getenv("STANDALONE") == "true"
var kubeClient client.Client
if testMode {
s := scheme.Scheme
Expand All @@ -50,8 +46,6 @@ func New(logger log.Logger) (*StatusReport, error) {
WithScheme(s).
WithStatusSubresource(&oav1beta1.ObservabilityAddon{}).
Build()
} else if standaloneMode {
kubeClient = nil
} else {
config, err := clientcmd.BuildConfigFromFlags("", "")
if err != nil {
Expand All @@ -67,114 +61,35 @@ func New(logger log.Logger) (*StatusReport, error) {
}
}

logger.Log("msg", "Creating status client", "standalone", standalone, "isUwl", isUwl)

statusLogger := logr.FromSlogHandler(slog.New(slog.NewTextHandler(os.Stdout, nil)).With("component", "statusclient").Handler())
return &StatusReport{
statusClient: kubeClient,
logger: log.With(logger, "component", "statusclient"),
statusClient: kubeClient,
standalone: standalone,
isUwl: isUwl,
statusReporter: status.NewStatus(kubeClient, addonName, addonNamespace, statusLogger),
logger: logger,
}, nil
}

func (s *StatusReport) UpdateStatus(ctx context.Context, t string, m string) error {
// statusClient is nil when running on the hub.
if s.statusClient == nil {
return nil
}

isUwl := false
if strings.Contains(os.Getenv("FROM"), uwlPromURL) {
isUwl = true
}

retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
addon := &oav1beta1.ObservabilityAddon{}
err := s.statusClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, addon)
if err != nil {
return fmt.Errorf("failed to get ObservabilityAddon %s/%s: %w", namespace, name, err)
}

// Sort the conditions by rising LastTransitionTime
sort.Slice(addon.Status.Conditions, func(i, j int) bool {
return addon.Status.Conditions[i].LastTransitionTime.Before(&addon.Status.Conditions[j].LastTransitionTime)
})

currentCondition := addon.Status.Conditions[len(addon.Status.Conditions)-1]
newCondition := mergeCondtion(isUwl, m, currentCondition)

// If the current condition is the same, do not update
if currentCondition.Type == newCondition.Type && currentCondition.Reason == newCondition.Reason && currentCondition.Message == newCondition.Message && currentCondition.Status == newCondition.Status {
return nil
}

s.logger.Log("msg", fmt.Sprintf("Updating status of ObservabilityAddon %s/%s", namespace, name), "type", newCondition.Type, "status", newCondition.Status, "reason", newCondition.Reason)

// Reset the status of other main conditions
for i := range addon.Status.Conditions {
if slices.Contains([]string{"Available", "Degraded", "Progressing"}, addon.Status.Conditions[i].Type) {
addon.Status.Conditions[i].Status = metav1.ConditionFalse
}
}

// Set the new condition
addon.Status.Conditions = mutateOrAppend(addon.Status.Conditions, newCondition)

if err := s.statusClient.Status().Update(ctx, addon); err != nil {
return fmt.Errorf("failed to update ObservabilityAddon %s/%s: %w", namespace, name, err)
}

func (s *StatusReport) UpdateStatus(ctx context.Context, reason status.Reason, message string) error {
// Standalone mode is set when running on the hub cluster
// In this case, we do not need to update the status of the ObservabilityAddon
if s.standalone {
return nil
})
if retryErr != nil {
return retryErr
}
return nil
}

func mergeCondtion(isUwl bool, m string, condition oav1beta1.StatusCondition) oav1beta1.StatusCondition {
messages := strings.Split(condition.Message, " ; ")
if len(messages) == 1 {
messages = append(messages, "")
}
if isUwl {
messages[1] = fmt.Sprintf("User Workload: %s", m)
} else {
messages[0] = m
component := status.MetricsCollector
if s.isUwl {
component = status.UwlMetricsCollector
}
message := messages[0]
if messages[1] != "" {
message = strings.Join(messages, " ; ")
}
conditionType := "Available"
reason := "Available"
if strings.Contains(message, "Failed") {
conditionType = "Degraded"
reason = "Degraded"
}
return oav1beta1.StatusCondition{
Type: conditionType,
Status: metav1.ConditionTrue,
Reason: reason,
Message: message,
LastTransitionTime: metav1.NewTime(time.Now()),
}
}

// mutateOrAppend updates the status conditions with the new condition.
// If the condition already exists, it updates it with the new condition.
// If the condition does not exist, it appends the new condition to the status conditions.
func mutateOrAppend(conditions []oav1beta1.StatusCondition, newCondition oav1beta1.StatusCondition) []oav1beta1.StatusCondition {
if len(conditions) == 0 {
return []oav1beta1.StatusCondition{newCondition}
if wasReported, err := s.statusReporter.UpdateComponentCondition(ctx, component, reason, message); err != nil {
return err
} else if wasReported {
s.logger.Log("msg", "Status updated", "component", component, "reason", reason, "message", message)
}

for i, condition := range conditions {
if condition.Type == newCondition.Type {
// Update the existing condition
conditions[i] = newCondition
return conditions
}
}
// If the condition type does not exist, append the new condition
return append(conditions, newCondition)
return nil
}
Loading

0 comments on commit 83510aa

Please sign in to comment.