Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(workflows/handler): adds all event handlers #15400

Merged
merged 3 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions core/services/workflows/syncer/engine_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ func (r *engineRegistry) Get(id string) (*workflows.Engine, error) {
return engine, nil
}

// IsRunning is true if the engine exists and is ready.
func (r *engineRegistry) IsRunning(id string) bool {
r.mu.RLock()
defer r.mu.RUnlock()
engine, found := r.engines[id]
if !found {
return false
}

return engine.Ready() == nil
}

// Pop removes an engine from the registry and returns the engine if found.
func (r *engineRegistry) Pop(id string) (*workflows.Engine, error) {
r.mu.Lock()
Expand Down
194 changes: 181 additions & 13 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,82 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent)
h.lggr.Debugf("workflow 0x%x registered and started", wfID)
return nil
case WorkflowUpdatedEvent:
return h.workflowUpdatedEvent(ctx, event)
payload, ok := event.Data.(WorkflowRegistryWorkflowUpdatedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

newWorkflowID := hex.EncodeToString(payload.NewWorkflowID[:])
cma := h.emitter.With(
platform.KeyWorkflowID, newWorkflowID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowUpdatedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow updated event: %v", err), h.lggr)
return err
}

return nil
case WorkflowPausedEvent:
return h.workflowPausedEvent(ctx, event)
payload, ok := event.Data.(WorkflowRegistryWorkflowPausedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

wfID := hex.EncodeToString(payload.WorkflowID[:])

cma := h.emitter.With(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowPausedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow paused event: %v", err), h.lggr)
return err
}
return nil
case WorkflowActivatedEvent:
return h.workflowActivatedEvent(ctx, event)
payload, ok := event.Data.(WorkflowRegistryWorkflowActivatedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

wfID := hex.EncodeToString(payload.WorkflowID[:])

cma := h.emitter.With(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)
if err := h.workflowActivatedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow activated event: %v", err), h.lggr)
return err
}

return nil
case WorkflowDeletedEvent:
payload, ok := event.Data.(WorkflowRegistryWorkflowDeletedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

wfID := hex.EncodeToString(payload.WorkflowID[:])

cma := h.emitter.With(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowDeletedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow deleted event: %v", err), h.lggr)
return err
}

return nil
default:
return fmt.Errorf("event type unsupported: %v", event.EventType)
}
Expand Down Expand Up @@ -284,28 +355,107 @@ func (h *eventHandler) workflowRegisteredEvent(
return nil
}

// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type.
// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type by first finding the
// current workflow engine, stopping it, and then starting a new workflow engine with the
// updated workflow spec.
func (h *eventHandler) workflowUpdatedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
ctx context.Context,
payload WorkflowRegistryWorkflowUpdatedV1,
) error {
return ErrNotImplemented
// Remove the old workflow engine from the local registry if it exists
if err := h.tryEngineCleanup(hex.EncodeToString(payload.OldWorkflowID[:])); err != nil {
return err
}

registeredEvent := WorkflowRegistryWorkflowRegisteredV1{
WorkflowID: payload.NewWorkflowID,
WorkflowOwner: payload.WorkflowOwner,
DonID: payload.DonID,
Status: 0,
WorkflowName: payload.WorkflowName,
BinaryURL: payload.BinaryURL,
ConfigURL: payload.ConfigURL,
SecretsURL: payload.SecretsURL,
}

return h.workflowRegisteredEvent(ctx, registeredEvent)
}

// workflowPausedEvent handles the WorkflowPausedEvent event type.
func (h *eventHandler) workflowPausedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
ctx context.Context,
payload WorkflowRegistryWorkflowPausedV1,
) error {
return ErrNotImplemented
// Remove the workflow engine from the local registry if it exists
if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil {
return err
}

// get existing workflow spec from DB
spec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName)
if err != nil {
return fmt.Errorf("failed to get workflow spec: %w", err)
}

// update the status of the workflow spec
spec.Status = job.WorkflowSpecStatusPaused
if _, err := h.orm.UpsertWorkflowSpec(ctx, spec); err != nil {
return fmt.Errorf("failed to update workflow spec: %w", err)
}

return nil
}

// workflowActivatedEvent handles the WorkflowActivatedEvent event type.
func (h *eventHandler) workflowActivatedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
ctx context.Context,
payload WorkflowRegistryWorkflowActivatedV1,
) error {
// fetch the workflow spec from the DB
spec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName)
if err != nil {
return fmt.Errorf("failed to get workflow spec: %w", err)
}

// Do nothing if the workflow is already active
if spec.Status == job.WorkflowSpecStatusActive && h.engineRegistry.IsRunning(hex.EncodeToString(payload.WorkflowID[:])) {
return nil
}

// get the secrets url by the secrets id
secretsURL, err := h.orm.GetSecretsURLByID(ctx, spec.SecretsID.Int64)
if err != nil {
return fmt.Errorf("failed to get secrets URL by ID: %w", err)
}

// start a new workflow engine
registeredEvent := WorkflowRegistryWorkflowRegisteredV1{
WorkflowID: payload.WorkflowID,
WorkflowOwner: payload.WorkflowOwner,
DonID: payload.DonID,
Status: 0,
WorkflowName: payload.WorkflowName,
BinaryURL: spec.BinaryURL,
ConfigURL: spec.ConfigURL,
SecretsURL: secretsURL,
}

return h.workflowRegisteredEvent(ctx, registeredEvent)
}

// workflowDeletedEvent handles the WorkflowDeletedEvent event type.
func (h *eventHandler) workflowDeletedEvent(
ctx context.Context,
payload WorkflowRegistryWorkflowDeletedV1,
) error {
return ErrNotImplemented
if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil {
return err
}

if err := h.orm.DeleteWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName); err != nil {
return fmt.Errorf("failed to delete workflow spec: %w", err)
}
return nil
}

// forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type.
Expand Down Expand Up @@ -335,6 +485,24 @@ func (h *eventHandler) forceUpdateSecretsEvent(
return nil
}

// tryEngineCleanup attempts to stop the workflow engine for the given workflow ID. Does nothing if the
// workflow engine is not running.
func (h *eventHandler) tryEngineCleanup(wfID string) error {
if h.engineRegistry.IsRunning(wfID) {
// Remove the engine from the registry
e, err := h.engineRegistry.Pop(wfID)
if err != nil {
return fmt.Errorf("failed to get workflow engine: %w", err)
}

// Stop the engine
if err := e.Close(); err != nil {
return fmt.Errorf("failed to close workflow engine: %w", err)
}
}
return nil
}

// workflowID returns a hex encoded sha256 hash of the wasm, config and secretsURL.
func workflowID(wasm, config, secretsURL []byte) string {
sum := sha256.New()
Expand Down
Loading
Loading