From 448cddadfab25c9bd0f52657830d2ca15f4cc1e8 Mon Sep 17 00:00:00 2001 From: Gerard Nguyen Date: Wed, 2 Oct 2024 12:45:06 +1000 Subject: [PATCH 01/31] wip --- pkg/supportbundle/collect.go | 381 ++++++++++++++++++++++++----- pkg/supportbundle/supportbundle.go | 13 + 2 files changed, 334 insertions(+), 60 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index a839ca0d9..19f14efc2 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -8,6 +8,7 @@ import ( "io" "os" "reflect" + "sync" "github.com/pkg/errors" analyze "github.com/replicatedhq/troubleshoot/pkg/analyze" @@ -19,74 +20,25 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + "k8s.io/utils/ptr" ) func runHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, additionalRedactors *troubleshootv1beta2.Redactor, bundlePath string, opts SupportBundleCreateOpts) (collect.CollectorResult, error) { - collectSpecs := make([]*troubleshootv1beta2.HostCollect, 0) - collectSpecs = append(collectSpecs, hostCollectors...) - allCollectedData := make(map[string][]byte) + var collectResult map[string][]byte - var collectors []collect.HostCollector - for _, desiredCollector := range collectSpecs { - collector, ok := collect.GetHostCollector(desiredCollector, bundlePath) - if ok { - collectors = append(collectors, collector) - } + if opts.RunHostCollectorsInPod { + collectResult = runRemoteHostCollectors(ctx, hostCollectors, bundlePath, opts) + } else { + collectResult = runLocalHostCollectors(ctx, hostCollectors, bundlePath, opts) } - for _, collector := range collectors { - // TODO: Add context to host collectors - _, span := otel.Tracer(constants.LIB_TRACER_NAME).Start(ctx, collector.Title()) - span.SetAttributes(attribute.String("type", reflect.TypeOf(collector).String())) - - isExcluded, _ := collector.IsExcluded() - if isExcluded { - opts.ProgressChan <- fmt.Sprintf("[%s] Excluding host collector", collector.Title()) - span.SetAttributes(attribute.Bool(constants.EXCLUDED, true)) - span.End() - continue - } - - opts.ProgressChan <- fmt.Sprintf("[%s] Running host collector...", collector.Title()) - if opts.RunHostCollectorsInPod { - result, err := collector.RemoteCollect(opts.ProgressChan) - if err != nil { - // If the collector does not have a remote collector implementation, try to run it locally - if errors.Is(err, collect.ErrRemoteCollectorNotImplemented) { - result, err = collector.Collect(opts.ProgressChan) - if err != nil { - span.SetStatus(codes.Error, err.Error()) - opts.ProgressChan <- errors.Errorf("failed to run host collector: %s: %v", collector.Title(), err) - } - } else { - // If the collector has a remote collector implementation, but it failed to run, return the error - span.SetStatus(codes.Error, err.Error()) - opts.ProgressChan <- errors.Errorf("failed to run host collector: %s: %v", collector.Title(), err) - } - } - // If the collector has a remote collector implementation, and it ran successfully, return the result - span.End() - for k, v := range result { - allCollectedData[k] = v - } - } else { - // If the collector does not enable run host collectors in pod, run it locally - result, err := collector.Collect(opts.ProgressChan) - if err != nil { - span.SetStatus(codes.Error, err.Error()) - opts.ProgressChan <- 
errors.Errorf("failed to run host collector: %s: %v", collector.Title(), err) - } - span.End() - for k, v := range result { - allCollectedData[k] = v - } - } - } - - collectResult := allCollectedData - + // redact result if any globalRedactors := []*troubleshootv1beta2.Redact{} if additionalRedactors != nil { globalRedactors = additionalRedactors.Spec.Redactors @@ -253,3 +205,312 @@ func getAnalysisFile(analyzeResults []*analyze.AnalyzeResult) (io.Reader, error) return bytes.NewBuffer(analysis), nil } + +func runLocalHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, bundlePath string, opts SupportBundleCreateOpts) map[string][]byte { + collectSpecs := make([]*troubleshootv1beta2.HostCollect, 0) + collectSpecs = append(collectSpecs, hostCollectors...) + + allCollectedData := make(map[string][]byte) + + var collectors []collect.HostCollector + for _, desiredCollector := range collectSpecs { + collector, ok := collect.GetHostCollector(desiredCollector, bundlePath) + if ok { + collectors = append(collectors, collector) + } + } + + for _, collector := range collectors { + // TODO: Add context to host collectors + _, span := otel.Tracer(constants.LIB_TRACER_NAME).Start(ctx, collector.Title()) + span.SetAttributes(attribute.String("type", reflect.TypeOf(collector).String())) + + isExcluded, _ := collector.IsExcluded() + if isExcluded { + opts.ProgressChan <- fmt.Sprintf("[%s] Excluding host collector", collector.Title()) + span.SetAttributes(attribute.Bool(constants.EXCLUDED, true)) + span.End() + continue + } + + opts.ProgressChan <- fmt.Sprintf("[%s] Running host collector...", collector.Title()) + result, err := collector.Collect(opts.ProgressChan) + if err != nil { + span.SetStatus(codes.Error, err.Error()) + opts.ProgressChan <- errors.Errorf("failed to run host collector: %s: %v", collector.Title(), err) + } + span.End() + for k, v := range result { + allCollectedData[k] = v + } + } + + return allCollectedData +} + +func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, bundlePath string, opts SupportBundleCreateOpts) map[string][]byte { + // TODO: verify that we have access to the cluster + + // convert host collectors into a HostCollector spec + spec := createHostCollectorsSpec(hostCollectors) + specJSON, err := convertHostCollectorSpecToJSON(spec) + if err != nil { + // TODO: error handling + return nil + } + klog.V(2).Infof("HostCollector spec: %s", specJSON) + + clientset, err := kubernetes.NewForConfig(opts.KubernetesRestConfig) + if err != nil { + // TODO: error handling + return nil + } + + // create a config map for the HostCollector spec + cm, err := createHostCollectorConfigMap(ctx, clientset, specJSON) + if err != nil { + // TODO: error handling + return nil + } + klog.V(2).Infof("Created Remote Host Collector ConfigMap %s", cm.Name) + + nodeList, err := getNodeList(clientset, opts) + if err != nil { + // TODO: error handling + return nil + } + klog.V(2).Infof("Node list to run remote host collectors: %s", nodeList.Nodes) + + // create remote pod for each node + labels := map[string]string{ + "troubleshoot.sh/remote-collector": "true", + } + + var wg sync.WaitGroup + var mu sync.Mutex + nodeLogs := make(map[string][]byte) + + for _, node := range nodeList.Nodes { + wg.Add(1) + go func(node string) { + defer wg.Done() + + // TODO: set timeout waiting + + // create a remote pod spec to run the host collectors + nodeSelector := map[string]string{ + "kubernetes.io/hostname": node, + } + pod, err := 
createHostCollectorPod(ctx, clientset, cm.Name, nodeSelector, labels) + if err != nil { + // TODO: error handling + return + } + klog.V(2).Infof("Created Remote Host Collector Pod %s", pod.Name) + + // wait for the pod to complete + err = waitForPodCompletion(ctx, clientset, pod) + if err != nil { + // TODO: error handling + return + } + + // extract logs from the pod + var logs []byte + logs, err = getPodLogs(ctx, clientset, pod) + if err != nil { + // TODO: error handling + return + } + + mu.Lock() + nodeLogs[node] = logs + mu.Unlock() + + }(node) + } + wg.Wait() + + klog.V(2).Infof("All remote host collectors completed") + + defer func() { + // delete the config map + // delete the remote pods + }() + + // aggregate results + output := collect.NewResult() + for node, logs := range nodeLogs { + var nodeResult map[string]string + if err := json.Unmarshal(logs, &nodeResult); err != nil { + // TODO: error handling + return nil + } + for file, data := range nodeResult { + err := output.SaveResult(bundlePath, fmt.Sprintf("host-collectors/%s/%s", node, file), bytes.NewBufferString(data)) + if err != nil { + // TODO: error handling + return nil + } + } + } + + return output +} + +func createHostCollectorsSpec(hostCollectors []*troubleshootv1beta2.HostCollect) *troubleshootv1beta2.HostCollector { + return &troubleshootv1beta2.HostCollector{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "troubleshoot.sh/v1beta2", + Kind: "HostCollector", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "remoteHostCollector", + }, + Spec: troubleshootv1beta2.HostCollectorSpec{ + Collectors: hostCollectors, + }, + } +} + +func convertHostCollectorSpecToJSON(spec *troubleshootv1beta2.HostCollector) (string, error) { + jsonData, err := json.Marshal(spec) + if err != nil { + return "", errors.Wrap(err, "failed to marshal Host Collector spec") + } + return string(jsonData), nil +} + +func createHostCollectorConfigMap(ctx context.Context, clientset kubernetes.Interface, spec string) (*corev1.ConfigMap, error) { + // TODO: configurable namespaces? 
+ ns := "default" + + data := map[string]string{ + "collector.json": spec, + } + + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "remote-host-collector-specs-", + Namespace: ns, + }, + Data: data, + } + + createdConfigMap, err := clientset.CoreV1().ConfigMaps(ns).Create(ctx, cm, metav1.CreateOptions{}) + if err != nil { + return nil, errors.Wrap(err, "failed to create Remote Host Collector Spec ConfigMap") + } + + return createdConfigMap, nil +} + +func createHostCollectorPod(ctx context.Context, clientset kubernetes.Interface, specConfigMap string, nodeSelector map[string]string, labels map[string]string) (*corev1.Pod, error) { + ns := "default" + imageName := "replicated/troubleshoot:latest" + imagePullPolicy := corev1.PullAlways + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "remote-host-collector-", + Namespace: ns, + Labels: labels, + }, + Spec: corev1.PodSpec{ + NodeSelector: nodeSelector, + HostNetwork: true, + HostPID: true, + HostIPC: true, + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{ + { + Image: imageName, + ImagePullPolicy: imagePullPolicy, + Name: "remote-collector", + Command: []string{"/bin/bash", "-c"}, + Args: []string{ + `cp /troubleshoot/collect /host/collect && + cp /troubleshoot/specs/collector.json /host/collector.json && + chroot /host /bin/bash -c './collect --collect-without-permissions --format=raw collector.json'`, + }, + SecurityContext: &corev1.SecurityContext{ + Privileged: ptr.To(true), + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "collector", + MountPath: "/troubleshoot/specs", + ReadOnly: true, + }, + { + Name: "host-root", + MountPath: "/host", + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "collector", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: specConfigMap, + }, + }, + }, + }, + { + Name: "host-root", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/", + }, + }, + }, + }, + }, + } + + createdPod, err := clientset.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}) + if err != nil { + return nil, errors.Wrap(err, "failed to create Remote Host Collector Pod") + } + + return createdPod, nil +} + +func waitForPodCompletion(ctx context.Context, clientset kubernetes.Interface, pod *corev1.Pod) error { + watcher, err := clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", pod.Name), + }) + if err != nil { + return err + } + defer watcher.Stop() + + for event := range watcher.ResultChan() { + podEvent, ok := event.Object.(*v1.Pod) + if !ok { + continue + } + if podEvent.Status.Phase == v1.PodSucceeded || podEvent.Status.Phase == v1.PodFailed { + return nil + } + } + return fmt.Errorf("pod %s did not complete", pod.Name) +} + +func getPodLogs(ctx context.Context, clientset kubernetes.Interface, pod *corev1.Pod) ([]byte, error) { + podLogOpts := corev1.PodLogOptions{ + Container: pod.Spec.Containers[0].Name, + } + req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts) + logs, err := req.Stream(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to get log stream") + } + defer logs.Close() + + return io.ReadAll(logs) +} diff --git a/pkg/supportbundle/supportbundle.go b/pkg/supportbundle/supportbundle.go index d85742c45..fdab27fa3 100644 --- a/pkg/supportbundle/supportbundle.go +++ b/pkg/supportbundle/supportbundle.go 
@@ -23,6 +23,7 @@ import ( "github.com/replicatedhq/troubleshoot/pkg/convert" "github.com/replicatedhq/troubleshoot/pkg/version" "go.opentelemetry.io/otel" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -328,8 +329,20 @@ func getNodeList(clientset kubernetes.Interface, opts SupportBundleCreateOpts) ( nodeList := NodeList{} for _, node := range nodes.Items { + if isMasterNode(node) { + continue + } nodeList.Nodes = append(nodeList.Nodes, node.Name) } return &nodeList, nil } + +func isMasterNode(node v1.Node) bool { + for label := range node.Labels { + if label == "node-role.kubernetes.io/master" || label == "node-role.kubernetes.io/control-plane" { + return true + } + } + return false +} From 328a817a7155826ae40e4554819b7b2de5ea8a04 Mon Sep 17 00:00:00 2001 From: Gerard Nguyen Date: Wed, 2 Oct 2024 13:06:25 +1000 Subject: [PATCH 02/31] wip --- pkg/supportbundle/collect.go | 29 ++++++++++++++++++++++------- pkg/supportbundle/supportbundle.go | 21 --------------------- 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index 19f14efc2..238336542 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -8,6 +8,7 @@ import ( "io" "os" "reflect" + "strings" "sync" "github.com/pkg/errors" @@ -249,7 +250,7 @@ func runLocalHostCollectors(ctx context.Context, hostCollectors []*troubleshootv } func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, bundlePath string, opts SupportBundleCreateOpts) map[string][]byte { - // TODO: verify that we have access to the cluster + output := collect.NewResult() // convert host collectors into a HostCollector spec spec := createHostCollectorsSpec(hostCollectors) @@ -266,20 +267,20 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot return nil } - // create a config map for the HostCollector spec - cm, err := createHostCollectorConfigMap(ctx, clientset, specJSON) + nodeList, err := getNodeList(clientset, opts) if err != nil { // TODO: error handling return nil } - klog.V(2).Infof("Created Remote Host Collector ConfigMap %s", cm.Name) + klog.V(2).Infof("Node list to run remote host collectors: %s", nodeList.Nodes) - nodeList, err := getNodeList(clientset, opts) + // create a config map for the HostCollector spec + cm, err := createHostCollectorConfigMap(ctx, clientset, specJSON) if err != nil { // TODO: error handling return nil } - klog.V(2).Infof("Node list to run remote host collectors: %s", nodeList.Nodes) + klog.V(2).Infof("Created Remote Host Collector ConfigMap %s", cm.Name) // create remote pod for each node labels := map[string]string{ @@ -334,12 +335,12 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot klog.V(2).Infof("All remote host collectors completed") defer func() { + // TODO: // delete the config map // delete the remote pods }() // aggregate results - output := collect.NewResult() for node, logs := range nodeLogs { var nodeResult map[string]string if err := json.Unmarshal(logs, &nodeResult); err != nil { @@ -347,6 +348,8 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot return nil } for file, data := range nodeResult { + // trim host-collectors/ prefix + file = strings.TrimPrefix(file, "host-collectors/") err := output.SaveResult(bundlePath, fmt.Sprintf("host-collectors/%s/%s", node, file), bytes.NewBufferString(data)) if err != 
nil { // TODO: error handling @@ -355,6 +358,18 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot } } + // save node list to bundle for analyzer to use later + nodeListBytes, err := json.MarshalIndent(nodeList, "", " ") + if err != nil { + // TODO: error handling + return nil + } + err = output.SaveResult(bundlePath, constants.NODE_LIST_FILE, bytes.NewBuffer(nodeListBytes)) + if err != nil { + // TODO: error handling + return nil + } + return output } diff --git a/pkg/supportbundle/supportbundle.go b/pkg/supportbundle/supportbundle.go index fdab27fa3..3465cea97 100644 --- a/pkg/supportbundle/supportbundle.go +++ b/pkg/supportbundle/supportbundle.go @@ -3,7 +3,6 @@ package supportbundle import ( "bytes" "context" - "encoding/json" "fmt" "net/http" "os" @@ -119,26 +118,6 @@ func CollectSupportBundleFromSpec( root.End() }() - // only create a node list if we are running host collectors in a pod - if opts.RunHostCollectorsInPod { - clientset, err := kubernetes.NewForConfig(opts.KubernetesRestConfig) - if err != nil { - return nil, errors.Wrap(err, "failed to create kubernetes clientset to run host collectors in pod") - } - nodeList, err := getNodeList(clientset, opts) - if err != nil { - return nil, errors.Wrap(err, "failed to get remote node list") - } - nodeListBytes, err := json.MarshalIndent(nodeList, "", " ") - if err != nil { - return nil, errors.Wrap(err, "failed to marshal remote node list") - } - err = result.SaveResult(bundlePath, constants.NODE_LIST_FILE, bytes.NewBuffer(nodeListBytes)) - if err != nil { - return nil, errors.Wrap(err, "failed to write remote node list") - } - } - // Cache error returned by collectors and return it at the end of the function // so as to have a chance to run analyzers and archive the support bundle after. 
// If both host and in cluster collectors fail, the errors will be wrapped From e92157797ead38251181bb52062c8673ddc684b9 Mon Sep 17 00:00:00 2001 From: Gerard Nguyen Date: Thu, 3 Oct 2024 15:05:17 +1000 Subject: [PATCH 03/31] test log stream --- pkg/supportbundle/collect.go | 89 ++++++++++++++++++++++++++++++++++-- 1 file changed, 86 insertions(+), 3 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index 238336542..f8afdcc18 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -1,6 +1,7 @@ package supportbundle import ( + "bufio" "bytes" "context" "encoding/json" @@ -10,6 +11,7 @@ import ( "reflect" "strings" "sync" + "time" "github.com/pkg/errors" analyze "github.com/replicatedhq/troubleshoot/pkg/analyze" @@ -267,6 +269,8 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot return nil } + // TODO: rbac check + nodeList, err := getNodeList(clientset, opts) if err != nil { // TODO: error handling @@ -309,6 +313,8 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot } klog.V(2).Infof("Created Remote Host Collector Pod %s", pod.Name) + go streamPodLogs(ctx, clientset, pod, node, opts) + // wait for the pod to complete err = waitForPodCompletion(ctx, clientset, pod) if err != nil { @@ -324,6 +330,9 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot return } + // wait for log stream to catch up + time.Sleep(1 * time.Second) + mu.Lock() nodeLogs[node] = logs mu.Unlock() @@ -446,7 +455,7 @@ func createHostCollectorPod(ctx context.Context, clientset kubernetes.Interface, Args: []string{ `cp /troubleshoot/collect /host/collect && cp /troubleshoot/specs/collector.json /host/collector.json && - chroot /host /bin/bash -c './collect --collect-without-permissions --format=raw collector.json'`, + chroot /host /bin/bash -c './collect --collect-without-permissions --format=raw -v=5 collector.json 2>collector.log'`, }, SecurityContext: &corev1.SecurityContext{ Privileged: ptr.To(true), @@ -463,6 +472,18 @@ func createHostCollectorPod(ctx context.Context, clientset kubernetes.Interface, }, }, }, + { + Image: "busybox", + Name: "log-tailer", + Command: []string{"sh", "-c"}, + Args: []string{"tail -F /host/collector.log"}, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "host-root", + MountPath: "/host", + }, + }, + }, }, Volumes: []corev1.Volume{ { @@ -509,8 +530,15 @@ func waitForPodCompletion(ctx context.Context, clientset kubernetes.Interface, p if !ok { continue } - if podEvent.Status.Phase == v1.PodSucceeded || podEvent.Status.Phase == v1.PodFailed { - return nil + for _, containerStatus := range podEvent.Status.ContainerStatuses { + if containerStatus.Name == "remote-collector" { + if containerStatus.State.Terminated != nil { + if containerStatus.State.Terminated.ExitCode == 0 { + return nil + } + return fmt.Errorf("container %s in pod %s failed with exit code %d", containerStatus.Name, pod.Name, containerStatus.State.Terminated.ExitCode) + } + } } } return fmt.Errorf("pod %s did not complete", pod.Name) @@ -529,3 +557,58 @@ func getPodLogs(ctx context.Context, clientset kubernetes.Interface, pod *corev1 return io.ReadAll(logs) } + +func streamPodLogs(ctx context.Context, clientset kubernetes.Interface, pod *corev1.Pod, node string, opts SupportBundleCreateOpts) { + + // todo: timeout + + send := func(msg string) { + opts.ProgressChan <- fmt.Sprintf("[%s] %s", node, msg) + } + + // wait for pod container log-tailer to start + watcher, err := 
clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", pod.Name), + }) + if err != nil { + send(errors.Wrap(err, "failed to start pod watcher").Error()) + return + } + defer watcher.Stop() + + for event := range watcher.ResultChan() { + podEvent, ok := event.Object.(*corev1.Pod) + if !ok { + continue + } + for _, containerStatus := range podEvent.Status.ContainerStatuses { + if containerStatus.Name == "log-tailer" { + if containerStatus.State.Running != nil { + goto StartLogStream + } + } + } + } + +StartLogStream: + // stream logs from container named log-tailer in the pod + podLogOpts := corev1.PodLogOptions{ + Container: "log-tailer", + Follow: true, + } + req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts) + logs, err := req.Stream(ctx) + if err != nil { + send(errors.Wrap(err, "failed to get log stream").Error()) + return + } + defer logs.Close() + scanner := bufio.NewScanner(logs) + for scanner.Scan() { + send(scanner.Text()) + } + if err := scanner.Err(); err != nil { + send(errors.Wrap(err, "failed to read log stream").Error()) + } + send("Log stream ended") +} From c2662c288b4e01d5434b0a1758790c99bd90a8af Mon Sep 17 00:00:00 2001 From: hedge-sparrow Date: Thu, 31 Oct 2024 11:53:41 +0000 Subject: [PATCH 04/31] wip --- pkg/supportbundle/collect.go | 146 +++++++++++++++++++++-------------- 1 file changed, 90 insertions(+), 56 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index f8afdcc18..289b9c45d 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -26,7 +26,10 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" "k8s.io/klog/v2" "k8s.io/utils/ptr" ) @@ -251,6 +254,50 @@ func runLocalHostCollectors(ctx context.Context, hostCollectors []*troubleshootv return allCollectedData } +func getExecOutputs( + ctx context.Context, clientConfig *rest.Config, client *kubernetes.Clientset, pod corev1.Pod, execCollector *troubleshootv1beta2.HostCollect, +) ([]byte, []byte, []string) { + container := pod.Spec.Containers[0].Name + + req := client.CoreV1().RESTClient().Post().Resource("pods").Name(pod.Name).Namespace(pod.Namespace).SubResource("exec") + scheme := runtime.NewScheme() + if err := corev1.AddToScheme(scheme); err != nil { + return nil, nil, []string{err.Error()} + } + + parameterCodec := runtime.NewParameterCodec(scheme) + req.VersionedParams(&corev1.PodExecOptions{ + Command: []string{"/troubleshoot/collect", "-"}, + Container: container, + Stdin: true, + Stdout: false, + Stderr: true, + TTY: false, + }, parameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(clientConfig, "POST", req.URL()) + if err != nil { + return nil, nil, []string{err.Error()} + } + + stdin := new(bytes.Buffer) + stdout := new(bytes.Buffer) + stderr := new(bytes.Buffer) + + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: false, + }) + + if err != nil { + return stdout.Bytes(), stderr.Bytes(), []string{err.Error()} + } + + return stdout.Bytes(), stderr.Bytes(), nil +} + func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, bundlePath string, opts SupportBundleCreateOpts) map[string][]byte { output := collect.NewResult() @@ -278,14 +325,6 @@ func 
runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot } klog.V(2).Infof("Node list to run remote host collectors: %s", nodeList.Nodes) - // create a config map for the HostCollector spec - cm, err := createHostCollectorConfigMap(ctx, clientset, specJSON) - if err != nil { - // TODO: error handling - return nil - } - klog.V(2).Infof("Created Remote Host Collector ConfigMap %s", cm.Name) - // create remote pod for each node labels := map[string]string{ "troubleshoot.sh/remote-collector": "true", @@ -306,29 +345,30 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot nodeSelector := map[string]string{ "kubernetes.io/hostname": node, } - pod, err := createHostCollectorPod(ctx, clientset, cm.Name, nodeSelector, labels) + pod, err := createHostCollectorPod(ctx, clientset, nodeSelector, labels) if err != nil { // TODO: error handling return } klog.V(2).Infof("Created Remote Host Collector Pod %s", pod.Name) - go streamPodLogs(ctx, clientset, pod, node, opts) + // wait for the pod to be running + // err = waitForPodRunning(ctx, clientset, pod) + // if err != nil { + // // TODO: error handling + // return + // } + time.Sleep(10 * time.Second) - // wait for the pod to complete - err = waitForPodCompletion(ctx, clientset, pod) - if err != nil { - // TODO: error handling - return - } + getExecOutputs() - // extract logs from the pod - var logs []byte - logs, err = getPodLogs(ctx, clientset, pod) - if err != nil { - // TODO: error handling - return - } + // // extract logs from the pod + // var logs []byte + // logs, err = getPodLogs(ctx, clientset, pod) + // if err != nil { + // // TODO: error handling + // return + // } // wait for log stream to catch up time.Sleep(1 * time.Second) @@ -429,7 +469,7 @@ func createHostCollectorConfigMap(ctx context.Context, clientset kubernetes.Inte return createdConfigMap, nil } -func createHostCollectorPod(ctx context.Context, clientset kubernetes.Interface, specConfigMap string, nodeSelector map[string]string, labels map[string]string) (*corev1.Pod, error) { +func createHostCollectorPod(ctx context.Context, clientset kubernetes.Interface, nodeSelector map[string]string, labels map[string]string) (*corev1.Pod, error) { ns := "default" imageName := "replicated/troubleshoot:latest" imagePullPolicy := corev1.PullAlways @@ -452,31 +492,10 @@ func createHostCollectorPod(ctx context.Context, clientset kubernetes.Interface, ImagePullPolicy: imagePullPolicy, Name: "remote-collector", Command: []string{"/bin/bash", "-c"}, - Args: []string{ - `cp /troubleshoot/collect /host/collect && - cp /troubleshoot/specs/collector.json /host/collector.json && - chroot /host /bin/bash -c './collect --collect-without-permissions --format=raw -v=5 collector.json 2>collector.log'`, - }, + Args: []string{"while true; do sleep 30; done;"}, SecurityContext: &corev1.SecurityContext{ Privileged: ptr.To(true), }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "collector", - MountPath: "/troubleshoot/specs", - ReadOnly: true, - }, - { - Name: "host-root", - MountPath: "/host", - }, - }, - }, - { - Image: "busybox", - Name: "log-tailer", - Command: []string{"sh", "-c"}, - Args: []string{"tail -F /host/collector.log"}, VolumeMounts: []corev1.VolumeMount{ { Name: "host-root", @@ -486,16 +505,6 @@ func createHostCollectorPod(ctx context.Context, clientset kubernetes.Interface, }, }, Volumes: []corev1.Volume{ - { - Name: "collector", - VolumeSource: corev1.VolumeSource{ - ConfigMap: &corev1.ConfigMapVolumeSource{ - LocalObjectReference: 
corev1.LocalObjectReference{ - Name: specConfigMap, - }, - }, - }, - }, { Name: "host-root", VolumeSource: corev1.VolumeSource{ @@ -544,6 +553,31 @@ func waitForPodCompletion(ctx context.Context, clientset kubernetes.Interface, p return fmt.Errorf("pod %s did not complete", pod.Name) } +func waitForPodRunning(ctx context.Context, clientset kubernetes.Interface, pod *corev1.Pod) error { + watcher, err := clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", pod.Name), + }) + if err != nil { + return err + } + defer watcher.Stop() + + for event := range watcher.ResultChan() { + podEvent, ok := event.Object.(*v1.Pod) + if !ok { + continue + } + for _, containerStatus := range podEvent.Status.ContainerStatuses { + if containerStatus.Name == "remote-collector" { + if containerStatus.State.Running != nil { + return nil + } + } + } + } + return fmt.Errorf("pod %s did not complete", pod.Name) +} + func getPodLogs(ctx context.Context, clientset kubernetes.Interface, pod *corev1.Pod) ([]byte, error) { podLogOpts := corev1.PodLogOptions{ Container: pod.Spec.Containers[0].Name, From b4120ae55fb6620fa338b54f26c8a0f4cda53cd7 Mon Sep 17 00:00:00 2001 From: hedge-sparrow Date: Fri, 1 Nov 2024 10:41:30 +0000 Subject: [PATCH 05/31] Use daemonset --- cmd/troubleshoot/cli/run.go | 11 +- .../v1beta2/supportbundle_types.go | 14 +- .../v1beta2/fake/fake_supportbundle.go | 14 +- .../troubleshoot/v1beta2/supportbundle.go | 4 +- pkg/supportbundle/collect.go | 242 ++++++++++-------- 5 files changed, 148 insertions(+), 137 deletions(-) diff --git a/cmd/troubleshoot/cli/run.go b/cmd/troubleshoot/cli/run.go index f141ed87d..1a5aa5b96 100644 --- a/cmd/troubleshoot/cli/run.go +++ b/cmd/troubleshoot/cli/run.go @@ -110,7 +110,7 @@ func runTroubleshoot(v *viper.Viper, args []string) error { } if interactive { - if len(mainBundle.Spec.HostCollectors) > 0 && !util.IsRunningAsRoot() { + if len(mainBundle.Spec.HostCollectors) > 0 && !util.IsRunningAsRoot() && !mainBundle.Spec.RunHostCollectorsInPod { fmt.Print(cursor.Show()) if util.PromptYesNo(util.HOST_COLLECTORS_RUN_AS_ROOT_PROMPT) { fmt.Println("Exiting...") @@ -184,7 +184,7 @@ func runTroubleshoot(v *viper.Viper, args []string) error { OutputPath: v.GetString("output"), Redact: v.GetBool("redact"), FromCLI: true, - RunHostCollectorsInPod: mainBundle.Metadata.RunHostCollectorsInPod, + RunHostCollectorsInPod: mainBundle.Spec.RunHostCollectorsInPod, } nonInteractiveOutput := analysisOutput{} @@ -341,7 +341,6 @@ func loadSpecs(ctx context.Context, args []string, client kubernetes.Interface) ObjectMeta: metav1.ObjectMeta{ Name: "merged-support-bundle-spec", }, - RunHostCollectorsInPod: false, }, } @@ -351,11 +350,11 @@ func loadSpecs(ctx context.Context, args []string, client kubernetes.Interface) sb := sb mainBundle = supportbundle.ConcatSpec(mainBundle, &sb) //check if sb has metadata and if it has RunHostCollectorsInPod set to true - if !reflect.DeepEqual(sb.Metadata.ObjectMeta, metav1.ObjectMeta{}) && sb.Metadata.RunHostCollectorsInPod { - enableRunHostCollectorsInPod = sb.Metadata.RunHostCollectorsInPod + if !reflect.DeepEqual(sb.Metadata.ObjectMeta, metav1.ObjectMeta{}) && sb.Spec.RunHostCollectorsInPod { + enableRunHostCollectorsInPod = sb.Spec.RunHostCollectorsInPod } } - mainBundle.Metadata.RunHostCollectorsInPod = enableRunHostCollectorsInPod + mainBundle.Spec.RunHostCollectorsInPod = enableRunHostCollectorsInPod for _, c := range kinds.CollectorsV1Beta2 { mainBundle.Spec.Collectors = 
util.Append(mainBundle.Spec.Collectors, c.Spec.Collectors) diff --git a/pkg/apis/troubleshoot/v1beta2/supportbundle_types.go b/pkg/apis/troubleshoot/v1beta2/supportbundle_types.go index 4c1a6df39..791f68adf 100644 --- a/pkg/apis/troubleshoot/v1beta2/supportbundle_types.go +++ b/pkg/apis/troubleshoot/v1beta2/supportbundle_types.go @@ -21,17 +21,17 @@ import ( ) type SupportBundleMetadata struct { - metav1.ObjectMeta `json:",inline" yaml:",inline"` - RunHostCollectorsInPod bool `json:"runHostCollectorsInPod,omitempty" yaml:"runHostCollectorsInPod,omitempty"` + metav1.ObjectMeta `json:",inline" yaml:",inline"` } // SupportBundleSpec defines the desired state of SupportBundle type SupportBundleSpec struct { - AfterCollection []*AfterCollection `json:"afterCollection,omitempty" yaml:"afterCollection,omitempty"` - Collectors []*Collect `json:"collectors,omitempty" yaml:"collectors,omitempty"` - HostCollectors []*HostCollect `json:"hostCollectors,omitempty" yaml:"hostCollectors,omitempty"` - Analyzers []*Analyze `json:"analyzers,omitempty" yaml:"analyzers,omitempty"` - HostAnalyzers []*HostAnalyze `json:"hostAnalyzers,omitempty" yaml:"hostAnalyzers,omitempty"` + RunHostCollectorsInPod bool `json:"runHostCollectorsInPod,omitempty" yaml:"runHostCollectorsInPod,omitempty"` + AfterCollection []*AfterCollection `json:"afterCollection,omitempty" yaml:"afterCollection,omitempty"` + Collectors []*Collect `json:"collectors,omitempty" yaml:"collectors,omitempty"` + HostCollectors []*HostCollect `json:"hostCollectors,omitempty" yaml:"hostCollectors,omitempty"` + Analyzers []*Analyze `json:"analyzers,omitempty" yaml:"analyzers,omitempty"` + HostAnalyzers []*HostAnalyze `json:"hostAnalyzers,omitempty" yaml:"hostAnalyzers,omitempty"` // URI optionally defines a location which is the source of this spec to allow updating of the spec at runtime Uri string `json:"uri,omitempty" yaml:"uri,omitempty"` } diff --git a/pkg/client/troubleshootclientset/typed/troubleshoot/v1beta2/fake/fake_supportbundle.go b/pkg/client/troubleshootclientset/typed/troubleshoot/v1beta2/fake/fake_supportbundle.go index 45d40c88c..b52f07912 100644 --- a/pkg/client/troubleshootclientset/typed/troubleshoot/v1beta2/fake/fake_supportbundle.go +++ b/pkg/client/troubleshootclientset/typed/troubleshoot/v1beta2/fake/fake_supportbundle.go @@ -22,7 +22,6 @@ import ( v1beta2 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta2" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - labels "k8s.io/apimachinery/pkg/labels" types "k8s.io/apimachinery/pkg/types" watch "k8s.io/apimachinery/pkg/watch" testing "k8s.io/client-go/testing" @@ -57,18 +56,7 @@ func (c *FakeSupportBundles) List(ctx context.Context, opts v1.ListOptions) (res if obj == nil { return nil, err } - - label, _, _ := testing.ExtractFromListOptions(opts) - if label == nil { - label = labels.Everything() - } - list := &v1beta2.SupportBundleList{ListMeta: obj.(*v1beta2.SupportBundleList).ListMeta} - for _, item := range obj.(*v1beta2.SupportBundleList).Items { - if label.Matches(labels.Set(item.Metadata.Labels)) { - list.Items = append(list.Items, item) - } - } - return list, err + return obj.(*v1beta2.SupportBundleList), err } // Watch returns a watch.Interface that watches the requested supportBundles. 
diff --git a/pkg/client/troubleshootclientset/typed/troubleshoot/v1beta2/supportbundle.go b/pkg/client/troubleshootclientset/typed/troubleshoot/v1beta2/supportbundle.go index 521f4df90..49399e767 100644 --- a/pkg/client/troubleshootclientset/typed/troubleshoot/v1beta2/supportbundle.go +++ b/pkg/client/troubleshootclientset/typed/troubleshoot/v1beta2/supportbundle.go @@ -127,7 +127,7 @@ func (c *supportBundles) Update(ctx context.Context, supportBundle *v1beta2.Supp err = c.client.Put(). Namespace(c.ns). Resource("supportbundles"). - Name(supportBundle.Metadata.Name). + Name(supportBundle.Name). VersionedParams(&opts, scheme.ParameterCodec). Body(supportBundle). Do(ctx). @@ -142,7 +142,7 @@ func (c *supportBundles) UpdateStatus(ctx context.Context, supportBundle *v1beta err = c.client.Put(). Namespace(c.ns). Resource("supportbundles"). - Name(supportBundle.Metadata.Name). + Name(supportBundle.Name). SubResource("status"). VersionedParams(&opts, scheme.ParameterCodec). Body(supportBundle). diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index 289b9c45d..db65aceeb 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -34,6 +35,11 @@ import ( "k8s.io/utils/ptr" ) +const ( + selectorLabelKey = "ds-selector-label" + selectorLabelValue = "remote-host-collector" +) + func runHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, additionalRedactors *troubleshootv1beta2.Redactor, bundlePath string, opts SupportBundleCreateOpts) (collect.CollectorResult, error) { var collectResult map[string][]byte @@ -254,45 +260,45 @@ func runLocalHostCollectors(ctx context.Context, hostCollectors []*troubleshootv return allCollectedData } +// getExecOutputs executes `collect -` with collector data passed to stdin and returns stdout, stderr and error func getExecOutputs( - ctx context.Context, clientConfig *rest.Config, client *kubernetes.Clientset, pod corev1.Pod, execCollector *troubleshootv1beta2.HostCollect, -) ([]byte, []byte, []string) { + ctx context.Context, clientConfig *rest.Config, client *kubernetes.Clientset, pod corev1.Pod, collectorData []byte, +) ([]byte, []byte, error) { container := pod.Spec.Containers[0].Name req := client.CoreV1().RESTClient().Post().Resource("pods").Name(pod.Name).Namespace(pod.Namespace).SubResource("exec") scheme := runtime.NewScheme() if err := corev1.AddToScheme(scheme); err != nil { - return nil, nil, []string{err.Error()} + return nil, nil, err } parameterCodec := runtime.NewParameterCodec(scheme) req.VersionedParams(&corev1.PodExecOptions{ - Command: []string{"/troubleshoot/collect", "-"}, + Command: []string{"/troubleshoot/collect", "-", "--chroot", "/host", "--format", "raw"}, Container: container, Stdin: true, - Stdout: false, + Stdout: true, Stderr: true, TTY: false, }, parameterCodec) exec, err := remotecommand.NewSPDYExecutor(clientConfig, "POST", req.URL()) if err != nil { - return nil, nil, []string{err.Error()} + return nil, nil, err } - stdin := new(bytes.Buffer) stdout := new(bytes.Buffer) stderr := new(bytes.Buffer) err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ - Stdin: stdin, + Stdin: bytes.NewBuffer(collectorData), Stdout: stdout, Stderr: stderr, Tty: false, }) if err != nil { - return stdout.Bytes(), 
stderr.Bytes(), []string{err.Error()} + return stdout.Bytes(), stderr.Bytes(), err } return stdout.Bytes(), stderr.Bytes(), nil @@ -301,15 +307,6 @@ func getExecOutputs( func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, bundlePath string, opts SupportBundleCreateOpts) map[string][]byte { output := collect.NewResult() - // convert host collectors into a HostCollector spec - spec := createHostCollectorsSpec(hostCollectors) - specJSON, err := convertHostCollectorSpecToJSON(spec) - if err != nil { - // TODO: error handling - return nil - } - klog.V(2).Infof("HostCollector spec: %s", specJSON) - clientset, err := kubernetes.NewForConfig(opts.KubernetesRestConfig) if err != nil { // TODO: error handling @@ -332,52 +329,72 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot var wg sync.WaitGroup var mu sync.Mutex - nodeLogs := make(map[string][]byte) + nodeLogs := make(map[string]map[string][]byte) - for _, node := range nodeList.Nodes { + ds, err := createHostCollectorDS(ctx, clientset, labels) + if err != nil { + // TODO: error handling + return map[string][]byte{} + } + + // wait for at least one pod to be scheduled + err = waitForDS(ctx, clientset, ds) + if err != nil { + // TODO error handling + return map[string][]byte{} + } + + klog.V(2).Infof("Created Remote Host Collector Daemonset %s", ds.Name) + pods, err := clientset.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: selectorLabelKey + "=" + selectorLabelValue, + TimeoutSeconds: new(int64), + Limit: 0, + }) + + for _, pod := range pods.Items { wg.Add(1) - go func(node string) { + go func(pod corev1.Pod) { defer wg.Done() // TODO: set timeout waiting - // create a remote pod spec to run the host collectors - nodeSelector := map[string]string{ - "kubernetes.io/hostname": node, - } - pod, err := createHostCollectorPod(ctx, clientset, nodeSelector, labels) + err := waitForPodRunning(ctx, clientset, &pod) if err != nil { - // TODO: error handling + // TODO error handling return } - klog.V(2).Infof("Created Remote Host Collector Pod %s", pod.Name) - - // wait for the pod to be running - // err = waitForPodRunning(ctx, clientset, pod) - // if err != nil { - // // TODO: error handling - // return - // } - time.Sleep(10 * time.Second) - - getExecOutputs() - - // // extract logs from the pod - // var logs []byte - // logs, err = getPodLogs(ctx, clientset, pod) - // if err != nil { - // // TODO: error handling - // return - // } + + results := map[string][]byte{} + for _, collectorSpec := range hostCollectors { + // convert host collectors into a HostCollector spec + spec := createHostCollectorsSpec([]*troubleshootv1beta2.HostCollect{collectorSpec}) + specJSON, err := json.Marshal(spec) + if err != nil { + // TODO: error handling + return + } + klog.V(2).Infof("HostCollector spec: %s", specJSON) + + stdout, _, err := getExecOutputs(ctx, opts.KubernetesRestConfig, clientset, pod, specJSON) + if err != nil { + return + } + result := map[string][]byte{} + json.Unmarshal(stdout, &result) + for file, data := range result { + results[file] = data + } + time.Sleep(1 * time.Second) + } // wait for log stream to catch up time.Sleep(1 * time.Second) mu.Lock() - nodeLogs[node] = logs + nodeLogs[pod.Spec.NodeName] = results mu.Unlock() - }(node) + }(pod) } wg.Wait() @@ -387,19 +404,14 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot // TODO: // delete the config map // delete the remote pods + 
clientset.AppsV1().DaemonSets(ds.Namespace).Delete(ctx, ds.Name, metav1.DeleteOptions{}) }() - // aggregate results for node, logs := range nodeLogs { - var nodeResult map[string]string - if err := json.Unmarshal(logs, &nodeResult); err != nil { - // TODO: error handling - return nil - } - for file, data := range nodeResult { + for file, data := range logs { // trim host-collectors/ prefix file = strings.TrimPrefix(file, "host-collectors/") - err := output.SaveResult(bundlePath, fmt.Sprintf("host-collectors/%s/%s", node, file), bytes.NewBufferString(data)) + err := output.SaveResult(bundlePath, fmt.Sprintf("host-collectors/%s/%s", node, file), bytes.NewBuffer(data)) if err != nil { // TODO: error handling return nil @@ -469,63 +481,82 @@ func createHostCollectorConfigMap(ctx context.Context, clientset kubernetes.Inte return createdConfigMap, nil } -func createHostCollectorPod(ctx context.Context, clientset kubernetes.Interface, nodeSelector map[string]string, labels map[string]string) (*corev1.Pod, error) { +func createHostCollectorDS(ctx context.Context, clientset kubernetes.Interface, labels map[string]string) (*appsv1.DaemonSet, error) { ns := "default" imageName := "replicated/troubleshoot:latest" imagePullPolicy := corev1.PullAlways - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "remote-host-collector-", - Namespace: ns, - Labels: labels, - }, - Spec: corev1.PodSpec{ - NodeSelector: nodeSelector, - HostNetwork: true, - HostPID: true, - HostIPC: true, - RestartPolicy: corev1.RestartPolicyNever, - Containers: []corev1.Container{ - { - Image: imageName, - ImagePullPolicy: imagePullPolicy, - Name: "remote-collector", - Command: []string{"/bin/bash", "-c"}, - Args: []string{"while true; do sleep 30; done;"}, - SecurityContext: &corev1.SecurityContext{ - Privileged: ptr.To(true), + labels[selectorLabelKey] = selectorLabelValue + + podSpec := corev1.PodSpec{ + HostNetwork: true, + HostPID: true, + HostIPC: true, + Containers: []corev1.Container{ + { + Image: imageName, + ImagePullPolicy: imagePullPolicy, + Name: "remote-collector", + Command: []string{"/bin/bash", "-c"}, + Args: []string{"while true; do sleep 30; done;"}, + SecurityContext: &corev1.SecurityContext{ + Privileged: ptr.To(true), + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "host-root", + MountPath: "/host", }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "host-root", - MountPath: "/host", - }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "host-root", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/", }, }, }, - Volumes: []corev1.Volume{ - { - Name: "host-root", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: "/", - }, + }, + } + + ds := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "remote-host-collector", + Namespace: ns, + Labels: labels, + }, + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: selectorLabelKey, + Operator: "In", + Values: []string{selectorLabelValue}, }, }, }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: podSpec, + }, }, } - createdPod, err := clientset.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}) + createdDS, err := clientset.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{}) if err != nil { return nil, errors.Wrap(err, "failed to create Remote Host Collector Pod") } - return createdPod, 
nil + return createdDS, nil } -func waitForPodCompletion(ctx context.Context, clientset kubernetes.Interface, pod *corev1.Pod) error { +func waitForPodRunning(ctx context.Context, clientset kubernetes.Interface, pod *corev1.Pod) error { watcher, err := clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, metav1.ListOptions{ FieldSelector: fmt.Sprintf("metadata.name=%s", pod.Name), }) @@ -541,11 +572,8 @@ func waitForPodCompletion(ctx context.Context, clientset kubernetes.Interface, p } for _, containerStatus := range podEvent.Status.ContainerStatuses { if containerStatus.Name == "remote-collector" { - if containerStatus.State.Terminated != nil { - if containerStatus.State.Terminated.ExitCode == 0 { - return nil - } - return fmt.Errorf("container %s in pod %s failed with exit code %d", containerStatus.Name, pod.Name, containerStatus.State.Terminated.ExitCode) + if containerStatus.State.Running != nil { + return nil } } } @@ -553,9 +581,9 @@ func waitForPodCompletion(ctx context.Context, clientset kubernetes.Interface, p return fmt.Errorf("pod %s did not complete", pod.Name) } -func waitForPodRunning(ctx context.Context, clientset kubernetes.Interface, pod *corev1.Pod) error { - watcher, err := clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, metav1.ListOptions{ - FieldSelector: fmt.Sprintf("metadata.name=%s", pod.Name), +func waitForDS(ctx context.Context, clientset kubernetes.Interface, ds *appsv1.DaemonSet) error { + watcher, err := clientset.AppsV1().DaemonSets(ds.Namespace).Watch(ctx, metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", ds.Name), }) if err != nil { return err @@ -563,19 +591,15 @@ func waitForPodRunning(ctx context.Context, clientset kubernetes.Interface, pod defer watcher.Stop() for event := range watcher.ResultChan() { - podEvent, ok := event.Object.(*v1.Pod) + dsEvent, ok := event.Object.(*appsv1.DaemonSet) if !ok { continue } - for _, containerStatus := range podEvent.Status.ContainerStatuses { - if containerStatus.Name == "remote-collector" { - if containerStatus.State.Running != nil { - return nil - } - } + if dsEvent.Status.NumberReady > 1 { + return nil } } - return fmt.Errorf("pod %s did not complete", pod.Name) + return fmt.Errorf("pod %s did not complete", ds.Name) } func getPodLogs(ctx context.Context, clientset kubernetes.Interface, pod *corev1.Pod) ([]byte, error) { From 92e215d6b9fb791044009b8eb98f5f47aed870c4 Mon Sep 17 00:00:00 2001 From: hedge-sparrow Date: Fri, 1 Nov 2024 13:30:30 +0000 Subject: [PATCH 06/31] error handling --- pkg/supportbundle/collect.go | 55 +++++++++++++++++------------------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index db65aceeb..dc24b33dd 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "golang.org/x/sync/errgroup" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" @@ -42,10 +43,14 @@ const ( func runHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, additionalRedactors *troubleshootv1beta2.Redactor, bundlePath string, opts SupportBundleCreateOpts) (collect.CollectorResult, error) { + var err error var collectResult map[string][]byte if opts.RunHostCollectorsInPod { - collectResult = runRemoteHostCollectors(ctx, hostCollectors, bundlePath, opts) + collectResult, err = runRemoteHostCollectors(ctx, hostCollectors, 
bundlePath, opts) + if err != nil { + return collectResult, err + } } else { collectResult = runLocalHostCollectors(ctx, hostCollectors, bundlePath, opts) } @@ -304,21 +309,19 @@ func getExecOutputs( return stdout.Bytes(), stderr.Bytes(), nil } -func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, bundlePath string, opts SupportBundleCreateOpts) map[string][]byte { +func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, bundlePath string, opts SupportBundleCreateOpts) (map[string][]byte, error) { output := collect.NewResult() clientset, err := kubernetes.NewForConfig(opts.KubernetesRestConfig) if err != nil { - // TODO: error handling - return nil + return nil, err } // TODO: rbac check nodeList, err := getNodeList(clientset, opts) if err != nil { - // TODO: error handling - return nil + return nil, err } klog.V(2).Infof("Node list to run remote host collectors: %s", nodeList.Nodes) @@ -327,21 +330,18 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot "troubleshoot.sh/remote-collector": "true", } - var wg sync.WaitGroup var mu sync.Mutex nodeLogs := make(map[string]map[string][]byte) ds, err := createHostCollectorDS(ctx, clientset, labels) if err != nil { - // TODO: error handling - return map[string][]byte{} + return nil, err } // wait for at least one pod to be scheduled err = waitForDS(ctx, clientset, ds) if err != nil { - // TODO error handling - return map[string][]byte{} + return nil, err } klog.V(2).Infof("Created Remote Host Collector Daemonset %s", ds.Name) @@ -351,33 +351,27 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot Limit: 0, }) + var eg errgroup.Group for _, pod := range pods.Items { - wg.Add(1) - go func(pod corev1.Pod) { - defer wg.Done() - + eg.Go(func() error { // TODO: set timeout waiting - err := waitForPodRunning(ctx, clientset, &pod) if err != nil { - // TODO error handling - return + return err } - results := map[string][]byte{} for _, collectorSpec := range hostCollectors { // convert host collectors into a HostCollector spec spec := createHostCollectorsSpec([]*troubleshootv1beta2.HostCollect{collectorSpec}) specJSON, err := json.Marshal(spec) if err != nil { - // TODO: error handling - return + return err } klog.V(2).Infof("HostCollector spec: %s", specJSON) stdout, _, err := getExecOutputs(ctx, opts.KubernetesRestConfig, clientset, pod, specJSON) if err != nil { - return + return err } result := map[string][]byte{} json.Unmarshal(stdout, &result) @@ -393,10 +387,13 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot mu.Lock() nodeLogs[pod.Spec.NodeName] = results mu.Unlock() - - }(pod) + return nil + }) + } + err = eg.Wait() + if err != nil { + return nil, err } - wg.Wait() klog.V(2).Infof("All remote host collectors completed") @@ -414,7 +411,7 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot err := output.SaveResult(bundlePath, fmt.Sprintf("host-collectors/%s/%s", node, file), bytes.NewBuffer(data)) if err != nil { // TODO: error handling - return nil + return nil, err } } } @@ -423,15 +420,15 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot nodeListBytes, err := json.MarshalIndent(nodeList, "", " ") if err != nil { // TODO: error handling - return nil + return nil, err } err = output.SaveResult(bundlePath, constants.NODE_LIST_FILE, bytes.NewBuffer(nodeListBytes)) if err != nil { // TODO: error 
handling - return nil + return nil, err } - return output + return output, nil } func createHostCollectorsSpec(hostCollectors []*troubleshootv1beta2.HostCollect) *troubleshootv1beta2.HostCollector { From 1d22bff4cce94561c3c6b88f64a5df921b943712 Mon Sep 17 00:00:00 2001 From: hedge-sparrow Date: Fri, 1 Nov 2024 15:17:53 +0000 Subject: [PATCH 07/31] linting --- pkg/supportbundle/collect.go | 105 +---------------------------------- 1 file changed, 3 insertions(+), 102 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index dc24b33dd..4e640be33 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -1,7 +1,6 @@ package supportbundle import ( - "bufio" "bytes" "context" "encoding/json" @@ -350,6 +349,9 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot TimeoutSeconds: new(int64), Limit: 0, }) + if err != nil { + return nil, err + } var eg errgroup.Group for _, pod := range pods.Items { @@ -446,38 +448,6 @@ func createHostCollectorsSpec(hostCollectors []*troubleshootv1beta2.HostCollect) } } -func convertHostCollectorSpecToJSON(spec *troubleshootv1beta2.HostCollector) (string, error) { - jsonData, err := json.Marshal(spec) - if err != nil { - return "", errors.Wrap(err, "failed to marshal Host Collector spec") - } - return string(jsonData), nil -} - -func createHostCollectorConfigMap(ctx context.Context, clientset kubernetes.Interface, spec string) (*corev1.ConfigMap, error) { - // TODO: configurable namespaces? - ns := "default" - - data := map[string]string{ - "collector.json": spec, - } - - cm := &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "remote-host-collector-specs-", - Namespace: ns, - }, - Data: data, - } - - createdConfigMap, err := clientset.CoreV1().ConfigMaps(ns).Create(ctx, cm, metav1.CreateOptions{}) - if err != nil { - return nil, errors.Wrap(err, "failed to create Remote Host Collector Spec ConfigMap") - } - - return createdConfigMap, nil -} - func createHostCollectorDS(ctx context.Context, clientset kubernetes.Interface, labels map[string]string) (*appsv1.DaemonSet, error) { ns := "default" imageName := "replicated/troubleshoot:latest" @@ -598,72 +568,3 @@ func waitForDS(ctx context.Context, clientset kubernetes.Interface, ds *appsv1.D } return fmt.Errorf("pod %s did not complete", ds.Name) } - -func getPodLogs(ctx context.Context, clientset kubernetes.Interface, pod *corev1.Pod) ([]byte, error) { - podLogOpts := corev1.PodLogOptions{ - Container: pod.Spec.Containers[0].Name, - } - req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts) - logs, err := req.Stream(ctx) - if err != nil { - return nil, errors.Wrap(err, "failed to get log stream") - } - defer logs.Close() - - return io.ReadAll(logs) -} - -func streamPodLogs(ctx context.Context, clientset kubernetes.Interface, pod *corev1.Pod, node string, opts SupportBundleCreateOpts) { - - // todo: timeout - - send := func(msg string) { - opts.ProgressChan <- fmt.Sprintf("[%s] %s", node, msg) - } - - // wait for pod container log-tailer to start - watcher, err := clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, metav1.ListOptions{ - FieldSelector: fmt.Sprintf("metadata.name=%s", pod.Name), - }) - if err != nil { - send(errors.Wrap(err, "failed to start pod watcher").Error()) - return - } - defer watcher.Stop() - - for event := range watcher.ResultChan() { - podEvent, ok := event.Object.(*corev1.Pod) - if !ok { - continue - } - for _, containerStatus := range podEvent.Status.ContainerStatuses { - 
if containerStatus.Name == "log-tailer" { - if containerStatus.State.Running != nil { - goto StartLogStream - } - } - } - } - -StartLogStream: - // stream logs from container named log-tailer in the pod - podLogOpts := corev1.PodLogOptions{ - Container: "log-tailer", - Follow: true, - } - req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts) - logs, err := req.Stream(ctx) - if err != nil { - send(errors.Wrap(err, "failed to get log stream").Error()) - return - } - defer logs.Close() - scanner := bufio.NewScanner(logs) - for scanner.Scan() { - send(scanner.Text()) - } - if err := scanner.Err(); err != nil { - send(errors.Wrap(err, "failed to read log stream").Error()) - } - send("Log stream ended") -} From 14b86914149791c9fb9a22b596761e5f4f568008 Mon Sep 17 00:00:00 2001 From: hedge-sparrow Date: Fri, 1 Nov 2024 17:10:31 +0000 Subject: [PATCH 08/31] oops --- pkg/supportbundle/collect.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index 4e640be33..ee124dd59 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -562,7 +562,7 @@ func waitForDS(ctx context.Context, clientset kubernetes.Interface, ds *appsv1.D if !ok { continue } - if dsEvent.Status.NumberReady > 1 { + if dsEvent.Status.NumberReady > 0 { return nil } } From 77939a24a43d5a68d523de4e3a5288dc8242f948 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Mon, 4 Nov 2024 15:24:16 +1300 Subject: [PATCH 09/31] use polling instead of waiting and fix save empty data --- pkg/supportbundle/collect.go | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index ee124dd59..ceb1d3074 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -305,7 +305,21 @@ func getExecOutputs( return stdout.Bytes(), stderr.Bytes(), err } - return stdout.Bytes(), stderr.Bytes(), nil + // Poll until stdout is non-empty or the context times out + ticker := time.NewTicker(100 * time.Millisecond) // Adjust polling frequency as needed + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if stdout.Len() > 0 { + return stdout.Bytes(), stderr.Bytes(), nil + } + case <-ctx.Done(): + // Return whatever we have if context is canceled + return stdout.Bytes(), stderr.Bytes(), ctx.Err() + } + } } func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, bundlePath string, opts SupportBundleCreateOpts) (map[string][]byte, error) { @@ -357,10 +371,10 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot for _, pod := range pods.Items { eg.Go(func() error { // TODO: set timeout waiting - err := waitForPodRunning(ctx, clientset, &pod) - if err != nil { + if err := waitForPodRunning(ctx, clientset, &pod); err != nil { return err } + results := map[string][]byte{} for _, collectorSpec := range hostCollectors { // convert host collectors into a HostCollector spec @@ -375,17 +389,17 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot if err != nil { return err } - result := map[string][]byte{} - json.Unmarshal(stdout, &result) + + result := map[string]string{} + if err := json.Unmarshal(stdout, &result); err != nil { + return err + } + for file, data := range result { - results[file] = data + results[file] = []byte(data) } - time.Sleep(1 * time.Second) } - // wait for log stream to catch up - time.Sleep(1 * 
time.Second) - mu.Lock() nodeLogs[pod.Spec.NodeName] = results mu.Unlock() From d7cf65369dc36a634da68ec286c6235acee6948f Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Mon, 4 Nov 2024 17:44:37 +1300 Subject: [PATCH 10/31] fix test fail --- pkg/supportbundle/collect.go | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index ceb1d3074..bfb5ad5af 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -305,21 +305,7 @@ func getExecOutputs( return stdout.Bytes(), stderr.Bytes(), err } - // Poll until stdout is non-empty or the context times out - ticker := time.NewTicker(100 * time.Millisecond) // Adjust polling frequency as needed - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if stdout.Len() > 0 { - return stdout.Bytes(), stderr.Bytes(), nil - } - case <-ctx.Done(): - // Return whatever we have if context is canceled - return stdout.Bytes(), stderr.Bytes(), ctx.Err() - } - } + return stdout.Bytes(), stderr.Bytes(), nil } func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, bundlePath string, opts SupportBundleCreateOpts) (map[string][]byte, error) { @@ -398,8 +384,13 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot for file, data := range result { results[file] = []byte(data) } + + time.Sleep(1 * time.Second) } + // wait for log stream to catch up + time.Sleep(1 * time.Second) + mu.Lock() nodeLogs[pod.Spec.NodeName] = results mu.Unlock() From 80084e460bbf9ce3cb0529e493f3915272a24007 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Mon, 4 Nov 2024 18:06:33 +1300 Subject: [PATCH 11/31] testing polling --- pkg/supportbundle/collect.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index bfb5ad5af..e0d7bf339 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -305,7 +305,22 @@ func getExecOutputs( return stdout.Bytes(), stderr.Bytes(), err } - return stdout.Bytes(), stderr.Bytes(), nil + // Poll until stdout is non-empty or the context times out + ticker := time.NewTicker(100 * time.Millisecond) // Adjust polling frequency as needed + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if stdout.Len() > 0 { + return stdout.Bytes(), stderr.Bytes(), nil + } + case <-ctx.Done(): + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + return stdout.Bytes(), stderr.Bytes(), errors.New("timed out waiting for collector output") + } + } + } } func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, bundlePath string, opts SupportBundleCreateOpts) (map[string][]byte, error) { @@ -384,13 +399,8 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot for file, data := range result { results[file] = []byte(data) } - - time.Sleep(1 * time.Second) } - // wait for log stream to catch up - time.Sleep(1 * time.Second) - mu.Lock() nodeLogs[pod.Spec.NodeName] = results mu.Unlock() From 57d24a26e5044946a482cdbbce07d4c72929ad92 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Mon, 4 Nov 2024 18:19:07 +1300 Subject: [PATCH 12/31] reset polling --- pkg/supportbundle/collect.go | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index e0d7bf339..7894cc935 100644 --- 
a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -304,23 +304,7 @@ func getExecOutputs( if err != nil { return stdout.Bytes(), stderr.Bytes(), err } - - // Poll until stdout is non-empty or the context times out - ticker := time.NewTicker(100 * time.Millisecond) // Adjust polling frequency as needed - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if stdout.Len() > 0 { - return stdout.Bytes(), stderr.Bytes(), nil - } - case <-ctx.Done(): - if errors.Is(ctx.Err(), context.DeadlineExceeded) { - return stdout.Bytes(), stderr.Bytes(), errors.New("timed out waiting for collector output") - } - } - } + return stdout.Bytes(), stderr.Bytes(), nil } func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, bundlePath string, opts SupportBundleCreateOpts) (map[string][]byte, error) { @@ -399,8 +383,13 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot for file, data := range result { results[file] = []byte(data) } + + time.Sleep(1 * time.Second) } + // wait for log stream to catch up + time.Sleep(1 * time.Second) + mu.Lock() nodeLogs[pod.Spec.NodeName] = results mu.Unlock() From 80d99641acc583c783ea3691720ec3964c01c6f2 Mon Sep 17 00:00:00 2001 From: hedge-sparrow Date: Mon, 4 Nov 2024 08:22:17 +0000 Subject: [PATCH 13/31] remove isMasterNode --- pkg/supportbundle/supportbundle.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/pkg/supportbundle/supportbundle.go b/pkg/supportbundle/supportbundle.go index 3465cea97..31f28deed 100644 --- a/pkg/supportbundle/supportbundle.go +++ b/pkg/supportbundle/supportbundle.go @@ -22,7 +22,6 @@ import ( "github.com/replicatedhq/troubleshoot/pkg/convert" "github.com/replicatedhq/troubleshoot/pkg/version" "go.opentelemetry.io/otel" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -308,20 +307,8 @@ func getNodeList(clientset kubernetes.Interface, opts SupportBundleCreateOpts) ( nodeList := NodeList{} for _, node := range nodes.Items { - if isMasterNode(node) { - continue - } nodeList.Nodes = append(nodeList.Nodes, node.Name) } return &nodeList, nil } - -func isMasterNode(node v1.Node) bool { - for label := range node.Labels { - if label == "node-role.kubernetes.io/master" || label == "node-role.kubernetes.io/control-plane" { - return true - } - } - return false -} From 8db2e3ef9bb2fd59979be6f61149e4259d6615d3 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Tue, 5 Nov 2024 18:03:05 +1300 Subject: [PATCH 14/31] add summary for host remote collectors --- pkg/supportbundle/collect.go | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index 7894cc935..35e946b8a 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -362,6 +362,29 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot results := map[string][]byte{} for _, collectorSpec := range hostCollectors { + collector, ok := collect.GetHostCollector(collectorSpec, bundlePath) + if !ok { + opts.ProgressChan <- "Host collector not found" + continue + } + + // Start a span for tracing + _, span := otel.Tracer(constants.LIB_TRACER_NAME).Start(ctx, collector.Title()) + span.SetAttributes(attribute.String("type", "Collect")) + + isExcluded, _ := collector.IsExcluded() + if isExcluded { + msg := fmt.Sprintf("[%s] Excluding host collector", collector.Title()) + 
opts.CollectorProgressCallback(opts.ProgressChan, msg) + span.SetAttributes(attribute.Bool(constants.EXCLUDED, true)) + span.End() + continue + } + + // Send progress event: starting the collector + msg := fmt.Sprintf("[%s] Running host collector...", collector.Title()) + opts.CollectorProgressCallback(opts.ProgressChan, msg) + // convert host collectors into a HostCollector spec spec := createHostCollectorsSpec([]*troubleshootv1beta2.HostCollect{collectorSpec}) specJSON, err := json.Marshal(spec) @@ -372,7 +395,10 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot stdout, _, err := getExecOutputs(ctx, opts.KubernetesRestConfig, clientset, pod, specJSON) if err != nil { - return err + // span.SetStatus(codes.Error, err.Error()) + msg := fmt.Sprintf("[%s] Error: %v", collector.Title(), err) + opts.CollectorProgressCallback(opts.ProgressChan, msg) + return errors.Wrap(err, "failed to run remote host collector") } result := map[string]string{} @@ -380,10 +406,16 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot return err } + // Send progress event: completed successfully + msg = fmt.Sprintf("[%s] Completed host collector", collector.Title()) + opts.CollectorProgressCallback(opts.ProgressChan, msg) + + // Aggregate the results for file, data := range result { results[file] = []byte(data) } + span.End() time.Sleep(1 * time.Second) } From 65f15bfc6de5683f108ba0521a675d8e63f36028 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Tue, 5 Nov 2024 18:09:12 +1300 Subject: [PATCH 15/31] fix potential panic --- pkg/supportbundle/collect.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index 35e946b8a..0d25a046a 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -439,7 +439,16 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot // TODO: // delete the config map // delete the remote pods - clientset.AppsV1().DaemonSets(ds.Namespace).Delete(ctx, ds.Name, metav1.DeleteOptions{}) + // check if the daemonset still exists + _, err := clientset.AppsV1().DaemonSets(ds.Namespace).Get(ctx, ds.Name, metav1.GetOptions{}) + if err != nil { + klog.Errorf("Failed to verify remote host collector daemonset %s still exists: %v", ds.Name, err) + return + } + + if err := clientset.AppsV1().DaemonSets(ds.Namespace).Delete(ctx, ds.Name, metav1.DeleteOptions{}); err != nil { + klog.Errorf("Failed to delete remote host collector daemonset %s: %v", ds.Name, err) + } }() for node, logs := range nodeLogs { From 675f058590f300d33c00a9c4296528062d1be6ba Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Wed, 6 Nov 2024 11:16:28 +1300 Subject: [PATCH 16/31] use per collector per pod --- pkg/supportbundle/collect.go | 93 +++++++++++++++++------------------- 1 file changed, 45 insertions(+), 48 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index 0d25a046a..01b0509f5 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -353,45 +353,43 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot } var eg errgroup.Group - for _, pod := range pods.Items { - eg.Go(func() error { - // TODO: set timeout waiting - if err := waitForPodRunning(ctx, clientset, &pod); err != nil { - return err - } - results := map[string][]byte{} - for _, collectorSpec := range hostCollectors { - collector, ok := collect.GetHostCollector(collectorSpec, 
bundlePath) - if !ok { - opts.ProgressChan <- "Host collector not found" - continue - } + for _, collectorSpec := range hostCollectors { + collector, ok := collect.GetHostCollector(collectorSpec, bundlePath) + if !ok { + opts.ProgressChan <- "Host collector not found" + continue + } - // Start a span for tracing - _, span := otel.Tracer(constants.LIB_TRACER_NAME).Start(ctx, collector.Title()) - span.SetAttributes(attribute.String("type", "Collect")) + // Start a span for tracing + _, span := otel.Tracer(constants.LIB_TRACER_NAME).Start(ctx, collector.Title()) + span.SetAttributes(attribute.String("type", "Collect")) - isExcluded, _ := collector.IsExcluded() - if isExcluded { - msg := fmt.Sprintf("[%s] Excluding host collector", collector.Title()) - opts.CollectorProgressCallback(opts.ProgressChan, msg) - span.SetAttributes(attribute.Bool(constants.EXCLUDED, true)) - span.End() - continue - } + isExcluded, _ := collector.IsExcluded() + if isExcluded { + msg := fmt.Sprintf("[%s] Excluding host collector", collector.Title()) + opts.CollectorProgressCallback(opts.ProgressChan, msg) + span.SetAttributes(attribute.Bool(constants.EXCLUDED, true)) + span.End() + continue + } - // Send progress event: starting the collector - msg := fmt.Sprintf("[%s] Running host collector...", collector.Title()) - opts.CollectorProgressCallback(opts.ProgressChan, msg) + // Send progress event: starting the collector + msg := fmt.Sprintf("[%s] Running host collector...", collector.Title()) + opts.CollectorProgressCallback(opts.ProgressChan, msg) - // convert host collectors into a HostCollector spec - spec := createHostCollectorsSpec([]*troubleshootv1beta2.HostCollect{collectorSpec}) - specJSON, err := json.Marshal(spec) - if err != nil { + // convert host collectors into a HostCollector spec + spec := createHostCollectorsSpec([]*troubleshootv1beta2.HostCollect{collectorSpec}) + specJSON, err := json.Marshal(spec) + if err != nil { + return nil, err + } + klog.V(2).Infof("HostCollector spec: %s", specJSON) + for _, pod := range pods.Items { + eg.Go(func() error { + if err := waitForPodRunning(ctx, clientset, &pod); err != nil { return err } - klog.V(2).Infof("HostCollector spec: %s", specJSON) stdout, _, err := getExecOutputs(ctx, opts.KubernetesRestConfig, clientset, pod, specJSON) if err != nil { @@ -411,26 +409,25 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot opts.CollectorProgressCallback(opts.ProgressChan, msg) // Aggregate the results + mu.Lock() for file, data := range result { - results[file] = []byte(data) + if nodeLogs[pod.Spec.NodeName] == nil { + nodeLogs[pod.Spec.NodeName] = make(map[string][]byte) + } + nodeLogs[pod.Spec.NodeName][file] = []byte(data) } - - span.End() - time.Sleep(1 * time.Second) - } - - // wait for log stream to catch up + mu.Unlock() + return nil + }) time.Sleep(1 * time.Second) + } - mu.Lock() - nodeLogs[pod.Spec.NodeName] = results - mu.Unlock() - return nil - }) - } - err = eg.Wait() - if err != nil { - return nil, err + err = eg.Wait() + if err != nil { + return nil, err + } + span.End() + time.Sleep(1 * time.Second) } klog.V(2).Infof("All remote host collectors completed") From 7e9b3048ae11a144079755af94223122138facd1 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Wed, 6 Nov 2024 11:26:34 +1300 Subject: [PATCH 17/31] improve test log --- pkg/supportbundle/collect.go | 5 ++--- test/e2e/support-bundle/host_remote_collector_e2e_test.go | 4 +++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/supportbundle/collect.go 
b/pkg/supportbundle/collect.go index 01b0509f5..4f2cc51cb 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -10,7 +10,6 @@ import ( "reflect" "strings" "sync" - "time" "github.com/pkg/errors" analyze "github.com/replicatedhq/troubleshoot/pkg/analyze" @@ -419,7 +418,7 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot mu.Unlock() return nil }) - time.Sleep(1 * time.Second) + // time.Sleep(1 * time.Second) } err = eg.Wait() @@ -427,7 +426,7 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot return nil, err } span.End() - time.Sleep(1 * time.Second) + // time.Sleep(1 * time.Second) } klog.V(2).Infof("All remote host collectors completed") diff --git a/test/e2e/support-bundle/host_remote_collector_e2e_test.go b/test/e2e/support-bundle/host_remote_collector_e2e_test.go index f8538e254..a4a2346a1 100644 --- a/test/e2e/support-bundle/host_remote_collector_e2e_test.go +++ b/test/e2e/support-bundle/host_remote_collector_e2e_test.go @@ -16,12 +16,14 @@ func TestHostRemoteCollector(t *testing.T) { feature := features.New("Host OS Remote Collector Test"). Assess("run support bundle command successfully", func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { var out bytes.Buffer + var errOut bytes.Buffer supportbundleName := "host-os-remote-collector" cmd := exec.CommandContext(ctx, sbBinary(), "spec/remoteHostCollectors.yaml", "--interactive=false", fmt.Sprintf("-o=%s", supportbundleName)) cmd.Stdout = &out + cmd.Stderr = &errOut err := cmd.Run() if err != nil { - t.Fatalf("Failed to run the binary: %v", err) + t.Fatalf("Failed to run the binary: %v\n%s", err, errOut.String()) } defer func() { From ebfa828089a8e95ee11f06351132dabd42cf8f81 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Wed, 6 Nov 2024 12:00:56 +1300 Subject: [PATCH 18/31] add sleep for test --- test/e2e/support-bundle/host_remote_collector_e2e_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/e2e/support-bundle/host_remote_collector_e2e_test.go b/test/e2e/support-bundle/host_remote_collector_e2e_test.go index a4a2346a1..f56fc1a3f 100644 --- a/test/e2e/support-bundle/host_remote_collector_e2e_test.go +++ b/test/e2e/support-bundle/host_remote_collector_e2e_test.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "testing" + "time" "sigs.k8s.io/e2e-framework/pkg/envconf" "sigs.k8s.io/e2e-framework/pkg/features" @@ -18,6 +19,7 @@ func TestHostRemoteCollector(t *testing.T) { var out bytes.Buffer var errOut bytes.Buffer supportbundleName := "host-os-remote-collector" + time.Sleep(5 * time.Second) cmd := exec.CommandContext(ctx, sbBinary(), "spec/remoteHostCollectors.yaml", "--interactive=false", fmt.Sprintf("-o=%s", supportbundleName)) cmd.Stdout = &out cmd.Stderr = &errOut From 01c643a93d8a6d31f696906d944da57ba2b5315e Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Wed, 6 Nov 2024 12:52:04 +1300 Subject: [PATCH 19/31] use unique name for ds --- pkg/supportbundle/collect.go | 5 ++--- test/e2e/support-bundle/host_remote_collector_e2e_test.go | 2 -- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index 4f2cc51cb..f572c0f45 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -27,6 +27,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" 
"k8s.io/client-go/tools/remotecommand" @@ -418,7 +419,6 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot mu.Unlock() return nil }) - // time.Sleep(1 * time.Second) } err = eg.Wait() @@ -426,7 +426,6 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot return nil, err } span.End() - // time.Sleep(1 * time.Second) } klog.V(2).Infof("All remote host collectors completed") @@ -532,7 +531,7 @@ func createHostCollectorDS(ctx context.Context, clientset kubernetes.Interface, ds := &appsv1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ - Name: "remote-host-collector", + Name: names.SimpleNameGenerator.GenerateName("remote-host-collector" + "-"), Namespace: ns, Labels: labels, }, diff --git a/test/e2e/support-bundle/host_remote_collector_e2e_test.go b/test/e2e/support-bundle/host_remote_collector_e2e_test.go index f56fc1a3f..a4a2346a1 100644 --- a/test/e2e/support-bundle/host_remote_collector_e2e_test.go +++ b/test/e2e/support-bundle/host_remote_collector_e2e_test.go @@ -7,7 +7,6 @@ import ( "os" "os/exec" "testing" - "time" "sigs.k8s.io/e2e-framework/pkg/envconf" "sigs.k8s.io/e2e-framework/pkg/features" @@ -19,7 +18,6 @@ func TestHostRemoteCollector(t *testing.T) { var out bytes.Buffer var errOut bytes.Buffer supportbundleName := "host-os-remote-collector" - time.Sleep(5 * time.Second) cmd := exec.CommandContext(ctx, sbBinary(), "spec/remoteHostCollectors.yaml", "--interactive=false", fmt.Sprintf("-o=%s", supportbundleName)) cmd.Stdout = &out cmd.Stderr = &errOut From ee8f11dd6e8fd0794950cd2f993d32eb2d11d813 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Wed, 6 Nov 2024 13:50:17 +1300 Subject: [PATCH 20/31] improve ds wait check --- pkg/supportbundle/collect.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index f572c0f45..201deae1c 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -602,7 +602,7 @@ func waitForDS(ctx context.Context, clientset kubernetes.Interface, ds *appsv1.D if !ok { continue } - if dsEvent.Status.NumberReady > 0 { + if dsEvent.Status.NumberReady > 0 && dsEvent.Status.DesiredNumberScheduled == dsEvent.Status.NumberReady { return nil } } From f87e9c4cf9e0c230a65e46123f404b9a4200522a Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Wed, 6 Nov 2024 15:57:06 +1300 Subject: [PATCH 21/31] improve wait for pod --- pkg/supportbundle/collect.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index 201deae1c..0df3a8988 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -579,7 +579,7 @@ func waitForPodRunning(ctx context.Context, clientset kubernetes.Interface, pod } for _, containerStatus := range podEvent.Status.ContainerStatuses { if containerStatus.Name == "remote-collector" { - if containerStatus.State.Running != nil { + if containerStatus.State.Running != nil && containerStatus.State.Terminated == nil { return nil } } From 147fb0a29e586650f4d87905502f9baba1a742e8 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Wed, 6 Nov 2024 16:06:17 +1300 Subject: [PATCH 22/31] add pod ready --- pkg/supportbundle/collect.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index 0df3a8988..d323d41d0 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -579,7 +579,7 @@ func waitForPodRunning(ctx context.Context, clientset 
kubernetes.Interface, pod } for _, containerStatus := range podEvent.Status.ContainerStatuses { if containerStatus.Name == "remote-collector" { - if containerStatus.State.Running != nil && containerStatus.State.Terminated == nil { + if containerStatus.State.Running != nil && containerStatus.State.Terminated == nil && containerStatus.Ready { return nil } } From c80a99f7ca549a789eb2afa7c2257df3822248c4 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Wed, 6 Nov 2024 16:27:47 +1300 Subject: [PATCH 23/31] fix label selector issue --- pkg/supportbundle/collect.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index d323d41d0..099de817e 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -344,7 +344,7 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot klog.V(2).Infof("Created Remote Host Collector Daemonset %s", ds.Name) pods, err := clientset.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{ - LabelSelector: selectorLabelKey + "=" + selectorLabelValue, + LabelSelector: selectorLabelKey + "=" + ds.Name, TimeoutSeconds: new(int64), Limit: 0, }) @@ -489,11 +489,12 @@ func createHostCollectorsSpec(hostCollectors []*troubleshootv1beta2.HostCollect) } func createHostCollectorDS(ctx context.Context, clientset kubernetes.Interface, labels map[string]string) (*appsv1.DaemonSet, error) { + dsName := names.SimpleNameGenerator.GenerateName("remote-host-collector" + "-") ns := "default" imageName := "replicated/troubleshoot:latest" imagePullPolicy := corev1.PullAlways - labels[selectorLabelKey] = selectorLabelValue + labels[selectorLabelKey] = dsName podSpec := corev1.PodSpec{ HostNetwork: true, @@ -531,7 +532,7 @@ func createHostCollectorDS(ctx context.Context, clientset kubernetes.Interface, ds := &appsv1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ - Name: names.SimpleNameGenerator.GenerateName("remote-host-collector" + "-"), + Name: dsName, Namespace: ns, Labels: labels, }, @@ -542,7 +543,7 @@ func createHostCollectorDS(ctx context.Context, clientset kubernetes.Interface, { Key: selectorLabelKey, Operator: "In", - Values: []string{selectorLabelValue}, + Values: []string{dsName}, }, }, }, @@ -556,6 +557,7 @@ func createHostCollectorDS(ctx context.Context, clientset kubernetes.Interface, } createdDS, err := clientset.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{}) + if err != nil { return nil, errors.Wrap(err, "failed to create Remote Host Collector Pod") } From 4bc655c6121cdadb6eff88eb82ac1370a9f58b04 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Wed, 6 Nov 2024 16:49:14 +1300 Subject: [PATCH 24/31] improve defer delete --- pkg/supportbundle/collect.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index 099de817e..b1cd576c6 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -25,6 +25,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" + kuberneteserrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/storage/names" @@ -435,14 +436,13 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot // delete the config map // delete the remote pods // check if the daemonset still exists - _, err := clientset.AppsV1().DaemonSets(ds.Namespace).Get(ctx, ds.Name, 
metav1.GetOptions{}) - if err != nil { - klog.Errorf("Failed to verify remote host collector daemonset %s still exists: %v", ds.Name, err) - return - } - if err := clientset.AppsV1().DaemonSets(ds.Namespace).Delete(ctx, ds.Name, metav1.DeleteOptions{}); err != nil { - klog.Errorf("Failed to delete remote host collector daemonset %s: %v", ds.Name, err) + if kuberneteserrors.IsNotFound(err) { + klog.Errorf("Remote host collector daemonset %s not found", ds.Name) + } else { + klog.Errorf("Failed to delete remote host collector daemonset %s: %v", ds.Name, err) + } + return } }() From 6ce12b5a943ac8393fed86406a1f42353c6bce1f Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Wed, 6 Nov 2024 23:48:03 +1300 Subject: [PATCH 25/31] improve TimeoutSeconds, ds nil check and use PullIfNotPresent --- pkg/supportbundle/collect.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index b1cd576c6..78d2d2492 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -345,8 +345,9 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot klog.V(2).Infof("Created Remote Host Collector Daemonset %s", ds.Name) pods, err := clientset.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{ - LabelSelector: selectorLabelKey + "=" + ds.Name, - TimeoutSeconds: new(int64), + LabelSelector: selectorLabelKey + "=" + ds.Name, + // use the default logs collector timeout for now + TimeoutSeconds: ptr.To(int64(constants.DEFAULT_LOGS_COLLECTOR_TIMEOUT.Seconds())), Limit: 0, }) if err != nil { @@ -436,6 +437,10 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot // delete the config map // delete the remote pods // check if the daemonset still exists + if ds == nil || ds.Name == "" { + return + } + if err := clientset.AppsV1().DaemonSets(ds.Namespace).Delete(ctx, ds.Name, metav1.DeleteOptions{}); err != nil { if kuberneteserrors.IsNotFound(err) { klog.Errorf("Remote host collector daemonset %s not found", ds.Name) @@ -492,7 +497,7 @@ func createHostCollectorDS(ctx context.Context, clientset kubernetes.Interface, dsName := names.SimpleNameGenerator.GenerateName("remote-host-collector" + "-") ns := "default" imageName := "replicated/troubleshoot:latest" - imagePullPolicy := corev1.PullAlways + imagePullPolicy := corev1.PullIfNotPresent labels[selectorLabelKey] = dsName From 34acec69d88aee3113fbc5c1da1c77dbf677c2a2 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Thu, 7 Nov 2024 10:01:15 +1300 Subject: [PATCH 26/31] use polling method --- pkg/supportbundle/collect.go | 75 ++++++++++++++++++++---------------- 1 file changed, 42 insertions(+), 33 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index 78d2d2492..c1b5b34f0 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -10,6 +10,7 @@ import ( "reflect" "strings" "sync" + "time" "github.com/pkg/errors" analyze "github.com/replicatedhq/troubleshoot/pkg/analyze" @@ -39,6 +40,7 @@ import ( const ( selectorLabelKey = "ds-selector-label" selectorLabelValue = "remote-host-collector" + defaultTimeout = 30 ) func runHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, additionalRedactors *troubleshootv1beta2.Redactor, bundlePath string, opts SupportBundleCreateOpts) (collect.CollectorResult, error) { @@ -347,7 +349,7 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot pods, err := 
clientset.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{ LabelSelector: selectorLabelKey + "=" + ds.Name, // use the default logs collector timeout for now - TimeoutSeconds: ptr.To(int64(constants.DEFAULT_LOGS_COLLECTOR_TIMEOUT.Seconds())), + TimeoutSeconds: ptr.To(int64(defaultTimeout)), Limit: 0, }) if err != nil { @@ -571,47 +573,54 @@ func createHostCollectorDS(ctx context.Context, clientset kubernetes.Interface, } func waitForPodRunning(ctx context.Context, clientset kubernetes.Interface, pod *corev1.Pod) error { - watcher, err := clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, metav1.ListOptions{ - FieldSelector: fmt.Sprintf("metadata.name=%s", pod.Name), - }) - if err != nil { - return err - } - defer watcher.Stop() + timeoutCh := time.After(defaultTimeout * time.Second) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() - for event := range watcher.ResultChan() { - podEvent, ok := event.Object.(*v1.Pod) - if !ok { - continue - } - for _, containerStatus := range podEvent.Status.ContainerStatuses { - if containerStatus.Name == "remote-collector" { - if containerStatus.State.Running != nil && containerStatus.State.Terminated == nil && containerStatus.Ready { - return nil + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timeoutCh: + return fmt.Errorf("timed out waiting for pod %s to be running", pod.Name) + case <-ticker.C: + currentPod, err := clientset.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get pod %s: %w", pod.Name, err) + } + + // Check container status + for _, containerStatus := range currentPod.Status.ContainerStatuses { + if containerStatus.Name == "remote-collector" { + if containerStatus.State.Running != nil && containerStatus.State.Terminated == nil && containerStatus.Ready { + return nil + } } } } } - return fmt.Errorf("pod %s did not complete", pod.Name) } func waitForDS(ctx context.Context, clientset kubernetes.Interface, ds *appsv1.DaemonSet) error { - watcher, err := clientset.AppsV1().DaemonSets(ds.Namespace).Watch(ctx, metav1.ListOptions{ - FieldSelector: fmt.Sprintf("metadata.name=%s", ds.Name), - }) - if err != nil { - return err - } - defer watcher.Stop() + timeoutCh := time.After(defaultTimeout * time.Second) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() - for event := range watcher.ResultChan() { - dsEvent, ok := event.Object.(*appsv1.DaemonSet) - if !ok { - continue - } - if dsEvent.Status.NumberReady > 0 && dsEvent.Status.DesiredNumberScheduled == dsEvent.Status.NumberReady { - return nil + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timeoutCh: + return fmt.Errorf("timed out waiting for DaemonSet %s to be ready", ds.Name) + case <-ticker.C: + currentDS, err := clientset.AppsV1().DaemonSets(ds.Namespace).Get(ctx, ds.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get DaemonSet %s: %w", ds.Name, err) + } + + if currentDS.Status.NumberReady > 0 && currentDS.Status.DesiredNumberScheduled == currentDS.Status.NumberReady { + return nil + } } } - return fmt.Errorf("pod %s did not complete", ds.Name) } From c4ae3ff4daf5a93c671a01dd86c419f89e9c2ef7 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Thu, 7 Nov 2024 10:26:09 +1300 Subject: [PATCH 27/31] use tail -f to keep live --- pkg/supportbundle/collect.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index c1b5b34f0..90f80d72c 
100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -512,8 +512,7 @@ func createHostCollectorDS(ctx context.Context, clientset kubernetes.Interface, Image: imageName, ImagePullPolicy: imagePullPolicy, Name: "remote-collector", - Command: []string{"/bin/bash", "-c"}, - Args: []string{"while true; do sleep 30; done;"}, + Command: []string{"tail", "-f", "/dev/null"}, SecurityContext: &corev1.SecurityContext{ Privileged: ptr.To(true), }, From 8e2130ef2b531074fca7959140c3b2b695d4de71 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Thu, 7 Nov 2024 11:16:22 +1300 Subject: [PATCH 28/31] allow passing namespace --- pkg/supportbundle/collect.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index 90f80d72c..ca089e8fc 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -334,7 +334,7 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot var mu sync.Mutex nodeLogs := make(map[string]map[string][]byte) - ds, err := createHostCollectorDS(ctx, clientset, labels) + ds, err := createHostCollectorDS(ctx, clientset, labels, "default") if err != nil { return nil, err } @@ -495,9 +495,8 @@ func createHostCollectorsSpec(hostCollectors []*troubleshootv1beta2.HostCollect) } } -func createHostCollectorDS(ctx context.Context, clientset kubernetes.Interface, labels map[string]string) (*appsv1.DaemonSet, error) { +func createHostCollectorDS(ctx context.Context, clientset kubernetes.Interface, labels map[string]string, ns string) (*appsv1.DaemonSet, error) { dsName := names.SimpleNameGenerator.GenerateName("remote-host-collector" + "-") - ns := "default" imageName := "replicated/troubleshoot:latest" imagePullPolicy := corev1.PullIfNotPresent From 8d476b6a12705b6bd3cddc864dd04a0c303c2223 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Thu, 7 Nov 2024 12:13:32 +1300 Subject: [PATCH 29/31] use version to pin troubleshoot image --- pkg/supportbundle/collect.go | 2 +- pkg/version/version.go | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index ca089e8fc..09d3d0e0c 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -497,7 +497,7 @@ func createHostCollectorsSpec(hostCollectors []*troubleshootv1beta2.HostCollect) func createHostCollectorDS(ctx context.Context, clientset kubernetes.Interface, labels map[string]string, ns string) (*appsv1.DaemonSet, error) { dsName := names.SimpleNameGenerator.GenerateName("remote-host-collector" + "-") - imageName := "replicated/troubleshoot:latest" + imageName := "replicated/troubleshoot:" + version.SemVersion() imagePullPolicy := corev1.PullIfNotPresent labels[selectorLabelKey] = dsName diff --git a/pkg/version/version.go b/pkg/version/version.go index 945d8bcad..be8962357 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -4,6 +4,7 @@ import ( "fmt" "runtime" "runtime/debug" + "strings" "time" "github.com/pkg/errors" @@ -23,6 +24,7 @@ type Build struct { TimeFallback string `json:"buildTimeFallback,omitempty"` GoInfo GoInfo `json:"go,omitempty"` RunAt *time.Time `json:"runAt,omitempty"` + SemVersion string `json:"semVersion,omitempty"` } type GoInfo struct { @@ -54,6 +56,8 @@ func initBuild() { } build.Version = version + build.SemVersion = extractSemVer(version) + if len(gitSHA) >= 7 { build.GitSHA = gitSHA[:7] } @@ -78,6 +82,10 @@ func Version() string { return build.Version } 
+func SemVersion() string { + return build.SemVersion +} + // GitSHA gets the gitsha func GitSHA() string { return build.GitSHA @@ -118,3 +126,15 @@ func GetVersionFile() (string, error) { return string(b), nil } + +// extractSemVer extracts the semantic version from the full version string. +func extractSemVer(fullVersion string) string { + // Remove the leading 'v' if it exists + fullVersion = strings.TrimPrefix(fullVersion, "v") + // Split the string by '-' and return the first part + parts := strings.Split(fullVersion, "-") + if len(parts) > 0 { + return parts[0] + } + return fullVersion // Fallback to the original string if no '-' is found +} From a56784330bc0c3ba54659c141e356ae3c99e6841 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Thu, 7 Nov 2024 14:01:01 +1300 Subject: [PATCH 30/31] reverse image version --- pkg/supportbundle/collect.go | 2 +- pkg/version/version.go | 20 -------------------- 2 files changed, 1 insertion(+), 21 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index 09d3d0e0c..ca089e8fc 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -497,7 +497,7 @@ func createHostCollectorsSpec(hostCollectors []*troubleshootv1beta2.HostCollect) func createHostCollectorDS(ctx context.Context, clientset kubernetes.Interface, labels map[string]string, ns string) (*appsv1.DaemonSet, error) { dsName := names.SimpleNameGenerator.GenerateName("remote-host-collector" + "-") - imageName := "replicated/troubleshoot:" + version.SemVersion() + imageName := "replicated/troubleshoot:latest" imagePullPolicy := corev1.PullIfNotPresent labels[selectorLabelKey] = dsName diff --git a/pkg/version/version.go b/pkg/version/version.go index be8962357..945d8bcad 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -4,7 +4,6 @@ import ( "fmt" "runtime" "runtime/debug" - "strings" "time" "github.com/pkg/errors" @@ -24,7 +23,6 @@ type Build struct { TimeFallback string `json:"buildTimeFallback,omitempty"` GoInfo GoInfo `json:"go,omitempty"` RunAt *time.Time `json:"runAt,omitempty"` - SemVersion string `json:"semVersion,omitempty"` } type GoInfo struct { @@ -56,8 +54,6 @@ func initBuild() { } build.Version = version - build.SemVersion = extractSemVer(version) - if len(gitSHA) >= 7 { build.GitSHA = gitSHA[:7] } @@ -82,10 +78,6 @@ func Version() string { return build.Version } -func SemVersion() string { - return build.SemVersion -} - // GitSHA gets the gitsha func GitSHA() string { return build.GitSHA @@ -126,15 +118,3 @@ func GetVersionFile() (string, error) { return string(b), nil } - -// extractSemVer extracts the semantic version from the full version string. 
-func extractSemVer(fullVersion string) string { - // Remove the leading 'v' if it exists - fullVersion = strings.TrimPrefix(fullVersion, "v") - // Split the string by '-' and return the first part - parts := strings.Split(fullVersion, "-") - if len(parts) > 0 { - return parts[0] - } - return fullVersion // Fallback to the original string if no '-' is found -} From 76ad948007edd6cc1d8f146c42d78dc5440cfe32 Mon Sep 17 00:00:00 2001 From: Dexter Yan Date: Mon, 11 Nov 2024 17:21:21 +1300 Subject: [PATCH 31/31] fix save node_list.json fail --- pkg/supportbundle/collect.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index bd5c1d505..cb3f508a2 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -352,7 +352,7 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot var eg errgroup.Group - if err := saveNodeList(opts, bundlePath); err != nil { + if err := saveNodeList(output, opts, bundlePath); err != nil { return nil, err } @@ -609,9 +609,7 @@ func waitForDS(ctx context.Context, clientset kubernetes.Interface, ds *appsv1.D } } -func saveNodeList(opts SupportBundleCreateOpts, bundlePath string) error { - result := make(collect.CollectorResult) - +func saveNodeList(result collect.CollectorResult, opts SupportBundleCreateOpts, bundlePath string) error { clientset, err := kubernetes.NewForConfig(opts.KubernetesRestConfig) if err != nil { return errors.Wrap(err, "failed to create kubernetes clientset to run host collectors in pod")
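
The last patch in this series ("fix save node_list.json fail") changes saveNodeList to take the caller's collect.CollectorResult instead of building a fresh map inside the function, which is why the node list previously never made it into the archived bundle. Below is a minimal sketch of that pattern, not the package's actual implementation: it assumes collect.CollectorResult is the map[string][]byte type used throughout these diffs, and the struct name, helper name, and file path are illustrative placeholders.

package supportbundle

import (
	"encoding/json"

	"github.com/pkg/errors"
	"github.com/replicatedhq/troubleshoot/pkg/collect"
)

// nodeListSketch loosely mirrors the NodeList shape used by getNodeList.
type nodeListSketch struct {
	Nodes []string `json:"nodes"`
}

// saveNodeListSketch writes the serialized node list into the caller's shared
// CollectorResult rather than a locally created map, so the entry is still
// present when the bundle is written out.
func saveNodeListSketch(result collect.CollectorResult, nodes []string) error {
	data, err := json.MarshalIndent(nodeListSketch{Nodes: nodes}, "", "  ")
	if err != nil {
		return errors.Wrap(err, "failed to marshal node list")
	}
	// Hypothetical bundle path for the entry; the real path is defined elsewhere.
	result["host-collectors/system/node_list.json"] = data
	return nil
}

With this shape the caller passes in the same result it later archives, which matches the diff above where runRemoteHostCollectors now calls saveNodeList(output, opts, bundlePath).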