Skip to content

Commit

Permalink
refactor(workflows/handlers): try to pop without failing
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Nov 26, 2024
1 parent 3a86b46 commit ef8942f
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 29 deletions.
20 changes: 20 additions & 0 deletions core/services/workflows/syncer/engine_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,26 @@ func (r *engineRegistry) Get(id string) (*workflows.Engine, error) {
return engine, nil
}

// IsRunning is true if the engine exists and is ready.
func (r *engineRegistry) IsRunning(id string) bool {
r.mu.RLock()
defer r.mu.RUnlock()
engine, found := r.engines[id]
if !found {
return false
}

return engine.Ready() == nil
}

// Exists checks if an engine exists in the registry.
func (r *engineRegistry) Exists(id string) bool {
r.mu.RLock()
defer r.mu.RUnlock()
_, found := r.engines[id]
return found
}

// Pop removes an engine from the registry and returns the engine if found.
func (r *engineRegistry) Pop(id string) (*workflows.Engine, error) {
r.mu.Lock()
Expand Down
57 changes: 28 additions & 29 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,16 +362,9 @@ func (h *eventHandler) workflowUpdatedEvent(
ctx context.Context,
payload WorkflowRegistryWorkflowUpdatedV1,
) error {
oldWorkflowID := hex.EncodeToString(payload.OldWorkflowID[:])

// stop the old workflow engine
e, err := h.engineRegistry.Pop(oldWorkflowID)
if err != nil {
return fmt.Errorf("failed to get old workflow engine: %w", err)
}

if err := e.Close(); err != nil {
return fmt.Errorf("failed to close old workflow engine: %w", err)
// Remove the old workflow engine from the local registry if it exists
if err := h.tryEngineCleanup(hex.EncodeToString(payload.OldWorkflowID[:])); err != nil {
return err
}

registeredEvent := WorkflowRegistryWorkflowRegisteredV1{
Expand All @@ -393,16 +386,9 @@ func (h *eventHandler) workflowPausedEvent(
ctx context.Context,
payload WorkflowRegistryWorkflowPausedV1,
) error {
wfID := hex.EncodeToString(payload.WorkflowID[:])

// Pop the workflow engine and close it
e, err := h.engineRegistry.Pop(wfID)
if err != nil {
return fmt.Errorf("failed to get workflow engine: %w", err)
}
err = e.Close()
if err != nil {
return fmt.Errorf("failed to close workflow engine: %w", err)
// Remove the workflow engine from the local registry if it exists
if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil {
return err
}

// get existing workflow spec from DB
Expand Down Expand Up @@ -432,7 +418,7 @@ func (h *eventHandler) workflowActivatedEvent(
}

// Do nothing if the workflow is already active
if spec.Status == job.WorkflowSpecStatusActive {
if spec.Status == job.WorkflowSpecStatusActive && h.engineRegistry.IsRunning(hex.EncodeToString(payload.WorkflowID[:])) {
return nil
}

Expand All @@ -442,6 +428,7 @@ func (h *eventHandler) workflowActivatedEvent(
return fmt.Errorf("failed to get secrets URL by ID: %w", err)
}

// start a new workflow engine
registeredEvent := WorkflowRegistryWorkflowRegisteredV1{
WorkflowID: payload.WorkflowID,
WorkflowOwner: payload.WorkflowOwner,
Expand All @@ -461,14 +448,8 @@ func (h *eventHandler) workflowDeletedEvent(
ctx context.Context,
payload WorkflowRegistryWorkflowDeletedV1,
) error {
wfID := hex.EncodeToString(payload.WorkflowID[:])

e, err := h.engineRegistry.Pop(wfID)
if err != nil {
return fmt.Errorf("failed to get workflow engine: %w", err)
}
if err := e.Close(); err != nil {
return fmt.Errorf("failed to close workflow engine: %w", err)
if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil {
return err
}

if err := h.orm.DeleteWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName); err != nil {
Expand Down Expand Up @@ -504,6 +485,24 @@ func (h *eventHandler) forceUpdateSecretsEvent(
return nil
}

// tryEngineCleanup attempts to stop the workflow engine for the given workflow ID. Does nothing if the
// workflow engine is not running.
func (h *eventHandler) tryEngineCleanup(wfID string) error {
if h.engineRegistry.IsRunning(wfID) {
// Remove the engine from the registry
e, err := h.engineRegistry.Pop(wfID)
if err != nil {
return fmt.Errorf("failed to get workflow engine: %w", err)
}

// Stop the engine
if err := e.Close(); err != nil {
return fmt.Errorf("failed to close workflow engine: %w", err)
}
}
return nil
}

// workflowID returns a hex encoded sha256 hash of the wasm, config and secretsURL.
func workflowID(wasm, config, secretsURL []byte) string {
sum := sha256.New()
Expand Down

0 comments on commit ef8942f

Please sign in to comment.