diff --git a/cmd/containerwatcher/iface.go b/cmd/containerwatcher/iface.go
index 081f236..c684ede 100644
--- a/cmd/containerwatcher/iface.go
+++ b/cmd/containerwatcher/iface.go
@@ -20,16 +20,12 @@ const (
 	// Uses a success file to determine if the container has completed.
 	// CAUTION: Does not work if the container exits because of OOM, etc
 	WatcherTypeFile WatcherType = "file"
-	// Uses Kube 1.17 feature - https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/
-	// To look for pid in the shared namespace.
-	WatcherTypeSharedProcessNS WatcherType = "shared-process-ns"
 	// Dummy watcher. Exits immediately, assuming success
 	WatcherTypeNoop WatcherType = "noop"
 )
 
 var AllWatcherTypes = []WatcherType{
 	WatcherTypeKubeAPI,
-	WatcherTypeSharedProcessNS,
 	WatcherTypeFile,
 	WatcherTypeNoop,
 }
diff --git a/cmd/containerwatcher/spns_watcher.go b/cmd/containerwatcher/spns_watcher.go
deleted file mode 100644
index aad6109..0000000
--- a/cmd/containerwatcher/spns_watcher.go
+++ /dev/null
@@ -1,133 +0,0 @@
-package containerwatcher
-
-import (
-	"context"
-	"os"
-	"time"
-
-	"github.com/flyteorg/flytestdlib/logger"
-	"github.com/mitchellh/go-ps"
-	"github.com/pkg/errors"
-	"k8s.io/apimachinery/pkg/util/sets"
-)
-
-const (
-	k8sPauseContainerPid = 1
-	k8sAllowedParentPid  = 0
-)
-
-// given list of processes returns a list of processes such that,
-// the pid does not match any of the given filterPids, this is to filter the /pause and current process.
-// and the parentPid is the allowedParentPid. The logic for this is because every process in the shared namespace
-// always has a parent pid of 0
-func FilterProcessList(procs []ps.Process, filterPids sets.Int, allowedParentPid int) ([]ps.Process, error) {
-	var filteredProcs []ps.Process
-	for _, p := range procs {
-		proc := p
-		if proc.PPid() == allowedParentPid {
-			if !filterPids.Has(proc.Pid()) {
-				filteredProcs = append(filteredProcs, proc)
-			}
-		}
-	}
-	return filteredProcs, nil
-}
-
-type SharedNamespaceProcessLister struct {
-	// PID for the current process
-	currentProcessPid int
-	pidsToFilter      sets.Int
-}
-
-func (s *SharedNamespaceProcessLister) AnyProcessRunning(ctx context.Context) (bool, error) {
-	procs, err := s.ListRunningProcesses(ctx)
-	if err != nil {
-		return false, err
-	}
-	return len(procs) > 0, nil
-}
-
-// Polls all processes and returns a filtered set. Refer to FilterProcessList for understanding the process of filtering
-func (s *SharedNamespaceProcessLister) ListRunningProcesses(ctx context.Context) ([]ps.Process, error) {
-	procs, err := ps.Processes()
-	if err != nil {
-		return nil, errors.Wrap(err, "Failed to list processes")
-	}
-	filteredProcs, err := FilterProcessList(procs, s.pidsToFilter, k8sAllowedParentPid)
-	if err != nil {
-		return nil, errors.Wrapf(err, "failed to filter processes")
-	}
-	return filteredProcs, nil
-}
-
-// The best option for this is to use https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/
-// This is only available as Beta as of 1.16, so we will launch with this feature only as beta
-// But this is the most efficient way to monitor the pod
-type sharedProcessNSWatcher struct {
-	// Rate at which to poll the process list
-	pollInterval time.Duration
-	// Number of cycles to wait before finalizing exit of container
-	cyclesToWait int
-
-	s SharedNamespaceProcessLister
-}
-
-func (k sharedProcessNSWatcher) wait(ctx context.Context, cyclesToWait int, f func(ctx context.Context, otherProcessRunning bool) bool) error {
-	t := time.NewTicker(k.pollInterval)
-	defer t.Stop()
-	cyclesOfMissingProcesses := 0
-	for {
-		select {
-		case <-ctx.Done():
-			logger.Infof(ctx, "Context canceled")
-			return ErrTimeout
-		case <-t.C:
-			logger.Infof(ctx, "Checking processes to see if any process were started...")
-			yes, err := k.s.AnyProcessRunning(ctx)
-			if err != nil {
-				return err
-			}
-			if f(ctx, yes) {
-				cyclesOfMissingProcesses++
-				if cyclesOfMissingProcesses >= cyclesToWait {
-					logger.Infof(ctx, "Exiting wait loop")
-					return nil
-				}
-			}
-			logger.Infof(ctx, "process not yet started")
-		}
-	}
-}
-
-func (k sharedProcessNSWatcher) WaitToStart(ctx context.Context) error {
-	logger.Infof(ctx, "SNPS Watcher waiting for other processes to start")
-	defer logger.Infof(ctx, "SNPS Watcher detected process start")
-	return k.wait(ctx, 1, func(ctx context.Context, otherProcessRunning bool) bool {
-		return otherProcessRunning
-	})
-}
-
-func (k sharedProcessNSWatcher) WaitToExit(ctx context.Context) error {
-	logger.Infof(ctx, "SNPS Watcher waiting for other process to exit")
-	defer logger.Infof(ctx, "SNPS Watcher detected process exit")
-	return k.wait(ctx, k.cyclesToWait, func(ctx context.Context, otherProcessRunning bool) bool {
-		return !otherProcessRunning
-	})
-}
-
-// c -> clock.Clock allows for injecting a fake clock. The watcher uses a timer
-// pollInterval -> time.Duration, wait for this amount of time between successive process checks
-// waitNumIntervalsBeforeFinalize -> Number of successive poll intervals of missing processes for the container, before assuming process is complete. 0/1 indicate the first time a process is detected to be missing, the wait if finalized.
-// containerStartupTimeout -> Duration for which to wait for the container to start up. If the container has not started up in this time, exit with error.
-func NewSharedProcessNSWatcher(ctx context.Context, pollInterval time.Duration, waitNumIntervalsBeforeFinalize int) (Watcher, error) {
-	logger.Infof(ctx, "SNPS created with poll interval %s", pollInterval.String())
-	currentPid := os.Getpid()
-	return sharedProcessNSWatcher{
-		pollInterval: pollInterval,
-		cyclesToWait: waitNumIntervalsBeforeFinalize,
-		s: SharedNamespaceProcessLister{
-			currentProcessPid: currentPid,
-			pidsToFilter:      sets.NewInt(currentPid, k8sPauseContainerPid),
-		},
-	}, nil
-}
diff --git a/cmd/sidecar.go b/cmd/sidecar.go
index 8b074b5..58669fc 100644
--- a/cmd/sidecar.go
+++ b/cmd/sidecar.go
@@ -33,14 +33,15 @@ type UploadOptions struct {
 	// Local directory path where the sidecar should look for outputs.
 	localDirectoryPath string
 	// Non primitive types will be dumped in this output format
-	metadataFormat        string
-	uploadMode            string
-	timeout               time.Duration
-	containerStartTimeout time.Duration
-	typedInterface        []byte
-	startWatcherType      containerwatcher.WatcherType
-	exitWatcherType       containerwatcher.WatcherType
-	containerInfo         containerwatcher.ContainerInformation
+	metadataFormat         string
+	uploadMode             string
+	timeout                time.Duration
+	containerStartTimeout  time.Duration
+	containerFinishTimeout time.Duration
+	typedInterface         []byte
+	startWatcherType       containerwatcher.WatcherType
+	exitWatcherType        containerwatcher.WatcherType
+	containerInfo          containerwatcher.ContainerInformation
 }
 
 func (u *UploadOptions) createWatcher(ctx context.Context, w containerwatcher.WatcherType) (containerwatcher.Watcher, error) {
@@ -51,8 +52,6 @@ func (u *UploadOptions) createWatcher(ctx context.Context, w containerwatcher.Wa
 		return containerwatcher.NewKubeAPIWatcher(ctx, u.RootOptions.kubeClient.CoreV1(), u.containerInfo)
 	case containerwatcher.WatcherTypeFile:
 		return containerwatcher.NewSuccessFileWatcher(ctx, u.localDirectoryPath, StartFile, SuccessFile, ErrorFile)
-	case containerwatcher.WatcherTypeSharedProcessNS:
-		return containerwatcher.NewSharedProcessNSWatcher(ctx, time.Second*2, 2)
 	case containerwatcher.WatcherTypeNoop:
 		return containerwatcher.NoopWatcher{}, nil
 	}
@@ -113,9 +112,11 @@ func (u *UploadOptions) uploader(ctx context.Context) error {
 		}
 	}
 
