From ef8942ffe103013692647e164d001ea402248a2c Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Tue, 26 Nov 2024 13:20:22 +0200 Subject: [PATCH] refactor(workflows/handlers): try to pop without failing --- .../workflows/syncer/engine_registry.go | 20 +++++++ core/services/workflows/syncer/handler.go | 57 +++++++++---------- 2 files changed, 48 insertions(+), 29 deletions(-) diff --git a/core/services/workflows/syncer/engine_registry.go b/core/services/workflows/syncer/engine_registry.go index 6dd54794e8b..8326ef715fd 100644 --- a/core/services/workflows/syncer/engine_registry.go +++ b/core/services/workflows/syncer/engine_registry.go @@ -36,6 +36,26 @@ func (r *engineRegistry) Get(id string) (*workflows.Engine, error) { return engine, nil } +// IsRunning is true if the engine exists and is ready. +func (r *engineRegistry) IsRunning(id string) bool { + r.mu.RLock() + defer r.mu.RUnlock() + engine, found := r.engines[id] + if !found { + return false + } + + return engine.Ready() == nil +} + +// Exists checks if an engine exists in the registry. +func (r *engineRegistry) Exists(id string) bool { + r.mu.RLock() + defer r.mu.RUnlock() + _, found := r.engines[id] + return found +} + // Pop removes an engine from the registry and returns the engine if found. func (r *engineRegistry) Pop(id string) (*workflows.Engine, error) { r.mu.Lock() diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 924cb8e3dbf..7004c740c97 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -362,16 +362,9 @@ func (h *eventHandler) workflowUpdatedEvent( ctx context.Context, payload WorkflowRegistryWorkflowUpdatedV1, ) error { - oldWorkflowID := hex.EncodeToString(payload.OldWorkflowID[:]) - - // stop the old workflow engine - e, err := h.engineRegistry.Pop(oldWorkflowID) - if err != nil { - return fmt.Errorf("failed to get old workflow engine: %w", err) - } - - if err := e.Close(); err != nil { - return fmt.Errorf("failed to close old workflow engine: %w", err) + // Remove the old workflow engine from the local registry if it exists + if err := h.tryEngineCleanup(hex.EncodeToString(payload.OldWorkflowID[:])); err != nil { + return err } registeredEvent := WorkflowRegistryWorkflowRegisteredV1{ @@ -393,16 +386,9 @@ func (h *eventHandler) workflowPausedEvent( ctx context.Context, payload WorkflowRegistryWorkflowPausedV1, ) error { - wfID := hex.EncodeToString(payload.WorkflowID[:]) - - // Pop the workflow engine and close it - e, err := h.engineRegistry.Pop(wfID) - if err != nil { - return fmt.Errorf("failed to get workflow engine: %w", err) - } - err = e.Close() - if err != nil { - return fmt.Errorf("failed to close workflow engine: %w", err) + // Remove the workflow engine from the local registry if it exists + if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil { + return err } // get existing workflow spec from DB @@ -432,7 +418,7 @@ func (h *eventHandler) workflowActivatedEvent( } // Do nothing if the workflow is already active - if spec.Status == job.WorkflowSpecStatusActive { + if spec.Status == job.WorkflowSpecStatusActive && h.engineRegistry.IsRunning(hex.EncodeToString(payload.WorkflowID[:])) { return nil } @@ -442,6 +428,7 @@ func (h *eventHandler) workflowActivatedEvent( return fmt.Errorf("failed to get secrets URL by ID: %w", err) } + // start a new workflow engine registeredEvent := WorkflowRegistryWorkflowRegisteredV1{ WorkflowID: payload.WorkflowID, WorkflowOwner: payload.WorkflowOwner, @@ -461,14 +448,8 @@ func (h *eventHandler) workflowDeletedEvent( ctx context.Context, payload WorkflowRegistryWorkflowDeletedV1, ) error { - wfID := hex.EncodeToString(payload.WorkflowID[:]) - - e, err := h.engineRegistry.Pop(wfID) - if err != nil { - return fmt.Errorf("failed to get workflow engine: %w", err) - } - if err := e.Close(); err != nil { - return fmt.Errorf("failed to close workflow engine: %w", err) + if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil { + return err } if err := h.orm.DeleteWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName); err != nil { @@ -504,6 +485,24 @@ func (h *eventHandler) forceUpdateSecretsEvent( return nil } +// tryEngineCleanup attempts to stop the workflow engine for the given workflow ID. Does nothing if the +// workflow engine is not running. +func (h *eventHandler) tryEngineCleanup(wfID string) error { + if h.engineRegistry.IsRunning(wfID) { + // Remove the engine from the registry + e, err := h.engineRegistry.Pop(wfID) + if err != nil { + return fmt.Errorf("failed to get workflow engine: %w", err) + } + + // Stop the engine + if err := e.Close(); err != nil { + return fmt.Errorf("failed to close workflow engine: %w", err) + } + } + return nil +} + // workflowID returns a hex encoded sha256 hash of the wasm, config and secretsURL. func workflowID(wasm, config, secretsURL []byte) string { sum := sha256.New()