-	logger.Infof(ctx, "Waiting for Container to exit.")
-	if err := w.WaitToExit(ctx); err != nil {
-		logger.Errorf(ctx, "Failed waiting for container to exit. Err: %s", err)
+	logger.Infof(ctx, "Waiting for Container to complete with timeout %s.", u.containerFinishTimeout)
+	childCtx, cancelFn = context.WithTimeout(ctx, u.containerFinishTimeout)
+	defer cancelFn()
+	err = w.WaitToExit(childCtx)
+	if err != nil && err != containerwatcher.ErrTimeout {
 		return err
 	}
 
@@ -161,7 +162,7 @@ func NewUploadCommand(opts *RootOptions) *cobra.Command {
 	// deleteCmd represents the delete command
 	uploadCmd := &cobra.Command{
 		Use:   "sidecar ",
-		Short: "uploads flyteData from the localpath to a remote dir.",
+		Short: "uploads flyteData from the local path to a remote dir.",
 		Long:  `Currently it looks at the outputs.pb and creates one file per variable.`,
 		RunE: func(cmd *cobra.Command, args []string) error {
 			return uploadOptions.Sidecar(context.Background())
@@ -177,8 +178,9 @@ func NewUploadCommand(opts *RootOptions) *cobra.Command {
 	uploadCmd.Flags().DurationVarP(&uploadOptions.timeout, "timeout", "t", time.Hour*1, "Max time to allow for uploads to complete, default is 1H")
 	uploadCmd.Flags().BytesBase64VarP(&uploadOptions.typedInterface, "interface", "i", nil, "Typed Interface - core.TypedInterface, base64 encoded string of the serialized protobuf")
 	uploadCmd.Flags().DurationVarP(&uploadOptions.containerStartTimeout, "start-timeout", "", 0, "Max time to allow for container to startup. 0 indicates wait for ever.")
-	uploadCmd.Flags().StringVarP(&uploadOptions.startWatcherType, "start-watcher-type", "", containerwatcher.WatcherTypeSharedProcessNS, fmt.Sprintf("Sidecar will wait for container before starting upload process. Watcher type makes the type configurable. Available Type %+v", containerwatcher.AllWatcherTypes))
-	uploadCmd.Flags().StringVarP(&uploadOptions.exitWatcherType, "exit-watcher-type", "", containerwatcher.WatcherTypeSharedProcessNS, fmt.Sprintf("Sidecar will wait for completion of the container before starting upload process. Watcher type makes the type configurable. Available Type %+v", containerwatcher.AllWatcherTypes))
+	uploadCmd.Flags().DurationVarP(&uploadOptions.containerFinishTimeout, "finish-timeout", "", time.Hour*24, "Max time to allow for container to execute. 0 indicates wait for ever.")
+	uploadCmd.Flags().StringVarP(&uploadOptions.startWatcherType, "start-watcher-type", "", containerwatcher.WatcherTypeFile, fmt.Sprintf("Sidecar will wait for container before starting upload process. Watcher type makes the type configurable. Available Type %+v", containerwatcher.AllWatcherTypes))
+	uploadCmd.Flags().StringVarP(&uploadOptions.exitWatcherType, "exit-watcher-type", "", containerwatcher.WatcherTypeFile, fmt.Sprintf("Sidecar will wait for completion of the container before starting upload process. Watcher type makes the type configurable. Available Type %+v", containerwatcher.AllWatcherTypes))
 	uploadCmd.Flags().StringVarP(&uploadOptions.containerInfo.Name, "watch-container", "", "", "For KubeAPI watcher, Wait for this container to exit.")
 	uploadCmd.Flags().StringVarP(&uploadOptions.containerInfo.Namespace, "namespace", "", "", "For KubeAPI watcher, Namespace of the pod [optional]")
 	uploadCmd.Flags().StringVarP(&uploadOptions.containerInfo.Name, "pod-name", "", "", "For KubeAPI watcher, Name of the pod [optional].")